FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。
FlinkCEP不是二进制发布包的一部分,需要添加Maven依赖或者是lib下引入。
1. 模式定义
在最开始首先要了解模式是什么?
能够实现什么功能?
模式分类有多少种?
1.1. 概述
模式简单来说就定义一个希望从数据流中抽取数据的复杂行为序列。
每个复杂的模式序列包括多个简单的模式,比如,寻找拥有相同属性事件序列的模式。从现在开始,把这些简单的模式称作模式, 把在数据流中最终寻找的复杂模式序列称作模式序列,你可以把模式序列看作是这样的模式构成的图, 这些模式基于用户指定的条件从一个转换到另外一个,比如 event.getName().equals("end")
。 一个匹配是输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。
每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。
模式的名字不能包含字符":"
.
1.2. 单个模式
单个模式就最简单的也是最基础的模式,它可以是单例模式也可以是循环模式。
单例匹配只接受一个事件进行条件匹配;
循环匹配可以接收多个事件。
在模式匹配表达式中,表达式:a b+ c? 中 a 和 c都是单例模式,b+是一个循环模式,一般来说模式都是单例的,这需要使用量词将单例转成循环模式。
通常来说每个模式都可以指定一个或多个条件来指定模式要匹配到哪些事件。
1.2.1. 条件
对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如,它的value字段应该大于5,或者大于前面接受的事件的平均值。 指定判断事件属性的条件可以通过pattern.where()
、pattern.or()
或者pattern.until()
方法。 这些可以是IterativeCondition
或者SimpleCondition
。
1.2.1.1. IterativeCondition(迭代条件)
IterativeCondition 中提供了TimeContext,可以调用ctx.getEventsForPattern(...)
可以获得所有前面已经接受作为可能匹配的事件。
但是调用这个操作的代价可能很小也可能很大,所以在实现你的条件时,尽量少使用它。
下面是一个迭代条件的代码,它接受"middle"模式下一个事件的名称开头是"foo", 并且前面已经匹配到的事件加上这个事件的价格小于5.0。 迭代条件非常强大,尤其是跟循环模式结合使用时。
middle.oneOrMore()
.where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
需要结合循环模式使用
1.2.1.2. SimpleCondition(简单条件)
SimpleCondition是IterativeCondition的子类,他并不提供上下文对象,因此它决定是否接受一个事件只取决于事件自身的属性。
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
1.2.1.3. AND | OR
在指定where条件时,可以通过 and 或者 or 来为当前模式指定复杂的条件。
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // 一些判断条件
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // 一些判断条件
}
});
1.2.1.4. 量词
当使用循环模式时,需要对单例模式指定量词来指定希望事件出现的次数。
对一个命名为start
的模式,你可以通过以下量词进行指定:
// 期望出现4次
start.times(4);
// 期望出现0或者4次
start.times(4).optional();
// 期望出现2、3或者4次
start.times(2, 4);
// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();
// 期望出现0、2、3或者4次
start.times(2, 4).optional();
// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();
// 期望出现1到多次
start.oneOrMore();
// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();
// 期望出现0到多次
start.oneOrMore().optional();
// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();
// 期望出现2到多次
start.timesOrMore(2);
// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();
// 期望出现0、2或多次
start.timesOrMore(2).optional();
// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();
你可以使用pattern.greedy()
方法让循环模式变成贪心的,但现在还不能让模式组贪心。 你可以使用pattern.optional()
方法让所有的模式变成可选的,不管是否是循环模式。
1.2.2. 终止条件
当使用循环模式时,可以指定一个结束条件来终止循环,例如,接受事件的值大于5满足累计值的和小于50时结束循环。
为了更好的理解它,看下面的例子。给定
模式如
"(a+ until b)"
(一个或者更多的"a"
直到"b"
)到来的事件序列
"a1" "c" "a2" "b" "a3"
输出结果会是:
{a1 a2} {a1} {a2} {a3}
.
你可以看到{a1 a2 a3}
和{a2 a3}
由于停止条件没有被输出
1.2.3. 单个模式API描述(部分)
1.3. 组合模式
上面看到了单个模式是什么样的,组合模式就是由多个单个模式组合成的模式序列。
一个组合模式一般有一个初始模式作为模式的入口。例如:
Pattern.<Event>begin("start")
.oneOrMore()
.where(new IterativeCondition<Event>() { /
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
return "1".equals(value.getValue());
}
})
./* and */or(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
return "2".equals(value.getValue());
}
});
1.3.1. 匹配后跳过策略
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy
。 有五种跳过策略,如下:
NO_SKIP: 每个成功的匹配都会被输出。
SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
注意:当使用SKIP_TO_FIRST和SKIP_TO_LAST策略时,需要指定一个合法的PatternName
例如,给定一个模式b+ c
和一个数据流b1 b2 b3 c
,不同跳过策略之间的不同如下:
上述例子中 NO_SKIP 和 SKIP_TO_FIRST 看不出明显差别,下面再举一个例子说明它俩之间的差别。
模式: (a | b | c) (b | c) c+.greedy d
,输入:a b c1 c2 c3 d
,结果将会是:
为了更好的理解NO_SKIP和SKIP_TO_NEXT之间的差别,看下面的例子: 模式:a b+
,输入:a b1 b2 b3
,结果将会是
想指定要使用的跳过策略,只需要调用下面的方法创建AfterMatchSkipStrategy
:
默认情况下会使用NO_SKIP策略
可以通过调用下面方法将跳过策略应用到模式上:
AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("start", skipStrategy);
使用SKIP_TO_FIRST/LAST时,有两个选项可以用来处理没有事件可以映射到对应的变量名上的情况。另外一个选项是抛出异常。 可以使用如下的选项:
AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
1.3.2. 事件间连续策略
1.3.2.1. 事件约束
严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
松散连续: 忽略匹配的事件之间的不匹配的事件。
不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
可以使用下面的方法来指定模式之间的连续策略:
next()
,指定严格连续,followedBy()
,指定松散连续,followedByAny()
,指定不确定的松散连续。
或者
notNext()
,不想后面直接连着一个特定事件notFollowedBy()
,不想一个特定事件发生在两个事件之间的任何地方。
注意:
模式序列不能以
notFollowedBy()
结尾。一个 NOT 模式前面不能是可选的模式。
松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。 举例来说,模式"a b"
,给定事件序列"a","c","b1","b2"
,会产生如下的结果:
"a"
和"b"
之间严格连续:{}
(没有匹配),"a"
之后的"c"
导致"a"
被丢弃。"a"
和"b"
之间松散连续:{a b1}
,松散连续会"跳过不匹配的事件直到匹配上的事件"。"a"
和"b"
之间不确定的松散连续:{a b1}
,{a b2}
,这是最常见的情况,也是默认的选项。
1.3.2.2. 时间约束
也可以为模式定义一个有效时间约束。 例如,你可以通过pattern.within()
方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间.
一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。
next.within(Time.seconds(10));
1.3.2.3. 循环模式中的连续性 (扩展)
你可以在循环模式中使用和前面章节讲过的同样的连续性。 连续性会被运用在被接受进入模式的事件之间。 用这个例子来说明上面所说的连续性,一个模式序列"a b+ c"
("a"
后面跟着一个或者多个(不确定连续的)"b"
,然后跟着一个"c"
) 输入为"a","b1","d1","b2","d2","b3","c"
,输出结果如下:
简单来说,循环模式下使用严格限制,每次匹配的 "b" 事件之间不能有其他事件。
严格连续:
{a b1 c}
,{a b2 c}
,{a b3 c}
- 没有相邻的"b"
。松散连续:
{a b1 c}
,{a b1 b2 c}
,{a b1 b2 b3 c}
,{a b2 c}
,{a b2 b3 c}
,{a b3 c}
-"d"
都被忽略了。不确定松散连续:
{a b1 c}
,{a b1 b2 c}
,{a b1 b3 c}
,{a b1 b2 b3 c}
,{a b2 c}
,{a b2 b3 c}
,{a b3 c}
- 注意{a b1 b3 c}
,这是因为"b"
之间是不确定松散连续产生的。
对于循环模式(例如oneOrMore()
和times()
)),默认是松散连续。如果想使用严格连续,你需要使用consecutive()
方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()
方法。
1.4. 模式组
模式组是通过将多个模式组合起来处理事件流的一种机制。模式组的主要作用是为事件流的复杂匹配提供灵活性,允许针对不同的场景定义多个独立的模式,同时提高代码的组织性和效率
一个模式组由多个模式组成,每个模式可以独立匹配事件流中的部分数据。通过模式组,你可以一次性在同一个事件流中并行检测多个模式,而不需要为每个模式单独处理整个事件流。
例如定义一个模式序列作为begin
,followedBy
,followedByAny
和next
的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern
,可以在GroupPattern
上使用oneOrMore()
,times(#ofTimes)
, times(#fromTimes, #toTimes)
,optional()
,consecutive()
,allowCombinations()
。
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// 严格连续
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// 松散连续
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// 不确定松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
下面是查询:
第一阶段做了:a b c
第二阶段做了:d+
第三阶段做了:a c
Pattern.begin(
Pattern.<Event>begin("stage1-a").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.eventName.equals("a");
}
}
)
.next("stage1-b").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.eventName.equals("b");
}
}
)
.next("stage1-c").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.eventName.equals("c");
}
}
)
).next(
Pattern.<Event>begin("stage2-d+")
.where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.eventName.equals("d");
}
}
).oneOrMore()
).next(
Pattern.<Event>begin("stage3-a").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.eventName.equals("a");
}
}
)
.next("stage3-c").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.eventName.equals("c");
}
}
)
);
2. 抽取数据
2.1. 检测模式
在指定了要寻找的模式后,该把它们应用到输入流上来发现可能的匹配了。
为了在事件流上运行你的模式,需要创建一个PatternStream
。 给定一个输入流input
,一个模式pattern
和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator
, 你可以通过调用如下方法来创建PatternStream
:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // 可选的
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
输入流根据你的使用场景可以是keyed或者non-keyed。
在 non-keyed 流上使用模式将会使你的作业并发度被设为1。
2.2. 从模式中选取数据
在获得到一个PatternStream
之后,你可以应用各种转换来发现事件序列。推荐使用PatternProcessFunction
。
PatternProcessFunction
有一个processMatch
的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List<IN>>
的格式接收一个匹配,Map的Key是你的模式序列中的每个模式的名称,Value是被接受的事件列表(IN
是输入事件的类型)。 模式的输入事件按照时间戳进行排序。
为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()
和times()
)时, 对一个模式会有不止一个事件被接受。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
IN startEvent = match.get("start").get(0);
IN endEvent = match.get("end").get(0);
out.collect(OUT(startEvent, endEvent));
}
}
PatternProcessFunction
可以访问Context
对象。有了它之后,你可以访问时间属性,比如currentProcessingTime
或者当前匹配的timestamp
(最新分配到匹配上的事件的时间戳)。 更多信息可以看时间上下文。 通过这个上下文也可以将结果输出到侧输出.
2.2.1. 处理超时的匹配部分
当一个模式上通过within
加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler
接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction
里另外实现这个接口。 TimedOutPartialMatchHandler
提供了另外的processTimedOutMatch
方法,这个方法对每个超时的部分匹配都会调用。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
...
}
@Override
public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
IN startEvent = match.get("start").get(0);
ctx.output(outputTag, T(startEvent));
}
}
processTimedOutMatch
不能访问主输出。 但你可以通过Context
对象把结果输出到侧输出。
3. CEP Demo
下面的例子在一个并行度的Events
流上检测模式start, middle(name = "error") -> end(name = "critical")
。 事件按照id
KeyBy,一个有效的模式需要发生在10秒内。
StreamExecutionEnvironment env = ...
DataStream<Event> input = ...
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
});
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlert(pattern);
}
});
评论区