PL/SQLでCSVや外部システムのデータを取り込むとき、いきなり本テーブルへINSERTやMERGEをすると、途中で失敗した行、型変換できない値、再実行すべき範囲が見えにくくなります。特に日次バッチや移行データ取込では、1行だけ不正でも全体を止めるのか、正常行だけ進めるのか、あとから説明できる設計が必要です。
そこで使いやすいのがステージングテーブルです。外部データをまず受信用テーブルへ保存し、行番号、取込バッチID、処理状態、検証エラーを持たせてから、本テーブルへ反映します。この形にすると、失敗行だけを修正して再実行する、同じファイルの二重取込を防ぐ、監査用に元データを残す、といった運用がしやすくなります。
この記事では、PL/SQLでCSV取込を想定したステージングテーブル設計、バリデーション、エラー隔離、正常行のMERGE、再実行の作り方を整理します。ファイル読込そのものは UTL_FILE完全ガイド、大量処理の考え方は BULK COLLECTとFORALLによる一括処理 とあわせて読むと実装しやすくなります。
- ステージングテーブルを使う理由
- 取込バッチID、行番号、状態、エラー情報の持ち方
- CSVの値を最初は文字列で受ける設計
- バリデーションと型変換の分離
- 正常行だけを本テーブルへMERGEする方法
- エラー行の修正、再検証、再実行の流れ
なぜ本テーブルへ直接入れないのか
外部データは、DB内で作ったデータよりも揺れます。日付形式が混ざる、数値にカンマが入る、必須列が空、コード値がマスタに存在しない、同じキーが複数行ある、ということが普通に起きます。この状態で本テーブルへ直接反映すると、どの行で失敗したか、どこまで反映済みか、再実行してよいかが分かりにくくなります。
ステージングテーブルを挟むと、取込、検証、本登録を分けられます。本テーブルは正規化された業務データを持ち、ステージングテーブルは外部データの受け皿、検証結果、再実行状態を持つ役割にします。
全体の流れを決める
実務では、ステージング取込を1本の巨大プロシージャにせず、段階を分けておくと保守しやすくなります。取込に失敗したのか、検証で止まったのか、本登録で止まったのかが分かるだけで、障害対応の速度がかなり変わります。
外部送信と同じく、取込処理も「どこまで終わったか」をDB上で見えるようにするのが大切です。処理状態を明示しておく考え方は アウトボックスパターンと再送キュー設計 とも近いです。
取込バッチ管理テーブルを作る
まず、ファイル単位または実行単位を表すバッチ管理テーブルを作ります。同じファイルを何回取り込んだか、どの状態で止まったか、何件成功・失敗したかをここで持ちます。
CREATE TABLE import_batch (
batch_id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
source_name VARCHAR2(255) NOT NULL,
source_hash VARCHAR2(64),
status VARCHAR2(20) DEFAULT 'STARTED' NOT NULL,
total_count NUMBER DEFAULT 0 NOT NULL,
valid_count NUMBER DEFAULT 0 NOT NULL,
error_count NUMBER DEFAULT 0 NOT NULL,
loaded_count NUMBER DEFAULT 0 NOT NULL,
started_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
finished_at TIMESTAMP,
created_by VARCHAR2(128) DEFAULT SYS_CONTEXT('USERENV','SESSION_USER') NOT NULL,
last_error_msg VARCHAR2(1000),
CONSTRAINT ck_import_batch_status CHECK (
status IN ('STARTED','LOADED_RAW','VALIDATED','LOADED','ERROR','CANCELLED')
)
);
CREATE UNIQUE INDEX uq_import_batch_source_hash
ON import_batch(source_hash);
source_hashにはファイル内容のハッシュや、外部システムから渡された一意な取込IDを入れます。同じファイルの二重取込を防ぎたい場合は、ここにユニーク制約を置くと安全です。ただし、同じファイルを意図的に再取込したい運用なら、再取込理由や世代番号を別に持たせます。
ステージングテーブルは文字列で受ける
CSV取込のステージングテーブルでは、最初から日付型や数値型にしないほうが扱いやすいことが多いです。なぜなら、型変換できない値も「不正データ」として保存し、何行目の何列が悪いかを後から見たいからです。本テーブルへ反映する段階で、検証済みの値を型変換します。
また、取込用のステージングは一時表ではなく、永続表として作ることが多いです。障害調査、再実行、監査、運用者へのエラー返却で後から参照するためです。セッション単位で消えてよい中間計算なら一時表も使えますが、CSV取込の受け皿は「残して追える」ことを優先します。
CREATE TABLE stg_order_import (
batch_id NUMBER NOT NULL,
row_no NUMBER NOT NULL,
raw_line CLOB,
order_no_raw VARCHAR2(100),
order_date_raw VARCHAR2(100),
customer_raw VARCHAR2(200),
amount_raw VARCHAR2(100),
status VARCHAR2(20) DEFAULT 'RAW' NOT NULL,
error_code VARCHAR2(100),
error_msg VARCHAR2(1000),
loaded_order_id NUMBER,
created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
updated_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
CONSTRAINT pk_stg_order_import PRIMARY KEY (batch_id, row_no),
CONSTRAINT ck_stg_order_status CHECK (
status IN ('RAW','VALID','ERROR','LOADED','SKIPPED')
),
CONSTRAINT fk_stg_order_batch FOREIGN KEY (batch_id)
REFERENCES import_batch(batch_id)
);
CREATE INDEX ix_stg_order_status
ON stg_order_import(batch_id, status);
row_noは元ファイルの行番号です。障害対応では「123行目の注文日が不正です」と言えることが重要なので、必ず持たせます。raw_lineも残しておくと、パース後の列だけでは判断できない文字化けや区切り文字の問題を確認できます。
CSVをステージングへ登録する
CSVのパースは現場の仕様によって大きく変わります。ダブルクォート、カンマを含む文字列、改行を含む列があるなら、単純なINSTR分割ではなく、アプリ側やSQL Loader、外部表、専用パーサを検討します。ここでは流れを見せるため、パース済みの値を受け取ってステージングへ入れる形にします。
CREATE OR REPLACE PACKAGE order_import_stage AS
FUNCTION start_batch(
p_source_name VARCHAR2,
p_source_hash VARCHAR2
) RETURN NUMBER;
PROCEDURE add_row(
p_batch_id NUMBER,
p_row_no NUMBER,
p_raw_line CLOB,
p_order_no_raw VARCHAR2,
p_order_date_raw VARCHAR2,
p_customer_raw VARCHAR2,
p_amount_raw VARCHAR2
);
END;
/
CREATE OR REPLACE PACKAGE BODY order_import_stage AS
FUNCTION start_batch(
p_source_name VARCHAR2,
p_source_hash VARCHAR2
) RETURN NUMBER IS
v_batch_id NUMBER;
BEGIN
INSERT INTO import_batch(source_name, source_hash)
VALUES (p_source_name, p_source_hash)
RETURNING batch_id INTO v_batch_id;
RETURN v_batch_id;
END;
PROCEDURE add_row(
p_batch_id NUMBER,
p_row_no NUMBER,
p_raw_line CLOB,
p_order_no_raw VARCHAR2,
p_order_date_raw VARCHAR2,
p_customer_raw VARCHAR2,
p_amount_raw VARCHAR2
) IS
BEGIN
INSERT INTO stg_order_import(
batch_id, row_no, raw_line,
order_no_raw, order_date_raw, customer_raw, amount_raw
) VALUES (
p_batch_id, p_row_no, p_raw_line,
p_order_no_raw, p_order_date_raw, p_customer_raw, p_amount_raw
);
END;
END;
/
実際にファイルを読む部分はUTL_FILE.GET_LINEなどを使えます。ただし、CSV仕様が複雑な場合はPL/SQLだけで無理にパースせず、外部表やアプリケーション側で正しく分解してからDBへ渡す判断も必要です。
バリデーションで状態を分ける
ステージングに入れた直後の行はRAWです。バリデーションで問題がなければVALID、問題があればERRORにします。型変換に失敗した行も例外で処理全体を止めず、行単位のエラーとして保存します。
CREATE OR REPLACE PACKAGE order_import_validate AS
PROCEDURE validate_batch(p_batch_id NUMBER);
END;
/
CREATE OR REPLACE PACKAGE BODY order_import_validate AS
FUNCTION is_number(p_value VARCHAR2) RETURN BOOLEAN IS
v_number NUMBER;
v_normalized VARCHAR2(100);
BEGIN
v_normalized := REPLACE(TRIM(p_value), ',', '');
IF NOT REGEXP_LIKE(v_normalized, '^-?[0-9]+([.][0-9]+)?$') THEN
RETURN FALSE;
END IF;
v_number := TO_NUMBER(
v_normalized,
'FM999999999999999999999999999999D9999999999',
'NLS_NUMERIC_CHARACTERS=.,'
);
RETURN TRUE;
EXCEPTION
WHEN VALUE_ERROR OR INVALID_NUMBER THEN
RETURN FALSE;
END;
FUNCTION is_date_yyyymmdd(p_value VARCHAR2) RETURN BOOLEAN IS
v_date DATE;
BEGIN
IF NOT REGEXP_LIKE(TRIM(p_value), '^[0-9]{8}$') THEN
RETURN FALSE;
END IF;
v_date := TO_DATE(TRIM(p_value), 'FXYYYYMMDD');
RETURN TRUE;
EXCEPTION
WHEN OTHERS THEN
RETURN FALSE;
END;
PROCEDURE set_error(
p_batch_id NUMBER,
p_row_no NUMBER,
p_code VARCHAR2,
p_msg VARCHAR2
) IS
BEGIN
UPDATE stg_order_import
SET status = 'ERROR',
error_code = p_code,
error_msg = p_msg,
updated_at = SYSTIMESTAMP
WHERE batch_id = p_batch_id
AND row_no = p_row_no;
END;
PROCEDURE validate_batch(p_batch_id NUMBER) IS
BEGIN
FOR r IN (
SELECT *
FROM stg_order_import
WHERE batch_id = p_batch_id
AND status IN ('RAW','ERROR')
ORDER BY row_no
) LOOP
IF r.order_no_raw IS NULL THEN
set_error(p_batch_id, r.row_no, 'ORDER_NO_REQUIRED', 'order_no is required');
ELSIF r.order_date_raw IS NULL OR NOT is_date_yyyymmdd(r.order_date_raw) THEN
set_error(p_batch_id, r.row_no, 'ORDER_DATE_INVALID', 'order_date must be YYYYMMDD');
ELSIF r.amount_raw IS NULL OR NOT is_number(r.amount_raw) THEN
set_error(p_batch_id, r.row_no, 'AMOUNT_INVALID', 'amount must be number');
ELSE
UPDATE stg_order_import
SET status = 'VALID',
error_code = NULL,
error_msg = NULL,
updated_at = SYSTIMESTAMP
WHERE batch_id = p_batch_id
AND row_no = r.row_no;
END IF;
END LOOP;
END;
END;
/
この例は分かりやすさを優先して行単位ループにしています。日付はFXYYYYMMDDで厳密に変換し、数値はカンマを除去してから正規表現とNLS_NUMERIC_CHARACTERS固定のTO_NUMBERで確認しています。全角数字、通貨記号、空白、小数点の扱いは業務仕様で変わるため、取込前にどこまで正規化するかを決めておく必要があります。大量データでは集合UPDATEやFORALLも検討します。ただし、最初から速度だけを優先するとエラー理由が雑になりやすいので、まずは運用者が読めるエラーを残す設計を固めるのがおすすめです。
マスタ存在チェックと重複チェックを入れる
型が正しいだけでは、本登録してよいとは限りません。顧客コードがマスタに存在するか、同じ注文番号がファイル内に重複していないか、すでに本テーブルに登録済みではないかも確認します。
-- 顧客マスタに存在しない行をエラーにする
UPDATE stg_order_import s
SET status = 'ERROR',
error_code = 'CUSTOMER_NOT_FOUND',
error_msg = 'customer code does not exist',
updated_at = SYSTIMESTAMP
WHERE s.batch_id = :batch_id
AND s.status = 'VALID'
AND NOT EXISTS (
SELECT 1
FROM customer_mst c
WHERE c.customer_code = s.customer_raw
);
-- 同じファイル内で注文番号が重複している行をエラーにする
UPDATE stg_order_import s
SET status = 'ERROR',
error_code = 'DUPLICATE_IN_FILE',
error_msg = 'order_no is duplicated in source file',
updated_at = SYSTIMESTAMP
WHERE s.batch_id = :batch_id
AND s.status = 'VALID'
AND s.order_no_raw IN (
SELECT order_no_raw
FROM stg_order_import
WHERE batch_id = :batch_id
AND status = 'VALID'
GROUP BY order_no_raw
HAVING COUNT(*) > 1
);
こうした検証は、すべて例外処理に任せるよりも、ステージング上で明示的にERRORへ落とすほうが運用しやすいです。制約違反をあえて拾う場合は DBMS_ERRLOGとLOG ERRORSの使い方 も選択肢になります。
正常行だけ本テーブルへMERGEする
検証済みのVALID行だけを本テーブルへ反映します。新規登録と更新の両方があり得るならMERGEが使いやすいです。MERGE文の細かい挙動は Oracle MERGE文完全ガイド も参考になります。
MERGE INTO orders t
USING (
SELECT
batch_id,
row_no,
order_no_raw AS order_no,
TO_DATE(TRIM(order_date_raw), 'FXYYYYMMDD') AS order_date,
customer_raw AS customer_code,
TO_NUMBER(
REPLACE(TRIM(amount_raw), ',', ''),
'FM999999999999999999999999999999D9999999999',
'NLS_NUMERIC_CHARACTERS=.,'
) AS amount
FROM stg_order_import
WHERE batch_id = :batch_id
AND status = 'VALID'
) s
ON (t.order_no = s.order_no)
WHEN MATCHED THEN
UPDATE SET
t.order_date = s.order_date,
t.customer_code = s.customer_code,
t.amount = s.amount,
t.updated_at = SYSTIMESTAMP
WHEN NOT MATCHED THEN
INSERT (order_id, order_no, order_date, customer_code, amount, created_at, updated_at)
VALUES (order_seq.NEXTVAL, s.order_no, s.order_date, s.customer_code, s.amount,
SYSTIMESTAMP, SYSTIMESTAMP);
本登録後は、反映済みのステージング行をLOADEDへ更新します。注文IDなど本テーブル側のキーを戻しておくと、問い合わせや障害調査で便利です。ただし、次の例はorder_noが本テーブルで一意に管理されている前提です。
本登録後にステージング状態を更新する
MERGEだけで終わらせると、ステージング側から見た進捗が分からなくなります。本テーブル反映後は、反映済み行をLOADEDへ更新し、バッチ管理テーブルの件数も更新します。今回のMERGEで処理した行だけを厳密に追跡したい場合は、対象テーブルにlast_import_batch_idを持たせるか、取込結果マッピングテーブルを別に作ります。
UPDATE stg_order_import s
SET status = 'LOADED',
loaded_order_id = (
SELECT o.order_id
FROM orders o
WHERE o.order_no = s.order_no_raw
),
updated_at = SYSTIMESTAMP
WHERE s.batch_id = :batch_id
AND s.status = 'VALID'
AND EXISTS (
SELECT 1
FROM orders o
WHERE o.order_no = s.order_no_raw
);
UPDATE import_batch b
SET total_count = (
SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id
),
valid_count = (
SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id AND s.status IN ('VALID','LOADED')
),
error_count = (
SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id AND s.status = 'ERROR'
),
loaded_count = (
SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id AND s.status = 'LOADED'
),
status = CASE
WHEN EXISTS (
SELECT 1 FROM stg_order_import s
WHERE s.batch_id = b.batch_id
AND s.status = 'ERROR'
) THEN 'ERROR'
ELSE 'LOADED'
END,
finished_at = SYSTIMESTAMP
WHERE b.batch_id = :batch_id;
ステージング行をすぐ削除するか残すかは運用次第です。監査や問い合わせ対応が必要なら一定期間残し、件数が多いならパーティションや保存期間を決めて削除します。大量データのコミット単位やUNDOへの影響は 大量データ処理のコミット頻度とUNDO最適化 が参考になります。
エラー行を修正して再実行できるようにする
ステージング設計の価値は、エラー行を見つけて終わりではなく、修正して再実行できることです。運用画面や管理SQLから不正値を直し、ERROR行を再検証対象へ戻します。
-- 例: 金額のカンマを除去して再検証対象へ戻す
UPDATE stg_order_import
SET amount_raw = REPLACE(amount_raw, ',', ''),
status = 'RAW',
error_code = NULL,
error_msg = NULL,
updated_at = SYSTIMESTAMP
WHERE batch_id = :batch_id
AND status = 'ERROR'
AND error_code = 'AMOUNT_INVALID';
BEGIN
order_import_validate.validate_batch(:batch_id);
END;
/
-- 再検証後、VALIDになった行だけ本登録へ進める
ここで重要なのは、再実行時に全行を無条件でやり直さないことです。LOADED行は再反映せず、RAWまたはERRORから再検証された行だけを対象にします。例外設計や再試行の考え方は PL/SQLの例外設計と再試行パターン ともつながります。
コミット単位を決める
ステージング取込では、コミット単位を曖昧にすると再実行が難しくなります。ファイル全体を1トランザクションにするのか、取込だけ先にコミットするのか、本登録をチャンクごとにコミットするのかを決めておきます。
トランザクション境界は性能だけでなく、障害時にどこから再開できるかに直結します。設計に迷う場合は トランザクション設計パターン の観点で、COMMIT境界と再処理単位を先に決めると整理しやすいです。
監視SQLを用意する
取込処理は、エラーが出たあとに人が見るSQLまで用意して完成です。バッチ単位の状態、エラー内容、元ファイルの行番号をすぐ確認できるようにします。
-- バッチ一覧
SELECT batch_id, source_name, status,
total_count, valid_count, error_count, loaded_count,
started_at, finished_at
FROM import_batch
ORDER BY batch_id DESC;
-- エラー行の確認
SELECT row_no, order_no_raw, order_date_raw, customer_raw, amount_raw,
error_code, error_msg
FROM stg_order_import
WHERE batch_id = :batch_id
AND status = 'ERROR'
ORDER BY row_no;
-- まだ本登録されていない正常行
SELECT COUNT(*) AS valid_not_loaded
FROM stg_order_import
WHERE batch_id = :batch_id
AND status = 'VALID';
運用者が最初に見るSQLは、短く、意味が分かりやすいものにします。エラー詳細を追うSQLと、全体進捗を見るSQLを分けておくと、障害対応中に迷いにくくなります。
設計時に避けたいパターン
ステージングテーブルは便利ですが、何でも置き場にすると後から苦しくなります。次のような設計は避けたほうが安全です。
1つのSELECTから複数テーブルへ振り分けたい場合は、ステージング後に マルチテーブルINSERT を使う設計もあります。ただし、エラー行の説明や再実行を重視するなら、まず状態管理できるステージングを作るほうが堅実です。
まとめ
PL/SQLでCSVや外部データを安全に取り込むなら、直接本テーブルへ入れるのではなく、ステージングテーブルを挟む設計が有効です。取込バッチID、元ファイル行番号、処理状態、エラーコード、エラーメッセージ、反映済みキーを持たせることで、失敗行の説明、再検証、再実行がしやすくなります。
最初は文字列で受け、バリデーションでVALIDとERRORに分け、正常行だけを本テーブルへMERGEし、反映後はLOADEDへ更新します。この流れを守ると、取込処理が途中で失敗しても、どこまで進んだか、何を直せばよいか、どこから再開すればよいかをDB上で追えるようになります。ステージングテーブルは単なる一時置き場ではなく、外部データ取込を安全に運用するための制御テーブルとして設計するのがポイントです。
