推薦システム構築ツール Recotem のレシピ駆動パイプラインに、2 つのファーストパーティデータソースを追加しました。SQLAlchemy 2 経由の汎用 SQL ソース (sql) と、Google Analytics 4 Data API の直接読み込みソース (ga4) です。これまで標準対応していた CSV / Parquet / BigQuery に加え、運用中の RDB や GA4 から直接学習データを取り込めるようになっています。
追加した 2 つのデータソース
sql — 汎用 SQL データソース
SQLAlchemy 2 を使った汎用的な SQL データソースで、PostgreSQL / MySQL / MariaDB / SQLite に対応します。
data:
source: sql
dsn_env: RECOTEM_PURCHASE_DSN
query: |
SELECT user_id, item_id, purchased_at
FROM purchases
WHERE purchased_at >= :since
params:
since: '2025-01-01'
user_column: user_id
item_column: item_id
DSN はレシピに直接書かず、必ず環境変数経由で渡す設計です。pandas.read_sql を 100k 行単位でチャンク読み込みし、RECOTEM_MAX_SQL_ROWS で行数の上限をハードキャップします。サーバーサイドカーソル (stream_results=True) を使って、メモリに乗り切らないサイズのテーブルを安全に扱えます。
ga4 — GA4 Data API データソース
Google Analytics 4 の Data API を直接叩いて、イベントログを学習データとして取り込みます。
data:
source: ga4
property_id: '123456789'
date_range:
start_date: '2025-01-01'
end_date: '2025-12-31'
event_names: ['purchase', 'add_to_cart']
user_column: clientId
item_column: itemId
認証は Application Default Credentials (ADC) のみ対応で、レシピに credentials を埋め込めない設計です。runReport のページングは RECOTEM_GA4_MAX_PAGES で上限を設けています。ResourceExhausted / ServiceUnavailable に対しては 3 × api_timeout_seconds の予算ベースのリトライを行い、各 API コール後に壁時計のデッドラインを再チェックします。
SQL ソースのセキュリティ設計
SQL ソースは「ユーザーが任意の接続先 DSN を指定できる」性質上、SSRF を含む攻撃面が広くなります。以下のような多層防御を組み込んでいます。
読み取り専用トランザクションの強制
ダイアレクトごとに読み取り専用ヒントを発行します。
| DB | 適用するヒント |
|---|---|
| PostgreSQL | SET TRANSACTION READ ONLY |
| MySQL | SET SESSION TRANSACTION READ ONLY |
| MariaDB | SET SESSION TRANSACTION READ ONLY + max_statement_time |
| SQLite | PRAGMA query_only=ON |
特に SQLite は以前は silent no-op になっていたものを fail-closed に変更し、ステートメントタイムアウトが適用できない場合は構造化警告 (sql_statement_timeout_not_applied) を出すようにしました。
SSRF ガード
デフォルトで RFC1918 / loopback / link-local の宛先を拒否します。プライベートネットワーク内の RDB を読みたい場合は RECOTEM_SQL_ALLOW_PRIVATE=1 で明示的に opt-in します。
ガードは DSN の netloc だけでなく、各ドライバが認識するルーティング指定を全て検査します。
?host=/?hostaddr=(libpq)?service=/?unix_socket=(拒否)- 絶対パスの
?host=(拒否) - ホスト情報を持たないネットワーク DSN (拒否)
これは SQLAlchemy の make_url が netloc しか url.host に格納しない一方で、libpq や PyMySQL は上記のクエリパラメータも参照するため、netloc だけ見ていると簡単にバイパスできてしまうからです。
DNS リバインディング対策
プローブとフェッチのタイミングで getaddrinfo を使って IPv4 / IPv6 双方を再解決し、解決された IP 集合をピン留めします。gethostbyname_ex 時代の IPv4 限定実装をやめたことで、デュアルスタックや IPv6 専用ホストでの誤検知も解消されました。
ログからの DSN ユーザー情報の除去
src/recotem/log_redaction.py に DSN userinfo スクラバーを追加し、構造化ログのチェーン先頭に置いています。空ユーザー名・URL エンコードされたパスワード・スキーム別の判定も組み込みで、s3:// / gs:// / az:// / abfs(s):// のようなオブジェクトストア URI は誤検知しないよう除外しています。
また、DataSourceError.code を "datasource_error" に固定することで、pipeline.py が exc_info=False でロギングするようになり、psycopg / PyMySQL の例外チェーン経由で DSN userinfo が流出するのを防いでいます。
TLS 設定の Advisory
PostgreSQL / MySQL の DSN で sslmode= / ssl= が指定されていない場合、初期化時に sql_dsn_tls_not_configured を警告として記録します。
GA4 ソースの堅牢化
エラーの正しい分類
GA4 レスポンスの eventCount が非整数だった場合、以前は ValueError として exit code 1 (_EXIT_UNKNOWN) で落ちていましたが、DataSourceError (exit 3 / _EXIT_DATASOURCE) に変更しました。データソース起因の障害が運用上きちんと分類されるようになります。
Prometheus メトリクスの部分初期化耐性
_metrics_ga4 は atomic init とロールバックを実装しており、いずれかのカウンタ登録が失敗するとモジュール全体を no-op フォールバックにラッチします。catch する例外も第三者レジストリ起因の KeyError / ImportError まで広げて、本体の動作を阻害しないようにしています。
バリデーション強化
event_namesは正規表現と長さ上限でバウンドチェックweight_columnが dimensions と衝突する場合はバリデーション時に拒否- ローリング XOR バグ修正済みの日付範囲バリデーション
新規追加された環境変数
| 変数 | デフォルト | 用途 |
|---|---|---|
RECOTEM_MAX_SQL_ROWS | 50,000,000 | SQL の行数ハードキャップ ([1,000, 500,000,000] にクランプ) |
RECOTEM_SQL_ALLOW_PRIVATE | (空) | SQL ソースでプライベート / loopback DSN を許可 |
RECOTEM_GA4_MAX_PAGES | 500 | GA4 のページネーション上限 ([1, 10,000] にクランプ) |
エントリーポイント登録とインストールヒント
新規データソースは [project.entry-points."recotem.datasources"] で登録しつつ、optional extras がインストールされていない wheel でもロード自体は通るように、フォールバック辞書と _BUILTIN_INSTALL_HINTS を用意しました。エラー時には適切な extras (postgres / mysql / sqlite / ga4 / all) をインストールするよう案内されます。
pip install 'recotem[postgres]' # PostgreSQL
pip install 'recotem[mysql]' # MySQL / MariaDB
pip install 'recotem[sqlite]' # SQLite
pip install 'recotem[ga4]' # GA4 Data API
pip install 'recotem[all]' # 全部入り
テスト
ユニット / 統合 / fuzz の 3 階層で 1,674 ケース をカバーしました。
- ユニット: 各ソースの設定バリデーション、SSRF の全ルーティング形式のパラメトライズ、IPv4 / IPv6 / デュアルスタックでの DNS リバインディング、PostgreSQL / MySQL / MariaDB / SQLite の読み取り専用 + ステートメントタイムアウトの SQL 完全一致検査、GA4 のリトライ判定 (
ResourceExhausted/ServiceUnavailable/PermissionDenied)、weight_column衝突拒否、非整数eventCount拒否、ログリダクションのオブジェクトストア URI 保持 - 統合: SQLite を使った SQL の train + serve エンドツーエンド
- Fuzz: 両ソースのレシピ形状に対する hypothesis のバイトミューテーション
- レジストリ: 正常系とフォールバック経路の両方を検証、autouse フィクスチャでテスト間の LRU キャッシュ漏れを防止
まとめ
Recotem のデータソースに sql と ga4 を追加し、運用中の RDB や GA4 のイベントログをそのまま学習データとして取り込めるようになりました。SQL ソースは SSRF / DNS リバインディング / DSN ログ流出対策を全ルーティング形式に対して入れ、読み取り専用も fail-closed に修正しています。GA4 ソースは ADC のみの認証・予算ベースのリトライ・部分初期化耐性のメトリクスといった、運用観点での堅牢化を行いました。既存の csv / parquet / bigquery レシピは無変更で、Recipe.source 判別子への追加だけの後方互換変更です。
- PR #92: feat(datasource): add SQL + GA4 Data API sources
- リポジトリ: codelibs/recotem