【PL/SQL】差分抽出・増分処理の設計|前回実行日時・再実行・重複防止まで

【PL/SQL】差分抽出・増分処理の設計|前回実行日時・再実行・重複防止まで PL/SQL

夜間バッチや外部連携で毎回全件を処理すると、データ量が増えたときに処理時間が伸び続けます。そこで使うのが、前回処理以降に変わったデータだけを取り込む差分抽出増分処理です。ただし、単純にupdated_at > 前回実行日時と書くだけでは、同時刻データ、遅延登録、再実行、コミット失敗、重複取込でつまずきます。

この記事では、PL/SQLで差分抽出・増分処理を安全に設計する方法を整理します。前回実行日時を保存する管理テーブル、ウォーターマーク、抽出範囲の固定、ステージングテーブル、重複防止、再実行までを1つの流れとして扱います。取込後の検証は ステージングテーブル設計、処理履歴の残し方は ジョブ実行履歴テーブル設計 も参考になります。

この記事で扱うこと

  • 全件処理と増分処理の使い分け
  • 前回実行日時を管理するテーブル設計
  • ウォーターマークによる抽出範囲の固定
  • 同時刻データ・遅延データの取りこぼし対策
  • ステージングテーブルへの差分取込
  • 再実行しても重複しないMERGE設計
  • 失敗時の復旧とチェックリスト
スポンサーリンク

差分抽出で解決したい問題

差分抽出の目的は、処理対象を減らしてバッチ時間を短くすることだけではありません。前回どこまで処理したかを明確にし、失敗時に同じ範囲を再実行できるようにし、外部連携先へ同じデータを重複送信しないようにすることも重要です。

処理時間を安定させる全件処理ではデータ量の増加に比例して処理時間が伸びます。差分処理なら変更分だけを対象にできます。
再実行範囲を説明できる前回成功時刻と今回の抽出上限が残っていれば、どの範囲を処理したか追跡できます。
取りこぼしを防ぐ同時刻データや遅延反映を考慮しないと、一部のデータが永久に処理されません。
重複を防ぐ再実行時に同じデータを二重登録しないよう、キー設計とMERGEが必要です。

差分抽出は、単なるWHERE句ではなく、運用手順を含む設計です。抽出範囲、処理状態、失敗時の戻し方、再実行時のふるまいをセットで決めます。

全件処理と増分処理の違い

全件処理は単純で、毎回すべてのデータを読み直します。件数が少ないマスタや、処理結果を毎回作り直す集計では有効です。一方で、注文、ログ、明細、外部連携データのように増え続けるテーブルでは、全件処理がすぐに重くなります。

全件処理が向くケース小さなマスタ、日次で完全再作成する集計、対象件数が安定して少ない処理です。
増分処理が向くケース大量明細、更新履歴、外部連携、監査ログ、日々追加されるトランザクションです。
ハイブリッドもある直近数日だけ差分処理し、月次で全件再計算して整合性を確認する方法もあります。

増分処理を選ぶ場合は、必ず「何をもって変更とみなすか」を決めます。updated_atなのか、連番IDなのか、更新履歴テーブルなのか、ソース側の仕様によって設計が変わります。

前回実行日時を管理する

差分抽出では、前回成功した抽出上限を保存する管理テーブルを用意します。ここで大切なのは、ジョブ開始時刻ではなく、前回どこまで正常に処理できたかを保存することです。

incremental-job-state-ddl.sql
CREATE TABLE incremental_job_state (
  job_name              VARCHAR2(100) PRIMARY KEY,
  last_success_to       TIMESTAMP,
  overlap_minutes       NUMBER DEFAULT 5 NOT NULL,
  status                VARCHAR2(20) DEFAULT 'IDLE' NOT NULL,
  last_started_at       TIMESTAMP,
  last_finished_at      TIMESTAMP,
  last_error_message    VARCHAR2(2000),
  updated_by            VARCHAR2(128) DEFAULT SYS_CONTEXT('USERENV','SESSION_USER') NOT NULL,
  updated_at            TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  CONSTRAINT ck_incremental_state_status
    CHECK (status IN ('IDLE','RUNNING','FAILED'))
);

last_success_toは、前回処理が成功した抽出上限です。今回の抽出開始時刻ではありません。処理が途中で失敗した場合、この値を進めないことで、次回同じ範囲を再実行できます。

incremental-job-state-seed.sql
INSERT INTO incremental_job_state (
  job_name,
  last_success_to,
  overlap_minutes
) VALUES (
  'ORDER_INCREMENTAL_LOAD',
  TIMESTAMP '2026-06-01 00:00:00',
  5
);

ウォーターマークで抽出範囲を固定する

差分処理では、処理中に新しいデータが入ってくることを前提にします。そのため、開始時点で今回の抽出上限、つまりウォーターマークを固定します。固定せずにSYSTIMESTAMPを何度も参照すると、処理中に対象範囲が広がってしまいます。

fixed-watermark.sql
DECLARE
  v_job_name  VARCHAR2(100) := 'ORDER_INCREMENTAL_LOAD';
  v_from_ts   TIMESTAMP;
  v_to_ts     TIMESTAMP;
  v_overlap   NUMBER;
BEGIN
  SELECT
    last_success_to,
    SYSTIMESTAMP,
    overlap_minutes
  INTO
    v_from_ts,
    v_to_ts,
    v_overlap
  FROM incremental_job_state
  WHERE job_name = v_job_name
  FOR UPDATE;

  v_from_ts := v_from_ts - NUMTODSINTERVAL(v_overlap, 'MINUTE');

  DBMS_APPLICATION_INFO.SET_MODULE(
    module_name => v_job_name,
    action_name => 'from=' || TO_CHAR(v_from_ts, 'YYYY-MM-DD HH24:MI:SS')
  );

  -- ここから v_from_ts 以上、v_to_ts 未満を処理する
END;
/

抽出条件は、原則として>= fromかつ< toにします。上限を未満にすることで、次回の境界と扱いやすくなります。同じ時刻のデータを確実に拾うため、少し重ねて再抽出し、後段で重複排除する設計が安全です。重ね取りでは、同じデータが再び抽出されることを前提にします。そのため、後続処理にはMERGE、一意制約、冪等キーのいずれかを必ず入れます。

updated_atだけに頼る危険

updated_atを使った差分抽出は分かりやすいですが、注意点があります。アプリやバッチがupdated_atを更新しない場合、データは変わっているのに差分として拾えません。また、秒単位の時刻しか持たない場合、同じ秒に更新された複数行の境界処理で抜け漏れが起きることがあります。

同時刻データ境界時刻と同じupdated_atを持つデータを取りこぼさないよう、重複許容で再抽出します。
遅延データソース側で古いupdated_atのまま後から登録されるデータは、通常の差分条件では拾えません。
時刻精度DATE型や秒単位のTIMESTAMPでは、境界が粗くなるためキーとの組み合わせが必要になることがあります。
手動補正本番データ補正でupdated_atを変えない運用だと、差分処理に反映されません。

本番データ補正と差分処理が関係する場合は、補正時に差分対象へ載せるかどうかも決めておきます。補正作業の設計は 本番データ補正スクリプトの作り方 と合わせて考えると安全です。

差分データをステージングに入れる

抽出した差分データを直接本番テーブルへ反映するのではなく、一度ステージングテーブルに入れると、検証と再実行がしやすくなります。抽出範囲、取込バッチID、ソースキー、ハッシュ値、処理状態を持たせるのが基本です。

incremental-stage-ddl.sql
CREATE TABLE stg_order_incremental (
  batch_id        NUMBER       NOT NULL,
  source_order_id NUMBER       NOT NULL,
  customer_id     NUMBER,
  order_status    VARCHAR2(30),
  amount          NUMBER,
  source_updated_at TIMESTAMP,
  row_hash        VARCHAR2(64),
  process_status  VARCHAR2(20) DEFAULT 'NEW' NOT NULL,
  error_message    VARCHAR2(2000),
  loaded_at        TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  CONSTRAINT pk_stg_order_incremental
    PRIMARY KEY (batch_id, source_order_id)
);

batch_idで今回の差分範囲を固定し、source_order_idでソース側の一意キーを持ちます。再実行時に同じbatch_idへ入れ直すのか、新しいbatch_idで同じ範囲を再処理するのかは運用方針で決めます。

insert-incremental-stage.sql
INSERT INTO stg_order_incremental (
  batch_id,
  source_order_id,
  customer_id,
  order_status,
  amount,
  source_updated_at,
  row_hash
)
SELECT
  :batch_id,
  o.order_id,
  o.customer_id,
  o.order_status,
  o.amount,
  o.updated_at,
  STANDARD_HASH(
    NVL(TO_CHAR(o.customer_id), '#NULL#') || '|'
    || NVL(o.order_status, '#NULL#') || '|'
    || NVL(TO_CHAR(o.amount, 'FM9999999999990D00', 'NLS_NUMERIC_CHARACTERS=.,'), '#NULL#'),
    'SHA256'
  ) AS row_hash
FROM source_orders o
WHERE o.updated_at >= :from_ts
  AND o.updated_at <  :to_ts;

ここではrow_hashを使い、値が変わったかを後続処理で比較できるようにしています。NULL、数値の暗黙変換、NLS設定の違いで誤判定しないよう、NVLTO_CHARで値を正規化してからハッシュ化します。区切り文字が値に含まれる可能性が高い列では、長さ付きで連結するなど、さらに厳密な形式にします。

MERGEで重複登録を防ぐ

増分データを本テーブルへ反映するときは、再実行を前提にMERGEを使います。同じソースキーが来たら更新し、存在しなければ挿入する形にしておくと、失敗後の再実行でも重複しにくくなります。

merge-incremental-target.sql
MERGE INTO order_summary d
USING (
  SELECT
    source_order_id,
    customer_id,
    order_status,
    amount,
    source_updated_at,
    row_hash
  FROM stg_order_incremental
  WHERE batch_id = :batch_id
    AND process_status = 'NEW'
) s
ON (d.source_order_id = s.source_order_id)
WHEN MATCHED THEN
  UPDATE SET
    d.customer_id = s.customer_id,
    d.order_status = s.order_status,
    d.amount = s.amount,
    d.source_updated_at = s.source_updated_at,
    d.row_hash = s.row_hash,
    d.updated_at = SYSTIMESTAMP
  WHERE NVL(d.row_hash, '-') <> NVL(s.row_hash, '-')
WHEN NOT MATCHED THEN
  INSERT (
    source_order_id,
    customer_id,
    order_status,
    amount,
    source_updated_at,
    row_hash,
    created_at,
    updated_at
  )
  VALUES (
    s.source_order_id,
    s.customer_id,
    s.order_status,
    s.amount,
    s.source_updated_at,
    s.row_hash,
    SYSTIMESTAMP,
    SYSTIMESTAMP
  );

WHEN MATCHEDWHERE句でハッシュ差分がある行だけ更新しています。これにより、重複再実行で同じ値を何度も更新することを避けられます。MERGEの排他や条件設計は、既存の PL/SQLのMERGE文とUPSERT も参考になります。

成功したときだけ前回実行日時を進める

差分処理で最も重要なのは、処理が成功したときだけlast_success_toを進めることです。ステージング投入だけ成功して本テーブル反映に失敗した場合や、コミット前に例外が出た場合に時刻だけ進めると、次回以降その範囲を処理できなくなります。

update-watermark-on-success.sql
UPDATE incremental_job_state
SET
  last_success_to = :to_ts,
  status = 'IDLE',
  last_finished_at = SYSTIMESTAMP,
  last_error_message = NULL,
  updated_at = SYSTIMESTAMP
WHERE job_name = 'ORDER_INCREMENTAL_LOAD';

COMMIT;

逆に、途中で失敗した場合はlast_success_toを更新しません。エラー情報だけを残し、次回同じ範囲を再実行できるようにします。トランザクション境界の考え方は PL/SQLのトランザクション設計 と合わせて押さえておきましょう。

rollback-and-mark-failed.sql
-- まず本処理の変更を戻す
ROLLBACK;

-- その後、失敗状態を別トランザクションとして記録する
UPDATE incremental_job_state
SET
  status = 'FAILED',
  last_finished_at = SYSTIMESTAMP,
  last_error_message = :error_message,
  updated_at = SYSTIMESTAMP
WHERE job_name = 'ORDER_INCREMENTAL_LOAD';

COMMIT;

失敗時の状態更新は、本処理のROLLBACK後に別トランザクションとして残します。先にUPDATE incremental_job_stateしてからROLLBACKすると、失敗状態まで取り消されてしまいます。PL/SQLパッケージ化する場合は、失敗ログだけを残す小さな手続きを用意し、呼び出し側でエラーメッセージを渡す形にすると安全です。自律トランザクションを使う場合も、何を確実に残すための別トランザクションなのかを明確にします。

遅延データをどう拾うか

差分抽出で厄介なのは、後から古い更新日時のデータが届くケースです。たとえば外部システムからの連携が遅れ、updated_atは昨日のまま、DBへの登録は今日になるようなケースです。この場合、updated_atだけでは差分対象から漏れます。

重ね取りする前回成功時刻から数分から数時間だけ戻して抽出し、MERGEで重複を吸収します。
登録日時も見るsource_updated_atだけでなく、created_atやreceived_atも差分条件に入れます。
履歴テーブルを使う変更イベントや連携受信履歴を別テーブルに残し、その履歴を差分元にします。
日次リカバリを用意する直近数日を再照合するバッチを別に持ち、遅延データを拾います。
delayed-data-condition.sql
WHERE (
      o.updated_at >= :from_ts
  AND o.updated_at <  :to_ts
)
OR (
      o.received_at >= :from_ts
  AND o.received_at <  :to_ts
)

このように、業務上の更新時刻と、システムが受け取った時刻を分けて考えると、遅延データに強くなります。どちらを主キーにするかではなく、何を取りこぼしたくないかから決めます。

同時実行を防ぐ

同じ増分ジョブが二重起動すると、同じ範囲を同時に処理して競合することがあります。管理テーブルの行をFOR UPDATEでロックし、同時実行を防ぎます。より厳密に制御したい場合はDBMS_LOCKも候補です。

prevent-double-run.sql
SELECT last_success_to, overlap_minutes
INTO   v_last_success_to, v_overlap_minutes
FROM   incremental_job_state
WHERE  job_name = 'ORDER_INCREMENTAL_LOAD'
FOR UPDATE NOWAIT;

UPDATE incremental_job_state
SET
  status = 'RUNNING',
  last_started_at = SYSTIMESTAMP,
  updated_at = SYSTIMESTAMP
WHERE job_name = 'ORDER_INCREMENTAL_LOAD';

NOWAITでロックできなければ、すでに別セッションが実行中の可能性があります。その場合は処理を中止し、ジョブ管理側に二重起動として通知します。ロック制御の考え方は DBMS_LOCKの使い方 とも関連します。

外部連携では送信済みキーを持つ

差分抽出したデータを外部APIへ送る場合は、送信済みキーを管理します。差分処理が再実行されると、同じデータをもう一度送る可能性があるためです。送信先が冪等キーを受け付けるなら、ソースキーと更新時刻から一意なキーを作ります。

idempotency-key.sql
SELECT
  source_order_id,
  source_updated_at,
  'ORDER:' || source_order_id || ':'
    || TO_CHAR(source_updated_at, 'YYYYMMDDHH24MISSFF3') AS idempotency_key
FROM stg_order_incremental
WHERE batch_id = :batch_id;

外部連携では、DB側の差分抽出だけでなく、送信、応答、再送の状態管理も必要です。このあたりは アウトボックスパターンと再送キュー設計 と組み合わせると安全に設計できます。

大規模データでは分割処理する

差分とはいえ、障害復旧や初回移行では大量データを処理することがあります。その場合は、抽出範囲を日付やIDで分割し、処理単位を小さくします。大量更新や再実行が必要な場合は、OracleのDBMS_PARALLEL_EXECUTEも候補になります。

chunk-by-updated-at.sql
SELECT
  TRUNC(updated_at, 'HH24') AS chunk_hour,
  COUNT(*) AS rows_count
FROM source_orders
WHERE updated_at >= :from_ts
  AND updated_at <  :to_ts
GROUP BY TRUNC(updated_at, 'HH24')
ORDER BY chunk_hour;

チャンク単位で件数を確認しておくと、特定時間帯だけ異常に多い場合にも気づけます。大規模処理の分割は DBMS_PARALLEL_EXECUTE完全ガイド も参考になります。

やってはいけない差分抽出

最後に、避けるべきパターンを整理します。差分処理は一見シンプルですが、次のような実装は本番で事故につながります。

処理開始時刻を前回時刻として保存する途中失敗しても時刻だけ進み、未処理データを取りこぼします。
updated_at > 前回時刻だけで抽出する同時刻データや時刻精度の問題で境界データが漏れることがあります。
抽出上限を固定しない処理中に対象範囲が広がり、件数や再実行範囲を説明できません。
再実行でINSERTだけ行う同じデータが二重登録されます。MERGEや一意制約で防ぎます。
成功前にlast_success_toを進める本処理が失敗した範囲を次回処理できなくなります。
エラー行を捨てるどのデータが失敗したか分からず、再処理できません。

設計チェックリスト

差分抽出・増分処理を実装する前に、次の点を確認します。ここを決めずに実装すると、後から再実行や障害対応で苦しくなります。

  • 変更判定に使う列が明確になっている
  • 前回成功した抽出上限を保存している
  • 今回の抽出上限を処理開始時に固定している
  • 境界時刻の同時刻データを取りこぼさない設計になっている
  • 遅延データを拾う方法を決めている
  • ステージングにbatch_idとソースキーを持たせている
  • 再実行しても重複しないMERGEまたは一意制約がある
  • 成功したときだけlast_success_toを進めている
  • 失敗時に同じ範囲を再実行できる
  • 外部連携では冪等キーや送信済み状態を管理している

まとめ

PL/SQLの差分抽出・増分処理は、updated_atのWHERE句だけで完成するものではありません。前回成功時刻、今回のウォーターマーク、抽出範囲の固定、ステージング、MERGE、再実行、遅延データ対策までをセットで設計する必要があります。

特に重要なのは、成功したときだけ前回実行日時を進めること、境界データを取りこぼさないこと、再実行しても重複しないことです。この3点を守ると、バッチ時間を短縮しながら、障害時にも説明できる増分処理になります。