【PL/SQL】BULK COLLECTとFORALLによる一括処理パターンの最適化

【PL/SQL】BULK COLLECTとFORALLによる一括処理パターンの最適化 PL/SQL

逐次処理で1行ずつFETCHしてDMLを発行する構造は、SQLエンジンとの往復とコンテキストスイッチが支配的になりスループットが頭打ちになる。PL/SQLはBULK COLLECTとFORALLを併用することで、抽出・検証・適用を「行の塊」によって運ぶ一括処理へ転換できる。ただし無闇に配列化するとPGAを圧迫し、例外発生時の再実行や補償が不安定になるため、LIMIT、SAVE EXCEPTIONS、稀疎配列対応、コミット計画を含むパターンとして設計することが重要である。本稿では、コレクション型の選択、フェッチと適用の最適化、例外収集、RETURNINGによるキー回収、動的SQLとの併用、冪等な再実行設計までをテンプレートコードで示す。

コレクション型の選択と前提整理

一括処理の配列は、稀疎インデックスが扱える連想配列(INDEX BY PLS_INTEGER)、SQL層との橋渡しに向くネスト表(TABLE OF)、要素数固定のVARRAYという選択肢がある。FORALLは連想配列・ネスト表のいずれにも対応し、INDICES OFやVALUES OFで稀疎配列の実要素を走査できる。抽出側(BULK COLLECT)は連想配列でもネスト表でも取得可能だが、TABLE()化してSQLに戻す必要がある場面ではネスト表が有利になる。

BULK COLLECT/LIMITでフェッチを塊化する

フェッチの塊サイズはPGAと後段処理時間のバランスで決め、配列件数でループ終端を判定する。以下は与信超過データを抽出して一括で検証・更新する基本骨格である。


DECLARE
  TYPE t_row IS RECORD(id NUMBER, cust NUMBER, amt NUMBER, limit_amt NUMBER);
  TYPE t_tab IS TABLE OF t_row INDEX BY PLS_INTEGER;
  v_rows  t_tab;
  v_lim   PLS_INTEGER := 1000;
  CURSOR c IS
    SELECT o.order_id, o.customer_id, o.amount, c.credit_limit
      FROM app.orders o
      JOIN app.customers c ON c.customer_id = o.customer_id
     WHERE o.status = 'NEW'
     ORDER BY o.order_id;
BEGIN
  OPEN c;
  LOOP
    FETCH c BULK COLLECT INTO v_rows LIMIT v_lim;
    EXIT WHEN v_rows.COUNT = 0;

    FOR i IN 1..v_rows.COUNT LOOP
      IF v_rows(i).amt > v_rows(i).limit_amt THEN
        NULL; -- 必要な検証・付帯情報の生成
      END IF;
    END LOOP;

    FORALL i IN 1..v_rows.COUNT
      UPDATE app.orders
         SET status = CASE WHEN amt > limit_amt THEN 'ON_HOLD' ELSE 'OK' END
       WHERE order_id = v_rows(i).id;

    COMMIT; -- チャンク終端でコミット
  END LOOP;
  CLOSE c;
END;
/

FORALLのSAVE EXCEPTIONSで部分継続する

行中の一部が一意制約や参照整合性で失敗しても、全体を巻き戻さずに継続するにはSAVE EXCEPTIONSを使う。失敗行はSQL%BULK_EXCEPTIONSへ集約されるため、補償キューへ送るなどの後処理へ回す。


DECLARE
  TYPE t_id_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER;
  v_ids t_id_tab := t_id_tab(101,102,103,104);
  dml_errors EXCEPTION;
  PRAGMA EXCEPTION_INIT(dml_errors, -24381);
BEGIN
  FORALL i IN 1..v_ids.COUNT SAVE EXCEPTIONS
    DELETE FROM app.work_queue WHERE job_id = v_ids(i);

EXCEPTION
  WHEN dml_errors THEN
    FOR j IN 1..SQL%BULK_EXCEPTIONS.COUNT LOOP
      INSERT INTO app.comp_queue(job_id, err_code, err_index)
      VALUES (v_ids(SQL%BULK_EXCEPTIONS(j).ERROR_INDEX),
              SQL%BULK_EXCEPTIONS(j).ERROR_CODE,
              SQL%BULK_EXCEPTIONS(j).ERROR_INDEX);
    END LOOP;
    COMMIT;
END;
/

稀疎配列を確実に走査するINDICES OF/VALUES OF

検証でNGと判定した要素を除外してDMLしたい場合、連番1..COUNTでは欠番を踏む。INDICES OFは存在するインデックスのみ、VALUES OFは別配列で与えたキー集合に一致する要素のみをDML対象にできる。


DECLARE
  TYPE t_rec IS RECORD(id NUMBER, amt NUMBER);
  TYPE t_tab IS TABLE OF t_rec INDEX BY PLS_INTEGER;
  TYPE t_idx IS TABLE OF PLS_INTEGER INDEX BY PLS_INTEGER;

  v_rows t_tab;
  v_ok   t_idx;  -- DML対象の行番号だけを格納

BEGIN
  -- v_rowsへBULK COLLECT済みとする
  FOR i IN 1..v_rows.COUNT LOOP
    IF v_rows.EXISTS(i) AND v_rows(i).amt >= 0 THEN
      v_ok(v_ok.COUNT+1) := i;
    END IF;
  END LOOP;

  FORALL j IN VALUES OF v_ok
    INSERT INTO app.fact(id, amount) VALUES (v_rows(j).id, v_rows(j).amt);
  COMMIT;
END;
/

RETURNING BULK COLLECTで生成キーを一括回収する

INSERTやUPDATEで生成したIDや影響行のキーをまとめて回収し、後段のログや連携へ引き渡す。RETURNINGはFORALLでも配列出力できる。


DECLARE
  TYPE t_num_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER;
  TYPE t_cust   IS TABLE OF NUMBER INDEX BY PLS_INTEGER;
  TYPE t_amt    IS TABLE OF NUMBER INDEX BY PLS_INTEGER;

  v_cust t_cust; v_amt t_amt; v_ids t_num_tab;
BEGIN
  -- 入力配列の構築は省略
  FORALL i IN INDICES OF v_cust
    INSERT INTO app.orders(order_id, customer_id, amount)
    VALUES (orders_seq.NEXTVAL, v_cust(i), v_amt(i))
    RETURNING order_id BULK COLLECT INTO v_ids;

  -- 生成IDの利用例
  FOR i IN v_ids.FIRST..v_ids.LAST LOOP
    IF v_ids.EXISTS(i) THEN
      NULL; -- ログや後続キューへ
    END IF;
  END LOOP;
  COMMIT;
END;
/

動的SQL×FORALLで可変DMLを塊化する

FORALLの本体は動的DMLでもよく、EXECUTE IMMEDIATEにUSINGで配列要素を渡せる。バインド順はプレースホルダ順に合わせる。DDLはFORALL対象外のため別管理に分離する。


DECLARE
  TYPE t_id_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER;
  TYPE t_amt_tab IS TABLE OF NUMBER INDEX BY PLS_INTEGER;

  v_ids t_id_tab; v_amt t_amt_tab;
  v_sql VARCHAR2(200) := 'UPDATE app.orders SET amount = :1 WHERE order_id = :2';
BEGIN
  -- v_ids, v_amt は同じインデックスで整列済みとする
  FORALL i IN INDICES OF v_ids
    EXECUTE IMMEDIATE v_sql USING v_amt(i), v_ids(i);
  COMMIT;
END;
/

抽出→検証→適用→補償の三相テンプレート

一括処理の標準形は、抽出をBULK/LIMITで分割し、検証でNGを稀疎化し、適用をFORALLで送信し、失敗はSAVE EXCEPTIONSないしエラーログ表へ退避する。コミットはチャンク終端に限定する。


DECLARE
  TYPE t_rec IS RECORD(id NUMBER, amt NUMBER);
  TYPE t_tab IS TABLE OF t_rec INDEX BY PLS_INTEGER;
  TYPE t_idx IS TABLE OF PLS_INTEGER INDEX BY PLS_INTEGER;

  v_rows t_tab;
  v_ok   t_idx;
  v_lim  PLS_INTEGER := 2000;
  CURSOR c IS SELECT id, amt FROM app.staging WHERE status = 'READY' ORDER BY id;

  dml_errors EXCEPTION;
  PRAGMA EXCEPTION_INIT(dml_errors, -24381);
BEGIN
  OPEN c;
  LOOP
    FETCH c BULK COLLECT INTO v_rows LIMIT v_lim;
    EXIT WHEN v_rows.COUNT = 0;

    v_ok.DELETE;
    FOR i IN 1..v_rows.COUNT LOOP
      IF v_rows.EXISTS(i) AND v_rows(i).amt >= 0 THEN
        v_ok(v_ok.COUNT+1) := i;
      END IF;
    END LOOP;

    BEGIN
      FORALL j IN VALUES OF v_ok SAVE EXCEPTIONS
        INSERT INTO app.fact(id, amount) VALUES (v_rows(j).id, v_rows(j).amt);

      FORALL j IN VALUES OF v_ok
        UPDATE app.staging SET status = 'DONE' WHERE id = v_rows(j).id;

      COMMIT;
    EXCEPTION
      WHEN dml_errors THEN
        FOR k IN 1..SQL%BULK_EXCEPTIONS.COUNT LOOP
          INSERT INTO app.comp_queue(id, reason, err_code)
          VALUES (v_rows(SQL%BULK_EXCEPTIONS(k).ERROR_INDEX).id,
                  'DML_ERROR',
                  SQL%BULK_EXCEPTIONS(k).ERROR_CODE);
        END LOOP;
        COMMIT;
      WHEN OTHERS THEN
        ROLLBACK;
        RAISE;
    END;
  END LOOP;
  CLOSE c;
END;
/

ステージング×MERGEの併用とI/O削減

FORALLでの逐次UPDATEより、差分抽出済みステージングからのMERGEの方がロックとUNDOを抑えやすい。大量適用ではネスト表や外部表からステージングへAPPEND→MERGEが安定する。


INSERT /*+ APPEND */ INTO app.stage_orders(order_id, customer_id, amount, upd_ts)
SELECT order_id, customer_id, amount, upd_ts FROM app.stage_orders_src;
COMMIT;

MERGE INTO app.orders d
USING (
  SELECT s.*
    FROM app.stage_orders s
    LEFT JOIN app.orders d ON d.order_id = s.order_id
   WHERE d.order_id IS NULL
      OR d.customer_id != s.customer_id
      OR d.amount      != s.amount
) s
ON (d.order_id = s.order_id)
WHEN MATCHED THEN
  UPDATE SET d.customer_id = s.customer_id, d.amount = s.amount, d.updated_at = s.upd_ts
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, amount, created_at, updated_at)
  VALUES (s.order_id, s.customer_id, s.amount, s.upd_ts, s.upd_ts);

PGAとUNDO、ロック時間を同時に最適化する指針

LIMITを大きくすると往復は減るがPGAとロック時間が伸びる。検証やJOINを含む重い後段処理では500~2000程度から開始し、経過時間、redo/undo、同時実行の競合を観測しつつ調整する。コミットはチャンク終端に揃え、再実行の冪等性を保つ。SKIP LOCKEDで取り合いを避け、行レベルでの条件(現状値チェック)を更新側のWHEREに残すと踏み潰しを防げる。

可観測性:実行文脈と配列統計の記録

DBMS_APPLICATION_INFOでMODULE/ACTIONを固定し、相関IDをCLIENT IDENTIFIERへ設定しておくと、失敗時にv$セッション、トレース、アプリログの突合が容易になる。バルク処理では配列件数、成功件数、失敗件数、経過時間をチャンク単位で記録し、閾値超過でLIMITを自動調整する仕組みを用意すると運用が安定する。

よくある落とし穴と対策

BULK COLLECTの取り過ぎでPGAが膨らみ、ガベージが遅れてスパイクする事象は珍しくない。配列を長生きさせず、チャンクごとにDELETEして再利用する。FORALLの内部で関数呼び出しはできないため、式はあらかじめ配列側で評価しておく。RETURNINGの配列は事前にDELETEしてから受け取り、OUT側のスロットとインデックスの対応を誤らない。動的SQLでは型一致を徹底し、NLSによる暗黙変換を排除する。

まとめ

BULK COLLECTとFORALLは、抽出と適用の単位を「行」から「塊」へ引き上げ、往復とパースを削減する強力な手段である。LIMITでPGAとロック時間の妥協点を作り、SAVE EXCEPTIONSで部分継続し、INDICES OF/VALUES OFで稀疎配列を正しく捉える。RETURNINGで生成キーを一括回収し、重い適用はMERGEやステージングと組み合わせてI/Oを減らす。相関IDと実行文脈の記録、チャンク終端コミット、冪等な再実行という運用三点を守れば、大規模処理でも一括パターンを安全かつ高速に標準化できる。