Pig is a scripting language built on top of MapReduce that tries to save data scientists from the control-flow MapReduce programs they struggle with. It combines SQL-style querying with a procedural language, compiling highly abstract data-flow programs into MapReduce executions, so developers don’t have to worry much about parallelism, fault tolerance, pipelining, etc.
- Pig combines a SQL-like query language with low-level MapReduce (high-level expressions over low-level manipulations), which grants it some benefits of both. Programmers can easily write sequential (rather than declarative) programs without worrying about database layout or MapReduce parallelism.
- Pig is flexible in many ways, supporting varied data models and user-defined functions in Java (and, in later versions, Python). You can specify the control flow of a program at each step, just like in any other procedural language, while focusing on the data flow at the same time.
- Pig also differs from relational databases and OLAP systems: it can deal with large unstructured datasets and nested data structures. At the same time, it enjoys parallelism just like a parallel database, thanks to the internal MapReduce compilation.
- Each step is pipelined and optimized for execution. And since it is based on MapReduce, all the fault tolerance, scalability, and other MR features come for free; as a developer you don’t have to worry much about the lower levels of the stack.
- First of all, Pig programs are still compiled into MapReduce, which is undoubtedly inefficient and inflexible. Data must be materialized and replicated on distributed storage between successive MapReduce steps, and this makes things much slower even with pipelining. Bandwidth can also become an issue when pipelined Pig programs are running: at the end of each step, all the intermediate data is transferred over the network almost simultaneously.
- The execution order is not necessarily the sequential order written in the Pig program. This is also mentioned in the “future work” section of the paper: some execution orders will be modified for better performance. While this is a good feature in most scenarios, it can be dangerous and hard to debug and fix in some cases.
- The flexibility can be a bit of a challenge for developers. For example, only a small set of parallelized primitives is included in Pig, and users are responsible for the rest of the implementation as well as its efficiency. I can imagine it would be a pain to develop an efficient UDF without support from the community.
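To make the compilation story concrete, here is a toy Python sketch (my own simplification, not Pig’s actual runtime) of what a Pig data-flow program boils down to: each stage is a map/shuffle/reduce pass whose output is materialized before the next stage consumes it.

```python
from itertools import groupby
from operator import itemgetter

def run_mapreduce(records, mapper, reducer):
    """One simplified MapReduce stage: map, shuffle (sort + group), reduce.
    In real Pig/Hadoop the output would be materialized to HDFS between stages."""
    mapped = [kv for rec in records for kv in mapper(rec)]
    mapped.sort(key=itemgetter(0))  # the "shuffle" phase
    return [out
            for key, group in groupby(mapped, key=itemgetter(0))
            for out in reducer(key, [v for _, v in group])]

# Stage 1: word count (roughly GROUP ... FOREACH ... COUNT in Pig Latin)
counts = run_mapreduce(
    ["a b a", "b c"],
    mapper=lambda line: [(w, 1) for w in line.split()],
    reducer=lambda word, ones: [(word, sum(ones))])

# Stage 2: a second MR job over stage 1's materialized output,
# e.g. keeping only words seen more than once (a FILTER step)
frequent = run_mapreduce(
    counts,
    mapper=lambda kv: [kv],
    reducer=lambda word, cs: [(word, c) for c in cs if c > 1])
```

The handoff between `counts` and `frequent` is exactly where a real Pig pipeline pays the materialization and network cost complained about above.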
Druid is a real-time analytical datastore with a column-oriented layout and different types of nodes supporting different functionality. It depends on deep storage, MySQL, and Zookeeper for persistent storage, metadata, and coordination respectively, and can handle queries faster than MySQL and Hadoop.
- The historical node tiers offer flexibility and better performance. Nodes within the same tier share an identical configuration, and tiers are divided according to the “importance” of their data: a tier with more cores and memory is suitable for frequently accessed data. Although the tiers seem to be configured manually, this is still a good way to take full advantage of limited hardware resources.
- Zookeeper and MySQL outages do not impact current data availability on historical nodes. With cached data, historical nodes can still serve their current queries, and the caching also saves time by eliminating unnecessary accesses to deep storage. However, the coordinator nodes might become unable to perform any operation.
- Druid uses column-oriented storage, similar to Dremel. Column storage is more efficient for analytics since only the useful columns are actually loaded and scanned; in row-based storage, all the data in each row has to be loaded, which results in significantly longer running times.
- A Druid cluster consists of different types of nodes, each responsible only for specific kinds of tasks. While this makes for a simple design, load balancing between the different roles can be an issue and makes the system inflexible in certain scenarios. If a system ingests a flood of events during the day and all the queries arrive at night, Druid still has to separate the roles of real-time nodes and historical nodes, which means only part of the system is working at any time of day while the rest of the machines (either the real-time nodes or the historical nodes) sit idle and are wasted.
- I think Druid has the same downside as Hadoop MapReduce, in the sense that all the “intermediate results” are saved on some remote disk. Just as MapReduce has to load intermediate results from disk and transfer them over the network, Druid saves batch data in deep storage, a distributed file system, and all processing requires loading the data remotely. This adds another layer of delay and dependency. I wonder if the “store” and the “processor” could live on the same machine so that we could save the loading and transferring.
- The coordinator nodes could be redundant since we already have Zookeeper. Zookeeper is certainly capable of telling the historical nodes what data to load and replicate, and it would not be hard to build simple load balancing on top of it. On the other hand, the historical nodes might suffer a short outage during the leader election of the coordinator nodes. So to me it doesn’t really make sense to add coordinator nodes to the system.
- The overall design doesn’t fit the philosophy of the Druid class in role-playing games at all. Yes, Druid (in this article) is capable of handling complex data analysis, but the design is messy and inelegant since it solves the problem with different types of nodes and dependencies. The Druid class, in the game, can transform into different animals to tackle different problems; it’s flexible and independent. So I guess the “real Druid design” of the system would be a single kind of node that is programmed to handle all the problems in different ways.
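A tiny illustration of the column-oriented point above, with made-up data (not Druid’s actual segment format): an aggregate over one column only has to touch that column’s array, while a row store must walk every field of every row.

```python
# Row layout: every query touches whole rows.
rows = [{"ts": 1, "page": "a", "clicks": 3},
        {"ts": 2, "page": "b", "clicks": 5},
        {"ts": 3, "page": "a", "clicks": 2}]

# Column layout: one contiguous array per column.
columns = {name: [r[name] for r in rows] for name in rows[0]}

def sum_clicks_row_store(rows):
    # must load every row in full, including "ts" and "page"
    return sum(r["clicks"] for r in rows)

def sum_clicks_column_store(columns):
    # touches only the "clicks" array; other columns are never loaded
    return sum(columns["clicks"])
```

Both return the same answer, but the column version scans a third of the data in this three-column example — the gap only widens with wider tables.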
Cassandra is another fully distributed data store, pretty much like Amazon’s Dynamo. It also works with consistent hashing but differs in the load balancing part. It has more flexible data types and replication methods than Dynamo and is one of the best non-relational data stores in the world.
(pretty much every advantage/disadvantage of Dynamo could be listed here so I’m just going to focus on the minor differences they have)
- Column indices for faster lookups. This is a great optimization for a wide-column data store, since it prevents a column-by-column search during data retrieval.
- The load balancing scheme is more tractable and deterministic because we are no longer assigning virtual nodes to machines. Instead, we analyze the load and performance of each machine and move data from node to node to balance the distribution. This means that (1) the load distribution is observation-based, so the balancing result should be better than virtual-node assignment, and (2) the balancing is dynamic and can react to run-time imbalances in load.
- While Cassandra is a fully distributed system, the use of Zookeeper really makes things simple and elegant. For example, the replica placement is chosen by a leader elected via Zookeeper rather than through a consensus algorithm among all nodes. This way replication requires less coordination and can substantially reduce the amount of message exchange and the overall latency.
- “Cassandra morphs all writes to disk into sequential writes thus maximizing disk write throughput.” I guess this means that concurrent write operations are ordered by their timestamps, with the ordering agreed on by all replicas through consensus or a leader’s decision. Either way, this kind of serialization could end up with an ordering different from the real-world one.
- Cassandra is a NoSQL database, a model that has proven more flexible and scalable (just like Dynamo and Bigtable) than relational databases. However, NoSQL databases require more attention to the storage structure, and the data can be hard to import/export without standards and proper query language support.
- I’m just nitpicking here, but the load balancing could be potentially problematic. It requires periodic background scanning of loads, which consumes time, bandwidth, and computation power, and a recovered lightly loaded node could be assigned too much data in a short span of time, possibly resulting in a node crash and/or network jam.
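The observation-based scheme could be sketched roughly like this. It is a deliberate oversimplification — the paper does not spell out the node-movement logic, and `rebalance` is my own toy helper that just shifts load from the heaviest node to the lightest one.

```python
def rebalance(loads):
    """Toy sketch of observation-based balancing: move the most lightly
    loaded node next to the most heavily loaded one on the ring, so it
    takes over roughly half of the load gap. `loads`: {node: load}."""
    heavy = max(loads, key=loads.get)
    light = min(loads, key=loads.get)
    if heavy == light:  # single node, or already perfectly balanced
        return loads
    transferred = (loads[heavy] - loads[light]) / 2
    return {**loads,
            heavy: loads[heavy] - transferred,
            light: loads[light] + transferred}
```

Even this toy version shows the failure mode flagged above: the lightest node absorbs half the gap in one move, so a freshly recovered (near-zero-load) node gets hit with a large transfer all at once.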
- A table is a distributed multi dimensional map indexed by a key;
- The value is highly structured:
- row key is a string with no size limit;
- every operation under a single row key is atomic per replica;
- columns are grouped together to form column families;
- super column families are column families within column families;
- the sort order within a column family is application-specified;
- insert(table, key, rowMutation);
- get(table, key, columnName);
- delete(table, key, columnName);
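The data model and API above can be mimicked with nested maps. This `ToyColumnStore` is my own sketch, not Cassandra’s actual API; it ignores super column families, application-specified sort order, and replication, and assumes the `family:column` naming convention for `columnName`.

```python
from collections import defaultdict

class ToyColumnStore:
    """Minimal sketch of the data model: a table maps a row key to
    column families, each of which maps column names to values."""
    def __init__(self):
        self.tables = defaultdict(lambda: defaultdict(dict))

    def insert(self, table, key, row_mutation):
        # row_mutation: {column_family: {column: value}}
        for cf, cols in row_mutation.items():
            self.tables[table][key].setdefault(cf, {}).update(cols)

    def get(self, table, key, column_name):
        cf, col = column_name.split(":")  # "family:column" convention
        return self.tables[table][key][cf][col]

    def delete(self, table, key, column_name):
        cf, col = column_name.split(":")
        self.tables[table][key][cf].pop(col, None)
```

A real implementation would make each `insert` under a single row key atomic per replica, as the notes above require; a plain in-process dict gets that for free only because there is no concurrency here.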
- uses order-preserving consistent hashing to partition data across the cluster;
- the data is assigned onto a ring structure of the hash output;
- walk clockwise and find the first data node as coordinator;
- main principle of this design:
- the leaving/joining of a data node only affects its immediate neighbors;
- random assignment causes non-uniform data and load distribution;
- oblivious to the heterogeneity in the performance of nodes;
- one fix is virtual nodes, like the ones in Dynamo;
- analyze load info on the ring and move lightly loaded nodes on the ring to alleviate heavily loaded nodes. This makes the design and implementation very traceable and helps to make very deterministic choices about load balancing;
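A minimal sketch of the ring walk described above. The hash function and ring size are arbitrary choices of mine, not Cassandra’s, and `find_coordinator` is a hypothetical helper.

```python
import hashlib

RING_SIZE = 2**32  # size of the circular hash space (an arbitrary choice here)

def ring_position(key):
    # hash a key or node identifier onto the ring
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % RING_SIZE

def find_coordinator(position, node_positions):
    """Walk clockwise from `position`; the first node at or after it
    coordinates the key, wrapping around at the top of the ring.
    node_positions: {node_name: ring position}."""
    clockwise = sorted(node_positions.items(), key=lambda np: np[1])
    for node, pos in clockwise:
        if pos >= position:
            return node
    return clockwise[0][0]  # wrapped past the largest position
```

The wrap-around branch is also why a node’s departure only affects its clockwise neighbor: every key it coordinated simply falls through to the next node on the ring.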
- N as replication factor is configured per-instance;
- coordinator of the data is responsible for replication:
- rack unaware:
- pick the next N-1 data nodes on the ring as replicas;
- rack aware/datacenter aware:
- leader election by Zookeeper;
- makes a concerted effort to maintain the replication;
- metadata backed up at each node and Zookeeper;
- each node is aware of every other node in the system:
- durability guarantees by relaxing the quorum requirements;
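Rack-unaware placement is simple enough to sketch. `replicas_for` is a hypothetical helper of mine that just walks the ring clockwise from the coordinator; the rack-aware and datacenter-aware policies would add placement constraints on top of this.

```python
def replicas_for(coordinator, ring_order, n):
    """Rack-unaware replica placement: the coordinator plus the next
    N-1 distinct nodes walking clockwise around the ring.
    `ring_order`: list of node names sorted by ring position."""
    i = ring_order.index(coordinator)
    return [ring_order[(i + j) % len(ring_order)] for j in range(n)]
```
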
- cluster membership is based on Scuttlebutt gossip;
- failure detection:
- Accrual Failure Detection: not an up/down status but a suspicion level for each monitored node;
- every node in the system maintains a sliding window of inter-arrival times of gossip messages from other nodes;
- the distribution of these inter-arrival times is determined and a suspicion level is calculated from it;
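A stripped-down accrual detector might look like the sketch below. I am assuming an exponential model of heartbeat intervals for simplicity; the actual detector fits the observed inter-arrival distribution, and the class name and window size are my own.

```python
import math

class AccrualDetector:
    """Sketch of an accrual failure detector: instead of a binary up/down
    verdict, it emits a suspicion level (phi) computed from a sliding
    window of heartbeat inter-arrival times."""
    def __init__(self, window=100):
        self.intervals = []
        self.window = window
        self.last_arrival = None

    def heartbeat(self, now):
        # record the inter-arrival time into the sliding window
        if self.last_arrival is not None:
            self.intervals.append(now - self.last_arrival)
            self.intervals = self.intervals[-self.window:]
        self.last_arrival = now

    def phi(self, now):
        """Suspicion level: grows the longer we go without a heartbeat,
        scaled by the mean interval observed so far."""
        if not self.intervals:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_arrival
        # under the exponential model, P(silence this long) = e^(-elapsed/mean),
        # and phi = -log10 of that probability
        return (elapsed / mean) * math.log10(math.e)
```

The caller picks a phi threshold rather than a fixed timeout, which is the whole point: the suspicion level adapts to how chatty the monitored node normally is.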
- when a new node is joining:
- randomly pick a token on the ring;
- position backed up on local disk and Zookeeper;
- position info is gossiped around the cluster;
- read the configuration file (a list of a few contact points);
- manual startup and joining in to restore a failed node;
- scaling the cluster:
- new node should alleviate a heavily loaded node;
- data streams using kernel-kernel copy technique;
- local file system for data persistence;
- typical write operation involves:
- a write into a commit log for durability and recoverability;
- an update into an in-memory data structure after;
- dump in-memory data into local disk when it reaches threshold;
- write is sequential and indexed for easy lookup;
- periodically merging files;
- read operations:
- read memory before disk;
- bloom filter for key checking before read;
- maintain column indices to jump to the right chunk;
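The write and read paths above can be sketched in a few lines. `ToyStorageEngine` is my own stand-in: a list of dicts plays the role of the commit log and the flushed disk files, and bloom filters, column indices, and compaction are all omitted.

```python
class ToyStorageEngine:
    """Sketch of the write/read path: append to a commit log for
    durability, update an in-memory table, flush to a 'disk' file when
    a threshold is reached; reads check memory before 'disk'."""
    def __init__(self, flush_threshold=2):
        self.commit_log = []        # stand-in for the durable commit log
        self.memtable = {}          # the in-memory data structure
        self.sstables = []          # each flush produces one sorted file
        self.flush_threshold = flush_threshold

    def write(self, key, value):
        self.commit_log.append((key, value))  # 1. durability first
        self.memtable[key] = value            # 2. then the in-memory update
        if len(self.memtable) >= self.flush_threshold:
            # 3. dump to 'disk' (sorted, as a real SSTable would be) and reset
            self.sstables.append(dict(sorted(self.memtable.items())))
            self.memtable = {}

    def read(self, key):
        if key in self.memtable:              # memory before disk
            return self.memtable[key]
        for table in reversed(self.sstables): # newest flushed file first
            if key in table:
                return table[key]
        return None
```

In the real engine, the per-file membership check is where the bloom filter sits — it lets a read skip flushed files that cannot contain the key without touching disk at all.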
- Cassandra process on a single machine consists of:
- partitioning module;
- cluster membership and failure detection module;
- storage engine module;
- modules rely on an event driven substrate where the message processing and task pipelines are split into multiple stages along the line of SEDA;
- system control messages are over UDP while the application related messages are over TCP;
- states for read/write request:
- identify the nodes that own the data for the key;
- route the request to the nodes and wait on response;
- return failure to the client on timeout;
- figure out the latest response based on timestamp;
- schedule a repair of the data if necessary;
- purging commit log;
- fast sync mode: write to buffered log with potential risk of data loss;
- morphs all writes to disk into sequential writes and lockless to maximize the write throughput;
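The read states above, minus the routing, reduce to picking the freshest timestamped response and noting which replicas need repair. `handle_read` is a hypothetical helper of mine over in-memory replica maps, not Cassandra’s code.

```python
def handle_read(key, replicas):
    """Gather each replica's (timestamp, value) for `key`, return the
    freshest value, and list the stale replicas to schedule for repair.
    `replicas`: {replica_name: {key: (timestamp, value)}}."""
    responses = {name: store[key] for name, store in replicas.items()}
    latest_ts, latest_val = max(responses.values())  # compares timestamps first
    stale = [name for name, (ts, _) in responses.items() if ts < latest_ts]
    return latest_val, stale
```
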
First post. Good stuff coming soon (or not).