Paper Review: Pig Latin: A Not-So-Foreign Language for Data Processing

Summary:

Pig is a scripting language built on top of MapReduce that tries to save data scientists from the rigid control flow of the MR programs they struggle with. It is a combination of SQL-style querying and a procedural language, and it compiles high-level data flow programs into MapReduce executions, so developers don't have to worry much about parallelism, fault tolerance, pipelining, and so on.
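To make the data-flow style concrete, here is a rough stand-in written as plain Python over an in-memory list (the fields and data are invented, not from the paper). Each named intermediate mirrors a Pig relation, and the group-by step is roughly where Pig would insert a MapReduce shuffle.

```python
# Hypothetical stand-in for a Pig-style dataflow, written as plain Python over
# an in-memory list. Each named intermediate mirrors a Pig relation; in real
# Pig the GROUP step would mark a MapReduce boundary.
from collections import defaultdict

visits = [
    {"user": "alice", "url": "a.com", "time_spent": 30},
    {"user": "bob",   "url": "a.com", "time_spent": 10},
    {"user": "alice", "url": "b.com", "time_spent": 5},
]

# Step 1: filter (map-side in the compiled MapReduce job)
long_visits = [v for v in visits if v["time_spent"] >= 10]

# Step 2: group by url (this is where Pig would insert a shuffle)
grouped = defaultdict(list)
for v in long_visits:
    grouped[v["url"]].append(v)

# Step 3: per-group aggregation (reduce-side)
avg_time = {url: sum(v["time_spent"] for v in vs) / len(vs)
            for url, vs in grouped.items()}

print(avg_time)  # {'a.com': 20.0}
```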

Strong points:

  1. Pig combines a SQL-like query language with low-level MapReduce (high-level declarative operations plus low-level procedural manipulation), which grants it some benefits of both. Programmers can easily write step-by-step (rather than declarative) programs without worrying about database layout or MapReduce parallelism.
  2. Pig is flexible in many ways, such as its flexible data model and user-defined functions (UDFs) with Java and, in later versions, Python support. You can specify the control flow of the program at each step, just like in any other procedural language, while focusing on the data flow at the same time.
  3. Pig also differs from relational databases and OLAP systems: it can deal with large unstructured datasets and nested data structures. At the same time, it enjoys parallelism just like a parallel database, thanks to the internal compilation to MapReduce.
  4. Each step is pipelined and optimized for execution. And since it is built on MapReduce, fault tolerance, scalability, and the other MR features come for free; as a developer you don't have to worry much about the lower levels.

Weak points:

  1. First of all, Pig programs are still compiled into MapReduce, which is without doubt inefficient and inflexible. Data must be materialized and replicated on the distributed storage between successive MapReduce steps, and this makes things much slower even with pipelining. Bandwidth can also become an issue when pipelined Pig programs are running: at the end of each step, all the intermediate data is transferred over the network almost simultaneously.
  2. The execution order is not necessarily the same as the order in which the Pig program is written. The "future work" section of the paper also mentions that the execution order may be rearranged for better performance. While this is a good feature in most scenarios, it can be dangerous and hard to debug in some cases.
  3. The flexibility could be a bit of a challenge for developers. For example, only a small set of parallelized primitives is included in Pig, and users are responsible for the rest of the implementation as well as its efficiency. I can imagine it would be a pain to develop an efficient UDF without support from the community (a rough sketch of what a UDF looks like follows below).
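For reference, a UDF is usually just a small function applied to one field or tuple. Below is a hypothetical plain-Python sketch; the registration and schema-annotation syntax in an actual Pig script depends on the Pig version, so only the function body is shown here.

```python
# Hypothetical Python UDF sketch. In an actual Pig script such a function would
# be registered from the Pig side; that syntax is version-dependent, so only
# the plain-Python body is shown.
def normalize_url(url):
    """Strip scheme and trailing slash so grouping keys are consistent."""
    if url is None:
        return None
    for prefix in ("http://", "https://"):
        if url.startswith(prefix):
            url = url[len(prefix):]
    return url.rstrip("/")

# Quick local check, independent of Pig:
assert normalize_url("https://example.com/") == "example.com"
```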

Paper Review: Druid: A Real-time Analytical Data Store

Summary:

Druid is a real-time analytical datastore with a column-oriented layout and different types of nodes supporting different functionality. It depends on deep storage, MySQL, and ZooKeeper for persistence, metadata, and coordination respectively, and it can handle analytical queries much faster than MySQL or Hadoop.

Strong points:

  1. The historical node tiers offer flexibility and better performance. Nodes within the same tier share an identical configuration, and tiers are divided according to the "importance" of their data: a tier with more cores and memory is suited to frequently accessed data. Although the tiers appear to be manually configured, it is still a good way to take full advantage of limited hardware resources.
  2. ZooKeeper and MySQL outages do not impact current data availability on historical nodes. With their cached data, historical nodes can still serve their current queries. The caching also saves time by eliminating unnecessary accesses to deep storage. However, the coordinator nodes might become unable to perform any operation.
  3. Druid uses column-oriented storage, similar to Dremel. Column storage is more efficient for data analytics since only the columns a query needs are actually loaded and scanned. In row-based storage, entire rows have to be loaded, which results in significantly longer running times (a tiny sketch of the difference follows this list).
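Here is a minimal row-versus-column comparison in Python, with made-up data, to illustrate why analytical scans favor the columnar layout:

```python
# Minimal sketch of why columnar layout helps analytical scans.
# Row layout: every query touches whole records.
rows = [
    {"ts": 1, "country": "US", "clicks": 3},
    {"ts": 2, "country": "DE", "clicks": 7},
    {"ts": 3, "country": "US", "clicks": 1},
]
total_row = sum(r["clicks"] for r in rows)  # reads every field of every record

# Column layout: an aggregation over "clicks" scans one contiguous array.
columns = {
    "ts":      [1, 2, 3],
    "country": ["US", "DE", "US"],
    "clicks":  [3, 7, 1],
}
total_col = sum(columns["clicks"])  # the other columns are never touched

assert total_row == total_col == 11
```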

Weak points:

  1. A Druid cluster consists of different types of nodes, each responsible only for a specific kind of task. While this makes for a simple design, load balancing across the different roles could be an issue and makes the system inflexible in certain scenarios. If a deployment ingests a flood of events during the day and receives all of its queries at night, Druid still has to keep the real-time and historical roles separate, which means only part of the cluster is working at any time of day while the rest of the machines (either the real-time nodes or the historical nodes) contribute nothing and are wasted.
  2. I think Druid has the same downside as Hadoop MapReduce, in the sense that all the "intermediate results" are saved remotely on disk. To access intermediate results, MapReduce has to load them from disk and transfer them over the network, and so does Druid: batch data is saved in deep storage, a distributed file system, and processing requires loading that data remotely. This adds another layer of delay and dependency. I wonder whether the "store" and the "processor" could live on the same machine so that we could save the loading and transferring.
  3. The coordinator nodes could be redundant since we already have ZooKeeper. ZooKeeper is perfectly capable of tracking which data the historical nodes should load and replicate, and it would not be hard to build a simple load-balancing function on top of it. On the other hand, the historical nodes might suffer a short outage during the leader election of the coordinator nodes. So to me it doesn't really make sense to add coordinator nodes to the system.
  4. The overall design doesn't fit the philosophy of the Druid class in role-playing games at all. Yes, Druid (in this paper) is capable of handling complex data analysis, but the design is messy and inelegant since it solves the problem with many node types and dependencies. The Druid class in the game can transform into different animals to tackle different problems; it's flexible and independent. So I guess the "real Druid design" would be a single kind of node programmed to handle all the problems in different ways.

Paper Review: Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks

Summary:

Dryad is Microsoft's take on a distributed/parallel computing model. It aims to free developers from the rigid structure and process of MapReduce and let them design their own concurrent computation. It doesn't constrain how the data flow is connected or how the computation proceeds (as long as there are no cycles), so developers can pretty much configure Dryad the way they want.

Strong points:

  1. Dryad strikes me at first as a generic version of MapReduce. It has all the nodes and looks like the map/reduce model, but the process is much more configurable: as long as the computation forms an acyclic graph, Dryad can run it. Unlike MapReduce, which is only good for a certain kind of massively parallel processing, Dryad seems able to fit almost anything.
  2. One of the biggest constraints of MapReduce is that the developer has no freedom to choose how intermediate data is transferred during the job, and Spark beats MapReduce partly because it transfers data through memory. In Dryad, developers can choose among different transports, such as TCP, shared memory, or files on disk. Memory would of course be the fastest, but it's always nice to have other options in case memory is insufficient or for some other reason.
  3. The computation is modeled as an acyclic graph. Dryad offers ways to monitor the vertices as well as the edges (a state manager for each vertex and a connection manager, which is not in the paper), and it can make dynamic changes to the computation graph based on what it observes, to handle special cases like slow machines (a toy scheduling sketch follows this list).
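Here is a toy sketch (not Dryad's actual scheduler) of why acyclic graphs are easy to schedule: just run vertices in topological order, launching each one as soon as all of its inputs have finished. The graph and vertex names are invented.

```python
# Sketch of scheduling an acyclic job graph: a vertex becomes runnable as soon
# as all of its upstream inputs have finished, so a simple topological order
# suffices and no deadlock is possible. Graph is made up for illustration.
from collections import deque

edges = {            # vertex -> downstream vertices
    "read": ["parse"],
    "parse": ["filter", "count"],
    "filter": ["join"],
    "count": ["join"],
    "join": [],
}

indegree = {v: 0 for v in edges}
for outs in edges.values():
    for v in outs:
        indegree[v] += 1

ready = deque(v for v, d in indegree.items() if d == 0)
order = []
while ready:
    v = ready.popleft()
    order.append(v)          # in a real system: launch vertex v on some machine
    for w in edges[v]:
        indegree[w] -= 1
        if indegree[w] == 0:
            ready.append(w)

print(order)  # ['read', 'parse', 'filter', 'count', 'join']
```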

Weak points:

  1. While Dryad aims to "make it easier for developers to write efficient parallel and distributed applications", it doesn't hide all the execution details from developers. Instead it does the exact opposite, exposing more internal structure and leaving the decisions to developers. The computation, connections, input/output, and monitoring all look intimidating, and the limited language support (from what I've seen so far it uses C++ and query languages only) makes things even harder.
  2. The fact that execution stops when the job manager fails is definitely a disadvantage of the system. It could be fixed with 1) distributed coordination via Paxos or some other consensus protocol (slow but effective) or 2) a shadow master (faster recovery). Either way it's not hard to implement, which makes me wonder why this is still an issue in Dryad.
  3. There are two reasons why the graph should be acyclic: 1) scheduling is easy because there are no deadlocks and the process is sequential, and 2) without cycles, recovery is straightforward. However, there are cases where developers need to run the same computation on a piece of data multiple times (iterative algorithms, for example), and this is not expressible in the current Dryad system.

Paper Review: Dremel: Interactive Analysis of Web-Scale Datasets

Summary:

Dremel is an interactive ad-hoc query system designed to handle real-time, web-scale queries with low latency. It uses a tree of servers and columnar storage with replication to achieve far better performance than MapReduce for this kind of workload.

Strong points:

  1. Columnar storage is better for fast column retrieval and feature extraction. It exposes data in a more feature-oriented way and makes it easy for programs to process data column by column with better data locality.
  2. The tree structure of the servers is great. There are more than two layers of servers, whereas two layers (one for metadata, one for data) is pretty much all we've seen so far. The structure reminds me of the biggest query system on earth: DNS. With an intermediate layer, the system benefits from one more layer of caching and much better scalability, and it could expand to an even bigger scale by adding another intermediate layer (a toy aggregation-tree sketch follows this list).
  3. The performance is great compared to similar systems like Hive, which is not real time. It delivers on all the requirements of a real-time, interactive, ad-hoc querying system. However, it seems to me that Apache Drill can achieve pretty much the same thing with more flexibility in data types.
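A toy sketch of the serving-tree idea, with made-up data and a made-up two-level fan-out: leaf servers compute partial aggregates over their tablets, intermediate servers merge the partials, and the root merges once more.

```python
# Toy sketch of a serving tree for a COUNT-style aggregation. All data and the
# two-level fan-out are invented for illustration.
leaf_tablets = [
    [("US", 3), ("DE", 1)],          # leaf 1
    [("US", 2)],                     # leaf 2
    [("DE", 4), ("FR", 5)],          # leaf 3
    [("FR", 1), ("US", 1)],          # leaf 4
]

def partial_count(rows):
    acc = {}
    for key, n in rows:
        acc[key] = acc.get(key, 0) + n
    return acc

def merge(partials):
    acc = {}
    for p in partials:
        for key, n in p.items():
            acc[key] = acc.get(key, 0) + n
    return acc

leaves = [partial_count(t) for t in leaf_tablets]
intermediates = [merge(leaves[:2]), merge(leaves[2:])]   # two mid-level servers
root = merge(intermediates)
print(root)  # {'US': 6, 'DE': 5, 'FR': 6}
```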

Weak points:

  1. Performance is relatively poor when most of the columns are read or when dealing with unstructured data, since in those cases we cannot take advantage of the columnar storage. But I guess they are pretty sure about the query types Dremel is going to handle, so it's fine: Dremel is designed for ad-hoc queries over structured data that is ready to be analyzed.
  2. I don't think MapReduce and Dremel make a valid comparison. Of course users could use MapReduce to perform Dremel's query and analysis jobs, but that's not what MapReduce is designed for, which is distributed batch processing. The two are more complementary than comparable to me, and that's exactly what the authors suggest in their observations: "MR and query processing can be used in a complementary fashion; one layer's output can feed another's input".
  3. There's nothing you can do to modify the data (no updates or in-place creation), only append, which limits what users can do with Dremel. I guess implementing updates is not a development priority since data analysis rarely involves modification, but it would still be nice to have a flexible way to change the data.

Paper Review: Discretized Streams: Fault-Tolerant Streaming Computation at Scale

Summary:

Spark Streaming is a batch-processing system built on Spark that approximates streaming by dividing real-time events into micro-batches over fixed time intervals. This causes unavoidably higher latency but delivers great throughput at the same time. It handles faults and stragglers with checkpoints and parallel recomputation.
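A minimal PySpark Streaming word count gives the flavor of the micro-batch model (the host/port are placeholders, and this assumes a working Spark installation with pyspark available):

```python
# Minimal PySpark Streaming sketch of the micro-batch model: incoming text is
# chopped into 2-second batches and each batch is processed as a small RDD job.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="MicroBatchWordCount")
ssc = StreamingContext(sc, 2)                       # 2-second batch interval

lines = ssc.socketTextStream("localhost", 9999)     # placeholder source
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                     # one output per micro-batch

ssc.start()
ssc.awaitTermination()
```

Every printed batch corresponds to one 2-second interval, which is exactly where the latency floor discussed in the weak points comes from.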

Strong points:

  1. The best part of Spark Streaming, which is also indicated in the title of the paper, is the fault-tolerance and recovery strategy. The system doesn't take the path of full replication, which would require twice the storage space plus coordination. It keeps checkpoints of the datasets but doesn't serially replay the computation from the checkpoints during recovery: recovery is a parallel process that distributes the work of the failed node across all the others and brings the data back in seconds;
  2. Consistency is maintained naturally with discretized streams. Unlike Storm, the data is processed "exactly once" because each micro-batch is synchronized at the time-interval barriers;
  3. Great throughput compared to any streaming system. It's not entirely fair to compare micro-batch processing to true streaming, but when it comes to real-world applications, micro-batch processing fits many scenarios that don't demand very low (sub-second) latency;

Weak points:

  1. Latency is inevitable because of the time interval over which Spark Streaming batches events. The latency won't get much lower with fewer events per batch or more machines in the cluster. This is very different from Storm, where latency is directly tied to computational power and the rate of incoming events;
  2. Streaming and batch processing are fundamentally different, which makes the comparison between Storm and Spark Streaming invalid in many ways. Some people from the Storm project were really upset about the claim that "Spark Streaming is X times faster than Storm". I would love to see a comparison between Storm Trident and Spark Streaming, since micro-batch against micro-batch makes much more sense.
  3. The introduction and overall-structure sections spend a lot of time on how Spark Streaming can handle stragglers while other systems can't, but it turns out to be a simple time threshold computed from the median running time: a node is marked as slow if it takes longer than that. So I'm guessing that once a node is marked as slow it won't be assigned work in the future, since it might add latency, and its job will be redistributed to other workers. But what if the slowness was caused by transient network loss, OS noise, or the underlying HDFS? I'm thinking of giving those slow nodes another chance periodically. Instead of assigning them real tasks, we could give them duplicated ones with a special "no pass" mark, so the slow nodes run the same processing alongside normal nodes without returning duplicate results; there wouldn't be any bad influence either way. More sophisticatedly, they would get those chances in the next [1st, 2nd, 4th, 8th, 16th, ...] batches after being marked slow (a rough sketch of this schedule follows the list).
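A rough sketch of the re-inclusion schedule I have in mind (purely my own idea, not something from the paper):

```python
# Rough sketch of the proposed re-inclusion schedule: once a node is marked
# slow at batch t, it receives shadow ("no pass") duplicate tasks at batches
# t+1, t+2, t+4, t+8, ... until it looks healthy again. Purely illustrative.
def probe_batches(marked_at, horizon):
    """Batch indices at which a slow node receives shadow duplicate work."""
    batches, step = [], 1
    while marked_at + step <= horizon:
        batches.append(marked_at + step)
        step *= 2
    return batches

print(probe_batches(marked_at=10, horizon=100))
# [11, 12, 14, 18, 26, 42, 74]
```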

Paper Review: Storm @ Twitter

Summary:

Storm is a real-time data processing system aimed at large-scale streaming data. It offers relatively strong fault tolerance as well as two processing-semantics guarantees. It is now an Apache open-source project that works well in the Hadoop ecosystem (it runs on Mesos and uses ZooKeeper for coordination).

Strong points:

  1. Parallelism is enforced at many layers in the system. There are many worker nodes, each with one or more worker processes; every worker process runs a JVM carrying one or more executors, and every executor runs one or more tasks. In sum, every machine is divided into multiple services ready for streaming jobs. Although Storm doesn't optimize parallelism at the machine level, this kind of model can still be useful for handling a large number of small streaming jobs.
  2. The paper doesn't talk about fault tolerance much, but it's easy to see why: Nimbus and the Supervisor daemons are stateless, which makes backup easy to handle, and snapshots of their state are kept on local disk or in ZooKeeper. However, a Nimbus failure blocks the whole system from accepting new topology submissions.
  3. As a real-time stream-processing system, Storm has really good average latency. In the experiments, the latency tracks the number of machines, which means the workload is distributed nicely among the workers. The throughput measurements indicate good scalability as well.

Weak points:

  1. Currently, the programmer has to specify the number of instances for each spout and bolt. I'm not sure whether this has been fixed, but if not, developers could go through a lot of trial and error before finding the right numbers for their data stream. I guess Pregel has the same issue with vertex partitioning: users have to pre-configure these things before the run, and the configuration ends up inflexible and inefficient considering that these workloads demand a high degree of parallelism.
  2. The processing-semantics guarantee is weak compared to Trident/Spark: it's either at-least-once or at-most-once, and Storm doesn't offer an exactly-once guarantee (I guess that's one of the reasons it has better performance?). Although they have tricks like the XOR-based acking to save memory and make things look better (see the sketch after this list), the lack of a stronger guarantee could be a disadvantage for some applications.
  3. I guess Storm works well for simple tasks that fit into a single bolt, but sophisticated stream processing requires a large number of bolts to split the task into simple stages running in parallel, and the communication and coordination overhead grows accordingly. They didn't compare different types of streaming tasks in the evaluation, so there's no way to tell from the paper; besides, the manual configuration of bolts and spouts makes such a comparison harder.
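For reference, my understanding of the XOR acking trick, as a tiny sketch: every tuple in a tuple tree gets a random 64-bit id that is XORed into one checksum when the tuple is emitted and again when it is acked, so tracking a whole tree costs a single integer.

```python
# Sketch of the XOR acking idea: the checksum returns to zero exactly when
# every emitted tuple has been acked (with overwhelming probability for
# random 64-bit ids).
import random

checksum = 0
ids = [random.getrandbits(64) for _ in range(5)]   # ids of emitted tuples

for tid in ids:            # tuples emitted downstream
    checksum ^= tid
for tid in ids:            # every tuple eventually acked
    checksum ^= tid

print(checksum == 0)       # True: the tuple tree is fully processed
```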

Paper Notes:

  1. There are two kinds of vertices: spouts pull data from queues such as Kafka, and bolts process incoming tuples and pass them downstream. There can be cycles in the topology.
  2. A single master for coordination
  3. Each worker node runs one or more worker processes. Each worker process runs a JVM, which runs one or more executors. Each executor is made up of one or more tasks. Talk about parallelism.
  4. Summingbird is used for Storm topology generation.
  5. Nimbus and the Supervisors are fail-fast and stateless and all their states are kept in Zookeeper/local disk

Paper Review: Pregel: A System for Large-Scale Graph Processing

Summary:

Graph problems are naturally hard to parallelize and distribute because of all the dependencies between vertices and edges. Pregel offers an intuitive way of handling graph processing by using N machines to handle M vertices independently (M >> N). There is a single master in the system, used for coordination, aggregation, and analysis.

I tried a similar approach last semester by using threads to simulate nodes in a clustering problem. I believe it’s a good model for demonstration but expensive in terms of communication and coordination overhead.

Strong points:

  1. The way they divide the work is straightforward and intuitive. All the vertices act independently, communicating through message passing and coordination. Computation is divided into supersteps bounded by barriers, a model widely used in parallel programming;
  2. Combiners are useful for reducing the number of messages that need to be buffered and processed, hence decreasing the running time of each superstep. They work well for shortest-path computation (a small single-machine simulation follows this list);
  3. It's flexible in many ways. There are different input methods, and user-defined handlers can be used when a target vertex doesn't exist or for topology mutations. These features make Pregel suitable for a wider range of graph-related problems;
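To make the superstep/combiner idea concrete, here is a single-machine toy simulation of Pregel-style single-source shortest paths (the graph and values are made up; this is not Google's actual API):

```python
# Single-machine simulation of a Pregel-style superstep loop for single-source
# shortest paths. Each vertex with mail runs the same compute logic on its
# inbox, and taking the minimum of the inbox stands in for a min-combiner.
INF = float("inf")
graph = {"a": [("b", 1), ("c", 4)],      # vertex -> [(neighbor, edge weight)]
         "b": [("c", 2)],
         "c": []}
dist = {v: INF for v in graph}
messages = {"a": [0]}                    # the source receives distance 0

while messages:
    outbox = {}
    for v, inbox in messages.items():    # compute() for every vertex with mail
        candidate = min(inbox)           # combiner: keep only the minimum
        if candidate < dist[v]:
            dist[v] = candidate
            for nbr, w in graph[v]:
                outbox.setdefault(nbr, []).append(candidate + w)
    messages = outbox                    # barrier: next superstep's input

print(dist)  # {'a': 0, 'b': 1, 'c': 3}
```

In real Pregel the combiner runs before messages cross the network, which is where the savings come from; here the min over the inbox merely stands in for it.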

Weak points:

  1. Load balancing and vertex partitioning are hard problems. Pregel doesn't address load balancing; it simply brackets each superstep with barriers regardless of each machine's execution time. It's very likely that some vertices have more computation to do because they have more edges or act as cluster heads in the graph, so the workers running those vertices will take longer in each superstep. The default hash-based vertex partitioning might group heavily loaded vertices together, so some machines finish much faster than others and waste time waiting at the barrier. Pregel offers no load-balancing solution before or during the run.
  2. Since the whole system is synchronous at each superstep, what's the actual use of the outdated checkpoints? Recomputation for recovery doesn't make much sense, because the logged messages from healthy partitions don't necessarily contain enough information to fully cover the lost partition. For example, if there is an island of vertices in the graph that sends and receives no messages to/from the rest of the graph before the crash, how could we recompute the states of those island vertices from other vertices' messages after reloading the checkpoint? And while we're on the topic of fault tolerance, I'm not sure how Pregel backs up or recovers the master.
  3. The paper doesn't specify the delivery guarantees for messages. It seems to me that workers exchanging messages, reporting to the master, and getting ACKed by the master could take awfully long when loss or congestion occurs, and if one machine gets delayed the whole barrier is pushed back as well. I wonder how long a single machine would take to finish the same job in the evaluation; I'm almost certain it would be shorter than we'd expect. The system doesn't scale well, partly because of this communication overhead.

Paper Notes:

  1. During every superstep, a user-defined function is executed for each vertex;
  2. Pregel programs are inherently lock-free and have no race conditions;
  3. Vertices are identified by a string and hold a value representing their state; edges have a source and a destination, with a user-defined value as well;
  4. A vertex can vote to halt when it has done all of its work; however, it can be reactivated by receiving a message. The computation is over when all vertices are inactive;
  5. Aggregators can be used for global reduction and coordination;
  6. Topology mutations are processed in order (removals then additions) and conflicts are resolved with user-defined handlers.
  7. A single machine is chosen as the master, which is responsible for the coordination of the group;
  8. Periodic ping messages are used for failure detection; fault tolerance is achieved through checkpointing.