Paper Review: Bigtable A Distributed Storage System for Structured Data

Paper review:

This paper is about a data storage system build upon google’s own file system GFS and Paxos-based coordinator Chubby. It offers flexible storage types with great scalabilty and availability. Some of the optimizations like prefetching and multi-level caching are really impressive and useful.

strong points:

  1. just like GFS, clients are communicating directly with tablet servers for read/write operations. This could help the system to overcome the bottleneck effect brought by the centralized coordination of the master.
  2. There are two different layers of centralization in this system and they are all handled pretty well in order to survive failures. The master of tablet servers is supported by Chubby lock service which ensures that there is one master at any time to eliminate the possibility of inconsistency. The other master is located in GFS and is backup-ed periodically.
  3. The client-side prefetching more than one tablet when it reads from METADATA is brilliant. It reminds me of the implementation of computer memory. By pre-read multiple tablets, the clients are much less likely to refer to the METADATA again. Since the referring is expensive because of the network round-trip time and the limitation of the centralized metadata, we better think of ways to reduce it. Another way I was thinking about is to have a metadata cache between clients and servers. It serves just like a local DNS server. It fetches tablet row keys for clients and caches the “hot” or adjacent rows for later usage. That way clients will be referring to the cache server first before they make it to the metadata. This will save us a lot of time and resource if multiple clients are likely to use or reuse the same chunk of table. A downside of my solution is definitely the network bottleneck for the cache server since the overall bandwidth is under-utilized.  Also if clients are using completely different tables then this will only result-in one more round-trip time.

weak points:

  1. while the usage of different systems and applications (GFS, SSTable, Chubby) decouples different layers and aspects of Bigtable (GFS is the low level file storage solution, SSTable is the actually data structure, Chubby is responsible for metadata, cluster management and the stats monitoring), the interactions of systems could lead to overhead and complexity for maintenance. I wonder if it’s gonna be more efficient in performance if we build Bigtable like some extra feature of Google File System instead of combining a bunch of underlying systems together. For example, one of the most obvious performance bottleneck would be the number of tablet servers. Each tablet server interact with GFS independently as a client and as the number of tablet servers grow, the performance of GFS drops due to coordination overhead. What if the system is not build in different layers and each tablet server has their own storage and distributed replicas?
  2. memtable is a great design which offers a buffer contains recent mutations for data. However, the paper doesn’t specify the where the memtable is, so I’m going to assume it’s located in each tablet server because the master will have a lot to handle if it maintains the memtable for all. So there comes the problem, if a tablet server crushes with a full memtable, then all those mutations will be lost since memtables are stored in memory without any forms of backup. This could result in situations like users find data unchanged even if the mutation operation already finished.
  3.  compaction is used for faster transfer of a source tablet server to the target. The compaction is completed within two states to make sure that during the first phase compacting, the source server is still available for serving this tablet. However, the source tablet server could be really heavy-loaded with a lot of operations going on and the computation might be slower but the target is very likely to be light in load. So why don’t we just leave the compaction to the target server since the computation amount won’t be any different.
  4. Bigtable is supported by Google File System and GFS has its own mechanism to replicate data to handle occasional (or is it) node failure. However, in Bigtable it’s specified that one tablet is only stored in one tablet server (in 5.2 tablet assignment). So I’m not sure where they store the replications. All the tablets must be replicated since the master can reassign them upon node failure. If there are replications, how do they handle the consistency issue? Does the assigned tablet server act like a master/leader?  (So GFS is acting like a disk underlying Bigtable and all servers (master and slaves) have access to. The block assignment is more of a way to “let this tablet server handle request from index A to B” rather than store some data locally in that server. At least I think it is.)

Paper Outline:

  1. Introduction:

    • Goals achieved by Bigtable:

      • wide applicability;
      • scalability;
      • high performance;
      • high availability;
    • Bigtable resembles a database:

      • but provides a different interface;
      • does not support a full relational data model;
      • simple data model which supports dynamic control;
      • treats data like uninterpreted strings;
  2. Data model:

    • Bigtable is a sparse, distributed, persistent multidimensional sorted map;

    • Rows:

      • up to 64 KB in size;
      • each r/w operation on a row is atomic;
        • regardless of the number of columns it accesses;
      • row range:
        • dynamically allocated;
        • called tablet, the unit of distribution and load balancing;
        • read in shorter range will require communication with less machines;
          • URLs are stored reversely to group the related content together;
    • Column families

      • column keys are grouped into sets called column family;
      • column family is the basic unit of access control;
        • family is required before any data is stored under any column key;
      • data stored in the same column family is usually the same type;
      • column key:
        • unlike column family names, it doesn’t have to be printable;
    • Timestamps:

      • timestamps identify the same data of different versions in the cell;
      • 64-bit integer:
        • represent real-time in microseconds by default;
        • or any other client-explicit version representation;
        • must be unique and decreasing order so that the recent version can be read first;
      • Two settings for garbage collection:
        • only the last n versions;
        • only new-enough versions;
  3. API:

    • Provides functions for creating and deleting tables and column families;

    • Provides functions for changing cluster, table and column family metadata;

      • such as row access right;
    • Other features that allow the user to manipulate the data in more complex way:

      • supports single-row transactions;
      • allows cells to be used as integer counters;
      • supports the execution of client-supplied scripts in the address spaces of the servers;
        • in a special language developed by Google called Sawzall;
    • Can be used with MapReduce;

  4. Building blocks:

    • Uses the distributed Google File System to store logs and data files;

    • Depends on a cluster management system for scheduling jobs, managing failures  and monitoring machine stats;

      • because it runs a shared pools of machines;
    • SSTable file format is used internally to store Bigtable data:

      • provides a persistent, ordered immutable map for from keys to values;
      • SSTable contains a sequence of blocks:
        • blocks are located with indexes loaded into memory when opened;
        • binary search in-memory index and then read the data from disk;
        • optionally, SSTable can be completed mapped into memory;
    • Relies on a highly-available and persistent distributed lock service Chubby;

      • a Chubby service consists of five active replicas;
        • one of the replicas is the master that serves requests;
        • the system is active when the majority of the machines are running;
        • uses Paxos algorithm to keep its replicas consistent;
      • read/write operations are atomic;
      • the Chubby client library provides caching of Chubby files;
      • each client maintains a Chubby service session;
      • Bigtable uses Chubby for variety of tasks:
        • ensure there is one master at most at any time;
        • store the bootstrap location Bigtable data;
        • discover table services and finalize tablet server deaths;
        • store Bigtable schema information;
        • store access control lists;
  5. Implementation:

    • Three major components:

      • a library that is linked into every client;
      • one lightly loaded master server;
        • responsible for assigning tablets to tablet servers;
        • responsible for detecting the addition/expiration of tablet servers;
        • balancing load of tablet servers;
        • garbage collection of files in Google File System;
        • handles schema changes;
      • many tablet servers;
        • could be dynamically added or removed;
        • handles a set of tablets;
        • handles read and write requests;
        • splits tablets that has grown too large;
          • maintaining a size of 100-200 MB;
        • clients communicate directly with tablet servers;
          • most clients never communicate with the master;
    • Tablet location:

      • three-level hierarchy to store the tablet location information:
        • first level: a Chubby file contains location of root tablet:
          • root tablet contains the location of all tablets;
          • root tablet is the 1st METADATA tablet;
          • never split to make sure of the three-level structure;
        • second level: rest of the METADATA tablets:
          • stores the location of a tablet under a row key;
          • stores secondary information for debugging and analysis;
      • localizing:
        • client library caches tablet locations;
        • moves to the hierarchy if doesn’t know the correct location;
          • requires three network round-trips if the cache is empty;
          • at most six network round-trips if the cache is stale;
          • client library prefetch more than one line to reduce access cost;
    • Tablet assignment:

      • each tablet is assigned to one tablet server at a time;
      • the master keeps track of the set of love tablet servers and the current assignment of tablets to tablet servers;
      • Bigtable uses Chubby to keep track of tablet servers:
        • acquires an exclusive lock when a tablet server starts up;
        • the master monitors the lock directory and discovers tablet server;
        • a tablet server will lose its lock due to network partition;
        • reacquire a lock as long as the file exists;
      • master:
        • responsible for detecting the status of tablet servers:
          • periodically asking for lock status;
        • acquire a exclusive lock if the tablet server expires or is unavailable:
          • delete the file;
          • reassign the tablet;
        • kills itself if its Chubby session expires:
          • this doesn’t change the assignment of tablets to tablet servers;
        • master start-up:
          • grabs master lock in Chubby;
          • scans the servers directory in Chubby to find live servers;
          • contact with all live tablet servers for the tablet assignments;
          • scans(or add) METADATA table to learn the the set of tables;
        • table changes:
          • master initiate add/delete and merge operations to tables;
          • tablet servers initiate split operations:
            • commit the split by recording information for the new tablet in the METADATA table;
            • notifies master;
    • Tablet serving:

      • persistent state of tablet stored in GFS;
      • updates are stored in a commit log:
        • recent ones are kept in memory buffer called a memtable;
        • older ones are stored in a sequence of SSTable;
      • write operation:
        • check for validity and authorization;
        • mutation is written to commit log;
          • group commit improves the throughput of small mutations;
        • after all the writes has been committed, its contents are inserted into memtable;
      • read operation:
        • check for validity and authorization;
        • merge SSTables and memtable;
          • lexicographically sorted so it’s efficient;
    • Compaction:

      • when the memtable reaches its maximum size, it freezes and a new memtable is created. And the old one will be converted to SSTable in GFS.
        • shrinks the memory usage of the tablet server;
        • reduces the amount of data for recovery;
      • small SSTables converted from memtable will be merged periodically;
        • called major compaction;
        • product of major compaction contains no deletion info or data;
  6. Refinements

    • Locality groups:

      • clients group multiple column families together into a locality group;
      • generates a separate SSTable;
    • Compression:

      • clients can decide whether a SSTable for locality group is compressed;
      • as well as the compression format;
    • Caching for read performance:

      • two levels of cache in tablet servers;
        • scan cache is higher-level, which caches the KV pairs returned by SSTable interface to the tablet server;
        • block cache is lower-level, which caches the SSTable blocks read from GFS;
    • Bloom filters:

      • allows us to ask whether an SSTable might contain any data for a specified row/column pair;
    • Commit-log implementation:

      • append mutations to a single commit log per tablet server;
      • boost performance during normal operation;
      • but slows down recovery;
        • sort the log to avoid duplicate reads;
        • two logs to avoid blocking;
    • Speeding up tablet recovery:

      • minor compaction;
    • Exploiting immutability:

      • differentiate mutable and immutable data;
  7. Performance evaluation;



Paper Review: MapReduce Simplified Data Processing on Large Clusters

Paper review:

MapReduce is a distributed data processing approach. One machine acts as master and assign map/reduce tasks to all the machines in the cluster. Map generated intermediate k/v pairs and feed to the reduce workers using underlying filesystem. Reduce workers will merge the data with the same keys and return multiple output file (in the number same as reduce tasks) to the user.

Strong points:

  1. The system is inspired from map and reduce operations from functional programming languages, which is very much different from the way I thought about distributed data processing. This approach divided the programs into two smaller pieces. This division is easy to do in some situations and very important for the task distribution.
  2. The master in this system has relatively light-weighted task to do than workers. It only has to maintain very small amount of data and little computation as well. This means that one master could be used to support a large amount of workers.
  3. The output of distributed computation is guaranteed to be the same as a non-faulty sequential execution of the program when the map and/or reduce operators are non-deterministic. This is ensured by the master control as well as the underlying filesystem. I’m not sure about other data processing models (like spark) but this seems like a strong and necessary point to me.
  4. All the refinements in section 4 are quite useful in real world engineering. I can only imagine people get saved by bad records skipping and debug with local execution all the time.

Weak points:

  1. Each idle worker machine will be assigned with map or reduce task which means that one machine can only do a certain kind of job during a single MapReduce function. Does this mean that reduce workers kinda have to wait for the map workers to finish first? Also the map workers might be idle when reduce workers are busy. Even with the best coordination, we would still waiting time and wasted computing resources. Moreover, reduce workers have to load data from the disk of map workers.
  2. From the parallel programming’s point of view, if one worker node is slow but still in-progress (which means that it could be assigned with work), it will slow down the whole MapReduce process. Lol I wrote this down long before I read the section 3.6 and they think it’s one of the common causes that lengthens the processing time and call it straggler. They have a solution that’s simple and practical so I guess this one should go to the strong points.
  3. If the Map task splits are relatively the same in terms of computation and we have a bunch of node with same hardware and configuration. Map tasks running on different machines are very likely to be finished at the exact same time and this will cause the network bandwidth consumption to be extremely bursty since all the intermediate data will be moved from M workers to R workers using the same network media in the data center. It will significantly slows down the data transfer and the total running time.
  4. Based on the previous three points, why don’t we divide the input into N splits which is hundreds times of the number of machines in the cluster and then assign the inputs to workers with both M and R tasks? In that way if a node is slow, it would be assigned withe less splits and there’s virtually no waiting time for intermediate data transfer as well as waste of computation resource.
  5. MapReduce operation will be aborted if master fails. Even though it’s unlikely to happen considering there is only one master machine, the failure of master might cause huge loss on the computation progress. I don’t think it’s hard to keep a replica of some sort in different network and power supply. The replica serves as data backups under normal circumstances but will act as master if the original one fails. However, this will also bring overheads to master operations since we have to backup the computation states and other master data structures constantly before commit. I guess it’s more of a trade-off between reliability and performance than a weak point.
  6. MapReduce is not really flexible enough for all the big data processing situations. For example, there are cases when my operations couldn’t be expressed in the way of map and reduce. What if we have a huge data set that is used by all but in different computation method (in this case the data passed into each computer should be the same but the program tasks are not)? MapReduce has a rigid way of seeing programs/functions/tasks as a completely different thing from data. Can I use lambda calculus and see data and function as the same thing in distributed computing? That way the computation will no longer be a constraint of distribution. I can pass functions and/or data to all the workers with coordination and load-balancing without worrying about the trouble to figure out the map and reduce functions.


Paper Outline:

  1. Introduction:

    • Straightforward computation but large data;
    • Inspired from Lisp and other functional languages:
      • map: compute a set of intermediate k/v pairs;
      • reduce: merge all intermediate values with the same key;
  2. Programming model:

    • Example:

      • map takes a document as (name, content) pair and return a list of (word, 1) pairs to the reduce function;
      • reduce takes intermediate pairs with the same keys, which is word in this case, and increment to the total count of that word;
    • Types:

      • map: (k1, v1) -> list(k2, v2);
      • reduce: (k2, list(v2)) -> list(v2);
      • input k/v are drawn from a different domain than the output k/v;
      • intermediate k/v are from the same domain as output k/v;
    • More examples:

      • distributed grep;
      • count for URL access frequency;
      • reverse web-link graph;
      • term-vector per host;
      • inverted index;
      • distributed sort;
  3. Implementation:

    • Environment:

      • a cluster with thousands with commodity machine connected with commodity networking hardware;
      • supported with distributed file system using replication to provide availability and reliability;
      • a scheduling system takes orders from users;
    • Execution overview:

      • input will be divided (in parallel by different machines) into M splits, with a typical size of 16 to 64 MB per split;
      • one of the program copy acts as master and assign work to the workers. It picks idle workers and assigns each one a M or R task;
      • the workers with M task read the content of corresponding input splits and buffer the intermediate output k/v pairs in the memory;
      • the buffered pairs are saved to local disks periodically, partitioned into R regions. The locations of those pairs are passed to the master to perform R tasking assignment;
      • reduce workers are notified by the master about the data locations. They read the buffered data from the local disks of map workers;
      • reduce workers sorts the data so that the pairs with the same keys are grouped together. Sorting is necessary to reduce the workload and will be external if needed;
      • reduce workers iterate though the data and append the reduce function output to a final output;
      • master wakes up user program and returns;
    • Master data structures:

      • state (idle, in-progress or completed) and identities of the workers;
      • locations of intermediate file regions;
    • Fault tolerance:

      • worker failure:
        • detected with periodical ping from the master;
        • any finished tasks will be reassigned to other nodes upon failure;
        • finished map tasks will be reassigned to other nodes upon failure;
          • because the intermediate data is stored in local disk and therefore inaccessible. On the other hand, completed reduce tasks output will be stored globally;
          • all R workers will be notified about the reassignment;
      • master failure:
        • periodic checkpoints of the master data structures;
        • computation will be aborted if master fails;
          • highly unlikely for single master machine;
      • semantics in the presence of failures:
        • same as sequential execution if M/R is deterministic because:
          • atomic commits of map and reduce task output;
          • master ignores duplicated commit for map operations;
          • underlying file system guarantees to eliminate duplicated reduce operation output;
        • weak but reasonable semantics if M/R is non-deterministic;
          • due to different execution orderings;
    • Locality:

      • input data is stored on the local disks of the machines in the cluster;
      • files are divided into blocks of 64 MB and stores multiple copies (typically three) over the cluster;
      • master will try to save bandwidth by:
        • assign machines with data replications as M workers;
        • assign machines near data replicas as M workers;
    • Task granularity

      • the number of M and R tasks should be significantly bigger than the number of machines to improve dynamic load balancing and speed up recovery upon node failure;
      • O(M+R) scheduling decisions and O(M*R) task states should be handled by the master machine;
      • R shouldn’t be too large otherwise it might result in fragmentation of the output file;
    • Backup tools:

      • straggler: a machine which takes too long to complete the task;
      • the master perform reassignment of the in-progress tasks when it comes close to the end of MapReduce operation;
      • little overhead in theory but works in large MapReduce operations;
  4. Refinements:

    • Partitioning function:

      • “hash(key) mod R” as partitioning function;
      • other ones with special needs;
    • Ordering guarantees:

      • within a given partition, intermediate k/v pairs are processed in increasing key order;
        • easier to sort output file per partition;
    • Combiner function:

      • combiner function can partially merge intermediate data;
        • executed on each machine that performs a map task;
        • typically the same as reduce function;
    • Input and output types:

      • user can make their own support for input type by providing reader interface;
    • Side-effects:

      • no support for multiple output files from a single task;
    • Skipping bad records:

      • optional mode of execution to ignore bad records and move on;
    • Local execution;

    • Status Information;

    • Counters;



Paper Review: The Google File System

Paper Review:

The Google File System supports common filesystem APIs along with append operation. It ensures the data redundancy and integrity by maintaining several replicas in chunkservers, which are coordinated by a single master machine. The master machine backups all the operations, logs and checkpoints regularly locally and remotely.

Strong points:

  1. Separating data flow with control during replication is brilliant because the control flow has a centralized structure (star structure from master to primary to secondary replicas in this case) and the data flow is much bigger in the size and should be optimized by using chain/pipeline structure in order to make full use of the network bandwidth;
  2. The concept of master replication and shadow master is great. It has really high-level of security and redundancy. Local backup for master data is simply not enough for critical metadata and it handles centralized master failure nicely. Moreover, they minimize the metadata to make the replication more efficient;
  3. The simplicity (at least on the glimpse of the overall structure)  of the design is just stunning. There’s no complex algorithms/solutions in this filesystem but yet each possible problem is handled elegantly. For example, I can imagine how much trouble a fully distributed filesystem like xFS would have without centralized server for coordination.

Weak points:

  1. There are a lot of assumptions about the data they are going to handle on this particular file system. In order to optimize the system, they mainly focus on large files and append operations, which means the small files and random writes are not well-supported in GFS.
  2. In the construction of chain-structured data flow during replication, “finding the closest IP” is not a sophisticated way to construct the chain of servers. Close IP address doesn’t always mean close in network and surely doesn’t imply faster data transfer. I guess I will choose multi-casting supported by routers to transfer data with in the same data center and use a better network distance estimation (like historical latency and transfer speed) for data transfer between different data centers.
  3. Bottleneck effect of the master
    • clients have to communicate with master before data access;
    • in-memory data structure is of limited size;
    • master failure do effect the system performance even with shadow masters;
  4. Since we already have so many machines for the master replication, why don’t we make master distributed a little? For example, clients with read operation request can ask the master replicas for the primary instead of asking the master to utilize the bandwidth.
  5. The client will repeat the mutation operation if any replica fails, which means that all the data will be transferred again, which consumes time and bandwidth and causes duplicate record. Can primary replica be responsible if any secondary failures? Say we have 3 replicas: P for primary and S1, S2 for secondary. In normal situation, P send control message to S1 and S2 and data flows from P to S1 then to S2. If S2 fails, P will be responsible to send S1 a message to retry the data transfer from S1 to S2 for several attempts. This could save the bandwidth and avoid duplication as well. (without the duplication, all the replicas would be identical and checksum is gonna be easier with higher level of data integrity)
  6. Stale replicas will be detected during master’s regular scan and removed in the garbage collection phase. However, since most files are append-only and we have the version number, stale replicas could be reused easily to save the resource for re-replication.
  7. Concurrent write operations are not defined (maybe solve this with an extra layer between clients and the master? So that concurrent operations could be rearranged in that extra layer. So it could be like there is only one client with serial operations)
  8. What if master and primary are both stale? In that case the client will accept the stale replication. I think a Paxos-like voting phase before master replies to the client would solve this case. Actually no, master and primary will never be both stale. The master will update all the metadata upon the startup. If the master doesn’t crush, it will always have the up-to-date version number.
  9. Checksum in each chunkserver is definitely necessary but isn’t it overhead if we are doing the checksumming upon every request? Say if there is storage hotspot with only one chunk of data, the chunkserver will keep on checking for integrity over and over again. Since we don’t have to worry too much about the stale data (because of version number), we can check the integrity when the data is read for the first time then continue without checksumming because this part is already verified. As long as it has the latest version number, it must be correct. Actually no. Hotspot is a rare situation and data corruption might still happen after it was stored into the disk (I guess). Since the checksumming has little effect I guess we could just leave it there.


Paper Outline:

  1. Intro to the problem:

    • Component failures are norm rather than exceptions;
    • Files are huge by traditional standards;
    • Files are mutated by appending rather than overwriting;
    • Co-designing the applications and FS APIs benefits the overall system;
  2. Design overview:

    • Interface:

      • create, delete, open, close, read, write;
      • snapshot, record append;
    • Architecture:

      • master and multiple chunkservers structure;
      • master keeps all the file metadata;
      • masters periodically communicate with chunkservers with HeartBeat messages;
    • Single master:

      • sophisticated chunk placement and replication decision using global knowledge;
      • might become bottleneck of the system; should minimize the involvement of the master during r/w;
        • no more communication with master once chunk is located
        • multiple chunks in the same request;
    • Chunk size:

      • 64 MB in size;
      • pros:
        • reduce the client-master interaction;
        • reduce the network overhead by keeping persistent TCP connection;
        • reduce the size of metadata stored on the master;
      • cons:
        • wasting space (maybe?);
        • small files with less chunks may become hot spot (could be resolved with higher replication factor or allow clients to read data from other clients);
    • Metadata:

      • three major types:
        • the file and chunk namespace ( in memory. local disk and remote);
        • the mapping from files to chunks ( in memory. local disk and remote);
        • the location of each chunk replicas (in memory);
          • master does not store it persistently but ask chunkservers upon startups and other changes.
      • in-memory data structure
        • periodical fast scanning for the entire state (for garbage collection, re-replication upon chunkserver failures and chunk migration);
        • the system capacity is  limited by the master’s memory;
          • not a serious issue because memory consumption is small and extra memories are cheap.
      • chunk locations:
        • obtained and updated with monitoring all the chunkservers’ states with regular HeartBeat messages;
        • chunkservers have final words about the chunk locations so there’s no point trying to maintain a constant view on the master;
      • operation log:
        • historical record of critical metadata changes;
        • defines the order of concurrent operations with logical time;
        • local and remote replications:
          • flushes the log replications before answering to clients;
          • batches several flush requests to reduce the throughput consumption;
        • master recovers by replaying the logs:
          • checkpoints its state when logs grow beyond certain size;
          • load the latest checkpoint and then replay the subsequent logs to reduce the startup time;
    • Consistent model:

      • Guarantees by GFS:
        • consistent: all clients will always see the same data regardless of which replicas they read from;
        • defined: consistent and all clients will always see what the mutation writes in its entirety;
        • serial successful write operation is defined (apply operations to replicas in the same order and detect/collect the outdated ones);
        • concurrent successful write operation is consistent but undefined;
        • record append is defined;
        • client might access stale data because chunk location is cached on the client side.
          • this is limited due to the cache size;
          • most of the files are append-only so stale replica usually returns a premature end of chunk;
        • data corruption is detected with regular handshakes and check-summing  between master and all chunkservers;
  3. System interactions:

    • Leases and mutation order:

      • master grants a chunk lease to one replica as primary;
      • primary defines the mutation order to all;
      • lease mechanism:
        • initial timeout of 60 secs;
        • could be extended indefinitely with HeartBeat messages;
        • could be revoked by master;
      • lease process:
        • client asks the master which chunkserver holds the lease;
        • master returns the primary and secondary replicas;
        • client pushes the data to all replicas in any order;
        • client sends write request to primary if all replicas ACked;
        • primary forwards to write request to all secondary replicas in the exact same serial order;
        • the secondary replicas reply to primary upon completion (nodes failure might result in inconsistency here. It is handled with few more attempts before falling back);
        • the primary replies to client (with errors or not);
      • a “large” write will be divided into small ones and could be interleaved with or overwritten by concurrent operations.
    • Data flow:

      • the control flow is from client to primary and then to secondaries; but the data is pushed linearly along a chain of chunkservers in a pipelined fashion (to fully utilize the bandwidth of each machine);
    • Atomic record appends:

      • only data is specified (not the offset) and GFS guarantees to append the data at least once;
      • primary replica is responsible to check if the appending operation might result in over-sized chunk;
        • If so, it pads the current chunk (and tell the secondary replicas to do the same) and then replies to the client.
      • the client retries the operation if any replica fails and this might cause duplicate record;
    • Snapshot:

      • master revokes leases to ensure that any subsequent writes will require an interaction the master;
      • master saves operations to local disk and then load this log record to its in-memory state;
      • the next request from clients will first ask the master about primary;
      • data is copied locally to create new chunks for the following operations (in order to save the backup data for last snapshot);
  4. Master operation:

    • Namespace management and locking:

      • locks are used to enable simultaneous master operations;
      • files are stored with full path-names and r/w operations will require all the locks along the directory;
      • it allows concurrent mutations in the same directory;
    • Replica placement

      • maximize the data reliability and availability;
      • maximize network bandwidth utilization;
    • Creation, re-replication, rebalancing:

      • factors to consider upon chunk creation:
        • on chunkservers with lower disk utilization;
        • limit the number of recent creation on each chunkserver;
        • spread replicas across racks;
      • re-replication priority factors:
        • how far it is from the re-replication goal;
        • chunks of live files over the recent deleted files;
        • chunks that are blocking client progress;
      • rebalancing happens periodically and gradually;
    • Garbage collection:

      • mechanism:
        • rename and hide the file;
        • remove the hidden file, orphan chunks and the metadata during master’s regular scan;
      • discussion:
        • garbage is easily identified with chunk handle mappings;
        • advantage over eager deletion:
          • simple and reliable in distributed system with constant failure;
          • merge storage reclamation into master regular activities to minimize the cost;
          • safety against accidental, irrevocably deletion;
    • Stale replica detection:

      • chunk version number is maintain in the master;
      • increase the chunk version number when new lease is granted;
      • stale replicas are removed during the garbage collection;
      • master will send the version number to the client along with the primary when the client is asking for a chunk;
  5. Fault tolerance and diagnosis

    • High availability:

      • fast recovery from normal/abnormal termination;
      • chunk replications;
      • master replication:
        • operation logs and checkpoints are replicated on multiple machines;
        • operations is committed only after it flushes to all replicas;
        • master could be replaced by the replicas during failure;
          • replicas serve as read-only shadow masters with the same canonical name as the original master;
    • Data integrity:

      • each chunkserver must independently verify the integrity:
        • comparing between chunkservers is impractical;
        • divergent replicas are legal (not guaranteed to be identical);
      • chunks are broken into 64 KB pieces with 32-bit long checksums;
        • checksums are part of metadata;
        • checksumming has little effect on read operations;
        • update the checksum for the last partial checksum block after append operations;
    • Diagnostic tools:

      • RPC logs with minimal impact on performance;



[1] Ghemawat, Sanjay, Howard Gobioff, and Shun-Tak Leung. “The Google File System.” Proceedings of the Nineteenth ACM Symposium on Operating Systems Principles – SOSP ’03 (2003). Web.