You can use log collection tools such as the KafkaProducer SDK, Beats, collectd, Fluentd, Logstash, Telegraf, and Vector to collect logs and upload them to Simple Log Service over the Kafka protocol. This topic describes how to configure these tools.
Limits
The Kafka protocol version must be 2.1.0 or later.
You must use the SASL_SSL protocol to ensure the security of log transmission.
Permissions
You must use one of the following policies:
AliyunLogFullAccess policy
This policy grants the management permissions on Simple Log Service resources. For more information, see Grant permissions to a RAM user and Grant permissions to a RAM role.
Custom policy
Create a custom policy. On the JSON tab of the Create Policy page, replace the existing script in the code editor with the following policy document. For more information, see Create custom policies.
Note: Replace Project name in the policy document based on your business requirements.
{
    "Version": "1",
    "Statement": [
        {
            "Action": "log:GetProject",
            "Resource": "acs:log:*:*:project/Project name",
            "Effect": "Allow"
        },
        {
            "Action": [
                "log:GetLogStore",
                "log:ListShards",
                "log:PostLogStoreLogs"
            ],
            "Resource": "acs:log:*:*:project/Project name/logstore/*",
            "Effect": "Allow"
        }
    ]
}
Attach the created custom policy to your Resource Access Management (RAM) user. For more information, see Grant permissions to a RAM user.
Configuration method
When you use the Kafka protocol to upload logs to Simple Log Service, you must configure related parameters. The following table describes the parameters.
| Parameter | Value | Description | Example |
| --- | --- | --- | --- |
| SLS_KAFKA_ENDPOINT | The address to which an initial connection is established. | Specify the endpoint of your Simple Log Service project in the Project name.Endpoint:Port format. The endpoint varies based on the region of the project. Use port 10012 for a public endpoint and port 10011 for an internal endpoint. | In the following examples, aliyun-project-test is the project name, for example, aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012. |
| SLS_PROJECT | The project name. | The name of the Simple Log Service project. | aliyun-project-test |
| SLS_LOGSTORE | The Logstore name. | The name of the Simple Log Service Logstore. If you append .json to the Logstore name, JSON logs are automatically expanded. | In the following examples, the Logstore is named testlog. To automatically expand JSON logs, specify testlog.json. |
| SLS_PASSWORD | The AccessKey pair that has the write permissions on Simple Log Service. | For more information about the definition of an AccessKey pair and how to create an AccessKey pair, see Create an AccessKey pair. The value is the combination of an AccessKey ID and an AccessKey secret that are joined by a number sign (#). | LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG. In this example, LTaI5xxxxxxxxxxxxindexp2 is the AccessKey ID and CZO8XXXXXXXXXXpKSG is the AccessKey secret. |
If you want to use a Kafka consumer group to consume data in Simple Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.
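For reference, the following minimal Java sketch shows how these parameters map to standard Kafka producer client properties. It mirrors the full sample in Example 8; the project name aliyun-project-test, the Logstore name testlog, and the cn-hangzhou endpoint are placeholder values that you must replace with your own.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SlsKafkaConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // SLS_KAFKA_ENDPOINT: Project name.Endpoint:Port. Use port 10012 for a public endpoint and 10011 for an internal endpoint.
        props.put("bootstrap.servers", "aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012");
        // Simple Log Service requires SASL_SSL with the PLAIN mechanism.
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        // SLS_PROJECT is the SASL username. SLS_PASSWORD is "<AccessKey ID>#<AccessKey secret>",
        // read here from environment variables to keep credentials out of the code.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"aliyun-project-test\" password=\""
                        + System.getenv("SLS_ACCESS_KEY_ID") + "#" + System.getenv("SLS_ACCESS_KEY_SECRET") + "\";");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        // SLS_LOGSTORE is used as the Kafka topic name.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testlog", "{\"msg\": \"Hello World\"}"));
        }
    }
}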
Example 1: Use Beats to upload logs
You can use Beats, such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Simple Log Service. For more information, see Beats-Kafka-Output.
Configuration example
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["SLS_KAFKA_ENDPOINT"]
  username: "SLS_PROJECT"
  password: "SLS_PASSWORD"
  ssl.certificate_authorities:
  # message topic selection + partitioning
  topic: 'SLS_LOGSTORE'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Example 2: Use collectd to upload logs
collectd is a daemon process that periodically collects the metrics of systems and applications. You can use the Kafka protocol to upload the collected metrics to Simple Log Service. For more information about collectd, see collectd. For more information about the Kafka output plug-in, see Write Kafka Plugin.
Before you upload the metrics that are collected by collectd to Simple Log Service, you must install the collectd-write_kafka plug-in and the related dependencies. For example, on a CentOS Linux server, you can run the sudo yum install collectd-write_kafka command to install the plug-in. For more information about how to install RPM Package Manager (RPM) packages, see collectd-write_kafka.
Configuration example
collectd supports various formats, including the JSON, Command, and Graphite formats. In this example, the JSON format is used. For more information, see collectd documentation.
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

LoadPlugin write_kafka
<Plugin write_kafka>
  Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
  Property "security.protocol" "sasl_ssl"
  Property "sasl.mechanism" "PLAIN"
  Property "sasl.username" "SLS_PROJECT"
  Property "sasl.password" "SLS_PASSWORD"
  Property "broker.address.family" "v4"
  <Topic "SLS_LOGSTORE">
    Format JSON
    Key "content"
  </Topic>
</Plugin>
Example 3: Use Telegraf to upload logs
Telegraf is an agent written in the Go programming language and is used to collect, process, and aggregate metrics. Telegraf consumes only a small amount of memory resources. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which Telegraf runs or from third-party APIs. You can also use Telegraf to monitor metrics by using StatsD and Kafka consumers.
Before you upload the metrics that are collected by using Telegraf to Simple Log Service, you must modify the configuration file of Telegraf.
Configuration example
Telegraf supports various formats, including the JSON, Carbon2, and Graphite formats. In this example, the JSON format is used. For more information, see Output Data Formats of Telegraf.
Note: You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

# Kafka output plugin configuration
[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["SLS_KAFKA_ENDPOINT"]
  ## Kafka topic for producer messages
  topic = "SLS_LOGSTORE"
  routing_key = "content"

  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  ##  3 : LZ4 compression
  compression_codec = 1

  ## Optional TLS Config
  tls_ca = "/etc/ssl/certs/ca-bundle.crt"
  tls_cert = "/etc/ssl/certs/ca-certificates.crt"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional SASL Config
  sasl_username = "SLS_PROJECT"
  sasl_password = "SLS_PASSWORD"

  ## Data format to output.
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "json"
Example 4: Use Fluentd to upload logs
Fluentd is an open source log collector. Fluentd is a project of Cloud Native Computing Foundation (CNCF). All components of Fluentd are available under the Apache 2 License. For more information, see Fluentd.
Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Simple Log Service by using the fluent-plugin-kafka plug-in. You only need to install and configure the plug-in. For more information, see fluent-plugin-kafka.
Configuration example
Fluentd supports dozens of formats. In this example, the JSON format is used. For more information, see Fluentd Formatter.
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

<match **>
  @type kafka2
  brokers SLS_KAFKA_ENDPOINT
  default_topic SLS_LOGSTORE
  default_message_key content
  sasl_over_ssl true
  use_event_time true
  username SLS_PROJECT
  password "SLS_PASSWORD"
  ssl_ca_certs_from_system true
  # ruby-kafka producer options
  max_send_retries 1000
  required_acks 1
  compression_codec gzip
  max_send_limit_bytes 2097152
  <buffer hostlogs>
    flush_interval 10s
  </buffer>
  <format>
    @type json
  </format>
</match>
Example 5: Use Logstash to upload logs
Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.
Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol. Simple Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.
Configuration example
Logstash supports dozens of formats. In this example, the JSON format is used. For more information, see Logstash Codec.
Note: The following configuration is used to test network connectivity. In a production environment, we recommend that you remove the stdout field.
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "SLS_LOGSTORE"
    bootstrap_servers => "SLS_KAFKA_ENDPOINT"
    security_protocol => "SASL_SSL"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
    sasl_mechanism => "PLAIN"
    codec => "json"
    client_id => "kafka-logstash"
  }
}
Example 6: Use Fluent Bit to upload logs
Fluent Bit is a lightweight and highly scalable logging and metrics processor and forwarder. Fluent Bit supports various input, processing, and output plug-ins. You can use the Kafka output plug-in to upload logs to Simple Log Service. For more information, see Kafka output plugin.
Configuration example
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

[Output]
    Name                        kafka
    Match                       *
    Brokers                     SLS_KAFKA_ENDPOINT
    Topics                      SLS_LOGSTORE
    Format                      json
    rdkafka.sasl.username       SLS_PROJECT
    rdkafka.sasl.password       SLS_PASSWORD
    rdkafka.security.protocol   SASL_SSL
    rdkafka.sasl.mechanism      PLAIN
Example 7: Use Vector to upload logs
Vector is a lightweight and high-performance log processing tool that you can use to upload logs by using the Kafka protocol. For more information, see Vector. The following example describes the configuration for Vector to write logs to Simple Log Service by using the Kafka protocol.
Configuration example
For more information about how to configure the parameters prefixed with SLS_ in the example, see Configuration method.

[sinks.aliyun_sls]
type = "kafka"
inputs = ["test_logs"]
bootstrap_servers = "SLS_KAFKA_ENDPOINT"
compression = "gzip"
healthcheck = true
topic = "SLS_LOGSTORE"
encoding.codec = "json"
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "SLS_PROJECT"
sasl.password = "SLS_PASSWORD"
tls.enabled = true
Example 8: Use a KafkaProducer SDK to upload logs
Sample code
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProduceExample {

    public static void main(String[] args) {
        // The configuration information.
        Properties props = new Properties();
        String project = "etl-dev";
        String logstore = "testlog";
        // If you want to automatically expand JSON logs, set the boolean parseJson parameter to true.
        boolean parseJson = true;
        // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
        // In this example, the AccessKey ID and AccessKey secret are stored in environment variables. You can save your AccessKey ID and AccessKey secret in your configuration file based on your business requirements.
        // To prevent key leaks, we recommend that you do not save your AccessKey ID and AccessKey secret in the code.
        String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
        String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
        String port = "10012"; // For public endpoints, set the port number to 10012. For internal endpoints, set the port number to 10011.

        String hosts = project + "." + endpoint + ":" + port;
        String topic = logstore;
        if (parseJson) {
            topic = topic + ".json";
        }

        props.put("bootstrap.servers", hosts);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
        props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.

        // Specify the serialization class for data keys and values.
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        // Create a producer instance.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send records.
        for (int i = 0; i < 1; i++) {
            String content = "{\"msg\": \"Hello World\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
            producer.send(record);
        }
        producer.close();
    }
}
POM dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
Errors
If a log fails to be uploaded by using the Kafka protocol, an error is reported. The following table describes the errors. For more information, see Error list.
| Error | Description | Solution |
| --- | --- | --- |
| NetworkException | A network exception occurred. | Wait for 1 second and try again. |
| TopicAuthorizationException | Authentication failed. | The authentication fails if your AccessKey pair is invalid or does not have the permissions to write data to the specified project or Logstore. In this case, enter a valid AccessKey pair and make sure that it has the required write permissions. |
| UnknownTopicOrPartitionException | The specified project or Logstore does not exist. | Make sure that the specified project and Logstore exist. If they exist but the issue persists, check whether the region of the specified project is the same as the region of the specified endpoint. |
| KafkaStorageException | A server error occurred. | Wait for 1 second and try again. |
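To illustrate how these errors surface in client code, the following is a minimal sketch based on the Java KafkaProducer sample from Example 8; it is not an official sample, and the class and method names are hypothetical. The built-in retries and retry.backoff.ms settings retry transient errors such as NetworkException and KafkaStorageException about once per second, and a send callback reports the errors that require configuration changes.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.util.Properties;

public class SlsErrorHandlingSketch {
    // props is assumed to be built as in Example 8 (bootstrap.servers, SASL_SSL settings, and serializers).
    static void sendWithErrorHandling(Properties props, String topic, String content) {
        // Let the client retry transient errors (for example, NetworkException and KafkaStorageException)
        // and wait about 1 second between attempts, as suggested in the table above.
        props.put("retries", "5");
        props.put("retry.backoff.ms", "1000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, content), (metadata, exception) -> {
                if (exception == null) {
                    return; // The record was written successfully.
                }
                if (exception instanceof TopicAuthorizationException) {
                    // Check the AccessKey pair and its write permissions on the project and Logstore.
                    System.err.println("Authentication failed: " + exception.getMessage());
                } else if (exception instanceof UnknownTopicOrPartitionException) {
                    // Check that the project and Logstore exist and that the endpoint region matches the project region.
                    System.err.println("Unknown project or Logstore: " + exception.getMessage());
                } else {
                    // Other errors are typically transient and can be retried after a short delay.
                    System.err.println("Send failed: " + exception.getMessage());
                }
            });
        }
    }
}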