李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
03.Flink SQL之在Catalog中注册表
Leefs
2022-02-21 PM
1510℃
0条
[TOC] ### 一、表(Table)的概念 + TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 `Catalog-Table` 表之间的map。 + 表(Table)是由一个“标识符”来指定的,由三部分组成:Catalog 名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。 + 表可以是常规的(Table,表),或者虚拟的(View,视图)。 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转换而来。 视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。 ### 二、创建表 TableEnvironment可以调用 `.connect()` 方法,连接外部系统,并调用 `.createTemporaryTable()` 方法,在 Catalog 中注册表: ```java tableEnv .connect(...) // 定义表的数据来源,和外部系统建立连接 .withFormat(...) // 定义数据格式化方法 .withSchema(...) // 定义表结构 .createTemporaryTable("MyTable") // 创建临时表 ``` + 可以创建 Table 来描述文件数据,它可以从文件中读取,或者将数据写入文件 ```java tableEnv .connect( new FileSystem().path(“YOUR_Path/sensor.txt”) ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义以csv格式进行数据格式化 .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("sensorTable") // 创建临时表 ``` ### 三、实例 + **数据准备** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 sensor_1,1547718215,32.9 sensor_1,1547718218,33.6 sensor_1,1547718225,35.8 ``` #### 3.1 连接到文件系统(Csv 格式) 连接外部系统在 Catalog 中注册表,直接调用 `tableEnv.connect()`就可以,里面参数要传 入一个 `ConnectorDescriptor`,也就是 `connector` 描述器。对于文件系统的 `connector` 而言,flink 内部已经提供了,就叫做 `FileSystem()` **引入依赖** ```xml
org.apache.flink
flink-csv
${flink.version}
``` 引入csv文件依赖 **实现代码** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @author lilinchao * @date 2022/2/21 * @description 连接文件系统 **/ object Demo { def main(args: Array[String]): Unit = { // 1. 创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv = StreamTableEnvironment.create(env) // 2. 连接外部系统,读取数据 tableEnv .connect(new FileSystem().path("datas/sensor.txt")) //定义表数据来源,外部连接 .withFormat(new Csv()) //定义从外部系统读取数据之后的格式化方法 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) //定义表结构 .createTemporaryTable("inputTable") val inputTable: Table = tableEnv.from("inputTable") inputTable.toAppendStream[(String,Long,Double)].print() env.execute("Demo") } } ``` #### 3.2 连接到 Kafka kafka 的连接器 `flink-kafka-connector` 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 `ConnectorDescriptor`。 **实现代码** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, Kafka, Schema} /** * @author lilinchao * @date 2022/2/21 * @description 1.0 **/ object ConnectKafkaDemo { def main(args: Array[String]): Unit = { // 1. 创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv = StreamTableEnvironment.create(env) //2. 从Kafka读取数据 tableEnv.connect(new Kafka() .version("0.11") .topic("flinkdemo") .property("zookeeper.connect", "192.168.159.135:2181") .property("bootstrap.servers", "192.168.159.135:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable") val inputTable: Table = tableEnv.from("kafkaInputTable") inputTable.toAppendStream[(String,Long,Double)].print() env.execute("ConnectKafkaDemo") } } ```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1905.html
上一篇
02.Table API和Flink SQL程序的结构
下一篇
04.Table API和Flink SQL表的查询
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
MyBatis-Plus
Elasticsearch
稀疏数组
人工智能
锁
Java工具类
容器深入研究
FileBeat
持有对象
Azkaban
Zookeeper
Kafka
Elastisearch
Scala
Spark
排序
Hadoop
GET和POST
LeetCode刷题
JavaScript
Stream流
哈希表
MyBatisX
HDFS
JavaWeb
SpringBoot
Nacos
Flume
Eclipse
随笔
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞