StarRocks Spark Connector
Spark コネクタを使用してデータをロードする(推奨)
StarRocks は、Apache Spark™ 用に開発されたコネクタである StarRocks Connector for Apache Spark(以下、Spark コネクタ)を提供しています。このコネクタを使用して、Spark を通じて StarRocks テーブルにデータをロードすることができます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度に StarRocks にロードすることです。Spark コネクタは Spark DataSource V2 に基づいて実装されています。DataSource は Spark DataFrames または Spark SQL を使用して作成できます。バッチモードと構造化ストリーミングモードの両方がサポートされています。
注意
StarRocks テーブルにデータをロードできるのは、SELECT および INSERT 権限を持つユーザーのみです。GRANT の指示に従って、これらの権限をユーザーに付与できます。
バージョン要件
Spark コネクタ | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 以降 | 8 | 2.12 |
1.1.1 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
1.1.0 | 3.2, 3.3, または 3.4 | 2.5 以降 | 8 | 2.12 |
注意
- Spark コネクタのバージョン間の動作変更については、Upgrade Spark connector を参照してください。
- Spark コネクタはバージョン 1.1.1 以降、MySQL JDBC ドライバを提供していません。ドライバを手動で Spark クラスパスにインポートする必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。
Spark コネクタの取得
Spark コネクタの JAR ファイルは以下の方法で取得できます:
- コンパイル済みの Spark コネクタ JAR ファイルを直接ダウンロードします。
- Maven プロジェクトに Spark コネクタを依存関係として追加し、JAR ファイルをダウンロードします。
- Spark コネクタのソースコードを自分でコンパイルして JAR ファイルを作成します。
Spark コネクタ JAR ファイルの命名形式は starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
です。
例えば、環境に Spark 3.2 と Scala 2.12 をインストールしており、Spark コネクタ 1.1.0 を使用したい場合、starrocks-spark-connector-3.2_2.12-1.1.0.jar
を使用できます。
注意
一般に、最新バージョンの Spark コネクタは、Spark の最新の 3 バージョンとの互換性のみを維持します。
コンパイル済みの Jar ファイルをダウンロード
Maven Central Repository から対応するバージョンの Spark コネクタ JAR を直接ダウンロードします。
Maven 依存関係
-
Maven プロジェクトの
pom.xml
ファイルに、以下の形式で Spark コネクタを依存関係として追加します。spark_version
、scala_version
、およびconnector_version
をそれぞれのバージョンに置き換えます。<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency> -
例えば、環境の Spark バージョンが 3.2、Scala バージョンが 2.12 で、Spark コネクタ 1.1.0 を選択する場合、以下の依存関係を追加する必要があります:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
自分でコンパイル
-
Spark コネクタパッケージ をダウンロードします。
-
以下のコマンドを実行して、Spark コネクタのソースコードを JAR ファイルにコンパイルします。
spark_version
は対応する Spark バージョンに置き換えてください。sh build.sh <spark_version>
例えば、環境の Spark バージョンが 3.2 の場合、以下のコマンドを実行する必要があります:
sh build.sh 3.2
-
target/
ディレクトリに移動し、コンパイル時に生成された Spark コネクタ JAR ファイル(例:starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar
)を見つけます。
注意
正式にリリースされていない Spark コネクタの名前には
SNAPSHOT
サフィックスが含まれています。
パラメータ
starrocks.fe.http.url
必須: YES
デフォルト値: なし
説明: StarRocks クラスター内の FE の HTTP URL。複数の URL を指定でき、カンマ (,) で区切る必要があります。形式: <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>
。バージョン 1.1.1 以降、URL に http://
プレフィックスを追加することもできます。例:http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>
。
starrocks.fe.jdbc.url
必須: YES
デフォルト値: なし
説明: FE の MySQL サーバーに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>
。
starrocks.table.identifier
必須: YES
デフォルト値: なし
説明: StarRocks テーブルの名前。形式: <database_name>.<table_name>
。
starrocks.user
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのユーザー名。ユーザーは StarRocks テーブルに対する SELECT および INSERT 権限 を持つ必要があります。
starrocks.password
必須: YES
デフォルト値: なし
説明: StarRocks クラスターアカウントのパスワード。
starrocks.write.label.prefix
必須: NO
デフォルト値: spark-
説明: Stream Load で使用されるラベルプレフィックス。
starrocks.write.enable.transaction-stream-load
必須: NO
デフォルト値: TRUE
説明: Stream Load トランザクションインターフェース を使用してデータをロードするかどうか。StarRocks v2.5 以降が必要です。この機能は、トランザクション内でより多くのデータを少ないメモリ使用量でロードし、パフォーマンスを向上させます。
注意: バージョン 1.1.1 以降、このパラメータは starrocks.write.max.retries
の値が非正の場合にのみ有効です。なぜなら、Stream Load トランザクションインターフェースはリトライをサポートしていないからです。
starrocks.write.buffer.size
必須: NO
デフォルト値: 104857600
説明: 一度に StarRocks に送信される前にメモリに蓄積できるデータの最大サイズ。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロード遅延が増加する可能性があります。
starrocks.write.buffer.rows
必須: NO
デフォルト値: Integer.MAX_VALUE
説明: バージョン 1.1.1 以降でサポートされています。一度に StarRocks に送信される前にメモリに蓄積できる行の最大数。
starrocks.write.flush.interval.ms
必須: NO
デフォルト値: 300000
説明: データが StarRocks に送信される間隔。このパラメータはロード遅延を制御するために使用されます。
starrocks.write.max.retries
必須: NO
デフォルト値: 3
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する回数。
注意: Stream Load トランザクションインターフェースはリトライをサポートしていないため、このパラメータが正の場合、コネクタは常に Stream Load インターフェースを使用し、starrocks.write.enable.transaction-stream-load
の値を無視します。
starrocks.write.retry.interval.ms
必須: NO
デフォルト値: 10000
説明: バージョン 1.1.1 以降でサポートされています。ロードが失敗した場合に同じバッチのデータに対して Stream Load を再試行する間隔。
starrocks.columns
必須: NO
デフォルト値: なし
説明: データをロードしたい StarRocks テーブルの列。複数の列を指定でき、カンマ (,) で区切る必要があります。例:"col0,col1,col2"
。
starrocks.column.types
必須: NO
デフォルト値: なし
説明: バージョン 1.1.1 以降でサポートされています。StarRocks テーブルから推測されるデフォルトや デフォルトマッピング を使用する代わりに、Spark の列データ型をカスタマイズします。パラメータ値は、Spark の StructType#toDDL の出力と同じ DDL 形式のスキーマです。例:col0 INT, col1 STRING, col2 BIGINT
。カスタマイズが必要な列のみを指定する必要があります。使用例として、BITMAP または HLL 型の列にデータをロードすることがあります。
starrocks.write.properties.*
必須: NO
デフォルト値: なし
説明: Stream Load の動作を制御するために使用されるパラメータ。例えば、パラメータ starrocks.write.properties.format
はロードされるデータの形式を指定します。CSV や JSON などです。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。
starrocks.write.properties.format
必須: NO
デフォルト値: CSV
説明: Spark コネクタが各バッチのデータを StarRocks に送信する前に変換するファイル形式。有効な値:CSV および JSON。
starrocks.write.properties.row_delimiter
必須: NO
デフォルト値: \n
説明: CSV 形式のデータの行区切り文字。
starrocks.write.properties.column_separator
必須: NO
デフォルト値: \t
説明: CSV 形式のデータの列区切り文字。
starrocks.write.properties.partial_update
必須: NO
デフォルト値: FALSE
説明: 部分更新を使用するかどうか。有効な値:TRUE
および FALSE
。デフォルト値:FALSE
、この機能を無効にすることを示します。
starrocks.write.properties.partial_update_mode
必須: NO
デフォルト値: row
説明: 部分更新のモードを指定します。有効な値:row
および column
。
- 値
row
(デフォルト)は行モードでの部分更新を意味し、多くの列と小さなバッチでのリアルタイム更新に適しています。 - 値
column
は列モードでの部分更新を意味し、少ない列と多くの行でのバッチ更新に適しています。このようなシナリオでは、列モードを有効にすると更新速度が速くなります。例えば、100 列のテーブルで、すべての行に対して 10 列(全体の 10%)のみが更新される場合、列モードの更新速度は 10 倍速くなります。
starrocks.write.num.partitions
必須: NO
デフォルト値: なし
説明: Spark がデータを書き込む際に並列で使用できるパーティションの数。データ量が少ない場合、パーティション数を減らしてロードの同時実行性と頻度を下げることができます。このパラメータのデフォルト値は Spark によって決定されます。ただし、この方法は Spark Shuffle コストを引き起こす可能性があります。
starrocks.write.partition.columns
必須: NO
デフォルト値: なし
説明: Spark のパーティション列。このパラメータは starrocks.write.num.partitions
が指定されている場合にのみ有効です。このパラメータが指定されていない場合、書き込まれるすべての列がパーティションに使用されます。
starrocks.timezone
必須: NO
デフォルト値: JVM のデフォルトタイムゾーン
説明: バージョン 1.1.1 以降でサポートされています。Spark の TimestampType
を StarRocks の DATETIME
に変換するために使用されるタイムゾーン。デフォルトは ZoneId#systemDefault()
によって返される JVM のタイムゾーンです。形式は Asia/Shanghai
のようなタイムゾーン名、または +08:00
のようなゾーンオフセットです。
Spark と StarRocks のデータ型マッピング
-
デフォルトのデータ型マッピングは以下の通りです:
Spark データ型 StarRocks データ型 BooleanType BOOLEAN ByteType TINYINT ShortType SMALLINT IntegerType INT LongType BIGINT StringType LARGEINT FloatType FLOAT DoubleType DOUBLE DecimalType DECIMAL StringType CHAR StringType VARCHAR StringType STRING StringType JSON DateType DATE TimestampType DATETIME ArrayType ARRAY
注意:
バージョン 1.1.1 以降でサポートされています。詳細な手順については、Load data into columns of ARRAY type を参照してください。 -
データ型マッピングをカスタマイズすることもできます。
例えば、StarRocks テーブルに BITMAP および HLL 列が含まれている場合、Spark はこれらのデータ型をサポートしていません。Spark で対応するデータ型をカスタマイズする必要があります。詳細な手順については、BITMAP および HLL 列にデータをロードする方法を参照してください。BITMAP および HLL はバージョン 1.1.1 以降でサポートされています。
Spark コネクタのアップグレード
バージョン 1.1.0 から 1.1.1 へのアップグレード
- バージョン 1.1.1 以降、Spark コネクタは MySQL の公式 JDBC ドライバである
mysql-connector-java
を提供しません。これはmysql-connector-java
に使用されている GPL ライセンスの制限によるものです。しかし、Spark コネクタは StarRocks のテーブルメタデータに接続するために MySQL JDBC ドライバを必要とするため、ドライバを手動で Spark クラスパスに追加する必要があります。ドライバは MySQL サイト または Maven Central で見つけることができます。 - バージョン 1.1.1 以降、コネクタはデフォルトで Stream Load インターフェースを使用し、バージョン 1.1.0 では Stream Load トランザクションインターフェースを使用していました。Stream Load トランザクションインターフェースを引き続き使用したい場合は、オプション
starrocks.write.max.retries
を0
に設定できます。詳細については、starrocks.write.enable.transaction-stream-load
およびstarrocks.write.max.retries
の説明を参照してください。
例
以下の例は、Spark DataFrames または Spark SQL を使用して Spark コネクタを使用して StarRocks テーブルにデータをロードする方法を示しています。Spark DataFrames はバッチモードと構造化ストリーミングモードの両方をサポートしています。
詳細な例については、Spark Connector Examples を参照してください。
準備
StarRocks テーブルを作成
データベース test
を作成し、主キーテーブル score_board
を作成します。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
ネットワーク構成
Spark が配置されているマシンが、StarRocks クラスターの FE ノードに http_port
(デフォルト: 8030
)および query_port
(デフォルト: 9030
)を介してアクセスでき、BE ノードに be_http_port
(デフォルト: 8040
)を介してアクセスできることを確認します。
Spark 環境のセットアップ
以下の例は Spark 3.2.4 で実行され、spark-shell
、pyspark
、および spark-sql
を使用します。例を実行する前に、Spark コネクタ JAR ファイルを $SPARK_HOME/jars
ディレクトリに配置してください。
Spark DataFrames でデータをロード
以下の 2 つの例は、Spark DataFrames バッチまたは構造化ストリーミングモードでデータをロードする方法を説明しています。
バッチ
メモリ内でデータを構築し、StarRocks テーブルにデータをロードします。
- Scala または Python を使用して Spark アプリケーションを記述できます。
Scala の場合、spark-shell
で以下のコードスニペットを実行します:
// 1. Create a DataFrame from a sequence.
val data = Seq((1, "starrocks", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according your own environment.
df.write.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.mode("append")
.save()
Python の場合、pyspark
で以下のコードスニペットを実行します:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StarRocks Example") \
.getOrCreate()
# 1. Create a DataFrame from a sequence.
data = [(1, "starrocks", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according your own environment.
df.write.format("starrocks") \
.option("starrocks.fe.http.url", "127.0.0.1:8030") \
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("starrocks.table.identifier", "test.score_board") \
.option("starrocks.user", "root") \
.option("starrocks.password", "") \
.mode("append") \
.save()
-
StarRocks テーブルのデータをクエリします。
MySQL [test]> SELECT * FROM `score_board`;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
構造化ストリーミング
CSV ファイルからのデータのストリーミング読み取りを構築し、StarRocks テーブルにデータをロードします。
-
ディレクトリ
csv-data
に、以下のデータを含む CSV ファイルtest.csv
を作成します:3,starrocks,100
4,spark,100 -
Scala または Python を使用して Spark アプリケーションを記述できます。
Scala の場合、spark-shell
で以下のコードスニペットを実行します:
import org.apache.spark.sql.types.StructType
// 1. Create a DataFrame from CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according your own environment.
val query = (df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
// replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
Python の場合、pyspark
で以下のコードスニペットを実行します:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
spark = SparkSession \
.builder \
.appName("StarRocks SS Example") \
.getOrCreate()
# 1. Create a DataFrame from CSV.
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("score", IntegerType())
])
df = (
spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
# Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according your own environment.
query = (
df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
# replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
-
StarRocks テーブルのデータをクエリします。
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 4 | spark | 100 |
| 3 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.67 sec)
Spark SQL でデータをロード
以下の例は、Spark SQL CLI の INSERT INTO
ステートメントを使用して Spark SQL でデータをロードする方法を説明しています。
-
spark-sql
で以下の SQL ステートメントを実行します:-- 1. Create a table by configuring the data source as `starrocks` and the following options.
-- You need to modify the options according your own environment.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"=""
);
-- 2. Insert two rows into the table.
INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100); -
StarRocks テーブルのデータをクエリします。
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 6 | spark | 100 |
| 5 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
ベストプラクティス
主キーテーブルにデータをロードする
このセクションでは、StarRocks 主キーテーブルにデータをロードして部分更新や条件付き更新を実現する方法を示します。これらの機能の詳細な紹介については、Change data through loading を参照してください。これらの例では Spark SQL を使用します。
準備
StarRocks でデータベース test
を作成し、主キーテーブル score_board
を作成します。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
部分更新
この例では、ロードを通じて name
列のデータのみを更新する方法を示します:
-
MySQL クライアントで StarRocks テーブルに初期データを挿入します。
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
Spark SQL クライアントで Spark テーブル
score_board
を作成します。- コネクタに部分更新を行うように指示するオプション
starrocks.write.properties.partial_update
をtrue
に設定します。 - コネクタに書き込む列を指示するオプション
starrocks.columns
を"id,name"
に設定します。
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.partial_update"="true",
"starrocks.columns"="id,name"
); - コネクタに部分更新を行うように指示するオプション
-
Spark SQL クライアントでテーブルにデータを挿入し、
name
列のみを更新します。INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'spark-update');
-
MySQL クライアントで StarRocks テーブルをクエリします。
name
の値のみが変更され、score
の値は変更されていないことがわかります。mysql> select * from score_board;
+------+------------------+-------+
| id | name | score |
+------+------------------+-------+
| 1 | starrocks-update | 100 |
| 2 | spark-update | 100 |
+------+------------------+-------+
2 rows in set (0.02 sec)
条件付き更新
この例では、score
列の値に基づいて条件付き更新を行う方法を示します。id
の更新は、新しい score
の値が古い値以上の場合にのみ有効になります。
-
MySQL クライアントで StarRocks テーブルに初期データを挿入します。
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);
mysql> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.02 sec) -
Spark テーブル
score_board
を以下の方法で作成します。- コネクタに
score
列を条件として使用するよう指示するオプションstarrocks.write.properties.merge_condition
をscore
に設定します。 - Spark コネクタが Stream Load トランザクションインターフェースではなく、Stream Load インターフェースを使用してデータをロードすることを確認します。後者はこの機能をサポートしていません。
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.write.properties.merge_condition"="score"
); - コネクタに
-
Spark SQL クライアントでテーブルにデータを挿入し、
id
が 1 の行を小さいスコア値で更新し、id
が 2 の行を大きいスコア値で更新します。INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'spark-update', 101);
-
MySQL クライアントで StarRocks テーブルをクエリします。
id
が 2 の行のみが変更され、id
が 1 の行は変更されていないことがわかります。mysql> select * from score_board;
+------+--------------+-------+
| id | name | score |
+------+--------------+-------+
| 1 | starrocks | 100 |
| 2 | spark-update | 101 |
+------+--------------+-------+
2 rows in set (0.03 sec)
BITMAP 型の列にデータをロードする
BITMAP
は、UV のカウントなどの正確なカウントディスティンクトを高速化するためによく使用されます。Use Bitmap for exact Count Distinct を参照してください。ここでは、UV のカウントを例にとり、BITMAP
型の列にデータをロードする方法を示します。BITMAP
はバージョン 1.1.1 以降でサポートされています。
-
StarRocks 集計テーブルを作成します。
データベース
test
に、列visit_users
がBITMAP
型として定義され、集計関数BITMAP_UNION
が設定された集計テーブルpage_uv
を作成します。CREATE TABLE `test`.`page_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`); -
Spark テーブルを作成します。
Spark テーブルのスキーマは StarRocks テーブルから推測され、Spark は
BITMAP
型をサポートしていません。そのため、Spark で対応する列データ型をカスタマイズする必要があります。例えば、BIGINT
として、オプション"starrocks.column.types"="visit_users BIGINT"
を設定します。Stream Load を使用してデータを取り込む際、コネクタはto_bitmap
関数を使用してBIGINT
型のデータをBITMAP
型に変換します。spark-sql
で以下の DDL を実行します:CREATE TABLE `page_uv`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.page_uv",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.column.types"="visit_users BIGINT"
); -
StarRocks テーブルにデータをロードします。
spark-sql
で以下の DML を実行します:INSERT INTO `page_uv` VALUES
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
(1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
(2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23); -
StarRocks テーブルからページ UV を計算します。
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 2 | 1 |
| 1 | 3 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
注意:
コネクタは
to_bitmap
関数を使用して、Spark のTINYINT
、SMALLINT
、INTEGER
、およびBIGINT
型のデータを StarRocks のBITMAP
型に変換し、他の Spark データ型にはbitmap_hash
関数を使用します。
HLL 型の列にデータをロードする
HLL
は、近似カウントディスティンクトに使用できます。Use HLL for approximate count distinct を参照してください。
ここでは、UV のカウントを例にとり、HLL
型の列にデータをロードする方法を示します。HLL
はバージョン 1.1.1 以降でサポートされています。
-
StarRocks 集計テーブルを作成します。
データベース
test
に、列visit_users
がHLL
型として定義され、集計関数HLL_UNION
が設定された集計テーブルhll_uv
を作成します。CREATE TABLE `hll_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`); -
Spark テーブルを作成します。
Spark テーブルのスキーマは StarRocks テーブルから推測され、Spark は
HLL
型をサポートしていません。そのため、Spark で対応する列データ型をカスタマイズする必要があります。例えば、BIGINT
として、オプション"starrocks.column.types"="visit_users BIGINT"
を設定します。Stream Load を使用してデータを取り込む際、コネクタはhll_hash
関数を使用してBIGINT
型のデータをHLL
型に変換します。spark-sql
で以下の DDL を実行します:CREATE TABLE `hll_uv`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.hll_uv",
"starrocks.user"="root",
"starrocks.password"="",
"starrocks.column.types"="visit_users BIGINT"
); -
StarRocks テーブルにデータをロードします。
spark-sql
で以下の DML を実行します:INSERT INTO `hll_uv` VALUES
(3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
(4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
(3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674); -
StarRocks テーブルからページ UV を計算します。
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 4 | 1 |
| 3 | 2 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
ARRAY 型の列にデータをロードする
以下の例は、ARRAY
型の列にデータをロードする方法を説明しています。
-
StarRocks テーブルを作成します。
データベース
test
に、INT
列と 2 つのARRAY
列を含む主キーテーブルarray_tbl
を作成します。CREATE TABLE `array_tbl` (
`id` INT NOT NULL,
`a0` ARRAY<STRING>,
`a1` ARRAY<ARRAY<INT>>
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
; -
StarRocks にデータを書き込みます。
一部のバージョンの StarRocks は
ARRAY
列のメタデータを提供していないため、コネクタはこの列の対応する Spark データ型を推測できません。ただし、オプションstarrocks.column.types
で列の対応する Spark データ型を明示的に指定できます。この例では、オプションをa0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>
として設定できます。spark-shell
で以下のコードを実行します:val data = Seq(
| (1, Seq("hello", "starrocks"), Seq(Seq(1, 2), Seq(3, 4))),
| (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
| )
val df = data.toDF("id", "a0", "a1")
df.write
.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.array_tbl")
.option("starrocks.user", "root")
.option("starrocks.password", "")
.option("starrocks.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
.mode("append")
.save() -
StarRocks テーブルのデータをクエリします。
MySQL [test]> SELECT * FROM `array_tbl`;
+------+-----------------------+--------------------+
| id | a0 | a1 |
+------+-----------------------+--------------------+
| 1 | ["hello","starrocks"] | [[1,2],[3,4]] |
| 2 | ["hello","spark"] | [[5,6,7],[8,9,10]] |
+------+-----------------------+--------------------+
2 rows in set (0.01 sec)