My Big Data Journey with Cloudera

In this tutorial series, I will be taking you through an end-to-end solution to big data problems in real life. This will not only help you in understanding the Big Data environment but will also be a guiding force in implementing big data projects at scale. So let's get started …

I. Basics of Big Data and the Apache Hadoop Environment

1. What is Big Data?
Big data is a field of computer science that treats ways to analyze, extract information from, or otherwise, deal with data sets that are too large or complex to be dealt with by traditional data-processing application software.

2. How does a typical Big data architecture look like?

3. What is Apache Hadoop?
Generally speaking, Apache Hadoop is a solution to the Big Data problem. Apache Hadoop is an open-source framework and a collection of open-source software utilities that are used to efficiently store and process large datasets ranging in size from gigabytes to petabytes of data.

4. How is it different from traditional RDBMS data warehouses?
Traditional RDBMS data warehouses use one large computer to store and process data. While Hadoop on the other hand allows a network of many computers to analyze massive amounts of data in parallel.

5. How do we ingest data into Apache Hadoop?

There are two ways of ingesting data into Apache Hadoop:

  1. Structured Data Ingestion using Apache Sqoop

Apache Sqoop is a tool designed for efficiently transferring bulk data between structured relational databases/archives and HDFS (Hadoop Distributed File System)

Fig. 1
Fig. 2

6. How is Sqoop different from transferring data using scripts?
Transferring data using scripts is inefficient and time-consuming. Hence, in Sqoop, the dataset being transferred is sliced up into different partitions and a map-only job is launched with individual mappers responsible for transferring a slice of this dataset.

II. Load data from RDBMS to HDFS using Sqoop:

Here I am using Cloudera Virtual Machine to demonstrate some examples on how to use Sqoop.

[cloudera@quickstart ~]$ mysql -uroot -pcloudera
Fig. 3
mysql > show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| cm |
| firehose |
| hue |
| metastore |
| mysql |
| nav |
| navms |
| oozie |
| retail_db |
| rman |
| sentry |
+--------------------+
12 rows in set (0.00 sec)
mysql > use retail_db;mysql > show tables;
Fig. 4

Now that we have checked our databases and tables on our source systems. Open another terminal and fetch the list of databases using Sqoop.

[cloudera@quickstart ~]$ sqoop list-databases \
> --connect jdbc:mysql://quickstart:3306/ \
> --username root \
> --password cloudera

NB: Oracle database users can use the connect attribute in the following manner:
--connect jdbc:oracle:thin:@OracleServer:OraclePort:OracleSID
--connect jdbc:oracle:thin:@TNSName

Fig. 5. Note: You should use -P instead of --password in command-line.

Now let us fetch the list of tables under the `retail_db` database using the Sqoop command.

[cloudera@quickstart ~]$ sqoop list-tables \
> --connect jdbc:mysql://quickstart:3306/retail_db \
> --username root \
> --password cloudera
Fig. 6 Note: Same List as Fig. 4. You should use -P instead of — password in command-line.

Now let us import a specific table for example customers from the retail database.

[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart:3306/retail_db \
> --username root \
> --password cloudera \
> --table customers
Fig. 7

After we run the command, Sqoop automatically prepares the map-reduce function for us, and before starting the job execution, it checks the number of records in the table and splits it into a default value of 4 which can be modified through a command and can be set to run on different nodes in a cluster.

Fig. 8 The transfer speed will largely depend on the number of nodes on which the job is running. Here I am using a virtual machine which is for demonstration purposes only.

The data that is transferred can be viewed in GUI and console. For the GUI interface, open Apache Hue followed by HDFS to find the same.

Fig. 9

To view the data in the console:

[cloudera@quickstart ~]$ hadoop fs -ls
Fig. 10
Fig. 11

To change the target directory:

[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart:3306/retail_db \
> --username root \
> --password cloudera
> --table customers
> --target-dir cust1
....[cloudera@quickstart ~]$ hadoop fs -ls
Fig. 12

Filtering data and the number of mappings :

[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart:3306/retail_db \
> --username root \
> --password cloudera \
> --table customers \
> --target-dir customerdata \
> --m 5 \
> --where "customer_id>12430"
Fig. 13

To check the actual data you can also you cat command with Hadoop like this:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/customerdata/part-*
Fig. 14. Note: As shown in Fig. 13. Splits = 5.

III. Can I query the database directly?

Why not? Do check out the syntax below and the example.

SYNTAX: sqoop import \
--query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/foo/joinresults
EXAMPLE:[cloudera@quickstart ~]$ sqoop import \
> --query 'select a.*, b.* from products as a, categories as b where a.product_category_id=b.category_id AND $CONDITIONS limit 5' \
> -m1 \
> --target-dir customprodcats
> --connect jdbc:mysql://quickstart:3306/retail_db \
> --username root
> --password cloudera
20/10/09 14:58:57 INFO mapreduce.ImportJobBase: Transferred 761 bytes in 70.8253 seconds (10.7447 bytes/sec)
20/10/09 14:58:57 INFO mapreduce.ImportJobBase: Retrieved 5 records.

Note: If you want to use more mappers, remove (-m1), and then you can use the --split-by attribute along with the above to split the data.

IV. Import data to Hive Tables:

But wait what is Hive?

[cloudera@quickstart ~]$ sqoop import-all-tables \
-m 1 \
--connect jdbc:mysql://quickstart:3306/retail_db \
--username=retail_dba \
--password=cloudera \
--compression-codec=snappy \
--as-parquetfile \
--warehouse-dir=/user/hive/warehouse \
--hive-import
[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/
[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/categories/

V. Incremental Import using Sqoop:

Let's create an emp database and an employee table.

mysql> create database empdb;Query OK, 1 row affected (0.01 sec)mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| cm |
| empdb |
| firehose |
| hue |
| metastore |
| mysql |
| nav |
| navms |
| oozie |
| retail_db |
| rman |
| sentry |
+--------------------+
13 rows in set (0.00 sec)
mysql> CREATE TABLE employees (
-> emp_id int not null,
-> emp_name varchar(240)
-> );
Query OK, 0 rows affected (0.12 sec)mysql> show tables;
+-----------------+
| Tables_in_empdb |
+-----------------+
| employees |
+-----------------+
1 row in set (0.01 sec)
mysql> insert into employees values(1, 'Tutorial Purposes');
Query OK, 1 row affected (0.04 sec)
mysql> insert into employees values(2, 'Tutorial 2 Purposes');
Query OK, 1 row affected (0.03 sec)

Since we do not have a primary key here we will have two options either to split the data on the basis of a column ( --split-by) for our mapper job or to bring it as a unified data using a single mapper (-m1).

[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart:3306/empdb \
> --username root \
> --password cloudera \
> --table employees \
> -m1

Now let us check the data.

[cloudera@quickstart ~]$ hadoop fs -ls
Found 4 items
drwxr-xr-x - cloudera cloudera 0 2020-10-08 16:04 cust1
drwxr-xr-x - cloudera cloudera 0 2020-10-08 16:14 customerdata
drwxr-xr-x - cloudera cloudera 0 2020-10-08 15:35 customers
drwxr-xr-x - cloudera cloudera 0 2020-10-09 04:18 employees

You can also check the number of mappers used to fetch the data.

[cloudera@quickstart ~]$ hadoop fs -ls employees
Found 2 items
-rw-r--r-- 1 cloudera cloudera 0 2020-10-09 04:18 employees/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 41 2020-10-09 04:18 employees/part-m-00000

As you can see the (-m1) attribute helped in having a single mapper and hence a single file was generated.

Now let us say that after two days three more records were added.

mysql> insert into employees values(3, 'Tutorial3 Purposes'), (4, 'Tuts 4'), (5, 'Tuts 5');
Query OK, 3 rows affected (0.04 sec)
Records: 3 Duplicates: 0 Warnings: 0

On the other terminal write the following command to import the latest values:

[cloudera@quickstart ~]$ sqoop import \
> --connect jdbc:mysql://quickstart:3306/empdb \
> --username root \
> --password cloudera \
> --table employees \
> --icremental append \
> --check-column emp_id \
> --last-value 2 \
> -m1
FIg. 15

Not only Sqoop does the incremental load but also provides us the parameters for the next incremental load as shown in the figure.

An alternate table update strategy supported by Sqoop is called lastmodified mode. You should use this when rows of the source table may be updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value are imported.” — Sqoop Docs

Let us check the data now:

[cloudera@quickstart ~]$ hadoop fs -ls employees
Found 3 items
-rw-r--r-- 1 cloudera cloudera 0 2020-10-09 04:18 employees/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 41 2020-10-09 04:18 employees/part-m-00000
-rw-r--r-- 1 cloudera cloudera 39 2020-10-09 04:39 employees/part-m-00001
[cloudera@quickstart ~]$ hadoop fs -cat employees/part*
1,Tutorial Purposes
2,Tutorial2 Purposes
3,Tutorial3 Purposes
4,Tuts 4
5,Tuts 5
[cloudera@quickstart ~]$

So our incremental data load is complete. Let us check the import all functionality that is available in Sqoop.

[cloudera@quickstart ~]$ sqoop import-all-tables \
> --connect jdbc:mysql://quickstart:3306/retail_db \
> --username root \
> --password cloudera \
> --warehouse-dir all_retail

VI. Jobs in Sqoop (with incremental import):

[cloudera@quickstart ~]$ sqoop job --create himyjob -- import  --connect jdbc:mysql://quickstart:3306/empdb --username root --password cloudera --table employees --incremental append --check-column emp_id --last-value 0 -m1
20/10/09 15:29:40 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.12.0
20/10/09 15:29:44 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
[cloudera@quickstart ~]$ sqoop job --list
20/10/09 15:31:28 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.12.0
Available jobs:
himyjob

Execute the job:

[cloudera@quickstart ~]$ sqoop job --exec himyjob
20/10/09 15:38:30 INFO tool.ImportTool: Saving incremental import state to the metastore
20/10/09 15:38:31 INFO tool.ImportTool: Updated data for job: himyjob
[cloudera@quickstart ~]$ hadoop fs -ls employees
Found 1 items
-rw-r--r-- 1 cloudera cloudera 80 2020-10-09 15:38 employees/part-m-00000
[cloudera@quickstart ~]$ hadoop fs -cat employees/part*
3,Tutorial3 Purposes
4,Tuts 4
5,Tuts 5
1,Tutorial Purposes
2,Tutorial2 Purposes

Now let's add some more data and execute the same job that we created earlier:

mysql> insert into employees values(6, ‘Tuts 6’), (7, ‘Tuts 7’), (8, ‘Tuts 8’);[cloudera@quickstart ~]$ sqoop job --exec himyjob
...
...
20/10/09 15:45:47 INFO mapreduce.ImportJobBase: Transferred 27 bytes in 69.3158 seconds (0.3895 bytes/sec)
20/10/09 15:45:47 INFO mapreduce.ImportJobBase: Retrieved 3 records.
20/10/09 15:45:47 INFO util.AppendUtils: Appending to directory employees
20/10/09 15:45:47 INFO util.AppendUtils: Using found partition 1
20/10/09 15:45:47 INFO tool.ImportTool: Saving incremental import state to the metastore
20/10/09 15:45:48 INFO tool.ImportTool: Updated data for job: himyjob
[cloudera@quickstart ~]$ hadoop fs -ls employees
Found 2 items
-rw-r--r-- 1 cloudera cloudera 80 2020-10-09 15:38 employees/part-m-00000
-rw-r--r-- 1 cloudera cloudera 27 2020-10-09 15:45 employees/part-m-00001
[cloudera@quickstart ~]$ hadoop fs -cat employees/part*
3,Tutorial3 Purposes
4,Tuts 4
5,Tuts 5
1,Tutorial Purposes
2,Tutorial2 Purposes
6,Tuts 6
7,Tuts 7
8,Tuts 8

As you can see only the new data was added to the existing employees' data in a different partition within the same folder.

Note: These two files can also be merged using the ‘sqoop merge’ tool.

VII. Export data from HDFS to RDBMS using Sqoop

Exporting data from HDFS to RDBMS:

mysql> truncate table employees;
Query OK, 0 rows affected (0.05 sec)
mysql> select * from employees;
Empty set (0.00 sec)

Now let us export the data from the Hadoop file system to our database table

[cloudera@quickstart ~]$ sqoop export \
> --connect jdbc:mysql://quickstart:3306/empdb \
> --username root \
> --password cloudera \
> --table employees \
> --export-dir employees
20/10/09 06:42:42 INFO mapreduce.ExportJobBase: Transferred 764 bytes in 100.6668 seconds (7.5894 bytes/sec)
20/10/09 06:42:42 INFO mapreduce.ExportJobBase: Exported 5 records.

Let's verify the data in the database.

mysql> select * from employees;
+--------+--------------------+
| emp_id | emp_name |
+--------+--------------------+
| 3 | Tutorial3 Purposes |
| ..| ........ |
+--------+--------------------+
8 rows in set (0.00 sec)

To be Continued…

Part II: Apache Kafka / Flume

Part III: Apache Spark / Pig / HiveQL & Apache Impala

Part IV: Apache Oozie to Schedule (Sqoop, Spark/Pig/hive Jobs) Workflow

Part V: Apache Hue Dashboard ( With Customer Support )