【PL/SQL】ステージングテーブル設計|CSV取込・バリデーション・再実行・エラー隔離まで

【PL/SQL】ステージングテーブル設計|CSV取込・バリデーション・再実行・エラー隔離まで PL/SQL

PL/SQLでCSVや外部システムのデータを取り込むとき、いきなり本テーブルへINSERTやMERGEをすると、途中で失敗した行、型変換できない値、再実行すべき範囲が見えにくくなります。特に日次バッチや移行データ取込では、1行だけ不正でも全体を止めるのか、正常行だけ進めるのか、あとから説明できる設計が必要です。

そこで使いやすいのがステージングテーブルです。外部データをまず受信用テーブルへ保存し、行番号、取込バッチID、処理状態、検証エラーを持たせてから、本テーブルへ反映します。この形にすると、失敗行だけを修正して再実行する、同じファイルの二重取込を防ぐ、監査用に元データを残す、といった運用がしやすくなります。

この記事では、PL/SQLでCSV取込を想定したステージングテーブル設計、バリデーション、エラー隔離、正常行のMERGE、再実行の作り方を整理します。ファイル読込そのものは UTL_FILE完全ガイド、大量処理の考え方は BULK COLLECTとFORALLによる一括処理 とあわせて読むと実装しやすくなります。

この記事で扱うこと

  • ステージングテーブルを使う理由
  • 取込バッチID、行番号、状態、エラー情報の持ち方
  • CSVの値を最初は文字列で受ける設計
  • バリデーションと型変換の分離
  • 正常行だけを本テーブルへMERGEする方法
  • エラー行の修正、再検証、再実行の流れ
スポンサーリンク

なぜ本テーブルへ直接入れないのか

外部データは、DB内で作ったデータよりも揺れます。日付形式が混ざる、数値にカンマが入る、必須列が空、コード値がマスタに存在しない、同じキーが複数行ある、ということが普通に起きます。この状態で本テーブルへ直接反映すると、どの行で失敗したか、どこまで反映済みか、再実行してよいかが分かりにくくなります。

失敗行を追跡しにくいSQL例外だけでは、元ファイルの何行目が悪いのか運用者に説明しづらくなります。
型変換エラーで全体が止まる1行の日付不正や数値不正で、正常行まで取り込めない設計になりがちです。
再実行で重複しやすいどこまで成功したかを持たないと、同じファイルを再実行したときに二重登録が起きます。
元データを確認できない変換後の値しか残さないと、CSV上は何が入っていたのか追えません。

ステージングテーブルを挟むと、取込、検証、本登録を分けられます。本テーブルは正規化された業務データを持ち、ステージングテーブルは外部データの受け皿、検証結果、再実行状態を持つ役割にします。

全体の流れを決める

実務では、ステージング取込を1本の巨大プロシージャにせず、段階を分けておくと保守しやすくなります。取込に失敗したのか、検証で止まったのか、本登録で止まったのかが分かるだけで、障害対応の速度がかなり変わります。

1. バッチ開始取込バッチIDを採番し、ファイル名、開始時刻、実行者を記録します。
2. 生データ取込CSVの各行を文字列のままステージングテーブルへ保存します。
3. バリデーション必須、型、桁、マスタ存在、重複などを検証し、状態とエラーを記録します。
4. 本登録正常行だけを本テーブルへINSERTまたはMERGEします。
5. 完了または再実行エラー行を修正して再検証するか、バッチ全体を取り消します。

外部送信と同じく、取込処理も「どこまで終わったか」をDB上で見えるようにするのが大切です。処理状態を明示しておく考え方は アウトボックスパターンと再送キュー設計 とも近いです。

取込バッチ管理テーブルを作る

まず、ファイル単位または実行単位を表すバッチ管理テーブルを作ります。同じファイルを何回取り込んだか、どの状態で止まったか、何件成功・失敗したかをここで持ちます。

import-batch-ddl.sql
CREATE TABLE import_batch (
  batch_id        NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  source_name     VARCHAR2(255) NOT NULL,
  source_hash     VARCHAR2(64),
  status          VARCHAR2(20) DEFAULT 'STARTED' NOT NULL,
  total_count     NUMBER DEFAULT 0 NOT NULL,
  valid_count     NUMBER DEFAULT 0 NOT NULL,
  error_count     NUMBER DEFAULT 0 NOT NULL,
  loaded_count    NUMBER DEFAULT 0 NOT NULL,
  started_at      TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  finished_at     TIMESTAMP,
  created_by      VARCHAR2(128) DEFAULT SYS_CONTEXT('USERENV','SESSION_USER') NOT NULL,
  last_error_msg  VARCHAR2(1000),
  CONSTRAINT ck_import_batch_status CHECK (
    status IN ('STARTED','LOADED_RAW','VALIDATED','LOADED','ERROR','CANCELLED')
  )
);

CREATE UNIQUE INDEX uq_import_batch_source_hash
  ON import_batch(source_hash);

source_hashにはファイル内容のハッシュや、外部システムから渡された一意な取込IDを入れます。同じファイルの二重取込を防ぎたい場合は、ここにユニーク制約を置くと安全です。ただし、同じファイルを意図的に再取込したい運用なら、再取込理由や世代番号を別に持たせます。

ステージングテーブルは文字列で受ける

CSV取込のステージングテーブルでは、最初から日付型や数値型にしないほうが扱いやすいことが多いです。なぜなら、型変換できない値も「不正データ」として保存し、何行目の何列が悪いかを後から見たいからです。本テーブルへ反映する段階で、検証済みの値を型変換します。

また、取込用のステージングは一時表ではなく、永続表として作ることが多いです。障害調査、再実行、監査、運用者へのエラー返却で後から参照するためです。セッション単位で消えてよい中間計算なら一時表も使えますが、CSV取込の受け皿は「残して追える」ことを優先します。

stg-order-ddl.sql
CREATE TABLE stg_order_import (
  batch_id       NUMBER NOT NULL,
  row_no         NUMBER NOT NULL,
  raw_line       CLOB,
  order_no_raw   VARCHAR2(100),
  order_date_raw VARCHAR2(100),
  customer_raw   VARCHAR2(200),
  amount_raw     VARCHAR2(100),
  status         VARCHAR2(20) DEFAULT 'RAW' NOT NULL,
  error_code     VARCHAR2(100),
  error_msg      VARCHAR2(1000),
  loaded_order_id NUMBER,
  created_at     TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  updated_at     TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  CONSTRAINT pk_stg_order_import PRIMARY KEY (batch_id, row_no),
  CONSTRAINT ck_stg_order_status CHECK (
    status IN ('RAW','VALID','ERROR','LOADED','SKIPPED')
  ),
  CONSTRAINT fk_stg_order_batch FOREIGN KEY (batch_id)
    REFERENCES import_batch(batch_id)
);

CREATE INDEX ix_stg_order_status
  ON stg_order_import(batch_id, status);

row_noは元ファイルの行番号です。障害対応では「123行目の注文日が不正です」と言えることが重要なので、必ず持たせます。raw_lineも残しておくと、パース後の列だけでは判断できない文字化けや区切り文字の問題を確認できます。

CSVをステージングへ登録する

CSVのパースは現場の仕様によって大きく変わります。ダブルクォート、カンマを含む文字列、改行を含む列があるなら、単純なINSTR分割ではなく、アプリ側やSQL Loader、外部表、専用パーサを検討します。ここでは流れを見せるため、パース済みの値を受け取ってステージングへ入れる形にします。

stage-order-row.sql
CREATE OR REPLACE PACKAGE order_import_stage AS
  FUNCTION start_batch(
    p_source_name VARCHAR2,
    p_source_hash VARCHAR2
  ) RETURN NUMBER;

  PROCEDURE add_row(
    p_batch_id       NUMBER,
    p_row_no         NUMBER,
    p_raw_line       CLOB,
    p_order_no_raw   VARCHAR2,
    p_order_date_raw VARCHAR2,
    p_customer_raw   VARCHAR2,
    p_amount_raw     VARCHAR2
  );
END;
/

CREATE OR REPLACE PACKAGE BODY order_import_stage AS
  FUNCTION start_batch(
    p_source_name VARCHAR2,
    p_source_hash VARCHAR2
  ) RETURN NUMBER IS
    v_batch_id NUMBER;
  BEGIN
    INSERT INTO import_batch(source_name, source_hash)
    VALUES (p_source_name, p_source_hash)
    RETURNING batch_id INTO v_batch_id;

    RETURN v_batch_id;
  END;

  PROCEDURE add_row(
    p_batch_id       NUMBER,
    p_row_no         NUMBER,
    p_raw_line       CLOB,
    p_order_no_raw   VARCHAR2,
    p_order_date_raw VARCHAR2,
    p_customer_raw   VARCHAR2,
    p_amount_raw     VARCHAR2
  ) IS
  BEGIN
    INSERT INTO stg_order_import(
      batch_id, row_no, raw_line,
      order_no_raw, order_date_raw, customer_raw, amount_raw
    ) VALUES (
      p_batch_id, p_row_no, p_raw_line,
      p_order_no_raw, p_order_date_raw, p_customer_raw, p_amount_raw
    );
  END;
END;
/

実際にファイルを読む部分はUTL_FILE.GET_LINEなどを使えます。ただし、CSV仕様が複雑な場合はPL/SQLだけで無理にパースせず、外部表やアプリケーション側で正しく分解してからDBへ渡す判断も必要です。

バリデーションで状態を分ける

ステージングに入れた直後の行はRAWです。バリデーションで問題がなければVALID、問題があればERRORにします。型変換に失敗した行も例外で処理全体を止めず、行単位のエラーとして保存します。

validate-order-stage.sql
CREATE OR REPLACE PACKAGE order_import_validate AS
  PROCEDURE validate_batch(p_batch_id NUMBER);
END;
/

CREATE OR REPLACE PACKAGE BODY order_import_validate AS
  FUNCTION is_number(p_value VARCHAR2) RETURN BOOLEAN IS
    v_number     NUMBER;
    v_normalized VARCHAR2(100);
  BEGIN
    v_normalized := REPLACE(TRIM(p_value), ',', '');

    IF NOT REGEXP_LIKE(v_normalized, '^-?[0-9]+([.][0-9]+)?$') THEN
      RETURN FALSE;
    END IF;

    v_number := TO_NUMBER(
      v_normalized,
      'FM999999999999999999999999999999D9999999999',
      'NLS_NUMERIC_CHARACTERS=.,'
    );
    RETURN TRUE;
  EXCEPTION
    WHEN VALUE_ERROR OR INVALID_NUMBER THEN
      RETURN FALSE;
  END;

  FUNCTION is_date_yyyymmdd(p_value VARCHAR2) RETURN BOOLEAN IS
    v_date DATE;
  BEGIN
    IF NOT REGEXP_LIKE(TRIM(p_value), '^[0-9]{8}$') THEN
      RETURN FALSE;
    END IF;

    v_date := TO_DATE(TRIM(p_value), 'FXYYYYMMDD');
    RETURN TRUE;
  EXCEPTION
    WHEN OTHERS THEN
      RETURN FALSE;
  END;

  PROCEDURE set_error(
    p_batch_id NUMBER,
    p_row_no   NUMBER,
    p_code     VARCHAR2,
    p_msg      VARCHAR2
  ) IS
  BEGIN
    UPDATE stg_order_import
       SET status = 'ERROR',
           error_code = p_code,
           error_msg = p_msg,
           updated_at = SYSTIMESTAMP
     WHERE batch_id = p_batch_id
       AND row_no = p_row_no;
  END;

  PROCEDURE validate_batch(p_batch_id NUMBER) IS
  BEGIN
    FOR r IN (
      SELECT *
        FROM stg_order_import
       WHERE batch_id = p_batch_id
         AND status IN ('RAW','ERROR')
       ORDER BY row_no
    ) LOOP
      IF r.order_no_raw IS NULL THEN
        set_error(p_batch_id, r.row_no, 'ORDER_NO_REQUIRED', 'order_no is required');
      ELSIF r.order_date_raw IS NULL OR NOT is_date_yyyymmdd(r.order_date_raw) THEN
        set_error(p_batch_id, r.row_no, 'ORDER_DATE_INVALID', 'order_date must be YYYYMMDD');
      ELSIF r.amount_raw IS NULL OR NOT is_number(r.amount_raw) THEN
        set_error(p_batch_id, r.row_no, 'AMOUNT_INVALID', 'amount must be number');
      ELSE
        UPDATE stg_order_import
           SET status = 'VALID',
               error_code = NULL,
               error_msg = NULL,
               updated_at = SYSTIMESTAMP
         WHERE batch_id = p_batch_id
           AND row_no = r.row_no;
      END IF;
    END LOOP;
  END;
END;
/

この例は分かりやすさを優先して行単位ループにしています。日付はFXYYYYMMDDで厳密に変換し、数値はカンマを除去してから正規表現とNLS_NUMERIC_CHARACTERS固定のTO_NUMBERで確認しています。全角数字、通貨記号、空白、小数点の扱いは業務仕様で変わるため、取込前にどこまで正規化するかを決めておく必要があります。大量データでは集合UPDATEやFORALLも検討します。ただし、最初から速度だけを優先するとエラー理由が雑になりやすいので、まずは運用者が読めるエラーを残す設計を固めるのがおすすめです。

マスタ存在チェックと重複チェックを入れる

型が正しいだけでは、本登録してよいとは限りません。顧客コードがマスタに存在するか、同じ注文番号がファイル内に重複していないか、すでに本テーブルに登録済みではないかも確認します。

validate-master-and-duplicate.sql
-- 顧客マスタに存在しない行をエラーにする
UPDATE stg_order_import s
   SET status = 'ERROR',
       error_code = 'CUSTOMER_NOT_FOUND',
       error_msg = 'customer code does not exist',
       updated_at = SYSTIMESTAMP
 WHERE s.batch_id = :batch_id
   AND s.status = 'VALID'
   AND NOT EXISTS (
     SELECT 1
       FROM customer_mst c
      WHERE c.customer_code = s.customer_raw
   );

-- 同じファイル内で注文番号が重複している行をエラーにする
UPDATE stg_order_import s
   SET status = 'ERROR',
       error_code = 'DUPLICATE_IN_FILE',
       error_msg = 'order_no is duplicated in source file',
       updated_at = SYSTIMESTAMP
 WHERE s.batch_id = :batch_id
   AND s.status = 'VALID'
   AND s.order_no_raw IN (
     SELECT order_no_raw
       FROM stg_order_import
      WHERE batch_id = :batch_id
        AND status = 'VALID'
      GROUP BY order_no_raw
     HAVING COUNT(*) > 1
   );

こうした検証は、すべて例外処理に任せるよりも、ステージング上で明示的にERRORへ落とすほうが運用しやすいです。制約違反をあえて拾う場合は DBMS_ERRLOGとLOG ERRORSの使い方 も選択肢になります。

正常行だけ本テーブルへMERGEする

検証済みのVALID行だけを本テーブルへ反映します。新規登録と更新の両方があり得るならMERGEが使いやすいです。MERGE文の細かい挙動は Oracle MERGE文完全ガイド も参考になります。

merge-valid-orders.sql
MERGE INTO orders t
USING (
  SELECT
    batch_id,
    row_no,
    order_no_raw AS order_no,
    TO_DATE(TRIM(order_date_raw), 'FXYYYYMMDD') AS order_date,
    customer_raw AS customer_code,
    TO_NUMBER(
      REPLACE(TRIM(amount_raw), ',', ''),
      'FM999999999999999999999999999999D9999999999',
      'NLS_NUMERIC_CHARACTERS=.,'
    ) AS amount
  FROM stg_order_import
  WHERE batch_id = :batch_id
    AND status = 'VALID'
) s
ON (t.order_no = s.order_no)
WHEN MATCHED THEN
  UPDATE SET
    t.order_date = s.order_date,
    t.customer_code = s.customer_code,
    t.amount = s.amount,
    t.updated_at = SYSTIMESTAMP
WHEN NOT MATCHED THEN
  INSERT (order_id, order_no, order_date, customer_code, amount, created_at, updated_at)
  VALUES (order_seq.NEXTVAL, s.order_no, s.order_date, s.customer_code, s.amount,
          SYSTIMESTAMP, SYSTIMESTAMP);

本登録後は、反映済みのステージング行をLOADEDへ更新します。注文IDなど本テーブル側のキーを戻しておくと、問い合わせや障害調査で便利です。ただし、次の例はorder_noが本テーブルで一意に管理されている前提です。

本登録後にステージング状態を更新する

MERGEだけで終わらせると、ステージング側から見た進捗が分からなくなります。本テーブル反映後は、反映済み行をLOADEDへ更新し、バッチ管理テーブルの件数も更新します。今回のMERGEで処理した行だけを厳密に追跡したい場合は、対象テーブルにlast_import_batch_idを持たせるか、取込結果マッピングテーブルを別に作ります。

mark-loaded.sql
UPDATE stg_order_import s
   SET status = 'LOADED',
       loaded_order_id = (
         SELECT o.order_id
           FROM orders o
          WHERE o.order_no = s.order_no_raw
       ),
       updated_at = SYSTIMESTAMP
 WHERE s.batch_id = :batch_id
   AND s.status = 'VALID'
   AND EXISTS (
     SELECT 1
       FROM orders o
      WHERE o.order_no = s.order_no_raw
   );

UPDATE import_batch b
   SET total_count = (
         SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id
       ),
       valid_count = (
         SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id AND s.status IN ('VALID','LOADED')
       ),
       error_count = (
         SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id AND s.status = 'ERROR'
       ),
       loaded_count = (
         SELECT COUNT(*) FROM stg_order_import s WHERE s.batch_id = b.batch_id AND s.status = 'LOADED'
       ),
       status = CASE
         WHEN EXISTS (
           SELECT 1 FROM stg_order_import s
            WHERE s.batch_id = b.batch_id
              AND s.status = 'ERROR'
         ) THEN 'ERROR'
         ELSE 'LOADED'
       END,
       finished_at = SYSTIMESTAMP
 WHERE b.batch_id = :batch_id;

ステージング行をすぐ削除するか残すかは運用次第です。監査や問い合わせ対応が必要なら一定期間残し、件数が多いならパーティションや保存期間を決めて削除します。大量データのコミット単位やUNDOへの影響は 大量データ処理のコミット頻度とUNDO最適化 が参考になります。

エラー行を修正して再実行できるようにする

ステージング設計の価値は、エラー行を見つけて終わりではなく、修正して再実行できることです。運用画面や管理SQLから不正値を直し、ERROR行を再検証対象へ戻します。

retry-error-rows.sql
-- 例: 金額のカンマを除去して再検証対象へ戻す
UPDATE stg_order_import
   SET amount_raw = REPLACE(amount_raw, ',', ''),
       status = 'RAW',
       error_code = NULL,
       error_msg = NULL,
       updated_at = SYSTIMESTAMP
 WHERE batch_id = :batch_id
   AND status = 'ERROR'
   AND error_code = 'AMOUNT_INVALID';

BEGIN
  order_import_validate.validate_batch(:batch_id);
END;
/

-- 再検証後、VALIDになった行だけ本登録へ進める

ここで重要なのは、再実行時に全行を無条件でやり直さないことです。LOADED行は再反映せず、RAWまたはERRORから再検証された行だけを対象にします。例外設計や再試行の考え方は PL/SQLの例外設計と再試行パターン ともつながります。

コミット単位を決める

ステージング取込では、コミット単位を曖昧にすると再実行が難しくなります。ファイル全体を1トランザクションにするのか、取込だけ先にコミットするのか、本登録をチャンクごとにコミットするのかを決めておきます。

小規模で全件整合性が必要取込から本登録まで1トランザクションにできます。ただし失敗時は全体やり直しになります。
大規模で正常行を進めたい生データ取込を先にコミットし、検証と本登録を状態管理しながら進めます。
途中再開を重視バッチIDと行状態をコミットし、LOADED行をスキップして再開できる設計にします。
外部連携を含む本登録後に外部通知するなら、直接送信ではなくoutboxへ積む設計も検討します。

トランザクション境界は性能だけでなく、障害時にどこから再開できるかに直結します。設計に迷う場合は トランザクション設計パターン の観点で、COMMIT境界と再処理単位を先に決めると整理しやすいです。

監視SQLを用意する

取込処理は、エラーが出たあとに人が見るSQLまで用意して完成です。バッチ単位の状態、エラー内容、元ファイルの行番号をすぐ確認できるようにします。

monitor-import-batch.sql
-- バッチ一覧
SELECT batch_id, source_name, status,
       total_count, valid_count, error_count, loaded_count,
       started_at, finished_at
  FROM import_batch
 ORDER BY batch_id DESC;

-- エラー行の確認
SELECT row_no, order_no_raw, order_date_raw, customer_raw, amount_raw,
       error_code, error_msg
  FROM stg_order_import
 WHERE batch_id = :batch_id
   AND status = 'ERROR'
 ORDER BY row_no;

-- まだ本登録されていない正常行
SELECT COUNT(*) AS valid_not_loaded
  FROM stg_order_import
 WHERE batch_id = :batch_id
   AND status = 'VALID';

運用者が最初に見るSQLは、短く、意味が分かりやすいものにします。エラー詳細を追うSQLと、全体進捗を見るSQLを分けておくと、障害対応中に迷いにくくなります。

設計時に避けたいパターン

ステージングテーブルは便利ですが、何でも置き場にすると後から苦しくなります。次のような設計は避けたほうが安全です。

バッチIDがない複数ファイルの行が混ざり、再実行や削除の単位が分からなくなります。
行番号がないCSVの何行目が悪いのか運用者に返せません。
最初から数値型・日付型で受ける不正値を保存できず、エラー行の原因調査が難しくなります。
エラー理由が1列だけ複数の検証エラーや修正履歴を扱えず、再実行判断が曖昧になります。
LOADED状態を持たない本登録済み行を再実行で二重反映する危険があります。

1つのSELECTから複数テーブルへ振り分けたい場合は、ステージング後に マルチテーブルINSERT を使う設計もあります。ただし、エラー行の説明や再実行を重視するなら、まず状態管理できるステージングを作るほうが堅実です。

まとめ

PL/SQLでCSVや外部データを安全に取り込むなら、直接本テーブルへ入れるのではなく、ステージングテーブルを挟む設計が有効です。取込バッチID、元ファイル行番号、処理状態、エラーコード、エラーメッセージ、反映済みキーを持たせることで、失敗行の説明、再検証、再実行がしやすくなります。

最初は文字列で受け、バリデーションでVALIDERRORに分け、正常行だけを本テーブルへMERGEし、反映後はLOADEDへ更新します。この流れを守ると、取込処理が途中で失敗しても、どこまで進んだか、何を直せばよいか、どこから再開すればよいかをDB上で追えるようになります。ステージングテーブルは単なる一時置き場ではなく、外部データ取込を安全に運用するための制御テーブルとして設計するのがポイントです。