Tuesday, February 28, 2017

How to write UDF in Pig

While writing UDF’s using Java, we can create and use the following three types of functions −
  • Filter Functions − The filter functions are used as conditions in filter statements. These functions accept a Pig value as input and return a Boolean value.
  • Eval Functions − The Eval functions are used in FOREACH-GENERATE statements. These functions accept a Pig value as input and return a Pig result.
  • Algebraic Functions − The Algebraic functions act on inner bags in a FOREACHGENERATE statement. These functions are used to perform full MapReduce operations on an inner bag.

Eval/Filter Functions

  • Each UDF must extend the EvalFunc (or) FilterFunc class
  • Provide implementation to exec() function
  • Create a jar file with UDF class
  • Register jar file in pig script
  • Define and use it 

Extend EvalFunc Class and implement exec() function

public class SimpleUDF extends EvalFunc<String>{
   
   public String exec(Tuple input) throws IOException {

      .........
     }
   } 

OR

Extend FilterFunc Class and implement exec() function

public class SimpleUDF extends FilterFunc {

    @Override
    public Boolean exec(Tuple input) throws IOException {
       ....
    }
} 

Register jar file

REGISTER '/$PIG_HOME/simpleUDF.jar'

Define the alias for simpleUDF as shown below.
DEFINE simpleUDF simpleUDF();

Use it in pig

grunt> Upper_case = FOREACH emp_data GENERATE simpleUDF(name);




UDF, UDAF and UDTF in Hive

There are two different interfaces you can use for writing UDFs for Apache Hive. 
  • Simple API - org.apache.hadoop.hive.ql.exec.UDF
  • Complex API - org.apache.hadoop.hive.ql.udf.generic.GenericUDF
How to write UDF (User-Defined Functions) in Hive?
  1. Create Java class for User Defined Function which extends ora.apache.hadoop.hive.sq.exec.UDF 
  2. Implement evaluate() method.
package com.xyz.udf;
import org.apache.hadoop.hive.ql.exec.UDF;

public class ArraySum extends UDF {

  public double evaluate(List<Double> value) {
    double sum = 0;
    for (int i = 0; i < value.size(); i++) {
      if (value.get(i) != null) {
        sum += value.get(i);
      }
    }
    return sum;
  }

}

      3. Package your Java class into JAR file
      4.  ADD your JAR in Hive Shell
ADD JAR  Test_UDF-1.0-SNAPSHOT-jar-with-dependencies.jar;

      5. CREATE TEMPORARY FUNCTION in hive which points to your Java class
CREATE TEMPORARY FUNCTION arraySum AS "com.xyz.udf.ArraySum";

      6. Use it in Hive SQL and have fun!
SELECT arraySum(1.0, 2.0, 3.0) FROM table_name; 


How to write UDAF (User-Defined Aggregation Functions)?
  • Create Java class which extends org.apache.hadoop.hive.ql.exec.hive.UDAF;
  • Create Inner Class which implements UDAFEvaluator
  • Implement five methods ()
    • init() – The init() method initalizes the evaluator and resets its internal state. We are using new Column() in code below to indicate that no values have been aggregated yet.
    • iterate() – this method is called everytime there is anew value to be aggregated. The evaulator should update its internal state with the result of performing the agrregation (we are doing sum – see below). We return true to indicate that input was valid.
    • terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
    • merge() – this method is called when Hive decides to combine one partial aggregation with another.
    • terminate() – this method is called when the final result of the aggregation is needed.
  • Compile and Package JAR
  • ADD JAR <JarName>
  • CREATE TEMPORARY FUNCTION in hive CLI
  • Run Aggregation Query – Verify Output!!!
How to write UDTF (User-Defined Table Functions)?

User defined tabular function (UDTF) works on one row as input and returns multiple rows as output. For example, Hive built in EXPLODE() function. Now lets take an array column USER_IDS as 10,12,5,45 then SELECT EXPLODE(USER_IDS) will give 10,12,5,45 as four different rows in output.
  • Create Java class which extends base Class Generic UDTF
  • Override 3 methods 
    •  initialize()
    •  process() 
    •  close()
  • Package your Java class into JAR file 
  • ADD your JAR
  • CREATE TEMPORARY FUNCTION in hive which points to your Java class
  • Use it in Hive SQL



Monday, February 27, 2017

How to set column headers in Hive

To print column headers along with the output, the following hive conf property should be set to true before executing the query.

set hive.cli.print.header=true;
if you wants this property to be ON permanently, then set this property hive.cli.print.header to true in hive-default.xml or hive-site.xml.

Spark Performance Tuning

Spark Performance Optimization:
1. Use Kryo serialization : Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
spark.serializer=org.apache.spark.serializer.KryoSerializer
2. File Format and compression: Parquet with Snappy compression
The best format for Spark performance is parquet with snappy compression, which is the default in Spark 2.x. Parquet stores data in columnar format, and is highly optimized in Spark. Snappy also gives reasonable compression with high speed. Apache Parquet gives the fastest read performance with Spark. Parquet arranges data in columns, putting related values in close proximity to each other to optimize query performance, minimize I/O, and facilitate compression. Spark 2.x has a vectorized Parquet reader that does decompression and decoding in column batches, providing ~ 10x faster read performance.

When reading CSV and JSON files, you will get better performance by specifying the schema, instead of using inference; specifying the schema reduces errors for data types and is recommended for production code.

Before or when writing a DataFrame, you can use dataframe.coalesce(N) to reduce the number of partitions in a DataFrame, without shuffling, or df.repartition(N) to reorder and either increase or decrease the number of partitions with shuffling data across the network to achieve even load balancing.

3. Broadcast Hash Join:
By default, Spark uses the SortMerge join type. This type of join is best suited for large data sets, but is otherwise computationally expensive because it must first sort the left and right sides of data before merging them. A Broadcast join is best suited for smaller data sets, or where one side of the join is much smaller than the other side. This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general. You can change the join type in your configuration by setting spark.sql.autoBroadcastJoinThreshold

4. Cost-Based Optimizer (CBO) : CBO is used to improve query plans. This is especially useful for queries with multiple joins. For this to work it is critical to collect table and column statistics and keep them up to date.

5. Adaptive Execution (AE) Engine For Apache Spark SQL : 
   Three main features in adaptive execution
– Auto setting the shuffle partition number
– Optimize join strategy at runtime
– Handle skewed join at runtime

6. Bucketing is similar to partitioning, but partitioning creates a directory for each partition, whereas bucketing distributes data across a fixed number of buckets by a hash on the bucket value. Tables can be bucketed on more than one value and bucketing can be used with or without partitioning. Partitioning should only be used with columns that have a limited number of values; bucketing works well when the number of unique values is large. Columns which are used often in queries and provide high selectivity are good choices for bucketing. Spark tables that are bucketed store metadata about how they are bucketed and sorted, which optimizes:

Data Serialization
  • Java serialization: By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extendingjava.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
  • Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
The only reason Kryo is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application. Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.


Memory Tuning

There are three considerations in tuning memory usage: the amount of memory used by your objects (you may want your entire dataset to fit in memory), the cost of accessing those objects, and the overhead of garbage collection (if you have high turnover in terms of objects).

By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space than the “raw” data inside their fields. This is due to several reasons:
  • Each distinct Java object has an “object header”, which is about 16 bytes and contains information such as a pointer to its class. For an object with very little data in it (say one Int field), this can be bigger than the data.
  • Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length), and store each character as two bytes due to String’s internal usage of UTF-16 encoding. Thus a 10-character string can easily consume 60 bytes.
  • Common collection classes, such as HashMap and LinkedList, use linked data structures, where there is a “wrapper” object for each entry (e.g. Map.Entry). This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list.
  • Collections of primitive types often store them as “boxed” objects such as java.lang.Integer.

Memory Management Overview:

Memory usage in Spark largely falls under one of two categories: execution and storage. Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster. In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory and vice versa.

Determining Memory Consumption:

The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI. The page will tell you how much memory the RDD is occupying.


Serialized RDD Storage:                

When your objects are still too large to efficiently store despite this tuning, a much simpler way to reduce memory usage is to store them in serialized form, using the serialized StorageLevels in the RDD persistence API, such as MEMORY_ONLY_SER. Spark will then store each RDD partition as one large byte array. The only downside of storing data in serialized form is slower access times, due to having to deserialize each object on the fly. We highly recommend using Kryo if you want to cache data in serialized form, as it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).

Garbage Collection Tuning

the cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost.

Advanced GC Tuning

To further tune garbage collection, we first need to understand some basic information about memory management in the JVM:
  • Java Heap space is divided in to two regions Young and Old. The Young generation is meant to hold short-lived objects while the Old generation is intended for objects with longer lifetimes.
  • The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].
  • A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old enough or Survivor2 is full, it is moved to Old. Finally when Old is close to full, a full GC is invoked.
The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation and that the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect temporary objects created during task execution. Some steps which may be useful are:
  • Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times for before a task completes, it means that there isn’t enough memory available for executing tasks.
  • If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You can set the size of the Eden to be an over-estimate of how much memory each task will need. If the size of Eden is determined to be E, then you can set the size of the Young generation using the option -Xmn=4/3*E. (The scaling up by 4/3 is to account for space used by survivor regions as well.)
  • In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of memory used for caching by lowering spark.memory.fraction; it is better to cache fewer objects than to slow down task execution. Alternatively, consider decreasing the size of the Young generation. This means lowering -Xmn if you’ve set it as above. If not, try changing the value of the JVM’s NewRatio parameter. Many JVMs default this to 2, meaning that the Old generation occupies 2/3 of the heap. It should be large enough such that this fraction exceeds spark.memory.fraction.
  • Try the G1GC garbage collector with -XX:+UseG1GC. It can improve performance in some situations where garbage collection is a bottleneck. Note that with large executor heap sizes, it may be important to increase the G1 region size with -XX:G1HeapRegionSize
  • As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the size of the block. So if we wish to have 3 or 4 tasks’ worth of working space, and the HDFS block size is 128 MB, we can estimate size of Eden to be 4*3*128MB.
  • Monitor how the frequency and time taken by garbage collection changes with the new settings.

Other Considerations


Level of Parallelism

increase the level of parallelism : You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.


Memory Usage of Reduce Tasks

Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKeygroupByKeyreduceByKeyjoin, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters.

Broadcasting Large Variables

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.

Data Locality

Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other.

Data locality is how close data is to the code processing it. 

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see thespark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.

Summary
This has been a short guide to point out the main concerns you should know about when tuning a Spark application – most importantly, data serialization and memory tuning. For most programs, switching to Kryo serialization and persisting data in serialized form will solve most common performance issues

References:

1. https://www.slideshare.net/databricks/an-adaptive-execution-engine-for-apache-spark-with-carson-wang
2. https://issues.apache.org/jira/browse/SPARK-16026

Sunday, February 26, 2017

Apache Storm Overview

Apache Storm is a free and open source distributed realtime computation systemStorm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more

Architecture of Apache Storm



Storm Components 

Topology :

In simple words, Topology is a network of spouts and bolts as in above figure. It is analogous to a MR Job in Hadoop. It is a graph of computation consisting of spouts and bolts. Spouts as data stream source tasks and Bolts as actual processing tasks.

Spout :
Spout is the entry point in a storm topology. It is the source of streams in the topology. A spout connects to the actual data source such as a message queue as Kafka , gets continuous data , converts the actual data into stream of tuples, emits them to bolts for actual processing. 

Bolt :Bolt contains the actual processing logic. It works only on streams and can emit streams too for further processing downstream by other bolts or can export/save data for persistent storage. It receives stream from either one or more spouts or some other bolts. Bolts can do anything from run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases, and more. 

Map Reduce Internals


Mapper

Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. 
The Hadoop MapReduce framework spawns one map task for each InputSplit.
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
Users can optionally specify a Combiner, to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
How Many Maps?
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.

Reducer

Reducer reduces a set of intermediate values which share a key to a smaller set of values.
Reducer has 3 primary phases: shuffle, sort and reduce.
Shuffle
Input to the Reducer is the sorted output of the mappers. In this phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
Sort
The framework groups Reducer inputs by keys (since different mappers may have output the same key) in this stage.
The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.
Secondary Sort
If equivalence rules for grouping the intermediate keys are required to be different from those for grouping keys before reduction, then one may specify a Comparator viaJobConf.setOutputValueGroupingComparator(Class). Since JobConf.setOutputKeyComparatorClass(Class) can be used to control how intermediate keys are grouped, these can be used in conjunction to simulate secondary sort on values.

Partitioner

Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the intermediate map-outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
HashPartitioner is the default Partitioner.

Refhttps://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html


Spark Architecture



Driver:
  • Entry point for Spark Shell
  • The place where SparkContext is created
  • Translates RDD into execution graph
  • Splits graph into stages
  • Schedules tasks and controls their execution
Cluster manager
  • An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)
Worker node
  • Any node that can run application code in the cluster
Executor
  • A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Task
  • A unit of work that will be sent to one executor
Job
  • A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. savecollect); you'll see this term used in the driver's logs.
Stage
  • Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.

Spark Overview

Apache Spark™ is a general purpose compute engine for large scale data processing.

Spark offers
  • Lazy Computations
  • In Memory data caching
Two main abstraction of Spark
  • RDD (Resilient Distributed Data Set)
    • Collection of data items split into partitions and stored in memory on worker nodes of the cluster.
  • Shared Variables : ships a copy of each variable used in the function to each task
    • broadcast variables : to cache a value in memory on all nodes
    • accumulators variables that are only “added” to, such as counters and sums.
RDD
  • Main and only tool for data manipulation in Spark
  • Two classes of operations
    • Transformations
    • Actions


Driver Program:

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster.

SparkContext 


RDDs (Resilient Distributed Datasets)

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.


Parallelized Collections
Parallelized collections are created by calling JavaSparkContext’s parallelize method on an existing Collection in your driver program.


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

External Datasets


Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using SparkContext’s textFile method. 


JavaRDD<String> distFile = sc.textFile("data.txt");

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one (example: map ), and actions, which return a value to the driver program after running a computation on the dataset (example: reduce ).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.


Basics
To illustrate RDD basics, consider the simple program below:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.


Transformations

The following table lists some of the common transformations supported by Spark. 

TransformationMeaning
map(func)Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacementfractionseed)Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoinrightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command[envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
Actions
The following table lists some of the common actions supported by Spark. 
ActionMeaning
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the first n elements of the dataset.
takeSample(withReplacementnum, [seed])Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n[ordering])Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
RDD Persistence
One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.