Paper Review: Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks

Paper Review:

Summary:

Dryad is Microsoft's take on a distributed/parallel computing model. It aims to free developers from the rigid structure and process of MapReduce and let them design their own concurrent computation. It doesn't constrain the data-flow connection method or the computation at each vertex (as long as the graph is acyclic), so developers can configure Dryad pretty much the way they want.

Strong points:

  1. Dryad strikes me as a generic version of MapReduce at first. It has the same notion of processing vertices and can look like the map/reduce model, but the overall computation is much more configurable: as long as the job forms an acyclic graph, Dryad can run it. Unlike MapReduce, which only suits a certain kind of massively parallel processing, Dryad seems able to fit almost anything.
  2. One of the biggest constraints of MapReduce is that the developer has no freedom to choose how intermediate data is transferred during the job, and Spark beats MapReduce partly because it transfers data via memory. In Dryad, developers can choose among different channel types for each edge, such as TCP pipes, shared-memory FIFOs or files on disk. Memory transfer is usually the fastest, but it's nice to have other options around when memory is insufficient or for other reasons.
  3. The computation is modeled as an acyclic graph. Dryad offers ways to monitor the vertices as well as the edges (a state manager for each vertex and a connection manager, which is not detailed in the paper), and it can make dynamic changes to the computation graph based on what it observes, to handle special cases like slow machines. A rough sketch of describing such a graph appears after this list.
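
To make the graph-plus-channels idea concrete, here is a minimal, hypothetical Python sketch (not Dryad's actual C++ graph-composition API; the Graph/connect names are mine) of describing a job as a DAG whose edges carry a channel type, with an acyclicity check before execution.

```python
# Hypothetical sketch of a Dryad-style job description: a DAG of vertices
# connected by typed channels (temp file, TCP pipe, or shared-memory FIFO).
from collections import defaultdict

class Graph:
    def __init__(self):
        self.edges = defaultdict(list)          # vertex -> list of (dest, channel)

    def connect(self, src, dst, channel="file"):
        assert channel in ("file", "tcp", "memory")
        self.edges[src].append((dst, channel))

    def is_acyclic(self):
        # Kahn's algorithm: Dryad only accepts acyclic graphs.
        indeg = defaultdict(int)
        nodes = set(self.edges)
        for src, outs in self.edges.items():
            for dst, _ in outs:
                indeg[dst] += 1
                nodes.add(dst)
        ready = [v for v in nodes if indeg[v] == 0]
        seen = 0
        while ready:
            v = ready.pop()
            seen += 1
            for dst, _ in self.edges[v]:
                indeg[dst] -= 1
                if indeg[dst] == 0:
                    ready.append(dst)
        return seen == len(nodes)

g = Graph()
g.connect("read_A", "join", channel="memory")   # fastest, but needs enough RAM
g.connect("read_B", "join", channel="tcp")      # stream across machines
g.connect("join", "aggregate", channel="file")  # spill to disk for safety
print("runnable:", g.is_acyclic())              # runnable: True
```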

Weak points:

  1. While Dryad aims to "make it easier for developers to write efficient parallel and distributed applications", it doesn't hide the execution details from developers. Instead it does the exact opposite, exposing more internal structure and leaving the decisions to developers. The computation, the connections, the input/output and the monitoring all look intimidating, and the limited language support (from what I've seen so far, C++ and query languages only) makes things even harder.
  2. The fact that execution stops when the job manager fails is definitely a disadvantage of the system. It could be fixed with 1) distributed coordination via Paxos or some other consensus protocol (slow but effective) or 2) a shadow master (faster recovery). Either way it doesn't seem hard to implement, which makes me wonder why this is still an issue in Dryad.
  3. There are two reasons why the graph should be acyclic: 1) scheduling is easy because there are no deadlocks and the process is sequential, and 2) without cycles, recovery is straightforward. However, there are cases where a developer needs to run one method over a piece of data multiple times (iterative computation) in order to meet the requirement, and this is not expressible in the current Dryad system.

Paper Review: Dremel: Interactive Analysis of Web-Scale Datasets

Paper Review:

Summary:

Dremel is an interactive ad hoc query system designed to handle real-time web-scale queries with low latency. It uses a tree of servers and columnar storage with replication to achieve far better performance than MapReduce for this workload.

Strong points:

  1. Columnar storage allows faster column retrieval and feature extraction. It exposes data in a more column-oriented way and makes it easy for programs to process data column by column with better data locality (see the sketch after this list).
  2. The tree structure of the serving layer is great. There are more than two layers of servers, which is more than we've seen so far (typically one for metadata and one for data). The structure reminds me of the biggest query system on earth: DNS. With an intermediate layer, the system benefits from one more layer of caching and much better scalability, and it has the potential to expand even further with another intermediate layer.
  3. The performance is great compared to similar systems like Hive, which is not real-time. Dremel delivers on its requirements as a real-time, interactive, ad hoc query system. That said, it seems to me that Apache Drill can achieve pretty much the same thing with more flexibility on data types.
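
As a toy illustration of why the columnar layout helps when a query touches only a few fields, here is a small Python sketch (my own example with made-up records, not Dremel's nested column format) comparing a row-oriented scan with a column-oriented scan of the same data.

```python
# Toy comparison of row vs. column layout for a query that reads one field.
rows = [
    {"url": "a.com", "clicks": 3, "country": "US"},
    {"url": "b.com", "clicks": 7, "country": "DE"},
    {"url": "c.com", "clicks": 1, "country": "US"},
]

# Row store: every whole record is touched even though we only need `clicks`.
total_row = sum(r["clicks"] for r in rows)

# Column store: each field lives in its own contiguous array, so the query
# scans exactly one array (better locality, less I/O, easier compression).
columns = {
    "url":     [r["url"] for r in rows],
    "clicks":  [r["clicks"] for r in rows],
    "country": [r["country"] for r in rows],
}
total_col = sum(columns["clicks"])

assert total_row == total_col == 11
```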

Weak points:

  1. Performance is relatively poor when most columns of a record are read, or when dealing with unstructured data; in those cases we cannot take full advantage of the columnar storage. But I guess the authors are pretty sure about the query types Dremel is going to handle, so it's fine: Dremel is designed for ad hoc queries over structured data that is ready to be analysed.
  2. I don't think MapReduce and Dremel make a valid comparison. Of course users can still use MapReduce to perform Dremel's query and analysis jobs, but that's not what MapReduce is designed for, which is distributed batch processing. The two are more complementary than comparable to me, and that's exactly what the authors suggest in their observations: "MR and query processing can be used in a complementary fashion; one layer's output can feed another's input".
  3. There's nothing you can do to modify the data (no updates or in-place creation) except append, which limits what users can do with Dremel. I guess implementing updates is not a development priority, since data analysis rarely needs modification, but it would still be nice to have a flexible way to change the data.

Paper Review: Discretized Streams: Fault-Tolerant Streaming Computation at Scale

Paper Review:

Summary:

Spark Streaming is a batch-processing system built on Spark that approximates streaming by dividing real-time events into micro-batches at fixed time intervals. This causes unavoidably higher latency but brings great throughput at the same time. It handles faults and stragglers with checkpoints and parallel re-computation.

Strong points:

  1. The best part of Spark Streaming, as the title of the paper indicates, is its fault tolerance and recovery strategy. The system doesn't take the path of full replication, which would require twice the storage plus coordination. It maintains checkpoints for the data sets, but it doesn't serially replay the computation from a checkpoint during recovery: recovery is a parallel process that distributes the work of the failed node across all the others and brings the data back in seconds;
  2. Consistency is maintained naturally with discretized streams. Unlike Storm, the data is processed "exactly once", because each micro-batch is bounded by time-interval barriers;
  3. Great throughput compared to other streaming systems. It isn't entirely fair to compare micro-batch processing to true streaming, but for real-world applications micro-batching fits many scenarios that don't demand millisecond-level latency (a tiny sketch of the micro-batch idea follows this list);
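
A minimal sketch of the micro-batch idea (my own simplification, not Spark Streaming's DStream API): events are grouped by a fixed time interval and each group is then processed as one small batch job.

```python
# Group a stream of (timestamp, value) events into fixed-interval micro-batches.
from collections import defaultdict

def micro_batches(events, interval=1.0):
    batches = defaultdict(list)
    for ts, value in events:
        batches[int(ts // interval)].append(value)   # assign event to its interval
    return [batches[k] for k in sorted(batches)]

events = [(0.1, 3), (0.4, 5), (1.2, 2), (1.9, 7), (2.5, 1)]
for i, batch in enumerate(micro_batches(events)):
    print(f"batch {i}: sum = {sum(batch)}")          # each batch runs as a tiny job
```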

Weak points:

  1. Latency is inevitable because of the time interval at which Spark Streaming batches events, and it doesn't get much lower with fewer events per batch or more machines in the cluster. This is very different from Storm, where latency is directly tied to computation power and the rate of incoming events;
  2. Streaming and batch processing are fundamentally different, which makes the comparison between Storm and Spark Streaming invalid in many ways. Some people from the Storm project were really upset about the claim that "Spark Streaming is X times faster than Storm". I would love to see a comparison between Storm Trident and Spark Streaming, because micro-batch to micro-batch makes much more sense.
  3. The introduction and overview make a big deal of how the system handles stragglers while others can't, but it turns out to be a simple time threshold derived from the median task running time: a node is marked as slow if its tasks take longer than that. I'm guessing that once a node is marked slow it won't be given work in the future, since it might add latency, and its work will be redistributed to other workers. But what if the slowness was caused by transient network loss, OS noise or the underlying HDFS? I would give those slow nodes another chance periodically: instead of assigning them real tasks, give them duplicated tasks with a special "no pass" mark, so the slow nodes run the same work alongside normal nodes without returning duplicate results. There wouldn't be any bad influence anyway. More sophisticatedly, the retries could happen in the [1st, 2nd, 4th, 8th, 16th, ...] batches after a node is marked slow (a rough sketch of this idea follows).
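
Below is a rough sketch, entirely my own, of the two ideas in this point: a median-based threshold for marking slow tasks (the multiplier is illustrative, not the paper's exact constant) and the proposed exponential-backoff schedule for giving a slow node duplicated, "no pass" work again.

```python
import statistics

def is_straggler(task_time, recent_times, factor=1.4):
    # Mark a task slow if it exceeds a multiple of the median running time.
    return task_time > factor * statistics.median(recent_times)

def probe_batches(marked_at, max_batches=100):
    # Batches in which a slow node gets duplicated "no pass" work again:
    # the 1st, 2nd, 4th, 8th, ... batch after it was marked slow.
    k, schedule = 1, []
    while marked_at + k <= max_batches:
        schedule.append(marked_at + k)
        k *= 2
    return schedule

print(is_straggler(3.2, [1.0, 1.1, 0.9, 1.3]))      # True -> mark the node as slow
print(probe_batches(marked_at=10, max_batches=60))  # [11, 12, 14, 18, 26, 42]
```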

Paper Review: Storm @ Twitter

Paper Review:

Summary:

Storm is a real-time data processing system aimed at large-scale streaming data. It offers relatively strong fault tolerance as well as two processing-semantics guarantees. It's now an Apache open source project that fits well into the Hadoop ecosystem (it runs on Mesos and uses ZooKeeper for coordination).

Strong points:

  1. Parallelism is present at many layers of the system. There are many worker nodes, each running one or more worker processes; every worker process runs a JVM carrying one or more executors; and every executor runs one or more tasks. In sum, every machine is divided into multiple units of work ready for streaming jobs. Although Storm doesn't optimize parallelism down to the machine level, this model is still useful for handling a large number of small streaming jobs.
  2. The paper doesn't talk about fault tolerance much, but it's easy to see why. Nimbus and the Supervisor daemons are stateless, which makes backup easy to handle, and snapshots of their state are kept on local disk or in ZooKeeper. However, a Nimbus failure blocks the whole system from accepting new topology submissions.
  3. As a real-time stream processing system, Storm shows really good average latency. In the experiments, latency tracks the number of machines, which means the workload is distributed across the workers nicely. The throughput measurements indicate good scalability as well.

Weak points:

  1. Currently, the programmer has to specify the number of instances for each spout and bolt. I'm not sure whether this has been fixed, but if not, developers could go through a lot of trial and error before they find the right numbers for their data stream. Pregel has a similar issue with vertex partitioning: users have to pre-configure these things before the run, and the configuration ends up inflexible and inefficient considering that the whole point is high parallelism.
  2. The processing-semantics guarantees are weak compared to Trident/Spark: either at-least-once or at-most-once. Storm doesn't offer an exactly-once guarantee (and I guess that's one of the reasons it performs better?). They have some nice tricks, like the XOR-based tuple tracking to save memory (see the sketch after this list), but the lack of a stronger guarantee could be a disadvantage for some applications.
  3. I guess Storm works well for simple tasks that fit into a single bolt, but a sophisticated streaming computation requires a large number of bolts to break the work into simple parallel steps, and the communication and coordination overhead grows with them. The evaluation doesn't compare different kinds of streaming tasks, so there is no way to tell from the paper, and the manual configuration of bolts and spouts makes such a comparison harder.
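
For reference on the XOR trick mentioned in weak point 2, here is a tiny sketch (my own simplification of Storm's acker idea): XOR-ing random 64-bit tuple ids into a single value per spout tuple tracks completion of the whole tuple tree in constant memory, since the value returns to zero exactly when every emitted id has also been acked.

```python
import random

class AckerEntry:
    """Tracks one spout tuple's entire tree with a single 64-bit value."""
    def __init__(self):
        self.value = 0

    def emit(self, tuple_id):      # a bolt emits a new anchored tuple
        self.value ^= tuple_id

    def ack(self, tuple_id):       # a bolt finishes processing a tuple
        self.value ^= tuple_id

    def fully_processed(self):
        # Zero means every id XOR-ed in was XOR-ed out again; a false positive
        # is possible in principle but astronomically unlikely with 64-bit ids.
        return self.value == 0

entry = AckerEntry()
ids = [random.getrandbits(64) for _ in range(3)]
for i in ids:
    entry.emit(i)
print(entry.fully_processed())     # False: tuples still in flight
for i in ids:
    entry.ack(i)
print(entry.fully_processed())     # True: the tuple tree is complete
```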

Paper Notes:

  1. There are two kinds of vertices: spouts pull data from queues like Kafka, and bolts process the incoming tuples and pass them downstream. There can be cycles in the topology.
  2. A single master for coordination
  3. Each worker node runs one or more worker processes. Each worker process runs a JVM, which runs one or more executors. Each executor is made of one or more tasks. Talk about parallelism.
  4. Summingbird is used for Storm topology generation.
  5. Nimbus and the Supervisors are fail-fast and stateless, and all their state is kept in ZooKeeper or on local disk

Paper Review: Pregel: A System for Large-Scale Graph Processing

Paper Review:

Summary:

Graph problems are naturally hard to parallelize and distribute because of the dependencies among vertices and edges. Pregel offers an intuitive way of handling graph processing by using N machines to handle M vertices separately (M >> N). There is a single master in the system, used for coordination, aggregation and analysis.

I tried a similar approach last semester by using threads to simulate nodes in a clustering problem. I believe it’s a good model for demonstration but expensive in terms of communication and coordination overhead.

Strong points:

  1. The way they divide the work is straightforward and intuitive. All the vertices act independently, coordinated through message passing. Computation is divided into supersteps bounded by barriers, a model (bulk synchronous parallel) widely used in parallel programming;
  2. Combiners are useful for reducing the number of messages that need to be buffered and processed, and hence for decreasing the running time of each superstep. They work well for shortest-path computation (see the sketch after this list);
  3. It's flexible in many ways: there are different input methods, and user-defined handlers can be invoked when a target vertex doesn't exist or to resolve topology mutations. These features make Pregel suitable for a wider range of graph-related problems;
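
A compact sketch of the vertex-compute-plus-combiner idea for single-source shortest paths (my own minimal single-machine superstep loop, not Google's C++ Pregel API): each vertex keeps a tentative distance, relaxes it with incoming messages, and sends distance-plus-edge-weight to its neighbors; taking the minimum of a vertex's inbox plays the role of a min combiner.

```python
# Minimal single-machine imitation of Pregel supersteps for shortest paths.
INF = float("inf")
graph = {"A": {"B": 1, "C": 4}, "B": {"C": 2}, "C": {}}   # vertex -> {neighbor: weight}

dist = {v: INF for v in graph}          # vertex state: tentative distance
inbox = {v: [] for v in graph}
inbox["A"] = [0]                        # message that kicks off the source vertex

while any(inbox.values()):              # run supersteps until no messages remain
    outbox = {v: [] for v in graph}
    for v, msgs in inbox.items():
        if not msgs:
            continue                    # vertex stays halted this superstep
        best = min(msgs)                # min combiner: only the smallest message matters
        if best < dist[v]:
            dist[v] = best
            for n, w in graph[v].items():
                outbox[n].append(best + w)
    inbox = outbox                      # barrier: the next superstep begins

print(dist)                             # {'A': 0, 'B': 1, 'C': 3}
```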

Weak points:

  1. Load balancing and vertex partitioning are hard problems that Pregel largely ignores: it simply encapsulates work between barriers regardless of each machine's execution time. It's very likely that some vertices have more computation to do because they have more edges or act as cluster heads in the graph, so the workers running those vertices take longer in each superstep. The default hash partitioning might group heavily loaded vertices together, so some machines finish much faster than the others and waste their time waiting at the barriers. Pregel offers no load-balancing solution before or during the run.
  2. Since the whole system is synchronous at each superstep, what's the actual use of the outdated checkpoints? Re-computation for recovery doesn't make much sense, because the logged messages from healthy partitions don't necessarily contain enough information to reconstruct the lost partition. For example, if there is an island of vertices in the graph that sent and received no messages to or from the rest of the graph before the crash, how could we re-compute the state of those island vertices from other vertices' messages after reloading the checkpoint? And while we're on the topic of fault tolerance, I'm not sure how Pregel backs up or recovers the master.
  3. The paper doesn't specify the delivery guarantees for messages. It seems to me that workers exchanging messages, reporting to the master and waiting for the master's ACK could take awfully long under loss and congestion, and if one machine is delayed the whole barrier is pushed back as well. I wonder how long a single machine would take to finish the same job in the evaluation; I suspect it would be shorter than we expect. The system doesn't scale well, partly because of this communication overhead.

 

 

Paper Notes:

  1. During every superstep, a user-defined function is executed for each vertex;
  2. Pregel programs are inherently lock-free and have no race conditions;
  3. Vertices are identified by a string and carry a value representing their state; edges have a source, a destination and a user-defined value as well;
  4. A vertex can vote to halt when it has done all of its work. However, it can be reactivated by receiving messages. The computation is over when all vertices are inactive;
  5. Aggregators can be used for global reduction and coordination;
  6. Topology mutations are processed in order (removals then additions) and conflicts are resolved with user-defined handlers.
  7. A single machine is chosen as the master, which is responsible for the coordination of the group;
  8. Periodic ping message is used for failure detection; fault tolerance is achieved through checkpointing

Paper Review: ZooKeeper: Wait-free Coordination for Internet-scale Systems

Paper Review:

Summary:

ZooKeeper is a wait-free coordination service used to build locks, consensus and group membership for its clients. It stores data in hierarchical znodes organized like a UNIX file system tree, with sequence numbers for ordering. It guarantees FIFO execution of each client's operations and is heavily optimized for fast reads.

Strong points:

  1. It's just nice to have both synchronous and asynchronous versions of the APIs. Developers can choose whichever fits their requirements on timing or consistency. We also get a fast read and a slow read (sync followed by read), which makes things flexible for developers and applications.
  2. Locks are powerful primitives for distributed systems but, on the other hand, hard to relax to fit a wider range of applications. ZooKeeper has no locks itself, yet it's easy to build lock services on top of it, and other coordination patterns are available in ZooKeeper as well.
  3. During leader election, ZooKeeper clients don't have to poll a lock and race to become the leader. Instead, there is a mechanism that keeps clients in order, so the previous leader's failure only wakes up the next client in line without interrupting the others.

Weak points:

  1. Since ZooKeeper is an open source project meant to serve a variety of applications, it would be better to give developers more options to choose from. For example, the watch mechanism is basically the same one-time trigger as Chubby's. Extra options, such as letting a client register for repeated notifications on a znode or notifications that carry the updated data, would fit more situations.
  2. Even with the ordering guarantees, watches are delivered asynchronously according to (https://zookeeper.apache.org/doc/r3.4.5/zookeeperProgrammers.html#ch_zkWatches). This means a read can be stale not only because of an inconsistent local read from a non-leader server after a write, but also because of a late watch notification. There's no good solution to this, because any stronger consistency model would slow down reads dramatically. However, since every znode already carries a sequence number, why not attach a timestamp or version number to the notification so that clients at least have a better sense of the freshness of the data?
  3. The simple lock without the herd effect appears to be built on a strict ordering of sequential znodes: a client becomes the leader when it is notified that the znode with the previous sequence number has gone away. Since sequence numbers increase with requests, doesn't that mean the leadership order is determined by when clients registered with ZooKeeper? That mechanism isn't desirable in many cases. For example, if the developer wants the client with better hardware to become the leader, it is hard to express, because the znode sequence numbers have nothing to do with hardware. (Wait... could I store a hardware score in each znode's data and maintain an ordered queue under the group znode? Then developers could configure leader election however they want.) A sketch of the standard lock recipe follows this list.
  4. There's no mention of load balancing. Although all the ZooKeeper servers keep identical replicas, clients can connect to any server and fire queries as they like, which could lead to unbalanced request handling across the servers and possibly to malicious load as well. Again, I can't think of a great way to tackle this, since centralized load balancing would slow down client access and add complexity to the system. Maybe the servers could be implemented so that, upon detecting a lot of TCP losses/timeouts over a period, they redirect some of their distant clients to other servers.
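
For reference, here is a sketch of the herd-free lock recipe discussed in weak point 3, written against the kazoo Python client (a sketch only: it assumes a ZooKeeper ensemble at 127.0.0.1:2181 and omits error handling and session-loss recovery). Each client adds an ephemeral sequential znode and watches only the znode immediately before its own, so a release or crash wakes exactly one waiter.

```python
import threading
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
zk.ensure_path("/mylock")

# 1. Join the queue with an ephemeral, sequential znode.
me = zk.create("/mylock/guid-lock-", b"", ephemeral=True, sequence=True)
my_name = me.split("/")[-1]
acquired = threading.Event()

def try_acquire():
    children = sorted(zk.get_children("/mylock"))
    if my_name == children[0]:
        acquired.set()                       # lowest sequence number -> we hold the lock
        return
    predecessor = children[children.index(my_name) - 1]

    # 2. Watch only the znode just before ours: when it disappears (released
    #    or its holder died), only *we* are woken up -- no herd effect.
    def on_change(event):
        try_acquire()

    if zk.exists("/mylock/" + predecessor, watch=on_change) is None:
        try_acquire()                        # predecessor already gone, re-check

try_acquire()
acquired.wait()
print("lock held by", my_name)
zk.delete(me)                                # release: wakes only the next client in line
```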

 

Paper Notes:

  1. ZooKeeper uses non-blocking primitives because with blocking ones, like locks, slow and faulty clients can limit the overall performance of the system, and the required failure detection would add complexity. It's effectively a lock-free version of Chubby;
  2. Requests are handled in a pipelined fashion to support thousands of outstanding operations; the FIFO ordering of client operations enables asynchronous operations (multiple consecutive operations without waiting for replies);
  3. Zab for leader-based atomic broadcast protocol;
  4. Read throughput is optimized by 1) servers handling reads locally, 2) no strict ordering for reads and 3) client-side caching with a watch mechanism;
  5. Tree of directories and data (hierarchical name spaces) because it’s easier and familiar for users in a standard UNIX file system;
  6. Regular and ephemeral znodes
  7. Increasing sequential flag append to znode
  8. Watches to allow clients to receive timely notifications of changes without polling
    1. the clients send reads with watch flag set and the server will return the data with one-time promise to notify the clients about any change;
    2. watch only indicate the change without sending the actual data;
  9. Clients and ZooKeeper interact using sessions with timeouts. A session ends when:
    1. the client is inactive beyond the timeout
    2. the client closes the session
    3. ZooKeeper detects the client as faulty
  10. Synchronous and asynchronous APIs
    1. no handles but use direct access to the path instead
    2. update with version number to enable version checking
  11. Two features:
    1. linearizability (multiple outstanding operations but ordered with guarantee)
    2. FIFO execution of queries
  12. For a leader re-electing scenario:
    1. lock service will prevent other processes using the old configurations but doesn’t offer a good solution when the new leader partially changed the configurations of all and then failed
    2. in ZooKeeper, the new leader deletes the ready znode, makes the configuration changes and then recreates the ready znode. The configuration changes are sent asynchronously, and the new configuration is not used until ready reappears;
    3. ordering is the key preventing any client read anything before it’s ready;
    4. sync causes a server to apply all pending write requests before processing the read without having any extra write
  13. Write with atomic broadcast and read from any server for better availability
  14. Write-ahead log in disk
  15. Quorum-based algorithm with pipeline optimization
  16. Periodic snapshots for faster recovery; a snapshot is fuzzy (inconsistent) because it is a non-blocking depth-first scan of the tree's znode data while updates keep arriving

 

Paper Review: The Chubby lock service for loosely-coupled distributed systems

Paper Review:

Summary:

Chubby is a lock service provided to a large number of clients for access control and coordination. A cell typically consists of 5 machines, one of which is the master. Write operations require acknowledgement from a quorum of replicas, while reads only need to go to the current master.

Strong points:

  1. Chubby exports a file-system-like API similar to UNIX: files are stored in a tree of directories, which makes access structured and familiar for all users. There are some differences though, for example around file moves, directory access control and last-access times;
  2. File handles with sequence numbers avoid a check on every read/write access and make it easy to tell which master issued a handle without consulting other replicas. This mechanism is good for small files with frequent access, because the checks are done when the handle is opened rather than on every access. A handle can be reused unless the master that issued it fails.
  3. Since read operations greatly outnumber writes, data is cached on the client side to avoid polling the Chubby master for every single read. The master notifies every client that may have cached a piece of data when that data is modified, invalidating the caches. I can imagine this saves a huge number of messages. However, a timed cache (TTL) combined with this mechanism might be even better, because as the system runs, the number of clients with possibly cached data grows over time. For every read, the master would return the data with a validity time limit, implying the cache should not be used after that period; the master would still invalidate all clients whose cache is within the time limit upon a modification. This avoids the case where the master has to send too many invalidation messages after several consecutive writes, leading to bursty network usage. It's different from the KeepAlive jeopardy, because the TTL is per cached datum while jeopardy is per client. (A sketch of this combined scheme follows this list.)
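
A toy sketch of the combined scheme proposed above (my own idea, not Chubby's actual protocol): the master hands each cached entry a TTL lease, so after a burst of writes it only needs to invalidate clients whose entries are still within their lease.

```python
import time

class MasterCache:
    """Toy model of invalidation-based caching plus a per-entry TTL lease."""
    def __init__(self, ttl=30.0):
        self.ttl = ttl
        self.holders = {}                   # path -> {client: lease expiry time}

    def read(self, path, client):
        # On a read, record that this client may cache the data until the lease expires.
        self.holders.setdefault(path, {})[client] = time.time() + self.ttl
        return "data-for-" + path           # value returned together with the lease

    def write(self, path):
        # On a write, only clients whose lease has not expired need an invalidation;
        # expired entries are dropped silently, so bursts of writes stay cheap.
        now = time.time()
        live = [c for c, exp in self.holders.get(path, {}).items() if exp > now]
        self.holders[path] = {}
        return live                         # clients to send invalidation messages to

m = MasterCache(ttl=0.5)
m.read("/ls/foo", "client-1")
m.read("/ls/foo", "client-2")
time.sleep(0.6)                             # both leases expire
print(m.write("/ls/foo"))                   # [] -> no invalidation traffic needed
```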

Weak points:

  1. This is minor but still not thoroughly explained. The paper mentions that clients send requests to the Chubby replicas listed in DNS in order to locate the master, which means the Chubby replicas are managed by another service that keeps track of their state and placement. Reads only go to the Chubby master, so if the master is freshly elected, then either 1) reads are blocked for a while until the new master is up to date after checking with the other replicas, or 2) a read returns an insufficiently updated value stored on the new master;
  2. Sequence numbers are assigned to operations that interact with locks, and if a lock is lost under abnormal circumstances such as a holder failure, the other clients have to wait out a delay period (one minute). But why doesn't the master actively message the lock-holding client to check its liveness instead of waiting? At least having the option of proactively checking clients would benefit systems with higher availability requirements. (Something like the TrueTime API could also be used to establish the ordering of events from different clients and save a lot of waiting and messages);
  3. When I first read the paper, I felt that the communication between Chubby clients and cells overlaps a lot with TCP in terms of functionality: both maintain a session with sequence numbers, and both require message exchanges to keep the session alive, otherwise it expires. So I assumed TCP would be a natural fit for Chubby, with its strict ordering, congestion control and delivery guarantees. But the paper claims TCP's back-off policy led to a lot of KeepAlive timeouts, because the transport protocol is unaware of the timing requirements of the upper layer. However, the main thing that triggers back-off is a bursty message pattern, which is not discussed in the paper. If the transport-layer session could be leveraged to reduce the number of KeepAlive messages, then back-off wouldn't be a problem in most scenarios. An alternative is to build a new reliable, congestion-controlled protocol on top of UDP with optimizations and priority for time-critical messages.
  4. The Chubby master is definitely a bottleneck for overall performance, because every operation, read or write, has to go through it, while the replicas' computation power and bandwidth sit under-utilized. I guess this can be viewed as a downside of Chubby's strong consistency, which is maintained through a single decision-making master.

 

Paper Notes:

  1. Reliability, availability and scalability are considered the most important while the throughput and storage capacity are considered secondary.
  2. A lock service, although it cannot offer a standard framework for programmers the way a library embodying Paxos can, maintains high availability as the system scales up; it lets clients store and fetch small amounts of data, using consistent client caching rather than TTL-based caching; and since the lock service runs the quorum-based consensus itself, it reduces the number of servers a client system needs in order to make progress
  3. Only operations that interact with locks are assigned sequencers, which let other servers check that the requester still holds the lock

Paper Review: Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center

Paper Review:

Summary:

Mesos is a thin layer that sits between frameworks and cluster resources, dynamically assigning resources for tasks across a cluster of machines. Because frameworks have different tasks and constraints, the assignment has to take many considerations into account. The overall system is quite simple and elegant, which makes the results even more impressive.

Strong points:

  1. The system takes a lot into consideration when assigning resources: framework constraints, data location, task duration, priority and fairness. Yet all of these considerations can be expressed as simple values and changed dynamically during the run;
  2. Mesos leaves a lot of room to the frameworks, as we can see from its Hadoop port. It takes advantage of Hadoop's fine-grained task scheduling and reduces the overhead when Mesos and Hadoop work together, something a static partitioning tool would have a hard time achieving;
  3. The Mesos master is implemented with minimal communication between the upper and lower layers. Its state is replicated in standby masters coordinated by ZooKeeper, and the data in the master's memory can be reconstructed from the frameworks and slaves, which makes the system more resilient.

Weak points:

  1. While tasks are isolated with Linux containers and given separate cores and memory, there are still shared system resources, like network bandwidth, that are not taken into consideration during the run;
  2. The actual resource assignment is still performed by a single centralized machine, and the bandwidth of the standby masters is wasted. The Mesos master might also face short bursts of network exhaustion when many similar tasks finish at once, which might explain why the Hadoop jobs show a spiky share of the cluster;
  3. The paper doesn't say much about how a resource-offer preference is calculated from data location and other factors (it simply mentions selecting among the suitable frameworks by lottery scheduling weighted by a given preference s_i; a sketch of that follows). I guess the preference could be useful if we can exploit data locality by offering resources to frameworks whose data is already on those nodes, saving transfer time. But that is hard to keep track of and requires the frameworks to cooperate, which makes Mesos less adaptive after all.
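
A small sketch of lottery scheduling over per-framework weights (my own illustration of the general technique the paper references, with made-up weights standing in for the preference s_i): a framework's chance of receiving the next offer is proportional to its weight.

```python
import random

def lottery_pick(frameworks):
    """frameworks: dict of name -> weight s_i (higher weight = more likely to win)."""
    total = sum(frameworks.values())
    ticket = random.uniform(0, total)       # draw a ticket in [0, total)
    running = 0.0
    for name, weight in frameworks.items():
        running += weight
        if ticket < running:
            return name                      # this framework receives the offer
    return name                              # guard against floating-point edge cases

prefs = {"hadoop": 3.0, "mpi": 1.0, "spark": 2.0}
wins = {name: 0 for name in prefs}
for _ in range(10_000):
    wins[lottery_pick(prefs)] += 1
print(wins)                                  # roughly proportional to 3 : 1 : 2
```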

 

Paper Notes:

  1. Sharing a cluster is hard. There are two common solutions: statically partition the cluster and run one framework per partition or allocate a set of VMs to each framework. However, there is no way to perform fine-grained sharing across the frameworks. Mesos is a thin layer that lies between frameworks and cluster resources.
  2. A centralized scheduler could not handle the complexity and scale, could not adapt to new frameworks, and would duplicate the frameworks' own schedulers.
  3. Mesos will offer frameworks with resources based on an organizational policy such as fair sharing and the frameworks decide which ones to accept and the tasks to run on them.
  4. Mesos has minimal interfaces and pushes the control to the frameworks to 1) leave more room for various frameworks and 2) keeps Mesos simple, scalable and low in system requirements
  5. Frameworks resources could be revoked due to a buggy job or a greedy framework; however, frameworks can be assigned with resources with guarantee so the tasks won’t be killed as long as the framework is below the guaranteed allocation.
  6. Linux containers are used for task isolation
  7. ZooKeeper for leader election from all the standby masters. Mesos master’s memory could be reconstructed from the frameworks and resource slaves
  8. Mandatory and preferred resources; lottery scheduling; resource reservation for short tasks

Paper Review: Mesa: Geo-Replicated, Near Real-Time, Scalable Data Warehousing

Paper Review:

Summary:

Mesa is another Google data store, layered on Colossus and Bigtable and primarily designed for ads campaign data. It is a versioned key-value store with aggregation, scalable and replicated globally. The metadata is updated consistently with Paxos, while the data itself is batched and propagated every few minutes.

Strong points:

  1. In the chapter "experiences and lessons learned", layered design is mentioned as a key design feature of Google products. Mesa is nicely layered and decoupled in both the horizontal and vertical directions. It is built on Colossus and Bigtable, which is why there isn't much about low-level read/write handling (that is covered by Bigtable). Inside the architecture there are workers/servers, controllers and global services, and data maintenance/updates are decoupled from queries. There may be some overhead, but it enables clean design, easy problem identification and detailed performance analysis.
  2. The resume key with streaming transmission is interesting. With it, a failed or disconnected query server won't waste the client's time, since the read can be continued on another server instead of firing the whole query again.
  3. Parallelizing the worker operations with MapReduce and the linked schema change are both good ideas. MapReduce can save days of computation time, and the linked schema change, while not applicable in every scenario and adding some computation on the query path, uses 50% less disk space than the traditional simple schema change. That can be a lifesaver, since Mesa requires a lot of storage space in the first place.

Weak points:

  1. The controllers assign different tasks to different types of workers, and worker failures are eventually caught by the timer the controllers maintain for each task. However, those timers can add a lot of work to the controllers when many tasks run simultaneously. On the other hand, a worker failure can mean long latency for the corresponding queries, since the task is only re-assigned after the timer expires. That latency could be reduced if the controllers and workers exchanged heartbeat messages, so that worker failures are detected before the timer runs out.
  2. Query servers are each assigned their own range of data to take advantage of prefetching and caching. However, this assignment can be inflexible if there are a lot of queries on similar data: only a small number of query servers would be fetching data from Colossus for a long time while the rest sit idle. Assuming reads in Colossus are lock-free and scale well, this becomes a performance bottleneck and a waste of resources. Also, since we're on the topic of querying, the global locator service is really vague in the paper; I assume it is a stateless process running with the controllers, with its data replicated in Bigtable as well.
  3. The replication mechanism doesn't make too much sense to me. Paxos only makes sure the metadata is replicated consistently across a majority of Mesa instances, while the data replication lags behind without any strong guarantee. So for a given Mesa instance (datacenter), the metadata could be outdated if it missed the Paxos round, and even if the Paxos round goes fine, there isn't much guarantee on the consistency of the actual data.
  4. There are two methods of data validation: an online one that re-aggregates rows and checks for computation errors, and an offline one, a lightweight process that spans recently committed data. There are two problems with corruption recovery: 1) the online checks are performed on every update and query, which can mean unnecessary checking and increased latency and load; 2) the data is replicated asynchronously, so if the freshly updated copy gets corrupted, there may be no other replica that can help with the recovery

 

 

Notes:

Data is partitioned and replicated horizontally to achieve scalability and availability. (does this imply NoSQL?)

Multi-version key-value data store with Paxos for consistency

Leverage Bigtable and the Paxos tech underlying Spanner for metadata storage and maintenance.

Asynchronously replication for application data; synchronous replication for metadata.

Dealing with corruption for software and hardware

A query to Mesa consists of a version number n and a predicate P on the key space. The response contains the rows matching P, with values aggregated over versions 0 to n.

Strict ordering of updates can ensure atomicity and fraud detection of negative facts.

Mesa pre-aggregates the data for certain version ranges (between v1 and v2 inclusive) and calls the result a delta. Base compaction merges some of the versions into a base [0, B], and versions before the base are no longer accessible. This kind of idea is commonly used (like append-only GFS), but it could be problematic if old-version data is still useful. Storing older data in versioned form is expensive because of the aggregation work it implies, so Mesa does need to clean things up, but how many versions to keep is going to be hard to determine for various kinds of data. Since Mesa is primarily used for ads campaign data, which has uniform and specific requirements for storage and aggregation, this issue doesn't seem to matter too much. (A small sketch of delta merging follows.)
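
A toy sketch of the delta idea (my own simplification, with SUM as the aggregation function): each delta maps keys to pre-aggregated values over a version range, and merging deltas, as base compaction does, just aggregates values key by key.

```python
def merge_deltas(a, b):
    """Merge two deltas (dicts of key -> SUM-aggregated value) into one."""
    merged = dict(a)
    for key, value in b.items():
        merged[key] = merged.get(key, 0) + value   # aggregation function: SUM
    return merged

delta_0_10 = {("ad1", "US"): 52, ("ad2", "DE"): 7}    # covers versions [0, 10]
delta_11_12 = {("ad1", "US"): 3, ("ad3", "US"): 1}    # covers versions [11, 12]

base = merge_deltas(delta_0_10, delta_11_12)          # base compaction -> [0, 12]
print(base)   # {('ad1', 'US'): 55, ('ad2', 'DE'): 7, ('ad3', 'US'): 1}
```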

Each table has one or more indexes, and each table index has its own copy of the data sorted in the index order for fast search. There is also an index file containing short keys for the row blocks, to locate a row block faster.

Metadata is stored in Bigtable and in the memory of the controller. Each datacenter has one controller (I assume), and it does not directly interact with the tables. There are different task types for data maintenance, such as updates, compaction, schema changes and checksums; note that the last two require coordination between different Mesa instances (datacenters). Sets of workers of different types poll the controller for tasks of their own type. Each task is guarded by a timer, so failed workers don't affect the system. The controller is also sharded and stateless, with all the metadata consistently stored in Bigtable, so Mesa is resilient to controller failures. A garbage collector runs separately and continuously, reading the metadata in Bigtable and deleting unwanted files in Colossus.

Mesa handles queries of different requirements with different labels and priorities. In principle any query server can access any table, but Mesa directs queries over the same range of data to a subset of the query servers to take advantage of prefetching and caching. A global locator service is used to locate the appropriate query servers.

Updates are batched in Mesa instances once every few minutes. A stateless global committer assigns each update batch a version number and uses Paxos to enforce consistency of the metadata. The controllers in every Mesa instance are responsible for incorporating the update batches and acknowledging the committer. There is no locking. Note that in this scheme the metadata is replicated synchronously with Paxos, while the data is incorporated asynchronously by the various Mesa instances.

New Mesa instances bootstrap using a peer-to-peer load mechanism.

 

Paper Review: Spanner: Google's Globally-Distributed Database

Paper Review:

Summary:

Spanner is Google's newer global data store, with a semi-relational data model and a standard query language. It uses Paxos and two-phase commit for write transactions and relies on bounded-uncertainty real time (TrueTime) for externally consistent transactions.

Strong points:

  1. Spanner switches from NoSQL to NewSQL (?), which is easy to work with (a semi-relational data model and a query language) while keeping excellent scalability; the data is also versioned (with TrueTime timestamps), so clients can tell whether a read is up to date.
  2. TrueTime is just impressive. It enables external consistency and a bunch of cool features like consistent snapshot reads across datacenters and dynamic schema changes. It's like having a wall clock for all the replicas with bounded uncertainty. Not to mention that the uncertainty is controlled in a sophisticated way, with GPS and atomic clocks as the underlying hardware and an algorithm for detecting liars;
  3. Data is stored in tablets and grouped into directories ("buckets"). Applications can control the locality of data by assigning keys carefully, which can lower latency (for example by choosing closer datacenters for storage);
  4. Dynamically controlled replication configuration is helpful when an application needs to change data placement or replication factors at runtime.

Weak points:

  1. Write operations still use Paxos for consensus and two-phase commit for transactions. That enforces strong consistency for sure, but a) the leader can be troublesome: leader failover may mean long waits, and b) the communication overhead is unavoidable and increases the latency of every transaction;
  2. TrueTime is a sophisticated design, with redundant hardware and algorithms to verify its correctness. However, the number of write transactions a single Paxos group can perform in a period of time is bounded by epsilon, and so is the system's overall timestamp accuracy. Epsilon comes mainly from hardware error and is hard to eliminate, which means Spanner is unlikely to get much better write throughput or timestamp accuracy;
  3. Since the system's ordering is based on clock time and clock time is uncertain, there are many occasions where the system has to wait until it is definitely sure the previous event has passed, even when the wait exists purely to make TT.after() true. For example, for the commit timestamp, even after all the participants have replied to the leader, it still has to wait out the commit wait until it is certain about the timing (see the sketch after this list);
  4. If the TrueTime API is used with a faulty timestamp, say a read fired at a timestamp in the future, will it block other transactions, get stuck, or return an error?
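
A minimal sketch of the commit-wait rule (my own simulation of a TrueTime-style interface, not Google's implementation; the uncertainty bound below is an assumed illustrative value): the leader picks a commit timestamp s no less than TT.now().latest and then blocks until TT.after(s) holds, i.e. until s is guaranteed to be in the past at every replica.

```python
import time

EPSILON = 0.005        # assumed clock-uncertainty bound in seconds (illustrative only)

def tt_now():
    """TrueTime-style interval [earliest, latest] around the local clock."""
    t = time.time()
    return (t - EPSILON, t + EPSILON)

def tt_after(s):
    return tt_now()[0] > s              # s is definitely in the past everywhere

def commit(apply_write):
    s = tt_now()[1]                     # commit timestamp >= TT.now().latest
    apply_write(s)                      # Paxos writes would happen here
    while not tt_after(s):              # commit wait: roughly 2 * EPSILON
        time.sleep(EPSILON / 10)
    return s                            # only now may clients observe the commit

ts = commit(lambda s: None)
print("committed at", ts)
```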

 

 

Paper Outline:

  1. Introduction:

    • Globally scalable database that, at the highest level of abstraction, shards data across many sets of Paxos state machines;
    • Main focus is managing cross-datacenter replicated data but also designing/implementing important database features;
      • Bigtable (NoSQL) can be difficult for applications with complex schema or strong consistency in the presence of wide-area replication;
      • Megastore (semi-relational data model) supports synchronous replication but has poor write throughput;
    • Spanner is evolved from Bigtable where  data is stored in schematized semi-relational tables and version-ed; it provides a SQL-based query language;
      • replications configuration can be dynamically controlled;
      • externally consistent read/write operations;
      • these features are enabled by the globally-assigned timestamps, which is supported by the TrueTime API and its implementation;
  2. Implementation:

    • Overall structure:

      • a Spanner deployment is called a universe;
      • Spanner is organized with a set of zones which are unit of administrative deployment and resemble data centers;
      • each zone has:
        • one zonemaster;
        • hundreds of spanservers (roughly analog to Bigtable servers);
        • location proxies are used by clients to locate data;
      • universe master and placement driver are singletons:
        • universe master is primary a console that displays stats info;
        • placement driver handles auto movement of data across zones;
    • Spanserver software stack:

      • spanserver structure:
        • each spanserver is responsible for 100 to 1,000 instances of tablets, which are similar to Bigtable's tablet abstraction;
        • unlike Bigtable, Spanner assigns timestamps to data, which makes it more of a multi-version database than a key-value store;
        • tablet states are stored in B-tree-like files and a write-ahead log;
        • all storage happens on Colossus;
      • coordination and consistency:
        • a single Paxos state machine for each spanserver;
        • a state machine stores its metadata and log in corresponding tablet;
        • long-lived leaders and time-based leader leases for Paxos;
        • every Paxos writes twice: in the tablet log and in the Paxos log;
        • writes must initiate Paxos protocol at the leader but reads access state directly from the underlying tablet as long as it’s up-to-date;
        • each Paxos leader implements a lock table for concurrency control:
          • lock table contains the state of two-phase locking;
          • only operations require synchronization acquire locks;
        • each Paxos leader implements a transaction manager to support distributed transactions:
          • used to implement a participant leader;
          • transaction involves only one Paxos group will bypass the transaction manager;
          • for transactions that involves multiple Paxos groups:
            • one of the participant group is chosen as the coordinator;
            • others are referred as coordinator slaves;
    • Directories and placement:

      • a bucket of contiguous keys that share a common prefix is a directory which allows applications to control the locality of data by choosing keys;
      • all data in a directory share the same replication configuration and could only be moved directory by directory (while the client operations are still ongoing);
      • not necessarily a single lexicographically contiguous partition of the row space but  instead a container that may encapsulate multiple partitions of the row space so that directories could be put together;
      • Movedir task:
        • the background task moving directories between Paxos groups;
        • also used to add/remove replicas to Paxos groups;
        • a part-by-part background process between two Paxos groups;
      • directory is also the smallest unit whose placement can be specified;
        • administrators control the number and types of replicas, and the geographic placement of those replicas;
        • an application controls how data is replicated, by tagging each database and/or individual directories with a combination of those options;
      • shard a directory into multiple fragments if it grows too large;
        • fragments could be served by different Paxos groups;
        • movedir in this case will actually move fragments not directories;
    • Data model:

      • data features for applications:
        • a data model based on schematized semi-relational tables;
          • used by  Megastore; simpler to manage unlike Bigtable;
          • synchronous replication across datacenters unlike Bigtable which only supports eventual consistency;
        • a query language;
          • because of popularity of Dremel as an interactive data analysis;
        • general purpose transactions;
          • complaint on the lack of cross-row transactions in Bigtable;
          • two-phase commit over Paxos mitigates the availability problems (but expensive to support);
        • application data model:
          • layered on the directory-bucketed key-value mapping;
          • an application can create one or more database in a universe;
          • a database can contain unlimited schematized tables;
          • uses a SQL-like query language with extra features;
      • Spanner data model:
        • not purely relational because every table is required to have an ordered set of one or more primary-key columns;
        • each table defines a mapping from the primary-key columns to non-primary-key columns;
        • it lets applications control data locality through key choices;
  3. TrueTime:

    • TrueTime

      • explicitly represents time as a TTinterval with bounded time uncertainty, which is different from standard time interface;
    • GPS and atomic clocks failure modes:

      • GPS reference-source vulnerabilities:
        • antenna and receiver failures;
        • local radio interference;
        • correlated failures;
        • GPS system outages;
      • atomic clock failures:
        • time drift due to frequency error;
    • master/slave implementation:

      • a set of time master machines per datacenter;
        • majority of them have GPS and geographically separated;
          • reduce the effect to failures;
          • uncertainty close to zero;
        • the rest have atomic clocks and are called Armageddon masters;
          • slowly increasing time uncertainty;
        • regularly compared against each other and local clock;
      • timeslave daemon per machine:
        • polls a variety types of masters;
        • applies a variant of Marzullo's algorithm to detect and reject liars;
        • worst-case local clock drift is a saw-tooth function;
          • master clock uncertainty, communication delay, local drift;
  4. Concurrency control:

    • Supported operations:

      • read-write transaction;
        • pessimistic and requires leader replication;
      • read-only transactions;
        • not simply a read-write transaction without writes; non-blocking;
      • snapshot reads;
    • Timestamp management:

      • Paxos leader leases:
        • long-live leader is selected with a quorum-based vote;
        • lease could be extended on a successful write or near expiration;
      • assigning timestamps to RW transactions:
        • Spanner assigns timestamp that Paxos assigns to the Paxos write;
        • external consistency: if the start of a transaction T_2 is later than T_1, then the timestamp of T_2 must be greater than T_1;
        • start: the coordinator leader for a write T_i assigns a commit timestamp s_i no less than the value of TT.now().latest;
        • commit wait: the coordinator leader ensures that clients cannot see any data committed by Ti until TT.after(s_i) is true;
      • serving reads at a timestamp:
        • every replica tracks a value called safe time t_safe, which is the maximum timestamp at which a replica is up-to-date;
      • assigning timestamps to RO transactions:
        • two-phase transaction: timestamp s_read assigning and execute snapshot reads at s_read;
    • Details:

      • read-write transactions:
        • issues read to the leader replica of the appropriate group;
        • wound-wait read recent data with timestamps;
        • keep-alive messages to the leader to maintain locks;
        • a Paxos algorithm with TrueTime to enforce consistency;
        • buffered writes until two phase commit with the same timestamps on all participants;
      • read-only transactions:
        • scope required for read-only transactions, which summarizes the keys read in the transaction;
        • contact the leader when the scope’s values are served by one Paxos group;
          • S_read ensures that last write is returned;
        • multiple Paxos group read:
          • a round of communication with all the leaders;
          • just read S_read = TT.now().latest to see the sufficiently up-to-date values;
      • schema-change transactions:
        • generally non-blocking variant of a standard transaction;
        • reserve a timestamp in the future and block the transactions with bigger timestamps behind the schema changes;
      • refinements:
        • lock with metadata to prevent false conflicts;
        • use leader-lease interval for snapshot read after a Paxos write;
  5. Evaluation:

    • Microbenchmarks:

    • Availability:

    • TrueTime:

    • F1

  6. Related work:

  7. Future work:

  8. Conclusion: