From 6bd3b32b5332ebf97743a828f417dd6df8b4a9c4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 1 Jul 2026 14:43:13 -0700 Subject: [PATCH] [SPARK-57867][CORE] Driver should not reserve off-heap memory in non-local mode --- .../scala/org/apache/spark/SparkContext.scala | 6 +++++- .../scala/org/apache/spark/SparkEnv.scala | 11 ++++++++-- .../org/apache/spark/SparkContextSuite.scala | 20 +++++++++++++++++++ .../plugin/PluginContainerSuite.scala | 5 +++-- 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0a701e1967bdf..3610c676c40fb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -590,7 +590,11 @@ class SparkContext(config: SparkConf) extends Logging { _plugins = PluginContainer(this, _resources.asJava) _resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus) _env.initializeShuffleManager() - _env.initializeMemoryManager(SparkContext.numDriverCores(master, conf)) + // The driver in non-local deployments never runs tasks or stores off-heap blocks, and its + // container memory is not sized for spark.memory.offHeap.size, so don't reserve off-heap + // memory there. + _env.initializeMemoryManager( + SparkContext.numDriverCores(master, conf), offHeapAllowed = isLocal) // Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 39f12e03a9336..2bc10adcbe3a5 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -347,10 +347,17 @@ class SparkEnv ( } } - private[spark] def initializeMemoryManager(numUsableCores: Int): Unit = { + private[spark] def initializeMemoryManager( + numUsableCores: Int, + offHeapAllowed: Boolean = true): Unit = { Preconditions.checkState(null == memoryManager, "Memory manager already initialized to %s", _memoryManager) - _memoryManager = UnifiedMemoryManager(conf, numUsableCores) + val memoryManagerConf = if (offHeapAllowed) { + conf + } else { + conf.clone.set(MEMORY_OFFHEAP_ENABLED, false).set(MEMORY_OFFHEAP_SIZE, 0L) + } + _memoryManager = UnifiedMemoryManager(memoryManagerConf, numUsableCores) } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 36a1a1d07daa1..517506b0f26f4 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -1481,6 +1481,26 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu }.getMessage assert(m.contains("Number of cores to allocate for each task should be positive.")) } + + test("SPARK-57867: Driver should not reserve off-heap memory in non-local mode") { + val conf = new SparkConf() + .setAppName("test") + .setMaster("local-cluster[1,1,1024]") + .set(MEMORY_OFFHEAP_ENABLED, true) + .set(MEMORY_OFFHEAP_SIZE, 5L * 1024 * 1024) + sc = new SparkContext(conf) + assert(sc.env.memoryManager.maxOffHeapStorageMemory === 0) + } + + test("SPARK-57867: Driver should reserve off-heap memory in local mode") { + val conf = new SparkConf() + .setAppName("test") + .setMaster("local") + .set(MEMORY_OFFHEAP_ENABLED, true) + .set(MEMORY_OFFHEAP_SIZE, 5L * 1024 * 1024) + sc = new SparkContext(conf) + assert(sc.env.memoryManager.maxOffHeapStorageMemory > 0) + } } object SparkContextSuite { diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index 700a17649b760..b1c5683cb2256 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -244,8 +244,9 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext(conf) val memoryManager = sc.env.memoryManager - assert(memoryManager.tungstenMemoryMode == MemoryMode.OFF_HEAP) - assert(memoryManager.maxOffHeapStorageMemory == MemoryOverridePlugin.offHeapMemory) + // SPARK-57867: The driver does not reserve off-heap memory in non-local mode + assert(memoryManager.tungstenMemoryMode == MemoryMode.ON_HEAP) + assert(memoryManager.maxOffHeapStorageMemory == 0) val defaultResourceProfile = sc.resourceProfileManager.defaultResourceProfile assert(512L ==