すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Logstashを使用してdata Warehouse Editionにデータをインポートする

最終更新日:Jun 11, 2024

Logstashはオープンソースのサーバー側データ処理パイプラインで、複数のソースから同時にデータを収集し、データを変換して、指定された場所に保存できます。 AnalyticDB for MySQLはMySQLと完全に互換性があります。 Logstashの入力プラグインを使用して、データソースからAnalyticDB for MySQLにデータをインポートできます。 このトピックでは、Logstashを使用してApache KafkaからAnalyticDB for MySQL data Warehouse Edition (V3.0) にデータをインポートする方法について説明します。

Logstashプラグイン

  • さまざまなソースからさまざまなタイプおよびサイズのデータを収集するために使用される入力プラグイン

    通常、データは異なる形式で複数のシステムに保存されます。 Logstashは複数のデータ入力をサポートし、同時に複数のデータソースからデータを収集します。 Logstashは、ログ、メトリクス、webアプリケーション、データストア、AWSサービスからデータを継続的にストリーミングで収集できます。

  • リアルタイムでデータの解析と変換に使用されるプラグインのフィルタリング

    Logstashはフィルターを使用してすべてのタイプのイベントを解析し、定義されたフィールドを識別してスキーマを構築します。 次に、スキーマを共通のデータ型に変換し、データを宛先リポジトリに送信することで、データを簡単かつ効率的に分析して最大限に活用することができます。 フィルタプラグインには、次の機能があります。

    • Grokを使用して、非構造化データを構造化データに解析します。
    • IPアドレスから地理情報を解析します。
    • 個人を特定できる情報 (PII) を匿名化して、機密フィールドを完全に除外します。
    • データソース、形式、アーキテクチャの影響を受けずに、処理全体を簡素化します。
  • データのエクスポートに使用される出力プラグイン

    Logstashは複数の出力を提供し、さまざまな下流のユースケースに柔軟に適応できます。AnalyticDB for MySQL

手順

Apache Kafkaは、高スループットでログを公開およびサブスクライブできる分散サービスです。 高可用性、高性能、分散アーキテクチャ、高いスケーラビリティ、および高い耐久性を提供します。 Apache Kafkaは大手企業で広く使用されています。 Logstashと統合でき、繰り返し構築する必要がなくなります。

  1. Apache Kafkaサーバーのルートディレクトリで、次のコマンドを実行してプラグインをインストールおよび更新します。
    $ bin/pluginインストール
    $bin /プラグインの更新 

    Logstash 1.5以降のバージョンはApache Kafkaと統合されており、すべてのプラグインのディレクトリと名前を変更します。 Logstashプラグインの詳細については、『GitHub』をご参照ください。

  2. プラグインの設定
    • 入力設定の例

      次の例は、Kafkaコンシューマーの使用方法を示しています。

      input {
          kafka {
              zk_connect => "localhost:2181"
              group_id => "Logstash"
              topic_id => "test"
              codec => プレーン
              reset_begin=> false # boolean (オプション) 、デフォルト: false
              consumer_threads => 5# 番号 (オプション) 、デフォルト: 1
              decorate_events => true # boolean (オプション) 、デフォルト: false
          }
      }          
      パラメーター:
      • group_id: 一意のグループIDを持つコンシューマーグループ。 異なる消費者グループからの消費は互いに分離されています。
      • topic_id: トピック。 コンシューマーは最初にトピックをサブスクライブし、次にトピック内のデータを消費します。
      • reset_begin: Logstashがデータを消費し始める位置。 デフォルトでは、Logstashは前のオフセットからデータを消費し始めます。 データが消費されていない場合、Logstashは開始オフセットからデータの消費を開始します。

        元のデータをインポートするには、reset_startingtrueに設定する必要があります。 このように、Logstashはcatコマンドと同じ方法で開始オフセットからデータを消費します。 Logstashが最後の行を消費すると、システムはreset_startをtail -Fに設定し、Logstashは終了せずに対応するデータを監視し続けます。

      • decore_events: メッセージの発行時に固有の情報を提供します。 この情報には、消費されたメッセージのサイズ、トピックソース、および消費者グループ情報が含まれます。
      • rebalance_max_retries: パーティションの所有者レジストリノードを登録するために実行できる再試行の回数。 新しいコンシューマがコンシューマグループに参加すると、リバランスが実行され、その後、特定のパーティションが新しいコンシューマに移動されます。 新しいコンシューマーが特定のパーティションに対する消費権限を取得すると、新しいコンシューマーはパーティション所有者レジストリノードをZooKeeperに登録します。 再試行は、元のコンシューマがノードを解放するまで実行される。
      • consumer_timeout_ms: メッセージを受信するためのタイムアウト期間。 通常、このパラメーターを変更する必要はありません。

      入力パラメーターの詳細については、『GitHub』をご参照ください。

      説明 複数のコンシューマーが同じトピックのメッセージを並行して消費する必要がある場合は、トピックを複数のパーティションに分割し、2つ以上のコンシューマーに同じgroup_idtopic_idの値を設定する必要があります。 これにより、メッセージが順番に消費されます。
    • 出力設定例
      {Output
          jdbc {
              driver_class => "com.mysql.jdbc.Driver"
              connection_string => "jdbc:mysql:// HOSTNAME/DATABASE?user=USER&password=PASSWORD"
              statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@ timestamp", "message"]
          }
      }         
      パラメーター:
      • connect_string: AnalyticDB for MySQLクラスターへの接続に使用されるエンドポイント。
      • : INSERT文で宣言された配列。

      出力パラメーターの詳細については、『GitHub』をご参照ください。

  3. Logstashのインストールディレクトリでbin/Logstash -f config/xxxx.confコマンドを実行してタスクを開始し、KafkaデータをAnalyticDB for MySQLに書き込みます。