Spark学习笔记


一、Spark概述

Spark 风雨十年

1.1 十年发展

加州伯克利分校AMP实验室开发的通用大数据处理框架。

image-20220903121109356

1.2 Spark 风雨十年

Spark从2016年开始后开源问题提交量最多的项目。

image-20220903122723773

Sparak和Hadoop区别

1.1 技术栈区别

Hadoop Spark
类型 基础平台,包含计算,存储,调度 纯计算工具
场景 海量数据批处理(磁盘迭代计算) 海量数据批处理(内存迭代几段、交互式计算)、海量数据流计算
价格 便宜 对内存有要求,价格高
编程范式 MR,API较为底层,算法适应性差 RDD组成DAG有向无环图,API较为顶层,方便使用
数据存储结构 MapReduce中间计算结果在HDFS磁盘上,延迟大 RDD中间运算结果在内存中,延迟小
运行方式 Task以进程方式维护,任务启动慢 Task以线程方式维护,任务启动快,可批量创建提高并行能力。

Spark 4大特点

特点

  • 速度快
    • Spark比MR快100倍,硬盘快10倍
      • 1.可以将结果放在内存中处理
      • 2.提供了丰富的API算子,做到在一个Spark程序中完成
  • 易于使用
  • 通用性强
    • Spark 提供了Spark SQL、Spark Streaming 、MLib及GraphX在内的多个工具库,我们可以在一个应用中无缝地使用这些工具库。
    • image-20220903135147353
  • 运行方式
    • Spark支持多种运行方式,包括在Hasoop和Mesos上,也支持Standalone的独立运行模式,同时也可以运行在K8S上。
    • Sprak可以读取多种数据源
    • image-20220903135518793

Spark 框架模块

模块组成

  • Spark Core
    • Spark的核心,提供结构化数据处理模块。以RDD为数据抽象提供各种语言API,实现海量离线数据批处理计算。
  • Spark SQL
    • 基于Core,支持以Core对数据进行处理,针对的是离线场景。Spark提供了StructuredStreaming,可以用SparkSQL为基础进行流式计算。
  • Spark Streaming
    • 基于Core为基础,提供流式计算(诞生较早。有一定缺陷)
  • Spark GraphX
    • 图计算
  • Spark MLib
    • 机器学习模块

Spark 运行模式

运行模式

image-20220903140838277

  • 本地模式(Local 开发和测试)
    • 以一个独立的进程,通过内部的多个线程来模拟整个Spark运行时环境。
  • Standalone(集群)
    • 各个角色独立进程形式的存在,并组成Spark集群环境。
  • Hadoop Yarn模式
    • 各个角色运行在Yarn容器内部,并组成Spark集群环境。
  • K8S(容器集群)
    • 各个角色运行在K8S容器内部,并组成Spark集群环境
  • 云服务模式(运行在云平台上)

Spark 节点角色

image-20220903141439654

资源层面

  • Master角色:集群资源管理
  • Worker的角色:单机资源管理

Worker中任务运行层面

  • Driver:单个任务的管理者

  • Executor: 任务运行功能

总结

Spark 解决了海量数据的计算,可以进行理想批处理以及实时流计算。

Spark 5大模块

Spark 4大特点

Spark 3大运行模式

Spark 4种运行角色

二、Spark部署

Spark Local模式

image-20220904095541173

Local模式下角色分布:

资源管理:

  • Master:Local进程本身

  • Worker:Local进程本身

任务执行:

  • Driver:Local进程本身
  • Executor:不存在,没有独立的Executor角色,由Local进程内的线程提供计算能力

注意:Driver也是一直特殊的Executor,只不过多数时候,我们将Executor当做纯Worker来对待。

image-20220904100015803

Spark Standalone 模式

image-20220904100518841

StandAlone是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多个机器集群,用于实际的大数据处理。

Standalone模式下角色分布:

  • 主节点
    • Master角色,管理整个集群资源,并托管运行各个任务的Driver
  • 从节点Workers
    • 管理每个极其的资源,分配对应的资源来运行Executor
    • 每个从节点分配资源信息给Worker,资源信息包含内存Memory和CPU Cores核数
  • 历史服务器HistorySever(可选)
    • 保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行和相关信息
  • image-20220904183429392

其他

1. RDD:

1.2.1 RDD概念

Spark 理论基石 —— RDD

  • RDD学名可伸缩的分布式数据集(Resilient Distributed Dataset)。是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中执行一系列计算,而不用将中间结果落盘。
  • 对于分布式系统,容错支持是必不可少的。为了支持容错,RDD 只支持粗粒度的变换。即,输入数据集是 immutable (或者说只读)的,每次运算会产生新的输出。不支持对一个数据集中细粒度的更新操作。这种约束,大大简化了容错支持,并且能满足很大一类的计算需求。
  • RDD 是一个基于分区的、只读的数据记录集抽象。RDD 只可以通过对持久存储或其他 RDD 进行确定性运算得来,这种运算被称为变换。常用的变换算子包括:map,filter 和 join。
  • RDD 没有选择不断的做检查点以进行容错,而是会记下 RDD 从最初的外存的数据集变化而来的变化路径,也就是其谱系(lineage)。理论上所有的 RDD 都可以在出错后从外存中依据谱系图进行重建。一般来说,重建的粒度是分区(Partition)而非整个数据集,一来代价更小,二来不同分区可能在不同机器上。
  • 用户可以对 RDD 的两个方面进行控制:持久化和分区控制。对于前者,如果某些 RDD 需要复用,那么用户可以指示系统按照某种策略将其进行持久化。后者来说,用户可以定制分区路由函数,将数据集合中的记录按照某个键值路由到不同分区。比如进行 Join 操作的时候,可以讲待 Join 数据集按照相同的策略进行分区,以并行 Join。
  • 数据落盘
    • 在很多的场景中,我们经常要确保数据已经安全的写到磁盘,以便在系统宕机或重启之后还能读到这些数据。但是我们知道,linux系统的IO路径很复杂,并且分为很多层,每一层可能都会有buffer来加速IO读写。因此,想要将数据安全的写到磁盘,并不是简单调一个write/fwrite就可以搞定的。

1.2.2 举例

假设我们利用 Spark 接口相对存在于 HDFS 上的日志文件,找出错误条目,针对出现 hdfs 关键字的具体条目进行分析。

//基于某个 hdfs 上的文件定义一个 rdd(每一行作为集合中的一个条目)
lines = spark.textFile("hdfs://...") 
//第二行通过 filter 变换生成新的 rdd
errors = lines.filter(_.startsWith("ERROR"))
//第三行请求 spark 将其结果进行暂存
errors.persist()

// 以一个 collect 的动作结尾,求出包含 HDFS 关键字的所有行数的各个字段。
//(assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
      .map(_.split(’\t’)(3))
      .collect()
  • 第一行基于某个 hdfs 上的文件定义一个 rdd(每一行作为集合中的一个条目)。
  • 第二行通过 filter 变换生成新的 rdd
  • 第三行请求 spark 将其结果进行暂存
  • 最后一行以一个 collect 的动作结尾,求出包含 HDFS 关键字的所有行数的各个字段。

其计算谱系图如下:

img

有两点需要注意:

  1. 直到遇到 collect 这个动作(Action)之前都没有发生实际的运算。
  2. 链式操作时不保存中间结果;

由于第三行将结果在内存中进行了缓存,因此还可以基于此做其他动作。比如,计算包含 ‘MySQL’ 关键字的错误条数:

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()

1.2.3 RDD 模型的优点

  1. 使用 lineage 来按需恢复数据,而不用定期 snapshot,减小了不必要开销。
  2. 每个 Partition 出错后可以单独进行恢复,而不用进行全数据集的重建。
  3. RDD 舍弃了对任意内存位置进行更新,只允许批量的写入数据,从而提高了容错效率
  4. RDD 的不可变的特点允许系统叫较容易的对某些计算进行迁移,不用考虑一致性的问题。
  5. 由于只支持批量计算,因此调度系统可以比较好的利用数据局部性的特点加快运算速度。
  6. 由于只支持批量计算,因此调度系统可以比较好的利用数据局部性的特点加快运算速度。

1.2.4 Spark编程接口

  • Spark 通过暴露与编程语言集成的算子来提供操作 RDD 的接口。 其中 RDD 表现为编程语言中的类,而 RDD 的算子为作用于这些类上的函数。之前的系统如 DryadLINQ 和 FlumeJava 也使用了类似的形式。

  • 用户使用 RDD 时,首先将数据从持久化存储中通过变换Transformations,如 map 或者 filter)将其载入内存,然后可以对 RDD 施加任何系统支持的一系列变换,最后利用动作Action)算子,将 RDD 重新持久化到外存中或者将控制权交还用户。

  • 这个加载-变换-落盘的过程是声明式(Declarative,或者说是惰式[2])的,Spark 在拿到整个拓扑后会利用执行引擎进行执行优化(比如将并行化、流水线化,之后会进一步讨论)。

  • 此外很重要的一个接口是 persist,可以由用户来告诉系统哪些 RDD 需要持久化,如何持久化(本机硬盘还是跨机器存储),如果有多个 RDD 需要持久化,那么优先级如何确定。Spark 默认将 RDD 保存在内存中,如果内存不够用了会根据用户配置将数据溢出(spill)到硬盘上。

  • 声明式

    • 它可以看作是命令式的反面。曾有人言:一切非命令式,皆是声明式。从这个意义上说,越是偏离图灵机的图像越远的,就越是声明式的。
    • 所以,函数式编程(Functional Programming)是声明式的,因为它不使用可变状态,也不需要指定任何的执行顺序关系(可以假定所有的函数都是同时执行的,因为存在引用透明性,所谓的参数和变量都只是一堆符号的别名而已)。逻辑式编程(Logical Programming)也是声明式的,因为我们只需要通过facts和rules描述我们所需要解决的问题,具体的求解路径由编译器和程序运行时自动决定。
  • Spark 利用 Scala 语言作为 RDD 抽象的接口,因为 Scala 兼顾了精确(其函数式语义适合交互式场景)与高效(使用静态类型)。

  • 开发者利用 Spark 提供的库编写驱动程序 (driver programe)以使用 Spark。驱动程序会定义一到多个 RDD,并对其进行各种变换。Spark 提供的库会连接 Spark 集群,生成计算拓扑,并将拓扑分散到多个 workers 上去进行执行,同时记下变换的谱系(lineage)。这些 workers 是分散在 Spark 集群内各个机器上的常驻进程,它们在内存里保存计算过程中生成的 RDD 的各个分区。

  • img

1.2.5 RDD的表示

RDD 抽象的核心组成主要有以下五个部分:

  1. 分区集(partition set)。分区是每个 RDD 的最小构成单元。
  2. 依赖集(dependencies set)。主要是 RDD 间的父子依赖关系。
  3. 变换函数(compute function)。作用于分区上的变换函数,可以由几个父分区计算得到一个子分区。
  4. 分区模式(partition scheme)。该 RDD 的分区是基于哈希分片的还是直接切分的。
  5. 数据放置(data placement)。知道分区的存放位置可以进行计算优化。

在 RDD 的接口设计中最有趣的一个点是如何对 RDD 间的依赖关系进行规约。最后发现可以将所有依赖归纳为两种类型:

  1. 窄依赖(narrow dependencies):父 RDD 的分区最多被一个子 RDD 的分区所依赖,比如 map
  2. 宽依赖(wide dependencies):父 RDD 的分区可能被多个子 RDD 的分区所依赖,比如 join

preview

调度优化。对于窄依赖,可以对分区间进行并行流水化调度,先计完成某个窄依赖算子(比如说 map)的分区不用等待其他分区而直接进行下一个窄依赖算子(比如 filter )的运算。与之相对,宽依赖的要求父 RDD 的所有分区就绪,并进行跨节点的传送后,才能进行计算。类似于 MapReduce 中的 shuffle。

数据恢复。在某个分区出现错误或者丢失时,窄依赖的恢复更为高效。因为涉及到的父分区相对较少,并且可以并行恢复。而对于宽依赖,由于依赖复杂(如上图,子 RDD 的每个分区都会依赖父 RDD 的所有分区),一个分区的丢失可能就会引起全盘的重新计算。

HDFS 文件partitions 函数返回 HDFS 文件的所有 block,每个 block 被当做一个 partition。 preferredLocations 返回每个 block 所在的位置,Iterator 会对每个 block 进行读取。

map:在任意 RDD 上调用 map 会返回一个 MappedRDD 对象,该对象的 partitions 函数和 preferredLocations 与父 RDD 保持一致。对于 iterator,只需要将传给 map 算子的函数以此作用到其父 RDD 的各个分区即可。

union: 在两个 RDD 上调用 union 会返回一个新的 RDD,该 RDD 的每个分区由对应的两个父 RDD 通过窄依赖计算而来。

sample:抽样函数和 map 大体一致。但该函数会给每个分区保存一个随机数种子来决定父 RDD 的每个记录是否保留。

join:在两个 RDD 上调用 join 操作可能会导致两个窄依赖(比如其分区都是按待 join 的key 哈希的),两个宽依赖,或者混合依赖。每种情况下,子 RDD 都会有一个 partitioner 函数,或继承自父分区,或是默认的hash 分区函数。


yg9538 2022年9月4日 23:01 1036 收藏文档