【PL/SQL】非同期処理設計の高度化:ジョブチェーンとイベント駆動制御

【PL/SQL】非同期処理設計の高度化:ジョブチェーンとイベント駆動制御 PL/SQL

非同期処理を単なるバッチの定刻実行として捉えると、ピーク負荷への追従や外部イベントの変化に対して鈍重になる。DBMS_SCHEDULERはチェーンとイベント駆動の二本柱により、処理を細かな段階へ分解し、条件や結果に応じて分岐させることができる。これにアプリ側の相関IDや標準ロガーを組み合わせれば、スループットと可観測性を両立した堅牢な非同期基盤を構築できる。本稿では、プログラム・チェーン・ルールを核にしたジョブオーケストレーションと、Advanced Queuing(AQ)をトリガにしたイベントベース実行、監視・再実行・スロットリングまでを実務の観点で解説する。

プログラムとジョブクラスで再利用と隔離を設計する

同一のPL/SQLブロックを複数スケジュールから呼ぶと、定義の重複や権限のばらつきが問題になる。DBMS_SCHEDULERのプログラムに処理本体を集約し、リソース制御や優先度はジョブクラスで分離する。サービス名を指定してワークロードを別プールへ逃がすと、OLTPとの干渉を減らせる。


BEGIN
  DBMS_SCHEDULER.create_program(
    program_name   => 'app.prg_import_orders',
    program_type   => 'PLSQL_BLOCK',
    program_action => q'[
      DECLARE
        v_cid VARCHAR2(64) := 'CID-' || TO_CHAR(SYSTIMESTAMP,'YYYYMMDDHH24MISSFF3');
      BEGIN
        DBMS_SESSION.set_identifier(v_cid);
        -- ここで実処理(例:外部表やステージングからの取り込み)
        app_pkg_import.run_batch; -- 例:処理本体は別パッケージへ
      EXCEPTION
        WHEN OTHERS THEN
          -- 例外はロガーへ記録して再送出
          pkg_logger.log_error(SQLERRM);
          RAISE;
      END;]',
    number_of_arguments => 0,
    enabled            => TRUE
  );

  DBMS_SCHEDULER.create_job_class(
    job_class_name => 'app.cls_batch_low',
    resource_consumer_group => 'BATCH_GROUP', -- 事前にDBMS_RESOURCE_MANAGERで定義
    service        => 'BATCH_SVC'             -- 事前にDBサービスとして設定
  );
END;
/

チェーンで段階実行と条件分岐をモデル化する

チェーンは一連の処理を「ステップ」と「ルール」に分解し、成功・失敗・完了などの状態に応じて次の遷移を定義する。前処理・本処理・後処理を分け、失敗時はロールバックやクリーンアップへ分岐させると、再実行時の安全性が増す。


BEGIN
  -- 1) チェーン定義
  DBMS_SCHEDULER.create_chain(chain_name => 'app.chn_nightly_import');

  -- 2) ステップ(プログラムまたは匿名PL/SQLを指定可能)
  DBMS_SCHEDULER.define_chain_step('app.chn_nightly_import','step_prepare','app.prg_prepare_env');
  DBMS_SCHEDULER.define_chain_step('app.chn_nightly_import','step_import','app.prg_import_orders');
  DBMS_SCHEDULER.define_chain_step('app.chn_nightly_import','step_validate','app.prg_validate');
  DBMS_SCHEDULER.define_chain_step('app.chn_nightly_import','step_finalize','app.prg_finalize');
  DBMS_SCHEDULER.define_chain_step('app.chn_nightly_import','step_cleanup' ,'app.prg_cleanup');

  -- 3) ルール(状態遷移)
  DBMS_SCHEDULER.define_chain_rule('app.chn_nightly_import',
    condition => 'TRUE',
    action    => 'START step_prepare',
    rule_name => 'r_start');

  DBMS_SCHEDULER.define_chain_rule('app.chn_nightly_import',
    condition => 'step_prepare COMPLETED',
    action    => 'START step_import',
    rule_name => 'r_to_import');

  DBMS_SCHEDULER.define_chain_rule('app.chn_nightly_import',
    condition => 'step_import SUCCEEDED',
    action    => 'START step_validate',
    rule_name => 'r_to_validate');

  DBMS_SCHEDULER.define_chain_rule('app.chn_nightly_import',
    condition => 'step_import FAILED OR step_validate FAILED',
    action    => 'START step_cleanup',
    rule_name => 'r_to_cleanup');

  DBMS_SCHEDULER.define_chain_rule('app.chn_nightly_import',
    condition => 'step_validate SUCCEEDED',
    action    => 'START step_finalize',
    rule_name => 'r_to_finalize');

  DBMS_SCHEDULER.define_chain_rule('app.chn_nightly_import',
    condition => 'step_finalize COMPLETED OR step_cleanup COMPLETED',
    action    => 'END',
    rule_name => 'r_end');

  DBMS_SCHEDULER.enable('app.chn_nightly_import');

  -- 4) チェーンを起動するジョブ
  DBMS_SCHEDULER.create_job(
    job_name      => 'app.job_nightly_import',
    job_type      => 'CHAIN',
    job_action    => 'app.chn_nightly_import',
    job_class     => 'app.cls_batch_low',
    start_date    => SYSTIMESTAMP,
    repeat_interval => 'FREQ=DAILY;BYHOUR=2;BYMINUTE=0;BYSECOND=0',
    auto_drop     => FALSE,
    enabled       => TRUE
  );
END;
/

リトライとバックオフはアプリ設計で制御する

スケジューラ自体に一般的な指数バックオフはないため、失敗時に専用ステップへ遷移し、アプリ側で待機と回数管理を行うと挙動を読みやすい。カウンタは制御テーブルに保持し、閾値超過でチェーンを終了させる。過剰なSLEEPは並列度を圧迫するため、待機は短くし、失敗を早く表面化させる方針が運用上安全である。


-- 例:step_importの内部で試行回数を参照し短時間の待機を実施(簡略化)
DECLARE
  v_retry NUMBER := app_ctl.get_retry('IMPORT'); -- 例:制御テーブル
BEGIN
  BEGIN
    app_pkg_import.run_batch;
    app_ctl.clear_retry('IMPORT');
  EXCEPTION
    WHEN OTHERS THEN
      IF v_retry < 3 THEN
        app_ctl.inc_retry('IMPORT');
        DBMS_LOCK.sleep(10 * v_retry); -- 緩いバックオフ
        RAISE_APPLICATION_ERROR(-20001,'RETRY'); -- 失敗としてチェーンルールへ
      ELSE
        RAISE;
      END IF;
  END;
END;

AQとイベントスケジュールで外部トリガを受ける

イベント駆動では、AQキューに投入されたメッセージを条件式でフィルタし、合致時にジョブを自動起動する。イベントスケジュールをオブジェクトとして作成し、ジョブへ割り当てれば、ポーリングなしで低遅延に反応できる。


-- 1) ペイロード型とキュー
CREATE TYPE app_event_typ AS OBJECT(
  event_name    VARCHAR2(64),
  correlation   VARCHAR2(64),
  object_schema VARCHAR2(30),
  object_name   VARCHAR2(128),
  payload_json  CLOB
);
/
BEGIN
  DBMS_AQADM.create_queue_table(
    queue_table        => 'app.event_qtab',
    multiple_consumers => TRUE,
    queue_payload_type => 'APP_EVENT_TYP');
  DBMS_AQADM.create_queue(queue_name => 'app.event_queue', queue_table => 'app.event_qtab');
  DBMS_AQADM.start_queue(queue_name => 'app.event_queue');
END;
/

-- 2) イベントスケジュール(条件はtab.user_dataから参照)
BEGIN
  DBMS_SCHEDULER.add_event_schedule(
    schedule_name   => 'app.es_file_ready',
    start_date      => SYSTIMESTAMP,
    event_condition => q'[tab.user_data.event_name = 'FILE_READY' AND tab.user_data.object_name LIKE 'orders_%']',
    queue_spec      => 'app.event_queue'
  );
END;
/

-- 3) 反応するジョブ(イベント毎に起動)
BEGIN
  DBMS_SCHEDULER.create_job(
    job_name      => 'app.job_import_on_file',
    job_type      => 'PLSQL_BLOCK',
    job_action    => q'[
      DECLARE
        v_evt app_event_typ := SYS.AQ$_JMS_TEXT_MESSAGE(NULL); -- ダミー初期化回避のための型注意は実装側で調整
      BEGIN
        DBMS_SESSION.set_identifier(SYS_CONTEXT('USERENV','CLIENT_IDENTIFIER'));
        -- 実際はSYS.AQ$_*_ ヘッダ経由で属性を取得し、payloadを解析して分岐
        app_pkg_import.run_batch;
      EXCEPTION
        WHEN OTHERS THEN
          pkg_logger.log_error(SQLERRM);
          RAISE;
      END;]',
    schedule_name  => 'app.es_file_ready',
    job_class      => 'app.cls_batch_low',
    enabled        => TRUE,
    auto_drop      => FALSE
  );
END;
/

イベント発火側の設計と相関の受け渡し

アプリや外部連携が条件成立時にキューへメッセージを投入する。相関IDをCLIENT IDENTIFIERへ写しておくと、起動したジョブのログや監査を縦断的に追える。


DECLARE
  enqopt    DBMS_AQ.enqueue_options_t;
  msgprop   DBMS_AQ.message_properties_t;
  msgid     RAW(16);
  l_payload app_event_typ := app_event_typ(
                   event_name    => 'FILE_READY',
                   correlation   => 'CID-'||TO_CHAR(SYSTIMESTAMP,'YYYYMMDDHH24MISSFF3'),
                   object_schema => 'STAGE',
                   object_name   => 'orders_20251009.csv',
                   payload_json  => '{"bucket":"ingest","key":"orders_20251009.csv"}');
BEGIN
  DBMS_SESSION.set_identifier(l_payload.correlation);
  DBMS_AQ.enqueue(
    queue_name         => 'app.event_queue',
    enqueue_options    => enqopt,
    message_properties => msgprop,
    payload            => l_payload,
    msgid              => msgid);
  COMMIT;
END;
/

監視・可観測性・手動再実行の整備

運用では「いま何が走っているか」「どこで止まったか」「再試行は可能か」の三点が重要になる。ビューで直近の実行とエラーを拾い、相関IDでアプリログと突合し、必要時はチェーン単位で再始動する。中間結果が重複処理に耐えるよう、各ステップは冪等に設計しておく。


-- 実行状況と最後のエラー
SELECT job_name, status, actual_start_date, run_duration, additional_info
  FROM dba_scheduler_job_run_details
 WHERE log_date > SYSTIMESTAMP - INTERVAL '1' DAY
 ORDER BY log_date DESC;

-- チェーン内の最新ステップ状態
SELECT job_name, chain_step, step_status, error_code, completion_time
  FROM dba_scheduler_running_chains;

-- 次回起動予定
SELECT job_name, state, next_run_date
  FROM dba_scheduler_jobs
 WHERE enabled = 'TRUE';

-- 停止したチェーンを最初からやり直す(必要に応じて個別ステップ開始も可)
BEGIN
  DBMS_SCHEDULER.stop_job('app.job_nightly_import', force => TRUE);
  DBMS_SCHEDULER.run_chain('app.chn_nightly_import', 'app.r_start'); -- 先頭ルール名で起動
END;
/

スロットリングと並列度の制御

同時実行数の上限や時間帯の制御はジョブクラスとウィンドウで行う。業務時間帯は消極的な並列度、夜間は積極的な並列度に切り替えると安定する。プログラム側では細粒度ロックやアドバイザリロックで衝突を避け、同一データの二重処理を防ぐ。


BEGIN
  DBMS_SCHEDULER.set_attribute('app.cls_batch_low','logging_level',DBMS_SCHEDULER.LOGGING_RUNS);
  -- ウィンドウの例(詳細なRESOURCE MANAGER設定は省略)
  DBMS_SCHEDULER.create_window(
    window_name   => 'app.wdw_night',
    resource_plan => 'APP_BATCH_PLAN',
    repeat_interval => 'FREQ=DAILY;BYHOUR=1;BYMINUTE=0;BYSECOND=0;DURATION=PT6H',
    enabled       => TRUE);
END;
/

テスト戦略とリリースの要点

チェーンは単体で起動できるため、各ステップの正常・異常パスを個別に再現し、ルールが意図通りに遷移するかを検証する。イベント駆動はステージングでAQを使って実際にメッセージを投入し、フィルタ条件と起動の相関IDがログへ通っているかを確認する。スキーマ移送ではチェーン・プログラム・スケジュール・イベントスケジュール・キューの依存順を崩さないこと、権限とサービスの事前調整を怠らないことが安定稼働の鍵となる。

まとめ

DBMS_SCHEDULERのチェーンは段階実行と分岐をモデル化し、AQ連携のイベントスケジュールは外部変化に即応する。ジョブクラス・サービス・ウィンドウでリソースを隔離し、相関IDと標準ロガーで実行経路を観測可能にすれば、非同期処理は失敗に強く、運用しやすい形へ収斂する。冪等性・明示的なルール・最小限の待機という原則を守りつつ、段階的に高度化していくことが、大規模PL/SQLにおける非同期設計の最短距離である。