Things you should know about Spark: part 2

Data Analysis Skills 11

BW L.
Data Engineering Insight

--

This post is part of the Hadoop Series.

Now that we have covered the basics, let’s get into some more in-depth content.

This post uses Spark 2.1.1+. The set of supported properties changes between Spark versions, but the properties discussed here should be common to all of them.

Dynamic allocation

As the name suggests, dynamic allocation lets a Spark session request and release executors from YARN at run time, so the number of executors changes with the workload.

To enable it, set the Spark property spark.dynamicAllocation.enabled to 'true'. When dynamic allocation is enabled, a few companion properties also need to be set:

  • spark.dynamicAllocation.initialExecutors, the initial number of executors when the Spark session is created. It defaults to minExecutors if not set.
  • spark.dynamicAllocation.minExecutors, the minimum number of executors the Spark session should maintain.
  • spark.dynamicAllocation.maxExecutors, the maximum number of executors the Spark session may use.
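
As a rough illustration of how these properties relate, the sketch below collects them in a plain Python dictionary. The helper name dynamic_allocation_properties and the example numbers are made up for this post; only the property names come from Spark. The ordering check is a sanity check of this sketch, not a description of what Spark itself enforces.

```python
def dynamic_allocation_properties(min_executors, max_executors, initial_executors=None):
    """Return the dynamic allocation properties as a dict of strings."""
    # initialExecutors falls back to minExecutors when it is not set,
    # so mirror that default here.
    if initial_executors is None:
        initial_executors = min_executors
    # Sanity check of this sketch (an assumption, not Spark's own validation).
    if not (min_executors <= initial_executors <= max_executors):
        raise ValueError("expected minExecutors <= initialExecutors <= maxExecutors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.initialExecutors": str(initial_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }


# Example values only; pick numbers that fit your job and queue.
props = dynamic_allocation_properties(min_executors=2, max_executors=20)
for key, value in sorted(props.items()):
    print(key, "=", value)
```

Keeping the values in one dictionary makes it easy to review them in one place before they are passed to Spark.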

Check the Spark UI while your code runs to decide whether the job actually benefits from dynamic allocation, and switch it on or off accordingly.

Spark configurations

Spark sessions can be configured using command line options (available for frequently used properties only) or Spark config properties. Internally, Spark translates the command line options into Spark config properties. Including the dynamic allocation properties, these are some frequently used ones:

  • Dynamic allocation:
      'spark.dynamicAllocation.enabled'
      'spark.dynamicAllocation.minExecutors'
      'spark.dynamicAllocation.initialExecutors'
      'spark.dynamicAllocation.maxExecutors'
  • Without dynamic allocation: 'spark.executor.instances'
  • YARN queue: 'spark.yarn.queue'
  • Memory:
      'spark.executor.memory'
      'spark.driver.memory'
  • Spark UI: the property 'spark.ui.port' sets the port on the Hadoop server where you submit the Spark code, and 'spark.env.SPARK_LOCAL_IP.local' sets the hostname of that Hadoop server. When both properties are set in the Spark conf, the Spark UI is directly accessible at http://<hostname>:<port> in your browser.
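
To make the relationship between command line options and config properties concrete, here is a small lookup of a few common options and the properties they correspond to. The dictionary and the to_property helper are illustrative only; they cover just four options and are not Spark's internal translation table.

```python
# A few spark-submit / pyspark options and the properties they correspond to.
OPTION_TO_PROPERTY = {
    "--driver-memory": "spark.driver.memory",
    "--executor-memory": "spark.executor.memory",
    "--num-executors": "spark.executor.instances",
    "--queue": "spark.yarn.queue",
}


def to_property(option, value):
    """Translate one command line option into a (property, value) pair."""
    return OPTION_TO_PROPERTY[option], value


# Example values only.
print(to_property("--executor-memory", "4g"))
print(to_property("--queue", "default"))
```

Properties without a dedicated option, such as the dynamic allocation ones, can still be passed on the command line as --conf key=value.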

DO NOT install the pyspark package yourself

I cannot stress this enough. It gets its own section because I have seen too many new users install pyspark inside their conda environment, expect it to work, and fail. (This applies when you are using a long-running Hadoop cluster. EMR and Dataproc work a little differently.)

Do not install the pyspark package in your own environment. It will most likely not match the Spark version on your cluster. Even if it does, it still depends on other Hadoop configuration that is not easy to get right. The best approach is to ask your Hadoop admin for the location of the Spark libraries on the Hadoop server you are using.

How to run spark code?

Spark code can be developed in Java, Scala or Python. We’ll only cover Python (pyspark) since it’s more popular.

pyspark shell

The easiest way is to just run the pyspark command. It starts a Python interpreter with a Spark session created by default. The pyspark shell is good for quick, short code but not for developing complicated code.

Of course, you’ll need to consider Spark configurations such as driver memory, executor memory, queue and dynamic allocation. They must be passed in as command line options: by the time the pyspark command has started the interpreter, the Spark session is already up and you won’t be able to change most of the config you need.

spark-submit command line

The spark-submit command comes with the Spark installation. You can use it to submit Spark code written in Python or Scala (packaged in a jar). The yarn-cluster mode is only available through the spark-submit command.
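
As a sketch of what such a submission can look like, the snippet below assembles a spark-submit command line in Python and prints it. The helper build_spark_submit and the script name my_job.py are hypothetical, and the property values are examples. It assumes Spark 2.x, where yarn-cluster mode is written as --master yarn --deploy-mode cluster.

```python
import shlex


def build_spark_submit(script, properties, deploy_mode="cluster"):
    """Assemble a spark-submit command line as a list of arguments."""
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", deploy_mode]
    for key, value in sorted(properties.items()):
        # Any Spark property can be passed as --conf key=value.
        cmd += ["--conf", "{}={}".format(key, value)]
    cmd.append(script)
    return cmd


cmd = build_spark_submit(
    "my_job.py",  # hypothetical script name
    {"spark.executor.memory": "4g", "spark.yarn.queue": "default"},
)
# Print a shell-safe version of the command instead of running it.
print(" ".join(shlex.quote(arg) for arg in cmd))
```

The pyspark shell accepts the same options, so the same --conf pairs can be reused there with --deploy-mode client.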

Use Jupyter notebook

If you are used to Jupyter notebooks, you don’t need either of the two approaches above. Just set up a notebook server as you would for developing any Python code, and make sure it runs on a Hadoop server that has all the Spark libraries installed. The caveat of this approach is that it only supports yarn-client mode and will not work with yarn-cluster mode, so it is best suited to active development of Spark code.

It takes some effort to get the Jupyter kernel to work with the right Spark libs. The code below can be run from inside JupyterLab or a notebook.

Copy the code below and paste it at the beginning of your notebook (before importing the pyspark package).

import os
import sys

# Location of the cluster's Spark installation (ask your Hadoop admin).
spark_home = '/usr/hdp/current/spark-client'
spark_home_py = spark_home + '/python'
spark_home_py_lib = spark_home_py + '/lib'
# Pick up the py4j zip bundled with this Spark installation.
py4j_zip = '{}/{}'.format(
    spark_home_py_lib,
    [f for f in os.listdir(spark_home_py_lib) if f.startswith('py4j')][0])
sys.path.append(spark_home_py)
sys.path.append(py4j_zip)
os.environ["SPARK_HOME"] = spark_home

This makes sure the right Spark libs are imported and that the system environment variable (SPARK_HOME in this case) is set. Then you can import pyspark as usual.

from pyspark import SparkConf
from pyspark.sql import SparkSession
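
If you want to double-check that the pyspark you just imported is the cluster's copy and not one installed in your own environment, a path comparison along these lines may help. The helper is_under is a made-up name for this post, and the conda path is only an example of a self-installed copy.

```python
import os


def is_under(module_file, spark_home):
    """Return True if module_file lives inside the spark_home directory."""
    module_path = os.path.realpath(module_file)
    home_path = os.path.realpath(spark_home)
    return os.path.commonpath([module_path, home_path]) == home_path


# In the notebook you would pass pyspark.__file__ and os.environ["SPARK_HOME"].
# Literal example paths are used here so the sketch runs anywhere.
cluster_copy = "/usr/hdp/current/spark-client/python/pyspark/__init__.py"
conda_copy = "/opt/conda/lib/python3.8/site-packages/pyspark/__init__.py"
print(is_under(cluster_copy, "/usr/hdp/current/spark-client"))
print(is_under(conda_copy, "/usr/hdp/current/spark-client"))
```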

To construct the Spark session, first set all the Spark configurations on a SparkConf object, then pass the conf to the SparkSession builder.

numMinExecutors = 2  # example value (an assumption); choose one that fits your job

conf = SparkConf()
conf.set('spark.dynamicAllocation.minExecutors', numMinExecutors)
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

All Spark properties should be set with conf.set('spark.***', value) before the Spark session (spark) is created.
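
When there are many properties, one option is to keep them in a dictionary and apply them in a loop. The sketch below shows the idea; apply_properties and RecordingConf are hypothetical names, and RecordingConf is only a stand-in so the snippet runs without a cluster. The property values are examples.

```python
def apply_properties(conf, properties):
    """Call conf.set(key, value) for every property and return conf."""
    for key, value in properties.items():
        # Spark properties are strings, so convert numbers explicitly.
        conf.set(key, str(value))
    return conf


class RecordingConf:
    """Stand-in for SparkConf that just remembers what was set."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value
        return self


conf = apply_properties(RecordingConf(), {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": 2,
    "spark.executor.memory": "4g",
})
print(conf.values)
```

In the notebook, pass SparkConf() instead of RecordingConf() and hand the returned conf to the SparkSession builder. SparkConf also offers setAll, which takes a list of key-value pairs.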
