Writing MapReduce Jobs Using Rust and Efflux
During my work life, I spend a lot of time working with MapReduce-style workflows, particularly on Hadoop infrastructure. A lot of this work involves large amounts of data, in order to implement the batch layer of a Lambda architecture. Because of this, the biggest concern is that behaviour stays consistent across both the batch layer and the realtime layer - naturally you wouldn't want sporadic differences between the two. The easiest way to achieve this is to share code across the layers, to avoid having to keep implementations in sync. We have recently been working with Rust, which has been a little difficult to integrate into Hadoop MapReduce flows due to the fact that Hadoop is mainly written in Java.
It's because of this that I began to work on a small library named Efflux. It's designed as a very small interface to the MapReduce pattern, implemented in Rust to allow us to share code with the batch layer easily. The name stems from the fact that it integrates with Hadoop Streaming (and "river" was taken). The library itself is pretty small, at maybe a dozen modules, and has a gentle learning curve because the interface is quite familiar. Although initially written for interaction with Hadoop, it has also proved useful for command line utilities following the MapReduce pattern.
The Lifecycle Trait
The familiarity of the API comes through the use of traits to represent what would be called an interface in the Java layer of MapReduce. There is a "main" trait, named Lifecycle. The Lifecycle trait represents anything that can react to an IO lifecycle; for example a stream of input from either stdin or a file. This trait is pretty simple:
/// Lifecycle trait to allow hooking into IO streams.
pub trait Lifecycle {
    /// Startup hook for the IO stream.
    fn on_start(&mut self, _ctx: &mut Context) {}

    /// Entry hook for the IO stream to handle input values.
    fn on_entry(&mut self, _line: String, _ctx: &mut Context) {}

    /// Finalization hook for the IO stream.
    fn on_end(&mut self, _ctx: &mut Context) {}
}
It looks much like any other lifecycle implementation would. There's a hook before the stream is consumed and one after the stream has ended, as well as a hook for each block of input provided by the stream (which is a line of text, in our case). Each of these hooks is optional, as taking no action is a sensible default in certain cases.
The Context struct represents the way to share state across the stages of execution, and is a name borrowed from the Context provided by the Java implementation of Hadoop. Note that each reference to the context is mutable, allowing you to easily modify state throughout the lifecycle. The lifecycle Context is a generic container which allows you to store arbitrary types, so it's pretty easy to throw anything you want inside there.
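To make this concrete, below is a minimal sketch of a Lifecycle implementation that counts the lines it receives and emits a single pair at the end. The import path is an assumption on my part (and the write call is used as shown later in this post), so treat it as illustrative rather than copy-paste ready:
// Assumed import path; the real module layout may differ.
use efflux::prelude::{Context, Lifecycle};

/// A lifecycle that counts the lines it sees on the stream.
struct LineCounter {
    lines: usize,
}

impl Lifecycle for LineCounter {
    /// Reset the counter before any input arrives.
    fn on_start(&mut self, _ctx: &mut Context) {
        self.lines = 0;
    }

    /// Bump the counter for every line of input.
    fn on_entry(&mut self, _line: String, _ctx: &mut Context) {
        self.lines += 1;
    }

    /// Emit the total once the stream has ended.
    fn on_end(&mut self, ctx: &mut Context) {
        ctx.write("lines", self.lines);
    }
}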
The Mapper/Reducer Traits
To provide an easy interface to the developer in the public layer of Efflux, there are another couple of traits: Mapper and Reducer. Just like the Context struct, these traits are based on the interfaces in the Java layer of Hadoop. They provide a simple layer over the MapReduce pattern, without the developer having to focus on how data gets from one stage to another. Each trait has the Lifecycle trait implemented in such a way that they both act as an IO lifecycle; this is done through two structs, MapperLifecycle and ReducerLifecycle, due to some complications with generics. Below is the Mapper trait, which receives a key (represented by the byte offset of the current input) and a val, which is a line of String input.
/// Trait to represent the mapping stage of MapReduce.
pub trait Mapper {
    /// Setup handler for the current `Mapper`.
    fn setup(&mut self, _ctx: &mut Context) {}

    /// Mapping handler for the current `Mapper`.
    fn map(&mut self, key: usize, val: String, ctx: &mut Context) {
        ctx.write(key, val);
    }

    /// Cleanup handler for the current `Mapper`.
    fn cleanup(&mut self, _ctx: &mut Context) {}
}
The ctx argument is a Context just like the one provided in the Lifecycle trait. The context is used to write pairs to, which emits the pairs to stdout behind the scenes. The reason this has to happen via the Context rather than println! directly is that you can customize your pair separator in the output (and the separators are stored in the context).
The setup function allows the user to initialize any state before inputs are received, and the cleanup function is used to destroy any state correctly. The Reducer trait is almost the same, with the small difference that the val value is a Vec<String> (due to the grouping done in the job flow).
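For reference, the Reducer trait presumably looks something like the following, mirroring the Mapper above; the signature of reduce matches the word count example later in this post, but the default bodies here are an assumption on my part:
/// Trait to represent the reducing stage of MapReduce.
pub trait Reducer {
    /// Setup handler for the current `Reducer`.
    fn setup(&mut self, _ctx: &mut Context) {}

    /// Reducing handler for the current `Reducer`.
    fn reduce(&mut self, key: String, values: Vec<String>, ctx: &mut Context) {
        // Assumed default: simply echo each grouped value back out.
        for value in values {
            ctx.write(&key, value);
        }
    }

    /// Cleanup handler for the current `Reducer`.
    fn cleanup(&mut self, _ctx: &mut Context) {}
}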
You may notice how these traits are very close in behaviour to Lifecycle. This is deliberate; the difference is that Lifecycle is a driver layer whereas Mapper and Reducer are user facing. Under the hood, both Mapper and Reducer instances have the Lifecycle trait implemented in such a way as to fit the Hadoop MapReduce protocols. This abstraction makes it easy for a user to write just two trait implementations and have a fully functional MapReduce implementation.
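As a rough illustration of that bridging (the real implementation inside Efflux will differ in detail), a wrapper like MapperLifecycle only needs to translate the lifecycle hooks into Mapper calls, tracking the byte offset as it goes:
/// Sketch of a wrapper that bridges a Mapper into a Lifecycle.
struct MapperLifecycle<M: Mapper> {
    mapper: M,
    offset: usize,
}

impl<M: Mapper> Lifecycle for MapperLifecycle<M> {
    /// Delegate startup to the mapper's setup hook.
    fn on_start(&mut self, ctx: &mut Context) {
        self.offset = 0;
        self.mapper.setup(ctx);
    }

    /// Pass each line to the mapper, keyed by the current byte offset.
    fn on_entry(&mut self, line: String, ctx: &mut Context) {
        let key = self.offset;
        self.offset += line.len() + 1;
        self.mapper.map(key, line, ctx);
    }

    /// Delegate shutdown to the mapper's cleanup hook.
    fn on_end(&mut self, ctx: &mut Context) {
        self.mapper.cleanup(ctx);
    }
}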
Creating Tasks
Before we look at writing a task by hand, it's worth mentioning that the repository contains a couple of examples which work well as base projects. In addition, Efflux provides a template which can be used to generate an empty skeleton project. You can use the Kickstart tool to create a new project from the template pretty easily:
# First, install Kickstart into your Cargo setup
$ cargo install kickstart
# Then invoke Kickstart to generate from the template
$ kickstart -s examples/template https://github.com/whitfin/efflux
This will prompt you for various details about your project; if you're just trying it out, the defaults should be sufficient to generate a typical skeleton. A project setup will be created automatically based on the values you provide, and running cargo build will then generate all necessary binaries.
Word Counting!
The easiest way to look at how these traits work is with a simple example. In the MapReduce realm the typical example is a word counter, which accepts a stream of input (e.g. a text file) and counts the number of appearances of each word contained within. This is a good example in our case because it can easily be tested directly from the command line, without a Hadoop installation.
For the purposes of this post, we'll use a trivial "word" definition here where we assume that our input is purely words - no punctuation, etc. The repository contains a more complicated example (here!) for those who want to look at a more complete implementation. We'll begin with our mapper; the first thing to do is create a new structure to act as our implementor. We don't need any state, so this is pretty much a marker struct for this example. This struct will then implement the Mapper trait; we don't need any setup/cleanup, so we purely need to implement the map function.
/// Our marker struct.
struct WordcountMapper;

/// Our mapping implementation.
impl Mapper for WordcountMapper {
    /// Maps an input string to counts of input words.
    fn map(&mut self, _key: usize, value: String, ctx: &mut Context) {
        // Split each input into words.
        //
        // This is a trivial implementation which will just
        // assume a stream of words without punctuation. Each
        // word is written against a 1 to represent 1 instance.
        for word in value.split(" ") {
            ctx.write(word, 1);
        }
    }
}
Although pretty basic, this should suffice for now. Below is an example of an input and the output generated using the algorithm above. You can see that our reducer needs to combine the entries for duplicated keys into a single value; for example, instead of having two entries for "this", we should have a single result with a count of 2.
/// Input
this is a string
this is another string
/// Output
this 1
is 1
a 1
string 1
this 1
is 1
another 1
string 1
This combination is what happens in our reduction phase. The string on the left side is provided as the key in the Reducer, and the numbers on the right are provided as the Vec<String> value - where the vector contains all numbers emitted against that string. With that said, this means that our Reducer is a simple sum of the values for each String:
/// Our marker struct.
struct WordcountReducer;

/// Our reducing implementation.
impl Reducer for WordcountReducer {
    /// Reduces a number of word/count pairs into a single pair.
    fn reduce(&mut self, key: String, values: Vec<String>, ctx: &mut Context) {
        // Emit the key again, but with the total number of instances.
        //
        // This works because we know each number is 1; so the length
        // is the same as the total. To make this more robust we could
        // parse each String as usize and sum mathematically.
        ctx.write(key, values.len());
    }
}
The implementation above just sums all of those 1 values for the key, and emits a total value. Voila, we have a basic word counter! It takes very little code to get to this point; feel free to try it yourself. The final output of this job at this point looks like the following:
/// Input
this is a string
this is another string
/// Output
this 2
is 2
a 1
string 2
another 1
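To turn these two implementations into binaries for Hadoop Streaming (or for running locally), each one just needs a tiny entry point; the helper name below is a sketch from memory, so check the examples in the repository for the exact invocation:
// Sketch of the mapper binary's entry point (e.g. src/bin/mapper.rs).
// The run_mapper helper name is an assumption; the reducer binary is
// identical except it drives WordcountReducer instead.
fn main() {
    efflux::run_mapper(WordcountMapper);
}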
Efflux In Practice
We typically use Efflux for Hadoop-based MapReduce jobs over datasets somewhere in the realm of terabytes of gzipped data (depending on the use case, of course). In most cases it's just as fast as an equivalent job written in Java; in several cases it's actually faster, due to the throughput we can gain from optimized Rust code. Where it shines most, though, is the productivity with which we can write tasks; the small API makes it practically boilerplate-free to write a task quickly.
The one thing I do plan on improving in future (although it will have to happen in a major version bump) is the ability to maintain types across the phases of a job. In the examples shown above, note how the Reducer receives String types even if you emit numbers (or anything else) from the Mapper. In theory this is avoidable; I just need to figure out how to correctly pass the types across, considering I have to go through stdio and back.
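Until then, the workaround is simply to parse values back into the types you need inside the Reducer. For example, the more robust variant of the word counter hinted at earlier would look something like this:
/// A variant of the reducer that parses each value before summing,
/// rather than relying on every emitted value being exactly "1".
impl Reducer for WordcountReducer {
    fn reduce(&mut self, key: String, values: Vec<String>, ctx: &mut Context) {
        // Parse each emitted count back into a number; values which
        // fail to parse are treated as 0 here for simplicity.
        let total: usize = values
            .iter()
            .map(|value| value.parse::<usize>().unwrap_or(0))
            .sum();

        ctx.write(key, total);
    }
}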
Anyway! Feel free to try out Efflux and let me know of any potential improvements! One place I got a little stuck was trying to implement Lifecycle automatically for anything which implements Mapper. Sadly, because I was trying to do the same for Reducer, there was potential overlap in the generics that I couldn't resolve, as a single generic type could potentially implement both Mapper and Reducer. To work around this I made the two wrapping types MapperLifecycle and ReducerLifecycle, but if anyone knows of a better solution please let me know!