Recotem に SQL / GA4 Data API データソースを追加

推薦システム構築ツール Recotem のレシピ駆動パイプラインに、2 つのファーストパーティデータソースを追加しました。SQLAlchemy 2 経由の汎用 SQL ソース (sql) と、Google Analytics 4 Data API の直接読み込みソース (ga4) です。これまで標準対応していた CSV / Parquet / BigQuery に加え、運用中の RDB や GA4 から直接学習データを取り込めるようになっています。

追加した 2 つのデータソース

sql — 汎用 SQL データソース

SQLAlchemy 2 を使った汎用的な SQL データソースで、PostgreSQL / MySQL / MariaDB / SQLite に対応します。

data:
  source: sql
  dsn_env: RECOTEM_PURCHASE_DSN
  query: |
    SELECT user_id, item_id, purchased_at
    FROM purchases
    WHERE purchased_at >= :since
  params:
    since: '2025-01-01'
  user_column: user_id
  item_column: item_id

DSN はレシピに直接書かず、必ず環境変数経由で渡す設計です。pandas.read_sql を 100k 行単位でチャンク読み込みし、RECOTEM_MAX_SQL_ROWS で行数の上限をハードキャップします。サーバーサイドカーソル (stream_results=True) を使って、メモリに乗り切らないサイズのテーブルを安全に扱えます。

ga4 — GA4 Data API データソース

Google Analytics 4 の Data API を直接叩いて、イベントログを学習データとして取り込みます。

data:
  source: ga4
  property_id: '123456789'
  date_range:
    start_date: '2025-01-01'
    end_date: '2025-12-31'
  event_names: ['purchase', 'add_to_cart']
  user_column: clientId
  item_column: itemId

認証は Application Default Credentials (ADC) のみ対応で、レシピに credentials を埋め込めない設計です。runReport のページングは RECOTEM_GA4_MAX_PAGES で上限を設けています。ResourceExhausted / ServiceUnavailable に対しては 3 × api_timeout_seconds の予算ベースのリトライを行い、各 API コール後に壁時計のデッドラインを再チェックします。

SQL ソースのセキュリティ設計

SQL ソースは「ユーザーが任意の接続先 DSN を指定できる」性質上、SSRF を含む攻撃面が広くなります。以下のような多層防御を組み込んでいます。

読み取り専用トランザクションの強制

ダイアレクトごとに読み取り専用ヒントを発行します。

DB適用するヒント
PostgreSQLSET TRANSACTION READ ONLY
MySQLSET SESSION TRANSACTION READ ONLY
MariaDBSET SESSION TRANSACTION READ ONLY + max_statement_time
SQLitePRAGMA query_only=ON

特に SQLite は以前は silent no-op になっていたものを fail-closed に変更し、ステートメントタイムアウトが適用できない場合は構造化警告 (sql_statement_timeout_not_applied) を出すようにしました。

SSRF ガード

デフォルトで RFC1918 / loopback / link-local の宛先を拒否します。プライベートネットワーク内の RDB を読みたい場合は RECOTEM_SQL_ALLOW_PRIVATE=1 で明示的に opt-in します。

ガードは DSN の netloc だけでなく、各ドライバが認識するルーティング指定を全て検査します。

  • ?host= / ?hostaddr= (libpq)
  • ?service= / ?unix_socket= (拒否)
  • 絶対パスの ?host= (拒否)
  • ホスト情報を持たないネットワーク DSN (拒否)

これは SQLAlchemy の make_url が netloc しか url.host に格納しない一方で、libpq や PyMySQL は上記のクエリパラメータも参照するため、netloc だけ見ていると簡単にバイパスできてしまうからです。

DNS リバインディング対策

プローブとフェッチのタイミングで getaddrinfo を使って IPv4 / IPv6 双方を再解決し、解決された IP 集合をピン留めします。gethostbyname_ex 時代の IPv4 限定実装をやめたことで、デュアルスタックや IPv6 専用ホストでの誤検知も解消されました。

ログからの DSN ユーザー情報の除去

src/recotem/log_redaction.py に DSN userinfo スクラバーを追加し、構造化ログのチェーン先頭に置いています。空ユーザー名・URL エンコードされたパスワード・スキーム別の判定も組み込みで、s3:// / gs:// / az:// / abfs(s):// のようなオブジェクトストア URI は誤検知しないよう除外しています。

また、DataSourceError.code"datasource_error" に固定することで、pipeline.py が exc_info=False でロギングするようになり、psycopg / PyMySQL の例外チェーン経由で DSN userinfo が流出するのを防いでいます。

TLS 設定の Advisory

PostgreSQL / MySQL の DSN で sslmode= / ssl= が指定されていない場合、初期化時に sql_dsn_tls_not_configured を警告として記録します。

GA4 ソースの堅牢化

エラーの正しい分類

GA4 レスポンスの eventCount が非整数だった場合、以前は ValueError として exit code 1 (_EXIT_UNKNOWN) で落ちていましたが、DataSourceError (exit 3 / _EXIT_DATASOURCE) に変更しました。データソース起因の障害が運用上きちんと分類されるようになります。

Prometheus メトリクスの部分初期化耐性

_metrics_ga4 は atomic init とロールバックを実装しており、いずれかのカウンタ登録が失敗するとモジュール全体を no-op フォールバックにラッチします。catch する例外も第三者レジストリ起因の KeyError / ImportError まで広げて、本体の動作を阻害しないようにしています。

バリデーション強化

  • event_names は正規表現と長さ上限でバウンドチェック
  • weight_column が dimensions と衝突する場合はバリデーション時に拒否
  • ローリング XOR バグ修正済みの日付範囲バリデーション

新規追加された環境変数

変数デフォルト用途
RECOTEM_MAX_SQL_ROWS50,000,000SQL の行数ハードキャップ ([1,000, 500,000,000] にクランプ)
RECOTEM_SQL_ALLOW_PRIVATE(空)SQL ソースでプライベート / loopback DSN を許可
RECOTEM_GA4_MAX_PAGES500GA4 のページネーション上限 ([1, 10,000] にクランプ)

エントリーポイント登録とインストールヒント

新規データソースは [project.entry-points."recotem.datasources"] で登録しつつ、optional extras がインストールされていない wheel でもロード自体は通るように、フォールバック辞書と _BUILTIN_INSTALL_HINTS を用意しました。エラー時には適切な extras (postgres / mysql / sqlite / ga4 / all) をインストールするよう案内されます。

pip install 'recotem[postgres]'   # PostgreSQL
pip install 'recotem[mysql]'      # MySQL / MariaDB
pip install 'recotem[sqlite]'     # SQLite
pip install 'recotem[ga4]'        # GA4 Data API
pip install 'recotem[all]'        # 全部入り

テスト

ユニット / 統合 / fuzz の 3 階層で 1,674 ケース をカバーしました。

  • ユニット: 各ソースの設定バリデーション、SSRF の全ルーティング形式のパラメトライズ、IPv4 / IPv6 / デュアルスタックでの DNS リバインディング、PostgreSQL / MySQL / MariaDB / SQLite の読み取り専用 + ステートメントタイムアウトの SQL 完全一致検査、GA4 のリトライ判定 (ResourceExhausted / ServiceUnavailable / PermissionDenied)、weight_column 衝突拒否、非整数 eventCount 拒否、ログリダクションのオブジェクトストア URI 保持
  • 統合: SQLite を使った SQL の train + serve エンドツーエンド
  • Fuzz: 両ソースのレシピ形状に対する hypothesis のバイトミューテーション
  • レジストリ: 正常系とフォールバック経路の両方を検証、autouse フィクスチャでテスト間の LRU キャッシュ漏れを防止

まとめ

Recotem のデータソースに sqlga4 を追加し、運用中の RDB や GA4 のイベントログをそのまま学習データとして取り込めるようになりました。SQL ソースは SSRF / DNS リバインディング / DSN ログ流出対策を全ルーティング形式に対して入れ、読み取り専用も fail-closed に修正しています。GA4 ソースは ADC のみの認証・予算ベースのリトライ・部分初期化耐性のメトリクスといった、運用観点での堅牢化を行いました。既存の csv / parquet / bigquery レシピは無変更で、Recipe.source 判別子への追加だけの後方互換変更です。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です