Thursday, May 8, 2014

Map-Reduce: Joins


There are two kinds of joins in MapReduce: the map-side join and the reduce-side join.

  1. Map-side join - This join is performed before the data reaches the map function. It is suited for two scenarios:
    1. One of the inputs is small enough to fit in memory - Consider some kind of metadata that needs to be associated with a much larger number of records. In this case, the smaller input can be replicated to every tasktracker node and held in memory, and the join can be performed as the mapper reads the bigger input.
    2. Both inputs are sorted by the join key and divided into the same number of partitions, with the guarantee that all records for a given key fall in the same partition - Consider the outputs of several MapReduce jobs that had the same number of reducers and emitted the same keys. In this case, an index (key, filename, offset) can be built from one of the inputs and looked up as the other input is read.
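The first scenario (a small input held in memory) can be sketched in a few lines of Python. This is a single-process illustration, not Hadoop API code: the small side is assumed to be already loaded into a dict (in Hadoop it would typically be shipped to each node, for example via the distributed cache), and `map_side_join` is a hypothetical name.

```python
from typing import Dict, Iterable, Iterator, Tuple

def map_side_join(
    small: Dict[str, str],
    large: Iterable[Tuple[str, str]],
) -> Iterator[Tuple[str, str, str]]:
    """Inner-join each (key, value) record of the large input against an
    in-memory lookup table built from the small input."""
    for key, value in large:
        meta = small.get(key)
        if meta is not None:  # inner join: records without a match are dropped
            yield key, value, meta

# Small metadata input, replicated to every mapper and held in memory.
countries = {"IN": "India", "US": "United States"}
# Large input, streamed record by record through the mapper.
visits = [("IN", "page1"), ("US", "page2"), ("FR", "page3"), ("IN", "page4")]

print(list(map_side_join(countries, visits)))
# [('IN', 'page1', 'India'), ('US', 'page2', 'United States'), ('IN', 'page4', 'India')]
```

No reduce phase is needed: each mapper produces joined records on its own.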

  2. Reduce-side join - This join happens in the reduce phase. It places no restrictions on the size or layout of the inputs; its only disadvantage is that all records from both inputs have to go through the shuffle and sort phase. It works as follows: the map phase tags each record with an identifier of the source it came from, so that the reducer knows how to parse it. Records with the same key reach the same reducer, which performs the join, parsing and handling records with different source tags differently.
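The tag-then-join flow can be simulated in memory to make it concrete. This is a sketch under stated assumptions: the `shuffle` function stands in for Hadoop's shuffle and sort, and the source tags `"customers"` and `"orders"` (and all function names) are hypothetical.

```python
from collections import defaultdict
from itertools import product
from typing import Dict, Iterable, List, Tuple

Tagged = Tuple[str, Tuple[str, str]]  # (join key, (source tag, payload))

def tag(records: Iterable[Tuple[str, str]], source: str) -> List[Tagged]:
    """Map phase: tag every record with the name of the input it came from."""
    return [(key, (source, payload)) for key, payload in records]

def shuffle(tagged: Iterable[Tagged]) -> Dict[str, List[Tuple[str, str]]]:
    """Stand-in for shuffle and sort: group tagged values by join key."""
    groups: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    for key, value in tagged:
        groups[key].append(value)
    return dict(sorted(groups.items()))

def join_reducer(key: str, values: List[Tuple[str, str]]) -> List[Tuple[str, str, str]]:
    """Reduce phase: separate the values by source tag, then emit their cross product."""
    left = [p for src, p in values if src == "customers"]
    right = [p for src, p in values if src == "orders"]
    return [(key, l, r) for l, r in product(left, right)]

customers = [("c1", "Alice"), ("c2", "Bob")]
orders = [("c1", "book"), ("c1", "pen"), ("c3", "lamp")]

mapped = tag(customers, "customers") + tag(orders, "orders")
result = [row for k, vs in shuffle(mapped).items() for row in join_reducer(k, vs)]
print(result)  # [('c1', 'Alice', 'book'), ('c1', 'Alice', 'pen')]
```

Keys present in only one input (`c2`, `c3`) produce nothing, which gives inner-join semantics; an outer join would emit a row even when one side is empty.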

Tuesday, November 27, 2012

Map-Reduce: Shuffle and Sort


The MapReduce framework guarantees that the input to every reducer is sorted by key. The process by which the output of the mappers is sorted and transferred to the reducers is known as the shuffle.

The following figure (taken from Hadoop: The Definitive Guide) illustrates the shuffle and sort phase:

Buffering Map Writes

The mapper does not write its output directly to disk; instead, it buffers the writes. Each map task has a circular memory buffer with a default size of 100MB, which can be tuned with the io.sort.mb property. When the contents of the buffer reach a threshold (80% by default, set by the io.sort.spill.percent property), a background thread starts spilling the contents to disk. Map outputs continue to be written into the buffer while the spill takes place; the map only blocks if the buffer fills up completely before the spill has finished.

Role of the Partitioner & Combiner

Before writing to disk, the spill thread first divides the data into partitions corresponding to the reducers they will be sent to, and within each partition it performs an in-memory sort by key. If a combiner is present, it is run on the output of that sort. Several spill files may be generated during a map task, so at the end of the map phase they are merged on disk into a single output file that is partitioned (one partition per reducer) and sorted within each partition.
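The buffer, spill and merge steps described above can be sketched as follows. This is a simplified, single-threaded model, not Hadoop's implementation: the buffer capacity is counted in records rather than bytes, spills happen synchronously, `zlib.crc32` stands in for the key's hash, and all names are hypothetical.

```python
import heapq
import zlib
from itertools import groupby
from typing import Callable, Dict, List, Optional, Tuple

Record = Tuple[str, int]
Combiner = Callable[[str, List[int]], int]

def partition_of(key: str, num_reducers: int) -> int:
    # Deterministic stand-in for Hadoop's hash partitioning.
    return zlib.crc32(key.encode()) % num_reducers

def spill(buffer: List[Record], num_reducers: int,
          combiner: Optional[Combiner]) -> Dict[int, List[Record]]:
    """Partition buffered records by reducer, sort each partition by key,
    and run the combiner (if any) on the sorted output."""
    parts: Dict[int, List[Record]] = {p: [] for p in range(num_reducers)}
    for key, value in buffer:
        parts[partition_of(key, num_reducers)].append((key, value))
    result: Dict[int, List[Record]] = {}
    for p, recs in parts.items():
        recs.sort(key=lambda kv: kv[0])
        if combiner is not None:
            recs = [(k, combiner(k, [v for _, v in grp]))
                    for k, grp in groupby(recs, key=lambda kv: kv[0])]
        result[p] = recs
    return result

def map_output(records: List[Record], buffer_capacity: int, spill_percent: float,
               num_reducers: int, combiner: Optional[Combiner] = None) -> Dict[int, List[Record]]:
    """Buffer records, spill whenever the threshold is reached, then merge
    all spill files into one output with a sorted partition per reducer."""
    threshold = max(1, int(buffer_capacity * spill_percent))
    buffer: List[Record] = []
    spills: List[Dict[int, List[Record]]] = []
    for rec in records:
        buffer.append(rec)
        if len(buffer) >= threshold:  # spill threshold reached
            spills.append(spill(buffer, num_reducers, combiner))
            buffer = []
    if buffer:  # final spill of whatever is left when the map finishes
        spills.append(spill(buffer, num_reducers, combiner))
    # Final merge: one sorted run per reducer partition.
    return {p: list(heapq.merge(*(s[p] for s in spills), key=lambda kv: kv[0]))
            for p in range(num_reducers)}

records = [("b", 1), ("a", 1), ("b", 1), ("c", 1), ("a", 1)]
out = map_output(records, buffer_capacity=4, spill_percent=0.5, num_reducers=2,
                 combiner=lambda key, values: sum(values))
```

Here the threshold is 2 records, so three spills are produced and then merged; every key ends up in exactly one partition, and each partition is sorted by key.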

Copy phase to the Reducer

The output of the map tasks now sits on the local disks of many different nodes, and each reducer has to copy its partition from every map task to the node on which it runs. If a map output is small enough, it is copied into the reduce task's memory; otherwise, it is copied to disk. When the in-memory data reaches a threshold, it is merged and spilled to disk, and this repeats until the outputs of all map tasks for this reducer's partition have been copied. The files on disk are then merged in rounds, and the final group of files is fed directly into the reducer, which performs an in-memory merge while being fed (thus saving an extra trip to the disk).
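Merging "in groups of files" is a multi-round k-way merge. The sketch below is a simplified version under the assumption that at most `merge_factor` sorted runs can be merged at once (in Hadoop 1.x this factor is set by io.sort.factor, default 10); Hadoop's actual choice of which files to merge in each round differs in detail, and `merge_rounds` is a hypothetical name.

```python
import heapq
from typing import List, Tuple

Run = List[Tuple[str, int]]  # one sorted map output segment

def merge_rounds(runs: List[Run], merge_factor: int) -> Tuple[Run, int]:
    """Merge sorted runs at most `merge_factor` at a time until few enough
    remain for a single final pass. Returns the final merged stream and the
    number of intermediate (on-disk) rounds that were needed."""
    if merge_factor < 2:
        raise ValueError("merge_factor must be at least 2")
    rounds = 0
    while len(runs) > merge_factor:
        # Intermediate pass: each group of runs becomes one new, larger run.
        runs = [list(heapq.merge(*runs[i:i + merge_factor]))
                for i in range(0, len(runs), merge_factor)]
        rounds += 1
    # Final pass: the remaining runs are merged while being fed to the reducer.
    return list(heapq.merge(*runs)), rounds

# 25 map outputs with a merge factor of 10: one intermediate round (25 -> 3 runs),
# then a final merge of those 3 runs straight into the reduce function.
runs = [[(f"k{i:02d}", i)] for i in range(25)]
merged, rounds = merge_rounds(runs, 10)
```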

Final Step : The reduce phase

After the final merge (a mixture of in-memory and on-disk merging), the data is fed to the reduce phase: the reduce function is invoked once for each key, in sorted order, and its output is written to the output filesystem, typically HDFS.

Thursday, November 15, 2012

Hive - A SQL-like Database over Hadoop


Developed originally at Facebook, Hive adds structure to data stored on HDFS. The schema of each table is stored in a separate metadata store. Hive translates SQL-like queries into a series of MapReduce jobs that run over the data in HDFS.


Traditional databases follow a schema-on-write policy: once the schema of a table has been designed, every write is checked against it, and data that does not conform to the schema is rejected.

In the case of Hive, it is the opposite: it uses a schema-on-read policy, where data is only checked against the schema when it is queried. Both policies have their own trade-offs:

With schema on write, loads are slower because schema conformance is verified while the data is loaded. In return, queries are faster because the data can be indexed on the columns defined in the schema.

However, there may be cases where the indexing cannot be specified when the data is first populated, and this is where schema on read comes in handy. Loads are fast, since the data only has to be copied or moved into place, and two different schemas can be applied to the same underlying data depending on the kind of analysis required.
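Schema on read can be illustrated by applying two schemas to the same raw bytes at query time. This is a hedged sketch, not Hive code: the CSV layout, the column names and `read_with_schema` are hypothetical, and turning an unparseable field into `None` is meant to resemble Hive returning NULL rather than rejecting the row.

```python
import csv
import io
from typing import Callable, Dict, List

# Raw data as loaded: nothing is validated when the file is written.
RAW = "2012-11-15,alice,42\n2012-11-16,bob,oops\n2012-11-16,carol,7\n"

Schema = Dict[str, Callable[[str], object]]

def read_with_schema(raw: str, columns: List[str], schema: Schema) -> List[Dict[str, object]]:
    """Apply a schema at read time. Fields that fail to parse become None
    instead of causing the whole row to be rejected."""
    rows = []
    for fields in csv.reader(io.StringIO(raw)):
        row: Dict[str, object] = {}
        for name, text in zip(columns, fields):
            try:
                row[name] = schema[name](text)
            except ValueError:
                row[name] = None
        rows.append(row)
    return rows

cols = ["day", "user", "score"]
# Two different schemas over the same underlying data, chosen per analysis.
as_numbers = read_with_schema(RAW, cols, {"day": str, "user": str, "score": int})
as_text = read_with_schema(RAW, cols, {"day": str, "user": str, "score": str})
```

Under a schema-on-write policy, the row for `bob` would have been rejected at load time instead.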

What is Hive suited for?

Hive is well suited for bulk access to data. It is not suited for updates, as an update requires a completely new table to be constructed. Also, queries are slower than in traditional databases because of the absence of indexing.

Hive Internals

  • Hive stores the metadata into a relational database called the "Metastore".
  • There are 2 kinds of tables in Hive:
    • Managed tables - The data file for the table is moved into the Hive warehouse directory on HDFS (or, in general, any other Hadoop filesystem), and Hive manages it. When the table is dropped, both the metadata and the data are deleted.
    • External tables - The data can be created lazily, after the table has been defined. No data is moved into the Hive warehouse directory, and the schema/metadata is only loosely coupled to the actual data. When the table is dropped, only the metadata is deleted and the actual data is left untouched. This is helpful when the same data is also used by other databases or tools, or when you need multiple schemas over the same underlying data.
  • Data in Hive can be partitioned (and sub-partitioned) on one or more fields. The advantage is that rows with the same value of the partitioning field are stored together in one location, so queries that filter on that field run faster: Hive performs input pruning so that only the relevant files are scanned when a SELECT query is issued.
  • Hive also supports buckets, which add further structure to the data and speed up retrieval by enabling map-side joins: if two tables are bucketed on the join columns, a mapper working on one bucket of the first table only needs to read the corresponding bucket of the second table.
  • Partitions and buckets are implemented through the directory and file layout: one directory per partition, and one file per bucket within the corresponding partition's directory.
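The directory-per-partition, file-per-bucket layout and the resulting input pruning can be modelled with a dict of paths. This is an illustrative sketch only: the table name `sales`, the `bucket_NNNNN` file naming and the use of `zlib.crc32` as the bucketing hash are assumptions (Hive uses its own hash and file names).

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, str, int]  # (country, user_id, amount)

def layout(rows: List[Row], num_buckets: int) -> Dict[str, List[Row]]:
    """Place rows into a Hive-like layout: one directory per partition value
    (country=...) and one file per bucket, chosen by hashing user_id."""
    files: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        country, user_id, _ = row
        bucket = zlib.crc32(user_id.encode()) % num_buckets
        files[f"sales/country={country}/bucket_{bucket:05d}"].append(row)
    return dict(files)

def scan(files: Dict[str, List[Row]], country: str) -> List[Row]:
    """Input pruning: only files under the matching partition directory are read."""
    prefix = f"sales/country={country}/"
    return [row for path, rows in files.items() if path.startswith(prefix) for row in rows]

rows = [("IN", "u1", 10), ("US", "u2", 5), ("IN", "u3", 7), ("IN", "u1", 3)]
files = layout(rows, num_buckets=4)
```

Because every row for a given `user_id` lands in the same bucket number, two tables bucketed the same way on `user_id` can be joined bucket by bucket.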

Monday, November 5, 2012

Hadoop - Namenode, DataNode, Job Tracker and TaskTracker


The namenode is the node that stores the filesystem metadata, i.e. which file maps to which blocks and which blocks are stored on which datanodes.

The namenode maintains two in-memory tables: one maps blocks to datanodes (one block maps to 3 datanodes for a replication value of 3), and the other maps each datanode to the blocks it holds.
Whenever a datanode reports disk corruption of a particular block, the first table is updated, and whenever a datanode is detected to be dead (because of a node or network failure), both tables are updated.
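The two tables and their update rules can be sketched as a small class. This is a hypothetical model of the behaviour described above, not the namenode's real data structures; `BlockMap` and its methods are illustrative names.

```python
from collections import defaultdict
from typing import Dict, Set

class BlockMap:
    """Hypothetical sketch of the namenode's two in-memory tables."""

    def __init__(self) -> None:
        self.block_to_nodes: Dict[str, Set[str]] = defaultdict(set)
        self.node_to_blocks: Dict[str, Set[str]] = defaultdict(set)

    def add_replica(self, block: str, node: str) -> None:
        self.block_to_nodes[block].add(node)
        self.node_to_blocks[node].add(block)

    def report_corrupt(self, block: str, node: str) -> None:
        # As described in the text: a corrupt replica only updates the first table.
        self.block_to_nodes[block].discard(node)

    def node_dead(self, node: str) -> Set[str]:
        """Remove a dead datanode from both tables and return the blocks
        that lost a replica and may need re-replication."""
        lost = self.node_to_blocks.pop(node, set())
        for block in lost:
            self.block_to_nodes[block].discard(node)
        return lost

    def under_replicated(self, replication: int = 3) -> Set[str]:
        return {b for b, nodes in self.block_to_nodes.items() if len(nodes) < replication}

bm = BlockMap()
for n in ("dn1", "dn2", "dn3"):
    bm.add_replica("blk_1", n)
bm.add_replica("blk_2", "dn1")
```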

Failover semantics: The secondary namenode is not a hot standby. It regularly connects to the primary namenode, merges the namespace image with the edit log to create a checkpoint, and keeps this snapshot of the filesystem metadata in local/remote storage.


The data node is where the actual data resides.

Some interesting traits of datanodes are as follows:
  • All datanodes send a heartbeat message to the namenode every 3 seconds to say that they are alive. If the namenode does not receive a heartbeat from a particular datanode for 10 minutes, it considers that datanode dead or out of service and starts re-replicating the blocks that were hosted on it onto other datanodes.
  • The datanodes can talk to each other to rebalance data, to move and copy data around, and to keep the replication level high.
  • When a datanode stores a block, it also stores a checksum for it. The datanodes periodically report their block information to the namenode, verifying the checksums before doing so. If the checksum of a particular block is incorrect, i.e. there is disk-level corruption of that block, the datanode leaves that block out of its report. In this way, the namenode learns of the corruption on that datanode and takes steps accordingly.
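The checksum-verified block report and the heartbeat timeout can be sketched as below. The chunked CRC-32 scheme (one checksum per 512 bytes) is an assumption made for illustration, as are the function names; only the 10-minute timeout comes from the text.

```python
import zlib
from typing import Dict, List

BYTES_PER_CHECKSUM = 512  # assumption: one CRC-32 per 512-byte chunk

def checksums(data: bytes) -> List[int]:
    """Checksums recorded when the block is written."""
    return [zlib.crc32(data[i:i + BYTES_PER_CHECKSUM])
            for i in range(0, len(data), BYTES_PER_CHECKSUM)]

def block_report(stored: Dict[str, bytes], expected: Dict[str, List[int]]) -> List[str]:
    """Return the IDs of blocks whose data still matches the checksums
    recorded at write time; corrupt blocks are left out of the report."""
    return sorted(b for b, data in stored.items() if checksums(data) == expected[b])

HEARTBEAT_TIMEOUT_S = 10 * 60  # the 10-minute figure from the text

def is_dead(last_heartbeat_s: float, now_s: float) -> bool:
    return now_s - last_heartbeat_s > HEARTBEAT_TIMEOUT_S

good = b"a" * 1000
expected = {"blk_1": checksums(good), "blk_2": checksums(b"b" * 1000)}
stored = {"blk_1": good, "blk_2": b"b" * 999 + b"X"}  # blk_2 corrupted on disk
print(block_report(stored, expected))  # ['blk_1']
```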

Job Tracker and TaskTracker

  • The primary function of the job tracker is resource management (managing the task trackers and tracking resource availability) and task life-cycle management (tracking task progress, fault tolerance etc.).
  • The task tracker has a simple function of following the orders of the job tracker and updating the job tracker with its progress status periodically.
  • The task tracker is pre-configured with a number of slots indicating the number of tasks it can accept. When the job tracker schedules a task, it looks for an empty slot on the task tracker running on the same server that hosts the datanode where the data for that task resides. If none is found, it looks for a machine in the same rack. System load is not taken into account during this allocation.
  • HDFS is rack aware in the sense that the namenode and the job tracker obtain a list of rack ids corresponding to each of the slave nodes (datanodes) and create a mapping between IP addresses and rack ids. HDFS uses this knowledge to replicate data across different racks, so that data is not lost in the event of a complete rack power outage or switch failure.
  • Job Performance - Hadoop does speculative execution: if a machine in the cluster is slow and the map/reduce tasks running on it are holding up the entire map/reduce phase, Hadoop runs redundant copies of those tasks on other machines. Whichever copy finishes first reports back to the job tracker, its results are carried forward into the next phase, and the other copies are killed.
  • Fault Tolerance - 
    • The task tracker runs each task in a separate JVM process, so that a failure in a task (for example, a bug in user code) does not bring down the task tracker.
    • The task tracker keeps sending heartbeat messages to the job tracker to say that it is alive and to keep it updated with the number of empty slots available for running more tasks.
    • From version 0.21 of Hadoop, the job tracker checkpoints its work to the filesystem. Whenever it starts up, it checks how far it had got at the last checkpoint and resumes any incomplete jobs. Earlier, if the job tracker went down, all the information about active jobs was lost.
  • Status information about the job tracker and the task trackers is exposed through a web interface served by an embedded Jetty server.
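The locality preference described above (data-local, then rack-local, then anywhere, with no regard for load) can be written as a small selection function. This is a hypothetical sketch: the real job tracker hands out tasks in response to task tracker heartbeats rather than searching trackers for a task, and `TaskTracker` / `pick_tracker` are illustrative names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class TaskTracker:
    host: str
    rack: str
    free_slots: int

def pick_tracker(block_hosts: List[str], trackers: List[TaskTracker],
                 rack_of: Dict[str, str]) -> Optional[TaskTracker]:
    """Prefer a free slot on a node holding the data, then on a node in the
    same rack as the data, then anywhere. Load is not considered."""
    free = [t for t in trackers if t.free_slots > 0]
    data_racks = {rack_of[h] for h in block_hosts}
    for candidates in (
        [t for t in free if t.host in block_hosts],  # data-local
        [t for t in free if t.rack in data_racks],   # rack-local
        free,                                        # off-rack
    ):
        if candidates:
            return candidates[0]
    return None

rack_of = {"h1": "r1", "h2": "r1", "h3": "r2"}
trackers = [TaskTracker("h3", "r2", 2), TaskTracker("h2", "r1", 1), TaskTracker("h1", "r1", 0)]
# The data is on h1, but h1 has no free slot, so the rack-local h2 is chosen.
print(pick_tracker(["h1"], trackers, rack_of).host)  # h2
```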

YARN - Next Generation Hadoop 

In YARN, the job tracker's responsibilities are split between two kinds of daemons: a global Resource Manager and a per-node Node Manager (which takes over the role of the task tracker). The Resource Manager only allocates resources to the different applications; it contains a scheduler that just performs scheduling, without worrying about monitoring or status updates. Resources such as memory and CPU are bundled into a unit called a Resource Container. Each application has its own ApplicationMaster, running on one of the nodes, which negotiates containers from the Resource Manager, works with the Node Managers to launch and monitor its tasks in those containers, and tracks their status.

Hadoop Distributed File System (HDFS)

Main Idea

HDFS is built around the idea that the most efficient data processing pattern is write once, read many times, and it is optimized for that access pattern.

Hadoop is built to run on commodity hardware, which is available from multiple vendors, and does not require highly reliable hardware (which is why reliability and high availability are built into the software).

Where does HDFS fail?

  • Low-latency access to data - The focus is on high throughput rather than low latency. HBase is a better choice for this.
  • A large number of small files is not supported well: the filesystem metadata grows with every new file, and because the namenode holds this metadata in memory, the number of files it can support is limited by its memory. Hence it cannot scale to billions of files.
  • Modification of files is not supported, and multiple writers to a file are not allowed.
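The memory limit can be made concrete with back-of-the-envelope arithmetic. The figure of roughly 150 bytes of namenode memory per file, directory or block is a commonly quoted rule of thumb (from Hadoop: The Definitive Guide), used here as an assumption; directories are ignored.

```python
BYTES_PER_OBJECT = 150  # rule-of-thumb assumption, not an exact figure

def namenode_heap_estimate(num_files: int, blocks_per_file: int = 1) -> int:
    """Rough namenode memory in bytes: one object per file plus one per block."""
    objects = num_files * (1 + blocks_per_file)
    return objects * BYTES_PER_OBJECT

print(namenode_heap_estimate(1_000_000) / 1e6, "MB")      # 300.0 MB
print(namenode_heap_estimate(1_000_000_000) / 1e9, "GB")  # 300.0 GB
```

A million small files already cost about 300 MB of heap, and a billion would need about 300 GB, far beyond the memory of a single machine at the time.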

Block Abstraction

  • The HDFS block size is 64MB by default (very large compared to other filesystems). Also, unlike in other filesystems, a file smaller than the block size does not occupy a full block's worth of underlying storage. The block size is kept large so that the time spent on disk seeks is small compared to the time spent transferring data.
  • Why block abstraction:
    • Files can be bigger than individual disks
    • File metadata (such as permissions) does not need to be stored with each and every block; blocks are just chunks of data.
    • Simplifies storage management - Easy to figure out the number of blocks which can be stored on each disk.
    • Fault tolerance and storage replication can be easily done on a per-block basis (storage/HA policies can be run on individual blocks).

  • The namenode does not store the block locations persistently; they are reconstructed from the datanodes' block reports when the system starts.
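The seek-versus-transfer trade-off behind the large block size can be checked with a little arithmetic. The sketch assumes a 10 ms seek time and a 100 MB/s transfer rate (illustrative figures, not measurements) and computes the fraction of the time to read one block that is spent seeking.

```python
def seek_overhead(block_mb: float, seek_ms: float = 10.0, transfer_mb_s: float = 100.0) -> float:
    """Fraction of the time needed to read one block that goes to the initial seek."""
    transfer_ms = block_mb * 1000 / transfer_mb_s
    return seek_ms / (seek_ms + transfer_ms)

print(f"{seek_overhead(64):.2%}")  # 1.54%
```

With a 4 KB block (`seek_overhead(4 / 1024)`), on the other hand, more than 99% of the time would be spent seeking.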

Fault Tolerance


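The default HDFS replication factor is 3. The first replica is placed on the writer's node, the second on a node in a different rack, and the third on another node in that same remote rack, so two replicas share a rack and one is on a different rack.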
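The placement rule can be sketched as follows. This is a simplified model, assuming the writer runs on a datanode and the cluster has at least two racks and three nodes; the real placement policy also checks node load and free space. `place_replicas` is a hypothetical name.

```python
import random
from typing import Dict, List

def place_replicas(writer: str, rack_of: Dict[str, str], rng: random.Random) -> List[str]:
    """Pick 3 datanodes: the writer's node, a node on another rack, and a
    second node on that same remote rack."""
    first = writer
    remote = sorted(n for n in rack_of if rack_of[n] != rack_of[first])
    second = rng.choice(remote)
    same_remote_rack = sorted(n for n in remote if rack_of[n] == rack_of[second] and n != second)
    if not same_remote_rack:  # remote rack has a single node: fall back to any other node
        same_remote_rack = sorted(n for n in rack_of if n not in (first, second))
    third = rng.choice(same_remote_rack)
    return [first, second, third]

rack_of = {"a1": "r1", "a2": "r1", "b1": "r2", "b2": "r2", "b3": "r2"}
print(place_replicas("a1", rack_of, random.Random(0)))
```

Losing the whole of rack `r1` or `r2` still leaves at least one replica, while only one copy of the data crosses between racks during the write.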


  • When a client reads a file, it first asks the namenode for the locations of the file's blocks. For each block, the namenode returns the datanodes hosting it, sorted in increasing order of distance from the client, and the client reads the block from the closest one.
  • When a client writes a file, the filesystem metadata is recorded on the namenode, and the data is written to a datanode (on the client's own machine if a datanode runs there). While receiving the data, this datanode forwards it through a pipeline to a second datanode, which in turn forwards it to a third (the default replication value is 3).
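"Distance" here can be modelled as in Hadoop's network topology: the network is a tree of data centers, racks and nodes, and the distance between two nodes is the sum of their distances to their closest common ancestor. The location tuples and function names below are assumptions made for illustration.

```python
from typing import List, Tuple

Location = Tuple[str, str, str]  # (data center, rack, node)

def distance(a: Location, b: Location) -> int:
    """Sum of both nodes' hop counts to their closest common ancestor:
    0 same node, 2 same rack, 4 same data center, 6 different data centers."""
    for level in range(3):
        if a[level] != b[level]:
            return 2 * (3 - level)
    return 0

def order_replicas(client: Location, replicas: List[Location]) -> List[Location]:
    """Sort the datanodes holding a block from closest to farthest."""
    return sorted(replicas, key=lambda r: distance(client, r))

client = ("d1", "r1", "n1")
replicas = [("d2", "r9", "n7"), ("d1", "r2", "n3"), ("d1", "r1", "n2"), ("d1", "r1", "n1")]
print(order_replicas(client, replicas)[0])  # ('d1', 'r1', 'n1')
```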


There are two ways of backing up the filesystem metadata, which maps filenames to their data stored as blocks on the various datanodes:
  • Writing the filesystem metadata persistently onto a local disk as well as on a remote NFS mount.
  • Running a secondary namenode.

HDFS Comic

Here is a comic describing the various attributes/functions of Hadoop/HDFS:

Disclaimer : This has been taken from another resource.


The following text is based on Google’s publication : “MapReduce: Simplified Data Processing on Large Clusters” - Jeffrey Dean and Sanjay Ghemawat


MapReduce is a programming model and an associated implementation for processing and generating large data sets.
Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.

The implementation hides the messy details of parallelization, fault tolerance, data distribution and load balancing in a library. It uses re-execution as the primary mechanism for fault tolerance.

Some Applications

  • Distributed Grep : The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
  • URL Access Frequency : The map function processes logs of web page requests and outputs (URL,1). The reduce function adds together all values for the same URL and emits a (URL,total count) pair.
  • Reverse Web-Link Graph : To determine the page rank of a URL/webpage, it is often necessary to know which URLs link to it. The map function scans the crawled web pages and outputs a (target URL, source URL) pair for each link to a target URL found in a source page. The reduce function concatenates the list of source URLs for a given target URL, producing all the source URLs that point to that URL.
  • Inverted Index : It is often desirable to track which documents contain each word in a collection of documents. The map function parses each document and emits a sequence of (word, document ID) pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a (word, list(document ID)) pair.
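Three of the applications above can be expressed as plain map and reduce functions and run through a tiny sequential driver. This is a sketch for illustration, not the library from the paper: `run` is a hypothetical single-process stand-in for the distributed runtime, and the log-line format (URL as the first field) is assumed.

```python
from collections import defaultdict

def run(inputs, mapper, reducer):
    """Tiny sequential MapReduce: map every input pair, group the intermediate
    values by key, then call the reducer once per key in sorted key order."""
    groups = defaultdict(list)
    for key, value in inputs:
        for ikey, ivalue in mapper(key, value):
            groups[ikey].append(ivalue)
    return [out for ikey in sorted(groups) for out in reducer(ikey, groups[ikey])]

# URL access frequency: the input value is a log line whose first field is the URL.
def url_map(_offset, log_line):
    yield log_line.split()[0], 1

def url_reduce(url, counts):
    yield url, sum(counts)

# Inverted index: the input is (document ID, document text).
def index_map(doc_id, text):
    for word in set(text.split()):  # one pair per distinct word in the document
        yield word, doc_id

def index_reduce(word, doc_ids):
    yield word, sorted(doc_ids)

# Reverse web-link graph: the input is (source URL, list of target URLs it links to).
def link_map(source, targets):
    for target in targets:
        yield target, source

def link_reduce(target, sources):
    yield target, sorted(sources)

print(run([("d1", "the cat"), ("d2", "the dog")], index_map, index_reduce))
# [('cat', ['d1']), ('dog', ['d2']), ('the', ['d1', 'd2'])]
```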

Execution Steps

  1. Splitting Input Data : The MapReduce library in the user program first splits the input files into M pieces of typically 16MB to 64MB. It then starts up many copies of the program on a cluster of machines.
  2. Worker processes : One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.
  3. Mapper phase : A worker that is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.
  4. Partitioning phase : Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.
  5. Reduce side reading : When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.
  6. Reducer phase : The reduce worker iterates over the sorted intermediate data and, for each unique intermediate key encountered, passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
  7. Relinquish control to user program : When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.
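The steps above can be traced end to end in a single process with word count. This sketch makes simplifying assumptions: there is no master, no parallelism and no disk; `zlib.crc32` stands in for the hash in the paper's default partitioning function hash(key) mod R; and all function names are hypothetical.

```python
import zlib
from collections import defaultdict
from itertools import groupby
from typing import Dict, List, Tuple

KV = Tuple[str, int]

def split_input(lines: List[str], m: int) -> List[List[str]]:
    """Step 1: cut the input into at most M roughly equal splits."""
    size = max(1, -(-len(lines) // m))  # ceiling division
    return [lines[i:i + size] for i in range(0, len(lines), size)]

def map_task(split: List[str], r: int) -> Dict[int, List[KV]]:
    """Steps 3-4: run Map (word count here), then partition by hash(key) mod R."""
    regions: Dict[int, List[KV]] = defaultdict(list)
    for line in split:
        for word in line.split():
            regions[zlib.crc32(word.encode()) % r].append((word, 1))
    return regions

def reduce_task(region_outputs: List[List[KV]]) -> List[KV]:
    """Steps 5-6: read this region from every map task, sort by key, and
    call Reduce (a sum) once per unique key."""
    pairs = sorted(kv for out in region_outputs for kv in out)
    return [(k, sum(v for _, v in grp)) for k, grp in groupby(pairs, key=lambda kv: kv[0])]

def mapreduce(lines: List[str], m: int, r: int) -> List[List[KV]]:
    map_outputs = [map_task(s, r) for s in split_input(lines, m)]
    # One output "file" per reduce partition, as in the paper.
    return [reduce_task([out.get(i, []) for out in map_outputs]) for i in range(r)]

outputs = mapreduce(["a b a", "c b", "a"], m=2, r=3)
```

Each key appears in exactly one of the R output lists, and the union of the lists is the full word count.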

Master Data Structures

  • It stores the state (idle, in-progress, or completed) of each map and reduce task, and the identity of the worker machine (for non-idle tasks).
  • It acts as the conduit through which the locations of intermediate file regions are passed from the map tasks to the reduce tasks: for each completed map task, it stores the locations and sizes of the R intermediate file regions produced by that task.

Fault Tolerance

The master pings every worker periodically. If a worker does not respond within a certain amount of time, the master marks the worker as dead. Any map task the worker had completed, and any map or reduce task in progress on it, is reset to idle and rescheduled on other workers. Completed map tasks have to be re-executed because their output is stored on the local disk of the failed machine rather than in the global file system, so it is no longer accessible. Completed reduce tasks do not need to be re-executed, since their output is stored in the global file system.
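The master's bookkeeping and the failure rule can be sketched together. This is a hypothetical model of the data structures described above; the field names and the `(location, size)` tuple format for intermediate regions are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple

class State(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"

@dataclass
class Task:
    kind: str                        # "map" or "reduce"
    state: State = State.IDLE
    worker: Optional[str] = None     # set only for non-idle tasks
    # For completed map tasks: (location, size) of each of the R intermediate regions.
    regions: Optional[List[Tuple[str, int]]] = None

def handle_worker_failure(tasks: Dict[str, Task], dead_worker: str) -> List[str]:
    """Reset the dead worker's tasks to idle so they get rescheduled:
    in-progress map and reduce tasks, plus completed map tasks (their output
    was on the dead machine's local disk). Completed reduce tasks are kept,
    since their output is in the global file system."""
    reset = []
    for name, task in tasks.items():
        if task.worker != dead_worker:
            continue
        if task.state is State.IN_PROGRESS or (task.state is State.COMPLETED and task.kind == "map"):
            task.state, task.worker, task.regions = State.IDLE, None, None
            reset.append(name)
    return sorted(reset)

tasks = {
    "m1": Task("map", State.COMPLETED, "w1", [("w1:/tmp/m1-r0", 10)]),
    "m2": Task("map", State.IN_PROGRESS, "w1"),
    "r1": Task("reduce", State.COMPLETED, "w1"),
    "r2": Task("reduce", State.IN_PROGRESS, "w2"),
}
print(handle_worker_failure(tasks, "w1"))  # ['m1', 'm2']
```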

Advantages of using Map-Reduce:

  • Works well with unstructured and semi-structured data
  • It takes away the developer/engineer's headache of managing jobs that are spread across a cluster of nodes.

How is Map-Reduce related to HDFS block size- The data locality principle

Hadoop tries its best to run map tasks on nodes where the input data is present locally, to save network bandwidth and inter-node communication latency. As the input data is split into pieces that are fed to different map tasks, it is desirable for all the data fed to a map task to be available on a single node. Since HDFS only guarantees that a single block (64MB by default) is stored entirely on one node, it is advisable to make the split size equal to the HDFS block size so that the map task can take advantage of data locality.
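The split size rule used by Hadoop's FileInputFormat is max(minSize, min(maxSize, blockSize)), so with default settings a split is exactly one block. The sketch below mirrors that rule; the function names are illustrative, and the split count is a plain ceiling division that ignores any special handling of a short final split.

```python
MB = 1024 * 1024

def compute_split_size(block_size: int, min_size: int = 1,
                       max_size: int = 2**63 - 1) -> int:
    """max(minSize, min(maxSize, blockSize)): with default min/max settings
    the split size is simply the HDFS block size."""
    return max(min_size, min(max_size, block_size))

def num_splits(file_size: int, split_size: int) -> int:
    """Ceiling division (simplification of Hadoop's actual split loop)."""
    return -(-file_size // split_size)

print(num_splits(1000 * MB, compute_split_size(64 * MB)))  # 16
```

Raising the minimum split size above the block size (for example `min_size=128 * MB`) makes each split span two blocks, which may live on different nodes, so the map task can lose data locality.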

Path and storage of data across map-reduce phases

  • The output of the mapper is stored on the local disk and not on HDFS. This is because the map output is intermediate and can be thrown away once the reduce task is complete; storing it in HDFS, which would replicate it, would be overkill. If the disk fails or the data is unavailable when the reducer needs to consume it, Hadoop reruns the map task on another node to regenerate the data.

  • The reducer cannot take advantage of data locality because it consumes data from multiple mappers. Therefore, the output of the mappers is transferred across the network to the node running the reduce task. The output of the reducer is stored in HDFS for reliability.

Partitioning of data

  • With multiple reducers, the map tasks split their output into as many partitions as there are reduce tasks. The default partitioner hashes the key, which ensures that all the key-value pairs for a given key end up in the same partition.
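Hadoop's default HashPartitioner computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The sketch below reproduces that arithmetic in Python for a key whose `hashCode()` is Java's `String.hashCode()`; note that Hadoop's `Text` type computes its hash differently (over its bytes), so partition numbers for `Text` keys would not match this function.

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): h = 31*h + c over UTF-16 code units,
    with 32-bit signed overflow."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = int.from_bytes(data[i:i + 2], "big")
        h = (31 * h + unit) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h

def hash_partition(key: str, num_reduce_tasks: int) -> int:
    """Same arithmetic as HashPartitioner: clear the sign bit, then take the modulus."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_reduce_tasks

print(java_string_hash("hello"), hash_partition("hello", 3))  # 99162322 1
```

Masking with `0x7FFFFFFF` keeps the result non-negative even when the hash code is negative; since every occurrence of a key produces the same hash, all its values go to the same reducer.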

Possibility of zero reduce tasks

  • Zero reduce tasks are also possible when the map tasks alone are enough to process the data; in that case, the map output is written directly to HDFS.


  • As transferring data between the mappers and the reducers inherently consumes network bandwidth, combiners are used to do local aggregation of each mapper's output and so cut down the data received by the reducers for the final processing.

  • For example, consider a case where we need to calculate the maximum of a given set of data.

  • Now, data with the same key may get split across multiple mappers, with each mapper emitting n tuples/records for that key. Say the number of mappers producing this output is m.

  • In this particular case, in the absence of a combiner, a total of m*n tuples would need to be transferred to the reduce task's node to calculate the overall max.
By using a combiner, we can cut down this transfer by calculating the local max of each mapper's output and transferring only that tuple from each mapper. In this way, we transfer only m tuples (one per mapper) instead of m*n (where n is the number of tuples for the key coming out of a single mapper). Note that not every function can be aggregated locally like this. For example, the mean cannot be computed by simply averaging the local means of the mappers.
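The max example and the mean caveat can be checked directly. In this sketch each inner list stands for one mapper's values for a single key; the function names are hypothetical, and the (sum, count) approach shown for the mean is a common workaround, named here as a separate technique rather than something the text prescribes.

```python
from typing import List, Tuple

def max_with_combiner(mapper_outputs: List[List[int]]) -> Tuple[int, int]:
    """Each mapper sends only its local max. Returns (overall max, tuples shuffled)."""
    local_maxes = [max(values) for values in mapper_outputs if values]
    return max(local_maxes), len(local_maxes)

def mean_of_local_means(mapper_outputs: List[List[int]]) -> float:
    """Wrong in general: averaging per-mapper means ignores how many
    records each mapper saw."""
    local = [sum(v) / len(v) for v in mapper_outputs if v]
    return sum(local) / len(local)

def mean_from_sum_count(mapper_outputs: List[List[int]]) -> float:
    """Correct: the combiner emits (sum, count) pairs, which can simply be added up."""
    partials = [(sum(v), len(v)) for v in mapper_outputs if v]
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

print(max_with_combiner([[3, 9, 4], [7, 1], [8]]))  # (9, 3): 3 tuples shuffled instead of 6
data = [[1, 2, 3, 4], [10]]
print(mean_of_local_means(data), mean_from_sum_count(data))  # 6.25 4.0
```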


  • Hadoop Streaming is an API that lets you write map and reduce functions in any language that can read standard input and write standard output.
  • Hadoop Pipes is the C++ interface to Hadoop MapReduce. Unlike Streaming, which uses standard input and output to communicate with the map and reduce processes, Pipes uses sockets (not JNI) as that channel.
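A streaming job is just a pair of programs that talk over standard input and output using tab-separated `key<TAB>value` lines, with the reducer receiving its input sorted by key. Below is a minimal word-count sketch under that assumption; the script name `wc.py` and its `map`/`reduce` arguments are hypothetical conventions.

```python
import sys
from itertools import groupby
from typing import Iterable, Iterator

def mapper(lines: Iterable[str]) -> Iterator[str]:
    """Emit 'word<TAB>1' for every word, as a streaming mapper writes to stdout."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(sorted_lines: Iterable[str]) -> Iterator[str]:
    """Input arrives sorted by key, one 'key<TAB>value' per line; sum each run of equal keys."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(v) for _, v in group)}"

if __name__ == "__main__" and sys.argv[1:] in (["map"], ["reduce"]):
    stage = mapper if sys.argv[1] == "map" else reducer
    for out in stage(sys.stdin):
        print(out)
```

The pair can be tried locally with `cat input.txt | python wc.py map | sort | python wc.py reduce`, where `sort` plays the role of the framework's shuffle and sort between the two stages.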

No. of mappers/reducers

  • The number of mappers is decided by the data locality principle described earlier (one mapper per input split). If some mappers run for only a very short time, try reducing the number of mappers so that each one runs for a minute or so.
  • The number of reducers should be slightly less than the number of reduce slots in the cluster (slots are pre-configured in the task tracker properties when the cluster is set up), so that all the reducers finish in one wave and make full use of the cluster's resources.
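"Slightly less" is often taken as about 95% of the available reduce slots; that factor is a rule of thumb and is used here as an assumption. The sketch uses integer arithmetic to avoid floating-point rounding surprises; the function name is hypothetical.

```python
def reducers_for_one_wave(nodes: int, reduce_slots_per_node: int, percent: int = 95) -> int:
    """Slightly fewer reducers than reduce slots, so that all of them run in one wave."""
    slots = nodes * reduce_slots_per_node
    return max(1, slots * percent // 100)

print(reducers_for_one_wave(20, 2))  # 38 reducers for 40 reduce slots
```

Leaving a couple of slots free also gives room to re-run a failed reduce task without starting a second wave.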