除了大家都知道的基于内存运算外,真正的原因是Spark将复杂任务分解成为一个DAG ,通过其特性实现了快速处理的能力。
DAG 提供高效内存计算能力的底层原理与实现
DAG(Directed Acyclic Graph,直接有向无环图) 是现代分布式计算框架(如 Apache Spark、Flink 等)高效内存计算能力的核心。它不仅优化了计算任务的执行流程,还减少了不必要的磁盘读写,从而实现高性能计算。以下是从底层原理到源代码实现的全面解析。
1. DAG 的基本概念
1.1 什么是 DAG?
DAG 是一个数据结构,由节点和有向边组成,且无环。节点(Node):表示计算操作或任务(如数据过滤、聚合、排序等)。有向边(Edge):表示数据流动,定义了任务之间的依赖关系。1.2 DAG 的作用
任务调度优化:清晰定义任务之间的依赖关系,避免不必要的重复计算。并行计算:基于 DAG 分解任务,调度器可以并行执行无依赖的任务。内存计算:通过中间数据缓存,减少磁盘 I/O。2. 高效内存计算的底层原理
2.1 传统 MapReduce 的问题
MapReduce 采用严格的两阶段处理:Map 和 Reduce,每个阶段的中间结果都会写入磁盘。磁盘 I/O 成为性能瓶颈,特别是迭代计算或多阶段任务时。2.2 DAG 的改进
DAG 提供了一种更灵活的任务执行模型:
任务分解:
将一个复杂计算分解为多个基本操作(如 Map、Filter、Join)。使用 DAG 表示这些操作的依赖关系。数据缓存:
中间结果可以直接存储在内存中,而不是写入磁盘。在重复使用数据时(如机器学习的迭代计算),无需重新读取磁盘。任务优化:
DAG 优化器会重排任务顺序、合并操作(如将多个 Map 操作合并为一个)、消除无效计算。3. 源代码与实现解析:以 Spark 为例
3.1 RDD:数据模型
在 Spark 中,DAG 是通过 RDD(Resilient Distributed Dataset) 来构建的。RDD 的特点:
不可变性:每个 RDD 是只读的,所有变换操作都会生成新的 RDD。分区(Partition):RDD 被分区存储,分区是计算的基本单位。延迟计算(Lazy Evaluation):DAG 的构建是延迟的,只有在触发 Action 操作(如 count 或 collect)时才会开始计算。RDD 源代码示例:
val rdd = sc.textFile("data.txt")
.filter(line => line.contains("error"))
.map(line => line.split(",")(1))
rdd.collect()
上述代码并不会立即执行,而是构建一个 DAG,直到 collect() 时才触发执行。
3.2 DAG 构建
在 Spark 中,DAG 是由一系列 Transformation(变换)操作(如 map、filter)构建的:
Transformation:延迟操作,描述数据如何从一个 RDD 转换到另一个 RDD。Action:触发操作,启动 DAG 的执行。DAG 构建的实现流程:
逻辑计划(Logical Plan):每个 RDD 的 Transformation 会记录在逻辑计划中。阶段划分(Stage Division):根据窄依赖(数据可以直接传递)和宽依赖(需要 shuffle)划分为多个阶段。任务调度(Task Scheduling):为每个分区生成一个独立的任务,依赖于 DAG 调度器。核心代码(简化):
// 逻辑计划构建
val dagScheduler = new DAGScheduler()
val finalStage = dagScheduler.newStage(rdd, task)
dagScheduler.submitStage(finalStage)
3.3 DAG 优化
DAG 优化是 Spark 高效计算的关键步骤,分为以下几部分:
(1) 合并窄依赖任务
窄依赖:父分区直接传递给子分区,无需全局数据交换。通过合并窄依赖,减少中间数据传输。示例:
rdd.map(...).filter(...)
这两个操作会被合并为一个任务,避免中间结果的存储。
(2) 延迟计算与任务分配
DAG 的延迟计算允许 Spark 收集所有操作后再全局优化。优化点:减少 Shuffle 操作。核心代码逻辑:
override def submitJob(rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Seq[Int]): Unit = {
// 延迟提交任务
val dag = new ResultStage(...)
dagScheduler.submitStage(dag)
}
(3) 缓存中间数据
DAG 执行时会判断哪些数据需要重复使用,并将其缓存到内存中。如果内存不足,Spark 会使用 LRU 策略将数据写入磁盘。代码实现:
rdd.persist(StorageLevel.MEMORY_AND_DISK)
3.4 DAG 执行
DAG 执行过程
Spark 会从最后一个 Action 开始,向前递归地执行 DAG。先执行所有无依赖的任务(叶子节点),然后逐步向上合并结果。
容错机制
每个 RDD 包含其父 RDD 的依赖关系(称为 Lineage)。如果某个分区的计算失败,Spark 会重新根据 DAG 计算该分区的数据,而无需重新执行整个任务。源代码片段:
override def runTask(context: TaskContext): Unit = {
val result = parentRDD.iterator(partition, context)
context.write(result)
}
4. 为什么 DAG 提供高效的内存计算能力?
减少磁盘 I/O:
中间数据存储在内存中,避免了传统 MapReduce 的磁盘写入和读取。例如,机器学习中的迭代计算(如梯度下降),不需要每次都重新加载数据。
任务优化:
合并窄依赖,减少任务数量。优化 Shuffle 阶段,减少全局数据交换。
并行执行:
无依赖的任务可以并行执行,充分利用集群资源。
容错机制:
通过 DAG 的 Lineage(血统)追踪,可以快速恢复失败的任务。5. 打个比方,方便理解
DAG 的核心价值:
DAG 通过分解任务、优化执行顺序、缓存中间结果等手段,实现高效计算,特别是迭代计算场景中,性能提升显著。
通俗解释:
传统的任务执行像流水线,结果需要保存后再执行下一步,浪费时间。DAG 则像流水工厂,根据所有任务的关系,设计出最快的流程,直接把结果传递下去,不需要保存中间步骤。通过 DAG 的高效内存计算能力,现代分布式计算框架能够在海量数据处理上实现性能的飞跃。