nikkie-ftnextの日記

イベントレポートや読書メモを発信

Spring BatchのGuide「Creating a Batch Service」で素振り

はじめに

推しってほんと、わくわく! nikkieです。

Spring Batchという技術があることを知り、素振りしました1

目次

Creating a Batch Service

You will build a service that imports data from a CSV spreadsheet, transforms it with custom code, and stores the final results in a database.

CSVファイルに人名(姓と名)があり、それを全部大文字の表記に変換し、データベース(インメモリ)に保存します。

ソースコードはこちら。
completeディレクトリの下に完成形のコードがありました。

素振りの地はこちら

Spring Initializr

https://start.spring.io/ から雛形を作ります。

設定値の控え

  • Projectは、Maven
  • Languageは、Java
  • Spring Bootは、3.3.2
  • ArtifactとNameは「batchprocessing」
    • Package nameは com.example.batchprocessing
  • Javaのバージョンは雛形をダウンロードした後、pom.xmlを編集して20に変えています
  • Dependencies
    • Spring Batch
    • HyperSQL Database

扱うデータ

入力する人名のCSV

Jill,Doe
Joe,Doe

出力先のテーブル定義(抜粋)
src/main/resources/schema-all.sql

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

Spring Bootは起動時にschema-all.sqlを実行するそうです。便利だ

Javaプログラム上での中間データ

public record Person(String firstName, String lastName) {}

reader / processor / writer

以下のような分担です

  • reader: 人名のCSVファイルを読み込む
  • processor: 全部大文字の表記の人名に変換
  • writer: データベース(インメモリ)に保存

src/main/java/com/example/batchprocessing/BatchConfiguration.java

@Configuration
public class BatchConfiguration {
    // 後述
}

reader / processor / writer をシグネチャで示します

@Configuration
public class BatchConfiguration {
    @Bean
    public FlatFileItemReader<Person> reader() { }

    @Bean
    public PersonItemProcessor processor() { }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) { }
}
  • readerは、中間データのPersonクラスを返すFlatFileItemReader
    • 今回はヘッダがないCSVですが、ここでfirstNameの列、lastNameの列と意味合いが与えられています
  • processorは、中間データのPersonクラスを受け取り、中間データのPersonクラスを返すPersonItemProcessor(独自定義)
    • 姓名を大文字(upper case)にしたPersonを返します
  • writerは、中間データのPersonクラスを書き出すJdbcBatchItemWriter
    • Person 1件をINSERTするSQLが与えられます

reader / processor / writer と、データ1件(ここではCSV1行)あたりの処理を記しているわけですね

StepからなるJobにする

BatchConfiguration.java の実装は続きます

@Configuration
public class BatchConfiguration {
    @Bean
    public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
        return new JobBuilder("importUserJob", jobRepository)
                .listener(listener)
                .start(step1)
                .build();
    }

    @Bean
    public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
                      FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
        return new StepBuilder("step1", jobRepository)
                .<Person, Person>chunk(3, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
  • Jobは1つ以上のStepからなる
  • Stepは、reader / processor / writer を持つ
    • これらはBatchConfigurationクラスに定義したメソッド!
    • chunkで一度に3件ずつ処理すると指定(1件あたりの処理は、3件まとめて実行される!)

上で定義した reader / processor / writer をStepにまとめあげ、そこからJobを構成しています。

JobCompletionNotificationListener も自分で定義します。
src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java
Jobが完了したときに、インメモリデータベースのデータを表示します

いざ実行

main() 関数を実装します。
src/main/java/com/example/batchprocessing/BatchProcessingApplication.java
これで実行可能になりました。

mvnwはInitializrで作った雛形に含まれました。

 % java -version
openjdk version "20.0.2" 2023-07-18
OpenJDK Runtime Environment (build 20.0.2+9-78)
OpenJDK 64-Bit Server VM (build 20.0.2+9-78, mixed mode, sharing)

% ./mvnw clean package

% java -jar target/batchprocessing-0.0.1-SNAPSHOT.jar 

実行中のログです

2024-08-16T22:04:20.760+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.BatchProcessingApplication         : Started BatchProcessingApplication in 0.803 seconds (process running for 1.057)
2024-08-16T22:04:20.762+09:00  INFO 29791 --- [batchprocessing] [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2024-08-16T22:04:20.785+09:00  INFO 29791 --- [batchprocessing] [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{}]
2024-08-16T22:04:20.796+09:00  INFO 29791 --- [batchprocessing] [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2024-08-16T22:04:20.805+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.batchprocessing.PersonItemProcessor  : Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
2024-08-16T22:04:20.805+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.batchprocessing.PersonItemProcessor  : Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
2024-08-16T22:04:20.805+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.batchprocessing.PersonItemProcessor  : Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
2024-08-16T22:04:20.810+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.batchprocessing.PersonItemProcessor  : Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
2024-08-16T22:04:20.810+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.batchprocessing.PersonItemProcessor  : Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
2024-08-16T22:04:20.812+09:00  INFO 29791 --- [batchprocessing] [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 15ms
2024-08-16T22:04:20.815+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.JobCompletionNotificationListener  : !!! JOB FINISHED! Time to verify the results
2024-08-16T22:04:20.817+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.JobCompletionNotificationListener  : Found <{Person[firstName=JILL, lastName=DOE]}> in the database.
2024-08-16T22:04:20.817+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.JobCompletionNotificationListener  : Found <{Person[firstName=JOE, lastName=DOE]}> in the database.
2024-08-16T22:04:20.817+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.JobCompletionNotificationListener  : Found <{Person[firstName=JUSTIN, lastName=DOE]}> in the database.
2024-08-16T22:04:20.817+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.JobCompletionNotificationListener  : Found <{Person[firstName=JANE, lastName=DOE]}> in the database.
2024-08-16T22:04:20.817+09:00  INFO 29791 --- [batchprocessing] [           main] c.e.b.JobCompletionNotificationListener  : Found <{Person[firstName=JOHN, lastName=DOE]}> in the database.
2024-08-16T22:04:20.819+09:00  INFO 29791 --- [batchprocessing] [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 24ms

大文字の人名がデータベースに入っていますね(listenerのログ出力より)

終わりに

初めてのSpring Batchで、CSVファイルを読み込み、データを加工して、データベースに保存しました。

データ1件あたりの処理を reader / processor / writer に書き、それらをまとめたStepからJobを組み上げる。
これは、レゴブロックを組合せていく2感じがあります。

Springにはバッチ処理用のフレームワークがあるというのは興味深いです(Pythonではこれまで出会っていないような3


  1. ちなみに前回のSpringの記事です
  2. Kubeflowもレゴブロック!(再利用簡単
  3. Pythonはワークフローエンジンが豊富な気がします(パッケージマネージャくらい種類が豊富)