
Simple Log Service: Use the Kafka protocol to upload logs

Last Updated: Jan 20, 2025

You can collect logs by using tools such as the KafkaProducer SDK, Beats, collectd, Fluentd, Logstash, Telegraf, and Vector, and then use the Kafka protocol to upload the collected logs to Simple Log Service. This topic describes how to configure these tools to upload logs over the Kafka protocol.

Limits

  • The Kafka protocol version must be 2.1.0 or later.

  • You must use the SASL_SSL protocol to ensure the security of log transmission.

Permissions

You must use one of the following policies:

  • System policy for Simple Log Service

    This policy grants the management permissions on Simple Log Service resources. For more information, see Grant permissions to a RAM user and Grant permissions to a RAM role.

  • Custom policy

    1. Create a custom policy. On the JSON tab of the Create Policy page, replace the existing script in the code editor with the following policy document. For more information, see Create custom policies.

      Note

      Replace Project name in the policy document with the name of your project.

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/Project name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/Project name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Attach the created custom policy to your Resource Access Management (RAM) user. For more information, see Grant permissions to a RAM user.

Configuration method

When you use the Kafka protocol to upload logs to Simple Log Service, you must configure the following parameters.

  • SLS_KAFKA_ENDPOINT

    Description: The address to which an initial connection is established. Specify the endpoint of a Simple Log Service project in the Project name.Endpoint:Port format. For more information, see Endpoints.

      • Internal endpoint format: Project name.cn-hangzhou-intranet.log.aliyuncs.com:10011. The port number is 10011.

      • Public endpoint format: Project name.cn-hangzhou.log.aliyuncs.com:10012. The port number is 10012.

    Example: In the following examples, aliyun-project-test is the project name and cn-hangzhou-xxx.aliyuncs.com is the endpoint (for more information, see Endpoints). 10011 is the port number for the internal endpoint, and 10012 is the port number for the public endpoint.

      • Internal endpoint: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

      • Public endpoint: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012

  • SLS_PROJECT

    Description: The name of the Simple Log Service project.

    Example: aliyun-project-test

  • SLS_LOGSTORE

    Description: The name of the Simple Log Service Logstore. If you append .json to the Logstore name, the uploaded logs are parsed in JSON mode.

    Example: In this example, the Logstore is named test-logstore.

      • If you set this parameter to test-logstore, the collected logs are stored in the content field.

      • If you set this parameter to test-logstore.json, the collected logs are parsed in JSON mode before they are uploaded to Simple Log Service. The first-layer keys of the JSON logs are used as field names, and the corresponding values are used as field values.

  • SLS_PASSWORD

    Description: The AccessKey pair that has the write permissions on Simple Log Service. For more information about what an AccessKey pair is and how to create one, see Create an AccessKey pair. The value is the AccessKey ID and the AccessKey secret joined by a number sign (#).

      • The AccessKey ID identifies a user.

      • The AccessKey secret is the password used to verify that you own the AccessKey ID.

    Example: LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG

      • AccessKey ID: LTaI5xxxxxxxxxxxxindexp2

      • AccessKey secret: CZO8XXXXXXXXXXpKSG

      • Number sign (#): joins the AccessKey ID and the AccessKey secret.

Note

If you want to use a Kafka consumer group to consume data in Simple Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.
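
Regardless of which collection tool you use, these parameters map onto standard Kafka client settings: the endpoint becomes the bootstrap server list, the project name is the SASL username, and the AccessKey ID and AccessKey secret joined by a number sign (#) form the SASL password, transmitted over SASL_SSL with the PLAIN mechanism. The following minimal Java sketch mirrors Example 8 later in this topic and shows that mapping with the standard kafka-clients producer. The project, Logstore, and endpoint values are placeholders taken from the examples above, and the class name is arbitrary.

    package org.example;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SlsKafkaParameterSketch {

        public static void main(String[] args) {
            // Placeholder values. Replace them as described in the parameter list above.
            String slsKafkaEndpoint = "aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012"; // SLS_KAFKA_ENDPOINT
            String slsProject = "aliyun-project-test";                                          // SLS_PROJECT
            String slsLogstore = "test-logstore";                                               // SLS_LOGSTORE (append .json for JSON parsing)
            // SLS_PASSWORD: the AccessKey ID and AccessKey secret joined by a number sign (#).
            String slsPassword = System.getenv("SLS_ACCESS_KEY_ID") + "#" + System.getenv("SLS_ACCESS_KEY_SECRET");

            Properties props = new Properties();
            props.put("bootstrap.servers", slsKafkaEndpoint);
            props.put("security.protocol", "sasl_ssl"); // SASL_SSL is required.
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                            + slsProject + "\" password=\"" + slsPassword + "\";");
            props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The topic is the Logstore name.
                producer.send(new ProducerRecord<>(slsLogstore, "{\"msg\": \"Hello World\"}"));
            }
        }
    }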

Example 1: Use Beats to upload logs

You can use Beats, such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Simple Log Service. For more information, see Beats-Kafka-Output.

  • Configuration example

    For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

Example 2: Use collectd to upload logs

collectd is a daemon process that periodically collects the metrics of systems and applications. You can use the Kafka protocol to upload the collected metrics to Simple Log Service. For more information about collectd, see collectd. For more information about the Kafka output plug-in for collectd, see Write Kafka Plugin.

Before you upload the metrics that are collected by collectd to Simple Log Service, you must install the collectd-write_kafka plug-in and the related dependencies. For example, you can run the sudo yum install collectd-write_kafka command on a CentOS Linux server to install the collectd-write_kafka plug-in. For more information about how to install RPM Package Manager (RPM) packages, visit Collectd-write_kafka.

  • Configuration example

    • collectd supports various formats, including the JSON, Command, and Graphite formats. In this example, the JSON format is used. For more information, see collectd documentation.

    • For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

Example 3: Use Telegraf to upload logs

Telegraf is an agent written in the Go programming language and is used to collect, process, and aggregate metrics. Telegraf consumes only a small amount of memory resources. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which Telegraf runs or from third-party APIs. You can also use Telegraf to monitor metrics by using StatsD and Kafka consumers.

Before you upload the metrics that are collected by using Telegraf to Simple Log Service, you must modify the configuration file of Telegraf.

  • Configuration example

    • Telegraf supports various formats, including the JSON, Carbon2, and Graphite formats. In this example, the JSON format is used. For more information, see Output Data Formats of Telegraf.

      Note

      You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.

    • For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config
      tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      # tls_cert = "/etc/ssl/certs/ca-certificates.crt"
      # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

Example 4: Use Fluentd to upload logs

Fluentd is an open source log collector. Fluentd is a project of Cloud Native Computing Foundation (CNCF). All components of Fluentd are available under the Apache 2 License. For more information, see Fluentd.

Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Simple Log Service by using the fluent-plugin-kafka plug-in. You only need to install and configure the plug-in. For more information, see fluent-plugin-kafka.

  • Configuration example

    • Fluentd supports dozens of formats. In this example, the JSON format is used. For more information, see Fluentd Formatter.

    • For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

Example 5: Use Logstash to upload logs

Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.

Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol. Simple Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.

  • Configuration example

    • Logstash supports dozens of formats. In this example, the JSON format is used. For more information, see Logstash Codec.

      Note

      The following configuration is used to test network connectivity. In a production environment, we recommend that you remove the stdout field.

    • For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

Example 6: Use Fluent Bit to upload logs

Fluent Bit is a lightweight and highly scalable logging and metrics processor and forwarder. Fluent Bit supports various input, processing, and output plug-ins. You can use the Kafka output plug-in to upload logs to Simple Log Service. For more information, see Kafka output plugin.

  • Configuration example

    For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

Example 7: Use Vector to upload logs

Vector is a lightweight and high-performance log processing tool that you can use to upload logs by using the Kafka protocol. For more information, see Vector. The following example describes the configuration for Vector to write logs to Simple Log Service by using the Kafka protocol.

  • Configuration example

    For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

Example 8: Use a KafkaProducer SDK to upload logs

  • Sample code

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // The configuration information. 
            Properties props = new Properties();
            String project = "etl-dev";
            String logstore = "testlog";
            // If you want to automatically expand JSON logs, set the boolean parseJson parameter to true.
            boolean parseJson = true;
            // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console. 
            // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret in your configuration file based on your business requirements. 
            // To prevent key leaks, we recommend that you do not save your AccessKey ID and AccessKey secret in the code.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
            String port = "10012"; // For public endpoints, set the port number to 10012. For internal endpoints, set the port number to 10011.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.
    
            // Specify the serialization class for data keys and values. 
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
    
            // Create a producer instance. 
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            // Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record);
            }
            producer.close();
        }
    }
  • POM dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>

Errors

If a log fails to be uploaded by using the Kafka protocol, an error is reported. The following list describes these errors and their solutions. For more information, see Error list.

  • NetworkException

    Description: A network exception occurred.

    Solution: Wait for 1 second and try again.

  • TopicAuthorizationException

    Description: Authentication failed.

    Solution: If your AccessKey pair is invalid or the AccessKey pair does not have the permissions to write data to the specified project or Logstore, the authentication fails. In this case, enter a valid AccessKey pair and make sure that the AccessKey pair has the required write permissions.

  • UnknownTopicOrPartitionException

    Description: One of the following errors occurred:

      • The specified project or Logstore does not exist.

      • The region where the specified project resides is different from the region of the specified endpoint.

    Solution: Make sure that the specified project and Logstore exist. If they exist but the issue persists, check whether the region where the specified project resides is the same as the region of the specified endpoint.

  • KafkaStorageException

    Description: A server error occurred.

    Solution: Wait for 1 second and try again.
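
For transient errors such as NetworkException and KafkaStorageException, you can also let the Kafka client retry automatically instead of retrying in your own code. The following minimal sketch uses the standard kafka-clients producer settings retries and retry.backoff.ms so that the producer waits about 1 second between attempts; the endpoint, project, Logstore, and credential values are placeholders, and the callback simply logs non-retriable errors such as TopicAuthorizationException or UnknownTopicOrPartitionException, which must be fixed rather than retried.

    package org.example;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KafkaRetryExample {

        public static void main(String[] args) {
            // Placeholder connection settings. Configure them as described in the "Configuration method" section.
            String hosts = "aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012";
            String project = "aliyun-project-test";
            String password = System.getenv("SLS_ACCESS_KEY_ID") + "#" + System.getenv("SLS_ACCESS_KEY_SECRET");

            Properties props = new Properties();
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                            + project + "\" password=\"" + password + "\";");
            props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);

            // Let the client retry transient errors (for example, NetworkException) automatically,
            // waiting about 1 second between attempts.
            props.put("retries", "5");
            props.put("retry.backoff.ms", "1000");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-logstore", "{\"msg\": \"Hello World\"}"),
                        (metadata, exception) -> {
                            // Errors that remain after the configured retries, including non-retriable
                            // errors such as TopicAuthorizationException or UnknownTopicOrPartitionException,
                            // are surfaced here. Fix the credentials, project, Logstore, or endpoint.
                            if (exception != null) {
                                exception.printStackTrace();
                            }
                        });
            }
        }
    }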