Apache Airflow
Apache Airflow enables orchestration and scheduling of data workflows with StarRocks using DAGs (Directed Acyclic Graphs) and SQL operators. Use Airflow for data loading and transformation with the SQLExecuteQueryOperator and MySqlHook, without custom operators or complex configuration.
Apache Airflow GitHub repo.
Supported features
- SQL Execution through MySQL protocol
- Connection management
- Transaction support
- Parameterized queries
- Task dependencies
- Retry logic
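Most of these features map onto standard Airflow constructs. The sketch below is illustrative only: it assumes the starrocks_default connection configured in the Configuration section below and the crashdata table from the quickstart guide, and shows a parameterized query with retry logic using the SQLExecuteQueryOperator.
from airflow.sdk import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime


@dag(
    dag_id="starrocks_feature_check",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["starrocks", "example"],
)
def starrocks_feature_check():
    # Parameterized query with retry logic against the StarRocks connection
    SQLExecuteQueryOperator(
        task_id="count_crashes_in_borough",
        conn_id="starrocks_default",
        sql="SELECT COUNT(*) FROM crashdata WHERE BOROUGH = %(borough)s",
        parameters={"borough": "BROOKLYN"},
        retries=3,
    )


starrocks_feature_check()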
Installation
Prerequisites
- Apache Airflow 2.0+ or 3.0+
- Python 3.8+
- Access to a StarRocks cluster (see the quickstart guide)
Install
The MySQL provider package is required because StarRocks uses the MySQL protocol.
pip install apache-airflow-providers-mysql
Verify the installation by checking the installed providers:
airflow providers list
This should list apache-airflow-providers-mysql in the output.
Configuration
Create a StarRocks Connection
Create a StarRocks connection in the Airflow UI, via the Airflow CLI, or through an environment variable. The connection ID will be referenced by the DAGs later.
Via Airflow UI
- Navigate to Admin > Connections
- Click the + button to add a new connection
- Configure the connection:
- Connection Id: starrocks_default
- Connection Type: MySQL
- Host: your-starrocks-host.com
- Schema: your_database
- Login: your_username
- Password: your_password
- Port: 9030
Via Airflow CLI
airflow connections add 'starrocks_default' \
--conn-type 'mysql' \
--conn-host 'your-starrocks-host.com' \
--conn-schema 'your_database' \
--conn-login 'your_username' \
--conn-password 'your_password' \
--conn-port 9030
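Via Environment Variable
Connections can also be defined as environment variables using Airflow's AIRFLOW_CONN_<CONN_ID> convention. A sketch using the URI format, with the same placeholder values as above:
export AIRFLOW_CONN_STARROCKS_DEFAULT='mysql://your_username:your_password@your-starrocks-host.com:9030/your_database'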
Usage Examples
These examples demonstrate common patterns for integrating StarRocks with Airflow. Each example builds on core concepts while showcasing different approaches to data loading, transformation, and workflow orchestration.
What You'll Learn:
- Data Loading: Efficiently load data from CSV files and cloud storage into StarRocks
- Data Transformation: Execute SQL queries and process results with Python
- Advanced Patterns: Implement incremental loading, async operations, and query optimization
- Production Best Practices: Handle errors gracefully and build resilient pipelines
All examples use the crash data tables described in the quickstart guide.
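As a warm-up before the loading examples, the sketch below queries StarRocks with MySqlHook and processes the result set in Python. It assumes the starrocks_default connection points at the quickstart database containing the crashdata table; the DAG id and aggregation are illustrative only.
from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime


@dag(
    dag_id="starrocks_query_example",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["starrocks", "example"],
)
def starrocks_query_example():
    @task
    def crashes_per_borough():
        hook = MySqlHook(mysql_conn_id="starrocks_default")
        # get_records returns a list of tuples
        rows = hook.get_records(
            "SELECT BOROUGH, COUNT(*) FROM crashdata GROUP BY BOROUGH ORDER BY 2 DESC"
        )
        summary = {borough or "UNKNOWN": count for borough, count in rows}
        for borough, count in summary.items():
            print(f"{borough}: {count:,}")
        return summary  # pushed to XCom for downstream tasks

    crashes_per_borough()


starrocks_query_example()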
Data Loading
Stream Data Loading
Load large CSV files efficiently using the StarRocks Stream Load API. Stream Load is the recommended approach for:
- High-throughput data loading (supports parallel loads)
- Loading data with column transformations and filtering
Stream Load provides better performance than INSERT INTO VALUES statements for large datasets and includes built-in features like error tolerance. Note that this approach requires the CSV file to be accessible on the Airflow worker's filesystem.
from airflow.sdk import dag, task
from airflow.hooks.base import BaseHook
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from urllib.parse import urlparse


class PreserveAuthSession(requests.Session):
    """
    Custom session that preserves the Authorization header across redirects.
    StarRocks FE may redirect Stream Load requests to BE nodes.
    """

    def rebuild_auth(self, prepared_request, response):
        old = urlparse(response.request.url)
        new = urlparse(prepared_request.url)
        # Only preserve auth when redirecting to the same hostname
        if old.hostname == new.hostname:
            prepared_request.headers["Authorization"] = response.request.headers.get("Authorization")


@dag(
    dag_id="starrocks_stream_load_example",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["starrocks", "stream_load", "example"],
)
def starrocks_stream_load_example():
    @task
    def load_csv_to_starrocks():
        # Configuration
        DATABASE = "quickstart"
        TABLE = "crashdata"
        CSV_PATH = "/path/to/crashdata.csv"
        # Stream Load is an HTTP API served on the FE HTTP port (8030 by default),
        # not the MySQL query port stored in the Airflow connection.
        FE_HTTP_PORT = 8030

        conn = BaseHook.get_connection("starrocks_default")
        url = f"http://{conn.host}:{FE_HTTP_PORT}/api/{DATABASE}/{TABLE}/_stream_load"

        # Generate a unique label from the run's logical date
        from airflow.sdk import get_current_context
        context = get_current_context()
        execution_date = context["logical_date"].strftime("%Y%m%d_%H%M%S")
        label = f"{TABLE}_load_{execution_date}"

        # Collapse the multi-line column mapping into a single-line header value
        # while keeping the spaces inside the quoted SQL literals intact.
        columns = " ".join("""
            tmp_CRASH_DATE, tmp_CRASH_TIME,
            CRASH_DATE=str_to_date(concat_ws(' ', tmp_CRASH_DATE, tmp_CRASH_TIME), '%m/%d/%Y %H:%i'),
            BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE, LOCATION,
            ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME,
            NUMBER_OF_PERSONS_INJURED, NUMBER_OF_PERSONS_KILLED,
            NUMBER_OF_PEDESTRIANS_INJURED, NUMBER_OF_PEDESTRIANS_KILLED,
            NUMBER_OF_CYCLIST_INJURED, NUMBER_OF_CYCLIST_KILLED,
            NUMBER_OF_MOTORIST_INJURED, NUMBER_OF_MOTORIST_KILLED,
            CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
            CONTRIBUTING_FACTOR_VEHICLE_3, CONTRIBUTING_FACTOR_VEHICLE_4,
            CONTRIBUTING_FACTOR_VEHICLE_5, COLLISION_ID,
            VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2,
            VEHICLE_TYPE_CODE_3, VEHICLE_TYPE_CODE_4, VEHICLE_TYPE_CODE_5
        """.split())

        headers = {
            "label": label,
            "column_separator": ",",
            "skip_header": "1",
            "max_filter_ratio": "0.1",  # Allow up to 10% error rate
            "Expect": "100-continue",
            "columns": columns,
        }

        session = PreserveAuthSession()
        with open(CSV_PATH, "rb") as f:
            response = session.put(
                url,
                headers=headers,
                data=f,
                auth=HTTPBasicAuth(conn.login, conn.password or ""),
                timeout=3600,
            )

        result = response.json()
        print("\nStream Load Response:")
        print(f"  Status: {result.get('Status')}")
        print(f"  Loaded Rows: {result.get('NumberLoadedRows', 0):,}")

        if result.get("Status") == "Success":
            return result
        else:
            error_msg = result.get("Message", "Unknown error")
            raise Exception(f"Stream Load failed: {error_msg}")

    load_csv_to_starrocks()


starrocks_stream_load_example()
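If a load fails, it can help to reproduce the same request outside Airflow. A roughly equivalent curl command is sketched below; it assumes the FE HTTP port 8030 and the quickstart database and table, and omits the columns mapping header for brevity (add the same columns header as in the DAG when loading the real CSV).
curl --location-trusted -u your_username:your_password -XPUT \
    -H "label:crashdata_manual_test" \
    -H "column_separator:," \
    -H "skip_header:1" \
    -H "max_filter_ratio:0.1" \
    -H "Expect:100-continue" \
    -T /path/to/crashdata.csv \
    http://your-starrocks-host.com:8030/api/quickstart/crashdata/_stream_load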
Insert From Files
Use StarRocks' FILES() table function to load data directly from files. This approach is ideal for:
- Loading data from S3, HDFS, or Google Cloud Storage
- One-step data ingestion with transformations applied during load
- Ad-hoc data loads from various file sources
FILES() supports multiple file formats and storage systems, making it a flexible alternative to Stream Load for certain use cases. The data is read and inserted in a single SQL statement.
from airflow.sdk import dag, task
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

FILE_PATH = "path_to_file_here"


@dag(
    dag_id='crashdata_dynamic_files_load',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['starrocks', 'files', 'dynamic'],
)
def crashdata_files():
    @task
    def load_file(file_path: str):
        hook = MySqlHook(mysql_conn_id='starrocks_default')
        sql = f"""
        INSERT INTO crashdata (
            CRASH_DATE, BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE,
            LOCATION, ON_STREET_NAME, CROSS_STREET_NAME, OFF_STREET_NAME,
            CONTRIBUTING_FACTOR_VEHICLE_1, CONTRIBUTING_FACTOR_VEHICLE_2,
            COLLISION_ID, VEHICLE_TYPE_CODE_1, VEHICLE_TYPE_CODE_2
        )
        SELECT
            STR_TO_DATE(CONCAT_WS(' ', `CRASH DATE`, `CRASH TIME`), '%m/%d/%Y %H:%i'),
            BOROUGH,
            `ZIP CODE`,
            CAST(LATITUDE as INT),
            CAST(LONGITUDE as INT),
            LOCATION,
            `ON STREET NAME`,
            `CROSS STREET NAME`,
            `OFF STREET NAME`,
            `CONTRIBUTING FACTOR VEHICLE 1`,
            `CONTRIBUTING FACTOR VEHICLE 2`,
            CAST(`COLLISION_ID` as INT),
            `VEHICLE TYPE CODE 1`,
            `VEHICLE TYPE CODE 2`
        FROM FILES(
            "path" = "s3://{file_path}",
            "format" = "parquet",
            "aws.s3.access_key" = "XXXXXXXXXX",
            "aws.s3.secret_key" = "YYYYYYYYYY",
            "aws.s3.region" = "us-west-2"
        )
        """
        hook.run(sql)
        return file_path

    load_file(FILE_PATH)


crashdata_files()
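Since FILES() can also be queried like a regular table, it is often worth previewing a file before running the full INSERT. The helper below is a minimal sketch, reusing the same placeholder S3 path and credentials as above; it can be called from inside a @task.
from airflow.providers.mysql.hooks.mysql import MySqlHook


def preview_parquet(file_path: str, limit: int = 10):
    """Read a few rows straight from the Parquet file via FILES()."""
    hook = MySqlHook(mysql_conn_id="starrocks_default")
    rows = hook.get_records(f"""
        SELECT * FROM FILES(
            "path" = "s3://{file_path}",
            "format" = "parquet",
            "aws.s3.access_key" = "XXXXXXXXXX",
            "aws.s3.secret_key" = "YYYYYYYYYY",
            "aws.s3.region" = "us-west-2"
        )
        LIMIT {limit}
    """)
    for row in rows:
        print(row)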