全部產品
Search
文件中心

ApsaraMQ for Kafka:Node.js SDK收發訊息

更新時間:Dec 27, 2024

本文介紹如何使用Node.js SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。

環境配置

安裝C++依賴庫

  1. 執行以下命令切換到yum源配置目錄/etc/yum.repos.d/

    cd /etc/yum.repos.d/
  2. 建立yum源設定檔confluent.repo

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. 執行以下命令安裝C++依賴庫。

    yum install librdkafka-devel

安裝Node.js依賴庫

  1. 執行以下命令為前置處理器指定OpenSSL標頭檔路徑。

    # 配置為您的系統安裝的OpenSSL標頭檔路徑。
    export CPPFLAGS=-I</usr/local/opt/openssl/include>
  2. 執行以下命令為連接器指定OpenSSL庫路徑。

    # 配置為您的系統安裝的OpenSSL庫路徑。
    export LDFLAGS=-L</usr/local/opt/openssl/lib>
  3. 執行以下命令安裝Node.js依賴庫。

    npm install i --unsafe-perm node-rdkafka
說明

在命令列視窗執行whereis openssl命令擷取OpenSSL標頭檔路徑和庫路徑。

準備配置

  1. 可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。

  2. 訪問Aliware-kafka-demos,單擊download,下載Demo工程到本地並解壓。

  3. 在解壓的Demo工程中找到kafka-nodejs-demo檔案夾,根據存取點類型開啟對應的檔案夾,配置setting.js檔案。

    module.exports = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'bootstrap_servers': ["XXX"],
        'topic_name': 'XXX',
        'consumer_id': 'XXX'
    }

    參數

    描述

    sasl_plain_username

    SASL使用者名稱。如果是預設存取點,則無此配置項。

    說明
    • 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台实例详情頁面的配置信息地區擷取預設的用户名密码
    • 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權

    sasl_plain_password

    SASL使用者名稱密碼。如果是預設存取點,則無此配置項。

    bootstrap_servers

    SSL存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    topic_name

    Topic名稱。您可在雲訊息佇列 Kafka 版控制台Topic 管理頁面擷取。

    consumer_id

    Group名稱。您可在雲訊息佇列 Kafka 版控制台Group 管理頁面擷取。

  4. 配置完成後,將設定檔所在檔案夾下的全部檔案(如果是SSL存取點執行個體,包含認證SSL根憑證檔案),上傳至伺服器Node.js依賴庫安裝目錄下。

發送訊息

執行如下命令發送訊息。

node producer.js

程式碼範例

  • 預設存取點

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
        /*'debug': 'all', */
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'dr_cb': true,
        'dr_msg_cb': true
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    
    producer.on('ready', function() {
      connected = true
      console.log("connect ok")
    });
    
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    })
    
    producer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
        console.log("error:" + error);
    });
    
    function produce() {
      try {
        producer.produce(
          config['topic_name'],   
          null,      
          new Buffer('Hello Ali Kafka'),      
          null,   
          Date.now()
        );
      } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
      }
    
    }
    
    producer.on('delivery-report', function(err, report) {
        console.log("delivery-report: producer ok");
    });
    
    producer.on('event.error', function(err) {
        console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");
  • SSL存取點

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log("features:" + Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    
    var producer = new Kafka.Producer({
        /*'debug': 'all', */
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'dr_cb': true,
        'dr_msg_cb': true,
        'security.protocol' : 'sasl_ssl',
        'ssl.ca.location' : './ca-cert.pem',
        'sasl.mechanisms' : 'PLAIN',
        'ssl.endpoint.identification.algorithm':'none',
        'sasl.username' : config['sasl_plain_username'],
        'sasl.password' : config['sasl_plain_password']
    });
    
    var connected = false
    
    producer.setPollInterval(100);
    
    producer.connect();
    
    producer.on('ready', function() {
      connected = true
      console.log("connect ok")
    
      });
    
    function produce() {
      try {
        producer.produce(
          config['topic_name'],
          new Buffer('Hello Ali Kafka'),
          null,
          Date.now()
        );
      } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
      }
    }
    
    producer.on("disconnected", function() {
      connected = false;
      producer.connect();
    })
    
    producer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    producer.on("error", function(error) {
        console.log("error:" + error);
    });
    
    producer.on('delivery-report', function(err, report) {
        console.log("delivery-report: producer ok");
    });
    // Any errors we encounter, including connection errors
    producer.on('event.error', function(err) {
        console.error('event.error:' + err);
    })
    
    setInterval(produce,1000,"Interval");

訂閱訊息

執行如下命令消費訊息。

node consumer.js

程式碼範例

  • 預設存取點

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
        /*'debug': 'all',*/
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'group.id' : config['consumer_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    })
    
    consumer.on('data', function(data) {
      console.log(data);
    });
    
    
    consumer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
        console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
            console.log("event:" + event);
    });
  • SSL存取點

    const Kafka = require('node-rdkafka');
    const config = require('./setting');
    console.log(Kafka.features);
    console.log(Kafka.librdkafkaVersion);
    console.log(config)
    
    var consumer = new Kafka.KafkaConsumer({
        /*'debug': 'all',*/
        'api.version.request': 'true',
        'bootstrap.servers': config['bootstrap_servers'],
        'security.protocol' : 'sasl_ssl',
        'ssl.endpoint.identification.algorithm':'none',
        'ssl.ca.location' : './ca-cert.pem',
        'sasl.mechanisms' : 'PLAIN',
        'message.max.bytes': 32000,
        'fetch.max.bytes' : 32000,
        'fetch.message.max.bytes': 32000,
        'max.partition.fetch.bytes': 32000,
        'sasl.username' : config['sasl_plain_username'],
        'sasl.password' : config['sasl_plain_password'],
        'group.id' : config['consumer_id']
    });
    
    consumer.connect();
    
    consumer.on('ready', function() {
      console.log("connect ok");
      consumer.subscribe([config['topic_name']]);
      consumer.consume();
    })
    
    consumer.on('data', function(data) {
      console.log(data);
    });
    
    
    consumer.on('event.log', function(event) {
          console.log("event.log", event);
    });
    
    consumer.on('error', function(error) {
        console.log("error:" + error);
    });
    
    consumer.on('event', function(event) {
            console.log("event:" + event);
    });