本文介紹如何使用Python SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。
環境準備
添加Python依賴庫
執行以下命令安裝依賴庫。
pip install confluent-kafka==1.9.2
建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網發送訊息會報SSL_HANDSHAKE
錯誤。
準備配置
可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。
- 訪問aliware-kafka-demos,單擊表徵圖,然後在下拉框選擇Download ZIP,下載Demo包並解壓。
在解壓的Demo工程中,找到kafka-confluent-python-demo檔案夾,將此檔案夾上傳到Linux系統。
修改設定檔setting.py。
預設存取點
登入Linux系統,進入vpc檔案目錄,修改設定檔setting.py。
kafka_setting = { 'bootstrap_servers': 'XXX:xxx,XXX:xxx', 'topic_name': 'XXX', 'group_name': 'XXX' }
參數
描述
bootstrap_servers
預設存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
topic_name
Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。
group_name
Group名稱。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。
SSL存取點
登入Linux系統,進入vpc-ssl檔案目錄,修改設定檔setting.py。
kafka_setting = { 'sasl_plain_username': 'XXX', 'sasl_plain_password': 'XXX', 'ca_location': '/XXX/mix-4096-ca-cert', 'bootstrap_servers': 'XXX:xxx,XXX:xxx', 'topic_name': 'XXX', 'group_name': 'XXX' }
參數
描述
sasl_plain_username
SASL使用者名稱。
說明- 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面的配置信息地區擷取預設的用户名和密码。
- 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權。
sasl_plain_password
SASL使用者名稱密碼。
ca_location
SSL根憑證的路徑。用本地路徑替換樣本中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert。
bootstrap_servers
SSL存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
topic_name
Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。
group_name
Group名稱。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。
發送訊息
執行以下命令發送訊息(當前樣本的Python版本為3.9)。
python kafka_producer.py
訊息程式kafka_producer.py範例程式碼如下:
預設存取點
from confluent_kafka import Producer import setting conf = setting.kafka_setting # 初始化一個Producer對象。 p = Producer({'bootstrap.servers': conf['bootstrap_servers']}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) # 非同步發送訊息。 p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) # 在程式結束時,調用flush。 p.flush()
SSL存取點
from confluent_kafka import Producer import setting conf = setting.kafka_setting p = Producer({'bootstrap.servers':conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password']}) def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) p.flush()
訂閱訊息
執行以下命令訂閱訊息(當前樣本的Python版本為3.9)。
python kafka_consumer.py
訊息程式kafka_consumer.py範例程式碼如下:
預設存取點
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()
SSL存取點
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest', 'fetch.message.max.bytes':'1024*512' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()