1. 底层实现
Flink通过Apache另一个开源项目calcite作为SQL解析器将SQL语句解析为AST,最终转换为Flink的执行逻辑。核心流程包括:
SQL 解析与验证
逻辑计划生成
查询优化(RBO 和 CBO)
物理计划生成
任务执行与调度
2. 环境配置
2.1. 使用前先下载对应的jar ,放在flink的\lib\下 ,并重新启动flink集群
https://blog.csdn.net/qq_35758926/article/details/111709948
2.1.1. hive
需要将
flink-connector-hive_2.12-1.14.3.jar
hadoop-mapreduce-client-core-3.2.2.jar(保证正常执行yarn任务)
hive-exec-3.1.2.jar
libfb303-0.9.3.jar
放入flink的lib下
https://www.modb.pro/db/476071
2.1.2. mysql
需要将
flink-connector-jdbc_2.12-1.14.3.jar
mysql-connector-java-8.0.25.jar
放入flink的lib下
2.1.3. kafka/upsert-kafka
需要将
flink-connector-kafka_2.12-1.14.3.jar
放入flink的lib下
2.1.4. elasticsearch
需要将
flink-sql-connector-elasticsearch7_2.12-1.14.3.jar
放入flink的lib下
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/
2.2. 根据sql文件执行任务
sql-client.sh -f sql.sql
3. 参数配置
3.1. 表格模式(table mode)
在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET 'sql-client.execution.result-mode' = 'table';
3.2. 变更日志模式(changelog mode)
不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
SET 'sql-client.execution.result-mode' = 'changelog';
3.3. Tableau模式(tableau mode)
更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):
SET 'sql-client.execution.result-mode' = 'tableau';
4. Connector
4.1. mysql
4.1.1. 建表
CREATE TABLE mysql_table_stu (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/test',
'table-name' = 'stu',
'username' = 'root',
'password' = '123456'
);
4.2. hive
4.2.1. 建表
-- 创建hive catalog
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);
-- 使用 hive catalog
use catalog myhive;
4.2.2. mysql to hive
insert into mysql_table_stu select * from stu;
4.3. kafka
主要提供kafka查询能力
4.3.1. 建表
CREATE TABLE kafka_table_stu (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`id` INT,
`name` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'stu',
'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
-- 如果没有会自动topic
4.3.2. 查询
-- 查询Kafka时需要将表模式设置成流模式
SET 'execution.runtime-mode' = 'streaming';
-- 插入数据 {"id":"1","name":"zhangsan"}
select * from kafka_table_stu;
4.4. upsert-kafka
4.4.1. 建表
CREATE TABLE kafka_table_stu_upsert (
`id` INT,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'stu_upsert',
'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- 如果没有会自动topic
4.4.2. 插入
insert into kafka_table_stu_upsert
select id ,name from kafka_table_stu;
4.5. elasticsearch
4.5.1. 建表
CREATE TABLE es_table_stu (
id STRING,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.100.201:9200;http://192.168.100.202:9200;http://192.168.100.203:9200',
'index' = 'stu',
'username' = 'elastic',
'password' = 'elastic'
);
-- 如果没有会自动索引
4.5.2. 插入
insert into es_table_stu(id,name) values( '1','zhangsan');
评论区