日本語版はAIによる翻訳です。正確な情報については英語版をご参照ください。 英語版を表示
メインコンテンツまでスキップ

Snowflake

Snowflake からのデータ取り込みの設定

このガイドでは、Snowflake から Auxia の GCS バケットにデータをエクスポートして取り込む方法を説明します。

1. 概要

このアプローチでは、Snowflake のネイティブ COPY INTO コマンドを使用して、Snowflake から Auxia の GCS バケットに直接データをエクスポートします。その後、Auxia がそこからデータを取り込みます。

フロー:

Client (Snowflake)                         Auxia
| |
| | 1. Provide GCS bucket path
|<-------------------------------------|
| |
| 2. Create storage integration |
| (Snowflake generates a GCP |
| service account) |
| |
| 3. Retrieve & send service account |
|------------------------------------->|
| |
| | 4. Grant bucket access to
| | service account (via IAM)
|<-------------------------------------|
| |
| 5. Set up scheduled exports |
| |
| Data flows to GCS |
|=====================================>|

2. Auxia から提供されるもの

Auxia は以下を提供します。

  • 送信先 GCS バケットパス(例: gcs://<your_bucket_path>/

取得するには、Auxia のソリューションエンジニアにお問い合わせください。

3. 前提条件

  • このセットアップを行うユーザーに ACCOUNTADMIN ロールが付与されていること(ストレージ統合の作成に必要)。
  • コンピュート用の稼働中の Snowflake ウェアハウス。

4. Snowflake での SQL の実行

このガイドのすべての SQL コマンドは、Snowflake の Web インターフェース(Snowsight)を使用して実行します。

SQL ワークシートを開くには:

  1. 左側のナビゲーションメニューで、ProjectsWorksheets を選択します。

    Snowflake left nav — Projects → Worksheets

  2. 右上の + ボタンをクリックします。

    New worksheet button

  3. SQL Worksheet を選択します。

SQL コマンドを実行する前に:

  1. ワークシートの右上で Role を選択します(ストレージ統合コマンドには ACCOUNTADMIN を使用)。

    Role selector in worksheet

  2. コンピュートリソース用の Warehouse を選択します。

  3. 必要に応じて DatabaseSchema コンテキストを選択します。

SQL を実行するには:

  • ステートメントにカーソルを置き、Run ボタンをクリックします(Mac では Cmd+Enter、Windows では Ctrl+Enter)。

    Run button

  • すべてのステートメントを実行するには、先にテキスト全体を選択します。

5. 必要なアクション

ステップ 1: ストレージ統合の作成

注記

Google Cloud Console からコピーした GCS バケットパスは gs:// プレフィックスを使用します。Snowflake では代わりに gcs:// プレフィックスが必要です。

ACCOUNTADMIN ロールを選択した状態で、ワークシートで以下の SQL を実行します。

CREATE STORAGE INTEGRATION auxia_gcs_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<your_bucket_path>/');

正常に実行されると、以下が表示されます。

Successful storage integration creation

ステップ 2: サービスアカウントの取得と Auxia への送付

以下の SQL を実行して、Snowflake が生成した GCP サービスアカウントを取得します。

DESC STORAGE INTEGRATION auxia_gcs_integration;

STORAGE_GCP_SERVICE_ACCOUNT の行を見つけ、その property_value をコピーします。

DESC STORAGE INTEGRATION output

コピーした値を Auxia に送付してください。

Auxia は GCP IAM を通じてこのサービスアカウントに送信先バケットへの書き込みアクセスを付与します。

ステップ 3: スケジュールエクスポートの設定

Auxia が書き込みアクセスの付与を確認したら、以下に指定するフォーマットで GCS バケットにデータを配信するスケジュールエクスポートを設定します。

コストに関する考慮事項:

  • ウェアハウスコンピュート: 各タスク実行時に、ウェアハウスのサイズと実行時間に基づいて Snowflake クレジットが消費されます。課金は秒単位で、実行ごとに最低1分です。
  • データ転送料金: Snowflake はクロスリージョンまたはクロスクラウドのデータ転送に対してバイト単位の転送料金を請求します。同一リージョン内の転送は無料です。
  • 推奨事項: 適切なサイズのウェアハウス(例: 小規模データセットには X-Small)を使用し、データ量に基づいてエクスポート頻度を検討してください。

データフォーマット要件:

ファイルフォーマット

  • Parquet(必須)— エクスポートクエリで FILE_FORMAT = (TYPE = PARQUET) を明示的に指定する必要があります。Snowflake は Parquet ファイルにデフォルトで Snappy 圧縮を使用します。

パーティショニング

データは UTC 日付(dt=YYYY-MM-DD/)または UTC 日付と時間(dt=YYYY-MM-DD/hr=HH/)でパーティションする必要があります。各フォルダには、その UTC 時間(時間単位のエクスポート)または UTC 日(日単位のエクスポート)に Snowflake に追加されたデータが含まれる必要があります。

日次パーティショニング:

<your_bucket_path>/
data/<table_name>/
dt=2026-01-24/
data_0_0_0.parquet
dt=2026-01-25/
data_0_0_0.parquet
dt=2026-01-26/
data_0_0_0.parquet

時間単位パーティショニング:

<your_bucket_path>/
data/<table_name>/
dt=2026-01-26/
hr=00/
data_0_0_0.parquet
hr=01/
data_0_0_0.parquet
hr=14/
data_0_0_0.parquet

データ要件

  • エクスポートしたファイルはイミュータブルとして扱ってください。書き込み後に変更や削除をしないでください。
  • すべてのパーティショニングに UTC タイムスタンプを使用してください。

実現方法:

エクスポートでは、前回のエクスポート以降に追加された新しいデータをフィルタリングする必要があります。一般的なアプローチは、各行が Snowflake に挿入された日時を記録するタイムスタンプカラム(例: export_timestamp)を使用し、そのカラムに基づいてフィルタリングして対応する UTC 期間のデータを抽出することです。正確なフィルタリングロジックは、データ構造とパイプラインによって異なります。

以下は、参考となる Snowflake Task の例です。スケジュール、フィルタリングロジック、エクスポートパスは例示です。データパイプラインに合わせて変更してください。

サンプル: 時間単位エクスポート

-- Set database context
USE DATABASE <your_database>;
USE SCHEMA <your_schema>;

CREATE OR REPLACE TASK hourly_export_task
WAREHOUSE = <your_warehouse>
SCHEDULE = 'USING CRON 5 * * * * UTC' -- Example: runs once per hour, at HH:05 UTC
AS
DECLARE
export_ts TIMESTAMP_NTZ := DATEADD(hour, -1, CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()));
export_date VARCHAR := TO_CHAR(export_ts, 'YYYY-MM-DD');
export_hour VARCHAR := TO_CHAR(export_ts, 'HH24');
BEGIN
EXECUTE IMMEDIATE '
COPY INTO ''gcs://<your_bucket_path>/data/<table_name>/dt=' || export_date || '/hr=' || export_hour || '/''
FROM (
SELECT *
FROM <your_database>.<your_schema>.<table_name>
-- Convert export_timestamp to UTC (replace <source_timezone> with your timezone, e.g., America/Los_Angeles)
WHERE DATE(CONVERT_TIMEZONE(''<source_timezone>'', ''UTC'', export_timestamp)) = ''' || export_date || '''::DATE
AND HOUR(CONVERT_TIMEZONE(''<source_timezone>'', ''UTC'', export_timestamp)) = ' || export_hour || '
)
STORAGE_INTEGRATION = auxia_gcs_integration
FILE_FORMAT = (TYPE = PARQUET)
HEADER = TRUE
';
END;

タスクが作成されたら、実行を開始するためにタスクを再開します。

ALTER TASK hourly_export_task RESUME;

サンプル: 日次エクスポート

-- Set database context
USE DATABASE <your_database>;
USE SCHEMA <your_schema>;

CREATE OR REPLACE TASK daily_export_task
WAREHOUSE = <your_warehouse>
SCHEDULE = 'USING CRON 30 0 * * * UTC' -- Example: runs once per day, at 00:30 UTC
AS
DECLARE
export_date VARCHAR := TO_CHAR(DATEADD(day, -1, CURRENT_DATE()), 'YYYY-MM-DD');
BEGIN
EXECUTE IMMEDIATE '
COPY INTO ''gcs://<your_bucket_path>/data/<table_name>/dt=' || export_date || '/''
FROM (
SELECT *
FROM <your_database>.<your_schema>.<table_name>
-- Convert export_timestamp to UTC (replace <source_timezone> with your timezone, e.g., America/Los_Angeles)
WHERE DATE(CONVERT_TIMEZONE(''<source_timezone>'', ''UTC'', export_timestamp)) = ''' || export_date || '''::DATE
)
STORAGE_INTEGRATION = auxia_gcs_integration
FILE_FORMAT = (TYPE = PARQUET)
HEADER = TRUE
';
END;

タスクが作成されたら、実行を開始するためにタスクを再開します。

ALTER TASK daily_export_task RESUME;

タスクが正常に作成されると、以下が表示されます。

Successful task creation

Task creation confirmation

6. タスク実行の監視

エクスポートタスクが正しく実行されていることを確認するには、以下の SQL コマンドを使用します。

すべてのタスクを一覧表示:

SHOW TASKS;

先ほど作成したタスクが結果に表示されます。

SHOW TASKS output

最近のタスク実行履歴を表示:

SELECT
NAME,
STATE,
SCHEDULED_TIME,
COMPLETED_TIME,
ERROR_MESSAGE
FROM TABLE(<your_database>.INFORMATION_SCHEMA.TASK_HISTORY())
WHERE NAME = 'HOURLY_EXPORT_TASK'
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;

タスクが正常に実行されると、STATE カラムに SUCCEEDED と表示されます。

Task history with SUCCEEDED state

タスクの一時停止または再開:

-- Suspend
ALTER TASK hourly_export_task SUSPEND;

-- Resume
ALTER TASK hourly_export_task RESUME;

7. Auxia への確認

エクスポートタスクが実行され確認できたら、Auxia に通知し、以下を確認してください。

  • エクスポート頻度(時間単位または日次)
  • エクスポートするテーブル

8. 推奨プラクティス

  • Parquet フォーマットを使用 — Snowflake はデフォルトで Snappy 圧縮を使用して Parquet ファイルを圧縮します。
  • UTC 日付でパーティション(日次)または UTC 日付/時間(時間単位)でパーティション
  • イミュータビリティの確保 — ファイルが書き込まれたら、変更や削除をしないでください。
  • UTC タイムスタンプを使用 — すべてのパーティショニングは UTC 時間に基づく必要があります。
  • タスク実行の監視 — タスク履歴を定期的にチェックして、失敗や遅延がないか確認してください。

9. まとめ

ステップアクション担当
1GCS バケットパスの提供Auxia
2ストレージ統合の作成お客様
3サービスアカウントの取得と Auxia への送付お客様
4サービスアカウントへのバケットアクセスの付与(IAM 経由)Auxia
5スケジュールエクスポートタスクの設定お客様
6エクスポートの実行確認お客様

サポートが必要ですか?

support@auxia.io または Auxia のソリューションエンジニアまでお問い合わせください。