【PL/SQL】非同期処理設計の高度化:ジョブチェーンとイベント駆動制御

【PL/SQL】非同期処理設計の高度化:ジョブチェーンとイベント駆動制御 PL/SQL

Oracleで非同期処理を設計するとき、定刻に1本のバッチを起動するだけでは、前処理・本処理・検証・後処理の分岐や、外部イベントへの即時反応を表現しにくくなります。DBMS_SCHEDULER のチェーンとOracle Advanced Queuing(AQ)を組み合わせると、段階実行とイベント駆動をデータベース内で管理できます。

ただし、チェーンの開始ステップ、イベントメッセージの受け取り方、監視ビューの列、再実行方法を誤ると、コード例がそのまま動きません。この記事では、名前付きプログラム、EVENT_MESSAGE メタデータ引数、正しい RUN_CHAIN、実行中チェーン監視、冪等な受信箱まで、実行可能性を重視して整理します。

先に結論

  • 処理本体は名前付きプログラムへ分離し、チェーンはステップと遷移だけを管理します。
  • AQイベントの内容をジョブへ渡すには、STORED_PROCEDUREプログラムと DEFINE_METADATA_ARGUMENT(..., 'EVENT_MESSAGE', ...) を使います。
  • RUN_CHAINstart_steps にはルール名ではなくステップ名を渡します。
  • イベントジョブの実行中に届いたイベントが次の起動にならない場合があるため、受信処理は短くし、受信箱へ永続化します。
  • 長いバックオフでSchedulerワーカーを SLEEP させず、次回実行時刻を持つキューや一回限りのジョブへ逃がします。

Schedulerの基本操作は DBMS_SCHEDULER完全ガイド、実行履歴と異常検知は ジョブ実行履歴の集中管理と異常検知、例外と再試行は 例外設計と再試行パターン と合わせて読むと整理しやすくなります。

スポンサーリンク

プログラム・チェーン・ジョブの責務を分ける

Schedulerでは、処理本体をプログラム、処理順序をチェーン、実行日時やジョブクラスをジョブへ分けると再利用しやすくなります。ジョブクラスやサービスは、OLTP処理とバッチ処理のリソースを分離する目的で使います。

権限と事前設定

ジョブクラスの作成には通常 MANAGE SCHEDULER が必要です。ジョブクラスとウィンドウはSYSスキーマで管理されるグローバルなSchedulerオブジェクトなので、作成・参照時の名前にアプリケーションスキーマを付けません。DBAが作成したジョブクラスをAPPから使う場合は、APPへそのジョブクラスの EXECUTE 権限を付与します。Resource ManagerのコンシューマグループやDBサービスも事前にDBAが作成します。アプリケーションスキーマへ強い権限を常時付与しない運用が安全です。

create-programs-and-job-class.sql
BEGIN
  DBMS_SCHEDULER.CREATE_PROGRAM(
    program_name        => 'APP.PRG_PREPARE_ENV',
    program_type        => 'STORED_PROCEDURE',
    program_action      => 'APP_PKG_IMPORT.PREPARE_ENV',
    number_of_arguments => 0,
    enabled             => TRUE
  );

  DBMS_SCHEDULER.CREATE_PROGRAM(
    program_name        => 'APP.PRG_IMPORT_ORDERS',
    program_type        => 'STORED_PROCEDURE',
    program_action      => 'APP_PKG_IMPORT.RUN_BATCH',
    number_of_arguments => 0,
    enabled             => TRUE
  );

  DBMS_SCHEDULER.CREATE_PROGRAM(
    program_name        => 'APP.PRG_VALIDATE',
    program_type        => 'STORED_PROCEDURE',
    program_action      => 'APP_PKG_IMPORT.VALIDATE_RESULT',
    number_of_arguments => 0,
    enabled             => TRUE
  );

  DBMS_SCHEDULER.CREATE_PROGRAM(
    program_name        => 'APP.PRG_FINALIZE',
    program_type        => 'STORED_PROCEDURE',
    program_action      => 'APP_PKG_IMPORT.FINALIZE_IMPORT',
    number_of_arguments => 0,
    enabled             => TRUE
  );

  DBMS_SCHEDULER.CREATE_PROGRAM(
    program_name        => 'APP.PRG_CLEANUP',
    program_type        => 'STORED_PROCEDURE',
    program_action      => 'APP_PKG_IMPORT.CLEANUP_FAILED_RUN',
    number_of_arguments => 0,
    enabled             => TRUE
  );
END;
/

-- DBA管理下で作る例。BATCH_GROUPとBATCH_SVCは事前作成が必要。
BEGIN
  DBMS_SCHEDULER.CREATE_JOB_CLASS(
    job_class_name         => 'CLS_BATCH_LOW',
    resource_consumer_group => 'BATCH_GROUP',
    service                => 'BATCH_SVC',
    logging_level          => DBMS_SCHEDULER.LOGGING_RUNS
  );
END;
/

GRANT EXECUTE ON SYS.CLS_BATCH_LOW TO APP;

チェーンの分岐を正しい状態条件で定義する

チェーンは、各ステップの状態に応じて次の処理を開始します。成功時と失敗時の両方を明示し、最後に必ず END へ到達するルールを作ります。

create-nightly-import-chain.sql
BEGIN
  DBMS_SCHEDULER.CREATE_CHAIN(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT'
  );

  DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
    'APP.CHN_NIGHTLY_IMPORT', 'STEP_PREPARE', 'APP.PRG_PREPARE_ENV');
  DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
    'APP.CHN_NIGHTLY_IMPORT', 'STEP_IMPORT', 'APP.PRG_IMPORT_ORDERS');
  DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
    'APP.CHN_NIGHTLY_IMPORT', 'STEP_VALIDATE', 'APP.PRG_VALIDATE');
  DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
    'APP.CHN_NIGHTLY_IMPORT', 'STEP_FINALIZE', 'APP.PRG_FINALIZE');
  DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
    'APP.CHN_NIGHTLY_IMPORT', 'STEP_CLEANUP', 'APP.PRG_CLEANUP');

  DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    condition  => 'TRUE',
    action     => 'START STEP_PREPARE',
    rule_name  => 'R_START'
  );

  DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    condition  => ':STEP_PREPARE.STATE = ''SUCCEEDED''',
    action     => 'START STEP_IMPORT',
    rule_name  => 'R_PREPARE_OK'
  );

  DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    condition  => ':STEP_IMPORT.STATE = ''SUCCEEDED''',
    action     => 'START STEP_VALIDATE',
    rule_name  => 'R_IMPORT_OK'
  );

  DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    condition  => ':STEP_VALIDATE.STATE = ''SUCCEEDED''',
    action     => 'START STEP_FINALIZE',
    rule_name  => 'R_VALIDATE_OK'
  );

  DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    condition  => ':STEP_PREPARE.STATE = ''FAILED'' OR
                   :STEP_IMPORT.STATE = ''FAILED'' OR
                   :STEP_VALIDATE.STATE = ''FAILED'' OR
                   :STEP_FINALIZE.STATE = ''FAILED''',
    action     => 'START STEP_CLEANUP',
    rule_name  => 'R_FAILURE_CLEANUP'
  );

  DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    condition  => ':STEP_FINALIZE.STATE = ''SUCCEEDED'' OR
                   :STEP_CLEANUP.COMPLETED = ''TRUE''',
    action     => 'END',
    rule_name  => 'R_END'
  );

  DBMS_SCHEDULER.ENABLE('APP.CHN_NIGHTLY_IMPORT');
END;
/

ステップ状態は SUCCEEDEDFAILEDSTOPPED などで判定できます。クリーンアップが失敗する可能性もあるため、運用要件によってはクリーンアップ失敗時の終了コードや通知ルールも追加します。

チェーンジョブを作成する

schedule-chain-job.sql
BEGIN
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'APP.JOB_NIGHTLY_IMPORT',
    job_type        => 'CHAIN',
    job_action      => 'APP.CHN_NIGHTLY_IMPORT',
    job_class       => 'CLS_BATCH_LOW',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=DAILY;BYHOUR=2;BYMINUTE=0;BYSECOND=0',
    auto_drop       => FALSE,
    enabled         => TRUE
  );
END;
/

長いSLEEPを使わずに再試行する

DBMS_LOCK.SLEEP は権限が必要な環境があり、待機中もSchedulerワーカーを占有します。数秒の同期調整を除き、長い指数バックオフには向きません。失敗した処理は、次回実行時刻を持つ再試行テーブルへ記録し、別の短時間ジョブが期限到来分を処理する構成にすると安定します。

retry-queue-table.sql
CREATE TABLE async_retry_queue (
  retry_id       NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  task_type      VARCHAR2(50) NOT NULL,
  business_key   VARCHAR2(128) NOT NULL,
  correlation_id VARCHAR2(64) NOT NULL,
  attempt_count  NUMBER DEFAULT 0 NOT NULL,
  next_run_at    TIMESTAMP WITH TIME ZONE NOT NULL,
  status         VARCHAR2(20) DEFAULT 'WAITING' NOT NULL,
  last_error     VARCHAR2(4000),
  created_at     TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
  CONSTRAINT uq_async_retry UNIQUE(task_type, business_key, correlation_id)
);

CREATE INDEX ix_async_retry_due
  ON async_retry_queue(status, next_run_at);

-- 失敗時に次回実行時刻を計算して登録
MERGE INTO async_retry_queue q
USING (
  SELECT
    'IMPORT_ORDER' AS task_type,
    :business_key AS business_key,
    :correlation_id AS correlation_id
  FROM dual
) s
ON (
  q.task_type = s.task_type
  AND q.business_key = s.business_key
  AND q.correlation_id = s.correlation_id
)
WHEN MATCHED THEN UPDATE SET
  q.attempt_count = q.attempt_count + 1,
  q.next_run_at = SYSTIMESTAMP
                  + NUMTODSINTERVAL(POWER(2, LEAST(q.attempt_count + 1, 6)), 'MINUTE'),
  q.last_error = SUBSTR(:error_message, 1, 4000),
  q.status = 'WAITING'
WHEN NOT MATCHED THEN INSERT(
  task_type, business_key, correlation_id,
  attempt_count, next_run_at, last_error
) VALUES(
  s.task_type, s.business_key, s.correlation_id,
  1, SYSTIMESTAMP + INTERVAL '2' MINUTE, SUBSTR(:error_message, 1, 4000)
);

再試行では、同じ業務キーを複数回処理しても結果が壊れない冪等性が必要です。トランザクション競合や再試行設計は トランザクション分離レベル別の一貫性テスト も参考になります。

AQのイベント型と受信箱を作る

イベント起動ジョブは短く保ちます。イベントを受け取ったら、業務処理をその場で長時間実行せず、まず受信箱へ冪等に保存します。別のワーカーが受信箱を処理すれば、同時実行中に次のイベントが届く場合や再送にも対応しやすくなります。

create-aq-event-and-inbox.sql
CREATE TYPE app_event_typ AS OBJECT(
  event_id       VARCHAR2(64),
  event_name     VARCHAR2(64),
  correlation_id VARCHAR2(64),
  object_schema  VARCHAR2(128),
  object_name    VARCHAR2(128),
  payload_json   CLOB
);
/

CREATE TABLE event_inbox (
  event_id       VARCHAR2(64) PRIMARY KEY,
  event_name     VARCHAR2(64) NOT NULL,
  correlation_id VARCHAR2(64),
  object_schema  VARCHAR2(128),
  object_name    VARCHAR2(128),
  payload_json   CLOB CHECK (payload_json IS JSON),
  status         VARCHAR2(20) DEFAULT 'NEW' NOT NULL,
  received_at    TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
  processed_at   TIMESTAMP WITH TIME ZONE
);

CREATE INDEX ix_event_inbox_status
  ON event_inbox(status, received_at);

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'APP.EVENT_QTAB',
    multiple_consumers => TRUE,
    queue_payload_type => 'APP_EVENT_TYP'
  );

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'APP.EVENT_QUEUE',
    queue_table => 'APP.EVENT_QTAB'
  );

  DBMS_AQADM.START_QUEUE(
    queue_name => 'APP.EVENT_QUEUE'
  );
END;
/

EVENT_MESSAGEを受け取る名前付きプログラムを作る

AQイベントのペイロードをジョブへ渡すには、イベント型を引数に持つストアドプロシージャを作り、名前付きプログラムの引数を EVENT_MESSAGE として定義します。匿名PL/SQLブロックへダミー型を代入する方法では受け取れません。

receive-event-message.sql
CREATE OR REPLACE PROCEDURE app.receive_file_ready_event(
  p_event IN app_event_typ
) AUTHID DEFINER IS
BEGIN
  DBMS_SESSION.SET_IDENTIFIER(p_event.correlation_id);
  DBMS_APPLICATION_INFO.SET_MODULE('AQ_EVENT', p_event.event_name);

  -- event_idを主キーにして重複イベントを無害化する
  MERGE INTO event_inbox i
  USING (
    SELECT
      p_event.event_id AS event_id,
      p_event.event_name AS event_name,
      p_event.correlation_id AS correlation_id,
      p_event.object_schema AS object_schema,
      p_event.object_name AS object_name,
      p_event.payload_json AS payload_json
    FROM dual
  ) s
  ON (i.event_id = s.event_id)
  WHEN NOT MATCHED THEN INSERT(
    event_id, event_name, correlation_id,
    object_schema, object_name, payload_json
  ) VALUES(
    s.event_id, s.event_name, s.correlation_id,
    s.object_schema, s.object_name, s.payload_json
  );

  COMMIT;
EXCEPTION
  WHEN OTHERS THEN
    ROLLBACK;
    RAISE;
END;
/

BEGIN
  DBMS_SCHEDULER.CREATE_PROGRAM(
    program_name        => 'APP.PRG_RECEIVE_FILE_EVENT',
    program_type        => 'STORED_PROCEDURE',
    program_action      => 'APP.RECEIVE_FILE_READY_EVENT',
    number_of_arguments => 1,
    enabled             => FALSE
  );

  DBMS_SCHEDULER.DEFINE_METADATA_ARGUMENT(
    program_name       => 'APP.PRG_RECEIVE_FILE_EVENT',
    metadata_attribute => 'EVENT_MESSAGE',
    argument_position  => 1,
    argument_name      => 'P_EVENT'
  );

  DBMS_SCHEDULER.ENABLE('APP.PRG_RECEIVE_FILE_EVENT');
END;
/

イベントスケジュールとジョブを作成する

event_condition はAQルールの構文で書き、オブジェクト型の属性は tab.user_data から参照します。ジョブは名前付きプログラムを指定します。

create-event-schedule-and-job.sql
BEGIN
  DBMS_SCHEDULER.CREATE_EVENT_SCHEDULE(
    schedule_name   => 'APP.ES_FILE_READY',
    start_date      => SYSTIMESTAMP,
    event_condition => q'[
      tab.user_data.event_name = 'FILE_READY'
      AND tab.user_data.object_name LIKE 'orders_%'
    ]',
    queue_spec      => 'APP.EVENT_QUEUE'
  );

  DBMS_SCHEDULER.CREATE_JOB(
    job_name     => 'APP.JOB_RECEIVE_FILE_EVENT',
    program_name => 'APP.PRG_RECEIVE_FILE_EVENT',
    schedule_name => 'APP.ES_FILE_READY',
    job_class    => 'CLS_BATCH_LOW',
    auto_drop    => FALSE,
    enabled      => TRUE
  );
END;
/
イベント取りこぼしへの注意

Oracle Schedulerのイベント起動では、同じジョブがすでに実行中の間に届いたイベントが消費されても、追加実行を起こさない場合があります。そのため受信ジョブは短時間で終了させ、重い処理は受信箱を読む別ワーカーへ分離します。

イベントをAQへ投入する

enqueue-file-ready-event.sql
DECLARE
  v_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_msgid              RAW(16);
  v_correlation_id     VARCHAR2(64) := 'CID-' || RAWTOHEX(SYS_GUID());
  v_payload            app_event_typ;
BEGIN
  v_payload := app_event_typ(
    event_id       => RAWTOHEX(SYS_GUID()),
    event_name     => 'FILE_READY',
    correlation_id => v_correlation_id,
    object_schema  => 'STAGE',
    object_name    => 'orders_20260606.csv',
    payload_json   => JSON_OBJECT(
      'bucket' VALUE 'ingest',
      'key' VALUE 'orders_20260606.csv'
      RETURNING CLOB
    )
  );

  DBMS_AQ.ENQUEUE(
    queue_name         => 'APP.EVENT_QUEUE',
    enqueue_options    => v_enqueue_options,
    message_properties => v_message_properties,
    payload            => v_payload,
    msgid              => v_msgid
  );

  COMMIT;
END;
/

AQは少なくとも1回配送を前提に、重複しても壊れない受信処理を設計します。例外キュー、再配送回数、保持期間、古いメッセージの削除方針も運用設計に含めます。

監視ビューの正しい列を使う

実行中チェーンの監視には、STEP_NAMESTATEERROR_CODESTART_DATEEND_DATEDURATION を使います。CHAIN_STEPSTEP_STATUS という列はありません。

monitor-scheduler-and-chains.sql
-- 直近のジョブ実行履歴
SELECT
  job_name,
  status,
  actual_start_date,
  run_duration,
  error#,
  additional_info
FROM user_scheduler_job_run_details
WHERE log_date >= SYSTIMESTAMP - INTERVAL '1' DAY
ORDER BY log_date DESC;

-- 実行中チェーンの各ステップ
SELECT
  job_name,
  chain_name,
  step_name,
  state,
  error_code,
  start_date,
  end_date,
  duration
FROM user_scheduler_running_chains
ORDER BY job_name, step_name;

-- 次回実行予定
SELECT job_name, state, next_run_date
FROM user_scheduler_jobs
WHERE enabled = 'TRUE'
ORDER BY next_run_date;

チェーンを正しく再実行する

RUN_CHAINstart_steps に渡すのはルール名ではなく、開始したいステップ名です。通常の先頭ルールから開始するなら start_steps を省略します。

rerun-chain.sql
-- 通常どおり先頭から起動
BEGIN
  DBMS_SCHEDULER.RUN_CHAIN(
    chain_name => 'APP.CHN_NIGHTLY_IMPORT',
    job_name   => 'APP.RUN_NIGHTLY_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF3')
  );
END;
/

-- 検証済みの中間結果があり、STEP_VALIDATEから再開する場合
BEGIN
  DBMS_SCHEDULER.RUN_CHAIN(
    chain_name  => 'APP.CHN_NIGHTLY_IMPORT',
    job_name    => 'APP.RUN_VALIDATE_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF3'),
    start_steps => 'STEP_VALIDATE'
  );
END;
/

途中ステップから再開する前に、前段の成果物が存在し、再利用して安全かを確認します。単に失敗したステップから始めればよいとは限りません。各ステップに処理済みキー、入力版、出力件数、チェックサムを残すと判断しやすくなります。

ウィンドウとリソース制御を設定する

ウィンドウの継続時間は duration 引数で指定します。カレンダー式の repeat_interval に期間を混ぜません。

create-batch-window.sql
BEGIN
  DBMS_SCHEDULER.CREATE_WINDOW(
    window_name     => 'WDW_NIGHT',
    resource_plan   => 'APP_BATCH_PLAN',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=DAILY;BYHOUR=1;BYMINUTE=0;BYSECOND=0',
    duration        => INTERVAL '6' HOUR,
    window_priority => 'LOW'
  );

  DBMS_SCHEDULER.ENABLE('WDW_NIGHT');
END;
/

テストとリリースで確認すること

  • 各プログラムを単体実行し、正常・異常・再実行を確認する
  • 各チェーンルールが想定した状態でだけ発火するか確認する
  • イベントペイロードがEVENT_MESSAGE引数へ正しい型で渡るか確認する
  • 同じevent_idを複数回投入しても受信箱が重複しないか確認する
  • イベントジョブ実行中に追加イベントを投入して挙動を確認する
  • AQ例外キューと再配送回数を確認する
  • 途中ステップ再実行時の前提データを検証する
  • サービス、ジョブクラス、Resource Manager設定を本番前に確認する

まとめ

DBMS_SCHEDULER のチェーンは、処理の段階と分岐を明示するのに向いています。AQイベントからジョブを起動する場合は、名前付き STORED_PROCEDURE プログラムと EVENT_MESSAGE メタデータ引数を使い、実際のイベント型で受け取ります。

イベント受信ジョブは短くし、受信箱へ冪等に保存してから別ワーカーで処理すると、重複・再送・同時実行に強くなります。再実行ではルール名ではなくステップ名を指定し、監視では正しいSchedulerビュー列を使います。チェーン、AQ、再試行、監視、リソース制御を分けて設計すれば、非同期処理を運用可能な形へ育てられます。