Logstash processes data through a pipeline of inputs, filters, and outputs. By connecting an ApsaraMQ for Kafka instance as an input, Logstash consumes messages from Kafka topics and forwards them to destinations such as Elasticsearch, file storage, or stdout.
This architecture provides two benefits:
Asynchronous processing: Kafka buffers messages so that Logstash processes them at its own pace, preventing data loss during traffic spikes.
Decoupling: If a downstream system such as Elasticsearch goes offline, Kafka retains messages until the system recovers. Upstream producers remain unaffected.
This topic describes how to configure Logstash to consume messages from an ApsaraMQ for Kafka instance over a virtual private cloud (VPC) connection.
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for Kafka instance purchased and deployed. For more information, see Purchase and deploy a VPC-connected instance.
Logstash installed. For more information, see Download Logstash.
Java Development Kit (JDK) 8 installed. For more information, see the Java 8 download page.
The Logstash Kafka input plugin installed. Run the following command to verify that the plugin is listed:
bin/logstash-plugin list | grep logstash-input-kafka
If the plugin is not listed, install it:
bin/logstash-plugin install logstash-input-kafka
Step 1: Get the endpoint
Logstash connects to ApsaraMQ for Kafka through a VPC endpoint. ApsaraMQ for Kafka provides two types of VPC endpoints:
| Endpoint type | Port | When to use |
|---|---|---|
| Default endpoint | 9092 | Standard access without authentication |
| Simple Authentication and Security Layer (SASL) endpoint | 9094 | Authenticated access. Requires the access control list (ACL) feature to be enabled. For more information, see Enable the ACL feature. |
For more information about endpoint differences, see Comparison among endpoints.
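If you plan to connect through the SASL endpoint on port 9094, the Logstash Kafka input plugin must be configured to authenticate as well. The following is a minimal sketch, not a verified configuration: the file path and placeholder values are assumptions, while security_protocol, sasl_mechanism, and jaas_path are standard options of the Kafka input plugin.

```
input {
  kafka {
    bootstrap_servers => "<your-sasl-endpoint>:9094"             # SASL endpoint from the console
    security_protocol => "SASL_PLAINTEXT"                        # PLAIN authentication over the VPC
    sasl_mechanism    => "PLAIN"
    jaas_path         => "/etc/logstash/kafka_client_jaas.conf"  # assumed path to a JAAS file
  }
}
```

The referenced JAAS file holds the instance username and password from the Configuration Information section, in the standard Kafka client format:

```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<instance-username>"
  password="<instance-password>";
};
```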
To get the endpoint:
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
On the Instances page, click the name of the target instance.
On the Instance Details page, find the endpoint in the Endpoint Information section. If you plan to use the SASL endpoint, note the Username and Password values in the Configuration Information section.

Step 2: Create a topic
Create a topic to hold the messages that Logstash will consume.
Create the topic in the same region as your Elastic Compute Service (ECS) instance. Topics cannot be used across regions. For example, if the producers and consumers run on an ECS instance in the China (Beijing) region, the topic must also be in the China (Beijing) region.
Log on to the ApsaraMQ for Kafka console.
In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
On the Instances page, click the name of the target instance.
In the left-side navigation pane, click Topics.
On the Topics page, click Create Topic.
In the Create Topic panel, configure the following parameters and click OK.
| Parameter | Description | Example |
|---|---|---|
| Name | The topic name. | demo |
| Description | The topic description. | demo test |
| Partitions | The number of partitions. | 12 |
| Storage Engine | The storage engine type. Available only for Professional Edition instances. Standard Edition instances use Cloud Storage by default. Options: Cloud Storage -- Uses Alibaba Cloud disks with three replicas in distributed mode. Provides low latency, high performance, and high reliability. Required when the instance edition is Standard (High Write). Local Storage -- Uses the in-sync replicas (ISR) algorithm of open source Apache Kafka with three replicas in distributed mode. | Cloud Storage |
| Message Type | The message ordering type. Normal Message -- Messages with the same key are stored in the same partition in send order. Partition ordering may not be preserved during broker failures. Automatically selected when Storage Engine is Cloud Storage. Partitionally Ordered Message -- Messages with the same key remain ordered in the same partition even during broker failures. Some partitions may become temporarily unavailable. Automatically selected when Storage Engine is Local Storage. | Normal Message |
| Log Cleanup Policy | The log cleanup policy. Available only when Storage Engine is Local Storage (Professional Edition only). Delete -- Default policy. Retains messages based on the maximum retention period. Deletes the earliest messages when storage usage exceeds 85%. Compact -- Retains only the latest value for each message key. For more information, see aliware-kafka-demos. Important: You can use log-compacted topics only in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. | Compact |
| Tag | Tags to attach to the topic. | demo |
After the topic is created, it appears on the Topics page.
Step 3: Send test messages
Send a test message to verify that the topic is ready for consumption.
Log on to the ApsaraMQ for Kafka console.
Navigate to the target instance and click Topics in the left-side navigation pane.
Click the topic name, then click Send Message in the upper-right corner of the Topic Details page.
In the Start to Send and Consume Message panel, select a Sending Method (Console, Docker, or SDK) and send a test message. If you want to send the message to a specific partition, you can specify the partition ID. For information about how to query partition IDs, see View partition status.
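Alternatively, if the command-line tools that ship with open source Apache Kafka are available on a machine in the same VPC, you can send a test message from the shell. A sketch only: the endpoint below is a placeholder, and the topic name assumes the demo topic created in Step 2.

```shell
# Send one message to the topic from stdin.
# Replace the endpoint with your instance's VPC endpoint.
echo "hello logstash" | bin/kafka-console-producer.sh \
  --bootstrap-server alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092 \
  --topic demo
```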
Step 4: Create a consumer group
Create a consumer group for Logstash to track its consumption offset.
Log on to the ApsaraMQ for Kafka console.
Navigate to the target instance and click Groups in the left-side navigation pane.
On the Groups page, click Create Group.
In the Create Group panel, enter a Group ID and Description, attach tags if needed, and click OK.
After the consumer group is created, it appears on the Groups page.
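After Logstash starts consuming in Step 5, you can confirm that this group is committing offsets by using the consumer group tool from open source Apache Kafka. A sketch, assuming the tool runs on a machine in the same VPC; the endpoint and group ID are placeholders.

```shell
# Describe the group's partitions, committed offsets, and consumer lag.
# Replace the endpoint and group ID with your own values.
bin/kafka-consumer-groups.sh \
  --bootstrap-server alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092 \
  --describe --group logstash_group
```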
Step 5: Configure and run Logstash
Navigate to the Logstash installation directory.
Create a configuration file named input.conf in the bin directory:
input {
kafka {
bootstrap_servers => "<your-kafka-endpoint>" # VPC endpoint, e.g., alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092
group_id => "<your-consumer-group-id>" # Consumer group created in Step 4
topics => ["<your-topic-name>"] # Topic created in Step 2
consumer_threads => 12 # Match this to the number of topic partitions
auto_offset_reset => "earliest" # Start from the earliest message
}
}
output {
stdout { codec => rubydebug }
}
Run Logstash:
./logstash -f input.conf
Configuration parameters
| Parameter | Description | Example |
|---|---|---|
| bootstrap_servers | Comma-separated list of VPC endpoints for the ApsaraMQ for Kafka instance. Default endpoint or SASL endpoint. | alikafka-pre-cn-zv\*\*\*\*-1-vpc.alikafka.aliyuncs.com:9092 |
| group_id | Consumer group identifier. | logstash_group |
| topics | Topic name or names to consume from. | logstash_test |
| consumer_threads | Number of consumer threads. We recommend that you set this to the number of topic partitions. | 12 |
| auto_offset_reset | Offset reset strategy. earliest: consume from the first available message. latest: consume only new messages. | earliest |
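The stdout output is useful for verification; in production you would typically forward messages to a destination such as Elasticsearch instead. A minimal sketch using the standard Logstash Elasticsearch output plugin, where the host and index name are placeholder assumptions:

```
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]  # placeholder Elasticsearch address
    index => "kafka-%{+YYYY.MM.dd}"     # hypothetical daily index name
  }
}
```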
Verify message consumption
After Logstash starts, consumed messages print to stdout in Ruby debug format:
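The exact fields depend on the message and codec; the rubydebug codec renders each event as a Ruby-style hash. An illustrative sketch only, with placeholder values rather than captured output:

```
{
       "message" => "hello logstash",
    "@timestamp" => 2024-01-01T00:00:00.000Z,
      "@version" => "1"
}
```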

See also
Kafka input plugin -- Full parameter reference for the Logstash Kafka input plugin.
Comparison among endpoints -- Differences between ApsaraMQ for Kafka endpoint types.