本文介紹如何使用PHP SDK通過存取點接入雲訊息佇列 Kafka 版並收發訊息。
環境準備
安裝C++依賴庫
執行以下命令切換到yum源配置目錄/etc/yum.repos.d/。
cd /etc/yum.repos.d/
建立yum源設定檔confluent.repo。
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1
執行以下命令安裝C++依賴庫。
yum install librdkafka-devel
安裝PHP依賴庫
執行以下命令安裝PHP依賴庫。
pecl install rdkafka
在PHP的初始設定檔案php.ini中添加以下一行語句以開啟擴充。
extension=rdkafka.so
準備配置
可選:下載SSL根憑證。如果是SSL存取點,需下載該認證。
訪問Aliware-kafka-demos,單擊,下載Demo工程到本地並解壓。
在解壓的Demo工程找到kafka-php-demo檔案夾,根據存取點類型開啟對應的檔案夾,配置setting.php檔案。
<?php return [ 'sasl_plain_username' => 'xxx', 'sasl_plain_password' => 'xxx', 'bootstrap_servers' => "xxx:xx,xxx:xx", 'topic_name' => 'xxx', 'consumer_id' => 'xxx' ];
參數
描述
sasl_plain_username
SASL使用者名稱。如果是預設存取點,則無此配置項。
說明- 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的实例详情頁面的配置信息地區擷取預設的用户名和密码。
- 如果執行個體已開啟ACL,請確保要使用的SASL使用者已被授予向雲訊息佇列 Kafka 版執行個體收發訊息的許可權。具體操作,請參見SASL使用者授權。
sasl_plain_password
SASL使用者名稱密碼。如果是預設存取點,則無此配置項。
bootstrap_servers
SSL存取點。您可在雲訊息佇列 Kafka 版控制台的实例详情頁面的接入点信息地區擷取。
topic_name
Topic名稱。您可在雲訊息佇列 Kafka 版控制台的Topic 管理頁面擷取。
consumer_id
Group名稱。您可在雲訊息佇列 Kafka 版控制台的Group 管理頁面擷取。
配置完成後,將設定檔所在檔案夾下的全部檔案(如果是SSL存取點執行個體,包含認證SSL根憑證檔案)上傳至伺服器PHP安裝目錄下。
發送訊息
執行以下命令發送訊息。
php kafka-producer.php
訊息程式kafka-producer.php範例程式碼如下:
範例程式碼為SSL存取點的代碼。如果是預設存取點,無SASL相關代碼,即刪除如下代碼中包含sasl.
和ssl.
相關的代碼即可。
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
$rk->poll(50);
}
echo "send succ" . PHP_EOL;
程式碼範例詳情,請參見php-rdkafka。
訂閱訊息
執行如下命令訂閱訊息。
php kafka-consumer.php
訊息程式kafka-consumer.php範例程式碼如下:
範例程式碼為SSL存取點的代碼。如果是預設存取點,無SASL相關代碼,即刪除如下代碼中包含sasl.
和ssl.
相關的代碼即可。
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('session.timeout.ms', 10000);
$conf->set('request.timeout.ms', 305000);
$conf->set('group.id', $setting['consumer_id']);
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('metadata.broker.list', $setting['bootstrap_servers']);
$topicConf = new RdKafka\TopicConf();
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$setting['topic_name']]);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(30 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
程式碼範例詳情,請參見php-rdkafka。