KafkaProducer SDK、Beats、collectd、Fluentd、Logstash、Telegraf、Vectorなどのツールを使用してログを収集し、Kafkaプロトコルを使用してSimple Log Serviceにログをアップロードできます。 このトピックでは、Kafkaプロトコルを使用して、ログ収集ツールを使用してログを収集し、収集したログをSimple log Serviceにアップロードする方法について説明します。
制限事項
Kafka 2.1.0以降のバージョンのみがサポートされています。
ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。
データ解析
Kafkaプロトコルを使用してアップロードされたログは、contentフィールドに保存されます。 ログがJSONタイプの場合、contentフィールドにJSONインデックスを設定できます。 詳細については、「JSONタイプ」をご参照ください。
KafkaProducer SDKまたはBeatsを使用してログを収集する場合、収集設定でtopicまたはheadersパラメーターを設定して、ログをJSON形式で自動的に表示できます。 この場合、Simple Log Serviceは自動的にcontentフィールドを展開します。 contentフィールドにJSONインデックスを設定する必要はありません。 詳細については、「設定方法」をご参照ください。
設定方法
Kafkaプロトコルを使用してログをSimple Log Serviceにアップロードする場合、関連するパラメーターを設定する必要があります。 下表に、各パラメーターを説明します。
パラメーター名は、ログ収集ツールによって異なります。 ビジネスシナリオに基づいてパラメーターを設定します。
パラメーター | 説明 |
接続タイプ | セキュリティプロトコル。 ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。 |
ホスト | 初期接続が確立されるアドレス。 Simple Log Serviceプロジェクトのエンドポイントは、
|
トピック | ログストアの名前 KafkaProducer SDKまたはBeatsを使用してログを収集し、出力形式をJSONとして指定する場合、topicパラメーターを |
ヘッダー | KafkaProducer SDKまたはBeatsを使用してログを収集し、出力形式をJSONとして指定する場合、headersパラメーターを次の内容に設定して、JSONログを自動的に展開できます。
詳細については、「例1: Beatsを使用したログのアップロード」をご参照ください。 |
ユーザー名 | プロジェクトの名前。 |
パスワード | AccessKeyペア。 値は ${access-key-id }#${ access-key-secret} 形式で指定する必要があります。 ${access-key-id} と ${access-key-secret} をAccessKey IDとAccessKey secretに置き換えます。 Resource Access Management (RAM) ユーザーのAccessKeyペアを使用することを推奨します。 詳細については、「RAMユーザーを作成し、RAMユーザーにSimple Log Serviceへのアクセスを許可する」をご参照ください。 |
証明書ファイル | エンドポイントの証明書ファイル。 Simple Log Serviceの各エンドポイントには証明書があります。 このパラメーターをサーバーのルート証明書へのパスに設定します。 例: /etc/ssl/certs/ca-bundle.crt |
Kafkaコンシューマーグループを使用してSimple Log Serviceのデータをリアルタイムで消費する場合は、 ticket を使用してAlibaba Cloudテクニカルサポートに連絡します。
例1: Beatsを使用したログのアップロード
Metricbeat、Packetbeat、Winlogbeat、Auditbeat、Filebeat、HeartbeatなどのBeatsを使用してログを収集できます。 ログが収集されたら、Kafkaプロトコルを使用してログをSimple Log Serviceにアップロードできます。 詳細については、「Beats-Kafka-Output」をご参照ください。
例 1:
設定例
output.kafka: # initial brokers for reading cluster metadata hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] username: "yourusername" password: "yourpassword" ssl.certificate_authorities: # message topic selection + partitioning topic: 'test-logstore-1' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
サンプルログ
デフォルトでは、BeatsはJSON形式のログを提供します。 ログはSimple Log Serviceにアップロードされ、コンテンツフィールドに保存されます。 contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。
例 2:
設定例
output.kafka: enabled: true hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"] username: "test-project-1" password: "access-key-id#access-key-secret" ssl.certificate_authorities: topic: 'test-logstore-1' headers: - key: "data-parse-format" value: "json" partition.hash: reachable_only: false
サンプルログ
headersパラメーターを設定して、JSONログを自動的に展開できます。
例2: collectdを使用したログのアップロード
collectdは、システムとアプリケーションのパフォーマンスメトリックを定期的に収集するデーモンプロセスです。 Kafkaプロトコルを使用して、収集したメトリクスをSimple Log Serviceにアップロードできます。 詳細については、「Kafkaプラグインの書き込み」をご参照ください。
collectdによって収集されたログをSimple Log Serviceにアップロードする前に、collectd-write_kafkaプラグインと関連する依存関係をインストールする必要があります。 たとえば、CentOS Linuxサーバーでは、sudo yum install collectd-write_kafka
コマンドを実行してcollectd-write_kafkaプラグインをインストールできます。 RPM Package Manager (RPM) パッケージのインストール方法の詳細については、「Collectd-write_kafka」をご参照ください。
設定例
collectdは、JSON、Command、Graphiteなどのさまざまな形式をサポートしています。 この例では、JSON形式が使用されています。 詳細については、「collectd documentation」をご参照ください。
<Plugin write_kafka> Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" Property "security.protocol" "sasl_ssl" Property "sasl.mechanism" "PLAIN" Property "sasl.username" "yourusername" Property "sasl.password" "yourpassword" Property "broker.address.family" "v4" <Topic "test-logstore-1"> Format JSON Key "content" </Topic> </Plugin>
サンプルログ
JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。
例3: Telegrafを使用したログのアップロード
TelegrafはGoプログラミング言語で記述されたエージェントで、メトリクスの収集、処理、集計に使用されます。 Telegrafは少量のメモリリソースしか消費しません。 詳細については、「Telegraf」をご参照ください。 Telegrafは、さまざまなプラグインと統合機能を提供します。 Telegrafを使用して、Telegrafが実行されているシステムから、またはサードパーティのAPIを呼び出すことによってメトリックを取得できます。 Telegrafを使用して、StatsDおよびKafkaコンシューマを使用してメトリックを監視することもできます。
Telegrafを使用して収集されたログをSimple Log Serviceにアップロードする前に、Telegrafの設定ファイルを変更する必要があります。
設定例
Telegrafは、JSON、Carbon2、Graphiteなどのさまざまな形式をサポートしています。 この例では、JSON形式が使用されています。 詳細については、「Telegrafの出力データ形式」をご参照ください。
説明tls_caに有効なパスを指定する必要があります。 サーバー上のルート証明書へのパスを指定できます。 ほとんどの場合、Linuxサーバーのルート証明書へのパスは /etc/ssl/certs/ca-bundle.crtです。
[[outputs.kafka]] ## URLs of kafka brokers brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] ## Kafka topic for producer messages topic = "test-logstore-1" routing_key = "content" ## CompressionCodec represents the various compression codecs recognized by ## Kafka in messages. ## 0 : No compression ## 1 : Gzip compression ## 2 : Snappy compression ## 3 : LZ4 compression compression_codec = 1 ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Optional SASL Config sasl_username = "yourusername" sasl_password = "yourpassword" ## Data format to output. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "json"
サンプルログ
JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。
例4: Fluentdを使用したログのアップロード
Fluentdはオープンソースのログコレクターです。 これは、Cloud Native Computing Foundation (CNCF) のプロジェクトです。 Fluentdのすべてのコンポーネントは、Apache 2ライセンスで利用できます。 詳細については、「Fluentd」をご参照ください。
Fluentdは、さまざまな入力、処理、および出力プラグインをサポートします。 Fluentdを使用してログを収集し、fluent-plugin-kafkaプラグインを使用して収集したログをSimple Log Serviceにアップロードできます。 プラグインのインストールと設定のみが必要です。 詳細については、「fluent-plugin-kafka」をご参照ください。
設定例
Fluentdは数十のフォーマットをサポートしています。 この例では、JSON形式が使用されています。 詳細については、「Fluentd Formatter」をご参照ください。
<match **> @type kafka # Brokers: you can choose either brokers or zookeeper. brokers test-project-1.cn-hangzhou.log.aliyuncs.com:10012 default_topic test-logstore-1 default_message_key content output_data_type json output_include_tag true output_include_time true sasl_over_ssl true username yourusername // The username. Replace yourusername with the actual value. password "yourpassword" // The password. Replace yourpassword with the actual value. ssl_ca_certs_from_system true # ruby-kafka producer options max_send_retries 10000 required_acks 1 compression_codec gzip </match>
サンプルログ
JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。
例5: Logstashを使用したログのアップロード
Logstashは、リアルタイム処理機能を提供するオープンソースのログ収集エンジンです。 Logstashを使用して、さまざまなソースからログを動的に収集できます。 詳細は、「Logstash」をご参照ください。
Logstash は、組み込みの Kafka 出力プラグインを提供します。 Kafkaプロトコルを使用して、ログを収集し、収集したログをSimple Log ServiceにアップロードするようにLogstashを設定できます。 Simple Log Serviceは、データ送信中にSASLSSLプロトコルを使用します。 SSL証明書とJava Authentication and Authorization Service (JAAS) ファイルを設定する必要があります。
設定例
JAASファイルを作成し、ディレクトリに保存します。 例: /etc/kafka/kafka_client_jaas.conf
JAASファイルに次のコンテンツを追加します。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="yourpassword"; };
SSL証明書を設定し、証明書をディレクトリに保存します。 例: /etc/kafka/client-root.truststore.jks
Logstashを設定します。
Logstashは数十のフォーマットをサポートしています。 この例では、JSON形式が使用されています。詳細については、「Logstash Codec」をご参照ください。
説明次の構成は、ネットワーク接続のテストに使用されます。 本番環境では、stdoutフィールドを削除することを推奨します。
output { stdout { codec => rubydebug } kafka { topic_id => "test-logstore-1" bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" security_protocol => "SASL_SSL" ssl_truststore_location => "/etc/client-root.truststore.jks" ssl_truststore_password => "123456" jaas_path => "/etc/kafka_client_jaas.conf" sasl_mechanism => "PLAIN" codec => "json" client_id => "kafka-logstash" } }
サンプルログ
JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。
例6: KafkaProducer SDKを使用したログのアップロード
設定例
package org.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProduceExample { public static void main(String[] args) { // Configuration information. Properties props = new Properties(); String project = "etl-dev"; String logstore = "testlog"; // Set the following parameter to true if you want to automatically expand JSON logs: boolean parseJson = true; // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret in your configuration file if required. // To prevent key leaks, we recommend that you do not save your AccessKey ID and AccessKey secret in the code. String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID"); String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET"); String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project. String port = "10012"; // For a public endpoint, set the port number to 10012. For an internal endpoint, set the port number to 10011. String hosts = project + "." + endpoint + ":" + port; String topic = logstore; if(parseJson) { topic = topic + ".json"; } props.put("bootstrap.servers", hosts); props.put("security.protocol", "sasl_ssl"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";"); props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions. // Specify the serialization class for data keys and values. props.put("key.serializer", StringSerializer.class); props.put("value.serializer", StringSerializer.class); // Create a producer instance. KafkaProducer<String,String> producer = new KafkaProducer<>(props); // Send records. for(int i=0;i<1;i++){ String content = "{\"msg\": \"Hello World\"}"; ProducerRecord record = new ProducerRecord<String, String>(topic, content); producer.send(record); } producer.close(); } }
POM依存
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>
サンプルログ
例7: Fluent Bitを使用したログのアップロード
Fluent Bitは、軽量でスケーラブルなロギングおよびメトリクスプロセッサおよびフォワーダです。 Fluent Bitは、さまざまな入力、処理、および出力プラグインをサポートします。 Kafka出力プラグインを使用して、ログをSimple Log Serviceにアップロードできます。 詳細については、「Kafka output plugin」をご参照ください。
設定例
設定方法の詳細については、「設定方法」をご参照ください。
[Output] Name kafka Match * Brokers etl-shanghai.cn-shanghai.log.aliyuncs.com:10012 Topics etl-out Format json rdkafka.sasl.username yourusername rdkafka.sasl.password yourpassword rdkafka.security.protocol SASL_SSL rdkafka.sasl.mechanism PLAIN
サンプルログ
例8: Vectorを使用したログのアップロード
Vectorは、Kafkaプロトコルを使用してログをレポートできる、軽量で高性能なログ処理ツールです。 詳細については、「ベクター」をご参照ください。 次の例は、Kafkaプロトコルを使用してSimple Log Serviceにログを書き込むVectorの設定を示しています。
設定例
[sinks.aliyun_sls] type = "kafka" inputs = ["file_logs"] # The source. In this example, a log file is monitored. bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012" compression = "gzip" healthcheck = true topic = "dst-kafka.json" # dst-kafka is the name of the Logstore. The .json suffix indicates that JSON logs are automatically expanded. encoding.codec = "json" sasl.enabled = true sasl.mechanism = "PLAIN" sasl.username = "etl-dev" # etl-dev is the name of the project. sasl.password = "{{The AccessKey ID and AccessKey secret of the RAM user in the {AK#SK} format.}}" tls.enabled = true
サンプルログ
エラー
Kafkaプロトコルを使用してログをアップロードできない場合は、エラーが報告されます。 次の表にエラーを示します。 詳細については、「エラーリスト」をご参照ください。
エラー | 説明 | 解決策 |
NetworkException | ネットワーク例外が発生しました。 | 3 秒待ってからやり直します。 |
TopicAuthorizationException | 認証に失敗しました。 | AccessKeyペアが無効な場合、またはAccessKeyペアに指定されたプロジェクトまたはLogstoreにデータを書き込む権限がない場合、認証は失敗します。 この場合、有効なAccessKeyペアを入力し、AccessKeyペアに必要な書き込み権限があることを確認します。 |
UnknownTopicOrPartitionException | 次のいずれかのエラーが発生しました。
| 指定されたプロジェクトまたはLogstoreが存在することを確認します。 指定されたプロジェクトまたはLogstoreが存在するが、エラーが続く場合は、指定されたプロジェクトが存在するリージョンが指定されたエンドポイントのリージョンと同じかどうかを確認します。 |
KafkaStorageException | サーバエラーが発生しました。 | 3 秒待ってからやり直します。 |