Spark Action on Spark Jobs
Spark Action Running Against HDFS
Spark can access the HDFS file system to read and write data after processing. The following sample program performs a word count of testFile.txt, stored in /data/input/ in HDFS, and saves the result to /data/output/. The sample file is named pysparkWc.py.
from pyspark import SparkContext, SparkConf

# Create the Spark context for the application
appNameTEST = "my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)

# Read the input file from HDFS, count the words, and save the result back to HDFS
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")
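After the workflow completes, you can verify the result by reading the saved counts back from HDFS. The following lines are an optional check, a minimal sketch that assumes the same SparkContext and the /data/output/ path used above:
# Optional check (sketch): read the saved word counts back from HDFS and print a few pairs
result = sc.textFile("hdfs:///data/output/")
for pair in result.take(5):
    print(pair)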
The following is the workflow XML for executing the sample job. The preceding PySpark file must be placed in the same location as workflow.xml.
<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ac92"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ac92">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>pysparkWc.py</jar>
<file>pysparkWc.py#pysparkWc.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
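The ${jobTracker} and ${nameNode} variables in the workflow are resolved from the properties supplied when the job is submitted. The following job.properties is a minimal sketch with hypothetical host names, default ports, and an assumed HDFS application path; replace them with the values for your cluster:
# Hypothetical hosts, default ports, and an assumed application path; adjust for your cluster
nameNode=hdfs://<namenode-host>:8020
jobTracker=<resourcemanager-host>:8032
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/<user>/spark-wordcount
With workflow.xml and pysparkWc.py copied to that HDFS application path, the workflow can be submitted with the standard Oozie CLI, for example: oozie job -oozie http://<oozie-host>:11000/oozie -config job.properties -run (11000 is the default Oozie server port).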
Spark Action Running Against Hive
Spark can access Hive to write data after processing. The following sample program creates an employee table and inserts data into it.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (id int, name string)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")
There's a distinction between HA clusters and non-HA clusters when accessing Hive from Spark. The main difference is the authorization and access protocols present on HA clusters: hcat credentials and Ranger permissions are required to access Hive in an HA cluster.
For non-HA clusters, the following workflow XML file is enough to access Hive for the preceding script.
<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ad44"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ad44">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>oozie-hive.py</jar>
<file>oozie-hive.py#oozie-hive.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
For HA clusters, you must provide the hcat credential for the workflow. The following example contains hcat.metastore.uri and hcat.metastore.principal to complete the authorization. Because Spark is integrated with Ranger, to work with Hive tables from Spark you must also provide, as part of spark-opts, the keytab of the user that the program runs as (the keytab must be present in the HDFS file system).
<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
<credentials>
<credential name="hcat" type="hcat">
<property>
<name>hcat.metastore.uri</name>
<value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
</property>
</credential>
</credentials>
<start to="spark-345a"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-345a" cred="hcat">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>oozie-hive</name>
<jar>oozie-hive.py</jar>
<spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
<file>oozie-hive.py#oozie-hive.py</file>
<file>training.service.keytab#training.service.keytab</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Spark Action Running Against Object Storage
In Big Data Service, Spark provides access to Object Storage using an API key created for the cluster. The following example performs a word count of a file and stores the result in the output folder of an Object Storage bucket.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")
The following API key parameters are passed as arguments in spark-opts:
spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
spark.hadoop.BDS_OSS_CLIENT_REGION
spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE
The values of these parameters can be obtained from the API key details of the user and populated in the workflow XML shown here:
<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
<start to="spark-74ff"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-74ff">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>spark-object-storage.py</jar>
<spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
<file>spark-object-storage.py#spark-object-storage.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Adding User Libraries for Execution
Add user libraries using the Spark options spark.driver.extraClassPath or spark.executor.extraClassPath, for example through spark-opts as shown in the sketch below.
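In an Oozie Spark action these options can be passed through spark-opts, as in the other workflows above. The following fragment is a sketch with a hypothetical library path; the JARs must exist at that path on the cluster nodes, or be shipped alongside the workflow with <file> elements:
<!-- Hypothetical library path; replace with the actual location of the user JARs -->
<spark-opts>--conf spark.driver.extraClassPath=/opt/userlibs/* --conf spark.executor.extraClassPath=/opt/userlibs/*</spark-opts>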