メインコンテンツまでスキップ
バージョン: 3.1

SPARK LOAD

説明

Spark Load は外部の Spark リソースを通じてインポートされたデータを前処理し、大量の StarRocks データのインポート性能を向上させ、StarRocks クラスターの計算リソースを節約します。主に初期移行や大量のデータを StarRocks にインポートするシナリオで使用されます。

Spark Load は非同期のインポート方法です。ユーザーは MySQL プロトコルを通じて Spark タイプのインポートタスクを作成し、SHOW LOAD を通じてインポート結果を確認する必要があります。

注意

  • StarRocks テーブルにデータをロードするには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限がない場合は、GRANT に従って、使用する StarRocks クラスターに接続するユーザーに INSERT 権限を付与してください。
  • Spark Load を使用して StarRocks テーブルにデータをロードする場合、StarRocks テーブルのバケット列は DATE、DATETIME、または DECIMAL 型であってはなりません。

構文

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH RESOURCE resource_name
[resource_properties]
[opt_properties]
  1. load_label

現在インポートされているバッチのラベル。データベース内で一意です。

構文:

[database_name.]your_label
  1. data_desc

インポートされたデータのバッチを説明するために使用されます。

構文:

DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (col2, ...)]
[SET (k1 = func(k2))]
[WHERE predicate]

DATA FROM TABLE hive_external_tbl
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]

注意

file_path:

ファイルパスは 1 つのファイルを指定することができ、または * ワイルドカードを使用してディレクトリ内のすべてのファイルを指定することができます。ワイルドカードはファイルに一致する必要があり、ディレクトリには一致しません。

hive_external_tbl:

Hive 外部テーブル名。
インポートされた StarRocks テーブルの列は Hive 外部テーブルに存在する必要があります。
各ロードタスクは 1 つの Hive 外部テーブルからのロードのみをサポートします。
file_path モードと同時に使用することはできません。

PARTITION:

このパラメータが指定されている場合、指定されたパーティションのみがインポートされ、インポートされたパーティション外のデータはフィルタリングされます。
指定されていない場合、デフォルトですべてのテーブルのパーティションがインポートされます。

NEGATIVE:

このパラメータが指定されている場合、"ネガティブ" データのバッチをロードすることに相当します。以前にインポートされた同じバッチのデータを相殺するために使用されます。
このパラメータは、値列が存在し、値列の集計タイプが SUM のみの場合にのみ適用されます。

column_separator:

インポートファイルの列区切り文字を指定します。デフォルトは \t です。
不可視文字の場合は、\ \ x を前置して区切り文字を 16 進数で表現する必要があります。
たとえば、Hive ファイルの区切り文字 \x01 は "\ \ x01" と指定されます。

file_type:

インポートされたファイルのタイプを指定するために使用されます。現在、サポートされているファイルタイプは csv、orc、parquet です。

column_list:

インポートファイルの列とテーブルの列の対応を指定するために使用されます。
インポートファイルで列をスキップする必要がある場合、その列をテーブルに存在しない列名として指定します。

構文:
(col_name1, col_name2, ...)

SET:

このパラメータを指定すると、ソースファイルの列を関数に従って変換し、変換された結果をテーブルにインポートできます。構文は column_name = expression です。
Spark SQL のビルドイン関数のみがサポートされています。詳細は https://spark.apache.org/docs/2.4.6/api/sql/index.html を参照してください。
理解を助けるためにいくつかの例を示します。
例 1: テーブルに "c1, c2, c3" の 3 つの列があり、ソースファイルの最初の 2 列が (c1, c2) に対応し、最後の 2 列の合計が C3 に対応する場合、列 (c1, c2, tmp_c3, tmp_c4) set (c3 = tmp_c3 + tmp_c4) を指定する必要があります。
例 2: テーブルに "year, month, day" の 3 つの列があり、ソースファイルには "2018-06-01 01:02:03" の形式で 1 つの時間列しかない場合。
その場合、columns (tmp_time) set (year = year (tmp_time), month = month (tmp_time), day = day (tmp_time)) を指定してインポートを完了できます。

WHERE:

変換されたデータをフィルタリングし、WHERE 条件を満たすデータのみがインポートされます。WHERE ステートメントではテーブル内の列名のみを参照できます。
  1. resource_name

使用される Spark リソースの名前は SHOW RESOURCES コマンドを通じて確認できます。

  1. resource_properties

一時的な必要がある場合、たとえば Spark や HDFS の設定を変更する場合、ここでパラメータを設定できます。この設定は特定の Spark ロードジョブでのみ有効であり、StarRocks クラスターの既存の設定には影響しません。

  1. opt_properties

特別なパラメータを指定するために使用されます。

構文:

[PROPERTIES ("key"="value", ...)]

次のパラメータを指定できます: timeout: インポート操作のタイムアウトを指定します。デフォルトのタイムアウトは 4 時間です。秒単位。 max_filter_ratio:フィルタリング可能なデータの最大許容割合(非標準データなどの理由)。デフォルトはゼロトレランスです。 strict mode: データを厳密に制限するかどうか。デフォルトは false です。 timezone: タイムゾーンに影響されるいくつかの関数(strftime / alignment_timestamp/from_unixtime など)のタイムゾーンを指定します。詳細は [time zone] ドキュメントを参照してください。指定されていない場合は "Asia / Shanghai" タイムゾーンが使用されます。

  1. インポートデータ形式の例

int (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234 float (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356 date (DATE/DATETIME) :2017-10-03, 2017-06-13 12:34:03. (注: 他の日付形式については、インポートコマンドで strftime または time_format 関数を使用して変換できます) string クラス (CHAR/VARCHAR): "I am a student", "a"

NULL 値: \ N

  1. HDFS からデータのバッチをインポートし、タイムアウト時間とフィルタリング比率を指定します。Spark の名前として my_spark リソースを使用します。

    LOAD LABEL example_db.label1
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    )
    WITH RESOURCE 'my_spark'
    PROPERTIES
    (
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
    );

    ここで hdfs_host は namenode のホスト、hdfs_port は fs.defaultfs ポート(デフォルト 9000)です。

  2. HDFS から "ネガティブ" データのバッチをインポートし、区切り文字をカンマとして指定し、ワイルドカード * を使用してディレクトリ内のすべてのファイルを指定し、Spark リソースの一時パラメータを指定します。

    LOAD LABEL example_db.label3
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/*")
    NEGATIVE
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    )
    WITH RESOURCE 'my_spark'
    (
    "spark.executor.memory" = "3g",
    "broker.username" = "hdfs_user",
    "broker.password" = "hdfs_passwd"
    );
  3. HDFS からデータのバッチをインポートし、パーティションを指定し、インポートファイルの列にいくつかの変換を行います。以下のようにします。

    テーブル構造は次のとおりです:
    k1 varchar(20)
    k2 int

    データファイルには 1 行のデータしかないと仮定します:

    Adele,1,1

    データファイルの各列はインポートステートメントで指定された各列に対応します:
    k1,tmp_k2,tmp_k3

    変換は次のとおりです:

    1. k1: 変換なし
    2. k2: tmp_k2 と tmp_k3 の合計

    LOAD LABEL example_db.label6
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    PARTITION (p1, p2)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET (
    k2 = tmp_k2 + tmp_k3
    )
    )
    WITH RESOURCE 'my_spark';
  4. ファイルパスからパーティションフィールドを抽出します

    必要に応じて、ファイルパス内のパーティションフィールドはテーブルで定義されたフィールドタイプに従って解決されます。これは Spark の Partition Discovery 機能に似ています。

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/*/*")
    INTO TABLE `my_table`
    (k1, k2, k3)
    COLUMNS FROM PATH AS (city, utc_date)
    SET (uniq_id = md5sum(k1, city))
    )
    WITH RESOURCE 'my_spark';

    hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing ディレクトリには次のファイルが含まれています:

    [hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...]

    ファイルパス内の city と utc_date フィールドが抽出されます

  5. インポートするデータをフィルタリングします。k1 の値が 10 を超える列のみがインポートされます。

    LOAD LABEL example_db.label10
    (
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/starRocks/data/input/file")
    INTO TABLE `my_table`
    WHERE k1 > 10
    )
    WITH RESOURCE 'my_spark';
  6. Hive 外部テーブルからインポートし、ソーステーブルの uuid 列をグローバル辞書を通じてビットマップ型に変換します。

    LOAD LABEL db1.label1
    (
    DATA FROM TABLE hive_t1
    INTO TABLE tbl1
    SET
    (
    uuid=bitmap_dict(uuid)
    )
    )
    WITH RESOURCE 'my_spark';