Disclaimer: I should point out that I wrote this project for those who are already working with Elixir and need to integrate their existing codebases with Hadoop. I'm going to try to assume limited knowledge of Hadoop/Elixir within this post, but I apologise if it goes a little too deep on either.

Background

Recently I've been working with Elixir more and more, both for personal projects and for my job at Axway. Elixir is an open source language based on Erlang, and both of them run on the BEAM (the Erlang Virtual Machine). As such, it takes advantage of the concurrency model the BEAM provides, which is slowly making it more popular amongst web developers (particularly those coming from Ruby) as it eases the creation of performant web apps. I've been using Elixir for about a year (I started learning it back in November 2015), and can easily say it's up there as one of my favourite languages.

A few months back, I began working on an Elixir application to replace a legacy system at work with something smaller, more flexible, and generally more efficient. Elixir worked pretty well due to the nature of the project (which I'm not going to go into), and it went into production around the middle of the year. This was a standalone application with few dependencies, but we immediately saw a performance improvement over the system it replaced. This sparked conversation about moving some of our other systems over to Elixir, but unfortunately we hit an issue with our Hadoop integrations. Again, without going into it too much, we have a layer of common code between some of our application code and Hadoop (written in Java in order to be compatible with Hadoop).

This would mean that we would have to keep copies of the common codebase for both languages; Elixir for the application nodes, and Java for the Hadoop nodes. As you can probably guess, we rejected this notion pretty quickly (our MapReduce code is not small). We realised we'd have to stick with our current setup (for now). I went away to research using languages other than Java against Hadoop, just out of interest, and soon discovered Hadoop Streaming. At a high level, Hadoop Streaming is a wrapper which can pipe inputs (in Hadoop fashion) through arbitrary executables using standard input and output.

This looked intriguing and I found a couple of examples of the protocol being used mainly in Python scripting. It seemed simple enough, but there was enough overhead to make me not want to use it because of the way you have to interact with the IO - you have to split keys and values yourself, group your values yourself, etc. I figured it'd be nicer if there was some sort of library hiding the finer points of Hadoop Streaming.
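
To give a feel for that overhead, here is a minimal sketch in plain Elixir (no Vessel involved) of the bookkeeping a hand-written streaming reducer has to do: split each line into a key and a value on the tab character, then group consecutive values that share a key. The module and function names are hypothetical, purely for illustration, and the sketch assumes the default tab separator.

```elixir
defmodule StreamingSketch do
  # Hypothetical helper: split one "key<TAB>value" line into a tuple.
  # A line without a tab is treated as a key with an empty value.
  def parse_line(line) do
    case String.split(String.trim_trailing(line, "\n"), "\t", parts: 2) do
      [key, value] -> {key, value}
      [key] -> {key, ""}
    end
  end

  # Group consecutive pairs sharing a key. This only works when the
  # input is already sorted by key, which is what a reducer receives.
  def group_sorted(pairs) do
    pairs
    |> Enum.chunk_by(fn {key, _value} -> key end)
    |> Enum.map(fn [{key, _} | _] = chunk ->
      {key, Enum.map(chunk, fn {_key, value} -> value end)}
    end)
  end
end

lines = ["input\t1\n", "input\t1\n", "this\t1\n"]

lines
|> Enum.map(&StreamingSketch.parse_line/1)
|> StreamingSketch.group_sorted()
|> IO.inspect()
```

Note that the values stay as strings here; converting them to numbers is yet another step you would have to handle yourself.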

Enter Vessel

So, that's when I decided to work on Vessel. As with most things I've written in the last year which are OSS, it was born of my own use case and I'm making it available just in case anyone else may need it. At a high level, Vessel is essentially a MapReduce framework written in Elixir which allows you to structure your MapReduce jobs through function overrides. This allows you to write jobs extremely quickly whilst keeping the details of the MapReduce process hidden. It's written in a way which means that you can use it to write command-line MapReduce binaries, even if you don't want to use Hadoop.

The interface is extremely simple, and takes care of storing state for you without introducing any overhead. There are callbacks to match the interface for Hadoop as it exists in Java, so setup/1, map/3, reduce/3 and cleanup/1 are all available. There are examples in the repository (only a couple for now) which demonstrate some popular MapReduce patterns. We're going to look at how you would get started with a traditional Word Counter example which just counts the number of words inside a file.

Creating a Project

In addition to the Elixir library, you can also install an Elixir archive which provides Mix command-line tasks inside your local environment to enable you to set up projects quickly. The available tasks just provide a quick way to set up a Vessel project, allowing you to get straight to your app logic rather than having to stub out the job.

Let's create a new project. To do this, we'll first install the Vessel archive and then use the Mix Task to generate a skeleton project.

# install the Vessel archive
zackehh:/tmp$ mix archive.install https://github.com/zackehh/vessel/releases/download/v0.8.0/vessel_arch-0.8.0.ez
Are you sure you want to install archive "https://github.com/zackehh/vessel/releases/download/v0.8.0/vessel_arch-0.8.0.ez"? [Yn] Y
* creating /Users/zackehh/.asdf/installs/elixir/1.3.4/.mix/archives/vessel_arch-0.8.0
# update to latest just in case
zackehh:/tmp$ mix local.vessel
Found existing archive: /Users/zackehh/.asdf/installs/elixir/1.3.4/.mix/archives/vessel_arch-0.8.0.
Are you sure you want to replace it with "https://github.com/zackehh/vessel/releases/download/v0.8.0/vessel_arch-0.8.0.ez"? [Yn] Y
* creating /Users/zackehh/.asdf/installs/elixir/1.3.4/.mix/archives/vessel_arch-0.8.0
# generate a new project
zackehh:/tmp$ mix vessel.new word_counter
* creating word_counter/config/config.exs
* creating word_counter/lib/word_counter/mapper.ex
* creating word_counter/lib/word_counter/reducer.ex
* creating word_counter/lib/word_counter.ex
* creating word_counter/test/word_counter_test.exs
* creating word_counter/test/test_helper.exs
* creating word_counter/.gitignore
* creating word_counter/mix.exs
* creating word_counter/README.md

We are all set! Compile your Vessel binaries:

    $ cd word_counter
    $ mix deps.get
    $ mix vessel.compile

Any built binaries will be in rel/ by default.

You can see that a new project has been created inside word_counter/ and some basic files have been set up for use.

Compiling Binaries

You should be aware that Vessel requires a custom compilation task to create your Vessel binaries (as Hadoop requires a binary). To do this, you just have to run mix vessel.compile which will place your built binaries inside the rel/ directory in a subdirectory named after your current project version.

Let's compile our binaries at this point just to make sure everything appears to be working.

zackehh:/tmp$ cd word_counter/
zackehh:/tmp/word_counter$ mix deps.get
Running dependency resolution
Dependency resolution completed
  exscript: 0.1.0
  vessel: 0.8.0
* Getting vessel (Hex package)
  Checking package (https://repo.hex.pm/tarballs/vessel-0.8.0.tar)
  Fetched package
* Getting exscript (Hex package)
  Checking package (https://repo.hex.pm/tarballs/exscript-0.1.0.tar)
  Using locally cached package
zackehh:/tmp/word_counter$ mix vessel.compile
==> exscript
Compiling 1 file (.ex)
Generated exscript app
==> vessel
Compiling 9 files (.ex)
Generated vessel app
==> word_counter
Compiling 3 files (.ex)
Generated word_counter app
Generated escript ./rel/v0.1.0/word_counter-mapper with MIX_ENV=dev
Generated escript ./rel/v0.1.0/word_counter-reducer with MIX_ENV=dev
warning: redefining module :word_counter_mapper_escript (current version defined in memory)
  deps/exscript/lib/exscript.ex:195

Generated escript ./rel/v0.1.0/word_counter-mapper with MIX_ENV=dev
warning: redefining module :word_counter_reducer_escript (current version defined in memory)
  deps/exscript/lib/exscript.ex:195

Generated escript ./rel/v0.1.0/word_counter-reducer with MIX_ENV=dev

Your code should compile, and you should see ./rel/v0.1.0/word_counter-mapper and ./rel/v0.1.0/word_counter-reducer being built (these are your binaries). You'll see in the output above that there are a couple of warnings related to redefined modules - this is something I should have resolved soon (Vessel is currently v0.8.0 at the time of writing). They don't mean anything hasn't worked in our case, and so they can safely be ignored.

Writing Our Jobs

At this point, we know how to create a project and how to compile a project, but we don't actually know how to write a project. The two important files are contained inside lib/word_counter/mapper.ex and lib/word_counter/reducer.ex. If you take a look in the former, you can see a very small outline of what a job should look like and hopefully it should be evident as to which type of code goes where.

defmodule WordCounter.Mapper do
  @moduledoc """
  This module contains the implementation of the WordCounter mapping phase.
  """
  use Vessel.Mapper

  # Invoked once before the first set of input is read.
  #
  # You can carry out any initial steps in this callback, and can modify the Job
  # context as required for your application logic.
  #
  # You may safely remove this callback if you have no custom logic implemented.
  def setup(context) do
    context
  end

  # Invoked for every key/value pair read in by the input stream.
  #
  # The value here will be your input, and the key simply a unique identifier
  # for each input value.
  #
  # This is where you should transform and emit your values as your business
  # logic requires. You can write values using `Vessel.write/2`.
  def map(_key, _value, context) do
    context
  end

  # Invoked once after the final set of input is read.
  #
  # This is the appropriate place to remove temporary files, close connections,
  # and any other cleanup operations which might be required.
  #
  # You may safely remove this callback if you have no custom logic implemented.
  def cleanup(context) do
    context
  end

end

The function signatures match those from the native Hadoop Java interface, and act as the job lifecycle. The setup/1 function fires before any input has been processed, which makes it the perfect time to set up any dependencies you might need in advance of processing. The map/3 function (or reduce/3 in a Reducer) is where your job logic is carried out, and the cleanup/1 function is the inverse of the setup function; it executes after all input has been processed.
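
If the ordering is easier to see in code, the lifecycle can be simulated in plain Elixir. This is only a sketch under my own assumptions, not how Vessel is implemented: LifecycleSketch, the job map of anonymous functions and the log field on the context are all hypothetical names made up for the illustration.

```elixir
defmodule LifecycleSketch do
  # Hypothetical driver: calls setup once, map once per input line,
  # then cleanup once, threading a context value through every call.
  def run(lines, job) do
    context = job.setup.(%{log: []})

    context =
      lines
      |> Enum.with_index()
      |> Enum.reduce(context, fn {line, index}, acc ->
        # The line index stands in for the key of each input value.
        job.map.(index, line, acc)
      end)

    job.cleanup.(context)
  end
end

# Each callback appends an event to the log so the call order is visible.
record = fn context, event -> %{context | log: context.log ++ [event]} end

job = %{
  setup: fn context -> record.(context, :setup) end,
  map: fn _key, value, context -> record.(context, {:map, value}) end,
  cleanup: fn context -> record.(context, :cleanup) end
}

result = LifecycleSketch.run(["line one", "line two"], job)
IO.inspect(result.log)
```

The log ends up with :setup first, one {:map, line} entry per input line, and :cleanup last.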

Our Mapper

We're writing a very simple job, and so we don't need to manage any state or lifecycle - this means we can safely remove the setup/1 and cleanup/1 functions. If they're not defined in your module, Vessel will use default behaviour to remove the need to have them lying around in your code if you don't need to use them. We only need to keep the map/3 function around for the time being.

In order to have something to work from, we can assume that this will be our job input (for the sake of testing):

this is our input
something extra to say
this is another line of our input

Remember (or please note) that Hadoop will feed input into your job on a per-line basis, which means you will have the map/3 function called 3 times, once with each line. In order to find the words inside each line, we can use String.split/2 to simply split the input on a space before writing the words out to the job context. Obviously this isn't a perfect solution, but it'll suffice for our demo input. There is a more apt solution inside the Vessel repo. Here is our new map/3 function:

def map(_key, value, context) do
  value
  |> String.split(" ")
  |> Enum.each(&Vessel.write(context, { &1, 1 }))
end

Once we split our String into words, we can iterate through them and write them out to the job context. This is done via the Vessel.write/2 function, which will emit your values to the job output stream. You can test your map function by building your binaries using mix vessel.compile, saving the test input into input.txt and then using the command below to consume the input. The result will be sent to your Terminal window.

zackehh:/tmp/word_counter$ cat input.txt | ./rel/v0.1.0/word_counter-mapper
this    1
is      1
our     1
input   1
something       1
extra   1
to      1
say     1
this    1
is      1
another 1
line    1
of      1
our     1
input   1

In case you can't see it in the browser, the key and value are separated by a tab character. This may be customized using Hadoop configuration; please see the documentation on their end if you need to modify this (Vessel should be compatible regardless).

You can verify the output of your mapper with the output I placed above to see if everything is working correctly. As a challenge to the reader, feel free to place things such as commas and periods in the input and see if you can still parse out your words correctly.
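
If you'd like a starting point for that challenge, here is one possible approach sketched in plain Elixir. It is an assumption on my part rather than the solution used in the Vessel repo: instead of splitting on a single space, it splits on any run of characters that are not letters, digits or apostrophes. The Tokenizer module is a hypothetical name.

```elixir
defmodule Tokenizer do
  # Hypothetical helper: split a line on anything that is not a letter,
  # a digit or an apostrophe, dropping the empty strings left behind.
  def words(line) do
    String.split(line, ~r/[^\p{L}\p{N}']+/u, trim: true)
  end
end

"this is, our input."
|> Tokenizer.words()
|> Enum.each(fn word -> IO.puts("#{word}\t1") end)
```

Inside a real job you would swap the String.split(" ") call in map/3 for something like Tokenizer.words/1 and keep writing each word to the context as before.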

Our Reducer

The output of the Mapper is the input for our Reducer. The key will be the word, as shown above, and the value will be a list of values written to that key. If you look above you will see that the word "this" was written twice, which means that your key will be "this" and your values will be [1, 1].

Our reducer is really simple to write, because every number we wrote to the job context is a 1. This means that we don't have to sum everything; we can just look at the length of the list and write that out to the context against the key. This should be fairly straightforward - it's done the same way as we did it inside the map/3 function.

def reduce(key, values, context) do
  Vessel.write(context, { key, length(values) })
end

You can test your Reducer binary in the same way you tested your Mapper binary; but please be aware that your input should be correctly sorted so that the keys are grouped together correctly. You can replicate this using the following command to pipe all the way through:

zackehh:/tmp/word_counter$ cat input.txt | ./rel/v0.1.0/word_counter-mapper | sort -k1,1 | ./rel/v0.1.0/word_counter-reducer 
another 1
extra   1
input   2
is      2
line    1
of      1
our     2
say     1
something       1
this    2
to      1
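
To see why the sort step matters, here is a small plain Elixir sketch (again, not Vessel code, and SortSketch is a hypothetical name) which counts consecutive runs of identical keys. That is roughly how a streaming reducer sees its input: a new group starts whenever the key changes.

```elixir
defmodule SortSketch do
  # Count consecutive runs of identical keys; a new run starts
  # whenever the key differs from the previous one.
  def count_runs(keys) do
    keys
    |> Enum.chunk_by(& &1)
    |> Enum.map(fn [key | _] = run -> {key, length(run)} end)
  end
end

# The words emitted by our mapper, in their original order.
keys =
  ~w(this is our input something extra to say this is another line of our input)

# Unsorted: repeated words are never adjacent, so every run has length 1.
IO.inspect(SortSketch.count_runs(keys))

# Sorted: repeated words sit next to each other and are counted together.
IO.inspect(SortSketch.count_runs(Enum.sort(keys)))
```

Without the sort, "this" shows up as two separate groups of one; with it, the counts line up with the reducer output above.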

Success!

We can now see that our input has been counted correctly and voila, we have our first MapReduce job ready for Hadoop. We wrote only 4 lines of actual Elixir code to get to this point, which should demonstrate how much Vessel masks from you when writing your jobs. If you have any issue replicating the above job flow, feel free to reach out either over Twitter or on the GitHub repo.

The example above is very simple just to show the kind of interface you'll be working with, but there are many other functions available in Vessel, including those related to keeping state between invocations in your job. Please read the repo documentation for more information and visit the code documentation to look at the functions available.

Hopefully this quick look at Vessel was useful to demonstrate not only how it integrates with MapReduce but also how you can use it for basic analysis on your local machine even if you don't wish to use Hadoop. If you have any design suggestions or improvements, please feel free to file them in the repo and/or reach out!