夜間バッチや外部連携で毎回全件を処理すると、データ量が増えたときに処理時間が伸び続けます。そこで使うのが、前回処理以降に変わったデータだけを取り込む差分抽出や増分処理です。ただし、単純にupdated_at > 前回実行日時と書くだけでは、同時刻データ、遅延登録、再実行、コミット失敗、重複取込でつまずきます。
この記事では、PL/SQLで差分抽出・増分処理を安全に設計する方法を整理します。前回実行日時を保存する管理テーブル、ウォーターマーク、抽出範囲の固定、ステージングテーブル、重複防止、再実行までを1つの流れとして扱います。取込後の検証は ステージングテーブル設計、処理履歴の残し方は ジョブ実行履歴テーブル設計 も参考になります。
- 全件処理と増分処理の使い分け
- 前回実行日時を管理するテーブル設計
- ウォーターマークによる抽出範囲の固定
- 同時刻データ・遅延データの取りこぼし対策
- ステージングテーブルへの差分取込
- 再実行しても重複しないMERGE設計
- 失敗時の復旧とチェックリスト
差分抽出で解決したい問題
差分抽出の目的は、処理対象を減らしてバッチ時間を短くすることだけではありません。前回どこまで処理したかを明確にし、失敗時に同じ範囲を再実行できるようにし、外部連携先へ同じデータを重複送信しないようにすることも重要です。
差分抽出は、単なるWHERE句ではなく、運用手順を含む設計です。抽出範囲、処理状態、失敗時の戻し方、再実行時のふるまいをセットで決めます。
全件処理と増分処理の違い
全件処理は単純で、毎回すべてのデータを読み直します。件数が少ないマスタや、処理結果を毎回作り直す集計では有効です。一方で、注文、ログ、明細、外部連携データのように増え続けるテーブルでは、全件処理がすぐに重くなります。
増分処理を選ぶ場合は、必ず「何をもって変更とみなすか」を決めます。updated_atなのか、連番IDなのか、更新履歴テーブルなのか、ソース側の仕様によって設計が変わります。
前回実行日時を管理する
差分抽出では、前回成功した抽出上限を保存する管理テーブルを用意します。ここで大切なのは、ジョブ開始時刻ではなく、前回どこまで正常に処理できたかを保存することです。
CREATE TABLE incremental_job_state (
job_name VARCHAR2(100) PRIMARY KEY,
last_success_to TIMESTAMP,
overlap_minutes NUMBER DEFAULT 5 NOT NULL,
status VARCHAR2(20) DEFAULT 'IDLE' NOT NULL,
last_started_at TIMESTAMP,
last_finished_at TIMESTAMP,
last_error_message VARCHAR2(2000),
updated_by VARCHAR2(128) DEFAULT SYS_CONTEXT('USERENV','SESSION_USER') NOT NULL,
updated_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
CONSTRAINT ck_incremental_state_status
CHECK (status IN ('IDLE','RUNNING','FAILED'))
);
last_success_toは、前回処理が成功した抽出上限です。今回の抽出開始時刻ではありません。処理が途中で失敗した場合、この値を進めないことで、次回同じ範囲を再実行できます。
INSERT INTO incremental_job_state ( job_name, last_success_to, overlap_minutes ) VALUES ( 'ORDER_INCREMENTAL_LOAD', TIMESTAMP '2026-06-01 00:00:00', 5 );
ウォーターマークで抽出範囲を固定する
差分処理では、処理中に新しいデータが入ってくることを前提にします。そのため、開始時点で今回の抽出上限、つまりウォーターマークを固定します。固定せずにSYSTIMESTAMPを何度も参照すると、処理中に対象範囲が広がってしまいます。
DECLARE
v_job_name VARCHAR2(100) := 'ORDER_INCREMENTAL_LOAD';
v_from_ts TIMESTAMP;
v_to_ts TIMESTAMP;
v_overlap NUMBER;
BEGIN
SELECT
last_success_to,
SYSTIMESTAMP,
overlap_minutes
INTO
v_from_ts,
v_to_ts,
v_overlap
FROM incremental_job_state
WHERE job_name = v_job_name
FOR UPDATE;
v_from_ts := v_from_ts - NUMTODSINTERVAL(v_overlap, 'MINUTE');
DBMS_APPLICATION_INFO.SET_MODULE(
module_name => v_job_name,
action_name => 'from=' || TO_CHAR(v_from_ts, 'YYYY-MM-DD HH24:MI:SS')
);
-- ここから v_from_ts 以上、v_to_ts 未満を処理する
END;
/
抽出条件は、原則として>= fromかつ< toにします。上限を未満にすることで、次回の境界と扱いやすくなります。同じ時刻のデータを確実に拾うため、少し重ねて再抽出し、後段で重複排除する設計が安全です。重ね取りでは、同じデータが再び抽出されることを前提にします。そのため、後続処理にはMERGE、一意制約、冪等キーのいずれかを必ず入れます。
updated_atだけに頼る危険
updated_atを使った差分抽出は分かりやすいですが、注意点があります。アプリやバッチがupdated_atを更新しない場合、データは変わっているのに差分として拾えません。また、秒単位の時刻しか持たない場合、同じ秒に更新された複数行の境界処理で抜け漏れが起きることがあります。
本番データ補正と差分処理が関係する場合は、補正時に差分対象へ載せるかどうかも決めておきます。補正作業の設計は 本番データ補正スクリプトの作り方 と合わせて考えると安全です。
差分データをステージングに入れる
抽出した差分データを直接本番テーブルへ反映するのではなく、一度ステージングテーブルに入れると、検証と再実行がしやすくなります。抽出範囲、取込バッチID、ソースキー、ハッシュ値、処理状態を持たせるのが基本です。
CREATE TABLE stg_order_incremental (
batch_id NUMBER NOT NULL,
source_order_id NUMBER NOT NULL,
customer_id NUMBER,
order_status VARCHAR2(30),
amount NUMBER,
source_updated_at TIMESTAMP,
row_hash VARCHAR2(64),
process_status VARCHAR2(20) DEFAULT 'NEW' NOT NULL,
error_message VARCHAR2(2000),
loaded_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
CONSTRAINT pk_stg_order_incremental
PRIMARY KEY (batch_id, source_order_id)
);
batch_idで今回の差分範囲を固定し、source_order_idでソース側の一意キーを持ちます。再実行時に同じbatch_idへ入れ直すのか、新しいbatch_idで同じ範囲を再処理するのかは運用方針で決めます。
INSERT INTO stg_order_incremental (
batch_id,
source_order_id,
customer_id,
order_status,
amount,
source_updated_at,
row_hash
)
SELECT
:batch_id,
o.order_id,
o.customer_id,
o.order_status,
o.amount,
o.updated_at,
STANDARD_HASH(
NVL(TO_CHAR(o.customer_id), '#NULL#') || '|'
|| NVL(o.order_status, '#NULL#') || '|'
|| NVL(TO_CHAR(o.amount, 'FM9999999999990D00', 'NLS_NUMERIC_CHARACTERS=.,'), '#NULL#'),
'SHA256'
) AS row_hash
FROM source_orders o
WHERE o.updated_at >= :from_ts
AND o.updated_at < :to_ts;
ここではrow_hashを使い、値が変わったかを後続処理で比較できるようにしています。NULL、数値の暗黙変換、NLS設定の違いで誤判定しないよう、NVLとTO_CHARで値を正規化してからハッシュ化します。区切り文字が値に含まれる可能性が高い列では、長さ付きで連結するなど、さらに厳密な形式にします。
MERGEで重複登録を防ぐ
増分データを本テーブルへ反映するときは、再実行を前提にMERGEを使います。同じソースキーが来たら更新し、存在しなければ挿入する形にしておくと、失敗後の再実行でも重複しにくくなります。
MERGE INTO order_summary d
USING (
SELECT
source_order_id,
customer_id,
order_status,
amount,
source_updated_at,
row_hash
FROM stg_order_incremental
WHERE batch_id = :batch_id
AND process_status = 'NEW'
) s
ON (d.source_order_id = s.source_order_id)
WHEN MATCHED THEN
UPDATE SET
d.customer_id = s.customer_id,
d.order_status = s.order_status,
d.amount = s.amount,
d.source_updated_at = s.source_updated_at,
d.row_hash = s.row_hash,
d.updated_at = SYSTIMESTAMP
WHERE NVL(d.row_hash, '-') <> NVL(s.row_hash, '-')
WHEN NOT MATCHED THEN
INSERT (
source_order_id,
customer_id,
order_status,
amount,
source_updated_at,
row_hash,
created_at,
updated_at
)
VALUES (
s.source_order_id,
s.customer_id,
s.order_status,
s.amount,
s.source_updated_at,
s.row_hash,
SYSTIMESTAMP,
SYSTIMESTAMP
);
WHEN MATCHEDのWHERE句でハッシュ差分がある行だけ更新しています。これにより、重複再実行で同じ値を何度も更新することを避けられます。MERGEの排他や条件設計は、既存の PL/SQLのMERGE文とUPSERT も参考になります。
成功したときだけ前回実行日時を進める
差分処理で最も重要なのは、処理が成功したときだけlast_success_toを進めることです。ステージング投入だけ成功して本テーブル反映に失敗した場合や、コミット前に例外が出た場合に時刻だけ進めると、次回以降その範囲を処理できなくなります。
UPDATE incremental_job_state SET last_success_to = :to_ts, status = 'IDLE', last_finished_at = SYSTIMESTAMP, last_error_message = NULL, updated_at = SYSTIMESTAMP WHERE job_name = 'ORDER_INCREMENTAL_LOAD'; COMMIT;
逆に、途中で失敗した場合はlast_success_toを更新しません。エラー情報だけを残し、次回同じ範囲を再実行できるようにします。トランザクション境界の考え方は PL/SQLのトランザクション設計 と合わせて押さえておきましょう。
-- まず本処理の変更を戻す ROLLBACK; -- その後、失敗状態を別トランザクションとして記録する UPDATE incremental_job_state SET status = 'FAILED', last_finished_at = SYSTIMESTAMP, last_error_message = :error_message, updated_at = SYSTIMESTAMP WHERE job_name = 'ORDER_INCREMENTAL_LOAD'; COMMIT;
失敗時の状態更新は、本処理のROLLBACK後に別トランザクションとして残します。先にUPDATE incremental_job_stateしてからROLLBACKすると、失敗状態まで取り消されてしまいます。PL/SQLパッケージ化する場合は、失敗ログだけを残す小さな手続きを用意し、呼び出し側でエラーメッセージを渡す形にすると安全です。自律トランザクションを使う場合も、何を確実に残すための別トランザクションなのかを明確にします。
遅延データをどう拾うか
差分抽出で厄介なのは、後から古い更新日時のデータが届くケースです。たとえば外部システムからの連携が遅れ、updated_atは昨日のまま、DBへの登録は今日になるようなケースです。この場合、updated_atだけでは差分対象から漏れます。
WHERE (
o.updated_at >= :from_ts
AND o.updated_at < :to_ts
)
OR (
o.received_at >= :from_ts
AND o.received_at < :to_ts
)
このように、業務上の更新時刻と、システムが受け取った時刻を分けて考えると、遅延データに強くなります。どちらを主キーにするかではなく、何を取りこぼしたくないかから決めます。
同時実行を防ぐ
同じ増分ジョブが二重起動すると、同じ範囲を同時に処理して競合することがあります。管理テーブルの行をFOR UPDATEでロックし、同時実行を防ぎます。より厳密に制御したい場合はDBMS_LOCKも候補です。
SELECT last_success_to, overlap_minutes INTO v_last_success_to, v_overlap_minutes FROM incremental_job_state WHERE job_name = 'ORDER_INCREMENTAL_LOAD' FOR UPDATE NOWAIT; UPDATE incremental_job_state SET status = 'RUNNING', last_started_at = SYSTIMESTAMP, updated_at = SYSTIMESTAMP WHERE job_name = 'ORDER_INCREMENTAL_LOAD';
NOWAITでロックできなければ、すでに別セッションが実行中の可能性があります。その場合は処理を中止し、ジョブ管理側に二重起動として通知します。ロック制御の考え方は DBMS_LOCKの使い方 とも関連します。
外部連携では送信済みキーを持つ
差分抽出したデータを外部APIへ送る場合は、送信済みキーを管理します。差分処理が再実行されると、同じデータをもう一度送る可能性があるためです。送信先が冪等キーを受け付けるなら、ソースキーと更新時刻から一意なキーを作ります。
SELECT
source_order_id,
source_updated_at,
'ORDER:' || source_order_id || ':'
|| TO_CHAR(source_updated_at, 'YYYYMMDDHH24MISSFF3') AS idempotency_key
FROM stg_order_incremental
WHERE batch_id = :batch_id;
外部連携では、DB側の差分抽出だけでなく、送信、応答、再送の状態管理も必要です。このあたりは アウトボックスパターンと再送キュー設計 と組み合わせると安全に設計できます。
大規模データでは分割処理する
差分とはいえ、障害復旧や初回移行では大量データを処理することがあります。その場合は、抽出範囲を日付やIDで分割し、処理単位を小さくします。大量更新や再実行が必要な場合は、OracleのDBMS_PARALLEL_EXECUTEも候補になります。
SELECT TRUNC(updated_at, 'HH24') AS chunk_hour, COUNT(*) AS rows_count FROM source_orders WHERE updated_at >= :from_ts AND updated_at < :to_ts GROUP BY TRUNC(updated_at, 'HH24') ORDER BY chunk_hour;
チャンク単位で件数を確認しておくと、特定時間帯だけ異常に多い場合にも気づけます。大規模処理の分割は DBMS_PARALLEL_EXECUTE完全ガイド も参考になります。
やってはいけない差分抽出
最後に、避けるべきパターンを整理します。差分処理は一見シンプルですが、次のような実装は本番で事故につながります。
設計チェックリスト
差分抽出・増分処理を実装する前に、次の点を確認します。ここを決めずに実装すると、後から再実行や障害対応で苦しくなります。
- 変更判定に使う列が明確になっている
- 前回成功した抽出上限を保存している
- 今回の抽出上限を処理開始時に固定している
- 境界時刻の同時刻データを取りこぼさない設計になっている
- 遅延データを拾う方法を決めている
- ステージングにbatch_idとソースキーを持たせている
- 再実行しても重複しないMERGEまたは一意制約がある
- 成功したときだけlast_success_toを進めている
- 失敗時に同じ範囲を再実行できる
- 外部連携では冪等キーや送信済み状態を管理している
まとめ
PL/SQLの差分抽出・増分処理は、updated_atのWHERE句だけで完成するものではありません。前回成功時刻、今回のウォーターマーク、抽出範囲の固定、ステージング、MERGE、再実行、遅延データ対策までをセットで設計する必要があります。
特に重要なのは、成功したときだけ前回実行日時を進めること、境界データを取りこぼさないこと、再実行しても重複しないことです。この3点を守ると、バッチ時間を短縮しながら、障害時にも説明できる増分処理になります。
