Wednesday, November 23, 2016

Apache Pig Interview Questions

1. Can you name complex data types in Pig Latin?
Ans:

Map: Set of key-value pairs; keys must be character arrays, but values may be any type. Ex: ['a'#'pomegranate']
Tuple: Sequence of fields of any type. Ex: (1,'pomegranate')
Bag: Unordered collection of tuples, possibly with duplicates. Ex: {(1,'pomegranate'),(2)}



Wednesday, November 9, 2016

Sqoop best practices

Best practices:

1.  With imports, use the output line formatting options wherever possible, for accuracy of data transfer - "--enclosed-by", "--fields-terminated-by", "--escaped-by"

2.  Use your judgement when providing the number of mappers, to ensure you appropriately parallelize the import without increasing overall completion time (by default, 4 map tasks run in parallel).

3.  Use direct connectors where available for better performance.

4.  With imports, use a boundary query for better performance.

5.  When importing into Hive and using dynamic partitions, think through the partition criteria and the number of files generated - you don't want too many small files on your cluster. Also, there is a limit on the number of partitions on each node.

6.  Be cognizant of how many concurrent connections the database allows; use the fetch size to control the number of records read from the database, and factor in the number of parallel tasks.

7.  Do not use the same table for import and export.

8.  Use an options file for reusability (see the sketch after this list).

9. Be aware of Sqoop's case-sensitivity nuances - it can save you time you would otherwise spend troubleshooting issues.

10.  For exports, use a staging table where possible during the development phase; it will help with troubleshooting.

11.  Use the verbose argument (--verbose) for more information during troubleshooting.
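
For item 8, here is a minimal options-file sketch (the host, database, and credentials are the same placeholder values used elsewhere in this post). Sqoop expects each option and its value on separate lines, lines starting with # are comments, and the file is passed in with --options-file:

# import-options.txt
import
--connect
jdbc:mysql://vchennar-mySqlServer/employees
--username
myUID
--password
myPWD

$ sqoop --options-file import-options.txt --table departments -m 1

The connection details now live in one reusable file, and only the table-specific arguments change from job to job.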


Troubleshooting tips:

https://sqoop.apache.org/docs/1.4.0-incubating/SqoopUserGuide.html#id1774381


Import Data from RDBMS to Hive using Sqoop

Import data into Hive


1. Basic import


Here is some data in the employees database that we will use:

mysql> select * from departments;
+---------+--------------------+
| dept_no | dept_name          |
+---------+--------------------+
| d009    | Customer Service   |
| d005    | Development        |
| d002    | Finance            |
| d003    | Human Resources    |
| d001    | Marketing          |
| d004    | Production         |
| d006    | Quality Management |
| d008    | Research           |
| d007    | Sales              |
+---------+--------------------+
9 rows in set (0.00 sec)

sqoop import command:
sqoop import \
--connect jdbc:mysql://vchennar-mySqlServer/employees \
--username myUID \
--password myPWD \
--table departments \
--direct \
-m 1 \
--hive-import \
--create-hive-table \
--hive-table departments_mysql \
--target-dir /user/hive/warehouse/ \
--enclosed-by '\"' \
--fields-terminated-by , \
--escaped-by \\


File created in HDFS:
$ hadoop fs -ls -R /user/hive/warehouse | grep /part* | awk '{print $8}'

/user/hive/warehouse/departments_mysql/part-m-00000 

Validate the number of records:

$ hadoop fs -cat /user/hive/warehouse/departments_mysql/part-m-00000 | wc -l

9

Check the imported data in HDFS:
$ hadoop fs -cat /user/hive/warehouse/departments_mysql/part-m-00000 | more

"d009","Customer Service"
"d005","Development"
"d002","Finance"
"d003","Human Resources"
"d001","Marketing"
"d004","Production"
"d006","Quality Management"
"d008","Research"
"d007","Sales"

Validate results in Hive:

$ hive

hive> show tables;

departments_mysql


hive> select * from departments_mysql;


"d009"           "Customer Service"
"d005"           "Development"
"d002"           "Finance"
"d003"           "Human Resources"
"d001"           "Marketing"
"d004"           "Production"
"d006"           "Quality Management"
"d008"           "Research"
"d007"           "Sales"

2. Importing into Hive with partitions


To try this functionality out, I decided on gender as my partition criterion.

mysql> select gender, count(*) from employees group by gender;  
+--------+----------+
| gender | count(*) |
+--------+----------+
| M      |   179973 |
| F      |   120051 |
+--------+----------+

Import gender="M"

$ sqoop import \
--connect jdbc:mysql://vchennar-mySqlServer-node/employees \
--username myUID \
--password myPwd \
--query 'select EMP_NO,birth_date,first_name,last_name,hire_date from employees where gender="M" AND $CONDITIONS'  \
--direct \
-m 6 \
--split-by EMP_NO \
--hive-import \
--create-hive-table \
--hive-table employees_import_parts \
--target-dir /user/hive/warehouse/employee-parts \
--hive-partition-key gender \
--hive-partition-value 'M' \
--enclosed-by '\"' \
--fields-terminated-by , \
--escaped-by \\


Note 1: The gender column should not be included in the query.
The two --hive-partition-* arguments are required.
Also, note that I have added a where clause to filter on just gender="M".

Note 2: If the column emp_no is listed in lower case in the query, only null is retrieved. Switching just this one field to upper case (EMP_NO) makes it work fine.

See if files were created:

$ hadoop fs -ls -R /user/hive/warehouse/employees_import_parts | grep /part* | awk '{print $8}'

/user/hive/warehouse/employees_import_parts/gender=M/part-m-00000
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00001
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00002
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00003
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00004
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00005


Do a line count:
$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=M/* | wc -l

179973

Open a file to see if it is formatted right:

$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=M/part-m-00005 | more

"418333","1954-11-10","Jackson","Simmen","1993-11-14"
"418334","1954-04-12","Jingling","Orlowski","1985-06-19"
"418335","1963-09-09","Kwok","Dalton","1986-07-28"
"418337","1961-08-31","Eberhardt","Ramras","1988-02-25"

Note: gender is not in the data file but in a directory name/partition name.

Check if table got created
hive> show tables;

employees_import_parts

Display column headers:
hive> set hive.cli.print.header=true;

Validate record count:
hive> select gender, count(*) from employees_import_parts group by gender;   

gender _c1
M 179973

The count is accurate.

Review one record for accuracy:
hive> select * from employees_import_parts limit 1;


emp_no birth_date first_name last_name hire_date gender
"1234" "1953-09-02" "Georgi" "Facello" "1986-06-26" M

Validate if table is partitioned

hive> show partitions employees_import_parts;

partition
gender=F
gender=M

Import gender="F"

$ sqoop import \
--connect jdbc:mysql://airawat-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--query 'select emp_no,birth_date,first_name,last_name,hire_date from employees where gender="F" AND $CONDITIONS'  \
--direct \
-m 6 \
--split-by emp_no \
--hive-import \
--hive-overwrite \
--hive-table employees_import_parts \
--target-dir /user/hive/warehouse/employee-parts_F \
--hive-partition-key gender \
--hive-partition-value 'F' \
--enclosed-by '\"' \
--fields-terminated-by , \
--escaped-by \\

Files generated:

$ hadoop fs -ls -R /user/hive/warehouse/employees_import_parts | grep /part* | awk '{print $8}'

/user/hive/warehouse/employees_import_parts/gender=F/part-m-00000
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00001
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00002
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00003
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00004
/user/hive/warehouse/employees_import_parts/gender=F/part-m-00005
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00000
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00001
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00002
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00003
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00004
/user/hive/warehouse/employees_import_parts/gender=M/part-m-00005


Record count for gender=F:

$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=F/part* | wc -l

120051

The count is accurate.


Record count for employees in total:

Expected: 300024

$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/*/part* | wc -l

300024


Validate a bunch of records for accuracy of format:


$ hadoop fs -cat /user/hive/warehouse/employees_import_parts/gender=F/part-m-00005 | more

"418330","1953-06-13","Pranas","McFarlan","1989-12-23"
"418331","1954-04-07","Chaosheng","Greenaway","1996-05-21"
"418332","1961-04-19","Koichi","Cannard","1986-01-21"
"418336","1954-02-14","Georgy","Thimonier","1994-03-21"


Validate count in Hive:

hive> select gender, count(*) from employees_import_parts group by gender;   

gender _c1
F 120051
M 179973

The counts are accurate.

Validate records in Hive:

hive> select * from employees_import_parts where gender='F' limit 2;

emp_no birth_date first_name last_name hire_date gender
NULL "1964-06-02" "Bezalel" "Simmel" "1985-11-21" F
NULL "1953-04-20" "Anneke" "Preusig" "1989-06-02" F

How to Schedule Sqoop jobs in Oozie


Oozie Sqoop Action Extension

IMPORTANT: The Sqoop action requires Apache Hadoop 0.23.

The sqoop action runs a Sqoop job.

The workflow job will wait until the Sqoop job completes before continuing to the next action.

To run the Sqoop job, you have to configure the sqoop action with the job-tracker and name-node elements, the Sqoop command or arg elements, and any needed configuration.

The sqoop action can be configured to create or delete HDFS directories before starting the Sqoop job.

Sqoop configuration can be specified with a file, using the job-xml element, and inline, using the configuration element.

Oozie EL expressions can be used in the inline configuration. Property values specified in the configuration element override values specified in the job-xml file.

Note that Hadoop mapred.job.tracker and fs.default.name properties must not be present in the inline configuration.

As with Hadoop map-reduce jobs, it is possible to add files and archives in order to make them available to the Sqoop job. Refer to the "Adding Files and Archives for the Job" section of the Workflow Functional Specification for more information about this feature.

Syntax:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <command>[SQOOP-COMMAND]</command>
            <arg>[SQOOP-ARGUMENT]</arg>
            ...
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </sqoop>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

The prepare element, if present, indicates a list of paths to delete or create before starting the job. Specified paths must start with hdfs://HOST:PORT .

The job-xml element, if present, specifies a file containing configuration for the Sqoop job. As of schema 0.3, multiple job-xml elements are allowed in order to specify multiple job.xml files.

The configuration element, if present, contains configuration properties that are passed to the Sqoop job.

Sqoop command

The Sqoop command can be specified either using the command element or multiple arg elements.

When using the command element, Oozie will split the command on every space into multiple arguments.

When using the arg elements, Oozie will pass each argument value as an argument to Sqoop.

The arg variant should be used when there are spaces within a single argument.

Consult the Sqoop documentation for a complete list of valid Sqoop commands.

All the above elements can be parameterized (templatized) using EL expressions.

Examples:
Using the command element:
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirsthivejob">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <command>import  --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir hdfs://localhost:8020/user/tucu/foo -m 1</command>
        </sqoop>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

The same Sqoop action using arg elements:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirsthivejob">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <arg>import</arg>
            <arg>--connect</arg>
            <arg>jdbc:hsqldb:file:db.hsqldb</arg>
            <arg>--table</arg>
            <arg>TT</arg>
            <arg>--target-dir</arg>
            <arg>hdfs://localhost:8020/user/tucu/foo</arg>
            <arg>-m</arg>
            <arg>1</arg>
        </sqoop>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

NOTE: The arg elements syntax, while more verbose, allows spaces within a single argument, which is useful when using free-form queries.

Apache Oozie documentation on Sqoop action: 
https://oozie.apache.org/docs/4.0.0/DG_SqoopActionExtension.html
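
To actually run or schedule the workflow, you submit it with the Oozie CLI along with a job.properties file. A minimal sketch, with placeholder host names, port, and application path:

# job.properties
nameNode=hdfs://bar:8020
jobTracker=foo:8021
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/myUID/sqoop-wf

# submit and start the workflow
$ oozie job -oozie http://oozie-server:11000/oozie -config job.properties -run

For recurring runs, the workflow can be wrapped in an Oozie coordinator (triggered by time and/or data availability) and submitted the same way, with oozie.coord.application.path pointing to the coordinator definition.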


Apache Oozie

Overview

Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
Oozie Workflow jobs are Directed Acyclic Graphs (DAGs) of actions.
Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.
Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts).
Oozie is a scalable, reliable and extensible system.

Oozie Workflow Overview

Oozie is a server based Workflow Engine specialized in running workflow jobs with actions that run Hadoop Map/Reduce and Pig jobs.

Oozie is a Java Web-Application that runs in a Java servlet-container.

For the purposes of Oozie, a workflow is a collection of actions (i.e. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control dependency DAG (Directed Acyclic Graph). "Control dependency" from one action to another means that the second action can't run until the first action has completed.

Oozie workflow definitions are written in hPDL (an XML Process Definition Language).

Oozie workflow actions start jobs in remote systems (i.e. Hadoop, Pig). Upon action completion, the remote systems call back Oozie to notify it that the action has completed; at this point Oozie proceeds to the next action in the workflow.

Oozie workflows contain control flow nodes and action nodes.

Control flow nodes define the beginning and the end of a workflow ( start , end and fail nodes) and provide a mechanism to control the workflow execution path ( decision , fork and join nodes).

Action nodes are the mechanism by which a workflow triggers the execution of a computation/processing task. Oozie provides support for different types of actions: Hadoop map-reduce, Hadoop file system, Pig, SSH, HTTP, eMail and Oozie sub-workflow. Oozie can be extended to support additional types of actions.

Oozie workflows can be parameterized (using variables like ${inputDir} within the workflow definition). When submitting a workflow job, values for the parameters must be provided. If properly parameterized (i.e. using different output directories), several identical workflow jobs can run concurrently.

WordCount Workflow Example

hPDL Workflow Definition:
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.1">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Something went wrong: ${wf:errorCode('wordcount')}</message>
    </kill>
    <end name='end'/>
</workflow-app>


http://oozie.apache.org/docs/4.2.0/WorkflowFunctionalSpec.html


Q) What are the alternatives to Oozie workflow scheduler?

Azkaban is a batch workflow job scheduler.

Apache NiFi is an easy-to-use, powerful, and reliable system to process and distribute data.

Apache Falcon is a feed management and data processing platform.

Friday, November 4, 2016

Best Practices for Apache Hive to improve performance

1. Partitioning Tables


Hive partitioning is an effective method to improve query performance on larger tables. Partitioning allows you to store data in separate subdirectories under the table location, which greatly helps queries that filter on the partition key(s). Although the selection of the partition key is always a sensitive decision, it should be a low-cardinality attribute; for example, if your data is associated with a time dimension, then date is a good partition key. Similarly, if the data is associated with location, like a country or state, it's a good idea to have hierarchical partitions like country/state.
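
As a hedged sketch (the table and column names here are made up), a table partitioned hierarchically by country and state could be declared and loaded like this:

CREATE TABLE sales (
  order_id   BIGINT,
  amount     DOUBLE,
  order_date STRING
)
PARTITIONED BY (country STRING, state STRING)
STORED AS ORC;

-- load one partition explicitly (static partitioning)
INSERT INTO TABLE sales PARTITION (country='US', state='CA')
SELECT order_id, amount, order_date
FROM staging_sales
WHERE country='US' AND state='CA';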

2. Bucketing


Bucketing improves join performance if the bucket key and join keys are the same. Bucketing in Hive distributes the data into different buckets based on the hash of the bucket key. It also reduces the I/O scans during the join process if the join happens on the same keys (columns).

Additionally, it's important to ensure the bucketing flag is set (SET hive.enforce.bucketing=true;) every time before writing data to the bucketed table. To leverage bucketing in the join operation, SET hive.optimize.bucketmapjoin=true. This hints to Hive to do a bucket-level join during the map-stage join. It also reduces the scan cycles needed to find a particular key, because bucketing ensures that the key is present in a specific bucket.
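
For illustration only (hypothetical orders table, bucketed on the join key customer_id):

CREATE TABLE orders_bucketed (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DOUBLE
)
CLUSTERED BY (customer_id) INTO 32 BUCKETS
STORED AS ORC;

SET hive.enforce.bucketing=true;       -- write data into the declared buckets
INSERT OVERWRITE TABLE orders_bucketed
SELECT order_id, customer_id, amount FROM orders;

SET hive.optimize.bucketmapjoin=true;  -- allow bucket map joins on customer_id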

3. Compression


Compression techniques significantly reduce the volume of intermediate data, which in turn reduces the amount of data transferred between mappers and reducers; all of this generally occurs over the network. Compression can be applied to the mapper and reducer output individually. Keep in mind that gzip-compressed files are not splittable, so this should be applied with caution: a compressed file should not be larger than a few hundred megabytes, otherwise it can lead to an imbalanced job. Other compression codec options include Snappy, LZO, bzip2, etc.
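
A hedged example of the relevant settings (Snappy is shown as the codec; the exact property names can differ slightly across Hadoop versions):

SET hive.exec.compress.intermediate=true;   -- compress data passed between map and reduce stages
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.compress.output=true;         -- compress the final job output
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;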

4. Use ORC Table Storage format


ORC (Optimized Row Columnar) is a table storage format that gives better performance: columnar formats (RCFile, ORC, etc.) reduce read operations in analytic queries by allowing each column to be accessed individually.

ORC supports both compressed storage (e.g. with Snappy) and uncompressed storage. The ORC format with Snappy compression can offer higher performance.
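
A minimal sketch of an ORC table with Snappy compression (hypothetical table name):

CREATE TABLE events_orc (
  event_id BIGINT,
  event_ts STRING,
  payload  STRING
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");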

5. Parallel Execution


Hadoop can execute MapReduce jobs in parallel, and several queries executed on Hive automatically use this parallelism. However, a single complex Hive query is commonly translated into a number of MapReduce jobs that are executed sequentially by default. Often, some of a query's MapReduce stages are not interdependent and could be executed in parallel. They can then take advantage of spare capacity on the cluster and improve cluster utilization, while at the same time reducing the overall query execution time. The configuration change in Hive to enable this behavior is a single flag: SET hive.exec.parallel=true.


6.  Vectorization


Vectorization allows Hive to process a batch of rows together instead of processing one row at a time. Each batch consists of a column vector which is usually an array of primitive types. Operations are performed on the entire column vector, which improves the instruction pipelines and cache usage. To enable vectorization, set this configuration parameter SET hive.vectorized.execution.enabled=true.

7. Map Join



Map joins are very efficient if a table on one side of the join is small enough to fit in memory. Hive supports the parameter hive.auto.convert.join which, when set to "true", tells Hive to try to convert joins to map joins automatically. When relying on this behavior, verify that auto-conversion is actually enabled in your Hive environment.
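
The settings involved look roughly like this (the size threshold shown is the commonly cited default of about 25 MB):

SET hive.auto.convert.join=true;                -- let Hive convert eligible joins to map joins
SET hive.mapjoin.smalltable.filesize=25000000;  -- size threshold in bytes for the "small" table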

8. Tez Execution Engine in Hive

Setting Tez as the execution engine can substantially increase the performance of Hive queries. Tez is an application framework built on Hadoop YARN that executes complex directed acyclic graphs of general data processing tasks, and it can be considered a much more flexible and powerful successor to the MapReduce framework. To use it, set the execution engine to Tez:

set hive.execution.engine=tez;

9. Cost-Based Optimization in Hive (CBO)

Before submitting a query for final execution, Hive optimizes its logical and physical execution plan. Historically, these optimizations were not based on the cost of the query.

Cost-based optimization (CBO), a more recent addition to Hive, performs further optimizations based on query cost. This results in potentially different decisions: how to order joins, which type of join to perform, the degree of parallelism, and others.

To use CBO, set the following parameters at the beginning of your query:

set hive.cbo.enable=true;

set hive.compute.query.using.stats=true;

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

Then, prepare the data for CBO by running Hive’s “analyze” command to collect various statistics on the tables for which we want to use CBO.
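
A hedged example, using a hypothetical table (on older Hive versions a partitioned table may also need an explicit PARTITION spec):

ANALYZE TABLE sales COMPUTE STATISTICS;              -- table-level statistics
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS;  -- column-level statistics used by the CBO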

10. Avoid “SELECT count(DISTINCT field) FROM tbl”

This query looks familiar to SQL users, but it is very slow because only one reducer is used to process the request.

SELECT count(DISTINCT field) FROM tbl
Rewrite the query to leverage multiple reducers:

SELECT
  count(1)
FROM
(SELECT DISTINCT field FROM tbl) t

11. Limiting the Data

While querying, only the required data should be loaded into the execution engine; in other words, use a WHERE clause wherever it is applicable. Use ROW_NUMBER or RANK to process only the latest data, etc.

When joining tables, instead of joining a whole table, write a subquery that applies the WHERE condition first and join on that, as in the sketch below.
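
A sketch with made-up table names, filtering inside the subquery before the join:

SELECT o.order_id, c.customer_name
FROM (
  SELECT order_id, customer_id
  FROM orders
  WHERE order_date >= '2016-01-01'
) o
JOIN customers c
  ON o.customer_id = c.customer_id;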

11.1 Predicate Push-Down Optimization
The columnar nature of the ORC format helps avoid reading unnecessary columns, but it is still possible to read unnecessary rows. For example, a query requesting rows in which the age value is less than 15 ("...WHERE age < 15") may still read all rows in which the age value is between 0 and 100. Such full table scanning is an expensive operation.
ORC avoids this type of overhead by using predicate push-down, with three levels of built-in indexes within each file: file level, stripe level, and row level:
File-level and stripe-level statistics are in the file footer, making it easy to determine if the rest of the file must be read.
Row-level indexes include column statistics for each row group and position, for finding the start of the row group.
ORC uses these indexes to move the filter operation to the data loading phase by reading only data that potentially includes required rows.
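
On the Hive side, predicate push-down is controlled by settings along these lines (property names as in recent Hive releases; treat this as a sketch):

SET hive.optimize.ppd=true;           -- push predicates down into the scan
SET hive.optimize.index.filter=true;  -- use ORC file/stripe/row-level indexes to skip data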

12 Optimize Queries Using Partition Pruning

When predicate push-down optimization is not applicable—for example, if all stripes contain records that match the predicate condition—a query with a WHERE clause might need to read the entire data set. This becomes a bottleneck over a large table. Partition pruning is another optimization method; it exploits query semantics to avoid reading large amounts of data unnecessarily.

Partition pruning is possible when data within a table is split across multiple logical partitions. Each partition corresponds to a particular value of a partition column and is stored as a subdirectory within the table root directory on HDFS. Where applicable, only the required partitions (subdirectories) of a table are queried, thereby avoiding unnecessary I/O.

Spark supports saving data in a partitioned layout seamlessly, through the partitionBy method available during data source write operations. To partition the "people" table by the “age” column, you can use the following command:

people.write.format("orc").partitionBy("age").save("peoplePartitioned")
As a result, records are automatically partitioned by the age field and then saved into different directories: for example, peoplePartitioned/age=1/, peoplePartitioned/age=2/, and so on.
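
On the Hive side, a query that filters on the partition columns lets the engine read only the matching subdirectories. For the hypothetical sales table (partitioned by country and state) sketched in the partitioning section above:

-- only the country=US/state=CA subdirectory is scanned
SELECT count(*)
FROM sales
WHERE country = 'US' AND state = 'CA';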
