全部产品
Search
文档中心

云消息队列 Kafka 版:Python SDK收发消息

更新时间:Nov 28, 2024

本文介绍如何使用Python SDK通过接入点接入云消息队列 Kafka 版并收发消息。

环境准备

添加Python依赖库

执行以下命令安装依赖库。

pip install confluent-kafka==1.9.2
重要

建议您安装confluent-kafka 1.9.2及以下版本的依赖库,否则使用公网发送消息会报SSL_HANDSHAKE错误。

准备配置

  1. 可选:下载SSL根证书。如果是SSL接入点,需下载该证书。

  2. 访问aliware-kafka-demos,单击code图标,然后在下拉框选择Download ZIP,下载Demo包并解压。
  3. 在解压的Demo工程中,找到kafka-confluent-python-demo文件夹,将此文件夹上传到Linux系统。

  4. 修改配置文件setting.py。

    默认接入点

    登录Linux系统,进入vpc文件目录,修改配置文件setting.py。

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    参数

    描述

    bootstrap_servers

    默认接入点。您可在云消息队列 Kafka 版控制台实例详情页面的接入点信息区域获取。

    topic_name

    Topic名称。您可在云消息队列 Kafka 版控制台Topic 管理页面获取。

    group_name

    Group名称。您可在云消息队列 Kafka 版控制台Group 管理页面获取。

    SSL接入点

    登录Linux系统,进入vpc-ssl文件目录,修改配置文件setting.py。

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    参数

    描述

    sasl_plain_username

    SASL用户名。

    说明
    • 如果实例未开启ACL,您可以在云消息队列 Kafka 版控制台实例详情页面的配置信息区域获取默认的用户名密码
    • 如果实例已开启ACL,请确保要使用的SASL用户已被授予向云消息队列 Kafka 版实例收发消息的权限。具体操作,请参见SASL用户授权

    sasl_plain_password

    SASL用户名密码。

    ca_location

    SSL根证书的路径。用本地路径替换示例中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert

    bootstrap_servers

    SSL接入点。您可在云消息队列 Kafka 版控制台实例详情页面的接入点信息区域获取。

    topic_name

    Topic名称。您可在云消息队列 Kafka 版控制台Topic 管理页面获取。

    group_name

    Group名称。您可在云消息队列 Kafka 版控制台Group 管理页面获取。

发送消息

执行以下命令发送消息(当前示例的Python版本为3.9)。

python kafka_producer.py

消息程序kafka_producer.py示例代码如下:

  • 默认接入点

    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    # 初始化一个Producer对象。
    p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    # 异步发送消息。
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    # 在程序结束时,调用flush。
    p.flush()
  • SSL接入点

    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    
    p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
       'ssl.endpoint.identification.algorithm': 'none',
       'sasl.mechanisms':'PLAIN',
       'ssl.ca.location':conf['ca_location'],
       'security.protocol':'SASL_SSL',
       'sasl.username':conf['sasl_plain_username'],
       'sasl.password':conf['sasl_plain_password']})
    
    
    def delivery_report(err, msg):
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    p.flush()

订阅消息

执行以下命令订阅消息(当前示例的Python版本为3.9)。

python kafka_consumer.py

消息程序kafka_consumer.py示例代码如下:

  • 默认接入点

    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print("Consumer error: {}".format(msg.error()))
                continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()
  • SSL接入点

    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'ssl.endpoint.identification.algorithm': 'none',
        'sasl.mechanisms':'PLAIN',
        'ssl.ca.location':conf['ca_location'],
        'security.protocol':'SASL_SSL',
        'sasl.username':conf['sasl_plain_username'],
        'sasl.password':conf['sasl_plain_password'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest',
        'fetch.message.max.bytes':'1024*512'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
           if msg.error().code() == KafkaError._PARTITION_EOF:
              continue
           else:
               print("Consumer error: {}".format(msg.error()))
               continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()