Spark コネクタを使用してデータをロードする(推奨)
StarRocks は、Apache Spark™ 用に開発されたコネクタである StarRocks Connector for Apache Spark™(以下、Spark コネクタ)を提供しており、Spark を使用して StarRocks テーブルにデータをロードするのに役立ちます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度にすべてのデータを StarRocks にロードすることです。Spark コネクタは Spark DataSource V2 に基づいて実装されています。DataSource は Spark DataFrames または Spark SQL を使用して作成できます。バッチモードと構造化ストリーミングモードの両方がサポートされています。
注意
StarRocks テーブルにデータをロードできるのは、SELECT および INSERT 権限を持つユーザーのみです。GRANT の指示に従って、これらの権限をユーザーに付与できます。
バージョン要件
| Spark コネクタ | Spark | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 以降 | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
注意
- Spark コネクタのバージョン間の動作の変更については、Upgrade Spark connector を参照してください。
- Spark コネクタはバージョン 1.1.1 以降、MySQL JDBC ドライバを提供していないため、手動で Spark クラスパスにドライバをインポートする必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。
Spark コネクタの取得
Spark コネクタの JAR ファイルは以下の方法で取得できます:
- コンパイル済みの Spark コネクタ JAR ファイルを直接ダウンロードする。
- Maven プロジェクトに Spark コネクタを依存関係として追加し、JAR ファイルをダウンロードする。
- Spark コネクタのソースコードを自分でコンパイルして JAR ファイルを作成する。
Spark コネクタ JAR ファイルの命名形式は starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar です。
例えば、環境に Spark 3.2 と Scala 2.12 をインストールし、Spark コネクタ 1.1.0 を使用したい場合、starrocks-spark-connector-3.2_2.12-1.1.0.jar を使用できます。
注意
一般に、最新 バージョンの Spark コネクタは、Spark の最新の 3 バージョンとのみ互換性があります。
コンパイル済み Jar ファイルのダウンロード
Maven Central Repository から対応するバージョンの Spark コネクタ JAR を直接ダウンロードします。
Maven 依存関係
-
Maven プロジェクトの
pom.xmlファイルに、以下の形式で Spark コネクタを依存関係として追加します。spark_version、scala_version、connector_versionをそれぞれのバージョンに置き換えてください。<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency> -
例えば、環境の Spark バージョンが 3.2、Scala バージョンが 2.12 で、Spark コネクタ 1.1.0 を選択した場合、以下の依存関係を追加する必要があります:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
自分でコンパイルする
-
Spark コネクタパッケージ をダウンロードします。
-
以下のコマンドを実行して、Spark コネクタのソースコードを JAR ファイルにコンパイルします。
spark_versionは対応する Spark バージョンに置き換えてください。sh build.sh <spark_version>例えば、環境の Spark バージョンが 3.2 の場合、以下のコマンドを実行する必要があります:
sh build.sh 3.2 -
target/ディレクトリに移動し、コンパイル時に生成された Spark コネクタ JAR ファイル(例:starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar)を見つけます。
注意
正式にリリースされていない Spark コネクタの名前には
SNAPSHOTサフィックスが含まれています。
パラメータ
starrocks.fe.http.url
必須: YES
デフォルト値: なし
説明: StarRocks クラスター内の FE の HTTP URL。複数の URL を指定でき、カンマ (,) で区切る必要があります。形式: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>。バージョン 1.1.1 以降、URL に http:// プレフィックスを追加することもできます。例:http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>。
starrocks.fe.jdbc.url
必須: YES
デフォルト値: なし
説明: FE の MySQL サーバーに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>。
starrocks.table.identifier
必須: YES
デフォルト値: なし
説明: StarRocks テーブルの名前。形式: <database_name>.<table_name>。
starrocks.user
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのユーザー名。ユーザーは StarRocks テーブルに対する SELECT および INSERT 権限 が必要です。
starrocks.password
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのパスワード。
starrocks.write.label.prefix
必須: NO
デフォルト値: spark-
説明: Stream Load で使用されるラベルプレフィックス。
starrocks.write.enable.transaction-stream-load
必須: NO
デフォルト値: TRUE
説明: データをロードするために Stream Load トランザクションインターフェース を使用するかどうか。StarRocks v2.5 以降が必要です。この機能は、トランザクション内でより多くのデータを少ないメモリ使 用量でロードし、パフォーマンスを向上させます。
注意: バージョン 1.1.1 以降、このパラメータは starrocks.write.max.retries の値が非正の場合にのみ有効です。なぜなら、Stream Load トランザクションインターフェースはリトライをサポートしていないためです。
starrocks.write.buffer.size
必須: NO
デフォルト値: 104857600
説明: 一度に StarRocks に送信される前にメモリに蓄積できるデータの最大サイズ。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロードの遅延が増加する可能性があります。
starrocks.write.buffer.rows
必須: NO
デフォルト値: Integer.MAX_VALUE
説明: バージョン 1.1.1 以降でサポートされています。一度に StarRocks に送信される前にメモリに蓄積できる行の最大数。
starrocks.write.flush.interval.ms
必須: NO
デフォルト値: 300000
説明: データが StarRocks に送信される間隔。このパラメータはロードの遅延を制御するために使用されます。
starrocks.write.max.retries
必須: NO
デフォルト値: 3
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する回数。
注意: Stream Load トランザクションインターフェースはリトライをサポートしていないため、このパラメータが正の場合、コネクタは常に Stream Load インターフェースを使用し、starrocks.write.enable.transaction-stream-load の値を無視します。
starrocks.write.retry.interval.ms
必須: NO
デフォルト値: 10000
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する間隔。
starrocks.columns
必須: NO
デフォルト値: なし
説明: データをロードしたい StarRocks テーブルの列。複数の列を指定でき、カンマ (,) で区切る必要があります。例:"col0,col1,col2"。
starrocks.column.types
必須: NO
デフォルト値: なし
説明: バージョン 1.1.1 以降でサポートされています。StarRocks テーブルから推測されるデフォルトや デフォルトのマッピング を使用する代わりに、Spark 用の列データ型をカスタマイズします。パラメータ値は Spark の StructType#toDDL の出力と同じ DDL 形式のスキーマです。例:col0 INT, col1 STRING, col2 BIGINT。カスタマイズが必要な列のみを指定する必要があります。使用例として、BITMAP または HLL 型の列にデータをロードすることがあります。
starrocks.write.properties.*
必須: NO
デフ ォルト値: なし
説明: Stream Load の動作を制御するために使用されるパラメータ。例えば、パラメータ starrocks.write.properties.format はロードされるデータの形式を指定します。CSV や JSON などです。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。
starrocks.write.properties.format
必須: NO
デフォルト値: CSV
説明: Spark コネクタが StarRocks にデータを送信する前に各バッチのデータを変換するためのファイル形式。有効な値:CSV および JSON。
starrocks.write.properties.row_delimiter
必須: NO
デフォルト値: \n
説明: CSV 形式のデータの行区切り文字。