Tuesday, September 5, 2017

Useful Spark and Hive Commands


Q. How to submit a PySpark script to Spark in YARN cluster mode?

spark-submit --master yarn --deploy-mode cluster <Config_Options> <SparkScript> <PARAMETERS>

Ex:

spark-submit --master yarn --deploy-mode cluster --driver-memory 5G --conf spark.yarn.executor.extraClassPath=./ --conf spark.scheduler.mode=fair --conf spark.yarn.maxAppAttempts=1  --files /home/hadoop/hive-site.xml,s3://test-emr-bin/qa/test.config s3://test-emr-bin/qa/aggregates/emr_scripts/spark/test_aggregate_spark.py QA aggregates-qa aggregates/consume/ QA_COMM QA_AGGREGATES


Q. How to write Hive query output to a file with columns separated by | and sorted?

hive -f src.hql > output; sed 's/\t/|/g' output | sort > source

Q. How do you compare source and target Hive query results?

hive -f target.hql > output; sed 's/\t/|/g' output | sort > target

diff source target

head -1 source

head -1 target


Q. How to update partitions with MSCK REPAIR TABLE?

(This scans the table's storage location and registers in the metastore any partitions found there that are not yet known.)

msck repair table test_daily_aggregate;

Q. How to show partitions?

show partitions test_daily_aggregate;

Q. How to run a task in the Airflow scheduler?

airflow run -i -f <DAG_NAME> <TASK_NAME> <Schedule_Date> <AIRFLOW_SCHEDULER>

airflow run -i -f demand_aggregate spark_abc_incremental_aggregate 2017-04-17 abcsla

Q. How to check that a new DAG compiles (parses without errors)?

airflow list_dags -sd test_dag.py

Q. How to push a DAG?

   $ push_dag abc_data

Q. How to list Airflow DAGs?

   $ airflow list_dags

Q. How do you pass arguments to a Hive query?

hive -f s3://abc-emr-bin/qa/pos/emr_scripts/hive/abc_transaction_lookup_stg.hive -hivevar HIVEDATABASE_STG=QA_STG_ABC -hivevar HIVEDATABASE=QA_ABC -hivevar DATE_RANGE=2017-02-10
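
Inside the .hive script, each -hivevar value is referenced with the ${hivevar:NAME} syntax. A minimal sketch of what such a script might contain (the table and column names below are hypothetical, not from the original script):

-- illustrative contents of abc_transaction_lookup_stg.hive
USE ${hivevar:HIVEDATABASE_STG};

SELECT *
FROM ${hivevar:HIVEDATABASE}.transaction_lookup   -- hypothetical table
WHERE txn_date = '${hivevar:DATE_RANGE}';         -- hypothetical column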


Q. How to find the size of a folder in S3?

$ aws s3 ls --summarize --human-readable --recursive s3://bi-manage/dev/bi/test/global_test_daily_snapshot 

Q. How to list the files in an S3 folder?

aws s3 ls s3://test/bi/global_test_daily_snapshot/geo_part=USA/transfer_local_snapshot_date_part=2017-05-01/ --recursive

Q. How to copy a file from a local machine to an EMR cluster?

scp -i /Users/test/Desktop/rsa_keys/EMR.cer test.txt hadoop@10.1.1.120:/home/hadoop/

Q. How to get YARN logs?

$ yarn logs -applicationId <Application_ID>

Ex:

$ yarn logs -applicationId application_1495046596880_0037

Q. How to enable debug logging in Hive?

hive --hiveconf hive.root.logger=DEBUG,console

Q. How to find the machine's IP address?

$ ifconfig

Q. How to add a JAR file to Spark (pyspark)?

pyspark --jars UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar


Q. How to change the Hive execution engine from Tez to MapReduce (mr)?

set hive.execution.engine=mr;
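
To confirm which engine is active, run SET with just the property name and Hive prints the current value:

set hive.execution.engine;
-- prints, e.g.: hive.execution.engine=mr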


Q. How to create a permanent function in Hive?

CREATE FUNCTION qa_TestFunc AS "com.org.nbac.AbcTest" USING JAR 's3://emr-bin/qa/data-test/lib/UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar';

Q. How to run a Java class packaged in a JAR?

java -cp expects a local path and a fully qualified class name (without the .class suffix), so copy the JAR down from S3 first:

$ aws s3 cp s3://emr-bin/dev/data-protection/lib/UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar .
$ java -cp UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar com.org.nbac.AbcTest


Q. How to source an env file?

$ source source.env
$ source /Users/bac/Features/path.env

Q. How to build a UDF JAR file using Maven?

$ mvn clean install -Dmaven.test.skip=true


Q. Basic unit testing of a UDF in Hive:

ADD JAR s3://-emr-bin/dev/data-test-/lib/UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar;

CREATE TEMPORARY FUNCTION Encrypt AS "com.abc.test.Encrypt";

CREATE TEMPORARY FUNCTION EncryptSignature AS "com.abc.test.EncryptSignature";

CREATE TEMPORARY FUNCTION Decrypt AS "com.abc.test.Decrypt";

SELECT Encrypt('abc', 'xyz', "dev_test");


Q. Basic unit testing of a UDF in Spark:

spark.sql('CREATE TEMPORARY FUNCTION Encrypt AS "com.abc.test.Encrypt"')
spark.sql('CREATE TEMPORARY FUNCTION EncryptSignature AS "com.abc.test.EncryptSignature"')
spark.sql('CREATE TEMPORARY FUNCTION Decrypt AS "com.abc.test.Decrypt"')

spark.sql('select source_id, EncryptSignature(source_id, current_status, "dev_activity_status") as sourceid_signature from stg_inactivity').show()

spark-submit --master yarn --deploy-mode cluster --executor-cores 5 --num-executors 90 --driver-memory 5G --executor-memory 10G --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 --conf spark.hadoop.fs.s3n.multipart.uploads.enabled=true --conf spark.dynamicAllocation.enabled=false --conf spark.yarn.executor.memoryOverhead=5120 --jars s3://test-emr-bin/dev/test-protection/lib/Test_UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar test_encrypt.py

Q. How to add partitions in Hive?

ALTER TABLE dev_test ADD PARTITION (process_date='2017-05-31')
LOCATION 's3n://bi-managed/dev/test/process_date=2017-05-31';


Q. How to INSERT OVERWRITE a table in Hive?

INSERT OVERWRITE TABLE dev_list
SELECT member_id, email_addr
FROM dev_test
;


Q. How to create an external table in Hive?

(Dropping an external table removes only the metastore entry; the underlying data in S3 is left intact.)

DROP TABLE IF EXISTS dev_list;

CREATE EXTERNAL TABLE `dev_list`(
  `source_id` string, 
  `sourceid_signature` string, 
  `source_cd` string, 
  `status` string)
STORED AS PARQUET
LOCATION
  's3://bi-managed/prod/test/data/2017-06-12'
;


Q. How to submit a job to Spark?

spark-submit --jars s3://emr-bin/dev/common/emr_scripts/jars/UDFs-1.0-SNAPSHOT-jar-with-dependencies.jar pyspark_job.py bi-managed/prod bi-managed/prod 2017-06-12


Q. How to set the Java compiler in Maven to JDK 1.7?

<properties>
  <maven.compiler.source>1.7</maven.compiler.source>
  <maven.compiler.target>1.7</maven.compiler.target>
</properties>

Ref: https://maven.apache.org/plugins/maven-compiler-plugin/examples/set-compiler-source-and-target.html

Q. How to get the Avro schema from a given Avro data file?

java -jar ~/Downloads/avro-tools-1.7.4.jar getschema part-r-00000-c7fc5e23-842e-4723-86e1-c069afdeb7c1.avro > chn_pos_clean_stage.avsc


Q. What is the use of the coalesce() function in Spark SQL?

public static Column coalesce(Column... e)

Returns the first column that is not null, or null if all inputs are null.

For example, coalesce(a, b, c) will return a if a is not null, or b if a is null and b is not null, or c if both a and b are null but c is not null.
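
A minimal PySpark sketch of coalesce() (the DataFrame below is a made-up illustration, not from the original post):

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit

spark = SparkSession.builder.appName("coalesce_demo").getOrCreate()

df = spark.createDataFrame(
    [(None, "b1"), ("a2", None), (None, None)],
    ["a", "b"])

# pick the first non-null value per row, falling back to a literal default
df.select(coalesce("a", "b", lit("n/a")).alias("first_non_null")).show()
# rows returned: b1, a2, n/a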

Q. What is the use of the concat_ws() function in Spark SQL?

public static Column concat_ws(java.lang.String sep, Column... exprs)

Concatenates multiple input string columns together into a single string column, using the given separator.
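
A minimal PySpark sketch of concat_ws() (the DataFrame below is a made-up illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

spark = SparkSession.builder.appName("concat_ws_demo").getOrCreate()

df = spark.createDataFrame([("John", "Smith", "NY")],
                           ["first_name", "last_name", "state"])

# join the three columns into one pipe-separated string (null inputs are skipped)
df.select(concat_ws("|", "first_name", "last_name", "state").alias("joined")).show()
# joined: John|Smith|NY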
