Flink基本架构
Flink基本架构
Flink基本架构
处理无界和有界数据
任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。
数据可以被作为 无界 或者 有界 流来处理。
- 无界流:有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
- 有界流:有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理
Apache Flink 擅长处理无界和有界数据集 。精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
Flink 中的 API
Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。
- Flink API 最底层的抽象为 有状态实时流处理 。
其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。
- Flink API 第二层抽象是 Core APIs 。
实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。
- Flink API 第三层抽象是 Table API 。
Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。
表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。
- Flink API 最顶层抽象是 SQL 。
这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
部署应用到任意地方
Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行。
Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。
部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成。
架构体系
在Flink整个软件架构体系中,同样遵循着分层的架构设计理念,在降低系统耦合度的同时,也为上层用户构建Flink应用提供了丰富且友好的接口。
整个Flink的架构体系基本上可以分为三层,由上往下依次是API&Libraries层、Runtime核心层以及物理部署层。
API&Libraries层
作为分布式数据处理框架,Flink同时提供了支撑流计算和批计算的接口,同时在此基础之上抽象出不同的应用类型的组件库,如基于流处理的CEP(复杂事件处理库)、SQL&Table库和基于批处理的FlinkML(机器学习库)等、Gelly(图处理库)等。
API层包括构建流计算应用的DataStream API和批计算应用的DataSet API,两者都提供给用户丰富的数据处理高级API,例如Map、FlatMap操作等,同时也提供比较低级的Process Function API,用户可以直接操作状态和时间等底层数据。
Runtime核心层
该层主要负责对上层不同接口提供基础服务,也是Flink分布式计算框架的核心实现层,支持分布式Stream作业的执行、JobGraph到ExecutionGraph的映射转换、任务调度等。将DataSteam和DataSet转成统一的可执行的Task Operator,达到在流式引擎下同时处理批量计算和流式计算的目的。
物理部署层
该层主要涉及Flink的部署模式,目前Flink支持多种部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)、Kubenetes。Flink能够通过该层能够支持不同平台的部署,用户可以根据需要选择使用对应的部署模式。
基本架构
Flink整个系统主要由两个组件组成,分别为JobManager和TaskManager,Flink架构也遵循Master-Slave架构设计原则,JobManager为Master节点,TaskManager为Worker(Slave)节点。所有组件之间的通信都是借助于Akka Framework,包括任务的状态以及Checkpoint触发等信息。
Client客户端
客户端负责将任务提交到集群,与JobManager构建Akka连接,然后将任务提交到JobManager,通过和JobManager之间进行交互获取任务执行状态。
客户端提交任务可以采用CLI方式或者通过使用Flink WebUI提交,也可以在应用程序中指定JobManager的RPC网络端口构建ExecutionEnvironment提交Flink应用。
JobManager
JobManager负责整个Flink集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中TaskManager上TaskSlot的使用情况,为提交的应用分配相应的TaskSlots资源并命令TaskManger启动从客户端中获取的应用。
JobManager相当于整个集群的Master节点,且整个集群中有且仅有一个活跃的JobManager,负责整个集群的任务管理和资源管理。JobManager和TaskManager之间通过Actor System进行通信,获取任务执行的情况并通过Actor System将应用的任务执行情况发送给客户端。
同时在任务执行过程中,Flink JobManager会触发Checkpoints操作,每个TaskManager节点收到Checkpoint触发指令后,完成Checkpoint操作,所有的Checkpoint协调过程都是在Flink JobManager中完成。
当任务完成后,Flink会将任务执行的信息反馈给客户端,并且释放掉TaskManager中的资源以供下一次提交任务使用。
TaskManager
TaskManager相当于整个集群的Slave节点,负责具体的任务执行和对应任务在每个节点上的资源申请与管理。客户端通过将编写好的Flink应用编译打包,提交到JobManager,然后JobManager会根据已经注册在JobManager中TaskManager的资源情况,将任务分配给有资源的TaskManager节点,然后启动并运行任务。
TaskManager从JobManager接收需要部署的任务,然后使用Slot资源启动Task,建立数据接入的网络连接,接收数据并开始数据处理。同时TaskManager之间的数据交互都是通过数据流的方式进行的。
可以看出,Flink的任务运行其实是采用多线程的方式,这和MapReduce多JVM进程的方式有很大的区别Fink能够极大提高CPU使用效率,在多个任务和Task之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池进行对资源进行有效管理。
核心概念
作业管理器(JobManager)
Flink集群中任务管理和调度的核心,是控制应用执行的主进程,又包含三个不同的组件。
- JobMaster JobMaster是JobManager中最核心的组件,负责处理单独的Job,JobMaster和具体的Job是一一对应的
在Job提交时,JobMaster会先接收到要执行的应用,“应用”一般是客户端提交来的,Jar包、数据流图、作业图
JobMaster会把JobGraph转化成一个物理层面的数据流图(ExecutionGraph),它包含了所有可以并发执行的任务,JobMaster也会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源,一旦获取到足够的资源,就会将数据流图发到真正运行它们的TaskManager上。
JobMaster也会负责所有需要中央协调的操作
- 资源管理器(ResourceManager) 主要负责资源的分配和管理,在Flink集群中只有一个。所谓“资源”,主要是指TaskManager的任务槽(task slots),任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源,每一个Task都需要分配到一个slot上执行
Flink内置的RM和其他资源管理平台(比如YARN)的RM不同,针对不同的环境环境和资源管理平台(比如Standalone、YARN)有不同的具体实现。在Standalone部署时,因为TaskManager是单独启动的,没有Per-Job模式,RM只能分发可以TaskManager的任务槽,不能单独启动新TaskManager
在有资源管理平台时,当新的作业申请资源,RM会将空闲槽位的TaskManager分配给JobMaster。如果RM没有足够的任务槽,它还可以向资源提供平台发起会话,请求提供启动TaskManager进程的容器,另外,RM还负责停掉空闲的TaskManager,释放计算资源
- 分发器(Dispatcher) 主要负责提供一个REST接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster组件。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
任务管理器(TaskManager)
Flink中的工作进程,数据流的具体计算是它来做的,称为“Worker”。Flink集群中必须至少有一个TaskManager,考虑分布式计算,会有多个。每一个TaskManager都包含了一定数量的任务槽(task slots),slot是资源调度的最小单位,slot的数量限制了TaskManager能够并行处理的任务数量。
启动之后,TaskManager会想资源管理器注册它的slots,收到资源管理器的指令后,TaskManager就会将一个或者多个槽位提供给JobMaster调用,JobMaster就可以分配任务来执行。
在执行过程中,TaskManager可以缓存数据,还可以跟其他运行同一应用的TaskManager交换数据
并行度(Parallelism)
对于Spark而言,是把根据程序生成的DAG划分阶段(stage)、进而分配任务,Spark基于MapReduce架构的思想是“数据不动代码动”,Flink类似于“代码不动数据流动”,原因就在于流失数据本身是连续到来的。
在大数场景下,我们都是依靠分布式架构做并行计算,从而提高数据吞吐量,我们可以将不同的算子操作任务,分配到不同的节点上执行,对任务做了分摊进而实现并行处理。但这种“并行”并不彻底,因为算子之间是由执行顺序的,对一条数据来说必须依次执行,而一个算子在同一时刻只能处理一个数据。
对于“任务并行”,我们真正关心的是“数据并行”,多条数据同时到来,我们应该可以同时读入,同时在不同节点执行flatMap操作
怎么实现数据并行?我们把一个算子操作“复制”多份到多个节点,数据来了之后到其中任意一个执行。这样一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks)
在Flink执行过程中,每一个算子可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。一个特定算子的子任务的个数被称之为并行度(parallelism),这样包含并行子任务的数据流就是并行数据流,它需要多个分区来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度,一个程序中,不同的算子可能具有不同的并行度。
作业图(JobGraph)与执行图(ExecutionGraph)
Flink程序直接映射成数据流图(逻辑流图),到具体的执行环节时,我们还有考虑合并子任务的分配、数据在任务间的传输,以及合并算子链的优化。Flink中任务调度执行的图,由生成顺序可以分成四层:
逻辑流图(StreamGraph) -> 作业图(JobGraph) -> 执行图(ExecutionGraph) -> 物理图(Physical Graph)
逻辑流图(StreamGraph)
用户通过DataStream API编写的代码生成的最初DAG图,用来表示程序的拓扑结构。 逻辑流图中的节点对应代码中的四步算子操作: 源算子Source(socketTextStream())→ 扁平映射算子Flat Map(flatMap()) → 分组聚合算子Keyed Aggregation(keyBy/sum()) →输出算子 Sink(print())
作业图(JobGraph)
提交给JobManager的数据结构,确定了当前作业中所有任务的划分。 主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链。
执行图(ExecutionGraph)
JobMaster收到JobGraph后,会根据它生成执行图,直线图是JobGraph的并行划版本,是调度层最核心的数据结构 与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,明确了任务间数据传输的方式
物理图(Physical Graph)
JobMaster将生成的执行图分发给TaskManager,各个TaskManager会根据执行图部署任务,最终的物理执行过程会形成物理图。 物理图主要就是在执行图的基础上进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算。
任务(Tasks)和任务槽(Task Slots)
任务槽(Task Slots)
在TaskManager上对每个任务运行所占用的资源作出明确的划分。 每个任务槽表示了TaskManager永远计算资源的一个固定大小的子集。
假如一个TaskManager 有三个 slot,那么它会将管理的内存平均分成三份,每个slot独自占据一份。这样一来,我们在slot上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。所以现在我们只要 2 个 TaskManager,就 可以并行处理分配好的 5 个任务了
任务槽数量的设置
配置文件设定TaskManager的slot数量:taskmanager.numberOfTaskSlots: 8
通过调整slot的数量,我们就可以控制子任务之间的隔离级别,具体如下: 如果一个TaskManager只有一个slot,那将意味着每个任务都会运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的) 而一个TaskManager设置多个slot则意味着多个子任务可以共享同一个JVM。 它们的区别在于:前者任务之间完全独立运行,隔离级别更高、彼此间的影响可以降到最小;而后者在同一个JVM进程中运行的任务,将共享TCP连接和心跳消息,也可能共享数据集和数据结构,这就减少了每个任务的运行开销,在降低隔离级别的同时提升了性能
需要注意的是,slot目前仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因
任务对任务槽的共享
Flink允许子任务共享slot,只要属于同一个作业,对于不同任务节点的并行子任务,就可以放到同一个slot上执行。 对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上(如果在同一slot就没法数据并行了),而第二个任务节点 keyBy/window/apply 的并行子任务却可以和第一个任务节点共享slot。 最终将变成:每个任务节点的并行子任务一字排开,占据不同的slot;而不同的任务节点的子任务可以共享slot。一个slot中,可以将程序处理的所有任务都放在这里执行,我们把它叫作保存了整个作业的运行管道。 资源密集型(intensive)任务:涉及大量的数据、状态存储和计算,例如window算子所做的窗口操作。 slot共享:将资源密集型和非密集型的任务同时放到一个slot中,它们就可以自行分配对资源占用的比例,从而保证最重的活平均分配给所有的TaskManager;允许我们保存完整的作业管道,即使某个TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行。
Flink默认运行slot共享,如果希望某个算子对应的任务完全独占一个slot,或者只有某一部分算子共享slot,设置slot共享组手动指定:.map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”); 此时,属于同一个slot 共享组的子任务,才会开启 slot 共享;不同组之间的任务是完全隔离的,必须分配到不同的slot 上。在这种场景下,总共需要的slot数量,就是各个slot共享组最大并行度的总和
任务槽好并行度的关系
task slot:静态概念,指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置 并行度:动态概念,TaskManager运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置
并行度 <= task slot的总数,程序正常执行;并行度 >= task slot的总数,程序等待资源管理器分配更多的资源