Tablestore Sink Connectorは、subscribeされたトピックに基づいて、Apache Kafkaからpollモードでメッセージレコードを取得し、メッセージレコードを解析し、データをTablestoreのデータテーブルにバッチインポートします。
前提条件
Apache Kafkaがインストールされ、有効になっており、ZooKeeperが有効になっていること。詳細は、Kafkaドキュメントをご参照ください。
Tablestoreサービスがアクティブ化され、インスタンスとデータテーブルが作成されていること。詳細は、Tablestoreの使用をご参照ください。
説明Tablestore Sink Connectorを使用して、宛先データテーブルを自動的に作成することもできます。このデータテーブルを作成するには、auto.createをtrueに設定します。
AccessKeyペアを取得していること。詳細は、AccessKeyペアの取得をご参照ください。
手順1: Tablestore Sink Connectorをデプロイする
次のいずれかの方法でTablestore Sink Connectorパッケージを取得します。
GitHubのTablestore Sink Connectorソースコードからソースコードをダウンロードし、ソースコードをコンパイルします。
Gitツールを使用して、次のコマンドを実行してTablestore Sink Connectorのソースコードをダウンロードします。
git clone https://github.com/aliyun/kafka-connect-tablestore.git
ダウンロードしたソースコードが保存されているディレクトリに移動し、Mavenを使用して次のコマンドを実行してソースコードをパッケージ化します。
mvn clean package -DskipTests
コンパイルが完了すると、生成されたパッケージはtargetディレクトリに保存されます。kafka-connect-tablestore-1.0.jarパッケージを例として使用します。
コンパイル済みのkafka-connect-tablestoreパッケージをダウンロードします。
各ノードの$KAFKA_HOME/libsディレクトリにパッケージをコピーします。
手順2: Tablestore Sink Connectorを起動する
Tablestore Sink Connectorは、スタンドアロンモードまたは分散モードで動作できます。ビジネス要件に基づいてモードを選択できます。
Tablestore Sink Connectorをスタンドアロンモードで使用するには、次の手順を実行します。
要件に基づいて、ワーカー構成ファイルconnect-standalone.propertiesとコネクタ構成ファイルconnect-tablestore-sink-quickstart.propertiesを変更します。
ワーカー構成ファイルconnect-standalone.propertiesの変更例
ワーカー構成ファイルには、構成項目が含まれています。これらの項目には、Kafka接続パラメーター、シリアル化形式、およびオフセットがコミットされる頻度が含まれます。次のサンプルコードは、Apache Kafkaが提供するワーカー構成ファイルの変更例です。詳細は、Kafka Connectをご参照ください。
# Apache Software Foundation (ASF) により、1 つ以上の # コントリビューターライセンス契約に基づいてライセンスされています。著作権所有権に関する追加情報は、 # この作品とともに配布されている NOTICE ファイルを参照してください。 # ASF は、Apache License, Version 2.0(以下「ライセンス」)に基づいてこのファイルをライセンスします。 # ライセンスに準拠する場合を除き、このファイルを使用することはできません。 # ライセンスのコピーは、次の場所で入手できます。 # # http://www.apache.org/licenses/LICENSE-2.0 # # 適用法で要求されている場合、または書面で合意されている場合を除き、 # ライセンスに基づいて配布されるソフトウェアは、「現状有姿」で配布され、 # 明示的または黙示的な保証や条件は一切提供されません。 # ライセンスに基づく許可と制限事項については、ライセンスを参照してください。 # これらはデフォルト値です。このファイルは、一部の設定をオーバーライドする方法を示しています。 bootstrap.servers=localhost:9092 # コンバーターは、Kafka のデータ形式と、Connect データへの変換方法を指定します。すべての Connect ユーザーは、 # Kafka からロードまたは Kafka に保存するときに必要なデータ形式に基づいて、これらを構成する必要があります。 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # コンバーター固有の設定は、コンバーターの設定に適用するコンバーターをプレフィックスとして付けることで渡すことができます。 key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets # 通常よりもはるかに高速にフラッシュします。これは、テスト/デバッグに役立ちます。 offset.flush.interval.ms=10000 # プラグインのクラスロード分離を有効にするには、コンマ (,) で区切られたファイルシステムパスのリストに設定します。 # (コネクタ、コンバーター、変換)。リストは、次のいずれかの組み合わせを含むトップレベルディレクトリで構成する必要があります。 # a) プラグインとその依存関係を含む jar を直接含むディレクトリ # b) プラグインとその依存関係を含む uber-jar # c) プラグインとその依存関係のクラスのパッケージディレクトリ構造を直接含むディレクトリ # 注: シンボリックリンクは、依存関係またはプラグインを検出するために追われます。 # 例: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, #plugin.path=
コネクタ構成ファイルconnect-tablestore-sink-quickstart.propertiesの変更例
コネクタ構成ファイルには、構成項目が含まれています。これらの項目には、コネクタクラス、Tablestore接続パラメーター、およびデータマッピングが含まれます。詳細は、構成の説明をご参照ください。
# コネクタ名を指定します。 name=tablestore-sink # コネクタクラスを指定します。 connector.class=TableStoreSinkConnector # タスクの最大数を指定します。 tasks.max=1 # データをエクスポートする Kafka トピックのリストを指定します。 topics=test # 次の Tablestore 接続パラメーターの値を指定します。 # Tablestore インスタンスのエンドポイント。 tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com # AccessKey ID と AccessKey シークレットで構成される AccessKey ペア。 tablestore.access.key.id =xxx tablestore.access.key.secret=xxx # Tablestore インスタンスの名前。 tablestore.instance.name=xxx # Tablestore の宛先テーブルの名前の形式文字列を指定します。<topic> は、データをエクスポートするトピックのプレースホルダーです。デフォルト値: <topic>。 # 例: # table.name.format=kafka_<topic> を指定すると、test という名前のトピックからのメッセージレコードは kafka_test という名前のデータテーブルに書き込まれます。 # table.name.format= # プライマリキーモードを指定します。デフォルト値: kafka。 # プライマリキーモードが kafka に設定されている場合、<topic>_<partition> と <offset> が Tablestore データテーブルのプライマリキーとして使用されます。<topic>_<partition> は、アンダースコア (_) で区切られた Kafka トピックとパーティションを指定します。<offset> は、パーティション内のメッセージレコードのオフセットを指定します。 # primarykey.mode= # 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値: false。 auto.create=true
$KAFKA_HOME ディレクトリに移動し、次のコマンドを実行してスタンドアロンモードを有効にします。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties
Tablestore Sink Connectorを分散モードで使用するには、次の手順を実行します。
ビジネス要件に基づいて、ワーカー構成ファイル connect-distributed.properties を変更します。
ワーカー構成ファイルには、構成項目が含まれています。これらの項目には、Kafka 接続パラメーター、シリアル化形式、オフセットがコミットされる頻度、およびコネクタ情報を格納するトピックが含まれます。トピックを事前に作成することをお勧めします。次のサンプルコードは、Apache Kafka が提供するワーカー構成ファイルの変更例です。詳細は、Kafka Connect をご参照ください。
offset.storage.topic: コネクタオフセットが格納されるコンパクトトピックを指定します。
config.storage.topic: コネクタとタスクの構成が格納されるコンパクトトピックを指定します。コンパクトトピックのパーティション数は 1 に設定する必要があります。
status.storage.topic: Kafka Connect に関するステータス情報が格納されるコンパクトトピックを指定します。
## # Apache Software Foundation (ASF) により、1 つ以上の # コントリビューターライセンス契約に基づいてライセンスされています。著作権所有権に関する追加情報は、 # この作品とともに配布されている NOTICE ファイルを参照してください。 # ASF は、Apache License, Version 2.0(以下「ライセンス」)に基づいてこのファイルをライセンスします。 # ライセンスに準拠する場合を除き、このファイルを使用することはできません。 # ライセンスのコピーは、次の場所で入手できます。 # # http://www.apache.org/licenses/LICENSE-2.0 # # 適用法で要求されている場合、または書面で合意されている場合を除き、 # ライセンスに基づいて配布されるソフトウェアは、「現状有姿」で配布され、 # 明示的または黙示的な保証や条件は一切提供されません。 # ライセンスに基づく許可と制限事項については、ライセンスを参照してください。 ## # このファイルには、Kafka Connect 分散ワーカーの構成の一部が含まれています。このファイルは、 # 例とともに使用するように設計されており、一部の設定は本番システムで使用される設定と異なる場合があります。特に、 # `bootstrap.servers` とレプリケーション係数を指定する設定は異なります。 # Kafka クラスターへの初期接続を確立するために使用するホスト/ポートペアのリスト。 bootstrap.servers=localhost:9092 # クラスターの一意の名前。Connect クラスターグループの形成に使用されます。これは、コンシューマーグループ ID と競合してはなりません。 group.id=connect-cluster # コンバーターは、Kafka のデータ形式と、Connect データへの変換方法を指定します。すべての Connect ユーザーは、 # Kafka からロードまたは Kafka に保存するときに必要なデータ形式に基づいて、これらを構成する必要があります。 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # コンバーター固有の設定は、コンバーターの設定に適用するコンバーターをプレフィックスとして付けることで渡すことができます。 key.converter.schemas.enable=true value.converter.schemas.enable=true # オフセットを格納するために使用するトピック。このトピックには多くのパーティションがあり、レプリケートおよび圧縮されている必要があります。 # Kafka Connect は、必要に応じてトピックを自動的に作成しようとしますが、特定のトピック構成が必要な場合は、 # Kafka Connect を起動する前にいつでも手動でトピックを作成できます。 # ほとんどのユーザーは、組み込みのデフォルトレプリケーション係数 3 を使用するか、場合によってはさらに大きい値を指定します。 # これは、最大レプリケーション係数として使用される数以上のブローカーが必要であることを意味するため、 # この例を単一ブローカクラスターで実行できるように、ここではレプリケーション係数を 1 に設定します。 offset.storage.topic=connect-offsets offset.storage.replication.factor=1 #offset.storage.partitions=25 # コネクタとタスクの構成を格納するために使用するトピック。これは、単一パーティション、高レプリケーション、 # および圧縮されたトピックである必要があります。Kafka Connect は、必要に応じてトピックを自動的に作成しようとしますが、特定のトピック構成が必要な場合は、 # Kafka Connect を起動する前にいつでも手動でトピックを作成できます。 # ほとんどのユーザーは、組み込みのデフォルトレプリケーション係数 3 を使用するか、場合によってはさらに大きい値を指定します。 # これは、最大レプリケーション係数として使用される数以上のブローカーが必要であることを意味するため、 # この例を単一ブローカクラスターで実行できるように、ここではレプリケーション係数を 1 に設定します。 config.storage.topic=connect-configs config.storage.replication.factor=1 # ステータスを格納するために使用するトピック。このトピックには複数のパーティションを含めることができ、レプリケートおよび圧縮されている必要があります。 # Kafka Connect は、必要に応じてトピックを自動的に作成しようとしますが、特定のトピック構成が必要な場合は、 # Kafka Connect を起動する前にいつでも手動でトピックを作成できます。 # ほとんどのユーザーは、組み込みのデフォルトレプリケーション係数 3 を使用するか、場合によってはさらに大きい値を指定します。 # これは、最大レプリケーション係数として使用される数以上のブローカーが必要であることを意味するため、 # この例を単一ブローカクラスターで実行できるように、ここではレプリケーション係数を 1 に設定します。 status.storage.topic=connect-status status.storage.replication.factor=1 #status.storage.partitions=5 # 通常よりもはるかに高速にフラッシュします。これは、テスト/デバッグに役立ちます。 offset.flush.interval.ms=10000 # これらは、REST ホストとポートの構成の存在についてユーザーに知らせるために提供されます。 # REST API がリッスンするホスト名とポート。これが設定されている場合、リクエストをリッスンするために使用されるインターフェースにバインドされます。 #rest.host.name= #rest.port=8083 # 他のワーカーに接続するために提供されるホスト名とポート。つまり、他のサーバーからルーティング可能な URL。 #rest.advertised.host.name= #rest.advertised.port= # プラグインのクラスロード分離を有効にするには、コンマ (,) で区切られたファイルシステムパスのリストに設定します。 # (コネクタ、コンバーター、変換)。リストは、次のいずれかの組み合わせを含むトップレベルディレクトリで構成する必要があります。 # a) プラグインとその依存関係を含む jar を直接含むディレクトリ # b) プラグインとその依存関係を含む uber-jar # c) プラグインとその依存関係のクラスのパッケージディレクトリ構造を直接含むディレクトリ # 例: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, #plugin.path=
$KAFKA_HOME ディレクトリに移動し、次のコマンドを実行して分散モードを有効にします。
重要各ノードでワーカープロセスを起動する必要があります。
bin/connect-distributed.sh config/connect-distributed.properties
REST API を使用してコネクタを管理します。詳細は、REST API をご参照ください。
config パスに connect-tablestore-sink-quickstart.json という名前のファイルを作成します。次のサンプルコードは、ファイルに追加する必要があるコンテンツの例を示しています。
コネクタ構成ファイルは、JSON 形式の文字列を使用して、構成項目のキーと値のペアを指定します。これらの項目には、コネクタクラス、Tablestore 接続パラメーター、およびデータマッピングが含まれます。詳細は、構成の説明をご参照ください。
{ "name": "tablestore-sink", "config": { "connector.class":"TableStoreSinkConnector", "tasks.max":"1", "topics":"test", "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com", "tablestore.access.key.id":"xxx", "tablestore.access.key.secret":"xxx", "tablestore.instance.name":"xxx", "table.name.format":"<topic>", "primarykey.mode":"kafka", "auto.create":"true" } }
次のコマンドを実行して、Tablestore Sink Connector クライアントを起動します。
curl -i -k -H "Content-type: application/json" -X POST -d @config/connect-tablestore-sink-quickstart.json http://localhost:8083/connectors
上記のコマンドで、
http://localhost:8083/connectors
は Kafka REST サービスのアドレスです。ビジネス要件に基づいてアドレスを変更します。
手順3: メッセージレコードを生成する
$KAFKA_HOME ディレクトリに移動し、次のコマンドを実行してコンソールプロデューサークライアントを起動します。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
次の表は、コンソールプロデューサークライアントを起動するために構成する必要があるパラメーターを示しています。
パラメーター
例
説明
--broker-list
localhost:9092
Kafka クラスター内のブローカーのアドレスとポート。
--topic
test
トピックの名前。デフォルトでは、Tablestore Sink Connector を起動するとトピックが自動的に作成されます。トピックを手動で作成することもできます。
test という名前のトピックにメッセージを書き込みます。
Struct 形式のメッセージ
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" }, { "type":"string", "optional":false, "field":"product" }, { "type":"int64", "optional":false, "field":"quantity" }, { "type":"double", "optional":false, "field":"price" } ], "optional":false, "name":"record" }, "payload":{ "id":1, "product":"foo", "quantity":100, "price":50 } }
Map 形式のメッセージ
{ "schema":{ "type":"map", "keys":{ "type":"string", "optional":false }, "values":{ "type":"int32", "optional":false }, "optional":false }, "payload":{ "id":1 } }
Tablestoreコンソールにログインしてデータを表示します。
Tablestore インスタンスに test という名前のデータテーブルが自動的に作成されます。次の図は、データテーブル内のデータを示しています。最初の行のデータはインポートされた Map 形式のメッセージの結果であり、2 番目の行のデータはインポートされた Struct 形式のメッセージの結果です。