Paper Review: ZooKeeper: Wait-free Coordination for Internet-scale Systems

Paper Review:

Summary:

ZooKeeper is a wait-free coordination service used to construct primitives such as locks, consensus, and group membership for clients. It stores data in a hierarchy of znodes organized like a UNIX file system tree, with optional sequence numbers for ordering. It guarantees FIFO execution of each client's requests and is heavily optimized for fast reads.

Strong points:

  1. It’s just nice to have both synchronous and asynchronous versions of the APIs. Developers can choose whichever fits their requirements on timing or consistency. We also get both a fast read and a slow read (sync followed by read), which keeps things flexible for developers and applications.
  2. Locks are powerful primitives for distributed systems, but they are hard to relax to fit a wider range of applications. ZooKeeper ships no lock primitive, yet it is easy to build a lock service on top of it, and other coordination models are available as well.
  3. During leader election, ZooKeeper clients don’t have to poll a lock and race to become the leader. Instead, the sequential znodes keep clients in order, and the failure of the previous leader wakes only the next client in line without interrupting the others (see the sketch after this list).
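
To make point 3 concrete, here is a minimal Java sketch of the herd-effect-free election recipe, assuming an already-connected ZooKeeper handle and an existing /election parent znode (both names are illustrative):

```java
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class Election {

    // create my ephemeral, sequential election znode once; returns its child name
    static String enter(ZooKeeper zk) throws Exception {
        String path = zk.create("/election/n-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        return path.substring("/election/".length());
    }

    // true if myName has the lowest sequence number; otherwise watch only the
    // immediate predecessor so its failure wakes this client alone
    static boolean checkLeadership(ZooKeeper zk, String myName) throws Exception {
        List<String> children = zk.getChildren("/election", false);
        Collections.sort(children);              // sequential suffixes sort into creation order
        int i = children.indexOf(myName);
        if (i == 0) {
            return true;                         // lowest znode: this client is the leader
        }
        String predecessor = "/election/" + children.get(i - 1);
        Stat s = zk.exists(predecessor, event -> {
            // NodeDeleted fires here for this client only; re-run checkLeadership then
        });
        if (s == null) {
            return checkLeadership(zk, myName);  // predecessor vanished in the meantime: re-check
        }
        return false;                            // wait for the watch to fire
    }
}
```

Only the client watching the deleted predecessor wakes up; everyone else keeps sleeping on its own predecessor, which is exactly what avoids the herd effect.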

Weak points:

  1. Since ZooKeeper is an open-source project aimed at serving a variety of applications, it would be better to give developers more options to choose from. For example, the watch mechanism is basically the same one-time trigger as Chubby’s. If a client creating a znode could register for multiple triggers, or for notifications that carry the updated data, the mechanism would fit more situations.
  2. Even with the ordering guarantees, watches are delivered asynchronously according to (https://zookeeper.apache.org/doc/r3.4.5/zookeeperProgrammers.html#ch_zkWatches). This means a read can be stale not only because of an inconsistent local read from a non-leader server after a write, but also because of a late watch notification. There is no clean fix, since any stronger consistency model would slow reads down dramatically. However, since every znode already carries a version number, why not expose it together with a timestamp so that clients at least get a better sense of the data’s freshness? (A sync-before-read sketch follows this list.)
  3. It looks like the simple locks without herd effect rely on a strict ordering of sequential znodes. A client becomes the leader when it is notified that the znode with the preceding sequence number has gone away. Since sequence numbers increase with requests, doesn’t this mean the leader order is determined by when clients registered with ZooKeeper? That is undesirable in many cases: if the developer wants the client with the best hardware to become leader, the sequence numbers say nothing about hardware. (Wait… can I store a hardware score in each znode’s data and maintain an ordered queue under the group znode? That way developers could configure leader election however they want.)
  4. Load balancing is not discussed. Although all ZooKeeper servers keep identical replicas, clients may connect to any server and fire queries as they like, which can lead to unbalanced request handling and possibly opens the door to malicious abuse. I couldn’t think of a good way to tackle this either, since centralized load balancing would slow down client access and add complexity to the system. Maybe servers that detect a lot of TCP losses/timeouts over a period could redirect some of their distant clients to other servers.
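
On weak point 2, here is a minimal "slow read" sketch with the ZooKeeper Java client, assuming an already-connected handle: sync() makes the connected server catch up with the leader before the read, and the returned Stat at least exposes the version and zxid as a rough freshness indicator, in the spirit of the bounding idea above.

```java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class FreshRead {

    // "slow read": force the connected server to catch up with the leader, then read
    static byte[] freshRead(ZooKeeper zk, String path) throws Exception {
        CountDownLatch synced = new CountDownLatch(1);
        zk.sync(path, (rc, p, ctx) -> synced.countDown(), null);  // sync is async-only in the Java API
        synced.await();

        Stat stat = new Stat();
        byte[] data = zk.getData(path, false, stat);
        // stat.getVersion() / stat.getMzxid() give the caller at least a rough
        // handle on how fresh the returned data is
        return data;
    }
}
```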

 

Paper Notes:

  1. ZooKeeper uses non-blocking primitives because blocking ones, like locks, let slow or faulty clients limit the overall performance of the system, and the required failure detection adds complexity. It is, roughly, a lock-free version of Chubby;
  2. Requests are handled in a pipelined fashion to support thousands of outstanding operations; the FIFO ordering of client operations enables asynchronous operations (multiple consecutive operations without waiting for replies);
  3. Zab for leader-based atomic broadcast protocol;
  4. Read throughput is optimized by 1) servers handling reads locally, 2) no strict ordering for reads, and 3) client-side caching with the watch mechanism;
  5. A tree of directories and data (hierarchical namespace) because it is easy and familiar to users of a standard UNIX file system;
  6. Regular and ephemeral znodes
  7. An increasing sequence number appended to the znode name via the sequential flag
  8. Watches to allow clients to receive timely notifications of changes without polling
    1. the client sends a read with the watch flag set and the server returns the data with a one-time promise to notify the client of any change;
    2. a watch only indicates that a change happened; it does not carry the new data;
  9. Clients and ZooKeeper interact through sessions with timeouts. A session ends when
    1. the client has been inactive for too long
    2. the client closes the session
    3. ZooKeeper detects that the client is faulty
  10. Synchronous and asynchronous APIs
    1. no handles but use direct access to the path instead
    2. update with version number to enable version checking
  11. Two features:
    1. linearizable writes (multiple outstanding operations, but with guaranteed ordering)
    2. FIFO execution of queries
  12. For a leader re-electing scenario:
    1. a lock service would prevent other processes from using old configurations, but it offers no good solution when the new leader changes only part of the configuration and then fails
    2. in ZooKeeper, the new leader deletes the ready znode, makes the configuration changes, and then recreates the ready znode. The configuration changes are issued asynchronously, and no client will use the new configuration until ready reappears (see the sketch after these notes);
    3. ordering is the key that prevents any client from reading the configuration before it is ready;
    4. sync causes a server to apply all pending write requests before processing the read, without issuing an extra write
  13. Write with atomic broadcast and read from any server for better availability
  14. Write-ahead log in disk
  15. Quorum-based algorithm with pipeline optimization
  16. Periodic snapshots allow faster recovery. A snapshot may be inconsistent (fuzzy) because it is taken by a non-blocking depth-first scan of the tree that reads each znode’s data
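
A minimal sketch of the ready-znode pattern from note 12, assuming a connected handle and illustrative paths under /config:

```java
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ReadyZnode {

    // new leader: hide /config/ready, push all configuration updates
    // (asynchronously is fine thanks to per-session FIFO ordering),
    // then re-create /config/ready as the very last write
    static void publishConfig(ZooKeeper zk, List<String> configPaths, byte[] value) throws Exception {
        try {
            zk.delete("/config/ready", -1);                  // -1: ignore the version check
        } catch (KeeperException.NoNodeException ignored) {  // first publication: nothing to hide
        }
        for (String p : configPaths) {
            zk.setData(p, value, -1, (rc, path, ctx, stat) -> { /* async, FIFO-ordered */ }, null);
        }
        zk.create("/config/ready", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    // client: read the configuration only once /config/ready exists; the ordering
    // guarantees mean a client cannot observe /config/ready without also seeing
    // every write that preceded it
    static boolean configIsReady(ZooKeeper zk) throws Exception {
        return zk.exists("/config/ready", true) != null;     // true: leave a watch for later changes
    }
}
```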

 

Paper Review: The Hadoop Distributed File System

Paper Review:

HDFS is heavily inspired by GFS in many ways. However, it does not focus only on append-heavy large files; it offers a more general and flexible way to handle data. Some of its approaches, such as the configurable block placement policy and block size, are great for common users, but they don’t really solve the problems GFS faced seven years earlier. (On the other hand, Colossus apparently overcomes the centralized control/coordination of a single master machine and brings scalability to a new level. At least that’s what they say.)

Strong points:

  1. Compared to GFS, HDFS doesn’t make many assumptions about the data and operations it is likely to handle. The block size is 128 MB by default but remains user-configurable per file, which makes it a more general distributed file system. On the other hand (since I won’t repeat this in the weak points), making no assumptions can lead to under-utilization in many situations: for example, HDFS is likely far less efficient than GFS under massive numbers of concurrent append operations;
  2. The namespace-ID and software-version checks during node startup are really useful for software debugging and upgrades. They prevent uncoordinated snapshots and other possible data-integrity issues.
  3. The hflush operation is really interesting. I was just about to list the packet buffer as a weak point, because buffered writes can be invisible to readers; hflush pushes everything down the pipeline and waits for ACKs, so recent writes become visible on all replicas (see the sketch after this list). And since TCP already takes care of packet ordering, we don’t have to wait for per-packet ACKs when sending a sequence of packets.
  4. The configurable block placement policy is awesome; I’d love to see experiments comparing the performance of different placement policies. In many respects HDFS feels like GFS but more flexible and configurable: block size, rack identification, and this policy interface. (Or maybe I’m just a little biased because this is an open-source project.)
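
A minimal sketch of the hflush behavior from point 3, using the Hadoop FileSystem API; the path is hypothetical and this assumes a configured HDFS client on the classpath:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/hflush-demo.log");   // hypothetical path
        try (FSDataOutputStream out = fs.create(p, true)) {
            out.writeBytes("record-1\n");
            // Without hflush, this data may still sit in the client's packet
            // buffer and be invisible to readers until the file is closed.
            out.hflush();  // push the current packet down the pipeline and wait for ACKs
            // After hflush returns, a concurrent reader opening the file
            // should be able to see "record-1" on all replicas.
        }
        fs.close();
    }
}
```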

Weak points:

  1. Most of GFS’s weak points are repeated here: the centralized coordination of the NameNode can cause unavailability when it is down, and it limits scalability.
  2. The heartbeat interval is three seconds by default, yet the NameNode only considers a DataNode out of service, and the replicas it hosts unavailable, after ten minutes without heartbeats. The two values don’t really match: for a long period (9 min 57 s) the DataNode may be down while the NameNode still assumes it might be alive.
  3. The snapshot is not really effective. While I can’t think of a better way to back up local data on a DataNode, a copy of the current file directories plus hard links to them doesn’t help much: I can only roll the whole storage back to a previous state, not roll individual files back, even with checkpoints. GFS does better here because most files are append-only and can be restored from a list of operations, and GFS also seems to allow local copies of files during snapshots. I know this kind of backup requires a lot of storage, but not offering the option to users at all is definitely a downside.
  4. The balancer runs as an extra application that moves replicas around to even out disk utilization without reducing the number of replicas or the racks used by a block. Unlike GFS, which handles balancing mostly at placement time, HDFS spends extra computation and traffic on rebalancing after the fact: a new block may land on a node with high utilization and then have to be moved to another DataNode. Even if naive utilization-aware placement could overwhelm a newly joined DataNode with traffic, it isn’t hard to design around that, for example with a minimum interval between new placements on a single node (see the sketch after this list). A sensible placement decision beats rebalancing after the placement.
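
A hypothetical sketch of the placement idea in point 4 (not an HDFS API): prefer the least-utilized candidate DataNode, but enforce a minimum interval between new block assignments to any single node so a freshly joined, empty node is not flooded. All names here are illustrative:

```java
import java.util.Comparator;
import java.util.List;

class UtilizationAwarePlacement {

    static class Node {
        String id;
        double diskUtilization;     // fraction of capacity in use, 0.0 - 1.0
        long lastAssignmentMillis;  // when this node last received a new block
    }

    static final long MIN_ASSIGNMENT_INTERVAL_MS = 5_000;

    static Node chooseTarget(List<Node> candidates, long now) {
        return candidates.stream()
                // skip nodes that received a block very recently, so a new
                // (empty) node is filled gradually instead of all at once
                .filter(n -> now - n.lastAssignmentMillis >= MIN_ASSIGNMENT_INTERVAL_MS)
                // among the rest, prefer the least-utilized node
                .min(Comparator.comparingDouble(n -> n.diskUtilization))
                .orElse(null);      // caller falls back to the default policy
    }
}
```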

 

 

Paper Outline:

  1. Introduction and related work:

    • HDFS is the file system component of Hadoop;
    • Stores metadata (on NameNode) and data (on DataNodes) separately;
    • Data is replicated on different machines to:
      • ensure reliability and durability;
      • multiply the data transfer bandwidth;
    • Multiple metadata servers;
  2. Architecture:

    • NameNode:

      • files and directories are represented on the NameNode by inodes:
        • record attributes like permissions, modification and access times, namespace and disk space quotas;
        • files are split into big blocks (typically 128 MB) and replicated on DataNodes (typically three replicas);
      • maintains namespace tree and mapping of file blocks to DataNodes;
      • read request from HDFS client:
        • contact NameNode first for the file locations;
        • read contents from the closest DataNode;
      • write request from HDFS client:
        • nominate a suite of three DataNodes;
        • write to all three in a pipeline fashion;
      • single NameNode for each cluster;
      • image: inode data with a list of block mappings;
        • stored in RAM entirely and replicated as checkpoint in local disk;
        • NameNode also stores modification log of image called journal in local disk;
        • restores the namespace by reading the namespace checkpoint and replaying the journal (see the recovery sketch after this section);
    • DataNodes:

      • each block replica on a DataNode is represented by two files in local disk:
        • the first file contains the data itself;
        • the second file is block’s metadata:
          • checksums and generation stamp;
      • handshake with NameNode during startup;
        • verify namespace ID and software version;
        • namespace ID is assigned to the file system instance when it is formatted. Nodes with different namespace IDs will not be able to join the cluster to preserve the data integrity;
      • register with the NameNode after handshake:
        • persistently store their unique storage IDs (generated if they don’t have one yet);
        • this makes a node recognizable even if it restarts with a different IP address;
      • identifies block replicas and send the block report to NameNode;
        • contains block ids and the generation stamps;
        • the first report is sent immediately after registration and the subsequent reports will be sent on hourly basis to NameNode;
      • DataNode send heartbeats to the NameNode:
        • to confirm its normal operation;
        • default interval is three seconds;
        • the NameNode marks a DataNode out of service after 10 minutes without heartbeats;
        • heartbeats also carry information on storage capacity, fraction of storage in use, and the number of data transfers in progress;
        • NameNode replies to heartbeats with instructions like:
          • replicate blocks to other nodes;
          • remove local block replicas;
          • re-register or to shut down the node;
          • send an immediate block report;
    • HDFS client:

      • supports read, write and delete files; create and delete directories;
      • when an application reads a file:
        • client asks NameNode for the list of DataNodes with replicas;
        • client contacts a DataNode directly for the file;
      • when an application writes a file:
        • client asks NameNode to choose DataNodes to host replicas;
        • client organizes a pipeline from node to node and sends data;
        • client has to contact NameNode for new DataNodes if the previous data block is filled.
      • special APIs:
        • one that exposes the locations of a file blocks;
        • one that changes the replication factor;
    • Image and journal:

      • image describes organization of application data as directories and files:
        • persistent record stored in local disk is called checkpoints;
      • journal is a write-ahead commit log for changes to the file system:
        • must be persistent;
        • every transaction is recorded before the change is committed;
      • checkpoint file:
        • cannot be changed by the NameNode
        • could be replaced with a new checkpoint (derived from old checkpoint and journal) after startup or by CheckpointNode;
      • checkpoint and journal are stored in multiple storage directories:
        • on different volumes and at a remote server;
        • the NameNode batches transactions from different clients so that multiple threads do not bottleneck on separate disk flushes;
    • CheckpointNode:

      • periodically downloads the current checkpoint and journal files from the NameNode, merges them, and returns the new checkpoint;
      • this lets the journal be truncated, which protects it from loss/corruption and shortens restart time;
    • BackupNode:

      • in addition to periodical checkpoint creation, it maintains an in memory image of the namespace that is always synchronized with NameNode;
      • accepts stream of transaction from NameNode and saves them;
        • can create checkpoints without downloading;
        • can be viewed as a read-only NameNode that contains all metadata except for block locations (which is in the memory of NameNode);
    • Upgrades, file system snapshots:

      • only one snapshot can exist;
      • the NameNode first reads the checkpoint and journal files and merges them in memory;
      • NameNode instruct DataNodes to create local snapshot during handshake:
        • which is merely a copy of storage directory;
      • cluster admin can choose to roll back when restarting the system;
        • no option to roll forward;
      • layout version is also compared during node startup;
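
A minimal sketch of the recovery path described under "Image and journal": load the checkpoint into memory, then replay the journal in order. The record types here are hypothetical stand-ins for the real edit-log entries:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NamespaceRecovery {

    record JournalEntry(String op, String path, String arg) {}   // e.g. ("MKDIR", "/a", null)

    static Map<String, String> recover(Map<String, String> checkpoint, List<JournalEntry> journal) {
        Map<String, String> namespace = new LinkedHashMap<>(checkpoint);  // the in-RAM image
        for (JournalEntry e : journal) {             // write-ahead log: every committed change is here
            switch (e.op()) {
                case "MKDIR"  -> namespace.put(e.path(), "dir");
                case "CREATE" -> namespace.put(e.path(), e.arg());
                case "DELETE" -> namespace.remove(e.path());
                default       -> { /* ignore unknown ops in this sketch */ }
            }
        }
        return namespace;  // a new checkpoint can now be written and the journal truncated
    }
}
```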
  3. File I/O operations and replica management

    • File read and write:

      • a HDFS client request a lease to write to a file;
        • renew the lease periodically;
          • if soft limit expires, others can request lease;
          • if hard limit expires, the lease will be revoked;
        • no other clients can write to the same file;
        • lease revoked when the file is closed;
      • writing to a block in pipeline from node to node:
        • 64 KB buffer packets are pushed into pipeline when filled;
        • next packets can be sent without ACKs for the previous packets;
        • HDFS does not guarantee that changes are visible to readers till the file is closed (because of the packet pushing);
        • hflush operation flushes the current packet to the pipeline and returns after ACKed and make the change visible immediately;
      • checksums are used while reading:
        • a DataNode stores checksums in a metadata file separately;
        • client computes checksum upon data receive and verifies with the checksum sent from DataNodes;
      • when a clients opens a file to read:
        • select the closest replica first and to the next closet replica if it fails;
        • HDFS permits read during write;
      • HDFS I/O is particularly optimized for batch processing systems:
        • high throughput for sequential read/write;
        • reduce the read/write response time;
    • Block placement:

      • spread the data across multiple racks, which will also increase the network round-trip time;
      • HDFS allows an administrator to configure a script that returns a node’s rack identification given a node’s address;
        • rack identifications are assigned to DataNode during register by NameNode;
      • HDFS offers a configurable block placement policy interface:
        • by default the first replica is placed on the node where the writer runs, and the remaining two are placed randomly across the racks with some restrictions (a small checker sketch follows this section):
        • No DataNode contains more than one replica;
        • No rack contains more than two replicas of the same block, provided that there are sufficient amount of racks;
      • reading from closest replica reduces the inter-node and inter-rack traffic;
    • Replication management:

      • NameNode detects if a block is over- or under- replicated from block reports:
        • removes blocks (from DataNodes with least amount of available disk space) that are over-replicated;
        • replicates blocks if they are under-replicated:
          • replicates one-replica block first on a different rack;
          • if two replicas are on different racks, the third replication should be placed on one of the two racks to reduce cost;
        • treats the situation where all replicas are on the same rack as under-replicated:
          • create a replica on a different rack;
          • remove one of the three replicas in the original rack;
    • Balancer:

      • HDFS block placement strategy doesn’t take DataNode disk space utilization into account;
        • avoid placing new data into a small set of new nodes;
      • balancer is a tool that balances disk space usage:
        • takes a threshold value as input parameter;
        • deployed as an application program that can be run by admin;
        • iteratively moves replicas from DataNodes with higher utilization to lower ones;
        • guarantees that the decision does not reduce the number of replicas or the number of racks used for a particular block;
        • optimizes the balancing by minimizing the inter-rack traffic;
        • configured to limit the bandwidth consumed by rebalancing;
    • Block scanner:

      • each DataNode runs a block scanner that:
        • periodically scans all replicas and verifies the checksums;
          • client read could help the verification;
        • verification time of each block is stored in a readable file;
        • NameNode marks the replica as corrupt if checksum fails:
          • NameNode starts to replicate a good copy immediately;
          • then the corrupt replica is scheduled for deletion;
          • allows user to retrieve data from the corrupt replicas;
    • Decommissioning:

      • a black list for joining the cluster;
      • once a DataNode is marked as decommissioning:
        • it won’t be selected as replica target;
        • continues to serve read requests;
        • NameNode replicates the blocks on this DataNode;
        • the node then enters the decommissioned state and can be safely removed from the cluster;
    • Inter-cluster data copy:

      • DistCp for large inter/intra-cluster parallel copying:
        • a MapReduce job;
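
A minimal sketch of the two default-placement restrictions listed above (at most one replica per DataNode, at most two per rack when there are enough racks); the Replica type is hypothetical:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PlacementCheck {

    record Replica(String dataNode, String rack) {}

    static boolean satisfiesDefaultPolicy(List<Replica> replicas, int totalRacks) {
        Set<String> nodes = new HashSet<>();
        Map<String, Integer> perRack = new HashMap<>();
        for (Replica r : replicas) {
            if (!nodes.add(r.dataNode())) {
                return false;                     // more than one replica on a single DataNode
            }
            perRack.merge(r.rack(), 1, Integer::sum);
        }
        if (totalRacks >= 2) {                    // only enforced when there are enough racks
            for (int count : perRack.values()) {
                if (count > 2) {
                    return false;                 // more than two replicas on a single rack
                }
            }
        }
        return true;
    }
}
```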
  4. Practice at Yahoo;

  5. Future work:

    • HDFS is effectively unavailable when its NameNode is down;
    • Scalability of the NameNode has been a key struggle;
      • one solution is to let multiple namespaces and NameNodes share the physical storage, but this costs a lot in terms of management.

 

 

Paper Review: The Google File System

Paper Review:

The Google File System supports common file system APIs along with an append operation. It ensures data redundancy and integrity by maintaining several replicas on chunkservers, which are coordinated by a single master machine. The master backs up all operations, logs, and checkpoints regularly, both locally and remotely.

Strong points:

  1. Separating the data flow from the control flow during replication is brilliant: control has a centralized, star-shaped structure (master to primary to secondary replicas), while the much larger data flow is pushed along a chain/pipeline to make full use of each machine’s network bandwidth (a rough back-of-the-envelope comparison follows this list);
  2. The concept of master replication and shadow masters is great, giving a really high level of safety and redundancy. Local backup of master data alone is not enough for critical metadata, and this handles failure of the centralized master nicely. Moreover, the metadata is kept small to make replication more efficient;
  3. The simplicity of the design (at least at a glance of the overall structure) is just stunning. There are no complex algorithms in this file system, yet each potential problem is handled elegantly. For example, I can imagine how much trouble a fully distributed file system like xFS would have without a centralized server for coordination.
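
A rough back-of-the-envelope comparison for point 1, using the paper’s estimate that pushing B bytes through a pipelined chain of R replicas takes about B/T + R*L; the star-push estimate and the concrete numbers below are my own assumptions:

```java
public class TransferTimeEstimate {
    public static void main(String[] args) {
        double B = 64.0 * 1024 * 1024;   // bytes to replicate (one 64 MB chunk)
        double T = 100e6 / 8;            // assumed network throughput: 100 Mbps, in bytes/s
        double L = 0.001;                // assumed per-hop latency: 1 ms
        int R = 3;                       // number of replicas

        // pipelined chain (paper's estimate): each server forwards while it receives
        double chain = B / T + R * L;
        // naive star push (my assumption): the sender's uplink is shared by R streams
        double star = R * B / T;

        System.out.printf("chain ~ %.2f s, star ~ %.2f s%n", chain, star);
    }
}
```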

Weak points:

  1. The design makes a lot of assumptions about the data this particular file system will handle. The optimizations focus on large files and append operations, which means small files and random writes are not well supported in GFS.
  2. When constructing the chain-structured data flow during replication, “finding the closest IP” is not a sophisticated way to order the chain of servers. A close IP address does not always mean close in the network, and certainly does not imply faster data transfer. I would choose router-supported multicast for transfers within the same data center and a better network-distance estimate (such as historical latency and transfer speed) for transfers between data centers (a toy chain-ordering sketch follows this list).
  3. Bottleneck effect of the master:
    • clients have to communicate with the master before data access;
    • the in-memory data structures are limited in size;
    • master failure does affect system performance even with shadow masters;
  4. Since we already have several machines for master replication, why don’t we make the master a little more distributed? For example, clients issuing read requests could ask a master replica for the primary instead of the master itself, to spread the bandwidth.
  5. The client will repeat the mutation if any replica fails, which means all the data is transferred again; this wastes time and bandwidth and creates duplicate records. Could the primary be made responsible for secondary failures instead? Say we have three replicas: P (primary) and S1, S2 (secondary). Normally P sends the control message to S1 and S2, and data flows from P to S1 and then to S2. If S2 fails, P could tell S1 to retry the S1-to-S2 transfer for several attempts. This would save bandwidth and avoid duplication as well. (Without the duplication, all the replicas would be identical, checksumming becomes easier, and data integrity is stronger.)
  6. Stale replicas are detected during the master’s regular scan and removed in the garbage-collection phase. However, since most files are append-only and we have version numbers, stale replicas could easily be reused to save the cost of re-replication.
  7. Concurrent write operations leave regions undefined. (Maybe solve this with an extra layer between clients and the master, so concurrent operations are rearranged there and it looks as if a single client issues them serially?)
  8. What if the master and the primary are both stale? In that case the client would accept a stale replica; I think a Paxos-like voting phase before the master replies to the client would solve it. Actually no: the master and the primary can never both be stale. The master refreshes all metadata upon startup, so as long as it hasn’t crashed it always has the up-to-date version number.
  9. Checksums on each chunkserver are definitely necessary, but isn’t it overhead to checksum on every request? If there is a storage hotspot on a single chunk, the chunkserver keeps verifying the same data over and over. Since version numbers already protect against stale data, we could check integrity the first time data is read and then skip checksumming for the verified part: as long as it has the latest version number, it must be correct. Actually no: hotspots are rare, and data corruption can still happen after the data is stored on disk. Since checksumming has little effect on read performance, we might as well leave it in.
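
A toy sketch of the alternative suggested in point 2: order the replication chain by measured historical latency (nearest unvisited neighbor) instead of closest IP. Everything here is hypothetical, not a GFS interface:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class LatencyChain {

    // latencyMs maps "from->to" to a historical latency estimate in milliseconds
    static List<String> buildChain(String client, List<String> replicas,
                                   Map<String, Double> latencyMs) {
        List<String> chain = new ArrayList<>();
        List<String> remaining = new ArrayList<>(replicas);
        String current = client;
        while (!remaining.isEmpty()) {
            String next = remaining.get(0);
            for (String candidate : remaining) {          // pick the nearest unvisited hop
                if (latencyMs.getOrDefault(current + "->" + candidate, Double.MAX_VALUE)
                        < latencyMs.getOrDefault(current + "->" + next, Double.MAX_VALUE)) {
                    next = candidate;
                }
            }
            chain.add(next);
            remaining.remove(next);
            current = next;
        }
        return chain;  // data is pushed client -> chain.get(0) -> chain.get(1) -> ...
    }
}
```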

 

Paper Outline:

  1. Intro to the problem:

    • Component failures are norm rather than exceptions;
    • Files are huge by traditional standards;
    • Files are mutated by appending rather than overwriting;
    • Co-designing the applications and FS APIs benefits the overall system;
  2. Design overview:

    • Interface:

      • create, delete, open, close, read, write;
      • snapshot, record append;
    • Architecture:

      • master and multiple chunkservers structure;
      • master keeps all the file metadata;
      • the master periodically communicates with chunkservers via HeartBeat messages;
    • Single master:

      • sophisticated chunk placement and replication decision using global knowledge;
      • might become the bottleneck of the system, so the master’s involvement in reads/writes is minimized:
        • no more communication with the master once the chunk is located (see the chunk-index sketch after this section);
        • multiple chunks can be requested in the same message;
    • Chunk size:

      • 64 MB in size;
      • pros:
        • reduce the client-master interaction;
        • reduce the network overhead by keeping persistent TCP connection;
        • reduce the size of metadata stored on the master;
      • cons:
        • wasting space (maybe?);
        • small files with few chunks may become hot spots (could be mitigated with a higher replication factor or by allowing clients to read data from other clients);
    • Metadata:

      • three major types:
        • the file and chunk namespaces (in memory, on local disk, and remote);
        • the mapping from files to chunks (in memory, on local disk, and remote);
        • the locations of each chunk’s replicas (in memory only);
          • the master does not store them persistently but asks the chunkservers upon startup and other changes.
      • in-memory data structure
        • periodical fast scanning for the entire state (for garbage collection, re-replication upon chunkserver failures and chunk migration);
        • the system capacity is  limited by the master’s memory;
          • not a serious issue because memory consumption is small and extra memories are cheap.
      • chunk locations:
        • obtained and updated with monitoring all the chunkservers’ states with regular HeartBeat messages;
        • chunkservers have the final word about chunk locations, so there is no point trying to maintain a consistent view on the master;
      • operation log:
        • historical record of critical metadata changes;
        • defines the order of concurrent operations with logical time;
        • local and remote replications:
          • the master flushes the log record locally and remotely before answering clients;
          • several log flushes are batched together to reduce their impact on throughput;
        • master recovers by replaying the logs:
          • checkpoints its state when logs grow beyond certain size;
          • load the latest checkpoint and then replay the subsequent logs to reduce the startup time;
    • Consistent model:

      • Guarantees by GFS:
        • consistent: all clients will always see the same data regardless of which replicas they read from;
        • defined: consistent and all clients will always see what the mutation writes in its entirety;
        • a serial successful write is defined (operations are applied to replicas in the same order, and outdated replicas are detected and collected);
        • concurrent successful write operation is consistent but undefined;
        • record append is defined;
        • client might access stale data because chunk location is cached on the client side.
          • this is limited due to the cache size;
          • most of the files are append-only so stale replica usually returns a premature end of chunk;
        • data corruption is detected with regular handshakes and check-summing  between master and all chunkservers;
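
A minimal sketch of how a client turns a byte offset into the chunk index it sends to the master (which replies with the chunk handle and replica locations that the client then caches); the class and method names are mine:

```java
public class ChunkIndex {
    static final long CHUNK_SIZE = 64L * 1024 * 1024;   // fixed 64 MB chunk size

    // the chunk index included in the client's request to the master
    static long chunkIndexFor(long byteOffset) {
        return byteOffset / CHUNK_SIZE;
    }

    // where the read/write starts inside that chunk on the chunkserver
    static long offsetWithinChunk(long byteOffset) {
        return byteOffset % CHUNK_SIZE;
    }

    public static void main(String[] args) {
        long offset = 200L * 1024 * 1024;                // read starting 200 MB into the file
        System.out.println(chunkIndexFor(offset));       // chunk index 3
        System.out.println(offsetWithinChunk(offset));   // 8 MB into that chunk
    }
}
```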
  3. System interactions:

    • Leases and mutation order:

      • master grants a chunk lease to one replica as primary;
      • primary defines the mutation order to all;
      • lease mechanism:
        • initial timeout of 60 secs;
        • could be extended indefinitely with HeartBeat messages;
        • could be revoked by master;
      • lease process:
        • client asks the master which chunkserver holds the lease;
        • master returns the primary and secondary replicas;
        • client pushes the data to all replicas in any order;
        • client sends write request to primary if all replicas ACked;
        • the primary forwards the write request to all secondary replicas in the same serial order;
        • the secondary replicas reply to the primary upon completion (node failures may cause inconsistency here; it is handled with a few more attempts before falling back to the client);
        • the primary replies to client (with errors or not);
      • a “large” write will be divided into small ones and could be interleaved with or overwritten by concurrent operations.
    • Data flow:

      • the control flow is from client to primary and then to secondaries; but the data is pushed linearly along a chain of chunkservers in a pipelined fashion (to fully utilize the bandwidth of each machine);
    • Atomic record appends:

      • only data is specified (not the offset) and GFS guarantees to append the data at least once;
      • the primary replica is responsible for checking whether the append would push the chunk past its maximum size;
        • if so, it pads the current chunk (and tells the secondary replicas to do the same) and then tells the client to retry on the next chunk (see the sketch after this section);
      • the client retries the operation if any replica fails, which may result in duplicate records;
    • Snapshot:

      • the master revokes leases to ensure that any subsequent write requires an interaction with the master;
      • the master logs the snapshot operation to local disk and then applies the log record to its in-memory state;
      • the next request from clients will first ask the master about primary;
      • chunk data is copied locally (copy-on-write) so subsequent operations go to new chunks and the snapshotted data is preserved;
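
A minimal sketch of the record-append decision made by the primary, as described above: if the record does not fit, pad the chunk (and have the secondaries pad too) and make the client retry on the next chunk, which is where the at-least-once / possible-duplicate behavior comes from. All types here are hypothetical:

```java
class RecordAppendPrimary {
    static final long CHUNK_SIZE = 64L * 1024 * 1024;

    long currentChunkUsed;  // bytes already written to the current chunk

    enum Result { APPENDED, RETRY_ON_NEXT_CHUNK }

    Result append(byte[] record) {
        if (currentChunkUsed + record.length > CHUNK_SIZE) {
            long padding = CHUNK_SIZE - currentChunkUsed;
            padChunk(padding);                    // pad the current chunk locally...
            tellSecondariesToPad(padding);        // ...and instruct secondaries to do the same
            currentChunkUsed = 0;                 // the next append goes to a fresh chunk
            return Result.RETRY_ON_NEXT_CHUNK;    // client retries; retries may leave duplicates
        }
        writeAtOffset(currentChunkUsed, record);
        forwardToSecondaries(currentChunkUsed, record);  // same offset on every replica
        currentChunkUsed += record.length;
        return Result.APPENDED;
    }

    // stand-ins for the real replica I/O and inter-server RPCs
    void padChunk(long bytes) {}
    void tellSecondariesToPad(long bytes) {}
    void writeAtOffset(long offset, byte[] record) {}
    void forwardToSecondaries(long offset, byte[] record) {}
}
```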
  4. Master operation:

    • Namespace management and locking:

      • locks over namespace regions allow multiple master operations to run simultaneously;
      • files are identified by full pathnames, and r/w operations acquire locks along the directory path;
      • it allows concurrent mutations in the same directory;
    • Replica placement

      • maximize the data reliability and availability;
      • maximize network bandwidth utilization;
    • Creation, re-replication, rebalancing:

      • factors to consider upon chunk creation:
        • on chunkservers with lower disk utilization;
        • limit the number of recent creation on each chunkserver;
        • spread replicas across racks;
      • re-replication priority factors:
        • how far it is from the re-replication goal;
        • chunks of live files over the recent deleted files;
        • chunks that are blocking client progress;
      • rebalancing happens periodically and gradually;
    • Garbage collection:

      • mechanism:
        • rename and hide the file;
        • remove the hidden file, orphan chunks and the metadata during master’s regular scan;
      • discussion:
        • garbage is easily identified with chunk handle mappings;
        • advantage over eager deletion:
          • simple and reliable in distributed system with constant failure;
          • merge storage reclamation into master regular activities to minimize the cost;
          • safety against accidental, irrevocably deletion;
    • Stale replica detection:

      • the chunk version number is maintained by the master;
      • the chunk version number is increased whenever a new lease is granted;
      • stale replicas are removed during garbage collection;
      • the master sends the version number to the client along with the primary’s identity when the client asks for a chunk (see the sketch below);
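
A minimal sketch of stale-replica detection from the chunkservers’ reports: any replica reporting a version lower than the master’s is scheduled for garbage collection, and a higher reported version means the master lagged (e.g. it failed after granting a lease) and adopts it. Types here are hypothetical:

```java
import java.util.List;
import java.util.Map;

class StaleReplicaDetection {

    record ReportedReplica(String chunkHandle, String chunkserver, long version) {}

    // masterVersions: chunk handle -> current version number kept by the master
    static void process(Map<String, Long> masterVersions,
                        List<ReportedReplica> report,
                        List<ReportedReplica> garbage) {
        for (ReportedReplica r : report) {
            long current = masterVersions.getOrDefault(r.chunkHandle(), r.version());
            if (r.version() < current) {
                garbage.add(r);                                   // stale: removed during regular GC
            } else if (r.version() > current) {
                masterVersions.put(r.chunkHandle(), r.version()); // master lagged behind: adopt
            }
        }
    }
}
```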
  5. Fault tolerance and diagnosis

    • High availability:

      • fast recovery from normal/abnormal termination;
      • chunk replications;
      • master replication:
        • operation logs and checkpoints are replicated on multiple machines;
        • an operation is committed only after its log record has been flushed to all replicas;
        • master could be replaced by the replicas during failure;
          • replicas serve as read-only shadow masters with the same canonical name as the original master;
    • Data integrity:

      • each chunkserver must independently verify the integrity:
        • comparing between chunkservers is impractical;
        • divergent replicas are legal (not guaranteed to be identical);
      • chunks are broken into 64 KB blocks, each with a 32-bit checksum (see the checksum sketch after this outline);
        • checksums are part of metadata;
        • checksumming has little effect on read operations;
        • update the checksum for the last partial checksum block after append operations;
    • Diagnostic tools:

      • RPC logs with minimal impact on performance;
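
A minimal sketch of the 64 KB-block checksumming described under "Data integrity", using CRC32 as a stand-in for the actual checksum function; only the blocks overlapping the requested read range are verified:

```java
import java.util.zip.CRC32;

public class BlockChecksums {
    static final int BLOCK = 64 * 1024;   // 64 KB checksum blocks

    static int checksumOf(byte[] chunk, int block) {
        CRC32 crc = new CRC32();
        int off = block * BLOCK;
        int len = Math.min(BLOCK, chunk.length - off);
        crc.update(chunk, off, len);
        return (int) crc.getValue();      // keep the low 32 bits
    }

    // verify only the blocks that overlap the requested read range; on a mismatch
    // the chunkserver would return an error and report the corruption to the master
    static boolean verifyRange(byte[] chunk, int[] storedChecksums, int readOff, int readLen) {
        int first = readOff / BLOCK;
        int last = (readOff + readLen - 1) / BLOCK;
        for (int b = first; b <= last; b++) {
            if (checksumOf(chunk, b) != storedChecksums[b]) {
                return false;             // corrupt block detected
            }
        }
        return true;
    }
}
```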

 

Citations:

[1] Ghemawat, Sanjay, Howard Gobioff, and Shun-Tak Leung. “The Google File System.” Proceedings of the Nineteenth ACM Symposium on Operating Systems Principles (SOSP ’03), 2003.