news 2026/2/25 19:31:01

Flink ML K-Means 离线聚类 + 在线增量聚类(mini-batch + decayFactor)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink ML K-Means 离线聚类 + 在线增量聚类(mini-batch + decayFactor)

一、K-Means(离线版):有限数据上的迭代聚类

1)输入列(Input Columns)

参数名类型默认值说明
featuresColVector"features"特征向量

2)输出列(Output Columns)

参数名类型默认值说明
predictionColInteger"prediction"预测所属簇 ID(簇中心编号)

3)参数(Parameters)详解

KMeansModel(预测侧)参数
Key默认值类型说明
distanceMeasureEuclideanDistanceMeasure.NAMEString距离度量(当前支持欧式距离)
featuresCol"features"String特征列名
predictionCol"prediction"String输出列名
k2Integer簇数量(最大聚类数)
KMeans(训练侧)额外参数
Key默认值类型说明
initMode"random"String初始化方式(当前支持 random)
seednullLong随机种子(保证可复现)
maxIter20Integer最大迭代次数

4)Java 示例代码解读(离线 KMeans)

示例流程很标准:

  1. 构造输入数据(DenseVector 流)
  2. DataStream → Table,并命名为features
  3. kmeans.fit(table)训练得到KMeansModel
  4. model.transform(table)输出每条数据的簇 ID
  5. collect 打印 features + clusterId

关键代码片段:

DataStream<DenseVector>inputStream=env.fromElements(Vectors.dense(0.0,0.0),Vectors.dense(0.0,0.3),Vectors.dense(0.3,0.0),Vectors.dense(9.0,0.0),Vectors.dense(9.0,0.6),Vectors.dense(9.6,0.0));TableinputTable=tEnv.fromDataStream(inputStream).as("features");KMeanskmeans=newKMeans().setK(2).setSeed(1L);KMeansModelkmeansModel=kmeans.fit(inputTable);TableoutputTable=kmeansModel.transform(inputTable)[0];

输出打印(预测列是 Integer):

DenseVectorfeatures=(DenseVector)row.getField(kmeans.getFeaturesCol());intclusterId=(Integer)row.getField(kmeans.getPredictionCol());

二、Online K-Means:无界流上的持续聚类(mini-batch + 遗忘)

1)为什么需要 Online K-Means?

离线 KMeans 训练出来的中心是“固定”的。
但很多业务数据分布会随时间变化,例如:

  • 用户行为习惯变了
  • 商品/内容热点变化
  • 流量来源变化

这时你希望模型能“持续学习”,让聚类中心跟着数据漂移而更新,就需要 Online K-Means。

2)Online K-Means 的核心思想(mini-batch + decayFactor)

Online K-Means 基于“mini-batch KMeans”的更新规则,并加入遗忘机制(decay):

  • 每次从训练流中积累一个 mini-batch
  • 基于这个 batch 计算临时中心(estimated centroids)
  • 用加权平均更新旧中心(original centroids):

decayFactor 解释(非常关键)

  • decayFactor = 1:历史与新数据同等重要(几乎不遗忘)
  • decayFactor = 0:完全由最新数据决定中心(强遗忘)
  • 值越小 → 遗忘越强 → 模型越“跟新”
  • 值越大 → 趋于稳定 → 变化越慢

3)输入输出列(Online)

输入列同离线:

参数名类型默认值说明
featuresColVector"features"特征向量

输出列同离线:

参数名类型默认值说明
predictionColInteger"prediction"所属簇 ID

4)参数(OnlineKMeans)详解

OnlineKMeansModel(预测侧)
Key默认值类型说明
distanceMeasureEuclideanDistanceMeasure.NAMEString距离度量(欧式距离)
featuresCol"features"String特征列名
predictionCol"prediction"String输出列名
k2Integer簇数量
OnlineKMeans(训练侧)额外参数
Key默认值类型说明
batchStrategyCOUNT_STRATEGYStringmini-batch 构造策略
globalBatchSize32Integer全局 batch 大小
decayFactor0.0Double遗忘系数(历史中心贡献缩放)
seednullLong随机种子

5)Java 示例代码解读(OnlineKMeans)

示例里做了非常“演示型”的设计:训练数据分两段周期性出现,观察聚类结果如何随时间变化。

(1)训练流是无限流,周期性吐两批不同分布的数据
  • trainData1:大致在 (0~10) 附近
  • trainData2:分布跳到了 (10,100) 与 (-10,-100) 两块

这等于让数据分布发生“漂移”,你就能看到在线聚类中心被新数据影响。

(2)predict 也是周期性吐同一组预测点
List<Row>predictData=Arrays.asList(Row.of(Vectors.dense(10.0,10.0)),Row.of(Vectors.dense(-10.0,10.0)));

输出里会不停打印:

  • 两个点是否被分到同一个簇
    因为随着训练数据改变、中心改变,聚类结果可能随时间变化。
(3)初始化模型数据 initialModelData

在线聚类必须有初始中心,否则没法开始迭代。示例使用:

.setInitialModelData(KMeansModelData.generateRandomModelData(tEnv,2,2,0.0,0))

含义是:

  • k=2 个中心
  • 每个中心 2 维
  • 随机生成初始中心
(4)globalBatchSize=6:每 6 条数据更新一次中心
.setGlobalBatchSize(6)

这与训练数据每批 6 条刚好对应,便于演示“每批更新一次”的效果。

三、离线 KMeans vs 在线 OnlineKMeans:怎么选?

选离线 KMeans 的典型场景

  • 你有明确的历史数据窗口(按天、按周)
  • 模型周期性训练发布,追求稳定可控
  • 线上只是推理(transform),不希望训练影响延迟

选 OnlineKMeans 的典型场景

  • 数据持续流入且分布变化快
  • 你希望模型能持续适应新模式(概念漂移)
  • 你可以接受聚类结果随时间变化

四、实战建议(非常重要)

1)KMeans 之前强烈建议做标准化

KMeans 基于距离(欧式距离),特征尺度不同会导致“某个维度支配聚类”。典型做法:

  • VectorAssembler(拼特征)
  • StandardScaler(标准化)
  • KMeans / OnlineKMeans

2)k 的选择不要拍脑袋

常见方法:

  • 肘部法(Elbow)
  • 轮廓系数(Silhouette)
  • 结合业务可解释性(比如用户分群常选 5/8/10)

3)OnlineKMeans 的 decayFactor 是控制“跟新程度”的旋钮

简单经验:

  • 数据分布很稳定:decayFactor 接近 1
  • 数据漂移明显:decayFactor 取 0.1~0.5 让模型更灵活
  • 想快速跟随热点:decayFactor 更小

4)batch size 与更新频率要结合吞吐与稳定性

  • batch 小:更新快但抖动大
  • batch 大:更稳定但响应慢

五、小结

Flink ML 的 KMeans 家族可以覆盖绝大多数“聚类/分群”需求:

  • KMeans(离线):有限数据、迭代训练、中心稳定
  • OnlineKMeans(在线):无界流、mini-batch 更新、支持遗忘机制

掌握了k / maxIter / globalBatchSize / decayFactor这些关键参数,你就能把聚类从“demo”落到“线上可用”。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/24 20:20:29

Java计算机毕设之基于SpringBoot的私房菜上门定制系统的设计与实现基于springboot+vue的私房菜定制上门服务系统的设计与实现(完整前后端代码+说明文档+LW,调试定制等)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/2/23 2:36:56

Git reset撤销错误提交,保护PyTorch项目历史

Git reset撤销错误提交&#xff0c;保护PyTorch项目历史 在深度学习项目的日常开发中&#xff0c;你是否曾经历过这样的瞬间&#xff1a;刚提交完代码&#xff0c;突然发现训练脚本里还留着调试用的 print() 语句&#xff1f;或者不小心把包含敏感信息的配置文件推到了仓库&…

作者头像 李华
网站建设 2026/2/25 7:55:09

PyTorch-CUDA基础镜像安全加固措施说明

PyTorch-CUDA 基础镜像安全加固实践指南 在现代 AI 工程体系中&#xff0c;一个看似简单的命令 docker run --gpus all pytorch-cuda:v2.6 背后&#xff0c;往往承载着从算法研发到生产部署的完整链路。然而&#xff0c;当我们在享受“一键启动”带来的便利时&#xff0c;是否…

作者头像 李华
网站建设 2026/2/25 12:38:56

【毕业设计】基于SpringBoot的高校学习讲座预约系统的设计与实现(源码+文档+远程调试,全bao定制等)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

作者头像 李华
网站建设 2026/2/25 4:49:07

Docker Compose编排PyTorch多卡训练环境,支持分布式计算

Docker Compose 编排 PyTorch 多卡训练环境&#xff0c;支持分布式计算 在深度学习项目开发中&#xff0c;最让人头疼的往往不是模型设计本身&#xff0c;而是“环境配置”这个前置环节。你是否经历过这样的场景&#xff1a;同事发来一份训练代码&#xff0c;信心满满地准备复…

作者头像 李华