Tablestore Sink Connector を起動する前に、Kafka Connect プロセスにパラメータを渡すためのキーと値のペアを指定する必要があります。このトピックでは、Tablestore Sink Connector を構成する方法を示す構成例とパラメータの説明を提供します。
構成例
構成項目は、Kafka から Tablestore のデータテーブルまたは時系列テーブルにデータを同期するかどうかに応じて異なります。構成ファイルの構成例は、動作モードによって異なります。このセクションでは、Kafka から Tablestore のデータテーブルへのデータ同期を構成する方法の例を示します。Tablestore の時系列テーブルにデータを同期するには、Kafka から Tablestore の時系列テーブルへのデータ同期に固有の構成項目を追加する必要があります。
- 次のサンプルコードは、スタンドアロンモードで Tablestore Sink Connector の .properties 形式の構成ファイルを構成する方法の例を示しています。
# コネクタ名を指定します。 name=tablestore-sink # コネクタクラスを指定します。 connector.class=TableStoreSinkConnector # タスクの最大数を指定します。 tasks.max=1 # データのエクスポート元の Kafka トピックのリストを指定します。 topics=test # 次の Tablestore 接続パラメータの値を指定します。 # Tablestore インスタンスのエンドポイント。 tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com # AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。 tablestore.access.key.id=xxx tablestore.access.key.secret=xxx # Tablestore インスタンスの名前。 tablestore.instance.name=xxx # 次のデータマッピングパラメータを指定します。 # Kafka メッセージレコードの解析に使用するパーサーを指定します。 # Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。 event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser # エクスポート元のトピックのプレースホルダーとして <topic> を使用して、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。 # topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。 # たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。 table.name.format=<topic> # Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン(:)で区切ります。複数のマッピングを指定する場合は、カンマ(,)で区切ります。 # マッピングが指定されていない場合は、table.name.format の構成が使用されます。 # topics.assign.tables=test:test_kafka # プライマリキーモードを指定します。有効な値:kafka、record_key、record_value。デフォルト値:kafka。 # kafka:<connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。 # record_key:レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。 # record_value:レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。 primarykey.mode=kafka # 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。 # プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。 # <tablename> はデータテーブル名のプレースホルダーです。 # プライマリキーモードが kafka の場合、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とデフォルトのデータ型 {string, integer} が使用されます。 # プライマリキーモードが record_key または record_value の場合、プライマリキー列の名前とデータ型を指定する必要があります。 # tablestore.test.primarykey.name=A,B # tablestore.test.primarykey.type=string,integer # 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。 # デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。 # 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。 # <tablename> はデータテーブル名のプレースホルダーです。 # tablestore.test.columns.whitelist.name=A,B # tablestore.test.columns.whitelist.type=string,integer # Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。 # 書き込みモードを指定します。有効な値:put と update。デフォルト値:put。 # put:宛先テーブルのデータは Kafka メッセージレコードによって上書きされます。 # update:宛先テーブルのデータは Kafka メッセージレコードによって更新されます。 insert.mode=put # データが読み取られた順序でデータを書き込むかどうかを指定します。デフォルト値:true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。 insert.order.enable=true # 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値:false。 auto.create=false # 削除モードを指定します。有効な値:none、row、column、row_and_column。デフォルト値:none。 # none:削除操作は実行できません。 # row:行を削除できます。 # column:属性列を削除できます。 # row_and_column:行と属性列を削除できます。 delete.mode=none # データをデータテーブルに書き込むときにメモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値:1024。このパラメータの値は 2 の指数である必要があります。 buffer.size=1024 # データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。 # max.thread.count= # データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値:10。 max.concurrency=10 # データの書き込み先となるバケットの数を指定します。デフォルト値:3。このパラメータの値を増やすと、同時書き込み機能を向上させることができます。ただし、このパラメータの値を指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。 bucket.count=3 # データをデータテーブルに書き込むときにバッファキューを更新する間隔を指定します。単位:ミリ秒。デフォルト値:10000。 flush.Interval=10000 # ダーティデータの処理方法を指定します。 # Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生する可能性があります。次の 2 つのパラメータを指定して、エラーの修正方法を決定できます。 # フォールトトレランス機能を指定します。有効な値:none と all。デフォルト値:none。 # none:エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。 # all:エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。 runtime.error.tolerance=none # ダーティデータのログ記録方法を指定します。有効な値:ignore、kafka、tablestore。デフォルト値:ignore。 # ignore:すべてのエラーが無視されます。 # kafka:エラーが報告されたメッセージレコードとエラーメッセージは別の Kafka トピックに保存されます。 # tablestore:エラーが報告されたメッセージレコードとエラーメッセージは別の Tablestore データテーブルに保存されます。 runtime.error.mode=ignore # runtime.error.mode を kafka に設定した場合、Kafka クラスタアドレスとトピックを指定する必要があります。 # runtime.error.bootstrap.servers=localhost:9092 # runtime.error.topic.name=errors # runtime.error.mode を tablestore に設定した場合、Tablestore データテーブルの名前を指定する必要があります。 # runtime.error.table.name=errors
- 次のサンプルコードは、分散モードで Tablestore Sink Connector の .json 形式の構成ファイルを構成する方法の例を示しています。
{ "name": "tablestore-sink", "config": { // コネクタクラスを指定します。 "connector.class":"TableStoreSinkConnector", // タスクの最大数を指定します。 "tasks.max":"3", // データのエクスポート元の Kafka トピックのリストを指定します。 "topics":"test", // 次の Tablestore 接続パラメータの値を指定します。 // Tablestore インスタンスのエンドポイント。 "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com", // AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。 "tablestore.access.key.id":"xxx", "tablestore.access.key.secret":"xxx", // Tablestore インスタンスの名前。 "tablestore.instance.name":"xxx", // 次のデータマッピングパラメータを指定します。 // Kafka メッセージレコードの解析に使用するパーサーを指定します。 // Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。 "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser", // エクスポート元のトピックのプレースホルダーとして <topic> を使用して、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。 // topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。 // たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。 "table.name.format":"<topic>", // Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン(:)で区切ります。複数のマッピングを指定する場合は、カンマ(,)で区切ります。 // マッピングが指定されていない場合は、table.name.format の構成が使用されます。 // "topics.assign.tables":"test:test_kafka", // プライマリキーモードを指定します。有効な値:kafka、record_key、record_value。デフォルト値:kafka。 // kafka:<connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。 // record_key:レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。 // record_value:レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。 "primarykey.mode":"kafka", // 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。 // プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。 // <tablename> はデータテーブル名のプレースホルダーです。 // プライマリキーモードが kafka の場合、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とデフォルトのデータ型 {string, integer} が使用されます。 // プライマリキーモードが record_key または record_value の場合、プライマリキー列の名前とデータ型を指定する必要があります。 // "tablestore.test.primarykey.name":"A,B", // "tablestore.test.primarykey.type":"string,integer", // 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。 // デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。 // 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。 // <tablename> はデータテーブル名のプレースホルダーです。 // "tablestore.test.columns.whitelist.name":"A,B", // "tablestore.test.columns.whitelist.type":"string,integer", // Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。 // 書き込みモードを指定します。有効な値:put と update。デフォルト値:put。 // put:テーブルのデータは Kafka メッセージレコードによって上書きされます。 // update:テーブルのデータは Kafka メッセージレコードによって更新されます。 "insert.mode":"put", // データが読み取られた順序でデータを書き込むかどうかを指定します。デフォルト値:true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。 "insert.order.enable":"true", // 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値:false。 "auto.create":"false", // 削除モードを指定します。有効な値:none、row、column、row_and_column。デフォルト値:none。 // none:削除操作は実行できません。 // row:行を削除できます。 // column:属性列を削除できます。 // row_and_column:行と属性列を削除できます。 "delete.mode":"none", // データをデータテーブルに書き込むときにメモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値:1024。このパラメータの値は 2 の指数である必要があります。 "buffer.size":"1024", // データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。 // "max.thread.count": // データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値:10。 "max.concurrency":"10", // データの書き込み先となるバケットの数を指定します。デフォルト値:3。このパラメータの値を増やすと、同時書き込み機能を向上させることができます。ただし、このパラメータの値を指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。 "bucket.count":"3", // データをデータテーブルに書き込むときにバッファキューを更新する間隔を指定します。単位:ミリ秒。デフォルト値:10000。 "flush.Interval":"10000", // ダーティデータの処理方法を指定します。 // Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生する可能性があります。次の 2 つのパラメータを指定して、エラーの修正方法を決定できます。 // フォールトトレランス機能を指定します。有効な値:none と all。デフォルト値:none。 // none:エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。 // all:エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。 "runtime.error.tolerance":"none", // ダーティデータのログ記録方法を指定します。有効な値:ignore、kafka、tablestore。デフォルト値:ignore。 // ignore:すべてのエラーが無視されます。 // kafka:エラーが報告されたメッセージレコードとエラーメッセージは別の Kafka トピックに保存されます。 // tablestore:エラーが報告されたメッセージレコードとエラーメッセージは別の Tablestore データテーブルに保存されます。 "runtime.error.mode":"ignore" // runtime.error.mode を kafka に設定した場合、Kafka クラスタアドレスとトピックを指定する必要があります。 // "runtime.error.bootstrap.servers":"localhost:9092", // "runtime.error.topic.name":"errors", // runtime.error.mode を tablestore に設定した場合、Tablestore データテーブルの名前を指定する必要があります。 // "runtime.error.table.name":"errors", }
パラメータ
次の表は、構成ファイルのパラメータについて説明しています。Kafka から Tablestore の時系列テーブルにデータを同期する場合にのみ、時系列関連のパラメータを構成する必要があります。
カテゴリ | パラメータ | タイプ | 必須 | 例 | 説明 |
Kafka Connect パラメータ | name | string | はい | tablestore-sink | コネクタの名前。コネクタ名は一意である必要があります。 |
connector.class | class | はい | TableStoreSinkConnector | コネクタの Java クラス。 コネクタを使用する場合は、connector.class を使用してコネクタクラスを指定します。connector.class には、コネクタクラスのフルネームまたはエイリアスを設定できます。コネクタクラスのフルネームは com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector で、エイリアスは TableStoreSinkConnector です。
| |
tasks.max | integer | はい | 3 | コネクタに対して作成できるタスクの最大数。 タスクの最大数が作成に失敗した場合、作成されるタスクの数が少なくなる可能性があります。 | |
key.converter | string | いいえ | org.apache.kafka.connect.json.JsonConverter | ワーカー構成ファイルで指定されたデフォルトのキーコンバーターを置き換えるために使用されるキーコンバーター。 | |
value.converter | string | いいえ | org.apache.kafka.connect.json.JsonConverter | ワーカー構成ファイルで指定されたデフォルトの値コンバーターを置き換えるために使用される値コンバーター。 | |
topics | list | はい | test | コネクタに指定できる Kafka トピックのリスト。複数の Kafka トピックはカンマ(,)で区切ります。 コネクタに指定されたトピックを管理するには、topics を指定する必要があります。 | |
コネクタ接続パラメータ | tablestore.endpoint | string | はい | https://xxx.xxx.ots.aliyuncs.com | Tablestore インスタンスのエンドポイント。詳細については、エンドポイントをご参照ください。 |
tablestore.mode | string | はい | timeseries | 宛先テーブルのタイプ。デフォルト値:normal。有効な値:
| |
tablestore.access.key.id | string | はい | LTAn******************** | アカウントの AccessKey ID と AccessKey シークレット。AccessKey ID と AccessKey シークレットの取得方法については、AccessKey ペアの取得をご参照ください。 | |
string | はい | zbnK************************** | |||
tablestore.auth.mode | string | はい | aksk | 認証モード。デフォルト値:aksk。有効な値:
| |
tablestore.instance.name | string | はい | myotstest | Tablestore インスタンスの名前。 | |
コネクタのデータマッピングパラメータ | event.parse.class | class | はい | DefaultEventParser | EventParser の Java クラス。デフォルト値:DefaultEventParser。パーサーは Kafka メッセージレコードを解析して、データテーブルのプライマリキー列と属性列を取得します。 重要 Tablestore は列値のサイズに制限を設けています。string 型または binary 型のプライマリキー列の値は 1 KB を超えることはできず、属性列の値は 2 MB を超えることはできません。詳細については、一般的な制限をご参照ください。 データ型の変換後に列値が制限を超えた場合、Kafka メッセージレコードはダーティデータとして処理されます。 DefaultEventParser を使用するには、Kafka メッセージレコードのキーまたは値が Kafka Connect の Struct クラスまたは Map クラスである必要があります。Struct で選択されたフィールドは、Tablestore Sink Connector でサポートされているデータ型である必要があります。フィールドは、データ型マッピングテーブルに基づいて Tablestore データ型のデータに変換され、データテーブルに書き込まれます。Map の値のデータ型は、Tablestore Sink Connector でサポートされているデータ型である必要があります。Tablestore Sink Connector は、Struct と Map で同じデータ型をサポートしています。Map の値は binary 型のデータに変換され、データテーブルに書き込まれます。Kafka と Tablestore 間のデータ型マッピングの詳細については、付録:Kafka と Tablestore 間のデータ型マッピングをご参照ください。 Kafka メッセージレコードのデータ型が Tablestore Sink Connector と互換性がない場合は、com.aliyun.tablestore.kafka.connect.parsers.EventParser で定義されている操作を呼び出してパーサーを構成できます。 |
table.name.format | string | いいえ | kafka_<topic> | 宛先 Tablestore データテーブルの名前のフォーマット文字列。デフォルト値:<topic>。<topic> は、データのエクスポート元のトピックのプレースホルダーとして文字列で使用できます。たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。 topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。 | |
topics.assign.tables | list | はい | test:destTable | <topic_1>:<tablename_1>,<topic_2>:<tablename_2> 形式で、トピックと宛先 Tablestore テーブル間のマッピングを指定します。複数のマッピングはカンマ(,)で区切ります。たとえば、test:destTable は、test という名前のトピックからのメッセージレコードが destTable という名前のデータテーブルに書き込まれることを指定します。topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。 | |
primarykey.mode | string | いいえ | kafka | データテーブルのプライマリキーモード。有効な値:
このパラメータは、tablestore.<tablename>.primarykey.name および tablestore.<tablename>.primarykey.type と一緒に構成します。このパラメータの値は大文字と小文字を区別しません。 | |
tablestore.<tablename>.primarykey.name | list | いいえ | A,B | データテーブルのプライマリキー列名。<tablename> はデータテーブル名のプレースホルダーです。このパラメータの値には、カンマ(,)で区切られた 1 つから 4 つのプライマリキー列名が含まれます。 プライマリキー列名は、プライマリキーモードによって異なります。
Tablestore データテーブルのプライマリキー列は順次です。tablestore.<tablename>.primarykey.name を定義するときは、プライマリキー列の順序に注意する必要があります。たとえば、PRIMARY KEY (A, B, C) と PRIMARY KEY (A, C, B) は異なるスキーマです。 | |
tablestore.<tablename>.primarykey.type | list | いいえ | string, integer | データテーブルのプライマリキー列のデータ型。<tablename> はデータテーブル名のプレースホルダーです。このパラメータの値には、1 つから 4 つのプライマリキー列のデータ型が含まれます。プライマリキー列のデータ型はカンマ(,)で区切ります。プライマリキー列のデータ型の順序は、tablestore.<tablename>.primarykey.name で指定されたプライマリキー列名の順序に対応している必要があります。このパラメータの値は大文字と小文字を区別しません。有効な値:integer、string、binary、auto_increment。 プライマリキー列のデータ型は、プライマリキーモードによって異なります。
| |
tablestore.<tablename>.columns.whitelist.name | list | いいえ | A,B | 属性列ホワイトリストの属性列名。<tablename> はデータテーブル名のプレースホルダーです。属性列名はカンマ(,)で区切ります。 このパラメータを構成しない場合、レコード値の Struct クラスのすべてのフィールドまたは Map クラスのすべてのキーがデータテーブルの属性列として使用されます。このパラメータを構成すると、レコード値のフィールドは指定された属性列ホワイトリストに基づいてフィルタリングされ、必要な属性列が取得されます。 | |
tablestore.<tablename>.columns.whitelist.type | list | いいえ | string, integer | 属性列ホワイトリストの属性列のデータ型。<tablename> はデータテーブル名のプレースホルダーです。属性列のデータ型はカンマ(,)で区切ります。属性列のデータ型の順序は、<tablename>.columns.whitelist.name で指定された属性列名の順序に対応している必要があります。このパラメータの値は大文字と小文字を区別しません。有効な値:integer、string、binary、boolean、double。 | |
コネクタ書き込みパラメータ | insert.mode | string | いいえ | put | 書き込みモード。デフォルト値:put。有効な値:
このパラメータの値は大文字と小文字を区別しません。 |
insert.order.enable | boolean | いいえ | true | データが読み取られた順序でデータテーブルに書き込むかどうかを指定します。デフォルト値:true。有効な値:
| |
auto.create | boolean | いいえ | false | 宛先テーブルを自動的に作成するかどうかを指定します。データテーブルまたは時系列テーブルを自動的に作成できます。デフォルト値:false。有効な値:
| |
delete.mode | string | いいえ | none | 削除モード。このパラメータの構成は、データがデータテーブルに同期され、プライマリキーモードが record_key に設定されている場合にのみ有効になります。デフォルト値:none。有効な値:
このパラメータの値は大文字と小文字を区別しません。 このパラメータは、insert.mode パラメータの値に基づいて指定されます。詳細については、付録:削除構文をご参照ください。 | |
buffer.size | integer | いいえ | 1024 | データがデータテーブルに書き込まれるときに、メモリ内のバッファキューに含めることができる行の最大数。デフォルト値:1024。このパラメータの値は 2 の指数である必要があります。 | |
max.thread.count | integer | いいえ | 3 | データがデータテーブルに書き込まれるときに使用されるコールバックスレッドの数。デフォルト値 = vCPU の数 + 1 。 | |
max.concurrency | integer | いいえ | 10 | データテーブルにデータを書き込むために送信できる同時書き込みリクエストの最大数。 | |
bucket.count | integer | いいえ | 3 | データの書き込み先となるバケットの数。デフォルト値:3。このパラメータの値を増やすと、同時書き込み機能を向上させることができます。ただし、このパラメータの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。 | |
flush.Interval | integer | いいえ | 10000 | データがデータテーブルに書き込まれるときに、バッファキューが更新される間隔。単位:ミリ秒。デフォルト値:10000。 | |
コネクタランタイムエラーパラメータ | runtime.error.tolerance | string | いいえ | none | Kafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが発生した場合に使用されるエラー処理ポリシー。デフォルト値:none。有効な値:
このパラメータの値は大文字と小文字を区別しません。 |
runtime.error.mode | string | いいえ | ignore | Kafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが報告されたメッセージレコードの処理方法を指定します。デフォルト値:ignore。有効な値:
runtime.error.mode が kafka に設定されている場合は、Kafka メッセージレコードのヘッダー、キー、値をシリアル化する必要があります。runtime.error.mode が tablestore に設定されている場合は、Kafka メッセージレコードのキーと値をシリアル化する必要があります。デフォルトでは、org.apache.kafka.connect.json.JsonConverter がデータのシリアル化に使用され、schemas.enable が true に設定されています。JsonConverter を使用してデータを逆シリアル化し、元のデータを取得できます。Converter の詳細については、Kafka Converter をご参照ください。 | |
runtime.error.bootstrap.servers | string | いいえ | localhost:9092 | エラーが報告されたメッセージレコードとエラーメッセージが保存される Kafka クラスタのアドレス。 | |
runtime.error.topic.name | string | いいえ | errors | エラーが報告されたメッセージレコードとエラーメッセージを保存する Kafka トピックの名前。 | |
runtime.error.table.name | string | いいえ | errors | エラーが報告されたメッセージレコードとエラーメッセージを保存する Tablestore テーブルの名前。 | |
時系列関連パラメータ | tablestore.timeseries.<tablename>.measurement | string | はい | mName | JSON 形式のデータで指定されたキーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれることを指定します。 tablestore.timeseries.<tablename>.measurement が <topic> に設定されている場合、Kafka メッセージレコードの topic キーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれます。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。たとえば、時系列テーブルの名前が test の場合、パラメータ名は tablestore.timeseries.test.measurement です。 |
tablestore.timeseries.<tablename>.dataSource | string | はい | ds | JSON 形式のデータで ds キーに対応する値が、_data_source フィールドの値として時系列テーブルに書き込まれることを指定します。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。 | |
tablestore.timeseries.<tablename>.tags | list | はい | region,level | JSON 形式のデータで region キーと level キーに対応する値が、tags フィールドの値として時系列テーブルに書き込まれることを指定します。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。 | |
tablestore.timeseries.<tablename>.time | string | はい | timestamp | JSON 形式のデータで timestamp キーに対応する値が、_time フィールドの値として時系列テーブルに書き込まれることを指定します。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。 | |
tablestore.timeseries.<tablename>.time.unit | string | はい | MILLISECONDS | tablestore.timeseries.<tablename>.time パラメータの値の単位。有効な値:SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。 | |
tablestore.timeseries.<tablename>.field.name | list | いいえ | cpu,io | JSON 形式のデータの cpu キーと io キーが _field_name の名前として時系列テーブルに書き込まれ、JSON 形式のデータの cpu キーと io キーに対応する値が _field_name の値として時系列テーブルに書き込まれることを指定します。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。 | |
tablestore.timeseries.<tablename>.field.type | string | いいえ | double,integer | tablestore.timeseries.<tablename>.field.name で指定されたフィールドのデータ型。有効な値:double、integer、string、binary、boolean。複数のデータ型はカンマ(,)で区切ります。 パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。 | |
tablestore.timeseries.mapAll | boolean | いいえ | false | JSON 形式のデータのプライマリキーフィールドと時間フィールド以外のフィールドが、フィールドとして時系列テーブルに書き込まれるかどうかを指定します。 tablestore.timeseries.mapAll が false に設定されている場合、tablestore.timeseries.<tablename>.field.name パラメータと tablestore.timeseries.<tablename>.field.type パラメータを構成する必要があります。 | |
tablestore.timeseries.toLowerCase | boolean | いいえ | true | フィールドのキーが時系列テーブルに書き込まれる前に小文字に変換されるかどうかを指定します。フィールドのキーとは、プライマリキーフィールドまたは時間フィールド以外のキー、または tablestore.timeseries.<tablename>.field.name で指定されたキーです。 | |
tablestore.timeseries.rowsPerBatch | integer | いいえ | 50 | リクエストで Tablestore に書き込むことができる行の最大数。最大値とデフォルト値は 200 です。 |
付録:Kafka と Tablestore 間のデータ型マッピング
次の表は、Kafka と Tablestore のデータ型間のマッピングについて説明しています。
Kafka スキーマタイプ | Tablestore データ型 |
STRING | STRING |
INT8、INT16、INT32、INT64 | INTEGER |
FLOAT32、FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
付録:削除構文
次の表は、メッセージレコードに空の値が含まれており、Kafka から Tablestore のデータテーブルにデータが同期されている場合に、書き込みモード (insert.mode) と削除モード (delete.mode) の構成に基づいて Tablestore データテーブルにデータを書き込むために使用されるメソッドについて説明しています。
insert.mode | put | update | ||||||
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
空の値 | 上書き | 行の削除 | 上書き | 行の削除 | ダーティデータ | 行の削除 | ダーティデータ | 行の削除 |
値のすべてのフィールドが空 | 上書き | 上書き | 上書き | 上書き | ダーティデータ | ダーティデータ | 列の削除 | 列の削除 |
値の一部のフィールドが空 | 上書き | 上書き | 上書き | 上書き | 空の値を無視 | 空の値を無視 | 列の削除 | 列の削除 |