You can collect logs by using tools such as KafkaProducer SDK, Beats, collectd, Fluentd, Logstash, Telegraf, and Vector, and upload the logs to Simple Log Service by using the Kafka protocol. This topic describes how to use a log collection tool to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol.
Limits
Only Kafka 2.1.0 and later versions are supported.
You must use the SASL_SSL protocol to ensure the security of log transmission.
Data parsing
Logs that are uploaded by using the Kafka protocol are stored in the content field. If the logs are of the JSON type, you can configure a JSON index for the content field. For more information, see JSON type.
If you use a KafkaProducer SDK or Beats to collect logs, you can configure the topic or headers parameter in the collection configuration to upload logs in the JSON format. In this case, Simple Log Service automatically expands the JSON logs into separate fields, and you do not need to configure a JSON index for the content field. For more information, see Configuration method.
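The following is a minimal illustration of the difference, assuming a hypothetical log with the fields status and method. Without the JSON settings, the raw log is stored in a single field:
content: {"status": 200, "method": "GET"}
With the topic or headers parameter configured as described in Configuration method, the same log is expanded into separate fields:
status: 200
method: GET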
Configuration method
When you use the Kafka protocol to upload logs to Simple Log Service, you must configure the related parameters. The following table describes the parameters.
The parameter names vary based on the log collection tool. Configure the parameters based on your business scenario.
Parameter | Description |
Connection type | The security protocol. You must use the SASL_SSL protocol to ensure the security of log transmission. |
hosts | The address to which an initial connection is established. Specify the address in the ${project-name}.${region-endpoint}:${port} format, where ${region-endpoint} is the endpoint of the region where your Simple Log Service project resides. Set the port to 10012 for a public endpoint or to 10011 for an internal endpoint. Example: test-project-1.cn-hangzhou.log.aliyuncs.com:10012. |
topic | The name of the Logstore. If you use a KafkaProducer SDK or Beats to collect logs and specify the output format as JSON, you can set the topic parameter to a value in the ${logstore-name}.json format to automatically expand JSON logs. For more information, see Example 6: Use a KafkaProducer SDK to upload logs. |
headers | If you use a KafkaProducer SDK or Beats to collect logs and specify the output format as JSON, you can set the headers parameter to a header whose key is data-parse-format and whose value is json to automatically expand JSON logs. For more information, see Example 1: Use Beats to upload logs. |
username | The name of the project. |
password | The AccessKey pair. You must specify a value in the ${access-key-id}#${access-key-secret} format. Replace ${access-key-id} and ${access-key-secret} with your AccessKey ID and AccessKey secret. We recommend that you use the AccessKey pair of a Resource Access Management (RAM) user. For more information, see Create a RAM user and authorize the RAM user to access Simple Log Service. |
Certificate file | The certificate file of the endpoint. Each endpoint of Simple Log Service has a certificate. Set this parameter to the path to the root certificate on your server. Example: /etc/ssl/certs/ca-bundle.crt. |
If you want to use a Kafka consumer group to consume data from Simple Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.
Example 1: Use Beats to upload logs
You can use Beats such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Simple Log Service. For more information, see Beats-Kafka-Output.
Example 1:
Sample configuration
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"]
  username: "yourusername"
  password: "yourpassword"
  ssl.certificate_authorities:
  # message topic selection + partitioning
  topic: 'test-logstore-1'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Sample log
By default, Beats provides JSON-formatted logs. The logs are uploaded to Simple Log Service and stored in the content field. You can create a JSON index for the content field. For more information, see JSON type.
Example 2:
Sample configuration
output.kafka:
  enabled: true
  hosts: ["cn-hangzhou-intranet.log.aliyuncs.com:10011"]
  username: "test-project-1"
  password: "access-key-id#access-key-secret"
  ssl.certificate_authorities:
  topic: 'test-logstore-1'
  headers:
    - key: "data-parse-format"
      value: "json"
  partition.hash:
    reachable_only: false
Sample log
You can configure the headers parameter to automatically expand JSON logs.
Example 2: Use collectd to upload logs
collectd is a daemon process that periodically collects the performance metrics of systems and applications. You can upload the collected metrics to Simple Log Service by using the Kafka protocol. For more information, see Write Kafka Plugin.
Before you upload the logs that are collected by collectd to Simple Log Service, you must install the collectd-write_kafka plug-in and related dependencies. For example, on a CentOS Linux server, you can run the sudo yum install collectd-write_kafka
command to install the collectd-write_kafka plug-in. For more information about how to install RPM Package Manager (RPM) packages, visit Collectd-write_kafka.
Sample configuration
collectd supports various formats, including JSON, Command, and Graphite. In this example, the JSON format is used. For more information, see collectd documentation.
<Plugin write_kafka>
  Property "metadata.broker.list" "test-project-1.cn-hangzhou.log.aliyuncs.com:10012"
  Property "security.protocol" "sasl_ssl"
  Property "sasl.mechanism" "PLAIN"
  Property "sasl.username" "yourusername"
  Property "sasl.password" "yourpassword"
  Property "broker.address.family" "v4"
  <Topic "test-logstore-1">
    Format JSON
    Key "content"
  </Topic>
</Plugin>
Sample log
After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 3: Use Telegraf to upload logs
Telegraf is an agent written in the Go programming language and is used to collect, process, and aggregate metrics. Telegraf consumes only a small amount of memory resources. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which Telegraf runs or by calling third-party APIs. You can also use Telegraf to monitor metrics by using StatsD and Kafka consumers.
Before you upload the logs that are collected by using Telegraf to Simple Log Service, you must modify the configuration file of Telegraf.
Sample configuration
Telegraf supports various formats, including JSON, Carbon2, and Graphite. In this example, the JSON format is used. For more information, see Output Data Formats of Telegraf.
Note: You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.
[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["test-project-1.cn-hangzhou.log.aliyuncs.com:10012"]
  ## Kafka topic for producer messages
  topic = "test-logstore-1"
  routing_key = "content"
  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  ##  3 : LZ4 compression
  compression_codec = 1
  ## Optional TLS Config
  tls_ca = "/etc/ssl/certs/ca-bundle.crt"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false
  ## Optional SASL Config
  sasl_username = "yourusername"
  sasl_password = "yourpassword"
  ## Data format to output.
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "json"
Sample log
After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 4: Use Fluentd to upload logs
Fluentd is an open source log collector. It is a project under the Cloud Native Computing Foundation (CNCF). All components of Fluentd are available under the Apache 2 License. For more information, see Fluentd.
Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Simple Log Service by using the fluent-plugin-kafka plug-in. You need to only install and configure the plug-in. For more information, see fluent-plugin-kafka.
Sample configuration
Fluentd supports dozens of formats. In this example, the JSON format is used. For more information, see Fluentd Formatter.
<match **>
  @type kafka
  # Brokers: you can choose either brokers or zookeeper.
  brokers test-project-1.cn-hangzhou.log.aliyuncs.com:10012
  default_topic test-logstore-1
  default_message_key content
  output_data_type json
  output_include_tag true
  output_include_time true
  sasl_over_ssl true
  username yourusername       # The username. Replace yourusername with the actual value.
  password "yourpassword"     # The password. Replace yourpassword with the actual value.
  ssl_ca_certs_from_system true
  # ruby-kafka producer options
  max_send_retries 10000
  required_acks 1
  compression_codec gzip
</match>
Sample log
After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 5: Use Logstash to upload logs
Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.
Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol. Simple Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.
Sample configuration
Create a JAAS file and save the file to a directory. Example: /etc/kafka/kafka_client_jaas.conf.
Add the following content to the JAAS file:
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="yourusername"
  password="yourpassword";
};
Configure an SSL certificate and save the certificate to a directory. Example: /etc/kafka/client-root.truststore.jks.
Download the root certificate and save it to a directory. Example: /etc/kafka/root.pem. Then, run the following keytool command to generate a truststore file in the .jks format. When you run the command for the first time, you are prompted to set a password for the truststore file. The generated file and the password are referenced by the ssl_truststore_location and ssl_truststore_password parameters in the Logstash configuration.
keytool -keystore client-root.truststore.jks -alias root -import -file /etc/kafka/root.pem
Configure Logstash.
Logstash supports dozens of formats. In this example, the JSON format is used. For more information, see Logstash Codec.
Note: The following configuration is used to test network connectivity. In a production environment, we recommend that you remove the stdout field.
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "test-logstore-1"
    bootstrap_servers => "test-project-1.cn-hangzhou.log.aliyuncs.com:10012"
    security_protocol => "SASL_SSL"
    ssl_truststore_location => "/etc/client-root.truststore.jks"
    ssl_truststore_password => "123456"
    jaas_path => "/etc/kafka_client_jaas.conf"
    sasl_mechanism => "PLAIN"
    codec => "json"
    client_id => "kafka-logstash"
  }
}
Sample log
After JSON-formatted logs are uploaded to Simple Log Service and stored in the content field, you can create a JSON index for the content field. For more information, see JSON type.
Example 6: Use a KafkaProducer SDK to upload logs
Sample configuration
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProduceExample {

    public static void main(String[] args) {
        // Configuration information.
        Properties props = new Properties();
        String project = "etl-dev";
        String logstore = "testlog";
        // Set the following parameter to true if you want to automatically expand JSON logs:
        boolean parseJson = true;
        // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations in Simple Log Service is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
        // In this example, the AccessKey ID and AccessKey secret are stored in the environment variables. You can save your AccessKey ID and AccessKey secret in your configuration file if required.
        // To prevent key leaks, we recommend that you do not save your AccessKey ID and AccessKey secret in the code.
        String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
        String endpoint = "cn-huhehaote.log.aliyuncs.com"; // The endpoint varies based on the region of your Simple Log Service project.
        String port = "10012"; // For a public endpoint, set the port number to 10012. For an internal endpoint, set the port number to 10011.

        String hosts = project + "." + endpoint + ":" + port;
        String topic = logstore;
        if (parseJson) {
            topic = topic + ".json";
        }

        props.put("bootstrap.servers", hosts);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + project
                        + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
        props.put("enable.idempotence", "false"); // The Kafka write interface of Simple Log Service does not support transactions.

        // Specify the serialization class for data keys and values.
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        // Create a producer instance.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send records.
        for (int i = 0; i < 1; i++) {
            String content = "{\"msg\": \"Hello World\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
            producer.send(record);
        }
        producer.close();
    }
}
POM dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
Sample log
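The following is a sketch of the expected result of the sample code above, not verbatim console output. Because parseJson is set to true, the topic carries the .json suffix, so the payload {"msg": "Hello World"} is expanded into a top-level field (system-reserved fields such as the log time are omitted here):
msg: Hello World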
Example 7: Use Fluent Bit to upload logs
Fluent Bit is a lightweight and highly scalable logging and metrics processor and forwarder. Fluent Bit supports various input, processing, and output plug-ins. You can use the Kafka output plug-in to upload logs to Simple Log Service. For more information, see Kafka output plugin.
Sample configuration
For more information about the configuration method, see Configuration method.
[Output]
    Name    kafka
    Match    *
    Brokers    etl-shanghai.cn-shanghai.log.aliyuncs.com:10012
    Topics    etl-out
    Format    json
    rdkafka.sasl.username    yourusername
    rdkafka.sasl.password    yourpassword
    rdkafka.security.protocol    SASL_SSL
    rdkafka.sasl.mechanism    PLAIN
Sample log
Example 8: Use Vector to upload logs
Vector is a lightweight and high-performance log processing tool that can report logs by using the Kafka protocol. For more information, see Vector. The following example shows the configuration for Vector to write logs to Simple Log Service by using the Kafka protocol.
Sample configuration
[sinks.aliyun_sls]
  type = "kafka"
  inputs = ["file_logs"]  # The source. In this example, a log file is monitored.
  bootstrap_servers = "etl-dev.cn-huhehaote.log.aliyuncs.com:10012"
  compression = "gzip"
  healthcheck = true
  topic = "dst-kafka.json"  # dst-kafka is the name of the Logstore. The .json suffix indicates that JSON logs are automatically expanded.
  encoding.codec = "json"
  sasl.enabled = true
  sasl.mechanism = "PLAIN"
  sasl.username = "etl-dev"  # etl-dev is the name of the project.
  sasl.password = "{{The AccessKey ID and AccessKey secret of the RAM user in the AK#SK format.}}"
  tls.enabled = true
Sample log
Errors
If a log fails to be uploaded by using the Kafka protocol, an error is reported. The following table describes the errors. For more information, see Error list.
Error | Description | Solution |
NetworkException | A network exception occurred. | Wait for 1 second and try again. |
TopicAuthorizationException | Authentication failed. | If your AccessKey pair is invalid or the AccessKey pair does not have permissions to write data to the specified project or Logstore, authentication fails. In this case, enter a valid AccessKey pair and make sure that the AccessKey pair has the required write permissions. |
UnknownTopicOrPartitionException | One of the following errors occurred: the specified project does not exist, the specified Logstore does not exist, or the region of the specified endpoint is different from the region where the project resides. | Make sure that the specified project or Logstore exists. If the specified project or Logstore exists but the error persists, check whether the region where the specified project resides is the same as the region of the specified endpoint. |
KafkaStorageException | A server error occurred. | Wait for 1 second and try again. |