Notes on "Designing Data-Intensive Applications" by Martin Kleppmann

Thomas S. Repantis

Chapter 1: Reliable, Scalable, and Maintainable Applications
Chapter 2: Data Models and Query Languages
Chapter 3: Storage and Retrieval
Chapter 4: Encoding and Evolution
Chapter 5: Replication
Chapter 6: Partitioning
Chapter 7: Transactions
Chapter 8: The Trouble with Distributed Systems
Chapter 9: Consistency and Consensus
Chapter 10: Batch Processing
Chapter 11: Stream Processing
Chapter 12: The Future of Data Systems

Chapter 1: Reliable, Scalable, and Maintainable Applications
Data-intensive: Amount of data, complexity of data, speed at which data is changing. Reliability: Tolerating hardware & software faults, human error. Scalability: Measuring load & performance, latency percentiles, throughput. Maintainability: Operability, simplicity, evolvability. Umbrella term "data systems", because the boundaries between databases, queues, caches, datastores are becoming blurred. Fault: A component of the system deviating from its spec. Failure: The system as a whole stops providing the required service to the user. Implementing Twitter by inserting into a global collection of tweets, vs maintaining a cache for each user's timeline, vs doing the former for celebrities and the latter for everyone else. Throughput for batch processing systems vs response time for online systems. Latency: Duration that a request is waiting to be handled. Response time: What the client sees, includes service time, network delays, queueing delays. SLA example: A service considered to be up if it has a median response time of less than 200ms and a 99th percentile under 1s, and the service is required to be up at least 99.9% of the time. Head of line blocking: It only takes a small number of slow requests to hold up the processing of subsequent requests. Tail latency amplification: Even if only a small percentage of backend calls are slow, the chance of getting a slow call increases if an end-user request requires multiple backend calls, and so a higher proportion of end-user requests end up being slow. Accidental complexity: Not inherent in the problem that the software solves (as seen by the users) but arises only from the implementation.
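The SLA example and tail latency amplification above can be made concrete with a little arithmetic. The following is a minimal sketch, assuming response times are given in milliseconds, that percentiles use the nearest-rank method, and that backend calls are slow independently of each other; the function names are hypothetical.

```python
def percentile(sorted_values, pct):
    """Nearest-rank percentile of a sorted, non-empty list (pct is an integer 1..100)."""
    rank = max(1, (pct * len(sorted_values) + 99) // 100)  # ceil(pct * n / 100)
    return sorted_values[rank - 1]


def meets_sla(response_times_ms):
    """SLA from the notes: median under 200 ms and 99th percentile under 1 s."""
    ordered = sorted(response_times_ms)
    return percentile(ordered, 50) < 200 and percentile(ordered, 99) < 1000


def prob_slow_request(p_slow_backend, num_backend_calls):
    """Chance that at least one backend call is slow, assuming independent calls."""
    return 1.0 - (1.0 - p_slow_backend) ** num_backend_calls
```

Under these assumptions, if 1% of backend calls are slow and an end-user request fans out to 100 backend calls, prob_slow_request(0.01, 100) is roughly 0.63, so most end-user requests end up slow.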
Chapter 2: Data Models and Query Languages
The advantage of using an ID instead of a string is that because it has no meaning to humans, it never needs to change. As a rule of thumb, if you're duplicating values that could be stored in just one place, the schema is not normalized. Document data model benefits: Schema flexibility, better performance due to locality, closer to the data structures used by some applications. Relational data model benefits: Better support for joins, many-to-one and many-to-many relationships. Graph data model benefits: Better support for complex connections within data (complex many-to-many relationships). Schema-on-read (document and graph databases) vs schema-on-write (relational databases). Declarative (e.g., SQL) vs imperative. Map and reduce functions must be pure, they only use data passed to them as input, cannot perform additional queries, and must not have any side effects. Property graphs store vertices and edges. Triple-stores store information in the form of three-part-statements: subject, predicate, object. Semantic web: Websites publishing machine-readable data using RDF.
Chapter 3: Storage and Retrieval
Log-structured storage engines vs page-oriented storage engines. Log: An append-only sequence of records. Well-chosen indexes speed up read queries, but every index slows down writes. Hash index: An in-memory hash map where every key is mapped to a byte offset in an append-only data file, the location at which the value can be found. Compaction: Throwing away duplicate keys of the append-only log, and only keeping the most recent update for each key. Several segments can also be merged together during compaction. Binary log format: Encodes the length of a string in bytes, followed by the raw string. Tombstone: A special record telling the merging process to discard previous values for a deleted key. Crash recovery: Storing a snapshot of each segment's hash map on disk. Partially written records are detected and ignored using checksums. Concurrency control by using a single writer thread (and multiple reader threads). An append-only log offers performance and simple concurrency and crash recovery. SSTable: Sorted string table: The sequence of key-value pairs is sorted by key. Memtable: An in-memory balanced tree structure. Stored to disk as an SSTable file whenever it grows bigger than some threshold. LSM-trees: Keep a cascade of SSTables that are merged in the background. Log-structured indexes break the database down into variable-size segments. B-tree indexes break the database down into fixed-size blocks (pages). Pages refer to other pages, from a root to the leaves. The number of references to child pages in one page is called the branching factor. A B-tree with n keys always has a depth of O(log n). A write-ahead log (WAL or redo log) logs B-tree modifications before they are applied, to aid with crash recovery. Latches (lightweight locks) protect the B-tree's data structures for concurrency control. Log-structured approaches are simpler in this respect, because they merge in the background and atomically swap segments. LSM-trees are typically faster for writes, whereas B-trees are typically faster for reads.
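The hash index, binary log format, tombstone, and compaction ideas above can be sketched in a few lines. This is a simplified illustration, not how any particular storage engine does it: the log lives in an in-memory buffer instead of a file, there is a single segment, the class name and the tombstone marker are hypothetical, and compaction walks the index rather than scanning and merging segments.

```python
import io
import struct

TOMBSTONE = b"\x00__tombstone__"  # hypothetical marker; real engines use a record flag


class HashIndexedLog:
    """Append-only log with an in-memory hash map from key to byte offset."""

    def __init__(self):
        self.log = io.BytesIO()  # stands in for the append-only data file
        self.index = {}          # key -> offset of the most recent record

    def _append(self, key, value):
        self.log.seek(0, io.SEEK_END)
        offset = self.log.tell()
        # Binary format: key length and value length, followed by the raw bytes.
        self.log.write(struct.pack(">II", len(key), len(value)))
        self.log.write(key)
        self.log.write(value)
        return offset

    def put(self, key, value):
        self.index[key] = self._append(key, value)

    def delete(self, key):
        self._append(key, TOMBSTONE)  # tells a later merge to drop older values
        self.index.pop(key, None)

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        self.log.seek(offset)
        key_len, value_len = struct.unpack(">II", self.log.read(8))
        self.log.seek(key_len, io.SEEK_CUR)  # skip over the key bytes
        return self.log.read(value_len)

    def compact(self):
        """Return a new log holding only the most recent value of each live key."""
        compacted = HashIndexedLog()
        for key in self.index:
            compacted.put(key, self.get(key))
        return compacted
```

Writing the same key twice leaves two records in the log but only one index entry; compact() drops the stale record and anything hidden by a tombstone.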
Write amplification: One database write results in multiple disk writes over the course of the database's lifetime. Values can be stored in the index or in a heap file. R-trees are spatial indexes commonly used for multi-dimensional indexing. For full-text search and fuzzy indexing: Edit distance of 1 means one letter has been added, removed, or replaced. Transaction processing (OLTP): Allowing clients to make low-latency reads and writes. Analytic processing (OLAP): Queries scan large numbers of records to compute aggregates, and are often run periodically (for example, once per day). Extract-Transform-Load (ETL): Process of getting OLTP data to a data warehouse for OLAP. Star and snowflake schemas for analytics, with fact table in the center and dimension tables around it. OLTP: Row-oriented storage. OLAP: Column-oriented storage. Tight loop: No function calls. Rows are reconstructed by knowing that the kth item in one column belongs to the same row as the kth item in another column. Materialized view vs standard (virtual) view. Data cube (or OLAP cube): A grid of aggregates grouped by different dimensions.
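The column-oriented layout described above can be illustrated with plain lists: each column is stored separately, and row k is rebuilt by taking the kth item of every column. A minimal sketch follows; the table contents and function names are made up for illustration.

```python
# Hypothetical fact table stored column by column: one list per column.
columns = {
    "date_key": [140102, 140102, 140103],
    "product_sk": [69, 74, 31],
    "quantity": [1, 3, 2],
}


def reconstruct_row(columns, k):
    """The kth item of each column belongs to the same row."""
    return {name: values[k] for name, values in columns.items()}


def sum_where(columns, filter_column, filter_value, sum_column):
    """Analytic query that reads only the two columns it needs."""
    pairs = zip(columns[filter_column], columns[sum_column])
    return sum(value for key, value in pairs if key == filter_value)
```

The aggregate query never touches the product_sk column, which is the main reason column storage suits analytic workloads that read a few columns of many rows.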
Chapter 4: Encoding and Evolution
Rolling upgrade, i.e., staged rollout. Backward compatibility: Newer code can read data written by older code. Forward compatibility: Older code can read data written by newer code. Encoding, i.e., serialization, or marshalling. Decoding, i.e., deserialization, unmarshalling, or parsing. As long as people agree on what the data interchange format is, it often doesn't matter how pretty or efficient the format is. The difficulty of getting different organizations to agree on anything outweighs most other concerns. Language-specific formats. JSON, XML, and binary variants. Thrift and Protocol Buffers. Avro (addresses compatibility via reader's and writer's schema). Dataflow via databases, service calls (REST and RPC), or asynchronous message passing. Data outlives code. XMLHttpRequest used for Ajax. SOA/microservices enable encapsulation. REST (HTTP-inspired) vs SOAP (XML-based). OpenAPI/Swagger to describe and document RESTful APIs. RPC's approach of trying to make network requests look like local function calls runs against their many significant differences. Custom RPC protocols with binary encoding formats can achieve better performance than generic JSON over REST, but sacrifice the debugging ease and ecosystem support. Message-passing via message brokers or distributed actor frameworks. Sending messages via an intermediary (message broker aka message queue aka message-oriented middleware), which stores the message temporarily. Message brokers are asynchronous. Senders normally don't expect responses. If they do need them, they receive them on a separate channel. One process sends a message to a named queue or topic, and the broker ensures that the message is delivered to one or more consumers of or subscribers to that queue or topic. Can have multiple producers and consumers on the same topic. Unknown fields need to be preserved for forward compatibility. Distributed actor frameworks integrate message brokers with the actor programming model.
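The point about preserving unknown fields can be shown with a small JSON example. This is a sketch under assumptions: older code knows only the fields user_id and name, newer code has added photo_url, and all field and function names are hypothetical.

```python
import json

KNOWN_FIELDS = {"user_id", "name"}  # fields the older code knows about


def update_name_preserving(encoded, new_name):
    """Older code updates one field and writes back everything it decoded."""
    record = json.loads(encoded)
    record["name"] = new_name  # unknown fields stay in the dict untouched
    return json.dumps(record)


def update_name_lossy(encoded, new_name):
    """Older code copies the record into its own model and drops unknown fields."""
    record = json.loads(encoded)
    model = {key: record[key] for key in KNOWN_FIELDS if key in record}
    model["name"] = new_name
    return json.dumps(model)
```

When the lossy variant writes a record back to the database, the field added by newer code is silently lost, which is the failure mode that forward compatibility is meant to avoid.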
Chapter 5: Replication
Distributed data for scalability, fault tolerance/high availability, disconnected operation, latency. Scaling up vs scaling out. Replication vs partitioning. Single-leader, multi-leader, and leaderless replication. Replication log or change stream flows from leader to followers. Synchronous vs asynchronous replication. In practice, synchronous replication usually means that one of the followers is synchronous and the rest are asynchronous (semi-synchronous configuration). Setting up new followers involves a snapshot and processing changes since (via replication log, e.g., log sequence number in PostgreSQL, or binlog coordinates in MySQL). Failover: Determining leader has failed, choosing new leader, reconfiguring system to use new leader. Split brain if two nodes both believe they are the leader and accept writes. Fencing or STONITH to shut down one node if two leaders are detected. Replication logs implemented via statement-based replication, write-ahead log (WAL) shipping, logical (row-based) log replication, trigger-based replication. Dealing with replication lag: Read your own writes, monotonic reads, consistent prefix reads. Handling write conflicts in multi-leader replication: Synchronous vs asynchronous conflict detection, conflict avoidance, convergence to consistency, custom conflict resolution logic on write or read, automatic conflict resolution. Multi-leader replication topologies, e.g., circular, star, all-to-all. Leaderless replication by having both writes and reads go to multiple nodes in parallel and use quorum. Read repair and anti-entropy to catch up with missed writes. Quorum: write acks + read acks > number of replicas. Handling concurrent writes: Last write wins, happens-before relationship, merging, version vectors.
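The quorum condition above (write acks + read acks > number of replicas) can be written out directly. The following is a minimal sketch in which n, w, and r are the number of replicas, required write acknowledgements, and required read acknowledgements, and each replica response is assumed to be a (version, value) pair; the function names are hypothetical.

```python
def is_quorum(n, w, r):
    """With w + r > n, every read set overlaps every write set in at least one node."""
    return w + r > n


def tolerated_unavailable_nodes(n, w, r):
    """How many replicas can be down while both reads and writes still succeed."""
    return n - max(w, r)


def quorum_read(responses):
    """Pick the response with the highest version among the replicas that answered."""
    return max(responses, key=lambda response: response[0])
```

For example, n = 3 with w = 2 and r = 2 satisfies the condition and tolerates one unavailable node. A reader that sees an older version on some replica can write the newer value back to it, which is the read repair mentioned above.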
Chapter 6: Partitioning
Partition data for scalability. Partitioning can be combined with replication. Aim to avoid hot spots and retrieve data without querying all nodes. Partitioning by key range or by hash of key. Secondary indexes can be partitioned using local or global indexes. Rebalancing partitions by hash mod N (where N is the number of nodes) would move data around excessively whenever N changes. Rebalancing can be automatic or manual. Requests can be routed by forwarding, by a routing tier, or by the client.
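To see why rebalancing by mod moves data around excessively, the sketch below compares it with one alternative, a fixed number of partitions where a new node takes over whole partitions. Everything here is illustrative: the hash function, the key names, and the simple stealing policy are assumptions rather than the scheme of any specific database.

```python
import hashlib
from collections import Counter


def stable_hash(key):
    """Deterministic hash (Python's built-in hash() is salted per process)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def node_by_mod(key, num_nodes):
    return stable_hash(key) % num_nodes


def fraction_moved_by_mod(keys, old_nodes, new_nodes):
    """Share of keys whose node changes when the node count changes."""
    moved = sum(1 for k in keys
                if node_by_mod(k, old_nodes) != node_by_mod(k, new_nodes))
    return moved / len(keys)


def add_node_fixed_partitions(assignment, new_node):
    """assignment[p] is the node owning partition p. The new node steals
    partitions from existing nodes until it holds a fair share."""
    fair_share = len(assignment) // (len(set(assignment)) + 1)
    counts = Counter(assignment)
    new_assignment = list(assignment)
    stolen = 0
    for partition, node in enumerate(assignment):
        if stolen == fair_share:
            break
        if counts[node] > fair_share:
            new_assignment[partition] = new_node
            counts[node] -= 1
            stolen += 1
    return new_assignment
```

Going from 10 to 11 nodes with mod reassigns roughly 10 out of every 11 keys, whereas with 1000 fixed partitions only the 90 partitions handed to the new node have to move.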
Chapter 7: Transactions
ACID: Atomicity, Consistency, Isolation, Durability. Single- and multi-object operations. Dirty reads: Read data before they are committed. Dirty writes: Overwrite data before they are committed. Read skew: See different parts of the database at different points in time. Commonly prevented with snapshot isolation, implemented with multi-version concurrency control (MVCC). Lost updates: Overwrite another write. Can be prevented via a manual lock (select for update). Write skew: Write based on an outdated read. Prevented by serializable isolation. Phantom read: Write making a read outdated. Can be prevented by index-range locks. Serializable isolation protects against concurrency problems but at a high performance cost: Serially execute transactions, two-phase locking, serializable snapshot isolation. Snapshot isolation: Readers and writers don't block each other. Reading requires no lock. Writing requires a lock. Maintain several versions of objects in use. Two-phase locking: Readers and writers block each other. Reading requires a shared lock. Writing requires an exclusive lock.
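Snapshot isolation with multi-version concurrency control can be sketched as follows. This is a heavily simplified model, assuming every write is its own transaction that commits immediately and that transaction IDs increase monotonically; a real MVCC implementation also tracks in-progress and aborted transactions and deletions. The class and method names are hypothetical.

```python
class MVCCStore:
    """Keeps several versions per key, each tagged with the writing transaction ID."""

    def __init__(self):
        self.versions = {}   # key -> list of (txid, value) in commit order
        self.next_txid = 1

    def commit_write(self, key, value):
        txid = self.next_txid
        self.next_txid += 1
        self.versions.setdefault(key, []).append((txid, value))
        return txid

    def take_snapshot(self):
        """A snapshot is identified by the highest transaction ID committed so far."""
        return self.next_txid - 1

    def read(self, key, snapshot_txid):
        """Return the newest version written at or before the snapshot. No lock is taken."""
        visible = [value for txid, value in self.versions.get(key, [])
                   if txid <= snapshot_txid]
        return visible[-1] if visible else None
```

A reader holding an older snapshot keeps seeing both account balances as they were before a transfer, which avoids the read skew described above, while writers keep appending new versions without being blocked by that reader.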
Chapter 8: The Trouble with Distributed Systems
Unlike single computers, distributed systems are susceptible to partial (nondeterministic) failures. Cloud computing vs supercomputing. You can build a reliable system from unreliable components (e.g., error-correcting codes, or TCP). Asynchronous packet networks are susceptible to unreliability of message delivery. Short timeouts may detect faults faster but may incorrectly declare a node dead. Synchronous networks could provide predictable delays but by decreasing resource utilization, i.e., increasing cost. Monotonic vs time-of-day clocks. Clock drift makes it dangerous to rely on timestamps for ordering events or global snapshots. Clock readings have a confidence interval. Process pauses may interfere with leadership leases. Quorum can determine the truth. Lock can identify the leader. The server can use fencing to confirm that the lock a client presents is still valid. Timing system models: Synchronous, partially synchronous, asynchronous. Failure system models: Crash-stop faults, crash-recovery faults, byzantine (arbitrary) faults. Safety: Nothing bad happens. Liveness: Something good eventually happens.
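The fencing idea above can be sketched with monotonically increasing tokens: the lock service hands out a larger token with every grant, and the storage server rejects any write carrying a token older than one it has already seen. The class names are hypothetical and the lock service is reduced to a counter, with no leases or expiry.

```python
class LockService:
    """Hands out a strictly increasing fencing token with every lock grant."""

    def __init__(self):
        self._last_token = 0

    def acquire(self):
        self._last_token += 1
        return self._last_token


class FencedStorage:
    """Rejects writes whose token is older than the newest token seen so far."""

    def __init__(self):
        self.max_token_seen = 0
        self.data = {}

    def write(self, key, value, token):
        if token < self.max_token_seen:
            return False  # the client's lock has since been granted to someone else
        self.max_token_seen = token
        self.data[key] = value
        return True
```

If a client pauses (for example, during garbage collection) after acquiring the lock and wakes up after another client has been granted it, the paused client's write carries the older token and is refused.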
Chapter 9: Consistency and Consensus
Linearizability (atomic consistency, strong consistency, immediate consistency, external consistency): Make a system appear as if there were only one copy of the data and all operations on it are atomic. Serializability: Guarantees that transactions behave the same as if they had executed in some serial order. Linearizability useful for locking and leader election, constraints and uniqueness guarantees, and cross-channel timing dependencies. Single-leader replication: Potentially linearizable. Consensus algorithms: Linearizable. Multi-leader replication: Not linearizable. Leaderless replication: Probably not linearizable. CAP Theorem: Consistency, Availability, Partition tolerance: Pick 2 out of 3. In practice, no choice over network partitions, so choice is between consistency or availability when network partitions occur. Often linearizability is dropped in favor of performance. Linearizability: Enforces total order of operations. Causality: Enforces partial order of operations that are causally related. Sequence numbers or timestamps from logical clocks can be used to order events. Lamport timestamps generate sequence numbers consistent with causality: Pairs of (counter, node ID). Total order broadcast helps to identify when the total ordering of operations is finalized. Total order broadcast requires reliable delivery and totally ordered delivery. Consensus services like ZooKeeper and etcd implement total order broadcast. Consensus required for leader election, as well as atomic commits. Two-phase commit splits commit in two steps, using a coordinator. XA (eXtended Architecture) is a standard for implementing two-phase commit across heterogeneous technologies, via a network driver or client library. A consensus algorithm must satisfy: Uniform agreement, Integrity, Validity, and Termination. Common fault-tolerant consensus algorithms: Viewstamped Replication (VSR), Paxos, Raft, and Zab.
Membership and coordination services like ZooKeeper and etcd implement total order broadcast (and hence consensus), but also other useful features: Linearizable atomic operations, total ordering of operations, failure detection, and change notifications.
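The Lamport timestamps mentioned above, pairs of (counter, node ID), can be sketched as a small class. Each node increments its counter on every local event, and on receiving a message it first moves its counter up to the largest value it has seen. The class and method names are hypothetical.

```python
class LamportClock:
    """Generates (counter, node_id) timestamps that are consistent with causality."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0

    def tick(self):
        """Timestamp for a local event or an outgoing message."""
        self.counter += 1
        return (self.counter, self.node_id)

    def receive(self, timestamp):
        """Timestamp for the receipt of a message carrying the given timestamp."""
        self.counter = max(self.counter, timestamp[0])
        return self.tick()
```

Comparing the tuples gives a total order: the larger counter wins, and ties between concurrent events on different nodes are broken by node ID. The order is only known after the fact, which is why the notes point to total order broadcast for deciding when the ordering is finalized.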
Chapter 10: Batch Processing
Services (online systems), Batch processing systems (offline systems), Stream processing systems (near-real-time systems). Batch processing with Unix tools: Each program does one thing well, the output of every program can become the input to another, design and build software to be tried early, use tools instead of unskilled help. MapReduce: Mapper extracts keys and values from input record. Reducer takes the key-value pairs produced by the mapper, aggregates per key, and iterates over the collections of values. MapReduce can parallelize execution across machines. MapReduce treats inputs as immutable, and allows retries overwriting incomplete outputs. Hadoop is a MapReduce framework over a distributed filesystem (HDFS). Massively Parallel Processing (MPP) databases focus on parallel execution of analytic SQL queries on a cluster of machines, whereas MapReduce and a distributed filesystem are more general-purpose. Higher-level programming models (Pig, Hive, Cascading, Crunch) are abstractions on top of MapReduce. Dataflow engines for distributed batch computations (Spark, Tez, Flink) handle an entire workflow as one job, instead of breaking it up into independent subjobs. Bulk Synchronous Parallel (BSP) model of computation (Apache Giraph, Spark GraphX API, Flink Gelly API, Google Pregel) optimizes for batch processing graphs. Distributed batch processing frameworks solve partitioning and fault tolerance. Join algorithms for MapReduce: Sort-merge join, broadcast hash join, partitioned hash join. Batch processing deals with bounded input data. Stream processing deals with unbounded input data.
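The mapper and reducer roles described above can be illustrated with a single-process word count. This is only a sketch of the programming model: the grouping step stands in for the shuffle, nothing runs in parallel, and the function names are hypothetical.

```python
from collections import defaultdict


def mapper(record):
    """Extract (key, value) pairs from one input record (a line of text)."""
    for word in record.split():
        yield (word.lower(), 1)


def reducer(key, values):
    """Aggregate all values collected for one key."""
    return (key, sum(values))


def map_reduce(records, mapper, reducer):
    """Run the mapper on every record, group by key, then reduce keys in sorted order."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return [reducer(key, groups[key]) for key in sorted(groups)]
```

Both functions are pure: they use only their inputs and have no side effects, so a framework can rerun a failed task and discard its incomplete output.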
Chapter 11: Stream Processing
Stream: Data that is incrementally available over time. Event: A small, self-contained, immutable object containing the details of something that happened at some point in time. Events generated by producers (publishers, senders), and potentially processed by multiple consumers (subscribers, recipients). Related events grouped together into a topic or stream. For continual processing with low delays, polling a datastore for updates is inefficient, and triggers are limited in what they can do. Messaging systems: Producers send messages containing events, which are then pushed to consumers (publish/subscribe). When producers are faster than consumers: Drop messages, buffer messages in a queue, or apply backpressure (flow control, blocking the producer from sending more messages). When nodes go offline: Messages may be lost, or not (via disk writing or replication). Direct messaging from producers to consumers vs message brokers. AMQP/JMS-style message brokers: Message queues; servers that producers and consumers connect to. Load balancing: Sharing the work of consuming a topic among consumers. Fan-out: Delivering each message to multiple consumers. Log-based message brokers: Partitioned logs; producers send by appending to the log, consumers receive by reading the log sequentially. Change data capture (CDC): Observing all data changes written to a database and extracting them in a form that can be replicated to other systems. Processing streams: Store in a database, push to users, produce output based on input. Complex event processing (CEP): Store queries and continuously process events to match specific patterns. Materialized views: Keeping derived data systems up to date. Stream analytics: Computing windowed aggregations, e.g., measure rate of some type of event (how often per time interval), calculate rolling average of a value over some time period, compare current statistics to previous time intervals. 
Stream analytics can use probabilistic algorithms: Bloom filters for set membership, HyperLogLog for cardinality estimation, percentile estimation algorithms. Event time vs processing time. Straggler events can be ignored, or corrections can be published. Tumbling window: Fixed length. Hopping window: Fixed length, but overlapping. Sliding window: Events occurring within some interval of each other. Session window: No fixed duration, group events for the same user occurring closely together in time. Joins: Stream-stream (window join), stream-table (stream enrichment), table-table (materialized view maintenance). Restarting tasks may process events multiple times, but want to maintain visible effect in output as if they were processed once (exactly-once semantics). Exactly-once via microbatching and checkpointing, via atomic commits, via idempotence, or via rebuilding state after a failure.
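The tumbling and hopping windows above can be sketched by computing which windows an event timestamp falls into. This sketch assumes integer, non-negative event times in seconds, half-open windows [start, end), and hopping windows that start at multiples of the hop size; the function names are hypothetical.

```python
from collections import Counter


def tumbling_window(event_time, size):
    """The single fixed-length window that contains the event."""
    start = event_time - event_time % size
    return (start, start + size)


def hopping_windows(event_time, size, hop):
    """All overlapping windows of the given size, starting every `hop`, containing the event."""
    latest_start = event_time - event_time % hop
    # A window contains the event if start <= event_time < start + size.
    starts = range(latest_start, event_time - size, -hop)
    return [(start, start + size) for start in sorted(starts) if start >= 0]


def count_per_tumbling_window(event_times, size):
    """Windowed aggregation: number of events per tumbling window start."""
    counts = Counter(tumbling_window(t, size)[0] for t in event_times)
    return dict(sorted(counts.items()))
```

With a 5-minute window and a 1-minute hop, an event belongs to up to five hopping windows, whereas it always belongs to exactly one tumbling window.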
Chapter 12: The Future of Data Systems
Deciding on a total order of events, ensuring that all consumers see messages in the same order, is known as total order broadcast and is equivalent to consensus. Federated databases: Unified reads across several different systems. Unbundled databases: Unified writes across several different systems. Big advantage of log-based integration is loose coupling between components, both at system-level and human-level. Separation of Church (lambda calculus, i.e., computation) and state. Separate stateless application logic from state management (database): No application logic in the database and no persistent state in the application. Data flow systems can achieve better fault tolerance and better performance compared to microservices: Use one-directional asynchronous message streams rather than synchronous request/response interactions. Caches, indexes, and materialized views allow more work on the write path, by precomputing results, to save effort on the read path. Offline-first applications use a local database and sync with remote servers in the background when a network connection is available. Server-sent events (EventSource API) and WebSockets provide communication channels for a server to push messages to the browser. End-to-end argument: Consider the end-to-end flow of a request, using a unique ID, across the application and the database. Consistency involves timeliness and integrity. Violations of timeliness are eventual consistency, violations of integrity are perpetual inconsistency. We can relax the timeliness constraint (and avoid coordination) by compensating for violations after the fact, if acceptable by the business. If you have enough devices running your software, even very unlikely things do happen. Self-validating or self-auditing systems trust but verify: Assume that things work correctly most of the time, but not always. 
End-to-end checks are best: The more systems included in an integrity check, the fewer opportunities for corruption to go unnoticed at some stage of the process. Continuous end-to-end integrity checks increase confidence, which increases development speed. Cryptographic auditing and integrity checking often relies on Merkle trees, which are trees of hashes that can be used to efficiently prove that a record appears in some dataset. Besides cryptocurrencies, certificate transparency, which checks the validity of TLS/SSL certificates, also relies on Merkle trees. Engineers building systems have a responsibility to carefully consider the consequences and consciously decide what kind of world we want to live in. Treat data with humanity and respect. Algorithmic decision-making and predictive analytics can systematically and arbitrarily exclude a person from participating in society without any proof of guilt and with little chance of appeal. In addition to automating the decision making process, we automate the process of inferring rules. Systematic bias in input will likely lead to amplified bias in output. E.g., postal code or IP address can be a strong predictor of race. Machine learning can be like money laundering for bias. Data-driven decision making will require making algorithms accountable and transparent, avoiding reinforcing existing biases, and fixing biases when inevitable mistakes happen. Surveillance captures tracking of users in the interest of other entities. Declining to use a service is a privilege for people that have the time and knowledge to understand its privacy policy and can afford missing out on the benefits of participation. Having privacy does not mean keeping everything secret, it means having the freedom to choose which things to reveal to whom. Surveillance infrastructure transfers privacy rights from the object of surveillance (user) to the data collector.
When collecting data, we need to consider not just today's political environment, but all possible future governments. We should not retain data forever, but purge it as soon as it is no longer needed.
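The Merkle trees mentioned earlier in this chapter can be sketched to show how a short proof establishes that a record appears in a dataset. This is a simplified illustration and not the exact construction used by certificate transparency or any cryptocurrency: it assumes a non-empty list of records, duplicates the last hash on odd-sized levels, and does not distinguish leaf hashes from interior hashes. The function names are hypothetical.

```python
import hashlib


def sha256(data):
    return hashlib.sha256(data).digest()


def merkle_root(records):
    """Hash the records, then hash pairs upward until a single root remains."""
    level = [sha256(record) for record in records]
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # simplification: duplicate the last hash
        level = [sha256(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


def merkle_proof(records, index):
    """Sibling hashes on the path from records[index] to the root."""
    level = [sha256(record) for record in records]
    proof = []
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])
        sibling = index ^ 1  # the other node of the pair
        proof.append((level[sibling], sibling < index))
        level = [sha256(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return proof


def verify(record, proof, root):
    """Recompute the root from the record and its proof."""
    node = sha256(record)
    for sibling, sibling_is_left in proof:
        node = sha256(sibling + node) if sibling_is_left else sha256(node + sibling)
    return node == root
```

The proof holds one hash per tree level, so its size grows with the logarithm of the number of records, and a verifier needs only the root hash rather than the whole dataset.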
