Wednesday, October 26, 2016

Spark RDD

Spark Resilient Distributed Datasets (RDD)


A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark: an immutable, distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
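To make the partitioning idea concrete, here is a minimal sketch in Scala (the application name, data values, and partition count are illustrative, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}

// A user-defined class carried inside an RDD (illustrative).
case class Invoice(id: Int, amount: Double)

object RddPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-partitions").setMaster("local[*]"))

    // Build an RDD from a local collection, split into 4 logical partitions;
    // each partition can be computed on a different node (or core, when running locally).
    val numbers = sc.parallelize(1 to 1000, numSlices = 4)
    println(numbers.getNumPartitions)                      // 4

    val invoices = numbers.map(n => Invoice(n, n * 1.5))   // RDD of user-defined objects
    println(invoices.filter(_.amount > 1200).count())

    sc.stop()
  }
}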

Spark uses RDDs to achieve faster and more efficient MapReduce-style operations. Let us first discuss how MapReduce operations take place and why they are not so efficient.

Data Sharing is Slow in MapReduce

MapReduce is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. It allows users to write parallel computations, using a set of high-level operators, without having to worry about work distribution and fault tolerance.
Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system (e.g., HDFS). Although this framework provides numerous abstractions for accessing a cluster's computational resources, users still want more.
Both iterative and interactive applications require faster data sharing across parallel jobs. Data sharing is slow in MapReduce due to replication, serialization, and disk I/O. In terms of storage, most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.

Iterative Operations on MapReduce

Multi-stage applications need to reuse intermediate results across multiple computations. The following illustration shows how the current framework works while doing iterative operations on MapReduce. This incurs substantial overhead due to data replication, disk I/O, and serialization, which makes the system slow.
[Illustration: Iterative Operations on MapReduce]

Interactive Operations on MapReduce

The user runs ad-hoc queries on the same subset of data. Each query performs disk I/O against stable storage, which can dominate application execution time.
The following illustration explains how the current framework works while doing the interactive queries on MapReduce.
[Illustration: Interactive Operations on MapReduce]

Data Sharing using Spark RDD

Data sharing is slow in MapReduce due to replication, serialization, and disk I/O; most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.
Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of Spark is the Resilient Distributed Dataset (RDD), which supports in-memory processing: the working state is kept as objects in memory across jobs, and those objects can be shared between jobs. Data sharing in memory is 10 to 100 times faster than sharing over the network or via disk.
Let us now try to find out how iterative and interactive operations take place in Spark RDD.

Iterative Operations on Spark RDD

The illustration below shows iterative operations on Spark RDD. Spark stores intermediate results in distributed memory instead of stable storage (disk), which makes the system faster.
Note: If the distributed memory (RAM) is not sufficient to store intermediate results (the state of the job), Spark spills those results to disk.
[Illustration: Iterative Operations on Spark RDD]

Interactive Operations on Spark RDD

This illustration shows interactive operations on Spark RDD. If different queries are run on the same set of data repeatedly, this particular data can be kept in memory for better execution times.
[Illustration: Interactive Operations on Spark RDD]
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.
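As a rough sketch of this persist behaviour in Scala (the file path and filter predicates are placeholders, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object RddPersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-persist").setMaster("local[*]"))

    val lines  = sc.textFile("hdfs:///data/app.log")        // hypothetical input
    val errors = lines.filter(_.contains("ERROR"))          // transformation: nothing runs yet

    // Keep the filtered RDD in memory so later actions reuse it instead of
    // re-reading and re-filtering the source file each time.
    errors.persist(StorageLevel.MEMORY_ONLY)                // errors.cache() is equivalent
    // StorageLevel.MEMORY_AND_DISK would spill to disk when RAM is insufficient.

    println(errors.count())                                 // first action: computes and caches
    println(errors.filter(_.contains("timeout")).count())   // reuses the in-memory data

    sc.stop()
  }
}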

Apache Spark Introduction

1. Apache Spark™ is a fast and general engine for large-scale data processing.

Apache Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.

It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.


2. We should look at Spark as an alternative to Hadoop MapReduce rather than a replacement for Hadoop.


Spark Ecosystem

Spark takes MapReduce to the next level with less expensive shuffles in the data processing. With capabilities like in-memory data storage and near real-time processing, the performance can be several times faster than other big data technologies.
Spark also supports lazy evaluation of big data queries, which helps with optimization of the steps in data processing workflows. Spark holds intermediate results in memory rather than writing them to disk, which is very useful, especially when you need to work on the same dataset multiple times. It's designed to be an execution engine that works both in-memory and on-disk.
Other than Spark Core API, there are additional libraries that are part of the Spark ecosystem and provide additional capabilities in Big Data analytics and Machine Learning areas.
These libraries include:
  • Spark SQL:
    • Spark SQL provides the capability to expose Spark datasets over the JDBC API and allows running SQL-like queries on Spark data using traditional BI and visualization tools. Spark SQL lets users ETL their data from the format it is currently in (such as JSON, Parquet, or a database), transform it, and expose it for ad-hoc querying (see the sketch after this list).
  • Spark Streaming:
    • Spark Streaming can be used for processing real-time streaming data. It is based on a micro-batch style of computing and processing, and it uses the DStream, which is basically a series of RDDs, to process the real-time data.
  • Spark MLlib:
    • MLlib is Spark’s scalable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction, as well as underlying optimization primitives.
  • Spark GraphX:
    • GraphX is the new (alpha) Spark API for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing the Resilient Distributed Property Graph: a directed multi-graph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
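As a hedged illustration of the Spark SQL workflow mentioned above (this uses the Spark 2.x SparkSession API, which postdates parts of this post; the JSON path, column names, and queries are assumptions):

import org.apache.spark.sql.SparkSession

object SparkSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-sql-sketch")
      .master("local[*]")
      .getOrCreate()

    // Load semi-structured data (one JSON object per line) and register it
    // as a temporary view so it can be queried with plain SQL.
    val invoices = spark.read.json("hdfs:///data/invoices.json")
    invoices.createOrReplaceTempView("invoices")

    spark.sql("SELECT vendor_id, SUM(amount) AS total FROM invoices GROUP BY vendor_id")
      .show()

    // ETL step: write a transformed result back out as Parquet for ad-hoc querying.
    spark.sql("SELECT * FROM invoices WHERE amount > 1000")
      .write.parquet("hdfs:///data/large_invoices")

    spark.stop()
  }
}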



Wednesday, October 19, 2016

Introduction to Apache Kafka

Introduction

Kafka is a distributed streaming platform. What exactly does that mean?

We think of a streaming platform as having three key capabilities:
  1. It lets you publish and subscribe to streams of records. In this respect it is similar to a message queue or enterprise messaging system.
  2. It lets you store streams of records in a fault-tolerant way.
  3. It lets you process streams of records as they occur.
What is Kafka good for?
It gets used for two broad classes of application:
  1. Building real-time streaming data pipelines that reliably get data between systems or applications
  2. Building real-time streaming applications that transform or react to the streams of data
To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.
First a few concepts:
  • Kafka is run as a cluster on one or more servers.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
Kafka has four core APIs:
  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
In Kafka the communication between the clients and the servers is done with a simple, high-performance, language-agnostic TCP protocol. This protocol is versioned and maintains backwards compatibility with older versions. We provide a Java client for Kafka, but clients are available in many languages.
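For example, a minimal producer sketch in Scala on top of the Java client (the broker address, topic name, key, and value are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)

    // Each record carries a key, a value, and (implicitly) a timestamp.
    // Records with the same key always land in the same partition.
    producer.send(new ProducerRecord[String, String]("page-views", "user-42", "viewed /pricing"))

    producer.close()   // flushes any outstanding records before returning
  }
}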

In Kafka:
  • Kafka maintains feeds of messages in categories called Topics
  • A process that publishes messages to a Kafka topic is called a Producer
  • A process that consumes messages from a Kafka topic is called a Consumer
  • All of the above components are coordinated and managed via Kafka Brokers. The broker cluster could be a set of physical servers.

Topics and Logs

Let's first dive into the core abstraction Kafka provides for a stream of records—the topic.
A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
For each topic, the Kafka cluster maintains a partitioned log.

Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.
The Kafka cluster retains all published records—whether or not they have been consumed—using a configurable retention period. For example if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.

In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".
This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.
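To sketch this consumer-controlled positioning (Scala on the Java client; the topic name and broker address are assumptions), the snippet below assigns one partition explicitly and rewinds it to the beginning to reprocess old data:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object ReplaySketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")   // no group/offset commits needed here
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    val p0 = new TopicPartition("page-views", 0)

    consumer.assign(Collections.singletonList(p0))            // manual assignment, no consumer group
    consumer.seekToBeginning(Collections.singletonList(p0))   // rewind: reprocess data from the past

    val records = consumer.poll(Duration.ofSeconds(1))
    records.asScala.foreach(r => println(s"offset=${r.offset()} key=${r.key()} value=${r.value()}"))

    consumer.close()
  }
}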
The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.

Distribution

The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!

Consumers

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

[Illustration: A two-server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.]
More commonly, however, we have found that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.
The way consumption is implemented in Kafka is by dividing up the partitions in the log over the consumer instances so that each instance is the exclusive consumer of a "fair share" of partitions at any point in time. This process of maintaining membership in the group is handled by the Kafka protocol dynamically. If new instances join the group they will take over some partitions from other members of the group; if an instance dies, its partitions will be distributed to the remaining instances.
Kafka only provides a total order over records within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.
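A hedged sketch of one consumer-group member (Scala on the Java client; the group id, topic, and broker address are placeholders). Running several copies of this process with the same group.id splits the topic's partitions among them; giving each copy a distinct group id instead broadcasts every record to each process:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object GroupConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "billing-service")   // the "logical subscriber"
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("page-views"))    // Kafka assigns partitions to group members

    while (true) {
      val records = consumer.poll(Duration.ofMillis(500))
      records.asScala.foreach { r =>
        println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
      }
    }
  }
}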

Guarantees

At a high-level Kafka gives the following guarantees:
  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a record M1 is sent by the same producer as a record M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees records in the order they are stored in the log.
  • For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any records committed to the log.
More details on these guarantees are given in the design section of the documentation.

Kafka as a Messaging System

How does Kafka's notion of streams compare to a traditional enterprise messaging system?
Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. Each of these two models has a strength and a weakness. The strength of queuing is that it allows you to divide up the processing of data over multiple consumer instances, which lets you scale your processing. Unfortunately, queues aren't multi-subscriber—once one process reads the data it's gone. Publish-subscribe allows you to broadcast data to multiple processes, but has no way of scaling processing since every message goes to every subscriber.
The consumer group concept in Kafka generalizes these two concepts. As with a queue the consumer group allows you to divide up processing over a collection of processes (the members of the consumer group). As with publish-subscribe, Kafka allows you to broadcast messages to multiple consumer groups.
The advantage of Kafka's model is that every topic has both these properties—it can scale processing and is also multi-subscriber—there is no need to choose one or the other.
Kafka has stronger ordering guarantees than a traditional messaging system, too.
A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.
Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.

Kafka as a Storage System

Any message queue that allows publishing messages decoupled from consuming them is effectively acting as a storage system for the in-flight messages. What is different about Kafka is that it is a very good storage system.
Data written to Kafka is written to disk and replicated for fault-tolerance. Kafka allows producers to wait on acknowledgement so that a write isn't considered complete until it is fully replicated and guaranteed to persist even if the server written to fails.
The disk structures Kafka uses scale well—Kafka will perform the same whether you have 50 KB or 50 TB of persistent data on the server.
As a result of taking storage seriously and allowing the clients to control their read position, you can think of Kafka as a kind of special purpose distributed filesystem dedicated to high-performance, low-latency commit log storage, replication, and propagation.
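A small sketch of waiting on acknowledgement (Scala on the Java client; the topic, key, and value are invented for illustration): with acks=all, blocking on the returned future means the write is not considered complete until it is fully replicated:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object DurableWriteSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.ACKS_CONFIG, "all")   // leader waits for the full in-sync replica set
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)

    // get() blocks until the broker acknowledges the fully replicated write.
    val metadata = producer.send(new ProducerRecord[String, String]("audit-log", "txn-1", "debit 100")).get()
    println(s"committed to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")

    producer.close()
  }
}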

Kafka for Stream Processing

It isn't enough to just read, write, and store streams of data; the purpose is to enable real-time processing of streams.
In Kafka a stream processor is anything that takes continual streams of data from input topics, performs some processing on this input, and produces continual streams of data to output topics.
For example a retail application might take in input streams of sales and shipments, and output a stream of reorders and price adjustments computed off this data.
It is possible to do simple processing directly using the producer and consumer APIs. However, for more complex transformations Kafka provides a fully integrated Streams API. This allows building applications that do non-trivial processing, such as computing aggregations off of streams or joining streams together.
This facility helps solve the hard problems this type of application faces: handling out-of-order data, reprocessing input as code changes, performing stateful computations, etc.
The streams API builds on the core primitives Kafka provides: it uses the producer and consumer APIs for input, uses Kafka for stateful storage, and uses the same group mechanism for fault tolerance among the stream processor instances.
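As a hedged sketch of the Streams API (written against the newer StreamsBuilder interface, which differs from the 0.10-era API that was current when this post was written; the topic names and the low-stock filtering rule are invented), a processor that reads one topic, filters it, and writes a derived stream to another topic:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, Predicate}

object ReorderStreamSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reorder-app")          // also used as the consumer group id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

    val builder = new StreamsBuilder()

    // Continuously read "sales", keep only out-of-stock events,
    // and write the derived stream to "reorders".
    val sales: KStream[String, String] = builder.stream[String, String]("sales")
    val outOfStock = new Predicate[String, String] {
      override def test(sku: String, event: String): Boolean = event.contains("stock=0")
    }
    sales.filter(outOfStock).to("reorders")

    val streams = new KafkaStreams(builder.build(), props)
    streams.start()
    sys.addShutdownHook(streams.close())
  }
}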

Putting the Pieces Together

This combination of messaging, storage, and stream processing may seem unusual but it is essential to Kafka's role as a streaming platform.
A distributed file system like HDFS allows storing static files for batch processing. Effectively a system like this allows storing and processing historical data from the past.
A traditional enterprise messaging system allows processing future messages that will arrive after you subscribe. Applications built in this way process future data as it arrives.
Kafka combines both of these capabilities, and the combination is critical both for Kafka usage as a platform for streaming applications as well as for streaming data pipelines.
By combining storage and low-latency subscriptions, streaming applications can treat both past and future data the same way. That is a single application can process historical, stored data but rather than ending when it reaches the last record it can keep processing as future data arrives. This is a generalized notion of stream processing that subsumes batch processing as well as message-driven applications.
Likewise, for streaming data pipelines the combination of subscription to real-time events makes it possible to use Kafka for very low-latency pipelines; the ability to store data reliably makes it possible to use it for critical data where delivery must be guaranteed, or for integration with offline systems that load data only periodically or may go down for extended periods of time for maintenance. The stream processing facilities make it possible to transform data as it arrives.
For more information on the guarantees, APIs, and capabilities Kafka provides, see the rest of the documentation.

List the various components in Kafka.

The four major components of Kafka are:
  • Topic – a stream of messages belonging to the same type
  • Producer – that can publish messages to a topic
  • Brokers – a set of servers where the published messages are stored
  • Consumer – that subscribes to various topics and pulls data from the brokers.

What is the role of ZooKeeper?

Kafka uses Zookeeper to store offsets of messages consumed for a specific topic and partition by a specific Consumer Group.

Is it possible to use Kafka without ZooKeeper?

No, it is not possible to bypass ZooKeeper and connect directly to the Kafka server. If, for some reason, ZooKeeper is down, you cannot service any client requests.

What is the process for starting a Kafka server?

Since Kafka uses ZooKeeper, it is essential to initialize the ZooKeeper server, and then fire up the Kafka server.
  • To start the ZooKeeper server: > bin/zookeeper-server-start.sh config/zookeeper.properties
  • Next, to start the Kafka server: > bin/kafka-server-start.sh config/server.properties
How to create a topic?
Create a topic named “test” with a single partition and only one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
How does Kafka depend on ZooKeeper?
ZooKeeper is an open-source, high-performance coordination service for distributed applications, adopted by Kafka. It is not possible to bypass ZooKeeper and connect straight to a Kafka broker; once ZooKeeper is down, Kafka cannot serve client requests.
ZooKeeper is used to coordinate between the different nodes in a Kafka cluster and to store committed offsets, so that if a node fails, consumption can resume from the previously committed offset.
The brokers depend on ZooKeeper for:
  1. Server failure detection.
  2. Data partitioning.
  3. In-sync data replication.
  4. Consumer membership management.

Kafka Vs Flume

  1. Kafka – a publish-subscribe messaging system, offering strong durability, scalability and fault-tolerance support.
     Flume – a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of data from many different sources to a centralized data store, such as HDFS.
  2. Kafka – provides back pressure to prevent overflowing a broker.
     Flume – Flume/Flume NG does not provide any such functionality.
  3. Kafka – you pull data, so each consumer has and manages its own read pointer. This allows a large number of consumers of each Kafka queue, pulling data at their own pace; you could deliver your event streams to HBase, Cassandra, Storm, Hadoop and an RDBMS all in parallel.
     Flume – to get data out of Flume, you use a sink, which writes to your target store (HDFS, HBase, Cassandra, etc.). Flume will retry connections to your sinks if they are offline. Because Flume pushes data, you have to do some interesting work to sink data to two data stores.
  4. Kafka – with Kafka 0.8+ you get replication of your event data; if you lose a broker node, others will take up the slack and deliver your events without loss.
     Flume – with Flume/Flume NG and a file channel, if you lose a node you will lose access to those events until you recover that disk. The database channel with Flume is reported to be too slow for any production use case at volume.
  5. Kafka – just provides messaging.
     Flume – provides a number of pre-built collectors.
  6. Kafka – its main use-case is a distributed publish-subscribe messaging system. Most of the development effort goes into allowing subscribers to read exactly the messages they are interested in, and into making sure the distributed system is scalable and reliable under many different conditions. It was not written to stream data specifically for Hadoop, and using it to read and write data to Hadoop is significantly more challenging than it is in Flume.
     Flume – its main use-case is to ingest data into Hadoop. It is tightly integrated with Hadoop's monitoring system, file system, file formats, and utilities such as Morphlines. A lot of the Flume development effort goes into maintaining compatibility with Hadoop. Flume's design of sources, sinks and channels means it can move data between other systems flexibly, but the important feature is its Hadoop integration.
  7. Kafka – use Kafka if you need a highly reliable and scalable enterprise messaging system to connect multiple systems, one of which is Hadoop.
     Flume – use Flume if you have non-relational data sources, such as log files, that you want to stream into Hadoop.

Tuesday, October 18, 2016

Import Data from RDBMS to HDFS using Sqoop

How to Import Data From Oracle database to HDFS using Sqoop.

Install Oracle JDBC Driver

You can download the JDBC Driver from the Oracle website, for example http://www.oracle.com/technetwork/database/enterprise-edition/jdbc-112010-090769.html. You must accept the license agreement before you can download the driver. Download the ojdbc6.jar file and copy it to the Sqoop lib directory:
$ sudo cp ojdbc6.jar sqoop/lib
$ chmod -R 755 sqoop/lib

Import Data from Oracle to HDFS

Use the command below to import the data:

$ sqoop import --connect jdbc:oracle:thin:apps/apps@10.20.66.6:1521:vis1211 --username apps -P --table ap_invoices_all --columns "INVOICE_ID,VENDOR_ID" --target-dir /apinvoices -m 1

Required items for above command:
IPv4 Address – 10.20.66.6
Database Name – apps
Table Name – ap_invoices_all
Username – apps
Password – apps
Output Directory – could be any directory; I have used /apinvoices
The commands below may help you identify connectivity issues, if any:

$ ping 10.20.66.6
$ nc -z 10.20.66.6 1521
Refer to the Sqoop user guide at https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html for more details.

Verify the Data on HDFS


Open the Browser and go to the URL: http://localhost:50070/dfshealth.jsp
Click on Browse the filesystem
Click on apinvoices directory
Click on part-m-00000 file to see the data.


Sqoop list commands


Run the commands on the Unix prompt, on the node where you have sqoop installed.


1. List databases

Lists the databases on your MySQL server.

$ sqoop list-databases --connect jdbc:mysql://<<mysql-server>>/employees --username vchennar --password myPassword
.
.
.
INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
information_schema
employees
test

2. List tables

Lists tables in your mysql database.

$ sqoop list-tables --connect jdbc:mysql://<<mysql-server>>/employees --username vchennar --password myPassword
.
.
.
INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
departments
dept_emp
dept_manager
employees
employees_exp_stg
employees_export
salaries
titles


Importing data in MySql into HDFS 


Replace "vchennar-mySqlServer-node" with the host name of the node running mySQL server, replace login credentials and target directory.

1. Importing a table into HDFS - basic import


$ sqoop import \
--connect jdbc:mysql://vchennar-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--table employees \
-m 1 \
--target-dir /user/vchennar/sqoop-mysql/employees
.
.
.
.9139 KB/sec)
INFO mapreduce.ImportJobBase: Retrieved 300024 records

2. Executing imports with an options file for static information


Rather than repeating the import command along with the connection-related input each time, you can pass an options file as an argument to sqoop. Create a text file as follows and save it locally on the node where you run the sqoop client.

Note: This blog does not cover it, but you can create and use a password file as well, which you can pass with the argument --password-file <<filename>>.

2.1. Sample Options file:
___________________________________________________________________________
$ vi SqoopImportOptions.txt 
#
#Options file for sqoop import
#

import
--connect
jdbc:mysql://vchennar-mySqlServer-node/employees
--username
myUID
--password
myPwd

#
#All other commands should be specified in the command line
___________________________________________________________________________

2.2. Command to execute import, using an options file:


Note: Provide the proper path for the options file

2.2.1. The command

$ sqoop --options-file SqoopImportOptions.txt \
--table departments \
-m 1 \
--target-dir /user/vchennar/sqoop-mysql/departments
.
.
.
INFO mapreduce.ImportJobBase: Transferred 153 bytes in 26.2453 seconds (5.8296 bytes/sec)
INFO mapreduce.ImportJobBase: Retrieved 9 records.


The -m argument specifies the number of mappers. The departments table has only a handful of records, so I am setting it to 1.

2.2.2 Data file contents:

$ hadoop fs -cat sqoop-mysql/departments/part-m-00000 | more

d009,Customer Service
d005,Development
d002,Finance
d003,Human Resources
d001,Marketing
d004,Production
d006,Quality Management
d008,Research
d007,Sales


Import all columns, filter rows using where clause


 $ sqoop --options-file SqoopImportOptions.txt \
--table employees  \
--where "emp_no > 499948\
--as-textfile \
-m 1 \
--target-dir /user/vchennar/sqoop-mysql/employeeGtTest

 Import with a free form query without where clause


 $ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where  $CONDITIONS' \
-m 1 \
--target-dir /user/vchennar/sqoop-mysql/employeeFrfrmQrySmpl2

(The case of the column names needs to match that used to create the table, or else the import fails.)

 Import with a free form query with where clause


$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where EMP_NO < 20000 AND $CONDITIONS' \
-m 1 \
--target-dir /user/vchennar/sqoop-mysql/employeeFrfrmQry1

$ sqoop import --connect <connect> --username <username> --password <password> --query "select * from table WHERE rownum<10 OR \$CONDITIONS" --target-dir=/location -m 1 --verbose 

Split by


$ sqoop --options-file SqoopImportOptions.txt \
--query 'select EMP_NO,FIRST_NAME,LAST_NAME from employees where $CONDITIONS' \
--split-by EMP_NO \
--direct \
--target-dir /user/vchennar/sqoop-mysql/SplitByExampleImport


Import all tables


Command:
$ sqoop --options-file SqoopImportAllTablesOptions.txt \
--direct \
--warehouse-dir sqoop-mysql/EmployeeDatabase

Options file content:
$ more SqoopImportAllTablesOptions.txt
______________________________________________

#
#Options file for sqoop import
#

import-all-tables
--connect
jdbc:mysql://vchennar-mySqlServer-node/employees
--username
myUID
--password
myPWD

#
#All other commands should be specified in the command line
______________________________________________

Direct and quick queries or inserts and updates with Sqoop eval



The eval tool allows users to quickly run simple SQL queries against a database; results are printed to the console. This allows users to preview their import queries to ensure they import the data they expect.


1. Query:

$ sqoop eval --connect jdbc:mysql://vchennar-mySqlServer-node/employees \
--username myUID \
--password myPWD \
--query "select * from employees limit 2"

---------------------------------------------------------------------------------
| emp_no      | birth_date | first_name     | last_name        | gender | hire_date  | 
---------------------------------------------------------------------------------
| 10001       | 1953-09-02 | Georgi         | Facello          | M | 1986-06-26 | 
| 10002       | 1964-06-02 | Bezalel        | Simmel           | F | 1985-11-21 | 

2. Insert:

sqoop eval --connect jdbc:mysql://vchennar-mySqlServer-node/employees \
--username myUID \
--password myPWD \
-e "insert into employees_export values(550000,'1977-08-08','Mouse','Mickey','M','1999-04-12')"