Paper Review: Discretized Streams: Fault-Tolerant Streaming Computation at Scale

Paper Review:

Summary:

Spark Streaming is a batch processing system built on Spark that approximates streaming by dividing real-time events into micro-batches at fixed time intervals. This makes higher latency inevitable but brings great throughput at the same time. It handles faults and stragglers with checkpoints and parallelized re-computation.
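
To make the micro-batch model concrete, here is a minimal word-count sketch against the Spark Streaming (DStream) API; the 1-second batch interval, local master, socket source, port, and checkpoint path are placeholder choices of mine, not something taken from the paper.

```python
# Minimal Spark Streaming sketch: events are grouped into 1-second
# micro-batches and each batch is processed as a small Spark job.
# The master, host/port and checkpoint path below are placeholders.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "MicroBatchWordCount")
ssc = StreamingContext(sc, batchDuration=1)  # 1-second micro-batches
ssc.checkpoint("/tmp/dstream-checkpoints")   # checkpoints used for recovery

lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```

Each interval produces a normal Spark batch job over an RDD, which is where both the throughput and the latency floor come from.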

Strong points:

  1. The best part of Spark Streaming, as the title of the paper suggests, is the fault tolerance and recovery strategy. The system doesn’t take the path of full replication, which would require twice the storage space plus coordination. It maintains checkpoints for the data sets but doesn’t serially replay the computation from a checkpoint during recovery: recovery is a parallel process that distributes the failed node’s work across all the others and brings the data back in seconds;
  2. Consistency is maintained naturally with discretized streams. Unlike in Storm, the data is processed “exactly once” because each micro-batch is aligned with time-interval barriers;
  3. Great throughput compared to other streaming systems. It’s not entirely fair to compare micro-batch processing against true record-at-a-time streaming, but in real-world applications micro-batching fits many scenarios that don’t demand millisecond-level latency;

Weak points:

  1. Latency is inevitable because of the time interval at which Spark Streaming batches events. The latency won’t get much lower with fewer events per batch or more machines in the cluster. This is very different from Storm, where latency is directly tied to computation power and the incoming event rate;
  2. Streaming and batch processing are fundamentally different, which makes the comparison between Storm and Spark Streaming invalid in many ways. Some people from the Storm project were really unhappy about the claim that “Spark Streaming is X times faster than Storm”. I would love to see a comparison between Storm Trident and Spark Streaming, since micro-batch against micro-batch makes much more sense.
  3. The introduction and overall structure of the paper dwell on how Spark Streaming can handle stragglers while other systems can’t, but the mechanism turns out to be a simple time threshold computed from the median task running time: a node is marked as slow if it takes longer than that. My guess is that once a node is marked slow it won’t be given work in the future, since it might add latency, and its jobs will be distributed among the other workers. But what if the slowness was caused by transient network loss, OS noise, or the underlying HDFS? I’m thinking about giving those slow nodes another chance periodically. Instead of assigning them real tasks, we give them duplicated ones with a special “no pass” mark, so the slow nodes run the same processing as the normal nodes without returning duplicate results. There shouldn’t be any bad influence either way. More sophisticatedly, they would be given a chance in the [1st, 2nd, 4th, 8th, 16th, …] batches after being marked slow (a rough sketch of this idea follows the list).
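
To make weak point 3 less hand-wavy, here is a toy sketch of the exponential-backoff “second chance” idea; the class and method names are entirely hypothetical and nothing like this exists in Spark Streaming.

```python
# Hypothetical sketch of the "second chance" idea above: a node marked slow
# only receives duplicate, "no pass" work at exponentially spaced batches
# (1st, 2nd, 4th, 8th, ... after being marked). Assumes the scheduler
# consults the tracker once per batch.
class SlowNodeTracker:
    def __init__(self):
        self.marked_at = {}    # node -> batch index when it was marked slow
        self.next_trial = {}   # node -> offset of its next shadow trial

    def mark_slow(self, node, batch_idx):
        self.marked_at[node] = batch_idx
        self.next_trial[node] = 1              # first retry one batch later

    def gets_shadow_task(self, node, batch_idx):
        """True if this slow node should run a duplicate 'no pass' task now."""
        if node not in self.marked_at:
            return False
        due = self.marked_at[node] + self.next_trial[node]
        if batch_idx == due:
            self.next_trial[node] *= 2         # 1, 2, 4, 8, 16, ...
            return True
        return False

    def readmit(self, node):
        """Call when a shadow run finishes within the median-based threshold."""
        self.marked_at.pop(node, None)
        self.next_trial.pop(node, None)

# A scheduler would roughly do:
#   if tracker.gets_shadow_task(node, batch_idx):
#       assign a duplicated, "no pass" task to that node
```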

Paper Review: Storm @ Twitter

Paper Review:

Summary:

Storm is a real-time data processing system aimed at large-scale streaming data. It offers relatively strong fault tolerance as well as two processing semantics guarantees (at-least-once and at-most-once). It is now an Apache open-source project that fits well into the Hadoop ecosystem (it runs on Mesos and uses ZooKeeper for coordination).

Strong points:

  1. Parallelism is enforced at many layers in the system. There are many worker nodes, each with one or more worker processes. Every worker process runs a JVM carrying one or more executors, and every executor consists of one or more tasks. In sum, every machine is divided into multiple services ready for streaming jobs. Although Storm doesn’t optimize parallelism down to the level of individual machines, this model is still useful for handling a large number of small stream processing jobs.
  2. This paper doesn’t talk about fault tolerance much, but it’s easy to see why: Nimbus and the Supervisor daemons are stateless, which makes backup easy to handle, and snapshots of their state are kept on local disk or in ZooKeeper. However, a Nimbus failure blocks the whole system from accepting new topology submissions.
  3. As a real-time stream processing system, Storm has really good average latency. In the experiments, latency tracks the number of machines, which means the workload is distributed nicely among the workers. The throughput measurements indicate good scalability as well.

Weak points:

  1. Currently, the programmer has to specify the number of instances for each spout and bolt. I’m not sure if this has been fixed, but if not, developers could go through a lot of trial and error before finding the right numbers for their data stream. I guess Pregel has the same issue with vertex partitioning: users have to pre-configure these things before the run, and the configuration ends up inflexible and inefficient given that the whole point is high-level parallelism.
  2. The processing semantics guarantee is weak compared to Trident/Spark: it’s either at-least-once or at-most-once. Storm doesn’t offer an exactly-once guarantee (and I guess that’s one of the reasons it has better performance?). Although they have some tricks, like the XOR-based tuple tracking, to save memory and make things look better, the lack of a stronger guarantee could be a disadvantage for some applications (a sketch of the XOR idea follows this list).
  3. I guess Storm works well for simple tasks that fit into a single bolt, but sophisticated stream processing requires a large number of bolts to break the task into simple parallel executions, and the communication and coordination overhead grows with them. The evaluation doesn’t compare different types of streaming tasks, so there is no way to tell from the paper; besides, the manual configuration of bolts and spouts makes such a comparison harder.
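
Since weak point 2 mentions the XOR trick, here is a rough sketch of how I understand Storm’s XOR-based ack tracking; the Acker class and method names are mine and not Storm’s actual API.

```python
import os

# Sketch of Storm-style XOR ack tracking (names are mine, not Storm's API).
# Each tuple gets a random 64-bit id; the acker keeps one 64-bit value per
# spout tuple and XORs in every emitted and acked id. The value returns to
# zero exactly when every emitted tuple has also been acked.
def new_tuple_id():
    return int.from_bytes(os.urandom(8), "big")

class Acker:
    def __init__(self):
        self.pending = {}  # spout tuple id -> running XOR of the tuple tree

    def emit(self, root_id, child_id):
        self.pending[root_id] = self.pending.get(root_id, 0) ^ child_id

    def ack(self, root_id, child_id):
        self.pending[root_id] ^= child_id
        if self.pending[root_id] == 0:     # whole tuple tree processed
            del self.pending[root_id]
            return True                    # at-least-once: safe to report success
        return False

# Usage: one spout tuple fans out into two bolt tuples.
acker = Acker()
root, a, b = new_tuple_id(), new_tuple_id(), new_tuple_id()
acker.emit(root, a)
acker.emit(root, b)
acker.ack(root, a)
assert acker.ack(root, b) is True          # tree complete after both acks
```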

Paper Notes:

  1. There are two kinds of vertices: spouts pull data from queues like Kafka, and bolts process the incoming tuples and pass them downstream. There can be cycles in the topology.
  2. A single master for coordination
  3. Each worker node runs one or more worker processes. Each worker process runs a JVM, which runs one or more executors. Each executor is made of one or more tasks. Talk about parallelism.
  4. Summingbird is used for Storm topology generation.
  5. Nimbus and the Supervisors are fail-fast and stateless; all their state is kept in ZooKeeper or on local disk

Paper Review: Pregel: A System for Large-Scale Graph Processing

Paper Review:

Summary:

Graph problems are naturally hard to parallelize and distribute because of the dependencies between vertices and edges. Pregel offers an intuitive way of handling graph processing by using N machines to handle M vertices separately (M >> N). There is a single master in the system, used for coordination, aggregation and analysis.

I tried a similar approach last semester by using threads to simulate nodes in a clustering problem. I believe it’s a good model for demonstration but expensive in terms of communication and coordination overhead.
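
To show what the per-vertex model from the summary looks like in practice, here is a rough single-source shortest-paths compute function in the Pregel style; the class, method and field names, and the toy driver, are my own stand-ins for the paper’s C++ API.

```python
import math

# Pregel-style single-source shortest paths. Only the superstep logic is
# meant to be faithful; everything else is a stand-in for the real API.
class ShortestPathVertex:
    def __init__(self, vertex_id, out_edges, is_source=False):
        self.id = vertex_id
        self.out_edges = out_edges        # list of (target_id, edge_weight)
        self.is_source = is_source
        self.value = math.inf             # best known distance so far
        self.outbox = []                  # messages for the next superstep
        self.active = True

    def compute(self, messages):
        """Called once per superstep with the messages from the previous one."""
        min_dist = 0.0 if self.is_source else math.inf
        for m in messages:
            min_dist = min(min_dist, m)
        if min_dist < self.value:         # found a shorter path: propagate it
            self.value = min_dist
            for target, weight in self.out_edges:
                self.outbox.append((target, min_dist + weight))
        self.vote_to_halt()               # reactivated when a message arrives

    def vote_to_halt(self):
        self.active = False


def run_supersteps(vertices):
    """Toy synchronous driver: run compute, hit the barrier, deliver messages."""
    inboxes = {vid: [] for vid in vertices}
    while True:
        for v in vertices.values():
            if v.active or inboxes[v.id]:
                v.active = True
                v.compute(inboxes[v.id])
        inboxes = {vid: [] for vid in vertices}   # barrier: swap message queues
        for v in vertices.values():
            for target, dist in v.outbox:
                inboxes[target].append(dist)
            v.outbox.clear()
        if not any(v.active for v in vertices.values()) and not any(inboxes.values()):
            return {vid: v.value for vid, v in vertices.items()}


# a -> b (1), a -> c (4), b -> c (1); c should end up at distance 2.
graph = {
    "a": ShortestPathVertex("a", [("b", 1.0), ("c", 4.0)], is_source=True),
    "b": ShortestPathVertex("b", [("c", 1.0)]),
    "c": ShortestPathVertex("c", []),
}
print(run_supersteps(graph))   # {'a': 0.0, 'b': 1.0, 'c': 2.0}
```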

Strong points:

  1. The way they divide the work is straightforward and intuitive. All vertices act independently, communicating through message passing under the master’s coordination. Computation is divided into supersteps bounded by barriers, a bulk-synchronous model widely used in parallel programming;
  2. Combiners can reduce the number of messages that need to be buffered and processed, and hence decrease the running time of each superstep. They work well for shortest-path computation (see the combiner sketch after this list);
  3. Pregel is flexible in many ways. There are different input methods, and user-defined handlers can be invoked when a target vertex doesn’t exist or for topology mutation. These features make Pregel suitable for a wider range of graph-related problems;
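
Following up on strong point 2, a combiner for the shortest-path example only needs to keep the minimum of all messages headed to the same vertex; this is a sketch of the idea, not the paper’s combiner interface.

```python
# Sketch of a Pregel combiner for shortest paths: of all messages destined
# for the same vertex in a superstep, only the smallest distance matters,
# so the messages can be collapsed before they are buffered or sent.
def min_distance_combiner(messages_for_vertex):
    """Collapse a list of candidate distances into a single message."""
    return [min(messages_for_vertex)] if messages_for_vertex else []

# e.g. five queued messages to the same vertex become one:
assert min_distance_combiner([7.0, 3.5, 9.2, 3.6, 12.0]) == [3.5]
```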

Weak points:

  1. Load balancing and vertex partitioning are hard problems that Pregel doesn’t really solve. It simply encapsulates the work between barriers regardless of how long each machine takes. It’s very likely that some vertices have more computation to do because they have more edges or act as cluster heads in the graph, in which case the threads running those vertices take longer in each superstep. The default hash-based vertex partitioning might group heavily loaded vertices together, so some machines finish much faster than others and waste their time waiting at the barrier. Pregel offers no load-balancing solution before or during the run.
  2. Since the whole system is synchronous at the granularity of supersteps, what is the actual use of the outdated checkpoints? Re-computation for recovery doesn’t make too much sense to me, because the logged messages from healthy partitions don’t necessarily contain enough information to fully reconstruct a lost partition. For example, if there is an island of vertices in the graph that sends and receives no messages to/from the rest of the graph before the crash, how could we re-compute the states of the island vertices from the other vertices’ messages after reloading the checkpoint? And while we are on the topic of fault tolerance, I’m not sure how Pregel backs up and recovers the master.
  3. The paper doesn’t specify the delivery guarantees for messages. It seems to me that workers exchanging messages, reporting to the master and waiting for the master’s acknowledgement could take awfully long under loss and congestion, and if one machine gets delayed, the whole barrier is pushed back as well. I wonder how long a single machine would take to finish the same job in the evaluation; I’m almost certain it would be shorter than expected. The system doesn’t scale well, partly because of this communication overhead.

Paper Notes:

  1. During every superstep, a user-defined function is executed for each vertex;
  2. Pregel programs are inherently lock-free and free of race conditions;
  3. Vertices are identified by a string and carry a user-defined value representing their state; edges have a source, a destination and a user-defined value as well;
  4. A vertex can vote to halt when it has done all of its work; however, it is reactivated if it receives a message. The computation is over when all the vertices are inactive;
  5. Aggregators can be used for global reduction and coordination;
  6. Topology mutations are processed in order (removals before additions) and conflicts are resolved with user-defined handlers.
  7. A single machine is chosen as the master, which is responsible for the coordination of the group;
  8. Periodic ping messages are used for failure detection; fault tolerance is achieved through checkpointing

Paper Review: MapReduce Simplified Data Processing on Large Clusters

Paper Review:

Summary:

MapReduce is a distributed data processing approach. One machine acts as the master and assigns map/reduce tasks to the machines in the cluster. Map workers generate intermediate k/v pairs on their local disks, which the reduce workers then read and merge by key, returning multiple output files (one per reduce task) to the user.
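
For reference, the canonical word-count example expressed as map and reduce functions, sketched in plain Python rather than the paper’s C++ interface; the run_mapreduce driver just fakes the shuffle step locally.

```python
from collections import defaultdict
from itertools import chain

# Word count in the MapReduce programming model, sketched in plain Python.
def map_fn(name, content):
    """map: (document name, contents) -> list of (word, 1) pairs."""
    return [(word, 1) for word in content.split()]

def reduce_fn(key, values):
    """reduce: (word, list of counts) -> total count."""
    return sum(values)

def run_mapreduce(inputs):
    # "Shuffle" phase: group intermediate pairs by key, as the framework would.
    grouped = defaultdict(list)
    for k, v in chain.from_iterable(map_fn(n, c) for n, c in inputs):
        grouped[k].append(v)
    return {k: reduce_fn(k, vs) for k, vs in grouped.items()}

print(run_mapreduce([("d1", "the quick brown fox"), ("d2", "the lazy dog")]))
# {'the': 2, 'quick': 1, 'brown': 1, 'fox': 1, 'lazy': 1, 'dog': 1}
```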

Strong points:

  1. The system is inspired by the map and reduce operations of functional programming languages, which is very different from how I used to think about distributed data processing. The approach divides a program into two smaller pieces, a division that is easy to make in many situations and very important for task distribution.
  2. The master in this system has relatively lightweight work compared to the workers. It only has to maintain a small amount of state and do little computation, which means a single master can support a large number of workers.
  3. The output of the distributed computation is guaranteed to be the same as a non-faulty sequential execution of the program when the map and reduce operators are deterministic. This is ensured by the master’s control as well as the underlying filesystem. I’m not sure about other data processing models (like Spark), but this seems like a strong and necessary property to me.
  4. All the refinements in Section 4 are quite useful in real-world engineering. I can only imagine people being saved by bad-record skipping and local-execution debugging all the time.

Weak points:

  1. Each idle worker machine is assigned either a map or a reduce task, which means one machine only does one kind of job at a time within a single MapReduce run. Does this mean reduce workers mostly have to wait for the map workers to finish first? Likewise, map workers may sit idle while reduce workers are busy. Even with the best coordination, there would still be waiting time and wasted computing resources. Moreover, reduce workers have to load the intermediate data from the disks of map workers.
  2. From a parallel programming point of view, if one worker node is slow but still in progress (which means it can still be assigned work), it will slow down the whole MapReduce job. Lol, I wrote this down long before I read Section 3.6; the authors consider this one of the common causes of lengthened processing time and call such a machine a straggler. They have a solution (backup tasks) that’s simple and practical, so I guess this one should really go to the strong points.
  3. If the map splits are roughly equal in computation and the nodes have the same hardware and configuration, map tasks on different machines are very likely to finish at almost exactly the same time. This makes network bandwidth consumption extremely bursty, since all the intermediate data is moved from map workers to reduce workers over the same network in the data center, which significantly slows down the data transfer and the total running time.
  4. Based on the previous three points, why not divide the input into N splits, where N is hundreds of times the number of machines in the cluster, and assign both map and reduce tasks to every worker? That way a slow node would simply be assigned fewer splits, and there would be virtually no waiting for intermediate data transfer or wasted computation resources.
  5. A MapReduce operation is aborted if the master fails. Even though this is unlikely given there is only one master machine, a master failure could cause a huge loss of computation progress. I don’t think it would be hard to keep a replica of some sort on a different network and power supply: it would serve as a data backup under normal circumstances and act as master if the original one fails. However, this also adds overhead to master operations, since the computation state and other master data structures would have to be backed up constantly before each commit. I guess it’s more of a trade-off between reliability and performance than a weak point.
  6. MapReduce is not flexible enough for every big data processing situation. There are cases where my operations simply can’t be expressed as map and reduce. What if a huge data set is shared by all machines but each applies a different computation (the data passed to each computer is the same but the program tasks are not)? MapReduce has a rigid view of programs/functions/tasks as something completely different from data. Could I take a lambda-calculus view and treat data and functions as the same thing in distributed computing? Then the shape of the computation would no longer constrain the distribution: I could ship functions and/or data to all the workers, with coordination and load balancing, without having to contort the problem into map and reduce functions (a toy sketch of this idea follows the list).
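
A toy illustration of the idea in weak point 6: workers pull (function, data) tasks, so the “program” is just another payload; this is purely my own sketch and has nothing to do with the paper.

```python
from multiprocessing import Pool

# Toy version of weak point 6: treat the function itself as part of the task,
# so each worker can apply a different computation to the same (or different)
# data without forcing everything into map/reduce shape.
def apply_task(task):
    func, data = task
    return func(data)

def total(xs):
    return sum(xs)

def spread(xs):
    return max(xs) - min(xs)

if __name__ == "__main__":
    shared_data = [4, 8, 15, 16, 23, 42]
    tasks = [(total, shared_data), (spread, shared_data), (len, shared_data)]
    with Pool(processes=3) as pool:
        print(pool.map(apply_task, tasks))   # [108, 38, 6]
```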

Paper Outline:

  1. Introduction:

    • Straightforward computation but large data;
    • Inspired by Lisp and other functional languages:
      • map: compute a set of intermediate k/v pairs;
      • reduce: merge all intermediate values with the same key;
  2. Programming model:

    • Example:

      • map takes a document as a (name, content) pair and returns a list of (word, 1) pairs to the reduce function;
      • reduce takes intermediate pairs with the same key, which is a word in this case, and increments the total count of that word;
    • Types:

      • map: (k1, v1) -> list(k2, v2);
      • reduce: (k2, list(v2)) -> list(v2);
      • input k/v are drawn from a different domain than the output k/v;
      • intermediate k/v are from the same domain as output k/v;
    • More examples:

      • distributed grep;
      • count for URL access frequency;
      • reverse web-link graph;
      • term-vector per host;
      • inverted index;
      • distributed sort;
  3. Implementation:

    • Environment:

      • a cluster of thousands of commodity machines connected with commodity networking hardware;
      • supported by a distributed file system that uses replication to provide availability and reliability;
      • a scheduling system takes jobs submitted by users;
    • Execution overview:

      • input will be divided (in parallel by different machines) into M splits, with a typical size of 16 to 64 MB per split;
      • one copy of the program acts as the master and assigns work to the workers. It picks idle workers and assigns each one an M or R task;
      • workers with M tasks read the content of the corresponding input splits and buffer the intermediate output k/v pairs in memory;
      • the buffered pairs are saved to local disk periodically, partitioned into R regions. The locations of those pairs are passed to the master to perform R task assignment;
      • reduce workers are notified by the master about the data locations. They read the buffered data from the local disks of map workers;
      • reduce workers sort the data so that the pairs with the same keys are grouped together. Sorting is necessary because many different keys map to the same reduce task, and an external sort is used if needed;
      • reduce workers iterate through the sorted data and append the reduce function output to a final output file;
      • master wakes up user program and returns;
    • Master data structures:

      • the state (idle, in-progress or completed) of each task and the identity of its worker machine;
      • locations of intermediate file regions;
    • Fault tolerance:

      • worker failure:
        • detected with periodic pings from the master;
        • any in-progress task is reset to idle and reassigned to another node upon failure;
        • finished map tasks will be reassigned to other nodes upon failure;
          • because the intermediate data is stored in local disk and therefore inaccessible. On the other hand, completed reduce tasks output will be stored globally;
          • all R workers will be notified about the reassignment;
      • master failure:
        • periodic checkpoints of the master data structures;
        • computation will be aborted if master fails;
          • failure of the single master machine is highly unlikely;
      • semantics in the presence of failures:
        • same as sequential execution if M/R is deterministic because:
          • atomic commits of map and reduce task output;
          • master ignores duplicated commit for map operations;
          • underlying file system guarantees to eliminate duplicated reduce operation output;
        • weak but reasonable semantics if M/R is non-deterministic;
          • due to different execution orderings;
    • Locality:

      • input data is stored on the local disks of the machines in the cluster;
      • files are divided into 64 MB blocks and multiple copies (typically three) are stored across the cluster;
      • master will try to save bandwidth by:
        • assign machines with data replications as M workers;
        • assign machines near data replicas as M workers;
    • Task granularity

      • the number of M and R tasks should be significantly bigger than the number of machines to improve dynamic load balancing and speed up recovery upon node failure;
      • O(M+R) scheduling decisions and O(M*R) task states should be handled by the master machine;
      • R shouldn’t be too large otherwise it might result in fragmentation of the output file;
    • Backup tasks:

      • straggler: a machine which takes too long to complete the task;
      • the master schedules backup executions of the remaining in-progress tasks when the MapReduce operation is close to completion;
      • little overhead in theory, but it significantly reduces completion time in large MapReduce operations;
  4. Refinements:

    • Partitioning function:

      • “hash(key) mod R” as partitioning function;
      • other ones with special needs;
    • Ordering guarantees:

      • within a given partition, intermediate k/v pairs are processed in increasing key order;
        • easier to sort output file per partition;
    • Combiner function:

      • combiner function can partially merge intermediate data;
        • executed on each machine that performs a map task;
        • typically the same as reduce function;
    • Input and output types:

      • users can add support for a new input type by providing an implementation of a reader interface;
    • Side-effects:

      • no support for atomic two-phase commits of multiple output files produced by a single task;
    • Skipping bad records:

      • optional mode of execution to ignore bad records and move on;
    • Local execution;

    • Status Information;

    • Counters;