news 2026/1/31 23:33:18

Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Easyoole 使用rdkafka 进行kafka的创建topic创建 删除 以及数据发布 订阅

开发环境 Linux

首先我们需要下载安装librdkafka

https://github.com/confluentinc/librdkafka/tags?after=v2.10.0-RC2

tar -zxvf librdkafka-2.7.0.tar.gz cd librdkafka-2.7.0 ./configure make && make install

如何知道我们安装成功了呢

ldconfig -p | grep rdkafka

如果有如下输出 就说明安装成功了 。

接下来 我们安装 rdkafka 的PHP 扩展

https://pecl.php.net/package/rdkafka这里我们选择6.0.0

tar -zxvf rdkafka-6.0.0.tgz cd rdkafka-6.0.0 /usr/php/bin/phpize ./configure --with-php-config=/usr/php/bin/php-config make && make

其中phpize 和 php-config 请修改你自己的PHP 环境。

编译好后,修改php.ini 将rdkafka.so 添加到配置中,重启php 运行如下命令,看看是否扩展生效。

php --ri rdkafka

如果有输出 说明对应的kafka扩展已经安装好了 下面将进行代码层面的编写。

在EasySwooleEvent.php 文件中 加载配置目录 如下图所示:

接下来我们在Config 目录新增一个文件 如下:

Kakfa.php

<?php /** * kafka相关连接配置信息 */ return [ 'kafka' => [ "host" => [ '192.168.1.1:9092', ], "zookeeper"=>[ "192.168.1.1:2181/kafka" ], 'kafka_bin_path'=>'/usr/kafka/bin', //kafka的运行命令 本机有kafka客户端 填写 无 忽略 'is_kafka_client'=>true, //本机有kafka客户端 true 有 false 没有,这里与创建 删除topic 有关 若没有客户端,对应topic 需要提前新建好 ], ];

新增相关的服务 我们在App/Service 下面做相关服务。

新增KafkaConsumerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; class KafkaConsumerService { private $consumer; public function __construct(string $topicName) { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('group.id', 'group_' . $topicName); $conf->set('enable.auto.commit', 'true'); $this->consumer = new KafkaConsumer($conf); $this->consumer->subscribe([$topicName]); } public function listen(callable $callback) { while (true) { $msg = $this->consumer->consume(1000); if (empty($msg)) continue; if ($msg->err === RD_KAFKA_RESP_ERR_NO_ERROR) { $callback($msg->payload); } } } }

KafkaProducerService.php

<?php namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; class KafkaProducerService { private static $instance = null; private $producer; private $topics = []; private function __construct() { $config = Config::getInstance()->getConf('kafka');//配置变量 $brokers = implode(',', $config['host']); $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $brokers); $conf->set('queue.buffering.max.ms', 5); $this->producer = new Producer($conf); } public static function getInstance(): self { if (!self::$instance) { self::$instance = new self(); } return self::$instance; } public function send(string $topicName, string $message): bool { if (!isset($this->topics[$topicName])) { $this->topics[$topicName] = $this->producer->newTopic($topicName); } $topic = $this->topics[$topicName]; $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message); $result = $this->producer->flush(10000); return $result === RD_KAFKA_RESP_ERR_NO_ERROR; } }

KafkaService.php

<?php /** * Kafka消息服务 * @author 树下水月 * @date 2025年11月27日13:23:23 */ namespace App\Service; use EasySwoole\EasySwoole\Config; use RdKafka\Producer; use RdKafka\KafkaConsumer; use RdKafka\Admin\TopicSpecification; class KafkaService { /** * 发布数据到kafka * @param string $topic topic信息 * @param array $data 发送数据 数组 * @return bool */ public static function publish(string $topic, array $data): bool { return KafkaProducerService::getInstance()->send( $topic, json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) ); } /** * 订阅kafka数据 * @param string $topic topic信息 * @param callable $callback * @return void */ public static function consume(string $topic, callable $callback) { $consumer = new KafkaConsumerService($topic); $consumer->listen($callback); } /** * 获取kafka 支持啥类型 * @param string $kafkaBinPath * @return string */ private function getKafkaMode(string $kafkaBinPath): string { $help = []; exec("$kafkaBinPath/kafka-topics.sh --help 2>&1", $help); // 新版 Kafka (2.0+) 支持 --bootstrap-server foreach ($help as $line) { if (strpos($line, '--bootstrap-server') !== false) { return 'bootstrap'; } } return 'zookeeper'; } /** * 获取 list 命令 */ private function buildListCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --list"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --list"; } } /** * 获取 create 命令 */ private function buildCreateCommand(string $kafkaBinPath, string $addr, string $zookeeps, string $topic, int $partitions, int $replica, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --create --topic $topic --partitions $partitions --replication-factor $replica"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeeps --create --topic $topic --partitions $partitions --replication-factor $replica"; } } /** * 获取 delete 命令 */ private function buildDeleteCommand(string $kafkaBinPath, string $addr, string $zookeepers, string $topic, string $mode) { if ($mode === 'bootstrap') { return "$kafkaBinPath/kafka-topics.sh --bootstrap-server $addr --delete --topic $topic"; } else { return "$kafkaBinPath/kafka-topics.sh --zookeeper $zookeepers --delete --topic $topic"; } } /** * 在 PHP 中通过 exec 创建 Kafka topic * @param string $topicName Topic 名称 * @param int $partitions 分区数 * @param int $replication 副本数 * @param string $kafkaBinPath Kafka bin 目录(包含 kafka-topics.sh) * @return array 返回结果 ['success'=>bool, 'message'=>string] */ public function createKafkaTopic($topicName, $partitions = 1, $replication = 1, $kafka_bootstrap, $zookeepers, $is_kafka_client = false, $kafkaBinPath = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = $kafka_bootstrap; $mode = $this->getKafkaMode($kafkaBinPath); //获取模式 zookeeper bootstrap if ($this->isKafkaTopicExist($topicName, [$addr], [$zookeepers], $kafkaBinPath)) { var_dump("Topic {$topicName} 已存在,跳过创建"); return true; } $cmd = $this->buildCreateCommand($kafkaBinPath, $addr, $zookeepers, $topicName, $partitions, $replication, $mode); //执行创建 topic exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { var_dump("当前系统内没有kafka客户端,跳过topic创建"); return true; //没有客户端 不会创建topic 直接跳过 } } /** * 检查 Topic 是否存在(兼容模式) * @param $topicName 需要检查的topic * @param array $brokers * @param array $zookeeper zookeeper * @param $kafkaBin * @return bool * @throws \Exception */ public function isKafkaTopicExist($topicName, array $brokers, array $zookeeper, $kafkaBin = '/yisa_oe/kafka/bin') { $addr = implode(',', $brokers); $zookeepers = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); //获取模式 $cmd = $this->buildListCommand($kafkaBin, $addr, $zookeepers, $mode); //获取topic 是否存在 exec($cmd . " 2>&1", $output, $returnVar); if ($returnVar !== 0) { throw new \Exception("检查 topic 失败:" . implode("\n", $output)); } return in_array($topicName, $output); } /** * 删除 Topic * @param $topicName 需要删除的topic * @param array $brokers * @param array $zookeeps zookeep 信息 * @param $is_kafka_client 是否有kafka客户端 默认false 无 * @param $kafkaBin kafka 对应的bin执行目录 * @return bool */ public function deleteKafkaTopic($topicName, array $brokers, array $zookeeper, $is_kafka_client = false, $kafkaBin = '/yisa_oe/kafka/bin') { if ($is_kafka_client) { $addr = implode(',', $brokers); $zookeeper_str = implode(',', $zookeeper); $mode = $this->getKafkaMode($kafkaBin); $cmd = $this->buildDeleteCommand($kafkaBin, $addr, $zookeeper_str, $topicName, $mode); exec($cmd . " 2>&1", $output, $returnVar); return $returnVar === 0; } else { return true; } } }

我们新增一个路由 这里直接忽略 我们以Test.php 这个控制器为例吧

<?php /** * 测评回调控制器 * @author liupeng * @email liupenga@yisa.com * @date 2025年11月18日14:22:20 */ namespace App\HttpController\Api; use EasySwoole\Http\AbstractInterface\Controller; use App\Service\KafkaService; use EasySwoole\EasySwoole\Config; use EasySwoole\ORM\Exception\Exception; use EasySwoole\Validate\Validate; use App\Model\CommonModel; use App\Model\CommonOrmModel; use EasySwoole\Validate\Functions\Length; class Evaluation extends Controller { public function onRequest(?string $action): bool { return parent::onRequest($action); // TODO: Change the autogenerated stub } public function __construct() { parent::__construct(); $this->config = Config::getInstance()->getConf('common');//配置变量 } public function tet(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $result = $kafka_service->createKafkaTopic($topicName, $partitions, $replication, $kafka_bootstrap, $zookeeps, $is_kafka_client, $kafka_bin_path); //创建kafka 的topic $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 var_dump(KafkaService::publish($topicName, ['ddd' => 33, 'time' => date('Y-m-d H:i:s')])); //写入kafka数据 }

这个tet方法 是创建了一个topic 名字为test的 如果存在 就跳过 如果不存在,创建,在然后就是KafkaService::publish 推送数据到对应的topic 中。

删除Topic

public function tet1(){ $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'test'; $partitions = 1; $replication = 1; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 $result = $kafka_service->deleteKafkaTopic('Liupeng', $brokers, $zookeeps_arr, $is_kafka_client, $kafka_bin_path); //删除topic }

订阅某个kafka数据

public function dd() { $config = Config::getInstance()->getConf('kafka');//配置变量 $kafka_bootstrap = implode(',', $config['host']); // 逗号分隔 $zookeeps = implode(',', $config['zookeeper']); //zookeep地址 $is_kafka_client = $config['is_kafka_client']; //是否存在 $kafka_bin_path = $config['kafka_bin_path']; $topicName = 'kkk'; $kafka_service = new KafkaService(); $brokers = $config['host']; //数组 $zookeeps_arr = $config['zookeeper']; //数组 //消费kafka数据 一致占用 KafkaService::consume($topicName,function ($msg){ echo "收到". $msg . PHP_EOL; }); }

KafkaService::consume 订阅数据

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/28 11:31:28

EMS-NT企业微电网能碳管理平台:架构、功能与应用研究

摘要随着“双碳”目标的推进&#xff0c;工业企业与园区面临能源管理与碳排放控制的双重挑战。本文基于《工业企业和园区数字化能碳管理中心建设指南》等政策背景&#xff0c;系统阐述了EMS-NT企业微电网能碳管理平台的解决方案、功能架构、关键技术及应用案例。平台以能源管理…

作者头像 李华
网站建设 2026/1/30 6:38:34

读捍卫隐私10读后总结与感想兼导读

1. 基本信息捍卫隐私​[美]凯文米特尼克&#xff0c;罗伯特瓦摩西&#xff0c;浙江人民出版社2019年9月1.1. 读薄率书籍总字数17.9万字&#xff0c;笔记总字数35721字。读薄率35721179000≈19.96%1.2. 读厚方向当我点击时&#xff0c;算法在想什么&#xff1f;算法霸权极简算法…

作者头像 李华
网站建设 2026/1/31 3:59:09

华为云国际站代理商的AS跨境有什么优势呢?

你这里提到的 AS 大概率是华为云的自动伸缩&#xff08;Auto Scaling&#xff09;服务&#xff0c;华为云国际站代理商提供的该服务用于跨境场景时&#xff0c;能凭借技术适配、成本优化和本地化服务等多方面优势&#xff0c;助力企业解决跨境业务中的资源调度、合规和运维等难…

作者头像 李华
网站建设 2026/1/22 10:39:17

NPP 草原:美国中部平原实验牧场(SGS),1939-1990 年,R1

NPP Grassland: Central Plains Experimental Range (SGS), USA, 1939-1990, R1 简介 该数据集记录了位于科罗拉多州中北部中央平原实验保护区&#xff08;CPER&#xff09;/波尼国家草原的半干旱短草草原的生产力。数据集包含九个数据文件&#xff08;.txt&#xff09;。其中…

作者头像 李华
网站建设 2026/1/28 8:44:34

CCD相机同步外触发拍照抓拍识别高速脉冲计数器信号采集模块

相机触发脉冲计数器是一种基于外部脉冲信号&#xff08;如来自编码器或传感器&#xff09;的触发模式&#xff0c;用于在特定脉冲数量到达时启动相机图像采集。这种模式通过计数器模块累积输入脉冲&#xff0c;并在达到预设阈值时生成触发信号&#xff0c;实现精确的定时或等距…

作者头像 李华