全部產品
Search
文件中心

Simple Log Service:使用Kafka協議上傳日誌

更新時間:Sep 27, 2024

您可以使用Kafka Producer SDK、Beats系列軟體、Collectd、Fluentd、Logstash、Telegraf、Vector等採集工具採集日誌,並通過Kafka協議上傳到Log Service。本文介紹通過採集工具採集到日誌後,利用Kafka協議上傳日誌到Log Service的操作步驟。

相關限制

  • 支援的Kafka協議版本最低為2.1.0。

  • 為保證日誌傳輸安全性,必須使用SASL_SSL連線協定。

資料解析說明

使用Kafka協議上傳的日誌,其日誌內容儲存在content欄位中。如果日誌為JSON類型,您可以為content欄位設定JSON類型的索引。更多資訊,請參見JSON類型

現在,通過Kafka生產者(produce)或Beats系列軟體上傳日誌時,您可以在採集配置中設定topicheaders,實現JSON格式的日誌自動展示。即Log Service會自動延伸content欄位,您無需為content欄位配置JSON類型的索引。具體說明,請參見配置方式

配置方式

使用Kafka協議上傳日誌時,您需要配置以下參數。

說明

各個工具的配置參數名稱略有不同,請根據實際參數配置。

參數

說明

連線類型

為保證日誌傳輸安全性,連線協定必須為SASL_SSL。

hosts

初始串連的叢集地址,格式為project名稱.Endpoint,請根據Project所在的Endpoint進行配置。更多資訊,請參見服務存取點

  • 阿里雲內網:連接埠號碼為10011,例如test-project-1.cn-hangzhou-intranet.log.aliyuncs.com:10011。

  • 公網:連接埠號碼為10012,例如test-project-1.cn-hangzhou.log.aliyuncs.com:10012。

topic

配置為Log ServiceLogstore名稱。

使用Kafka生產者(produce)或Beats系列軟體上傳日誌且指定輸出格式為JSON時,您可以將topic的值設定為Logstore名稱.json格式,實現JSON日誌自動延伸。更多資訊,請參見樣本六:通過Kafka生產者(produce)上傳日誌

headers

使用Kafka生產者(produce)或Beats系列軟體上傳日誌且指定輸出格式為JSON時,您可以將headers配置為如下內容後,實現JSON日誌自動延伸。

  headers:
    - key: "data-parse-format"
      value: "json"

更多資訊,請參見樣本一:通過Beats系列軟體上傳日誌

username

配置為Log ServiceProject名稱。

password

配置為阿里雲AK,格式為${access-key-id}#${access-key-secret}。請根據實際情況,將${access-key-id}替換為您的AccessKey ID,將${access-key-secret}替換為您的AccessKey Secret。建議使用RAM使用者的AK。更多資訊,請參見授權

認證檔案

Log Service的網域名稱均具備可信任認證,您只需使用伺服器內建的根憑證即可,例如:/etc/ssl/certs/ca-bundle.crt

說明

如果您要通過Kafka消費組即時消費Log Service的資料,請提交工單諮詢阿里雲支援人員工程師。

樣本一:通過Beats系列軟體上傳日誌

Beats系列軟體(MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat等)採集到日誌後,支援通過Kafka協議將日誌上傳到Log Service。更多資訊,請參見Beats-Kafka-Output

  • 樣本1

    • 配置樣本

      output.kafka: 
        # initial brokers for reading cluster metadata 
        hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
        username: "yourusername" 
        password: "yourpassword" 
        ssl.certificate_authorities: 
        # message topic selection + partitioning 
        topic: 'test-logstore-1' 
        partition.round_robin: 
          reachable_only: false 
      
        required_acks: 1 
        compression: gzip 
        max_message_bytes: 1000000
    • 日誌範例

      Beats系列軟體預設輸出的日誌為JSON類型,您可以為content欄位建立JSON類型的索引。更多資訊,請參見JSON類型

      Beats系列軟體

  • 樣本2

    • 配置樣本

      output.kafka:
        enabled: true
        hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
        username: "test-project-1"
        password: "access-key-id#access-key-secret"
        ssl.certificate_authorities:
        topic: 'test-logstore-1'
        headers:
          - key: "data-parse-format"
            value: "json"
        partition.hash:
          reachable_only: false
    • 日誌範例

      通過headers配置,實現JSON日誌自動延伸。採集日誌

樣本二:通過Collectd上傳日誌

Collectd是一個守護(daemon)進程,用於定期採集系統和應用程式的效能指標,並支援通過Kafka協議上傳到Log Service。更多資訊,請參見Write Kafka Plugin

將Collectd採集到日誌上傳到Log Service時,還需安裝Kafka外掛程式以及相關依賴。例如:在linux Centos中,可以使用yum安裝Kafka外掛程式,命令為sudo yum install collectd-write_kafka,安裝RPM請參見Collectd-write_kafka

  • 配置樣本

    樣本中將日誌輸出格式(Format)設定為JSON,除此之外,還支援Command、Graphite類型。更多資訊,請參見Collectd配置文檔

    <Plugin write_kafka>
      Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
      Property "security.protocol" "sasl_ssl" 
      Property "sasl.mechanism" "PLAIN" 
      Property "sasl.username" "yourusername" 
      Property "sasl.password" "yourpassword" 
      Property "broker.address.family" "v4"  
      <Topic "test-logstore-1">
        Format JSON 
        Key "content"  
      </Topic>
    </Plugin>
                        
  • 日誌範例

    使用JSON模式輸出日誌後,您可以給content欄位建立JSON類型的索引。更多資訊,請參見JSON類型

    Collectd

樣本三:使用Telegraf上傳日誌

Telegraf是由Go語言編寫的代理程式,記憶體佔用小,用於收集、處理、摘要資料指標。Telegraf具有豐富的外掛程式及具備整合功能,可從其啟動並執行系統中擷取各種指標、從第三方API中擷取指標以及通過statsd和Kafka消費者服務監聽指標。

將Telegraf採集到的日誌通過Kafka協議上傳到Log Service前,您需要先修改設定檔。

  • 配置樣本

    樣本中將日誌輸出格式(Format)設定為JSON,除此之外還支援Graphite、Carbon2等類型。更多資訊,請參見Telegraf輸出格式

    說明

    Telegraf必須配置一個合法的tls_ca路徑,使用伺服器內建的根憑證的路徑即可。Linux環境中,根憑證CA路徑一般為/etc/ssl/certs/ca-bundle.crt

    [[outputs.kafka]] 
      ## URLs of kafka brokers 
      brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"] 
      ## Kafka topic for producer messages 
      topic = "test-logstore-1" 
      routing_key = "content" 
      ## CompressionCodec represents the various compression codecs recognized by 
      ## Kafka in messages. 
      ## 0 : No compression 
      ## 1 : Gzip compression 
      ## 2 : Snappy compression 
      ## 3 : LZ4 compression 
      compression_codec = 1 
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt" 
      # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" 
      ## Use TLS but skip chain & host verification 
      # insecure_skip_verify = false 
      ## Optional SASL Config 
      sasl_username = "yourusername" 
      sasl_password = "yourpassword" 
      ## Data format to output. 
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md 
      data_format = "json"
  • 日誌範例

    使用JSON模式輸出日誌後,您可以給content欄位建立JSON類型的索引。更多資訊,請參見JSON類型

    Telegraf

樣本四:使用Fluentd上傳日誌

Fluentd是一個開源的記錄收集器,是雲端原生計算基金會(CNCF)的成員專案之一,遵循Apache 2 License協議。

Fluentd支援眾多輸入、處理、輸出外掛程式,支援通過Kafka外掛程式將日誌上傳到Log Service,您只需安裝並配置Kafka外掛程式即可。更多資訊,請參見fluent-plugin-kafka

  • 配置樣本

    樣本中將日誌輸出格式(Format)設定為JSON,除此之外還支援數十種Format類型。更多資訊,請參見Fluentd Formatter

    <match **>
      @type kafka 
      # Brokers: you can choose either brokers or zookeeper. 
      brokers      test-project-1.cn-hangzhou.log.aliyuncs.com:10012 
      default_topic test-logstore-1 
      default_message_key content 
      output_data_type json 
      output_include_tag true 
      output_include_time true 
      sasl_over_ssl true 
      username yourusername  //使用者名稱,請根據真實值進行替換。
      password "yourpassword"   //使用者密碼,請根據真實值進行替換。
      ssl_ca_certs_from_system true 
      # ruby-kafka producer options 
      max_send_retries 10000 
      required_acks 1 
      compression_codec gzip 
    </match>
  • 日誌範例

    使用JSON模式輸出日誌後,您可以給content欄位建立JSON類型的索引。更多資訊,請參見JSON類型Fluentd

樣本五:使用Logstash上傳日誌

Logstash是一個具備即時處理能力、開源的日誌採集引擎,可以動態採集不同來源的日誌。

Logstash內建Kafka輸出外掛程式,您可以配置Logstash實現日誌通過kafka協議上傳到Log Service。由於Log Service使用SASL_SSL連線協定,因此還需要配置SSL認證和jaas檔案。

  • 配置樣本

    1. 建立jaas檔案,並儲存到任意路徑(例如/etc/kafka/kafka_client_jaas.conf)。

      將如下內容添加到jaas檔案中。

      KafkaClient { 
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="yourusername" 
        password="yourpassword"; 
      };
    2. 配置SSL信任認證,儲存到任意路徑(例如:/etc/kafka/client-root.truststore.jks)。

      下載根憑證,儲存到任意路徑(例如:/etc/kafka/root.pem),然後通過keytool命令產生.jks格式的檔案(首次產生時,需要配置密碼)。

      keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
    3. 配置Logstash。

      樣本中將日誌輸出格式(Format)設定為JSON,除此之外還支援數十種Format類型。更多資訊,請參見Logstash Codec

      說明

      本樣本為連通性測試的配置,您的生產環境中建議刪除stdout的輸出配置。

      output { 
        stdout { codec => rubydebug } 
        kafka { 
          topic_id => "test-logstore-1" 
          bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012" 
          security_protocol => "SASL_SSL" 
          ssl_truststore_location => "/etc/client-root.truststore.jks" 
          ssl_truststore_password => "123456" 
          jaas_path => "/etc/kafka_client_jaas.conf" 
          sasl_mechanism => "PLAIN" 
          codec => "json" 
          client_id => "kafka-logstash" 
        } 
      }
  • 日誌範例

    使用JSON模式輸出日誌後,您可以給content欄位建立JSON類型的索引。更多資訊,請參見JSON類型Logstash

樣本六:通過Kafka生產者(produce)上傳日誌

  • 程式碼範例

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            //配置資訊。
            Properties props = new Properties();
            String project = "etl-dev";
            String logstore = "testlog";
            // 如果希望produce的內容被json解析展開,則設定為true
            boolean parseJson = true;
            // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者。
            // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您可以根據業務需要,儲存到設定檔裡。
            // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-huhehaote.log.aliyuncs.com"; // 根據實際project所在的endpoint配置
            String port = "10012"; // 公網用10012,私網用10011
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put("enable.idempotence", "false"); // SLS的Kafka寫入介面不支援事務
    
            //設定資料key和value的序列化處理類。
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
    
            //建立生產者執行個體。
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //發送記錄
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                ProducerRecord record = new ProducerRecord<String, String>(topic, content);
                producer.send(record);
            }
            producer.close();
        }
    }
  • pom依賴

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>
  • 日誌範例

    image

樣本七:通過Fluent-bit上傳日誌

Fluent-bit是一個輕量級、高可擴充的日誌與指標的處理器、轉寄站,支援眾多輸入、處理和輸出外掛程式,支援通過Kafka外掛程式將日誌上傳到Log Service。更多資訊,請參見Kafka output plugin

  • 配置樣本

    相關的配置資訊,請參見配置方式

    [Output]
        Name    kafka
        Match    *
        Brokers   etl-shanghai.cn-shanghai.log.aliyuncs.com:10012
        Topics    etl-out
        Format    json
        rdkafka.sasl.username   yourusername 
        rdkafka.sasl.password   yourpassword  
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism     PLAIN
  • 日誌範例Fluent-bit

樣本八 :Vector配置Kafka協議上傳

Vector是一款輕量級、高效能的Tlog軟體,它支援Kafka協議的方式上報日誌。下面是Vector通過Kafka相容模式寫入SLS的配置方法。

  • 配置樣本

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["file_logs"] # 定義source,這裡是監控一個記錄檔
      bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012"
      compression = "gzip"
      healthcheck = true
      topic = "dst-kafka.json" # dst-kafka是logstore名字,加.json尾碼是表示Kafka相容寫入的時候嘗試json展開
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "etl-dev" # etl-dev是sls project名稱
      sasl.password = "{{RAM使用者的AKSK,格式{AK#SK}}}"
      tls.enabled = true
  • 日誌範例

    image.png

錯誤資訊

使用Kafka協議上傳日誌失敗時,會按照Kafka的錯誤資訊返回對應的錯誤資訊,如下表所示,Kafka協議錯誤資訊詳情,請參見error list

錯誤資訊

說明

推薦解決方式

NetworkException

出現網路錯誤時返回該錯誤資訊。

一般等待1秒後重試即可。

TopicAuthorizationException

鑒權失敗時返回該錯誤資訊。

一般是您提供的AccessKey錯誤或沒有寫入對應Project、Logstore的許可權。請填寫正確的且具備寫入許可權的AccessKey。

UnknownTopicOrPartitionException

出現該錯誤可能有兩種原因:

  • 不存在對應的Project或Logstore。

  • Project所在地區與填入的Endpoint不一致。

請確保已建立對應的Project和Logstore。如果已建立還是提示該錯誤,請檢查Project所在地區是否和填入的Endpoint一致。

KafkaStorageException

服務端出現異常時返回該錯誤資訊。

一般等待1秒後重試即可。