This topic describes how to use the SDK for Python to connect to ApsaraMQ for Kafka to send and receive messages.
Prerequisites
Python is installed. For more information, see Download Python.
Note: Python 2.7 and 3.x are supported.
pip is installed. For more information, see pip documentation.
Install the Python library
Run the following command to install the Python library:
pip install confluent-kafka==1.9.2
We recommend that you install confluent-kafka 1.9.2 or earlier. Otherwise, the SSL_HANDSHAKE error is returned when you send messages over the Internet.
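To confirm that the installed library falls within the recommended range, a check along the following lines may help. This is a minimal sketch that assumes Python 3.8 or later (for importlib.metadata); the helper names parse_version and is_recommended are hypothetical and are not part of the demo package.

```python
from importlib import metadata

# Upper bound recommended in this topic.
RECOMMENDED_MAX = (1, 9, 2)


def parse_version(text):
    """Turn a string such as '1.9.2' into (1, 9, 2); suffixes such as 'rc1' are ignored."""
    parts = []
    for piece in text.split('.')[:3]:
        digits = ''
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def is_recommended(text):
    """Return True if the version is 1.9.2 or earlier."""
    return parse_version(text) <= RECOMMENDED_MAX


if __name__ == '__main__':
    try:
        installed = metadata.version('confluent-kafka')
    except metadata.PackageNotFoundError:
        print('confluent-kafka is not installed')
    else:
        status = 'within' if is_recommended(installed) else 'newer than'
        print('confluent-kafka {} is {} the recommended range'.format(installed, status))
```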
Prepare a configuration file
(Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.
Visit the aliware-kafka-demos page, click the icon, and select Download ZIP to download the demo package. Then, decompress the package.
In the decompressed package, find the kafka-confluent-python-demo folder and upload the folder to your Linux system.
Modify the setting.py configuration file.
Default endpoint
Log on to your Linux system, go to the vpc directory, and then modify the setting.py configuration file.
kafka_setting = {
    'bootstrap_servers': 'XXX:xxx,XXX:xxx',
    'topic_name': 'XXX',
    'group_name': 'XXX'
}
| Parameter | Description |
| --- | --- |
| bootstrap_servers | The default endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
| topic_name | The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console. |
| group_name | The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console. |
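Before you run the demo, it can be useful to verify that no placeholder values remain in setting.py. The following is a minimal sketch, not part of the demo package; the function name check_setting is hypothetical, and the check assumes that real topic and group names do not contain the literal text XXX.

```python
def check_setting(kafka_setting):
    """Return a list of problems found in a kafka_setting dict (empty if none)."""
    problems = []
    # Every required value must be present and must not be the XXX placeholder.
    for key in ('bootstrap_servers', 'topic_name', 'group_name'):
        value = kafka_setting.get(key, '')
        if not value or 'XXX' in value:
            problems.append('{} is missing or still a placeholder'.format(key))
    # Each endpoint in the comma-separated list must look like host:port.
    for endpoint in kafka_setting.get('bootstrap_servers', '').split(','):
        host, sep, port = endpoint.strip().rpartition(':')
        if not (sep and host and port.isdigit()):
            problems.append('endpoint {!r} is not in host:port form'.format(endpoint))
    return problems
```

For example, calling check_setting(setting.kafka_setting) right after import setting returns an empty list when the file is fully filled in.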
SSL endpoint
Log on to your Linux system, go to the vpc-ssl directory, and then modify the setting.py configuration file.
kafka_setting = {
    'sasl_plain_username': 'XXX',
    'sasl_plain_password': 'XXX',
    'ca_location': '/XXX/mix-4096-ca-cert',
    'bootstrap_servers': 'XXX:xxx,XXX:xxx',
    'topic_name': 'XXX',
    'group_name': 'XXX'
}
| Parameter | Description |
| --- | --- |
| sasl_plain_username | The username of the Simple Authentication and Security Layer (SASL) user. Note: If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console. If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users. |
| sasl_plain_password | The password of the SASL user. |
| ca_location | The path to which the SSL root certificate is saved. Replace XXX in the sample code with the local path. Example: /home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert. |
| bootstrap_servers | The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
| topic_name | The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console. |
| group_name | The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console. |
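The parameters in setting.py map onto confluent-kafka client properties. The sketch below shows one way to build that mapping in a single place so that the producer and the consumer share it. The property names are the ones used by the sample code in this topic; the function name build_ssl_config and its group flag are hypothetical.

```python
def build_ssl_config(conf, group=False):
    """Translate a kafka_setting dict into client properties for the SSL endpoint."""
    config = {
        'bootstrap.servers': conf['bootstrap_servers'],
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': conf['sasl_plain_username'],
        'sasl.password': conf['sasl_plain_password'],
        'ssl.ca.location': conf['ca_location'],
        # The sample code in this topic disables hostname verification.
        'ssl.endpoint.identification.algorithm': 'none',
    }
    if group:
        # Only consumers need a group ID.
        config['group.id'] = conf['group_name']
    return config
```

A producer could then be created with Producer(build_ssl_config(setting.kafka_setting)), and a consumer with Consumer(build_ssl_config(setting.kafka_setting, group=True)).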
Send messages
Run the following command to send messages. In the sample code, Python 3.9 is used.
python kafka_producer.py
The following sample code provides examples of kafka_producer.py:
Default endpoint
from confluent_kafka import Producer
import setting

conf = setting.kafka_setting
# Initialize a producer.
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


# Send messages in asynchronous transmission mode.
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

# When the program is ended, call the flush() method.
p.flush()
SSL endpoint
from confluent_kafka import Producer
import setting

conf = setting.kafka_setting

# Initialize a producer that connects over SASL_SSL.
p = Producer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': conf['sasl_plain_username'],
    'sasl.password': conf['sasl_plain_password'],
    'ssl.ca.location': conf['ca_location'],
    'ssl.endpoint.identification.algorithm': 'none'
})


def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


# Send a message asynchronously, then serve the delivery callback.
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)

# Wait for outstanding messages to be delivered before the program exits.
p.flush()
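The samples above send a single message. When you send many messages in a loop, produce() raises BufferError if the local queue is full, so the loop needs to call poll() and retry. The following is a minimal sketch of that pattern; the function name send_all is hypothetical, and producer is assumed to be a confluent_kafka.Producer (or any object that has the same produce, poll, and flush methods).

```python
def send_all(producer, topic, payloads, on_delivery=None):
    """Produce every payload, serving callbacks whenever the local queue is full."""
    kwargs = {}
    if on_delivery is not None:
        kwargs['callback'] = on_delivery
    for payload in payloads:
        while True:
            try:
                producer.produce(topic, payload.encode('utf-8'), **kwargs)
                break
            except BufferError:
                # Local queue is full: serve delivery callbacks to free space, then retry.
                producer.poll(1)
        # Serve any delivery callbacks that are already pending.
        producer.poll(0)
    # flush() returns the number of messages still waiting in the queue.
    return producer.flush()
```

For example, send_all(p, conf['topic_name'], ['Hello', 'World'], on_delivery=delivery_report) could replace the single produce() call in the samples.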
Subscribe to messages
Run the following command to subscribe to messages. In the sample code, Python 3.9 is used.
python kafka_consumer.py
The following sample code provides examples of kafka_consumer.py:
Default endpoint
from confluent_kafka import Consumer, KafkaError
import setting

conf = setting.kafka_setting

# Initialize a consumer.
c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest'
})

c.subscribe([conf['topic_name']])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # Reaching the end of a partition is not an error.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Press Ctrl+C to stop consuming.
    pass
finally:
    # Close the consumer so that it leaves the group cleanly.
    c.close()
SSL endpoint
from confluent_kafka import Consumer, KafkaError
import setting

conf = setting.kafka_setting

# Initialize a consumer that connects over SASL_SSL.
c = Consumer({
    'bootstrap.servers': conf['bootstrap_servers'],
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': conf['sasl_plain_username'],
    'sasl.password': conf['sasl_plain_password'],
    'ssl.ca.location': conf['ca_location'],
    'ssl.endpoint.identification.algorithm': 'none',
    'group.id': conf['group_name'],
    'auto.offset.reset': 'latest',
    # 512 KB, passed as an integer; the string '1024*512' is not evaluated as an expression.
    'fetch.message.max.bytes': 1024 * 512
})

c.subscribe([conf['topic_name']])

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            # Reaching the end of a partition is not an error.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Press Ctrl+C to stop consuming.
    pass
finally:
    # Close the consumer so that it leaves the group cleanly.
    c.close()
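The consumer samples poll indefinitely. For scripts or tests that must exit on their own, the poll loop can instead stop after several consecutive empty polls. The following is a minimal sketch of that variant; the function name drain and the max_empty_polls parameter are hypothetical, and consumer is assumed to be a subscribed confluent_kafka.Consumer (or any object that has the same poll and close methods).

```python
def drain(consumer, handle, max_empty_polls=3, timeout=1.0):
    """Poll until max_empty_polls consecutive polls return nothing.

    Calls handle(text) for each decoded message and returns the number handled.
    """
    handled = 0
    empty = 0
    try:
        while empty < max_empty_polls:
            msg = consumer.poll(timeout)
            if msg is None:
                empty += 1
                continue
            empty = 0
            if msg.error():
                print('Consumer error: {}'.format(msg.error()))
                continue
            handle(msg.value().decode('utf-8'))
            handled += 1
    finally:
        # Always close the consumer, even if handle() raises.
        consumer.close()
    return handled
```

For example, drain(c, print) prints every available message and then closes the consumer once the topic stays idle for about three seconds.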