一、Flink运行时各个组件介绍
Flink 运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)。因为 Flink 是用 Java 和 Scala 实现的,所以所有组件都会运行在Java 虚拟机上。接下来对各个组件的功能进行简单介绍i。
作业管理器(JobManager)
作业管理器它会控制一个应用程序的主进程,每个应用程序都会被一个不同的JobManager 所控制执行。JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。而在运行过程中,JobManager 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
资源管理器(ResourceManager)
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是 Flink 中定义的处理资源单元。Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及 standalone 部署。当 JobManager 申请插槽资源时,ResourceManager会将有空闲插槽的 TaskManager 分配给 JobManager。如果 ResourceManager 没有足够的插槽来满足 JobManager 的请求,它还可以向资源提供平台发起会话,以提供启动 TaskManager进程的容器。另外,ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。
任务管理器(TaskManager)
Flink 中的工作进程。通常在 Flink 中会有多个 TaskManager 运行,每一个 TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了 TaskManager 能够执行的任务数量。启动之后,TaskManager 会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager 就会将一个或者多个插槽提供给 JobManager 调用。JobManager 就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个 TaskManager 可以跟其它运行同一应用程
序的 TaskManager 交换数据。
分发器(Dispatcher)
可以跨作业运行,它为应用提交提供了 REST 接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager。由于是 REST 接口,所以 Dispatcher 可以作为集群的一个 HTTP 接入点,这样就能够不受防火墙阻挡。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。
二、Flink任务提交的流程
上面介绍完了flink运行时架构的各个组件对应的功能,接下来我们看一下他们互相之间都是怎么协作运转的。
一般的交互协作图:
提交一个flink程序到分发器,分发器(Dispatcher)将提交的应用移交给任务管理器(JobManager),然后向资源管理器(ResourceManager)请求执行任务必要的资源,接着资源管理器会将有空闲插槽的 TaskManager 分配给 JobManager,然后JobManager会将要在插槽中执行的任务提交给TaskManager 。
具体到Yarn上的交互图
Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming 的任务),也可以不结束并等待结果返回。
JobManager 主 要 负 责 调 度 Job 并 协 调 Task 做 checkpoint, 职 责 上 很 像Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。
TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理
(1)客户端APP通过分发器提供REST 接口,将作业提交给JobManager
(2)分发器启动JobMaster, 将作业(包含JobGraph)提交给JobMaster
(3)JobMaster 将JobGraph 解析为ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)
(4)资源管理器协调资源
(5)Taskmanager 启动只会向ResourceManager 注册自己的可以slots
(6)资源管理器通知TaskManager 为新的作业提供slots
(7)TaskManager连接到对应的JobMaster,提供slots
(8)JobMaster 将需要执行的任务分发给TaskManager
(9)TaskManager 执行任务,互相之间可以交换
三、Flink任务调度原理
Flink 集 群 启 动 后 , 首 先 会 启 动 一 个 JobManger 和一个或多个的TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。
TaskManger 与 Slots
一个 Worker(TaskManager)就是一个 JVM 进程,内部拥有一个或多个 Task Slot 进一步细分进程的 CPU 资源。
Slot 是指 TaskManager 最大能并发执行的能力->taskmanager.numberOfTaskSlots->ys
parallelism 是指 TaskManager 实际使用的并发能力->parallelism.default->p
同一Slot 中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离
假设一共有 3 个 TaskManager,每一个 TaskManager 中的分配 3 个TaskSlot,也就是每个 TaskManager 可以接收 3 个 task,一共 9 个 TaskSlot,如果我们设置 parallelism.default=1,即运行程序默认的并行度为 1,9 个 TaskSlot 只用了 1个,有 8 个空闲,因此,设置合适的并行度才能提高效率
程序与数据流(DataFlow)
所有的 Flink 程序都是由三部分组成的: Source 、Transformation 和 Sink。Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出。
Flink 上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分。每一个 dataflow 以一个或多个 sources 开始以一个或多个 sinks 结束。dataflow 类似于任意的有向无环图(DAG)。
程序中的转换运算(transformations)跟 dataflow 中的(operator)是一一对应的关系,但有时候,一个 transformation 可能对应多个 operator。
并行度(Parallelism)
Flink 程序的执行具有并行、分布式的特性。一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
Stream 在算子之间传输数据的形式有两种模式。
One-to-one:stream(比如在 source 和 map operator 之间)维护着分区以及元素的顺序。那意味着 map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap 等算子都是 one-to-one 的对应关系。
Redistributing:stream(map()跟 keyBy/window 之间或者 keyBy/window 跟 sink之间)的分区会发生改变。每一个算子的子任务依据所选择的 transformation 发送数据到不同的目标任务。例如,keyBy() 基于 hashCode 重分区、broadcast 和 rebalance会随机重新分区,这些算子都会引起 redistribute 过程
任务链(Operator Chains)
相同并行度的 one to one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的一部分。将算子链接成 task 是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
更具体实践 。。。
1》怎样从Flink程序得到任务?
2》一个流处理程序,到底包含多少个任务?
3》最终执行任务,需要占用多少slot?
1.数据流图(Dataflow Graph)
Flink 是流式计算框架,它的程序结构其实就是定义了一连串的操作,每个数据输入之后都会调用每一个步骤一次计算,每一个操作都叫做”算子”(operator),可以理解为我们的程序是一串算子构成的管道,数据则像水流一样有序地流过。
所有的程序都由三部分组成。source(源算子,负责读取数据)、Transformation(转换算子,负责处理数据)、Sink(下沉子算子,负责数据的输出)。
在运行时,Flink 程序会被映射成所有算子按照逻辑顺序拼接成一张图,这种图被称为逻辑数据流(数据流图)。数据流图类似于任意的有向无环图(DAG-Directed Acyclic Graph)。图中的每一条数据流以一个或者多个source 开始,以一个或者多个sink 结束。
代码中,除了source和sink,其他可以被称为代码中如果返回值是 SingleOutputStreamOperator 的API 就可以称为一个算子,否则不会计算为算子(只能理解为中间的转换操作),比如:keyBy 返回值是 KeyedStream 就不是一个算子;org.apache.flink.streaming.api.datastream.KeyedStream#sum(int) 就是一个算子。
常见的算子:
source:读txt、socket、自定义输入等
transformation:flatMap、map、filter、process 处理操作,还有sum、max、maxBy、min、minBy 等也都是聚合算子(名字都是Keyed Aggregation)
sink: print、printToErr、writeAsText、writeAsCsv 等
org.apache.flink.streaming.api.datastream.DataStream 源码可以看出每个算子都有一个特定名称:
View Code
org.apache.flink.streaming.api.datastream.KeyedStream 针对集合的算子API:
View Code
比如之前的socket 例子:show plan 显示几个框可以理解为几个任务(一个任务可能有多个子任务,子任务的数量可以理解为并行度),两者为什么会这么合并在了解合并算子链(并行度相同的一对一算子会合并算子链)后就会明白。
(1) 并行度设置为2的时候show plan 计划如下:
(2)并行度设置为1 的时候show plan 如下
2. 并行度
1. 什么是并行计算
可以理解为,我们期望的是“数据并行”。也就是多条数据同时到来,我们可以同时读入,并且在不同的节点进行flatMap 等操作。
2. 并行子任务和并行度
为了实现并行操作,我们把一个算子操作,复制多分到多个节点,数据来了之后到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的子任务,再将他们分发到不同节点,就实现了真正的并行计算。
在Flink 执行过程中,每个算子(operator)可以包含一个或多个子任务,这些子任务在不同的线程、物理机或者容器中完全独立地执行。
一个特定算子的子任务的个数被=称之为并行度(parallelism)。一个流程序的并行度可以认为是其所有算子中最大的并行度。一个程序中不同的算子可能具有不同的并行度。
如下图:
当前数据流中有source、map、window、sink 四个算子,除最后sink外其他的算子的并行度都为2.整个程序包含7个子任务,至少需要两个分区来执行,可以认为这段程序的并行度就是2。
3. 并行度设置
设置按照最近原则,最先设置的优先生效。
(1)代码中设置
// 全局设置 executionEnvironment.setParallelism(3); // 对单个算子设置 txtDataSource .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }).setParallelism(3)
(2)提交时设置(webui也可以设置)
./flink-1.13.0/bin/flink run -c cn.qz.SocketStreamWordCount -p 2 ./study-flink-1.0-SNAPSHOT.jar
(3) 在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:
parallelism.default: 1
这些参数都不是必须的,会按照由近到远的原则匹配(单个设置<env<-p<默认)。需要注意,有的算子即使设置了并行度也不会生效,比如读取socket 文本流的算子本身就不支持并行。在开发环境中,默认的并行度为当前机器的CPU核数(默认的任务槽的数量也是CPU核数)。
4. 测试例子
还是以socket 流为例子。
(1) 提交时选择并行度为2, 查看任务:
如上。 name 是每个算子的名称,我们在源码中可以看到为这些算子起的名称。 后面有子任务的数量。
(2) 7777 端口输入
[root@k8smaster01 conf]# nc -l 7777 hello china and beijing what is your name? my name is qz.
(3) 查看输出任务的详细信息
查看子任务信息:
(4) 查看两个子任务所在机器的标准输出: 可以看出输出前面加的序号(可以理解为分区序号、任务插槽号)
第一个子任务所在机器输出:
第二个机器:
这里自己理解最大并行度就是一个任务最多能分到几个资源(任务槽),任务会同时并行处理,可以理解为在不同的机器直接并行处理(至于每个机器并行几个线程跑,后面任务槽进行研究,目前是每个机器一个任务槽)。
补充: 针对1、2 进行的测试
比如如下程序:
package cn.qz; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建执行环境(流处理执行环境) StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文件 DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777); // 3. 转换数据格式 SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }) .returns(Types.STRING) .map(word -> Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息 // 4. 分组 KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0); // 5. 求和 SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1); // 6. 打印 sum.print(); System.out.println("========"); // 7. 求最大 SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = singleOutputStreamOperator.keyBy(t -> t.f0).max(1); tuple2SingleOutputStreamOperator.printToErr(); // 8. 执行 executionEnvironment.execute(); } }
debug 查看其相关对象:可以看到默认的并行度和相关的转换
3. 算子链
我们观察webui 给出的计划图发现,节点数量和代码中的算子不是一一对应的。 有的节点会把多个任务连接在一起合并成一个大任务。下面解释其原因。
1.算子间的数据传输
一个数据流在算子之间传输数据的形式可以是一对一的直通模式(forwarding),也可以是打乱的重分区(redistributing)模式,具体是哪一种取决于算子的种类
(1)一对一直通
这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map 算子,source 读取完之后可以直接发给map 做处理,不需要重新分区,也不需要调整数据的顺序。这意味着map算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,保证一对一的关系。map、filter、flatMap等算子都是这种一对一的关系。
(2)重分区
这种模式下,数据流的分区会发生改变。比如图中的map和后面的keyBy/window/apply算子、以及keyBy/window算子和sink 算子之间。
每一个算子的子任务会根据数据传输的策略,把数据发送到不同的下游目标任务。例如:keyBy是分组操作,本质上是基于key进行hash后重分区;比如从并行度为2的window 算子传递到并行度为1 的sink,这时的数据传输方式是再平衡(rebalance),会把数据均匀的向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)。
2.合并算子链
并行度相同的一对一算子操作,可以直接连接在一起形成一个大的任务(task),这样原来算子就成了合并任务里的一部分。每个任务被一个线程执行。这就是合并算子链。合并后如下图:
合并后就有五个任务,由五个线程并行执行。合并算子链可以减少线程之间的转换,提升吞吐量。
Flink 默认按照算子链的原则进行合并,如果想禁止合并或者自定义,可以在代码对算子做一些特定设置:
// 禁用算子链 SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }).disableChaining() // 从当前算子开始新链 SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }).startNewChain()
4. 作业图与执行图
Flink任务调度执行的图,其按照顺序分为四层:
逻辑流图-》作业图-》执行图-》物理图
比如以soclet 为例子,其转换过程如下:
1. 逻辑流图
图中的节点一般对应算子操作。客户端完成的。
2. 作业图
数据流图经过优化就是作业图。主要的优化为将符合条件的节点连接在一起合并成一个任务节点,行成算子链。也是客户端完成的,作业提交时传递给JobMaster。
3. 执行图
JobMaster 收到JobGraph后用它生成执行图。执行图是JobGraph的并行化版本,是调度处最核心的数据结构。和作业图区别是对子任务进行了拆分,并明确任务之间传递数据的方式。
4. 物理图
JobMaster 生成执行图后,将它分发给TaskManager。TasjkManager 根据执行图部署任务,最终的物理执行过程行成物理图。
物理图在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。
5. 任务与任务槽
在之前的测试中,我们三个taskManager的slots任务槽为3。提交任务cn.qz.SocketStreamWordCount 的时候选择的并行度为2, 显示的任务应该是有5个(1+2+2),但是却占据了两个任务槽,下面解释其原因。
1.任务槽
flink中一个worker(taskmanager)是一个JVM进程,既然是进程就可以启动多个独立的线程来执行多个子任务(subtask)。
flink 中的多个独立的执行任务的线程数量就是任务槽,默认为1,可以进行修改。修改 flink-conf.yaml,如下修改后每个节点变为4个槽,总共3个节点就是12个slot。
taskmanager.numberOfTaskSlots: 4
需要注意的是,slot目前用来隔离内存,不涉及cpu的隔离。具体应用需要根据cpu 核心数进行调整。
2.任务对任务槽的共享
默认情况下,flink 允许子任务共享slot。所以2个子任务两个slot(最大的子任务数量)就可以完成。
不同任务节点的子任务可以共享一个slot, 换句话说同一个任务的多个子任务必须放置在不同的slot。比如并行度为2,可能的结果就是
到这里可能有个疑问就是既然想要最大利用计算资源,为什么又在一个任务槽并行处理多个任务了(一个线程干多件事)?
原因是: 不同的任务对资源占用不同,比如source、map 、sink可能处理时间极短,而window等转换操作时间长(资源密集型任务)。如果每个任务一个slot,造成的现象就是上游的source(等待下游的window任务发通知而阻塞,相当于背压)和下游的sink可能长时间浪费,但是windows却忙死,出现资源利用不平衡。于是出现了任务共享,将资源密集型和非密集型放到一个slot,这样就可以自行分配对资源占用的比例。
如果想某个任务独占一个slot,或者只有某部分算子共享slot,可以设置共享组:只有属于一个slot组的子任务才会开启共享slot,不同组之间的任务必须分配到不同的slot 数量。
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource .flatMap((String line, Collector<String> words) -> { Arrays.stream(line.split(" ")).forEach(words::collect); }).slotSharingGroup("1")
3.任务槽和并行度的关系
整个流处理程序的并行度,应该是所有算子中并行度最大的那个,也就是所需要的slot 数量(这种是不指定插槽组的情况)。
https://developer.aliyun.com/profile/expert/fkjpbdp6zbdkm?isInAliyunApp=false