Apache Airflow
は、DAG (Directed Acyclic Graphs) と SQL オペレーターを使用して、StarRocks とのデータワークフローのオーケストレーションとスケジューリングを可能にします。SQLExecuteQueryOperator と MySQLHook を使用して、実装や複雑な設定なしでデータロードと変換を行うことができます。
Apache Airflow GitHub リポジトリ.
サポートされている機能
- MySQL プロトコルを介した SQL 実行
- 接続管理
- トランザクションサポート
- パラメータ化されたクエリ
- タスク依存関係
- リトライロジック
インストール
前提条件
- Apache Airflow 2.0+ または 3.0+
- Python 3.8+
- StarRocks クラスターへのアクセス ( クイックスタートガイド を参照)
インストール
StarRocks は MySQL プロトコルを使用するため、MySQL プロバイダーパッケージが必要です。
pip install apache-airflow-providers-mysql
インストールを確認するには、インストールされたプロバイダーを確認します。
airflow providers list
この出力には apache-airflow-providers-mysql が含まれているはずです。
設定
StarRocks 接続の作成
Airflow UI または環境変数を介して StarRocks 接続を作成します。接続名は後で DAG によって使用されます。
Airflow UI を介して
- Admin > Connections に移動
-
- ボタンをクリックして新しい接続を追加
- 接続を設定:
- Connection Id:
starrocks_default - Connection Type: MySQL
- Host:
your-starrocks-host.com - Schema:
your_database - Login:
your_username - Password:
your_password - Port:
9030
Airflow CLI を介して
airflow connections add 'starrocks_default' \
--conn-type 'mysql' \
--conn-host 'your-starrocks-host.com' \
--conn-schema 'your_database' \
--conn-login 'your_username' \
--conn-password 'your_password' \
--conn-port 9030
使用例
これらの例は、StarRocks と Airflow を統合するための一般的なパターンを示しています。各例は、データロード、変換、ワークフローオーケストレーションの異なるアプローチを紹介しながら、基本的な概念を構築します。
学べること:
- データロード: CSV ファイルやクラウドストレージから StarRocks への効率的なデータロード
- データ変換: SQL クエリの実行と Python を使用した結果の処理
- 高度なパターン: 増分ロード、非同期操作、クエリ最適化の実装
- プロダクションのベストプラクティス: エラーを優雅に処理し、堅牢なパイプラインを構築
すべての例は、 クイックスタートガイド で説明されているクラッシュデータテーブルを使用します。
データロード
ストリームデータロード
StarRocks Stream Load API を使用して大きな CSV ファイルを効率的にロードします。Stream Load は以下の用途に推奨されます:
- 高スループットのデータロード (並列ロードをサポート)
- 列変換とフィルタリングを伴うデータのロード
Stream Load は、大規模データセットに対して INSERT INTO VALUES ステートメントよりも優れたパフォーマンスを提供し、エラー耐性などの組み込み機能を含みます。なお、CSV ファイルは Airflow ワーカーのファイルシステムでアクセス可能である必要があります。
from airflow.sdk import dag, task
from airflow.hooks.base import BaseHook
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from urllib.parse import urlparse
class PreserveAuthSession(requests.Session):
"""
リダイレクトをまたいで Authorization ヘッダーを保持するカスタムセッション。
StarRocks FE は Stream Load リクエストを BE ノードにリダイレクトすることがあります。
"""
def rebuild_auth(self, prepared_request, response):
old = urlparse(response.request.url)
new = urlparse(prepared_request.url)
# 同じホスト名にリダイレクトする場合のみ認証を保持
if old.hostname == new.hostname:
prepared_request.headers["Authorization"] = response.request.headers.get("Authorization")
@dag(
dag_id="starrocks_stream_load_example",
schedule=None,
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["starrocks", "stream_load", "example"],
)
def starrocks_stream_load_example():
@task
def load_csv_to_starrocks():
# 設定
DATABASE = "quickstart"
TABLE = "crashdata"
CSV_PATH = "/path/to/crashdata.csv"
conn = BaseHook.get_connection("starrocks_default")
url = f"http://{conn.host}:{conn.port}/api/{DATABASE}/{TABLE}/_stream_load"
# ユニークなラベルを生成
from airflow.sdk import get_current_context
context = get_current_context()
execution_date = context['logical_date'].strftime('%Y%m%d_%H%M%S')
label = f"{TABLE}_load_{execution_date}"
headers = {
"label": label,
"column_separator": ",",
"skip_header": "1",
"max_filter_ratio": "0.1", # 最大 10% のエラー率を許容
"Expect": "100-continue",
"columns": """
tmp_CRASH_DATE, tmp_CRASH_TIME,
CRASH_DATE=str_to_date(concat_ws(' ', tmp_CRASH_DATE, tmp_CRASH_TIME), '%m/%d/%Y %H:%i'),
BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE, LOCATION,
ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME,
NUMBER_OF_PERSONS_INJURED, NUMBER_OF_PERSONS_KILLED,
NUMBER_OF_PEDESTRIANS_INJURED, NUMBER_OF_PEDESTRIANS_KILLED,
NUMBER_OF_CYCLIST_INJURED, NUMBER_OF_CYCLIST_KILLED,
NUMBER_OF_MOTORIST_INJURED, NUMBER_OF_MOTORIST_KILLED,
CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
CONTRIBUTING_FACTOR_VEHICLE_3, CONTRIBUTING_FACTOR_VEHICLE_4,
CONTRIBUTING_FACTOR_VEHICLE_5, COLLISION_ID,
VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2,
VEHICLE_TYPE_CODE_3, VEHICLE_TYPE_CODE_4, VEHICLE_TYPE_CODE_5
""".replace("\n", "").replace(" ", ""),
}
session = PreserveAuthSession()
with open(CSV_PATH, "rb") as f:
response = session.put(
url,
headers=headers,
data=f,
auth=HTTPBasicAuth(conn.login, conn.password or ""),
timeout=3600,
)
result = response.json()
print(f"\nStream Load Response:")
print(f" Status: {result.get('Status')}")
print(f" Loaded Rows: {result.get('NumberLoadedRows', 0):,}")
if result.get("Status") == "Success":
return result
else:
error_msg = result.get("Message", "Unknown error")
raise Exception(f"Stream Load failed: {error_msg}")
load_csv_to_starrocks()
starrocks_stream_load_example()
ファイルからの挿入
StarRocks の FILES() テーブル関数を使用して、ファイルから直接データをロードします。このアプローチは以下に最適です:
- S3、HDFS、Google Cloud Storage からのデータロード
- ロード中に変換を適用するワンステップのデータ取り込み
- 様々なファイルソースからのアドホックデータロード
FILES() は複数のファイル形式とストレージシステムをサポートしており、特定のユースケースに対する Stream Load の柔軟な代替手段となります。データは単一の SQL ステートメントで読み込まれ、挿入されます。
from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
FILE_PATH = "path_to_file_here"
@dag(
dag_id='crashdata_dynamic_files_load',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['starrocks', 'files', 'dynamic'],
)
def crashdata_files():
@task
def load_file(file_path: str):
hook = MySqlHook(mysql_conn_id='starrocks_default')
sql = f"""
INSERT INTO crashdata (
CRASH_DATE, BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE,
LOCATION, ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME,
CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
COLLISION_ID, VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2
)
SELECT
STR_TO_DATE(CONCAT_WS(' ', `CRASH DATE`, `CRASH TIME`), '%m/%d/%Y %H:%i'),
BOROUGH,
`ZIP CODE`,
CAST(LATITUDE as INT),
CAST(LONGITUDE as INT),
LOCATION,
`ON STREET NAME`,
`CROSS STREET NAME`,
`OFF STREET NAME`,
`CONTRIBUTING FACTOR VEHICLE 1`,
`CONTRIBUTING FACTOR VEHICLE 2`,
CAST(`COLLISION_ID` as INT),
`VEHICLE TYPE CODE 1`,
`VEHICLE TYPE CODE 2`
FROM FILES(
"path" = "s3://{file_path}",
"format" = "parquet",
"aws.s3.access_key" = "XXXXXXXXXX",
"aws.s3.secret_key" = "YYYYYYYYYY",
"aws.s3.region" = "us-west-2"
)
"""
result = hook.run(sql)
return file_path
load_file(FILE_PATH)
crashdata_files()
データ変換
StarRocks に対して SQL クエリを実行し、テーブル作成やデータ挿入を行います。これは以下に役立ちます:
- データベーススキーマの設定
- 小規模データセットのロード
- アドホッククエリの実行
from airflow.sdk import dag, chain
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime
@dag(
dag_id='crashdata_basic_setup',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['starrocks', 'crashdata'],
)
def crashdata_basic_pipeline():
"""crashdata テーブルを作成し、サンプルの NYC クラッシュデータを挿入します。"""
create_table = SQLExecuteQueryOperator(
task_id='create_crashdata_table',
conn_id='starrocks_default',
sql="""
CREATE TABLE IF NOT EXISTS crashdata (
CRASH_DATE DATETIME,
BOROUGH STRING,
ZIP_CODE STRING,
LATITUDE INT,
LONGITUDE INT,
LOCATION STRING,
ON_STREET_NAME STRING,
CROSS_STREET_NAME STRING,
OFF_STREET_NAME STRING,
CONTRIBUTING_FACTOR_VEHICLE_1 STRING,
CONTRIBUTING_FACTOR_VEHICLE_2 STRING,
COLLISION_ID INT,
VEHICLE_TYPE_CODE_1 STRING,
VEHICLE_TYPE_CODE_2 STRING
)
DUPLICATE KEY(CRASH_DATE)
DISTRIBUTED BY HASH(COLLISION_ID) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
)
""",
)
insert_data = SQLExecuteQueryOperator(
task_id='insert_sample_data',
conn_id='starrocks_default',
sql="""
INSERT INTO crashdata VALUES
('2024-01-15 08:30:00', 'MANHATTAN', '10001', 40748817, -73985428,
'(40.748817, -73.985428)', '5 AVENUE', 'WEST 34 STREET', NULL,
'Driver Inattention/Distraction', 'Unspecified', 4567890, 'Sedan', 'Taxi'),
('2024-01-15 14:20:00', 'BROOKLYN', '11201', 40693139, -73987664,
'(40.693139, -73.987664)', 'FLATBUSH AVENUE', 'ATLANTIC AVENUE', NULL,
'Failure to Yield Right-of-Way', 'Unspecified', 4567891, 'SUV', 'Sedan'),
('2024-01-15 18:45:00', 'QUEENS', '11354', 40767689, -73827426,
'(40.767689, -73.827426)', 'NORTHERN BOULEVARD', 'MAIN STREET', NULL,
'Following Too Closely', 'Driver Inattention/Distraction', 4567892, 'Sedan', 'Sedan'),
('2024-01-16 09:15:00', 'BRONX', '10451', 40820679, -73925300,
'(40.820679, -73.925300)', 'GRAND CONCOURSE', 'EAST 161 STREET', NULL,
'Unsafe Speed', 'Unspecified', 4567893, 'Truck', 'Sedan')
""",
)
create_table >> insert_data
crashdata_basic_pipeline()
MySqlHook を使用したより複雑な操作
MySqlHook を使用して、Python タスク内で高度なデータ分析と処理を行います。このアプローチは以下に役立ちます:
- 分析クエリを実行し、Python で結果を処理
- StarRocks クエリを Python ライブラリ (pandas, numpy など) と組み合わせる
- SQL と Python の両方を必要とする複雑なビジネスロジックの実装
- データ品質チェックと検証ワークフローの作成
MySqlHook はクエリ結果への完全なプログラムアク セスを提供し、DAG 内での高度なデータ変換と分析を可能にします。
from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime
@dag(
dag_id='crashdata_python_analysis',
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['starrocks', 'python', 'analytics'],
)
def crashdata_python_pipeline():
@task
def analyze_crash_hotspots():
"""バローとストリートごとのクラッシュホットスポットを特定します。"""
hook = MySqlHook(mysql_conn_id='starrocks_default')
# 高頻度クラッシュロケーションを見つけるためのクエリ
sql = """
SELECT
BOROUGH,
ON_STREET_NAME,
COUNT(*) as crash_count,
COUNT(DISTINCT DATE(CRASH_DATE)) as days_with_crashes
FROM crashdata
WHERE ON_STREET_NAME IS NOT NULL
GROUP BY BOROUGH, ON_STREET_NAME
HAVING crash_count >= 3
ORDER BY crash_count DESC
LIMIT 10
"""
results = hook.get_records(sql)
print("トップ 10 クラッシュホットスポット:")
for row in results:
borough, street, count, days = row
print(f"{borough:15} | {street:40} | {count:3} クラッシュ {days} 日間")
return len(results)
@task
def calculate_contributing_factors():
"""寄与要因の割合分布を計算します。"""
hook = MySqlHook(mysql_conn_id='starrocks_default')
sql = """
SELECT
CONTRIBUTING_FACTOR_VEHICLE_1 as factor,
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
FROM crashdata
WHERE CONTRIBUTING_FACTOR_VEHICLE_1 != 'Unspecified'
GROUP BY CONTRIBUTING_FACTOR_VEHICLE_1
ORDER BY count DESC
"""
results = hook.get_records(sql)
print("\n寄与要因分析:")
for factor, count, percentage in results:
print(f"{factor:50} | {count:4} ({percentage}%)")
return results
# タスク実行順序を定義
hotspots = analyze_crash_hotspots()
factors = calculate_contributing_factors()
hotspots >> factors
crashdata_python_pipeline()