This topic describes how to use the SDK for PHP to connect to ApsaraMQ for Kafka to send and receive messages.
Environment requirements
GNU Compiler Collection (GCC) is installed. For more information, see Installing GCC.
PHP is installed. For more information, visit the download page of PHP.
PHP Extension Community Library (PECL) is installed. For more information, see Downloading PECL extensions.
Install the C++ library
Run the following command to switch to the /etc/yum.repos.d/ yum repository directory:
cd /etc/yum.repos.d/
Create a yum repository configuration file named confluent.repo.
[Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.1/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.1 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.1/archive.key enabled=1
Run the following command to install the C++ library:
yum install librdkafka-devel
Install the PHP library
Run the following command to install the PHP library:
pecl install rdkafka
In the PHP initialization file php.ini, add the following line to enable Kafka extensions:
extension=rdkafka.so
Prepare configuration files
(Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.
Go to the aliware-kafka-demos page, click to download the demo project to your on-premises machine, and then decompress the package of the demo project.
In the decompressed package, go to the kafka-php-demo folder. Then, open the corresponding folder based on the endpoint that you want to use, and configure the setting.php file in the folder.
<?php return [ 'sasl_plain_username' => 'xxx', 'sasl_plain_password' => 'xxx', 'bootstrap_servers' => "xxx:xx,xxx:xx", 'topic_name' => 'xxx', 'consumer_id' => 'xxx' ];
Parameter
Description
sasl_plain_username
The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.
NoteIf the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.
If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.
sasl_plain_password
The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter is not available.
bootstrap_servers
The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
topic_name
The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.
consumer_id
The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.
After the required parameters are configured, upload all files in the folder in which the configuration file is located to the PHP installation directory on your server. The folder that corresponds to the SSL endpoint contains the SSL root certificate file.
Send messages
Run the following command to run kafka-producer.php to send messages:
php kafka-producer.php
The following sample code provides an example of kafka-producer.php:
In the sample code, the SSL endpoint is used. If you use the default endpoint, SASL-related code is not required. Delete the lines that contain sasl.
or ssl.
from the sample code.
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
$rk->poll(50);
}
echo "send succ" . PHP_EOL;
For more information about the sample code, see php-rdkafka.
Subscribe to messages
Run the following command to run kafka-consumer.php to subscribe to messages:
php kafka-consumer.php
The following sample code provides an example of kafka-consumer.php:
In the sample code, the SSL endpoint is used. If you use the default endpoint, SASL-related code is not required. Delete the lines that contain sasl.
or ssl.
from the sample code.
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('session.timeout.ms', 10000);
$conf->set('request.timeout.ms', 305000);
$conf->set('group.id', $setting['consumer_id']);
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('metadata.broker.list', $setting['bootstrap_servers']);
$topicConf = new RdKafka\TopicConf();
$conf->setDefaultTopicConf($topicConf);
$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$setting['topic_name']]);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(30 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
For more information about the sample code, see php-rdkafka.