process function 相对于前文所述的 map、flatmap、filter 算子来说,最大的区别是其让开发人员对数据 的 处 理 逻 辑 拥 有 更 大 的 自 由 度 ; 同 时 , ProcessFunction 继 承 了 RichFunction , 因 而 具 备 了 getRuntimeContext() ,open() ,close()等方法;
在不同类型的 datastream 上,(比如 keyed stream、windowedStream、ConnectedStream 等),应用 process function 时,flink 提供了大量不同类型的 process function,让其针对不同的 datastream 拥有更具针对 性的功能;
各种算子运算后所生成的 datastream 类型,及各种 datastream 类型之间的互相转换关系:
/*** @Author: deep as the sea* @Site: www.51doit.com* @QQ: 657270652* @Date: 2022/4/26* @Desc: process 算子及 ProcessFunction 示例* 在不同类型的数据流上,调用 process 算子时,所需要传入的 ProcessFunction 也会有不同*/
public class _17_ProcessFunctions_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8822);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);// id,eventIdDataStreamSource stream1 = env.socketTextStream("localhost", 9998);/*** 在普通的 datastream 上调用 process 算子,传入的是 "ProcessFunction"*/SingleOutputStreamOperator> s1 = stream1.process(new ProcessFunction>() {// 可以使用 生命周期 open 方法@Overridepublic void open(Configuration parameters) throws Exception {// 可以调用 getRuntimeContext 方法拿到各种运行时上下文信息RuntimeContext runtimeContext = getRuntimeContext();runtimeContext.getTaskName();super.open(parameters);}@Overridepublic void processElement(String value, ProcessFunction>.Context ctx, Collector> out) throws Exception {// 可以做测流输出ctx.output(new OutputTag("s1", Types.STRING), value);// 可以做主流输出String[] arr = value.split(",");out.collect(Tuple2.of(arr[0], arr[1]));}// 可以使用 生命周期 close 方法@Overridepublic void close() throws Exception {super.close();}});/*** 在 keyedStream 上调用 process 算子,传入的是 "KeyedProcessFunction"* KeyedProcessFunction 中的,泛型 1:流中的 key 的类型;泛型 2:流中的数据的类型;泛型 3:处理后的输出结果的类型*/// 对 s1 流进行 keyby 分组KeyedStream, String> keyedStream = s1.keyBy(tp2 -> tp2.f0);// 然后在 keyby 后的数据流上调用 process 算子SingleOutputStreamOperator> s2 = keyedStream.process(new KeyedProcessFunction, Tuple2>() {@Overridepublic void processElement(Tuple2 value, KeyedProcessFunction, Tuple2>.Context ctx, Collector> out) throws Exception {// 把 id 变整数,把 eventId 变大写out.collect(Tuple2.of(Integer.parseInt(value.f0), value.f1.toUpperCase()));}});s2.print();env.execute();}
}