CREATE ROUTINE LOAD
Try Routine Load out in this Quick Start
Routine Load は Apache Kafka® からメッセージを継続的に消費し、データを StarRocks にロードすることができます。Routine Load は Kafka クラスターから CSV、JSON、Avro(v3.0.1 以降でサポート)データを消費し、plaintext、ssl、sasl_plaintext、sasl_ssl などの複数のセキュリティプロトコルを介して Kafka にアクセスできます。
このトピックでは、CREATE ROUTINE LOAD ステートメントの構文、パラメータ、および例について説明します。
NOTE
- Routine Load の適用シナリオ、原則、および基本操作については、 Load data using Routine Load を参照してください。
- StarRocks テーブルにデータをロードするには、StarRocks テーブルに対 して INSERT 権限を持つユーザーとしてのみ行うことができます。INSERT 権限がない場合は、 GRANT に従って、使用するユーザーに INSERT 権限を付与してください。
Syntax
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Parameters
database_name, job_name, table_name
database_name
オプション。StarRocks データベースの名前。
job_name
必須。Routine Load ジョブの名前。1 つのテーブルは複数の Routine Load ジョブからデータを受け取ることができます。識別可能な情報、例えば Kafka トピック名やジョブ作成時刻などを使用して、意味のある Routine Load ジョブ名を設定することをお勧めします。同じデータベース内で Routine Load ジョブの名前は一意でなければなりません。
table_name
必須。データがロードされる StarRocks テーブルの名前。
load_properties
オプション。データのプロパティ。構文:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
CSV 形式のデータのカラム区切り文字。デフォルトのカラム区切り文字は \t(タブ)です。例えば、カラム区切り文字をカンマに指定するには COLUMNS TERMINATED BY "," を使用します。
Note
- ここで指定したカラム区切り文字が、取り込むデータのカラム区切り文字と同じであることを確認してください。
- カンマ(,)、タブ、パイプ(|)などの UTF-8 文字列をテキストデリミタとして使用できますが、その長さは 50 バイトを超えてはなりません。
- Null 値は
\Nを使用して示されます。例えば、データレコードが 3 つのカラムで構成されており、データレ コードが最初と 3 番目のカラムにデータを保持しているが、2 番目のカラムにはデータを保持していない場合、この状況では 2 番目のカラムに\Nを使用して Null 値を示す必要があります。これは、レコードをa,\N,bとしてコンパイルする必要があることを意味します。a,,bはレコードの 2 番目のカラムが空の文字列を保持していることを示します。
ROWS TERMINATED BY
CSV 形式のデータの行区切り文字。デフォルトの行区切り文字は \n です。
COLUMNS
ソースデータのカラムと StarRocks テーブルのカラム間のマッピング。詳細については、このトピックの Column mapping を参照してください。
column_name: ソースデータのカラムが計算なしで StarRocks テーブルのカラムにマッピングできる場合、カラム名を指定するだけで済みます。これらのカラムはマッピングされたカラムと呼ばれます。column_assignment: ソースデータのカラムが直接 StarRocks テーブルのカラムにマッピングできず、データロード前に関数を使用してカラムの値を計算する必要がある場合、exprで計算関数を指定する必要があります。これらのカラムは派生カラムと呼ばれます。StarRocks は最初にマッピングされたカラムを解析するため、派生カラムはマッピングされたカラムの後に配置することをお勧めします。
WHERE
フィルター条件。フィルター条件を満たすデータのみが StarRocks にロードされます。例えば、col1 の値が 100 より大きく、col2 の値が 1000 と等しい行のみを取り込みたい場合、WHERE col1 > 100 and col2 = 1000 を使用できます。
NOTE
フィルター条件で指定されたカラムは、ソースカラムまたは派生カラムであることができます。
PARTITION
StarRocks テーブルがパーティション p0、p1、p2、p3 に分散されており、StarRocks で p1、p2、p3 にのみデータをロードし、p0 に保存されるデータをフィルタリングしたい場合、フィルター条件として PARTITION(p1, p2, p3) を指定できます。デフォルトでは、このパラメータを指定しない場合、データはすべてのパーティションにロードされます。例:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
データをロードしたい temporary partition の名前。複数の一時パーティションを指定することができ、カンマ(,)で区切る必要がありま す。
job_properties
必須。ロードジョブのプロパティ。構文:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
| Property | Required | Description |
|---|---|---|
| desired_concurrent_number | No | 単一の Routine Load ジョブの期待されるタスク並行性。デフォルト値: 3。実際のタスク並行性は、複数のパラメータの最小値によって決定されます: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)。
|
| max_batch_interval | No | タスクのスケジューリング間隔、つまりタスクが実行される頻度。単位: 秒。値の範囲: 5 ~ 60。デフォルト値: 10。10 秒以上の値を設定することをお勧めします。スケジューリングが 10 秒未満の場合、ロード頻度が高すぎるために多くのタブレットバージョンが生成されます。 |
| max_batch_rows | No | このプロパティはエラーデータ検出ウィンドウを定義するためにのみ使用されます。ウィンドウは、単一の Routine Load タスクによって消費されるデータの行数です。値は 10 * max_batch_rows です。デフォルト値は 10 * 200000 = 2000000 です。Routine Load タスクは、エラーデータ検出ウィンドウ内でエラーデータを検出します。エラーデータとは、StarRocks が解析できないデータ、例えば無効な JSON 形式のデータを指します。 |
| max_error_number | No | エラーデータ検出ウィンドウ内で許容されるエラーデータ行の最大数。この値を超えると、ロードジョブは一時停止します。 SHOW ROUTINE LOAD を実行し、ErrorLogUrls を使用してエラーログを表示できます。その後、エラーログに従って Kafka のエラーを修正できます。デフォルト値は 0 で、エラーデータ行は許可されません。 NOTE
|
| max_filter_ratio | No | ロードジョブの最大エラー許容度。エラー許容度は、ロードジョブによって要求されたすべてのデータレコードの中で、データ品質が不十分なためにフィルタリングされる可能性のあるデータレコードの最大割合です。有効な値: 0 から 1。デフォルト値: 1(実際には効果を発揮しません)。0 に設定することをお勧めします。これにより、不適格なデータレコードが検出された場合、ロードジョブが一時停止し、データの正確性が保証されます。不適格なデータレコードを無視したい場合は、このパラメータを 0 より大きい値に設定できます。これにより、データファイルに不適格なデータレコードが含まれていても、ロードジョブは成功します。 NOTE
|
| strict_mode | No | strict mode を有効にするかどうかを指定します。有効な値: true と false。デフォルト値: false。strict mode が有効な場合、ロードされたデータのカラムの値が NULL であり、ターゲットテーブルがそのカラムに NULL 値を許可しない場合、データ行はフィルタリングされます。 |
| log_rejected_record_num | No | ログに記録できる不適格なデータ行の最大数を指定します。このパラメータは v3.1 以降でサポートされています。有効な値: 0、-1、および任意の非ゼロの正の整数。デフォルト値: 0。
|
| timezone | No | ロードジョブで使用されるタイムゾーン。デフォルト値: Asia/Shanghai。このパラメータの値は、strftime()、alignment_timestamp()、from_unixtime() などの関数によって返される結果に影響します。このパラメータで指定されたタイムゾーンはセッションレベルのタイムゾーンです。詳細については、 Configure a time zone を参照してください。 |
| partial_update | No | 部分更新を使用するかどうか。 有効な値: TRUE と FALSE。 デフォルト値: FALSE、この機能を無効にすることを示します。 |
| merge_condition | No | データを更新するかどうかを判断する条件として使用するカラムの名前を指定します。このカラムにロードされるデータの値がこのカラムの現在の値以上である場合にのみデータが更新されます。 NOTE 条件付き更新をサポートするのは主キーテーブルのみです。指定するカラムは主キーのカラムであってはなりません。 |
| format | No | ロードするデータの形式。 有効な値: CSV、JSON、および Avro(v3.0.1 以降でサポート)。 デフォルト値: CSV。 |
| trim_space | No | データファイルが CSV 形式の場合、カラム区切り文字の前後のスペースを削除するかどうかを指定します。タイプ: BOOLEAN。デフォルト値: false。一部のデータベースでは、データを CSV 形式のデータファイルとしてエクスポートする際に、カラム区切り文字にスペースが追加されます。これらのスペースは、その位置に応じて先行スペースまたは後続スペースと呼ばれます。 trim_space パラメータを設定することで、StarRocks がデータロード中にこれらの不要なスペースを削除できるようにします。StarRocks は、 enclose で指定された文字で囲まれたフィールド内のスペース(先行スペースおよび後続スペースを含む)を削除しないことに注意してください。例えば、次のフィールド値はパイプ(|)をカラム区切り文 字として使用し、二重引用符(")を enclose で指定された文字として使用しています: | "Love StarRocks" |。trim_space を true に設定すると、StarRocks は前述のフィールド値を |"Love StarRocks"| として処理します。 |
| enclose | No | データファイルが CSV 形式の場合、 RFC4180 に従ってフィールド値を囲むために使用される文字を指定します。タイプ: 単一バイト文字。デフォルト値: NONE。最も一般的な文字は単一引用符(')および二重引用符(")です。enclose で指定された文字で囲まれたすべての特殊文字(行区切り文字およびカラム区切り文字を含む)は通常の記号と見なされます。StarRocks は、enclose で指定された文字として任意の単一バイト文字を指定できるため、RFC4180 よりも多くのことができます。フィールド値に enclose で指定された文字が含まれている場合、同じ文字を使用してその enclose で指定された文字をエスケープできます。例えば、enclose を " に設定し、フィールド値が a "quoted" c の場合、このフィールド値をデータファイルに "a ""quoted"" c" として入力できます。 |
| escape | No | 行区切り文字、カラム区切り文字、エスケープ文字、enclose で指定された文字などのさまざまな特殊文字をエスケープするために使用される文字を指定しま す。これらは StarRocks によって通常の文字と見なされ、それらが存在するフィールド値の一部として解析されます。タイプ: 単一バイト文字。デフォルト値: NONE。最も一般的な文字はスラッシュ(\)で、SQL ステートメントでは二重スラッシュ(\\)として記述する必要があります。NOTE escape で指定された文字は、各ペアの enclose で指定された文字の内側と外側の両方に適用されます。次のような例があります:
|
| strip_outer_array | No | JSON 形式のデータの最外部の配列構造を削除するかどうかを指定します。有効な値: true と false。デフォルト値: false。実際のビジネスシナリオでは、JSON 形式のデータには [] で示される最外部の配列構造がある場合があります。この状況では、このパラメータを true に設定することをお勧めします。これにより、StarRocks は最外部の角括弧 [] を削除し、各内部配列を個別のデータレコードとしてロードします。このパラメータを false に設定すると、StarRocks は JSON 形式のデータ全体を 1 つの配列として解析し、その配列を 1 つの データレコードとしてロードします。JSON 形式のデータ [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] を例として使用します。このパラメータを true に設定すると、{"category" : 1, "author" : 2} と {"category" : 3, "author" : 4} は 2 つの別々のデータレコードとして解析され、2 つの StarRocks データ行にロードされます。 |
| jsonpaths | No | JSON 形式のデータからロードしたいフィールドの名前。このパラメータの値は有効な JsonPath 式です。詳細については、 StarRocks table contains derived columns whose values are generated by using expressions を参照してください。 |
| json_root | No | ロードする JSON 形式のデータのルート要素。StarRocks は json_root を通じてルートノードの要素を抽出して解析します。デフォルトでは、このパラメータの値は空であり、すべての JSON 形式のデータがロードされることを示します。詳細については、 Specify the root element of the JSON-formatted data to be loaded を参照してください。 |
| task_consume_second | No | 指定された Routine Load ジョブ内の各 Routine Load タスクがデータを消費する最大時間。単位: 秒。 FE dynamic parameters routine_load_task_consume_second(クラスター内のすべての Routine Load ジョブに適用される)とは異なり、このパラメータは個々の Routine Load ジョブ に特化しており、より柔軟です。このパラメータは v3.1.0 以降でサポートされています。
|
| task_timeout_second | No | 指定された Routine Load ジョブ内の各 Routine Load タスクのタイムアウト期間。単位: 秒。 FE dynamic parameter routine_load_task_timeout_second(クラスター内のすべての Routine Load ジョブに適用される)とは異なり、このパラメータは個々の Routine Load ジョブに特化しており、より柔軟です。このパラメータは v3.1.0 以降でサポートされています。
|
data_source, data_source_properties
必須。データソースと関連するプロパティ。
FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
data_source
必須。ロードしたいデータのソース。有効な値: KAFKA。
data_source_properties
データソースのプロパティ。
| Property | Required | Description |
|---|---|---|
| kafka_broker_list | Yes | Kafka のブローカー接続情報。形式は <kafka_broker_ip>:<broker_ port> です。複数のブローカーはカンマ(,)で区切られます。Kafka ブローカーが使用するデフォルトのポートは 9092 です。例: "kafka_broker_list" = ""xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092"。 |
| kafka_topic | Yes | 消費される Kafka トピック。Routine Load ジョブは 1 つのトピックからのみメッセージを消費できます。 |
| kafka_partitions | No | 消費される Kafka パーティション。例: "kafka_partitions" = "0, 1, 2, 3"。このプロパティが指定されていない場合、デフォルトですべてのパーティションが消費されます。 |
| kafka_offsets | No | kafka_partitions で指定された Kafka パーティションでデータを消費し始めるオフセット。指定されていない場合、Routine Load ジョブは kafka_partitions の最新のオフセットからデータを消費します。有効な値:
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000"。 |
| property.kafka_default_offsets | No | すべての消費者パーティションのデフォルトの開始オフセット。このプロパティでサポートされる値は、kafka_offsets プロパティの値と同じです。 |
| confluent.schema.registry.url | No | Avro スキーマが登録されている Schema Registry の URL。StarRocks はこの URL を使用して Avro スキーマを取得します。形式は次のとおりです:confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>] |
データソース関連のプロパティをさらに指定する
Kafka コマンドライン --property を使用するのと同等の追加のデータソース(Kafka)関連のプロパティを指定できます。サポートされているプロパティの詳細については、 librdkafka configuration properties の Kafka コンシューマークライアントのプロパティを参照してください。
NOTE
プロパティの値がファイル名である場合、ファイル名の前にキーワード
FILE:を追加してください。ファイルの作成方法については、 CREATE FILE を参照してください。
- 消費されるすべてのパーティションのデフォルトの初期オフセットを指定する
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
- Routine Load ジョブで使用されるコンシューマーグループの ID を指定する
"property.group.id" = "group_id_0"
property.group.id が指定されていない場合、StarRocks は Routine Load ジョブの名前に基づいてランダムな値を生成します。形式は {job_name}_{random uuid} で、例: simple_job_0a64fe25-3983-44b2-a4d8-f52d3af4c3e8。
-
BE が Kafka にアクセスするために使用するセキュリティプロトコルと関連するパラメータを指定する
セキュリティプロトコルは
plaintext(デフォルト)、ssl、sasl_plaintext、またはsasl_sslとして指定できます。指定されたセキュリティプロトコルに応じて関連するパラメータを設定する必要があります。セキュリティプロトコルが
sasl_plaintextまたはsasl_sslに設定されている場合、次の SASL 認証メカニズムがサポートされています:- PLAIN
- SCRAM-SHA-256 および SCRAM-SHA-512
- OAUTHBEARER
- GSSAPI (Kerberos)
例:
-
SSL セキュリティプロトコルを使用して Kafka にアクセスする:
"property.security.protocol" = "ssl", -- セキュリティプロトコルを SSL として指定します。
"property.ssl.ca.location" = "FILE:ca-cert", -- Kafka ブローカーのキーを検証するための CA 証明書のファイルまたはディレクトリパス。
-- Kafka サーバーがクライアント認証を有効にしている場合、次の 3 つのパラメータも必要です:
"property.ssl.certificate.location" = "FILE:client.pem", -- 認証に使用されるクライアントの公開鍵のパス。
"property.ssl.key.location" = "FILE:client.key", -- 認証に使用されるクライアントの秘密鍵のパス。
"property.ssl.key.password" = "xxxxxx" -- クライアントの秘密鍵のパスワード。 -
SASL_PLAINTEXT セキュリティプロトコルと SASL/PLAIN 認証メカニズムを使用して Kafka にアクセスする:
"property.security.protocol" = "SASL_PLAINTEXT", -- セキュリティプロトコルを SASL_PLAINTEXT として指定します。
"property.sasl.mechanism" = "PLAIN", -- SASL メカニズムを PLAIN として指定します。これはシンプルなユーザー名/パスワード認証メカニズムです。
"property.sasl.username" = "admin", -- SASL ユーザー名。
"property.sasl.password" = "xxxxxx" -- SASL パスワード。 -
SASL_PLAINTEXT セキュリティプロトコルと SASL/GSSAPI (Kerberos) 認証メカニズムを使用して Kafka にアクセスする:
"property.security.protocol" = "SASL_PLAINTEXT", -- セキュリティプロトコルを SASL_PLAINTEXT として指定します。
"property.sasl.mechanism" = "GSSAPI", -- SASL 認証メカニ ズムを GSSAPI として指定します。デフォルト値は GSSAPI です。
"property.sasl.kerberos.service.name" = "kafka", -- ブローカーサービス名。デフォルト値は kafka です。
"property.sasl.kerberos.keytab" = "/home/starrocks/starrocks.keytab", -- クライアント keytab の場所。
"property.sasl.kerberos.principal" = "starrocks@YOUR.COM" -- Kerberos プリンシパル。注記-
StarRocks v3.1.4 以降、SASL/GSSAPI (Kerberos) 認証がサポートされています。
-
SASL 関連のモジュールは BE マシンにインストールする必要があります。
# Debian/Ubuntu:
sudo apt-get install libsasl2-modules-gssapi-mit libsasl2-dev
# CentOS/Redhat:
sudo yum install cyrus-sasl-gssapi cyrus-sasl-devel
-
FE と BE の設定項目
Routine Load に関連する FE と BE の設定項目については、 configuration items を参照してください。
Column mapping
CSV 形式のデータをロードするためのカラムマッピングを設定する
CSV 形式のデータのカラムが StarRocks テーブルのカラムに順番に 1 対 1 でマッピングできる場 合、データと StarRocks テーブルの間のカラムマッピングを設定する必要はありません。
CSV 形式のデータのカラムが StarRocks テーブルのカラムに順番に 1 対 1 でマッピングできない場合、columns パラメータを使用してデータファイルと StarRocks テーブルの間のカラムマッピングを設定する必要があります。これには次の 2 つのユースケースが含まれます:
-
カラム数は同じだがカラムの順序が異なる。また、データファイルからのデータは StarRocks テーブルの対応するカラムにロードされる前に関数によって計算される必要はありません。
-
columnsパラメータでは、データファイルのカラムが配置されている順序と同じ順序で StarRocks テーブルのカラム名を指定する必要があります。 -
例えば、StarRocks テーブルは順番に
col1、col2、col3の 3 つのカラムで構成され、データファイルも 3 つのカラムで構成され、順番に StarRocks テーブルのカラムcol3、col2、col1にマッピングできます。この場合、"columns: col3, col2, col1"と指定する必要があります。
-
-
カラム数が異なり、カラムの順序も異なる。また、データファイルからのデータは StarRocks テーブルの対応するカラムにロードされる前に関数によって計算される必要があります。
columnsパラメータでは、データファイルのカラムが配置されている順序と同じ順序で StarRocks テーブルのカラム名を指定し、データを計算するために使用する関数を指定する必要があ ります。次の 2 つの例があります:- StarRocks テーブルは順番に
col1、col2、col3の 3 つのカラムで構成されています。データファイルは 4 つのカラムで構成されており、そのうち最初の 3 つのカラムは順番に StarRocks テーブルのカラムcol1、col2、col3にマッピングでき、4 番目のカラムは StarRocks テーブルのカラムにマッピングできません。この場合、データファイルの 4 番目のカラムに一時的な名前を指定する必要があり、その一時的な名前は StarRocks テーブルのカラム名と異なる必要があります。例えば、"columns: col1, col2, col3, temp"と指定できます。この場合、データファイルの 4 番目のカラムは一時的にtempと名付けられます。 - StarRocks テーブルは順番に
year、month、dayの 3 つのカラムで構成されています。データファイルはyyyy-mm-dd hh:mm:ss形式の日付と時刻の値を含む 1 つのカラムで構成されています。この場合、"columns: col, year = year(col), month=month(col), day=day(col)"と指定できます。この場合、colはデータファイルのカラムの一時的な名前であり、関数year = year(col)、month=month(col)、day=day(col)はデータファイルのカラムcolからデータを抽出し、対応する StarRocks テーブルのカラムにデータをロードするために使用されます。例えば、year = year(col)はデータファイルのカラムcolからyyyyデータを抽出し、StarRocks テーブルのカラムyearにデータをロードするために使用さ れます。
- StarRocks テーブルは順番に
詳細な例については、 Configure column mapping を参照してください。
JSON 形式または Avro 形式のデータをロードするためのカラムマッピングを設定する
NOTE
v3.0.1 以降、StarRocks は Routine Load を使用して Avro データのロードをサポートしています。JSON または Avro データをロードする場合、カラムマッピングと変換の設定は同じです。したがって、このセクションでは、JSON データを例にとって設定を紹介します。
JSON 形式のデータのキーが StarRocks テーブルのカラムと同じ名前を持っている場合、シンプルモードを使用して JSON 形式のデータをロードできます。シンプルモードでは、jsonpaths パラメータを指定する必要はありません。このモードでは、JSON 形式のデータは {} で示されるオブジェクトである必要があります。例えば、{"category": 1, "author": 2, "price": "3"} のように。この例では、category、author、price はキー名であり、これらのキーは名前で 1 対 1 で StarRocks テーブルのカラム category、author、price にマッピングできます。詳細な例については、 simple mode を参照してください。
JSON 形式のデータのキーが StarRocks テーブルのカラムと異なる名前を持っている場合、マッチドモードを使用して JSON 形式のデータをロードできます。マッチドモードでは、jsonpaths および COLUMNS パラメータを使用して JSON 形式のデータと StarRocks テーブルの間のカラムマッピングを指定する必要があります:
jsonpathsパラメータでは、JSON 形式のデータに配置されている順序で JSON キーを指定します。COLUMNSパラメータでは、JSON キーと StarRocks テーブルのカラム間のマッピングを指定します:COLUMNSパラメータで指定されたカラム名は、JSON 形式のデータに順番に 1 対 1 でマッピングされます。COLUMNSパラメータで指定されたカラム名は、名前で StarRocks テーブルのカラムに 1 対 1 でマッピングされます。
詳細な例については、 StarRocks table contains derived columns whose values are generated by using expressions を参照してください。
Examples
CSV 形式のデータをロードする
このセクションでは、CSV 形式のデータを例にとり、さまざまなパラメータ設定と組み合わせを使用して、多様なロード要件を満たす方法を説明します。
データセットを準備する
例えば、Kafka トピック ordertest1 から CSV 形式のデータをロードしたいとします。データセットの各メッセージには、注文 ID、支払い日、顧客名、国籍、性別、価格の 6 つのカラムが含まれています。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina,Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
テーブルを 作成する
CSV 形式のデータのカラムに基づいて、データベース example_db に example_tbl1 という名前のテーブルを作成します。
CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`gender` varchar(26) NULL COMMENT "Gender",
`price` double NULL COMMENT "Price")
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);