flink processFunction算子
创始人
2025-05-31 09:51:19

flink processFunction算子

  • 1 process function 概述

1 process function 概述

process function 相对于前文所述的 map、flatmap、filter 算子来说,最大的区别是其让开发人员对数据 的 处 理 逻 辑 拥 有 更 大 的 自 由 度 ; 同 时 , ProcessFunction 继 承 了 RichFunction , 因 而 具 备 了 getRuntimeContext() ,open() ,close()等方法;

在不同类型的 datastream 上,(比如 keyed stream、windowedStream、ConnectedStream 等),应用 process function 时,flink 提供了大量不同类型的 process function,让其针对不同的 datastream 拥有更具针对 性的功能;

  • ProcessFunction (普通 DataStream 上调 process 时)
  • KeyedProcessFunction (KeyedStream 上调 process 时)
  • ProcessWindowFunction(WindowedStream 上调 process 时)
  • ProcessAllWindowFunction(AllWindowedStream 上调 process 时)
  • CoProcessFuntion (ConnectedStreams 上调 process 时)
  • ProcessJoinFunction (JoinedStreams 上调 process 时)
  • BroadcastProcessFunction (BroadCastConnectedStreams 上调 process 时)
  • KeyedBroadcastProcessFunction(KeyedBroadCastConnectedStreams 上调 process 时)

各种算子运算后所生成的 datastream 类型,及各种 datastream 类型之间的互相转换关系:
在这里插入图片描述

/*** @Author: deep as the sea* @Site: www.51doit.com* @QQ: 657270652* @Date: 2022/4/26* @Desc: process 算子及 ProcessFunction 示例* 在不同类型的数据流上,调用 process 算子时,所需要传入的 ProcessFunction 也会有不同*/
public class _17_ProcessFunctions_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8822);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);// id,eventIdDataStreamSource stream1 = env.socketTextStream("localhost", 9998);/*** 在普通的 datastream 上调用 process 算子,传入的是 "ProcessFunction"*/SingleOutputStreamOperator> s1 = stream1.process(new ProcessFunction>() {// 可以使用 生命周期 open 方法@Overridepublic void open(Configuration parameters) throws Exception {// 可以调用 getRuntimeContext 方法拿到各种运行时上下文信息RuntimeContext runtimeContext = getRuntimeContext();runtimeContext.getTaskName();super.open(parameters);}@Overridepublic void processElement(String value, ProcessFunction>.Context ctx, Collector> out) throws Exception {// 可以做测流输出ctx.output(new OutputTag("s1", Types.STRING), value);// 可以做主流输出String[] arr = value.split(",");out.collect(Tuple2.of(arr[0], arr[1]));}// 可以使用 生命周期 close 方法@Overridepublic void close() throws Exception {super.close();}});/*** 在 keyedStream 上调用 process 算子,传入的是 "KeyedProcessFunction"* KeyedProcessFunction 中的,泛型 1:流中的 key 的类型;泛型 2:流中的数据的类型;泛型 3:处理后的输出结果的类型*/// 对 s1 流进行 keyby 分组KeyedStream, String> keyedStream = s1.keyBy(tp2 -> tp2.f0);// 然后在 keyby 后的数据流上调用 process 算子SingleOutputStreamOperator> s2 = keyedStream.process(new KeyedProcessFunction, Tuple2>() {@Overridepublic void processElement(Tuple2 value, KeyedProcessFunction, Tuple2>.Context ctx, Collector> out) throws Exception {// 把 id 变整数,把 eventId 变大写out.collect(Tuple2.of(Integer.parseInt(value.f0), value.f1.toUpperCase()));}});s2.print();env.execute();}
}

相关内容

热门资讯

“广东第一田”出品,台山丝苗米... 从江门市区往南驱车近100公里,就来到南海之滨的台山斗山镇。当地的地道美食黄鳝饭飘香百里,吸引着食客...
【临汾日报】市人民医院膳食专家... 年夜饭 是团圆的味道 是时光的馈赠 更是营养与美好的温柔邂逅 新春将至 大家是不是正发愁 年夜饭该做...
「年夜饭系列4」除夕夜,没有什... 年夜饭的餐桌上,总有一道菜是专为“团圆”而生的。它不是最贵重的,也不是最复杂的,但它一定是最能牵动味...
除夕年夜饭菜单,4-5口人家8... 除夕年夜饭菜单,4-5口人家8个菜,菜式丰富不浪费,年味十足! 在中国人的传统观念里,除夕是一年中...
原创 血... 血糖高的人早餐完全可以吃荞麦面条,而且荞麦面是很适合控糖的主食,但吃法很关键。 1. 为什么推荐吃...