ApsaraMQ for Kafka: Connect an ApsaraMQ for Kafka instance to Logstash as an output over the Internet

Last Updated: Dec 26, 2024

An ApsaraMQ for Kafka instance can be connected to Logstash as an output. This topic describes how to use Logstash to send messages to ApsaraMQ for Kafka over the Internet.

Prerequisites

Before you start this tutorial, make sure that the following operations are complete:

Step 1: Obtain an endpoint

Logstash connects to ApsaraMQ for Kafka by using an endpoint of the ApsaraMQ for Kafka instance.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to connect to Logstash as an output.

  4. In the Endpoint Information section of the Instance Details page, view the endpoints of the instance. In the Configuration Information section, obtain the values of the Username and Password parameters.

    Note

    For information about the differences among different types of endpoints, see Comparison among endpoints.
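The endpoint value shown in the console is a comma-separated list of host:port pairs, in the same form as the bootstrap_servers sample used later in this topic. As a minimal sketch, the following Python snippet splits such a string and can check whether each broker accepts a TCP connection from the Logstash server. The function names and the 5-second timeout are illustrative assumptions, and a successful TCP connection does not prove that SASL authentication will succeed.

```python
import socket


def parse_endpoint(endpoint: str) -> list[tuple[str, int]]:
    """Split a comma-separated 'host:port' list into (host, port) pairs."""
    brokers = []
    for item in endpoint.split(","):
        item = item.strip()
        if not item:
            continue
        host, _, port = item.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {item!r}")
        brokers.append((host, int(port)))
    return brokers


def is_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Placeholder hosts for illustration; use the endpoint copied from the console.
print(parse_endpoint("broker-1.example.com:9093,broker-2.example.com:9093"))
```

To test connectivity, call is_reachable(host, port) for each pair that parse_endpoint returns.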

Step 2: Create a topic

Perform the following operations to create a topic for storing messages:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

    Important

    You must create topics in the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if the producers and consumers of messages run on an ECS instance that is deployed in the China (Beijing) region, the topic must also be created in the China (Beijing) region.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click Create Topic.

  6. In the Create Topic panel, specify the properties of the topic and click OK.

    Name
      Description: The topic name.
      Example: demo

    Description
      Description: The topic description.
      Example: demo test

    Partitions
      Description: The number of partitions in the topic.
      Example: 12

    Storage Engine
      Description: The type of the storage engine that is used to store messages in the topic.

      ApsaraMQ for Kafka supports the following types of storage engines:

      • Cloud Storage: If you select this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine features low latency, high performance, long durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can set this parameter only to Cloud Storage.

      • Local Storage: If you select this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.

      Note

      You can specify the storage engine type only if you use a Professional Edition instance. If you use a Standard Edition instance, cloud storage is selected by default.

      Example: Cloud Storage

    Message Type
      Description: The message type of the topic. Valid values:

      • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. If a broker in the cluster fails, the order of messages that are stored in the partitions may not be preserved. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.

      • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which the messages are sent. If a broker in the cluster fails, messages are still stored in the partitions in the order in which the messages are sent. Messages in some partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.

      Example: Normal Message

    Log Cleanup Policy
      Description: The log cleanup policy that is used by the topic.

      If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only if you use an ApsaraMQ for Kafka Professional Edition instance.

      ApsaraMQ for Kafka provides the following log cleanup policies:

      • Delete: the default log cleanup policy. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes the earliest stored messages to ensure service availability.

      • Compact: the log compaction policy that is used in Apache Kafka. Log compaction ensures that the latest values are retained for messages that have the same key. This policy is suitable for scenarios such as restoring a failed system or reloading the cache after a system restarts. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.

        Important

        You can use log-compacted topics only in specific cloud-native components, such as Kafka Connect and Confluent Schema Registry. For more information, see aliware-kafka-demos.

      Example: Compact

    Tag
      Description: The tags that you want to attach to the topic.
      Example: demo

    After a topic is created, you can view the topic on the Topics page.
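To make the Compact policy more concrete, the following Python sketch models a partition as a list of (key, value) records and keeps only the most recent value for each key. It is a simplified illustration, not how Apache Kafka or ApsaraMQ for Kafka implements compaction: real compaction runs in the background on log segments and preserves the original offsets. The compact function and the sample records are hypothetical.

```python
def compact(records):
    """Keep the last record for each key, preserving the order of the survivors."""
    last_index = {}
    for index, (key, _value) in enumerate(records):
        last_index[key] = index  # a later record with the same key wins
    return [rec for index, rec in enumerate(records) if last_index[rec[0]] == index]


# Hypothetical status records, such as a component might write to a compacted topic.
log = [
    ("connector-a", "state=starting"),
    ("connector-b", "state=starting"),
    ("connector-a", "state=running"),
]
print(compact(log))
# [('connector-b', 'state=starting'), ('connector-a', 'state=running')]
```

After compaction, a restarted component that replays the topic sees only the latest state for each key, which is why the policy suits state and configuration topics.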

Step 3: Use Logstash to send a message

Start Logstash on the server where Logstash is installed, and send a message to the topic that you created.

  1. Run the cd command to switch to the bin directory of Logstash.

  2. Run the following command to download the kafka.client.truststore.jks certificate file:

    wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/raw/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
  3. Create a configuration file named jaas.conf.

    1. Run the vim jaas.conf command to create an empty configuration file.

    2. Press the i key to enter the insert mode.

    3. Enter the following content:

      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };

      username
        Description: The username of your ApsaraMQ for Kafka instance of the Internet and VPC type.
        Example: alikafka_pre-cn-v0h1***

      password
        Description: The password of your ApsaraMQ for Kafka instance of the Internet and VPC type.
        Example: GQiSmqbQVe3b9hdKLDcIlkrBK6***

    4. Press the Esc key to return to command mode.

    5. Press the : key to enter last-line mode. Enter wq and press the Enter key to save the file and exit.

  4. Create a configuration file named output.conf.

    1. Run the vim output.conf command to create an empty configuration file.

    2. Press the i key to enter the insert mode.

    3. Enter the following content:

      input {
        stdin {}
      }

      output {
        kafka {
          bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
          topic_id => "logstash_test"
          security_protocol => "SASL_SSL"
          sasl_mechanism => "PLAIN"
          jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
          ssl_truststore_password => "KafkaOnsClient"
          ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
          ssl_endpoint_identification_algorithm => ""
        }
      }

      bootstrap_servers
        Description: The public endpoint of your ApsaraMQ for Kafka instance. The public endpoint provided by ApsaraMQ for Kafka is the Secure Sockets Layer (SSL) endpoint.
        Example: alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093

      topic_id
        Description: The name of the topic.
        Example: logstash_test

      security_protocol
        Description: The security protocol. The value in the sample configuration is SASL_SSL. You do not need to change this value.
        Example: SASL_SSL

      sasl_mechanism
        Description: The security authentication mechanism. The value in the sample configuration is PLAIN. You do not need to change this value.
        Example: PLAIN

      jaas_path
        Description: The path of the jaas.conf configuration file.
        Example: /home/logstash-7.6.2/bin/jaas.conf

      ssl_truststore_password
        Description: The password of the kafka.client.truststore.jks certificate file. The value for the downloaded file is KafkaOnsClient. You do not need to change this value.
        Example: KafkaOnsClient

      ssl_truststore_location
        Description: The path of the kafka.client.truststore.jks certificate file.
        Example: /home/logstash-7.6.2/bin/kafka.client.truststore.jks

      ssl_endpoint_identification_algorithm
        Description: The algorithm for identifying the SSL endpoint. This parameter is required for Logstash V6.x and later.
        Example: "" (an empty string)

    4. Press the Esc key to return to command mode.

    5. Press the : key to enter last-line mode. Enter wq and press the Enter key to save the file and exit.

  5. Send a message to the topic that you created.

    1. Run the ./logstash -f output.conf command.

    2. Enter test and press Enter.

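If Logstash reports a keystore error at startup, one thing worth checking is whether the downloaded truststore is really a binary keystore, because wget saves whatever the URL returns, including an HTML page. The following Python sketch inspects the first bytes of the file. It assumes that the truststore uses the classic JKS format, which begins with the magic bytes FE ED FE ED; a keystore in another format, such as PKCS12, would be reported as unknown. The function names are hypothetical.

```python
JKS_MAGIC = bytes.fromhex("feedfeed")  # first four bytes of a JKS keystore


def classify_truststore(data: bytes) -> str:
    """Classify file content as 'jks', 'html', or 'unknown' from its first bytes."""
    if data.startswith(JKS_MAGIC):
        return "jks"
    head = data[:512].lstrip().lower()
    if head.startswith(b"<!doctype html") or head.startswith(b"<html"):
        return "html"
    return "unknown"


def check_file(path: str) -> str:
    """Read the beginning of the file at `path` and classify it."""
    with open(path, "rb") as handle:
        return classify_truststore(handle.read(512))
```

For example, running check_file("kafka.client.truststore.jks") in the bin directory of Logstash should return jks under the assumption above. A result of html suggests that a web page was saved instead of the certificate file.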

Step 4: View the partitions of the topic

Perform the following operations to view the message that was sent to the topic:

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Topics.

  5. On the Topics page, click the name of the topic that you want to manage. On the Topic Details page, click the Partition Status tab.

    Table 1. Parameters included in partition status

    Partition ID: The partition ID.

    Minimum Offset: The minimum offset in the partition.

    Maximum Offset: The maximum offset in the partition.

    Messages: The number of messages in the partition.

    Last Updated At: The time when the last message in the partition was stored.


Step 5: Query the message by offset

You can query the sent message based on its partition ID and offset information.

  1. Log on to the ApsaraMQ for Kafka console.

  2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. On the Instances page, click the name of the instance that you want to manage.

  4. In the left-side navigation pane, click Message Query.

  5. On the Message Query page, select Search by offset from the Search Method drop-down list.

  6. Select a topic from the Topic drop-down list and a partition from the Partition drop-down list, enter an offset value in the Offset field, and then click Search.

    Messages whose offsets are greater than or equal to the specified offset are displayed. For example, if you set both the Partition parameter and the Offset parameter to 5, the system returns the messages in Partition 5 whose offsets are greater than or equal to 5.

    Table 2. Parameters included in message query results

    Partition: The partition from which the message is retrieved.

    Offset: The offset of the message.

    Key: The message key. The key is converted to a string.

    Value: The message value, which is also known as message content. The message value is converted to a string.

    Created At: The point in time when the message was sent. The value is the timestamp that the client recorded when the message was sent or the value of the timestamp field that you specified for the ProducerRecord object.

      Note

      • If you specified a value for the timestamp field, the specified value is displayed.

      • If you did not specify a value for the timestamp field, the local system time when the message was sent is displayed.

      • A value in the 1970/x/x x:x:x format indicates that the timestamp field is specified as 0 or an invalid value.

      • You cannot specify the timestamp field on clients of ApsaraMQ for Kafka version 0.9 or earlier.

    Actions:

      • Click Download Key to download the message key.

      • Click Download Value to download the message content.

    Important
    • Up to 1 KB of content for each retrieved message can be displayed in the ApsaraMQ for Kafka console. If a retrieved message exceeds 1 KB in size, the system automatically truncates the content. If you want to view the complete message content, download the message.

    • You can download up to 10 MB of the retrieved messages at a time. If the retrieved messages exceed 10 MB in size, only the first 10 MB of message content can be downloaded.
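The search and display rules described in this step can be sketched in Python. The snippet models messages as dictionaries, returns the messages in the selected partition whose offsets are greater than or equal to the requested offset, and truncates the value for display. It only illustrates the described behavior and is not the console's implementation; the field names and the reading of 1 KB as 1,024 bytes are assumptions.

```python
DISPLAY_LIMIT = 1024  # assumed byte count for the 1 KB display limit


def search_by_offset(messages, partition, offset):
    """Return messages in `partition` whose offset is >= `offset`, ordered by offset."""
    hits = [m for m in messages if m["partition"] == partition and m["offset"] >= offset]
    return sorted(hits, key=lambda m: m["offset"])


def preview(value: bytes, limit: int = DISPLAY_LIMIT) -> bytes:
    """Return at most `limit` bytes of the message value for display."""
    return value[:limit]


# Hypothetical sample data for illustration.
messages = [
    {"partition": 5, "offset": 4, "value": b"old"},
    {"partition": 5, "offset": 5, "value": b"test"},
    {"partition": 5, "offset": 6, "value": b"x" * 2000},
    {"partition": 3, "offset": 9, "value": b"other partition"},
]
for message in search_by_offset(messages, partition=5, offset=5):
    print(message["offset"], len(preview(message["value"])))
```

With the sample data, the search for Partition 5 and Offset 5 skips offset 4 and the message in Partition 3, and the 2,000-byte value is cut to 1,024 bytes for display.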

References

For more information about parameter settings, see Kafka output plugin.