Spring Batch 6 で、だんだん増えていく固定長レコードを処理する(3)

「どんどんレコード(行)が追加されていく複数のテキストデータを一定間隔(1秒)で読んで、追加されたレコードをDBに登録していく」バッチ処理(Spring Batch 6)の話の最後。

一気に、実際の実行部のソースを貼っておきます。
いや、俺も勉強中でしっかり中身が理解できているわけではないので(^^;;;
でも、とりあえず「ネットに公開されているソースをパチって実際に動かして内容を理解したい」って人もいるでしょ?(俺はもろにそっち系(笑))

■バッチ処理全体構成・設定クラス(BatchConfig.java)※Bean 定義クラス

package com.netandfield.test;

import jakarta.persistence.EntityManagerFactory;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.Step;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.infrastructure.item.ItemProcessor;
import org.springframework.batch.infrastructure.item.database.JpaItemWriter;
import org.springframework.batch.infrastructure.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.infrastructure.item.file.FlatFileItemReader;
import org.springframework.batch.infrastructure.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.infrastructure.item.file.transform.FixedLengthTokenizer;
import org.springframework.batch.infrastructure.item.file.transform.Range;
import org.springframework.batch.infrastructure.item.support.SynchronizedItemStreamReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class BatchConfig {

    @Bean
    public Job fileProcessJob(JobRepository jobRepository, Step fileProcessStep) {
        return new JobBuilder("fileProcessJob", jobRepository)
                .start(fileProcessStep)
                .build();
    }

    @Bean
    public Step fileProcessStep(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager,
                                SynchronizedItemStreamReader<ProcessedData> synchronizedReader,
                                ItemProcessor<ProcessedData, ProcessedData> processor,
                                JpaItemWriter<ProcessedData> jpaWriter,
                                StepProgressUpdateListener progressListener) {
        return new StepBuilder("fileProcessStep", jobRepository)
                .<ProcessedData, ProcessedData>chunk(100)
                .transactionManager(transactionManager)
                .reader(synchronizedReader)
                .processor(processor)
                .writer(jpaWriter)
                .listener(progressListener) // リスナーをここで登録!
                .build();
    }

    @Bean
    @StepScope
    public SynchronizedItemStreamReader<ProcessedData> synchronizedReader(
            @Value("#{jobParameters['filePath']}") String filePath,
            @Value("#{jobParameters['linesToSkip']}") Long linesToSkip) {

        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
        tokenizer.setColumns(new Range(1, 4), new Range(5, 24), new Range(25, 27));
        tokenizer.setNames("code", "name", "value");

        FlatFileItemReader<ProcessedData> delegate = new FlatFileItemReaderBuilder<ProcessedData>()
                .name("flatFileItemReader")
                .resource(new FileSystemResource(filePath))
                .linesToSkip(linesToSkip.intValue())
                .lineMapper((line, lineNumber) -> {
                    var fieldSet = tokenizer.tokenize(line);
                    ProcessedData data = new ProcessedData();
                    data.setCode(fieldSet.readString("code").trim());
                    data.setName(fieldSet.readString("name").trim());
                    data.setValue(fieldSet.readInt("value"));
                    return data;
                })
                .build();

        return new SynchronizedItemStreamReader<>(delegate);
    }

    @Bean
    public ItemProcessor<ProcessedData, ProcessedData> processor() {
        return item -> {
            item.setName(item.getName().toUpperCase()); // Name を大文字に変換する
            return item;
        };
    }

    @Bean
    public JpaItemWriter<ProcessedData> jpaWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<ProcessedData>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}


■スケジューラ(FileWatchScheduler.java)※ジョブを定期的に実行する Bean

package com.netandfield.test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.List;

import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Component
@RequiredArgsConstructor
@Slf4j
public class FileWatchScheduler {

    // Spring Batch 6 では、JobLauncher の役割を内包した JobOperator を使用
    private final JobOperator jobOperator;
    private final Job fileProcessJob;
    private final FileProgressRepository progressRepository;

    private final AppProperties appProperties;

    @Scheduled(fixedDelay = 1000)
    public void runPeriodicBatchJob() {
        Path listFile = Paths.get(appProperties.getFilesListPath());
        if (!Files.exists(listFile)) {
            log.warn("ファイル一覧が見つかりません: {}", appProperties.getFilesListPath());
            return;
        }

        try {
            List<String> targetFilePaths = Files.readAllLines(listFile);
            for (String filePath : targetFilePaths) {
                if (filePath.trim().isEmpty() || !Files.exists(Paths.get(filePath))) {
                    continue;
                }
                processSingleFile(filePath.trim());
            }
        } catch (IOException e) {
            log.error("ファイル一覧の読み込みに失敗しました", e);
        }
    }

    private void processSingleFile(String filePath) {
        // 1. 進捗管理DBから前回の行数を取得
        FileProgress progress = progressRepository.findById(filePath)
                .orElseGet(() -> {
                    FileProgress newProgress = new FileProgress();
                    newProgress.setFilePath(filePath);
                    newProgress.setLastReadLines(0);
                    return progressRepository.save(newProgress);
                });

        try {
            // 2. パラメータの組み立て
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("filePath", filePath)
                    .addLong("linesToSkip", (long) progress.getLastReadLines())
                    .addLocalDateTime("runTime", LocalDateTime.now()) // 毎回新規実行にするため
                    .toJobParameters();

            // 3. [Spring Batch 6 正攻法]
            // JobOperator が JobLauncher を継承し、run メソッドを直接安全に呼び出せる(警告なし)
            jobOperator.run(fileProcessJob, jobParameters);

            // ※進捗(行数)の更新処理は、バッチのライフサイクルに則り「StepProgressUpdateListener」が自動的に実行

        } catch (Exception e) {
            log.error("ファイル [{}] のバッチ処理中にエラーが発生しました", filePath, e);
        }
    }
}


■進捗(行数)更新処理クラス(StepProgressUpdateListener.java)※もちろん Bean

package com.netandfield.test;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.listener.StepExecutionListener;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Component
@RequiredArgsConstructor
@Slf4j
public class StepProgressUpdateListener implements StepExecutionListener {

    private final FileProgressRepository progressRepository;

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // ステップが正常終了した場合のみ、進捗を更新する
        if (stepExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            // 起動パラメータからファイルパスを取得
            String filePath = stepExecution.getJobParameters().getString("filePath");
            long newlyRead = stepExecution.getReadCount();

            if (filePath != null && newlyRead > 0) {
                FileProgress progress = progressRepository.findById(filePath)
                        .orElseGet(() -> {
                            FileProgress newProgress = new FileProgress();
                            newProgress.setFilePath(filePath);
                            newProgress.setLastReadLines(0);
                            return newProgress;
                        });

                progress.setLastReadLines(progress.getLastReadLines() + (int) newlyRead);
                progressRepository.save(progress);
                log.info("ファイル [{}] の進捗を更新しました。新たに {} 行処理、累計 {} 行。",
                        filePath, newlyRead, progress.getLastReadLines());
            }
        }
        return stepExecution.getExitStatus();
    }
}

「Spring Batch 6 、だんだん増えていく固定長レコードを処理する」の(1)~(3)に載せているソースを使って新しいプロジェクトを作れば、Spring atch 6 で、「だんだん増えていく固定長レコード」が処理できます(笑)

Spring Batch は ver 6 大きくライブラリ構成などが変わっちゃったり、古い機能が完全に削除されたりしてるので、2026年6月現在の最新の Eclipse Pleiades All in One版を落としてきて Spring Batch の勉強をしようとすると、ネット上に公開されている色々なサンプルソースではまず動かない(^^;;;

AI に「Spring Batch 6 で動かすにはどうすればいいの?」と聞いても、3 やら 5 やらのソースばかり返してくるのよね(^^;;; AI はネット上の情報をかき集めて自分の知識としてるんだけど、そもそもネット上に pring Batch 6 に対応したソースなどの情報が少ないので、AI 自身その関係の知識がないのよね(笑)

でも、このエントリーのソースなら動きます(笑)
さあ、実際に動かして、俺に色々教えてくれたまえ。

トラックバック(0)

このブログ記事を参照しているブログ一覧: Spring Batch 6 で、だんだん増えていく固定長レコードを処理する(3)

このブログ記事に対するトラックバックURL: https://blog.netandfield.com/mt/mt-tb.cgi/7229

コメントする

このブログ記事について

このページは、shinodaが2026年7月 1日 23:16に書いたブログ記事です。

ひとつ前のブログ記事は「Spring Batch 6 で、だんだん増えていく固定長レコードを処理する(2)」です。

次のブログ記事は「Oracle Database 21c Express Edition (XE) をインストール」です。

最近のコンテンツはインデックスページで見られます。過去に書かれたものはアーカイブのページで見られます。

月別 アーカイブ

電気ウナギ的○○ mobile ver.

携帯版「電気ウナギ的○○」はこちら