Logstashはオープンソースのサーバー側データ処理パイプラインで、複数のソースから同時にデータを収集し、データを変換して、指定された場所に保存できます。 AnalyticDB for MySQLはMySQLと完全に互換性があります。 Logstashの入力プラグインを使用して、データソースからAnalyticDB for MySQLにデータをインポートできます。 このトピックでは、Logstashを使用してApache KafkaからAnalyticDB for MySQL data Warehouse Edition (V3.0) にデータをインポートする方法について説明します。
Logstashプラグイン
- さまざまなソースからさまざまなタイプおよびサイズのデータを収集するために使用される入力プラグイン
通常、データは異なる形式で複数のシステムに保存されます。 Logstashは複数のデータ入力をサポートし、同時に複数のデータソースからデータを収集します。 Logstashは、ログ、メトリクス、webアプリケーション、データストア、AWSサービスからデータを継続的にストリーミングで収集できます。
- リアルタイムでデータの解析と変換に使用されるプラグインのフィルタリング
Logstashはフィルターを使用してすべてのタイプのイベントを解析し、定義されたフィールドを識別してスキーマを構築します。 次に、スキーマを共通のデータ型に変換し、データを宛先リポジトリに送信することで、データを簡単かつ効率的に分析して最大限に活用することができます。 フィルタプラグインには、次の機能があります。
- Grokを使用して、非構造化データを構造化データに解析します。
- IPアドレスから地理情報を解析します。
- 個人を特定できる情報 (PII) を匿名化して、機密フィールドを完全に除外します。
- データソース、形式、アーキテクチャの影響を受けずに、処理全体を簡素化します。
- データのエクスポートに使用される出力プラグイン
Logstashは複数の出力を提供し、さまざまな下流のユースケースに柔軟に適応できます。AnalyticDB for MySQL
手順
Apache Kafkaは、高スループットでログを公開およびサブスクライブできる分散サービスです。 高可用性、高性能、分散アーキテクチャ、高いスケーラビリティ、および高い耐久性を提供します。 Apache Kafkaは大手企業で広く使用されています。 Logstashと統合でき、繰り返し構築する必要がなくなります。
- Apache Kafkaサーバーのルートディレクトリで、次のコマンドを実行してプラグインをインストールおよび更新します。
$ bin/pluginインストール $bin /プラグインの更新
Logstash 1.5以降のバージョンはApache Kafkaと統合されており、すべてのプラグインのディレクトリと名前を変更します。 Logstashプラグインの詳細については、『GitHub』をご参照ください。
- プラグインの設定
- 入力設定の例
次の例は、Kafkaコンシューマーの使用方法を示しています。
input { kafka { zk_connect => "localhost:2181" group_id => "Logstash" topic_id => "test" codec => プレーン reset_begin=> false # boolean (オプション) 、デフォルト: false consumer_threads => 5# 番号 (オプション) 、デフォルト: 1 decorate_events => true # boolean (オプション) 、デフォルト: false } }
パラメーター:group_id
: 一意のグループIDを持つコンシューマーグループ。 異なる消費者グループからの消費は互いに分離されています。topic_id
: トピック。 コンシューマーは最初にトピックをサブスクライブし、次にトピック内のデータを消費します。reset_begin
: Logstashがデータを消費し始める位置。 デフォルトでは、Logstashは前のオフセットからデータを消費し始めます。 データが消費されていない場合、Logstashは開始オフセットからデータの消費を開始します。元のデータをインポートするには、
reset_starting
をtrue
に設定する必要があります。 このように、Logstashはcatコマンドと同じ方法で開始オフセットからデータを消費します。 Logstashが最後の行を消費すると、システムはreset_startをtail -F
に設定し、Logstashは終了せずに対応するデータを監視し続けます。decore_events
: メッセージの発行時に固有の情報を提供します。 この情報には、消費されたメッセージのサイズ、トピックソース、および消費者グループ情報が含まれます。rebalance_max_retries
: パーティションの所有者レジストリノードを登録するために実行できる再試行の回数。 新しいコンシューマがコンシューマグループに参加すると、リバランスが実行され、その後、特定のパーティションが新しいコンシューマに移動されます。 新しいコンシューマーが特定のパーティションに対する消費権限を取得すると、新しいコンシューマーはパーティション所有者レジストリノードをZooKeeperに登録します。 再試行は、元のコンシューマがノードを解放するまで実行される。consumer_timeout_ms
: メッセージを受信するためのタイムアウト期間。 通常、このパラメーターを変更する必要はありません。
入力パラメーターの詳細については、『GitHub』をご参照ください。
説明 複数のコンシューマーが同じトピックのメッセージを並行して消費する必要がある場合は、トピックを複数のパーティションに分割し、2つ以上のコンシューマーに同じgroup_id
とtopic_id
の値を設定する必要があります。 これにより、メッセージが順番に消費されます。 - 出力設定例
{Output jdbc { driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql:// HOSTNAME/DATABASE?user=USER&password=PASSWORD" statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@ timestamp", "message"] } }
パラメーター:connect_string
: AnalyticDB for MySQLクラスターへの接続に使用されるエンドポイント。文
: INSERT文で宣言された配列。
出力パラメーターの詳細については、『GitHub』をご参照ください。
- 入力設定の例
- Logstashのインストールディレクトリで
bin/Logstash -f config/xxxx.conf
コマンドを実行してタスクを開始し、KafkaデータをAnalyticDB for MySQLに書き込みます。