全部產品
Search
文件中心

ApsaraMQ for Kafka:Python SDK收發訊息

更新時間:Dec 27, 2024

本文介紹如何使用Python SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。

環境準備

添加Python依賴庫

執行以下命令安裝依賴庫。

pip install confluent-kafka==1.9.2
重要

建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網發送訊息會報SSL_HANDSHAKE錯誤。

準備配置

  1. 可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。

  2. 訪問aliware-kafka-demos,單擊code表徵圖,然後在下拉框選擇Download ZIP,下載Demo包並解壓。
  3. 在解壓的Demo工程中,找到kafka-confluent-python-demo檔案夾,將此檔案夾上傳到Linux系統。

  4. 修改設定檔setting.py。

    預設存取點

    登入Linux系統,進入vpc檔案目錄,修改設定檔setting.py。

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    參數

    描述

    bootstrap_servers

    預設存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    topic_name

    Topic名稱。您可在雲訊息佇列 Kafka 版控制台Topic 管理頁面擷取。

    group_name

    Group名稱。您可在雲訊息佇列 Kafka 版控制台Group 管理頁面擷取。

    SSL存取點

    登入Linux系統,進入vpc-ssl檔案目錄,修改設定檔setting.py。

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    參數

    描述

    sasl_plain_username

    SASL使用者名稱。

    說明
    • 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台实例详情頁面的配置信息地區擷取預設的用户名密码
    • 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權

    sasl_plain_password

    SASL使用者名稱密碼。

    ca_location

    SSL根憑證的路徑。用本地路徑替換樣本中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert

    bootstrap_servers

    SSL存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    topic_name

    Topic名稱。您可在雲訊息佇列 Kafka 版控制台Topic 管理頁面擷取。

    group_name

    Group名稱。您可在雲訊息佇列 Kafka 版控制台Group 管理頁面擷取。

發送訊息

執行以下命令發送訊息(當前樣本的Python版本為3.9)。

python kafka_producer.py

訊息程式kafka_producer.py範例程式碼如下:

  • 預設存取點

    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    # 初始化一個Producer對象。
    p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    # 非同步發送訊息。
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    # 在程式結束時,調用flush。
    p.flush()
  • SSL存取點

    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    
    p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
       'ssl.endpoint.identification.algorithm': 'none',
       'sasl.mechanisms':'PLAIN',
       'ssl.ca.location':conf['ca_location'],
       'security.protocol':'SASL_SSL',
       'sasl.username':conf['sasl_plain_username'],
       'sasl.password':conf['sasl_plain_password']})
    
    
    def delivery_report(err, msg):
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    p.flush()

訂閱訊息

執行以下命令訂閱訊息(當前樣本的Python版本為3.9)。

python kafka_consumer.py

訊息程式kafka_consumer.py範例程式碼如下:

  • 預設存取點

    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print("Consumer error: {}".format(msg.error()))
                continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()
  • SSL存取點

    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'ssl.endpoint.identification.algorithm': 'none',
        'sasl.mechanisms':'PLAIN',
        'ssl.ca.location':conf['ca_location'],
        'security.protocol':'SASL_SSL',
        'sasl.username':conf['sasl_plain_username'],
        'sasl.password':conf['sasl_plain_password'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest',
        'fetch.message.max.bytes':'1024*512'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
           if msg.error().code() == KafkaError._PARTITION_EOF:
              continue
           else:
               print("Consumer error: {}".format(msg.error()))
               continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()