跳转至

mulStream

之前所有的操作均是在一条流, 真实业务中会存在多条流的情况

分流

普通方法

filter 过滤

split 分流 需要保证输入和输出类型一致 目前已经淘汰

通用方法

使用侧输出流

package com.matt.apitest.mulstream;

import com.matt.apitest.beans.Event;
import com.matt.apitest.source.SourceTest4_UDF;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.mortbay.util.ajax.JSON;

import java.time.Duration;

/**
 * @author matt
 * @create 2023-03-02 22:54
 * @desc Splits one Event stream into three outputs with side outputs: events of user "matt" go to
 *       side output t1, events of user "jack" go to side output t2 (both as (user, url) pairs),
 *       and every other event stays on the main output.
 */
public class SplitStreamTest {

    /**
     * Builds and runs the split-stream job, printing each output with its own prefix.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new SourceTest4_UDF.ParallelCustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        // Output tags. They are created as anonymous subclasses ("{}") so the generic
        // element type is retained at runtime.
        OutputTag<Tuple2<String, String>> t1 = new OutputTag<Tuple2<String, String>>("t1") {
        };
        OutputTag<Tuple2<String, String>> t2 = new OutputTag<Tuple2<String, String>>("t2") {
        };

        SingleOutputStreamOperator<Event> s = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, ProcessFunction<Event, Event>.Context context, Collector<Event> collector) throws Exception {
                // Constant-first equals: an event with a null user falls through to the
                // main output instead of failing the job with a NullPointerException.
                if ("matt".equals(event.user)) {
                    context.output(t1, Tuple2.of(event.user, event.url));
                } else if ("jack".equals(event.user)) {
                    context.output(t2, Tuple2.of(event.user, event.url));
                } else {
                    collector.collect(event);
                }
            }
        });

        s.getSideOutput(t1).print("t1");
        s.getSideOutput(t2).print("t2");

        s.print("main");
        env.execute();
    }
}

合流

多条流的合并

合并 union

package com.matt.apitest.mulstream;

import com.matt.apitest.beans.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author matt
 * @create 2023-03-02 23:09
 * @desc union: the watermark after the union is the minimum of the two input watermarks
 */
public class UnionTest {

    /**
     * Unions two socket-backed event streams with different out-of-orderness bounds and prints
     * the current watermark seen by the downstream operator for every element.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> s1 = socketEvents(env, 777, Duration.ofSeconds(2));
        SingleOutputStreamOperator<Event> s2 = socketEvents(env, 888, Duration.ofSeconds(10));

        s1.union(s2).process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event event, ProcessFunction<Event, String>.Context context, Collector<String> collector) throws Exception {
                collector.collect("wk" + context.timerService().currentWatermark());
            }
        }).print();

        env.execute();
    }

    /**
     * Reads text lines from {@code localhost:port}, parses them into events and assigns
     * event-time timestamps with a bounded-out-of-orderness watermark strategy.
     *
     * @param env               execution environment the source is attached to
     * @param port              socket port to read from
     * @param maxOutOfOrderness out-of-orderness bound of the watermark strategy
     * @return the timestamped event stream
     */
    private static SingleOutputStreamOperator<Event> socketEvents(
            StreamExecutionEnvironment env, int port, Duration maxOutOfOrderness) {
        return env.socketTextStream("localhost", port)
                .map(UnionTest::parseEvent)
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(maxOutOfOrderness)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));
    }

    /**
     * Parses one space-separated line of the form {@code "<field0> <field1> <timestamp>"}.
     *
     * @param line raw input line
     * @return the parsed event, or an event built with the no-arg constructor when the line has
     *         fewer than three fields or the third field is not a valid long
     */
    private static Event parseEvent(String line) {
        String[] fields = line.split(" ");
        if (fields.length < 3) {
            return new Event();
        }
        try {
            return new Event(fields[0], fields[1], Long.valueOf(fields[2]));
        } catch (NumberFormatException ignored) {
            // Treat a non-numeric timestamp like any other malformed line: fall back to the
            // placeholder event instead of failing the whole job on one bad input line.
            return new Event();
        }
    }
}

取了并集 多条流数据类型需要保持一致, union方法 支持>=1个流

水位线以多条流最小的为准

连接 connect

ConnectedStreams

2条流可以是不同数据类型

connect 的返回结果是 ConnectedStreams

package com.matt.apitest.mulstream;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @author matt
 * @create 2023-03-02 23:40
 * @desc Connects an Integer stream with a Long stream and maps both to String with a CoMapFunction
 */
public class ConnectTest {

    /**
     * Connects two streams of different element types, tags each element with the stream it
     * came from and prints the result.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> ints = env.fromElements(10, 20, 30);
        DataStreamSource<Long> longs = env.fromElements(11L, 22L, 31L);

        ints.connect(longs)
                .map(new SourceTagger())
                .print();

        env.execute();
    }

    /** Appends "s1" to elements of the first stream and "s2" to elements of the second. */
    private static final class SourceTagger implements CoMapFunction<Integer, Long, String> {

        @Override
        public String map1(Integer value) {
            return value + "s1";
        }

        @Override
        public String map2(Long value) {
            return value + "s2";
        }
    }
}

CoProcessFunction

connect -> process (CoProcessFunction)

processElement1:处理第一条流

processElement2:处理第二条流

package com.matt.apitest.mulstream;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author matt
 * @create 2023-03-03 00:01
 * @desc Bill reconciliation: matches app order records against third-party payment records
 *       by order id, reporting a failure when the counterpart does not show up in time
 */
public class BillCheckCase {

    /**
     * Builds the two keyed input streams, connects them and prints the reconciliation results.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> s1 = env.fromElements(
                        Tuple3.of("order-1", "app", 1000L),
                        Tuple3.of("order-2", "app", 1000L),
                        Tuple3.of("order-3", "app", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                return t.f2;
                            }
                        }));

        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> s2 = env.fromElements(
                        Tuple4.of("order-1", "3", "suc", 2000L),
                        Tuple4.of("order-3", "3", "suc", 4000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple4<String, String, String, Long> t, long l) {
                                return t.f3;
                            }
                        }));

        // Check, per order id, whether the two streams match; each side waits 5s for the other.
        s1.keyBy(d -> d.f0)
                .connect(s2.keyBy(d -> d.f0))
                .process(new OrderMatchResult())
                .print();

        // Alternative form: connect first, then key both inputs:
        //   s1.connect(s2).keyBy(d -> d.f0, d -> d.f0);
        env.execute();
    }

    /**
     * Keyed co-process function that pairs one record from each input per key.
     *
     * <p>The first record to arrive for a key is stored in state and an event-time timer is
     * registered {@link #MATCH_TIMEOUT_MS} after its timestamp. If the counterpart arrives
     * first, a success message is emitted and the stored record is cleared; if the timer fires
     * while a record is still stored, a failure message is emitted.
     */
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>,
            Tuple4<String, String, String, Long>, String> {

        /** Event-time wait, in milliseconds, that either side grants its counterpart. */
        private static final long MATCH_TIMEOUT_MS = 5000L;

        // Pending (not yet matched) record of each input for the current key.
        private ValueState<Tuple3<String, String, Long>> state1;
        private ValueState<Tuple4<String, String, String, Long>> state2;

        @Override
        public void open(Configuration conf) throws Exception {
            state1 = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>(
                    "state1", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
            state2 = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>(
                    "state2", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)));
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> v, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> collector) throws Exception {
            Tuple4<String, String, String, Long> pending = state2.value();
            if (pending != null) {
                collector.collect("s1对账成功" + v + " " + pending);
                // The pair is complete: drop the timeout timer of the stored record so it cannot
                // fire later against a newer record of the same key.
                ctx.timerService().deleteEventTimeTimer(pending.f3 + MATCH_TIMEOUT_MS);
                state2.clear();
            } else {
                state1.update(v);
                ctx.timerService().registerEventTimeTimer(v.f2 + MATCH_TIMEOUT_MS);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> v, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String,
                String, String, Long>, String>.Context ctx, Collector<String> collector) throws Exception {
            Tuple3<String, String, Long> pending = state1.value();
            if (pending != null) {
                collector.collect("s2对账成功" + pending + " " + v);
                // Same as in processElement1: the stored record's timeout is no longer needed.
                ctx.timerService().deleteEventTimeTimer(pending.f2 + MATCH_TIMEOUT_MS);
                state1.clear();
            } else {
                state2.update(v);
                // Grant the first input the same wait it grants this one; a timer at v.f3 itself
                // would leave no waiting period at all.
                ctx.timerService().registerEventTimeTimer(v.f3 + MATCH_TIMEOUT_MS);
            }
        }


        @Override
        public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // A record still stored when the timer fires means its counterpart never arrived.
            Tuple3<String, String, Long> unmatched1 = state1.value();
            Tuple4<String, String, String, Long> unmatched2 = state2.value();
            if (unmatched1 != null) {
                out.collect(unmatched1 + "对账失败");
            } else if (unmatched2 != null) {
                out.collect(unmatched2 + "对账失败");
            }
            state1.clear();
            state2.clear();
        }
    }
}

广播连接流 BroadcastConnectedStream

定义一个广播流, 需先定义一个描述器

// Descriptor for the broadcast state; it has to be defined before the stream is broadcast.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
// Turn the rule stream into a broadcast stream bound to that descriptor.
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                    .broadcast(ruleStateDescriptor);

连接

// Connect the data stream with the broadcast stream and process both in one BroadcastProcessFunction.
DataStream<String> output = stream.connect(ruleBroadcastStream)
              .process( new BroadcastProcessFunction<>() {...} );
// Callback for IN1 values; it is handed a ReadOnlyContext.
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
// Callback for IN2 values; it is handed a Context.
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

dataStream 和广播流连接就是 BroadcastConnectedStream

如果对数据流调用过 keyBy 进行了按键分区,那么 process 中传入的就是 KeyedBroadcastProcessFunction。

联结 join

上述合流操作均是取了并集,有一些情况可能需要a&b 合并为1条数据

窗口连接

package com.matt.apitest.mulstream;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * @author matt
 * @create 2023-03-05 14:56
 * @desc Window join: joins two (key, timestamp) streams on the key inside 5s tumbling event-time windows
 */
public class WinJoinTest {

    /**
     * Builds two timestamped tuple streams, joins them per key and window, and prints each pair.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> left = env
                .fromElements(
                        Tuple2.of("t1", 1000L),
                        Tuple2.of("t3", 9999L),
                        Tuple2.of("t2", 8000L),
                        Tuple2.of("t4", 11000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        SingleOutputStreamOperator<Tuple2<String, Long>> right = env
                .fromElements(
                        Tuple2.of("t1", 2002L),
                        Tuple2.of("t3", 5000L),
                        Tuple2.of("t2", 9000L),
                        Tuple2.of("t4", 22000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        left.join(right)
                .where(t -> t.f0)
                .equalTo(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
                .apply(new PairFormatter())
                .print();

        env.execute();
    }

    /**
     * Watermark strategy with a zero out-of-orderness bound that reads the event timestamp
     * from the tuple's second field.
     */
    private static WatermarkStrategy<Tuple2<String, Long>> eventTimeStrategy() {
        return WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((element, recordTimestamp) -> element.f1);
    }

    /** Renders a joined pair as {@code first->second}. */
    private static final class PairFormatter
            implements JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> {

        @Override
        public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) {
            return first + "->" + second;
        }
    }
}

join joinedStream ->

where指定第一条流的k, equalTo指定第二条流的k 俩条流k相等才会进行匹配 ->

window 指定窗口类型 ->

apply 合并数据 第一条流和第二条流匹配的数据会做笛卡尔积

间隔联结

a流a1 b流b2 a1,b2虽然不在一个窗口内但仍希望进行联结

package com.matt.apitest.mulstream;

import com.matt.apitest.beans.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author matt
 * @create 2023-03-05 15:26
 * @desc Interval join: joins two keyed Event streams on user within a [-5s, +5s] interval
 */
public class IntervalJoinTest {

    /**
     * Builds two timestamped event streams, interval-joins them by user and prints each pair.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> left = env
                .fromElements(
                        new Event("a1", "/u1", 100L),
                        new Event("a2", "/u1", 200L),
                        new Event("a3", "/u2", 300L),
                        new Event("a4", "/u3", 400L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        SingleOutputStreamOperator<Event> right = env
                .fromElements(
                        new Event("a1", "/bill", 100L),
                        new Event("a2", "/b2", 200L),
                        new Event("a3", "/b3", 300L),
                        new Event("a4", "/b4", 400L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        left.keyBy(e -> e.user)
                .intervalJoin(right.keyBy(e -> e.user))
                .between(Time.seconds(-5), Time.seconds(5))
                .process(new PairEmitter())
                .print();

        env.execute();
    }

    /**
     * Watermark strategy with a zero out-of-orderness bound that reads the event timestamp
     * from {@code Event.timestamp}.
     */
    private static WatermarkStrategy<Event> eventTimeStrategy() {
        return WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
    }

    /** Emits every joined pair as {@code left->right}. */
    private static final class PairEmitter extends ProcessJoinFunction<Event, Event, String> {

        @Override
        public void processElement(Event leftEvent, Event rightEvent, Context ctx, Collector<String> out) throws Exception {
            out.collect(leftEvent + "->" + rightEvent);
        }
    }
}

between 指定窗口 上下界

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound(上下界默认都包含) 只要2条流中 key 相同的数据满足这个条件均可以连接

窗口同组联结

apply:CoGroupFunction

一次传入匹配的数据集 由用户自己去定义匹配 不再是暴力笛卡尔积

package com.matt.apitest.mulstream;

import com.matt.apitest.beans.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author matt
 * @create 2023-03-05 15:50
 * @desc Window coGroup: hands the elements of both streams that share a key and a 5s tumbling
 *       event-time window to a CoGroupFunction as two collections
 */
public class CoGroupTest {

    /**
     * Builds two timestamped tuple streams, co-groups them per key and window, and prints the
     * two groups side by side.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> left = env
                .fromElements(
                        Tuple2.of("t1", 1000L),
                        Tuple2.of("t3", 9999L),
                        Tuple2.of("t2", 8000L),
                        Tuple2.of("t4", 11000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        SingleOutputStreamOperator<Tuple2<String, Long>> right = env
                .fromElements(
                        Tuple2.of("t1", 2002L),
                        Tuple2.of("t1", 2003L),
                        Tuple2.of("t3", 5000L),
                        Tuple2.of("t2", 9000L),
                        Tuple2.of("t4", 22000L))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        left.coGroup(right)
                .where(t -> t.f0)
                .equalTo(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
                .apply(new GroupFormatter())
                .print();

        env.execute();
    }

    /**
     * Watermark strategy with a zero out-of-orderness bound that reads the event timestamp
     * from the tuple's second field.
     */
    private static WatermarkStrategy<Tuple2<String, Long>> eventTimeStrategy() {
        return WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((element, recordTimestamp) -> element.f1);
    }

    /** Emits one record per invocation: the first group, {@code ->}, then the second group. */
    private static final class GroupFormatter
            implements CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> {

        @Override
        public void coGroup(Iterable<Tuple2<String, Long>> firstGroup, Iterable<Tuple2<String, Long>> secondGroup, Collector<String> out) throws Exception {
            out.collect(firstGroup + "->" + secondGroup);
        }
    }
}