本文介紹如何使用Node.js SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。
環境配置
安裝C++依賴庫
執行以下命令切換到yum源配置目錄/etc/yum.repos.d/。
cd /etc/yum.repos.d/
建立yum源設定檔confluent.repo。
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1
執行以下命令安裝C++依賴庫。
yum install librdkafka-devel
安裝Node.js依賴庫
執行以下命令為前置處理器指定OpenSSL標頭檔路徑。
# 配置為您的系統安裝的OpenSSL標頭檔路徑。 export CPPFLAGS=-I</usr/local/opt/openssl/include>
執行以下命令為連接器指定OpenSSL庫路徑。
# 配置為您的系統安裝的OpenSSL庫路徑。 export LDFLAGS=-L</usr/local/opt/openssl/lib>
執行以下命令安裝Node.js依賴庫。
npm install i --unsafe-perm node-rdkafka
在命令列視窗執行whereis openssl命令擷取OpenSSL標頭檔路徑和庫路徑。
準備配置
可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。
訪問Aliware-kafka-demos,單擊,下載Demo工程到本地並解壓。
在解壓的Demo工程中找到kafka-nodejs-demo檔案夾,根據存取點類型開啟對應的檔案夾,配置setting.js檔案。
module.exports = { 'sasl_plain_username': 'XXX', 'sasl_plain_password': 'XXX', 'bootstrap_servers': ["XXX"], 'topic_name': 'XXX', 'consumer_id': 'XXX' }
參數
描述
sasl_plain_username
SASL使用者名稱。如果是預設存取點,則無此配置項。
說明- 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面的配置信息地區擷取預設的用户名和密码。
- 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權。
sasl_plain_password
SASL使用者名稱密碼。如果是預設存取點,則無此配置項。
bootstrap_servers
SSL存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
topic_name
Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。
consumer_id
Group名稱。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。
配置完成後,將設定檔所在檔案夾下的全部檔案(如果是SSL存取點執行個體,包含認證SSL根憑證檔案),上傳至伺服器Node.js依賴庫安裝目錄下。
發送訊息
執行如下命令發送訊息。
node producer.js
程式碼範例
預設存取點
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log("features:" + Kafka.features); console.log(Kafka.librdkafkaVersion); var producer = new Kafka.Producer({ /*'debug': 'all', */ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'dr_cb': true, 'dr_msg_cb': true }); var connected = false producer.setPollInterval(100); producer.connect(); producer.on('ready', function() { connected = true console.log("connect ok") }); producer.on("disconnected", function() { connected = false; producer.connect(); }) producer.on('event.log', function(event) { console.log("event.log", event); }); producer.on("error", function(error) { console.log("error:" + error); }); function produce() { try { producer.produce( config['topic_name'], null, new Buffer('Hello Ali Kafka'), null, Date.now() ); } catch (err) { console.error('A problem occurred when sending our message'); console.error(err); } } producer.on('delivery-report', function(err, report) { console.log("delivery-report: producer ok"); }); producer.on('event.error', function(err) { console.error('event.error:' + err); }) setInterval(produce,1000,"Interval");
SSL存取點
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log("features:" + Kafka.features); console.log(Kafka.librdkafkaVersion); var producer = new Kafka.Producer({ /*'debug': 'all', */ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'dr_cb': true, 'dr_msg_cb': true, 'security.protocol' : 'sasl_ssl', 'ssl.ca.location' : './ca-cert.pem', 'sasl.mechanisms' : 'PLAIN', 'ssl.endpoint.identification.algorithm':'none', 'sasl.username' : config['sasl_plain_username'], 'sasl.password' : config['sasl_plain_password'] }); var connected = false producer.setPollInterval(100); producer.connect(); producer.on('ready', function() { connected = true console.log("connect ok") }); function produce() { try { producer.produce( config['topic_name'], new Buffer('Hello Ali Kafka'), null, Date.now() ); } catch (err) { console.error('A problem occurred when sending our message'); console.error(err); } } producer.on("disconnected", function() { connected = false; producer.connect(); }) producer.on('event.log', function(event) { console.log("event.log", event); }); producer.on("error", function(error) { console.log("error:" + error); }); producer.on('delivery-report', function(err, report) { console.log("delivery-report: producer ok"); }); // Any errors we encounter, including connection errors producer.on('event.error', function(err) { console.error('event.error:' + err); }) setInterval(produce,1000,"Interval");
訂閱訊息
執行如下命令消費訊息。
node consumer.js
程式碼範例
預設存取點
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log(Kafka.features); console.log(Kafka.librdkafkaVersion); console.log(config) var consumer = new Kafka.KafkaConsumer({ /*'debug': 'all',*/ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'group.id' : config['consumer_id'] }); consumer.connect(); consumer.on('ready', function() { console.log("connect ok"); consumer.subscribe([config['topic_name']]); consumer.consume(); }) consumer.on('data', function(data) { console.log(data); }); consumer.on('event.log', function(event) { console.log("event.log", event); }); consumer.on('error', function(error) { console.log("error:" + error); }); consumer.on('event', function(event) { console.log("event:" + event); });
SSL存取點
const Kafka = require('node-rdkafka'); const config = require('./setting'); console.log(Kafka.features); console.log(Kafka.librdkafkaVersion); console.log(config) var consumer = new Kafka.KafkaConsumer({ /*'debug': 'all',*/ 'api.version.request': 'true', 'bootstrap.servers': config['bootstrap_servers'], 'security.protocol' : 'sasl_ssl', 'ssl.endpoint.identification.algorithm':'none', 'ssl.ca.location' : './ca-cert.pem', 'sasl.mechanisms' : 'PLAIN', 'message.max.bytes': 32000, 'fetch.max.bytes' : 32000, 'fetch.message.max.bytes': 32000, 'max.partition.fetch.bytes': 32000, 'sasl.username' : config['sasl_plain_username'], 'sasl.password' : config['sasl_plain_password'], 'group.id' : config['consumer_id'] }); consumer.connect(); consumer.on('ready', function() { console.log("connect ok"); consumer.subscribe([config['topic_name']]); consumer.consume(); }) consumer.on('data', function(data) { console.log(data); }); consumer.on('event.log', function(event) { console.log("event.log", event); }); consumer.on('error', function(error) { console.log("error:" + error); }); consumer.on('event', function(event) { console.log("event:" + event); });