Cloudera workloads and S3 dataset(s)
Part 3 of the series on Cloudera S3 access to a Pure Storage FlashBlade covering Spark, Hive and distcp.
Part 1 : GUI configuration of Cloudera v7 to use on premise S3 storage
Part 2 : S3 credentials in Cloudera
Spark job using S3 dataset(s)
The following is an example spark job to leverage our New York City taxi dataset, it reads in the file and then counts the lines. In order to run this spark job we will leverage the credential provider to store our S3 access keys (se part 2 of the Cloudera and FlashBlade S3 series).
Create on one of the Cloudera nodes a s3job.py file with the following contents:
from pyspark import SparkConffrom pyspark import SparkContextfrom pyspark.sql import SparkSessionconf = SparkConf()conf.setMaster(‘yarn’)conf.set(“spark.submit.deployMode”, “client”)conf.setAppName(‘spark-s3’)conf.set(“hadoop.security.credential.provider.path”, “jceks://hdfs/user/hdfs/home/keystores/fb01.jceks”)sc = SparkContext(conf=conf)spark = SparkSession(sc)s3csv = “s3a://clouderabucket/yellow_tripdata_2010–01.csv”nycs3 = spark.read.load(s3csv, format=”csv”, header=True)print(“#######################”)print(nycs3.count())print(“#######################”)
Then submit the job to the yarn scheduler as follows:
$ sudo -u hdfs spark-submit s3job.py
Note that the usual — master yarn — deploy-mode client parameters are included in the code snippet.
The job will start, connect to yarn and schedule the required spark resources within the yarn manager web UI we can see the job:
And eventually finish, outputting the desired result of 14863778 lines
Hive query on S3 datasets
Within Cloudera management GUI go to HIVE > Configuration for you cluster and add the fs.s3a.security.credential.provider.path configuration setting to the Hive Service Advanced Configuration Snippet (Safety Valve) for hive-site.xml, note that I have created a copy of my credential file and assigned correct permissions for the hive user:
$ sudo -u hdfs hadoop fs -ls /user/hiveFound 1 items-rw-r — r — 3 hive hive 1040 2021–04–28 10:08 /user/hive/fb01.jceks
Remember to restart to apply the configuration changes to all the Hive nodes in the cluster.
We can list the hive-site.xml files on our node and check the latest one for the added setting:
$ sudo tail -5 /run/cloudera-scm-agent/process/1546342555-hive-HIVESERVER2/hive-site.xml<property><name>fs.s3a.security.credential.provider.path</name><value>jceks://hdfs/user/hive/fb01.jceks</value></property></configuration>
Hive by default uses its own generated credstore passwords so we need to disable this (or create the credential store using this password — not covered see Cloudera):
We can now open a hive connection and create a table from our S3 dataset, open a connection to the local hive db:
$ beelinebeeline> !connect jdbc:hive2://jbt-cdw-d5.----.purestorage.com:10000connecting to jdbc:hive2://jbt-cdw-d5.----.purestorage.com:10000Enter username for jdbc:hive2://jbt-cdw-d5.----.purestorage.com:10000: adminEnter password for jdbc:hive2://jbt-cdw-d5.----.purestorage.com:10000: ****Connected to: Apache Hive (version 3.1.3000.7.1.4.0–203)Driver: Hive JDBC (version 3.1.3000.7.1.4.0–203)Transaction isolation: TRANSACTION_REPEATABLE_READ0: jdbc:hive2://jbt-cdw-d5.----.purestorage.>
Create the external table using our S3 located NYC taxi csv file:
0: jdbc:hive2://jbt-cdw-d5.----.purestorage.com> CREATE EXTERNAL TABLE nycs3yellow(vendor_id string, pickup_datetime string, dropoff_datetime string, passenger_count string, trip_distance string, pickup_longitude string, pickup_latitude string, rate_code string, store_and_fwd_flag string, dropoff_longitude string, dropoff_latitude string, payment_type string, fare_amount string, surcharge string, mta_tax string, tip_amount string, tolls_amount string, total_amount string, dispatching_base_num string, pickup_date string, locationid string). . . . . . . . . . . . . . . . . . . . . . .> LOCATION ‘s3a://clouderabucket/’. . . . . . . . . . . . . . . . . . . . . . .> TBLPROPERTIES (“skip.header.line.count”=”1");INFO : Compiling command(queryId=hive_20210428120001_3ffc3dd9–5fe0–4ade-b469-ad082ce06d85): CREATE EXTERNAL TABLE nycs3yellow(vendor_id string, pickup_datetime string, dropoff_datetime string, passenger_count string, trip_distance string, pickup_longitude string, pickup_latitude string, rate_code string, store_and_fwd_flag string, dropoff_longitude string, dropoff_latitude string, payment_type string, fare_amount string, surcharge string, mta_tax string, tip_amount string, tolls_amount string, total_amount string, dispatching_base_num string, pickup_date string, locationid string)LOCATION ‘s3a://clouderabucket/’TBLPROPERTIES (“skip.header.line.count”=”1")INFO : Semantic Analysis Completed (retrial = false)INFO : Created Hive schema: Schema(fieldSchemas:null, properties:null)INFO : Completed compiling command(queryId=hive_20210428120001_3ffc3dd9–5fe0–4ade-b469-ad082ce06d85); Time taken: 2.772 secondsINFO : Concurrency mode is disabled, not creating a lock managerINFO : Executing command(queryId=hive_20210428120001_3ffc3dd9–5fe0–4ade-b469-ad082ce06d85): CREATE EXTERNAL TABLE nycs3yellow(vendor_id string, pickup_datetime string, dropoff_datetime string, passenger_count string, trip_distance string, pickup_longitude string, pickup_latitude string, rate_code string, store_and_fwd_flag string, dropoff_longitude string, dropoff_latitude string, payment_type string, fare_amount string, surcharge string, mta_tax string, tip_amount string, tolls_amount string, total_amount string, dispatching_base_num string, pickup_date string, locationid string)LOCATION ‘s3a://clouderabucket/’TBLPROPERTIES (“skip.header.line.count”=”1")INFO : Starting task [Stage-0:DDL] in serial modeINFO : Completed executing command(queryId=hive_20210428120001_3ffc3dd9–5fe0–4ade-b469-ad082ce06d85); Time taken: 0.932 secondsINFO : OKNo rows affected (3.74 seconds)0: jdbc:hive2://jbt-cdw-d5.----.purestorage.com>
Let us count all the rows using tez connect in beeline and check / set the default execution engine:
0: jdbc:hive2://jbt-cdw-d5.----.purestorage.> SET hive.execution.engine;+ — — — — — — — — — — — — — — +| set |+ — — — — — — — — — — — — — — +| hive.execution.engine=tez |+ — — — — — — — — — — — — — — +1 row selected (0.012 seconds)
I then set the memory for the execution app and run the count operation:
0: jdbc:hive2://jbt-cdw-d5.----.purestorage.> SET hive.tez.container.size=1500;0: jdbc:hive2://jbt-cdw-d5.----.purestorage.> select count(*) from nycs3yellow;INFO : Compiling command(queryId=hive_20210506132814_f5a331b5–5059–40e9-a1fb-bb6c12cde9ac): select count(*) from nycs3yellowINFO : Semantic Analysis Completed (retrial = false)INFO : Created Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)INFO : Completed compiling command(queryId=hive_20210506132814_f5a331b5–5059–40e9-a1fb-bb6c12cde9ac); Time taken: 0.091 secondsINFO : Concurrency mode is disabled, not creating a lock managerINFO : Executing command(queryId=hive_20210506132814_f5a331b5–5059–40e9-a1fb-bb6c12cde9ac): select count(*) from nycs3yellowINFO : Query ID = hive_20210506132814_f5a331b5–5059–40e9-a1fb-bb6c12cde9acINFO : Total jobs = 1INFO : Launching Job 1 out of 1INFO : Starting task [Stage-1:MAPRED] in serial modeINFO : Subscribed to counters: [] for queryId: hive_20210506132814_f5a331b5–5059–40e9-a1fb-bb6c12cde9acINFO : Session is already openINFO : Dag name: select count(*) from nycs3yellow (Stage-1)INFO : Status: Running (Executing on YARN cluster with App id application_1619546211400_0047) — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — Map 1 ………. container SUCCEEDED 6 6 0 0 0 0Reducer 2 …… container SUCCEEDED 1 1 0 0 0 0 — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 54.71 s — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — -+ — — — — — -+| _c0 |+ — — — — — -+| 14863779 |+ — — — — — -+1 row selected (55.138 seconds)
We can also use mapreduce as the query engine:
> set hive.execution.engine=mr;1: jdbc:hive2://jbt-cdw-d5.uklab.purestorage.> set hive.execution.engine;+ — — — — — — — — — — — — — -+| set |+ — — — — — — — — — — — — — -+| hive.execution.engine=mr |+ — — — — — — — — — — — — — -+
1 row selected (0.012 seconds)
Run the query get the line count of the external S3 csv file table :
1: jdbc:hive2://jbt-cdw-d5.----.purestorage.> select count(*) from nycs3yellow;INFO : Compiling command(queryId=hive_20210506120826_dc3f2a01-e73f-4c05-a032–04880c1cacdc): select count(*) from nycs3yellowINFO : Semantic Analysis Completed (retrial = false)INFO : Created Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)INFO : Completed compiling command(queryId=hive_20210506120826_dc3f2a01-e73f-4c05-a032–04880c1cacdc); Time taken: 0.124 secondsINFO : Concurrency mode is disabled, not creating a lock managerINFO : Executing command(queryId=hive_20210506120826_dc3f2a01-e73f-4c05-a032–04880c1cacdc): select count(*) from nycs3yellowINFO : Query ID = hive_20210506120826_dc3f2a01-e73f-4c05-a032–04880c1cacdcINFO : Total jobs = 1INFO : Launching Job 1 out of 1...INFO : The url to track the job: http://jbt-cdw-d3.----.purestorage.com:8088/proxy/application_1619546211400_0030/INFO : Starting Job = job_1619546211400_0030, Tracking URL = http://jbt-cdw-d3.----.purestorage.com:8088/proxy/application_1619546211400_0030/INFO : Kill Command = /opt/cloudera/parcels/CDH-7.1.4–1.cdh7.1.4.p0.6300266/lib/hadoop/bin/mapred job -kill job_1619546211400_0030INFO : Hadoop job information for Stage-1: number of mappers: 10; number of reducers: 1INFO : 2021–05–06 12:08:41,422 Stage-1 map = 0%, reduce = 0%
...INFO : Completed executing command(queryId=hive_20210506120826_dc3f2a01-e73f-4c05-a032–04880c1cacdc); Time taken: 55.802 secondsINFO : OK+ — — — — — -+| _c0 |+ — — — — — -+| 14863780 |+ — — — — — -+
In the above two cases the job is submitted to yarn as a tez or a mapreduce job and able to use our credentials to count the rows of our external s3 table.
Distcp between HDFS and S3
We can also use distcp to move datasets between our legacy Cloudera HDFS and our S3 FlashBlade storage. Within my environment I will list the available files on my HDFS storage:
$ sudo -u hdfs hadoop fs -ls /userFound 6 items-rw-r — r — 3 hdfs supergroup 1083312 2021–03–11 21:05 /user/asimplehdfsfiledrwxr-xr-x — hdfs supergroup 0 2021–04–27 11:56 /user/hdfsdrwxrwxrwx — mapred hadoop 0 2021–03–10 07:31 /user/historydrwxr-x — x — spark spark 0 2021–03–10 07:29 /user/sparkdrwxr-xr-x — hdfs supergroup 0 2021–03–10 07:30 /user/yarndrwx — — — — zeppelin zeppelin 0 2021–04–27 11:00 /user/zeppelin
Then copy a file to the S3 bucket on the FlashBlade:
$ sudo -u hdfs hadoop distcp -D hadoop.security.credential.provider.path=jceks://hdfs/user/hdfs/home/keystores/fb01.jceks /user/asimplehdfsfile s3a://clouderabucket/
And verify the contents of our bucket:
$ sudo -u hdfs hadoop fs -D hadoop.security.credential.provider.path=jceks://hdfs/user/hdfs/home/keystores/fb01.jceks -ls s3a://clouderabucket/…21/04/27 20:00:01 INFO s3a.S3AFileSystem: S3Guard is disabled on this bucket: clouderabucketFound 2 items-rw-rw-rw- 1 hdfs hdfs 1083312 2021–04–27 19:58 s3a://clouderabucket/asimplehdfsfile-rw-rw-rw- 1 hdfs hdfs 2728058790 2021–04–16 11:19 s3a://clouderabucket/yellow_tripdata_2010–01.csv…