跳转至

Flink 面试题

Flink

Apache Flink 是一个开源的基于流的有状态计算框架。它是分布式地执行的,具备低延迟、高吞吐的优秀性能,并且非常擅长处理有状态的复杂计算逻辑场景

  • Flink 是一个全面的流处理和批处理框架,提供了低延迟和高吞吐量的实时数据处理能力
  • 而Hadoop更侧重于离线批处理。

数据流

所有产生的数据都天然带有时间概念,把事件按照时间顺序排列起来,就形成了一个事件流,也被称作数据流。

流批一体

首先必须先明白什么是有界数据无界数据

Flink 流批一体

  • 有界数据,就是在一个确定的时间范围内的数据流,有开始,有结束,一旦确定就不会再改变,一般批处理用来处理有界数据,如上图的 bounded stream。
  • 无界数据,就是持续产生的数据流,数据是无限的,有开始,无结束,一般流处理用来处理无界数据。如图 unbounded stream。

Flink 的设计思想是以为核心,批是流的特例,擅长处理**无界和有界数据, Flink 提供精确的时间控制能力和有状态计算机制,可以轻松应对无界数据流,同时提供窗口处理有界数据流。所以被成为流批一体。

容错能力

在分布式系统中,硬件故障、进程异常、应用异常、网络故障等异常无处不在,Flink 引擎必须保证故障发生后不仅可以重启应用程序,还要确保其内部状态保持一致,从最后一次正确的时间点重新出发

Flink提供 集群级容错 和 应用级容错 能力

  • 集群级容错: Flink 与集群管理器紧密连接,如 YARN、Kubernetes,当进程挂掉后,自动重启新进程接管之前的工作。同时具备高可用性 ,可消除所有单点故障,
  • 应用级容错:Flink 使用轻量级分布式快照,设计检查点(checkpoint)实现可靠容错。

Flink 利用检查点特性,在框架层面 提供 Exactly-once 语义,即端到端的一致性,确保数据仅处理一次,不会重复也不会丢失,即使出现故障,也能保证数据只写一次。

Flink 和 Spark Sreaming 最大的区别在于:

  • Flink 是标准的实时处理引擎,基于事件驱动,以流为核心
  • Spark Streaming 的RDD 实际是一组小批次的 RDD 集合,是微批(Micro-Batch)的模型,以批为核心

下面我们介绍两个框架的主要区别:

架构模型

  • Spark Streaming 在运行时的主要角色包括:
    • 服务架构集群和资源管理 Master / Yarn Application Master;
    • 工作节点 Work / Node Manager;
    • 任务调度器 Driver;任务执行器 Executor

Spark 运行架构

  • Flink 在运行时主要包含(图见下述 Flink 的运行架构):
    • 客户端 Client
    • 作业管理 Jobmanager
    • 任务管理 Taskmanager。

任务调度

  • Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图 DAG,Spark Streaming 会依次创建 DStreamGraph、JobScheduler。 Spark 任务调度

  • Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager 进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度,根据物理执行图部署到 Taskmanager 上形成具体的 Task 执行。 Flink 任务调度

时间机制

  • Spark Streaming 支持的时间机制有限,只支持处理时间。
  • Flink 支持了流处理程序在时间上的三个定义:事件时间 EventTime、摄入时间 IngestionTime 、处理时间 ProcessingTime。同时也支持 watermark 机制来处理滞后数据。 Flink 时间类型

容错机制

  • 对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。
  • Flink 则使用两阶段提交协议来解决这个问题。

Flink 技术架构

  • Flink 作为流批一体的分布式计算引擎,必须提供面向开发人员的 API 层
  • 同时还需要跟外部数据存储进行交互,需要连接器
  • 作业开发、测试完毕后,需要提交集群执行,需要部署层
  • 同时还需要运维人员能够管理和监控
  • 还提供图计算、机器学习、SQL等,需要应用框架层。

Flink 集群采取 Master - Slave 架构:
Master 的角色为 JobManager,负责集群和作业管理
Slave 的角色是 TaskManager,负责执行计算任务,
同时,Flink 提供客户端 Client 来管理集群和提交任务,JobManager 和 TaskManager 是集群的进程。

Flink 技术架构

  1. Client Flink 客户端是 Flink 提供的 CLI 命令行工具,用来提交 Flink 作业到 Flink 集群,在客户端中负责 StreamGraph (流图)和 Job Graph (作业图)的构建。
  2. JobManager JobManager 根据并行度将 Flink 客户端提交的 Flink 应用分解为子任务,从资源管理器 ResourceManager 申请所需的计算资源,资源具备之后,开始分发任务到 TaskManager 执行 Task,并负责应用容错,跟踪作业的执行状态,发现异常则恢复作业等。
  3. TaskManager TaskManager 接收 JobManage 分发的子任务,根据自身的资源情况管理子任务的启动、 停止、销毁、异常恢复等生命周期阶段。Flink 程序中必须有一个 TaskManager。

Flink 的核心概念主要有四个:Event Stream、State、Time 和 Snapshots

  1. Event Streams:即事件流,事件流可以是实时的也可以是历史的。Flink 是基于流的,但它不止能处理流,也能处理批,而流和批的输入都是事件流,差别在于实时与批量。
  2. State:Flink 擅长处理有状态的计算。通常的复杂业务逻辑都是有状态的,它不仅要处理单一的事件,而且需要记录一系列历史的信息,然后进行计算或者判断。
  3. Time:最主要处理的问题是数据乱序的时候,一致性如何保证。
  4. Snapshots:实现了数据的快照、故障的恢复,保证数据一致性和作业的升级迁移等。

Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器
(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器
(Dispatcher)。因为Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机上。每个组件的职责如下:

  1. 作业管理器(JobManager) 控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
    JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图
    (logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器
    (TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的
    TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点 (checkpoints)的协调。

  2. 资源管理器(ResourceManager) 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及
    standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。

  3. 任务管理器(TaskManager) Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给
    JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个
    TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。

  4. 分发器(Dispatcher) 可以跨作业运行,它为应用提交提供了REST接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
    Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

Flink中的任务链(Task Chaining)是什么?

任务链是将多个算子(Operators)连接在一起形成连续运行的优化技术。
它将多个算子合并为一个任务,减少了数据的序列化和网络传输开销,提高了执行效率。

  • 事件时间是数据实际生成的时间,而处理时间是数据到达 Flink 系统的时间
  • 事件时间可以通过时间戳标记数据,而处理时间是 Flink 根据数据到达的顺序生成的。

Flink 的数据处理时间特性包括事件时间(Event Time)、处理时间(Processing Time)、摄取时间(Ingestion Time)和元数据时间(Metadata Time)

通过水印(Watermark)机制来处理乱序事件。水印用于表示事件时间进度,通过设置适当的水印来处理可能到达的迟到事件。

窗口是 Flink 中用于对无限数据流进行有界处理的机制。它将无限流切分为有限的、不重叠的块,并对每个窗口进行计算。

  • 滚动窗口(Tumbling Window)
  • 滑动窗口(Sliding Window)
  • 会话窗口(Session Window)

Flink支持哪些类型的窗口函数(Window Function)?

Flink 支持常见的窗口函数,如聚合函数(sum、min、max 等)、投影函数、reduce 函数、处理函数等。
此外,Flink 还支持自定义窗口函数来实现特定的业务逻辑

  • 事件时间窗口是根据事件实际生成的时间来进行划分的窗口
  • 处理时间窗口是根据数据到达 Flink 系统的时间来进行划分的窗口

Flink的时间窗口触发器(Trigger)是什么?

时间窗口触发器用于控制何时触发计算窗口的输出。它可以基于元素数量、处理时间、水印等条件进行触发。

  • 计数触发器(Count Trigger)
  • 处理时间触发器(Processing Time Trigger)
  • 事件时间触发器(Event Time Trigger)等

水印是用于表示事件时间进度的标记。它通常与数据流中的时间戳一起使用,用于处理乱序事件和延迟数据。

Flink 的 Watermark 机制可解决乱序事件处理和延迟数据处理的问题。通过设置适当的水印来处理乱序事件和等待延迟的数据,从而使数据处理更加准确和完整。

状态后端是 Flink 用于保存和管理应用程序状态的机制。它可以存储状态到内存、文件系统或分布式存储系统(如HDFS)中。

  • 内存状态后端(Memory State Backend)
  • 文件系统状态后端(File System State Backend)
  • RocksDB 状态后端(RocksDB State Backend)

状态后端(如 RocksDB)可以对性能产生影响,因为它涉及到 IO 操作和状态数据的持久化和恢复。
选择适当的状态后端并合理配置参数可以平衡性能和状态存储的需求。

Flink 使用检查点(Checkpoint)机制实现容错。它会定期保存应用程序的状态,并在发生故障时恢复到最近的一个检查点状态。

Flink 的容错机制在大规模数据处理中具有较好的可伸缩性和性能。通过检查点机制实现的容错保证了任务的一致性,并且在故障发生时能够快速恢复。

Flink 通过检查点(Checkpoint)机制来处理流处理应用程序的版本升级。
可以使用保存的检查点状态来保持应用程序的版本兼容性,并支持升级到新版本。

可以通过使用 Kafka 的分区(Partition)和 Flink 的并行度(Parallelism)来实现事件的顺序处理和保证。

处理延迟数据可以通过使用 Flink 的水印(Watermark)机制和事件时间(Event Time)来实现。
水印可以为延迟数据提供等待时间,以便进行正确的计算。

Flink 的状态是用于在流处理和批处理中持久化保存数据的机制。它可以存储和访问计算过程中的中间结果和维护状态。

Flink 实现 Exactly-Once 语义是通过在容错检查点(Checkpoint)和幂等性操作的支持下实现的。
检查点机制用于保存应用程序的状态,幂等性操作能够确保在发生故障和重启后不会产生重复的结果。

Flink 的容错机制(例如检查点)可以在故障发生时确保数据一致性和恢复能力。
但在一些情况下可能对性能产生一定影响。因为它需要在容错时保存和恢复状态数据,通过调整检查点的频率和使用高效的状态后端可以在性能和容错之间取得平衡。

Flink 的批处理和流处理之间的区别在于数据到达的方式和处理模式。

  • 批处理是对有界数据集进行离线处理
  • 流处理是实时处理无界数据流

Flink支持哪些连接器(Connector),试举例?

  • Kafka Connector
  • JDBC Connector
  • Elasticsearch Connector

Flink的数据源(Source)可以是哪些类型?

  • Kafka
  • Kinesis
  • RabbitMQ
  • 文件系统
  • 其它
  • 自定义数据源:只需实现 Flink 的 SourceFunction 接口

Flink 的广播变量是一种在并行计算中共享数据的机制。它可以将一个数据集广播到并行任务中,以便任务共享相同的数据集。

流水线并行执行是指将多个不同算子的任务串联在一起,形成连续的任务链,从而实现更高效的执行和资源利用。通过减少数据的序列化和网络传输开销,可以获得更好的性能。

Flink 程序在执行的时候,会被映射成一个 Streaming Dataflow。一个 Streaming Dataflow 是由一组 Stream 和 Transformation Operator 组成的。在启动时从一个或多个 Source Operator 开始,结束于一个或多个 Sink Operator。
Flink 程序本质上是并行的和分布式的,在执行过程中,一个流(stream)包含一个或多个流分区,而每一个 operator 包含一个或多个 operator 子任务。操作子任务间彼此独立,在不同的线程中执行,甚至是在不同的机器或不同的容器上。
operator 子任务的数量是这一特定 operator 的并行度。相同程序中的不同 operator 有不同级别的并行度。

Flink 技术架构

一个 Stream 可以被分成多个 Stream 的分区,也就是 Stream Partition。一个 Operator 也可以被分为多个 Operator Subtask。如上图中,Source 被分成 Source1 和 Source2,它们分别为 Source 的 Operator Subtask。每一个 Operator Subtask 都是在不同的线程当中独立执行的。一个 Operator 的并行度,就等于 Operator Subtask 的个数
上图 Source 的并行度为 2。而一个 Stream 的并行度就等于它生成的 Operator 的并行度。数据在两个 operator 之间传递的时候有两种模式:

  1. One to One 模式:两个 operator 用此模式传递的时候,会保持数据的分区数和数据的 排序;如上图中的 Source1 到 Map1,它就保留的 Source 的分区特性,以及分区元素处 理的有序性。
  2. Redistributing (重新分配)模式:这种模式会改变数据的分区数;每个 operator subtask 会根据选择 transformation 把数据发送到不同的目标 subtasks,比如 keyBy() 会通过 hashcode 重新分区,broadcast() 和 rebalance() 方法会随机重新分区;

我们在实际生产环境中可以从四个不同层面设置并行度:

  • 操作算子层面(Operator Level)
  • 执行环境层面(Execution Environment Level)
  • 客户端层面(Client Level)
  • 系统层面(System Level)

需要注意的优先级:算子层面>环境层面>客户端层面>系统层面。

Flink 中的重启策略包括固定延迟间隔重启、失败率重启和无限重启。可以根据需求选择适当的重启策略。

  • Table API 基于 SQL 风格的查询语言,更适合直观的关系型操作
  • DataStream API 是一种低级别的 API,提供了更多的灵活性和对底层处理的直接控制能力

避免 Flink 中的数据倾斜可以采取一些策略,如合理选择键,使用键控状态(Keyed State),对数据进行预分区,重新分区(Repartition)等,
以将算子中的数据进行均匀分布,有效地分散和平衡数据负载可以减轻数据倾斜的问题。

Flink 的依赖管理使用 Maven 或 Gradle 作为构建工具,通过在应用程序的 build 文件中添加所需的依赖项来管理项目的依赖。构建工具会处理依赖项的下载和构建路径的配置

  • 连续处理是对无界数据流进行实时处理,持续接收和处理数据
  • 迭代处理是对有界数据集进行迭代计算,直到满足特定的终止条件为止

UCR(Unbounded Continuous Rows)是 Flink 中一种数据结构,用于表示无界数据流,在流处理中进行操作和计算

前后台压力是指在流处理中前台和后台操作之间的流量控制机制。它通过动态调整流量来平衡速度和稳定性,防止任务因压力过大而失败

Flink 的迭代是通过特殊的迭代算子和终止条件来实现迭代计算。在每次迭代中,数据会被反复处理,直到满足设定的终止条件为止。

Flink 支持与机器学习和图计算相关的库,如 FlinkML、Gelly 等。这些库提供了丰富的算法和工具,使 Flink 成为处理机器学习和图数据的强大框架。

Flink 的并行度可以通过设置全局并行度和算子级别的并行度来控制。

  • 全局并行度指定了整个应用程序的默认并行度
  • 而算子级别的并行度可以对特定算子进行细粒度的控制

降低 Flink 应用程序的延迟可以通过优化水印生成和事件处理逻辑来实现。例如,使用事件时间窗口来减少乱序处理的影响,调整水印生成策略以减少迟到事件的等待时间等。

  • 通过配置合适的 JVM 参数,如堆内存和堆外内存大小
  • 调整状态后端的配置
  • 合理控制并行度
  • 算子的内存需求

是的,Flink 支持动态调整并行度。
可以通过 Flink 的 REST API 或命令行工具来动态修改并行度,从而根据实际需求进行动态的资源分配和任务调整。

Flink 可以通过水平扩展来处理超大规模数据。可以使用 Flink 的流式和增量计算模型,以及分布式计算和存储技术,将任务和数据分布到多个节点上,以实现大规模数据的高效处理

较大的状态大小可能会增加序列化、传输和存储开销,影响任务的吞吐量和延迟。因此,需要合理设计和管理状态大小,以平衡性能和资源消耗

包括调整并行度和资源分配、优化算子链和任务链、使用异步IO和批量处理等技术手段,以及合理选择状态后端和容错机制等。

Flink 中可以使用广播变量(Broadcast Variable)或连接预分区来处理连接操作的数据倾斜。

  • 广播变量可以将小数据集广播到所有并行任务中
  • 而连接预分区可以将连接操作的输入数据进行预分区,以便更均匀地分布数据负载。

Flink 应用程序主要由三部分组成,源 Source、转换 transformation、目的地 sink。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个目的地(sink)结束。

Flink 编辑模型

Flink作业中的DataStream,Transformation介绍一下

Flink 作业中,包含两个基本的块:数据流(DataStream)和 转换(Transformation)。

DataStream 是逻辑概念,为开发者提供API接口,Transformation 是一系列处理行为的抽象,包含了数据的读取、计算、写出。
所以Flink 作业中的DataStream API 调用,实际上构建了多个由 Transformation 组成的数据处理流水线(Pipeline)
DataStream API 和 Transformation 的转换如下图:

Flink DataStream & Transformation

目前 Flink 支持 8 种分区策略的实现,数据分区体系如下图:

Flink 分区策略

  1. GlobalPartitioner

    数据会被分发到下游算子的第一个实例中进行处理。

  2. ForwardPartitioner

    在 API层面上ForwardPartitioner应用在DataStream上,生成一个新的 DataStream。
    该 Partitioner 比较特殊,用于在同一个 OperatorChain 中上下游算子之间的数据转发,实际上数据是直接传递给下游的,要求上下游并行度一样。

  3. ShufflePartitioner

    随机的将元素进行分区,可以确保下游的Task能够均匀地获得数据,使用代码如下:dataStream.shuffle();

  4. RebalancePartitioner

    以 Round-robin 的方式为每个元素分配分区,确保下游的 Task 可以均匀地获得数据,避免数据倾斜。使用代码如下:dataStream.rebalance();

  5. RescalePartitioner

    根据上下游 Task 的数量进行分区, 使用 Round-robin 选择下游的一个Task 进行数据分区,如上游有2个 Source.,下游有6个 Map,那么每个 Source 会分配3个固定的下游 Map,不会向未分配给自己的分区写人数据。这一点与 ShufflePartitioner 和 RebalancePartitioner 不同, 后两者会写入下游所有的分区。

    Flink Rescale Partitioner

    运行代码如下:dataStream.rescale();

  6. BroadcastPartitioner

    将该记录广播给所有分区,即有N个分区,就把数据复制N份,每个分区1份,其使用代码如下: dataStream.broadcast();

  7. KeyGroupStreamPartitioner

    在 API 层面上,KeyGroupStreamPartitioner 应用在 KeyedStream 上,生成一个新的 KeyedStream。
    KeyedStream 根据 keyGroup 索引编号进行分区,会将数据按 Key 的 Hash 值输出到下游算子实例中。该分区器不是提供给用户来用的。
    KeyedStream 在构造 Transformation 的时候默认使用 KeyedGroup 分区形式,从而在底层上支持作业 Rescale 功能。

  8. CustomPartitionerWrapper

    用户自定义分区器。需要用户自己实现Partitioner接口,来定义自己的分区逻辑。
    更详细的介绍,请参考 Flink 分区策略:你可以不会,但不能不懂

主要包含以下几步:

  1. 获取运行环境 StreamExecutionEnvironment
  2. 接入 source 源
  3. 执行转换操作,如 map()flatmap()keyby()sum()
  4. 输出 sink 源,如 print()
  5. 执行 execute
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        int port; // 定义 socket 的端口号

        try{
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("没有指定port参数,使用默认值9000");
            port = 9000;
        }

        // 获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 连接 socket 获取输入的数据
        DataStreamSource<String> text = env.socketTextStream("10.192.12.106", port, "\n");
        // 计算数据
        // 打平操作,把每行的单词转为<word, count>类型的数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word:splits) {
                    out.collect(new WordWithCount(word,1L));
                }
            }
        }).keyBy("word")  // 针对相同的 word 数据进行分组
        .timeWindow(Time.seconds(2),Time.seconds(1))  // 指定计算数据的窗口大小和滑动窗口大小
        .sum("count");

        // 把数据打印到控制台
        // 使用一个并行度
        windowCount.print().setParallelism(1);

        // 注意:因为 flink 是懒加载的,所以必须调用 execute 方法,上面的代码才会执行
        env.execute("streaming word count");
      }

      /**  
       * 主要为了存储单词以及单词出现的次数
       */
        public static class WordWithCount{
            public String word;
            public long count;

            public WordWithCount(){}
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }

            @Override
            public String toString() {
                return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';
            }
        }
    }
}
  1. 数据读取,这是 Flink 流计算应用的起点,常用算子有:
  2. 从内存读:fromElements
  3. 从文件读:readTextFile
  4. Socket 接入 :socketTextStream
  5. 自定义读取:createInput

  6. 处理数据的算子,主要用于 转换 过程 常用的算子包括:Map(单输入单输出)、FlatMap(单输入、多输出)、Filter(过滤)、KeyBy(分组)、Reduce(聚合)、Window(窗口)、Connect(连接)、Split(分割)等。

  • Checkpoint
  • State
  • Time
  • Window

窗口概念:将无界流的数据,按时间区间,划分成多份数据,分别进行统计(聚合)
Flink支持两种划分窗口的方式(time和count),第一种,按时间驱动进行划分、另一种按数据驱动进行划分。

窗口划分方式

  1. 按时间驱动 Time Window 划分可以分为滚动窗口 Tumbling Window 和滑动窗口 Sliding Window。

  2. 按数据驱动 Count Window也可以划分为滚动窗口 Tumbling Window 和滑动窗口 Sliding Window。

  3. Flink 支持窗口的两个重要属性(窗口长度 size 和滑动间隔 interval),通过窗口长度和滑动间隔来区分滚动窗口和滑动窗口。

    如果size=interval,那么就会形成tumbling-window(无重叠数据)--滚动窗口 如果size(1min)>interval(30s),那么就会形成sliding-window(有重叠数据)--滑动窗口

通过组合可以得出四种基本窗口:

  1. time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))---基于时间的滚动窗口

    时间滚动窗口 时间滚动窗口

  2. time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(10), Time.seconds(5))---基于时间的滑动窗口

    时间滑动窗口 时间滑动窗口

  3. count-tumbling-window 无重叠数据的数量窗口,设置方式举例:countWindow(5)---基于数量的滚动窗口

    数量滚动窗口

  4. count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(10,5)---基于数量的滑动窗口

    数量滑动窗口

  5. Flink 中还支持一个特殊的窗口: 会话窗口 SessionWindows

会话窗口

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况
session 窗口在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。
一个 session 窗口通过一个 session 间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去,如下图所示:

窗口原理流程图

WindowAssigner

  1. 窗口算子负责处理窗口,数据流源源不断地进入算子(window operator)时,每一个到达的元素首先会被交给 WindowAssigner。WindowAssigner 会决定元素被放到哪个或哪些窗口(window),可能会创建新窗口。因为一个元素可以被放入多个窗口中(个人理解是滑动窗口,滚动窗口不会有此现象),所以同时存在多个窗口是可能的。注意,Window 本身只是一个 ID 标识符,其内部可能存储了一些元数据,如 TimeWindow 中有开始和结束时间,但是并不会存储窗口中的元素。窗口中的元素实际存储在 Key/Value State 中,key 为 Window,value 为元素集合(或聚合值)。为了保证窗口的容错性,该实现依赖了 Flink 的 State 机制。

WindowTrigger

  1. 每一个 Window 都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。每当有元素加入到该窗口,或者之前注册的定时器超时了,那么Trigger都会被调用。Trigger的返回结果可以是 :

    • continue(继续、不做任何操作),
    • Fire(触发计算,处理窗口数据),
    • Purge(触发清理,移除窗口和窗口中的数据),
    • Fire + purge(触发计算+清理,处理数据并移除窗口和窗口中的数据)。

当数据到来时,调用 Trigger 判断是否需要触发计算,如果调用结果只是 Fire 的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据不清理,等待下次 Trigger fire 的时候再次执行计算。窗口中的数据会被反复计算,直到触发结果清理。在清理之前,窗口和数据不会释放没所以窗口会一直占用内存。

Trigger 触发流程:

  1. 当Trigger Fire了,窗口中的元素集合就会交给 Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给函数进行计算。

  2. 计算函数收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口的结果值可以是一个也可以是多个。DataStream API 上可以接收不同类型的计算函数,包括预定义的 sum(), min(), max(),还有 ReduceFunctionFoldFunction,还有 WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

  3. Flink 对于一些聚合类的窗口计算(如sum,min)做了优化,因为聚合类的计算不需要将窗口中的所有数据都保存下来,只需要保存一个result值就可以了。每个进入窗口的元素都会执行一次聚合函数并修改result值。这样可以大大降低内存的消耗并提升性能。但是如果用户定义了 Evictor,则不会启用对聚合窗口的优化,因为 Evictor 需要遍历窗口中的所有元素,必须要将窗口中所有元素都存下来。

在Flink的流式处理中,会涉及到时间的不同概念,主要分为三种时间机制,如下图所示:
时间类型

  1. 事件时间 EventTime

    事件发生的时间,例如:点击网站上的某个链接的时间,每一条日志都会记录自己的生成时间。
    如果以 EventTime 为基准来定义时间窗口那将形成 EventTimeWindow,要求消息本身就应该携带 EventTime

  2. 摄入时间 IngestionTime

    数据进入 Flink 的时间,如某个 Flink 节点的 sourceoperator 接收到数据的时间,例如:某个 source 消费到 kafka 中的数据
    如果以 IngesingtTime 为基准来定义时间窗口那将形成 IngestingTimeWindow,以 source 的 systemTime 为准

  3. 处理时间 ProcessingTime

    某个 Flink 节点执行某个 operation 的时间,例如:timeWindow 处理数据时的系统时间,默认的时间属性就是 Processing Time
    如果以 ProcessingTime 基准来定义时间窗口那将形成 ProcessingTimeWindow,以 operator 的 systemTime 为准

在 Flink 的流式处理中,绝大部分的业务都会使用 EventTime,一般只在 EventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。
如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironrnent();
    // 使用处理时间
    // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    // 使用摄入时间
    // env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime);
    // 使用事件时间
    env.setStrearnTimeCharacteristic(TimeCharacteristic.EventTime);

在流数据处理中,有没有遇到过数据延迟等问题,通过什么处理呢?

数据延迟问题举个例子:

  1. 案例 1

    假你正在去往地下停车场的路上,并且打算用手机点一份外卖。
    选好了外卖后,你就用在线支付功能付款了,这个时候是 11 点 50 分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在 Retry 重试“支付”这个操作。
    当你找到自己的车并且开出地下停车场的时候,已经是 12 点 05 分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。
    在上面这个场景中你可以看到,支付数据的事件时间是 11 点 50 分,而支付数据的处理时间是 12 点 05 分

  2. 案例 2

    某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。
    A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,
    但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了

一般处理数据延迟、消息乱序等问题,通过 WaterMark 水印来处理。
水印是用来解决数据延迟、数据乱序等问题,总结如下图所示:
水印概念
水印就是一个时间戳(timestamp),Flink 可以给数据流添加水印

  • 水印并不会影响原有 EventTime 事件时间
  • 当数据流添加水印后,会按照水印时间来触发窗口计算,也就是说 watermark 水印是用来触发窗口计算的
  • 设置水印时间,会比事件时间小几秒钟,表示最大允许数据延迟达到多久
  • 水印时间 = 事件时间 - 允许延迟时间 (例如:10:09:57 = 10:10:00 - 3s )

Watermark 原理讲解一下?

如下图所示:
watermark 原理
窗口是 10 分钟触发一次,现在在 12:00~12:10 有一个窗口,本来有一条数据是在 12:00~12:10 这个窗口被计算,但因为延迟,12:12 到达,这时 12:00~12:10 这个窗口就会被关闭,只能将数据下发到下一个窗口进行计算,这样就产生了数据延迟,造成计算不准确。
现在添加一个水位线:数据时间戳为 2 分钟。这时用数据产生的事件时间 12:12 - 允许延迟的水印 2 分钟 = 12:10 >= 窗口结束时间 。窗口触发计算,该数据就会被计算到这个窗口里。
在 DataStream API 中使用 TimestampAssigner 接口定义时间戳的提取行为,包含两个子接口 AssignerWithPeriodicWatermarks 接口和 AssignerWithPunctuatedWaterMarks 接口
watermark assigner

如果数据延迟非常严重呢?只使用 WaterMark 可以处理吗?那应该怎么解决?

使用 WaterMark + EventTimeWindow 机制可以在一定程度上解决数据乱序的问题,但是,WaterMark 水位线也不是万能的,在某些情况下,数据延迟会非常严重,即使通过 Watermark + EventTimeWindow 也无法等到数据全部进入窗口再进行处理,因为窗口触发计算后,对于延迟到达的本属于该窗口的数据,Flink 默认会将这些延迟严重的数据进行丢弃
那么如果想要让一定时间范围的延迟数据不会被丢弃,可以使用 Allowed Lateness(允许迟到机制/侧道输出机制)设定一个允许延迟的时间和侧道输出对象来解决
即使用 WaterMark + EventTimeWindow + Allowed Lateness 方案(包含侧道输出),可以做到数据不丢失。

API 调用:

  1. allowedLateness(lateness:Time) ---设置允许延迟的时间

    该方法传入一个 Time 值,设置允许数据迟到的时间,这个时间和 watermark 中的时间概念不同。再来回顾一下,
    watermark=数据的事件时间-允许乱序时间值
    随着新数据的到来,watermark 的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,watermark 值则不会更新。
    总的来说,watermark 永远不会倒退它是为了能接收到尽可能多的乱序数据。
    那这里的 Time 值呢?主要是为了等待迟到的数据,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明
    注意:该方法只针对于基于 event-time 的窗口

  2. sideOutputLateData(outputTag:OutputTag[T]) ---保存延迟数据

    该方法是将迟来的数据保存至给定的 outputTag 参数,而 OutputTag 则是用来标记延迟数据的一个对象。

  3. DataStream.getSideOutput(tag:OutputTag[X]) ---获取延迟数据

    通过 window 等操作返回的 DataStream 调用该方法,传入标记延迟数据的对象来获取延迟的数据

简单说一下什么是 State?

Flink 状态管理

在 Flink 中,状态被称作 state,是用来保存中间的计算结果或者缓存数据
根据状态是否需要保存中间结果,分为无状态计算有状态计算
对于流计算而言,事件持续产生,如果每次计算相互独立,不依赖上下游的事件,则相同输入,可以得到相同输出,是无状态计算
如果计算需要依赖于之前或者后续事件,则被称为有状态计算

有/无状态计算

有状态计算如 sum 求和,数据类加等。

有状态计算的例子

  • 托管状态(ManagedState):由 Flink 自行进行管理的 State。
  • 原始状态(Raw State):由用户自行进行管理。

两者区别:

  1. 从状态管理方式的方式来说,Managed StateFlink Runtime 管理,自动存储,自动恢复,在内存管理上有优化;而 Raw State 需要用户自己管理,需要自己序列化,Flink 不知道 State 中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。

  2. 从状态数据结构来说,Managed State 支持已知的数据结构,如 Value、List、Map 等。而 Raw State 只支持字节数组,所有状态都要转换为二进制字节数组才可以。

  3. 从推荐使用场景来说,Managed State 大多数情况下均可使用,而 Raw State 是当 Managed State 不够用时,比如需要自定义 Operator 时,才会使用 Raw State。在实际生产过程中,只推荐使用 Managed State。

State 按照是否有 key 划分为 KeyedStateOperatorState 两种。

keyedState 特点

  1. 只能用在 keyedStream 上的算子中,状态跟特定的 key 绑定。

  2. keyStream 流上的每一个 key 对应一个 state 对象。若一个 operator 实例处理多个 key,访问相应的多个 State,可对应多个 state

  3. keyedState 保存在 StateBackend 中

  4. 通过 RuntimeContext 访问,实现 Rich Function 接口。

  5. 支持多种数据结构:ValueState、ListState、ReducingState、AggregatingState、MapState。

Flink keyBy

OperatorState 特点

  1. 可以用于所有算子,但整个算子只对应一个 state。

  2. 并发改变时有多种重新分配的方式可选:均匀分配;

  3. 实现 CheckpointedFunction 或者 ListCheckpointed 接口。

  4. 目前只支持 ListState 数据结构。

Flink WordCount ListState

这里的 fromElements 会调用 FromElementsFunction 的类,其中就使用了类型为 List state 的 operator state

Flink 中,广播状态中叫作 BroadcastState。在广播状态模式中使用。所谓广播状态模式,就是来自一个流的数据需要被广播到所有下游任务,在算子本地存储,在处理另一个流的时候依赖于广播的数据。
下面以一个示例来说明广播状态模式:

Flink BroadcastState

上图这个示例包含两个流:
一个为 kafka 模型流,该模型是通过机器学习或者深度学习训练得到的模型,将该模型通过广播,发送给下游所有规则算子,规则算子将规则缓存到 Flink 的本地内存中;
另一个为 Kafka 数据流,用来接收测试集,该测试集依赖于模型流中的模型,通过模型完成测试集的推理任务。

广播状态(State)必须是 MapState 类型,广播状态模式需要使用广播函数进行处理,广播函数提供了处理广播数据流和普通数据流的接口。

在 Flink 中使用状态,包含两种状态接口:

  • 状态操作接口:使用状态对象本身存储,写入、更新数据。
  • 状态访问接口:从 StateBackend 获取状态对象本身。

状态操作接口

Flink 中的状态操作接口面向两类用户,即应用开发者Flink 框架本身。所有 Flink 设计了两套接口

  1. 面向开发者 State 接口

    面向开发的 State 接口只提供了对 State 中数据的增删改基本操作接口,用户无法访问状态的其他运行时所需要的信息。接口体系如下图:
    面向开发者的 State 接口体系

  2. 面向内部 State 接口

    内部 State 接口 是给 Flink 框架使用,提供更多的 State 方法,可以根据需要灵活扩展。除了对 State 中数据的访问之外,还提供内部运行时信息,如 State 中数据的序列化器,命名空间(namespace)、命名空间的序列化器、命名空间合并的接口。内部 State 接口命名方式为 InternalxxxState

状态访问接口

有了状态之后,开发者自定义 UDF 时,应该如何访问状态?

状态会被保存在 StateBackend 中,但 StateBackend 又包含不同的类型。所有 Flink 中抽象了两个状态访问接口:OperatorStateStoreKeyedStateStore,用户在编写 UDF 时,就无须考虑到底是使用哪种 StateBackend 类型接口。

OperatorStateStore 接口原理:
Flink OperatorStateStore
OperatorState 数据以 Map 形式保存在内存中,并没有使用 RocksDBStateBackend 和 HeapKeyedStateBackend。

KeyedStateStore 接口原理:
Flink KeyedStateStore

keyedStateStore 数据使用 RocksDBStateBackend 或者 HeapKeyedStateBackend 来存储,KeyedStateStore 中创建、获取状态都交给了具体的 StateBackend 来处理,KeyedStateStore 本身更像是一个代理。

在 Flink中,状态存储被叫做 StateBackend, 它具备两种能力: - 在计算过程中提供访问 State 能力,开发者在编写业务逻辑中能够使用 StateBackend 的接口读写数据。 - 能够将 State 持久化到外部存储,提供容错能力。

Flink 状态提供三种存储方式

  1. 内存:MemoryStateBackend,适用于验证、测试、不推荐生产使用。
  2. 文件:FSStateBackend,适用于长周期大规模的数据。
  3. RocksDB: RocksDBStateBackend,适用于长周期大规模的数据。


上面提到的 StateBackend 是面向用户的,在 Flink 内部 3 种 State 的关系如下图:
StateBackend


在运行时,MemoryStateBackendFSStateBackend 本地的 State 都保存在 TaskManager 的内存中,所以其底层都依赖于 HeapKeyedStateBackend。HeapKeyedStateBackend 面向 Flink 引擎内部,使用者无须感知。

内存型 StateBackend

MemoryStateBackend,运行时所需的 State 数据全部保存在 TaskManager JVM 堆上内存中,KV 类型的 State、窗口算子的 State 使用 HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中

MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。

基于内存的 Stateßackend 在生产环境下不建议使用,可以在本地开发调试测试 。

注意点如下

1
2
3
4
5
1) State 存储在 JobManager 的内存中.受限于 JobManager的内存大小

2) 每个 State 默认 5MB,可通过 MemoryStateBackend 构造函数调整

3) 每个 Stale 不能超过 Akka Frame 大小

文件型 StateBackend

FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中,执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中

可以是分布式或者本地文件系统,路径如:

HDFS 路径:hdfs://namenode:40010/flink/checkpoints

本地路径:file:///data/flink/checkpoints

FSStateBackend 适用于处理大状态、长窗口、或者大键值状态的有状态处理任务

注意点如下

1
2
3
4
5
1) State 数据首先被存在 TaskManager 的内存中

2) State大小不能超过TM内存

3) TM异步将State数据写入外部存储

MemoryStateBackend 和FSStateBackend 都依赖于HeapKeyedStateBackend,HeapKeyedStateBackend 使用 State存储数据

RocksDBStateBackend

RocksDBStateBackend 跟内存型和文件型都不同 。

RocksDBStateBackend 使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的State数据全量或者增量持久化到配置的文件系统中,

在 JobManager 内存中会存储少量的检查点元数据。RocksDB克服了State受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

缺点:RocksDBStateBackend 相比基于内存的StateBackend,访问State的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10

适用场景

1
2
3
4
5
1) 最适合用于处理大状态长窗口或大键值状态的有状态处理任务

2) RocksDBStateBackend 非常适合用于高可用方案

3) RocksDBStateBackend **目前唯一支持增量检查点的后端** 增量检查点非常适用于超 大状态的场景

注意点

1
2
3
4
5
6
7
1)  State 大小仅限于磁盘大小不受内存限制

2) **RocksDBStateBackend** 也需要配置外部文件系统集中保存 State 

3) RocksDB的 JNI API **基于 byte 数组** key 和单 Value 的大小不能超过 8 字节

4) 对于使用具有合并操作状态的应用程序 ListState 随着时间可能会累积到超过 2*31 次方字节大小这将会导致在接下来的查询中失败

首选,Flink 的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。

RocksDBStateBackend 持久化策略有两种:

  • 全量持久化策略 RocksFullSnapshotStrategy
  • 增量持久化策略 RocksIncementalSnapshotStrategy

全量持久化策略

每次将全量的 State 写入到状态存储中(HDFS)。内存型、文件型、RocksDB 类型的 StataBackend 都支持全量持久化策略。

Snapshot Strategy

快照保存策略类体系

在执行持久化策略的时候,使用异步机制,每个算子启动1个独立的线程,将自身的状态写入分布式存储可靠存储中。在做持久化的过程中,状态可能会被持续修改,

基于内存的状态后端使用 CopyOnWriteStateTable 来保证线程安全,RocksDBStateBackend 则使用 RocksDB 的快照机制,使用快照来保证线程安全。

增量持久化策略

增量持久化就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化

Flink 增量式的检查点以 RocksDB 为基础, RocksDB 是一个基于 LSM-Tree 的KV 存储。新的数据保存在内存中, 称为 memtable。如果 Key 相同,后到的数据将覆盖之前的数据,一旦memtable写满了,RocksDB就会将数据压缩并写入磁盘。memtable的数据持久化到磁盘后,就变成了不可变的 sstable

因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些发生改变。

为了确保 sstable 是不可变的,Flink 会在 RocksDB 触发刷新操作,强制将 memtable 刷新到磁盘上 。在 Flink 执行检查点时,会将新的 sstable 持久化到 HDFS 中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的sstable,因为本地的一部分历史sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable文件的引用次数就可以。

RocksDB 会在后台合并 sstable 并删除其中重复的数据。然后在 RocksDB 删除原来的 sstable,替换成新合成的 sstable。新的 sstable 包含了被删除的 sstable 中的信息,通过合并历史的 sstable 会合并成一个新的 sstable,并删除这些历史 sstable. 可以减少检查点的历史文件,避免大量小文件的产生。

  1. DataStream 中状态过期

    可以对 DataStream 中的每一个状态设置清理策略 StateTtlConfig,可以设置的内容如下:

    过期时间:超过多长时间未访问,视为State过期,类似于缓存。

    过期时间更新策略:创建和写时更新、读取和写时更新。

    State 可见性:未清理可用,超时则不可用。

  2. Flink SQL 中状态过期

    Flink SQL 一般在流 Join、聚合类场景使用 State,如果 State 不定时清理,则导致 State 过多,内存溢出。清理策略配置如下:

StreamQueryConfig qConfig = ...
// 设置过期时间为 min = 12 小时 ,max = 24小时
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Flink Checkpoint

Flink 使用 轻量级分布式快照,设计检查点(checkpoint)实现可靠容错。

什么是 Checkpoin 检查点?

Checkpoint 被叫做检查点,是 Flink 实现容错机制最核心的功能,是 Flink 可靠性的基石,它能够根据配置周期性地基于 Stream 中各个 Operator 的状态来生成 Snapshot 快照,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些 Snapshot 进行恢复,从而修正因为故障带来的程序数据状态中断。

Flink 的 checkpoint 机制原理来自“Chandy-Lamport algorithm”算法

注意:区分 State 和 Checkpoint

  1. State:

    一般指一个具体的 Task/Operator 的状态(operator 的状态表示一些算子在运行的过程中会产生的一些中间结果)

    State 数据默认保存在 Java 的堆内存中/TaskManage 节点的内存中

    State 可以被记录,在失败的情况下数据还可以恢复。

  2. Checkpoint:

    表示了一个 FlinkJob 在一个特定时刻的一份全局状态快照,即包含了所有 Task/Operator 的状态

    可以理解为 Checkpoint 是把 State 数据定时持久化存储了

    比如 KafkaConsumer 算子中维护的 Offset 状态,当任务重新恢复的时候可以从 Checkpoint 中获取。

什么是 Savepoint 保存点?

保存点在 Flink 中叫作 Savepoint. 是基于 Flink 检查点机制的应用完整快照备份机制. 用来保存状态可以在另一个集群或者另一个时间点,从保存的状态中将作业恢复回来。适用于应用升级、集群迁移、 Flink 集群版本更新、A/B 测试以及假定场景、暂停和重启、归档等场景。保存点可以视为一个(算子 ID -> State)的Map,对于每一个有状态的算子,Key 是算子 ID,Value 是算子 State。

什么是 CheckpointCoordinator 检查点协调器?

Flink 中检查点协调器叫作 CheckpointCoordinator,负责协调 Flink 算子的 State 的分布式快照。当触发快照的时候,CheckpointCoordinator 向 Source 算子中注入 Barrier 消息 ,然后等待所有的 Task 通知检查点确认完成,同时持有所有 Task 在确认完成消息中上报的 State 句柄。

Checkpoint 中保存的是什么信息?

检查点里面到底保存着什么信息呢?我们以 flink 消费 kafka 数据 wordcount 为例:

  1. 我们从 Kafka 读取到一条条的日志,从日志中解析出 app_id,然后将统计的结果放到内存中一个 Map 集合,app_id 做为 key,对应的 pv 做为 value,每次只需要将相应 app_id 的 pv 值 +1 后 put 到 Map 中即可;

  2. kafka topic:test;

  3. flink 运算流程如下:
    Flink Kafka Source Pv Calculation

kafka topic 有且只有一个分区

假设 kafka 的 topic-test 只有一个分区,flink 的 Source task 记录了当前消费到 kafka test topic 的所有 partition 的 offset

1
例:(0,1000)表示 0 号 partition 目前消费到 offset 为 1000 的数据

Flink的 pv task 记录了当前计算的各 app 的 pv 值,为了方便讲解,我这里有两个 app:app1、app2

1
2
3
4
例:
(app1,50000)表示 app1 当前 pv 值为 50000 
(app2,10000)表示 app2 当前 pv 值为 10000 
每来一条数据,只需要确定相应 app_id,将相应的 value 值 +1 后 put 到 map 中即可;

该案例中,CheckPoint 保存的其实就是第 n 次 CheckPoint 消费的 offset 信息和各 app 的 pv 值信息,记录一下发生 CheckPoint 当前的状态信息,并将该状态信息保存到相应的状态后端。图下代码:(注:状态后端是保存状态的地方,决定状态如何保存,如何保障状态高可用,我们只需要知道,我们能从状态后端拿到 offset 信息和 pv 信息即可。状态后端必须是高可用的,否则我们的状态后端经常出现故障,会导致无法通过 checkpoint 来恢复我们的应用程序)。

1
2
3
4
5
6
7
chk-100
    该状态信息表示第 100 次 CheckPoint 的时候
offset:(0,1000)
    partition 0 offset 消费到了 1000
pv 统计:
(app1,50000)
(app2,10000)

当作业失败后,检查点如何恢复作业?

Flink 提供了应用自动恢复机制手动作业恢复机制

应用自动恢复机制:

Flink 设置有作业失败重启策略,包含三种:

  1. 定期恢复策略:fixed-delay

    固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过最大的重启次数,Job 最终将失败,在连续两次重启尝试之间,重启策略会等待一个固定时间,默认 Integer.MAX_VALUE 次

  2. 失败比率策略:failure-rate

    失败率重启策略在 job 失败后重启,但是超过失败率后,Job 会最终被认定失败,在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

  3. 直接失败策略:None

    失败不重启

手动作业恢复机制。

因为 Flink 检查点目录分别对应的是 JobId,每通过 flink run 方式/页面提交方式恢复都会重新生成 jobId,Flink 提供了在启动之时通过设置 -s .参数指定检查点目录的功能,让新的 jobld 读取该检查点元文件信息和状态信息,从而达到指定时间节点启动作业的目的。

启动方式如下:

/bin/flink -s /flink/checkpoints/03112312a12398740a87393/chk-50/_metadata

当作业失败后,从保存点如何恢复作业?

从保存点恢复作业并不简单,尤其是在作业变更(如修改逻辑、修复 bug))的情况下, 需要考虑如下几点:

  1. 算子的顺序改变

    如果对应的 UID 没变,则可以恢复,如果对应的 UID 变了恢复失败。

  2. 作业中添加了新的算子

    如果是无状态算子,没有影响,可以正常恢复,如果是有状态的算子,跟无状态的算子 一样处理。

  3. 从作业中删除了一个有状态的算子

    默认需要恢复保存点中所记录的所有算子的状态,如果删除了一个有状态的算子,从保存点回复的时候被删除的 OperatorID 找不到,所以会报错 可以通过在命令中添加 allowNonReStoredSlale(short: -n),跳过无法恢复的算子 。

  4. 添加和删除无状态的算子

    如果手动设置了 UID 则可以恢复,保存点中不记录无状态的算子,如果是自动分配的 UID,那么有状态算子的可能会变(Flink 一个单调递增的计数器生成 UID,DAG 改版,计数器极有可能会变)很有可能恢复失败。

要实现分布式快照,最关键的是能够将数据流切分。Flink 中使用 Barrier(屏障)来切分数据流。Barrier 会周期性地注入数据流中,作为数据流的一部分,从上游到下游被算子处理。Barrier 会严格保证顺序,不会超过其前边的数据。Barrier 将记录分割成记录集,两个 Barrier 之间的数据流中的数据隶属于同一个检查点。每一个 Barrier 都携带一个其所属快照的 ID 编号。Barrier 随着数据向下流动,不会打断数据流,因此非常轻量。 在一个数据流中,可能会存在多个隶属于不同快照的 Barrier,并发异步地执行分布式快照,如下图所示:
Flink Checkpoint Barrier
Barrier 会在数据流源头被注人并行数据流中。Barrier n 所在的位置就是恢复时数据重新处理的起始位置。 例如,在 Kafka 中,这个位置就是最后一个记录在分区内的偏移量(offset),作业恢复时,会根据这个位置从这个偏移量之后向 kafka 请求数据,这个偏移量就是 State 中保存的内容之一。

Barrier 接着向下游传递。当一个非数据源算子从所有的输入流中收到了快照 n 的 Barrier 时,该算子就会对自己的 State 保存快照,并向自己的下游广播发送快照 n 的 Barrier。一旦 Sink 算子接收到 Barrier,有两种情况:

  1. 如果是引擎内严格一次处理保证,当 Sink 算子已经收到了所有上游的 Barrie n 时,Sink 算子对自己的 State 进行快照,然后通知检查点协调器(CheckpointCoordinator)。当所有的算子都向检查点协调器汇报成功之后,检查点协调器向所有的算子确认本次快照完成。

  2. 如果是端到端严格一次处理保证,当 Sink 算子已经收到了所有上游的 Barrie n 时,Sink 算子对自己的 State 进行快照,并预提交事务(两阶段提交的第一阶段),再通知检查点协调器(CheckpointCoordinator),检查点协调器向所有的算子确认本次快照完成,Sink 算子提交事务(两阶段提交的第二阶段),本次事务完成。

我们接着 Checkpoint 中保存的是什么信息? 的案例来具体说一下如何执行分布式快照:

1
2
3
4
5
6
7
对应到 pv 案例中就是,Source Task 接收到 JobManager 的编号为 chk-100(从最近一次恢复)的 CheckPoint 触发请求后,
发现自己恰好接收到 kafka offset(0,1000)处的数据,所以会往 offset(0,1000)数据之后 offset(0,1001)数据之前安插一个 barrier,
然后自己开始做快照,也就是将 offset(0,1000)保存到状态后端 chk-100 中。
然后 barrier 接着往下游发送,当统计 pv 的 task 接收到 barrier 后,也会暂停处理数据,将自己内存中保存的 pv 信息(app1,50000)(app2,10000)保存到状态后端 chk-100 中。
OK,flink 大概就是通过这个原理来保存快照的;

统计 pv 的 task 接收到 barrier,就意味着 barrier 之前的数据都处理了,所以说,不会出现丢数据的情况。

什么是 Barrier 对齐?

Flink Barrier 对齐
一旦 Operator 从输入流接收到 CheckPoint barrier n,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到 barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n+1 的记录;

如上图所示:

  • 图 1,算子收到数字流的 Barrier,字母流对应的 barrier 尚未到达

  • 图 2,算子收到数字流的 Barrier,会继续从数字流中接收数据,但这些流只能被搁置,记录不能被处理,而是放入缓存中,等待字母流 Barrier 到达。在字母流到达前, 1,2,3 数据已经被缓存。

  • 图 3,字母流到达,算子开始对齐 State 进行异步快照,并将 Barrier 向下游广播,并不等待快照执行完毕。

  • 图 4,算子做异步快照,首先处理缓存中积压数据,然后再从输入通道中获取数据。

什么是 Barrier 不对齐?

checkpoint 是要等到所有的 barrier 全部都到才算完成

上述图 2 中,当还有其他输入流的 barrier 还没有到达时,会把已到达的 barrier 之后的数据 1、2、3 搁置在缓冲区,等待其他流的 barrier 到达后才能处理。

barrier 不对齐:就是指当还有其他流的 barrier 还没到达时,为了不影响性能,也不用理会,直接处理 barrier 之后的数据。等到所有流的 barrier 的都到达后,就可以对该 Operator 做 CheckPoint 了。

为什么要进行 barrier 对齐?不对齐到底行不行?

Exactly Once 时必须 barrier 对齐,如果 barrier 不对齐就变成了At Least Once;

CheckPoint 的目的就是为了保存快照,如果不对齐,那么在 chk-100 快照之前,已经处理了一些 chk-100 对应的 offset 之后的数据,当程序从 chk-100 恢复任务时,chk-100 对应的 offset 之后的数据还会被处理一次,所以就出现了重复消费。

Exactly-Once 语义:指端到端的一致性,从数据读取、引擎计算、写入外部存储的整个过程中,即使机器或软件出现故障,都确保数据仅处理一次,不会重复、也不会丢失。

要实现 Exactly-Once,需具备什么条件?

流系统要实现 Exactly-Once,需要保证上游 Source 层、中间计算层和下游 Sink 层三部分同时满足端到端严格一次处理,如下图:
Flink 端对端一致性
Flink 端到端严格一次处理:

  • Source 端:数据从上游进入Flink,必须保证消息严格一次消费。同时 Source 端必须满足可重放(replay)。否则 Flink 计算层收到消息后未计算,却发生 failure 而重启,消息就会丢失。

  • Flink 计算层:利用 Checkpoint 机制,把状态数据定期持久化存储下来,Flink 程序一旦发生故障的时候,可以选择状态点恢复,避免数据的丢失、重复。

  • Sink 端:Flink 将处理完的数据发送到 Sink 端时,通过两阶段提交协议 ,即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑,保证 Flink 发送 Sink 端时实现严格一次处理语义。同时:Sink 端必须支持事务机制,能够进行数据回滚或者满足幂等性

    • 回滚机制:即当作业失败后,能够将部分写入的结果回滚到之前写入的状态。

    • 幂等性:就是一个相同的操作,无论重复多少次,造成的结果和只操作一次相等。即当作业失败后,写入部分结果,但是当重新写入全部结果时,不会带来负面结果,重复写入不会带来错误结果。

什么是两阶段提交协议?

两阶段提交协议(Two-Phase Commit,2PC)是解决分布式事务问题最常用的方法,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现 ACID 中的 A(原子性)。

两阶段提交协议中有两个重要角色:协调者(Coordinator)参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。

两阶段提交阶段分为两个阶段:投票阶段(Voting)提交阶段(Commit)

投票阶段:

  1. 协调者向所有参与者发送 prepare 请求和事务内容,询问是否可以准备事务提交,等待参与者的相应。

  2. 参与者执行事务中包含的操作,并记录 undo 日志(用于回滚)和 redo 日志(用于重放),但不真正提交。

  3. 参与者向协调者返回事务操作的执行结果,执行成功返回yes,失败返回no。

提交阶段:

分为成功与失败两种情况。

若所有参与者都返回 yes,说明事务可以提交:

  • 协调者向所有参与者发送 commit 请求。
  • 参与者收到 commit 请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回 ack 。
  • 协调者收到所有参与者的 ack 消息,事务成功完成,如下图:

Flink 2PC 投票阶段(成功)
Flink 2PC 提交阶段(成功)

若有参与者返回 no 或者超时未返回,说明事务中断,需要回滚:

  • 协调者向所有参与者发送 rollback 请求。
  • 参与者收到 rollback 请求后,根据 undo 日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回 ack。
  • 协调者收到所有参与者的ack消息,事务回滚完成。

Flink 2PC 投票阶段(失败)
Flink 2PC 回滚阶段

Flink 通过两阶段提交协议来保证 Exactly-Once 语义。

  • 对于 Source 端:Source 端严格一次处理比较简单,因为数据要进入 Flink 中,所以 Flink 只需要保存消费数据的偏移量 (offset)即可。如果 Source 端为 kafka,Flink 将 Kafka Consumer 作为 Source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性。
  • 对于 Sink 端:Sink 端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开 Flink 之后,Flink 就监控不到这些数据了,所以严格一次处理语义必须也要应用于 Flink 写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与 Flink Checkpoint 能够协调使用(Kafka 0.11 版本已经实现精确一次处理语义)。

我们以 Kafka - Flink -Kafka 为例 说明如何保证 Exactly-Once 语义。
Flink Exactly-Once - Kafka

如上图所示:Flink 作业包含以下算子。

  • 一个 Source 算子,从 Kafka 中读取数据(即 KafkaConsumer)
  • 一个窗口算子,基于时间窗口化的聚合运算(即 window+window 函数)
  • 一个 Sink 算子,将结果写会到 Kafka(即 kafkaProducer)

Flink 使用两阶段提交协议:预提交(Pre-commit)阶段和提交(Commit)阶段保证端到端严格一次。

预提交阶段

  1. 当 Checkpoint 启动时,进入预提交阶段,JobManager 向 Source Task 注入检查点分界线(CheckpointBarrier),Source Task 将 CheckpointBarrier 插入数据流,向下游广播开启本次快照,如下图所示:

    预提交阶段
    预处理阶段:Checkpoint 启动

  2. Source 端Flink Data Source 负责保存 KafkaTopic 的 offset 偏移量,当 Checkpoint 成功时 Flink 负责提交这些写入,否则就终止取消掉它们,当 Checkpoint 完成位移保存,它会将 checkpoint barrier(检查点分界线) 传给下一个 Operator,然后每个算子会对当前的状态做个快照,保存到状态后端(State Backend)

对于 Source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 Checkpoint 恢复时,Source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据,如下图所示:

预提交阶段

预处理阶段:checkpoint barrier传递 及 offset 保存

3、Sink 端:从 Source 端开始,每个内部的 transformation 任务遇到 checkpoint barrier(检查点分界线)时,都会把状态存到 Checkpoint 里。数据处理完毕到 Sink 端时,Sink 任务首先把数据写入外部 Kafka,这些数据都属于预提交的事务(还不能被消费),此时的 Pre-commit 预提交阶段下 Data Sink 在保存状态到状态后端的同时还必须预提交它的外部事务,如下图所示:

预提交阶段

预处理阶段:预提交到外部系统

提交阶段

当所有算子任务的快照完成(所有创建的快照都被视为是 Checkpoint 的一部分),也就是这次的 Checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 Checkpoint 完成,此时 Pre-commit 预提交阶段才算完成。才正式到两阶段提交协议的第二个阶段:commit 阶段。该阶段中 JobManager 会为应用中每个 Operator 发起 Checkpoint 已完成的回调逻辑。

本例中的 Data Source 和窗口操作无外部状态,因此在该阶段,这两个 Opeartor 无需执行任何逻辑,但是 Data Sink 是有外部状态的,此时我们必须提交外部事务,当 Sink 任务收到确认通知,就会正式提交之前的事务,Kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了,如下图所示:

预提交阶段

提交阶段:数据精准被消费

注:Flink 由 JobManager 协调各个 TaskManager 进行 Checkpoint 存储,Checkpoint 保存在 StateBackend(状态后端) 中,默认 StateBackend 是内存级的,也可以改为文件级的进行持久化保存。

Exactly-Once 语文总结

Flink 广播机制
从图中可以理解:广播就是一个公共的共享变量,广播变量是发给 TaskManager 的内存中,所以广播变量不应该太大,将一个数据集广播后,不同的 Task 都可以在节点上获取到,每个节点只存一份。 如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费。

反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,下游处理速率跟不上上游发送数据的速率,而需要对上游进行限速。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。

简单来说就是下游处理速率跟不上上游发送数据的速率,下游来不及消费,导致队列被占满后,上游的生产会被阻塞,最终导致数据源的摄入被阻塞。

反压会影响到两项指标: checkpoint 时长和 state 大小

  1. 前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End Duration)变长。

  2. 后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。

这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。

Flink 社区提出了 FLIP-76: Unaligned Checkpoints[4] 来解耦反压和 checkpoint。

定位反压节点:要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:
(1)通过 Flink Web UI 自带的反压监控面板 (2)通过 Flink Task Metrics

  1. 反压监控面板

    Flink Web UI 的反压监控提供了 SubTask 级别的反压监控,原理是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在 0.1 以下则为 OK,0.1 至 0.5 为 LOW,而超过 0.5 则为 HIGH。

    Flink 反压监控面板

  2. Task Metrics

    Flink 提供的 Task Metrics 是更好的反压监控手段:

    如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;

    如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。

Flink 数据类型
从图中可以看到 Flink 类型可以分为基础类型(Basic)、数组(Arrays)、复合类型(Composite)、辅助类型(Auxiliary)、泛型和其它类型(Generic)。Flink 支持任意的 Java 或是 Scala 类型。

所谓序列化和反序列化的含义:

  • 序列化:就是将一个内存对象转换成二进制串,形成网络传输或者持久化的数据流。
  • 反序列化:将二进制串转换为内存对。

TypeInformation 是 Flink 类型系统的核心类

在 Flink 中,当数据需要进行序列化时,会使用 TypeInformation 的生成序列化器接口调用一个 createSerialize() 方法,创建出 TypeSerializer,TypeSerializer 提供了序列化和反序列化能力。如下图所示:Flink 的序列化过程
Flink TypeInformation

对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化 ,如下图:
Flink TypeInformation Details

比如,BasicTypeInfo、WritableTypeIno ,但针对 GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
通过一个案例介绍 Flink 序列化和反序列化:
Flink MemorySegment

如上图所示,当创建一个 Tuple 3 对象时,包含三个层面,一是 int 类型,一是 double 类型,还有一个是 Person。Person 对象包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,

  • 在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需要占用四个字节。
  • Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去支持。

MemorySegment 具有什么作用呢?

MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上,它代表 1 个固定长度的内存,默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最小的内存分配单元,相当于是 Java 的一个 byte 数组。每条记录都会以序列化的形式存储在一个或多个 MemorySegment 中。

因为在内存中存储大量的数据(包括缓存和高效处理)时,JVM 会面临很多问题,包括如下:

JVM 内存管理的不足:

  1. Java 对象存储密度低。Java 的对象在内存中存储包含 3 个主要部分:对象头、实例 数据、对齐填充部分。例如,一个只包含 boolean 属性的对象占 16byte:对象头占 8byte, boolean 属性占 1byte,为了对齐达到 8 的倍数额外占 7byte。而实际上只需要一个 bit(1/8 字节)就够了。

  2. Full GC 会极大地影响性能。尤其是为了处理更大数据而开了很大内存空间的 JVM 来说,GC 会达到秒级甚至分钟级。

  3. OOM 问题影响稳定性。OutOfMemoryError 是分布式计算框架经常会遇到的问题, 当 JVM 中所有对象大小超过分配给 JVM 的内存大小时,就会发生 OutOfMemoryError 错误, 导致 JVM 崩溃,分布式框架的健壮性和性能都会受到影响。

  4. 缓存未命中问题。CPU 进行计算的时候,是从 CPU 缓存中获取数据。现代体系的 CPU 会有多级缓存,而加载的时候是以 Cache Line 为单位加载。如果能够将对象连续存储, 这样就会大大降低 Cache Miss。使得 CPU 集中处理业务,而不是空转。

Flink 并不是将大量对象存在堆内存上,而是将对象都序列化到一个预分配的内存块上, 这个内存块叫做 MemorySegment,它代表了一段固定长度的内存(默认大小为 32KB),也 是 Flink 中最小的内存分配单元,并且提供了非常高效的读写方法,很多运算可以直接操作 二进制数据,不需要反序列化即可执行。每条记录都会以序列化的形式存储在一个或多个 MemorySegment 中。如果需要处理的数据多于可以保存在内存中的数据,Flink 的运算符会 将部分数据溢出到磁盘。

Flink总体内存类图如下:
Flink 内存类图

主要包含 JobManager 内存模型TaskManager 内存模型

JobManager 内存模型

JobManagerFlinkMemory

在 1.10 中,Flink 统一了 TM 端的内存管理和配置,相应的在 1.11 中,Flink 进一步 对 JM 端的内存配置进行了修改,使它的选项和配置方式与 TM 端的配置方式保持一致。

Flink JobManager 配置

TaskManager 内存模型

Flink 1.10 对 TaskManager 的内存模型和 Flink 应用程序的配置选项进行了重大更改, 让用户能够更加严格地控制其内存开销。

Process Memory

Process Memory

内存说明

JVM Heap:JVM 堆上内存

  1. Framework Heap Memory

    Flink 框架本身使用的内存,即 TaskManager 本身所 占用的堆上内存,不计入 Slot 的资源中。

    配置参数:taskmanager.memory.framework.heap.size=128MB,默认 128MB

  2. Task Heap Memory

    Task 执行用户代码时所使用的堆上内存。

    配置参数:taskmanager.memory.task.heap.size

Off-Heap Mempry:JVM 堆外内存

  1. DirectMemory:JVM 直接内存

    1. Framework Off-Heap Memory:Flink框架本身所使用的内存,即TaskManager 本身所占用的对外内存,不计入 Slot 资源。

      配置参数:taskmanager.memory.framework.off-heap.size=128MB,默认 128MB

    2. Task Off-Heap Memory:Task 执行用户代码所使用的对外内存。

      配置参数:taskmanager.memory.task.off-heap.size=0,默认 0

    3. Network Memory:网络数据交换所使用的堆外内存大小,如网络数据交换 缓冲区

  2. Managed Memory:Flink 管理的堆外内存,

    用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。

JVM specific memory:JVM 本身使用的内存

  1. JVM metaspace:JVM 元空间

  2. JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、 编译缓存等所使用的内存。

    配置参数:taskmanager.memory.jvm-overhead.min=192mb

    taskmanager.memory.jvm-overhead.max=1gb

    taskmanager.memory.jvm-overhead.fraction=0.1

总体内存

  1. 总进程内存:Flink Java 应用程序(包括用户代码)和 JVM 运行整个进程所消 耗的总内存。

    总进程内存 = Flink 使用内存 + JVM 元空间 + JVM 执行开销

    配置项:taskmanager.memory.process.size: 1728m

  2. Flink 总内存:仅 Flink Java 应用程序消耗的内存,包括用户代码,但不包括 JVM 为其运行而分配的内存。

    Flink 使用内存:框架堆内外 + task 堆内外 + network + manage

Flink 在资源管理上可以分为两层:集群资源自身资源。集群资源支持主流的资源管理系统,如 yarn、mesos、k8s 等,也支持独立启动的 standalone 集群。自身资源涉及到每个子 task 的资源使用,由 Flink 自身维护。

1 集群架构剖析

Flink 的运行主要由 客户端、一个 JobManager(后文简称 JM)和 一个以上的 TaskManager(简称 TM 或 Worker)组成。 Flink-Client-JobManager-TaskManager

客户端

客户端主要用于提交任务到集群,在 Session 或 Per Job 模式中,客户端程序还要负责解析用户代码,生成 JobGraph;在 Application 模式中,直接提交用户 jar 和执行参数即可。客户端一般支持两种模式:detached 模式,客户端提交后自动退出。attached 模式,客户端提交后阻塞等待任务执行完毕再退出。

JobManager

JM 负责决定应用何时调度 task,在 task 执行结束或失败时如何处理,协调检查点、故障恢复。该进程主要由下面几个部分组成:

  1. ResourceManager,负责资源的申请和释放、管理 slot(Flink 集群中最细粒度的资源管理单元)。Flink 实现了多种 RM 的实现方案以适配多种资源管理框架,如 yarn、mesos、k8s 或 standalone。在 standalone 模式下,RM 只能分配 slot,而不能启动新的 TM。注意:这里所说的 RM 跟 Yarn 的 RM 不是一个东西,这里的 RM 是 JM 中的一个独立的服务。

  2. Dispatcher,提供 Flink 提交任务的 rest 接口,为每个提交的任务启动新的 JobMaster,为所有的任务提供 web ui,查询任务执行状态。

  3. JobMaster,负责管理执行单个 JobGraph,多个任务可以同时在一个集群中启动,每个都有自己的 JobMaster。注意这里的 JobMaster 和 JobManager 的区别。

TaskManager

TM 也叫做 worker,用于执行数据流图中的任务,缓存并交换数据。集群至少有一个 TM,TM 中最小的资源管理单元是 Slot,每个 Slot 可以执行一个 Task,因此 TM 中 slot 的数量就代表同时可以执行任务的数量。

2 Slot 与资源管理

每个 TM 是一个独立的 JVM 进程,内部基于独立的线程执行一个或多个任务。TM 为了控制每个任务的执行资源,使用 task slot 来进行管理。每个 task slot 代表 TM 中的一部分固定的资源,比如一个 TM 有 3 个 slot,每个 slot 将会得到 TM 的 1/3 内存资源。不同任务之间不会进行资源的抢占,注意 GPU 目前没有进行隔离,目前 slot 只能划分内存资源。

比如下面的数据流图,在扩展成并行流图后,同一的 task 可能分拆成多个任务并行在集群中执行。操作链可以把多个不同的任务进行合并,从而支持在一个线程中先后执行多个任务,无需频繁释放申请线程。同时操作链还可以统一缓存数据,增加数据处理吞吐量,降低处理延迟。

在 Flink 中,想要不同子任务合并需要满足几个条件:下游节点的入边是1(保证不存在数据的 shuffle);子任务的上下游不为空;连接策略总是 ALWAYS;分区类型为 ForwardPartitioner;并行度一致;当前 Flink 开启 Chain 特性。

Flink Streaming DataFlow

在集群中的执行图可能如下:

Flink TaskManager Process

Flink 也支持 slot 的共享,即把不同任务根据任务的依赖关系分配到同一个 Slot 中。这样带来几个好处:方便统计当前任务所需的最大资源配置(某个子任务的最大并行度);避免 Slot 的过多申请与释放,提升 Slot 的使用效率。

Flink TaskManager Process

通过 Slot 共享,就有可能某个 Slot 中包含完整的任务执行链路。

3 应用执行

一个 Flink 应用就是用户编写的 main 函数,其中可能包含一个或多个 Flink 的任务。这些任务可以在本地执行,也可以在远程集群启动,集群既可以长期运行,也支持独立启动。下面是目前支持的任务提交方案:

Session 集群

  • 生命周期:集群事先创建并长期运行,客户端提交任务时与该集群连接。即使所有任务都执行完毕,集群仍会保持运行,除非手动停止。因此集群的生命周期与任务无关。

  • 资源隔离:TM 的 slot 由 RM 申请,当上面的任务执行完毕会自动进行释放。由于多个任务会共享相同的集群,因此任务间会存在竞争,比如网络带宽等。如果某个TM挂掉,上面的所有任务都会失败。

  • 其他方面:拥有提前创建的集群,可以避免每次使用的时候过多考虑集群问题。比较适合那些执行时间很短,对启动时间有比较高的要求的场景,比如交互式查询分析。

Per Job 集群

  • 生命周期:为每个提交的任务单独创建一个集群,客户端在提交任务时,直接与 ClusterManager 沟通申请创建 JM 并在内部运行提交的任务。TM则根据任务运行需要的资源延迟申请。一旦任务执行完毕,集群将会被回收。

  • 资源隔离:任务如果出现致命问题,仅会影响自己的任务。

  • 其他方面:由于 RM 需要申请和等待资源,因此启动时间会稍长,适合单个比较大、长时间运行、需要保证长期的稳定性、不在乎启动时间的任务。

Application 集群

  • 生命周期:与 Per Job 类似,只是 main() 方法运行在集群中。任务的提交程序很简单,不需要启动或连接集群,而是直接把应用程序打包到资源管理系统中并启动对应的 EntryPoint,在 EntryPoint 中调用用户程序的 main() 方法,解析生成 JobGraph,然后启动运行。集群的生命周期与应用相同。

  • 资源隔离:RM 和 Dispatcher 是应用级别。

Flink Core

Flink Source Code

  1. 在 Flink Client 中,通过反射启动 ja r中的 main 函数,生成 Flink StreamGraph 和 JobGraph,将 JobGraph 提交给 Flink 集群
  2. Flink 集群收到 JobGraph(JobManager 收到)后,将 JobGraph 翻译成 ExecutionGraph,然后开始调度,启动成功之后开始消费数据。

总结来说:Flink 核心执行流程,对用户 API 的调用可以转为 StreamGraph --> JobGraph --> ExecutionGraph。

  1. Local 方式:即本地提交模式,直接在 IDEA 运行代码。
  2. 远程提交方式
    • 分为 Standalone 方式
    • Yarn 方式
    • Yarn-perJob 模式
    • Yarn-Session 模式
    • Yarn-Application 模式
  3. K8s 方式

StreamGraph、JobGraph 全部是在 Flink Client 客户端生成的,即提交集群之前生成,原理图如下:
StreamGraph-JobGraph

那在 jobGraph 提交集群之前都经历哪些过程?

  1. 用户通过启动 Flink 集群,使用命令行提交作业,运行 flink run -c WordCount xxx.jar
  2. 运行命令行后,会通过 run 脚本调用 CliFrontend 入口,CliFrontend 会触发用户提交的 jar 文件中的 main 方法,然后交给 PipelineExecuteor#execute 方法,最终根据提交的模式选择触发一个具体的 PipelineExecutor 执行。
  3. 根据具体的 PipelineExecutor 执行,将对用户的代码进行编译生成 streamGraph,经过优化后生成 jobgraph。

具体流程图如下:
jobGraph 提交过程

看你提到 PipeExecutor,它有哪些实现类?

PipeExecutor 在 Flink 中被叫做 流水线执行器,它是一个接口,是 Flink Client 生成 JobGraph 之后,将作业提交给集群的重要环节,前面说过,作业提交到集群有好几种方式,最常用的是 yarn 方式,yarn 方式包含3种提交模式,主要使用 session 模式,perjob 模式。Application 模式 jaobGraph 是在集群中生成。 所以 PipeExecutor 的实现类如下图所示:(在代码中按 CTRL+H 就会出来)
Flink PipeExecutor
除了上述框的两种模式外,在 IDEA 环境中运行 Flink MiniCluster 进行调试时,使用 LocalExecutor。

Local 提交模式有啥特点,怎么实现的?

Local 是在本地 IDEA 环境中运行的提交方式,不上集群,主要用于调试,原理图如下:
Flink Local 提交模式

  1. Flink 程序由 JobClient 进行提交

  2. JobClient 将作业提交给 JobManager

  3. JobManager 负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的 TaskManager

  4. TaskManager 启动一个线程开始执行,TaskManager 会向 JobManager 报告状态更改,如开始执行,正在进行或者已完成。

  5. 作业执行完成后,结果将发送回客户端。

源码分析:通过 Flink1.12.2 源码进行分析的

  1. 创建获取对应的 StreamExecutionEnvironment 对象:LocalStreamEnvironment,调用 StreamExecutionEnvironment 对象的 execute 方法
    getExecutionEnvironment
    LocalStreamEnvironment
    execute

  2. 获取 streamGraph
    streamgraph

  3. 执行具体的 PipeLineExecutor ->得到 localExecutorFactory
    localExecutorFactory

  4. 获取 JobGraph
    根据 localExecutorFactory 的实现类 LocalExecutor 生成 JobGraph
    localExecutorFactory->jobgraph
    上面这部分全部是在 Flink Client 生成的,由于是使用 Local 模式提交。所有接下来将创建 MiniCluster 集群,由 miniCluster.submitJob 指定要提交的 jobGraph

  5. 实例化 MiniCluster 集群
    Flink-local-mode-minicluster

  6. 返回 JobClient 客户端
    在上面执行 miniCluster.submitJob 将 JobGraph 提交到本地集群后,会返回一个 JobClient 客户端,该 JobClient 包含了应用的一些详细信息,包括 JobID,应用的状态等等。最后返回到代码执行的上一层,对应类为 StreamExecutionEnvironment。

以上就是 Local 模式的源码执行过程。

远程提交模式都有哪些?

  • Standalone 方式
    • session 方式
  • yarn 方式
    • Yarn-perJob 模式
    • Yarn-Session 模式
    • Yarn-Application 模式
  • K8s 方式
    • session 模式

Standalone 模式简单介绍一下?

Standalone 模式为 Flink 集群的单机版提交方式,只使用一个节点进行提交,常用 Session 模式。
作业提交原理图如下:
flink-standalone-job-submit
提交命令如下:

bin/flink run org.apache.flink.WordCount xxx.jar
  1. client 客户端提交任务给 JobManager

  2. JobManager 负责申请任务运行所需要的资源并管理任务和资源,

  3. JobManager 分发任务给 TaskManager 执行

  4. TaskManager 定期向 JobManager 汇报状态

Yarn 集群提交方式介绍一下?

通过 yarn 集群提交分为 3 种提交方式:分别为 session 模式、perjob 模式、application 模式

yarn-session 模式特点?

提交命令如下:

./bin/flink run -t yarn-session \
  -D yarn.application.id=application_XXXX_YY xxx.jar

Yarn-Session 模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。适合执行时间短,频繁执行的短任务,集群中的所有作业只有一个 JobManager,另外,Job 被随机分配给 TaskManager
特点:
Session-Cluster 模式需要先启动集群,然后再提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享 Dispatcher 和 ResourceManager;共享资源;适合规模小执行时间短的作业。
原理图如下:
Yarn-Session

yarn-perJob 模式特点?

提交命令:

./bin/flink run -t yarn-per-job --detached  xxx.jar

Yarn-Per-Job 模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。在 per-job 模式下,每个 Job 都有一个 JobManager,每个 TaskManager 只有单个 Job。
特点:
一个任务会对应一个 Job,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。
原理图如下:
Flink-Yarn-PerJob

yarn-application 模式特点?

提交命令如下:

./bin/flink run-application -t yarn-application xxx.jar

Yarn-Application 模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在 JobManager 上执行特点:

在 yarn-per-job 和 yarn-session 模式下,客户端都需要执行以下三步,即: 1. 获取作业所需的依赖项; 2. 通过执行环境分析并取得逻辑计划,即 StreamGraph→JobGraph; 3. 将依赖项和 JobGraph 上传到集群中。

Flink-Yarn-Application
只有在这些都完成之后,才会通过 env.execute() 方法 触发 Flink 运行时真正地开始执行作业。如果所有用户都在同一个客户端上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成 JobGraph 也需要吃掉更多的 CPU 和内存,客户端的资源反而会成为瓶颈
为了解决它,社区在传统部署模式的基础上实现了 Application 模式。原本需要客户端做的三件事被转移到了 JobManager 里,也就是说 main() 方法在集群中执行(入口点位于 ApplicationClusterEntryPoint),客户端只需要负责发起部署请求了
原理图如下:
Flink Yarn Application Job Submit

综上所述,Flink 社区比较推荐使用 yarn-perjob 或者 yarn-application 模式进行提交应用。

yarn-session 提交流程详细介绍一下?

提交流程图如下
Flink Yarn-Session Job Submit

启动集群

  1. Flink Client 向 Yarn ResourceManager 提交任务信息。

    1) Flink Client将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储HDFS中。

    2) Flink Client向Yarn ResourceManager提交任务信息

  2. Yarn 启动 Flink 集群,做 2 步操作:

    1) 通过Yarn Client 向Yarn ResourceManager提交Flink创建集群的申请,Yarn ResourceManager 分配Container 资源,并通知对应的NodeManager上启动一个ApplicationMaster(每提交一个flink job 就会启动一个applicationMaster),ApplicationMaster会包含当前要启动的 JobManager和 Flink自己内部要使用的ResourceManager。

    2) 在JobManager 进程中运行 YarnSessionClusterEntryPoint 作为集群启动的入口。初始化Dispatcher,Flink自己内部要使用的ResourceManager,启动相关RPC服务,等待Flink Client 通过Rest接口提交JobGraph。

作业提交

  1. Flink Client 通过 Rest 向 Dispatcher 提交编译好的 JobGraph。Dispatcher 是 Rest 接口,不负责实际的调度、指定工作。

  2. Dispatcher 收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobMaster,JobMaster负责作业调度,管理作业和Task的生命周期,构建 ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)

以上两步执行完后,作业进入调度执行阶段。

作业调度执行

  1. JobMaster向ResourceManager申请资源,开始调度ExecutionGraph

  2. ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的Container来启动TaskManager进程。

  3. YarnResourceManager启动,然后从HDFS加载Jar文件等所需相关资源,在容器中启动TaskManager,TaskManager启动TaskExecutor

  4. TaskManager启动后,向ResourceManager 注册,并把自己的Slot资源情况汇报给ResourceManager。

  5. ResourceManager从等待队列取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。

  6. TaskManager向JobMaster回复自己的一个Slot属于你这个任务,JobMaser会将Slot缓存到SlotPool。

  7. JobMaster调度Task到TaskMnager的Slot上执行。

yarn-perjob 提交流程详细介绍一下?

提交命令如下:

./bin/flink run -t yarn-per-job --detached xxx.jar

Flink Yarn-PerJob-Job-Submit

启动集群

  1. Flink Client 向 Yarn ResourceManager 提交任务信息。

    1) Flink Client将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储HDFS中。

    2) Flink Client向Yarn ResourceManager提交任务信息

  2. Yarn 启动 Flink集群,做2步操作:

    1) 通过Yarn Client 向Yarn ResourceManager提交Flink创建集群的申请,Yarn ResourceManager 分配Container 资源,并通知对应的NodeManager上启动一个ApplicationMaster(每提交一个Flink job 就会启动一个ApplicationMaster),ApplicationMaster会包含当前要启动的 JobManager和 Flink自己内部要使用的ResourceManager。

    2) 在JobManager 进程中运行 YarnJobClusterEntryPoint 作为集群启动的入口。初始化Dispatcher,Flink自己内部要使用的ResourceManager,启动相关RPC服务,等待Flink Client 通过Rest接口提交JobGraph。

作业提交

ApplicationMaster启动Dispatcher,Dispatcher启动ResourceManager和JobMaster(该步和Session不同,Jabmaster是由Dispatcher拉起,而不是Client传过来的)。JobMaster负责作业调度,管理作业和Task的生命周期,构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)

以上两步执行完后,作业进入调度执行阶段。

作业调度执行

  1. JobMaster向ResourceManager申请Slot资源,开始调度ExecutionGraph

  2. ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的Container来启动TaskManager进程。

  3. YarnResourceManager启动,然后从HDFS加载Jar文件等所需相关资源,在容器中启动TaskManager。

  4. TaskManager在内部启动TaskExecutor。

  5. TaskManager启动后,向ResourceManager 注册,并把自己的Slot资源情况汇报给ResourceManager。

  6. ResourceManager从等待队列取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。

  7. TaskManager向JobMaster回复自己的一个Slot属于你这个任务,JobMaser会将Slot缓存到SlotPool。

  8. JobMaster调度Task到TaskMnager的Slot上执行。