Simple Log Service: Use the Kafka protocol to upload logs

Last Updated: Dec 25, 2024

You can use tools such as the Kafka Producer SDK, Beats, collectd, Fluentd, Logstash, Telegraf, and Vector to collect logs and upload them to Simple Log Service over the Kafka protocol. This topic describes how to configure these tools to upload the logs they collect.

Limits

  • The Kafka protocol must be version 2.1.0 or later.

  • For secure log transmission, the SASL_SSL protocol is required.

Permission rules

One of the following permission rules must be met:

  • System policies for SLS

    This policy grants permissions to manage Simple Log Service (Log). For details on authorization methods, see grant permissions to a RAM user and grant permissions to a RAM role.

  • Custom permission policy

    1. Create a custom permission policy. In the Script Editor, replace the existing content with the following script. For details, see create a custom permission policy.

      Note

      Replace project name in the script with your actual project name.

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project name",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project name/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Add the custom permission policy to the RAM user. For details, see grant permissions to a RAM user.

Configuration method

Configure the following parameters to use the Kafka protocol for uploading logs:

Each parameter is listed with its configuration value, description, and example.

SLS_KAFKA_ENDPOINT

  • Configuration value: The cluster endpoint for the initial connection, in the format Project name.Endpoint:Port. Configure it according to the project's endpoint. For more information, see endpoint.

  • Description:

    • Private network: The port number is 10011. Example: Project name.cn-hangzhou-intranet.log.aliyuncs.com:10011.

    • Public network: The port number is 10012. Example: Project name.cn-hangzhou.log.aliyuncs.com:10012.

  • Example: aliyun-project-test is the project name, cn-hangzhou-xxx.aliyuncs.com is the endpoint, and 10011 and 10012 are the port numbers for the private and public networks, respectively.

    • Private network: aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011.

    • Public network: aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012.

SLS_PROJECT

  • Configuration value: Project name

  • Description: The Simple Log Service project name.

  • Example: aliyun-project-test

SLS_LOGSTORE

  • Configuration value: Logstore name

  • Description: The Simple Log Service logstore name. If the name ends with the .json suffix, Simple Log Service attempts to parse the log content as JSON.

  • Example: The logstore name is test-logstore.

    • If the configuration value is test-logstore, the reported log content is stored in the content field.

    • If the configuration value is test-logstore.json, the reported log content is parsed as JSON. Each first-level key in the reported JSON data becomes a field name, and the corresponding value becomes the field value.

SLS_PASSWORD

  • Configuration value: The AccessKey ID and AccessKey Secret of an identity with write permissions to Simple Log Service, joined by a # symbol.

  • Description: For information on AccessKey concepts and creation steps, see create an AccessKey.

    • AccessKey ID: Identifies the user.

    • AccessKey Secret: A password that authenticates the owner of the AccessKey ID.

  • Example: LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG

    • AccessKey ID: LTaI5xxxxxxxxxxxxindexp2

    • AccessKey Secret: CZO8XXXXXXXXXXpKSG

    • #: Concatenates the AccessKey ID and AccessKey Secret.
Note

If you want to consume data from Simple Log Service in real-time through a Kafka consumer group, submit a ticket to consult Alibaba Cloud technical support engineers.
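
The parameter formats described above can be assembled programmatically. The following is a minimal sketch, not part of any Simple Log Service SDK: the class name SlsKafkaSettings and its method names are hypothetical, and the caller is assumed to pass an endpoint that matches the chosen network type (for example, an intranet endpoint for the private network).

```java
import java.util.Objects;

/** Hypothetical helper that builds the SLS_ values described in this topic. */
final class SlsKafkaSettings {

    /** Port for private network endpoints. */
    static final int PRIVATE_PORT = 10011;
    /** Port for public network endpoints. */
    static final int PUBLIC_PORT = 10012;

    private SlsKafkaSettings() {}

    /** SLS_KAFKA_ENDPOINT in the format Project name.Endpoint:Port. */
    static String kafkaEndpoint(String project, String endpoint, boolean privateNetwork) {
        Objects.requireNonNull(project, "project");
        Objects.requireNonNull(endpoint, "endpoint");
        int port = privateNetwork ? PRIVATE_PORT : PUBLIC_PORT;
        return project + "." + endpoint + ":" + port;
    }

    /** SLS_LOGSTORE: the logstore name, with ".json" appended to request JSON parsing. */
    static String topic(String logstore, boolean parseJson) {
        Objects.requireNonNull(logstore, "logstore");
        return parseJson ? logstore + ".json" : logstore;
    }

    /** SLS_PASSWORD: the AccessKey ID and AccessKey Secret joined by '#'. */
    static String password(String accessKeyId, String accessKeySecret) {
        Objects.requireNonNull(accessKeyId, "accessKeyId");
        Objects.requireNonNull(accessKeySecret, "accessKeySecret");
        return accessKeyId + "#" + accessKeySecret;
    }

    public static void main(String[] args) {
        // Prints aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012
        System.out.println(kafkaEndpoint("aliyun-project-test", "cn-hangzhou.log.aliyuncs.com", false));
        // Prints test-logstore.json
        System.out.println(topic("test-logstore", true));
        // The password is intentionally not printed; read the AccessKey pair
        // from environment variables or a secret store rather than hard-coding it.
    }
}
```

Keeping the three formats in one place makes it less likely that a tool configuration ends up with a mismatched port or a missing # separator.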

Example 1: Use Beats to upload logs

Logs collected by Beats, including Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, can be uploaded to Simple Log Service using the Kafka protocol. For more information, see Beats-Kafka-Output.

  • Sample configuration

    For parameters starting with SLS_ in the example, refer to the configuration method.

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

Example 2: Use collectd to upload logs

collectd is a daemon that periodically collects system and application performance metrics. It can upload these metrics to Simple Log Service using the Kafka protocol. For more information, see the Write Kafka Plugin documentation.

Before you upload logs collected by collectd to Simple Log Service, install the Kafka plug-in and its dependencies. On CentOS, for example, you can install the Kafka plug-in with yum: sudo yum install collectd-write_kafka. For details on installing RPM packages, see Collectd-write_kafka.

  • Sample configuration

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

Example 3: Use Telegraf to upload logs

Telegraf is an agent written in Go with a small memory footprint. It collects, processes, and aggregates metrics, and offers a wide range of plugins and integrations. Telegraf can retrieve metrics from the host system and third-party APIs, and can listen for metrics through statsd and Kafka consumer services.

To upload logs collected by Telegraf to Simple Log Service using the Kafka protocol, you must first adjust the configuration file.

  • Sample Configuration

    • In this example, the log output is configured in JSON format. Telegraf also supports Graphite and Carbon2 formats. For more information, see Telegraf output formats.

      Note

      Telegraf requires a valid tls_ca path in the configuration. You can use the server's default root certificate path. On Linux systems, the root CA bundle is typically located at /etc/ssl/certs/ca-bundle.crt.

    • For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config
      tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt"
      # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

Example 4: Use Fluentd to upload logs

Fluentd is an open-source data collector and a project of the Cloud Native Computing Foundation (CNCF), licensed under the Apache 2.0 License.

Fluentd offers a variety of input, processing, and output plugins. It enables the collection of logs which can then be uploaded to Simple Log Service using the Kafka plugin. The only requirements are to install and configure the Kafka plugin. For more information, see fluent-plugin-kafka.

  • Sample Configuration

    • In this example, the log output is configured in JSON format. Fluentd is compatible with numerous formats. For more information, see Fluentd Formatter.

    • For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

Example 5: Use Logstash to upload logs

Logstash is an open-source log collection engine that processes data in real-time. It is capable of dynamically collecting logs from various sources.

Logstash includes a built-in Kafka output plug-in, allowing you to configure it to gather logs and send them to Simple Log Service using the Kafka protocol. Simple Log Service employs the SASL_SSL protocol for secure data transmission, necessitating the configuration of an SSL certificate and a JAAS file.

  • Sample Configuration

    • In this example, the log output is configured in JSON format. Logstash supports a wide array of formats. For more information, see Logstash Codec.

      Note

      The stdout output in this configuration is intended for testing network connectivity. For production environments, we recommend that you remove the stdout output.

    • For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

Example 6: Use Fluent Bit to upload logs

Fluent Bit is a lightweight, scalable processor and forwarder for logging and metrics. It supports a variety of input, processing, and output plugins, including the Kafka output plugin for sending logs to Simple Log Service. For more information, see Kafka output plugin.

  • Sample Configuration

    For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

Example 7: Use Vector to upload logs

Vector is a lightweight, high-performance log processor that supports log transmission via the Kafka protocol. The following configuration example enables Vector to send logs to Simple Log Service using the Kafka protocol.

  • Sample Configuration

    For details on the parameters prefixed with SLS_ in the example, refer to the configuration method.

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

Example 8: Use Kafka producer to upload logs

  • Sample Code

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            // Configuration information.
            Properties props = new Properties();
            String project = "etl-dev";
            String logstore = "testlog";
            // Set the following parameter to true if you want to automatically expand JSON logs.
            boolean parseJson = true;
            // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. To avoid security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. You can log on to the RAM console to create a RAM user.
            // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret to your configuration file based on your business requirements.
            // We recommend that you do not hard-code the AccessKey ID and AccessKey secret in your code. Otherwise, the AccessKey pair may be leaked.
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
            String port = "10012"; // For a public endpoint, set the port number to 10012. For an internal endpoint, set the port number to 10011.
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.
    
            // Specify the serialization class for data keys and values.
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
    
            // Create a producer instance.
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            // Send records.
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
                producer.send(record);
            }
            producer.close();
        }
    }
  • POM Dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>

Error messages

When log uploads via the Kafka protocol fail, a corresponding Kafka error message is returned. For more information about Kafka protocol error messages, see error list.

Each error message is listed with its description and recommended solution.

NetworkException

  • Description: This error message is returned when a network error occurs.

  • Recommended solution: Wait for one second and retry.

TopicAuthorizationException

  • Description: This error message is returned when authentication fails.

  • Recommended solution: In most cases, the AccessKey pair that you provided is incorrect or does not have permission to write to the corresponding project or Logstore. Provide a correct AccessKey pair with write permission.

UnknownTopicOrPartitionException

  • Description: This error may occur for two reasons:

    • The corresponding project or Logstore does not exist.

    • The region of the project does not match the specified endpoint.

  • Recommended solution: Ensure that the corresponding project and Logstore have been created. If they have been created and the error still occurs, check whether the project's region matches the specified endpoint.

KafkaStorageException

  • Description: This error message is returned when an exception occurs on the server side.

  • Recommended solution: Wait for one second and retry.
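
The "wait for one second and retry" recommendation can be sketched as a small wrapper. This is an illustrative sketch under stated assumptions, not an official client feature: the class RetryOnTransientError and its methods are hypothetical, errors are matched by simple class name so the code runs without kafka-clients on the classpath, and the nested NetworkException is only a stand-in for the Kafka exception of the same name. The other two errors in the list above are treated as non-retriable because they point to a configuration or permission problem.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;

/** Hypothetical retry wrapper for the transient errors listed in this topic. */
final class RetryOnTransientError {

    // Errors for which this topic recommends waiting one second and retrying.
    private static final Set<String> RETRIABLE =
            new HashSet<>(Arrays.asList("NetworkException", "KafkaStorageException"));

    /** Stand-in for the Kafka exception of the same name, used only by the demo. */
    static final class NetworkException extends RuntimeException {
        NetworkException(String message) {
            super(message);
        }
    }

    private RetryOnTransientError() {}

    /** Returns true if the error or any of its causes is in the retriable set. */
    static boolean isRetriable(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (RETRIABLE.contains(t.getClass().getSimpleName())) {
                return true;
            }
        }
        return false;
    }

    /** Runs the action, retrying retriable failures up to maxAttempts times in total. */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long waitMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Give up on non-retriable errors or when attempts are exhausted.
                if (!isRetriable(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(waitMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Simulated upload: fails once with a network error, then succeeds.
        String result = callWithRetry(() -> {
            attempts[0]++;
            if (attempts[0] == 1) {
                throw new NetworkException("simulated network error");
            }
            return "sent";
        }, 3, 1000L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With a real producer, the action would typically be a blocking send. The cause chain is walked because a failure reported through a Future arrives wrapped in an ExecutionException.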