flink processFunction算子
创始人
2025-05-31 09:51:19

flink processFunction算子

  • 1 process function 概述

1 process function 概述

process function 相对于前文所述的 map、flatmap、filter 算子来说,最大的区别是其让开发人员对数据 的 处 理 逻 辑 拥 有 更 大 的 自 由 度 ; 同 时 , ProcessFunction 继 承 了 RichFunction , 因 而 具 备 了 getRuntimeContext() ,open() ,close()等方法;

在不同类型的 datastream 上,(比如 keyed stream、windowedStream、ConnectedStream 等),应用 process function 时,flink 提供了大量不同类型的 process function,让其针对不同的 datastream 拥有更具针对 性的功能;

  • ProcessFunction (普通 DataStream 上调 process 时)
  • KeyedProcessFunction (KeyedStream 上调 process 时)
  • ProcessWindowFunction(WindowedStream 上调 process 时)
  • ProcessAllWindowFunction(AllWindowedStream 上调 process 时)
  • CoProcessFuntion (ConnectedStreams 上调 process 时)
  • ProcessJoinFunction (JoinedStreams 上调 process 时)
  • BroadcastProcessFunction (BroadCastConnectedStreams 上调 process 时)
  • KeyedBroadcastProcessFunction(KeyedBroadCastConnectedStreams 上调 process 时)

各种算子运算后所生成的 datastream 类型,及各种 datastream 类型之间的互相转换关系:
在这里插入图片描述

/*** @Author: deep as the sea* @Site: www.51doit.com* @QQ: 657270652* @Date: 2022/4/26* @Desc: process 算子及 ProcessFunction 示例* 在不同类型的数据流上,调用 process 算子时,所需要传入的 ProcessFunction 也会有不同*/
public class _17_ProcessFunctions_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8822);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);// id,eventIdDataStreamSource stream1 = env.socketTextStream("localhost", 9998);/*** 在普通的 datastream 上调用 process 算子,传入的是 "ProcessFunction"*/SingleOutputStreamOperator> s1 = stream1.process(new ProcessFunction>() {// 可以使用 生命周期 open 方法@Overridepublic void open(Configuration parameters) throws Exception {// 可以调用 getRuntimeContext 方法拿到各种运行时上下文信息RuntimeContext runtimeContext = getRuntimeContext();runtimeContext.getTaskName();super.open(parameters);}@Overridepublic void processElement(String value, ProcessFunction>.Context ctx, Collector> out) throws Exception {// 可以做测流输出ctx.output(new OutputTag("s1", Types.STRING), value);// 可以做主流输出String[] arr = value.split(",");out.collect(Tuple2.of(arr[0], arr[1]));}// 可以使用 生命周期 close 方法@Overridepublic void close() throws Exception {super.close();}});/*** 在 keyedStream 上调用 process 算子,传入的是 "KeyedProcessFunction"* KeyedProcessFunction 中的,泛型 1:流中的 key 的类型;泛型 2:流中的数据的类型;泛型 3:处理后的输出结果的类型*/// 对 s1 流进行 keyby 分组KeyedStream, String> keyedStream = s1.keyBy(tp2 -> tp2.f0);// 然后在 keyby 后的数据流上调用 process 算子SingleOutputStreamOperator> s2 = keyedStream.process(new KeyedProcessFunction, Tuple2>() {@Overridepublic void processElement(Tuple2 value, KeyedProcessFunction, Tuple2>.Context ctx, Collector> out) throws Exception {// 把 id 变整数,把 eventId 变大写out.collect(Tuple2.of(Integer.parseInt(value.f0), value.f1.toUpperCase()));}});s2.print();env.execute();}
}

相关内容

热门资讯

正式开放!衡阳新增一户外运动休... 大安乡是油茶大乡,有近3万亩油茶林,山清水秀,旅游资源丰富。去年12月,大安乡与衡阳县文化旅游发展有...
藏不住了!黄岩这条线路全省推荐... 近日 浙江省第二批 人文乡村旅游线路出炉 黄岩区“宋韵千年 橘乡甜城”之旅 上榜 这条线路串联起了...
喜报!龙岩新晋3处国家3A级旅... 近日 龙岩市文化和旅游局网站发布公示 批准上杭红色大连景区 武平县上湖战斗纪念园景区 漳平水韵圆潭旅...
体育旅游:在运动中邂逅诗与远方 沿着山间步道徒步穿行,在湖光山色间挥杆击球,于碧海蓝天下逐浪冲浪……如今,越来越多的人告别传统观光式...
原创 这... 《——【·前言·】——》 你能想象吗?就在此刻,地球上有个国家正在经历着比电影更残酷的现实。 街道上...