Pigs, Bees, and Elephants: A Comparison of Eight MapReduce Languages

by Alan | April 25th, 2011

This week’s guest blogger is Dataspora’s own Antonio Piccolboni. The original post can be found on his personal blog.

On a quest for an elegant and effective map reduce language, I went through a number of options and put together some considerations. And the winner is …

In a couple of entries on my personal blog I described some map-reduce algorithms for statistical and graph problems and sketched their implementation using pseudo-code. Pseudo-code has two problems: not everybody agrees on what a statement means, and it doesn’t run, so you can’t test it or use it. Real programming languages, on the other hand, tend to obscure the logic of a program with unnecessary detail and have other issues that hinder readability, which is why people resort to pseudo-code. But there is more to it than just aesthetics. Conciseness of code is related to programming abstractions, constructs that achieve higher generality and remove unnecessary detail; to reuse, whereby the same code is used in different contexts, reducing total program size; and even to testing, in that concise programs can be tested more easily. In sum, shorter programs are better. The elegance of less is hardly my own discovery, or even a software engineering one. As Antoine de Saint-Exupéry, French writer and aviator, so eloquently put it:

Perfection is achieved, not when there is nothing more to add, but when there is nothing left to take away.

Unfortunately, in some circles, dull, predictable, repetitive code is considered simpler than short, to-the-point code, or at least tolerable. From java.util.Arrays:

/*
 * The code for each of the seven primitive types is largely identical.
 * C’est la vie.
 */

In this case, repetition gets a free pass in exchange for efficiency. Very expressive languages tend to exact a higher toll on resources, and the different map-reduce environments we will look into are no exception.

For each language or library I will present the implementation of a word count program, lifted from its documentation, since this has become the “Hello World” of map-reduce. I don’t think such a simple program is the ultimate test of the quality of a language, so this is just to give a taste of each. What I am most interested in is:

  • Can I write reasonably concise, abstract programs in this language or library?
  • Can I write the “inside” of map reduce, that is the code for the mapper and the reducer, as well as the “outside”, the logic that decides which map reduce jobs to run?
  • Is it general? Can I write any map-reduce program, including programs that require multiple map-reduce jobs, including the case of a data dependent number and type of jobs?
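
To make the “inside” versus “outside” distinction concrete, here is a minimal sketch in plain Python, with no Hadoop involved. The names run_job, wc_mapper and wc_reducer are made up for illustration, and run_job is only a single-process stand-in for what a real framework distributes across machines.

```python
from collections import defaultdict


def wc_mapper(key, line):
    # "Inside": emit (word, 1) for every whitespace-separated token.
    for word in line.split():
        yield word, 1


def wc_reducer(word, counts):
    # "Inside": sum the partial counts collected for one word.
    yield word, sum(counts)


def run_job(mapper, reducer, records):
    # "Outside": a toy, in-memory stand-in for a map-reduce runtime.
    groups = defaultdict(list)
    for key, value in records:
        for out_key, out_value in mapper(key, value):
            groups[out_key].append(out_value)  # shuffle: group values by key
    output = []
    for out_key in sorted(groups):
        output.extend(reducer(out_key, groups[out_key]))
    return output


if __name__ == "__main__":
    lines = [(0, "to be or not to be"), (1, "that is the question")]
    for word, count in run_job(wc_mapper, wc_reducer, lines):
        print(word, count)
```

The criteria above ask whether a language lets you write both halves: the two small functions at the top, and the logic that decides when and how often something like run_job gets called.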

Java Hadoop

This is the original, the real thing, the current performance champion and what “real men” write in. It is also the most mature of the different options. But take a look:

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
      }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(Map.class);
      conf.setCombinerClass(Reduce.class);
      conf.setReducerClass(Reduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      conf.setInputPath(new Path(args[0]));
      conf.setOutputPath(new Path(args[1]));

      JobClient.runJob(conf);
    }
}

Nearly 50 lines to write a word count program (and I stripped the import statements at the top out of mercy)! My favorite line is the declaration of “one”, a line devoted to redefining the number one. This makes sense in a world where programmer productivity is measured by number of lines of code written, or for a production job that runs on a 1,000-node cluster for 5 hours every night, in which case efficiency may trump other considerations. But for a blog, for discussing and enjoying code, anything remotely more interesting than a word count program would not fit in an entry and would have to be an attachment, as John Mount did with his painstaking Java/Hadoop implementation of logistic regression. I wonder how many people opened that tar file and read it through and through.

Hadoop was started by Doug Cutting and is developed by a large community with a significant group employed at Yahoo.

Cascading

Cascading is a Java library written on top of Hadoop. It enables programming in a dataflow style, with some primitives inspired by SQL (like GroupBy). But according to a person closely related to the project, “it’s still Java, it’s still boilerplate code”. My favorite line is number 18. Remarkably, it cuts the line count of the word count program to about half that of plain Hadoop. I don’t have first-hand experience with Cascading, but since there is little or no performance penalty compared to the real thing (depending on programmer skill, it could actually be faster), it’s worth a try for production work.

Cascading is developed by Chris Wensel at Concurrent.

Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

Pipe assembly = new Pipe( "wordcount" );

String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

assembly = new GroupBy( assembly, new Fields( "word" ) );

Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

flow.complete();

Pipes — C++

C++ fits into the Java environment not without some effort, which is encapsulated in a library called Pipes. The word count program looks more compact than in Java/Hadoop. I’ve read conflicting reports on the efficiency of Pipes/C++ versus Hadoop/Java, and I suspect it depends on the specific problem being tackled. Even though I used to be quite proficient in C++, I do not fondly remember the 8,000-character template-induced error messages, and I don’t think it is the type of language I would want to use to discuss algorithms or for prototyping.

Pipes is developed as part of the Hadoop project.

class WordCountMap: public HadoopPipes::Mapper {
public:
 WordCountMap(HadoopPipes::TaskContext& context){}
 void map(HadoopPipes::MapContext& context) {
   std::vector<std::string> words =
     HadoopUtils::splitString(context.getInputValue(), " ");
   for(unsigned int i=0; i < words.size(); ++i) {
     context.emit(words[i], "1");
   }
 }
};

class WordCountReduce: public HadoopPipes::Reducer {
public:
 WordCountReduce(HadoopPipes::TaskContext& context){}
 void reduce(HadoopPipes::ReduceContext& context) {
   int sum = 0;
   while (context.nextValue()) {
     sum += HadoopUtils::toInt(context.getInputValue());
   }
   context.emit(context.getInputKey(), HadoopUtils::toString(sum));
 }
};

int main(int argc, char *argv[]) {
 return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap,
                             WordCountReduce>());
}

Hive

Hive is a SQL-like language that is interpreted on top of Hadoop. It can also be combined with small programs written in a variety of languages, to make up for the fact that the language itself is not general purpose. For what it does, it is very concise and expressive, but outside that you need to supplement it with other languages. Case in point: the word count example, where two additional scripts are left as an exercise for the reader.

Hive started as part of the Hadoop project.

FROM
(MAP docs.contents USING 'tokenizer_script' AS word, cnt
FROM docs
CLUSTER BY word) map_output

REDUCE map_output.word, map_output.cnt USING 'count_script' AS word, cnt;
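
The two scripts left as an exercise could look roughly like the sketch below. It assumes that Hive streams each row to the script as a tab-separated line on standard input and reads tab-separated rows back from standard output, and that CLUSTER BY word delivers all rows for a given word to the same reducer, next to each other. The function names and the "map"/"reduce" command-line convention are hypothetical, and both scripts are kept in one file only for brevity.

```python
import sys
from itertools import groupby


def tokenize(lines):
    # Role of tokenizer_script: for each input row, emit "word<TAB>1" per token.
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def count(lines):
    # Role of count_script: rows arrive grouped by word; sum the counts per word.
    rows = (line.rstrip("\n").split("\t") for line in lines)
    for word, group in groupby(rows, key=lambda row: row[0]):
        yield f"{word}\t{sum(int(cnt) for _, cnt in group)}"


# Hypothetical convention: the first argument selects the role ("map" or "reduce").
if __name__ == "__main__" and len(sys.argv) > 1:
    step = tokenize if sys.argv[1] == "map" else count
    for row in step(sys.stdin):
        print(row)
```

Note that count relies on equal words being adjacent in its input; without that guarantee it would emit several partial counts for the same word.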

Pig

Pig adds to the limitations of Hive the hubris of creating a brand new language, as if creating a new programming language were easy. As you can see, it is inspired by SQL to a degree. It is not a general purpose language as clearly explained here. It interfaces with any JVM based language for custom extensions.

A = load '/tmp/bible+shakes.nopunc';
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\w+';
D = group C by word;
E = foreach D generate COUNT(C) as count, group as word;
F = order E by count desc;
store F into '/tmp/wc';

Pig development was started at Yahoo.

Rhipe

Rhipe is an R package to describe and execute map-reduce jobs. It is reasonably high level and satisfies all the criteria I listed above. It’s not a speed demon, because of R itself; there are some quirks in the API and it’s still at an initial stage of development, but it is interesting.

rhinit()
m <- expression({
 y <- strsplit(unlist(map.values)," ")
 lapply(y,function(r) rhcollect(r,T))
})
r <- expression(
   pre={
     count=0
   },
   reduce={
     count <- sum(as.numeric(unlist(reduce.values)),count)
   },post={
     rhcollect(reduce.key,count)
   })
z=rhmr(map=m,reduce=r,comb=T,inout=c("text","sequence"),ifolder="/tmp/50mil",ofolder='/tmp/tof')
rhex(z)

Rhipe is developed by Saptarshi Guha at Mozilla in Mountain View, California.

Dumbo

Dumbo is a Hadoop library for Python, but it also imposes its own set of tools to run a Dumbo program. If you look at the word count program in Dumbo, below, it almost looks like pseudo-code! Finally! But there is a serious catch. There can be only one run statement per Dumbo-powered program; I asked the author himself after seeing some outlandish-looking errors. To coordinate two runs, for instance one that starts based on the output of the first, one has to run separate Python programs and go through the Unix shell. This is different from static composition of jobs, which is well supported, but not general enough for my purposes. Other options for Python include mrjob and Pydoop, but I haven’t had time to look into these yet.

def mapper(key,value):
  for word in value.split(): yield word,1
def reducer(key,values):
  yield key,sum(values)
if __name__ == "__main__":
  import dumbo
  dumbo.run(mapper,reducer)

Dumbo is developed by Klaas Bosteels at last.fm.
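
To illustrate what a data-dependent number of jobs means in practice, here is a toy sketch in plain Python, not Dumbo: the driver inspects the output of one job to decide whether to launch another. The run_job function is a hypothetical in-memory stand-in for a real job submission, and the example problem (propagating the smallest label along the edges of a graph until nothing changes) is chosen only for illustration.

```python
from collections import defaultdict


def run_job(mapper, reducer, records):
    # Hypothetical stand-in for submitting one map-reduce job.
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return dict(pair for key in groups for pair in reducer(key, groups[key]))


def min_label_reducer(node, labels):
    # Keep the smallest label seen for this node.
    yield node, min(labels)


def connected_components(edges):
    # Start with every node labeled by itself.
    labels = {node: node for edge in edges for node in edge}
    jobs = 0
    while True:
        def mapper(edge, labels=labels):
            # In a real cluster the labels would be a job input, not a closure.
            u, v = edge
            yield u, labels[u]
            yield u, labels[v]
            yield v, labels[v]
            yield v, labels[u]

        new_labels = run_job(mapper, min_label_reducer, edges)
        jobs += 1
        if new_labels == labels:  # data-dependent stopping rule
            return labels, jobs
        labels = new_labels
```

How many times run_job is called depends on the shape of the graph, which is known only at run time. That loop is the part that, per the restriction described above, has to live outside a single Dumbo program.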

Cascalog

Built on top of the already powerful Cascading as a domain specific language within Clojure, Cascalog wins the word count conciseness contest with a one-liner. Indeed, word counting is simple enough that a line is all that it should take. But look at what a line it is:

(?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word) (c/count ?count))

It probably looks familiar to anybody who’s familiar with it. Conciseness can become terseness, but once some domain specific concepts have been grasped a terse program such as this might become perfectly clear. It was to me at some point. My misgivings here are more about the JVM-powered revival of LISP in the form of Clojure. LISP has been around some 50-odd years without taking off despite several attempts at its revival (Common LISP, Scheme, Arc and now Clojure). I suspect something is wrong with it, even if popularity is not an accurate gauge of language quality, as BASIC has long proved. Personally, I dislike LISP odd syntax, the widespread use of side effects in a functional language and the poor abstraction that lists represent over RAM, from a performance point of view — indeed LISP variants often add additional data structures, somehow negating the “LIS” part of the language. In the specific case of Clojure, the fact that a compiled language is compiled into an interpreted one, JVM bytecode, combining a slow dev cycle with suboptimal performance, makes me think Clojure users must be glutton for punishment.

Cascalog is developed by Nathan Marz at BackType.

Final thoughts

At the end of this necessarily incomplete and unscientific comparison of languages and libraries, there is a winner and there isn’t. There isn’t because language comparison is always multidimensional and subjective, but also because the intended applications are very different. On the other hand, if you are looking for a general purpose, moderately elegant, not necessarily most efficient, not necessarily mature language for exploration purposes, Rhipe seems to fit the bill pretty nicely.

First, it is just a library, which means that one can continue to use the tools he’s familiar with. I found it particularly useful to run map-reduce jobs in the interpreter, inspecting the inputs and outputs of each, an invaluable debugging help. But no, you cannot step into a mapper or reducer; I use counters instead to trace what’s going on in there. I also like that one can read and write sequence files with one call, to examine the output of previous jobs and decide what to do next. Additionally since R is a statistical language and Hadoop is the tool of choice for big data analytics, this seems like a natural fit. Personally, I am familiar with both, which helps, and I have used R, in combination with Hive or Hadoop, to do analytics in the past, but not at this level of integration.

Since there is nothing like trying a more substantial example than word count to figure out a language’s pros and cons, stay tuned for a fairly complex example. After that is published, I plan to pose a friendly challenge to experts in the languages and libraries above, or in other Hadoop-related languages, to see what an implementation of the same algorithm would look like in their language of choice and to learn something from the comparison. Maybe among my “25 readers” there is someone who will take it up.

  1. Yang says:

    Hi, you might be interested in Scala Map Reduce – more pseudo-code-ish than Python, has the performance of a compiled JVM language, and it’s just a wrapper around Java Hadoop so it comes without the caveats of some of the other runtimes:

    object WordCount {
    def main(args : Array[String]) {
    val (h,remainingArgs) = Hadoop.fromArgs(args,new Path(“output”));
    val words = for( (offset,line) <- h.loadLines(remainingArgs);
    word
    (word,iter.reduceLeft(_+_));
    }
    sums.elements foreach println
    }
    }

  2. Yang says:

    Hmmm…not sure how to format the above, and also some text got cut out. I got the above from: http://www.scala-blogs.org/2008/09/scalable-language-and-scalable.html

  3. Saptarshi says:

    Hi Antonio,

    Good writeup. Just wanted to point out that I now work for Mozilla in the Metrics team.

  4. jake says:

    thanks for the nice post, antonio.

    agreed that there’s not a great all-purpose solution currently available.

    however, we tend to get a good amount of mileage out of pig + streaming. as you know, pig provides a nice framework for high-level logic and joins, groupbys, filters, etc., and streaming within pig is a great way to avoid writing lengthy java udfs for more specific functionality.

    hopefully 0.9’s python udfs will improve this a bit (e.g., right now there are major limits on importing python modules) …

  5. Ken Krugler says:

    Hi Antonio,

    Thanks for taking the time to write up your comparisons – very interesting.

    We use Cascading for a lot of our development, and once you get past the initial learning curve it’s really good for creating complex data processing workflows.

    And just for grins, here’s a more minimal (in terms of line counts, and text) version:

    Tap source = new Hfs( new TextLine(), INPUT_PATH );
    Tap sink = new Hfs( new TextLine(), OUTPUT_PATH, SinkMode.REPLACE );

    Pipe assembly = new Each( "wordcount", new Fields( "line" ), new RegexGenerator( new Fields( "word" ), WORDCOUNT_PATTERN ) );
    assembly = new Every( new GroupBy( assembly, new Fields( "word" ) ), new Count() );

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, MinWordcountTest.class );
    new FlowConnector( properties ).connect( "word-count", source, sink, assembly ).complete();

    – Ken

  6. Tim says:

    Your statements about Lisp are downright bizarre.

    “LISP has been around some 50-odd years without taking off despite several attempts at its revival”

    If popularity is important to you, have “Pig” or “Hive” or even “R” “taken off”? Why don’t you make similar comments for them? R has been around for almost 20 years without “taking off”!

    “Personally, I dislike LISP odd syntax, the widespread use of side effects in a functional language and the poor abstraction that lists represent over RAM”

    Poor abstraction … yet every other language you could find was between 5 and 50 times longer at the same trivial task. What exactly do you mean by “poor abstraction”?

    “In the specific case of Clojure, the fact that a compiled language is compiled into an interpreted one, JVM bytecode, combining a slow dev cycle with suboptimal performance”

    So how WAS its performance? Is this JVM-compiled code actually slower than the others? This seems like it would be extremely easy to test, so you would not have to rely on assumptions.

    “makes me think Clojure users must be glutton for punishment”

    I’m looking at your 50-line Java program to count words, including the part where it took a whole line to define the number one, and the part where it took 4 lines to sum a sequence of numbers, and I have no idea how you arrived at this conclusion.

    • Antonio says:

      Hi Tim,
      Pig, Hive and R are relatively young, the jury is still out. All languages start unpopular and some stay that way. I wasn’t even trying to suggest that LISP doesn’t allow abstraction, not even remotely. It’s making lists so central to the whole idea to the detriment of other data structures that I find difficult to accept, even if I’ve been evangelized multiple times about the advantages of seeing everything as a list. But, performance-wise, giving up vectors or relegating them to the role of extensions is a bit too much for me to still consider it practical. So it’s a poor abstraction in the sense that it is too far from the metal, not that it is not abstract. But this is introduced by a “personally” because it is influenced by the class of applications I work on, not an absolute statement about the language. I am glad it works for you. Your JVM observation is correct in that the same argument applies to Java and Cascading, I just didn’t mention it because a) there were other issues that seemed more pressing for those two b) With LISP I have an expectation to have an interpreter available and so it was more surprising and interesting that there isn’t one. Didn’t mean to put Clojure at the bottom of the pile by any means, just surprised that people are willing to use a LISP without an interpreter. No program in this post is my own, let’s set that straight. All quoted from documentation.

  7. I’m a little surprised no one has commented here since your post is getting quite a lot of coverage (and amusement) on Twitter…

    Your comments about Clojure are very uninformed, I’m afraid. You criticize it for being compiled into interpreted bytecode – yet that is exactly how every other language on the JVM operates, including Java, and the JVM is not a pure interpreter anyway but compiles “hot” code to native binary instructions on the fly – which benefits Clojure just like it benefits Java. You say this slows down the dev cycle yet Clojure has a REPL and allows hot replacement of running code so the natural mode of iterative, interactive development that Lisp programmers love is just as prevalent in Clojure.

    You also complain about “the widespread use of side effects in a functional language” but Clojure is based on immutable data structures and restricts any side effects to tightly controlled STM (Software Transactional Memory). Perhaps you intended this to be a criticism of Lisp itself and just didn’t know that Clojure does not have that same approach?

    Syntax is a very personal thing and many people think they won’t like the parentheses but it’s much more about placement – and prefix notation for math expressions – than the amount of parentheses, and once you start using a Lisp-family language, the parentheses pretty much disappear (most folks use editors that auto-insert closing parentheses and offer structural editing). I don’t like Ruby, PHP, or Perl because of their syntax – which is just my personal preference. Lisps by comparison have almost no syntax :)

    I’m glad that you chose to cover Cascalog but I’m sorry that your evaluation of it was undermined by your lack of knowledge of Clojure.

  8. nickik says:

    >My misgivings here are more about the JVM-powered revival of LISP in the form of Clojure. LISP has been around some 50-odd years without taking off despite several attempts at its revival (Common LISP, Scheme, Arc and now Clojure).

    OK, wait a moment, I think you don’t know a lot about Lisp. CL and Scheme were not, and are not, attempts to revive Lisp. LISP (capital letters) itself was written 50 years ago, and since then there have been lots and lots of dialects. In the ’80s (Lisp was used for a lot of research, especially AI) people were fed up with having so many dialects of the Lisp-like language family, so they attempted to standardize on CL. While that was going on, the whole AI research field collapsed (the AI Winter), and since then Lisp hasn’t been big anymore. So when people talk about “the revival of Lisp” they don’t talk about the 50-year-old LISP, they talk about the widespread use of Lisp-like languages.

    Scheme was always a language for research (often programming language research) and teaching because it was (and is) a minimal language.

    So saying that Common Lisp and Scheme are attempts to revive LISP is nonsense.

    Now let’s talk about Arc. Arc was in effect an attempt to revive Lisp and create the 100-year language. Until now not a lot has come from it. There are still people working on it, but not the original creator, Paul Graham.

    So next up, Clojure. Clojure was not an attempt at a big Lisp revival. Rich Hickey, the creator, just wanted a language that was as good as it could be at concurrency, and he wanted it pragmatic, so he based it on the JVM. He made the syntax Lisp-like because he likes it, not because he wants to be the great Lisp reviver. Clojure brings in a lot of stuff from Haskell, ML and Java.

    >Personally, I dislike LISP odd syntax,

    Have you programmed enough Lisp that you can really say that? Anyway, do you know why Cascalog is so concise? Because macros make implementing DSLs easy. Why are macros so easy in Lisp? Because of the weird syntax. So you see, if the syntax wasn’t a little weird then there wouldn’t even be something like Cascalog.

    > the widespread use of side effects in a functional language

    What do you mean by that? The main value of functional languages is to avoid side effects.

    >and the poor abstraction that lists represent over RAM, from a performance point of view — indeed LISP variants often add additional data structures, somehow negating the “LIS” part of the language.

    What’s more important, having a good language or embracing the name? So Clojure is negating the “LIS” part; who cares, it’s called Clojure, not Lisp. In Clojure vectors, maps, sets and lists all have literal forms and are equally easy to use.

    List: '(1 2 3 4)
    Vector: [1 2 3 4]
    Set: #{1 2 3 4}
    Hash map: {"key1" 1, "key2" 2}

    You can equally use all the functions like map, reduce, filter on all the data structures, because Clojure’s data structures are based on abstractions (in the form of interfaces). That’s what makes Clojure different from most other Lisps out there.

    >In the specific case of Clojure, the fact that a compiled language is compiled into an interpreted one, JVM bytecode

    JVM bytecode is not interpreted, it’s JIT compiled. Why don’t you have the same problem with Java? Java is a language that is compiled to JVM bytecode.

    >combining a slow dev cycle

    The Clojure dev cycle is very fast. You get all the benefits you always got from Lisps (or nowadays from Python, Ruby …). Clojure has a REPL and feels like an interpreted language.

    > with suboptimal performance

    I don’t know about the performance of Cascalog, but Clojure is pretty fast. I can often get to speeds near Java. As far as I know, Cascalog allows hints to optimize the MapReduce job. There is a video of Nathan talking about that but I can’t find it at the moment.

    I know that not everybody can know the history of Lisp and it’s OK to make an error or two (I probably did), but you were pretty far off with a lot of things.

    P.S.

    R is an implementation of the S programming language combined with lexical scoping semantics inspired by Scheme.
    http://en.wikipedia.org/wiki/R_programming_language

    So writing in R will only save you from lisp syntax.

    P.P.S.

    >First, it is just a library, which means that one can continue to use the tools he’s familiar with.

    Cascalog is just a library too.

    >I also like that one can read and write sequence files with one call,

    That’s typical Lisp. In Clojure it would be (load-file "path")

    >Additionally since R is a statistical language and Hadoop is the tool of choice for big data analytics, this seems like a natural fit. Personally, I am familiar with both, which helps, and I have used R, in combination with Hive or Hadoop, to do analytics in the past, but not at this level of integration.

    In clojure there is a statistical DSL called incanter.
    From the website:
    Incanter is a Clojure-based, R-like platform for statistical computing and graphics. – http://incanter.org

    So you get statistics and MapReduce in one language, plus you get the benefit that it all runs on the JVM like Hadoop does.

  9. Quora says:

    What are some good class projects for Machine Learning using MapReduce?…

    Try implementing some ML algorithms not yet covered in Apache Mahout: What are some important algorithms not yet covered in Mahout? , and What are the top 10 data mining or machine learning algorithms? See open items: https://cwiki.apache.org/confluenc...

  10. eustache says:

    Hi,

    I think for Python you should really look deeper at PyDoop since it exposes most of the Pipes API in Python. You thus have full control of MapRed _and_ HDFS through a “simple like pseudo-code” language.

    Eustache