「どんどんレコード(行)が追加されていく複数のテキストデータを一定間隔(1秒)で読んで、追加されたレコードをDBに登録していく」バッチ処理(Spring Batch 6)の話の最後。
一気に、実際の実行部のソースを貼っておきます。
いや、俺も勉強中でしっかり中身が理解できているわけではないので(^^;;;
でも、とりあえず「ネットに公開されているソースをパチって実際に動かして内容を理解したい」って人もいるでしょ?(俺はもろにそっち系(笑))
■バッチ処理全体構成・設定クラス(BatchConfig.java)※Bean 定義クラス
package com.netandfield.test;import jakarta.persistence.EntityManagerFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.core.job.Job;import org.springframework.batch.core.job.builder.JobBuilder;import org.springframework.batch.core.repository.JobRepository;import org.springframework.batch.core.step.Step;import org.springframework.batch.core.step.builder.StepBuilder;import org.springframework.batch.infrastructure.item.ItemProcessor;import org.springframework.batch.infrastructure.item.database.JpaItemWriter;import org.springframework.batch.infrastructure.item.database.builder.JpaItemWriterBuilder;import org.springframework.batch.infrastructure.item.file.FlatFileItemReader;import org.springframework.batch.infrastructure.item.file.builder.FlatFileItemReaderBuilder;import org.springframework.batch.infrastructure.item.file.transform.FixedLengthTokenizer;import org.springframework.batch.infrastructure.item.file.transform.Range;import org.springframework.batch.infrastructure.item.support.SynchronizedItemStreamReader;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;import org.springframework.transaction.PlatformTransactionManager;@Configurationpublic class BatchConfig {@Beanpublic Job fileProcessJob(JobRepository jobRepository, Step fileProcessStep) {return new JobBuilder("fileProcessJob", jobRepository).start(fileProcessStep).build();}@Beanpublic Step fileProcessStep(JobRepository jobRepository,PlatformTransactionManager transactionManager,SynchronizedItemStreamReader<ProcessedData> synchronizedReader,ItemProcessor<ProcessedData, ProcessedData> processor,JpaItemWriter<ProcessedData> jpaWriter,StepProgressUpdateListener progressListener) {return new StepBuilder("fileProcessStep", jobRepository).<ProcessedData, ProcessedData>chunk(100).transactionManager(transactionManager).reader(synchronizedReader).processor(processor).writer(jpaWriter).listener(progressListener) // リスナーをここで登録!.build();}@Bean@StepScopepublic SynchronizedItemStreamReader<ProcessedData> synchronizedReader(@Value("#{jobParameters['filePath']}") String filePath,@Value("#{jobParameters['linesToSkip']}") Long linesToSkip) {FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();tokenizer.setColumns(new Range(1, 4), new Range(5, 24), new Range(25, 27));tokenizer.setNames("code", "name", "value");FlatFileItemReader<ProcessedData> delegate = new FlatFileItemReaderBuilder<ProcessedData>().name("flatFileItemReader").resource(new FileSystemResource(filePath)).linesToSkip(linesToSkip.intValue()).lineMapper((line, lineNumber) -> {var fieldSet = tokenizer.tokenize(line);ProcessedData data = new ProcessedData();data.setCode(fieldSet.readString("code").trim());data.setName(fieldSet.readString("name").trim());data.setValue(fieldSet.readInt("value"));return data;}).build();return new SynchronizedItemStreamReader<>(delegate);}@Beanpublic ItemProcessor<ProcessedData, ProcessedData> processor() {return item -> {item.setName(item.getName().toUpperCase()); // Name を大文字に変換するreturn item;};}@Beanpublic JpaItemWriter<ProcessedData> jpaWriter(EntityManagerFactory entityManagerFactory) {return new JpaItemWriterBuilder<ProcessedData>().entityManagerFactory(entityManagerFactory).build();}}
■スケジューラ(FileWatchScheduler.java)※ジョブを定期的に実行する Bean
package com.netandfield.test;import java.io.IOException;import java.nio.file.Files;import java.nio.file.Path;import java.nio.file.Paths;import java.time.LocalDateTime;import java.util.List;import org.springframework.batch.core.job.Job;import org.springframework.batch.core.job.parameters.JobParameters;import org.springframework.batch.core.job.parameters.JobParametersBuilder;import org.springframework.batch.core.launch.JobOperator;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;@Component@RequiredArgsConstructor@Slf4jpublic class FileWatchScheduler {// Spring Batch 6 では、JobLauncher の役割を内包した JobOperator を使用private final JobOperator jobOperator;private final Job fileProcessJob;private final FileProgressRepository progressRepository;private final AppProperties appProperties;@Scheduled(fixedDelay = 1000)public void runPeriodicBatchJob() {Path listFile = Paths.get(appProperties.getFilesListPath());if (!Files.exists(listFile)) {log.warn("ファイル一覧が見つかりません: {}", appProperties.getFilesListPath());return;}try {List<String> targetFilePaths = Files.readAllLines(listFile);for (String filePath : targetFilePaths) {if (filePath.trim().isEmpty() || !Files.exists(Paths.get(filePath))) {continue;}processSingleFile(filePath.trim());}} catch (IOException e) {log.error("ファイル一覧の読み込みに失敗しました", e);}}private void processSingleFile(String filePath) {// 1. 進捗管理DBから前回の行数を取得FileProgress progress = progressRepository.findById(filePath).orElseGet(() -> {FileProgress newProgress = new FileProgress();newProgress.setFilePath(filePath);newProgress.setLastReadLines(0);return progressRepository.save(newProgress);});try {// 2. パラメータの組み立てJobParameters jobParameters = new JobParametersBuilder().addString("filePath", filePath).addLong("linesToSkip", (long) progress.getLastReadLines()).addLocalDateTime("runTime", LocalDateTime.now()) // 毎回新規実行にするため.toJobParameters();// 3. [Spring Batch 6 正攻法]// JobOperator が JobLauncher を継承し、run メソッドを直接安全に呼び出せる(警告なし)jobOperator.run(fileProcessJob, jobParameters);// ※進捗(行数)の更新処理は、バッチのライフサイクルに則り「StepProgressUpdateListener」が自動的に実行} catch (Exception e) {log.error("ファイル [{}] のバッチ処理中にエラーが発生しました", filePath, e);}}}
■進捗(行数)更新処理クラス(StepProgressUpdateListener.java)※もちろん Bean
package com.netandfield.test;import org.springframework.batch.core.ExitStatus;import org.springframework.batch.core.listener.StepExecutionListener;import org.springframework.batch.core.step.StepExecution;import org.springframework.stereotype.Component;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;@Component@RequiredArgsConstructor@Slf4jpublic class StepProgressUpdateListener implements StepExecutionListener {private final FileProgressRepository progressRepository;@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {// ステップが正常終了した場合のみ、進捗を更新するif (stepExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {// 起動パラメータからファイルパスを取得String filePath = stepExecution.getJobParameters().getString("filePath");long newlyRead = stepExecution.getReadCount();if (filePath != null && newlyRead > 0) {FileProgress progress = progressRepository.findById(filePath).orElseGet(() -> {FileProgress newProgress = new FileProgress();newProgress.setFilePath(filePath);newProgress.setLastReadLines(0);return newProgress;});progress.setLastReadLines(progress.getLastReadLines() + (int) newlyRead);progressRepository.save(progress);log.info("ファイル [{}] の進捗を更新しました。新たに {} 行処理、累計 {} 行。",filePath, newlyRead, progress.getLastReadLines());}}return stepExecution.getExitStatus();}}
「Spring Batch 6 、だんだん増えていく固定長レコードを処理する」の(1)~(3)に載せているソースを使って新しいプロジェクトを作れば、Spring atch 6 で、「だんだん増えていく固定長レコード」が処理できます(笑)
Spring Batch は ver 6 大きくライブラリ構成などが変わっちゃったり、古い機能が完全に削除されたりしてるので、2026年6月現在の最新の Eclipse Pleiades All in One版を落としてきて Spring Batch の勉強をしようとすると、ネット上に公開されている色々なサンプルソースではまず動かない(^^;;;
AI に「Spring Batch 6 で動かすにはどうすればいいの?」と聞いても、3 やら 5 やらのソースばかり返してくるのよね(^^;;; AI はネット上の情報をかき集めて自分の知識としてるんだけど、そもそもネット上に pring Batch 6 に対応したソースなどの情報が少ないので、AI 自身その関係の知識がないのよね(笑)
でも、このエントリーのソースなら動きます(笑)
さあ、実際に動かして、俺に色々教えてくれたまえ。

コメントする