flink processFunction算子
创始人
2025-05-31 09:51:19
0

flink processFunction算子

  • 1 process function 概述

1 process function 概述

process function 相对于前文所述的 map、flatmap、filter 算子来说,最大的区别是其让开发人员对数据 的 处 理 逻 辑 拥 有 更 大 的 自 由 度 ; 同 时 , ProcessFunction 继 承 了 RichFunction , 因 而 具 备 了 getRuntimeContext() ,open() ,close()等方法;

在不同类型的 datastream 上,(比如 keyed stream、windowedStream、ConnectedStream 等),应用 process function 时,flink 提供了大量不同类型的 process function,让其针对不同的 datastream 拥有更具针对 性的功能;

  • ProcessFunction (普通 DataStream 上调 process 时)
  • KeyedProcessFunction (KeyedStream 上调 process 时)
  • ProcessWindowFunction(WindowedStream 上调 process 时)
  • ProcessAllWindowFunction(AllWindowedStream 上调 process 时)
  • CoProcessFuntion (ConnectedStreams 上调 process 时)
  • ProcessJoinFunction (JoinedStreams 上调 process 时)
  • BroadcastProcessFunction (BroadCastConnectedStreams 上调 process 时)
  • KeyedBroadcastProcessFunction(KeyedBroadCastConnectedStreams 上调 process 时)

各种算子运算后所生成的 datastream 类型,及各种 datastream 类型之间的互相转换关系:
在这里插入图片描述

/*** @Author: deep as the sea* @Site: www.51doit.com* @QQ: 657270652* @Date: 2022/4/26* @Desc: process 算子及 ProcessFunction 示例* 在不同类型的数据流上,调用 process 算子时,所需要传入的 ProcessFunction 也会有不同*/
public class _17_ProcessFunctions_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8822);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);// id,eventIdDataStreamSource stream1 = env.socketTextStream("localhost", 9998);/*** 在普通的 datastream 上调用 process 算子,传入的是 "ProcessFunction"*/SingleOutputStreamOperator> s1 = stream1.process(new ProcessFunction>() {// 可以使用 生命周期 open 方法@Overridepublic void open(Configuration parameters) throws Exception {// 可以调用 getRuntimeContext 方法拿到各种运行时上下文信息RuntimeContext runtimeContext = getRuntimeContext();runtimeContext.getTaskName();super.open(parameters);}@Overridepublic void processElement(String value, ProcessFunction>.Context ctx, Collector> out) throws Exception {// 可以做测流输出ctx.output(new OutputTag("s1", Types.STRING), value);// 可以做主流输出String[] arr = value.split(",");out.collect(Tuple2.of(arr[0], arr[1]));}// 可以使用 生命周期 close 方法@Overridepublic void close() throws Exception {super.close();}});/*** 在 keyedStream 上调用 process 算子,传入的是 "KeyedProcessFunction"* KeyedProcessFunction 中的,泛型 1:流中的 key 的类型;泛型 2:流中的数据的类型;泛型 3:处理后的输出结果的类型*/// 对 s1 流进行 keyby 分组KeyedStream, String> keyedStream = s1.keyBy(tp2 -> tp2.f0);// 然后在 keyby 后的数据流上调用 process 算子SingleOutputStreamOperator> s2 = keyedStream.process(new KeyedProcessFunction, Tuple2>() {@Overridepublic void processElement(Tuple2 value, KeyedProcessFunction, Tuple2>.Context ctx, Collector> out) throws Exception {// 把 id 变整数,把 eventId 变大写out.collect(Tuple2.of(Integer.parseInt(value.f0), value.f1.toUpperCase()));}});s2.print();env.execute();}
}

相关内容

热门资讯

Spring 远程加载配置 本文以携程的Apollo和阿里的Nacos为例。 pom中引入一下依赖: ...
四川安居端午游园会启幕 首日迎... 张勤宝 封面新闻记者 刘虎端午佳节,四川省遂宁市安居区节日氛围浓,人们体验端午民俗,感悟传统文化。在...
前端-http请求 目录:  (1)表单  (2)...
到四川旅游团建五天一般多少钱,... 标题:到四川旅游团建五天一般多少钱,驴友亲测!——与乐乐一起的难忘旅程 四川旅游推荐!当地导游-乐乐...
去四川旅游攻略小包团五日游报价... 标题:去四川旅游攻略小包团五日游报价亲测,跟着乐乐玩转天府之国! 四川旅游推荐!当地导游-乐乐:18...
上海6月1日入汛 6月1日,上海正式入汛,汛期历时四个月,将至9月30日结束。据气象部门此前发布的初步预测,2025年...
2023-DataWorks数...  DataWorks开发规范 1 数仓基本概念1.4.1 ods数据源层表命名规范1.4.2 dim...
一、基础算法4:高精度 模板题... 文章目录算法模板高精度加法模板高精度减法模板高精度乘低精度模板高精度除以低精度模板模板题高精度加法原...
原创 烙... #图文打卡计划#小时候最盼着奶奶做烙饼的日子。灶台边飘着面香,铁锅里滋啦作响,她总能用那双布满皱纹的...
椰泰新品上市丨餐桌有简胃,健康... 餐桌有简胃,健康又开味
jmeter公共数据维护 为什么要进行公共数据维护?数据维护的方法是?一、用户自定义变量添加一个用...
家常菜:素炒小油菜,非常适合夏... 嗨!大家好!我是懒懒,爱美食爱生活! 一名每天在思考吃什么的90后社畜,喜欢做饭的业余美食制作者,用...
不仅好吃,还能提高复习效率!一... 高考的钟声渐近,紧迫感仿佛在空气中弥漫。一道美味的海鲜小菜,不仅能慰藉疲惫的心灵,还能提供能量,帮助...
原创 万... 很多糖尿病人还不知道,苦荞茶的降血糖功效值得关注。今天,邀请糖尿病专家万晓刚主任从现代医学与中医角度...
GuLi商城-SpringCl... Nacos支持三种配置加载方方案 Nacos支持Namespace + group +...