すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Kafkaプロトコルを使用してログをアップロードする

最終更新日:Sep 29, 2024

KafkaProducer SDK、Beats、collectd、Fluentd、Logstash、Telegraf、Vectorなどのツールを使用してログを収集し、Kafkaプロトコルを使用してSimple Log Serviceにログをアップロードできます。 このトピックでは、Kafkaプロトコルを使用して、ログ収集ツールを使用してログを収集し、収集したログをSimple log Serviceにアップロードする方法について説明します。

制限事項

  • Kafka 2.1.0以降のバージョンのみがサポートされています。

  • ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。

データ解析

Kafkaプロトコルを使用してアップロードされたログは、contentフィールドに保存されます。 ログがJSONタイプの場合、contentフィールドにJSONインデックスを設定できます。 詳細については、「JSONタイプ」をご参照ください。

KafkaProducer SDKまたはBeatsを使用してログを収集する場合、収集設定でtopicまたはheadersパラメーターを設定して、ログをJSON形式で自動的に表示できます。 この場合、Simple Log Serviceは自動的にcontentフィールドを展開します。 contentフィールドにJSONインデックスを設定する必要はありません。 詳細については、「設定方法」をご参照ください。

設定方法

Kafkaプロトコルを使用してログをSimple Log Serviceにアップロードする場合、関連するパラメーターを設定する必要があります。 下表に、各パラメーターを説明します。

説明

パラメーター名は、ログ収集ツールによって異なります。 ビジネスシナリオに基づいてパラメーターを設定します。

パラメーター

説明

接続タイプ

セキュリティプロトコル。 ログ送信のセキュリティを確保するには、SASL_SSLプロトコルを使用する必要があります。

ホスト

初期接続が確立されるアドレス。 Simple Log Serviceプロジェクトのエンドポイントは、プロジェクト名. endpoint形式で指定できます。 詳細については、「エンドポイント」をご参照ください。

  • 内部エンドポイントの例: test-project-1.cn-hangzhou-intranet.log.aliyuncs.com:10011。 ポート番号は10011です。

  • パブリックエンドポイントの例: test-project-1.cn-hangzhou.log.aliyuncs.com:10012。 ポート番号は10012です。

トピック

ログストアの名前

KafkaProducer SDKまたはBeatsを使用してログを収集し、出力形式をJSONとして指定する場合、topicパラメーターをLogstore name.json形式の値に設定して、JSONログを自動的に展開できます。 詳細については、「例6: KafkaProducer SDKを使用したログのアップロード」をご参照ください。

ヘッダー

KafkaProducer SDKまたはBeatsを使用してログを収集し、出力形式をJSONとして指定する場合、headersパラメーターを次の内容に設定して、JSONログを自動的に展開できます。

  headers:
    - key: "data-parse-format"
      value: "json"

詳細については、「例1: Beatsを使用したログのアップロード」をご参照ください。

ユーザー名

プロジェクトの名前。

パスワード

AccessKeyペア。 値は ${access-key-id }#${ access-key-secret} 形式で指定する必要があります。 ${access-key-id}${access-key-secret} をAccessKey IDとAccessKey secretに置き換えます。 Resource Access Management (RAM) ユーザーのAccessKeyペアを使用することを推奨します。 詳細については、「RAMユーザーを作成し、RAMユーザーにSimple Log Serviceへのアクセスを許可する」をご参照ください。

証明書ファイル

エンドポイントの証明書ファイル。 Simple Log Serviceの各エンドポイントには証明書があります。 このパラメーターをサーバーのルート証明書へのパスに設定します。 例: /etc/ssl/certs/ca-bundle.crt

説明

Kafkaコンシューマーグループを使用してSimple Log Serviceのデータをリアルタイムで消費する場合は、 ticket を使用してAlibaba Cloudテクニカルサポートに連絡します。

例1: Beatsを使用したログのアップロード

Metricbeat、Packetbeat、Winlogbeat、Auditbeat、Filebeat、HeartbeatなどのBeatsを使用してログを収集できます。 ログが収集されたら、Kafkaプロトコルを使用してログをSimple Log Serviceにアップロードできます。 詳細については、「Beats-Kafka-Output」をご参照ください。

  • 例 1:

    • 設定例

      output.kafka: 
        # initial brokers for reading cluster metadata 
        hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
        username: "yourusername" 
        password: "yourpassword" 
        ssl.certificate_authorities: 
        # message topic selection + partitioning 
        topic: 'test-logstore-1' 
        partition.round_robin: 
          reachable_only: false 
      
        required_acks: 1 
        compression: gzip 
        max_message_bytes: 1000000
    • サンプルログ

      デフォルトでは、BeatsはJSON形式のログを提供します。 ログはSimple Log Serviceにアップロードされ、コンテンツフィールドに保存されます。 contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。

      Beats系列软件

  • 例 2:

    • 設定例

      output.kafka:
        enabled: true
        hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
        username: "test-project-1"
        password: "access-key-id#access-key-secret"
        ssl.certificate_authorities:
        topic: 'test-logstore-1'
        headers:
          - key: "data-parse-format"
            value: "json"
        partition.hash:
          reachable_only: false
    • サンプルログ

      headersパラメーターを設定して、JSONログを自動的に展開できます。采集日志

例2: collectdを使用したログのアップロード

collectdは、システムとアプリケーションのパフォーマンスメトリックを定期的に収集するデーモンプロセスです。 Kafkaプロトコルを使用して、収集したメトリクスをSimple Log Serviceにアップロードできます。 詳細については、「Kafkaプラグインの書き込み」をご参照ください。

collectdによって収集されたログをSimple Log Serviceにアップロードする前に、collectd-write_kafkaプラグインと関連する依存関係をインストールする必要があります。 たとえば、CentOS Linuxサーバーでは、sudo yum install collectd-write_kafkaコマンドを実行してcollectd-write_kafkaプラグインをインストールできます。 RPM Package Manager (RPM) パッケージのインストール方法の詳細については、「Collectd-write_kafka」をご参照ください。

  • 設定例

    collectdは、JSON、Command、Graphiteなどのさまざまな形式をサポートしています。 この例では、JSON形式が使用されています。 詳細については、「collectd documentation」をご参照ください。

    <Plugin write_kafka>
      Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "yourusername" 
      Property "sasl.password" "yourpassword" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>
                        
  • サンプルログ

    JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。

    Collectd

例3: Telegrafを使用したログのアップロード

TelegrafはGoプログラミング言語で記述されたエージェントで、メトリクスの収集、処理、集計に使用されます。 Telegrafは少量のメモリリソースしか消費しません。 詳細については、「Telegraf」をご参照ください。 Telegrafは、さまざまなプラグインと統合機能を提供します。 Telegrafを使用して、Telegrafが実行されているシステムから、またはサードパーティのAPIを呼び出すことによってメトリックを取得できます。 Telegrafを使用して、StatsDおよびKafkaコンシューマを使用してメトリックを監視することもできます。

Telegrafを使用して収集されたログをSimple Log Serviceにアップロードする前に、Telegrafの設定ファイルを変更する必要があります。

  • 設定例

    Telegrafは、JSON、Carbon2、Graphiteなどのさまざまな形式をサポートしています。 この例では、JSON形式が使用されています。 詳細については、「Telegrafの出力データ形式」をご参照ください。

    説明

    tls_caに有効なパスを指定する必要があります。 サーバー上のルート証明書へのパスを指定できます。 ほとんどの場合、Linuxサーバーのルート証明書へのパスは /etc/ssl/certs/ca-bundle.crtです。

    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "yourusername" 
      sasl_password = "yourpassword" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
  • サンプルログ

    JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。

    Telegraf

例4: Fluentdを使用したログのアップロード

Fluentdはオープンソースのログコレクターです。 これは、Cloud Native Computing Foundation (CNCF) のプロジェクトです。 Fluentdのすべてのコンポーネントは、Apache 2ライセンスで利用できます。 詳細については、「Fluentd」をご参照ください。

Fluentdは、さまざまな入力、処理、および出力プラグインをサポートします。 Fluentdを使用してログを収集し、fluent-plugin-kafkaプラグインを使用して収集したログをSimple Log Serviceにアップロードできます。 プラグインのインストールと設定のみが必要です。 詳細については、「fluent-plugin-kafka」をご参照ください。

  • 設定例

    Fluentdは数十のフォーマットをサポートしています。 この例では、JSON形式が使用されています。 詳細については、「Fluentd Formatter」をご参照ください。

    <match **>
      @type kafka 
      # Brokers: you can choose either brokers or zookeeper. 
      brokers      test-project-1.cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      username yourusername  // The username. Replace yourusername with the actual value. 
      password "yourpassword"   // The password. Replace yourpassword with the actual value. 
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
  • サンプルログ

    JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。Fluentd

例5: Logstashを使用したログのアップロード

Logstashは、リアルタイム処理機能を提供するオープンソースのログ収集エンジンです。 Logstashを使用して、さまざまなソースからログを動的に収集できます。 詳細は、「Logstash」をご参照ください。

Logstash は、組み込みの Kafka 出力プラグインを提供します。 Kafkaプロトコルを使用して、ログを収集し、収集したログをSimple Log ServiceにアップロードするようにLogstashを設定できます。 Simple Log Serviceは、データ送信中にSASLSSLプロトコルを使用します。 SSL証明書とJava Authentication and Authorization Service (JAAS) ファイルを設定する必要があります。

  • 設定例

    1. JAASファイルを作成し、ディレクトリに保存します。 例: /etc/kafka/kafka_client_jaas.conf

      JAASファイルに次のコンテンツを追加します。

      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="yourusername" 
        password="yourpassword"; 
      };
    2. SSL証明書を設定し、証明書をディレクトリに保存します。 例: /etc/kafka/client-root.truststore.jks

      ルート証明書をダウンロードし、証明書をディレクトリに保存します。 例: /etc/kafka/root.pem keytoolコマンドを実行して、ファイルを生成します。jks形式。 初めてコマンドを実行してファイルを生成するときは、パスワードを設定する必要があります。

      keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
    3. Logstashを設定します。

      Logstashは数十のフォーマットをサポートしています。 この例では、JSON形式が使用されています。詳細については、「Logstash Codec」をご参照ください。

      説明

      次の構成は、ネットワーク接続のテストに使用されます。 本番環境では、stdoutフィールドを削除することを推奨します。

      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
  • サンプルログ

    JSON形式のログがSimple Log Serviceにアップロードされ、コンテンツフィールドに保存された後、contentフィールドのJSONインデックスを作成できます。 詳細については、「JSONタイプ」をご参照ください。Logstash

例6: KafkaProducer SDKを使用したログのアップロード

  • 設定例

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // Configuration information. 
            Properties props = new Properties();
            String project = "etl-dev";
            String logstore = "testlog";
            // Set the following parameter to true if you want to automatically expand JSON logs:
            boolean parseJson = true;
            // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. 
            // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret in your configuration file if required. 
            // To prevent key leaks, we recommend that you do not save your AccessKey ID and AccessKey secret in the code.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
            String port = "10012"; // For a public endpoint, set the port number to 10012. For an internal endpoint, set the port number to 10011.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.
    
            // Specify the serialization class for data keys and values. 
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
    
            // Create a producer instance. 
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            // Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                ProducerRecord record = new ProducerRecord<String, String>(topic, content);
                producer.send(record);
            }
            producer.close();
        }
    }
  • POM依存

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • サンプルログ

    image

例7: Fluent Bitを使用したログのアップロード

Fluent Bitは、軽量でスケーラブルなロギングおよびメトリクスプロセッサおよびフォワーダです。 Fluent Bitは、さまざまな入力、処理、および出力プラグインをサポートします。 Kafka出力プラグインを使用して、ログをSimple Log Serviceにアップロードできます。 詳細については、「Kafka output plugin」をご参照ください。

  • 設定例

    設定方法の詳細については、「設定方法」をご参照ください。

    [Output]
        Name    kafka
        Match    *
        Brokers   etl-shanghai.cn-shanghai.log.aliyuncs.com:10012
        Topics    etl-out
        Format    json
        rdkafka.sasl.username   yourusername 
        rdkafka.sasl.password   yourpassword  
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism     PLAIN
  • サンプルログ Fluent-bit

例8: Vectorを使用したログのアップロード

Vectorは、Kafkaプロトコルを使用してログをレポートできる、軽量で高性能なログ処理ツールです。 詳細については、「ベクター」をご参照ください。 次の例は、Kafkaプロトコルを使用してSimple Log Serviceにログを書き込むVectorの設定を示しています。

  • 設定例

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["file_logs"] # The source. In this example, a log file is monitored.
      bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012"
      compression = "gzip"
      healthcheck = true
      topic = "dst-kafka.json" # dst-kafka is the name of the Logstore. The .json suffix indicates that JSON logs are automatically expanded.
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "etl-dev" # etl-dev is the name of the project.
      sasl.password = "{{The AccessKey ID and AccessKey secret of the RAM user in the {AK#SK} format.}}"
      tls.enabled = true
  • サンプルログ

    image.png

エラー

Kafkaプロトコルを使用してログをアップロードできない場合は、エラーが報告されます。 次の表にエラーを示します。 詳細については、「エラーリスト」をご参照ください。

エラー

説明

解決策

NetworkException

ネットワーク例外が発生しました。

3 秒待ってからやり直します。

TopicAuthorizationException

認証に失敗しました。

AccessKeyペアが無効な場合、またはAccessKeyペアに指定されたプロジェクトまたはLogstoreにデータを書き込む権限がない場合、認証は失敗します。 この場合、有効なAccessKeyペアを入力し、AccessKeyペアに必要な書き込み権限があることを確認します。

UnknownTopicOrPartitionException

次のいずれかのエラーが発生しました。

  • 指定されたプロジェクトまたはLogstoreは存在しません。

  • 指定されたプロジェクトが存在するリージョンは、指定されたエンドポイントのリージョンとは異なります。

指定されたプロジェクトまたはLogstoreが存在することを確認します。 指定されたプロジェクトまたはLogstoreが存在するが、エラーが続く場合は、指定されたプロジェクトが存在するリージョンが指定されたエンドポイントのリージョンと同じかどうかを確認します。

KafkaStorageException

サーバエラーが発生しました。

3 秒待ってからやり直します。