Spark コネクタを使用して StarRocks からデータを読み取る
StarRocks は、Apache Spark™ 用に開発したコネクタである StarRocks Connector for Apache Spark™(以下、Spark コネクタ)を提供しており、これを使用して StarRocks テーブルからデータを読み取ることができます。Spark を使用して、StarRocks から読み取ったデータに対して複雑な処理や機械学習を行うことができます。
Spark コネクタは、Spark SQL、Spark DataFrame、Spark RDD の3つの読み取り方法をサポートしています。
Spark SQL を使用して StarRocks テーブルに一時ビューを作成し、その一時ビューを使用して StarRocks テーブルから直接データを読み取ることができます。
また、StarRocks テーブルを Spark DataFrame または Spark RDD にマッピングし、その Spark DataFrame または Spark RDD からデータを読み取ることもできます。Spark DataFrame の使用を推奨します。
注意
StarRocks テーブルの SELECT 権限を持つユーザーのみが、このテーブルからデータを読み取ることができます。GRANT の指示に従って、ユーザーに権限を付与することができます。
使用上の注意
- データを読み取る前に StarRocks 上でデータをフィルタリングすることで、転送されるデータ量を削減できます。
- データ読み取りのオーバーヘッドが大きい場合、適切なテーブル設計とフィルタ条件を使用して、Spark が一度に過剰なデータを読み取らないようにすることができます。これにより、ディスクやネットワーク接続への I/O 圧力を軽減し、通常のクエリが適切に実行されることを保証できます。
バージョン要件
| Spark コネクタ | Spark | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 and later | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, 3.4 | 2.5 and later | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, 3.4 | 2.5 and later | 8 | 2.12 |
| 1.0.0 | 3.x | 1.18 and later | 8 | 2.12 |
| 1.0.0 | 2.x | 1.18 and later | 8 | 2.11 |
注意
- 異なるコネクタバージョン間の動作変更については、Upgrade Spark connector を参照してください。
- バージョン 1.1.1 以降、コネクタは MySQL JDBC ドライバを提供していないため、Spark クラスパスに手動でドライバをインポートする必要があります。ドライバは Maven Central で見つけることができます。
- バージョン 1.0.0 では、Spark コネクタは StarRocks からのデータ読み取りのみをサポートしています。バージョン 1.1.0 以降では、StarRocks からのデータ読み取りと書き込みの両方をサポートしています。
- バージョン 1.0.0 と 1.1.0 の間では、パラメータとデータ型のマッピングが異なります。Upgrade Spark connector を参照してください。
- 一般的な場合、バージョン 1.0.0 に新しい機能は追加されません。できるだけ早く Spark コネクタをアップグレードすることをお勧めします。
Spark コネクタの取得
ビジネスニーズに合った Spark コネクタ .jar パッケージを取得するには、次のいずれかの方法を使用します。
- コンパイル済みパッケージをダウンロードする。
- Maven を使用して Spark コネクタに必要な依存関係を追加する。(この方法は Spark コネクタ 1.1.0 以降でのみサポートされています。)
- 手動でパッケージをコンパイルする。
Spark コネクタ 1.1.0 以降
Spark コネクタ .jar パッケージは、次の形式で命名されています。
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
たとえば、Spark 3.2 と Scala 2.12 で Spark コネクタ 1.1.0 を使用したい場合、starrocks-spark-connector-3.2_2.12-1.1.0.jar を選択できます。
注意
通常、最新の Spark コネクタバージョンは、最新の 3 つの Spark バージョンと互換性があります。
コンパイル済みパッケージをダウンロードする
さまざまなバージョンの Spark コネクタ .jar パッケージは Maven Central Repository で入手できます。
Maven 依存関係を追加する
Spark コネクタに必要な依存関係を次のように設定します。
注意
spark_version、scala_version、connector_versionを使用する Spark バージョン、Scala バージョン、および Spark コネクタバージョンに置き換える必要があります。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
たとえば、Spark 3.2 と Scala 2.12 で Spark コネクタ 1.1.0 を使用したい場合、依存関係を次のように設定します。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手動でパッケージをコンパイルする
-
Spark コネクタのコード をダウンロードします。
-
次のコマンドを使用して Spark コネクタをコンパイルします。
注意
spark_versionを使用する Spark バージョンに置き換える必要があります。sh build.sh <spark_version>たとえば、Spark 3.2 で Spark コネクタを使用したい場合、次のように Spark コネクタをコンパイルします。
sh build.sh 3.2 -
target/パスに移動し、コンパイル時に生成されたstarrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jarのような Spark コネクタ .jar パッケージを確認します。注意
公式にリリースされていない Spark コネクタバージョンを使用している場合、生成された Spark コネクタ .jar パッケージの名前には
SNAPSHOTがサフィックスとして含まれます。
Spark コネクタ 1.0.0
コンパイル済みパッケージをダウンロードする
手動でパッケージをコンパイルする
-
Spark コネクタのコード をダウンロードします。
注意
spark-1.0に切り替える必要があります。 -
Spark コネクタをコンパイルするために次のいずれかのアクションを実行します。
-
Spark 2.x を使用している場合、次のコマンドを実行します。デフォルトで Spark 2.3.4 に適した Spark コネクタをコンパイルします。
sh build.sh 2 -
Spark 3.x を使用している場合、次のコマンドを実行します。デフォルトで Spark 3.1.2 に適した Spark コネクタをコンパイルします。
sh build.sh 3
-
-
output/パスに移動し、コンパイル時に生成されたstarrocks-spark2_2.11-1.0.0.jarファイルを確認します。その後、ファイルを Spark のクラスパスにコピーします。- Spark クラスターが
Localモードで実行されている場合、ファイルをjars/パスに配置します。 - Spark クラスターが
Yarnモードで実行されている場合、ファイルを事前デプロイメントパッケージに配置します。
- Spark クラスターが
指定された場所にファイルを配置した後にのみ、Spark コネクタを使用して StarRocks からデータを読み取ることができます。
パラメータ
このセクションでは、Spark コネクタを使用して StarRocks からデータを読み取る際に設定する必要があるパラメータについて説明します。
共通パラメータ
次のパラメータは、Spark SQL、Spark DataFrame、Spark RDD の3つの読み取り方法すべてに適用されます。
| パラメータ | デフォルト値 | 説明 |
|---|---|---|
| starrocks.fenodes | None | StarRocks クラスター内の FE の HTTP URL。形式 <fe_host>:<fe_http_port>。複数の URL を指定する場合は、カンマ (,) で区切る必要があります。 |
| starrocks.table.identifier | None | StarRocks テーブルの名前。形式: <database_name>.<table_name>。 |
| starrocks.request.retries | 3 | Spark が StarRocks に読み取りリクエストを再送信できる最大回数。 |
| starrocks.request.connect.timeout.ms | 30000 | StarRocks に送信された読み取りリクエストがタイムアウトするまでの最大時間。 |
| starrocks.request.read.timeout.ms | 30000 | StarRocks に送信されたリクエストの読み取りがタイムアウトするまでの最大時間。 |
| starrocks.request.query.timeout.s | 3600 | StarRocks からのデータクエリがタイムアウトするまでの最大時間。デフォルトのタイムアウト期間は1時間です。-1 はタイムアウト期間が指定されていないことを意味します。 |
| starrocks.request.tablet.size | Integer.MAX_VALUE | 各 Spark RDD パーティションにグループ化される StarRocks タブレットの数。このパラメータの値が小さいほど、生成される Spark RDD パーティションの数が多くなります。Spark の並行性が高くなる一方で、StarRocks への負荷も増加します。 |
| starrocks.batch.size | 4096 | 一度に BEs から読み取ることができる最大行数。このパラメータの値を増やすことで、Spark と StarRocks 間で確立される接続数を減らし、ネッ トワーク遅延による余分な時間のオーバーヘッドを軽減できます。 |
| starrocks.exec.mem.limit | 2147483648 | クエリごとに許可される最大メモリ量。単位: バイト。デフォルトのメモリ制限は2 GBです。 |
| starrocks.deserialize.arrow.async | false | Arrow メモリ形式を Spark コネクタのイテレーションに必要な RowBatches に非同期で変換するかどうかを指定します。 |
| starrocks.deserialize.queue.size | 64 | Arrow メモリ形式を RowBatches に非同期で変換するタスクを保持する内部キューのサイズ。このパラメータは starrocks.deserialize.arrow.async が true に設定されている場合に有効です。 |
| starrocks.filter.query | None | StarRocks 上でデータをフィルタリングするための条件。複数のフィルタ条件を指定する場合は、and で結合する必要があります。StarRocks は、指定されたフィルタ条件に基づいて StarRocks テーブルからデータをフィルタリングし、その後 Spark によってデータが読み取られます。 |
| starrocks.timezone | JVM のデフォルトタイムゾーン | 1.1.1 以降でサポートされています。StarRocks の DATETIME を Spark の TimestampType に変換するために使用されるタイムゾーン。デフォルトは ZoneId#systemDefault() によって返される JVM のタイムゾーンです。形式は Asia/Shanghai のようなタイムゾーン名、または +08:00 のようなゾーンオフセットです。 |