开发环境 Linux
首先我们需要下载安装librdkafka
https://github.com/confluentinc/librdkafka/tags?after=v2.10.0-RC2
tar -zxvf librdkafka-2.7.0.tar.gz
cd librdkafka-2.7.0
./configure
make && make install
如何知道我们安装成功了呢?
ldconfig -p | grep rdkafka
如果输出中能看到 librdkafka 相关的动态库,就说明安装成功了。
接下来 我们安装 rdkafka 的PHP 扩展
https://pecl.php.net/package/rdkafka
这里我们选择 6.0.0 版本。
tar -zxvf rdkafka-6.0.0.tgz
cd rdkafka-6.0.0
/usr/php/bin/phpize
./configure --with-php-config=/usr/php/bin/php-config
make && make install
其中 phpize 和 php-config 的路径请修改为你自己 PHP 环境中的实际路径。
编译好后,修改 php.ini,将 rdkafka.so 添加到配置中(即加入一行 extension=rdkafka.so),重启 PHP 后运行如下命令,看看扩展是否生效。
php --ri rdkafka
如果有输出,说明对应的 kafka 扩展已经安装好了,下面将进行代码层面的编写。
在EasySwooleEvent.php 文件中 加载配置目录 如下图所示:
接下来我们在Config 目录新增一个文件 如下:
Kafka.php
<?php

/**
 * Kafka connection settings.
 */
return [
    'kafka' => [
        'host' => [
            '192.168.1.1:9092',
        ],
        'zookeeper' => [
            '192.168.1.1:2181/kafka',
        ],
        // Location of the Kafka command-line scripts. Fill this in when a Kafka
        // client is installed on this machine; otherwise it can be ignored.
        'kafka_bin_path' => '/usr/kafka/bin',
        // Whether a Kafka client is installed on this machine (true = yes, false = no).
        // This controls topic creation/deletion: without a client the topics
        // must be created ahead of time.
        'is_kafka_client' => true,
    ],
];
新增相关的服务 我们在App/Service 下面做相关服务。
新增KafkaConsumerService.php
<?php

namespace App\Service;

use EasySwoole\EasySwoole\Config;
use RdKafka\Producer;
use RdKafka\KafkaConsumer;

/**
 * Consumer bound to one topic that hands every successfully received
 * payload to a callback.
 */
class KafkaConsumerService
{
    /** @var KafkaConsumer */
    private $consumer;

    /**
     * Builds the consumer from the 'kafka' configuration entry and subscribes
     * it to the given topic. The consumer group id is "group_" followed by
     * the topic name, and offset auto-commit is switched on.
     *
     * @param string $topicName Topic to subscribe to.
     */
    public function __construct(string $topicName)
    {
        $kafkaSettings = Config::getInstance()->getConf('kafka');
        $brokerList = implode(',', $kafkaSettings['host']);

        $options = [
            'metadata.broker.list' => $brokerList,
            'group.id'             => 'group_' . $topicName,
            'enable.auto.commit'   => 'true',
        ];

        $rdConf = new \RdKafka\Conf();
        foreach ($options as $option => $value) {
            $rdConf->set($option, $value);
        }

        $this->consumer = new KafkaConsumer($rdConf);
        $this->consumer->subscribe([$topicName]);
    }

    /**
     * Polls the subscription in an endless loop and passes the payload of
     * every error-free message to $callback. Empty results and messages that
     * carry any error code are skipped. This method never returns.
     *
     * @param callable $callback Invoked as $callback($payload).
     */
    public function listen(callable $callback)
    {
        for (;;) {
            // consume() is given a timeout argument of 1000.
            $message = $this->consumer->consume(1000);

            if (!empty($message) && $message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
                $callback($message->payload);
            }
        }
    }
}
KafkaProducerService.php
<?php

namespace App\Service;

use EasySwoole\EasySwoole\Config;
use RdKafka\Producer;

/**
 * Process-wide Kafka producer. Obtain it through getInstance(); the
 * constructor is private.
 */
class KafkaProducerService
{
    /** @var self|null Lazily created shared instance. */
    private static $instance = null;

    /** @var Producer */
    private $producer;

    /** @var array Topic handles created so far, keyed by topic name. */
    private $topics = [];

    /**
     * Builds the underlying producer from the 'kafka' configuration entry.
     */
    private function __construct()
    {
        $config = Config::getInstance()->getConf('kafka');
        $brokers = implode(',', $config['host']);

        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', $brokers);
        // Configuration values are strings in the rdkafka API; passing the
        // literal '5' avoids relying on implicit int-to-string coercion
        // (which fails as soon as strict_types is enabled).
        $conf->set('queue.buffering.max.ms', '5');

        $this->producer = new Producer($conf);
    }

    /**
     * Returns the shared producer, creating it on first use.
     */
    public static function getInstance(): self
    {
        if (self::$instance === null) {
            self::$instance = new self();
        }

        return self::$instance;
    }

    /**
     * Produces one message and flushes the producer queue.
     *
     * @param string $topicName Target topic.
     * @param string $message   Message payload.
     * @return bool True when flush() reports RD_KAFKA_RESP_ERR_NO_ERROR.
     */
    public function send(string $topicName, string $message): bool
    {
        // Reuse the topic handle across calls instead of creating one per message.
        if (!isset($this->topics[$topicName])) {
            $this->topics[$topicName] = $this->producer->newTopic($topicName);
        }

        // RD_KAFKA_PARTITION_UA leaves partition selection to the library.
        $this->topics[$topicName]->produce(RD_KAFKA_PARTITION_UA, 0, $message);

        // flush() is given a timeout argument of 10000.
        return $this->producer->flush(10000) === RD_KAFKA_RESP_ERR_NO_ERROR;
    }
}
KafkaService.php
<?php

/**
 * Kafka messaging service.
 *
 * @author 树下水月
 * @date 2025-11-27 13:23:23
 */

namespace App\Service;

use EasySwoole\EasySwoole\Config;
use RdKafka\Producer;
use RdKafka\KafkaConsumer;
use RdKafka\Admin\TopicSpecification;

class KafkaService
{
    /**
     * Publishes data to a Kafka topic as a JSON document.
     *
     * @param string $topic Target topic.
     * @param array  $data  Data to send; encoded with JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES.
     * @return bool False when the data cannot be JSON-encoded, otherwise the
     *              result of KafkaProducerService::send().
     */
    public static function publish(string $topic, array $data): bool
    {
        $payload = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if ($payload === false) {
            // json_encode() failed; without this guard the false would be
            // coerced to an empty string and published as an empty message.
            return false;
        }

        return KafkaProducerService::getInstance()->send($topic, $payload);
    }

    /**
     * Subscribes to a topic and feeds every received payload to $callback.
     * Delegates to KafkaConsumerService::listen().
     *
     * @param string   $topic    Topic to consume.
     * @param callable $callback Invoked with each message payload.
     * @return void
     */
    public static function consume(string $topic, callable $callback)
    {
        $consumer = new KafkaConsumerService($topic);
        $consumer->listen($callback);
    }

    /**
     * Detects which connection flag the installed kafka-topics.sh understands
     * by scanning its --help output.
     *
     * @param string $kafkaBinPath Directory containing kafka-topics.sh.
     * @return string 'bootstrap' when --bootstrap-server appears in the help text, otherwise 'zookeeper'.
     */
    private function getKafkaMode(string $kafkaBinPath): string
    {
        $help = [];
        exec("$kafkaBinPath/kafka-topics.sh --help 2>&1", $help);

        // Newer Kafka releases accept --bootstrap-server (added to
        // kafka-topics.sh in Kafka 2.2 by KIP-377).
        foreach ($help as $line) {
            if (strpos($line, '--bootstrap-server') !== false) {
                return 'bootstrap';
            }
        }

        return 'zookeeper';
    }

    /**
     * Returns the shell-quoted connection option for the detected mode.
     */
    private function buildConnectionOption(string $addr, string $zookeepers, string $mode): string
    {
        if ($mode === 'bootstrap') {
            return '--bootstrap-server ' . escapeshellarg($addr);
        }

        return '--zookeeper ' . escapeshellarg($zookeepers);
    }

    /**
     * Builds the "list topics" command.
     */
    private function buildListCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $mode)
    {
        return "$kafkaBinPath/kafka-topics.sh "
            . $this->buildConnectionOption($addr, $zookeeps, $mode)
            . ' --list';
    }

    /**
     * Builds the "create topic" command.
     */
    private function buildCreateCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $topic, int $partitions, int $replica, string $mode)
    {
        return "$kafkaBinPath/kafka-topics.sh "
            . $this->buildConnectionOption($addr, $zookeeps, $mode)
            . ' --create --topic ' . escapeshellarg($topic)
            . ' --partitions ' . escapeshellarg((string) $partitions)
            . ' --replication-factor ' . escapeshellarg((string) $replica);
    }

    /**
     * Builds the "delete topic" command.
     */
    private function buildDeleteCommand(string $kafkaBinPath, string $addr, string $zookeepers, string $topic, string $mode)
    {
        return "$kafkaBinPath/kafka-topics.sh "
            . $this->buildConnectionOption($addr, $zookeepers, $mode)
            . ' --delete --topic ' . escapeshellarg($topic);
    }

    /**
     * Creates a Kafka topic by running kafka-topics.sh through exec().
     *
     * $partitions and $replication previously carried "= 1" defaults, but
     * they precede required parameters, so callers could never omit them and
     * PHP 8 reports the declaration as deprecated. The defaults are removed;
     * every call that worked before still works.
     *
     * @param string $topicName       Topic name.
     * @param int    $partitions      Number of partitions.
     * @param int    $replication     Replication factor.
     * @param string $kafka_bootstrap Broker address list passed to --bootstrap-server.
     * @param string $zookeepers      ZooKeeper address passed to --zookeeper.
     * @param bool   $is_kafka_client Whether a Kafka client is installed locally; when false nothing is created.
     * @param string $kafkaBinPath    Kafka bin directory (contains kafka-topics.sh).
     * @return bool True when the topic already exists, when creation is skipped
     *              because no client is installed, or when the create command
     *              exits with status 0; false otherwise.
     * @throws \Exception When listing the existing topics fails.
     */
    public function createKafkaTopic($topicName, $partitions, $replication, $kafka_bootstrap, $zookeepers, $is_kafka_client = false, $kafkaBinPath = '/yisa_oe/kafka/bin')
    {
        if (!$is_kafka_client) {
            // No local client: topics cannot be created from here, so skip.
            var_dump("当前系统内没有kafka客户端,跳过topic创建");
            return true;
        }

        $addr = $kafka_bootstrap;

        if ($this->isKafkaTopicExist($topicName, [$addr], [$zookeepers], $kafkaBinPath)) {
            var_dump("Topic {$topicName} 已存在,跳过创建");
            return true;
        }

        // Detect whether the CLI expects --bootstrap-server or --zookeeper.
        $mode = $this->getKafkaMode($kafkaBinPath);
        $cmd = $this->buildCreateCommand($kafkaBinPath, $addr, $zookeepers, $topicName, $partitions, $replication, $mode);

        $output = [];
        exec($cmd . " 2>&1", $output, $returnVar);

        return $returnVar === 0;
    }

    /**
     * Checks whether a topic exists (works with both connection modes).
     *
     * @param string $topicName Topic to look for.
     * @param array  $brokers   Broker addresses; joined with commas.
     * @param array  $zookeeper ZooKeeper addresses; joined with commas.
     * @param string $kafkaBin  Kafka bin directory.
     * @return bool True when a line of the list output equals the topic name.
     * @throws \Exception When the list command exits with a non-zero status.
     */
    public function isKafkaTopicExist($topicName, array $brokers, array $zookeeper, $kafkaBin = '/yisa_oe/kafka/bin')
    {
        $addr = implode(',', $brokers);
        $zookeepers = implode(',', $zookeeper);
        $mode = $this->getKafkaMode($kafkaBin);
        $cmd = $this->buildListCommand($kafkaBin, $addr, $zookeepers, $mode);

        $output = [];
        exec($cmd . " 2>&1", $output, $returnVar);
        if ($returnVar !== 0) {
            throw new \Exception("检查 topic 失败:" . implode("\n", $output));
        }

        // Compare strictly on trimmed lines: a loose comparison treats
        // numeric-looking names such as "1e3" and "1000" as equal, and stray
        // whitespace in the CLI output would hide an existing topic.
        return in_array((string) $topicName, array_map('trim', $output), true);
    }

    /**
     * Deletes a topic by running kafka-topics.sh through exec().
     *
     * @param string $topicName       Topic to delete.
     * @param array  $brokers         Broker addresses; joined with commas.
     * @param array  $zookeeper       ZooKeeper addresses; joined with commas.
     * @param bool   $is_kafka_client Whether a Kafka client is installed locally (default false).
     * @param string $kafkaBin        Kafka bin directory.
     * @return bool True when no client is installed (nothing is done) or when
     *              the delete command exits with status 0; false otherwise.
     */
    public function deleteKafkaTopic($topicName, array $brokers, array $zookeeper, $is_kafka_client = false, $kafkaBin = '/yisa_oe/kafka/bin')
    {
        if (!$is_kafka_client) {
            return true;
        }

        $addr = implode(',', $brokers);
        $zookeeper_str = implode(',', $zookeeper);
        $mode = $this->getKafkaMode($kafkaBin);
        $cmd = $this->buildDeleteCommand($kafkaBin, $addr, $zookeeper_str, $topicName, $mode);

        $output = [];
        exec($cmd . " 2>&1", $output, $returnVar);

        return $returnVar === 0;
    }
}
我们新增一个路由 这里直接忽略 我们以Test.php 这个控制器为例吧
<?php

/**
 * Evaluation callback controller.
 *
 * @author liupeng
 * @email liupenga@yisa.com
 * @date 2025-11-18 14:22:20
 */

namespace App\HttpController\Api;

use EasySwoole\Http\AbstractInterface\Controller;
use App\Service\KafkaService;
use EasySwoole\EasySwoole\Config;
use EasySwoole\ORM\Exception\Exception;
use EasySwoole\Validate\Validate;
use App\Model\CommonModel;
use App\Model\CommonOrmModel;
use EasySwoole\Validate\Functions\Length;

class Evaluation extends Controller
{
    /**
     * Pass-through to the parent request hook.
     */
    public function onRequest(?string $action): bool
    {
        return parent::onRequest($action); // TODO: Change the autogenerated stub
    }

    /**
     * Loads the 'common' configuration entry into $this->config.
     */
    public function __construct()
    {
        parent::__construct();
        // NOTE(review): $config is not declared on this class and is created
        // dynamically here; confirm how the framework treats declared
        // properties before turning it into one.
        $this->config = Config::getInstance()->getConf('common');
    }

    /**
     * Demo action: makes sure the topic "test" exists (creating it when it is
     * missing) and then publishes one sample message to it, dumping the
     * publish result.
     */
    public function tet()
    {
        $config = Config::getInstance()->getConf('kafka');
        $kafka_bootstrap = implode(',', $config['host']);   // comma-separated broker list
        $zookeeps = implode(',', $config['zookeeper']);     // ZooKeeper address
        $is_kafka_client = $config['is_kafka_client'];      // whether a local Kafka client exists
        $kafka_bin_path = $config['kafka_bin_path'];

        $topicName = 'test';
        $partitions = 1;
        $replication = 1;

        // Create the topic; an already existing topic is skipped by the service.
        $kafka_service = new KafkaService();
        $kafka_service->createKafkaTopic($topicName, $partitions, $replication, $kafka_bootstrap, $zookeeps, $is_kafka_client, $kafka_bin_path);

        // Write one message to Kafka.
        var_dump(KafkaService::publish($topicName, ['ddd' => 33, 'time' => date('Y-m-d H:i:s')]));
    }
这个tet方法 是创建了一个topic 名字为test的 如果存在 就跳过 如果不存在,创建,在然后就是KafkaService::publish 推送数据到对应的topic 中。
删除Topic
/**
 * Demo action: deletes the topic "test" through KafkaService::deleteKafkaTopic().
 *
 * The topic name now comes from $topicName; previously that variable was
 * declared but ignored and the hard-coded topic 'Liupeng' was deleted instead.
 */
public function tet1()
{
    $config = Config::getInstance()->getConf('kafka');
    $topicName = 'test';

    $kafka_service = new KafkaService();
    $kafka_service->deleteKafkaTopic(
        $topicName,
        $config['host'],            // broker list (array)
        $config['zookeeper'],       // ZooKeeper list (array)
        $config['is_kafka_client'], // whether a local Kafka client exists
        $config['kafka_bin_path']
    );
}
订阅某个kafka数据
/**
 * Demo action: consumes the topic "kkk" and echoes every payload received.
 *
 * KafkaService::consume() reads the Kafka configuration itself, so the
 * locals that were previously built here and never used have been removed.
 */
public function dd()
{
    $topicName = 'kkk';

    // Consume Kafka data. This call keeps running for as long as the
    // consumer loop does, so the request stays occupied.
    KafkaService::consume($topicName, function ($msg) {
        echo "收到". $msg . PHP_EOL;
    });
}
KafkaService::consume 订阅数据