すべてのプロダクト
Search
ドキュメントセンター

Tablestore:構成の説明

最終更新日:Dec 28, 2024

Tablestore Sink Connector を起動する前に、Kafka Connect プロセスにパラメータを渡すためのキーと値のペアを指定する必要があります。このトピックでは、Tablestore Sink Connector を構成する方法を示す構成例とパラメータの説明を提供します。

構成例

構成項目は、Kafka から Tablestore のデータテーブルまたは時系列テーブルにデータを同期するかどうかに応じて異なります。構成ファイルの構成例は、動作モードによって異なります。このセクションでは、Kafka から Tablestore のデータテーブルへのデータ同期を構成する方法の例を示します。Tablestore の時系列テーブルにデータを同期するには、Kafka から Tablestore の時系列テーブルへのデータ同期に固有の構成項目を追加する必要があります。

  • 次のサンプルコードは、スタンドアロンモードで Tablestore Sink Connector の .properties 形式の構成ファイルを構成する方法の例を示しています。
    # コネクタ名を指定します。
    name=tablestore-sink
    # コネクタクラスを指定します。
    connector.class=TableStoreSinkConnector
    # タスクの最大数を指定します。
    tasks.max=1
    # データのエクスポート元の Kafka トピックのリストを指定します。
    topics=test
    # 次の Tablestore 接続パラメータの値を指定します。
    # Tablestore インスタンスのエンドポイント。
    tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
    # AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。
    tablestore.access.key.id=xxx
    tablestore.access.key.secret=xxx
    # Tablestore インスタンスの名前。
    tablestore.instance.name=xxx
    
    # 次のデータマッピングパラメータを指定します。
    # Kafka メッセージレコードの解析に使用するパーサーを指定します。
    # Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。
    event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
    
    # エクスポート元のトピックのプレースホルダーとして <topic> を使用して、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。
    # topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。
    # たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
    table.name.format=<topic>
    # Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン(:)で区切ります。複数のマッピングを指定する場合は、カンマ(,)で区切ります。
    # マッピングが指定されていない場合は、table.name.format の構成が使用されます。
    # topics.assign.tables=test:test_kafka
    
    # プライマリキーモードを指定します。有効な値:kafka、record_key、record_value。デフォルト値:kafka。
    # kafka:<connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。
    # record_key:レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。
    # record_value:レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。
    primarykey.mode=kafka
    
    # 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。
    # プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。
    # <tablename> はデータテーブル名のプレースホルダーです。
    # プライマリキーモードが kafka の場合、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とデフォルトのデータ型 {string, integer} が使用されます。
    # プライマリキーモードが record_key または record_value の場合、プライマリキー列の名前とデータ型を指定する必要があります。
    # tablestore.test.primarykey.name=A,B
    # tablestore.test.primarykey.type=string,integer
    
    # 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。
    # デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。
    # 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。
    # <tablename> はデータテーブル名のプレースホルダーです。
    # tablestore.test.columns.whitelist.name=A,B
    # tablestore.test.columns.whitelist.type=string,integer
    
    # Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。
    # 書き込みモードを指定します。有効な値:put と update。デフォルト値:put。
    # put:宛先テーブルのデータは Kafka メッセージレコードによって上書きされます。
    # update:宛先テーブルのデータは Kafka メッセージレコードによって更新されます。
    insert.mode=put
    # データが読み取られた順序でデータを書き込むかどうかを指定します。デフォルト値:true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。
    insert.order.enable=true
    # 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値:false。
    auto.create=false
    
    # 削除モードを指定します。有効な値:none、row、column、row_and_column。デフォルト値:none。
    # none:削除操作は実行できません。
    # row:行を削除できます。
    # column:属性列を削除できます。
    # row_and_column:行と属性列を削除できます。
    delete.mode=none
    
    # データをデータテーブルに書き込むときにメモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値:1024。このパラメータの値は 2 の指数である必要があります。
    buffer.size=1024
    # データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。
    # max.thread.count=
    # データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値:10。
    max.concurrency=10
    # データの書き込み先となるバケットの数を指定します。デフォルト値:3。このパラメータの値を増やすと、同時書き込み機能を向上させることができます。ただし、このパラメータの値を指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
    bucket.count=3
    # データをデータテーブルに書き込むときにバッファキューを更新する間隔を指定します。単位:ミリ秒。デフォルト値:10000。
    flush.Interval=10000
    
    # ダーティデータの処理方法を指定します。
    # Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生する可能性があります。次の 2 つのパラメータを指定して、エラーの修正方法を決定できます。
    # フォールトトレランス機能を指定します。有効な値:none と all。デフォルト値:none。
    # none:エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
    # all:エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
    runtime.error.tolerance=none
    # ダーティデータのログ記録方法を指定します。有効な値:ignore、kafka、tablestore。デフォルト値:ignore。
    # ignore:すべてのエラーが無視されます。
    # kafka:エラーが報告されたメッセージレコードとエラーメッセージは別の Kafka トピックに保存されます。
    # tablestore:エラーが報告されたメッセージレコードとエラーメッセージは別の Tablestore データテーブルに保存されます。
    runtime.error.mode=ignore
    
    # runtime.error.mode を kafka に設定した場合、Kafka クラスタアドレスとトピックを指定する必要があります。
    # runtime.error.bootstrap.servers=localhost:9092
    # runtime.error.topic.name=errors
    
    # runtime.error.mode を tablestore に設定した場合、Tablestore データテーブルの名前を指定する必要があります。
    # runtime.error.table.name=errors
  • 次のサンプルコードは、分散モードで Tablestore Sink Connector の .json 形式の構成ファイルを構成する方法の例を示しています。
    {
      "name": "tablestore-sink",
      "config": {
        // コネクタクラスを指定します。
        "connector.class":"TableStoreSinkConnector",
        // タスクの最大数を指定します。
        "tasks.max":"3",
        // データのエクスポート元の Kafka トピックのリストを指定します。
        "topics":"test",
        // 次の Tablestore 接続パラメータの値を指定します。
        // Tablestore インスタンスのエンドポイント。
        "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
        // AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。
        "tablestore.access.key.id":"xxx",
        "tablestore.access.key.secret":"xxx",
        // Tablestore インスタンスの名前。
        "tablestore.instance.name":"xxx",
    
        // 次のデータマッピングパラメータを指定します。
        // Kafka メッセージレコードの解析に使用するパーサーを指定します。
        // Tablestore Sink Connector の DefaultEventParser は、Kafka Connect の Struct クラスと Map クラスをサポートしています。カスタム EventParser を使用することもできます。
        "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
    
        // エクスポート元のトピックのプレースホルダーとして <topic> を使用して、宛先 Tablestore テーブルの名前のフォーマット文字列を指定します。
        // topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。
        // たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
        "table.name.format":"<topic>",
        // Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン(:)で区切ります。複数のマッピングを指定する場合は、カンマ(,)で区切ります。
        // マッピングが指定されていない場合は、table.name.format の構成が使用されます。
        // "topics.assign.tables":"test:test_kafka",
    
        // プライマリキーモードを指定します。有効な値:kafka、record_key、record_value。デフォルト値:kafka。
        // kafka:<connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。
        // record_key:レコードキーのフィールドがデータテーブルのプライマリキーとして使用されます。
        // record_value:レコード値のフィールドがデータテーブルのプライマリキーとして使用されます。
        "primarykey.mode":"kafka",
    
        // 宛先 Tablestore データテーブルのプライマリキー列の名前とデータ型を指定します。
        // プライマリキー列名の形式は tablestore.<tablename>.primarykey.name です。プライマリキー列のデータ型の形式は tablestore.<tablename>.primarykey.type です。
        // <tablename> はデータテーブル名のプレースホルダーです。
        // プライマリキーモードが kafka の場合、プライマリキー列の名前とデータ型を指定する必要はありません。デフォルトのプライマリキー列名 {"topic_partition","offset"} とデフォルトのデータ型 {string, integer} が使用されます。
        // プライマリキーモードが record_key または record_value の場合、プライマリキー列の名前とデータ型を指定する必要があります。
        // "tablestore.test.primarykey.name":"A,B",
        // "tablestore.test.primarykey.type":"string,integer",
    
        // 属性列のホワイトリストを指定して、レコード値のフィールドをフィルタリングし、必要な属性列を取得します。
        // デフォルトでは、属性列のホワイトリストは空です。レコード値のすべてのフィールドがデータテーブルの属性列として使用されます。
        // 属性列名の形式は tablestore.<tablename>.columns.whitelist.name です。属性列のデータ型の形式は tablestore.<tablename>.columns.whitelist.type です。
        // <tablename> はデータテーブル名のプレースホルダーです。
        // "tablestore.test.columns.whitelist.name":"A,B",
        // "tablestore.test.columns.whitelist.type":"string,integer",
    
        // Kafka メッセージレコードを宛先 Tablestore テーブルに書き込む方法を指定します。
        // 書き込みモードを指定します。有効な値:put と update。デフォルト値:put。
        // put:テーブルのデータは Kafka メッセージレコードによって上書きされます。
        // update:テーブルのデータは Kafka メッセージレコードによって更新されます。
        "insert.mode":"put",
        // データが読み取られた順序でデータを書き込むかどうかを指定します。デフォルト値:true。書き込みパフォーマンスを向上させるために、このオプションを無効にすることができます。
        "insert.order.enable":"true",
        // 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値:false。
        "auto.create":"false",
    
        // 削除モードを指定します。有効な値:none、row、column、row_and_column。デフォルト値:none。
        // none:削除操作は実行できません。
        // row:行を削除できます。
        // column:属性列を削除できます。
        // row_and_column:行と属性列を削除できます。
        "delete.mode":"none",
    
        // データをデータテーブルに書き込むときにメモリ内のバッファキューに含めることができる行の最大数を指定します。デフォルト値:1024。このパラメータの値は 2 の指数である必要があります。
        "buffer.size":"1024",
        // データをデータテーブルに書き込むときに使用されるコールバックスレッドの数を指定します。デフォルト値 = vCPU の数 + 1。
        // "max.thread.count":
        // データをデータテーブルに書き込むために送信できる同時書き込みリクエストの最大数を指定します。デフォルト値:10。
        "max.concurrency":"10",
        // データの書き込み先となるバケットの数を指定します。デフォルト値:3。このパラメータの値を増やすと、同時書き込み機能を向上させることができます。ただし、このパラメータの値を指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
        "bucket.count":"3",
        // データをデータテーブルに書き込むときにバッファキューを更新する間隔を指定します。単位:ミリ秒。デフォルト値:10000。
        "flush.Interval":"10000",
    
        // ダーティデータの処理方法を指定します。
        // Kafka メッセージレコードの解析時またはデータテーブルへの書き込み時にエラーが発生する可能性があります。次の 2 つのパラメータを指定して、エラーの修正方法を決定できます。
        // フォールトトレランス機能を指定します。有効な値:none と all。デフォルト値:none。
        // none:エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
        // all:エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
        "runtime.error.tolerance":"none",
        // ダーティデータのログ記録方法を指定します。有効な値:ignore、kafka、tablestore。デフォルト値:ignore。
        // ignore:すべてのエラーが無視されます。
        // kafka:エラーが報告されたメッセージレコードとエラーメッセージは別の Kafka トピックに保存されます。
        // tablestore:エラーが報告されたメッセージレコードとエラーメッセージは別の Tablestore データテーブルに保存されます。
        "runtime.error.mode":"ignore"
    
        // runtime.error.mode を kafka に設定した場合、Kafka クラスタアドレスとトピックを指定する必要があります。
        // "runtime.error.bootstrap.servers":"localhost:9092",
        // "runtime.error.topic.name":"errors",
    
        // runtime.error.mode を tablestore に設定した場合、Tablestore データテーブルの名前を指定する必要があります。
        // "runtime.error.table.name":"errors",
      }

パラメータ

次の表は、構成ファイルのパラメータについて説明しています。Kafka から Tablestore の時系列テーブルにデータを同期する場合にのみ、時系列関連のパラメータを構成する必要があります。

tablestore.access.key.secret
カテゴリパラメータタイプ必須説明
Kafka Connect パラメータnamestringはいtablestore-sinkコネクタの名前。コネクタ名は一意である必要があります。
connector.classclassはいTableStoreSinkConnectorコネクタの Java クラス。
コネクタを使用する場合は、connector.class を使用してコネクタクラスを指定します。connector.class には、コネクタクラスのフルネームまたはエイリアスを設定できます。コネクタクラスのフルネームは com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector で、エイリアスは TableStoreSinkConnector です。
connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector
tasks.maxintegerはい3コネクタに対して作成できるタスクの最大数。

タスクの最大数が作成に失敗した場合、作成されるタスクの数が少なくなる可能性があります。

key.converterstringいいえorg.apache.kafka.connect.json.JsonConverterワーカー構成ファイルで指定されたデフォルトのキーコンバーターを置き換えるために使用されるキーコンバーター。
value.converterstringいいえorg.apache.kafka.connect.json.JsonConverterワーカー構成ファイルで指定されたデフォルトの値コンバーターを置き換えるために使用される値コンバーター。
topicslistはいtestコネクタに指定できる Kafka トピックのリスト。複数の Kafka トピックはカンマ(,)で区切ります。

コネクタに指定されたトピックを管理するには、topics を指定する必要があります。

コネクタ接続パラメータtablestore.endpointstringはいhttps://xxx.xxx.ots.aliyuncs.comTablestore インスタンスのエンドポイント。詳細については、エンドポイントをご参照ください。
tablestore.modestringはいtimeseries宛先テーブルのタイプ。デフォルト値:normal。有効な値:
  • normal:Tablestore のデータテーブル。
  • timeseries:Tablestore の時系列テーブル。
tablestore.access.key.idstringはいLTAn********************アカウントの AccessKey ID と AccessKey シークレット。AccessKey ID と AccessKey シークレットの取得方法については、AccessKey ペアの取得をご参照ください。
stringはいzbnK**************************
tablestore.auth.modestringはいaksk認証モード。デフォルト値:aksk。有効な値:
  • aksk:Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを使用して認証します。このトピックでは、tablestore.auth.mode は aksk に設定されています。
  • sts:Security Token Service (STS) から取得した一時的なアクセス認証情報を使用して認証します。Tablestore が Message Queue for Apache Kafka に接続されている場合は、tablestore.auth.mode を sts に設定します。
tablestore.instance.namestringはいmyotstestTablestore インスタンスの名前。
コネクタのデータマッピングパラメータevent.parse.classclassはいDefaultEventParserEventParser の Java クラス。デフォルト値:DefaultEventParser。パーサーは Kafka メッセージレコードを解析して、データテーブルのプライマリキー列と属性列を取得します。
重要 Tablestore は列値のサイズに制限を設けています。string 型または binary 型のプライマリキー列の値は 1 KB を超えることはできず、属性列の値は 2 MB を超えることはできません。詳細については、一般的な制限をご参照ください。

データ型の変換後に列値が制限を超えた場合、Kafka メッセージレコードはダーティデータとして処理されます。

DefaultEventParser を使用するには、Kafka メッセージレコードのキーまたは値が Kafka Connect の Struct クラスまたは Map クラスである必要があります。Struct で選択されたフィールドは、Tablestore Sink Connector でサポートされているデータ型である必要があります。フィールドは、データ型マッピングテーブルに基づいて Tablestore データ型のデータに変換され、データテーブルに書き込まれます。Map の値のデータ型は、Tablestore Sink Connector でサポートされているデータ型である必要があります。Tablestore Sink Connector は、Struct と Map で同じデータ型をサポートしています。Map の値は binary 型のデータに変換され、データテーブルに書き込まれます。Kafka と Tablestore 間のデータ型マッピングの詳細については、付録:Kafka と Tablestore 間のデータ型マッピングをご参照ください。

Kafka メッセージレコードのデータ型が Tablestore Sink Connector と互換性がない場合は、com.aliyun.tablestore.kafka.connect.parsers.EventParser で定義されている操作を呼び出してパーサーを構成できます。

table.name.formatstringいいえkafka_<topic>宛先 Tablestore データテーブルの名前のフォーマット文字列。デフォルト値:<topic>。<topic> は、データのエクスポート元のトピックのプレースホルダーとして文字列で使用できます。たとえば、table.name.format が kafka_<topic> に設定され、データのエクスポート元の Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。

topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。

topics.assign.tableslistはいtest:destTable<topic_1>:<tablename_1>,<topic_2>:<tablename_2> 形式で、トピックと宛先 Tablestore テーブル間のマッピングを指定します。複数のマッピングはカンマ(,)で区切ります。たとえば、test:destTable は、test という名前のトピックからのメッセージレコードが destTable という名前のデータテーブルに書き込まれることを指定します。

topics.assign.tables は table.name.format よりも優先されます。topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。

primarykey.modestringいいえkafkaデータテーブルのプライマリキーモード。有効な値:
  • kafka:<connect_topic>_<connect_partition> と <connect_offset> がデータテーブルのプライマリキーとして使用されます。Kafka トピック <connect_topic> とパーティション <connect_partition> はアンダースコア(_)で区切られ、<connect_offset> はパーティション内のメッセージレコードのオフセットを指定します。
  • record_key:レコードキーの Struct クラスのフィールドまたは Map クラスのキーがデータテーブルのプライマリキーとして使用されます。
  • record_value:レコード値の Struct クラスのフィールドまたは Map クラスのキーがデータテーブルのプライマリキーとして使用されます。

このパラメータは、tablestore.<tablename>.primarykey.name および tablestore.<tablename>.primarykey.type と一緒に構成します。このパラメータの値は大文字と小文字を区別しません。

tablestore.<tablename>.primarykey.namelistいいえA,Bデータテーブルのプライマリキー列名。<tablename> はデータテーブル名のプレースホルダーです。このパラメータの値には、カンマ(,)で区切られた 1 つから 4 つのプライマリキー列名が含まれます。
プライマリキー列名は、プライマリキーモードによって異なります。
  • プライマリキーモードが kafka に設定されている場合、このパラメータのデフォルト値は topic_partition,offset です。kafka プライマリキーモードでは、プライマリキー列名を指定する必要はありません。プライマリキー列名が指定されている場合でも、デフォルトのプライマリキー列名が優先されます。
  • プライマリキーモードが record_key に設定されている場合、指定されたプライマリキー列名と同じ名前を持つ Struct クラスのフィールドまたは Map クラスのキーが、レコードキーからデータテーブルのプライマリキーとして抽出されます。record_key プライマリキーモードでは、プライマリキー列名を指定する必要があります。
  • プライマリキーモードが record_value に設定されている場合、指定されたプライマリキー列名と同じ名前を持つ Struct クラスのフィールドまたは Map クラスのキーが、レコード値からデータテーブルのプライマリキーとして抽出されます。record_value プライマリキーモードでは、プライマリキー列名を指定する必要があります。

Tablestore データテーブルのプライマリキー列は順次です。tablestore.<tablename>.primarykey.name を定義するときは、プライマリキー列の順序に注意する必要があります。たとえば、PRIMARY KEY (A, B, C) と PRIMARY KEY (A, C, B) は異なるスキーマです。

tablestore.<tablename>.primarykey.typelistいいえstring, integerデータテーブルのプライマリキー列のデータ型。<tablename> はデータテーブル名のプレースホルダーです。このパラメータの値には、1 つから 4 つのプライマリキー列のデータ型が含まれます。プライマリキー列のデータ型はカンマ(,)で区切ります。プライマリキー列のデータ型の順序は、tablestore.<tablename>.primarykey.name で指定されたプライマリキー列名の順序に対応している必要があります。このパラメータの値は大文字と小文字を区別しません。有効な値:integer、string、binary、auto_increment。
プライマリキー列のデータ型は、プライマリキーモードによって異なります。
  • プライマリキーモードが kafka に設定されている場合、このパラメータのデフォルト値は string, integer です。

    kafka プライマリキーモードでは、プライマリキー列のデータ型を指定する必要はありません。プライマリキー列のデータ型が指定されている場合でも、デフォルトのプライマリキー列のデータ型が優先されます。

  • プライマリキーモードが record_key または record_value に設定されている場合、プライマリキー列のデータ型を指定する必要があります。

    指定されたプライマリキー列のデータ型が Kafka スキーマで定義されたデータ型と競合する場合、解析エラーが発生します。この場合、ランタイムエラーパラメータを構成してエラーを修正できます。

    このパラメータが auto_increment に設定されている場合、データがデータテーブルに書き込まれるときに、Kafka メッセージレコードのフィールドがデータテーブルに自動インクリメントプライマリキー列として挿入されます。

tablestore.<tablename>.columns.whitelist.namelistいいえA,B属性列ホワイトリストの属性列名。<tablename> はデータテーブル名のプレースホルダーです。属性列名はカンマ(,)で区切ります。

このパラメータを構成しない場合、レコード値の Struct クラスのすべてのフィールドまたは Map クラスのすべてのキーがデータテーブルの属性列として使用されます。このパラメータを構成すると、レコード値のフィールドは指定された属性列ホワイトリストに基づいてフィルタリングされ、必要な属性列が取得されます。

tablestore.<tablename>.columns.whitelist.typelistいいえstring, integer属性列ホワイトリストの属性列のデータ型。<tablename> はデータテーブル名のプレースホルダーです。属性列のデータ型はカンマ(,)で区切ります。属性列のデータ型の順序は、<tablename>.columns.whitelist.name で指定された属性列名の順序に対応している必要があります。このパラメータの値は大文字と小文字を区別しません。有効な値:integer、string、binary、boolean、double。
コネクタ書き込みパラメータinsert.modestringいいえput書き込みモード。デフォルト値:put。有効な値:
  • put:既存のデータは、テーブルに書き込むデータ行によって上書きされます。この値は、Tablestore の PutRow 操作に対応します。
  • update:データ行を更新すると、属性列が行に追加されるか、既存の属性列の値が更新されます。この値は、Tablestore の UpdateRow 操作に対応します。

このパラメータの値は大文字と小文字を区別しません。

insert.order.enablebooleanいいえtrueデータが読み取られた順序でデータテーブルに書き込むかどうかを指定します。デフォルト値:true。有効な値:
  • true:Kafka メッセージレコードは、メッセージレコードが読み取られた順序でデータテーブルに書き込まれます。
  • false:Kafka メッセージレコードは特定の順序なしでデータテーブルに書き込まれます。これにより、書き込みパフォーマンスが向上します。
auto.createbooleanいいえfalse宛先テーブルを自動的に作成するかどうかを指定します。データテーブルまたは時系列テーブルを自動的に作成できます。デフォルト値:false。有効な値:
  • true:システムは宛先 Tablestore テーブルを自動的に作成します。
  • false:システムは宛先 Tablestore テーブルを自動的に作成しません。
delete.modestringいいえnone削除モード。このパラメータの構成は、データがデータテーブルに同期され、プライマリキーモードが record_key に設定されている場合にのみ有効になります。デフォルト値:none。有効な値:
  • none:削除操作は実行できません。
  • row:行を削除できます。レコード値が空の場合、対応する行が削除されます。
  • column:属性列を削除できます。レコード値の Struct クラスのフィールド値または Map クラスのキー値が空の場合、対応する属性列が削除されます。
  • row_and_column:行と属性列を削除できます。

このパラメータの値は大文字と小文字を区別しません。

このパラメータは、insert.mode パラメータの値に基づいて指定されます。詳細については、付録:削除構文をご参照ください。

buffer.sizeintegerいいえ1024データがデータテーブルに書き込まれるときに、メモリ内のバッファキューに含めることができる行の最大数。デフォルト値:1024。このパラメータの値は 2 の指数である必要があります。
max.thread.countintegerいいえ3データがデータテーブルに書き込まれるときに使用されるコールバックスレッドの数。デフォルト値 = vCPU の数 + 1
max.concurrencyintegerいいえ10データテーブルにデータを書き込むために送信できる同時書き込みリクエストの最大数。
bucket.countintegerいいえ3データの書き込み先となるバケットの数。デフォルト値:3。このパラメータの値を増やすと、同時書き込み機能を向上させることができます。ただし、このパラメータの値を、指定した同時書き込みリクエストの最大数よりも大きい値に設定することはできません。
flush.Intervalintegerいいえ10000データがデータテーブルに書き込まれるときに、バッファキューが更新される間隔。単位:ミリ秒。デフォルト値:10000。
コネクタランタイムエラーパラメータruntime.error.tolerancestringいいえnoneKafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが発生した場合に使用されるエラー処理ポリシー。デフォルト値:none。有効な値:
  • none:エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
  • all:エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。

このパラメータの値は大文字と小文字を区別しません。

runtime.error.modestringいいえignoreKafka メッセージレコードの解析時またはテーブルへの書き込み時にエラーが報告されたメッセージレコードの処理方法を指定します。デフォルト値:ignore。有効な値:
  • ignore:すべてのエラーが無視されます。
  • kafka:エラーが報告されたメッセージレコードとエラーメッセージは別の Kafka トピックに保存されます。この場合、runtime.error.bootstrap.servers と runtime.error.topic.name を指定する必要があります。新しいトピックのエラーが報告された Kafka メッセージレコードのキーと値は、データのエクスポート元のトピックのメッセージレコードのキーと値と同じです。ErrorInfo フィールドはヘッダーに含まれており、エラーメッセージを記録します。
  • tablestore:エラーが報告されたメッセージレコードとエラーメッセージは別の Tablestore データテーブルに保存されます。この場合、runtime.error.table.name を指定する必要があります。エラーが報告されたメッセージレコードとエラーメッセージを記録するために使用されるデータテーブルのプライマリキー列は、topic_partition (string 型) と offset (integer 型) です。データテーブルの属性列は、key (bytes 型)、value (bytes 型)、error_info (string 型) です。

runtime.error.mode が kafka に設定されている場合は、Kafka メッセージレコードのヘッダー、キー、値をシリアル化する必要があります。runtime.error.mode が tablestore に設定されている場合は、Kafka メッセージレコードのキーと値をシリアル化する必要があります。デフォルトでは、org.apache.kafka.connect.json.JsonConverter がデータのシリアル化に使用され、schemas.enable が true に設定されています。JsonConverter を使用してデータを逆シリアル化し、元のデータを取得できます。Converter の詳細については、Kafka Converter をご参照ください。

runtime.error.bootstrap.serversstringいいえlocalhost:9092エラーが報告されたメッセージレコードとエラーメッセージが保存される Kafka クラスタのアドレス。
runtime.error.topic.namestringいいえerrorsエラーが報告されたメッセージレコードとエラーメッセージを保存する Kafka トピックの名前。
runtime.error.table.namestringいいえerrorsエラーが報告されたメッセージレコードとエラーメッセージを保存する Tablestore テーブルの名前。
時系列関連パラメータtablestore.timeseries.<tablename>.measurementstringはいmNameJSON 形式のデータで指定されたキーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれることを指定します。

tablestore.timeseries.<tablename>.measurement が <topic> に設定されている場合、Kafka メッセージレコードの topic キーに対応する値が、_m_name フィールドの値として時系列テーブルに書き込まれます。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。たとえば、時系列テーブルの名前が test の場合、パラメータ名は tablestore.timeseries.test.measurement です。

tablestore.timeseries.<tablename>.dataSourcestringはいdsJSON 形式のデータで ds キーに対応する値が、_data_source フィールドの値として時系列テーブルに書き込まれることを指定します。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。

tablestore.timeseries.<tablename>.tagslistはいregion,levelJSON 形式のデータで region キーと level キーに対応する値が、tags フィールドの値として時系列テーブルに書き込まれることを指定します。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。

tablestore.timeseries.<tablename>.timestringはいtimestampJSON 形式のデータで timestamp キーに対応する値が、_time フィールドの値として時系列テーブルに書き込まれることを指定します。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。

tablestore.timeseries.<tablename>.time.unitstringはいMILLISECONDStablestore.timeseries.<tablename>.time パラメータの値の単位。有効な値:SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。

tablestore.timeseries.<tablename>.field.namelistいいえcpu,ioJSON 形式のデータの cpu キーと io キーが _field_name の名前として時系列テーブルに書き込まれ、JSON 形式のデータの cpu キーと io キーに対応する値が _field_name の値として時系列テーブルに書き込まれることを指定します。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。

tablestore.timeseries.<tablename>.field.typestringいいえdouble,integertablestore.timeseries.<tablename>.field.name で指定されたフィールドのデータ型。有効な値:double、integer、string、binary、boolean。複数のデータ型はカンマ(,)で区切ります。

パラメータ内の <tablename> は、時系列テーブル名のプレースホルダーです。ビジネス要件に基づいてパラメータ名を変更します。

tablestore.timeseries.mapAllbooleanいいえfalseJSON 形式のデータのプライマリキーフィールドと時間フィールド以外のフィールドが、フィールドとして時系列テーブルに書き込まれるかどうかを指定します。

tablestore.timeseries.mapAll が false に設定されている場合、tablestore.timeseries.<tablename>.field.name パラメータと tablestore.timeseries.<tablename>.field.type パラメータを構成する必要があります。

tablestore.timeseries.toLowerCasebooleanいいえtrueフィールドのキーが時系列テーブルに書き込まれる前に小文字に変換されるかどうかを指定します。フィールドのキーとは、プライマリキーフィールドまたは時間フィールド以外のキー、または tablestore.timeseries.<tablename>.field.name で指定されたキーです。
tablestore.timeseries.rowsPerBatchintegerいいえ50リクエストで Tablestore に書き込むことができる行の最大数。最大値とデフォルト値は 200 です。

付録:Kafka と Tablestore 間のデータ型マッピング

次の表は、Kafka と Tablestore のデータ型間のマッピングについて説明しています。

Kafka スキーマタイプTablestore データ型
STRINGSTRING
INT8、INT16、INT32、INT64INTEGER
FLOAT32、FLOAT64DOUBLE
BOOLEANBOOLEAN
BYTESBINARY

付録:削除構文

説明 この機能は、Kafka から Tablestore のデータテーブルにデータを同期する場合にのみサポートされます。

次の表は、メッセージレコードに空の値が含まれており、Kafka から Tablestore のデータテーブルにデータが同期されている場合に、書き込みモード (insert.mode) と削除モード (delete.mode) の構成に基づいて Tablestore データテーブルにデータを書き込むために使用されるメソッドについて説明しています。

insert.modeputupdate
delete.modenonerowcolumnrow_and_columnnonerowcolumnrow_and_column
空の値上書き行の削除上書き行の削除ダーティデータ行の削除ダーティデータ行の削除
値のすべてのフィールドが空上書き上書き上書き上書きダーティデータダーティデータ列の削除列の削除
値の一部のフィールドが空上書き上書き上書き上書き空の値を無視空の値を無視列の削除列の削除