
Friday, November 4, 2016

Best Practices for Apache Hive to Improve Performance

1. Partitioning Tables


Hive partitioning is an effective method to improve query performance on larger tables. Partitioning allows you to store data in separate sub-directories under the table location, and it greatly helps queries that filter on the partition key(s). Although the selection of the partition key is always a sensitive decision, it should always be a low-cardinality attribute; for example, if your data is associated with a time dimension, then date could be a good partition key. Similarly, if the data is associated with location, like a country or state, then it's a good idea to have hierarchical partitions like country/state.
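
As a minimal sketch (the sales and staging_sales table names here are purely illustrative), a table partitioned by country and state could be created and loaded with dynamic partitioning like this:

-- Hypothetical table partitioned by country and state
CREATE TABLE sales (
  order_id BIGINT,
  amount DOUBLE,
  order_date STRING
)
PARTITIONED BY (country STRING, state STRING);

-- Allow Hive to create the partition sub-directories automatically
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE sales PARTITION (country, state)
SELECT order_id, amount, order_date, country, state
FROM staging_sales;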

2. Bucketing


Bucketing improves join performance if the bucket key and join keys are the same. Bucketing in Hive distributes the data into different buckets based on the hash of the bucket key. It also reduces the I/O scans during the join process if the join happens on the same keys (columns).

Additionally, it's important to ensure the bucketing flag is set (SET hive.enforce.bucketing=true;) every time before writing data to the bucketed table. To leverage bucketing in a join operation, we should SET hive.optimize.bucketmapjoin=true. This setting hints to Hive to do a bucket-level join during the map-stage join. It also reduces the scan cycles needed to find a particular key because bucketing ensures that the key is present in a specific bucket.
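
A minimal sketch, assuming two hypothetical tables bucketed on the same join key:

-- Both tables are bucketed on user_id so a bucket map join is possible
CREATE TABLE users (user_id BIGINT, name STRING)
CLUSTERED BY (user_id) INTO 32 BUCKETS;

CREATE TABLE orders (order_id BIGINT, user_id BIGINT, amount DOUBLE)
CLUSTERED BY (user_id) INTO 32 BUCKETS;

SET hive.enforce.bucketing=true;       -- enforce bucketing while writing data
SET hive.optimize.bucketmapjoin=true;  -- ask Hive to use a bucket map join

SELECT o.order_id, u.name
FROM orders o
JOIN users u ON o.user_id = u.user_id;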

3. Compression


Compression techniques significantly reduce the intermediate data volume, which in turn reduces the amount of data transferred between mappers and reducers; most of this transfer happens over the network. Compression can be applied to the mapper and reducer output individually. Keep in mind that gzip-compressed files are not splittable, so gzip should be applied with caution: a compressed file should not be larger than a few hundred megabytes, otherwise it can lead to an imbalanced job. Other compression codec options include Snappy, LZO, bzip2, etc.
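
For example, intermediate (map output) and final output compression can be switched on with settings like the following; Snappy is shown here, but another codec can be substituted:

-- Compress intermediate map output shuffled to the reducers
SET hive.exec.compress.intermediate=true;
SET mapreduce.map.output.compress=true;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Compress the final output written by the job
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;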

4. Use ORC Table Storage format


ORC (Optimized Row Columnar) is a table storage format that gives better performance because columnar formats (RCFile, ORC, etc.) reduce read operations in analytics queries by allowing each column to be accessed individually.

ORC supports both compressed storage (like SNAPPY) and uncompressed storage. ORC format with Snappy compression can offer higher performance.
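
A sketch of creating an ORC table with Snappy compression (the table name is illustrative):

CREATE TABLE sales_orc (
  order_id BIGINT,
  amount DOUBLE
)
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

-- Existing data can then be copied into the ORC table
INSERT OVERWRITE TABLE sales_orc
SELECT order_id, amount FROM sales;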

5. Parallel Execution


Hadoop can execute MapReduce jobs in parallel, and several queries executed on Hive automatically use this parallelism. However, a single, complex Hive query is commonly translated into a number of MapReduce jobs that are executed sequentially by default. Often, though, some of a query's MapReduce stages are not interdependent and could be executed in parallel. They can then take advantage of spare capacity on a cluster and improve cluster utilization while reducing the overall query execution time. The configuration in Hive to change this behavior is merely switching a single flag: SET hive.exec.parallel=true.
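
For example, at the start of a session:

SET hive.exec.parallel=true;             -- run independent stages in parallel
SET hive.exec.parallel.thread.number=8;  -- how many stages may run at once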


6.  Vectorization


Vectorization allows Hive to process a batch of rows together instead of processing one row at a time. Each batch consists of a column vector which is usually an array of primitive types. Operations are performed on the entire column vector, which improves the instruction pipelines and cache usage. To enable vectorization, set this configuration parameter SET hive.vectorized.execution.enabled=true.
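
The related settings can be enabled together; the reduce-side flag applies to newer Hive versions:

SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;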

7. Map Join



Map joins are really efficient if a table on the other side of a join is small enough to fit in memory. Hive supports the parameter hive.auto.convert.join, which, when set to true, lets Hive convert a join to a map join automatically. Make sure this auto-conversion is enabled in the Hive environment before relying on map joins.
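
A typical setup looks like the following; the small-table size threshold is a commonly tuned knob:

SET hive.auto.convert.join=true;                -- convert joins to map joins when one side is small
SET hive.mapjoin.smalltable.filesize=25000000;  -- small-table threshold in bytes (about 25 MB)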

8. Tez Execution Engine in Hive

Using Tez as the execution engine is another way to increase the performance of a Hive query. Tez is an application framework built on Hadoop YARN that executes complex directed acyclic graphs (DAGs) of general data-processing tasks. It can be considered a much more flexible and powerful successor to the MapReduce framework.

set hive.execution.engine=tez;

9. Cost-Based Optimization in Hive (CBO)

Before submitting a query for final execution, Hive optimizes the query's logical and physical execution plan. By default, however, these optimizations are not based on the cost of the query.

Cost-based optimization (CBO), a more recent addition to Hive, performs further optimizations based on query cost. This can result in different decisions: how to order joins, which type of join to perform, the degree of parallelism, and others.

To use CBO, set the following parameters at the beginning of your query:

set hive.cbo.enable=true;

set hive.compute.query.using.stats=true;

set hive.stats.fetch.column.stats=true;

set hive.stats.fetch.partition.stats=true;

Then, prepare the data for CBO by running Hive’s “analyze” command to collect various statistics on the tables for which we want to use CBO.
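
For example, assuming a hypothetical table named tbl:

ANALYZE TABLE tbl COMPUTE STATISTICS;
ANALYZE TABLE tbl COMPUTE STATISTICS FOR COLUMNS;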

10. Avoid “SELECT count(DISTINCT field) FROM tbl”

This query looks familiar to SQL users, but it is very slow in Hive because only one reducer is used to process the request.

SELECT count(DISTINCT field) FROM tbl

Rewrite the query to leverage multiple reducers:

SELECT
  count(1)
FROM
(SELECT DISTINCT field FROM tbl) t

11. Limiting the Data

While querying, only the required data should be loaded into the execution engine; in other words, use a WHERE clause wherever possible, and use ROW_NUMBER or RANK to process only the latest data, and so on.

When joining tables, instead of joining a whole table, filter it first with a subquery that applies the WHERE condition, as shown below.
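
A minimal sketch of this pattern, with hypothetical table names, pushes the filter into a subquery before the join:

-- Instead of joining the whole orders table, join only the recent slice of it
SELECT c.customer_id, o.order_id, o.amount
FROM customers c
JOIN (
  SELECT customer_id, order_id, amount
  FROM orders
  WHERE order_date >= '2016-01-01'
) o
ON c.customer_id = o.customer_id;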

11.1 Predicate Push-Down Optimization
The columnar nature of the ORC format helps avoid reading unnecessary columns, but it is still possible to read unnecessary rows. Without predicate push-down, a query can end up reading all rows in which the age value is between 0 and 100, even though it requested only rows in which the age value is less than 15 ("...WHERE age < 15"). Such full table scanning is an expensive operation.
ORC avoids this type of overhead by using predicate push-down, with three levels of built-in indexes within each file: file level, stripe level, and row level:
File-level and stripe-level statistics are in the file footer, making it easy to determine if the rest of the file must be read.
Row-level indexes include column statistics for each row group and position, for finding the start of the row group.
ORC uses these indexes to move the filter operation to the data loading phase by reading only data that potentially includes required rows.
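
Predicate push-down is controlled by settings such as the following; hive.optimize.index.filter tells Hive to use the ORC built-in indexes to skip row groups during reads:

SET hive.optimize.ppd=true;           -- push predicates down into the scan
SET hive.optimize.index.filter=true;  -- use the file format's indexes to skip data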

12. Optimize Queries Using Partition Pruning

When predicate push-down optimization is not applicable—for example, if all stripes contain records that match the predicate condition—a query with a WHERE clause might need to read the entire data set. This becomes a bottleneck over a large table. Partition pruning is another optimization method; it exploits query semantics to avoid reading large amounts of data unnecessarily.

Partition pruning is possible when data within a table is split across multiple logical partitions. Each partition corresponds to a particular value of a partition column and is stored as a subdirectory within the table root directory on HDFS. Where applicable, only the required partitions (subdirectories) of a table are queried, thereby avoiding unnecessary I/O.

Spark supports saving data in a partitioned layout seamlessly, through the partitionBy method available during data source write operations. To partition the "people" table by the “age” column, you can use the following command:

people.write.format("orc").partitionBy("age").save("peoplePartitioned")

As a result, records are automatically partitioned by the age field and then saved into different directories: for example, peoplePartitioned/age=1/, peoplePartitioned/age=2/, and so on.
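
On the Hive side, the same idea applies to a partitioned table such as the illustrative sales table from section 1; only the matching sub-directories are read, and EXPLAIN DEPENDENCY lists the partitions that will actually be scanned:

-- Only the country=US/state=CA sub-directory is scanned
SELECT order_id, amount
FROM sales
WHERE country = 'US' AND state = 'CA';

-- Lists the input tables and partitions selected for the query
EXPLAIN DEPENDENCY
SELECT order_id, amount FROM sales WHERE country = 'US' AND state = 'CA';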
