You can use tools such as the Kafka producer SDK, Beats, collectd, Fluentd, Logstash, Telegraf, and Vector to collect logs and upload them to Simple Log Service over the Kafka protocol. This topic describes how to upload logs collected by these tools to Simple Log Service by using the Kafka protocol.
Limits
The Kafka protocol must be version 2.1.0 or later.
For secure log transmission, the SASL_SSL protocol is required.
Permission rules
One of the following permission rules must be met:
System policy: Attach the AliyunLogFullAccess policy, which grants permissions to manage Simple Log Service (Log). For details on authorization methods, see grant permissions to a RAM user and grant permissions to a RAM role.
Custom permission policy
Create a custom permission policy. In the Script Editor, replace the existing content with the following script. For details, see create a custom permission policy.
Note: Replace project name with your actual project name.

```json
{
  "Version": "1",
  "Statement": [
    {
      "Action": "log:GetProject",
      "Resource": "acs:log:*:*:project/project name",
      "Effect": "Allow"
    },
    {
      "Action": [
        "log:GetLogStore",
        "log:ListShards",
        "log:PostLogStoreLogs"
      ],
      "Resource": "acs:log:*:*:project/project name/logstore/*",
      "Effect": "Allow"
    }
  ]
}
```
Add the custom permission policy to the RAM user. For details, see grant permissions to a RAM user.
Configuration method
Configure the following parameters to use the Kafka protocol for uploading logs:
| Configuration name | Configuration value | Description | Example |
| --- | --- | --- | --- |
| SLS_KAFKA_ENDPOINT | The cluster endpoint for the initial connection, in the format `project name.endpoint:port`. | Use port 10012 for a public endpoint and port 10011 for an internal endpoint. | `aliyun-project-test.<endpoint>:10012`, where aliyun-project-test is the project name, followed by the endpoint and the port number. |
| SLS_PROJECT | Project name | The Simple Log Service project name. | aliyun-project-test |
| SLS_LOGSTORE | Logstore name | The Simple Log Service Logstore name. Appending a `.json` suffix to the Logstore name automatically expands JSON logs. | For instance, if the Logstore name is test-logstore, set the value to test-logstore, or to test-logstore.json to expand JSON logs. |
| SLS_PASSWORD | The AccessKey ID and AccessKey secret, separated by a `#`. | The AccessKey pair must have write permissions to Simple Log Service. For information on AccessKey concepts and creation steps, see create an AccessKey. | LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG |
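To make the mapping concrete, the following minimal Java sketch shows how these four parameters are typically assembled into standard Kafka producer properties. The class name, the placeholder endpoint, and the specific values are illustrative only; Example 8 below gives the complete producer program.

```java
import java.util.Properties;

public class SlsKafkaParams {
    public static void main(String[] args) {
        // Placeholder values -- replace with your own project, endpoint, and AccessKey pair.
        String project = "etl-dev";                     // SLS_PROJECT
        String endpoint = "<region>.log.aliyuncs.com";  // endpoint of the project's region (placeholder)
        String port = "10012";                          // 10012 for public endpoints, 10011 for internal endpoints
        String accessKeyId = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");

        Properties props = new Properties();
        // SLS_KAFKA_ENDPOINT: project name.endpoint:port
        props.put("bootstrap.servers", project + "." + endpoint + ":" + port);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        // SLS_PASSWORD: AccessKey ID and AccessKey secret joined by "#"
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required"
                        + " username=\"" + project + "\""
                        + " password=\"" + accessKeyId + "#" + accessKeySecret + "\";");
        System.out.println(props);
    }
}
```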
If you want to consume data from Simple Log Service in real time through a Kafka consumer group, submit a ticket to consult Alibaba Cloud technical support engineers.
Example 1: Use Beats to upload logs
Logs collected by Beats, including Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, can be uploaded to Simple Log Service using the Kafka protocol. For more information, see Beats-Kafka-Output.
Sample configuration
For parameters starting with SLS_ in the example, refer to the configuration method.

```yaml
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["SLS_KAFKA_ENDPOINT"]
  username: "SLS_PROJECT"
  password: "SLS_PASSWORD"
  ssl.certificate_authorities:
  # message topic selection + partitioning
  topic: 'SLS_LOGSTORE'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
Example 2: Use collectd to upload logs
Collectd is a daemon that periodically collects system and application performance metrics. It can upload these metrics to Simple Log Service using the Kafka protocol. For more information, see the Write Kafka Plugin documentation.
Before uploading logs collected by collectd to Simple Log Service, you must first install the Kafka plug-in along with its dependencies. On CentOS, for example, you can install the plug-in with the yum command `sudo yum install collectd-write_kafka`. For details on installing RPM packages, see Collectd-write_kafka.
Sample configuration
In this example, the log output is configured in JSON format. Other supported formats include Command and Graphite. For more information, see the Collectd configuration document.
For parameters starting with SLS_ in the example, refer to the configuration method.

```
LoadPlugin write_kafka
<Plugin write_kafka>
  Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
  Property "security.protocol" "sasl_ssl"
  Property "sasl.mechanism" "PLAIN"
  Property "sasl.username" "SLS_PROJECT"
  Property "sasl.password" "SLS_PASSWORD"
  Property "broker.address.family" "v4"
  <Topic "SLS_LOGSTORE">
    Format JSON
    Key "content"
  </Topic>
</Plugin>
```
Example 3: Use Telegraf to upload logs
Telegraf is an agent program written in Go with a small memory footprint. It is designed to collect, process, and aggregate metrics, and offers a wide range of plug-ins and integration options. Telegraf can retrieve metrics from the host system and from third-party APIs, and can also listen for metrics through statsd and Kafka consumer services.
To upload logs collected by Telegraf to Simple Log Service using the Kafka protocol, you must first adjust the configuration file.
Sample Configuration
In this example, the log output is configured in JSON format. Telegraf also supports Graphite and Carbon2 formats. For more information, see Telegraf output formats.
Note: Telegraf requires a valid tls_ca path in its configuration. You may use the server's default root certificate path. In Linux systems, the root certificate CA path is typically /etc/ssl/certs/ca-bundle.crt.
For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

```toml
# Kafka output plugin configuration
[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["SLS_KAFKA_ENDPOINT"]
  ## Kafka topic for producer messages
  topic = "SLS_LOGSTORE"
  routing_key = "content"
  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  ##  3 : LZ4 compression
  compression_codec = 1
  ## Optional TLS Config
  tls_ca = "/etc/ssl/certs/ca-bundle.crt"
  tls_cert = "/etc/ssl/certs/ca-certificates.crt"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false
  ## Optional SASL Config
  sasl_username = "SLS_PROJECT"
  sasl_password = "SLS_PASSWORD"
  ## Data format to output.
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "json"
```
Example 4: Use Fluentd to upload logs
Fluentd is an open-source data collector and a project of the Cloud Native Computing Foundation (CNCF), licensed under the Apache 2.0 License.
Fluentd offers a variety of input, processing, and output plug-ins. After Fluentd collects logs, you only need to install and configure the Kafka plug-in to upload them to Simple Log Service. For more information, see fluent-plugin-kafka.
Sample Configuration
In this example, the log output is configured in JSON format. Fluentd is compatible with numerous formats. For more information, see Fluentd Formatter.
For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

```
<match **>
  @type kafka2
  brokers SLS_KAFKA_ENDPOINT
  default_topic SLS_LOGSTORE
  default_message_key content
  sasl_over_ssl true
  use_event_time true
  username SLS_PROJECT
  password "SLS_PASSWORD"
  ssl_ca_certs_from_system true
  # ruby-kafka producer options
  max_send_retries 1000
  required_acks 1
  compression_codec gzip
  use_event_time true
  max_send_limit_bytes 2097152
  <buffer hostlogs>
    flush_interval 10s
  </buffer>
  <format>
    @type json
  </format>
</match>
```
Example 5: Use Logstash to upload logs
Logstash is an open-source log collection engine that processes data in real-time. It is capable of dynamically collecting logs from various sources.
Logstash includes a built-in Kafka output plug-in, allowing you to configure it to gather logs and send them to Simple Log Service using the Kafka protocol. Simple Log Service employs the SASL_SSL protocol for secure data transmission, necessitating the configuration of an SSL certificate and a JAAS file.
Sample Configuration
In this example, the log output is configured in JSON format. Logstash supports a wide array of formats. For more information, see Logstash Codec.
Note: This configuration is intended for testing network connectivity. For production environments, it is recommended to remove the stdout field.
For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

```
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "SLS_LOGSTORE"
    bootstrap_servers => "SLS_KAFKA_ENDPOINT"
    security_protocol => "SASL_SSL"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
    sasl_mechanism => "PLAIN"
    codec => "json"
    client_id => "kafka-logstash"
  }
}
```
Example 6: Use Fluent Bit to upload logs
Fluent Bit is a lightweight, scalable processor and forwarder for logging and metrics. It supports a variety of input, processing, and output plugins, including the Kafka output plugin for sending logs to Simple Log Service. For more information, see Kafka output plugin.
Sample Configuration
For details on parameters prefixed with SLS_ in the example, refer to the configuration method.

```
[Output]
    Name                      kafka
    Match                     *
    Brokers                   SLS_KAFKA_ENDPOINT
    Topics                    SLS_LOGSTORE
    Format                    json
    rdkafka.sasl.username     SLS_PROJECT
    rdkafka.sasl.password     SLS_PASSWORD
    rdkafka.security.protocol SASL_SSL
    rdkafka.sasl.mechanism    PLAIN
```
Example 7: Use Vector to upload logs
Vector is a lightweight, high-performance log processor that supports log transmission via the Kafka protocol. The following configuration example enables Vector to send logs to Simple Log Service using the Kafka protocol.
Sample Configuration
For details on the parameters prefixed with SLS_ in the example, refer to the configuration method.

```toml
[sinks.aliyun_sls]
type = "kafka"
inputs = ["test_logs"]
bootstrap_servers = "SLS_KAFKA_ENDPOINT"
compression = "gzip"
healthcheck = true
topic = "SLS_LOGSTORE"
encoding.codec = "json"
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "SLS_PROJECT"
sasl.password = "SLS_PASSWORD"
tls.enabled = true
```
Example 8: Use Kafka producer to upload logs
Sample Code
```java
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProduceExample {

    public static void main(String[] args) {
        // Configuration information.
        Properties props = new Properties();
        String project = "etl-dev";
        String logstore = "testlog";
        // Set the following parameter to true if you want to automatically expand JSON logs.
        boolean parseJson = true;
        // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Log Service is a high-risk operation. To avoid security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. You can log on to the RAM console to create a RAM user.
        // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret to your configuration file based on your business requirements.
        // We recommend that you do not hard-code the AccessKey ID and AccessKey secret in your code. Otherwise, the AccessKey pair may be leaked.
        String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
        String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
        String port = "10012"; // For a public endpoint, set the port number to 10012. For an internal endpoint, set the port number to 10011.

        String hosts = project + "." + endpoint + ":" + port;
        String topic = logstore;
        if (parseJson) {
            topic = topic + ".json";
        }

        props.put("bootstrap.servers", hosts);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project
                        + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
        props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.

        // Specify the serialization class for data keys and values.
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        // Create a producer instance.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send records.
        for (int i = 0; i < 1; i++) {
            String content = "{\"msg\": \"Hello World\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
            producer.send(record);
        }
        producer.close();
    }
}
```
POM Dependency
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
```
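The sample code above sends records without checking the result, so a failed upload is only visible asynchronously. The following is a minimal sketch, not part of the original sample, that passes a callback to producer.send so that failures are logged; the helper class and method names are illustrative, and the producer is assumed to be configured as in the sample above.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithCallback {
    // Minimal sketch: send one record and log the asynchronous result.
    static void sendAndLog(KafkaProducer<String, String> producer, String topic, String content) {
        producer.send(new ProducerRecord<>(topic, content), (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                // The exception type maps to the error messages listed in the "Error messages" section.
                System.err.println("Failed to upload log: " + exception);
            } else {
                System.out.println("Uploaded to " + metadata.topic()
                        + ", partition " + metadata.partition()
                        + ", offset " + metadata.offset());
            }
        });
    }
}
```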
Error messages
When log uploads via the Kafka protocol fail, a corresponding Kafka error message is returned. For more information about Kafka protocol error messages, see error list.
| Error Message | Description | Recommended Solution |
| --- | --- | --- |
| NetworkException | This error message is returned when a network error occurs. | Generally, wait for one second and retry. |
| TopicAuthorizationException | This error message is returned when authentication fails. | Generally, the AccessKey you provided is incorrect or does not have permission to write to the corresponding project or Logstore. Provide a correct AccessKey with write permission. |
| UnknownTopicOrPartitionException | This error may occur for two reasons: the specified project or Logstore does not exist, or the project's region does not match the specified endpoint. | Ensure that the corresponding project and Logstore have been created. If they exist and the error persists, check whether the project's region matches the specified endpoint. |
| KafkaStorageException | This error message is returned when an exception occurs on the server side. | Generally, wait for one second and retry. |
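For the transient errors above (NetworkException and KafkaStorageException), the Kafka producer can also retry automatically instead of relying on application-level retries. The following is a minimal sketch built on standard Kafka producer settings; the helper class name and the specific values (3 retries, a 1000 ms backoff, a 30-second request timeout) are illustrative only, on top of the connection properties shown in Example 8.

```java
import java.util.Properties;

public class RetryProperties {
    // Minimal sketch: add standard Kafka producer retry settings to the
    // connection properties from Example 8. The values below are illustrative.
    static Properties withRetries(Properties props) {
        props.put("retries", "3");                 // retry transient send failures automatically
        props.put("retry.backoff.ms", "1000");     // wait about one second between retries
        props.put("request.timeout.ms", "30000");  // fail a request if no response within 30 seconds
        return props;
    }
}
```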