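In my current project, several Flink jobs need to query HBase and join its data while processing records. The final results have to be available within a time limit, which led to this optimization of lookups against HBase as an external system.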
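1. Synchronous I/O
Traditionally, a Flink job queries HBase through synchronous I/O. Suppose it looks up two records, a and b. The request for b can only be sent after the request for a has reached the database and its result has come back. With large query volumes this is very inefficient, because the task spends most of its time waiting on network round trips. That waiting can in turn cause problems such as backpressure.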
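The following minimal, stdlib-only Java sketch makes the cost of synchronous lookups concrete. `blockingLookup` is a hypothetical stand-in for a blocking HBase Get that simply sleeps for a fixed latency. Nothing here uses the real HBase client. Each call waits for the previous one, so the total time is roughly the number of keys multiplied by the per-request latency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Sketch: synchronous lookups are issued one at a time, so total time grows
// roughly as (number of keys) x (per-request latency).
final class SyncLookupDemo {

    // Hypothetical stand-in for a blocking HBase Get: sleeps to mimic a network round trip.
    static String blockingLookup(String rowKey, long latencyMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return "row-" + rowKey;
    }

    // The request for the next key is only sent after the previous one has returned.
    static List<String> lookupAll(List<String> keys, long latencyMs) {
        List<String> results = new ArrayList<>();
        for (String key : keys) {
            results.add(blockingLookup(key, latencyMs));
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e");
        long startNanos = System.nanoTime();
        List<String> rows = lookupAll(keys, 50);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // The five 50 ms waits are serialized, so this takes at least about 250 ms.
        System.out.println(rows + " in " + elapsedMs + " ms");
    }
}
```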
2. Async I/O
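To address this, Flink 1.2 introduced Async I/O, a feature contributed to the community by Alibaba. The client sends multiple requests concurrently without waiting for earlier ones to return, and the database handles them as they arrive. The order in which Flink passes results downstream depends on the mode:
- In unordered mode (`AsyncDataStream.unorderedWait`), Flink forwards each result downstream as soon as it comes back, without waiting for the others.
- In ordered mode (`orderedWait`), results are buffered and emitted in input order.

In both modes the waits overlap instead of adding up, which speeds up the interaction between a Flink streaming job and the external system.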
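The sketch below illustrates the idea behind Async I/O with plain `CompletableFuture`s rather than Flink itself. It assumes a hypothetical `slowLookup` that sleeps for a given latency in place of an HBase request. All requests are started up front on a thread pool. `lookupUnordered` collects results in completion order, in the spirit of `unorderedWait`. `lookupOrdered` returns them in input order, in the spirit of `orderedWait`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the idea behind Async I/O (not Flink's implementation): every request
// is in flight at the same time, so the waits overlap instead of adding up.
final class AsyncLookupDemo {

    // Hypothetical stand-in for an HBase request with the given latency.
    static String slowLookup(String key, long latencyMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return "row-" + key;
    }

    // Starts every lookup on the given pool without waiting for any of them.
    private static List<CompletableFuture<String>> startAll(
            List<String> keys, List<Long> latenciesMs, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            long latency = latenciesMs.get(i);
            futures.add(CompletableFuture.supplyAsync(() -> slowLookup(key, latency), pool));
        }
        return futures;
    }

    // Like unorderedWait: each result is "emitted" as soon as its request completes.
    static List<String> lookupUnordered(List<String> keys, List<Long> latenciesMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, keys.size()));
        try {
            List<String> emitted = Collections.synchronizedList(new ArrayList<>());
            CompletableFuture<?>[] callbacks = startAll(keys, latenciesMs, pool).stream()
                    .map(f -> f.thenAccept(emitted::add))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(callbacks).join();
            return new ArrayList<>(emitted);
        } finally {
            pool.shutdown();
        }
    }

    // Like orderedWait: requests still overlap, but results are returned in input order.
    static List<String> lookupOrdered(List<String> keys, List<Long> latenciesMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, keys.size()));
        try {
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : startAll(keys, latenciesMs, pool)) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c");
        List<Long> latencies = List.of(300L, 20L, 150L);
        System.out.println(lookupUnordered(keys, latencies)); // completion order
        System.out.println(lookupOrdered(keys, latencies));   // input order
    }
}
```

Take keys a, b and c with latencies of 300, 20 and 150 ms. The unordered variant should return `[row-b, row-c, row-a]` and the ordered one `[row-a, row-b, row-c]`. Both should finish in roughly 300 ms rather than the 470 ms a sequential loop would need.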
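3. Performance Test
In my project, a process operator uses the user ID as the RowKey to fetch the user's latest data from a user wide table in HBase. It merges that data with the incoming record and finally writes each user's complete record to Elasticsearch.
Based on this scenario, I prepared 50 million user records in a local cluster. Each record has the fields user ID, ID card number, phone number, gender, age, province and city. I then measured the processing time of synchronous I/O and of Async I/O, each with a global parallelism of 1.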
Test results: comparing the two runs, Async I/O was roughly 35% faster than synchronous I/O.
4. Code
4.1. SyncIOProcessFunc
package com.dengdz.func;

import com.alibaba.fastjson.JSON;
import com.dengdz.utils.SyncHBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Looks up each incoming user ID (the HBase RowKey) synchronously and emits the row as JSON.
 * Each Get blocks the task thread until HBase responds.
 */
public class SyncIOProcessFunc extends ProcessFunction<String, String> {
    public static final String HBASE_ASYNC_DEMO = "hbase_async_demo";
    // Print progress every 1000 records
    private final long bufferFlushMaxMutations = 1000;
    private SyncHBase syncHBase;
    private long start;
    private long end;
    private transient AtomicLong numPendingRequests;
    private transient AtomicLong total;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // The counters are transient, so they must be created here; otherwise processElement throws an NPE
        total = new AtomicLong(0);
        numPendingRequests = new AtomicLong(0);
        start = System.currentTimeMillis();
        System.out.println("Start processing, current time: " + start);
        syncHBase = new SyncHBase("Flink-Project/flink-async-hbase/src/main/resources/hbase-site.xml",
                "Flink-Project/flink-async-hbase/src/main/resources/core-site.xml");
    }

    @Override
    public void close() throws Exception {
        super.close();
        end = System.currentTimeMillis();
        System.out.println("Finished processing, current time: " + end + ", time consumed: " + (end - start) + " ms");
        if (syncHBase != null) {
            syncHBase.close();
        }
    }

    // Adds the records counted since the last report to the running total and prints it
    private void flush() {
        System.out.println("Processed so far: " + total.addAndGet(numPendingRequests.getAndSet(0)));
    }

    @Override
    public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
        // Blocking Get: the next element is not processed until this call returns
        Map<String, Map<String, String>> row = syncHBase.getRow(HBASE_ASYNC_DEMO, value);
        out.collect(JSON.toJSONString(row));
        if (numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
            flush();
        }
    }
}
4.2. AsyncIOProcessFunc
package com.dengdz.func;

import com.alibaba.fastjson.JSON;
import com.dengdz.utils.AsyncHBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Looks up each incoming user ID (the HBase RowKey) without blocking the task thread and
 * hands the row to Flink as JSON through ResultFuture once it arrives.
 */
public class AsyncIOProcessFunc extends RichAsyncFunction<String, String> {
    public static final String HBASE_ASYNC_DEMO = "hbase_async_demo";
    // Print progress every 1000 records
    private final long bufferFlushMaxMutations = 1000;
    private AsyncHBase asyncHBase;
    private long start;
    private long end;
    private transient AtomicLong numPendingRequests;
    private transient AtomicLong total;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        asyncHBase = new AsyncHBase("Flink-Project/flink-async-hbase/src/main/resources/hbase-site.xml",
                "Flink-Project/flink-async-hbase/src/main/resources/core-site.xml");
        start = System.currentTimeMillis();
        System.out.println("Start processing, current time: " + start);
        total = new AtomicLong(0);
        numPendingRequests = new AtomicLong(0);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        // Without an explicit executor, supplyAsync runs the lookup on ForkJoinPool.commonPool()
        CompletableFuture
                .supplyAsync(() -> JSON.toJSONString(asyncHBase.getRow(HBASE_ASYNC_DEMO, input)))
                .whenComplete((json, error) -> {
                    if (error != null) {
                        // Report the failure; otherwise the record would only be released by timeout()
                        resultFuture.completeExceptionally(error);
                        return;
                    }
                    resultFuture.complete(Collections.singletonList(json));
                    if (numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
                        flush();
                    }
                });
    }

    // Called from callback threads; getAndSet(0) reads and resets the counter in one atomic step
    private void flush() {
        System.out.println("Processed so far: " + total.addAndGet(numPendingRequests.getAndSet(0)));
    }

    @Override
    public void close() throws Exception {
        super.close();
        end = System.currentTimeMillis();
        System.out.println("Finished processing, current time: " + end + ", time consumed: " + (end - start) + " ms");
        if (asyncHBase != null) {
            asyncHBase.close();
        }
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
        // The default implementation completes the record with a TimeoutException, which fails the job;
        // override it to emit a fallback value instead if that is preferable
        super.timeout(input, resultFuture);
    }
}
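The code above calls `CompletableFuture.supplyAsync` without an executor, so the work runs on `ForkJoinPool.commonPool()`. That pool is sized for CPU-bound work. If `AsyncHBase.getRow` blocks (its implementation is not shown, so this is an assumption), blocking calls placed there can limit how many lookups actually run at the same time. One common option is a dedicated thread pool sized for the I/O. The stdlib-only sketch below shows that pattern. `ResultSink` is a hypothetical stand-in that mirrors the `complete` / `completeExceptionally` methods of Flink's `ResultFuture`. `BlockingLookupAdapter` and `RecordingSink` are illustrative names, not part of any library.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Sketch: run a blocking lookup on a dedicated pool instead of ForkJoinPool.commonPool(),
// and always complete the sink, on success or on failure.
final class BlockingLookupAdapter implements AutoCloseable {
    private final ExecutorService executor;

    BlockingLookupAdapter(int ioThreads) {
        this.executor = Executors.newFixedThreadPool(ioThreads);
    }

    <T> void submit(Supplier<T> blockingLookup, ResultSink<T> sink) {
        CompletableFuture.supplyAsync(blockingLookup, executor)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        sink.completeExceptionally(error); // don't leave the record waiting for a timeout
                    } else {
                        sink.complete(Collections.singletonList(value));
                    }
                });
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        try (BlockingLookupAdapter adapter = new BlockingLookupAdapter(4)) {
            RecordingSink<String> sink = new RecordingSink<>();
            adapter.submit(() -> "{\"name\":\"demo\"}", sink);
            System.out.println(sink.outcome.join());
        }
    }
}

// Hypothetical stand-in for Flink's ResultFuture (same two method names), so this compiles without Flink.
interface ResultSink<T> {
    void complete(Collection<T> result);

    void completeExceptionally(Throwable error);
}

// Helper that captures whatever the adapter delivers.
final class RecordingSink<T> implements ResultSink<T> {
    final CompletableFuture<Collection<T>> outcome = new CompletableFuture<>();

    @Override
    public void complete(Collection<T> result) {
        outcome.complete(result);
    }

    @Override
    public void completeExceptionally(Throwable error) {
        outcome.completeExceptionally(error);
    }
}
```

In a real `RichAsyncFunction`, such an executor would typically be created in `open()` and shut down in `close()`, next to the HBase client.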
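5. Summary
In these two test runs, Async I/O was clearly faster than synchronous I/O. However, monitoring also showed higher memory usage and more GC activity with Async I/O. When optimizing, you therefore also need to consider how much load the external system can handle and tune the parameters accordingly. Examples are the timeout and the capacity (the maximum number of in-flight requests) passed to `AsyncDataStream.unorderedWait` / `orderedWait`. If you have the time, feel free to run the test yourself.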
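The following stdlib-only sketch is a rough illustration of what a capacity limit does. It caps concurrent requests with a `Semaphore`, and each lookup is simulated with a sleep. It is not Flink's implementation. Roughly speaking, Flink's async operator keeps an internal queue of size `capacity` per parallel instance and stops taking new input when that queue is full, which shows up as backpressure. The sketch only demonstrates the effect of such a cap. A lower cap limits the load on HBase and the number of pending results held in memory, at the cost of throughput. `BoundedInFlightDemo` and its parameters are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: never allow more than `capacity` simulated lookups in flight at once.
// This only mimics the effect of Flink's capacity parameter; it is NOT Flink's implementation.
final class BoundedInFlightDemo {

    // Issues `requests` simulated lookups and returns the highest concurrency actually observed.
    static int run(int requests, int capacity, long latencyMs) {
        Semaphore permits = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                // Blocks the caller once `capacity` requests are pending, similar in spirit to backpressure
                permits.acquireUninterruptibly();
                futures.add(CompletableFuture.runAsync(() -> {
                    maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        TimeUnit.MILLISECONDS.sleep(latencyMs); // simulated HBase round trip
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                }, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + run(50, 5, 20));
    }
}
```

For example, `run(50, 5, 20)` should never report more than 5 concurrent requests, however many are queued behind the cap.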