Oracleで非同期処理を設計するとき、定刻に1本のバッチを起動するだけでは、前処理・本処理・検証・後処理の分岐や、外部イベントへの即時反応を表現しにくくなります。DBMS_SCHEDULER のチェーンとOracle Advanced Queuing(AQ)を組み合わせると、段階実行とイベント駆動をデータベース内で管理できます。
ただし、チェーンの開始ステップ、イベントメッセージの受け取り方、監視ビューの列、再実行方法を誤ると、コード例がそのまま動きません。この記事では、名前付きプログラム、EVENT_MESSAGE メタデータ引数、正しい RUN_CHAIN、実行中チェーン監視、冪等な受信箱まで、実行可能性を重視して整理します。
- 処理本体は名前付きプログラムへ分離し、チェーンはステップと遷移だけを管理します。
- AQイベントの内容をジョブへ渡すには、
STORED_PROCEDUREプログラムとDEFINE_METADATA_ARGUMENT(..., 'EVENT_MESSAGE', ...)を使います。 RUN_CHAINのstart_stepsにはルール名ではなくステップ名を渡します。- イベントジョブの実行中に届いたイベントが次の起動にならない場合があるため、受信処理は短くし、受信箱へ永続化します。
- 長いバックオフでSchedulerワーカーを
SLEEPさせず、次回実行時刻を持つキューや一回限りのジョブへ逃がします。
Schedulerの基本操作は DBMS_SCHEDULER完全ガイド、実行履歴と異常検知は ジョブ実行履歴の集中管理と異常検知、例外と再試行は 例外設計と再試行パターン と合わせて読むと整理しやすくなります。
プログラム・チェーン・ジョブの責務を分ける
Schedulerでは、処理本体をプログラム、処理順序をチェーン、実行日時やジョブクラスをジョブへ分けると再利用しやすくなります。ジョブクラスやサービスは、OLTP処理とバッチ処理のリソースを分離する目的で使います。
ジョブクラスの作成には通常 MANAGE SCHEDULER が必要です。ジョブクラスとウィンドウはSYSスキーマで管理されるグローバルなSchedulerオブジェクトなので、作成・参照時の名前にアプリケーションスキーマを付けません。DBAが作成したジョブクラスをAPPから使う場合は、APPへそのジョブクラスの EXECUTE 権限を付与します。Resource ManagerのコンシューマグループやDBサービスも事前にDBAが作成します。アプリケーションスキーマへ強い権限を常時付与しない運用が安全です。
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name => 'APP.PRG_PREPARE_ENV',
program_type => 'STORED_PROCEDURE',
program_action => 'APP_PKG_IMPORT.PREPARE_ENV',
number_of_arguments => 0,
enabled => TRUE
);
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name => 'APP.PRG_IMPORT_ORDERS',
program_type => 'STORED_PROCEDURE',
program_action => 'APP_PKG_IMPORT.RUN_BATCH',
number_of_arguments => 0,
enabled => TRUE
);
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name => 'APP.PRG_VALIDATE',
program_type => 'STORED_PROCEDURE',
program_action => 'APP_PKG_IMPORT.VALIDATE_RESULT',
number_of_arguments => 0,
enabled => TRUE
);
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name => 'APP.PRG_FINALIZE',
program_type => 'STORED_PROCEDURE',
program_action => 'APP_PKG_IMPORT.FINALIZE_IMPORT',
number_of_arguments => 0,
enabled => TRUE
);
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name => 'APP.PRG_CLEANUP',
program_type => 'STORED_PROCEDURE',
program_action => 'APP_PKG_IMPORT.CLEANUP_FAILED_RUN',
number_of_arguments => 0,
enabled => TRUE
);
END;
/
-- DBA管理下で作る例。BATCH_GROUPとBATCH_SVCは事前作成が必要。
BEGIN
DBMS_SCHEDULER.CREATE_JOB_CLASS(
job_class_name => 'CLS_BATCH_LOW',
resource_consumer_group => 'BATCH_GROUP',
service => 'BATCH_SVC',
logging_level => DBMS_SCHEDULER.LOGGING_RUNS
);
END;
/
GRANT EXECUTE ON SYS.CLS_BATCH_LOW TO APP;
チェーンの分岐を正しい状態条件で定義する
チェーンは、各ステップの状態に応じて次の処理を開始します。成功時と失敗時の両方を明示し、最後に必ず END へ到達するルールを作ります。
BEGIN
DBMS_SCHEDULER.CREATE_CHAIN(
chain_name => 'APP.CHN_NIGHTLY_IMPORT'
);
DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
'APP.CHN_NIGHTLY_IMPORT', 'STEP_PREPARE', 'APP.PRG_PREPARE_ENV');
DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
'APP.CHN_NIGHTLY_IMPORT', 'STEP_IMPORT', 'APP.PRG_IMPORT_ORDERS');
DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
'APP.CHN_NIGHTLY_IMPORT', 'STEP_VALIDATE', 'APP.PRG_VALIDATE');
DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
'APP.CHN_NIGHTLY_IMPORT', 'STEP_FINALIZE', 'APP.PRG_FINALIZE');
DBMS_SCHEDULER.DEFINE_CHAIN_STEP(
'APP.CHN_NIGHTLY_IMPORT', 'STEP_CLEANUP', 'APP.PRG_CLEANUP');
DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
condition => 'TRUE',
action => 'START STEP_PREPARE',
rule_name => 'R_START'
);
DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
condition => ':STEP_PREPARE.STATE = ''SUCCEEDED''',
action => 'START STEP_IMPORT',
rule_name => 'R_PREPARE_OK'
);
DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
condition => ':STEP_IMPORT.STATE = ''SUCCEEDED''',
action => 'START STEP_VALIDATE',
rule_name => 'R_IMPORT_OK'
);
DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
condition => ':STEP_VALIDATE.STATE = ''SUCCEEDED''',
action => 'START STEP_FINALIZE',
rule_name => 'R_VALIDATE_OK'
);
DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
condition => ':STEP_PREPARE.STATE = ''FAILED'' OR
:STEP_IMPORT.STATE = ''FAILED'' OR
:STEP_VALIDATE.STATE = ''FAILED'' OR
:STEP_FINALIZE.STATE = ''FAILED''',
action => 'START STEP_CLEANUP',
rule_name => 'R_FAILURE_CLEANUP'
);
DBMS_SCHEDULER.DEFINE_CHAIN_RULE(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
condition => ':STEP_FINALIZE.STATE = ''SUCCEEDED'' OR
:STEP_CLEANUP.COMPLETED = ''TRUE''',
action => 'END',
rule_name => 'R_END'
);
DBMS_SCHEDULER.ENABLE('APP.CHN_NIGHTLY_IMPORT');
END;
/
ステップ状態は SUCCEEDED、FAILED、STOPPED などで判定できます。クリーンアップが失敗する可能性もあるため、運用要件によってはクリーンアップ失敗時の終了コードや通知ルールも追加します。
チェーンジョブを作成する
BEGIN
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'APP.JOB_NIGHTLY_IMPORT',
job_type => 'CHAIN',
job_action => 'APP.CHN_NIGHTLY_IMPORT',
job_class => 'CLS_BATCH_LOW',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=DAILY;BYHOUR=2;BYMINUTE=0;BYSECOND=0',
auto_drop => FALSE,
enabled => TRUE
);
END;
/
長いSLEEPを使わずに再試行する
DBMS_LOCK.SLEEP は権限が必要な環境があり、待機中もSchedulerワーカーを占有します。数秒の同期調整を除き、長い指数バックオフには向きません。失敗した処理は、次回実行時刻を持つ再試行テーブルへ記録し、別の短時間ジョブが期限到来分を処理する構成にすると安定します。
CREATE TABLE async_retry_queue (
retry_id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
task_type VARCHAR2(50) NOT NULL,
business_key VARCHAR2(128) NOT NULL,
correlation_id VARCHAR2(64) NOT NULL,
attempt_count NUMBER DEFAULT 0 NOT NULL,
next_run_at TIMESTAMP WITH TIME ZONE NOT NULL,
status VARCHAR2(20) DEFAULT 'WAITING' NOT NULL,
last_error VARCHAR2(4000),
created_at TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
CONSTRAINT uq_async_retry UNIQUE(task_type, business_key, correlation_id)
);
CREATE INDEX ix_async_retry_due
ON async_retry_queue(status, next_run_at);
-- 失敗時に次回実行時刻を計算して登録
MERGE INTO async_retry_queue q
USING (
SELECT
'IMPORT_ORDER' AS task_type,
:business_key AS business_key,
:correlation_id AS correlation_id
FROM dual
) s
ON (
q.task_type = s.task_type
AND q.business_key = s.business_key
AND q.correlation_id = s.correlation_id
)
WHEN MATCHED THEN UPDATE SET
q.attempt_count = q.attempt_count + 1,
q.next_run_at = SYSTIMESTAMP
+ NUMTODSINTERVAL(POWER(2, LEAST(q.attempt_count + 1, 6)), 'MINUTE'),
q.last_error = SUBSTR(:error_message, 1, 4000),
q.status = 'WAITING'
WHEN NOT MATCHED THEN INSERT(
task_type, business_key, correlation_id,
attempt_count, next_run_at, last_error
) VALUES(
s.task_type, s.business_key, s.correlation_id,
1, SYSTIMESTAMP + INTERVAL '2' MINUTE, SUBSTR(:error_message, 1, 4000)
);
再試行では、同じ業務キーを複数回処理しても結果が壊れない冪等性が必要です。トランザクション競合や再試行設計は トランザクション分離レベル別の一貫性テスト も参考になります。
AQのイベント型と受信箱を作る
イベント起動ジョブは短く保ちます。イベントを受け取ったら、業務処理をその場で長時間実行せず、まず受信箱へ冪等に保存します。別のワーカーが受信箱を処理すれば、同時実行中に次のイベントが届く場合や再送にも対応しやすくなります。
CREATE TYPE app_event_typ AS OBJECT(
event_id VARCHAR2(64),
event_name VARCHAR2(64),
correlation_id VARCHAR2(64),
object_schema VARCHAR2(128),
object_name VARCHAR2(128),
payload_json CLOB
);
/
CREATE TABLE event_inbox (
event_id VARCHAR2(64) PRIMARY KEY,
event_name VARCHAR2(64) NOT NULL,
correlation_id VARCHAR2(64),
object_schema VARCHAR2(128),
object_name VARCHAR2(128),
payload_json CLOB CHECK (payload_json IS JSON),
status VARCHAR2(20) DEFAULT 'NEW' NOT NULL,
received_at TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
processed_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX ix_event_inbox_status
ON event_inbox(status, received_at);
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'APP.EVENT_QTAB',
multiple_consumers => TRUE,
queue_payload_type => 'APP_EVENT_TYP'
);
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'APP.EVENT_QUEUE',
queue_table => 'APP.EVENT_QTAB'
);
DBMS_AQADM.START_QUEUE(
queue_name => 'APP.EVENT_QUEUE'
);
END;
/
EVENT_MESSAGEを受け取る名前付きプログラムを作る
AQイベントのペイロードをジョブへ渡すには、イベント型を引数に持つストアドプロシージャを作り、名前付きプログラムの引数を EVENT_MESSAGE として定義します。匿名PL/SQLブロックへダミー型を代入する方法では受け取れません。
CREATE OR REPLACE PROCEDURE app.receive_file_ready_event(
p_event IN app_event_typ
) AUTHID DEFINER IS
BEGIN
DBMS_SESSION.SET_IDENTIFIER(p_event.correlation_id);
DBMS_APPLICATION_INFO.SET_MODULE('AQ_EVENT', p_event.event_name);
-- event_idを主キーにして重複イベントを無害化する
MERGE INTO event_inbox i
USING (
SELECT
p_event.event_id AS event_id,
p_event.event_name AS event_name,
p_event.correlation_id AS correlation_id,
p_event.object_schema AS object_schema,
p_event.object_name AS object_name,
p_event.payload_json AS payload_json
FROM dual
) s
ON (i.event_id = s.event_id)
WHEN NOT MATCHED THEN INSERT(
event_id, event_name, correlation_id,
object_schema, object_name, payload_json
) VALUES(
s.event_id, s.event_name, s.correlation_id,
s.object_schema, s.object_name, s.payload_json
);
COMMIT;
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
RAISE;
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name => 'APP.PRG_RECEIVE_FILE_EVENT',
program_type => 'STORED_PROCEDURE',
program_action => 'APP.RECEIVE_FILE_READY_EVENT',
number_of_arguments => 1,
enabled => FALSE
);
DBMS_SCHEDULER.DEFINE_METADATA_ARGUMENT(
program_name => 'APP.PRG_RECEIVE_FILE_EVENT',
metadata_attribute => 'EVENT_MESSAGE',
argument_position => 1,
argument_name => 'P_EVENT'
);
DBMS_SCHEDULER.ENABLE('APP.PRG_RECEIVE_FILE_EVENT');
END;
/
イベントスケジュールとジョブを作成する
event_condition はAQルールの構文で書き、オブジェクト型の属性は tab.user_data から参照します。ジョブは名前付きプログラムを指定します。
BEGIN
DBMS_SCHEDULER.CREATE_EVENT_SCHEDULE(
schedule_name => 'APP.ES_FILE_READY',
start_date => SYSTIMESTAMP,
event_condition => q'[
tab.user_data.event_name = 'FILE_READY'
AND tab.user_data.object_name LIKE 'orders_%'
]',
queue_spec => 'APP.EVENT_QUEUE'
);
DBMS_SCHEDULER.CREATE_JOB(
job_name => 'APP.JOB_RECEIVE_FILE_EVENT',
program_name => 'APP.PRG_RECEIVE_FILE_EVENT',
schedule_name => 'APP.ES_FILE_READY',
job_class => 'CLS_BATCH_LOW',
auto_drop => FALSE,
enabled => TRUE
);
END;
/
Oracle Schedulerのイベント起動では、同じジョブがすでに実行中の間に届いたイベントが消費されても、追加実行を起こさない場合があります。そのため受信ジョブは短時間で終了させ、重い処理は受信箱を読む別ワーカーへ分離します。
イベントをAQへ投入する
DECLARE
v_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_msgid RAW(16);
v_correlation_id VARCHAR2(64) := 'CID-' || RAWTOHEX(SYS_GUID());
v_payload app_event_typ;
BEGIN
v_payload := app_event_typ(
event_id => RAWTOHEX(SYS_GUID()),
event_name => 'FILE_READY',
correlation_id => v_correlation_id,
object_schema => 'STAGE',
object_name => 'orders_20260606.csv',
payload_json => JSON_OBJECT(
'bucket' VALUE 'ingest',
'key' VALUE 'orders_20260606.csv'
RETURNING CLOB
)
);
DBMS_AQ.ENQUEUE(
queue_name => 'APP.EVENT_QUEUE',
enqueue_options => v_enqueue_options,
message_properties => v_message_properties,
payload => v_payload,
msgid => v_msgid
);
COMMIT;
END;
/
AQは少なくとも1回配送を前提に、重複しても壊れない受信処理を設計します。例外キュー、再配送回数、保持期間、古いメッセージの削除方針も運用設計に含めます。
監視ビューの正しい列を使う
実行中チェーンの監視には、STEP_NAME、STATE、ERROR_CODE、START_DATE、END_DATE、DURATION を使います。CHAIN_STEP や STEP_STATUS という列はありません。
-- 直近のジョブ実行履歴 SELECT job_name, status, actual_start_date, run_duration, error#, additional_info FROM user_scheduler_job_run_details WHERE log_date >= SYSTIMESTAMP - INTERVAL '1' DAY ORDER BY log_date DESC; -- 実行中チェーンの各ステップ SELECT job_name, chain_name, step_name, state, error_code, start_date, end_date, duration FROM user_scheduler_running_chains ORDER BY job_name, step_name; -- 次回実行予定 SELECT job_name, state, next_run_date FROM user_scheduler_jobs WHERE enabled = 'TRUE' ORDER BY next_run_date;
チェーンを正しく再実行する
RUN_CHAIN の start_steps に渡すのはルール名ではなく、開始したいステップ名です。通常の先頭ルールから開始するなら start_steps を省略します。
-- 通常どおり先頭から起動
BEGIN
DBMS_SCHEDULER.RUN_CHAIN(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
job_name => 'APP.RUN_NIGHTLY_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF3')
);
END;
/
-- 検証済みの中間結果があり、STEP_VALIDATEから再開する場合
BEGIN
DBMS_SCHEDULER.RUN_CHAIN(
chain_name => 'APP.CHN_NIGHTLY_IMPORT',
job_name => 'APP.RUN_VALIDATE_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF3'),
start_steps => 'STEP_VALIDATE'
);
END;
/
途中ステップから再開する前に、前段の成果物が存在し、再利用して安全かを確認します。単に失敗したステップから始めればよいとは限りません。各ステップに処理済みキー、入力版、出力件数、チェックサムを残すと判断しやすくなります。
ウィンドウとリソース制御を設定する
ウィンドウの継続時間は duration 引数で指定します。カレンダー式の repeat_interval に期間を混ぜません。
BEGIN
DBMS_SCHEDULER.CREATE_WINDOW(
window_name => 'WDW_NIGHT',
resource_plan => 'APP_BATCH_PLAN',
start_date => SYSTIMESTAMP,
repeat_interval => 'FREQ=DAILY;BYHOUR=1;BYMINUTE=0;BYSECOND=0',
duration => INTERVAL '6' HOUR,
window_priority => 'LOW'
);
DBMS_SCHEDULER.ENABLE('WDW_NIGHT');
END;
/
テストとリリースで確認すること
- 各プログラムを単体実行し、正常・異常・再実行を確認する
- 各チェーンルールが想定した状態でだけ発火するか確認する
- イベントペイロードがEVENT_MESSAGE引数へ正しい型で渡るか確認する
- 同じevent_idを複数回投入しても受信箱が重複しないか確認する
- イベントジョブ実行中に追加イベントを投入して挙動を確認する
- AQ例外キューと再配送回数を確認する
- 途中ステップ再実行時の前提データを検証する
- サービス、ジョブクラス、Resource Manager設定を本番前に確認する
まとめ
DBMS_SCHEDULER のチェーンは、処理の段階と分岐を明示するのに向いています。AQイベントからジョブを起動する場合は、名前付き STORED_PROCEDURE プログラムと EVENT_MESSAGE メタデータ引数を使い、実際のイベント型で受け取ります。
イベント受信ジョブは短くし、受信箱へ冪等に保存してから別ワーカーで処理すると、重複・再送・同時実行に強くなります。再実行ではルール名ではなくステップ名を指定し、監視では正しいSchedulerビュー列を使います。チェーン、AQ、再試行、監視、リソース制御を分けて設計すれば、非同期処理を運用可能な形へ育てられます。

