" />

警告:即将离开本站

点击"继续"将前往其他页面,确认后跳转。

侧边栏壁纸
  • 累计撰写 19 篇文章
  • 累计创建 2 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Flink SQL(二)

dengdz
2024-12-16 / 0 评论 / 0 点赞 / 32 阅读 / 0 字

1. 底层实现

Flink通过Apache另一个开源项目calcite作为SQL解析器将SQL语句解析为AST,最终转换为Flink的执行逻辑。核心流程包括:

  1. SQL 解析与验证

  2. 逻辑计划生成

  3. 查询优化(RBO 和 CBO)

  4. 物理计划生成

  5. 任务执行与调度

2. 环境配置

2.1. 使用前先下载对应的jar ,放在flink的\lib\下 ,并重新启动flink集群

https://blog.csdn.net/qq_35758926/article/details/111709948

2.1.1. hive

需要将

flink-connector-hive_2.12-1.14.3.jar

hadoop-mapreduce-client-core-3.2.2.jar(保证正常执行yarn任务)

hive-exec-3.1.2.jar

libfb303-0.9.3.jar

放入flink的lib下

https://www.modb.pro/db/476071

2.1.2. mysql

需要将

flink-connector-jdbc_2.12-1.14.3.jar

mysql-connector-java-8.0.25.jar

放入flink的lib下

2.1.3. kafka/upsert-kafka

需要将

flink-connector-kafka_2.12-1.14.3.jar

放入flink的lib下

2.1.4. elasticsearch

需要将

flink-sql-connector-elasticsearch7_2.12-1.14.3.jar

放入flink的lib下

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/

2.2. 根据sql文件执行任务

sql-client.sh -f sql.sql

3. 参数配置

3.1. 表格模式(table mode)

在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'table';

3.2. 变更日志模式(changelog mode)

不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET 'sql-client.execution.result-mode' = 'changelog';

3.3. Tableau模式(tableau mode)

更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

SET 'sql-client.execution.result-mode' = 'tableau';

4. Connector

4.1. mysql

4.1.1. 建表

CREATE TABLE mysql_table_stu (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://hadoop1:3306/test',
   'table-name' = 'stu',
   'username' = 'root',
   'password' = '123456'
);

4.2. hive

4.2.1. 建表

-- 创建hive catalog
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);
-- 使用 hive catalog
use catalog myhive;

4.2.2. mysql to hive

insert into mysql_table_stu select * from stu;

4.3. kafka

主要提供kafka查询能力

4.3.1. 建表

CREATE TABLE kafka_table_stu (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'stu',
  'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- 如果没有会自动topic

4.3.2. 查询

-- 查询Kafka时需要将表模式设置成流模式
SET 'execution.runtime-mode' = 'streaming';

-- 插入数据  {"id":"1","name":"zhangsan"}

select * from kafka_table_stu;

4.4. upsert-kafka

4.4.1. 建表

CREATE TABLE kafka_table_stu_upsert (
  `id` INT,
  `name` STRING,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'stu_upsert',
  'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- 如果没有会自动topic

4.4.2. 插入

insert into kafka_table_stu_upsert 
select id ,name  from kafka_table_stu;

4.5. elasticsearch

4.5.1. 建表

CREATE TABLE es_table_stu (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.100.201:9200;http://192.168.100.202:9200;http://192.168.100.203:9200',
  'index' = 'stu',
  'username' = 'elastic',
  'password' = 'elastic'
);
-- 如果没有会自动索引

4.5.2. 插入

insert into es_table_stu(id,name) values( '1','zhangsan');

0

评论区