Push events from ApsaraMQ for RabbitMQ to Function Compute

Updated at: 2024-11-06 05:07

This topic describes how to use EventBridge to push events from ApsaraMQ for RabbitMQ to Function Compute.

Prerequisites

Before you start, make sure that the following operations are performed:

Step 1: Add a custom event source

  1. Log on to the EventBridge console.

  2. In the left-side navigation pane, click Event Buses.

  3. In the top navigation bar, select a region.

  4. On the Event Buses page, click the name of the custom event bus that you created.

  5. In the left-side navigation pane, click Event Sources.

  6. On the Event Source page, click Add Event Source.

  7. In the Add Custom Event Source panel, configure the Name and Description parameters, select ApsaraMQ for RabbitMQ from the Event Provider drop-down list, select the resources that you created for ApsaraMQ for RabbitMQ, and then click OK.

Step 2: Create an event rule

Important

The event targets that you want to configure for an event rule must reside in the same region as the event rule.

  1. Log on to the EventBridge console. In the left-side navigation pane, click Event Buses.

  2. In the top navigation bar, select a region. On the Event Buses page, click the name of the event bus that you want to manage.

  3. In the left-side navigation pane, click Event Rules. On the page that appears, click Create Rule.

  4. Perform the following operations in the Create Rule panel:

    1. In the Configure Basic Info step, enter a rule name in the Name field and a rule description in the Description field, and click Next Step.

    2. In the Configure Event Pattern step, set the Event Source Type parameter to Custom Event Source, select the custom event source that you created in Step 1 from the Event Source drop-down list, specify an event pattern in the Pattern Content code editor, and then click Next Step.

      For more information, see Event patterns.

    3. In the Configure Targets step, configure an event target and click Create.

      Note

      You can configure up to five event targets for an event rule.

      • Service Type: Select Function Compute.

      • Service: Select the service that you created in Function Compute.

      • Function: Select the function that you created in Function Compute.

      • Event: Select Template.

        The following example shows two sample variables and a sample template:

        Sample variables:

        {
          "source":"$.source",
          "type":"$.type"
        }

        Sample template:

        The event comes from ${source},event type is ${type}.

        For more information, see Event transformation.

      • Service Version and Alias: Select a service version or a service alias.

        • Default Version: The value is fixed to LATEST.

        • Specified Version: Select a service version. For more information, see Manage versions.

        • Specified Alias: Select a service alias. For more information, see Manage aliases.

      • Invocation Mode: Select Synchronous or Asynchronous. For more information, see Synchronous invocations and Overview.
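
To illustrate how the sample variables and the sample template in the Event setting fit together, the following minimal Java sketch substitutes extracted values into the template. It is not the EventBridge implementation. The class name TemplateSketch and the render method are hypothetical, the values are hard-coded instead of being extracted with the JSONPath expressions $.source and $.type, and leaving unknown placeholders unchanged is a choice made only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that mimics variable substitution in an event template.
public class TemplateSketch {
    // Matches placeholders such as ${source} and captures the variable name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    // Replaces each ${name} placeholder with the value of the variable "name".
    // Placeholders without a matching variable are left unchanged (illustration choice).
    public static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.getOrDefault(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed results of evaluating $.source and $.type against an event.
        Map<String, String> values = new LinkedHashMap<>();
        values.put("source", "aliyun.ui");
        values.put("type", "ui:Created:PostObject");

        String template = "The event comes from ${source},event type is ${type}.";
        System.out.println(render(template, values));
    }
}
```

With these assumed values, the rendered string has the same shape as the message that the function logs in the Verify the results section.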

Step 3: Publish an event

Send messages to the ApsaraMQ for RabbitMQ resources that you configured in Step 1. The following Java sample uses the RabbitMQ Java client to declare an exchange and a queue, bind them, and publish 10 messages:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        // In production environments, we recommend that you create the connection in advance and reuse it when needed. This prevents frequent establishment and closing of connections and improves system performance and stability. 
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        // The binding between the exchange and the queue. 
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";
        // The exchange type. 
        String exchangeType = "${ExchangeType}";

        // Make sure that the exchange and the queue exist before you send messages.
        // In production environments, we recommend that you create the exchange and the queue in the ApsaraMQ for RabbitMQ console in advance instead of declaring them in code. Declaring them in code can trigger throttling for the corresponding API operations.
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // Send a message. 
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("Sample message body-" + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Enable automatic connection recovery. Set this parameter to false to disable it.
        factory.setAutomaticRecoveryEnabled(true);
        // The interval between recovery attempts. Unit: milliseconds.
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        // The timeout period. Configure this parameter based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}

Verify the results

To verify the results, you can view logs in the Function Compute console.

  1. Log on to the Function Compute console.

  2. In the left-side navigation pane, click Services & Functions.

  3. In the top navigation bar, select a region.

  4. On the Services page, find the service to which you routed the event and click Functions in the Actions column.

  5. On the Functions page, click the name of the function to which you routed the event.

  6. On the Function Details page, click the Logs tab to view logs.

    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****

FAQ

How can I locate the issue if the event fails to be published?

If an event fails to be published, you can view the response to the publishing request for troubleshooting. You can go to the EventBridge console and view the related information in the Event Delivery section of the Event Trace message. Then, take appropriate measures based on the response returned.

What can I do if an event fails to be published to Function Compute and the "[500]ConnectErrorconnectiontimedout" error is returned in the response?

You can perform the following steps:
  1. Log on to the Function Compute console. Execute the function to which the event is routed and check the execution duration.
  2. If the execution duration is longer than 15 seconds, check the network connection. If the execution duration is shorter than 15 seconds, check whether you can access the Function Compute endpoint in the region where the service to which the event is routed is deployed.
  3. If you cannot access the endpoints of the region where Function Compute is deployed, contact the Function Compute engineers.
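
One way to check whether an endpoint is reachable from your network is a plain TCP connection test. The following sketch only assumes a hostname that you pass on the command line. The class name EndpointCheck, port 443, and the 15-second timeout are assumptions made for illustration and are not values prescribed by Function Compute. A successful connection shows only that the endpoint is reachable, not that the function runs correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper that tests TCP reachability of an endpoint.
public class EndpointCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // DNS failures, refused connections, and timeouts all return false.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("Usage: java EndpointCheck <endpoint-hostname>");
            return;
        }
        // Assumption: the endpoint accepts HTTPS connections on port 443.
        boolean reachable = canConnect(args[0], 443, 15 * 1000);
        System.out.println(args[0] + (reachable ? " is reachable" : " is not reachable"));
    }
}
```

Run the check from the same network environment in which the problem occurs, and pass the endpoint of the region where your service is deployed as the argument.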