I have a confession to make: I’ve been using Map / Reduce for the past 5 years. Until recently I thought it was the only way to realistically process massive amounts of data in a reasonable amount of time. I assumed streaming technologies were only relevant for simplistic applications where speed is prioritized over accuracy.
I’ve been working on an amazing project for TripAdvisor and have come to the conclusion that those views are wrong. Map / Reduce introduces a cost to your application that grows over time as your requirements become more complex. This complexity erodes your productivity by forcing you to account for more and more things that don’t provide business value.
My plan is to split this topic over 3 blog posts. This post talks about the issues I’ve encountered using Map / Reduce. The second article introduces you to Samza, a new streaming technology introduced by LinkedIn. Part 3 explains the architecture I’ve developed for TripAdvisor using the abstractions provided by Samza.
Before working on big data systems, I built large-scale web-based applications. Traditionally these applications use a database for persistent state, and the application logic tier uses some form of caching to improve performance and scale. Map / Reduce forces you to decompose problems in a whole new way. Rethinking how you solve problems is generally a good thing; however, the final solution should be better than what you had before. I believe the M/R model forces you to make some unnatural design decisions that add complexity to projects.
So why am I picking on Map / Reduce in this post? After all it’s what made possible the whole “big data” movement. I remember back in the day if you worked on a system that processed 1 TB a day then you could claim some bragging rights. So yes, we do owe Map / Reduce some love as a platform for opening up a whole new class of applications. Before Map / Reduce, processing data at “Internet Scale” was nearly impossible.
However, I hope to show that some of the original design decisions that allow Map / Reduce to scale to amazing heights are the same decisions that are causing a lot of work for developers today. I believe it all comes back to two decisions: stateless jobs and an immutable file system.
If your system is stateless then scaling horizontally is very easy: if you need more throughput, you just keep adding servers. As long as you can split your data into reasonable chunks, you can keep adding capacity, because every node in the job is independent of the others. There is no state to move around between the machines.
So what happens if a node within the job fails? After all we’re talking about hundreds or maybe even thousands of machines here. If something bad can happen, odds are it probably will at some point. The solution is to make things as repeatable as possible or idempotent. Any node within a job should be able to restart at any point.
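The restart guarantee boils down to making each task’s output an all-or-nothing commit: a task killed mid-write leaves nothing behind that a rerun can’t safely redo. A minimal Python sketch of the idea (the `part-` file naming and the uppercase transform are illustrative, loosely modeled on how Hadoop’s output committer stages results before promoting them):

```python
import os
import tempfile

def run_task(task_id: str, records, out_dir: str) -> str:
    """Process one partition and commit its output atomically.

    If the task dies mid-write, only an abandoned temp file remains;
    a restarted attempt regenerates the same final file, so the task
    is safe to re-run (idempotent). The transform is a stand-in for
    real map logic.
    """
    final_path = os.path.join(out_dir, f"part-{task_id}")
    # Stage output in a temp file in the same directory so the rename
    # below is an atomic move, not a cross-device copy.
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, prefix=f".{task_id}-")
    with os.fdopen(fd, "w") as f:
        for record in records:
            f.write(record.upper() + "\n")
    os.replace(tmp_path, final_path)  # atomic commit; reruns overwrite safely
    return final_path
```

Running the same task twice produces identical output, which is exactly the property a restarting scheduler relies on.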
To help guarantee this, HDFS is for the most part an immutable file system. I say ‘for the most part’ because recent versions of HDFS do have append operations. However, those aren’t widely used because you have to assume tasks within a job will be restarted.
So what are the downstream effects of this architecture? Let’s walk through them one by one:
Without re-reading data from disk, M/R jobs have no notion of what happened before. If you’re doing strictly data transformation type of operations (the original use case of M/R) then this is fine and M/R works quite well. Just plow through the data and transform X into Y.
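For the pure transformation case the whole model really does fit in a few lines. A toy Python sketch of the map / shuffle / reduce pipeline, with word counting standing in for “transform X into Y” (all names here are illustrative, not Hadoop APIs):

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    # Mappers are stateless: each emits (key, value) pairs from its input only.
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reduce_phase(pairs):
    # Shuffle/sort by key, then reduce each key's group independently.
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield (key, sum(v for _, v in group))

counts = dict(reduce_phase(map_phase(["a b a", "b c"])))
# counts == {"a": 2, "b": 2, "c": 1}
```

Note that nothing in either phase knows anything outside its own input, which is precisely why this scales so well and why anything requiring memory of a previous run does not fit.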
However as soon as your application needs to do something slightly more complex, such as making decisions based on what happened on the previous run, then things become more interesting. The simplest approach is to brute force through all the previous data even though you might only need 10% of it. This has obvious scaling issues.
Yes, partitioning in HDFS does help, but you have to be careful: if you partition too finely, the amount of work done by each node becomes less than the cost of distributing the work across the cluster. The alternative is to save off, during the current run, whatever the next run will need to read. This too gets interesting as applications become more complex.
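The “save off what the next run needs” pattern amounts to a hand-rolled checkpoint file. A Python sketch, assuming a JSON state file and a `done` flag on each record (all of which are assumptions for illustration, not anything Hadoop provides):

```python
import json
import os

def run_with_carryover(events, state_path):
    """Process this run's events plus whatever the last run set aside.

    Rather than re-reading all historical data, each run persists only
    the records the next run will need (here: events not yet 'done').
    The state file name and record shape are illustrative.
    """
    pending = []
    if os.path.exists(state_path):
        with open(state_path) as f:
            pending = json.load(f)

    # Later copies of the same id win, so a carried-over event can complete.
    latest = {}
    for event in pending + events:
        latest[event["id"]] = event

    closed = [e for e in latest.values() if e["done"]]
    still_open = [e for e in latest.values() if not e["done"]]

    # Atomically save only the still-open events for the next run.
    tmp = state_path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(still_open, f)
    os.replace(tmp, state_path)
    return closed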
Phase Decomposition

Breaking problems down into their most discrete steps often requires creating many individual M/R jobs, which are then composed into a larger workflow. This is especially true as the cardinality of the data changes or as you aggregate the data over different keys. Each job in the workflow takes the previous job’s output as its input, rereads the data, and performs new operations over it. Splitting your application in this way has some interesting side effects:
- Your business logic becomes split across many different classes throughout the system, which makes understanding how the system works more challenging.
- If a phase fails, do you restart the job from the beginning or from the phase that failed? Restarting from the beginning is easier but may not be an option if your SLAs are tight. And what do you do with data already written successfully to HDFS by earlier phases? M/R has no native support for restarting in the middle. Workflow systems, such as Oozie, do support this but introduce their own complexities.
- You have to be very careful to partition your data so that it is spread out as evenly as possible. Otherwise, the system will only be as fast as the node with the most skewed data to process.
- Introducing additional phases will in most cases reduce the overall throughput of the system. All tasks within a phase have to complete before the next phase can start, so a phase is only as fast as its slowest task. Sorting and shuffling the data for the next reduce phase is often the most time-consuming step of any M/R job.
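The last two points compound: a workflow’s runtime is the sum of its phases, and each phase waits on its slowest task. A few lines of arithmetic make this concrete (the task runtimes are hypothetical):

```python
def phase_duration(task_times):
    # A phase cannot finish before its slowest task does.
    return max(task_times)

def workflow_duration(phases):
    # Phases run sequentially, so their critical paths add up.
    return sum(phase_duration(p) for p in phases)

# Hypothetical runtimes in minutes. Both workflows do the same total
# work in phase 1 (40 task-minutes), but one has a skewed partition.
evenly_spread = [[10, 10, 10, 10], [5, 5, 5, 5]]
skewed = [[2, 2, 2, 34], [5, 5, 5, 5]]

assert workflow_duration(evenly_spread) == 15
assert workflow_duration(skewed) == 39
```

Same work, more than twice the wall-clock time, purely from one hot partition.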
Enriching with Late Arriving Data
What I’ve described up to this point assumes all the data used by the application is present and accounted for. In practice, Map / Reduce is often used to enrich the “big data” with meta-data held in external systems (such as databases). It’s possible to solve this problem in a few different ways. In your job you can create two separate mappers that read the two data sets and partition them over a common key; the reducer for a given key then receives values from both mappers, provided both sides emitted that key. Alternatively, if the meta-data is small enough to fit in memory, you can read it all in, write it to a file, and make it available to every mapper through Hadoop’s distributed cache; each mapper is then responsible for joining the two pieces of data.
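The first approach is the classic reduce-side join. A small Python simulation of the tag / shuffle / join steps (the record shapes and source labels are illustrative):

```python
from collections import defaultdict

def reduce_side_join(facts, metadata):
    """Simulate a reduce-side join between two mapper outputs.

    Each "mapper" tags its records with a source label, the "shuffle"
    groups both streams by the common key, and the "reducer" stitches
    matching pairs together.
    """
    # Map phase: two mappers tag their records with a source label.
    tagged = [(k, ("fact", v)) for k, v in facts]
    tagged += [(k, ("meta", v)) for k, v in metadata]

    # Shuffle phase: group everything by the common key.
    by_key = defaultdict(list)
    for key, value in tagged:
        by_key[key].append(value)

    # Reduce phase: a key joins only if both sides showed up; a fact
    # with no matching meta-data silently drops out here.
    joined = []
    for key, values in by_key.items():
        fact_vals = [v for tag, v in values if tag == "fact"]
        meta_vals = [v for tag, v in values if tag == "meta"]
        joined.extend((key, f, m) for f in fact_vals for m in meta_vals)
    return joined
```

The silent drop in the reduce step is worth noticing: it is exactly where the late-arriving-data problem lives.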
What if data arrives late? This is very typical for independent systems processing data at different rates. A meta-data store might be down, or the key might have been deleted. Do you fail the job and alert the person on call? Do you handle the issue downstream and reconcile later on? A third option is to write the unmatched data aside and try again on the next run, at which point you run into all the problems mentioned under phase decomposition.
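That third option amounts to threading a retry queue between runs. A Python sketch of one run’s handling, assuming an in-memory metadata lookup and simple `(key, value)` records (both assumptions for illustration):

```python
def process_with_retry_queue(records, metadata, retry_in):
    """Enrich records, setting aside those whose metadata hasn't arrived.

    Instead of failing the job, records with no metadata yet go into a
    retry queue that the next run feeds back in alongside new input.
    """
    enriched, retry_out = [], []
    for key, value in retry_in + records:
        meta = metadata.get(key)
        if meta is None:
            retry_out.append((key, value))  # try again on the next run
        else:
            enriched.append((key, value, meta))
    return enriched, retry_out
```

This keeps the job alive, but note that `retry_out` is exactly the kind of carried-over state that phase decomposition makes painful to manage.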
Peak Loads / Multiple times per day
As your system grows over time, so does your volume, usually faster than the growth rate of your user base: as we understand the original problem better, we ask new questions, which often require more detailed data. If you’re lucky, two things happen: people want data faster so they can make decisions faster, and your volume becomes so high that it’s no longer cost-efficient to process it all in one large pass.
The solution is to run the process multiple times per day. To prevent duplicates, you have to be very careful to mark the data you successfully processed earlier in the day as complete. Your production SLA windows become much tighter because you need to ensure each job finishes before the next one starts. How do you coordinate this so it happens automatically? If an intraday run fails, can the next scheduled run continue? Splitting a daily job into multiple runs per day solves one problem but introduces others.
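One common way to mark work as done is a zero-byte flag file per input, in the spirit of Hadoop’s `_SUCCESS` markers. A Python sketch; the `._DONE` suffix and directory layout are assumptions for illustration:

```python
import os

def pending_partitions(in_dir):
    """Return input files not yet marked complete.

    Each successful run drops a zero-byte <name>._DONE marker next to
    the file it processed, so an intraday rerun skips finished work.
    """
    entries = os.listdir(in_dir)
    done = {f[:-len("._DONE")] for f in entries if f.endswith("._DONE")}
    return sorted(f for f in entries
                  if not f.endswith("._DONE") and f not in done)

def mark_complete(in_dir, name):
    # Create the marker only AFTER output is fully committed, otherwise
    # a crash between the two steps silently drops data.
    open(os.path.join(in_dir, name + "._DONE"), "w").close()
```

Even this simple scheme raises the coordination questions above: who cleans up markers from a failed run, and what happens if two scheduled runs overlap?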
I hope I’ve given you the impression that building a non-trivial system on top of M/R is not that simple. The M/R model itself is simple but the new complexity it introduces downstream into your architecture is not.
You may be thinking right now: why not just use Hive, Pig, or one of the many other abstractions on top of M/R? Most of the complexity doesn’t go away; it’s just hidden from view. You may not need to program every little detail, but these are still things you need to account for.
In my personal experience, I’ve spent orders-of-magnitude more time thinking about how to solve the issues outlined here instead of building the solution to the problem I was originally trying to solve.
For a long time I thought this is just the way it has to be. In my next blog post I’ll show that’s not necessarily true. LinkedIn has open sourced Samza, their real-time streaming framework. Being able to program against a system that supports fully replicated, mutable state at massive scale is, in my opinion, a game changer. Samza lets you think more about the problem you’re trying to solve and less about the problems introduced by the technology.