StarRocks Spark Connector
Spark コネクタを使用してデータをロードする(推奨)
StarRocks は、Apache Spark™ 用に開発されたコネクタである StarRocks Connector for Apache Spark™(以下、Spark コネクタ)を提供しており、Spark を使用して StarRocks テーブルにデータをロードするのに役立ちます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度にすべてのデータを StarRocks にロードすることです。Spark コネクタは Spark DataSource V2 に基づいて実装されています。DataSource は Spark DataFrames または Spark SQL を使用して作成できます。バッチモードと構造化ストリーミングモードの両方がサポートされています。
注意
StarRocks テーブルにデータをロードできるのは、SELECT および INSERT 権限を持つユーザーのみです。GRANT の指示に従って、これらの権限をユーザーに付与できます。
バージョン要件
| Spark コネクタ | Spark | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 以降 | 8 | 2.12 |
| 1.1.1 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
注意
- Spark コネクタのバージョン間の動作の変更については、Upgrade Spark connector を参照してください。
- Spark コネクタはバージョン 1.1.1 以降、MySQL JDBC ドライバを提供していないため、手動で Spark クラスパスにドライバをインポートする必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。
Spark コネクタの取得
Spark コネクタの JAR ファイルは以下の方法で取得できます:
- コンパイル済みの Spark コネクタ JAR ファイルを直接ダウンロードする。
- Maven プロジェクトに Spark コネクタを依存関係として追加し、JAR ファイルをダウンロードする。
- Spark コネクタのソースコードを自分でコンパイルして JAR ファイルを作成する。
Spark コネクタ JAR ファイルの命名形式は starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar です。
例えば、環境に Spark 3.2 と Scala 2.12 をインストールし、Spark コネクタ 1.1.0 を使用したい場合、starrocks-spark-connector-3.2_2.12-1.1.0.jar を使用できます。
注意
一般に、最新バージョンの Spark コネクタは、Spark の最新の 3 バージョンとのみ互換性があります。
コンパイル済み Jar ファイルのダウンロード
Maven Central Repository から対応するバージョンの Spark コネクタ JAR を直接ダウンロードします。
Maven 依存関係
-
Maven プロジェクトの
pom.xmlファイルに、以下の形式で Spark コネクタを依存関係として追加します。spark_version、scala_version、connector_versionをそれぞれのバージョンに置き換えてください。<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency> -
例えば、環境の Spark バージョンが 3.2、Scala バージョンが 2.12 で、Spark コネクタ 1.1.0 を選択した場合、以下の依存関係を追加する必要があります:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
自分でコンパイルする
-
Spark コネクタパッケージ をダウンロードします。
-
以下のコマンドを実行して、Spark コネクタのソースコードを JAR ファイルにコンパイルします。
spark_versionは対応する Spark バージョンに置き換えてください。sh build.sh <spark_version>例えば、環境の Spark バージョンが 3.2 の場合、以下のコマンドを実行する必要があります:
sh build.sh 3.2 -
target/ディレクトリに移動し、コンパイル時に生成された Spark コネクタ JAR ファイル(例:starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar)を見つけます。
注意
正式にリリースされていない Spark コネクタの名前には
SNAPSHOTサフィックスが含まれています。
パラメータ
starrocks.fe.http.url
必須: YES
デフォルト値: なし
説明: StarRocks クラスター内の FE の HTTP URL。複数の URL を指定でき、カンマ (,) で区切る必要があります。形式: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>。バージョン 1.1.1 以降、URL に http:// プレフィックスを追加することもできます。例:http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>。
starrocks.fe.jdbc.url
必須: YES
デフォルト値: なし
説明: FE の MySQL サーバーに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>。
starrocks.table.identifier
必須: YES
デフォルト値: なし
説明: StarRocks テーブルの名前。形式: <database_name>.<table_name>。
starrocks.user
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのユーザー名。ユーザーは StarRocks テーブルに対する SELECT および INSERT 権限 が必要です。
starrocks.password
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのパスワード。
starrocks.write.label.prefix
必須: NO
デフォルト値: spark-
説明: Stream Load で使用されるラベルプレフィックス。
starrocks.write.enable.transaction-stream-load
必須: NO
デフォルト値: TRUE
説明: データをロードするために Stream Load トランザクションインターフェース を使用するかどうか。StarRocks v2.5 以降が必要です。この機能は、トランザクション内でより多くのデータを少ないメモリ使用量でロードし、パフォーマンスを向上させます。
注意: バージョン 1.1.1 以降、このパラメータは starrocks.write.max.retries の値が非正の場合にのみ有効です。なぜなら、Stream Load トランザクションインターフェースはリトライをサポートしていないためです。
starrocks.write.buffer.size
必須: NO
デフォルト値: 104857600
説明: 一度に StarRocks に送信される前にメモリに蓄積できるデータの最大サイズ。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロードの遅延が増加する可能性があります。
starrocks.write.buffer.rows
必須: NO
デフォルト値: Integer.MAX_VALUE
説明: バージョン 1.1.1 以降でサポートされています。一度に StarRocks に送信される前にメモリに蓄積できる行の最大数。
starrocks.write.flush.interval.ms
必須: NO
デフォルト値: 300000
説明: データが StarRocks に送信される間隔。このパラメータはロードの遅延を制御するために使用されます。
starrocks.write.max.retries
必須: NO
デフォルト値: 3
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する回数。
注意: Stream Load トランザクションインターフェースはリトライをサポートしていないため、このパラメータが正の場合、コネクタは常に Stream Load インターフェースを使用し、starrocks.write.enable.transaction-stream-load の値を無視します。
starrocks.write.retry.interval.ms
必須: NO
デフォルト値: 10000
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する間隔。
starrocks.columns
必須: NO
デフォルト値: なし
説明: データをロードしたい StarRocks テーブルの列。複数の列を指定でき、カンマ (,) で区切る必要があります。例:"col0,col1,col2"。
starrocks.column.types
必須: NO
デフォルト値: なし
説明: バージョン 1.1.1 以降でサポートされています。StarRocks テーブルから推測されるデフォルトや デフォルトのマッピング を使用する代わりに、Spark 用の列データ型をカスタマイズします。パラメータ値は Spark の StructType#toDDL の出力と同じ DDL 形式のスキーマです。例 :col0 INT, col1 STRING, col2 BIGINT。カスタマイズが必要な列のみを指定する必要があります。使用例として、BITMAP または HLL 型の列にデータをロードすることがあります。
starrocks.write.properties.*
必須: NO
デフォルト値: なし
説明: Stream Load の動作を制御するために使用されるパラメータ。例えば、パラメータ starrocks.write.properties.format はロードされるデータの形式を指定します。CSV や JSON などです。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。
starrocks.write.properties.format
必須: NO
デフォルト値: CSV
説明: Spark コネクタが StarRocks にデータを送信する前に各バッチのデータを変換するためのファイル形式。有効な値:CSV および JSON。
starrocks.write.properties.row_delimiter
必須: NO
デフォルト値: \n
説明: CSV 形式のデータの行区切り文字。
starrocks.write.properties.column_separator
必須: NO
デフォルト値: \t
説明: CSV 形式のデータの列区切り文字。
starrocks.write.properties.partial_update
必須: NO
デフォルト値: FALSE
説明: 部分更新を使用するかどうか。有効な値:TRUE および FALSE。デフォルト値:FALSE、この機能を無効にすることを示します。
starrocks.write.properties.partial_update_mode
必須: NO
デフォルト値: row
説明: 部分更新のモードを指定します。有効な値:row および column。
- 値
row(デフォルト)は行モードでの部分更新を意味し、多くの列と小さなバッチでのリアルタイム更新により適しています。 - 値
columnは列モードでの部分更新を意味し、少ない列と多くの行でのバッチ更新により適しています。このようなシナリオでは、列モードを有効にすると更新速度が速くなります。例えば、100 列のテーブルで、すべての行に対して 10 列(全体の 10%)のみが更新される場合、列モードの更新速度は 10 倍速くなります。
starrocks.write.num.partitions
必須: NO
デフォルト値: なし
説明: Spark がデータを書き込むことができる並列パーティションの数。データ量が少ない場合、パーティション数を減らしてロードの同時実行性と頻度を下げることができます。このパラメータのデフォルト値は Spark によって決定されます。ただし、この方法は Spark Shuffle コストを引き起こす可能性があります。
starrocks.write.partition.columns
必須: NO
デフォルト値: なし
説明: Spark のパーティション列。このパラメータは starrocks.write.num.partitions が指定されている場合にのみ有効です。このパラメータが指定されていない場合、書き込まれるすべての列がパーティションに使用されます。
starrocks.timezone
必須: NO
デフォルト値: JVM のデフォルトタイムゾーン
説明: バージョン 1.1.1 以降でサポートされています。Spark の TimestampType を StarRocks の DATETIME に変換するために使用されるタイムゾーン。デフォルトは ZoneId#systemDefault() によって返される JVM のタイムゾーンです。形式は Asia/Shanghai のようなタイムゾーン名、または +08:00 のようなゾーンオフセットです。
Spark と StarRocks のデータ型マッピング
- デフォルトのデータ型マッピングは以下の通りです:
| Spark データ型 | StarRocks データ型 |
|---|---|
| BooleanType | BOOLEAN |
| ByteType | TINYINT |
| ShortType | SMALLINT |
| IntegerType | INT |
| LongType | BIGINT |
| StringType | LARGEINT |
| FloatType | FLOAT |
| DoubleType | DOUBLE |
| DecimalType | DECIMAL |
| StringType | CHAR |
| StringType | VARCHAR |
| StringType | STRING |
| StringType | JSON |
| DateType | DATE |
| TimestampType | DATETIME |
| ArrayType | ARRAY 注意: バージョン 1.1.1 以降でサポートされています。詳細な手順については、Load data into columns of ARRAY type を参照してください。 |
-
データ型マッピングをカスタマイズすることもできます。
例えば、StarRocks テーブルに BITMAP および HLL 列が含まれているが、Spark はこれらのデータ型をサポートしていません。Spark で対応するデータ型をカスタマイズする必要があります。詳細な手順については、BITMAP および HLL 列にデータをロードする方法を参照してください。BITMAP および HLL はバージョン 1.1.1 以降でサポートされています。
Spark コネクタのアップグレード
バージョン 1.1.0 から 1.1.1 へのアップグレード
- バージョン 1.1.1 以降、Spark コネクタは MySQL の公式 JDBC ドライバである
mysql-connector-javaを提供していません。これは、mysql-connector-javaが使用する GPL ライセンスの制限によるものです。ただし、Spark コネクタはテーブルメタデータに接続するために MySQL JDBC ドライバを必要とするため、手動で Spark クラスパスにドライバを追加する必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。 - バージョン 1.1.1 以降、コネクタはデフォルトで Stream Load インターフェースを使用し、バージョン 1.1.0 では Stream Load トランザクションインターフェースを使用していました。Stream Load トランザクションインターフェースを引き続き使用したい場合は、オプション
starrocks.write.max.retriesを0に設定できます。詳細については、starrocks.write.enable.transaction-stream-loadおよびstarrocks.write.max.retriesの説明を参照してください。
例
以下の例は、Spark DataFrames または Spark SQL を使用して Spark コネクタで StarRocks テーブルにデータをロードする方法を示しています。Spark DataFrames はバッチモードと構造化ストリーミングモードの両方をサポートしています。
詳細な例については、Spark Connector Examples を参照してください。
準備
StarRocks テーブルの作成
データベース test を作成し、主キーテーブル score_board を作成します。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
ネットワーク設定
Spark が配置されているマシンが、StarRocks クラスターの FE ノードに http_port(デフォルト:8030)および query_port(デフォルト:9030)を介してアクセスでき、BE ノードに be_http_port(デフォルト:8040)を介してアクセスできることを確認してください。
Spark 環境のセットアップ
以下の例は Spark 3.2.4 で実行され、spark-shell、pyspark、spark-sql を使用します。例を実行する前に、Spark コネクタ JAR ファイル を $SPARK_HOME/jars ディレクトリに配置してください。
Spark DataFrames を使用してデータをロードする
以下の 2 つの例は、Spark DataFrames バッチまたは構造化ストリーミングモードでデータをロードする方法を説明しています。
バッチ
メモリ内でデータを構築し、StarRocks テーブルにデータをロードします。
- Scala または Python を使用して Spark アプリケーションを書くことができます。
Scala の場合、spark-shell で以下のコードスニペットを実行します:
// 1. Create a DataFrame from a sequence.
val data = Seq((1, "starrocks", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according your own environment.
df.write.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.mode("append")
.save()
Python の場合、pyspark で以下のコードスニペットを実行します:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StarRocks Example") \
.getOrCreate()
# 1. Create a DataFrame from a sequence.
data = [(1, "starrocks", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according your own environment.
df.write.format("starrocks") \
.option("starrocks.fe.http.url", "127.0.0.1:8030") \
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("starrocks.table.identifier", "test.score_board") \
.option("starrocks.user", "root") \
.option("starrocks.password", "") \
.mode("append") \
.save()
-
StarRocks テーブルのデータをクエリします。
MySQL [test]> SELECT * FROM `score_board`;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
構造化ストリーミング
CSV ファイルからのデータのストリーミング読み取りを構築し、StarRocks テーブルにデータをロードします。
-
ディレクトリ
csv-dataに、以下のデータを含む CSV ファイルtest.csvを作成します:3,starrocks,100
4,spark,100 -
Scala または Python を使用して Spark アプリケーションを書くことができます。
Scala の場合、spark-shell で以下のコードスニペットを実行します:
import org.apache.spark.sql.types.StructType
// 1. Create a DataFrame from CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according your own environment.
val query = (df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
// replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
Python の場合、pyspark で以下のコードスニペットを実行します:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
spark = SparkSession \
.builder \
.appName("StarRocks SS Example") \
.getOrCreate()
# 1. Create a DataFrame from CSV.
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("score", IntegerType())
])
df = (
spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
# Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according your own environment.
query = (
df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
# replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
-
StarRocks テーブルのデータをクエリします。
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 4 | spark | 100 |
| 3 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.67 sec)
Spark SQL を使用してデータをロードする
以下の例は、Spark SQL CLI で INSERT INTO ステートメントを使用して Spark SQL でデータをロードする方法を説明しています。
-
spark-sqlで以下の SQL ステートメントを実行します:-- 1. Create a table by configuring the data source as `starrocks` and the following options.
-- You need to modify the options according your own environment.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"=""
);
-- 2. Insert two rows into the table.
INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100); -
StarRocks テーブルのデータをクエリします。
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 6 | spark | 100 |
| 5 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
ベストプラクティス
主キーテーブルにデータをロードする
このセクションでは、StarRocks 主キーテーブルにデータをロードして部分更新や条件付き更新を実現する方法を示します。これらの機能の詳細な紹介については、Change data through loading を参照してください。これらの例では Spark SQL を使用します。
準備
データベース test を作成し、StarRocks に主キーテーブル score_board を作成します。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
部分更新
この例では、ロードを通じて name 列のデータのみを更新する方法を示します:
-
MySQL クライアントで StarRocks テーブルに初期データ を挿入します。
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
Spark SQL クライアントで Spark テーブル
score_boardを作成します。- コネクタに部分更新を行うように指示するオプション
starrocks.write.properties.partial_updateをtrueに設定します。 - コネクタに書き込む列を指示するオプション
starrocks.columnsを"id,name"に設定します。
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.partial_update"="true",
"starrocks.columns"="id,name"
); - コネクタに部分更新を行うように指示するオプション
-
Spark SQL クライアントでテーブルにデータを挿入し、
name列のみを更新します。INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'spark-update'); -
MySQL クライアントで StarRocks テーブルをクエリします。
nameの値のみが変更され、scoreの値は変更されていないことがわかります。mysql> select * from score_board;
+------+------------------+-------+
| id | name | score |
+------+------------------+-------+
| 1 | starrocks-update | 100 |
| 2 | spark-update | 100 |
+------+------------------+-------+
2 rows in set (0.02 sec)
条件付き更新
この例では、score 列の値に基づいて条件付き更新を行う方法を示します。id の更 新は、新しい score の値が古い値以上の場合にのみ有効です。
-
MySQL クライアントで StarRocks テーブルに初期データを挿入します。
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
Spark テーブル
score_boardを以下の方法で作成します。- コネクタに
score列を条件として使用するように指示するオプションstarrocks.write.properties.merge_conditionをscoreに設定します。 - コネクタがデータをロードするために Stream Load インターフェースを使用し、Stream Load トランザクションインターフェースを使用しないことを確認します。後者はこの機能をサポートしていません。
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.merge_condition"="score"
); - コネクタに
-
Spark SQL クライアントでテーブルにデータを挿入し、
idが 1 の行を小さいスコア値で更新し、idが 2 の行を大きいスコア値で更新します。INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'spark-update', 101); -
MySQL クライアントで StarRocks テーブルをクエリします。
idが 2 の行のみが変更され、idが 1 の行は変更されていないことがわかります。mysql> select * from score_board;
+------+--------------+-------+
| id | name | score |
+------+--------------+-------+
| 1 | starrocks | 100 |
| 2 | spark-update | 101 |
+------+--------------+-------+
2 rows in set (0.03 sec)