spark统一内存管理模型

参考:https://www.iteblog.com/archives/2342.html
spark新的内存管理框架(1.6以上)上使用两个参数来控制spark.memory.fraction和spark.memory.storageFraction。
spark.storage.memoryFraction spark.shuffle.memoryFraction不再使用
在spark1.6之前采用的是静态内存模型,1.6之后采用的是统一内存模型
概述
统一内存管理模块包括了堆内内存(On-heap Memory)和堆外内存(Off-heap Memory)两大区域
对于堆内内存

  • System Memory = executor-memory = (Eden + FROM +TO) = execution-memory + storage-memory + user-memory + reserver-memory
  • spark-memory = execution-memory + storage-memory
  • reserver-memory固定大小为200M
  • user-meomry = (SystemMemory-200m)* (1-spark.memory.fraction)
  • spark-meomry = (SystemMemory-200m)*spark.memory.fraction
  • storage-memory = spark-memory * spark.memory.storageFraction
    -execution-memory = spark-memory * (1-spark.memory.storageFraction)
On-heap Memory 堆内内存
默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:
  • Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据
  • Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;
  • 用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。
  • 预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。
systemMemory = Runtime.getRuntime.maxMemory,其实就是通过参数 spark.executor.memory 或 --executor-memory 配置的。
reservedMemory 在 Spark 2.2.1 中是写死的,其值等于 300MB,这个值是不能修改的(如果在测试环境下,我们可以通过 spark.testing.reservedMemory 参数进行修改);
usableMemory = systemMemory - reservedMemory,这个就是 Spark 可用内存
Off-heap Memory 堆外内存
java的unsafe相关的api使用的是堆外内存
这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的 malloc() 直接向操作系统申请内存,由于这种方式不进过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑
默认情况下,堆外内存是关闭的,我们可以通过 spark.memory.offHeap.enabled 参数启用,并且通过 spark.memory.offHeap.size 设置堆外内存大小,单位为字节。如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互补影响,这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也一样。相比堆内内存,堆外内存只区分 Execution 内存和 Storage 内存
Execution 内存和 Storage 内存动态调整
具体的实现逻辑如下:
程序提交的时候我们都会设定基本的 Execution 内存和 Storage 内存区域(通过 spark.memory.storageFraction 参数设置);
在程序运行时,如果双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU 规则进行的。若己方空间不足而对方空余时,可借用对方的空间; (存储空间不足是指不足以放下一个完整的 Block)
Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
Storage 内存的空间被对方占用后,目前的实现是无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用。
注意,上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。
一个示例
现在我们提交的 Spark 作业关于内存的配置如下:
--executor-memory 18g
由于没有设置 spark.memory.fraction 和 spark.memory.storageFraction 参数
上图很清楚地看到 Storage Memory 的可用内存是 10.1GB,这个数是咋来的呢?根据前面的规则,我们可以得出以下的计算:
systemMemory = spark.executor.memory
reservedMemory = 300MB
usableMemory = systemMemory - reservedMemory
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
如果我们把数据代进去,得出以下的结果:
systemMemory = 18Gb = 19327352832 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
= 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB
不对啊,和上面的 10.1GB 对不上啊。为什么呢?这是因为 Spark UI 上面显示的 Storage Memory 可用内存其实等于 Execution 内存和 Storage 内存之和,也就是 usableMemory * spark.memory.fraction:
StorageMemory= usableMemory * spark.memory.fraction
= 19012780032 * 0.6 = 11407668019.2 = 10.62421GB
还是不对,这是因为我们虽然设置了 --executor-memory 18g,但是 Spark 的 Executor 端通过 Runtime.getRuntime.maxMemory 拿到的内存其实没这么大,只有 17179869184 字节,所以 systemMemory = 17179869184,然后计算的数据如下:
systemMemory = 17179869184 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
StorageMemory= usableMemory * spark.memory.fraction
= 16865296384 * 0.6 = 9.42421875 GB
我们通过将上面的 16865296384 * 0.6 字节除于 1024 * 1024 * 1024 转换成 9.42421875 GB,和 UI 上显示的还是对不上,这是因为 Spark UI 是通过除于 1000 * 1000 * 1000 将字节转换成 GB,如下:
systemMemory = 17179869184 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
StorageMemory= usableMemory * spark.memory.fraction
= 16865296384 * 0.6 字节 = 16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB
我们设置了 --executor-memory 18g,但是 Spark 的 Executor 端通过 Runtime.getRuntime.maxMemory 拿到的内存其实没这么大,只有 17179869184 字节,这个数据是怎么计算的? Runtime.getRuntime.maxMemory 是程序能够使用的最大内存,其值会比实际配置的执行器内存的值小。这是因为内存分配池的堆部分分为 Eden,Survivor 和 Tenured 三部分空间,而这里面一共包含了两个 Survivor 区域,而这两个 Survivor 区域在任何时候我们只能用到其中一个,所以我们可以使用下面的公式进行描述:ExecutorMemory = Eden + 2 * Survivor + Tenured Runtime.getRuntime.maxMemory =Eden + Survivor + Tenured 上面的 17179869184 字节可能因为你的 GC 配置不一样得到的数据不一样,但是上面的计算公式是一样的

使用了堆外内存
用了堆内和堆外内存
现在如果我们启用了堆外内存,情况咋样呢?我们的内存相关配置如下:
spark.executor.memory 18g
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 10737418240
其实 Spark UI 上面显示的 Storage Memory 可用内存等于堆内内存和堆外内存之和,计算公式如下:
堆内
systemMemory = 17179869184 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
totalOnHeapStorageMemory = usableMemory * spark.memory.fraction
= 16865296384 * 0.6 = 10119177830
堆外
totalOffHeapStorageMemory = spark.memory.offHeap.size = 10737418240
【spark统一内存管理模型】StorageMemory = totalOnHeapStorageMemory + totalOffHeapStorageMemory
= (10119177830 + 10737418240) 字节
= (20856596070 / (1000 * 1000 * 1000)) GB
= 20.9 GB

    推荐阅读