" />

警告:即将离开本站

点击"继续"将前往其他页面,确认后跳转。

侧边栏壁纸
  • 累计撰写 19 篇文章
  • 累计创建 2 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Flink CEP(三)

dengdz
2024-11-25 / 0 评论 / 0 点赞 / 23 阅读 / 0 字

FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。

FlinkCEP不是二进制发布包的一部分,需要添加Maven依赖或者是lib下引入。

1. 模式定义

在最开始首先要了解模式是什么?

能够实现什么功能?

模式分类有多少种?

1.1. 概述

模式简单来说就定义一个希望从数据流中抽取数据的复杂行为序列。

每个复杂的模式序列包括多个简单的模式,比如,寻找拥有相同属性事件序列的模式。从现在开始,把这些简单的模式称作模式, 把在数据流中最终寻找的复杂模式序列称作模式序列,你可以把模式序列看作是这样的模式构成的图, 这些模式基于用户指定的条件从一个转换到另外一个,比如 event.getName().equals("end")。 一个匹配是输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。

每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。

模式的名字不能包含字符":".

1.2. 单个模式

单个模式就最简单的也是最基础的模式,它可以是单例模式也可以是循环模式。

单例匹配只接受一个事件进行条件匹配;

循环匹配可以接收多个事件。

在模式匹配表达式中,表达式:a b+ c? 中 a 和 c都是单例模式,b+是一个循环模式,一般来说模式都是单例的,这需要使用量词将单例转成循环模式。

通常来说每个模式都可以指定一个或多个条件来指定模式要匹配到哪些事件。

1.2.1. 条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如,它的value字段应该大于5,或者大于前面接受的事件的平均值。 指定判断事件属性的条件可以通过pattern.where()pattern.or()或者pattern.until()方法。 这些可以是IterativeCondition或者SimpleCondition

1.2.1.1. IterativeCondition(迭代条件)

IterativeCondition 中提供了TimeContext,可以调用ctx.getEventsForPattern(...)可以获得所有前面已经接受作为可能匹配的事件。

但是调用这个操作的代价可能很小也可能很大,所以在实现你的条件时,尽量少使用它。

下面是一个迭代条件的代码,它接受"middle"模式下一个事件的名称开头是"foo", 并且前面已经匹配到的事件加上这个事件的价格小于5.0。 迭代条件非常强大,尤其是跟循环模式结合使用时。

middle.oneOrMore()
    .where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }

            double sum = value.getPrice();
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });

需要结合循环模式使用

1.2.1.2. SimpleCondition(简单条件)

SimpleCondition是IterativeCondition的子类,他并不提供上下文对象,因此它决定是否接受一个事件只取决于事件自身的属性。

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});

1.2.1.3. AND | OR

在指定where条件时,可以通过 and 或者 or 来为当前模式指定复杂的条件。

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // 一些判断条件
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // 一些判断条件
    }
});

1.2.1.4. 量词

当使用循环模式时,需要对单例模式指定量词来指定希望事件出现的次数。

对一个命名为start的模式,你可以通过以下量词进行指定:

// 期望出现4次
start.times(4);

// 期望出现0或者4次
start.times(4).optional();

// 期望出现2、3或者4次
start.times(2, 4);

// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();

// 期望出现0、2、3或者4次
start.times(2, 4).optional();

// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();

// 期望出现1到多次
start.oneOrMore();

// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();

// 期望出现0到多次
start.oneOrMore().optional();

// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();

// 期望出现2到多次
start.timesOrMore(2);

// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();

// 期望出现0、2或多次
start.timesOrMore(2).optional();

// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();

你可以使用pattern.greedy()方法让循环模式变成贪心的,但现在还不能让模式组贪心。 你可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式。

1.2.2. 终止条件

当使用循环模式时,可以指定一个结束条件来终止循环,例如,接受事件的值大于5满足累计值的和小于50时结束循环。

为了更好的理解它,看下面的例子。给定

  • 模式如"(a+ until b)" (一个或者更多的"a"直到"b")

  • 到来的事件序列"a1" "c" "a2" "b" "a3"

  • 输出结果会是: {a1 a2} {a1} {a2} {a3}.

你可以看到{a1 a2 a3}{a2 a3}由于停止条件没有被输出

1.2.3. 单个模式API描述(部分)

模式操作

描述

where(condition)

为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的where()语句取与组成判断条件:

        pattern.where(new IterativeCondition() {
            @Override
            public boolean filter(Event value, Context ctx) throws Exception {
                return ... // 一些判断条件 } }); 
            }
        }

or(condition)

增加一个新的判断,和当前的判断取或。一个事件只要满足至少一个判断条件就匹配到模式:

pattern.where(new IterativeCondition() {
            @Override
            public boolean filter(Event value, Context ctx) throws Exception {
                return ... // 一些判断条件 
            }
        }).or(new IterativeCondition() {
            @Override
            public boolean filter(Event value, Context ctx) throws Exception {
                return ... // 替代条件
            }
        });

until(condition)

为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。

只适用于和oneOrMore()同时使用。

NOTE: 在基于事件的条件中,它可用于清理对应模式的状态。

        pattern.oneOrMore().until(new IterativeCondition() {
            @Override
            public boolean filter(Event value, Context ctx) throws Exception {
                return ... // 替代条件
            }
        });

subtype(subClass)

为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式:

pattern.subtype(SubEvent.class);

oneOrMore()

指定模式期望匹配到的事件至少出现一次。.

默认(在子事件间)使用松散的内部连续性。 关于内部连续性的更多信息可以参考连续性

NOTE: 推荐使用until()或者within()来清理状态。

 pattern.oneOrMore();

timesOrMore(#times)

指定模式期望匹配到的事件至少出现#times次。.

默认(在子事件间)使用松散的内部连续性。 关于内部连续性的更多信息可以参考连续性

pattern.timesOrMore(2); 

times(#ofTimes)

指定模式期望匹配到的事件正好出现的次数。

默认(在子事件间)使用松散的内部连续性。 关于内部连续性的更多信息可以参考连续性

pattern.times(2); 

times(#fromTimes, #toTimes)

指定模式期望匹配到的事件出现次数在#fromTimes#toTimes之间。

默认(在子事件间)使用松散的内部连续性。 关于内部连续性的更多信息可以参考连续性

pattern.times(2, 4); 

optional()

指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。

pattern.oneOrMore().optional(); 

greedy()

指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。

pattern.oneOrMore().greedy();

1.3. 组合模式

上面看到了单个模式是什么样的,组合模式就是由多个单个模式组合成的模式序列。

一个组合模式一般有一个初始模式作为模式的入口。例如:

Pattern.<Event>begin("start")   
                .oneOrMore()       
                .where(new IterativeCondition<Event>() {    /
                    @Override
                    public boolean filter(Event value, Context<Event> ctx) throws Exception {
                        return "1".equals(value.getValue());
                    }
                })
                ./* and */or(new IterativeCondition<Event>() {
                    @Override
                    public boolean filter(Event value, Context<Event> ctx) throws Exception {
                        return "2".equals(value.getValue());
                    }
                });

1.3.1. 匹配后跳过策略

对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:

  • NO_SKIP: 每个成功的匹配都会被输出。

  • SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。

  • SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。

  • SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。

  • SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

注意:当使用SKIP_TO_FIRST和SKIP_TO_LAST策略时,需要指定一个合法的PatternName

例如,给定一个模式b+ c和一个数据流b1 b2 b3 c,不同跳过策略之间的不同如下:

跳过策略

策略描述

结果

结果描述

NO_SKIP

所有可能的匹配都会输出

b1 b2 b3 c


b2 b3 c


b3 c



找到匹配b1 b2 b3 c

之后,不会丢弃任何结果。

SKIP_TO_NEXT

找到一个完整的匹配后,从匹配的第一个事件的下一个事件开始继续匹配。

b1 b2 b3 c


b2 b3 c


b3 c



找到匹配b1 b2 b3 c

之后,不会丢弃任何结果,因为没有以b1

开始的其他匹配。

SKIP_PAST_LAST_EVENT

找到一个完整的匹配后, 下一次匹配从当前匹配的最后一个事件的下一个事件开始

b1 b2 b3 c



找到匹配b1 b2 b3 c

之后,会丢弃其他所有的部分匹配。

SKIP_TO_FIRST[b]

找到一个完整的匹配后,下一次匹配从第一个出现的 b事件开始继续匹配。

b 为分界线, 第一个 b 事件之前的部分匹配结果被丢弃

b1 b2 b3 c


b2 b3 c


b3 c



找到匹配b1 b2 b3 c

之后,会尝试丢弃所有在b1

之前开始的部分匹配,但没有这样的匹配,所以没有任何匹配被丢弃。

SKIP_TO_LAST[b]

找到一个完整的匹配后,下一次匹配从最后一个出现的 b事件开始继续匹配。

b 为分界线, 第一个 b 到最后一个 b 中间的结果被丢弃

b1 b2 b3 c


b3 c



找到匹配b1 b2 b3 c

之后,会尝试丢弃所有在b3

之前开始的部分匹配,有一个这样的b2 b3 c

被丢弃。

上述例子中 NO_SKIP 和 SKIP_TO_FIRST 看不出明显差别,下面再举一个例子说明它俩之间的差别。

模式: (a | b | c) (b | c) c+.greedy d,输入:a b c1 c2 c3 d,结果将会是:

跳过策略

结果

描述

NO_SKIP

a b c1 c2 c3 d


b c1 c2 c3 d


c1 c2 c3 d



找到匹配a b c1 c2 c3 d

之后,不会丢弃任何结果。

SKIP_TO_FIRST[c*]

a b c1 c2 c3 d


c1 c2 c3 d



找到匹配a b c1 c2 c3 d

之后,会丢弃所有在c1

之前开始的部分匹配,有一个这样的b c1 c2 c3 d

被丢弃。

为了更好的理解NO_SKIP和SKIP_TO_NEXT之间的差别,看下面的例子: 模式:a b+,输入:a b1 b2 b3,结果将会是

跳过策略

结果

描述

NO_SKIP

a b1


a b1 b2


a b1 b2 b3



找到匹配a b1

之后,不会丢弃任何结果。

SKIP_TO_NEXT

a b1



找到匹配a b1

之后,会丢弃所有以a

开始的部分匹配。这意味着不会产生a b1 b2

a b1 b2 b3

了。

想指定要使用的跳过策略,只需要调用下面的方法创建AfterMatchSkipStrategy

默认情况下会使用NO_SKIP策略

方法

描述

AfterMatchSkipStrategy.noSkip()

创建NO_SKIP策略

AfterMatchSkipStrategy.skipToNext()

创建SKIP_TO_NEXT策略

AfterMatchSkipStrategy.skipPastLastEvent()

创建SKIP_PAST_LAST_EVENT策略

AfterMatchSkipStrategy.skipToFirst(patternName)

创建引用模式名称为patternNameSKIP_TO_FIRST策略

AfterMatchSkipStrategy.skipToLast(patternName)

创建引用模式名称为patternNameSKIP_TO_LAST策略

可以通过调用下面方法将跳过策略应用到模式上:

AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("start", skipStrategy);

使用SKIP_TO_FIRST/LAST时,有两个选项可以用来处理没有事件可以映射到对应的变量名上的情况。另外一个选项是抛出异常。 可以使用如下的选项:

AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()

1.3.2. 事件间连续策略

1.3.2.1. 事件约束

  • 严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。

  • 松散连续: 忽略匹配的事件之间的不匹配的事件。

  • 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。

可以使用下面的方法来指定模式之间的连续策略:

  • next(),指定严格连续,

  • followedBy(),指定松散连续,

  • followedByAny(),指定不确定的松散连续。

或者

  • notNext(),不想后面直接连着一个特定事件

  • notFollowedBy(),不想一个特定事件发生在两个事件之间的任何地方。

注意:

  • 模式序列不能以notFollowedBy()结尾。

  • 一个 NOT 模式前面不能是可选的模式。

模式操作

描述

next()

  • 描述:

  • 表示 严格连续 匹配,即两个事件之间 不能有任何其他事件插入

  • 如果在 startmiddle 之间出现了不相关的事件,这个模式就会 失败

  • 应用场景:

  • 用于需要事件严格按照某种顺序紧密相连的场景,比如 A -> B -> C 这样的操作中,必须确保 ABBC 之间没有其他事件。

// 严格连续
Pattern<Event, ?> strict = start.next("middle").where(...);

followedBy()

  • 描述:

  • 表示 松散连续 匹配,即两个事件之间 可以有其他事件插入

  • 只要在 start 之后的任意位置找到 middle,匹配就算成功。

  • 应用场景:

  • 用于需要识别事件顺序,但对中间的噪音事件不敏感的场景,比如 A -> (任何事件) -> B

// 松散连续
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

followedByAny()

  • 描述:

  • 表示 不确定的松散连续 匹配,和 followedBy 类似,但会尝试匹配所有可能的路径。

  • 多分支匹配:对于每一个符合 start 的事件,都会尝试寻找所有可能的 middle,从而产生多个匹配结果。

  • 应用场景:

  • 用于需要保留事件间所有可能关系的场景,比如分析复杂的多事件依赖关系。

// 不确定的松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);

notNext()

  • 描述:

  • 表示 严格连续的 NOT 模式,即在 start 和后续事件之间 不能出现某种特定事件 (not)

  • 如果 not 出现在 start 后面紧接的位置,则模式会 失败

  • 应用场景:

  • 用于需要严格排除某些事件的场景,比如在 A -> B 的事件链中,A 后面不能紧跟 C

Pattern<Event, ?> strictNot = start.notNext("not").where(...);

notFollowedBy()

  • 描述:

  • 表示 松散连续的 NOT 模式,即在 start 和后续任意事件之间,不能出现某种特定事件 (not)

  • 如果 not 出现在 startmiddle 之间,则模式会 失败

  • 应用场景:

  • 用于需要松散排除某些事件的场景,比如在 A -> (任何事件) -> B 的事件链中,AB 之间不能出现 C

Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。 举例来说,模式"a b",给定事件序列"a","c","b1","b2",会产生如下的结果:

  1. "a""b"之间严格连续: {} (没有匹配),"a"之后的"c"导致"a"被丢弃。

  2. "a""b"之间松散连续: {a b1},松散连续会"跳过不匹配的事件直到匹配上的事件"。

  3. "a""b"之间不确定的松散连续: {a b1}, {a b2},这是最常见的情况,也是默认的选项。

1.3.2.2. 时间约束

也可以为模式定义一个有效时间约束。 例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间.

一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。

next.within(Time.seconds(10));

1.3.2.3. 循环模式中的连续性 (扩展)

你可以在循环模式中使用和前面章节讲过的同样的连续性。 连续性会被运用在被接受进入模式的事件之间。 用这个例子来说明上面所说的连续性,一个模式序列"a b+ c""a"后面跟着一个或者多个(不确定连续的)"b",然后跟着一个"c") 输入为"a","b1","d1","b2","d2","b3","c",输出结果如下:

简单来说,循环模式下使用严格限制,每次匹配的 "b" 事件之间不能有其他事件。

  1. 严格连续: {a b1 c}, {a b2 c}, {a b3 c} - 没有相邻的 "b"

  2. 松散连续: {a b1 c}{a b1 b2 c}{a b1 b2 b3 c}{a b2 c}{a b2 b3 c}{a b3 c} - "d"都被忽略了。

  3. 不确定松散连续: {a b1 c}{a b1 b2 c}{a b1 b3 c}{a b1 b2 b3 c}{a b2 c}{a b2 b3 c}{a b3 c} - 注意{a b1 b3 c},这是因为"b"之间是不确定松散连续产生的。

对于循环模式(例如oneOrMore()times())),默认是松散连续。如果想使用严格连续,你需要使用consecutive()方法明确指定, 如果想使用不确定松散连续,你可以使用allowCombinations()方法。

模式操作

描述

consecutive()

  • oneOrMore()times()一起使用, 在匹配的事件之间施加严格的连续性, 也就是说,任何不匹配的事件都会终止匹配(和next()一样)。

  • 如果不使用它,那么就是松散连续(和followedBy()一样)。

例如,一个如下的模式:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

输入:C D A1 A2 A3 D A4 B,会产生下面的输出:

如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}

不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

allowCombinations()

  • 与 oneOrMore() 和 times() 一起使用, 在匹配的事件中间施加不确定松散连续性(和followedByAny() 一样)。

  • 如果不使用,就是松散连续(和followedBy()一样)。

例如,一个如下的模式:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

输入:C D A1 A2 A3 D A4 B,会产生如下的输出:

  • 如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}

  • 如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

1.4. 模式组

模式组是通过将多个模式组合起来处理事件流的一种机制。模式组的主要作用是为事件流的复杂匹配提供灵活性,允许针对不同的场景定义多个独立的模式,同时提高代码的组织性和效率

一个模式组由多个模式组成,每个模式可以独立匹配事件流中的部分数据。通过模式组,你可以一次性在同一个事件流中并行检测多个模式,而不需要为每个模式单独处理整个事件流。

例如定义一个模式序列作为beginfollowedByfollowedByAnynext的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern,可以在GroupPattern上使用oneOrMore()times(#ofTimes)times(#fromTimes, #toTimes)optional()consecutive()allowCombinations()


Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// 严格连续
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// 松散连续
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// 不确定松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();

下面是查询:

  • 第一阶段做了:a b c

  • 第二阶段做了:d+

  • 第三阶段做了:a c

Pattern.begin(
                Pattern.<Event>begin("stage1-a").where(
                                new SimpleCondition<Event>() {
                                    @Override
                                    public boolean filter(Event event) throws Exception {
                                        return event.eventName.equals("a");
                                    }
                                }
                        )
                        .next("stage1-b").where(
                                new SimpleCondition<Event>() {
                                    @Override
                                    public boolean filter(Event event) throws Exception {
                                        return event.eventName.equals("b");
                                    }
                                }
                        )
                        .next("stage1-c").where(
                                new SimpleCondition<Event>() {
                                    @Override
                                    public boolean filter(Event event) throws Exception {
                                        return event.eventName.equals("c");
                                    }
                                }
                        )
        ).next(
                Pattern.<Event>begin("stage2-d+")
                        .where(
                                new SimpleCondition<Event>() {
                                    @Override
                                    public boolean filter(Event event) throws Exception {
                                        return event.eventName.equals("d");
                                    }
                                }
                        ).oneOrMore()
        ).next(
                Pattern.<Event>begin("stage3-a").where(
                                new SimpleCondition<Event>() {
                                    @Override
                                    public boolean filter(Event event) throws Exception {
                                        return event.eventName.equals("a");
                                    }
                                }
                        )
                        .next("stage3-c").where(
                                new SimpleCondition<Event>() {
                                    @Override
                                    public boolean filter(Event event) throws Exception {
                                        return event.eventName.equals("c");
                                    }
                                }
                        )
        );

模式操作

描述

begin(#name)

定义一个开始的模式:

Pattern start = Pattern.begin("start"); 

begin(#pattern_sequence)

定义一个开始的模式:

Pattern start = Pattern.begin( Pattern.begin("start").where(...).followedBy("middle").where(...) ); 

next(#name)

增加一个新的模式。匹配的事件必须是直接跟在前面匹配到的事件后面(严格连续):

Pattern next = start.next("middle"); 

next(#pattern_sequence)

增加一个新的模式。匹配的事件序列必须是直接跟在前面匹配到的事件后面(严格连续):

 Pattern next = start.next( Pattern.begin("start").where(...).followedBy("middle").where(...) );

followedBy(#name)

增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间(松散连续):

Pattern followedBy = start.followedBy("middle"); 

followedBy(#pattern_sequence)

增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间(松散连续):

Pattern followedBy = start.followedBy( Pattern.begin("start").where(...).followedBy("middle").where(...) );

followedByAny(#name)

增加一个新的模式。可以有其他事件出现在匹配的事件和之前匹配到的事件中间, 每个可选的匹配事件都会作为可选的匹配结果输出(不确定的松散连续):

Pattern followedByAny = start.followedByAny("middle"); 

followedByAny(#pattern_sequence)

增加一个新的模式。可以有其他事件出现在匹配的事件序列和之前匹配到的事件中间, 每个可选的匹配事件序列都会作为可选的匹配结果输出(不确定的松散连续):

Pattern followedByAny = start.followedByAny( Pattern.begin("start").where(...).followedBy("middle").where(...) );

notNext()

增加一个新的否定模式。匹配的(否定)事件必须直接跟在前面匹配到的事件之后(严格连续)来丢弃这些部分匹配:

Pattern notNext = start.notNext("not");

notFollowedBy()

增加一个新的否定模式。即使有其他事件在匹配的(否定)事件和之前匹配的事件之间发生, 部分匹配的事件序列也会被丢弃(松散连续):

Pattern notFollowedBy = start.notFollowedBy("not"); 

within(time)

定义匹配模式的事件序列出现的最大时间间隔。如果未完成的事件序列超过了这个事件,就会被丢弃:

pattern.within(Time.seconds(10)); 

2. 抽取数据

2.1. 检测模式

在指定了要寻找的模式后,该把它们应用到输入流上来发现可能的匹配了。

为了在事件流上运行你的模式,需要创建一个PatternStream。 给定一个输入流input,一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator, 你可以通过调用如下方法来创建PatternStream

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // 可选的

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

输入流根据你的使用场景可以是keyed或者non-keyed

non-keyed 流上使用模式将会使你的作业并发度被设为1。

2.2. 从模式中选取数据

在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。推荐使用PatternProcessFunction

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List<IN>>的格式接收一个匹配,Map的Key是你的模式序列中的每个模式的名称,Value是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。

为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()times())时, 对一个模式会有不止一个事件被接受。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}

PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。 更多信息可以看时间上下文。 通过这个上下文也可以将结果输出到侧输出.

2.2.1. 处理超时的匹配部分

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}

processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出

3. CEP Demo

下面的例子在一个并行度的Events流上检测模式start, middle(name = "error") -> end(name = "critical")。 事件按照idKeyBy,一个有效的模式需要发生在10秒内。

StreamExecutionEnvironment env = ...

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
	@Override
	public Integer getKey(Event value) throws Exception {
		return value.getId();
	}
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
	.next("middle").where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("error");
		}
	}).followedBy("end").where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("critical");
		}
	}).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
	@Override
	public Alert select(Map<String, List<Event>> pattern) throws Exception {
		return createAlert(pattern);
	}
});

0

评论区