李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
03.Flink SQL之在Catalog中注册表
Leefs
2022-02-21 PM
1875℃
0条
[TOC] ### 一、表(Table)的概念 + TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 `Catalog-Table` 表之间的map。 + 表(Table)是由一个“标识符”来指定的,由三部分组成:Catalog 名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。 + 表可以是常规的(Table,表),或者虚拟的(View,视图)。 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转换而来。 视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。 ### 二、创建表 TableEnvironment可以调用 `.connect()` 方法,连接外部系统,并调用 `.createTemporaryTable()` 方法,在 Catalog 中注册表: ```java tableEnv .connect(...) // 定义表的数据来源,和外部系统建立连接 .withFormat(...) // 定义数据格式化方法 .withSchema(...) // 定义表结构 .createTemporaryTable("MyTable") // 创建临时表 ``` + 可以创建 Table 来描述文件数据,它可以从文件中读取,或者将数据写入文件 ```java tableEnv .connect( new FileSystem().path(“YOUR_Path/sensor.txt”) ) // 定义到文件系统的连接 .withFormat(new Csv()) // 定义以csv格式进行数据格式化 .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) // 定义表结构 .createTemporaryTable("sensorTable") // 创建临时表 ``` ### 三、实例 + **数据准备** ```basic sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9 sensor_1,1547718215,32.9 sensor_1,1547718218,33.6 sensor_1,1547718225,35.8 ``` #### 3.1 连接到文件系统(Csv 格式) 连接外部系统在 Catalog 中注册表,直接调用 `tableEnv.connect()`就可以,里面参数要传 入一个 `ConnectorDescriptor`,也就是 `connector` 描述器。对于文件系统的 `connector` 而言,flink 内部已经提供了,就叫做 `FileSystem()` **引入依赖** ```xml
org.apache.flink
flink-csv
${flink.version}
``` 引入csv文件依赖 **实现代码** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @author lilinchao * @date 2022/2/21 * @description 连接文件系统 **/ object Demo { def main(args: Array[String]): Unit = { // 1. 创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv = StreamTableEnvironment.create(env) // 2. 连接外部系统,读取数据 tableEnv .connect(new FileSystem().path("datas/sensor.txt")) //定义表数据来源,外部连接 .withFormat(new Csv()) //定义从外部系统读取数据之后的格式化方法 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) //定义表结构 .createTemporaryTable("inputTable") val inputTable: Table = tableEnv.from("inputTable") inputTable.toAppendStream[(String,Long,Double)].print() env.execute("Demo") } } ``` #### 3.2 连接到 Kafka kafka 的连接器 `flink-kafka-connector` 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 `ConnectorDescriptor`。 **实现代码** ```java import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, Kafka, Schema} /** * @author lilinchao * @date 2022/2/21 * @description 1.0 **/ object ConnectKafkaDemo { def main(args: Array[String]): Unit = { // 1. 创建环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv = StreamTableEnvironment.create(env) //2. 从Kafka读取数据 tableEnv.connect(new Kafka() .version("0.11") .topic("flinkdemo") .property("zookeeper.connect", "192.168.159.135:2181") .property("bootstrap.servers", "192.168.159.135:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable") val inputTable: Table = tableEnv.from("kafkaInputTable") inputTable.toAppendStream[(String,Long,Double)].print() env.execute("ConnectKafkaDemo") } } ```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1905.html
上一篇
02.Table API和Flink SQL程序的结构
下一篇
04.Table API和Flink SQL表的查询
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Tomcat
SQL练习题
Java编程思想
Elastisearch
Kafka
Typora
Jquery
MyBatis-Plus
Hbase
工具
Eclipse
MyBatis
JavaWeb
ajax
数据结构
Git
Elasticsearch
栈
FastDFS
SpringBoot
Jenkins
MySQL
Java工具类
pytorch
Yarn
国产数据库改造
Ubuntu
Hive
Spark RDD
RSA加解密
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭