Spark コネクタを使用してデータをロードする(推奨)
StarRocks は、Apache Spark™ 用に開発されたコネクタである StarRocks Connector for Apache Spark(以下、Spark コネクタ)を提供しています。このコネクタを使用して、Spark を通じて StarRocks テーブルにデータをロードすることができます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度に StarRocks にロードすることです。Spark コネクタは Spark DataSource V2 に基づいて実装されています。DataSource は Spark DataFrames または Spark SQL を使用して作成できます。バッチモードと構造化ストリーミングモードの両方がサポートされています。
注意
StarRocks テーブルにデータをロードできるのは、SELECT および INSERT 権限を持つユーザーのみです。GRANT の指示に従って、これらの権限をユーザーに付与できます。
バージョン 要件
| Spark コネクタ | Spark | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 以降 | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
注意
- Spark コネクタのバージョン間の動作変更については、Upgrade Spark connector を参照してください。
- Spark コネクタはバージョン 1.1.1 以降、MySQL JDBC ドライバを提供していません。ドライバを手動で Spark クラスパスにインポートする必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。
Spark コネクタの取得
Spark コネクタの JAR ファイルは以下の方法で取得できます:
- コンパイル済みの Spark コネクタ JAR ファイルを直接ダウンロードします。
- Maven プロジェクトに Spark コネクタを依存関係として追加し、JAR ファイルをダウンロードします。
- Spark コネクタのソースコードを自分でコンパイルして JAR ファイルを作成します。
Spark コネクタ JAR ファイルの命名形式は starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar です。
例えば、環境に Spark 3.2 と Scala 2.12 をインストールしており、Spark コネクタ 1.1.0 を使用したい場合、starrocks-spark-connector-3.2_2.12-1.1.0.jar を使用できます。
注意
一般に、最新バージョンの Spark コネクタは、Spark の最新の 3 バージョンとの互換性のみを維持します。
コンパイル済みの Jar ファイルをダウンロード
Maven Central Repository から対応するバージョンの Spark コネクタ JAR を直接ダウンロードします。
Maven 依存関係
-
Maven プロジェクトの
pom.xmlファイルに、以下の形式で Spark コネクタを依存関係として追加します。spark_version、scala_version、およびconnector_versionをそれぞれのバージョンに置き換えます。<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency> -
例えば、環境の Spark バージョンが 3.2、Scala バージョンが 2.12 で、Spark コネクタ 1.1.0 を選択する場合、以下の依存関係を追加する必要があります:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
自分でコンパイル
-
Spark コネクタパッケージ をダウンロードします。
-
以下のコマンドを実行して、Spark コネクタのソースコードを JAR ファイルにコンパイルします。
spark_versionは対応する Spark バージョンに置き換えてください。sh build.sh <spark_version>例えば、環境の Spark バージョンが 3.2 の場合、以下のコマンドを実行する必要があります:
sh build.sh 3.2 -
target/ディレクトリに移動し、コンパイル時に生成された Spark コネクタ JAR ファイル(例:starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar)を見つけます。
注意
正式にリリースされていない Spark コネクタの名前には
SNAPSHOTサフィックスが含まれています。
パラメータ
starrocks.fe.http.url
必須: YES
デフォルト値: なし
説明: StarRocks クラスター内の FE の HTTP URL。複数の URL を指定でき、カンマ (,) で区切る必要があります。形式: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>。バージョン 1.1.1 以降、URL に http:// プレフィックスを追加することもできます。例:http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>。
starrocks.fe.jdbc.url
必須: YES
デフォルト値: なし
説明: FE の MySQL サーバーに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>。
starrocks.table.identifier
必須: YES
デフォルト値: なし
説明: StarRocks テーブルの名前。形式: <database_name>.<table_name>。
starrocks.user
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのユーザー名。ユーザーは StarRocks テーブルに対する SELECT および INSERT 権限 を持つ必要があります。
starrocks.password
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのパスワード。
starrocks.write.label.prefix
必須: NO
デフォルト値: spark-
説明: Stream Load で使用されるラベルプレフィックス。
starrocks.write.enable.transaction-stream-load
必須: NO
デフォルト値: TRUE
説明: Stream Load トランザクションインターフェース を使用してデータをロードするかどうか。StarRocks v2.5 以降が必要です。この機能は、トランザクション内でより多くのデータを少ないメモリ使用量でロードし、パフォーマンスを向上させます。
注意: バージョン 1.1.1 以降、このパラメータは starrocks.write.max.retries の値が非正の場合にのみ有効です。なぜなら、Stream Load トランザクションインターフェースはリトライをサポートしていないからです。
starrocks.write.buffer.size
必須: NO
デフォルト値: 104857600
説明: 一度に StarRocks に送信される前にメモリに蓄積できるデータの最大サイズ。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロード遅延が増加する可能性があります。
starrocks.write.buffer.rows
必須: NO
デフォルト値: Integer.MAX_VALUE
説明: バージョン 1.1.1 以降でサポートされています。一度に StarRocks に送信される前にメモリに蓄積できる行の最大数。
starrocks.write.flush.interval.ms
必須: NO
デフォルト値: 300000
説明: データが StarRocks に送信される間隔。このパラメータはロード遅延を制御するために使用されます。
starrocks.write.max.retries
必須: NO
デフォルト値: 3
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する回数。
注意: Stream Load トランザクションインターフェースはリトライをサポートしていないため、このパラメータが正の場合、コネクタは常に Stream Load インターフェースを使用し、starrocks.write.enable.transaction-stream-load の値を無視します。