Paper Review: Dremel: Interactive Analysis of Web-Scale Datasets

Paper Review:


Dremel is an interactive ad hoc query system designed to answer queries over web-scale datasets with low latency. It combines a multi-level serving tree, columnar storage and replication to achieve much better performance than MapReduce for this kind of workload.

Strong points:

  1. Columnar storage allows faster column retrieval and feature extraction. It exposes data in a more feature-oriented way and makes it easy for programs to process data column by column with better data locality.
  2. The tree-structured serving architecture is great. It has more than two layers of servers, whereas most systems we've seen so far have exactly two (one for metadata and one for data). This structure reminds me of the biggest query system on earth: DNS. With an intermediate layer, the system gains one more layer of caching and much better scalability. It also has the potential to scale even further by adding yet another intermediate layer.
  3. The performance is great compared to similar systems like Hive, which is not real-time. It meets all the requirements of a real-time interactive ad hoc query system. However, it seems to me that Apache Drill could achieve much the same thing with more flexibility in data types.
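To make the two architectural points above concrete, here is a minimal sketch, assuming flat records and a single SUM ... GROUP BY query. It is not Dremel's actual storage format (real Dremel also records repetition and definition levels so nested records can be reassembled), and the function names are hypothetical. Each leaf reads only the two columns the query touches, and intermediate servers merge partial aggregates on the way up to the root.

```python
from collections import Counter
from typing import Dict, List, Union

Columns = Dict[str, list]
Tree = Union[Columns, List["Tree"]]  # a leaf tablet (columns) or a list of child servers

def stripe(records: List[dict]) -> Columns:
    """Store flat records column by column: one list per field, aligned by row."""
    fields = sorted({f for r in records for f in r})
    return {f: [r.get(f) for r in records] for f in fields}

def leaf_scan(cols: Columns, group_col: str, value_col: str) -> Counter:
    """SELECT group_col, SUM(value_col) ... GROUP BY group_col over one tablet.
    Only the two referenced columns are read; every other column is skipped."""
    partial: Counter = Counter()
    for key, value in zip(cols[group_col], cols[value_col]):
        if value is not None:
            partial[key] += value
    return partial

def serve(node: Tree, group_col: str, value_col: str) -> Counter:
    """Root and intermediate servers fan the query out and merge partial results."""
    if isinstance(node, dict):
        return leaf_scan(node, group_col, value_col)
    merged: Counter = Counter()
    for child in node:
        merged.update(serve(child, group_col, value_col))
    return merged

tablets = [
    stripe([{"country": "US", "clicks": 3, "url": "a"}, {"country": "DE", "clicks": 1, "url": "b"}]),
    stripe([{"country": "US", "clicks": 2, "url": "c"}]),
    stripe([{"country": "FR", "clicks": 5, "url": "d"}, {"country": "DE", "clicks": 4, "url": "e"}]),
]
root = [[tablets[0], tablets[1]], [tablets[2]]]  # root -> two intermediate servers -> leaf tablets
print(dict(serve(root, "country", "clicks")))    # {'US': 5, 'DE': 5, 'FR': 5}
```

The `url` column is never touched by the query, which is exactly the saving columnar storage buys; adding another intermediate level only means nesting the lists one level deeper.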

Weak points:

  1. Performance is relatively poor when many columns are read or when dealing with unstructured data, since in those cases we cannot take advantage of columnar storage. But I guess the authors are confident about the query types Dremel will handle, so it's fine: Dremel is designed for ad hoc queries over structured data that is ready to be analysed.
  2. I don't think MapReduce and Dremel make a valid comparison. Of course users can still use MapReduce to perform Dremel's query and analysis jobs, but that's not what MapReduce is designed for, which is distributed batch processing. To me the two are complementary rather than comparable, and that's exactly what the authors suggest in one of the observations in the paper: “MR and query processing can be used in a complementary fashion; one layer’s output can feed another’s input”.
  3. There's nothing you can do to modify the data (updates or creation) except append, which limits what users can do with Dremel. I guess implementing updates is not a development priority, since data analysis rarely needs modification, but it would still be nice to have a flexible way to change the data.

Paper Review: Pregel: A System for Large-Scale Graph Processing

Paper Review:


Graph problems are inherently hard to parallelize and distribute because of the dependencies among vertices and edges. Pregel offers an intuitive way of handling graph processing by partitioning M vertices across N machines (M >> N), with each machine handling its own vertices. There is a single master in the system, used for coordination, aggregation and analysis.

I tried a similar approach last semester by using threads to simulate nodes in a clustering problem. I believe it’s a good model for demonstration but expensive in terms of communication and coordination overhead.

Strong points:

  1. The way they divide the work is straightforward and intuitive. Each vertex acts independently and communicates only through message passing, with the master coordinating. Computation is divided into supersteps separated by barriers, a model (bulk synchronous parallel) widely used in parallel programming;
  2. Combiners can reduce the number of messages that need to be buffered and processed, hence decreasing the running time of each superstep. They work well for shortest-path computation, where only the minimum of the incoming distances matters;
  3. It's flexible in many ways. There are different input methods, and user-defined handlers can be used when a message's target vertex doesn't exist or to handle topology mutations. These features make Pregel suitable for a wider range of graph problems;
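The vertex-centric model in the points above can be illustrated with a single-process sketch of single-source shortest paths, assuming every vertex appears as a key in the graph and edge weights are non-negative. `sssp_pregel` is a hypothetical name, not Pregel's C++ API: each superstep runs the per-vertex logic on active vertices, messages only become visible after the barrier, a min-combiner keeps one message per destination, and vertices vote to halt until a message reactivates them.

```python
import math
from typing import Dict, List, Tuple

# vertex -> outgoing edges as (target vertex, edge weight); every target must also be a key
Graph = Dict[str, List[Tuple[str, float]]]

def sssp_pregel(graph: Graph, source: str) -> Dict[str, float]:
    value = {v: math.inf for v in graph}             # per-vertex state: best known distance
    active = set(graph)                              # every vertex is active in superstep 0
    inbox: Dict[str, List[float]] = {v: [] for v in graph}
    superstep = 0
    while active:
        outbox: Dict[str, float] = {}                # messages after the min-combiner
        for v in list(active):
            # compute(): the source starts at 0, others take the min of their incoming messages
            if superstep == 0 and v == source:
                candidate = 0.0
            else:
                candidate = min(inbox[v], default=math.inf)
            if candidate < value[v]:
                value[v] = candidate
                for target, weight in graph[v]:
                    msg = candidate + weight
                    if msg < outbox.get(target, math.inf):   # combiner keeps only the minimum
                        outbox[target] = msg
            active.discard(v)                        # vote to halt
        # barrier: messages become visible only in the next superstep and reactivate targets
        inbox = {v: [] for v in graph}
        for target, msg in outbox.items():
            inbox[target].append(msg)
            active.add(target)
        superstep += 1
    return value

g = {"A": [("B", 1), ("C", 4)], "B": [("C", 2), ("D", 6)], "C": [("D", 3)], "D": []}
print(sssp_pregel(g, "A"))  # {'A': 0.0, 'B': 1.0, 'C': 3.0, 'D': 6.0}
```

The computation ends when no vertex is active and no messages are pending, which mirrors Pregel's termination condition.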

Weak points:

  1. Load balancing and vertex partitioning are hard problems. Pregel doesn't address load balancing; it simply separates steps with barriers regardless of each machine's execution time. It's very likely that some vertices have more computation to do because they have more edges or act as cluster heads in the graph, so the threads running these vertices take longer in each step. The default hash partitioning might group vertices with heavy computation together, so some machines finish much earlier than others and waste their time waiting at the barrier. Pregel has no load-balancing solution, either before or during the run.
  2. Since the whole system is synchronous at the granularity of supersteps, what's the actual use of outdated checkpoints? Recomputation for recovery doesn't make much sense to me, because the logged messages from healthy partitions don't necessarily contain enough information to fully reconstruct the lost partition. For example, if there is an island of nodes in the graph that sends no messages to, and receives none from, the rest of the graph before the crash, how could we recompute the states of the island nodes from other vertices' messages after reloading the checkpoints? And while we're on the topic of fault tolerance, I'm not sure how Pregel backs up or recovers the master.
  3. The paper doesn't specify the delivery guarantees for messages. It seems to me that workers interacting with each other, reporting to the master and getting ACKed by the master could take a very long time if loss and congestion occur. And if one machine is delayed, the whole barrier is delayed as well. I wonder how long a single machine would take to finish the same jobs in the evaluation, and I'm almost certain it would take less time than we might expect. The system doesn't scale well, partly because of the communication overhead.
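Point 1 can be made concrete with a toy cost model, assuming each vertex's work in a superstep is proportional to its out-degree and that vertices are placed with hash(ID) mod N, Pregel's default partitioning (`zlib.crc32` stands in for the hash so the placement is stable across runs). Because of the barrier, a superstep lasts as long as the busiest worker, and the other workers sit idle for the difference. The function names are hypothetical.

```python
import zlib
from typing import Dict, List, Tuple

def hash_partition(vertex_ids: List[str], num_workers: int) -> Dict[int, List[str]]:
    """Default-style placement: hash(ID) mod N (crc32 keeps the hash stable across runs)."""
    parts: Dict[int, List[str]] = {w: [] for w in range(num_workers)}
    for v in vertex_ids:
        parts[zlib.crc32(v.encode()) % num_workers].append(v)
    return parts

def superstep_cost(out_degree: Dict[str, int], num_workers: int) -> Tuple[int, int]:
    """Toy model: a vertex costs its out-degree; returns (superstep length, total idle time)."""
    parts = hash_partition(list(out_degree), num_workers)
    loads = [sum(out_degree[v] for v in vs) for vs in parts.values()]
    length = max(loads)                              # the barrier waits for the slowest worker
    idle = sum(length - load for load in loads)      # time the other workers spend waiting
    return length, idle

degrees = {"hub": 1000, **{f"v{i}": 10 for i in range(99)}}   # one high-degree "cluster head"
length, idle = superstep_cost(degrees, num_workers=4)
print(length, idle)  # length is at least 1000, while a perfect split would be 1990 / 4 = 497.5
```

No hash function can fix this case: the hub alone outweighs the ideal per-worker share, so some form of work splitting or rebalancing would be needed.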



Paper Notes:

  1. During every superstep, a user-defined function is executed for each vertex;
  2. Pregel programs are inherently lock-free, with no race conditions;
  3. Vertices are identified by a string vertex ID, and each has a user-defined value representing its state; edges have a source and a destination, with a user-defined value as well;
  4. A vertex can vote to halt when it has done all of its work; however, it is reactivated if it receives a message. The computation is over when all vertices are inactive and no messages are in transit;
  5. Aggregators can be used for global reduction and coordination;
  6. Topology mutations are processed in order (removals before additions) and conflicts are resolved with user-defined handlers;
  7. A single machine is chosen as the master, which is responsible for the coordination of the group;
  8. Periodic ping messages are used for failure detection; fault tolerance is achieved through checkpointing.

Paper Review: The Chubby lock service for loosely-coupled distributed systems

Paper Review:


Chubby is a lock service provided to a large number of clients for access control (locking) and coordination. A Chubby cell typically consists of five replicas, one of which is elected master. Writes must be acknowledged by a quorum (majority) of replicas, while reads only need to go to the current master.

Strong points:

  1. Chubby exports a file system interface with APIs similar to the UNIX file system. Files are organized in a tree of directories, which makes access structured and easy for all users. There are some differences, though: Chubby does not support moving files between directories, does not use path-dependent access control (a file's permissions come from the file itself, not its directories), and does not expose last-access times;
  2. File handles carry check information and a sequence number, so full access checks are not needed on every read/write, and it is easy to tell which master issued a handle without consulting the other replicas. This mechanism works well for small files with frequent accesses, because checks are done when a handle is opened rather than on each access. A handle can be reused unless the master that issued it fails.
  3. Since reads greatly outnumber writes, data is cached on the client side to avoid polling the Chubby master on every single read. When data is modified, the master notifies all clients that may have cached it and invalidates their caches. I can imagine this saves a huge number of messages. However, combining this mechanism with a timed (TTL) cache might work better, because as the system runs, the number of clients possibly caching a given file keeps growing. On every read, the master would return the data with a validity period, after which the client must not use its cached copy. On modification, the master would still invalidate every client whose cached copy is within its validity period. This way we avoid the case where the master has to send too many invalidation messages after several consecutive writes, leading to bursty network usage. This is different from KeepAlive jeopardy, because the TTL applies to each cached item while jeopardy applies to each client.
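The TTL-plus-invalidation idea in point 3 is the reviewer's proposal, not Chubby's published design. A minimal in-memory sketch, assuming a logical clock passed in explicitly and a hypothetical `TTLInvalidatingMaster` class: each read records the client together with an expiry time, and a write only returns (for invalidation) the clients whose copy has not yet expired, so the fan-out is bounded by recent readers rather than by every client that ever read the file.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class TTLInvalidatingMaster:
    """Hypothetical master combining invalidation with per-read TTLs (the reviewer's proposal)."""
    ttl: float
    data: Dict[str, bytes] = field(default_factory=dict)
    cachers: Dict[str, Dict[str, float]] = field(default_factory=dict)  # file -> {client: expiry}

    def read(self, client: str, name: str, now: float) -> Tuple[bytes, float]:
        expiry = now + self.ttl
        self.cachers.setdefault(name, {})[client] = expiry
        return self.data.get(name, b""), expiry      # client must drop its copy after `expiry`

    def write(self, name: str, value: bytes, now: float) -> List[str]:
        """Apply a write; return only the clients whose cached copy has not yet expired.
        A real master would also wait for their acknowledgements before completing the write."""
        holders = self.cachers.pop(name, {})
        self.data[name] = value
        return sorted(c for c, expiry in holders.items() if expiry > now)

master = TTLInvalidatingMaster(ttl=10.0)
master.read("c1", "/ls/cell/config", now=0.0)
master.read("c2", "/ls/cell/config", now=8.0)
print(master.write("/ls/cell/config", b"v2", now=12.0))  # ['c2'] (c1's copy expired at 10.0)
```

The file name `/ls/cell/config` is illustrative only. A second write right after the first sends no invalidations at all, since nobody has re-read the file.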

Weak points:

  1. This is a minor point, but it is not thoroughly explained. The paper mentions that clients locate the master by sending requests to the Chubby replicas listed in DNS, which means that some other service must keep track of the state and placement of the Chubby replicas. Reads only go to the Chubby master, but if the master was just elected, then either 1) reads are blocked for a while until the new master is brought up to date by checking with the other replicas, or 2) reads return a stale value stored on the new master;
  2. Sequence numbers are assigned to operations that interact with locks. If a lock is lost in abnormal circumstances such as a holder failure, other clients have to wait for a period (the lock-delay, up to one minute). Why doesn't the master actively send messages to the lock-holding client to check its liveness instead of waiting? At least having the option of proactively checking clients could benefit systems with higher availability requirements. (This could also use something like the TrueTime API to establish the ordering of events from different clients and save a lot of waiting and messages);
  3. When I first read the paper, I felt that the communication between Chubby clients and cells overlaps a lot with TCP in functionality: both maintain a session with sequence numbers, and both require message exchanges to keep the session alive, otherwise it expires. So I assumed TCP would be a natural fit for Chubby, with its strict ordering, congestion control and delivery guarantees. But the paper claims that TCP's back-off policy led to many KeepAlive timeouts, because the transport layer is unaware of the timing requirements of upper-layer processes. However, the main reason back-off gets triggered is the bursty message pattern, which the paper does not discuss. If we could take advantage of the transport-layer session to reduce the number of KeepAlive messages, back-off wouldn't be a problem in most scenarios. An alternative is to build a new reliable, congestion-controlled protocol on top of UDP with optimizations and priority for time-critical messages.
  4. The Chubby master is definitely a performance bottleneck, because all operations, reads and writes alike, have to go through it. Meanwhile, the other replicas' computation power and bandwidth are under-utilized. I guess this can be viewed as a downside of Chubby's strong consistency, which relies on a single master making all decisions.
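Point 2 refers to sequencers and lock-delay. The paper describes a sequencer as containing the lock name, the mode in which it was acquired and the lock generation number; the rest of this sketch is an assumption for illustration, with hypothetical `LockServer` and `ResourceServer` classes. The generation increments on every acquisition, a downstream server rejects requests carrying a stale generation, and a lock freed by session expiry cannot be re-acquired until lock-delay has passed. Here the resource server reads the lock state directly, which is a simplification of checking with the Chubby cell.

```python
from dataclasses import dataclass
from typing import Optional

LOCK_DELAY = 60.0  # seconds; the paper caps lock-delay at one minute

@dataclass(frozen=True)
class Sequencer:
    lock_name: str
    mode: str          # "exclusive" or "shared"
    generation: int    # incremented on every acquisition

class LockServer:
    def __init__(self, name: str) -> None:
        self.name = name
        self.generation = 0
        self.holder: Optional[str] = None
        self.blocked_until = 0.0

    def acquire(self, client: str, now: float) -> Optional[Sequencer]:
        if self.holder is not None or now < self.blocked_until:
            return None                              # held, or still inside the lock-delay window
        self.holder = client
        self.generation += 1
        return Sequencer(self.name, "exclusive", self.generation)

    def release(self) -> None:
        self.holder = None                           # clean release: no lock-delay

    def expire_session(self, now: float) -> None:
        self.holder = None                           # holder failed or became unreachable
        self.blocked_until = now + LOCK_DELAY

class ResourceServer:
    """Hypothetical downstream server that validates sequencers before acting on requests."""
    def __init__(self, lock: LockServer) -> None:
        self.lock = lock

    def accepts(self, seq: Sequencer) -> bool:
        # reject requests carrying an older generation, e.g. delayed messages from a dead holder
        return (self.lock.holder is not None
                and seq.lock_name == self.lock.name
                and seq.generation == self.lock.generation)

lock = LockServer("/ls/cell/lock")
server = ResourceServer(lock)
old = lock.acquire("c1", now=0.0)
lock.expire_session(now=5.0)                       # c1 stops sending KeepAlives
print(lock.acquire("c2", now=30.0))                # None: still within the lock-delay
new = lock.acquire("c2", now=70.0)
print(server.accepts(old), server.accepts(new))    # False True
```

The reviewer's alternative of actively probing the holder would, in this model, amount to calling `release` early once the probe proves the holder dead.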


Paper Notes:

  1. Reliability, availability and scalability are considered the most important while the throughput and storage capacity are considered secondary.
  2. A lock service, although it cannot offer programmers a standard framework the way a library embodying Paxos can, maintains high availability as the system scales up; it suits clients that need to store and fetch small quantities of data, as it uses consistent client caching rather than time-based (TTL) caching; and because the quorum lives in the lock service, it reduces the number of servers a client system needs in order to make progress.
  3. Only operations that interact with locks are assigned a sequence number to ensure

Paper Review: Mesa: Geo-Replicated, Near Real-Time, Scalable Data Warehousing

Paper Review:

Mesa is another Google data store, layered on top of Colossus and Bigtable and designed primarily for storing ad campaign data. Its versioned key-value store with aggregation is scalable and geo-replicated. Metadata is updated consistently with Paxos, while data updates are batched and applied every few minutes.

Strong points:

  1. The “experiences and lessons learned” section mentions layered design as a key design principle of Google products. Mesa is nicely layered and decoupled both horizontally and vertically. It was built on top of Colossus and Bigtable, so there isn't much on low-level reads and writes, which are covered by Bigtable. Within the architecture, there are workers/servers, controllers and global services, and data maintenance/updates are decoupled from querying. There might be some overhead, but this enables clean design, easy problem identification and detailed performance analysis.
  2. The resume key with streaming transmission is interesting. A failed or disconnected query server won't waste clients' time, since the read can be continued on another server instead of reissuing the query from scratch.
  3. Parallelizing worker operations with MapReduce and linked schema change are both good ideas. MapReduce can save days of computation time, and linked schema change, while not applicable in every scenario and adding some computation on the query path, saves 50% of the disk space compared to the traditional simple schema change. This could be a lifesaver, since Mesa requires a lot of storage space in the first place.
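Point 2 can be sketched as follows, assuming unique keys streamed in key order and a resume key that is simply the last key the client received (the paper does not prescribe this exact encoding). If a query server fails mid-stream, the client reissues the scan to the next server starting after the resume key rather than from scratch. `scan` and `query_with_failover` are hypothetical names.

```python
from typing import Iterator, List, Optional, Tuple

Row = Tuple[str, int]

def scan(rows: List[Row], resume_after: Optional[str], fail_after: Optional[int] = None) -> Iterator[Row]:
    """A query server streaming rows in key order, starting after the resume key.
    `fail_after` simulates the server dying after sending that many rows."""
    sent = 0
    for key, value in sorted(rows):
        if resume_after is not None and key <= resume_after:
            continue
        if fail_after is not None and sent == fail_after:
            raise ConnectionError("query server failed mid-stream")
        yield key, value
        sent += 1

def query_with_failover(rows: List[Row], servers: List[Optional[int]]) -> List[Row]:
    """Try servers in turn; each entry is that server's simulated failure point (None = healthy)."""
    results: List[Row] = []
    resume_key: Optional[str] = None
    for fail_after in servers:
        try:
            for row in scan(rows, resume_key, fail_after):
                results.append(row)
                resume_key = row[0]          # remember the last key actually received
            return results
        except ConnectionError:
            continue                         # continue on the next server from resume_key
    raise RuntimeError("all query servers failed")

rows = [("k3", 30), ("k1", 10), ("k4", 40), ("k2", 20)]
print(query_with_failover(rows, [2, None]))  # [('k1', 10), ('k2', 20), ('k3', 30), ('k4', 40)]
```

Each row reaches the client exactly once even though the first server died halfway through.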

Weak points:

  1. The controllers assign different tasks to different types of workers. Worker failures are eventually caught by the timer the controller maintains for each task. However, these timers could add a lot of work for the controllers when many tasks run simultaneously. Moreover, a worker failure can cause long latency for the corresponding queries until the timer expires and the task is reassigned. This latency could be reduced if controllers and workers exchanged heartbeat messages, so that worker failures are detected before the timer runs out.
  2. Query servers are assigned their own ranges of data to take advantage of prefetching and caching. However, this assignment could be inflexible when many queries target similar data: a small number of query servers would be fetching data from Colossus for a long time while the rest sit idle. That would be a performance bottleneck and a waste of resources, assuming reads in Colossus are lock-free and scale well. Also, while we're on the topic of querying, the global locator service is described only vaguely in the paper. I assume it is a stateless process, like the controllers, with its data replicated in Bigtable as well.
  3. The replication mechanism doesn't make much sense to me. Paxos only ensures that metadata is replicated consistently across a majority of Mesa instances, while data replication lags behind without any strong guarantee. So in each Mesa instance (datacenter), the metadata could be outdated if the instance failed during Paxos; and even if it worked fine during Paxos, there's little guarantee on the consistency of the actual data.
  4. There are two methods of data validation: an online one that re-aggregates rows and checks for computation errors, and an offline one, a lightweight process covering recently committed data. There are two problems with this corruption recovery: 1) the online check is performed on every update and query, which could result in unnecessary checking and increased latency and load; 2) data is replicated asynchronously, so if the freshly updated copy gets corrupted, there is no other replica yet that could help with recovery.




Data is partitioned and replicated horizontally to achieve scalability and availability. (does this imply NoSQL?)

Multi-version key-value data store with Paxos for consistency

Leverage Bigtable and the Paxos tech underlying Spanner for metadata storage and maintenance.

Asynchronous replication for application data; synchronous replication for metadata.

Handles data corruption caused by both software and hardware faults.

A query to Mesa consists of a version number n and a predicate P on the key space. The response contains one row for each key matching P that appears in some update with a version between 0 and n, with its values aggregated over those updates.

Strictly ordering updates ensures atomicity and supports negative facts, such as backing out ad clicks later found to be fraudulent.

Mesa pre-aggregates the updates in a version range [V1, V2] (inclusive) into a single delta. Base compaction merges all deltas up to some version B into a base [0, B]; versions before B are no longer accessible. This kind of idea is common (e.g. append-only GFS), but it can be problematic if old-version data is still useful. Keeping many old versions is expensive, since queries have to aggregate across them, so Mesa needs to clean things up; but how many versions to keep is hard to determine for different kinds of data. Since Mesa is primarily used for ad campaign data, which has uniform and specific requirements for storage and aggregation, this issue doesn't seem to matter much.
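The query semantics and base compaction described above can be sketched as follows, assuming SUM as the aggregation function and contiguous, non-overlapping deltas stored as (start version, end version, rows) triples. Mesa supports other aggregation functions, and `VersionedTable` is a hypothetical name. A query at version n aggregates every delta ending at or before n; base compaction folds all deltas up to B into one [0, B] delta, after which versions below B can no longer be queried.

```python
from collections import Counter
from typing import Callable, Dict, List, Tuple

Delta = Tuple[int, int, Dict[str, int]]  # (start version, end version, key -> aggregated value)

class VersionedTable:
    """Hypothetical SUM-aggregated table built from versioned deltas."""
    def __init__(self) -> None:
        self.deltas: List[Delta] = []
        self.base_version = -1                         # no base compaction has happened yet

    def apply_update(self, version: int, rows: Dict[str, int]) -> None:
        self.deltas.append((version, version, dict(rows)))   # singleton delta [v, v]

    def query(self, n: int, predicate: Callable[[str], bool] = lambda key: True) -> Dict[str, int]:
        """Aggregate, for each key matching the predicate, all updates with version <= n."""
        if n < self.base_version:
            raise ValueError(f"versions below {self.base_version} were compacted away")
        result: Counter = Counter()
        for _start, end, rows in self.deltas:
            if end <= n:
                for key, value in rows.items():
                    if predicate(key):
                        result[key] += value
        return dict(result)

    def base_compact(self, b: int) -> None:
        """Fold every delta ending at or before version b into a single base delta [0, b]."""
        merged: Counter = Counter()
        remaining: List[Delta] = []
        for start, end, rows in self.deltas:
            if end <= b:
                merged.update(rows)
            else:
                remaining.append((start, end, rows))
        self.deltas = [(0, b, dict(merged))] + remaining
        self.base_version = b

t = VersionedTable()
t.apply_update(0, {"ad1": 5, "ad2": 3})
t.apply_update(1, {"ad1": 2})
t.apply_update(2, {"ad2": -1})      # a negative fact, e.g. reversing fraudulent clicks
print(t.query(1))                   # {'ad1': 7, 'ad2': 3}
t.base_compact(1)
print(t.query(2))                   # {'ad1': 7, 'ad2': 2}
```

After compaction, `t.query(0)` raises, which is the trade-off discussed above: fewer deltas to aggregate, but old versions are gone.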

Each table has one or more indexes, and each table index has its own copy of the data, sorted accordingly for fast search. There is also an index file containing the short keys of row blocks, for locating a row block quickly.

Metadata is stored in Bigtable and in the memory of the controller. Each datacenter has one controller (I assume), and it does not interact with the tables directly. There are different data maintenance tasks, such as updates, compaction, schema changes and checksums; note that the last two require coordination across Mesa instances (datacenters). Workers of different types poll the controller for tasks of their own type. Each task is assigned a timer, so failed workers don't stall the system. The controller is also sharded and stateless, with all metadata stored consistently in Bigtable, so Mesa is resilient to controller failures. A garbage collector runs separately and continuously; it reads the metadata in Bigtable and deletes unneeded files in Colossus.
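The controller/worker interaction above can be sketched as per-type task queues with a timer for every assigned task, assuming a hypothetical `Controller` class and an explicit logical clock. Workers poll for tasks of their own type; a task whose timer expires before completion is put back in the queue for another worker. How a late result from the original worker is handled is an assumption here (it is simply ignored). The sketch also shows the latency issue raised in the weak points: a crashed worker's task sits idle until its timer runs out.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional, Tuple

@dataclass
class Task:
    task_id: str
    kind: str                       # e.g. "update", "compaction", "schema_change", "checksum"

class Controller:
    """Hypothetical controller: per-type task queues plus a timer for every assigned task."""
    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self.pending: Dict[str, Deque[Task]] = {}
        self.running: Dict[str, Tuple[Task, str, float]] = {}  # task_id -> (task, worker, deadline)

    def submit(self, task: Task) -> None:
        self.pending.setdefault(task.kind, deque()).append(task)

    def poll(self, worker: str, kind: str, now: float) -> Optional[Task]:
        self._reclaim_expired(now)
        queue = self.pending.get(kind)
        if not queue:
            return None
        task = queue.popleft()
        self.running[task.task_id] = (task, worker, now + self.timeout)
        return task

    def complete(self, task_id: str, worker: str) -> bool:
        entry = self.running.get(task_id)
        if entry is None or entry[1] != worker:
            return False            # the task timed out and was reassigned; ignore the late result
        del self.running[task_id]
        return True

    def _reclaim_expired(self, now: float) -> None:
        for task_id, (task, _worker, deadline) in list(self.running.items()):
            if now >= deadline:     # timer expired: assume the worker failed, requeue the task
                del self.running[task_id]
                self.pending.setdefault(task.kind, deque()).appendleft(task)

controller = Controller(timeout=30.0)
controller.submit(Task("t1", "compaction"))
controller.poll("w1", "compaction", now=0.0)             # w1 takes t1, then silently crashes
print(controller.poll("w2", "compaction", now=10.0))     # None: t1 still looks busy on w1
task = controller.poll("w2", "compaction", now=31.0)     # timer expired, t1 goes to w2
print(task.task_id, controller.complete("t1", "w1"), controller.complete("t1", "w2"))  # t1 False True
```

A heartbeat, as suggested in the weak points, would let `_reclaim_expired` fire as soon as w1 stops responding instead of waiting the full 30 units.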

Mesa handles queries with different requirements using different labels and priorities. In principle any query server can access any table, but Mesa directs queries over the same range of data to a subset of the query servers to take advantage of prefetching and caching. A global locator service is used to coordinate the query servers.

Updates are batched in Mesa instances once every few minutes. A stateless global committer assigns each update batch a version number and uses Paxos to commit it consistently. The controller in every Mesa instance is responsible for incorporating the update batches and acknowledging the committer. There is no locking. Note that here metadata is replicated synchronously with Paxos, while the data is incorporated asynchronously by the individual Mesa instances.

New Mesa instances bootstrap using a peer-to-peer load mechanism.


Paper Review: Spanner: Google’s Globally-Distributed Database

Paper Review:


Spanner is Google's newer globally distributed data store, with a semi-relational data model and an SQL-based query language. It uses Paxos and two-phase commit for operations, and relies on bounded clock uncertainty (TrueTime) for externally consistent transactions.

Strong points:

  1. Spanner moves from NoSQL toward NewSQL (?): it is easy to work with (thanks to the semi-relational data model and query language) and scales excellently. Moreover, the data is versioned (with TrueTime timestamps), so clients can decide whether a read is up to date.
  2. TrueTime is just impressive. It enables external consistency and a bunch of cool features like consistent snapshot reads across datacenters and dynamic schema changes. It's like having a wall clock shared by all replicas, with bounded uncertainty. Not to mention that the uncertainty is tightly controlled, using GPS and atomic clocks as the underlying hardware and an algorithm to detect liars;
  3. Data is stored in tablets and grouped into directories, i.e. “buckets” of contiguous keys sharing a common prefix. Applications can control the locality of data by carefully choosing keys. This could lower latency (e.g. by placing data in closer datacenters);
  4. Dynamically controlled replication configuration is helpful when an application needs to change data locations or replication factors while running.

Weak points:

  1. Writes still use Paxos for consensus and two-phase commit for transactions. This certainly enforces strong consistency, but a) the Paxos leader could be troublesome, since leader failover might result in long waits; and b) the communication overhead is unavoidable and increases the latency of every transaction;
  2. TrueTime is carefully designed, with redundant hardware and algorithms to verify its correctness. However, the number of write transactions (to a single Paxos group) that can complete in a given period is bounded by epsilon, and so is the system's overall timestamp accuracy. Epsilon is caused mainly by hardware errors and is hard to eliminate, which means Spanner is unlikely to achieve better write performance or timestamp accuracy;
  3. Since the system's ordering is based on clock time and clock time is uncertain, there are many occasions where we have to wait until the system is certain that the previous event has happened, even when the wait exists purely to make TT.after true. For example, when committing, even after all replicas have replied to the leader, the leader still has to wait until it is certain about the commit timestamp;
  4. If TrueTime is used with a faulty timestamp, say a read is issued at a timestamp in the future, will it block other transactions, get halted, or return an error?
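Points 2 to 4 revolve around the TrueTime interface, so here is a toy model of it, assuming a fixed uncertainty bound `epsilon` and a simulated clock whose local reading happens to be exact (real TrueTime derives ε from GPS and atomic-clock time masters; the class and function names are hypothetical). Following the paper's rules, the leader picks a commit timestamp s = TT.now().latest and does not expose the write until TT.after(s) holds, so each commit waits roughly 2ε. `can_serve_read` models the paper's safe-time rule: a replica serves a read at timestamp t only if t ≤ t_safe, otherwise the read waits.

```python
from dataclasses import dataclass
from typing import Tuple

@dataclass
class SimClock:
    t: float = 0.0                       # simulated absolute time, in ms

@dataclass
class TTInterval:
    earliest: float
    latest: float

class TrueTime:
    """Toy TrueTime: absolute time is guaranteed to lie within +/- epsilon of the local reading."""
    def __init__(self, clock: SimClock, epsilon: float) -> None:
        self.clock = clock
        self.epsilon = epsilon

    def now(self) -> TTInterval:
        return TTInterval(self.clock.t - self.epsilon, self.clock.t + self.epsilon)

    def after(self, t: float) -> bool:   # t has definitely passed
        return t < self.now().earliest

    def before(self, t: float) -> bool:  # t has definitely not arrived
        return t > self.now().latest

def commit(tt: TrueTime, tick: float = 0.5) -> Tuple[float, float]:
    """Pick s = TT.now().latest, then commit-wait until TT.after(s); returns (s, time waited)."""
    s = tt.now().latest
    start = tt.clock.t
    while not tt.after(s):
        tt.clock.t += tick               # in a real system this is just waiting
    return s, tt.clock.t - start

def can_serve_read(t_read: float, t_safe: float) -> bool:
    """A replica serves a snapshot read at t_read only if t_read <= t_safe; otherwise it waits."""
    return t_read <= t_safe

tt = TrueTime(SimClock(), epsilon=7.0)
print(commit(tt))                        # (7.0, 14.5): roughly 2 * epsilon of commit wait
```

The wait scales directly with ε, which is the reviewer's point about epsilon bounding write performance.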



Paper Outline:

  1. Introduction:

    • Globally scalable database that, at the highest level of abstraction, shards data across many sets of Paxos state machines;
    • Main focus is managing cross-datacenter replicated data but also designing/implementing important database features;
      • Bigtable (NoSQL) can be difficult for applications with complex schema or strong consistency in the presence of wide-area replication;
      • Megastore (semi-relational data model) supports synchronous replication but has poor write throughput;
    • Spanner evolved from a Bigtable-like versioned key-value store; data is stored in schematized semi-relational tables and is versioned; it provides an SQL-based query language;
      • replication configurations can be dynamically controlled;
      • externally consistent read/write operations;
      • these features are enabled by globally-assigned timestamps, which are supported by the TrueTime API and its implementation;
  2. Implementation:

    • Overall structure:

      • a Spanner deployment is called a universe;
      • Spanner is organized as a set of zones, which are the unit of administrative deployment and roughly resemble datacenters;
      • each zone has:
        • one zonemaster;
        • hundreds of spanservers (roughly analogous to Bigtable servers);
        • location proxies are used by clients to locate data;
      • universe master and placement driver are singletons:
        • universe master is primarily a console that displays status information;
        • placement driver handles auto movement of data across zones;
    • Spanserver software stack:

      • spanserver structure:
        • each spanserver is responsible for 100 to 1,000 instances of tablets, which are similar to Bigtable's tablet abstraction;
        • unlike Bigtable, Spanner assigns timestamps to data, which makes it more of a multi-version database than a key-value store;
        • tablet states are stored in B-tree-like files and a write-ahead log;
        • all storage happens on Colossus;
      • coordination and consistency:
        • a single Paxos state machine on top of each tablet;
        • a state machine stores its metadata and log in its corresponding tablet;
        • long-lived leaders and time-based leader leases for Paxos;
        • every Paxos write is logged twice: once in the tablet's log and once in the Paxos log;
        • writes must initiate the Paxos protocol at the leader, but reads access state directly from the underlying tablet at any replica that is sufficiently up to date;
        • each Paxos leader implements a lock table for concurrency control:
          • lock table contains the state of two-phase locking;
          • only operations that require synchronization acquire locks;
        • each Paxos leader implements a transaction manager to support distributed transactions:
          • used to implement a participant leader;
          • transactions that involve only one Paxos group bypass the transaction manager;
          • for transactions that involve multiple Paxos groups:
            • one of the participant groups is chosen as the coordinator, and its participant leader is called the coordinator leader;
            • the slaves of that group are referred to as coordinator slaves;
    • Directories and placement:

      • a bucket of contiguous keys that share a common prefix is a directory which allows applications to control the locality of data by choosing keys;
      • all data in a directory share the same replication configuration and could only be moved directory by directory (while the client operations are still ongoing);
      • a Spanner tablet is not necessarily a single lexicographically contiguous partition of the row space, but a container that may encapsulate multiple partitions of the row space, so that directories frequently accessed together can be colocated;
      • Movedir task:
        • the background task moving directories between Paxos groups;
        • also used to add/remove replicas to Paxos groups;
        • a part-by-part background process between two Paxos groups;
      • directory is also the smallest unit whose placement can be specified;
        • administrators control the number and types of replicas, and the geographic placement of those replicas;
        • an application controls how data is replicated, by tagging each database and/or individual directories with a combination of those options;
      • shard a directory into multiple fragments if it grows too large;
        • fragments could be served by different Paxos groups;
        • movedir in this case will actually move fragments not directories;
    • Data model:

      • data features for applications:
        • a data model based on schematized semi-relational tables;
          • used by Megastore; easier than Bigtable for applications with complex, evolving schemas;
          • synchronous replication across datacenters unlike Bigtable which only supports eventual consistency;
        • a query language;
          • motivated by the popularity of Dremel as an interactive data-analysis tool;
        • general purpose transactions;
          • motivated by complaints about the lack of cross-row transactions in Bigtable;
          • two-phase commit over Paxos mitigates the availability problems (but expensive to support);
        • application data model:
          • layered on the directory-bucketed key-value mapping;
          • an application can create one or more databases in a universe;
          • a database can contain unlimited schematized tables;
          • uses a SQL-like query language with extra features;
      • Spanner data model:
        • not purely relational because every table is required to have an ordered set of one or more primary-key columns;
        • each table defines a mapping from the primary-key columns to non-primary-key columns;
        • it lets applications control data locality through key choices;
  3. TrueTime:

    • TrueTime:

      • explicitly represents time as a TTinterval with bounded time uncertainty, which is different from standard time interface;
    • GPS and atomic clock failure modes:

      • GPS reference-source vulnerabilities:
        • antenna and receiver failures;
        • local radio interference;
        • correlated failures;
        • GPS system outages;
      • atomic clock failures:
        • time drift due to frequency error;
    • master/slave implementation:

      • a set of time master machines per datacenter;
        • the majority have GPS receivers with dedicated antennas and are physically separated;
          • to reduce the effects of antenna failures and radio interference;
          • uncertainty close to zero;
        • the rest have atomic clocks and are called Armageddon masters;
          • slowly increasing time uncertainty;
        • regularly compared against each other and local clock;
      • timeslave daemon per machine:
        • polls a variety of masters;
        • applies a variant of Marzullo’s algorithm to detect and reject liars;
        • the advertised uncertainty ε is a sawtooth function of time, derived from:
          • master clock uncertainty, communication delay, and worst-case local clock drift;
  4. Concurrency control:

    • Supported operations:

      • read-write transaction;
        • pessimistic and requires leader replication;
      • read-only transactions;
        • not simply a read-write transaction without writes; executes lock-free and non-blocking;
      • snapshot reads;
    • Timestamp management:

      • Paxos leader leases:
        • long-lived leader is selected with a quorum-based vote;
        • lease could be extended on a successful write or near expiration;
      • assigning timestamps to RW transactions:
        • Spanner assigns a transaction the timestamp that Paxos assigns to the Paxos write representing its commit;
        • external consistency: if the start of a transaction T_2 occurs after the commit of T_1, then the commit timestamp of T_2 must be greater than that of T_1;
        • start: the coordinator leader for a write T_i assigns a commit timestamp s_i no less than the value of;
        • commit wait: the coordinator leader ensures that clients cannot see any data committed by T_i until TT.after(s_i) is true;
      • serving reads at a timestamp:
        • every replica tracks a value called safe time t_safe, which is the maximum timestamp at which a replica is up-to-date;
      • assigning timestamps to RO transactions:
        • executes in two phases: assign a timestamp s_read, then execute the transaction's reads as snapshot reads at s_read;
    • Details:

      • read-write transactions:
        • issues read to the leader replica of the appropriate group;
        • uses wound-wait to avoid deadlocks; the leader acquires read locks and then reads the most recent data;
        • keep-alive messages to the leader to maintain locks;
        • a Paxos algorithm with TrueTime to enforce consistency;
        • writes are buffered at the client until commit; two-phase commit applies them with the same timestamp on all participants;
      • read-only transactions:
        • scope required for read-only transactions, which summarizes the keys read in the transaction;
        • contact the leader when the scope’s values are served by one Paxos group;
          • s_read ensures that the last write is returned;
        • multiple Paxos group read:
          • a round of communication with all the leaders;
          • just read S_read = to see the sufficiently up-to-date values;
      • schema-change transactions:
        • generally non-blocking variant of a standard transaction;
        • reserve a timestamp in the future and block the transactions with bigger timestamps behind the schema changes;
      • refinements:
        • lock with metadata to prevent false conflicts;
        • use leader-lease interval for snapshot read after a Paxos write;
  5. Evaluation:

    • Microbenchmarks:

    • Availability:

    • TrueTime:

    • F1

  6. Related work:

  7. Future work:

  8. Conclusion:
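The outline notes that each timeslave daemon applies a variant of Marzullo's algorithm to reject liars. Spanner's exact variant isn't spelled out, so this is the textbook form of the algorithm, applied to hypothetical (earliest, latest) intervals from four time masters: it finds the smallest interval consistent with the largest number of sources, and a source whose interval does not contain that result is treated as a liar.

```python
from typing import List, Tuple

Interval = Tuple[float, float]

def marzullo(intervals: List[Interval]) -> Tuple[int, float, float]:
    """Textbook Marzullo: return (number of agreeing sources, lo, hi) for the smallest
    interval that is contained in the largest number of source intervals."""
    edges = []
    for lo, hi in intervals:
        edges.append((lo, -1))      # interval opens
        edges.append((hi, +1))      # interval closes; at equal offsets, opens sort first
    edges.sort()
    best = count = 0
    best_lo = best_hi = 0.0
    for i, (offset, kind) in enumerate(edges):
        count -= kind
        if count > best:            # only happens on an opening edge, so edges[i + 1] exists
            best, best_lo, best_hi = count, offset, edges[i + 1][0]
    return best, best_lo, best_hi

def find_liars(intervals: List[Interval]) -> List[int]:
    """Sources whose interval does not contain the agreed interval are treated as liars."""
    _, lo, hi = marzullo(intervals)
    return [i for i, (a, b) in enumerate(intervals) if not (a <= lo and hi <= b)]

masters = [(8, 12), (11, 13), (10, 12), (20, 22)]   # hypothetical (earliest, latest) readings
print(marzullo(masters), find_liars(masters))        # (3, 11, 12) [3]
```

Three of the four masters agree on [11, 12]; the fourth, far off at [20, 22], is rejected.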


Paper Review: Megastore: Providing Scalable, Highly Available Storage for Interactive Services


Megastore is Google's solution for multi-datacenter storage, layered on top of Bigtable. It has a semi-relational data model and uses an optimized Paxos for transactions within an entity group, and asynchronous message queuing for transactions across entity groups. Its write throughput is relatively low; it offers strong consistency (across entity groups, only when using 2PC) and high availability (it keeps working as long as a majority of the voting replicas, including witness replicas, are alive).

Strong points:

  1. Most criticisms of Bigtable focus on its NoSQL data model, which slows down development and increases the cost of maintenance. Megastore maps its semi-relational data model onto Bigtable to counter some of the disadvantages we had in Bigtable.
  2. I found the variety of read operations quite interesting. A current read is an entity-group-scoped read that first makes sure all committed writes are applied. A snapshot read is also entity-group-scoped, but it reads at the last fully applied transaction, so it may miss writes that are committed but not yet applied. An inconsistent read ignores the log and simply returns the latest value, which can save a lot of time. This kind of implementation makes applications on Megastore flexible enough to meet their different requirements on latency and consistency.
  3. Witness replicas are a brilliant idea for improving system availability without adding much bandwidth consumption or storage space. They address deployments with only a few full replicas, where any failover or network partition could otherwise end up blocking Paxos. Read-only replicas are also great for distributing data across multiple datacenters without slowing down the write Paxos.

Weak points:

  1. I understand that Paxos is one of the key features Megastore uses to implement ACID transactions without a heavyweight master or data loss on failure. But could this strategy end up with lower throughput and too many coordination messages between replicas? It seems to me that Paxos would add significantly more network round-trip delay to a single transaction than other replication strategies, even with the optimizations Megastore made to Paxos. The Spanner paper also points out that write throughput is poor in Megastore.
  2. Entity groups can be considered a logically hierarchical way to organize the data. Transactions within an entity group use Paxos, while cross-entity-group transactions usually go through asynchronous messaging. Paxos offers strong consistency but adds communication overhead to each operation, while asynchronous messaging is fast but lacks consistency guarantees. This works when our data model aligns with this kind of hierarchy, like email and blogs. In those cases each user is a natural entity group, and cross-group transactions are rare and do not really require strong consistency. But what if our data model is flat? Or what if it requires things to be stored together for faster retrieval, but also requires each entity to have a high level of consistency? Then the whole system could either be expensive due to the 2PC overhead for consistency or not consistent enough. I assume Megastore makes some assumptions about the data models prior to the design. (Also, since entity groups can have very different sizes, could they become problematic during load balancing?)
  3. Megastore is built on Bigtable, which uses GFS, SSTable and Chubby for basic storage management and coordination. The philosophy of decoupling and layered design is great for debugging, system analysis and faster development. Still, the overhead caused by coordination across these layers could be terrible.
  4. Queuing for transactions across entity groups is asynchronous, which means that consistency is not guaranteed within any bounded period of time. Buffer overflow or node failure could also directly result in message loss, making inconsistency inevitable. The alternative to queuing is 2PC, which is much more complex, and its communication overhead will hurt performance. It’s always nice to have an alternative though.


Paper Notes:

  1. Introduction:
    • Since requirements like scalability, rapid development, low latency, consistency and high availability in fact conflict with each other, Megastore picks a middle ground between RDBMS and NoSQL:
      • the datastore is partitioned and replicated, with full ACID semantics within partitions but only limited consistency guarantees across them;
      • traditional database features are supported only if they scale within tolerable latency limits and are compatible with the partitioning scheme;
    • Paxos is optimized in Megastore for low-latency operations and is used for a variety of things, including primary user data replication;
  2. Toward availability and scale:
    • Common strategies:
      • asynchronous master/slave (data loss on failures);
        • writes are acknowledged at the master and transmitted to the slaves in parallel;
        • risks downtime or data loss during failover to a slave;
        • requires a consensus protocol to mediate mastership;
      • synchronous master/slave (heavyweight master);
        • master waits on slaves before ACK;
        • master/slave failures need external detection;
      • optimistic replication (no ACID transaction):
        • any member of a homogeneous replica group can accept mutations;
        • asynchronously propagated through the group;
        • global mutation ordering is not known at commit time so transactions are impossible;
    • Enter Paxos:
      • any node can initiate reads and writes. Each log append blocks on acknowledgments from a majority of replicas, and the rest catch up as they are able;
      • multiple replicated logs, each governing its own partition of the data set, increase throughput (writes from distant nodes don’t all have to serialize through one log) and availability (a log that cannot reach a majority only blocks operations on its own partition);
    • Entity groups:
      • data is partitioned into entity groups;
        • single-phase ACID transactions within an entity group via Paxos;
        • cross-entity-group transactions use either expensive 2PC or asynchronous message queuing (looser consistency);
    • Physical layout:
      • each datacenter is a Bigtable instance;
      • minimize the latency by letting applications control data placement;
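The majority rule for log appends can be illustrated with a toy model that keeps one replicated log per entity group. This sketch rests on simplifying assumptions: it only models the majority-acknowledgment rule for an append, and it omits Paxos’s prepare/accept phases and ballot numbers. The names `majority` and `ReplicatedLog` are hypothetical.

```python
from dataclasses import dataclass, field


def majority(n_voters: int) -> int:
    """Smallest number of votes that is a strict majority of n_voters."""
    return n_voters // 2 + 1


@dataclass
class ReplicatedLog:
    """Toy stand-in for one entity group's replicated log (not real Paxos)."""
    replicas: list[str]
    entries: list[str] = field(default_factory=list)

    def append(self, entry: str, acked_by: set[str]) -> bool:
        # Only acknowledgments from this log's own replicas count.
        votes = len(acked_by & set(self.replicas))
        if votes >= majority(len(self.replicas)):
            self.entries.append(entry)  # committed; lagging replicas catch up later
            return True
        return False  # no majority: this log blocks, other logs are unaffected


# Two entity groups, each with its own log over the same three datacenters.
logs = {g: ReplicatedLog(["dc1", "dc2", "dc3"]) for g in ("user:alice", "user:bob")}
print(logs["user:alice"].append("w1", {"dc1", "dc2"}))  # True
print(logs["user:bob"].append("w2", {"dc3"}))           # False
```

Keeping one log per entity group is what lets a stalled group leave the others untouched.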
  3. A tour of Megastore:
    • API design:
      • normalized relational schemas are not used in Megastore because:
        • high-volume interactive workloads benefit more from predictable performance than from an expressive query language;
        • reads dominate writes, so it pays to move work from read time to write time;
        • storing and querying hierarchical data is straightforward in simple key-value data stores like Bigtable;
      • (I have no background on databases, so I’ll come back to this section later);
    • Data model:
      • entity group root tables and child tables;
      • each entity is mapped into a single Bigtable row;
      • a local index is treated as a separate index for each entity group;
      • a global index spans entity groups and is used to find entities without knowing in advance which entity groups contain them;
      • the STORING clause stores additional properties in an index for faster access at read time;
      • mapping to Bigtable:
        • the root entity is stored as a single Bigtable row, which allows atomic updates;
    • Transactions and concurrency control:
      • multiple values are stored in the same row/column with different timestamps;
      • reads and writes don’t block each other;
      • Megastore provides current, snapshot and inconsistent reads;
      • write transaction life cycle: read, application logic, commit, apply, clean up. Note that only one write can win a given log position in Paxos; the others must retry;
    • Other features:
      • periodic full snapshots;
      • optional data encryption;
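The write life cycle above relies on the rule that only one writer can win a given log position. Below is a minimal sketch of that optimistic scheme. It assumes a single in-process log in place of the Paxos-replicated one, and `EntityGroupLog` and its methods are hypothetical names.

```python
class EntityGroupLog:
    """Toy write-ahead log for one entity group; index i holds the
    mutations committed at log position i."""

    def __init__(self) -> None:
        self.positions: list[dict[str, int]] = []

    def read_position(self) -> int:
        """Read step: learn the next free log position (last committed one + 1)."""
        return len(self.positions)

    def commit(self, position: int, mutations: dict[str, int]) -> bool:
        """Commit step: propose the mutations for `position`.

        The first proposal for a position wins. A writer that read an older
        position loses and must redo its read and application logic.
        """
        if position != len(self.positions):
            return False
        self.positions.append(mutations)
        return True


log = EntityGroupLog()
a = log.read_position()  # both writers read position 0
b = log.read_position()
print(log.commit(a, {"balance": 10}))  # True: writer A wins position 0
print(log.commit(b, {"balance": 20}))  # False: writer B lost position 0
print(log.commit(log.read_position(), {"balance": 20}))  # True: B retries at position 1
```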
  4. Replication:
    • Current reads guarantee:
      • a read always observes the last-acknowledged write;
      • after a write has been observed, all future reads observe that write;
    • Brief summary of Paxos:
      • majority of replicas must be active to proceed;
      • uses Paxos to replicate a write-ahead transaction log, with a separate Paxos instance for each position in the log;
    • Master-based approaches:
      • writes are reduced to single round of communication;
      • writes can be batched together to improve the throughput;
      • master failover can result in user-visible outages;
    • Megastore’s approach:
      • fast reads:
        • local reads are allowed with the help of a coordinator;
        • coordinator at each datacenter tracks a set of entity groups;
      • fast writes:
        • master-based system using leaders;
        • closest replica as leader;
      • replica types:
        • witness replicas vote in Paxos and store the write-ahead log, but do not apply the log or store the data;
        • read-only replicas only store data and do not vote; they distribute the data without adding any write latency;
    • Data structure and algorithms:
      • out-of-order proposals are acceptable;
      • catch up when a replica is found to be out of date during a read;
      • read: query local, find position, catchup, validate, query data;
      • write: accept leader, prepare, accept, invalidate, apply;
    • Coordinator availability:
      • failure detection:
        • use out-of-band protocol to detect coordinator failures;
        • coordinators obtain Chubby locks;
        • coordinator failures are handled (the coordinator is reconstructed) quickly without affecting reads/writes;
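The five read steps listed above (query local, find position, catch up, validate, query data) can be sketched for a single entity group. This is a simplified model under three assumptions. First, the boolean `local_is_current` stands in for asking the local coordinator. Second, "find position" just takes the longest log among reachable replicas instead of consulting a majority. Third, the names `Replica` and `current_read` are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    """One replica's copy of a single entity group's committed log (toy model)."""
    log: list[str] = field(default_factory=list)


def current_read(local: Replica, remotes: list[Replica], local_is_current: bool) -> list[str]:
    # 1. Query local: the coordinator says whether this replica has seen every
    #    committed write for the entity group; if so, skip straight to step 5.
    if not local_is_current:
        # 2. Find position: the highest log position known to the reachable
        #    replicas (Megastore asks a majority; this toy takes the maximum).
        best = max(remotes, key=lambda r: len(r.log))
        # 3. Catch up: copy the log entries the local replica is missing.
        local.log.extend(best.log[len(local.log):])
        # 4. Validate: a real replica would now tell the coordinator it is
        #    up to date; this toy has no coordinator state to update.
    # 5. Query data: serve the read from the local, now current, replica.
    return list(local.log)


local = Replica(["w1"])
remote = Replica(["w1", "w2"])
print(current_read(local, [remote], local_is_current=False))  # ['w1', 'w2']
```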


Paper Review: Bigtable A Distributed Storage System for Structured Data

Paper review:

This paper is about a data storage system built upon Google’s own file system GFS and its Paxos-based lock service Chubby. It offers flexible storage types with great scalability and availability. Some of the optimizations, like prefetching and multi-level caching, are really impressive and useful.

strong points:

  1. Just like in GFS, clients communicate directly with tablet servers for read/write operations. This helps the system avoid the bottleneck that centralized coordination through the master would create.
  2. There are two different layers of centralization in this system, and both are handled well enough to survive failures. The master of the tablet servers is supported by the Chubby lock service, which ensures there is at most one master at any time to eliminate the possibility of inconsistency. The other master is in GFS and is backed up periodically.
  3. Having the client prefetch the locations of more than one tablet whenever it reads the METADATA table is brilliant. It reminds me of how computer memory caching works. Thanks to this prefetching, clients are much less likely to consult the METADATA table again. These lookups are expensive because of the network round-trip time and the limits of centralized metadata, so we had better find ways to reduce them. Another approach I was thinking about is a metadata cache between clients and servers, working much like a local DNS server. It would fetch tablet locations for clients and cache the “hot” or adjacent rows for later use. Clients would then consult the cache server first, before they go to the METADATA table. This would save a lot of time and resources if multiple clients are likely to use or reuse the same chunk of a table. A downside of my solution is definitely the network bottleneck at the cache server: all lookups funnel through it, while the rest of the network bandwidth is under-utilized. Also, if clients are using completely different tables, this only adds one more round trip.

weak points:

  1. Using different systems (GFS, SSTable, Chubby) decouples the layers and aspects of Bigtable. GFS is the low-level file storage, SSTable is the actual data format, Chubby holds critical metadata and tracks tablet server membership, and a separate cluster management system handles scheduling and machine monitoring. However, the interactions between these systems could add overhead and make maintenance complex. I wonder whether performance would be better if Bigtable were built as an extra feature of the Google File System instead of combining a bunch of underlying systems. For example, one of the most obvious performance bottlenecks would be the number of tablet servers. Each tablet server interacts with GFS independently as a client, and as the number of tablet servers grows, GFS performance drops due to coordination overhead. What if the system were not built in layers and each tablet server had its own storage and distributed replicas?
  2. The memtable is a great design: it is an in-memory buffer containing recent mutations. However, the paper doesn’t say explicitly where the memtable lives. I’m going to assume it’s in each tablet server, because the master would have a lot to handle if it maintained the memtables for all tablets. Then comes the problem: if a tablet server crashes with a full memtable, all those mutations would be lost, since memtables are kept in memory without any form of backup. Users could then find data unchanged even though the mutation operation already finished. (Looking at Section 5.3 again, though, every mutation is written to the commit log in GFS before it is inserted into the memtable. A recovering tablet server reconstructs the memtable by replaying the updates committed since the tablet’s redo points, so committed mutations should survive a crash.)
  3. Compaction is used to speed up moving a tablet from a source tablet server to a target. The compaction is done in two phases, so that the source server can still serve the tablet during the first compaction. However, the source tablet server could be heavily loaded with a lot of operations going on, which slows the compaction down, while the target is very likely to be lightly loaded. So why don’t we just leave the compaction to the target server, since the amount of computation wouldn’t be any different?
  4. Bigtable is supported by the Google File System, and GFS has its own mechanism for replicating data to handle occasional (or is it?) node failures. However, Bigtable specifies that each tablet is assigned to only one tablet server at a time (Section 5.2, tablet assignment). So I’m not sure where the replicas are stored. All the tablets must be replicated, since the master can reassign them upon node failure. If there are replicas, how is consistency handled? Does the assigned tablet server act like a master/leader? (So GFS acts like a disk underlying Bigtable that all servers (the master and the tablet servers) have access to. Tablet assignment is more a way of saying “let this tablet server handle requests for row keys A to B” than of storing data locally on that server. At least I think so.)

Paper Outline:

  1. Introduction:

    • Goals achieved by Bigtable:

      • wide applicability;
      • scalability;
      • high performance;
      • high availability;
    • Bigtable resembles a database:

      • but provides a different interface;
      • does not support a full relational data model;
      • simple data model that supports dynamic control over data layout and format;
      • treats data like uninterpreted strings;
  2. Data model:

    • Bigtable is a sparse, distributed, persistent multidimensional sorted map;

    • Rows:

      • row keys are arbitrary strings up to 64 KB in size;
      • every read or write of data under a single row key is atomic;
        • regardless of the number of columns it accesses;
      • row range:
        • dynamically allocated;
        • called tablet, the unit of distribution and load balancing;
        • reads of short row ranges require communication with fewer machines;
          • URLs are stored with their hostname components reversed (e.g. com.google.maps/index.html) to group pages from the same domain together;
    • Column families:

      • column keys are grouped into sets called column family;
      • column family is the basic unit of access control;
        • a column family must be created before data can be stored under any column key in it;
      • data stored in the same column family is usually the same type;
      • column key:
        • unlike column family names, it doesn’t have to be printable;
    • Timestamps:

      • timestamps index different versions of the same data in a cell;
      • 64-bit integer:
        • represent real-time in microseconds by default;
        • or any other client-explicit version representation;
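        • versions are stored in decreasing timestamp order so that the most recent version is read first; applications that need to avoid collisions must generate unique timestamps themselves;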
      • Two settings for garbage collection:
        • only the last n versions;
        • only new-enough versions;
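The data model above can be mimicked with a toy in-memory map from (row, column, timestamp) to value. This sketch is not Bigtable’s implementation. `ToyTable`, `reverse_host` and the example row and column names are illustrative, and only the "last n versions" garbage-collection setting is modeled.

```python
from collections import defaultdict


def reverse_host(host: str) -> str:
    """maps.google.com -> com.google.maps, so one domain's pages sort together."""
    return ".".join(reversed(host.split(".")))


class ToyTable:
    """Toy (row, column, timestamp) -> value map; not Bigtable's storage layout."""

    def __init__(self, max_versions: int) -> None:
        # Garbage-collection setting: keep only the last n versions of each cell.
        self.max_versions = max_versions
        # (row key, "family:qualifier") -> [(timestamp, value), ...], newest first
        self.cells: dict[tuple[str, str], list[tuple[int, str]]] = defaultdict(list)

    def put(self, row: str, column: str, ts: int, value: str) -> None:
        versions = self.cells[(row, column)]
        versions.append((ts, value))
        versions.sort(key=lambda tv: tv[0], reverse=True)  # decreasing timestamps
        del versions[self.max_versions:]                   # drop the oldest versions

    def get(self, row: str, column: str, n: int = 1) -> list[tuple[int, str]]:
        """Return up to n versions of a cell, most recent first."""
        return self.cells.get((row, column), [])[:n]

    def scan_rows(self, start: str, end: str) -> list[str]:
        """Row keys in [start, end), in lexicographic order (a row range)."""
        return sorted({row for (row, _col) in self.cells if start <= row < end})


t = ToyTable(max_versions=2)
row = reverse_host("maps.google.com") + "/index.html"
for ts, value in [(1, "v1"), (3, "v3"), (2, "v2")]:
    t.put(row, "contents:", ts, value)
t.put("com.cnn.www/", "anchor:cnnsi.com", 1, "CNN")
print(row)                           # com.google.maps/index.html
print(t.get(row, "contents:", n=5))  # [(3, 'v3'), (2, 'v2')]
```

Because rows are kept in lexicographic order, the reversed hostnames make a scan over one domain a single contiguous row range.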
  3. API:

    • Provides functions for creating and deleting tables and column families;

    • Provides functions for changing cluster, table and column family metadata;

      • such as access control rights;
    • Other features that allow the user to manipulate the data in more complex ways:

      • supports single-row transactions;
      • allows cells to be used as integer counters;
      • supports the execution of client-supplied scripts in the address spaces of the servers;
        • in a special language developed by Google called Sawzall;
    • Can be used with MapReduce;

  4. Building blocks:

    • Uses the distributed Google File System to store logs and data files;

    • Depends on a cluster management system for scheduling jobs, managing failures and monitoring machine stats;

      • because it runs on shared pools of machines;
    • SSTable file format is used internally to store Bigtable data:

      • provides a persistent, ordered, immutable map from keys to values;
      • SSTable contains a sequence of blocks:
        • blocks are located using a block index that is loaded into memory when the SSTable is opened;
        • a lookup does a binary search in the in-memory index and then reads the block from disk;
        • optionally, an SSTable can be completely mapped into memory;
    • Relies on a highly-available and persistent distributed lock service Chubby;

      • a Chubby service consists of five active replicas;
        • one of the replicas is the master that serves requests;
        • the system is active when the majority of the machines are running;
        • uses Paxos algorithm to keep its replicas consistent;
      • read/write operations are atomic;
      • the Chubby client library provides caching of Chubby files;
      • each client maintains a Chubby service session;
      • Bigtable uses Chubby for variety of tasks:
        • ensure there is at most one active master at any time;
        • store the bootstrap location of Bigtable data;
        • discover tablet servers and finalize tablet server deaths;
        • store Bigtable schema information;
        • store access control lists;
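The SSTable lookup described above does a binary search over an in-memory block index and then reads a single block. It can be sketched as follows. The block layout, the `block_size` parameter and the `ToySSTable` name are assumptions for illustration. The real format keeps its blocks in a GFS file, with the block index stored at the end of the file.

```python
import bisect
from typing import Optional


class ToySSTable:
    """Toy immutable SSTable: sorted key/value pairs split into blocks, plus an
    in-memory index holding the last key of each block."""

    def __init__(self, items: dict[str, str], block_size: int = 2) -> None:
        pairs = sorted(items.items())
        # Stand-in for the on-disk blocks.
        self._blocks = [pairs[i:i + block_size] for i in range(0, len(pairs), block_size)]
        # Block index, loaded into memory when the SSTable is opened.
        self._index = [block[-1][0] for block in self._blocks]
        self.block_reads = 0  # number of simulated disk reads

    def get(self, key: str) -> Optional[str]:
        # Binary search: first block whose last key is >= the wanted key.
        i = bisect.bisect_left(self._index, key)
        if i == len(self._blocks):
            return None  # key is past the last block; no disk read needed
        self.block_reads += 1  # one simulated disk read for that block
        for k, v in self._blocks[i]:
            if k == key:
                return v
        return None


sst = ToySSTable({"a": "1", "b": "2", "c": "3", "d": "4", "e": "5"})
print(sst.get("c"), sst.block_reads)  # 3 1
print(sst.get("z"), sst.block_reads)  # None 1
```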
  5. Implementation:

    • Three major components:

      • a library that is linked into every client;
      • one lightly loaded master server;
        • responsible for assigning tablets to tablet servers;
        • responsible for detecting the addition/expiration of tablet servers;
        • balancing load of tablet servers;
        • garbage collection of files in Google File System;
        • handles schema changes;
      • many tablet servers;
        • could be dynamically added or removed;
        • handles a set of tablets;
        • handles read and write requests;
        • splits tablets that have grown too large;
          • keeping tablets at roughly 100-200 MB by default;
        • clients communicate directly with tablet servers;
          • most clients never communicate with the master;
    • Tablet location:

      • three-level hierarchy to store the tablet location information:
        • first level: a Chubby file that contains the location of the root tablet;
        • second level: the root tablet, which contains the location of all tablets in the METADATA table:
          • the root tablet is the 1st METADATA tablet;
          • it is never split, so the hierarchy never exceeds three levels;
        • third level: the rest of the METADATA tablets:
          • each stores the location of a user tablet under a row key;
          • also stores secondary information for debugging and analysis;
      • locating tablets:
        • the client library caches tablet locations;
        • it moves up the hierarchy if it doesn’t know the correct location or its cached location is wrong:
          • requires three network round-trips if the cache is empty;
          • up to six network round-trips if the cache is stale;
          • the client library prefetches the locations of more than one tablet whenever it reads the METADATA table, to reduce access cost;
    • Tablet assignment:

      • each tablet is assigned to one tablet server at a time;
      • the master keeps track of the set of live tablet servers and the current assignment of tablets to tablet servers;
      • Bigtable uses Chubby to keep track of tablet servers:
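        • a tablet server creates and acquires an exclusive lock on a uniquely named file in a Chubby servers directory when it starts up;
        • the master monitors this directory to discover tablet servers;
        • a tablet server can lose its lock, e.g. due to a network partition;
        • it tries to reacquire the lock as long as its file still exists;
      • master:
        • responsible for detecting when tablet servers are no longer serving their tablets:
          • periodically asks each tablet server for the status of its lock;
        • if a tablet server reports losing its lock or is unreachable, the master tries to acquire an exclusive lock on that server’s file; if it succeeds:
          • it deletes the file;
          • it reassigns the server’s tablets;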
        • kills itself if its Chubby session expires:
          • this doesn’t change the assignment of tablets to tablet servers;
        • master start-up:
          • grabs master lock in Chubby;
          • scans the servers directory in Chubby to find live servers;
          • communicates with every live tablet server to discover the current tablet assignments;
          • scans the METADATA table to learn the set of tablets (adding the root tablet to the unassigned set first if needed);
        • table changes:
          • the master initiates table creation/deletion and tablet merges;
          • tablet servers initiate split operations:
            • commit the split by recording information for the new tablet in the METADATA table;
            • notifies the master;
    • Tablet serving:

      • persistent state of tablet stored in GFS;
      • updates are committed to a commit log that stores redo records:
        • recently committed ones are kept in an in-memory buffer called a memtable;
        • older ones are stored in a sequence of SSTables;
      • write operation:
        • check for validity and authorization;
        • mutation is written to commit log;
          • group commit improves the throughput of small mutations;
        • after the write has been committed, its contents are inserted into the memtable;
      • read operation:
        • check for validity and authorization;
        • executed on a merged view of the SSTables and the memtable;
          • both are lexicographically sorted, so the merge is efficient;
    • Compaction:

      • minor compaction: when the memtable reaches a threshold size, it is frozen, a new memtable is created, and the frozen memtable is converted to an SSTable and written to GFS:
        • shrinks the memory usage of the tablet server;
        • reduces the amount of data that has to be read from the commit log during recovery;
      • merging compaction periodically merges a few SSTables and the memtable into a new SSTable;
        • a major compaction rewrites all SSTables into exactly one SSTable;
        • the product of a major compaction contains no deletion info or deleted data;
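The read path and the compactions above both come down to merging sorted runs in which the newest source wins. This sketch makes several assumptions. Sources are lists sorted by key, with the memtable first. `None` serves as a hypothetical deletion marker. The function names are illustrative rather than Bigtable’s.

```python
import heapq
from typing import Iterator, Optional

Run = list[tuple[str, Optional[str]]]  # sorted (key, value); value None = deleted


def minor_compaction(memtable: dict[str, Optional[str]]) -> Run:
    """Freeze the memtable and write it out as a sorted, immutable run.
    Deletion markers are kept because they still hide older values."""
    return sorted(memtable.items())


def merged_scan(runs: list[Run]) -> Iterator[tuple[str, str]]:
    """Merge runs ordered newest first (runs[0] is the newest).
    For each key only the newest entry counts; deleted keys are skipped."""
    # Tag every entry with its run's age so equal keys sort newest first.
    tagged = [[(key, age, value) for key, value in run] for age, run in enumerate(runs)]
    previous: Optional[str] = None
    for key, _age, value in heapq.merge(*tagged):
        if key == previous:
            continue  # an older version of a key already handled
        previous = key
        if value is not None:
            yield key, value


def major_compaction(runs: list[Run]) -> Run:
    """Rewrite all runs into one run with no deletion markers."""
    return list(merged_scan(runs))


old = [("a", "1"), ("b", "2"), ("c", "3")]
memtable = {"b": "20", "c": None, "d": "4"}
runs = [minor_compaction(memtable), old]
print(list(merged_scan(runs)))  # [('a', '1'), ('b', '20'), ('d', '4')]
```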
  6. Refinements:

    • Locality groups:

      • clients group multiple column families together into a locality group;
      • a separate SSTable is generated for each locality group in each tablet;
    • Compression:

      • clients can decide whether the SSTables for a locality group are compressed;
      • as well as the compression format;
    • Caching for read performance:

      • two levels of cache in tablet servers;
        • scan cache is higher-level, which caches the KV pairs returned by SSTable interface to the tablet server;
        • block cache is lower-level, which caches the SSTable blocks read from GFS;
    • Bloom filters:

      • allows us to ask whether an SSTable might contain any data for a specified row/column pair;
    • Commit-log implementation:

      • append mutations to a single commit log per tablet server;
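      • boosts performance during normal operation;
      • but complicates recovery:
        • the log is sorted by (table, row name, log sequence number) so each tablet’s mutations are contiguous and duplicate log reads are avoided;
        • two log-writing threads with separate log files, only one active at a time, avoid blocking on GFS write latency spikes;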
    • Speeding up tablet recovery:

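      • the source tablet server does minor compactions before unloading a tablet, so the target doesn’t need to recover entries from the commit log;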
    • Exploiting immutability:

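      • SSTables are immutable, so reads need no synchronization and removing deleted data becomes garbage collection of obsolete SSTables; the memtable is the only mutable structure;
  7. Performance evaluation: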
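Bloom filters, listed above under refinements, let a tablet server skip disk reads for SSTables that cannot contain a given row/column pair. This is a generic textbook sketch. The paper does not specify Bigtable’s hash functions or filter sizes, so `num_bits`, `num_hashes` and the salted SHA-256 hashing are assumptions.

```python
import hashlib
from typing import Iterator


class BloomFilter:
    """Generic Bloom filter over (row, column) pairs."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, row: str, column: str) -> Iterator[int]:
        # Derive num_hashes bit positions by salting SHA-256 with the hash index.
        key = f"{row}\x00{column}"
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, row: str, column: str) -> None:
        for p in self._positions(row, column):
            self.bits[p] = 1

    def might_contain(self, row: str, column: str) -> bool:
        """False: definitely absent, so the SSTable read can be skipped.
        True: possibly present (false positives are allowed)."""
        return all(self.bits[p] for p in self._positions(row, column))


bf = BloomFilter()
bf.add("com.cnn.www", "contents:")
print(bf.might_contain("com.cnn.www", "contents:"))  # True
```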


Paper Review: MapReduce Simplified Data Processing on Large Clusters

Paper review:

MapReduce is a distributed data processing approach. One machine acts as the master and assigns map/reduce tasks to the machines in the cluster. Map tasks generate intermediate k/v pairs, which are written to the map workers’ local disks and fetched by the reduce workers. Reduce workers merge the values with the same key and produce multiple output files (one per reduce task) for the user.

Strong points:

  1. The system is inspired by the map and reduce operations of functional programming languages, which is very different from the way I thought about distributed data processing. This approach divides a program into two smaller pieces. This division is easy to do in some situations and is very important for distributing tasks.
  2. The master in this system has a much lighter workload than the workers. It only has to maintain a very small amount of data and do little computation. This means that one master can support a large number of workers.
  3. The output of the distributed computation is guaranteed to be the same as a non-faulting sequential execution of the program when the map and reduce operators are deterministic. This is ensured by the master’s bookkeeping together with atomic commits of task output in the underlying file system. I’m not sure about other data processing models (like Spark), but this seems like a strong and necessary point to me.
  4. All the refinements in Section 4 are quite useful in real-world engineering. I can only imagine how often people get saved by bad-record skipping and by debugging with local execution.

Weak points:

  1. Each idle worker machine is assigned either a map or a reduce task, which means one machine can only do one kind of job at a time during a MapReduce operation. Does this mean that reduce workers sort of have to wait for the map workers to finish first? Also, the map workers might be idle while the reduce workers are busy. Even with the best coordination, we would still have waiting time and wasted computing resources. Moreover, reduce workers have to load data from the disks of the map workers.
  2. From the parallel programming point of view, if one worker node is slow but still in progress (which means it can still be assigned work), it will slow down the whole MapReduce process. Lol, I wrote this down long before I read Section 3.6. There the authors identify it as one of the common causes of longer total running time and call such a machine a straggler. Their solution is simple and practical, so I guess this one should go to the strong points.
  3. Suppose the map task splits are roughly equal in computation and we have a bunch of nodes with the same hardware and configuration. Map tasks running on different machines are then very likely to finish at about the same time. This makes network bandwidth consumption extremely bursty, since all the intermediate data is moved from the M workers to the R workers over the same network in the data center. It could significantly slow down the data transfer and the total running time.
  4. Based on the previous three points, why don’t we divide the input into N splits, hundreds of times the number of machines in the cluster, and then assign the splits to workers that perform both map and reduce tasks? That way, a slow node would simply be assigned fewer splits, and there would be virtually no waiting time for intermediate data transfer and no wasted computing resources.
  5. A MapReduce operation is aborted if the master fails. Considering there is only one master machine, this is unlikely to happen, but a master failure could cause a huge loss of computation progress. I don’t think it’s hard to keep a replica of some sort on a different network and power supply. The replica serves as a backup under normal circumstances but acts as the master if the original one fails. However, this would also add overhead to master operations, since we would have to back up the computation state and other master data structures constantly before committing. I guess it’s more of a trade-off between reliability and performance than a weak point.
  6. MapReduce is not really flexible enough for all big data processing situations. For example, there are cases where my operations can’t be expressed as a map and a reduce. What if we have a huge data set that is used by everyone, but with different computation methods? In that case the data passed to each computer should be the same, but the program tasks are not. MapReduce rigidly treats programs/functions/tasks as completely different things from data. Could I use lambda calculus and treat data and functions as the same thing in distributed computing? That way, the shape of the computation would no longer constrain how it is distributed. I could pass functions and/or data to all the workers, with coordination and load balancing, without the trouble of figuring out the map and reduce functions.


Paper Outline:

  1. Introduction:

    • Straightforward computation but large data;
    • Inspired by Lisp and other functional languages:
      • map: compute a set of intermediate k/v pairs;
      • reduce: merge all intermediate values with the same key;
  2. Programming model:

    • Example:

      • map takes a document as a (name, contents) pair and emits a (word, 1) pair for each word;
      • reduce takes all intermediate values with the same key, which is a word in this case, and sums them into the total count for that word;
    • Types:

      • map: (k1, v1) -> list(k2, v2);
      • reduce: (k2, list(v2)) -> list(v2);
      • input k/v are drawn from a different domain than the output k/v;
      • intermediate k/v are from the same domain as output k/v;
    • More examples:

      • distributed grep;
      • count for URL access frequency;
      • reverse web-link graph;
      • term-vector per host;
      • inverted index;
      • distributed sort;
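The word-count example and the types above can be written as a small single-process program. This sketch assumes a sequential in-memory driver (`run_word_count`, a hypothetical name) in place of the distributed runtime. `map_fn` and `reduce_fn` follow the (k1, v1) -> list(k2, v2) and (k2, list(v2)) -> list(v2) signatures.

```python
from collections import defaultdict
from typing import Iterator


def map_fn(name: str, contents: str) -> Iterator[tuple[str, int]]:
    """map: (k1, v1) -> list(k2, v2). Emits (word, 1) for every word."""
    for word in contents.split():
        yield word, 1


def reduce_fn(word: str, counts: list[int]) -> list[int]:
    """reduce: (k2, list(v2)) -> list(v2). Sums the counts for one word."""
    return [sum(counts)]


def run_word_count(documents: dict[str, str]) -> dict[str, int]:
    """Sequential driver standing in for the distributed runtime."""
    groups: dict[str, list[int]] = defaultdict(list)
    for name, contents in documents.items():
        for word, one in map_fn(name, contents):
            groups[word].append(one)  # group intermediate values by key
    return {word: reduce_fn(word, counts)[0] for word, counts in sorted(groups.items())}


print(run_word_count({"d1": "the cat sat", "d2": "the cat"}))  # {'cat': 2, 'sat': 1, 'the': 2}
```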
  3. Implementation:

    • Environment:

      • a cluster of hundreds or thousands of commodity machines connected by commodity networking hardware;
      • supported by a distributed file system that uses replication to provide availability and reliability;
      • users submit jobs to a scheduling system;
    • Execution overview:

      • the input is divided into M splits, typically 16 to 64 MB each, which can be processed in parallel by different machines;
      • one of the copies of the program acts as the master and assigns work to the workers. It picks idle workers and assigns each one a map or a reduce task;
      • the workers with map tasks read the contents of the corresponding input splits and buffer the intermediate k/v pairs in memory;
      • the buffered pairs are written to local disk periodically, partitioned into R regions. The locations of those pairs are passed back to the master, which forwards them to the reduce workers;
      • reduce workers are notified by the master about the data locations and read the buffered data from the local disks of the map workers;
      • reduce workers sort the data by key so that all pairs with the same key are grouped together. Sorting is needed because many different keys typically map to the same reduce task, and an external sort is used if the data doesn’t fit in memory;
      • reduce workers iterate through the sorted data and, for each unique key, append the reduce function’s output to the final output file for that partition;
      • when all tasks are complete, the master wakes up the user program and the MapReduce call returns;
    • Master data structures:

      • state (idle, in-progress or completed) and identities of the workers;
      • locations of intermediate file regions;
    • Fault tolerance:

      • worker failure:
        • detected with periodical ping from the master;
        • any in-progress tasks on the failed worker are reset to idle and rescheduled on other nodes;
        • completed map tasks on the failed worker are re-executed on other nodes too;
          • because their intermediate output is stored on the failed machine’s local disk and is therefore inaccessible. Completed reduce tasks don’t need re-execution, since their output is stored in the global file system;
          • all reduce workers are notified about the re-execution;
      • master failure:
        • periodic checkpoints of the master data structures;
        • computation will be aborted if master fails;
          • unlikely, given there is only a single master machine;
      • semantics in the presence of failures:
        • same as sequential execution if M/R is deterministic because:
          • atomic commits of map and reduce task output;
          • the master ignores completion messages for map tasks that have already completed;
          • the atomic rename provided by the underlying file system guarantees that the final output contains the data of just one execution of each reduce task;
        • weak but reasonable semantics if M/R is non-deterministic;
          • due to different execution orderings;
    • Locality:

      • input data is stored on the local disks of the machines in the cluster;
      • GFS divides each file into 64 MB blocks and stores several copies (typically three) of each block on different machines;
      • the master tries to save network bandwidth by:
        • scheduling a map task on a machine that holds a replica of its input data;
        • failing that, scheduling it near a replica (e.g., on a machine on the same network switch);
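The locality preference can be written as a three-level fallback. This is a hypothetical sketch: `pick_worker` and the `rack_of` mapping are assumptions, with a shared rack used as a proxy for "on the same network switch".

```python
def pick_worker(replica_hosts, idle_workers, rack_of):
    """Choose an idle worker for a map task whose input has replicas on replica_hosts."""
    # 1. A worker that stores a replica reads its input from local disk.
    for w in idle_workers:
        if w in replica_hosts:
            return w
    # 2. Otherwise a worker on the same rack as some replica (assumed proxy for same switch).
    replica_racks = {rack_of[h] for h in replica_hosts}
    for w in idle_workers:
        if rack_of.get(w) in replica_racks:
            return w
    # 3. Otherwise any idle worker, at the cost of cross-rack bandwidth.
    return idle_workers[0] if idle_workers else None
```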
    • Task granularity:

      • the number of M and R tasks should be significantly bigger than the number of machines to improve dynamic load balancing and speed up recovery upon node failure;
      • the master makes O(M+R) scheduling decisions and keeps O(M*R) task states in memory, which bounds how large M and R can be in practice;
      • R is often constrained by users because the output of each reduce task ends up in a separate output file;
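    • Backup tasks:

      • straggler: a machine which takes too long to complete the task;
      • when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks; a task is marked as completed whenever either the primary or the backup execution finishes;
      • increases the computational resources used by only a few percent but significantly reduces the completion time of large MapReduce operations;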
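A toy timing model shows why backup tasks help with stragglers. The model and its parameters (`backup_at`, `backup_duration`) are assumptions for illustration only: every task still running at `backup_at` gets a backup, and a task finishes at whichever execution ends first.

```python
def completion_time(primary_finish, backup_finish=None):
    # A task is marked completed when either execution finishes.
    if backup_finish is None:
        return primary_finish
    return min(primary_finish, backup_finish)


def job_finish_time(task_finishes, backup_at, backup_duration):
    """Hypothetical model: tasks still in progress at backup_at get a backup that takes
    backup_duration; the job ends when its last task completes."""
    return max(
        completion_time(f, backup_at + backup_duration) if f > backup_at else f
        for f in task_finishes
    )
```

With one straggler finishing at t=60 while the rest finish by t=12, `job_finish_time([10, 11, 12, 60], backup_at=12, backup_duration=10)` gives 22 instead of 60.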
  4. Refinements:

    • Partitioning function:

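      • the default partitioning function is “hash(key) mod R”;
      • users can supply a special partitioning function when needed, e.g. “hash(Hostname(urlkey)) mod R” so that all URLs from the same host end up in the same output file;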
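Both partitioning functions fit in a few lines. A sketch, assuming `zlib.crc32` as the hash (the paper does not specify one) and hypothetical function names:

```python
import zlib
from urllib.parse import urlsplit


def default_partition(key: str, r: int) -> int:
    # "hash(key) mod R"; crc32 is used because Python's str hash is randomized per process.
    return zlib.crc32(key.encode("utf-8")) % r


def host_partition(url_key: str, r: int) -> int:
    # "hash(Hostname(urlkey)) mod R": all URLs of one host land in the same partition/output file.
    return default_partition(urlsplit(url_key).hostname or "", r)
```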
    • Ordering guarantees:

      • within a given partition, intermediate k/v pairs are processed in increasing key order;
        • this makes it easy to generate a sorted output file per partition;
    • Combiner function:

      • combiner function can partially merge intermediate data;
        • executed on each machine that performs a map task;
        • typically the same code is used for both the combiner and the reduce function;
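For word count, the combiner is the same summing logic as the reducer, run on the map worker before data crosses the network. A minimal sketch with hypothetical function names; it relies on addition being associative and commutative, so partial sums give the same final result.

```python
from collections import Counter


def map_words(text: str) -> list:
    return [(w, 1) for w in text.split()]


def combine(pairs: list) -> list:
    # Runs on the map worker: partially merges pairs that share a key.
    c = Counter()
    for k, v in pairs:
        c[k] += v
    return list(c.items())


def reduce_counts(pairs: list) -> dict:
    # Same summing logic, applied to all pairs of a partition on the reduce side.
    c = Counter()
    for k, v in pairs:
        c[k] += v
    return dict(c)
```

For `"the the the cat"` the combiner shrinks four intermediate pairs to two without changing the reduced result.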
    • Input and output types:

      • users can add support for a new input type by providing an implementation of a simple reader interface;
    • Side-effects:

      • no support for atomic two-phase commits of multiple output files produced by a single task, so such tasks should be deterministic;
    • Skipping bad records:

      • optional mode of execution to ignore bad records and move on;
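In the paper, a worker's signal handler reports the sequence number of the record that crashed user code, and the master asks for that record to be skipped once it has seen more than one failure on it. The sketch below is a hypothetical single-process stand-in: a Python exception plays the role of the signal, and `SkipTracker` / `run_map_task` are assumed names.

```python
from collections import Counter


class SkipTracker:
    """Master-side bookkeeping: failures seen per record sequence number."""

    def __init__(self):
        self.failures = Counter()

    def report_crash(self, seq: int):
        self.failures[seq] += 1

    def should_skip(self, seq: int) -> bool:
        return self.failures[seq] > 1   # skip after more than one failure on the record


def run_map_task(records, map_fn, tracker):
    out = []
    for seq, rec in enumerate(records):
        if tracker.should_skip(seq):
            continue
        try:
            out.extend(map_fn(rec))
        except Exception:
            # Stands in for the "last gasp" report the paper's signal handler sends.
            tracker.report_crash(seq)
            raise
    return out
```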
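    • Local execution:

      • an alternative implementation runs all the work sequentially on the local machine to ease debugging;
    • Status information:

      • the master exports status pages through an internal HTTP server;
    • Counters:

      • a counter facility counts occurrences of various events;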



Paper Review: The Google File System

Paper Review:

The Google File System supports common file system APIs along with snapshot and record append operations. It ensures data redundancy and integrity by maintaining several replicas of each chunk on chunkservers, which are coordinated by a single master machine. The master logs all metadata operations and checkpoints its state regularly, both locally and on remote machines.

Strong points:

  1. Separating data flow from control flow during replication is brilliant: the control flow has a centralized structure (from the client to the primary and then to the secondary replicas in this case), while the data flow is much bigger and is optimized with a chain/pipeline structure to make full use of the network bandwidth;
  2. The concept of master replication and shadow masters is great. It provides a high level of redundancy: a local backup of the master's data is simply not enough for such critical metadata, and replication handles failure of the centralized master nicely. Moreover, the metadata is kept minimal, which makes replication more efficient;
  3. The simplicity of the design (at least at first glance at the overall structure) is just stunning. There are no complex algorithms or solutions in this file system, yet each possible problem is handled elegantly. For example, I can imagine how much trouble a fully distributed file system like xFS would have without a centralized server for coordination.

Weak points:

  1. There are a lot of assumptions about the data this particular file system is going to handle. To optimize the system, they mainly focus on large files and append operations, which means that small files and random writes are not well supported in GFS.
  2. When constructing the chain-structured data flow during replication, “finding the closest IP” is not a sophisticated way to build the chain of servers. A close IP address doesn't always mean close in the network, and it surely doesn't imply faster data transfer. I would choose router-supported multicasting to transfer data within the same data center, and a better network distance estimate (such as historical latency and transfer speed) for data transfer between different data centers.
  3. Bottleneck effect of the master:
    • clients have to communicate with the master before accessing data;
    • the in-memory data structures are of limited size;
    • master failure does affect system performance even with shadow masters;
  4. Since we already have so many machines for master replication, why not distribute the master a little? For example, clients with read requests could ask the master replicas for chunk locations instead of asking the master, to spread out the load and bandwidth.
  5. The client repeats the mutation if any replica fails, which means all the data is transferred again; this consumes time and bandwidth and causes duplicate records. Could the primary replica take responsibility when a secondary fails? Say we have 3 replicas: P for primary and S1, S2 for secondaries. Normally, P sends control messages to S1 and S2, and data flows from P to S1 and then to S2. If S2 fails, P would be responsible for telling S1 to retry the data transfer from S1 to S2 a few times. This could save bandwidth and avoid duplication as well (without duplication, all the replicas would be identical, so checksumming would be easier and data integrity higher).
  6. Stale replicas are detected during the master's regular scan and removed in the garbage collection phase. However, since most files are append-only and we have version numbers, stale replicas could easily be brought up to date and reused, saving the resources needed for re-replication.
  7. The result of concurrent write operations is undefined. (Maybe this could be solved with an extra layer between the clients and the master, in which concurrent operations could be rearranged so that it looks as if there were only one client issuing serial operations?)
  8. What if the master and the primary are both stale? In that case the client would accept a stale replica. I think a Paxos-like voting phase before the master replies to the client would solve this case. Actually no: the master and the primary will never both be stale. The master updates all the metadata upon startup, and as long as it doesn't crash, it always has the up-to-date version number.
  9. Checksumming in each chunkserver is definitely necessary, but isn't it an overhead to checksum on every request? Say there is a storage hotspot on a single chunk of data: the chunkserver will keep checking its integrity over and over again. Since we don't have to worry too much about stale data (because of version numbers), we could check the integrity when the data is read for the first time and then skip checksumming, because that part is already verified. As long as it has the latest version number, it must be correct. Actually no. Hotspots are rare, and data corruption might still happen after the data is stored on disk (I guess). Since checksumming has little effect on performance, we could just leave it there.


Paper Outline:

  1. Intro to the problem:

    • Component failures are the norm rather than the exception;
    • Files are huge by traditional standards;
    • Files are mutated by appending rather than overwriting;
    • Co-designing the applications and FS APIs benefits the overall system;
  2. Design overview:

    • Interface:

      • create, delete, open, close, read, write;
      • snapshot, record append;
    • Architecture:

      • a single master and multiple chunkservers, accessed by multiple clients;
      • master keeps all the file metadata;
      • the master periodically communicates with each chunkserver through HeartBeat messages;
    • Single master:

      • sophisticated chunk placement and replication decision using global knowledge;
      • might become a bottleneck of the system, so its involvement in reads and writes should be minimized;
        • clients cache chunk locations and no longer communicate with the master once a chunk is located;
        • clients can ask for multiple chunks in the same request;
    • Chunk size:

      • 64 MB in size;
      • pros:
        • reduce the client-master interaction;
        • reduce network overhead by keeping a persistent TCP connection to the chunkserver;
        • reduce the size of metadata stored on the master;
      • cons:
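        • wasting space (maybe; the paper argues that lazy space allocation avoids internal fragmentation);
        • small files consisting of few chunks may become hot spots (resolved with a higher replication factor; allowing clients to read data from other clients is a possible long-term solution);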
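Two pieces of arithmetic behind the chunk-size choice: a client turns a byte range into chunk indices before asking the master (one request can cover several chunks), and the master's metadata grows with the number of chunks. A sketch, assuming hypothetical function names and taking the paper's figure of less than 64 bytes of metadata per 64 MB chunk as an upper bound.

```python
CHUNK_SIZE = 64 * 2**20       # 64 MB
METADATA_PER_CHUNK = 64       # upper bound in bytes per chunk, as reported in the paper


def chunk_indices(offset: int, length: int) -> list:
    # Chunk indices a client would ask the master about for `length` bytes at `offset`.
    first = offset // CHUNK_SIZE
    last = (offset + length - 1) // CHUNK_SIZE
    return list(range(first, last + 1))


def master_metadata_bound(total_bytes: int) -> int:
    # Number of chunks (rounded up) times the per-chunk metadata bound.
    chunks = -(-total_bytes // CHUNK_SIZE)
    return chunks * METADATA_PER_CHUNK
```

Under this bound, 1 PiB of file data needs at most 1 GiB of chunk metadata, which is why the in-memory design is workable.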
    • Metadata:

      • three major types:
        • the file and chunk namespaces (in memory, persisted on local disk and replicated remotely);
        • the mapping from files to chunks (in memory, persisted on local disk and replicated remotely);
        • the locations of each chunk's replicas (in memory only);
          • the master does not store them persistently but asks the chunkservers at startup and whenever a chunkserver joins the cluster.
      • in-memory data structures:
        • periodic, fast scans of the entire state (for garbage collection, re-replication upon chunkserver failures and chunk migration);
        • the system capacity is limited by the master's memory;
          • not a serious issue because memory consumption is small (less than 64 bytes of metadata per 64 MB chunk) and extra memory is cheap.
      • chunk locations:
        • obtained and kept up to date by monitoring all chunkservers with regular HeartBeat messages;
        • chunkservers have the final word on which chunks they hold, so there is no point in trying to maintain a consistent view on the master;
      • operation log:
        • historical record of critical metadata changes;
        • serves as a logical timeline that defines the order of concurrent operations;
        • local and remote replication:
          • the master responds to a client only after flushing the corresponding log record locally and remotely;
          • several log records are batched together before flushing to reduce the impact on throughput;
        • the master recovers by replaying the log:
          • it checkpoints its state when the log grows beyond a certain size;
          • it loads the latest checkpoint and then replays only the subsequent log records, which reduces startup time;
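The metadata rules above (log before replying, checkpoint when the log grows, recover from checkpoint plus log tail, never persist chunk locations) can be sketched as follows. Everything here is an assumption for illustration: `GfsMaster`, the JSON record format and the `checkpoint_every` threshold are hypothetical, and a Python list stands in for the replicated log.

```python
import json


class MasterState:
    def __init__(self):
        self.files = {}       # path -> list of chunk handles (persisted via log/checkpoint)
        self.locations = {}   # chunk handle -> set of chunkservers (memory only)

    def apply(self, record: dict):
        op = record["op"]
        if op == "create":
            self.files[record["path"]] = []
        elif op == "add_chunk":
            self.files[record["path"]].append(record["handle"])
        elif op == "delete":
            del self.files[record["path"]]


class GfsMaster:
    def __init__(self, checkpoint_every: int = 3):
        self.state = MasterState()
        self.log = []                  # stands in for the locally and remotely flushed log
        self.checkpoint = ("{}", 0)    # (serialized namespace, number of log records covered)
        self.checkpoint_every = checkpoint_every

    def mutate(self, record: dict):
        self.log.append(json.dumps(record))   # flush the log record first...
        self.state.apply(record)              # ...then apply it and reply to the client
        if len(self.log) - self.checkpoint[1] >= self.checkpoint_every:
            self.checkpoint = (json.dumps(self.state.files), len(self.log))

    def heartbeat(self, server: str, handles: list):
        # Chunk locations come only from chunkserver reports.
        for h in handles:
            self.state.locations.setdefault(h, set()).add(server)


def recover(checkpoint, log) -> MasterState:
    """Load the latest checkpoint, then replay only the records written after it."""
    snapshot, covered = checkpoint
    state = MasterState()
    state.files = json.loads(snapshot)
    for line in log[covered:]:
        state.apply(json.loads(line))
    return state   # locations stay empty until chunkservers report again
```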
    • Consistency model:

      • Guarantees by GFS:
        • consistent: all clients will always see the same data regardless of which replicas they read from;
        • defined: consistent and all clients will always see what the mutation writes in its entirety;
        • a successful serial write leaves the region defined (mutations are applied to all replicas in the same order, and outdated replicas are detected and collected);
        • successful concurrent writes leave the region consistent but undefined;
        • successful record appends leave the region defined, though possibly interspersed with inconsistent regions (padding and duplicates);
        • clients might read stale data because chunk locations are cached on the client side:
          • the window is limited by the cache entry's timeout and the next open of the file;
          • most files are append-only, so a stale replica usually returns a premature end of chunk rather than outdated data;
        • failed chunkservers are identified with regular handshakes between the master and all chunkservers, and data corruption is detected with checksumming;
  3. System interactions:

    • Leases and mutation order:

      • master grants a chunk lease to one replica as primary;
      • the primary picks a serial order for all mutations to the chunk, and all replicas follow it;
      • lease mechanism:
        • initial timeout of 60 seconds;
        • can be extended indefinitely through requests piggybacked on HeartBeat messages;
        • can be revoked by the master;
      • lease process:
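        • the client asks the master which chunkserver holds the current lease for the chunk and where the other replicas are;
        • the master returns the identities of the primary and secondary replicas;
        • the client pushes the data to all replicas in any order;
        • once all replicas have acknowledged receiving the data, the client sends a write request to the primary;
        • the primary assigns serial numbers to the mutations it receives and forwards the write request to all secondary replicas, which apply the mutations in the same serial order;
        • the secondary replicas reply to the primary upon completion (replica failures can leave the region inconsistent here; the client handles this by retrying a few times before falling back to restarting the write from the beginning);
        • the primary replies to the client (reporting any errors);
      • a large write (or one that straddles a chunk boundary) is broken into several write operations, which may be interleaved with or overwritten by concurrent operations from other clients.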
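The key property of the lease is that the primary alone numbers the mutations, and every replica applies them in that order. A minimal sketch: `Replica` and `Primary` are hypothetical classes, forwarding over the network is simulated by the caller, and buffering early arrivals is an assumption of this sketch rather than something the paper describes.

```python
import itertools


class Replica:
    def __init__(self):
        self.applied = []    # mutations in the order they were applied
        self._next = 0       # next serial number this replica may apply
        self._buffer = {}    # serial number -> mutation that arrived early

    def receive(self, serial: int, mutation):
        # Apply strictly in serial-number order, whatever order requests arrive in.
        self._buffer[serial] = mutation
        while self._next in self._buffer:
            self.applied.append(self._buffer.pop(self._next))
            self._next += 1


class Primary(Replica):
    def __init__(self):
        super().__init__()
        self._serials = itertools.count()

    def accept(self, mutation) -> int:
        # The lease holder assigns consecutive serial numbers, regardless of which client sent it.
        serial = next(self._serials)
        self.receive(serial, mutation)
        return serial    # forwarded to the secondaries together with the mutation
```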
    • Data flow:

      • control flows from the client to the primary and then to the secondaries, but the data is pushed linearly along a chain of chunkservers in a pipelined fashion (to fully utilize each machine's outbound bandwidth);
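The paper gives the ideal elapsed time for pipelining B bytes to R replicas as B/T + RL, with T the network throughput and L the per-hop latency. The sketch evaluates that formula; the star-shaped alternative, where one sender's link carries R full copies, is our own assumption added for contrast, and both function names are hypothetical.

```python
def pipelined_seconds(b_bytes: int, replicas: int, throughput_bps: float, latency_s: float) -> float:
    # Chain/pipeline: B/T + R*L (each hop forwards bytes as soon as they arrive).
    return b_bytes * 8 / throughput_bps + replicas * latency_s


def star_seconds(b_bytes: int, replicas: int, throughput_bps: float, latency_s: float) -> float:
    # Assumed contrast: one outbound link sends R full copies one after another.
    return replicas * b_bytes * 8 / throughput_bps + latency_s
```

With 100 Mbps links, 1 MB and three replicas at 0.5 ms per hop, the pipelined estimate is about 81.5 ms, in line with the paper's figure of roughly 80 ms for 1 MB.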
    • Atomic record appends:

      • the client specifies only the data (not the offset); GFS appends it atomically at least once at an offset of its own choosing and returns that offset to the client;
      • the primary replica checks whether appending the record would cause the chunk to exceed the maximum size (64 MB);
        • if so, it pads the chunk to the maximum size (and tells the secondary replicas to do the same) and then replies to the client, asking it to retry on the next chunk;
      • the client retries the operation if any replica fails, which might cause duplicate records;
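The primary's check during record append can be sketched as below. `Chunk` and `record_append` are hypothetical names, the padding bytes are zeros only as an assumption, and a small chunk size is used in the usage example so the padding path is visible.

```python
CHUNK_SIZE = 64 * 2**20   # 64 MB maximum chunk size


class Chunk:
    def __init__(self, size: int = CHUNK_SIZE):
        self.size = size
        self.data = bytearray()


def record_append(chunk: Chunk, record: bytes):
    """Primary-side logic: return the offset GFS chose, or None to make the client retry
    on the next chunk after the current one has been padded."""
    if len(chunk.data) + len(record) > chunk.size:
        # Pad to the maximum size (secondaries would be told to do the same).
        chunk.data.extend(b"\0" * (chunk.size - len(chunk.data)))
        return None
    offset = len(chunk.data)
    chunk.data.extend(record)
    return offset
```

For example, with `Chunk(size=10)`, appending `b"abcd"` and `b"efgh"` returns offsets 0 and 4, and a third 3-byte record returns `None` after the chunk is padded to 10 bytes.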
    • Snapshot:

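      • the master revokes any outstanding leases on the chunks to be snapshotted, so that any subsequent write requires an interaction with the master;
      • the master logs the snapshot operation to disk and then applies this log record to its in-memory state by duplicating the metadata of the source file or directory tree;
      • the next time a client wants to write to a snapshotted chunk, it first has to ask the master for the current lease holder;
      • the master notices that the chunk is shared and asks each chunkserver holding a replica to copy it locally into a new chunk, which then receives the write (copy-on-write), so the snapshot's data is preserved;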
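Snapshot is copy-on-write driven by reference counts on chunk handles. A hypothetical single-process sketch: `SnapshotMaster` keeps chunk contents in a dict as a stand-in for chunkserver storage, lease revocation is omitted, and a write replaces a whole chunk for simplicity.

```python
import itertools


class SnapshotMaster:
    def __init__(self):
        self.files = {}      # path -> list of chunk handles
        self.refcount = {}   # chunk handle -> number of files referencing it
        self.chunks = {}     # chunk handle -> bytes (stand-in for chunkserver storage)
        self._handles = itertools.count(1)

    def create(self, path: str, data: bytes):
        h = next(self._handles)
        self.files[path] = [h]
        self.refcount[h] = 1
        self.chunks[h] = data

    def snapshot(self, src: str, dst: str):
        # Only metadata is duplicated; the chunks become shared.
        self.files[dst] = list(self.files[src])
        for h in self.files[src]:
            self.refcount[h] += 1

    def write(self, path: str, index: int, data: bytes):
        h = self.files[path][index]
        if self.refcount[h] > 1:
            # Shared chunk: copy it to a new handle first, leaving the snapshot untouched.
            new = next(self._handles)
            self.chunks[new] = self.chunks[h]
            self.refcount[h] -= 1
            self.refcount[new] = 1
            self.files[path][index] = new
            h = new
        self.chunks[h] = data
```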
  4. Master operation:

    • Namespace management and locking:

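      • namespace locks allow multiple master operations to run concurrently while being properly serialized;
      • the namespace is a lookup table mapping full pathnames to metadata; each operation acquires read locks on all ancestor directory names and a read or write lock on the full pathname;
      • this allows concurrent mutations in the same directory (e.g., several file creations);
    • Replica placement: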
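The locking rule can be checked on the paper's example: snapshotting /home/user to /save/user conflicts with creating /home/user/foo, while two creations in the same directory do not. `locks_for` and `conflicts` are hypothetical helpers; two lock sets conflict when they touch the same name and at least one side holds a write lock.

```python
def locks_for(path: str, leaf_mode: str) -> list:
    """Read locks on every ancestor directory name, plus leaf_mode ('r' or 'w') on the full path."""
    parts = path.strip("/").split("/")
    ancestors = ["/" + "/".join(parts[:i]) for i in range(1, len(parts))]
    return [(p, "r") for p in ancestors] + [(path, leaf_mode)]


def conflicts(a: list, b: list) -> bool:
    held = dict(a)
    return any(p in held and "w" in (held[p], mode) for p, mode in b)
```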

      • maximize the data reliability and availability;
      • maximize network bandwidth utilization;
    • Creation, re-replication, rebalancing:

      • factors to consider upon chunk creation:
        • place new replicas on chunkservers with below-average disk utilization;
        • limit the number of recent creations on each chunkserver;
        • spread replicas across racks;
      • re-replication priority factors:
        • how far it is from the re-replication goal;
        • chunks of live files over chunks of recently deleted files;
        • chunks that are blocking client progress;
      • rebalancing happens periodically and gradually;
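The three creation heuristics can be combined greedily. This is only a sketch under assumptions: the server record fields, the `max_recent` cap and the one-replica-per-rack rule are hypothetical parameters, not values from the paper.

```python
def place_replicas(servers: list, n: int = 3, max_recent: int = 2) -> list:
    """Greedy sketch: below-average disk utilization, a cap on recent creations per
    chunkserver, and at most one replica per rack."""
    avg = sum(s["disk_util"] for s in servers) / len(servers)
    candidates = sorted(
        (s for s in servers
         if s["disk_util"] <= avg and s["recent_creations"] < max_recent),
        key=lambda s: s["disk_util"],
    )
    chosen, used_racks = [], set()
    for s in candidates:
        if s["rack"] in used_racks:
            continue
        chosen.append(s["name"])
        used_racks.add(s["rack"])
        if len(chosen) == n:
            break
    return chosen   # may be shorter than n; a real system would relax a constraint
```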
    • Garbage collection:

      • mechanism:
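        • when a file is deleted, the master logs the deletion and renames the file to a hidden name that includes the deletion timestamp;
        • during the master's regular scan, hidden files older than three days (configurable) are removed, and orphaned chunks and their metadata are erased;
      • discussion:
        • garbage is easy to identify: any replica not present in the master's file-to-chunk mappings is garbage;
        • advantage over eager deletion:
          • simple and reliable in distributed system with constant failure;
          • merges storage reclamation into the master's regular background activities to minimize the cost;
          • safety net against accidental, irreversible deletion;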
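Lazy deletion can be sketched as a rename plus a periodic scan. The hidden-name format (`HIDDEN_PREFIX`) and the function names are hypothetical; the three-day grace period follows the paper's default. The scan returns, per chunkserver, the handles it reported that no file references any more.

```python
HIDDEN_PREFIX = ".deleted."        # hypothetical hidden-name scheme
GRACE_SECONDS = 3 * 24 * 3600      # three days (configurable)


def delete(files: dict, path: str, now: float):
    # Deletion only renames the file to a hidden name carrying the deletion time.
    files[f"{HIDDEN_PREFIX}{int(now)}.{path}"] = files.pop(path)


def gc_scan(files: dict, chunk_reports: dict, now: float) -> dict:
    """Drop hidden files past the grace period, then list orphaned chunks per chunkserver."""
    for name in list(files):
        if name.startswith(HIDDEN_PREFIX):
            ts = int(name[len(HIDDEN_PREFIX):].split(".", 1)[0])
            if now - ts > GRACE_SECONDS:
                del files[name]
    live = {h for handles in files.values() for h in handles}
    return {server: sorted(set(handles) - live) for server, handles in chunk_reports.items()}
```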
    • Stale replica detection:

      • chunk version numbers are maintained by the master;
      • the master increases a chunk's version number whenever it grants a new lease on it;
      • stale replicas are removed during the garbage collection;
      • the master includes the version number when it tells the client which chunkserver holds the lease, so the client can verify that it is accessing up-to-date data;
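Stale-replica detection boils down to comparing each replica's version with the master's. A minimal sketch with a hypothetical `VersionedMaster`, where each chunkserver is modeled as a dict from chunk handle to version: a replica that was down while a new lease was granted keeps an old number and shows up as stale.

```python
class VersionedMaster:
    def __init__(self):
        self.version = {}   # chunk handle -> latest version known to the master

    def grant_lease(self, handle: int, up_to_date_replicas: list) -> int:
        # Bump the version on every new lease; reachable replicas record the new number.
        self.version[handle] = self.version.get(handle, 0) + 1
        for replica in up_to_date_replicas:
            replica[handle] = self.version[handle]
        return self.version[handle]

    def stale(self, handle: int, replicas: dict) -> list:
        return [name for name, r in replicas.items() if r.get(handle, 0) < self.version[handle]]
```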
  5. Fault tolerance and diagnosis

    • High availability:

      • fast recovery from normal/abnormal termination;
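      • chunk replication;
      • master replication:
        • operation logs and checkpoints are replicated on multiple machines;
        • a mutation is considered committed only after its log record has been flushed to disk locally and on all master replicas;
        • if the master fails, a new master process can be started elsewhere from the replicated operation log;
          • clients reach the master only through a canonical name (a DNS alias that can be changed), and shadow masters provide read-only access while the primary master is down;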
    • Data integrity:

      • each chunkserver must independently verify the integrity:
        • comparing replicas across chunkservers is impractical;
        • divergent replicas are legal (replicas are not guaranteed to be byte-wise identical);
      • chunks are broken into 64 KB blocks, each with a 32-bit checksum;
        • checksums are kept as metadata, separate from user data;
        • checksumming has little effect on read performance;
        • appends incrementally update the checksum of the last partial checksum block and compute new checksums for any brand-new blocks;
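Block checksumming can be sketched with CRC-32 from `zlib` (the paper specifies 32-bit checksums but not the algorithm, so CRC-32 is an assumption). `zlib.crc32(data, value)` continues a running checksum, which gives the incremental update of the last partial block on append. Function names are hypothetical.

```python
import zlib

BLOCK = 64 * 1024   # 64 KB checksum blocks


def checksums(data: bytes) -> list:
    return [zlib.crc32(data[i:i + BLOCK]) for i in range(0, len(data), BLOCK)]


def verified_read(data: bytes, sums: list, offset: int, length: int) -> bytes:
    """Verify every block overlapping the requested range before returning any bytes."""
    first, last = offset // BLOCK, (offset + length - 1) // BLOCK
    for b in range(first, last + 1):
        if zlib.crc32(data[b * BLOCK:(b + 1) * BLOCK]) != sums[b]:
            raise OSError(f"checksum mismatch in block {b}")
    return data[offset:offset + length]


def append(data: bytes, sums: list, extra: bytes):
    """Extend the last partial block's CRC incrementally, then checksum brand-new blocks."""
    sums = list(sums)
    tail = len(data) % BLOCK
    rest = extra
    if tail:
        fill = extra[:BLOCK - tail]
        sums[-1] = zlib.crc32(fill, sums[-1])
        rest = extra[len(fill):]
    return data + extra, sums + checksums(rest)
```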
    • Diagnostic tools:

      • diagnostic logs record significant events and all RPC requests and replies, with minimal impact on performance;



[1] Ghemawat, Sanjay, Howard Gobioff, and Shun-Tak Leung. “The Google File System.” Proceedings of the Nineteenth ACM Symposium on Operating Systems Principles – SOSP ’03 (2003). Web.