李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
07.DStream优雅关闭
Leefs
2021-10-26 PM
1061℃
0条
[TOC] ### 前言 流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。 ### 一、概念 **非优雅关闭两种方式:** + `kill -9 processId` + `yarn -kill applicationId` **弊端:** 由于Spark Streaming是基于micro-batch机制工作的,按照间隔时间生成RDD,如果在间隔期间执行了暴力关闭,那么就会**导致这段时间的数据丢失**,虽然提供了`checkpoint`机制,可以使程序启动的时候进行恢复,但是当出现程序发生变更的场景,必须要删除掉`checkpoint`,因此这里就会有丢失的风险。 **优雅关闭优点:** **保证数据准确,不允许数据丢失** ### 二、实现优雅关闭方式 #### 2.1 方式一:全手动操作关闭 1.4之后,spark提供了一个新的关于优雅关闭的配置,需要设置配置如下: ```scala sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true"); ``` **操作步骤:** (1)通过Hadoop 8088页面找到运行的程序 (2)打开spark ui的监控页面 (3)打开executor的监控页面 (4)登录liunx找到驱动节点所在的机器ip以及运行的端口号 (5)然后执行一个封装好的命令 ```bash sudo ss -tanlp | grep 5555 |awk '{print $6}'|awk -F, '{print $2}' | sudo xargs kill -15 ``` #### 2.2 方式二:通过第三方做消息通知 **操作步骤:** (1)每隔一段时间,扫描HDFS上某一个文件; (2)如果发现这个文件存在,就调用StreamContext对象stop方法,自己优雅的终止自己; (3)如果需要关闭,登录上有hdfs客户端的机器,然后touch一个空文件到**指定目录**,然后等到间隔 的扫描时间到之后,发现有文件存在,就知道需要关闭程序了。 这里HDFS可以换成redis,zk,hbase,db都可以,这里唯一的问题就是依赖了外部的一个存储系统来达到消息通知的目的。 **示例代码** ```scala import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.streaming.{StreamingContext, StreamingContextState} /** * @author lilinchao * @date 2021/10/26 * @description 1.0 **/ class MonitorStop(ssc: StreamingContext) extends Runnable { override def run(): Unit = { val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "root") while (true) { try Thread.sleep(5000) catch { case e: InterruptedException => e.printStackTrace() } //获得运行状态 val state: StreamingContextState = ssc.getState //监听HDFS中stopSpark目录是否存在文件,如果存在触发关闭操作 val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark")) if (bool) { //判断是否处于运行状态 if (state == StreamingContextState.ACTIVE) { ssc.stop(stopSparkContext = true, stopGracefully = true) //关闭进程 System.exit(0) } } } } } ``` **运行测试** ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @author lilinchao * @date 2021/10/26 * @description 1.0 **/ object SparkTest { def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = { val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => { //当前批次内容的计算 val sum: Int = values.sum //取出状态信息中上一次状态 val lastStatu: Int = status.getOrElse(0) Some(sum + lastStatu) } val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest") //设置优雅的关闭 sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true") val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint("./ck") val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999) val word: DStream[String] = line.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = word.map((_, 1)) val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update) wordAndCount.print() ssc } def main(args: Array[String]): Unit = { val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC()) new Thread(new MonitorStop(ssc)).start() ssc.start() ssc.awaitTermination() } } ``` *注意:下次启动前,需要清空stopSpark目录下写入的文件。* #### 2.3 方式三:对外暴露接口,提供关闭功能 > 内部暴露一个socket或者http端口用来接收请求,等待触发关闭流程序 **代码示例** http服务方式 ```scala def httpServer(port:Int,ssc:StreamingContext)={ val server = new Server(port) val context = new ContextHandler() context.setContextPath("/shutdown") context.setHandler( new CloseStreamHandler(ssc) ) server.setHandler(context) server.start() } class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler { override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={ ssc.stop(true,true) response.setContentType("text/html; charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); val out = response.getWriter(); baseRequest.setHandled(true); } } ``` *附:参考文章链接* *https://www.jianshu.com/p/1cf035bb6112* *https://www.jianshu.com/p/cb072b98c83f*
标签:
Spark
,
Spark Streaming
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1593.html
上一篇
06.DStream输出
下一篇
Spark Core案例实操(一)
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Shiro
链表
gorm
SpringCloud
容器深入研究
Spark SQL
Jenkins
Elasticsearch
散列
Hive
Zookeeper
算法
哈希表
ClickHouse
序列化和反序列化
MyBatis-Plus
Java阻塞队列
数据结构
国产数据库改造
锁
Livy
VUE
RSA加解密
Http
持有对象
FastDFS
Spark RDD
Java编程思想
SpringCloudAlibaba
Map
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞