Deep Dive into Apache Spark Memory Allocation
In this blog post, I will explain the memory allocation for the Spark driver and executors. If you are here, I assume you are already familiar with Apache Spark, its architecture, and why Spark needs memory. However, I will quickly review a few concepts to get all readers on the same page. Let’s start with the following question.
What is Apache Spark?
Apache Spark is an open-source, distributed processing system for big data workloads. It utilizes in-memory data processing and caching for fast analytic queries against data of any size. It provides development APIs in Java, Scala, Python, and R. You can use Apache Spark across multiple workload types, including batch processing, interactive queries, real-time analytics, machine learning, and graph processing. Over time, Apache Spark has become one of the most popular big data distributed processing frameworks.
Apache Spark Application Architecture
Apache Spark is a distributed processing engine, and every Spark application runs using a master/worker architecture. In this architecture, your application will have the following components.
- Application Driver – Only one per Spark application
- Spark Workers – One or more per Spark application
The following figure explains the Spark runtime architecture for a PySpark application. If you created your Spark application in Java/Scala, you can remove the green boxes from the diagram below to represent the runtime architecture of your application.
How do you submit the Apache Spark application?
Spark developers can create Spark applications and test them on their local machines. However, at the end of the development, you must deploy your application in a test or production cluster. But how will you do that?
We have many ways to submit an Apache Spark application to your cluster, but the most commonly used method is the spark-submit command-line tool. Here is the general structure of the spark-submit command.
spark-submit --class <main-class> --master <master-url> --deploy-mode <deploy-mode> <application-jar> [application-arguments]
This is a minimalist example with the most commonly used options. You can use the spark-submit command and pass configuration options by prefixing the option name with --. The second-to-last argument is your application JAR or a PySpark script. Finally, you can pass command-line arguments for your application. The structure above shows only three options, but here is a list of the most commonly used ones.
--class --master --deploy-mode --conf --driver-cores --driver-memory --num-executors --executor-cores --executor-memory
The --class option applies only to Java or Scala applications and is not required when submitting a PySpark application. It tells spark-submit the fully qualified name of the class where you defined your main() method.
The --master option tells Spark which cluster manager to use. If you are using YARN, the value of your master is yarn. But if you want to run your application on the local machine, you should set the master to local. You can also use local[3] to run Spark locally with three threads.
The deploy mode takes one of the following two values.
- client – the driver runs on the machine where you submit the application
- cluster – the driver runs inside the cluster, in a container managed by the cluster manager
Remember: you have only two deploy modes. I often see people talking about a local mode, but local is a master configuration, not a valid deploy mode.
The --conf option allows you to set additional Spark configurations. For example, you can pass --conf spark.executor.memoryOverhead=2g to set the overhead to an explicit size. If you leave it unset, Spark defaults the overhead to 10% of the executor memory (with a 384 MB minimum). I will cover overhead memory configuration in a later part of this article.
Now come the resource allocation options. An Apache Spark application runs as one driver and one or more executors. You can use the --driver-cores option to ask for CPU cores for your driver container, and the --driver-memory option to ask for RAM for the driver container.
Then we have three options for executors. The --num-executors option sets the required number of executors, while --executor-cores and --executor-memory set the CPU and memory for each executor container.
Let’s look at an example.
spark-submit --master yarn --deploy-mode cluster --driver-cores 2 --driver-memory 8g --num-executors 4 --executor-cores 4 --executor-memory 16g hello-spark.py
And below is an equivalent version of the Scala application.
spark-submit --class guru.learningjournal.HelloSpark --master yarn --deploy-mode cluster --driver-cores 2 --driver-memory 8g --num-executors 4 --executor-cores 4 --executor-memory 16g hello-spark.jar
So this example submits a hello-spark application. I want to run it in my Hadoop YARN cluster, so the master is yarn. I want the application to run in cluster mode, so the deploy-mode is cluster. I am requesting the YARN RM to allocate a driver container with 2 CPU cores and 8 GB of memory, so I defined 2 driver-cores and 8G driver-memory. I also want the driver to ask the YARN RM for four executor containers, so I defined num-executors as 4. Finally, I want 4 CPU cores and 16 GB of memory for each executor container, so I defined executor-cores and executor-memory accordingly.
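As a sanity check, the total cluster ask from this spark-submit command can be tallied with simple arithmetic. This is a sketch, assuming Spark's default overhead rule of max(384 MB, 10% of heap) per container:

```python
# Rough tally of what the example spark-submit asks YARN for.
# Driver: 2 cores, 8 GB heap; Executors: 4 x (4 cores, 16 GB heap).

def overhead_mb(heap_mb: int, factor: float = 0.10, minimum: int = 384) -> int:
    """Default container overhead: 10% of the heap, with a 384 MB floor."""
    return max(minimum, int(heap_mb * factor))

driver_heap = 8 * 1024        # --driver-memory 8g
executor_heap = 16 * 1024     # --executor-memory 16g
num_executors = 4             # --num-executors 4

driver_total = driver_heap + overhead_mb(driver_heap)        # 8192 + 819 MB
executor_total = executor_heap + overhead_mb(executor_heap)  # 16384 + 1638 MB
cluster_total = driver_total + num_executors * executor_total

print(driver_total, executor_total, cluster_total)
```

So this one submit asks YARN for roughly 79 GB of memory across five containers, before the cluster manager rounds requests up to its allocation increments.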
Spark Driver Memory Allocation
Now let’s come to the actual topic of this article. Assume you submitted a spark application in a YARN cluster. The YARN RM will allocate an application master (AM) container and start the driver JVM in the container. The driver will start with some memory allocation that you requested. However, do you know how to ask for the driver’s memory?
You can ask for the driver memory using the following two configurations.
- spark.driver.memory
- spark.driver.memoryOverhead
So let’s assume you asked for the spark.driver.memory = 1GB. And the default value of spark.driver.memoryOverhead = 0.10
In the above scenario, the YARN RM will allocate 1 GB of memory for the driver JVM. It will also allocate 10% of your requested memory or 384 MB, whichever is higher, for container overhead. We asked for 1 GB of spark.driver.memory. Right?
So the 10% of 1 GB is 100 MB. But 100 MB is less than the 384 MB. So the YARN RM will allocate 384 MB for overhead. So what is the total memory for the container?
It comes to 1 GB + 384 MB. Right?
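The arithmetic above can be captured in a small helper. This is a sketch; 384 MB is the overhead floor and 10% the default factor described above:

```python
def driver_container_mb(driver_memory_mb: int,
                        factor: float = 0.10,
                        minimum_mb: int = 384) -> int:
    """Total YARN container size for the driver: heap plus overhead."""
    overhead = max(minimum_mb, int(driver_memory_mb * factor))
    return driver_memory_mb + overhead

# 1 GB heap -> 10% is only 102 MB, so the 384 MB floor applies.
print(driver_container_mb(1024))  # 1408 MB, i.e. 1 GB + 384 MB
```

Notice the floor only matters for small drivers: once the heap passes 3840 MB, the 10% factor takes over.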
But what is the purpose of 384 MB overhead? The overhead memory is used by the container process or any other non-JVM process within the container. Your Spark driver uses all the JVM heap but nothing from the overhead. Great! That’s all about the driver’s memory allocation.
Now the driver is started with 1 GB of JVM heap. So the driver will again request the executor containers from the YARN. The YARN RM will allocate a bunch of executor containers. But how much memory do you get for each executor container?
Spark Executor Memory Allocation
Once the driver starts, it will again go back to the cluster resource manager and request the executor containers. The total memory allocated to the executor container is the sum of the following.
- Overhead Memory – spark.executor.memoryOverhead
- Heap Memory – spark.executor.memory
- Off Heap Memory – spark.memory.offHeap.size
- Pyspark Memory – spark.executor.pyspark.memory
So the Spark driver will ask for executor container memory using the four configurations listed above. It looks at all of them, calculates your memory requirement, and sums it up.
Now let’s assume you asked for spark.executor.memory = 8 GB
The default value of spark.executor.memoryOverhead = 10%
Let’s assume the other two configurations are not set, and the default value is zero. So how much memory do you get for your executor container?
You asked for spark.executor.memory = 8 GB, so you will get 8 GB for the JVM. The default spark.executor.memoryOverhead is 10%, so you will get roughly 800 MB extra for the overhead. The total container memory comes to 8800 MB.
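The four-component sum can be sketched as below. This is a simplified model: the post rounds 10% of 8 GB to 800 MB, while the exact figure on an 8192 MB heap is 819 MB, and YARN will additionally round requests up to its allocation increment:

```python
def executor_container_mb(
    heap_mb: int,
    overhead_factor: float = 0.10,
    overhead_min_mb: int = 384,
    offheap_mb: int = 0,    # spark.memory.offHeap.size (zero unless enabled)
    pyspark_mb: int = 0,    # spark.executor.pyspark.memory (unset by default)
) -> int:
    """Total memory the driver requests per executor container."""
    overhead = max(overhead_min_mb, int(heap_mb * overhead_factor))
    return heap_mb + overhead + offheap_mb + pyspark_mb

print(executor_container_mb(8 * 1024))  # 8192 + 819 = 9011 MB, roughly 8.8 GB
```

Setting either of the last two configurations grows the container: they are added on top of the heap and overhead, not carved out of them.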
So the driver will ask the YARN RM for 8.8 GB containers. But do we get an 8.8 GB container?
That depends on your cluster configuration. The container must run on a worker node in the YARN cluster. What if the worker node is a 6 GB machine? YARN cannot allocate an 8.8 GB container on a 6 GB machine because there is not enough physical memory.
You should check with your cluster admin for the maximum allowed value before asking for the driver or executor memory. If you are using the YARN RM, you should look for the following configurations.
- yarn.scheduler.maximum-allocation-mb
- yarn.nodemanager.resource.memory-mb
For example, let’s assume you are using AWS c4.large instance type to set up your Spark cluster using AWS EMR. The c4.large instance comes with 3.75 GB RAM. You launched an EMR cluster using c4.large instances. The EMR cluster will start with the default value of yarn.scheduler.maximum-allocation-mb = 1792
That means you cannot ask for a container larger than 1792 MB. That’s your physical memory limit for each container. Even if you ask for more, you are not going to get it. Make sense?
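Before submitting, you can sanity-check a request against the scheduler limit. A minimal sketch, using the 1792 MB EMR default quoted above:

```python
def fits_in_yarn(requested_mb: int, max_allocation_mb: int = 1792) -> bool:
    """True if the container request is within yarn.scheduler.maximum-allocation-mb."""
    return requested_mb <= max_allocation_mb

# An 8.8 GB executor container is rejected outright on this cluster.
print(fits_in_yarn(9011))   # False
print(fits_in_yarn(1408))   # True: a 1 GB driver plus 384 MB overhead fits
```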
Summarizing Driver and Executor Memory
Let me summarize what we discussed, using the above diagram. I am assuming that we have enough physical memory on the worker nodes. The total physical memory for a driver container comes from the following two configurations.
- spark.driver.memory
- spark.driver.memoryOverhead
Once allocated, this becomes the physical memory limit for your Spark driver. For example, if you asked for 4 GB of spark.driver.memory, you will get a 4 GB JVM heap and 400 MB of overhead memory outside the JVM. Now you have three limits.
- Your Spark driver JVM cannot use more than 4 GB.
- Your non-JVM workload in the container cannot use more than 400 MB.
- Your container cannot use more than 4.4 GB of memory in total.
If any of these limits are violated, you will see an OOM exception.
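The three limits can be expressed as a hypothetical check. The function name and thresholds here are illustrative only, not a Spark API; the limits match the 4 GB / 400 MB driver example above:

```python
def violated_limits(jvm_used_mb: int, nonjvm_used_mb: int,
                    heap_limit_mb: int = 4096,
                    overhead_limit_mb: int = 400) -> list:
    """Return which of the three driver-container limits are exceeded."""
    problems = []
    if jvm_used_mb > heap_limit_mb:
        problems.append("driver JVM over heap limit")
    if nonjvm_used_mb > overhead_limit_mb:
        problems.append("non-JVM processes over overhead limit")
    if jvm_used_mb + nonjvm_used_mb > heap_limit_mb + overhead_limit_mb:
        problems.append("container over total limit")
    return problems

# A driver JVM using 4200 MB breaches the heap limit even though
# the container total (4300 MB) is still under 4.4 GB.
print(violated_limits(4200, 100))
```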
Now let me summarize executor memory allocation. The total physical memory for an executor container comes from the following four configurations.
- spark.executor.memoryOverhead
- spark.executor.memory
- spark.memory.offHeap.size
- spark.executor.pyspark.memory
The third and fourth configurations are not set by default, so you can consider them zero.
Now let’s assume you asked for spark.executor.memory = 8 GB.
So you will get 8 GB for the Spark executor JVM. You will also get 800 MB for overhead. The total physical memory of your container is 8.8 GB.
Now you have three limits.
- Your executor JVM cannot use more than 8 GB of memory.
- Your non-JVM processes cannot use more than 800 MB.
- Your container has a maximum physical limit of 8.8 GB.
You will see an OOM exception when any of these limits are crossed.
Great! So far, so good. But you should ask two more questions.
- What is the Physical memory limit at the worker node?
- What is the PySpark executor memory?
I already talked about the physical memory limit for your workers. You should look at yarn.scheduler.maximum-allocation-mb for the maximum limit. You cannot get more than the maximum-allocation-mb.
The next question is, how much memory do you get for your PySpark?
We didn’t talk about the PySpark memory yet. All we are talking about is JVM and non-JVM memory.
You do not need to worry about PySpark memory if you write your Spark application in Java or Scala. But if you are using PySpark, this question becomes critical. Let’s try to understand.
For the above example, how much memory do you get for PySpark?
The answer might surprise you.
PySpark is not a JVM process, so you will not get anything from those 8 GB. All you have is the 800 MB of overhead memory. Some 300 to 400 MB of this is constantly consumed by the container and other internal processes, so your PySpark workers will get approximately 400 MB.
So now you have one more limit.
If your PySpark consumes more than what can be accommodated in the overhead, you will see an OOM error.
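As a rough estimate, here is that budget in code. The 350 MB reserved figure is an assumption taken from the 300-400 MB range discussed above and will vary by environment:

```python
def pyspark_budget_mb(heap_mb: int, reserved_mb: int = 350,
                      factor: float = 0.10, minimum_mb: int = 384) -> int:
    """Approximate overhead memory left for Python workers when
    spark.executor.pyspark.memory is not set.

    reserved_mb is an assumed amount already consumed by the container
    and other internal processes (not an exact Spark figure).
    """
    overhead = max(minimum_mb, int(heap_mb * factor))
    return max(0, overhead - reserved_mb)

# 8 GB executor: ~819 MB overhead, minus ~350 MB already in use.
print(pyspark_budget_mb(8 * 1024))  # roughly 469 MB for PySpark
```

If you need a guaranteed budget for Python workers, setting spark.executor.pyspark.memory adds a dedicated slice to the container instead of making PySpark compete for the shared overhead.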
Great! So if you look at it from the YARN perspective, you have a container, and the container has some memory. This total memory is broken into two parts.
- Heap memory (driver/executor memory)
- Overhead memory (OS memory)
The heap memory goes to your JVM. We call it driver memory when the container runs a driver, and executor memory when it runs an executor. The overhead memory is used for a bunch of things, including network buffers. So you will be using overhead memory for your shuffle exchanges or for reading partition data from remote storage. The point is straightforward.
Both memory portions are critical for your Spark application, and more often than not, a lack of overhead memory will cost you an OOM exception. The overhead memory is easy to overlook, yet it serves as your shuffle exchange and network read buffer.
Great! That’s all for the driver and executor memory allocations.
Keep Learning and Keep Growing.