【PL/SQL】BULK COLLECTとFORALLの再実行設計|部分失敗・二重処理を防ぐ

【PL/SQL】BULK COLLECTとFORALLによる一括処理パターンの最適化 PL/SQL

BULK COLLECTFORALLは、PL/SQLとSQLエンジン間の往復を減らし、大量DMLを高速化します。しかし、本番運用で難しいのは速度よりも途中で一部の行だけ失敗した後、どこから安全に再実行するかです。

SAVE EXCEPTIONSを付けるだけでは再実行設計は完成しません。成功した行を再投入すると二重登録になり、失敗行を誤った添字で記録すると原因を追えず、ステータス更新とDMLのコミットが分かれると処理済み状態だけが残ることがあります。

安全なバルク処理の原則

  • 取得用・適用用とも、同じ添字を持つスカラー配列で管理します。
  • 検証済みデータを密な配列へ詰め替えてからFORALLへ渡します。
  • SAVE EXCEPTIONSの情報は、次のSQLを実行する前に別配列へ退避します。
  • 成功行だけをDONE、失敗行だけをERRORへ更新します。
  • 対象表への反映は、業務キーによるMERGEなど冪等なDMLにします。
  • 対象表、エラーログ、処理状態を同じトランザクションで確定します。

基本構文を網羅的に確認する場合はPL/SQLバルク処理完全ガイド、ORA-24381と個別エラーの扱いはFORALL SAVE EXCEPTIONSで失敗行を記録する方法を参照してください。本記事は、部分失敗後の再実行と整合性へ焦点を絞ります。

スポンサーリンク

処理単位と状態遷移を先に決める

例では、受信データをapp.order_stageへ保存し、正常行をapp.ordersへ反映します。ステージング行はREADYPROCESSINGDONEERRORの状態を持ちます。

  • READY:未処理、または再実行対象
  • PROCESSING:現在のワーカーが確保済み
  • DONE:対象表への反映と状態更新が同じコミットで完了
  • ERROR:検証エラーまたはDMLエラー。原因を保存済み
bulk-retry-schema.sql
CREATE TABLE app.order_stage (
  stage_id       NUMBER        CONSTRAINT pk_order_stage PRIMARY KEY,
  source_key     VARCHAR2(100) NOT NULL,
  customer_id    NUMBER        NOT NULL,
  amount         NUMBER(18, 2),
  process_status VARCHAR2(20)   DEFAULT 'READY' NOT NULL,
  error_code     NUMBER,
  error_message  VARCHAR2(1000),
  batch_id       VARCHAR2(32),
  attempt_count  NUMBER        DEFAULT 0 NOT NULL,
  processed_at   TIMESTAMP,
  CONSTRAINT uq_order_stage_source UNIQUE (source_key),
  CONSTRAINT ck_order_stage_status
    CHECK (process_status IN ('READY', 'PROCESSING', 'DONE', 'ERROR'))
);

CREATE TABLE app.orders (
  order_id    NUMBER        CONSTRAINT pk_orders PRIMARY KEY,
  source_key  VARCHAR2(100) NOT NULL,
  customer_id NUMBER        NOT NULL,
  amount      NUMBER(12, 2) NOT NULL,
  updated_at  TIMESTAMP     NOT NULL,
  CONSTRAINT uq_orders_source UNIQUE (source_key)
);

CREATE TABLE app.order_bulk_error (
  stage_id      NUMBER,
  source_key    VARCHAR2(100),
  batch_id      VARCHAR2(32),
  attempt_no    NUMBER,
  error_code    NUMBER,
  error_message VARCHAR2(1000),
  logged_at     TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
);

CREATE SEQUENCE app.orders_seq
    START WITH 1
    INCREMENT BY 1
    CACHE 100;

CREATE INDEX app.idx_order_stage_status_id
    ON app.order_stage (process_status, stage_id);

source_keyは送信元が発行する一意な業務キーです。対象表にも一意制約を設け、同じ入力を再実行しても同じ行へ収束させます。シーケンスで発行した新しいIDだけを冪等性の根拠にすると、再実行のたびに別行として登録されます。

batch_idは1回の起動を識別し、attempt_countは各行を処理した回数を記録します。同じ行が複数回失敗しても、どの起動の何回目で発生したエラーかを追跡できます。

テストデータを用意する

正常行、金額がNULLの検証エラー、桁あふれを起こすDMLエラーを混在させます。エラー後に修正して再実行できることまで確認します。

bulk-retry-test-data.sql
INSERT INTO app.order_stage (
  stage_id, source_key, customer_id, amount, process_status
) VALUES (101, 'SRC-101', 10, 1200, 'READY');

INSERT INTO app.order_stage (
  stage_id, source_key, customer_id, amount, process_status
) VALUES (102, 'SRC-102', 20, NULL, 'READY');

INSERT INTO app.order_stage (
  stage_id, source_key, customer_id, amount, process_status
) VALUES (103, 'SRC-103', 30, 999999999999.99, 'READY');

INSERT INTO app.order_stage (
  stage_id, source_key, customer_id, amount, process_status
) VALUES (104, 'SRC-104', 40, 2500, 'READY');

COMMIT;

レコード配列ではなくスカラー配列へ取得する

Oracleのバージョン差やDML形式による制約を避けるため、FORALLで参照する値は列ごとのスカラー配列へ取得します。すべての配列が同じ添字を持つことが前提です。

parallel-scalar-collections.sql
DECLARE
  TYPE t_number_tab IS TABLE OF NUMBER
    INDEX BY PLS_INTEGER;
  TYPE t_varchar_tab IS TABLE OF VARCHAR2(100)
    INDEX BY PLS_INTEGER;

  v_stage_ids    t_number_tab;
  v_source_keys  t_varchar_tab;
  v_customer_ids t_number_tab;
  v_amounts      t_number_tab;
  v_attempts     t_number_tab;

BEGIN
  SELECT stage_id, source_key, customer_id, amount, attempt_count
  BULK COLLECT INTO
    v_stage_ids,
    v_source_keys,
    v_customer_ids,
    v_amounts,
    v_attempts
  FROM (
    SELECT stage_id, source_key, customer_id, amount, attempt_count
    FROM app.order_stage
    WHERE process_status = 'READY'
    ORDER BY stage_id
  )
  WHERE ROWNUM <= 500;
END;
/

完全版では、READY行から最大500件をチャンクごとに再取得します。開いたカーソルをコミット後も読み続けないため、確定済みチャンクを次の取得対象から自然に除外できます。500件は正解値ではないので、PGA、経過時間、UNDO、ロック時間を測定して調整します。BULK化すべき件数や測定方法はカーソルFORループをBULK化する判断基準も参考になります。

テンプレートの実行前提
以下の完全版は単一ワーカーで順番に処理する構成です。複数ワーカーを同時起動する場合は、別トランザクションで対象行をFOR UPDATE SKIP LOCKEDにより確保し、PROCESSINGへ変更してコミットしてから、確保したbatch_idだけを処理してください。

検証済み行を密な適用配列へ詰め替える

取得配列から要素をDELETEして稀疎化するより、正常行だけを1から始まる別配列へ詰め替える方が、ERROR_INDEXとの対応が単純になります。検証エラーはDMLへ渡す前にERRORへ更新します。

compact-valid-rows.sql
v_apply_count := 0;
v_validation_count := 0;

FOR i IN 1 .. v_stage_ids.COUNT LOOP
  IF v_amounts(i) IS NULL OR v_amounts(i) < 0 THEN
    v_validation_count := v_validation_count + 1;
    v_validation_stage_ids(v_validation_count) := v_stage_ids(i);
    v_validation_source_keys(v_validation_count) := v_source_keys(i);
    v_validation_attempts(v_validation_count) := v_attempts(i) + 1;
    v_validation_codes(v_validation_count) := -20001;
    v_validation_messages(v_validation_count) := 'amount must be zero or greater';
  ELSE
    v_apply_count := v_apply_count + 1;
    v_apply_stage_ids(v_apply_count) := v_stage_ids(i);
    v_apply_source_keys(v_apply_count) := v_source_keys(i);
    v_apply_customer_ids(v_apply_count) := v_customer_ids(i);
    v_apply_amounts(v_apply_count) := v_amounts(i);
  END IF;
END LOOP;

IF v_validation_count > 0 THEN
  FORALL i IN 1 .. v_validation_count
    INSERT INTO app.order_bulk_error (
      stage_id, source_key, batch_id, attempt_no,
      error_code, error_message
    ) VALUES (
      v_validation_stage_ids(i),
      v_validation_source_keys(i),
      v_batch_id,
      v_validation_attempts(i),
      v_validation_codes(i),
      v_validation_messages(i)
    );

  FORALL i IN 1 .. v_validation_count
    UPDATE app.order_stage
    SET process_status = 'ERROR',
        error_code = v_validation_codes(i),
        error_message = v_validation_messages(i),
        processed_at = SYSTIMESTAMP
    WHERE stage_id = v_validation_stage_ids(i);
END IF;

FORALLへ空配列を渡すと例外になるため、必ずCOUNT > 0を確認します。正常行を密に詰め替えれば、FORALL i IN 1 .. v_apply_countのiと適用配列の添字が一致します。

MERGEで再実行しても同じ結果へ収束させる

対象表への反映はsource_keyで一致させます。初回はINSERT、再実行時はUPDATEになるため、同じ入力から別の注文行が増えません。

idempotent-forall-merge.sql
v_failed.DELETE;
v_error_count := 0;

IF v_apply_count > 0 THEN
  BEGIN
    FORALL i IN 1 .. v_apply_count SAVE EXCEPTIONS
      MERGE INTO app.orders dst
      USING (
        SELECT
          v_apply_source_keys(i) AS source_key,
          v_apply_customer_ids(i) AS customer_id,
          v_apply_amounts(i) AS amount
        FROM dual
      ) src
      ON (dst.source_key = src.source_key)
      WHEN MATCHED THEN
        UPDATE SET
          dst.customer_id = src.customer_id,
          dst.amount = src.amount,
          dst.updated_at = SYSTIMESTAMP
      WHEN NOT MATCHED THEN
        INSERT (
          order_id, source_key, customer_id, amount, updated_at
        )
        VALUES (
          app.orders_seq.NEXTVAL,
          src.source_key,
          src.customer_id,
          src.amount,
          SYSTIMESTAMP
        );
  EXCEPTION
    WHEN e_bulk_errors THEN
      -- 次のSQLで暗黙カーソル情報が変わる前に退避する
      FOR j IN 1 .. SQL%BULK_EXCEPTIONS.COUNT LOOP
        v_error_count := v_error_count + 1;
        v_error_indexes(v_error_count) :=
          SQL%BULK_EXCEPTIONS(j).ERROR_INDEX;
        v_error_codes(v_error_count) :=
          SQL%BULK_EXCEPTIONS(j).ERROR_CODE;
        v_failed(SQL%BULK_EXCEPTIONS(j).ERROR_INDEX) := 1;
      END LOOP;
  END;
END IF;

SQL%BULK_EXCEPTIONS(j).ERROR_INDEXは、失敗したFORALL実行の番号です。この例は密な1..v_apply_countを使っているため、そのまま適用配列の添字として扱えます。

先にエラーログをINSERTしない
SQL%BULK_EXCEPTIONSは暗黙カーソルの情報です。例外ハンドラ内で別のINSERTやUPDATEを実行する前に、ERROR_INDEXとERROR_CODEをPL/SQL配列へ退避してください。

成功行と失敗行の配列を分ける

MERGE後は失敗フラグを見て、成功したstage_idと失敗したstage_idを別々の密な配列へ詰め替えます。失敗行のメッセージは、退避済みのエラーコードから生成します。

split-success-and-errors.sql
v_success_count := 0;

FOR i IN 1 .. v_apply_count LOOP
  IF NOT v_failed.EXISTS(i) THEN
    v_success_count := v_success_count + 1;
    v_success_stage_ids(v_success_count) := v_apply_stage_ids(i);
  END IF;
END LOOP;

FOR j IN 1 .. v_error_count LOOP
  v_index := v_error_indexes(j);
  v_error_stage_ids(j) := v_apply_stage_ids(v_index);
  v_error_source_keys(j) := v_apply_source_keys(v_index);
  v_error_attempts(j) := v_apply_attempts(v_index);
  v_error_messages(j) := SQLERRM(-v_error_codes(j));
END LOOP;

ここで使うSQLERRMのメッセージは、エラーによって置換文字列が省略されることがあります。業務調査に必要な入力値、stage_id、source_keyは別列で必ず保存します。

対象表・状態・エラーログを同じトランザクションで確定する

成功行をDONE、失敗行をERRORへ更新し、エラーログを保存します。これらを対象表へのMERGEと同じトランザクションでコミットすれば、「反映済みなのにREADY」「未反映なのにDONE」という不整合を防げます。

finalize-bulk-chunk.sql
IF v_success_count > 0 THEN
  FORALL i IN 1 .. v_success_count
    UPDATE app.order_stage
    SET process_status = 'DONE',
        error_code = NULL,
        error_message = NULL,
        batch_id = v_batch_id,
        processed_at = SYSTIMESTAMP
    WHERE stage_id = v_success_stage_ids(i);
END IF;

IF v_error_count > 0 THEN
  FORALL i IN 1 .. v_error_count
    INSERT INTO app.order_bulk_error (
      stage_id, source_key, batch_id, attempt_no,
      error_code, error_message
    ) VALUES (
      v_error_stage_ids(i),
      v_error_source_keys(i),
      v_batch_id,
      v_error_attempts(i),
      v_error_codes(i),
      v_error_messages(i)
    );

  FORALL i IN 1 .. v_error_count
    UPDATE app.order_stage
    SET process_status = 'ERROR',
        error_code = v_error_codes(i),
        error_message = v_error_messages(i),
        batch_id = v_batch_id,
        processed_at = SYSTIMESTAMP
    WHERE stage_id = v_error_stage_ids(i);
END IF;

COMMIT;

エラーログだけを自律型トランザクションで先にコミットすると、主処理がロールバックしてもエラー記録だけが残ります。それが監査要件なら有効ですが、本記事の例ではチャンク結果との整合性を優先して同じトランザクションへ含めます。

実行可能なチャンク処理テンプレート

前述の要素を1つにまとめたテンプレートです。配列宣言を省略せず、検証、MERGE、例外情報退避、状態更新、コミットまでを一連の処理にしています。

process-order-stage.sql
DECLARE
  TYPE t_number_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER;
  TYPE t_varchar100_tab IS TABLE OF VARCHAR2(100) INDEX BY PLS_INTEGER;
  TYPE t_varchar1000_tab IS TABLE OF VARCHAR2(1000) INDEX BY PLS_INTEGER;

  v_stage_ids       t_number_tab;
  v_source_keys     t_varchar100_tab;
  v_customer_ids    t_number_tab;
  v_amounts         t_number_tab;
  v_attempts        t_number_tab;

  v_apply_stage_ids    t_number_tab;
  v_apply_source_keys  t_varchar100_tab;
  v_apply_customer_ids t_number_tab;
  v_apply_amounts      t_number_tab;
  v_apply_attempts     t_number_tab;

  v_validation_stage_ids t_number_tab;
  v_validation_source_keys t_varchar100_tab;
  v_validation_attempts    t_number_tab;
  v_validation_codes     t_number_tab;
  v_validation_messages  t_varchar1000_tab;

  v_error_indexes     t_number_tab;
  v_error_stage_ids   t_number_tab;
  v_error_source_keys t_varchar100_tab;
  v_error_attempts    t_number_tab;
  v_error_codes       t_number_tab;
  v_error_messages    t_varchar1000_tab;
  v_success_stage_ids t_number_tab;
  v_failed            t_number_tab;

  v_apply_count      PLS_INTEGER;
  v_validation_count PLS_INTEGER;
  v_error_count      PLS_INTEGER;
  v_success_count    PLS_INTEGER;
  v_index            PLS_INTEGER;
  v_batch_id         VARCHAR2(32) := RAWTOHEX(SYS_GUID());

  e_bulk_errors EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_bulk_errors, -24381);

BEGIN
  LOOP
    SELECT stage_id, source_key, customer_id, amount, attempt_count
    BULK COLLECT INTO
      v_stage_ids, v_source_keys, v_customer_ids, v_amounts, v_attempts
    FROM (
      SELECT stage_id, source_key, customer_id, amount, attempt_count
      FROM app.order_stage
      WHERE process_status = 'READY'
      ORDER BY stage_id
    )
    WHERE ROWNUM <= 500;

    EXIT WHEN v_stage_ids.COUNT = 0;

    v_apply_stage_ids.DELETE;
    v_apply_source_keys.DELETE;
    v_apply_customer_ids.DELETE;
    v_apply_amounts.DELETE;
    v_apply_attempts.DELETE;
    v_validation_stage_ids.DELETE;
    v_validation_source_keys.DELETE;
    v_validation_attempts.DELETE;
    v_validation_codes.DELETE;
    v_validation_messages.DELETE;
    v_error_indexes.DELETE;
    v_error_stage_ids.DELETE;
    v_error_source_keys.DELETE;
    v_error_attempts.DELETE;
    v_error_codes.DELETE;
    v_error_messages.DELETE;
    v_success_stage_ids.DELETE;
    v_failed.DELETE;

    v_apply_count := 0;
    v_validation_count := 0;
    v_error_count := 0;
    v_success_count := 0;

    FORALL i IN 1 .. v_stage_ids.COUNT
      UPDATE app.order_stage
      SET process_status = 'PROCESSING',
          batch_id = v_batch_id,
          attempt_count = attempt_count + 1
      WHERE stage_id = v_stage_ids(i)
        AND process_status = 'READY';

    FOR i IN 1 .. v_stage_ids.COUNT LOOP
      IF v_amounts(i) IS NULL OR v_amounts(i) < 0 THEN
        v_validation_count := v_validation_count + 1;
        v_validation_stage_ids(v_validation_count) := v_stage_ids(i);
        v_validation_source_keys(v_validation_count) := v_source_keys(i);
        v_validation_attempts(v_validation_count) := v_attempts(i) + 1;
        v_validation_codes(v_validation_count) := -20001;
        v_validation_messages(v_validation_count) :=
          'amount must be zero or greater';
      ELSE
        v_apply_count := v_apply_count + 1;
        v_apply_stage_ids(v_apply_count) := v_stage_ids(i);
        v_apply_source_keys(v_apply_count) := v_source_keys(i);
        v_apply_customer_ids(v_apply_count) := v_customer_ids(i);
        v_apply_amounts(v_apply_count) := v_amounts(i);
        v_apply_attempts(v_apply_count) := v_attempts(i) + 1;
      END IF;
    END LOOP;

    IF v_validation_count > 0 THEN
      FORALL i IN 1 .. v_validation_count
        INSERT INTO app.order_bulk_error (
          stage_id, source_key, batch_id, attempt_no,
          error_code, error_message
        ) VALUES (
          v_validation_stage_ids(i),
          v_validation_source_keys(i),
          v_batch_id,
          v_validation_attempts(i),
          v_validation_codes(i),
          v_validation_messages(i)
        );

      FORALL i IN 1 .. v_validation_count
        UPDATE app.order_stage
        SET process_status = 'ERROR',
            error_code = v_validation_codes(i),
            error_message = v_validation_messages(i),
            batch_id = v_batch_id,
            processed_at = SYSTIMESTAMP
        WHERE stage_id = v_validation_stage_ids(i);
    END IF;

    IF v_apply_count > 0 THEN
      BEGIN
        FORALL i IN 1 .. v_apply_count SAVE EXCEPTIONS
          MERGE INTO app.orders dst
          USING (
            SELECT
              v_apply_source_keys(i) source_key,
              v_apply_customer_ids(i) customer_id,
              v_apply_amounts(i) amount
            FROM dual
          ) src
          ON (dst.source_key = src.source_key)
          WHEN MATCHED THEN
            UPDATE SET
              dst.customer_id = src.customer_id,
              dst.amount = src.amount,
              dst.updated_at = SYSTIMESTAMP
          WHEN NOT MATCHED THEN
            INSERT (
              order_id, source_key, customer_id, amount, updated_at
            )
            VALUES (
              app.orders_seq.NEXTVAL,
              src.source_key,
              src.customer_id,
              src.amount,
              SYSTIMESTAMP
            );
      EXCEPTION
        WHEN e_bulk_errors THEN
          FOR j IN 1 .. SQL%BULK_EXCEPTIONS.COUNT LOOP
            v_error_count := v_error_count + 1;
            v_error_indexes(v_error_count) :=
              SQL%BULK_EXCEPTIONS(j).ERROR_INDEX;
            v_error_codes(v_error_count) :=
              SQL%BULK_EXCEPTIONS(j).ERROR_CODE;
            v_failed(SQL%BULK_EXCEPTIONS(j).ERROR_INDEX) := 1;
          END LOOP;
      END;
    END IF;

    IF v_apply_count > 0 THEN
      FOR i IN 1 .. v_apply_count LOOP
        IF NOT v_failed.EXISTS(i) THEN
          v_success_count := v_success_count + 1;
          v_success_stage_ids(v_success_count) := v_apply_stage_ids(i);
        END IF;
      END LOOP;
    END IF;

    FOR j IN 1 .. v_error_count LOOP
      v_index := v_error_indexes(j);
      v_error_stage_ids(j) := v_apply_stage_ids(v_index);
      v_error_source_keys(j) := v_apply_source_keys(v_index);
      v_error_attempts(j) := v_apply_attempts(v_index);
      v_error_messages(j) := SQLERRM(-v_error_codes(j));
    END LOOP;

    IF v_success_count > 0 THEN
      FORALL i IN 1 .. v_success_count
        UPDATE app.order_stage
        SET process_status = 'DONE',
            error_code = NULL,
            error_message = NULL,
            batch_id = v_batch_id,
            processed_at = SYSTIMESTAMP
        WHERE stage_id = v_success_stage_ids(i);
    END IF;

    IF v_error_count > 0 THEN
      FORALL i IN 1 .. v_error_count
        INSERT INTO app.order_bulk_error (
          stage_id, source_key, batch_id, attempt_no,
          error_code, error_message
        ) VALUES (
          v_error_stage_ids(i),
          v_error_source_keys(i),
          v_batch_id,
          v_error_attempts(i),
          v_error_codes(i),
          v_error_messages(i)
        );

      FORALL i IN 1 .. v_error_count
        UPDATE app.order_stage
        SET process_status = 'ERROR',
            error_code = v_error_codes(i),
            error_message = v_error_messages(i),
            batch_id = v_batch_id,
            processed_at = SYSTIMESTAMP
        WHERE stage_id = v_error_stage_ids(i);
    END IF;

    COMMIT;
  END LOOP;
EXCEPTION
  WHEN OTHERS THEN
    ROLLBACK;
    RAISE;
END;
/

この例では、検証エラー行はMERGEへ渡さず、DMLエラー行はERRORへ、成功行だけはDONEへ更新します。チャンク内で想定外の例外が発生した場合は、そのチャンク全体をロールバックします。

エラーを修正して再実行する

ERROR行を無条件にREADYへ戻すのではなく、原因となった入力値を修正してから再投入します。対象表はsource_keyでMERGEされるため、すでに成功した行を誤って再投入しても別行にはなりません。

retry-error-rows.sql
UPDATE app.order_stage
SET amount = 3500,
    process_status = 'READY',
    error_code = NULL,
    error_message = NULL,
    batch_id = NULL,
    processed_at = NULL
WHERE stage_id = 102
  AND process_status = 'ERROR';

UPDATE app.order_stage
SET amount = 5000,
    process_status = 'READY',
    error_code = NULL,
    error_message = NULL,
    batch_id = NULL,
    processed_at = NULL
WHERE stage_id = 103
  AND process_status = 'ERROR';

COMMIT;

運用では、手作業のUPDATEより再実行用プロシージャを用意し、変更前後の値、操作者、再実行理由を監査ログへ残します。複数ワーカーで処理する場合はFOR UPDATE SKIP LOCKEDなどで対象行を確保し、PROCESSINGへ更新してから取得する設計が必要です。

実行結果を確認する

初回実行後は101と104がDONE、102と103がERRORになります。修正後に再実行すると4行すべてがDONEになり、102と103のattempt_countだけが2になります。エラーログには初回失敗時の履歴が残ります。

verify-bulk-retry-result.sql
SELECT
  stage_id,
  source_key,
  process_status,
  attempt_count,
  error_code,
  batch_id
FROM app.order_stage
ORDER BY stage_id;

SELECT
  source_key,
  customer_id,
  amount
FROM app.orders
ORDER BY source_key;

SELECT
  stage_id,
  source_key,
  batch_id,
  attempt_no,
  error_code,
  error_message
FROM app.order_bulk_error
ORDER BY logged_at, stage_id;
再実行後の期待結果

  • order_stageは4行すべてDONE
  • stage_id 101・104のattempt_countは1
  • stage_id 102・103のattempt_countは2
  • ordersはSRC-101からSRC-104まで各1行
  • order_bulk_errorには初回の検証エラーとDMLエラーが残る

コミット境界は件数ではなく再処理単位で決める

500件ごとにコミットするのは技術的な初期値であり、業務上の正解とは限りません。1つの請求、1ファイル、1取引単位を分割できない場合は、その単位をチャンク境界にします。

  • チャンク途中でコミットしない
  • 対象表とステージング状態を別コミットにしない
  • チェックポイントは最後に確定した業務キーまたはstage_idで記録する
  • 再開条件にはstage_id > :last_committed_idだけでなく状態も含める
  • 長時間トランザクションと業務上の原子性を両方測定する

トランザクション全体の設計はCOMMIT境界・ロック・再処理の設計パターン、MERGEの同時実行対策はMERGE文の競合対策で詳しく解説しています。

確認すべきテストケース

  • 全行成功し、すべてDONEになる
  • 先頭、中間、末尾の1行だけDMLエラーになる
  • 複数行が異なるエラーコードで失敗する
  • 検証済み行が0件でもFORALLを実行しない
  • 成功行だけが対象表へ反映される
  • 想定外エラーでチャンク全体がロールバックされる
  • 同じsource_keyを再実行しても対象行が増えない
  • ERRORを修正してREADYへ戻すとDONEまで進む
  • 2ワーカーが同時に起動しても同じ行を処理しない

よくある誤り

  • SAVE EXCEPTIONS後にすぐINSERTする:先にSQL%BULK_EXCEPTIONSを配列へ退避します。
  • 空のVALUES OFを実行する:選択用配列が空ならFORALL自体をスキップします。
  • 成功・失敗を分けず全行DONEにする:失敗フラグから成功配列を作ります。
  • シーケンス値だけで重複を防ぐ:送信元の一意な業務キーを対象表へ保存します。
  • ログを先にCOMMITする:求める監査要件を確認し、主処理との整合性を決めます。
  • LIMITを大きくすれば速いと考える:PGA、UNDO、ロック時間、再処理量を実測します。

まとめ

BULK COLLECTとFORALLを本番で安全に使うには、速度だけでなく部分失敗後の状態を設計する必要があります。入力を密なスカラー配列へ詰め替え、SAVE EXCEPTIONSをSQL実行前に退避し、成功行と失敗行を分けて状態更新します。

対象表へのDMLは業務キーで冪等にし、対象表、エラーログ、ステージング状態を同じチャンクトランザクションで確定します。この形なら、一部の行が失敗しても成功行を失わず、原因を修正した行だけを安全に再実行できます。