" />

警告:即将离开本站

点击"继续"将前往其他页面,确认后跳转。

侧边栏壁纸
  • 累计撰写 19 篇文章
  • 累计创建 2 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Flink基础(一)

dengdz
2024-12-16 / 0 评论 / 1 点赞 / 14 阅读 / 0 字

1. Flink基础

1.1. 架构

  • JobManager和TaskManager组成

  • TaskManager中最小的资源调度单元是task slot

1.2. 运行模式

  • Standalone

  • Yarn

  • Session:先启动flink集群常驻yarn,适合提交多个小job

  • per-job:一个job是一个集群,执行完集群消失,适合大规模长时间的job

  • application:和per-job区别就是多个job在一个application下

1.3. 四个图

  1. streamGraph :在client生成,是代码的流程图

  2. jobGraph :在client把streamGraph进行并行度优化,发给jobmanager

  3. executorGraph :在jobmanager根据jobGraph生成并行化的版本发给taskmanager执行

  4. physical :最终的结果,不是真正的图

1.4. 检查点

  • 就是flink内部从source端每隔一段时间往下游的各个并行度发送一个barrier(检查点分界线)

  • 当一个算子所有并行度上的barrier都到达的时候,会对当前状态打一个快照,并且把当前的状态缓存下来

  • 也就是检查点对齐,对齐之后会把保存成功的状态返回给jobmanager,然后继续向下游传递、对齐

  • 代码:

// 启用检查点机制,并指定检查点之间的时间间隔

env.enableCheckpointing(5000); // 每5秒触发一个检查点



// 设置检查点模式为“恰好一次”

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);



// 设置检查点超时时间

env.getCheckpointConfig().setCheckpointTimeout(30000); // 30秒



// 设置并发进行检查点的最大数量

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 同一时间只允许一个检查点



// 设置两个检查点之间的最小暂停时间,避免频繁的检查点操作

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // 1秒



// 设置是否在故障恢复时优先使用检查点来恢复状态

env.getCheckpointConfig().setPreferCheckpointForRecovery(true);



// 设置允许检查点失败的次数

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 允许3次检查点失败



// 设置状态后端,这里使用文件系统状态后端

env.setStateBackend(new FsStateBackend("file:///path/to/state/backend"));



// 设置检查点存储配置,将检查点保存在文件系统中

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);



// 启用不对齐检查点

env.getCheckpointConfig().enableUnalignedCheckpoints();



// 设置重启策略

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

1.5. 容错机制

  • 开启检查点 + 设置重启策略

  • 任务报错的时候,会按照重启策略从检查点恢复任务

  • 固定重启策略 + 故障率重启策略

  • 代码:

// 使用固定延迟重启策略,最大尝试3次,每次重启之间等待10秒

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));



// 使用基于失败率的重启策略,如果在60分钟内失败率超过50%,则重启任务

env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(60), Time.seconds(10)));

1.6. 窗口机制

  • 对数据分组和聚合的一种机制

  • 架构:窗口分配器、触发器

  • 将数据流分成各种时间段,然后对时间段内的数据进行计算

  • 滚动窗口、滑动窗口、会话窗口

  • 滚动窗口:将时间流分成固定大小,不重叠的时间段

  • 滑动窗口:固定大小的、可以重叠的时间段

  • 会话窗口:根据事件之间的间隔来划分时间段

  • 窗口划分逻辑:

  • 开始时间 = 当前时间 - (当前时间 + 窗口大小)% 窗口大小

  • 结束时间 = 开始时间 + 窗口大小

  • 窗口内的最大时间是结束时间 - 1 ms(毫秒)(窗口是左开右闭)

  • 窗口触发时间:水位线达到窗口的最大时间,触发窗口内的计算

  • 窗口关闭时间:水位线达到窗口的最大时间 + 允许的迟到时间(乱序时间)

  • 代码

// 调用window算子 传入窗口分配器的静态方法of 创建窗口分配器

Datastream.window(TumblingEventTimeWindows.of(Time.seconds(5)));



//长度为10s 滑动步长为5s的滑动窗口,5s统计一次,每次统计当前10s内的数据

Datastream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)));



//一个会话超过10s内没有数据,就对他们进行合并

Datastream.window(EventTimeSessionWindows.withGap(Time.seconds(10)));

1.7. watermark

  • 水位线机制主要是处理事件时间的延迟数据的机制

  • 在流式计算里可能会有网络传输延迟等一些问题,造成事件时间乱序

  • 代码

Datastream.assignTimestampsAndWatermarks(

WatermarkStrategy

.<Event>forBoundedOutOfOrderness(Duration.ZERO) // 可容许的延迟时间 这里是0秒

.withTimestampAssigner((SerializableTimestampAssigner<T>) (x, l) -> x.time) // 这里定义要抽取的时间 作为数据流中的事件

);

1.8. State

1.9. join

  • Window Join:基于窗口,同一窗口的可以join上,滑动窗口还会重复join

  • Interval Join:基于状态

  • 两条流都维护一个Map,key是时间戳,value是数据

  • 有一条数据进来的时候会去对方的Map中找到对于时间戳的数据,如果有就算join上

  • 如果没有就注册定时器,等待对方流来了对应的数据重复上面操作

  • 超过时间范围,清楚状态里的时间戳和数据

  • 代码:

//Window Join

DataStream<FactOrderItem> resultDS = DS1.join(DS2)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

//<IN1, IN2, OUT>

.apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {

@Override

public FactOrderItem join(Goods first, OrderItem second) throws Exception {

FactOrderItem result = new FactOrderItem();

result.setGoodsId(first.getGoodsId());

result.setGoodsName(first.getGoodsName());

result.setCount(new BigDecimal(second.getCount()));

result.setTotalMoney(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));

return result;

}

});



//Interval Join

SingleOutputStreamOperator<FactOrderItem> resultDS = goodsDSWithWatermark.keyBy(Goods::getGoodsId)

.intervalJoin(orderItemDSWithWatermark.keyBy(OrderItem::getGoodsId))

//join的条件:

// 条件1.id要相等

// 条件2. OrderItem的时间戳 - 2 <=Goods的时间戳 <= OrderItem的时间戳 + 1

.between(Time.seconds(-2), Time.seconds(1))

//ProcessJoinFunction<IN1, IN2, OUT>

.process(new ProcessJoinFunction<Goods, OrderItem, FactOrderItem>() {

@Override

public void processElement(Goods left, OrderItem right, Context ctx, Collector<FactOrderItem> out) throws Exception {

FactOrderItem result = new FactOrderItem();

result.setGoodsId(left.getGoodsId());

result.setGoodsName(left.getGoodsName());

result.setCount(new BigDecimal(right.getCount()));

result.setTotalMoney(new BigDecimal(right.getCount()).multiply(left.getGoodsPrice()));

out.collect(result);

}

});

1.10. CEP

  • 定义复杂事件的pattern模板

  • 将模板应用到流上

  • 返回符合定义的数据

  • 代码:

// 定义事件模式

Pattern<Tuple2<String, Double>, Tuple2<String, Double>> pattern = Pattern

.<Tuple2<String, Double>>begin("start")

.where(new SimpleCondition<Tuple2<String, Double>>() {

@Override

public boolean filter(Tuple2<String, Double> value) {

return value.f1 > 100.0; // 条件:事件值大于100.0

}

})

.next("end")

.where(new SimpleCondition<Tuple2<String, Double>>() {

@Override

public boolean filter(Tuple2<String, Double> value) {

return value.f1 < 50.0; // 条件:事件值小于50.0

}

});



// 应用事件模式到数据流上

DataStream<Map<String, List<Tuple2<String, Double>>>> resultStream = CEP.pattern(inputDataStream, pattern)

.select(new PatternSelectFunction<Tuple2<String, Double>, Map<String, List<Tuple2<String, Double>>>>() {

@Override

public Map<String, List<Tuple2<String, Double>>> select(Map<String, List<Tuple2<String, Double>>> pattern) throws Exception {

return pattern;

}

});

1.11. 常用算子

  • map

  • filter

  • keyby

  • reduce

  • window

  • join

  • union

  • broadcast

  • readtextfile


1

评论区