Hadoop: The Definitive Guide

Tom White


Counsels programmers and administrators for big and small organizations on how to work with large-scale application datasets using Apache Hadoop, discussing its capacity for storing and processing large amounts of data while demonstrating best practices for building reliable and scalable distributed systems.


Mentioned in questions and answers.

I am trying to understand ZooKeeper: how it works and what it does, and I am totally confused. Is there any application comparable to ZooKeeper?

If you know, how would you describe ZooKeeper to a layman? (Consider me one.)

I have tried the Apache wiki and the ZooKeeper SourceForge page, but I am still not able to relate to it. Any help would be appreciated!

I just read through http://zookeeper.sourceforge.net/index.sf.shtml. Aren't there more services like this? Is it as simple as just replicating a server service?

In a nutshell, ZooKeeper helps you build distributed applications.

How it works

You may describe ZooKeeper as a replicated synchronization service with eventual consistency. It is robust, since the persisted data is distributed between multiple nodes (this set of nodes is called an "ensemble") and one client connects to any of them (i.e., a specific "server"), migrating if one node fails; as long as a strict majority of nodes are working, the ensemble of ZooKeeper nodes is alive. In particular, a master node is dynamically chosen by consensus within the ensemble; if the master node fails, the role of master migrates to another node.

How writes are handled

The master is the authority for writes: in this way writes can be guaranteed to be persisted in-order, i.e., writes are linear. Each time a client writes to the ensemble, a majority of nodes persist the information: these nodes include the server for the client, and obviously the master. This means that each write makes the server up-to-date with the master. It also means, however, that you cannot have concurrent writes.

The guarantee of linear writes is why ZooKeeper does not perform well for write-dominant workloads. In particular, it should not be used for exchanging large data, such as media. As long as your communication involves shared data, ZooKeeper helps you. When data could be written concurrently, ZooKeeper actually gets in the way, because it imposes a strict ordering of operations even when that is not strictly necessary from the perspective of the writers. Its ideal use is for coordination, where messages are exchanged between the clients.

How reads are handled

This is where ZooKeeper excels: reads are concurrent since they are served by the specific server that the client connects to. However, this is also the reason for the eventual consistency: the "view" of a client may be outdated, since the master updates the corresponding server with a bounded but undefined delay.

In detail

The replicated database of ZooKeeper comprises a tree of znodes, which are entities roughly representing file system nodes (think of them as directories). Each znode may be enriched by a byte array, which stores data. Also, each znode may have other znodes under it, practically forming an internal directory system.

Sequential znodes

Interestingly, the name of a znode can be sequential, meaning that the name the client provides when creating the znode is only a prefix: the full name is completed with a sequential number chosen by the ensemble. This is useful, for example, for synchronization purposes: if multiple clients want a lock on a resource, they can each concurrently create a sequential znode under the same location, and whoever gets the lowest number holds the lock.
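As an illustration, the lock recipe can be sketched in plain Java, without the ZooKeeper client library. The `create` and `lockHolder` methods below are hypothetical stand-ins for the real client's `create()` call with a sequential flag; only the naming logic is shown:

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SequentialLockSketch {
    private final AtomicInteger counter = new AtomicInteger(); // the ensemble-chosen sequence
    private final SortedMap<Integer, String> znodes = new TreeMap<>();

    // Create a "sequential znode": the client supplies only a prefix,
    // and the ensemble appends a monotonically increasing number.
    synchronized String create(String prefix) {
        int seq = counter.getAndIncrement();
        String name = String.format("%s%010d", prefix, seq);
        znodes.put(seq, name);
        return name;
    }

    // The client holding the lowest surviving sequence number owns the lock.
    synchronized String lockHolder() {
        return znodes.get(znodes.firstKey());
    }

    public static void main(String[] args) {
        SequentialLockSketch ensemble = new SequentialLockSketch();
        String a = ensemble.create("/locks/resource-"); // client A asks for the lock
        String b = ensemble.create("/locks/resource-"); // client B asks concurrently
        System.out.println(a + " holds the lock? " + a.equals(ensemble.lockHolder()));
    }
}
```

In real ZooKeeper, a client would pass an ephemeral-sequential create mode, so that when the lock holder disconnects its znode disappears and the next-lowest number takes over.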

Ephemeral znodes

Also, a znode may be ephemeral: this means that it is destroyed as soon as the client that created it disconnects. This is mainly useful for knowing when a client fails, which may be relevant when the client itself has responsibilities that should be taken over by a new client. Taking the example of the lock, as soon as the client holding the lock disconnects, the other clients can check whether they are now entitled to it.


The example related to client disconnection would be problematic if we had to periodically poll the state of znodes. Fortunately, ZooKeeper offers an event system where a watch can be set on a znode. These watches can be set to trigger an event when the znode is changed or removed, or when new children are created under it. This is clearly useful in combination with the sequential and ephemeral options for znodes.

Where and how to use it

A canonical example of ZooKeeper usage is distributed-memory computation, where some data is shared between client nodes and must be accessed/updated in a very careful way to account for synchronization.

ZooKeeper offers the library to construct your synchronization primitives, while the ability to run a distributed server avoids the single-point-of-failure issue you have when using a centralized (broker-like) message repository.

ZooKeeper is feature-light, meaning that mechanisms such as leader election, locks, barriers, etc. are not already present, but can be built on top of the ZooKeeper primitives. If the C/Java API is too unwieldy for your purposes, you can rely on libraries built on ZooKeeper such as Cages and especially Curator.

Where to read more

Apart from the official documentation, which is pretty good, I suggest reading Chapter 14 of Hadoop: The Definitive Guide, which has ~35 pages explaining essentially what ZooKeeper does, followed by an example of a configuration service.

To begin, I would like to describe my current position and the goal that I would like to achieve.

I am a researcher dealing with machine learning. So far I have gone through several theoretical courses covering machine learning algorithms and social network analysis, and have therefore gained some theoretical concepts useful for implementing machine learning algorithms and feeding in real data.

On simple examples the algorithms work well and the running time is acceptable, whereas big data is a problem when trying to run the algorithms on my PC. Regarding software, I have enough experience to implement any algorithm from articles or design my own, using any language or IDE (so far I have used Matlab, Java with Eclipse, .NET...), but so far I don't have much experience with setting up infrastructure. I have started to learn about Hadoop, NoSQL databases, etc., but I am not sure which strategy would be best given my learning time constraints.

The final goal is to be able to set up a working platform for analyzing big data, with a focus on implementing my own machine learning algorithms and putting it all together into production, ready to solve useful questions by processing big data.

As the main focus is on implementing machine learning algorithms, I would like to ask whether there is any existing running platform that offers enough CPU resources to feed large data into, upload my own algorithms, and simply process the data without thinking about distributed processing.

Whether or not such a platform exists, I would like to gain a big enough picture to be able to work in a team that could put into production a whole system tailored to specific customer demands. For example, a retailer would like to analyze daily purchases, so all the daily records have to be uploaded to some infrastructure capable of processing the data using custom machine learning algorithms.

To put all of the above into a simple question: how do you design a custom data mining solution for real-life problems, with the main focus on machine learning algorithms, and put it into production, if possible by using existing infrastructure, and if not, by designing a distributed system (using Hadoop or some other framework)?

I would be very thankful for any advice or suggestions about books or other helpful resources.

First of all, your question needs to define more clearly what you mean by Big Data.

Indeed, Big Data is a buzzword that may refer to various sizes of problems. I tend to define Big Data as the category of problems where the data size or the computation time is big enough for "the hardware abstractions to become broken", meaning that a single commodity machine cannot perform the computations without careful attention to computation and memory.

The scale threshold beyond which data become Big Data is therefore unclear and is sensitive to your implementation. Is your algorithm bounded by hard-drive bandwidth? Does the data have to fit into memory? Did you try to avoid unnecessary quadratic costs? Did you make any effort to improve cache efficiency, etc.?

From several years of experience running medium-to-large-scale machine learning challenges (on up to 250 commodity machines), I strongly believe that many problems that seem to require a distributed infrastructure can actually be run on a single commodity machine if the problem is expressed correctly. For example, you mention large-scale data for retailers. I have been working on this exact subject for several years, and I often managed to make all the computations run on a single machine, given a bit of optimisation. My company has been working on a simple custom data format that allows one year of all the data from a very large retailer to be stored within 50 GB, which means a single commodity hard drive could hold 20 years of history. You can have a look, for example, at: https://github.com/Lokad/lokad-receiptstream

From my experience, it is worth spending time trying to optimize the algorithm and memory use so that you can avoid resorting to a distributed architecture. Indeed, distributed architectures come with a triple cost: first, strong knowledge requirements; second, a large complexity overhead in the code; and finally, a significant latency overhead (with the exception of local multi-threaded distribution).

From a practitioner's point of view, being able to perform a given data mining or machine learning algorithm in 30 seconds is one of the key factors for efficiency. I have noticed that when some computation, whether sequential or distributed, takes 10 minutes, my focus and efficiency tend to drop quickly, as it becomes much more complicated to iterate and test new ideas quickly. The latency overhead introduced by many of the distributed frameworks is such that you will inevitably be in this low-efficiency scenario.

If the scale of the problem is such that even with strong effort you cannot perform it on a single machine, then I strongly suggest resorting to off-the-shelf distributed frameworks instead of building your own. One of the best-known is the MapReduce abstraction, available through Apache Hadoop. Hadoop can be run on clusters of 10,000 nodes, probably much more than you will ever need. If you do not own the hardware, you can "rent" a Hadoop cluster, for example through Amazon Elastic MapReduce.

Unfortunately, the MapReduce abstraction is not suited to all machine learning computations. As far as machine learning is concerned, MapReduce is a rigid framework, and numerous cases have proved difficult or inefficient to adapt to it:

– The MapReduce framework itself is related to functional programming: the Map procedure is applied to each data chunk independently. Therefore, the framework is not suited to algorithms where applying the Map procedure to some data chunks requires the results of the same procedure on other data chunks as a prerequisite. In other words, the MapReduce framework is not suited when the computations on the different pieces of data are not independent and impose a specific chronology.

– MapReduce is designed to provide a single execution of the map and reduce steps and does not directly provide iterative calls. It is therefore not directly suited to the numerous machine-learning problems that imply iterative processing (Expectation-Maximisation (EM), Belief Propagation, etc.). Implementing these algorithms in a MapReduce framework means the user has to engineer a solution that organizes result retrieval and schedules the multiple iterations, so that each map iteration is launched after the reduce phase of the previous iteration has completed, and is fed with the results that reduce phase produced.

– Most MapReduce implementations have been designed to address production needs and robustness. As a result, the primary concern of the framework is to handle hardware failures and to guarantee the computation results. The MapReduce efficiency is therefore partly lowered by these reliability constraints. For example, the serialization on hard-disks of computation results turns out to be rather costly in some cases.

– MapReduce is not suited to asynchronous algorithms.
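The iterative limitation in the second point is usually worked around with a driver loop that launches one job per iteration, feeding each job with the previous job's output. A minimal sketch in plain Java, where `pass` is a hypothetical stand-in for one complete map+reduce execution over the data:

```java
public class IterativeDriverSketch {
    // Hypothetical stand-in for one complete MapReduce pass over the data:
    // here, a single Newton step that refines an estimate of sqrt(2).
    static double pass(double model) {
        return (model + 2.0 / model) / 2.0;
    }

    public static void main(String[] args) {
        double model = 1.0; // initial model, the input of job 0
        for (int i = 0; i < 50; i++) {
            double next = pass(model); // launch job i, fed with job (i-1)'s output
            boolean converged = Math.abs(next - model) < 1e-12;
            model = next;
            if (converged) break;      // the driver, not MapReduce, decides when to stop
        }
        System.out.println(model);     // close to Math.sqrt(2)
    }
}
```

The point is that the scheduling and the convergence test live entirely outside the framework, which is exactly the engineering burden described above.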

The questioning of the MapReduce framework has led to richer distributed frameworks, where more control and freedom are left to the framework user, at the price of more complexity for that user. Among these frameworks, GraphLab and Dryad (both based on Directed Acyclic Graphs of computations) are well known.

As a consequence, there is no "one size fits all" framework, just as there is no "one size fits all" data storage solution.

To start with Hadoop, you can have a look at the book Hadoop: The Definitive Guide by Tom White.

If you are interested in how large-scale frameworks fit machine learning requirements, you may be interested in the second chapter (in English) of my PhD thesis, available here: http://tel.archives-ouvertes.fr/docs/00/74/47/68/ANNEX/texfiles/PhD%20Main/PhD.pdf

If you provide more insight about the specific challenge you want to deal with (type of algorithm, size of the data, time and money constraints, etc.), we could probably provide you with a more specific answer.

Edit: another reference that could prove to be of interest: Scaling-up Machine Learning.

I have a Hadoop cluster set up and working under a common default username, "user1". I want to put files into Hadoop from a remote machine which is not part of the Hadoop cluster. I configured the Hadoop files on the remote machine in such a way that when

hadoop dfs -put file1 ...

is called from the remote machine, it puts the file1 on the Hadoop cluster.

The only problem is that I am logged in as "user2" on the remote machine, and that doesn't give me the result I expect. In fact, the above command can only be executed on the remote machine as:

hadoop dfs -put file1 /user/user2/testFolder

However, what I really want is to be able to store the file as:

hadoop dfs -put file1 /user/user1/testFolder

If I try to run the last command, Hadoop throws an error because of access permissions. Is there any way I can specify the username within the hadoop dfs command?

I am looking for something like:

hadoop dfs -username user1 file1 /user/user1/testFolder


By default, authentication and authorization are turned off in Hadoop. According to Hadoop: The Definitive Guide (by the way, a nice book; I would recommend buying it):

The user identity that Hadoop uses for permissions in HDFS is determined by running the whoami command on the client system. Similarly, the group names are derived from the output of running groups.

So, you can create a new whoami command which returns the required username and put it in the PATH appropriately, so that the created whoami is found before the actual whoami which comes with Linux. Similarly, you can play with the groups command.

This is a hack and won't work once the authentication and authorization has been turned on.

I am a web developer. I have experience in web technologies like JavaScript, jQuery, PHP, and HTML, and I know the basic concepts of C. Recently I have taken an interest in learning more about MapReduce and Hadoop, so I enrolled myself in a parallel data processing in MapReduce course at my university. Since I don't have any prior programming knowledge in an object-oriented language like Java or C++, how should I go about learning MapReduce and Hadoop? I have started to read the Yahoo Hadoop tutorials and also O'Reilly's Hadoop: The Definitive Guide, 2nd edition.

I would like you guys to suggest ways I could go about learning MapReduce and Hadoop.

Go through the Yahoo Hadoop tutorial before going through Hadoop: The Definitive Guide. The Yahoo tutorial gives you a very clean and easy understanding of the architecture. I think the concepts are not arranged properly in the book, which makes it a little difficult to study. So do not study them together; go through the web tutorial first.

OK, I am attempting to learn Hadoop and MapReduce. I really want to start with MapReduce, and what I find are many, many simplified examples of mappers and reducers, etc. However, I seem to be missing something. While an example showing how many occurrences of a word are in a document is simple to understand, it does not really help me solve any "real world" problems. Does anybody know of a good tutorial implementing MapReduce in a pseudo-realistic situation? Say, for instance, I want to use Hadoop and MapReduce on top of a data store similar to AdventureWorks. Now I want to get orders for a given product in the month of May. How would that look from a Hadoop/MapReduce perspective? (I realize this may not be the type of problem MapReduce is intended to solve, but it just came to mind quickly.)

Any direction would help.

Hadoop can be used for a wide variety of problems. Check this blog entry from atbrox. Also, there is a lot of information on the internet about Hadoop and MapReduce and it's easy to get lost. So, here is the consolidated list of resources on Hadoop.

BTW, Hadoop: The Definitive Guide, 3rd edition is due in May. It looks like it also covers MRv2 (NextGen MapReduce) and includes more case studies. The 2nd edition is also worthwhile, as mentioned by orangeoctopus.
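For the orders-in-May question above, the map and reduce logic can be sketched in plain Java (no Hadoop classes; the CSV layout `orderId,productId,yyyy-MM-dd,quantity` is an assumption made for this illustration):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class OrdersInMay {
    // Map: parse one order line and emit (productId, quantity) only for May orders.
    static Optional<Map.Entry<String, Integer>> map(String line) {
        String[] f = line.split(",");
        if (f[2].split("-")[1].equals("05")) {
            return Optional.of(new SimpleEntry<>(f[1], Integer.parseInt(f[3])));
        }
        return Optional.empty(); // other months are filtered out in the map phase
    }

    // Reduce: sum all quantities emitted for one productId.
    static int reduce(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "1,widget,2012-05-03,2",
                "2,widget,2012-05-10,1",
                "3,gadget,2012-05-07,4",
                "4,widget,2012-06-01,9"); // June: dropped by the mapper

        // The "shuffle" phase: group the map output by key before reducing.
        Map<String, List<Integer>> grouped = lines.stream()
                .map(OrdersInMay::map)
                .filter(Optional::isPresent).map(Optional::get)
                .collect(Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

        grouped.forEach((product, qtys) ->
                System.out.println(product + "\t" + reduce(qtys)));
    }
}
```

In a real Hadoop job, `map` and `reduce` would live in `Mapper` and `Reducer` subclasses and the framework would do the shuffle; the filtering-in-the-mapper pattern is the same.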

While running a mapreduce job I get an output like this:

 11/09/15 21:35:16 INFO mapreduce.Job: Counters: 24
 File System Counters
 FILE: Number of bytes read=255967
 FILE: Number of bytes written=397273
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 Map-Reduce Framework
 Map input records=5
 Map output records=5
 Map output bytes=45

Here in the first line it says Counters: 24. Where can I find more info about these counters?

I am most interested in "large read operations=0": what are these?
If anyone has any knowledge or a reference about these, please reply.


I would recommend you take a look at Tom White's Hadoop book, especially Chapter 8.1 where he gives a detailed list of counters and their meanings. You can find an online version here.

For the large read operations: this counter corresponds to the number of large file system read operations, such as listing the files under a large directory. It was introduced in HADOOP-6859, where it is described as follows: on a file system, most operations are small, except listing the files of a large directory. Iterative listFiles was introduced in HDFS to break a single large operation down into smaller steps. This counter is incremented for every iteration of listFiles when listing the files under a large directory.

This ticket also explains some of the other new counters:

  • read operations - number of read operations such as listStatus, getFileBlockLocations, open etc.
  • write operations - number of write operations such as create, append, setPermission etc.

I would also advise you to look at the FileSystem.Statistics class, which provides some additional filesystem counters, as described here.

I have a case in which Mapper emits data that belongs to a subgroup and the subgroup belongs to a group.

I need to add up all the values in the subgroup and find the minimal value between all subgroups of the group, for each of the groups.

So, I have an output from Mapper that looks like this

Group 1


Group 2


And my output should be

Group1, 1, (2+3+4)
Group1, 2, (1+2)
Group1, 3, (1+2+5)

Group1 min = min((2+3+4),(1+2),(1+2+5))

Same for Group 2.

So I practically need to group twice: first group by GROUP, and then inside of it group by SUBGROUPID.

So I should emit the minimal sum from a group, in the given example my reducer should emit (2,3), since the minimal sum is 3 and it comes from element with id 2.

So it seems that it could best be solved using reduce twice: the first reduce would get elements grouped by subgroup id, and that output would be passed to the second reducer, grouped by group id.

Does this make sense, and how would I implement it? I've seen ChainedMapper and ChainedReducer, but they don't fit this purpose.


Your approach (summarized below) is how I would do it.

Job 1:

  1. Mapper: Emits (subgroupid, value) pairs.
  2. Combiner/Reducer (same class): Sums the values for each subgroupid.

Job 2:

  1. Mapper: Maps each subgroupid's sum to its groupid.
  2. Combiner/Reducer (same class): Finds the minimum sum for each groupid.

This is best implemented in two jobs for the following reasons:

  • Simplifies the mapper and reducer significantly (you don't need to worry about finding all the groupids the first time around). Finding the (groupid, subgroupid) pairs in the mapper could be non-trivial. Writing the two mappers should be trivial.
  • Follows the map reduce programming guidelines given by Tom White in Hadoop: The Definitive Guide (Chapter 6).
  • An Oozie workflow can easily and simply accommodate the dependent jobs.
  • The intermediate file products (key:subgroupid, value: min value for subgroupid) should be small, limiting the use of network resources.
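The two reduce steps can be sketched in plain Java (no Hadoop classes), using the sums from the question; `sum` stands in for job 1's reducer and `minSubgroup` for job 2's:

```java
import java.util.List;
import java.util.Map;

public class TwoPassGrouping {
    // Job 1 reduce: sum the values that arrive for one (group, subgroup) key.
    static int sum(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    // Job 2 reduce: given subgroup -> sum for one group, emit (subgroupId, minSum).
    static Map.Entry<Integer, Integer> minSubgroup(Map<Integer, Integer> subgroupSums) {
        return subgroupSums.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .orElseThrow(IllegalArgumentException::new);
    }

    public static void main(String[] args) {
        // Sums from the question: subgroup 1 -> 2+3+4, subgroup 2 -> 1+2, subgroup 3 -> 1+2+5
        Map<Integer, Integer> group1 = Map.of(
                1, sum(List.of(2, 3, 4)),
                2, sum(List.of(1, 2)),
                3, sum(List.of(1, 2, 5)));
        Map.Entry<Integer, Integer> min = minSubgroup(group1);
        // Matches the expected (2, 3) from the question.
        System.out.println("Group1 min = (" + min.getKey() + "," + min.getValue() + ")");
    }
}
```

In the real jobs, job 1's output file (key: subgroupid, value: sum) becomes job 2's input, with the mapper of job 2 re-keying each record by groupid.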

It seems as though the documentation on Hadoop's FileSystem.Statistics class is somewhat lacking.

What is meant by "bytes read" and "bytes written", and how are the counters implemented?

What is meant by "bytes read" and "bytes written"

According to Hadoop: The Definitive Guide:

Filesystem bytes read - The number of bytes read from each filesystem by map and reduce tasks. There is a counter for each filesystem: the filesystem may be Local, HDFS, S3, KFS, etc.

Filesystem bytes written - The number of bytes written to each filesystem by map and reduce tasks.

how are the counters implemented?

Check these tutorials (1, 2, 3) on how counters are implemented.

From what I've read, HDFS is fast because it relaxes some POSIX requirements, but how does this work? Or at least, which requirements does it relax? I've not found the answer, because on Google I found someone redirecting the asker to a large site!

According to Hadoop: The Definitive Guide (I suggest getting the book):

A coherency model for a filesystem describes the data visibility of reads and writes for a file. HDFS trades off some POSIX requirements for performance, so some operations may behave differently than you expect them to.

After creating a file, it is visible in the filesystem namespace, as expected. However, any content written to the file is not guaranteed to be visible, even if the stream is flushed, so the file can appear to have a length of zero.

Once more than a block's worth of data has been written, the first block will be visible to new readers. This is true of subsequent blocks, too: it is always the current block being written that is not visible to other readers.

HDFS provides a method for forcing all buffers to be synchronized to the datanodes via the sync() method on FSDataOutputStream. After a successful return from sync(), HDFS guarantees that the data written up to that point in the file is persisted and visible to all new readers.

Another thing is:

There are three types of permission: the read permission (r), the write permission (w), and the execute permission (x). The read permission is required to read files or list the contents of a directory. The write permission is required to write a file, or, for a directory, to create or delete files or directories in it. The execute permission is ignored for a file, since you can't execute a file on HDFS (unlike POSIX); for a directory, it is required to access its children.

How can I provide each line of a file fed to the mapper with splits of the same file?

Basically, what I want to do is:

for each line in file-split
    for each line in file
        process the pair of lines

Can I do this using MapReduce in Java?

Actually, when a MapReduce job is triggered, it first checks the input file(s); for simplicity, consider that we have only one big input file. If it is larger than the block size, the job tracker splits this file by the block size, then initiates one map task per split generated and passes each split to a mapper task for processing. So no more than one split will be processed by each mapper. Also, if the input file size is less than the block size, the job tracker will take it as a single split.

Suppose the block size is 64MB and you have 2 files, each 10MB in size; then the job tracker will generate 2 splits, because according to FileInputFormat a split can be exactly a single file (in case file size <= block size) or a part of a file (in case its size > block size).

Thus, a mapper will only process a single split, and a split cannot contain more than one file (true for FileInputFormat, the default format; in the case of a combine file input format, a split can span multiple files).

I guess you are using FileInputFormat. HTH!
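The split arithmetic described above can be sketched as follows (a simplification: the real FileInputFormat also applies configurable minimum/maximum split sizes, but the one-split-per-block idea is the same):

```java
public class SplitCountSketch {
    // Simplified split count for one file: one split per full-or-partial block.
    static long splitsForFile(long fileSize, long blockSize) {
        if (fileSize == 0) return 0;
        return (fileSize + blockSize - 1) / blockSize; // ceiling division
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Two 10 MB files with a 64 MB block size: one split each, two in total,
        // matching the example above.
        long total = splitsForFile(10 * mb, 64 * mb) + splitsForFile(10 * mb, 64 * mb);
        System.out.println(total + " splits, so " + total + " map tasks");
    }
}
```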

You can refer to Hadoop: The Definitive Guide to understand the basics.

I am currently reading Hadoop in Action. The book is very good; however, it uses Hadoop 1.2.1 to explain and showcase all the examples, whereas I am using Hadoop 2.2.0.

Does anybody know where I can find full documentation about the Hadoop API changes, and a simple mapping between 1.2.1 and 2.2.0?

For example:

DataJoinMapperBase, DataJoinReducerBase, and TaggedMapOutput

are not present in 2.2.0, and I am looking for their counterparts in 2.2.0 :)


"Hadoop: The Definitive Guide, Third Edition" by Tom White (Buy Here)

supports hadoop v2.2.

The source code is give on github https://github.com/tomwhite/hadoop-book

As mentioned on GitHub, the code for the book is tested with:

This version of the code has been tested with:
 * Hadoop 1.2.1/0.22.0/0.23.x/2.2.0
 * Avro 1.5.4
 * Pig 0.9.1
 * Hive 0.8.0
 * HBase 0.90.4/0.94.15
 * ZooKeeper 3.4.2
 * Sqoop 1.4.0-incubating
 * MRUnit 0.8.0-incubating

Regarding your question: Hadoop 2.2 uses the MapReduce API v2, while Hadoop 1.x uses the old MapReduce API. Check this book; it clearly explains the MapReduce code differences between 1.x and 2.2.

Hope it helps!

I tried to understand the MapReduce anatomy from various books and blogs, but I am not getting a clear idea.

What happens when I submit a job to the cluster using this command?

(The files are already loaded into HDFS.)

bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

Can anyone explain the sequence of operations that happens, from the client right through the cluster?

Read Chapter 6 ("How MapReduce Works") of Hadoop: The Definitive Guide. It explains things in good language. For a quick read, see this and this.

I understand that VIntWritable can significantly reduce the size needed to store an integer, when compared to IntWritable.

My questions are: What is the cost of using VIntWritable instead of IntWritable? Is it (only) the time needed for compression? In other words, when should I use IntWritable, instead of VIntWritable?

How do you choose between a fixed-length and a variable-length encoding?

Fixed-length encodings are good when the distribution of values is fairly uniform across the whole value space, such as with a (well-designed) hash function. Most numeric variables tend to have nonuniform distributions, and on average the variable-length encoding will save space. Another advantage of variable-length encodings is that you can switch from VIntWritable to VLongWritable, because their encodings are actually the same. So by choosing a variable-length representation, you have room to grow without committing to an 8-byte long representation from the beginning.

I just picked this up from The Definitive Guide, page 98.
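The space trade-off can be demonstrated with a generic base-128 varint (illustration only: Hadoop's VIntWritable uses a different, length-prefixed scheme, but the effect is the same; small values take one byte while very large values can take more than four):

```java
import java.io.ByteArrayOutputStream;

public class VarIntDemo {
    // Generic base-128 varint: 7 payload bits per byte, high bit = "more bytes follow".
    static byte[] encode(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encode(1).length);              // 1 byte vs. 4 for a fixed int
        System.out.println(encode(300).length);            // 2 bytes
        System.out.println(encode(1_000_000_000L).length); // 5 bytes: worse than fixed-length
    }
}
```

The cost of the variable-length form is the extra decode work and, as the last line shows, more bytes than a fixed encoding when values are large; that is why a uniformly distributed value space favors IntWritable.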