news 2026/6/26 1:56:04

flink的streaming api 统计文本中的字段个数

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
flink的streaming api 统计文本中的字段个数

1.flink 的streaming api初步学习

有界数据流处理,文件数据处理。

package com.ycl; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountStreamDemo { public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.读取数据 DataStreamSource<String> lineDS = env.readTextFile("input/word.txt"); //3.处理数据 切分,转换,分组,聚合。 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //按照 空格切分 String[] words = value.split(" "); for (String word : words) { //转换成 二元组 (word,1) Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1); //通过采集器向下游发送数据 out.collect(wordAndOne); } } }); //3.2 分组 KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }); //3.3聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1); //4.输出数据 sumDS.print(); //5.执行:类似 sparkstreaming 最后,ssc.start(); env.execute(); } } /** 接口 A,里面有一个方法a(); 正常写法,定义个 class B,实现接口A,方法a() B b = new B(); 匿名实现类: new A(){ a(){} } */

输出结果如下;

前面的编号是,并行度线程编号。

在 DataSet API 里面分组使用的groupBy ; 在streaming里面使用的分组函数是: keyBy;
执行环境: DataSet 是:ExecutionEnvironment, Streaming 是: StreamExecutionEnvironment
调用: DataStream里面 env.execute();是必须调用的。DataSet不用去调用。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/26 1:50:44

HS2-HF Patch:3步完成HoneySelect2游戏终极增强

HS2-HF Patch&#xff1a;3步完成HoneySelect2游戏终极增强 【免费下载链接】HS2-HF_Patch Automatically translate, uncensor and update HoneySelect2! 项目地址: https://gitcode.com/gh_mirrors/hs/HS2-HF_Patch HS2-HF Patch是HoneySelect2玩家的游戏增强终极解决…

作者头像 李华
网站建设 2026/6/26 1:48:41

如何看待anthropic指控阿里 qwen 蒸馏 Claude ?

2900万次交锋下的“知识走私”&#xff1a;Anthropic 控诉阿里 Qwen 蒸馏背后的真相与底层博弈就在今天早晨&#xff0c;科技圈和开发者社区被一条具有里程碑性质的深水炸弹彻底炸翻了。外媒纷纷以头条头版披露&#xff1a;硅谷人工智能新贵 Anthropic 已经正式向美国参议院和白…

作者头像 李华
网站建设 2026/6/26 1:48:21

Transformer工程化学习路线图:从手写代码到生产落地

1. 这不是又一本“Transformer原理科普”&#xff0c;而是一份可执行的工程化学习路线图 “Learning Transformers: Code, Concepts, and Impact”——这个标题里藏着三个被绝大多数教程刻意割裂的维度&#xff1a; Code&#xff08;可运行的代码&#xff09; 、 Concepts&a…

作者头像 李华
网站建设 2026/6/26 1:48:07

评测:Codex、Manus、Claude Code、OpenClaw 谁才是最强的 Agent

本文基于至顶AI实验室的真实工作流实测框架&#xff0c;对 Codex、Manus、Claude Code、OpenClaw&#xff08;开源 Agent 工具&#xff0c;俗称"龙虾"&#xff09;四款主流 Agent 工具在部署难度、应用性、扩展性、办公能力、代码能力、工具调用能力六个维度下的表现…

作者头像 李华
网站建设 2026/6/26 1:44:03

火山引擎多模态数据湖的制作思路

火山引擎多模态数据湖的设计核心在于统一存储、灵活计算与高效治理。以下是其关键架构思路&#xff1a;1. 统一存储层底座支持异构数据通过分布式对象存储&#xff08;如HDFS兼容存储&#xff09;承载结构化数据&#xff08;如数据库表&#xff09;、半结构化数据&#xff08;如…

作者头像 李华