プログラミング: 2026年7月アーカイブ

「どんどんレコード(行)が追加されていく複数のテキストデータを一定間隔(1秒)で読んで、追加されたレコードをDBに登録していく」バッチ処理(Spring Batch 6)の話の最後。

一気に、実際の実行部のソースを貼っておきます。
いや、俺も勉強中でしっかり中身が理解できているわけではないので(^^;;;
でも、とりあえず「ネットに公開されているソースをパチって実際に動かして内容を理解したい」って人もいるでしょ?(俺はもろにそっち系(笑))

■バッチ処理全体構成・設定クラス(BatchConfig.java)※Bean 定義クラス

package com.netandfield.test;

import jakarta.persistence.EntityManagerFactory;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.Step;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.infrastructure.item.ItemProcessor;
import org.springframework.batch.infrastructure.item.database.JpaItemWriter;
import org.springframework.batch.infrastructure.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.infrastructure.item.file.FlatFileItemReader;
import org.springframework.batch.infrastructure.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.infrastructure.item.file.transform.FixedLengthTokenizer;
import org.springframework.batch.infrastructure.item.file.transform.Range;
import org.springframework.batch.infrastructure.item.support.SynchronizedItemStreamReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class BatchConfig {

    @Bean
    public Job fileProcessJob(JobRepository jobRepository, Step fileProcessStep) {
        return new JobBuilder("fileProcessJob", jobRepository)
                .start(fileProcessStep)
                .build();
    }

    @Bean
    public Step fileProcessStep(JobRepository jobRepository,
                                PlatformTransactionManager transactionManager,
                                SynchronizedItemStreamReader<ProcessedData> synchronizedReader,
                                ItemProcessor<ProcessedData, ProcessedData> processor,
                                JpaItemWriter<ProcessedData> jpaWriter,
                                StepProgressUpdateListener progressListener) {
        return new StepBuilder("fileProcessStep", jobRepository)
                .<ProcessedData, ProcessedData>chunk(100)
                .transactionManager(transactionManager)
                .reader(synchronizedReader)
                .processor(processor)
                .writer(jpaWriter)
                .listener(progressListener) // リスナーをここで登録!
                .build();
    }

    @Bean
    @StepScope
    public SynchronizedItemStreamReader<ProcessedData> synchronizedReader(
            @Value("#{jobParameters['filePath']}") String filePath,
            @Value("#{jobParameters['linesToSkip']}") Long linesToSkip) {

        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
        tokenizer.setColumns(new Range(1, 4), new Range(5, 24), new Range(25, 27));
        tokenizer.setNames("code", "name", "value");

        FlatFileItemReader<ProcessedData> delegate = new FlatFileItemReaderBuilder<ProcessedData>()
                .name("flatFileItemReader")
                .resource(new FileSystemResource(filePath))
                .linesToSkip(linesToSkip.intValue())
                .lineMapper((line, lineNumber) -> {
                    var fieldSet = tokenizer.tokenize(line);
                    ProcessedData data = new ProcessedData();
                    data.setCode(fieldSet.readString("code").trim());
                    data.setName(fieldSet.readString("name").trim());
                    data.setValue(fieldSet.readInt("value"));
                    return data;
                })
                .build();

        return new SynchronizedItemStreamReader<>(delegate);
    }

    @Bean
    public ItemProcessor<ProcessedData, ProcessedData> processor() {
        return item -> {
            item.setName(item.getName().toUpperCase()); // Name を大文字に変換する
            return item;
        };
    }

    @Bean
    public JpaItemWriter<ProcessedData> jpaWriter(EntityManagerFactory entityManagerFactory) {
        return new JpaItemWriterBuilder<ProcessedData>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }
}


■スケジューラ(FileWatchScheduler.java)※ジョブを定期的に実行する Bean

package com.netandfield.test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.List;

import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Component
@RequiredArgsConstructor
@Slf4j
public class FileWatchScheduler {

    // Spring Batch 6 では、JobLauncher の役割を内包した JobOperator を使用
    private final JobOperator jobOperator;
    private final Job fileProcessJob;
    private final FileProgressRepository progressRepository;

    private final AppProperties appProperties;

    @Scheduled(fixedDelay = 1000)
    public void runPeriodicBatchJob() {
        Path listFile = Paths.get(appProperties.getFilesListPath());
        if (!Files.exists(listFile)) {
            log.warn("ファイル一覧が見つかりません: {}", appProperties.getFilesListPath());
            return;
        }

        try {
            List<String> targetFilePaths = Files.readAllLines(listFile);
            for (String filePath : targetFilePaths) {
                if (filePath.trim().isEmpty() || !Files.exists(Paths.get(filePath))) {
                    continue;
                }
                processSingleFile(filePath.trim());
            }
        } catch (IOException e) {
            log.error("ファイル一覧の読み込みに失敗しました", e);
        }
    }

    private void processSingleFile(String filePath) {
        // 1. 進捗管理DBから前回の行数を取得
        FileProgress progress = progressRepository.findById(filePath)
                .orElseGet(() -> {
                    FileProgress newProgress = new FileProgress();
                    newProgress.setFilePath(filePath);
                    newProgress.setLastReadLines(0);
                    return progressRepository.save(newProgress);
                });

        try {
            // 2. パラメータの組み立て
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("filePath", filePath)
                    .addLong("linesToSkip", (long) progress.getLastReadLines())
                    .addLocalDateTime("runTime", LocalDateTime.now()) // 毎回新規実行にするため
                    .toJobParameters();

            // 3. [Spring Batch 6 正攻法]
            // JobOperator が JobLauncher を継承し、run メソッドを直接安全に呼び出せる(警告なし)
            jobOperator.run(fileProcessJob, jobParameters);

            // ※進捗(行数)の更新処理は、バッチのライフサイクルに則り「StepProgressUpdateListener」が自動的に実行

        } catch (Exception e) {
            log.error("ファイル [{}] のバッチ処理中にエラーが発生しました", filePath, e);
        }
    }
}


■進捗(行数)更新処理クラス(StepProgressUpdateListener.java)※もちろん Bean

package com.netandfield.test;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.listener.StepExecutionListener;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Component
@RequiredArgsConstructor
@Slf4j
public class StepProgressUpdateListener implements StepExecutionListener {

    private final FileProgressRepository progressRepository;

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // ステップが正常終了した場合のみ、進捗を更新する
        if (stepExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            // 起動パラメータからファイルパスを取得
            String filePath = stepExecution.getJobParameters().getString("filePath");
            long newlyRead = stepExecution.getReadCount();

            if (filePath != null && newlyRead > 0) {
                FileProgress progress = progressRepository.findById(filePath)
                        .orElseGet(() -> {
                            FileProgress newProgress = new FileProgress();
                            newProgress.setFilePath(filePath);
                            newProgress.setLastReadLines(0);
                            return newProgress;
                        });

                progress.setLastReadLines(progress.getLastReadLines() + (int) newlyRead);
                progressRepository.save(progress);
                log.info("ファイル [{}] の進捗を更新しました。新たに {} 行処理、累計 {} 行。",
                        filePath, newlyRead, progress.getLastReadLines());
            }
        }
        return stepExecution.getExitStatus();
    }
}

「Spring Batch 6 、だんだん増えていく固定長レコードを処理する」の(1)~(3)に載せているソースを使って新しいプロジェクトを作れば、Spring atch 6 で、「だんだん増えていく固定長レコード」が処理できます(笑)

Spring Batch は ver 6 大きくライブラリ構成などが変わっちゃったり、古い機能が完全に削除されたりしてるので、2026年6月現在の最新の Eclipse Pleiades All in One版を落としてきて Spring Batch の勉強をしようとすると、ネット上に公開されている色々なサンプルソースではまず動かない(^^;;;

AI に「Spring Batch 6 で動かすにはどうすればいいの?」と聞いても、3 やら 5 やらのソースばかり返してくるのよね(^^;;; AI はネット上の情報をかき集めて自分の知識としてるんだけど、そもそもネット上に pring Batch 6 に対応したソースなどの情報が少ないので、AI 自身その関係の知識がないのよね(笑)

でも、このエントリーのソースなら動きます(笑)
さあ、実際に動かして、俺に色々教えてくれたまえ。

周回レースの集計データの処理を想定した「どんどんレコード(行)が追加されていく複数のテキストデータを一定間隔(1秒)で読んで、追加されたレコードをDBに登録していく」バッチ処理(Spring
Batch 6)の話のつづき

ちなみに(1)で「仕様」に書き忘れてたけど、データベースへのアクセスは JDBC ではなく JPA を使っています。
俺的には全然 SQL は苦手ではないし、もっと言えば複雑な SQL を書くのは好きな方ですが(笑)、一昨年から昨年にかけて C#.NET
で「DB操作は LINQ」って案件やって、ああ、糞っ、直接生の SQL 書けば一発なのにと苦労した記憶があるので(^^;;;、敢えて JPA
を選んでみたわけです。勉強のために。

※LINQ でも直接 SQL 書けるじゃんってツッコミは無しで願います。言語仕様の話ではなく、コーディングルールで禁止されていたということなのよ。

ま、とういうわけで、今回はプロパティファイルや、テーブルの設定などを。

■Apache Maven プロジェクト設定ファイル(pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>4.1.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>BatchTest5</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>BatchTest5</name>
    <description/>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-h2console</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webmvc</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webmvc-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <annotationProcessorPaths>
                                <path>
                                    <groupId>org.projectlombok</groupId>
                                    <artifactId>lombok</artifactId>
                                </path>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <annotationProcessorPaths>
                                <path>
                                    <groupId>org.projectlombok</groupId>
                                    <artifactId>lombok</artifactId>
                                </path>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

他のエントリーでも書いたけど、バッチ処理だけど H2 コンソールを動かすために tomcat
が必要なので、spring-boot-starter-webmvc モジュールが依存関係(dependency)として登録されていること。


■プロパティ(resource/application.properties)

spring.application.name=BatchTest5

# ファイル一覧
app.files-list-path=C:\\work\\files.txt

# H2 を PostgreSQL 互換モードで使用(テーブル名等は小文字で)
spring.datasource.url=jdbc:h2:./.data/h2/db;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=

# H2 Console を有効化
spring.h2.console.enabled=true
spring.h2.console.path=/h2-console

# JPA/Hibernate設定
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.batch.job.enabled=false

# Webアプリケーションとして常駐
spring.main.web-application-type=servlet

特に注意するのは
spring.batch.job.enabled=false
かな。
これは起動時に自動でバッチジョブを実行しないようにしている。

Spring Boot はバッチ用のライブラリ(spring-boot-starter-batch)を検知すると、定義されている @Bean
のジョブを起動時にすべて片っ端から実行しようとしてしまう。今回はスケジューラでパラメータ(読み込むファイル名とか)を与えてジョブを起動する形にしているから、勝手にジョブを起動されちゃうと「パラメータが無い状態」の実行となりエラーが発生する。なので「自動で起動すんなよ」と抑制しているわけやね。

あと、app.files-list-path は「'app.files-list-path' is an unknown property.
[PROP_UNKNOWN_PROPERTY]」という警告が出るので、プロパティを認識させるためのメタデータを作ってやる。


■メタデータファイル(resource/META-INF/additional-spring-configuration-metadata.json)

{"properties": [{
  "name": "app.files-list-path",
  "type": "java.lang.String",
  "description": "A description for 'app.files-list-path'"
}]}


■Spring Boot メインクラス(BatchTest5Application.java)

package com.netandfield.test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class BatchTest5Application {
    public static void main(String[] args) {
        SpringApplication.run(BatchTest5Application.class, args);
    }
}

Eclipse で Spring Batch のバッチを実行するときは、このファイルを Java アプリケーションとして実行する。


■DBマッピングクラス(FileProgress.java)
 ※各ファイルが何行目まで読まれているかを保持するテーブル(file_progress)

package com.netandfield.test;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

import lombok.Data;

@Entity
@Table(name = "file_progress")
@Data
// 各ファイルの読み込み行数を管理するエンティティ
public class FileProgress {
    @Id
    private String filePath; // ファイルのフルパス
    private int lastReadLines; // 前回までに読み込み完了した行数
}

20260630_e2_01.jpg


■DBマッピングクラス(ProcessedData.java)
 ※読み込まれたデータが保存されるテーブル(processed_data)

package com.netandfield.test;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

import lombok.Data;

@Entity
@Table(name = "processed_data")
@Data
// データを保存するエンティティ
public class ProcessedData {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String code;
    private String name;
    // データベース上の列名を「data_value」に退避させる(value は予約語なので)
    @Column(name = "data_value")
    private Integer value;
}

20260630_e2_02.jpg

id は、@Id アノテーションで主キーに設定され、データベースの Auto Increment機能を利用して自動採番されている。
id が 1~5までは続いて、急に 33に飛んでいるのは、DBに採番を依頼するとき、ある程度まとめて採番してもらっているから。

今回は、一度に 32個の ID を発行し、読み込んだデータに順に割り当てている。足らなければ再度採番依頼をするが、今回は最初の処理では
5件しかデータが無かったので、5番まで使って 6~32は捨てている。
そして、ファイルに新しい行が追加されたので次の処理が実行されるが、この時に前回の続きで、「33~64までの32個」のIDを採番し先頭から新しいレコードの
id に割り当てたというわけである。

せっかく採番した ID が捨てられるのはもったいないということなら、この同時採番される数を 1 にしても良いが、当然
1レコードずつ採番要求が発生するので処理スピードは落ちる。


■FileProgressクラスのインタフェース(FileProgressRepository.java)

package com.netandfield.test;

import org.springframework.data.jpa.repository.JpaRepository;

public interface FileProgressRepository extends
JpaRepository<FileProgress, String> {
}

org.springframework.data.jpa.repository.JpaRepository を継承しており、findById
や save といったメソッドが実行可能に。


ふう。Gemini に聞きながら、何とか以下のような仕様のバッチ処理完成。

■仕様(簡単に)

・Spring Batch 6(Spring Boot 4)で実行されるバッチ
・DBはとりあえず H2で
・複数の固定長ファイルを読み込み、項目ごとに分割しDBに登録する
・複数の固定長ファイルのパスはひとつのファイル(files.txt)に記述されている
・固定長ファイルのレコード数は追加されていく
・1秒ごとにファイルを全てチェックし、レコード数が増えていれば増えたレコードを読み込む

例えばどんどん新しい行(ラップタイム)がレース終了まで追加されていく周回レースの集計データの処理などを想定している。
(ゴールだけではなく、周回するコースの途中に設置された複数の計測機器(中間計測機)からのデータ(それぞれ別のネットワークストレージ上に計測データを吐いている(ただし、バッチ処理を行うPCには共有されている))を一括でDB登録するイメージ)


■処理対象のファイル一覧ファイル(C:\work\files.txt)

C:\work\test1.txt
C:\work\test\test2.txt


■データファイル1(C:\work\test1.txt)

A001SHINODA Masanori    12
A002SHINODA Nyunyumi    18

あとで、
C023Shiranui Shinobu    123
が追加される(手動で(笑))


■データファイル2(C:\work\test\test2.txt)

B001Tanaka Ichiro       68
B122Yamamoto Ahomaru    48
B245Yamamoto Murishi    47

あとで、
D100Shinoda Gengoromaru 5
が追加される(手動で(笑))


■データの構成(固定長)

ユーザID(code)... 1~4桁目(4桁)
ユーザ名(name)... 5~24桁目(20桁)
年齢(value)... 25~27桁(3桁)
※テーブルの項目名は data_value(value は H2データベースの予約語のため)


実際にこのようなデータを準備し、作成したバッチ処理を実行すると、DBにデータが登録される。
H2 コンソールで確認すると、最初の 5件+追加の 2件の計 7件のデータが登録されているのがわかる。

20260630_e2_02.jpg

自動採番している id が途中で飛んでいることや、データは大文字小文字混在だった name が大文字だけになっている点などは、またソースの説明のときに・・・

このアーカイブについて

このページには、2026年7月以降に書かれたブログ記事のうちプログラミングカテゴリに属しているものが含まれています。

前のアーカイブはプログラミング: 2026年6月です。

最近のコンテンツはインデックスページで見られます。過去に書かれたものはアーカイブのページで見られます。

月別 アーカイブ

電気ウナギ的○○ mobile ver.

携帯版「電気ウナギ的○○」はこちら