# 17. Flink Stream Processing API: Sink
Leefs · 2022-01-17
[TOC]

### Preface

When processing data with Flink, records flow in through a Data Source, are transformed by a series of Transformations, and the computed results are finally emitted through a Sink. Flink Data Sinks define the final output destination of a data stream.

### 1. Overview

Flink has no counterpart to Spark's foreach method that would let users iterate over records and write them out directly; all output to external systems must go through a Sink. The final output step of a job therefore looks like this:

```scala
stream.addSink(new MySink(xxxx))
```

Flink ships with sinks for a number of frameworks out of the box; for anything beyond those, users implement their own sink.

![17.Flink流处理API之Sink01.jpg](https://lilinchao.com/usr/uploads/2022/01/3405132150.jpg)

### 2. Writing to a File

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/**
 * Created by lilinchao
 * Date 2022/1/17
 * Description Writing to a file
 */
object SinkToFileTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from a collection
    val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56")
    val studentStream: DataStream[String] = env.fromCollection(dataList)

    // Convert each line into a case class instance
    val studentData: DataStream[Student] = studentStream.map(data => {
      val array: Array[String] = data.split(",")
      Student(array(0).toInt, array(1), array(2).toDouble)
    })

    // These two methods are deprecated:
    // studentData.writeAsText("")
    // studentData.writeAsCsv("")

    // The new way of writing to files;
    // files produced this way carry a timestamp in their names
    studentData.addSink(StreamingFileSink.forRowFormat[Student](
      new Path("E:\\data\\flink"),
      new SimpleStringEncoder[Student]("UTF-8")
    ).build())

    env.execute("sink to file test")
  }
}

// Case class definition
case class Student(id:Int,name:String,score:Double)
```

**Result**

![17.Flink流处理API之Sink02.jpg](https://lilinchao.com/usr/uploads/2022/01/3928896107.jpg)

### 3. Writing to a Socket

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization._

/**
 * Created by lilinchao
 * Date 2022/1/17
 * Description Writing to a socket
 */
object SinkToSocketTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from a collection
    val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56")
    val studentStream: DataStream[String] = env.fromCollection(dataList)

    // Convert each line into a case class instance
    val studentData: DataStream[Student] = studentStream.map(data => {
      val array: Array[String] = data.split(",")
      Student(array(0).toInt, array(1), array(2).toDouble)
    })

    // Write each record to the socket, serialized via a SerializationSchema
    studentData.writeToSocket("192.168.61.129",7777,new SerializationSchema[Student]{
      override def serialize(t: Student): Array[Byte] = {
        (t + "\n").getBytes()
      }
    })

    env.execute("sink to socket test")
  }
}

// Case class definition
case class Student(id:Int,name:String,score:Double)
```

**Result**

![17.Flink流处理API之Sink03.jpg](https://lilinchao.com/usr/uploads/2022/01/497614082.jpg)
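Note that something must already be listening on the target host and port when the job starts, otherwise the socket sink cannot connect. A quick way to check the output is netcat; this is just a sketch, using the host and port from the example above:

```shell
# keep listening on port 7777 and print whatever the job sends
nc -lk 7777
```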
### 4. Writing to Kafka

#### 4.1 Adding the dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```
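The producer in the next subsection writes each record, formatted as a string, to the `sinktest` topic. While the job runs, the output can be checked with Kafka's console consumer; a sketch, assuming a broker at localhost:9092 and that the topic already exists (or is auto-created):

```shell
# read the sinked records from the beginning of the sinktest topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest --from-beginning
```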
#### 4.2 Implementation

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

/**
 * Created by lilinchao
 * Date 2022/1/17
 * Description Writing to Kafka
 */
object SinkToKafkaTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from a collection
    val dataList = List("10001,Thyee,60","10002,Jeyoo,100","10001,Leefs,90","10001,Thsue,58","10001,Hyee,56")
    val studentStream: DataStream[String] = env.fromCollection(dataList)

    // Convert each line into a case class instance, then back to a String
    // via toString so it can be serialized as plain text
    val studentData: DataStream[String] = studentStream.map(data => {
      val array: Array[String] = data.split(",")
      Student(array(0).toInt, array(1), array(2).toDouble).toString
    })

    studentData.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()))

    env.execute("sink to kafka test")
  }
}

// Case class definition
case class Student(id:Int,name:String,score:Double)
```

### 5. A Custom JDBC Sink

#### 5.1 Preparing the data

+ **sensor.txt**

```basic
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
```
#### 5.2 Adding the MySQL connector dependency

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
```
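The sink implemented below writes into a `sensor_temp` table in the `test` database, so that table must exist before the job runs. A minimal DDL sketch; the column types here are assumptions inferred from the insert and update statements in the code:

```sql
-- hypothetical schema: one row per sensor id, holding its latest temperature
CREATE TABLE sensor_temp (
    id          VARCHAR(32) NOT NULL,
    temperature DOUBLE      NOT NULL
);
```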
#### 5.3 Implementation

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

/**
 * Created by lilinchao
 * Date 2022/1/17
 * Description A custom sink that writes to MySQL
 */
object SinkToJDBC {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputDStream: DataStream[String] = env.readTextFile("datas/sensor.txt")

    val dataDstream: DataStream[SensorReading] = inputDStream.map( data => {
      val dataArray: Array[String] = data.split(",")
      SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    })

    // Attach the custom JDBC sink
    dataDstream.addSink( MyJdbcSink() )
    dataDstream.print("mysql")

    env.execute("sink jdbc test")
  }
}

case class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // Declare the connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // Create the connection and precompile the statements
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp(id,temperature) values(?,?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temperature = ? where id = ?")
  }

  // Called once per record: execute the SQL against the open connection
  override def invoke(value: SensorReading): Unit = {
    // Try the update first; if no row was updated, fall back to an insert
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    if(updateStmt.getUpdateCount == 0){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
```
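Because the sink tries an update first and only inserts when no existing row was touched, the table ends up with one row per sensor id holding its most recent temperature. Once the job has finished, this can be checked with a simple query against the schema sketched above:

```sql
-- expect one row per sensor id, e.g. sensor_1 with its last reading, 30.9
SELECT id, temperature FROM sensor_temp;
```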
Tags: Flink
Unless otherwise stated, all articles on this blog are the author's original work.
If you repost, please credit the source:
https://lilinchao.com/archives/1856.html