Big Data with Hadoop Ecosystem

Motivation

When we think of a traditional database system, data resides on disk and is loaded into memory when needed. The CPU does some processing, and the data is then presented to the user for analysis.

motivation_hadoop__50.png

The problem with the picture above is that resources are limited: loading data from disk is expensive, and in the case of big data it is very unlikely that all of the data fits into memory at once.

Let’s take the example of Google.

Google

  • ~40 billion web pages × ~30 KB each ≈ 1 petabyte
  • An average disk today reads at about 120 MB/sec
  • A single drive would take a little over 3 months to read the whole web
  • Roughly 1,000 drives would be needed just to store it

Note: So the problem is that we can’t scale up a single machine enough to meet these demands. Then what’s the solution?
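
As a quick sanity check on the figures above, here is a short back-of-the-envelope calculation in Python. The page size, disk speed and drive capacity are rough assumptions, not exact figures.

pages = 40e9               # ~40 billion web pages
page_size = 30 * 1024      # ~30 KB per page, in bytes
disk_speed = 120e6         # ~120 MB/sec sequential read
drive_capacity = 1e12      # assume roughly 1 TB per drive

total_bytes = pages * page_size
print(f"Total data: {total_bytes / 1e15:.1f} PB")                            # ~1.2 PB
print(f"Days to read on one disk: {total_bytes / disk_speed / 86400:.0f}")   # ~119 days, a little over 3 months
print(f"Drives needed just to store it: {total_bytes / drive_capacity:.0f}") # ~1200 drives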

Distributed Computing

We need to scale out with distributed computing by connecting many servers to perform the job.

Challenges

  • Volume, velocity and variety of the data.
  • With more resources, things are bound to fail much more frequently.
  • The system must be able to recover from failures in a scalable manner.
  • Shared-nothing architecture: to scale out efficiently, the individual nodes of the system should not be burdened with tasks like coordination and communication with other nodes. In fact, each node knows nothing about the other nodes in the cluster, what data they hold, or what they are working on.
  • Master servers: the coordination tasks are managed by a few master nodes. In Hadoop these masters are called the NameNode and the JobTracker, and this design makes it easy to add more nodes to the system.

distributed_system__50.png

These challenges are solved by Hadoop using its two core components:

  • HDFS (Hadoop Distributed File System)
  • MapReduce

Hadoop Distributed File System (HDFS)

  • It is a distributed file system; it deals with files, so it is not a database.
  • When a file is saved to HDFS, Hadoop does two main things:
  • It splits the file into chunks, or blocks, typically 64 MB-128 MB each, placing each block on a different data node.
  • It then replicates each block to 3 nodes by default (replication factor = 3). A rough sketch of this block arithmetic follows the figure below.

HDFS__50.png
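
To make the splitting and replication concrete, here is a minimal Python sketch. The 1 GB file size is just an illustrative assumption; the block size and replication factor are the usual defaults mentioned above.

import math

file_size_mb = 1024        # assumed file size: 1 GB (illustrative only)
block_size_mb = 128        # typical HDFS block size
replication_factor = 3     # HDFS default

num_blocks = math.ceil(file_size_mb / block_size_mb)
total_stored_mb = file_size_mb * replication_factor

print(f"Blocks to distribute across data nodes: {num_blocks}")       # 8 blocks of up to 128 MB
print(f"Raw storage used across the cluster: {total_stored_mb} MB")  # 3072 MB, i.e. 3x the file size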

MapReduce

  • A typical MapReduce job runs through 3 phases:

3_phase_map_reduce__50.png

Map Phase

  • In MapReduce, everything is handled in terms of key-value pairs.
  • A MapReduce job can use one or more mappers, depending on the number of blocks the input file is spread across.
  • In most cases a mapper is assigned to each block.

mapper__50.png

  • Mappers take input elements as a key and a value and process them one at a time; the file’s input format determines what those keys and values are.
  • Reducers receive a list of key-value pairs output by the mappers and aggregate the results together.

mapper_reducer__50.png

  • Each mapper generates an arbitrary set of key-value pairs, depending on the mapper logic and the block of data that mapper is working with.
  • So the number of key-value pairs and their actual content can be different for each mapper.
  • Up to this point the output is in no particular order.

Shuffle and Sort Phase

  • The shuffle and sort phase has the job of sorting the output of each mapper by key and distributing it, in sorted order, between the reducers.
  • By default, all the values for a given key are guaranteed to be sent to the same reducer; for example, all the values with the red square key go to Reducer-B, and so on.
  • The number of reducer nodes is not dependent on the number of mappers.
  • The number of mappers is usually determined by the number of blocks required to read the input, with a few exceptions.
  • The number of reducers depends on the job configuration and the problem being solved.
  • The shuffle part is responsible for the distribution of keys among reducers.
  • Programmatically, a function called the partitioner performs this step (see the sketch after this list).
  • The default partitioner uses the hash value of the key and splits keys into buckets according to the number of reducers.
  • Some keys may be more frequent than others, so some reducers receive more data and have to do more work. There are a few ways to solve this problem, one of which is writing a custom partitioner.
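
Hadoop’s actual default partitioner is a Java class (HashPartitioner); the following is only a minimal Python sketch of the same idea, with a hypothetical custom partitioner to show how the key-to-reducer mapping could be overridden.

def default_partition(key, num_reducers):
    # Bucket a key by its hash so the same key always lands on the same reducer.
    # Note: Python randomizes string hashes per process; a real partitioner uses a stable hash.
    return hash(key) % num_reducers

def custom_partition(key, num_reducers):
    # Hypothetical custom partitioner: route keys by their first letter,
    # e.g. to spread known hot keys differently than the default hash would.
    return ord(key[0].lower()) % num_reducers

for key in ["red", "blue", "green", "red", "yellow"]:
    print(key, "-> reducer", default_partition(key, num_reducers=2))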

Reduce Phase

  • The work a reducer does on the data it receives is usually some sort of aggregation.
  • Each reducer writes its own file, representing the output of that reducer, into a directory on HDFS.

MapReduce__50.png

Word Count Example

Counting the number of occurrences of words.

Map and Sort Shuffle Phase

word_count_map_shuffle__50.png

Reduce Phase

word_count_reduce__50.png

Reducer A:

first  1
second 1
the    2
This   2

Reducer B:

is     2
line   2

Note: The map task is responsible for making sense of the input data and reading it in parallel, while the reducer performs some sort of aggregation against that data.
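
The counts shown above are consistent with a two-line input such as “This is the first line” and “This is the second line”, which is assumed here. Below is a minimal Python simulation of the map, shuffle and sort, and reduce steps for that input; it is plain Python rather than actual Hadoop code, and the hash-based split of keys between the two reducers may differ from the figure, although the per-word totals are the same.

from collections import defaultdict

# Assumed input: two lines consistent with the reducer outputs shown above.
lines = ["This is the first line", "This is the second line"]

# Map phase: each "mapper" emits a (word, 1) pair for every word in its line.
mapped = [(word, 1) for line in lines for word in line.split()]

# Shuffle and sort phase: sort by key and send all values for a key to one "reducer".
num_reducers = 2
partitions = [defaultdict(list) for _ in range(num_reducers)]
for key, value in sorted(mapped):
    partitions[hash(key) % num_reducers][key].append(value)

# Reduce phase: each reducer sums the values for the keys it received.
for i, partition in enumerate(partitions):
    print(f"Reducer {chr(ord('A') + i)}:")
    for key, values in partition.items():
        print(f"  {key}\t{sum(values)}")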

Environment Setup and Basic Hadoop Commands

Basic Hadoop Commands

  • fs: The fs command gives access to the HDFS shell commands for working with files.
$ hadoop fs -ls /

The command above lists the contents of the root directory of HDFS.

Note: It’s very important to keep in mind that when using hadoop fs, you’re browsing the Hadoop File System on the cluster, not your own system. Whatever files and directory structure you see when using hadoop fs is a logical view of how your data is stored on the Hadoop cluster, and isn’t local to you. Anyone looking at the same cluster will see the same files and directories you do.

  • Now create a working directory on the local system.
$ mkdir /home/cloudera/bigdata
$ cd /home/cloudera/bigdata

Here cloudera is the default username that comes with the Cloudera installation.

  • Now we need to download a dataset to work with: a dataset of movies and movie ratings made available by the GroupLens lab at the Computer Science and Engineering Department at the University of Minnesota.
$ wget http://www.grouplens.org/system/files/ml-100k.zip
  • Let’s extract the data, then switch over and create a similar bigdata folder on HDFS. Remember, the fs command in Hadoop is used for file system operations and, similar to Linux, we can create a directory using mkdir. Let’s go ahead and create a directory for movies; we’ll also create one for userinfo.
$ unzip ml-100k.zip
$ hadoop fs -mkdir /bigdata
$ hadoop fs -mkdir /bigdata/movies
$ hadoop fs -mkdir /bigdata/userinfo
  • Hadoop supports two file system commands for getting data into and out of HDFS: the fs -put command is used to get data into HDFS, and the fs -get command is used to get data out of it. Now let’s put the u.item file from the dataset we extracted on our local file system into the movies directory on HDFS. Similarly, let’s put the u.info file into the userinfo directory on HDFS.
$ cd ml-100k
$ hadoop fs -put u.item /bigdata/movies
$ hadoop fs -put u.info /bigdata/userinfo
  • There are also copy and move commands, similar to Linux. For example, let’s copy the u.item file from the movies directory to the bigdata directory. We can verify the operation and see the file using hadoop fs -ls.
$ hadoop fs -cp /bigdata/movies/u.item /bigdata/
$ hadoop fs -ls /bigdata
  • Now let’s remove this copied file using the rm command, similar to Linux.
$ hadoop fs -rm /bigdata/u.item
  • Now let’s create another directory under bigdata called test.
$ hadoop fs -mkdir /bigdata/test
  • An additional feature of the copy command is that it lets you specify more than one source file, and the destination, in this case, must be a directory.
$ hadoop fs -cp /bigdata/movies/u.item /bigdata/userinfo/u.info /bigdata/test
$ hadoop fs -ls /bigdata/test
  • Finally, we’ll use the recursive remove, or rmr, command to delete the test directory and its contents. Depending on our configuration, Hadoop can have a trash, or recycle bin, which keeps files safe and recoverable from accidental deletion. For both rm and rmr, we can skip going to the trash by specifying the -skipTrash option.
$ hadoop fs -rmr -skipTrash /bigdata/test
  • The final command we’re going to examine is the cat command, which copies source paths to standard output. Since we’re on Linux, you can also pipe the results of hadoop fs -cat to anything else.
$ hadoop fs -cat /bigdata/movies/*
$ hadoop fs -cat /bigdata/movies/* | less

For a complete list of the Hadoop File System commands and options, go to hadoop.apache.org and look up the File System Shell Guide.

We can also go to Hue, open the file manager, and do all of these operations from the Hue user interface. Hue becomes very useful when performing simple tasks like this, instead of going to Bash and running shell commands.

Summary

  • We looked at the kinds of problems Big Data poses today and the solutions for tackling them: scaling out and building a framework that tolerates failures and reverses the traditional processing cycle by taking the processing to the data instead of taking the data to where it needs to be processed.

  • This, along with a framework that can distribute work across multiple nodes, makes tackling these problems possible. In fact, since failures are bound to happen and the framework accounts for that, Hadoop becomes economically enticing: we don’t need the best and most expensive hardware in the world. You can rely on commodity servers and keep adding more when needed to scale to your demand.

  • We also examined MapReduce in detail and looked at how the Map phase works on separate blocks of a file in parallel, how everything is handled in terms of key-value pairs, and how the Shuffle and Sort phase and the number of reducers play an important role in how the problem is solved.

  • Finally, we set up the environment and walked through some of Hadoop’s File System commands, placing some data onto HDFS from our local file system.