This topic describes how to use the SDK for Node.js to connect to ApsaraMQ for Kafka to send and receive messages.
Environment requirements
GNU Compiler Collection (GCC) is installed. For more information, see Installing GCC.
Node.js is installed. For more information, visit the download page of Node.js.
Important: The version of Node.js must be 4.0.0 or later.
OpenSSL is installed. For more information, see Downloads.
Install the C++ library
Run the following command to switch to the /etc/yum.repos.d/ yum repository directory:
cd /etc/yum.repos.d/
Create a yum repository configuration file named confluent.repo.
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1
Run the following command to install the C++ library:
yum install librdkafka-devel
Install the Node.js library
Run the following command to specify the path of the OpenSSL header file for the preprocessor:
# Set this parameter to the path of the OpenSSL header file that is installed on your on-premises machine. For example: export CPPFLAGS=-I/usr/local/opt/openssl/include
Run the following command to specify the path of the OpenSSL library for the connector:
# Set this parameter to the path of the OpenSSL library file that is installed on your on-premises machine. For example: export LDFLAGS=-L/usr/local/opt/openssl/lib
Run the following command to install the Node.js library:
npm install --unsafe-perm node-rdkafka
Run the whereis openssl command to obtain the path of the OpenSSL header file and the path of the OpenSSL library.
Create configuration files
(Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.
Go to the aliware-kafka-demos page, click the download icon to download the demo project to your on-premises machine, and then decompress the downloaded package.
In the decompressed package, go to the kafka-nodejs-demo folder. Then, open the corresponding folder based on the endpoint that you want to use and configure the setting.js file in the folder.
module.exports = { 'sasl_plain_username': 'XXX', 'sasl_plain_password': 'XXX', 'bootstrap_servers': ["XXX"], 'topic_name': 'XXX', 'consumer_id': 'XXX' }
Parameter
Description
sasl_plain_username
The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.
Note: If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.
sasl_plain_password
The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.
bootstrap_servers
The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
topic_name
The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.
consumer_id
The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.
After the required parameters are configured, upload all files in the folder in which the configuration file is located to the installation directory of the Node.js dependency library on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.
Send messages
Run the following command to run producer.js to send messages:
node producer.js
The following sample code provides examples of producer.js:
If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:
// Producer sample for the default (PLAINTEXT) endpoint of ApsaraMQ for Kafka.
// Reads connection settings from ./setting.js and sends one message per second.
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
    /*'debug': 'all', */
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,     // enable delivery reports
    'dr_msg_cb': true
});

var connected = false;

producer.setPollInterval(100);
producer.connect();

producer.on('ready', function() {
    connected = true;
    console.log("connect ok");
});

// Reconnect automatically if the broker connection is lost.
producer.on("disconnected", function() {
    connected = false;
    producer.connect();
});

producer.on('event.log', function(event) {
    console.log("event.log", event);
});

producer.on("error", function(error) {
    console.log("error:" + error);
});

// Sends a single test message. produce() signature:
// produce(topic, partition, message, key, timestamp).
function produce() {
    try {
        producer.produce(
            config['topic_name'],
            null,                               // partition: null lets librdkafka choose
            Buffer.from('Hello Ali Kafka'),     // Buffer.from replaces the deprecated new Buffer()
            null,                               // message key
            Date.now()                          // timestamp
        );
    } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
    }
}

producer.on('delivery-report', function(err, report) {
    console.log("delivery-report: producer ok");
});

producer.on('event.error', function(err) {
    console.error('event.error:' + err);
});

// Send one message every second.
setInterval(produce, 1000, "Interval");
If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:
// Producer sample for the SSL endpoint of ApsaraMQ for Kafka (SASL_SSL + PLAIN).
// Reads connection settings from ./setting.js and sends one message per second.
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
    /*'debug': 'all', */
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'dr_cb': true,     // enable delivery reports
    'dr_msg_cb': true,
    'security.protocol' : 'sasl_ssl',
    'ssl.ca.location' : './ca-cert.pem',    // SSL root certificate shipped with the demo
    'sasl.mechanisms' : 'PLAIN',
    'ssl.endpoint.identification.algorithm':'none',
    'sasl.username' : config['sasl_plain_username'],
    'sasl.password' : config['sasl_plain_password']
});

var connected = false;

producer.setPollInterval(100);
producer.connect();

producer.on('ready', function() {
    connected = true;
    console.log("connect ok");
});

// Sends a single test message. produce() signature:
// produce(topic, partition, message, key, timestamp).
// BUG FIX: the original omitted the partition argument, so the Buffer
// was passed as the partition and null as the message payload.
function produce() {
    try {
        producer.produce(
            config['topic_name'],
            null,                               // partition: null lets librdkafka choose
            Buffer.from('Hello Ali Kafka'),     // Buffer.from replaces the deprecated new Buffer()
            null,                               // message key
            Date.now()                          // timestamp
        );
    } catch (err) {
        console.error('A problem occurred when sending our message');
        console.error(err);
    }
}

// Reconnect automatically if the broker connection is lost.
producer.on("disconnected", function() {
    connected = false;
    producer.connect();
});

producer.on('event.log', function(event) {
    console.log("event.log", event);
});

producer.on("error", function(error) {
    console.log("error:" + error);
});

producer.on('delivery-report', function(err, report) {
    console.log("delivery-report: producer ok");
});

// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
    console.error('event.error:' + err);
});

// Send one message every second.
setInterval(produce, 1000, "Interval");
Receive messages
Run the following command to run consumer.js to receive messages:
node consumer.js
The following sample code provides examples of consumer.js:
If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:
// Consumer sample for the default (PLAINTEXT) endpoint of ApsaraMQ for Kafka.
// Subscribes to the configured topic and logs every message it receives.
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config);

const consumer = new Kafka.KafkaConsumer({
    /*'debug': 'all',*/
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'group.id' : config['consumer_id']
});

consumer.connect();

// Once connected, subscribe to the topic and start the consume loop.
consumer.on('ready', () => {
    console.log("connect ok");
    consumer.subscribe([config['topic_name']]);
    consumer.consume();
});

// Fired for every message pulled from the topic.
consumer.on('data', (data) => {
    console.log(data);
});

consumer.on('event.log', (event) => {
    console.log("event.log", event);
});

consumer.on('error', (error) => {
    console.log("error:" + error);
});

consumer.on('event', (event) => {
    console.log("event:" + event);
});
If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, use the following sample code:
// Consumer sample for the SSL endpoint of ApsaraMQ for Kafka (SASL_SSL + PLAIN).
// Subscribes to the configured topic and logs every message it receives.
const Kafka = require('node-rdkafka');
const config = require('./setting');

console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config);

const consumer = new Kafka.KafkaConsumer({
    /*'debug': 'all',*/
    'api.version.request': 'true',
    'bootstrap.servers': config['bootstrap_servers'],
    'security.protocol' : 'sasl_ssl',
    'ssl.endpoint.identification.algorithm':'none',
    'ssl.ca.location' : './ca-cert.pem',    // SSL root certificate shipped with the demo
    'sasl.mechanisms' : 'PLAIN',
    // Fetch/message size caps (bytes).
    'message.max.bytes': 32000,
    'fetch.max.bytes' : 32000,
    'fetch.message.max.bytes': 32000,
    'max.partition.fetch.bytes': 32000,
    'sasl.username' : config['sasl_plain_username'],
    'sasl.password' : config['sasl_plain_password'],
    'group.id' : config['consumer_id']
});

consumer.connect();

// Once connected, subscribe to the topic and start the consume loop.
consumer.on('ready', () => {
    console.log("connect ok");
    consumer.subscribe([config['topic_name']]);
    consumer.consume();
});

// Fired for every message pulled from the topic.
consumer.on('data', (data) => {
    console.log(data);
});

consumer.on('event.log', (event) => {
    console.log("event.log", event);
});

consumer.on('error', (error) => {
    console.log("error:" + error);
});

consumer.on('event', (event) => {
    console.log("event:" + event);
});