李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
12.Flink流处理API之Environment
Leefs
2022-01-10 PM
1690℃
0条
### 前言 **流处理基本步骤:** ![12.Flink流处理API之Environment01.png](https://lilinchao.com/usr/uploads/2022/01/2702441370.png) (1)创建环境(类似于spark里的上下文SparkContext); (2)添加数据来源Source; (3)对数据进行Transform处理; (4)添加输出Sink。 ### 一、Environment分类 **1、批处理** + **ExecutionEnvironment** ![12.Flink流处理API之Environment02.png](https://lilinchao.com/usr/uploads/2022/01/3748122928.png) + **LocalEnvironment**:本地模式执行 + **RemoteEnvironment** :提交到远程集群执行 + **CollectionEnvironment** :集合数据集模式执行 + **OptimizerPlanEnvironment**: 不执行作业,仅创建优化的计划 + **PreviewPlanEnvironment**: 提取预先优化的执行计划 + **ContextEnvironment**: 用于在客户端上远程执行 + **DetachedEnvironment**: 用于在客户端上以分离模式进行远程执行 **2、流处理** + **StreamExecutionEnvironment** ![12.Flink流处理API之Environment03.png](https://lilinchao.com/usr/uploads/2022/01/1996144039.png) **3、Table 模式处理** + **TableEnvironment** TableEnvironment 是用来创建 Table & SQL 程序的上下文执行环境 ,也是 Table & SQL 程序的入口,Table & SQL 程序的所有功能都是围绕 TableEnvironment 这个核心类展开的。 TableEnvironment 的主要职能包括:对接外部系统,表及元数据的注册和检索,执行SQL语句,提供更详细的配置选项。 ![12.Flink流处理API之Environment04.png](https://lilinchao.com/usr/uploads/2022/01/846876626.png) + **BatchTableEnvironment**: Batch 处理模式的 Table ,主要处理 DataSet 与 Table 之间操作 + **StreamTableEnvironment**: streaming 处理模式的 Table , 主要处理 DataStream 与 Table 之间操作 ### 二、Environment常用方法 **2.1 getExecutionEnvironment** **语法** ```scala val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment ``` **说明** 创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境; 如果从命令行客户端调用程序以提交到集群, 则此方法返回此集群的执行环境, 也就是说, getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境, 是最常用的一种创建执行环境的方式。 如果没有设置并行度, 会以 flink-conf.yaml 中的配置为准, 默认是 1。 ```yaml parallelism.default: 1 ``` **2.2 createLocalEnvironment** **语法** ```scala val env = ExecutionEnvironment.createLocalEnvironment(1); val env = StreamExecutionEnvironment.createLocalEnvironment(1) ``` **说明** 返回本地执行环境,需要在调用时指定默认的并行度。 **2.3 createRemoteEnvironment** **语法** ```scala val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar") val env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar"); ``` **说明** 返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。 *附参考文章链接:* *https://blog.csdn.net/xiaohulunb/article/details/103030437*
标签:
Flink
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1830.html
上一篇
11.Flink并行度和任务链
下一篇
13.Flink流处理API之Source
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
gorm
Azkaban
Netty
Golang
LeetCode刷题
并发线程
BurpSuite
Typora
Spark
Git
JavaWeb
并发编程
SpringCloudAlibaba
数据结构
FileBeat
Java
Map
Flume
散列
Spring
国产数据库改造
数学
GET和POST
二叉树
SpringBoot
Thymeleaf
Zookeeper
机器学习
FastDFS
Shiro
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭