ApsaraMQ for Kafka:Use ApsaraMQ for Kafka as a Logstash input in a VPC

Last Updated: Mar 11, 2026

Logstash processes data through a pipeline of inputs, filters, and outputs. By connecting an ApsaraMQ for Kafka instance as an input, Logstash consumes messages from Kafka topics and forwards them to destinations such as Elasticsearch, file storage, or stdout.

This architecture provides two benefits:

  • Asynchronous processing: Kafka buffers messages so that Logstash processes them at its own pace, preventing data loss during traffic spikes.

  • Decoupling: If a downstream system such as Elasticsearch goes offline, Kafka retains messages until the system recovers. Upstream producers remain unaffected.

This topic describes how to configure Logstash to consume messages from an ApsaraMQ for Kafka instance over a virtual private cloud (VPC) connection.

Prerequisites

Before you begin, make sure that you have:

  • An ApsaraMQ for Kafka instance purchased and deployed. For more information, see Purchase and deploy a VPC-connected instance.

  • Logstash installed. For more information, see Download Logstash.

  • Java Development Kit (JDK) 8 installed. For more information, see the Java 8 download page.

  • The Logstash Kafka input plugin available. Run the following command to verify that the plugin is installed:

      bin/logstash-plugin list | grep logstash-input-kafka

    If the plugin is not listed, install it:

      bin/logstash-plugin install logstash-input-kafka

Step 1: Get the endpoint

Logstash connects to ApsaraMQ for Kafka through a VPC endpoint. ApsaraMQ for Kafka provides two types of VPC endpoints:

| Endpoint type | Port | When to use |
| --- | --- | --- |
| Default endpoint | 9092 | Standard access without authentication. |
| Simple Authentication and Security Layer (SASL) endpoint | 9094 | Authenticated access. Requires the access control list (ACL) feature to be enabled. For more information, see Enable the ACL feature. |
For more information about endpoint differences, see Comparison among endpoints.

To get the endpoint:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.

  3. On the Instances page, click the name of the target instance.

  4. On the Instance Details page, find the endpoint in the Endpoint Information section. If you plan to use the SASL endpoint, note the Username and Password values in the Configuration Information section.
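After you obtain the endpoint, you can optionally confirm that it is reachable from the machine that runs Logstash. The following sketch uses `nc` to probe the port; the endpoint value is a placeholder and must be replaced with your instance's actual VPC endpoint:

```shell
# Placeholder endpoint; substitute the value from the Endpoint Information section.
ENDPOINT="alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092"

HOST="${ENDPOINT%:*}"   # everything before the final colon
PORT="${ENDPOINT##*:}"  # everything after the final colon

if command -v nc >/dev/null 2>&1; then
  # -z: probe the port without sending data; -w 5: five-second timeout.
  nc -z -w 5 "$HOST" "$PORT" && echo "reachable" || echo "unreachable"
else
  echo "nc is not installed; try: telnet $HOST $PORT"
fi
```

A failed probe usually indicates a network issue, such as Logstash running outside the VPC in which the instance is deployed.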


Step 2: Create a topic

Create a topic to hold the messages that Logstash will consume.

Important

Create the topic in the same region as your Elastic Compute Service (ECS) instance. Topics cannot be used across regions. For example, if the producers and consumers run on an ECS instance in the China (Beijing) region, the topic must also be in the China (Beijing) region.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.

  3. On the Instances page, click the name of the target instance.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, configure the following parameters and click OK.

| Parameter | Description | Example |
| --- | --- | --- |
| Name | The topic name. | demo |
| Description | The topic description. | demo test |
| Partitions | The number of partitions. | 12 |
| Storage Engine | The storage engine type. Available only for Professional Edition instances; Standard Edition instances use Cloud Storage by default. Cloud Storage: uses Alibaba Cloud disks with three replicas in distributed mode, which provides low latency, high performance, and high reliability; required when the instance edition is Standard (High Write). Local Storage: uses the in-sync replicas (ISR) algorithm of open source Apache Kafka with three replicas in distributed mode. | Cloud Storage |
| Message Type | The message ordering type. Normal Message: messages with the same key are stored in the same partition in send order, but partition ordering may not be preserved during broker failures; automatically selected when Storage Engine is Cloud Storage. Partitionally Ordered Message: messages with the same key remain ordered in the same partition even during broker failures, although some partitions may become temporarily unavailable; automatically selected when Storage Engine is Local Storage. | Normal Message |
| Log Cleanup Policy | The log cleanup policy. Available only when Storage Engine is Local Storage (Professional Edition only). Delete: the default policy; retains messages based on the maximum retention period and deletes the earliest messages when storage usage exceeds 85%. Compact: retains only the latest value for each message key. For more information, see aliware-kafka-demos. | Compact |
| Tag | Tags to attach to the topic. | demo |

Important

You can use log-compacted topics only in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry.

After the topic is created, it appears on the Topics page.

Step 3: Send test messages

Send a test message to verify that the topic is ready for consumption.

  1. Log on to the ApsaraMQ for Kafka console.

  2. Navigate to the target instance and click Topics in the left-side navigation pane.

  3. Click the topic name, then click Send Message in the upper-right corner of the Topic Details page.

  4. In the Start to Send and Consume Message panel, select a Sending Method (Console, Docker, or SDK) and send a test message. If you want to send the message to a specific partition, you can specify the partition ID. For information about how to query partition IDs, see View partition status.
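Besides the console, you can send a test message from a machine in the same VPC with the command-line producer that ships with the open source Apache Kafka distribution. The endpoint and topic name below are example values; substitute your own:

```shell
ENDPOINT="alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092"  # your VPC endpoint
TOPIC="demo"                                                        # topic from Step 2

if command -v kafka-console-producer.sh >/dev/null 2>&1; then
  # Each line piped to the producer is sent as one message.
  echo "hello from the console producer" | \
    kafka-console-producer.sh --bootstrap-server "$ENDPOINT" --topic "$TOPIC"
else
  echo "kafka-console-producer.sh not found; download the Apache Kafka distribution first"
fi
```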

Step 4: Create a consumer group

Create a consumer group for Logstash to track its consumption offset.

  1. Log on to the ApsaraMQ for Kafka console.

  2. Navigate to the target instance and click Groups in the left-side navigation pane.

  3. On the Groups page, click Create Group.

  4. In the Create Group panel, enter a Group ID and Description, attach tags if needed, and click OK.

After the consumer group is created, it appears on the Groups page.
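Once Logstash is consuming (Step 5), you can inspect the group's offsets and lag with the `kafka-consumer-groups.sh` tool from the open source Apache Kafka distribution. The endpoint and group ID below are example values:

```shell
ENDPOINT="alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092"  # your VPC endpoint
GROUP_ID="logstash_group"                                           # group created above

if command -v kafka-consumer-groups.sh >/dev/null 2>&1; then
  # --describe lists each partition's current offset, log-end offset, and lag.
  kafka-consumer-groups.sh --bootstrap-server "$ENDPOINT" --describe --group "$GROUP_ID"
else
  echo "kafka-consumer-groups.sh not found; it ships with the Apache Kafka distribution"
fi
```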

Step 5: Configure and run Logstash

  1. Navigate to the Logstash installation directory.

  2. Create a configuration file named input.conf in the bin directory:

input {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint>"       # VPC endpoint, e.g., alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092
    group_id          => "<your-consumer-group-id>"     # Consumer group created in Step 4
    topics            => ["<your-topic-name>"]          # Topic created in Step 2
    consumer_threads  => 12                             # Match this to the number of topic partitions
    auto_offset_reset => "earliest"                     # Start from the earliest message
  }
}
output {
  stdout { codec => rubydebug }
}
  3. From the bin directory, run Logstash:

./logstash -f input.conf
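If you connect through the SASL endpoint on port 9094 instead, the kafka input also needs SASL settings. The following fragment is a sketch based on the Logstash Kafka input plugin's SASL options; option names can vary across plugin versions, so check the documentation for your installed version:

```
input {
  kafka {
    bootstrap_servers => "<your-kafka-endpoint>:9094"   # SASL endpoint uses port 9094
    group_id          => "<your-consumer-group-id>"
    topics            => ["<your-topic-name>"]
    consumer_threads  => 12
    auto_offset_reset => "earliest"
    security_protocol => "SASL_PLAINTEXT"               # SASL without TLS
    sasl_mechanism    => "PLAIN"
    jaas_path         => "/etc/logstash/kafka_client_jaas.conf"  # hypothetical path
  }
}
```

The JAAS file referenced by jaas_path supplies the instance Username and Password noted in Step 1, in the standard Kafka client format:

```
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<instance-username>"
  password="<instance-password>";
};
```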

Configuration parameters

| Parameter | Description | Example |
| --- | --- | --- |
| bootstrap_servers | Comma-separated list of VPC endpoints for the ApsaraMQ for Kafka instance. Use the default endpoint or the SASL endpoint. | alikafka-pre-cn-zv****-1-vpc.alikafka.aliyuncs.com:9092 |
| group_id | The consumer group identifier. | logstash_group |
| topics | The topic name or names to consume from. | logstash_test |
| consumer_threads | The number of consumer threads. We recommend that you set this value to the number of topic partitions. | 12 |
| auto_offset_reset | The offset reset strategy. earliest: consume from the first available message. latest: consume only new messages. | earliest |

Verify message consumption

After Logstash starts, consumed messages print to stdout in Ruby debug format:
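For example, an event rendered by the rubydebug codec looks roughly like the following. The payload and timestamp shown here are illustrative; actual fields depend on your message:

```
{
       "message" => "hello from ApsaraMQ for Kafka",
      "@version" => "1",
    "@timestamp" => 2026-03-11T08:00:00.000Z
}
```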


See also