You can collect logs by using tools such as the KafkaProducer SDK, Beats, collectd, Fluentd, Logstash, Telegraf, and Vector, and then upload the collected logs to Simple Log Service over the Kafka protocol. This topic describes the required parameters and provides configuration examples for the supported tools.
Permissions
You must use the AliyunLogFullAccess system policy or a custom policy that grants write permissions on the destination project and Logstore.
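As an illustration, a minimal custom RAM policy that grants only write access might look like the following sketch. The resource path reuses the example names aliyun-project-test and test-logstore from this topic, and log:PostLogStoreLogs is the action required to write log data; adapt the names to your own project and Logstore.
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["log:PostLogStoreLogs"],
      "Resource": ["acs:log:*:*:project/aliyun-project-test/logstore/test-logstore"]
    }
  ]
}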
Configuration method
When you use the Kafka protocol to upload logs to Simple Log Service, you must configure related parameters. The following table describes the parameters.
Parameter | Description | Example |
SLS_KAFKA_ENDPOINT | The address to which an initial connection is established. Specify the endpoint of a Simple Log Service project in the Project name.Endpoint:Port format. For more information, see Endpoints. For an internal endpoint, the port number is 10011, for example Project name.cn-hangzhou-intranet.log.aliyuncs.com:10011. For a public endpoint, the port number is 10012, for example Project name.cn-hangzhou.log.aliyuncs.com:10012. | In the examples in this topic, aliyun-project-test is the project name, cn-hangzhou-xxx.aliyuncs.com is the endpoint, 10011 is the port number for the internal endpoint, and 10012 is the port number for the public endpoint. |
SLS_PROJECT | The name of the Simple Log Service project. | aliyun-project-test |
SLS_LOGSTORE | The name of the Simple Log Service Logstore. If you append .json to the Logstore name, the logs are parsed in JSON mode. | In this example, the Logstore is named test-logstore. If you set this parameter to test-logstore, the collected logs are stored in the content field. If you set this parameter to test-logstore.json, the collected logs are parsed in JSON mode before they are uploaded to Simple Log Service: the first-layer keys in the JSON logs become field names, and the corresponding values become field values. |
SLS_PASSWORD | The AccessKey pair that has write permissions on Simple Log Service. The value is an AccessKey ID and an AccessKey secret joined by a number sign (#). For more information about how to create an AccessKey pair, see Create an AccessKey pair. | LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG, where LTaI5xxxxxxxxxxxxindexp2 is the AccessKey ID and CZO8XXXXXXXXXXpKSG is the AccessKey secret. |
Note
If you want to use a Kafka consumer group to consume data in Simple Log Service in real time, submit a ticket to contact Alibaba Cloud technical support.
Example 1: Use Beats to upload logs
You can use Beats, such as Metricbeat, Packetbeat, Winlogbeat, Auditbeat, Filebeat, and Heartbeat, to collect logs. After the logs are collected, you can use the Kafka protocol to upload the logs to Simple Log Service.
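For example, with Filebeat you can point the built-in Kafka output at Simple Log Service. The following is a minimal sketch of the output section of filebeat.yml; the partitioning, acknowledgement, compression, and message-size settings are illustrative assumptions rather than required values. For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
output.kafka:
  # Initial brokers: the project endpoint and port, for example aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012.
  hosts: ["SLS_KAFKA_ENDPOINT"]
  username: "SLS_PROJECT"
  password: "SLS_PASSWORD"
  # Simple Log Service expects SASL_SSL connections, so SSL is enabled here.
  ssl.enabled: true
  topic: "SLS_LOGSTORE"
  partition.round_robin:
    reliable: true
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000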
Example 2: Use collectd to upload logs
collectd is a daemon process that periodically collects the metrics of systems and applications. You can use the Kafka protocol to upload the collected metrics to Simple Log Service. For more information, see collectd and Write Kafka Plugin.
Before you upload the metrics that are collected by collectd to Simple Log Service, you must install the collectd-write_kafka plug-in and the related dependencies. For example, on a CentOS Linux server you can run the sudo yum install collectd-write_kafka command. For more information about how to install RPM Package Manager (RPM) packages, visit Collectd-write_kafka.
Configuration example
collectd supports various formats, including the JSON, Command, and Graphite formats. In this example, the JSON format is used. For more information, see collectd documentation.
For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
LoadPlugin write_kafka
<Plugin write_kafka>
  Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
  Property "security.protocol" "sasl_ssl"
  Property "sasl.mechanism" "PLAIN"
  Property "sasl.username" "SLS_PROJECT"
  Property "sasl.password" "SLS_PASSWORD"
  Property "broker.address.family" "v4"
  <Topic "SLS_LOGSTORE">
    Format JSON
    Key "content"
  </Topic>
</Plugin>
Example 3: Use Telegraf to upload logs
Telegraf is an agent written in the Go programming language that collects, processes, and aggregates metrics with a small memory footprint. For more information, see Telegraf. Telegraf provides various plug-ins and integration capabilities. You can use Telegraf to retrieve metrics from the systems on which it runs or from third-party APIs, or to listen for metrics by using StatsD and Kafka consumer services.
Before you upload the metrics that are collected by using Telegraf to Simple Log Service, you must modify the configuration file of Telegraf.
Configuration example
Telegraf supports various formats, including the JSON, Carbon2, and Graphite formats. In this example, the JSON format is used. For more information, see Output Data Formats of Telegraf.
Note
You must specify a valid path for tls_ca. You can specify the path to the root certificate on your server. In most cases, the path to the root certificate on a Linux server is /etc/ssl/certs/ca-bundle.crt.
For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
[[outputs.kafka]]
  brokers = ["SLS_KAFKA_ENDPOINT"]
  topic = "SLS_LOGSTORE"
  routing_key = "content"
  compression_codec = 1
  # Path to the root certificate bundle on the server. See the preceding note about tls_ca.
  tls_ca = "/etc/ssl/certs/ca-bundle.crt"
  sasl_username = "SLS_PROJECT"
  sasl_password = "SLS_PASSWORD"
  data_format = "json"
Example 4: Use Fluentd to upload logs
Fluentd is an open source log collector. Fluentd is a project of Cloud Native Computing Foundation (CNCF). All components of Fluentd are available under the Apache 2 License. For more information, see Fluentd.
Fluentd supports various input, processing, and output plug-ins. You can collect logs by using Fluentd and upload the collected logs to Simple Log Service by using the fluent-plugin-kafka plug-in. You need to only install and configure the plug-in. For more information, see fluent-plugin-kafka.
Configuration example
Fluentd supports dozens of formats. In this example, the JSON format is used. For more information, see Fluentd Formatter.
For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
<match **>
  @type kafka2
  brokers SLS_KAFKA_ENDPOINT
  default_topic SLS_LOGSTORE
  default_message_key content
  sasl_over_ssl true
  use_event_time true
  username "SLS_PROJECT"
  password "SLS_PASSWORD"
  ssl_ca_certs_from_system true
  max_send_retries 1000
  required_acks 1
  compression_codec gzip
  max_send_limit_bytes 2097152
  <buffer hostlogs>
    flush_interval 10s
  </buffer>
  <format>
    @type json
  </format>
</match>
Example 5: Use Logstash to upload logs
Logstash is an open source log collection engine that provides real-time processing capabilities. You can use Logstash to dynamically collect logs from different sources. For more information, see Logstash.
Logstash provides a built-in Kafka output plug-in. You can configure Logstash to collect logs and upload the collected logs to Simple Log Service by using the Kafka protocol. Simple Log Service uses the SASL_SSL protocol during data transmission. You must configure an SSL certificate and a Java Authentication and Authorization Service (JAAS) file.
Configuration example
Logstash supports dozens of formats. In this example, the JSON format is used. For more information, see Logstash Codec.
Note
The following configuration is used to test network connectivity. In a production environment, we recommend that you remove the stdout field.
For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
output {
  stdout { codec => rubydebug }
  kafka {
    topic_id => "SLS_LOGSTORE"
    bootstrap_servers => "SLS_KAFKA_ENDPOINT"
    security_protocol => "SASL_SSL"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
    sasl_mechanism => "PLAIN"
    codec => "json"
    client_id => "kafka-logstash"
  }
}
Example 6: Use Fluent Bit to upload logs
Fluent Bit is a lightweight and highly scalable logging and metrics processor and forwarder. Fluent Bit supports various input, processing, and output plug-ins. You can use the Kafka output plug-in to upload logs to Simple Log Service. For more information, see Kafka output plugin.
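A minimal sketch of an [OUTPUT] section for the Fluent Bit kafka plugin, assuming the classic configuration file format; the rdkafka.* passthrough properties are used here to enable SASL_SSL authentication. For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
[OUTPUT]
    Name                       kafka
    Match                      *
    Brokers                    SLS_KAFKA_ENDPOINT
    Topics                     SLS_LOGSTORE
    Format                     json
    rdkafka.security.protocol  sasl_ssl
    rdkafka.sasl.mechanism     PLAIN
    rdkafka.sasl.username      SLS_PROJECT
    rdkafka.sasl.password      SLS_PASSWORD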
Example 7: Use Vector to upload logs
Vector is a lightweight and high-performance log processing tool that you can use to upload logs by using the Kafka protocol. For more information, see Vector. The following example describes the configuration for Vector to write logs to Simple Log Service by using the Kafka protocol.
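A minimal sketch of a Vector kafka sink in TOML, assuming a source with the ID my_source_id is defined elsewhere in the Vector configuration; the SASL and TLS options follow Vector's kafka sink settings. For more information about how to configure parameters prefixed with SLS_ in the example, see Configuration method.
[sinks.sls]
type = "kafka"
# Name of an existing Vector source to read events from (placeholder).
inputs = ["my_source_id"]
bootstrap_servers = "SLS_KAFKA_ENDPOINT"
topic = "SLS_LOGSTORE"
encoding.codec = "json"
tls.enabled = true
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "SLS_PROJECT"
sasl.password = "SLS_PASSWORD"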
Example 8: Use a KafkaProducer SDK to upload logs
Sample code
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProduceExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        String project = "etl-dev";
        String logstore = "testlog";
        // Set parseJson to true to append .json to the topic so that logs are parsed in JSON mode.
        boolean parseJson = true;
        String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
        String endpoint = "cn-huhehaote.log.aliyuncs.com";
        // 10012 is the port number for the public endpoint; use 10011 for the internal endpoint.
        String port = "10012";

        String hosts = project + "." + endpoint + ":" + port;
        String topic = logstore;
        if (parseJson) {
            topic = topic + ".json";
        }

        props.put("bootstrap.servers", hosts);
        props.put("security.protocol", "sasl_ssl");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                        project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
        // Disable idempotent writes.
        props.put("enable.idempotence", "false");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1; i++) {
            String content = "{\"msg\": \"Hello World\"}";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, content);
            producer.send(record);
        }
        producer.close();
    }
}
POM dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
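Before you run the sample code, add the kafka-clients dependency to your project and set the SLS_ACCESS_KEY_ID and SLS_ACCESS_KEY_SECRET environment variables to an AccessKey pair that has write permissions on the project.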
Errors
If a log fails to be uploaded by using the Kafka protocol, an error is reported. The following table describes the errors. For more information, see Error list.
Error | Description | Solution |
NetworkException | A network exception occurred. | Wait for 1 second and try again. |
TopicAuthorizationException | Authentication failed. | If your AccessKey pair is invalid or the AccessKey pair does not have the permissions to write data to the specified project or Logstore, the authentication fails. In this case, enter a valid AccessKey pair and make sure that the AccessKey pair has the required write permissions. |
UnknownTopicOrPartitionException | The specified project or Logstore does not exist, or the region of the specified endpoint does not match the region of the project. | Make sure that the specified project and Logstore exist. If they exist but the issue persists, check whether the region where the specified project resides is the same as the region of the specified endpoint. |
KafkaStorageException | A server error occurred. | Wait for 1 second and try again. |