李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
03.Flink入门案例
Leefs
2021-12-21 PM
1042℃
0条
[TOC] ### 前言 本篇将通过两个案例来入门Flink流式处理和批处理编程。 准备环境: + Scala 2.12版本 + 创建一个Maven工程项目 ### 一、准备 #### 1.1 引入依赖 + pom.xml ```xml
org.apache.flink
flink-scala_2.12
1.10.1
org.apache.flink
flink-streaming-scala_2.12
1.10.1
net.alchim31.maven
scala-maven-plugin
3.4.6
compile
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
jar-with-dependencies
make-assembly
package
single
``` #### 1.2 数据准备 + **创建文件word.txt** ```basic hello world hello flink hello scala hello java scala demo flink demo ``` 将文件存放在datas目录下。 #### 1.3 添加Scala框架和Scala文件夹 ![03.Flink入门案例01.jpg](https://lilinchao.com/usr/uploads/2021/12/120696203.jpg) ### 二、代码实现 #### 2.1 批处理WordCount ```scala import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala._ /** * @author lilinchao * @date 2021/12/21 * @description 批处理WordCount **/ object BatchWordCount { def main(args: Array[String]): Unit = { //创建执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //从文件中读取数据 val inputPath = "datas/word.txt" val inputDS: DataSet[String] = env.readTextFile(inputPath) //分词之后,对单词进行groupby分组,然后用sum进行聚合 val resultDataSet: AggregateDataSet[(String, Int)] = inputDS .flatMap(_.split(" ")) .map((_, 1)) .groupBy(0) .sum(1) //打印输出 resultDataSet.print() } } ``` **输出结果** ```basic (demo,2) (flink,2) (world,1) (hello,4) (java,1) (scala,2) ``` 注意:Flink程序支持 java和scala两种语言,本课程中以scala语言为主。在 引入包中,有java和scala 两种包时注意要使用scala的包。 #### 2.2 流处理WordCount ```scala import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._ /** * @author lilinchao * @date 2021/12/21 * @description 流处理WordCount **/ object StreamWordCount { def main(args: Array[String]): Unit = { // 从外部命令中获取参 val params: ParameterTool = ParameterTool.fromArgs(args) val host: String = params.get("host") val port: Int = params.getInt("port") //创建流处理环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //接收socket文本流 val textDstream: DataStream[String] = env.socketTextStream(host,port) //flatMap 和 Map 需要引用的隐式转换 import org.apache.flink.api.scala._ val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s+")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(0) .sum(1) dataStream.print().setParallelism(1) // 启动 executor,执行任务 env.execute("Socket stream word count") } } ``` + 向args中传入如下参数 ```bash --host 192.168.159.135 --port 7777 ``` ![03.Flink入门案例02.jpg](https://lilinchao.com/usr/uploads/2021/12/36270177.jpg) 在 linux系统中用netcat命令进行发送测试: ```bash nc -lk 7777 ``` **运行结果** ``` (hello,1) (hello,2) (scala,1) (scala,2) (demo,1) (hello,3) (hello,4) (java,1) (world,1) (flink,1) ```
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1769.html
上一篇
Kibana中的KQL语法
下一篇
04.Flink本地模式部署
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
SpringCloud
链表
栈
人工智能
CentOS
Git
Hadoop
数据结构和算法
Nacos
Sentinel
MyBatisX
Ubuntu
高并发
二叉树
工具
Redis
Hive
Jenkins
nginx
Map
Hbase
并发线程
JVM
VUE
Livy
gorm
NIO
序列化和反序列化
Java
Spark RDD
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞