Paper Review: Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks

Paper Review:

Summary:

Dryad is Microsoft version of distributed/parallel computing model. It aims to free the developers from the rigid settings and process of MapReduce and design your own concurrent computation model. It doesn’t care about the data flow connection method or the computation process as well (as long as it’s not cycle) so that developers can pretty much configure Dryad the way they want.

Strong points:

  1. Dryad strikes me as a generic version of MapReduce at first. It has all the nodes and looks like map/reduce model but the process is much more configurable. As long as the process forms a acyclic graph, Dryad can go with it. Unlike MapReduce, which is only good for certain kind of massive parallel processing model, Dryad seems able to fit into anything.
  2. One of the biggest constraint of MapReduce is that the developer doesn’t have the freedom to choose how intermediate data is transferred during the process. And Spark beats MapReduce partial due to it transfer data via memory. In Dryad, developers can choose different ways to make the transfer, like TCP, memory or disk. Of course the memory transfer would be the fast way but it’s always nice to have some other options around in case the memory is not enough or some other reason.
  3. The computation is modeled with acyclic graphs. Dryad offers ways to monitors the vertices as well as edges (state manager for each vertex and connection manager, which is not in the paper). It could make dynamic changes to the computation graph according to the monitored results to handle special cases like slow machines.

Weak points:

  1. While Dryad aims to “make it easier for developers to write efficient parallel and distributed applications”, it doesn’t hide all the execution details from developers. Instead it’s doing the exact opposite by exposing more internal structures and leave the decision to developers. The computation, connection, input/output and the monitoring looks intimidating. And the insufficient language support (from what I’ve seen so far it uses C++ and query languages only) makes things even harder.
  2. The execution stops as the job manager fails is a definitely a disadvantage in the system. It could be fixed with 1) distributed coordination with Paxos and some other consensus (slow but effective) 2) shadow master (faster recovery). Either way it’s not hard to implement, which makes me wonder why is this still an issue in Dryad.
  3. There are two reasons why the graph should be acyclic: 1) scheduling is easy because there are no deadlocks and the process is sequential and 2) without cycles, recovering is straightforward. However, there might be some cases where the developers might need to run one method on a piece of data for multiple times in order to meet the requirement. And this is not allowed in current Dryad system.
Advertisement

Paper Review: Dremel: Interactive Analysis of Web-Scale Datasets

Paper Review:

Summary:

Dremel is an interactive ad hoc query system designed to handle real-time web-scale queries with low latency. It uses tree-structured servers and columnar storage with replication to achieve great performance over MapReduce.

Strong points:

  1. Columnar storage is better for faster column retrieval and feature extraction. It exposes data in a more feature-oriented way and make programs easy to process data column by column with better data locality.
  2. Tree structure of server is great. There are more than two layers of servers, which is pretty much all we’ve seen so far (one for metadata and one for data). This structure reminds me of the biggest query system on earth: DNS. With a intermediate layer, the system benefits from one more layers of cache and way better scalabilty. Also it has the potential to yet expand to a bigger scale with another intermediate layer.
  3. The performance is great comparing to similar systems like Hive, which is not real time. It delivers all the requirements as a real-time interactive ad hoc querying system. However, it seems to me that Apache Drill could achieve pretty much the same thing with flexibility on data types.

Weak points:

  1. Relatively poor performance when few columns are read or dealing with unstructured data. In that case we cannot take advantage of the columnar storage. But I guess they are pretty sure about the query types that Dremel is going to handle so it’s fine. Dremel is design to deal with the Ad Hoc queries of structured data ready to be analysed.
  2. I don’t think MapReduce and Dremel make a valid comparison. Of course users can still use MapReduce do perform Dremel’s query and analysis job but that’s not what MapReduce is designed for, which is distributed batch processing. Those two are more complimentary rather than comparable to me, and that’s exactly what the authors suggested in the observation in the paper: “MR and query processing can be used in a complementary fashion; one layer’s output can feed another’s input”.
  3. There’s not you can do to modify the data (update or creation) expect append, which limits what users could perform with Dremel. I guess implement update method is not in their development priority since the data analysis rarely used modification but still it’s nice to have a flexible way to change the data.

Paper Review: Discretized Streams: Fault-Tolerant Streaming Computation at Scale

Paper Review:

Summary:

Spark Streaming is a batch process system based on Spark, trying to simulate streaming by divide real time events into micro-batches with time intervals. This causes inevitable high latency but brings great throughput at the same time. It handles fault and stragglers with checkpoints and parallelized re-computation.

Strong points:

  1. The best part of Spark Stream, which is also indicated in the title of the paper, is the fault tolerance and recovery strategy. The system doesn’t take the path of full replication, which requires 2 times of the storage space and coordination. It maintains checkpoints for data sets but doesn’t serially replay the process from the checkpoints during recovery. Spark Stream recovery is a parallel process that distributes the work of the failed node to all others, and brings the data back in seconds;
  2. Consistency is maintained naturally with discretized streams. Unlike Storm, the data is processed “exactly once” because the micro-batch is synchronized with time interval barriers;
  3. Great throughput comparing to any streaming system. Although it’s not fair to comparing micro-batch processing to stream, but when it comes to real world application, micro-batch processing could fit into many scenarios that doesn’t demand micro-seconds latency;

Weak points:

  1. Latency is inevitable because of the time interval that Spark Streaming batches the events. The latency won’t be much lower if we have fewer events in the batch, or more machines in the cluster. This is really different from Storm, in which the latency is directly associated with computation power and incoming events;
  2. Streaming and batch processing are fundamentally different, which makes the comparison between Storm and Spark Streaming invalid in many ways. Some guys from Storm project were really mad about the claim “Spark Streaming is X times faster than Storm”. I would love to see some comparison between Storm Trident and Spark Streaming because micro-batch to micro-batch makes much more sense.
  3. They mentioned way too much on how they could handle stragglers while other system can’t in the introduction and overall structure, but it turns out to be a simple time threshold calculated from the median running time and a node is marked as slow if it takes longer than that. So I’m guess if a node is marked as slow, it won’t be part of the workers in the future since it might bring latency. And it’s job will be  distributed by other workers. But what if the slow is caused by network communication loss, OS noise or underlying HDFS? I’m thinking about giving those slow nodes another chances periodically. But instead of assigning them with real tasks, we give them duplicated ones with special “no pass” marks. So the slow nodes can run the some processes together with normal nodes without returning duplicated results back. There won’t be any bad influence anyway. More sophisticatedly, they will be given the chances in the next [1st, 2nd, 4th, 8th, 16th … ] batches after marked slow.

Paper Review: Storm @ Twitter

Paper Review:

Summary:

Storm is a real-time data process system aimed to handle large-scale streaming data. It offers relatively strong fault-tolerant feature as well as two processing semantic guarantees. It’s now part of the Apache open source project that works well in the Hadoop ecosystem (runs on Mesos and uses ZooKeeper for coordination).

Strong points:

  1. Parallelism is enforced in may layers in the system. There are many worker nodes with one or more worker processes on each of them. For every work process, it runs JVM, carrying one or more executors. And for every executor there could be one or more tasks. So in sum, every machine is divided into multiple services ready for streaming jobs. Although Storm doesn’t optimize the parallelism to the level of machines, this kind of model could still be useful for handling large number of small, streaming process jobs.
  2. This paper doesn’t talk about fault tolerant a lot but it’s easy to see the reason behind. Nimbus and the Supervisor daemons are all stateless, which makes the backup easy to handle. And the snapshot of their memory state is kept locally or in the ZooKeeper. However, the Nimbus failure would block the whole system from topology submissions.
  3. As a real-time streaming process system, Storm has really good average latency. The latency is directly associated with the number of machines during the experiment, which means that the workload is distributed among all the workers nicely. The throughput measurements indicate good scalability as well.

Weak points:

  1. Currently, the programmer has to specify the number of instances for each spout and bolt. I’m not sure if this has already been fixed but if not, the developers could go through a lot of trail and error before they find out the right number for their data stream. I guess Pregel has the same thing with the vertices partitioning. The users have to pre-configure those things before the run and the configuration will be inflexible and inefficient considering what they are doing requires high-level parallelism.
  2. The processing semantics guarantee is weak comparing to Trident/Spark. It’s either at-least-once or at-most-once. Storm doesn’t offer exactly-once guarantee (and i guess that’s one of the reason it has better performance?). Although they have some tricks like the XOR to save memory space and make things look better, lack of stronger guarantee could a disadvantage for some applications.
  3. I guess Storm works well with simple tasks that could fit into bolt but for sophisticated streaming process, it will require a large number of bolt to divide the task into different simple executions and run in parallel, and the overhead for communication and coordination will grow as well. They didn’t compare different streaming process task types in the evaluation so there is no way to find out in the paper, besides the manually configuration for bolts and spouts makes the comparison harder.

Paper Notes:

  1. There are two kinds of vertices. Spouts pull data from the queue like Kafka and bolts process the incoming tuples and pass them down. There could be cycles in the process.
  2. A single master for coordination
  3. Each worker node runs one or more worker processes. Each work process runs a JVM, which runs a or more executors. Each executor is made of one or more tasks. Talking about parallelism.
  4. Summingbird is used for Storm topology generation.
  5. Nimbus and the Supervisors are fail-fast and stateless and all their states are kept in Zookeeper/local disk

Paper Review: Pregel: A System for Large-Scale Graph Processing

Paper Review:

Summary:

Graph problems are naturally hard to be parallelized and distributed because of all the dependency of the vertices and edges. Pregel offers a intuitive way of handling graph processing by using N machines to handle M vertices separately (M >> N). There is a single master in this system, used for coordination, aggregation and analysis.

I tried a similar approach last semester by using threads to simulate nodes in a clustering problem. I believe it’s a good model for demonstration but expensive in terms of communication and coordination overhead.

Strong points:

  1. The way they divide the work is straightforward and intuitive. All the vertices are acting independently with the message passing interface and coordination. Computation is also divided into multiple steps bounded with barriers, which is a model widely used for parallel programming;
  2. Combiners could be useful to reduce the messages that needs to be buffered and processed, hence decrease the running time for each superstep. It works good with shortest path computing;
  3. It’s flexible in many ways. There are different input methods; also the user-defined handlers could be used when the target vertex doesn’t exist or for topology mutation. These features make Pregel suitable for more graph-related problems;

Weak points:

  1. Load balancing and vertices partitioning in topology are hard to solve. Pregel doesn’t care about the load balancing and simply encapsulate steps between barriers regardless of the execution time for each machine. It’s very likely that some vertices have more computation to do because they have more edges or perform as cluster head in the graph. In that case, the threads running for these vertices are going to take longer in each step. The default hashing vertices partitioning might group nodes with heavy computation jobs together. So some machines are going to finish much faster than the others and waste their time by waiting for the barriers. In Pregel, there is no load-balancing solution before or during the run.
  2. Since the whole system is considered synchronous in terms of each step, what’s the actual use of the outdated checkpoints? Re-computation for recovery doesn’t make too much sense because the logged messages from healthy partitions don’t necessarily contain enough information to fully cover the lost partition. For example, if there is an island of nodes in the graph, which sends and receives no messages to/from other part of the graph before the crash, then how could we re-compute the states of the island nodes from the messages of other vertices after reloading the checkpoints. And as we are on the topic of fault tolerance, I’m not sure how does Pregel backup/recover the master.
  3. The paper doesn’t specify the guarantees for messages. It seems to me that slave machines interacting, reporting to the master and getting ACKed by the master could take awfully long if loss and congestion happens. And if one machine gets delayed, the whole barrier is pushed late as well. I’m wondering about how long would a single machine take to finish the same job in the evaluation and I’m almost certain that it takes shorter than we expected. The system doesn’t scale well partially because of the communication overhead.

 

 

Paper Notes:

  1. During every superstep, a user-defined function is executed for each vertex;
  2. Pregel programs are inherently lock-free and no race condition;
  3. Vertices are identifiable with a string and they are assigned with a value to represent the state; edges have source and destination with a user defined value as well;
  4. A vertex can vote to halt when it has done all of its work. However, it could still be revoked by receiving messages. The computation is over when all the vertices are inactive;
  5. Aggregators can be used to global reduction and coordination;
  6. Topology mutations processed in order (removal then addition) and conflicts are resolved with user defined handles.
  7. A single machine is chosen as the master, which is responsible for the coordination of the group;
  8. Periodic ping message is used for failure detection; fault tolerance is achieved through checkpointing

Paper Review: Spark: Cluster Computing withWorking Sets

Paper Review:

Summary:

Spark is a new computation model which is different from the mainstream MapReduce. It uses RDD, derived from DSM, so that the intermediate results could be stored in-memory to reduce the disk r/w overhead. With the lineage property of RDD, Spark doesn’t have to make disk checkpoints for the purpose of recovery since the lost data could be reconstructed with the existing RDD. Also, Spark is running on Mesos so it naturally extends some of the awesome features in Mesos like the optimal task assignment with delayed scheduling.

Strong points:

  1. Flexible in terms of languages that could be used in the driver, the cache option that enables users to trade off between the cost of storing and the speed of accessing data, and running on Mesos so that it could share hardware resources along with other frameworks.
  2. Delay scheduling and the almost optimal tasks assigning features from Mesos. Developers don’t have to worry about the underlying computation task distribution because it’s handled nicely by Mesos.
  3. RDD greatly reduces the read/write operations of the disk comparing to Hadoop. Also there are many other optimizations to avoid disk r/w like when a node is doing broadcasting, it checks whether the data is in it’s memory first. It’s safe to say that disk r/w operations is the bottleneck for Hadoop.
  4. From my understanding, Spark is a more general execution model than MapReduce. The operations written in Scala are user-defined and easy to copied around. It can run different tasks including MapReduce.

Weak points:

Most of the weak points are due to the lack of details of the paper.

  1. Choosing Scala over Java, or some other languages in implementing Spark is interesting. It’s discussed by the first author himself on Quora: Why is Apache Spark implemented in Scala. I’ll say that Spark is still in a performance-critical layer and applications will be built on it. Using Scala might lead to worse performance comparing to Java implementation and so it was showed in the results section in the paper. Spark was running slower than Hadoop without iterations.
  2. Lineage and recovery is not quite covered in the paper in implementation or performance analysis. I’m not sure about how fast could the recovery be and what percentage of lost data can I recover from the remaining RDD (i.e. how much data lost is accepted). And also the paper doesn’t cover some other solutions like distributing and replicating the in-memory data, which seems more straightforward to me (replicated memory could be recovered without computation);
  3. Another question for this insufficient performance analysis is that, what if the data can’t fit into each machine’s memory in Spark. In this way the machines will have to use disk and here comes the inevitable r/w operations to the disk. Here is the link to the performance comparison between Hadoop and disk-only Spark: Databricks demolishes big data benchmark to prove Spark is fast on disk, too. I haven’t read it through but I guess Spark is faster because it doesn’t have to copy the intermediate results around, in this case, from mapper to reducer due to the optimal task assignment.

 

Paper Notes:

  1. MapReduce are not good with:
    • iterative jobs. Machine learning algorithms applies a function on the same set of data multiple times. MapReduce reloads the  data from the disk repeatedly and hence not suitable for iterative jobs;
    • Interactive analytics. MapReduce queries are delayed because of the reload.
  2. Resilient distributed dataset (RDD):
    • Achieve fault tolerance through a notion of lineage: could be rebuild from other part of the dataset;
    • The first system that allows an efficient, general-purpose programming language to be used interactively to process large datasets on a cluster;
    • Ways to generate RDD:
      • From HDFS file;
      • Parallelize a Scala collection into slices;
      • Flat-map from the existing RDD;
      • By changing the persistence of an existing RDD.
  3. Parallel operations:
    • Reduce, collect and foreach;

Paper Review: ZooKeeper: Wait-free Coordination for Internet-scale Systems

Paper Review:

Summary:

ZooKeeper is a wait-free coordination used to construct lock, consensus and group membership for the clients. It uses hierarchical nodes with the same tree structure as UNIX file system to store the data and sequential number for recording. It guarantees FIFO execution for every single clients and it’s heavily optimized for faster reads.

Strong points:

  1. It’s just nice to have both synchronous and asynchronous versions of the APIs. Developers can choose whatever they like to fulfill their requirement on timing or consistency. Also we have a fast read and a slow read (sync + read), which makes thing flexible for developers and the applications.
  2. Locks are powerful primitives for distributed system, which, on the other hand, is hard to relax and fits into more applications. ZooKeeper has no lock but it’s easy to develop lock services upon ZooKeeper. Other different coordination models are also available in ZooKeeper.
  3. During the leader election, ZooKeeper clients don’t have to poll the locks and try to win the leader. Instead, it has a mechanism to keep clients in order and the previous leader failure only wakes the next client without interrupting others.

Weak points:

  1. Since ZooKeeper is an open source project aimed to serve variety of applications, it’s better to implement more options for the developers to choose from. For example, the watch mechanism is basically the same as Chubby with one-time trigger. If they could give some extra options, like when a client creates a znode, it could register for multiple triggers or notifications along with the updated data, that would possibly fit into more situations.
  2. Even with the guarantees on the ordering, the watch is an asynchronous according to (https://zookeeper.apache.org/doc/r3.4.5/zookeeperProgrammers.html#ch_zkWatches). This means the read could be stale not only because of the inconsistent local read from a non-leader server after a write, but also because of the late watch notification. There’s no good solution to this issue because any stronger consistency model could slow down the reads dramatically. However, since we already have sequence number for each znode, why don’t we bound the number with a timestamp or some version number so that the clients would at least have a better understanding of freshness of the data
  3. Looks like the simple locks without herd effect is done with a strict ordering of sequential znodes. A client will become a leader by being notified about the previous leader with the previous sequential number dies. Since the sequential number is increasing over the requests, does this mean the leader ordering is determined by the time they registered to the ZooKeeper? Sounds like this mechanism is not quite desirable in many cases. For example, if the developer wants the client with better hardware to become the leader, then it will be hard to implement because the znode sequential numbers don’t have anything to do with the hardware. (wait…. can I store the hardware scores into the data in each znode and maintain a ordered queue in the group znode? This way the developers can configure the leader election as they want.)
  4. There’s no mentioning on the issue of load balancing. Although all the ZooKeeper servers keep the exact same replicas, clients have access to any of the servers and can fire queries as they want. This could potentially lead to unbalanced request handling among all the servers and possibly some malicious attack as well. Again I couldn’t think of a good way to tackle this issue as a centralized load-balancing could not only slow down the client accesses but also add complexity to the system. Maybe we can implement the ZooKeeper servers in a way that if they detect a lot of TCP losses/timeouts during a time, it redirects some of its distanced clients to other servers

 

Paper Notes:

  1. ZooKeeper uses non-block primitives because with the blocking ones, like locks, slow and faulty clients can limit the overall performance of the system. The failure detection would increase the complexity. It’s a lock-free version of Chubby;
  2. The requests are handled with pipelined-fashion to support thousands of operation execution; the FIFO ordering of client operations enables asynchronous operations (multiple consecutive operations without reply);
  3. Zab for leader-based atomic broadcast protocol;
  4. Reads throughput is optimized by 2) servers handle reads locally 2) no strict ordering for reads and 3) client-side caching with a watch mechanism;
  5. Tree of directories and data (hierarchical name spaces) because it’s easier and familiar for users in a standard UNIX file system;
  6. Regular and ephemeral znodes
  7. Increasing sequential flag append to znode
  8. Watches to allow clients to receive timely notifications of changes without polling
    1. the clients send reads with watch flag set and the server will return the data with one-time promise to notify the clients about any change;
    2. watch only indicate the change without sending the actual data;
  9. Clients and ZooKeeper interact by using sessions with timeout. It ends when
    1. inactive clients over time
    2. close session by clients
    3. faulty clients detected by ZooKeeper
  10. Synchronous and asynchronous APIs
    1. no handles but use direct access to the path instead
    2. update with version number to enable version checking
  11. Two features:
    1. linearizability (multiple outstanding operations but ordered with guarantee)
    2. FIFO execution of queries
  12. For a leader re-electing scenario:
    1. lock service will prevent other processes using the old configurations but doesn’t offer a good solution when the new leader partially changed the configurations of all and then failed
    2. in ZooKeeper, the new leader is going to delete the ready znode, make configuration changes and then set the ready znode back on. The configuration change is send asynchronously. New configuration will not be used;
    3. ordering is the key preventing any client read anything before it’s ready;
    4. sync causes a server to apply all pending write requests before processing the read without having any extra write
  13. Write with atomic broadcast and read from any server for better availability
  14. Write-ahead log in disk
  15. Quorum-based algorithm with pipeline optimization
  16. Periodically snapshot for faster recovery. It’s inconsistent because of it’s a non-blocking depth first scan of the tree of each znode data

 

Paper Review: The Chubby lock services for loosely-coupled distributed systems

Paper Review:

Summary:

Chubby is a lock service provided to a large number of clients for access control and coordination. It’s made of typically 5 machines and one of them is master. The write operations require ACK from a quorum of machines and reads only need to access the current master.

Strong points:

  1. Chubby exports a file system with APIs similar to UNIX filesystem. Tree of files are stored in directories which makes the access structured and easier for all users. There are some differences though, like the file moving, directory access control and last-access times;
  2. The file handles with sequence number prevent checking at each read/write access and easy to see which master assigned the handles without referring to other replicas. This mechanism is good for small file with frequent accesses because checks are done during the open of handles instead of actual access. Each handle could be reused unless the master assigning the handle fails.
  3. Since the read operations greatly outnumber writes, data is cached in the client side to reduce the traffic of polling Chubby masters for every single read. The master will notify all the clients that possibly cached the data when the data is modified and invalidate the caches. I can imagine it saves a huge number of messages during the run. However, a timed-cached (TTL cache) combined with this mechanism might be better because as the system runs, clients with possible will increase over time. For every read from the clients, the master is going to return the data with a validation time limit, which implies that cache should not be used after this period of time. On the other hand, the master will still invalidate all the clients whose cache is still in the time limit upon modification. This way we can avoid the case when the master has to send too many invalidation messages after several consecutive writes and lead to bursty network usage. This is different from the KeepAlives jeopardy because the TTL is for every cached data while jeopardy is for every client

Weak points:

  1. This is trivial but still not thoroughly explained. The paper mentioned that the clients need so multicast request to all the Chubby replicas stored in DNS in order to locate the master, which means that all the Chubby replicas are managed by another service that keep track of the states and placement of Chubby replicas. The read operation only goes to Chubby master, but if the master is freshly selected, then either 1) the read service will be blocked for a while until the new master is up-to-date by checking with all other replicas or 2) the read returns a insufficiently updated value stored in the new master;
  2. The sequence numbers are assigned to any operations that interact with locks. And if any lock is lost in a abnormal circumstances like failure, the other clients have to wait for a period (1 minute). However, why don’t master actively sends messages to the lock-holding clients to check for liveness instead of waiting. At least having the option of proactively checking the clients could benefit systems with higher requirement on availability. (This could definitely use the TrueTime API to make sure of the ordering of events from different clients and save a lot of waiting/messages);
  3. When I was first reading the paper, I felt that the communication between Chubby clients and cells overlaps with TCP a lot in terms of their functionalities: both maintains a session with sequence number; both requires message exchange to keep alive otherwise the session expires. So I assumed that TCP is a nature for Chubby with all the strict ordering, congestion control and guarantees on the delivery. But the paper claims the TCP back-off policy lead to a lot of KeepAlives timeouts because the transport layer protocol is unaware of the timing of upper layer processes. However, the main reason we are triggering the back-off policy is the bursty message pattern, which is not discussed in the paper. If we can take advantage of the transport layer session to reduce the number of messages used for KeepAlives, then then back-off won’t be a problem in most scenarios. An alternative is to build a new reliable congestion-control protocol upon UDP with optimizations and priority for time-critical messages.
  4. Chubby master is definitely a bottleneck of the overall performance because all the operations, read or write, have to go through the master. On the other hand, Chubby slaves’ computation power and bandwidth is under-utilized. I guess it could be viewed as a down-side of the high consistency of Chubby, which is maintained by a single master decision making mechanism.

 

Paper Notes:

  1. Reliability, availability and scalability are considered the most important while the throughput and storage capacity are considered secondary.
  2. Lock services, although cannot offer a standard framework for programmers like a library embodying Paxos, maintains high availability as the system scales up; it’s suitable for clients to store and fetch large quantities of data as it uses consistent client caching rather than TTL time-based caching; quorum based consensus reduce the number of servers needed for client to make progress
  3. Only lock-interaction operations will be assigned with a sequence to ensure

Paper Review: Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center

Paper Review:

Summary:

Mesos is a thin layer application between different frameworks and resources, dynamically assigning tasks to a cluster of machines. Because of the different tasks and constraints of frameworks, the task assignment should be made with a lot of considerations. The overall system seems quite simple and elegant, which makes the results even better.

Strong points:

  1. The system takes a lot of things into consideration during the assignment: the constraints of frameworks, the data location, the time of the tasks, the priority and the fairness. But yet all these considerations could be expressed in simple values and dynamically changed during the run;
  2. Mesos leaves a lot of room for the frameworks, which we can see from it’s implementation with Hadoop. It takes advantages of the fine-grained task assigning of Hadoop and reduce the overhead when Mesos and Hadoop work together. This is something that static partitioning tool could have a hard time to achieve;
  3. Mesos master is implemented with minimum communication between the upper and lower layers. Also the state of master is replicated in other standby masters and managed by ZooKeeper. The data in the memory of Mesos master could be constructed from the frameworks and slaves, which makes the system more resilient.

Weak point:

  1. While the tasks are isolated with Linux Container and separated cores and memory chips, there are still shared system resources like the network bandwidth, which is not taken into consideration during the run as well;
  2. The actual task assignment is still performed by a centralized machine and the bandwidth of other standby masters are wasted. The Mesos master might face short-time network exhaustion during run caused by similar tasks finished all at once. This might explain why the Hadoop tasks have spiky share of cluster performance;
  3. There isn’t much in the paper talking about the resource offer preference is calculated from the data location and other factors (it simply mentioned how to select the suitable frameworks with lottery scheduling with given preference si). I guess the preference could be useful if we can take advantage of the data location by assigning the tasks to nodes which already have the necessary data to save the transfer time. But I guess this is something that’s hard to keep track of and it requires the the frameworks to cooperate, which makes Mesos less adaptive after all.

 

Paper Notes:

  1. Sharing a cluster is hard. There are two common solutions: statically partition the cluster and run one framework per partition or allocate a set of VMs to each framework. However, there is no way to perform fine-grained sharing across the frameworks. Mesos is a thin layer that lies between frameworks and cluster resources.
  2. A centralized scheduler cannot achieve the complexity and scalability. It cannot adapt to new frameworks. Also it overlaps with the frameworks’ scheduler.
  3. Mesos will offer frameworks with resources based on an organizational policy such as fair sharing and the frameworks decide which ones to accept and the tasks to run on them.
  4. Mesos has minimal interfaces and pushes the control to the frameworks to 1) leave more room for various frameworks and 2) keeps Mesos simple, scalable and low in system requirements
  5. Frameworks resources could be revoked due to a buggy job or a greedy framework; however, frameworks can be assigned with resources with guarantee so the tasks won’t be killed as long as the framework is below the guaranteed allocation.
  6. Linux containers are used for task isolation
  7. ZooKeeper for leader election from all the standby masters. Mesos master’s memory could be reconstructed from the frameworks and resource slaves
  8. Mandatory and preferred resources; lottery scheduling; resource reservation for short tasks

Paper Review: Mesa: Geo-Replicated, Near Real-Time, Scalable Data Warehousing

Paper Review:

Mesa is another Google’s data store layered upon Colossus and Bigtable, primarily designed for ads campaign data storage. The versioned kv store with aggregation is scalable and replicated globally. The metadata is consistently updated with Paxos and the data is batched and transferred every few minutes.

Strong points:

  1. In the chapter “experiences and lessons learned”, layered design was mentioned as a key design feature of Google products. Mesa is nicely layered and decoupled in both horizontal and vertical directions. It was build up on Colossus and Bigtable so we can see there isn’t too much about the read/write topics, which are covered in Bigtable. Inside the architecture, it has workers/servers, controllers and global services. The data maintenance/update and query is also decoupled. While there might be some overhead, but it enables clear designing, easy problem identification and detailed performance analysis.
  2. The resume key with streaming transmission is interesting. This way the failed/disconnected query server won’t waste clients’ time since the read operations could be continued on another server instead of firing the query once more.
  3. Parallelizing the worker operation using MapReduce  and linked schema change are good ideas. MapReduce could save days of computation time and the schema change, while not applicable in every scenario and add some computation on the query path, saves 50% of disk space than the traditional simple schema change.  This could be a life saver since Mesa requires a lot of storage space in the first place.

Weak points:

  1. The  controllers assign different tasks to different types of workers. The failure of workers will eventually be captured by the timer maintained for each task in the controllers. However, the timers could add a lot of work to the controllers if there are many tasks running simultaneously. On the other hand, work failures could result in long latency for the corresponding queries since before the timer expires and the tasks get re-assigned. This kind of latency could be resolved if the controllers/workers exchange heartbeat messages so that the worker failures could be detected earlier before the timer runs out.
  2. Query servers are assigned with their own responsible range of data to take advantage of prefetching and caching. However, this assignment could be a little inflexible if there are a lot of queries on the similar data. This way we will only have a small amount of query servers fetching data from Colossus for a long time while the rest of servers are doing nothing. This will be a performance bottleneck and waste of resource if we assume that the read operation in Colossus is lock-free and scales well. Also since we are on the chapter of querying, the global locator service is really vague in the paper. I assume that it is a stateless process running the controllers with the data replicated in Bigtable as well.
  3. The replication mechanism does not make too much sense to me. The Paxos will only make sure that metadata is replicated consistently by majority of Mesa instance and the data replication will be left behind without any strong guarantee. So for each Mesa instance (datacenter), the metadata could be out-dated if it failed during the Paxos; and even if it works fine during Paxos, there’s not much guarantee on the consistency of the actual data.
  4. There are two methods of data validation, the online one by re-aggregating the rows and check for computation errors and the off-line one, which is a light-weight process spanning the recent committed data. There are two problems with the corruption recovery: 1) the online checking will be perform in every update and query, which could result in unnecessary checking and increased latency and load; 2) the data is replicated asynchronously. If the freshly updated copy gets corrupted, there’s no other replicas that could help with the recovery

 

 

Notes:

Data is partitioned and replicated horizontally to achieve scalability and availability. (does this imply NoSQL?)

Multi-version key-value data store with Paxos for consistency

Leverage Bigtable and the Paxos tech underlying Spanner for metadata storage and maintenance.

Asynchronously replication for application data; synchronous replication for metadata.

Dealing with corruption for software and hardware

A query  to Mesa consists of a version number and a predicate P on the key space. And the response contains the corresponding P and a version number between 0 and n.

Strict ordering of updates can ensure atomicity and fraud detection of negative facts.

Mesa pre-aggregates certain versioned data (between v1 and v2 inclusively) and call it delta. Base compaction is the process of merging some of the version in to [0, B] and versions before base are no longer accessible. This kind of idea is commonly used (like append-only GFS) but it could be problematic if old-version data is still useful. Older data are stored in a versioned and expensive way in terms of the process of aggregation which means that Mesa is necessary for cleaning things up but how many versions are we going to keep is gonna be hard to determine for various kinds of data. Since Mesa is primary used for ads campaign data collection, which has uniform and specific requirements in terms of data storage and aggregation, this issue doesn’t seem to matter too much.

Each table has one or more indexes and each table index has its own copy of data that is sorted accordingly for fast search. And there is an index file containing short keys for row blocks for faster localizing the row block.

Metadata is stored in Bigtable and in the memory of controller. Each datacenter has one controller (I assume) and it does not directly interact with the tables. There are different tasks for data maintenance like updates, compaction, schema change and checksum. Note that the last two require coordination of different Mesa instances (datacenters). A set of workers of different types polling the controller for task of their own types. Each task will be assigned with a timer so failed workers won’t affect the system. The controller is also sharded and stateless with all the metadata consistently stored in Bigtable so Mesa is resilient to controller failures. A garbage collector runs separately and continuously, which reads from the metadata in Bigtable and delete the unwanted files in Colossus.

Mesa handles queries with different requirement with different labels and priorities. A set of query servers could access any table in principle but Mesa will direct  queries with the same range of data to the a subset of the query servers to take advantage of the pre-fetching and caching. Global locator service is used to coordinate the query servers.

The operations are batched in Mesa instances once every few minutes. A stateless global committer will assign each updates batch a version number and use Paxos to reinforce the data consistency. Controllers in every Mesa instance will be responsible for the updates batches and acknowledging the committer. There is no locking. Note that in this case metadata is replicated synchronously with Paxos while the data is incorporated asynchronously by various Mesa instances.

New Mesa instances will bootstrap with p2p load mechanism.