13. Flink Stream Processing API: Source
Leefs
2022-01-12 PM
[TOC]

### Preface

Flink can read data from files, sockets, and collections. It also provides interfaces and abstract classes for implementing custom Sources.

**Versions:**

+ Flink 1.14.2
+ Scala 2.12

### 1. Source Based on a Local Collection

+ **Add the pom.xml dependencies**

```xml
<properties>
    <flink.version>1.14.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
    </dependency>
</dependencies>
```
Note: the Kafka connector in Flink 1.14.2 differs considerably from earlier releases, so this article uses the 1.10.1 connector dependency (flink-connector-kafka-0.11) to connect to Kafka.

+ **Create LocalSource**

```scala
import org.apache.flink.streaming.api.scala._

/**
  * Created by lilinchao
  * Date 2022/1/10
  * Description Source based on a local collection
  */
object LocalSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create a DataStream from individual elements (fromElements)
    val dataStream: DataStream[String] = env.fromElements("spark", "flink")
    dataStream.print().setParallelism(1)

    // Create a DataStream from tuples (fromElements)
    val tupleData: DataStream[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    tupleData.print().setParallelism(1)

    // Create a DataStream from an Array
    val arrayData: DataStream[String] = env.fromCollection(Array("spark", "flink"))
    arrayData.print().setParallelism(1)

    // Create a DataStream from a List
    val listData: DataStream[String] = env.fromCollection(List("spark", "flink"))
    listData.print().setParallelism(1)

    env.execute("local source test")
  }
}
```

### 2. Source Based on Files

**2.1 Data preparation**

+ **student.csv**

```basic
name,age
aaa,12
bbb,13
ccc,14
ddd,15
```

+ **word.txt**

```basic
hello spark
hello flink
hello scala
```

**2.2 Batch job**

+ **Create ExecutionFileSource**

```scala
import org.apache.flink.api.scala._

/**
  * Created by lilinchao
  * Date 2022/1/10
  * Description Source based on files
  */
object ExecutionFileSource {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

//    val textFileData = fromTextFile(env)
//    // Print the result
//    textFileData.print()

    val csvFileData: DataSet[Student] = fromCsvFile(env)
    csvFileData.print()

//    env.execute("file source test")
  }

  /**
    * Read a plain text file
    * @param env
    * @return
    */
  def fromTextFile(env: ExecutionEnvironment): DataSet[String] = {
    val textFile: DataSet[String] = env.readTextFile("datas/word.txt")
    textFile
  }

  /**
    * Read a CSV file
    */
  def fromCsvFile(env: ExecutionEnvironment): DataSet[Student] = {
    val csvData: DataSet[Student] = env.readCsvFile[Student]("datas/student.csv", ignoreFirstLine = true, pojoFields = Array("name", "age"))
    csvData
  }

  case class Student(name: String, age: Int)
}
```

**2.3 Streaming job**

+ **Create StreamExecutionFileSource**

```scala
import org.apache.flink.streaming.api.scala._

/**
  * Created by lilinchao
  * Date 2022/1/12
  * Description 1.0
  */
object StreamExecutionFileSource {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read a local file
    val csvDataStream: DataStream[String] = env.readTextFile("datas/student.csv")
    csvDataStream.print()

    // 2. Read a file from HDFS
    val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://hadoop001:9000/student/student.txt")
    hdfsDataStream.print()

    env.execute("file source test")
  }
}
```

### 3. Source Based on a Network Socket

+ **Create SocketSource**

```scala
import org.apache.flink.streaming.api.scala._

/**
  * Created by lilinchao
  * Date 2022/1/10
  * Description Socket text stream
  */
object SocketSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val socketData: DataStream[String] = env.socketTextStream("192.168.61.129", 8002)
    socketData.print().setParallelism(1)

    env.execute("socket source test")
  }
}
```

+ **Run netcat to listen on port 8002**

```
[root@hadoop001 ~]# nc -lk 8002
```

### 4. Reading Data from a Message Queue (Kafka)

Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly-once guarantees.

+ **Create KafkaSource**

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

/**
  * Created by lilinchao
  * Date 2022/1/12
  * Description Kafka as the data source
  */
object KafkaSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // When checkpointing is enabled, group.id must be set; otherwise an invalid-group-id exception is thrown
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val kafkaStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    kafkaStream.print()

    env.execute("kafka source test")
  }
}
```
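The example above only shows the consumer side. Since the connector also supports writing to Kafka topics, here is a minimal sketch of the producer side using the same 0.11 connector; it is not from the original post, and the broker address and output topic `sensor_out` are assumptions for illustration.

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

// Sketch only: write a DataStream[String] back to an assumed Kafka topic "sensor_out"
object KafkaSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample records standing in for real sensor data
    val stream: DataStream[String] = env.fromElements("sensor_1,35.8", "sensor_2,15.4")

    // FlinkKafkaProducer011(brokerList, topicId, serializationSchema)
    stream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "sensor_out", new SimpleStringSchema()))

    env.execute("kafka sink sketch")
  }
}
```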
### 5. Custom Source

Besides the built-in sources above, we can also define our own source. All that is required is to pass a SourceFunction to `env.addSource`.

**Implementation**

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
  * Created by lilinchao
  * Date 2022/1/12
  * Description Custom Source
  */
object MySource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dataSource: DataStream[SensorReading] = env.addSource(new MySensorSource())
    dataSource.print()

    env.execute("my source test")
  }
}

class MySensorSource extends SourceFunction[SensorReading] {

  // flag: whether the source is still running
  var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Initialize a random number generator
    val rand = new Random()

    // Randomly generate initial temperatures for a group of 10 sensors: (id, temp)
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
    )

    // Produce data in an endless loop until the source is cancelled
    while (running) {
      // Nudge each temperature by a small random delta on top of the previous value
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )

      // Take the current timestamp, attach it to the data and emit via ctx.collect
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
      )

      // Wait 500 ms between emissions
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
```
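A plain SourceFunction like MySensorSource always runs with parallelism 1. As a side note not covered in the original post, if the custom source needs to run with higher parallelism you can implement ParallelSourceFunction instead; the sketch below is an assumption-based illustration, not part of the article's code.

```scala
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

// Sketch only: a counter source that Flink may run with parallelism > 1,
// one independent instance per parallel subtask
class MyParallelSource extends ParallelSourceFunction[Long] {

  @volatile private var running = true
  private var count = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (running) {
      // Emit the current counter value and advance it
      ctx.collect(count)
      count += 1
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```

It would be used the same way as the non-parallel source, e.g. `env.addSource(new MyParallelSource()).setParallelism(2)`, in which case each of the two subtasks runs its own copy of the source.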
Tags: Flink
Unless otherwise stated, all articles on this blog are the author's original work.
When reposting, please cite the source:
https://lilinchao.com/archives/1831.html