1. Flink基础
1.1. 架构
JobManager和TaskManager组成
TaskManager中最小的资源调度单元是task slot
1.2. 运行模式
Standalone
Yarn
Session:先启动flink集群常驻yarn,适合提交多个小job
per-job:一个job是一个集群,执行完集群消失,适合大规模长时间的job
application:和per-job区别就是多个job在一个application下
1.3. 四个图
streamGraph :在client生成,是代码的流程图
jobGraph :在client把streamGraph进行并行度优化,发给jobmanager
executorGraph :在jobmanager根据jobGraph生成并行化的版本发给taskmanager执行
physical :最终的结果,不是真正的图
1.4. 检查点
就是flink内部从source端每隔一段时间往下游的各个并行度发送一个barrier(检查点分界线)
当一个算子所有并行度上的barrier都到达的时候,会对当前状态打一个快照,并且把当前的状态缓存下来
也就是检查点对齐,对齐之后会把保存成功的状态返回给jobmanager,然后继续向下游传递、对齐
代码:
// 启用检查点机制,并指定检查点之间的时间间隔
env.enableCheckpointing(5000); // 每5秒触发一个检查点
// 设置检查点模式为“恰好一次”
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置检查点超时时间
env.getCheckpointConfig().setCheckpointTimeout(30000); // 30秒
// 设置并发进行检查点的最大数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同一时间只允许一个检查点
// 设置两个检查点之间的最小暂停时间,避免频繁的检查点操作
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 1秒
// 设置是否在故障恢复时优先使用检查点来恢复状态
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 设置允许检查点失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 允许3次检查点失败
// 设置状态后端,这里使用文件系统状态后端
env.setStateBackend(new FsStateBackend("file:///path/to/state/backend"));
// 设置检查点存储配置,将检查点保存在文件系统中
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 启用不对齐检查点
env.getCheckpointConfig().enableUnalignedCheckpoints();
// 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
1.5. 容错机制
开启检查点 + 设置重启策略
任务报错的时候,会按照重启策略从检查点恢复任务
固定重启策略 + 故障率重启策略
代码:
// 使用固定延迟重启策略,最大尝试3次,每次重启之间等待10秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
// 使用基于失败率的重启策略,如果在60分钟内失败率超过50%,则重启任务
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(60), Time.seconds(10)));
1.6. 窗口机制
对数据分组和聚合的一种机制
架构:窗口分配器、触发器
将数据流分成各种时间段,然后对时间段内的数据进行计算
分滚动窗口、滑动窗口、会话窗口
滚动窗口:将时间流分成固定大小,不重叠的时间段
滑动窗口:固定大小的、可以重叠的时间段
会话窗口:根据事件之间的间隔来划分时间段
窗口划分逻辑:
开始时间 = 当前时间 - (当前时间 + 窗口大小)% 窗口大小
结束时间 = 开始时间 + 窗口大小
窗口内的最大时间是结束时间 - 1 ms(毫秒)(窗口是左开右闭)
窗口触发时间:水位线达到窗口的最大时间,触发窗口内的计算
窗口关闭时间:水位线达到窗口的最大时间 + 允许的迟到时间(乱序时间)
代码
// 调用window算子 传入窗口分配器的静态方法of 创建窗口分配器
Datastream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
//长度为10s 滑动步长为5s的滑动窗口,5s统计一次,每次统计当前10s内的数据
Datastream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)));
//一个会话超过10s内没有数据,就对他们进行合并
Datastream.window(EventTimeSessionWindows.withGap(Time.seconds(10)));
1.7. watermark
水位线机制主要是处理事件时间的延迟数据的机制
在流式计算里可能会有网络传输延迟等一些问题,造成事件时间乱序
代码
Datastream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ZERO) // 可容许的延迟时间 这里是0秒
.withTimestampAssigner((SerializableTimestampAssigner<T>) (x, l) -> x.time) // 这里定义要抽取的时间 作为数据流中的事件
);
1.8. State
1.9. join
Window Join:基于窗口,同一窗口的可以join上,滑动窗口还会重复join
Interval Join:基于状态
两条流都维护一个Map,key是时间戳,value是数据
有一条数据进来的时候会去对方的Map中找到对于时间戳的数据,如果有就算join上
如果没有就注册定时器,等待对方流来了对应的数据重复上面操作
超过时间范围,清楚状态里的时间戳和数据
代码:
//Window Join
DataStream<FactOrderItem> resultDS = DS1.join(DS2)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//<IN1, IN2, OUT>
.apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
@Override
public FactOrderItem join(Goods first, OrderItem second) throws Exception {
FactOrderItem result = new FactOrderItem();
result.setGoodsId(first.getGoodsId());
result.setGoodsName(first.getGoodsName());
result.setCount(new BigDecimal(second.getCount()));
result.setTotalMoney(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
return result;
}
});
//Interval Join
SingleOutputStreamOperator<FactOrderItem> resultDS = goodsDSWithWatermark.keyBy(Goods::getGoodsId)
.intervalJoin(orderItemDSWithWatermark.keyBy(OrderItem::getGoodsId))
//join的条件:
// 条件1.id要相等
// 条件2. OrderItem的时间戳 - 2 <=Goods的时间戳 <= OrderItem的时间戳 + 1
.between(Time.seconds(-2), Time.seconds(1))
//ProcessJoinFunction<IN1, IN2, OUT>
.process(new ProcessJoinFunction<Goods, OrderItem, FactOrderItem>() {
@Override
public void processElement(Goods left, OrderItem right, Context ctx, Collector<FactOrderItem> out) throws Exception {
FactOrderItem result = new FactOrderItem();
result.setGoodsId(left.getGoodsId());
result.setGoodsName(left.getGoodsName());
result.setCount(new BigDecimal(right.getCount()));
result.setTotalMoney(new BigDecimal(right.getCount()).multiply(left.getGoodsPrice()));
out.collect(result);
}
});
1.10. CEP
定义复杂事件的pattern模板
将模板应用到流上
返回符合定义的数据
代码:
// 定义事件模式
Pattern<Tuple2<String, Double>, Tuple2<String, Double>> pattern = Pattern
.<Tuple2<String, Double>>begin("start")
.where(new SimpleCondition<Tuple2<String, Double>>() {
@Override
public boolean filter(Tuple2<String, Double> value) {
return value.f1 > 100.0; // 条件:事件值大于100.0
}
})
.next("end")
.where(new SimpleCondition<Tuple2<String, Double>>() {
@Override
public boolean filter(Tuple2<String, Double> value) {
return value.f1 < 50.0; // 条件:事件值小于50.0
}
});
// 应用事件模式到数据流上
DataStream<Map<String, List<Tuple2<String, Double>>>> resultStream = CEP.pattern(inputDataStream, pattern)
.select(new PatternSelectFunction<Tuple2<String, Double>, Map<String, List<Tuple2<String, Double>>>>() {
@Override
public Map<String, List<Tuple2<String, Double>>> select(Map<String, List<Tuple2<String, Double>>> pattern) throws Exception {
return pattern;
}
});
1.11. 常用算子
map
filter
keyby
reduce
window
join
union
broadcast
readtextfile
评论区