すべてのプロダクト
Search
ドキュメントセンター

Tablestore:時系列テーブルへのデータ同期

最終更新日:Dec 28, 2024

kafka-connect-tablestore パッケージを使用して、Apache Kafka から Tablestore の時系列テーブルにデータを同期できます。このトピックでは、Kafka から Tablestore の時系列テーブルへのデータ同期を構成する方法について説明します。

前提条件

  • Apache Kafka がインストールされ、有効になっており、ZooKeeper が有効になっていること。詳細については、Kafka のドキュメントを参照してください。

  • Tablestore サービスがアクティブ化され、インスタンスと時系列テーブルが作成されていること。詳細については、Tablestore の使用を参照してください。

    説明

    Tablestore Sink Connector を使用して、宛先の時系列テーブルを自動的に作成することもできます。この時系列テーブルを作成するには、auto.create を true に設定します。

  • AccessKey ペアを取得していること。詳細については、AccessKey ペアの取得を参照してください。

背景情報

Tablestore は時系列データを格納でき、時系列データの分析をサポートしています。詳細については、TimeSeries モデルを参照してください。

手順 1: Tablestore Sink Connector をデプロイする

  1. 次のいずれかの方法で Tablestore Sink Connector パッケージを取得します。

    • GitHub のTablestore Sink Connector ソースコードからソースコードをダウンロードし、ソースコードをコンパイルします。

      1. Git ツールを使用して、次のコマンドを実行して Tablestore Sink Connector のソースコードをダウンロードします。

        git clone https://github.com/aliyun/kafka-connect-tablestore.git
      2. ダウンロードしたソースコードが保存されているディレクトリに移動し、Maven を使用して次のコマンドを実行してソースコードをパッケージ化します。

        mvn clean package -DskipTests

        コンパイルが完了すると、生成されたパッケージは target ディレクトリに保存されます。 kafka-connect-tablestore-1.0.jar パッケージを例として使用します。

    • コンパイル済みのkafka-connect-tablestore パッケージをダウンロードします。

  2. 各ノードの $KAFKA_HOME/libs ディレクトリにパッケージをコピーします。

手順 2: Tablestore Sink Connector を起動する

Tablestore Sink Connector は、スタンドアロンモードまたは分散モードで動作できます。ビジネス要件に基づいてモードを選択できます。

時系列データを Tablestore に書き込むには、Kafka のメッセージレコードが JSON 形式である必要があります。したがって、Tablestore Sink Connector を起動するには Jsonconverter が必要です。スキーマを抽出してキーを入力する必要はありませんが、connect-standalone.properties または connect-distributed.properties の構成項目を構成する必要があります。次のサンプルコードは、構成項目を構成する方法を示しています。

説明

キーを入力する場合は、キーの形式に基づいて key.converter と key.converter.schemas.enable を構成する必要があります。

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

このセクションでは、スタンドアロンモードで Tablestore の時系列テーブルにデータを同期する方法について説明します。分散モードで Tablestore の時系列テーブルにデータを同期する手順は、分散モードで Tablestore のデータテーブルにデータを同期する手順と似ています。ただし、ワーカー構成ファイル connect-distributed.properties の上記の構成項目を変更し、コネクタ構成ファイル connect-tablestore-sink-quickstart.json の時系列関連の構成項目を変更する必要があります。詳細については、手順 2: Tablestore Sink Connector を起動するの分散モードでの構成手順を参照してください。

スタンドアロンモードで Tablestore Sink Connector を使用するには、次の手順を実行します。

  1. 要件に基づいて、ワーカー構成ファイル connect-standalone.properties とコネクタ構成ファイル connect-tablestore-sink-quickstart.properties を変更します。

    • ワーカー構成ファイル connect-standalone.properties の変更例

      ワーカー構成ファイルには、Kafka 接続パラメータ、シリアル化形式、オフセットがコミットされる頻度などの構成項目が含まれています。次のサンプルコードは、Apache Kafka が提供するワーカー構成ファイルの変更例です。詳細については、Kafka Connectを参照してください。

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #    http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      
      # これらはデフォルト値です。このファイルは、一部の設定をオーバーライドする方法を示すためのものです。
      bootstrap.servers=localhost:9092
      
      # コンバーターは、Kafka のデータの形式と、Connect データへの変換方法を指定します。すべての Connect ユーザーは、Kafka からロードまたは Kafka に保存するときに必要なデータ形式に基づいて、これらを構成する必要があります。
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      # コンバーター固有の設定は、適用するコンバーターの前にコンバーターの設定を付けることで渡すことができます。
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      
      offset.storage.file.filename=/tmp/connect.offsets
      # 通常よりもはるかに速くフラッシュします。これはテスト/デバッグに役立ちます。
      offset.flush.interval.ms=10000
      
      # プラグイン (コネクタ、コンバーター、変換) のクラスロード分離を有効にするには、コンマ (,) で区切られたファイルシステムパスのリストに設定します。リストは、次のいずれかの組み合わせを含むトップレベルディレクトリで構成する必要があります。
      # a) プラグインとその依存関係を含む jar を直接含むディレクトリ
      # b) プラグインとその依存関係を含む uber-jar
      # c) プラグインとその依存関係のクラスのパッケージディレクトリ構造を直接含むディレクトリ
      # 注: シンボリックリンクがたどられ、依存関係またはプラグインが検出されます。
      # 例:
      # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
      #plugin.path=
    • コネクタ構成ファイル connect-tablestore-sink-quickstart.properties の変更例

      コネクタ構成ファイルには、コネクタクラス、Tablestore 接続パラメータ、データマッピングなどの構成項目が含まれています。詳細については、構成の説明を参照してください。

      # コネクタ名を指定します。
      name=tablestore-sink
      # コネクタクラスを指定します。
      connector.class=TableStoreSinkConnector
      # タスクの最大数を指定します。
      tasks.max=1
      # データをエクスポートする Kafka トピックのリストを指定します。
      topics=test
      
      # 次の Tablestore 接続パラメータの値を指定します。
      # Tablestore インスタンスのエンドポイント。
      tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
      # 認証モード。
      tablestore.auth.mode=aksk
      # AccessKey ID と AccessKey シークレット。 tablestore.auth.mode が aksk に設定されている場合は、AccessKey ID と AccessKey シークレットを指定する必要があります。
      tablestore.access.key.id=xxx
      tablestore.access.key.secret=xxx
      # Tablestore インスタンスの名前。
      tablestore.instance.name=xxx
      ## Security Token Service (STS) 認証に関連する構成項目。 STS 認証を使用する場合は、次の構成項目を指定する必要があります。また、STS 認証を使用する場合は、環境変数に ACCESS_ID と ACCESS_KEY を指定する必要があります。
      #sts.endpoint=
      #region=
      #account.id=
      #role.name=
      
      # 宛先 Tablestore テーブルの名前の形式文字列を指定します。文字列内で <topic> を、データをエクスポートするトピックのプレースホルダーとして使用できます。
      # topics.assign.tables は table.name.format よりも優先順位が高く割り当てられます。 topics.assign.tables が指定されている場合、table.name.format の構成は無視されます。
      # たとえば、table.name.format が kafka_<topic> に設定されており、データをエクスポートする Kafka トピックの名前が test の場合、test トピックからの Kafka メッセージレコードは Tablestore の kafka_test という名前のテーブルにマッピングされます。
      table.name.format=<topic>
      # Kafka トピックと宛先 Tablestore テーブル間のマッピングを指定します。値は <topic>:<tablename> 形式である必要があります。トピック名とテーブル名はコロン (:) で区切ります。複数のマッピングを指定する場合は、複数のマッピングをコンマ (,) で区切ります。
      # マッピングが指定されていない場合は、table.name.format の構成が使用されます。
      # topics.assign.tables=test:test_kafka
      
      
      # 宛先テーブルを自動的に作成するかどうかを指定します。デフォルト値: false。
      auto.create=true
      
      
      # ダーティデータの処理方法を指定します。
      # Kafka メッセージレコードの解析時または時系列テーブルへの書き込み時にエラーが発生する場合があります。次の 2 つのパラメータを指定して、エラーの修正方法を決定できます。
      # フォールトトレランス機能を指定します。有効な値: none と all。デフォルト値: none。
      # none: エラーが発生すると、Tablestore Sink Connector を使用するデータインポートタスクが失敗します。
      # all: エラーが報告されたメッセージレコードはスキップされ、ログに記録されます。
      runtime.error.tolerance=none
      # ダーティデータのログ記録方法を指定します。有効な値: ignore、kafka、tablestore。デフォルト値: ignore。
      # ignore: すべてのエラーが無視されます。
      # kafka: エラーが報告されたメッセージレコードとエラーメッセージは、別の Kafka トピックに保存されます。
      # tablestore: エラーが報告されたメッセージレコードとエラーメッセージは、Tablestore データテーブルに保存されます。
      runtime.error.mode=ignore
      
      # runtime.error.mode を kafka に設定した場合は、Kafka クラスタアドレスとトピックを指定する必要があります。
      # runtime.error.bootstrap.servers=localhost:9092
      # runtime.error.topic.name=errors
      
      # runtime.error.mode を tablestore に設定した場合は、Tablestore データテーブルの名前を指定する必要があります。
      # runtime.error.table.name=errors
      
      ## 次の構成項目は、Apache Kafka から Tablestore の時系列テーブルへのデータ同期に固有のものです。
      
      # コネクタの動作モード。デフォルト値: normal。
      tablestore.mode=timeseries
      # 時系列テーブルのプライマリキーフィールドのマッピング。
      tablestore.timeseries.test.measurement=m
      tablestore.timeseries.test.dataSource=d
      tablestore.timeseries.test.tags=region,level
      # 時系列テーブルの時間フィールドのマッピング。
      tablestore.timeseries.test.time=timestamp
      tablestore.timeseries.test.time.unit=MILLISECONDS
      # 時系列データフィールドの列名を小文字に変換するかどうかを指定します。デフォルト値: true。 TimeSeries モデルの時系列テーブルの列名は、大文字をサポートしていません。 tablestore.timeseries.toLowerCase が false に設定されており、列名に大文字が含まれている場合、時系列テーブルにデータが書き込まれるときにエラーが報告されます。
      tablestore.timeseries.toLowerCase=true
      # プライマリキーフィールドと時間フィールド以外のフィールドを時系列テーブルの時系列データフィールドとして保存するかどうかを指定します。デフォルト値: true。 tablestore.timeseries.mapAll が false に設定されている場合、tablestore.timeseries.test.field.name を使用して指定されたフィールドのみが時系列データフィールドとして時系列テーブルに保存されます。
      tablestore.timeseries.mapAll=true
      # 時系列データフィールドに含まれるフィールドの名前を指定します。時系列データフィールドに含まれる複数のフィールドを指定する場合は、複数のフィールド名をコンマ (,) で区切ります。
      tablestore.timeseries.test.field.name=cpu
      # 時系列データフィールドに含まれるフィールドのタイプを指定します。有効な値: double、integer、string、binary、boolean。
      # 時系列データフィールドに複数のフィールドが含まれている場合、フィールドタイプとフィールド名はペアで構成する必要があります。複数のフィールドタイプはコンマ (,) で区切ります。
      tablestore.timeseries.test.field.type=double
  2. $KAFKA_HOME ディレクトリに移動し、次のコマンドを実行してスタンドアロンモードを有効にします。

    bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties

手順 3: メッセージレコードを生成する

  1. $KAFKA_HOME ディレクトリに移動し、次のコマンドを実行してコンソールプロデューサークライアントを起動します。

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    次の表は、コンソールプロデューサークライアントを起動するために構成する必要があるパラメータを示しています。

    パラメータ

    説明

    --broker-list

    localhost:9092

    Kafka クラスタのブローカーのアドレスとポート。

    --topic

    test

    トピックの名前。デフォルトでは、Tablestore Sink Connector を起動すると、トピックが自動的に作成されます。トピックを手動で作成することもできます。

  2. test という名前のトピックにメッセージを書き込みます。

    重要

    時系列テーブルにデータをインポートするには、JSON 形式のデータを Kafka トピックに書き込む必要があります。

    {"m":"cpu","d":"127.0.0.1","region":"shanghai","level":1,"timestamp":1638868699090,"io":5.5,"cpu":"3.5"}
  3. Tablestore コンソールにログインしてデータを表示します。