Table of Contents
Psst. Do you already know segmented logs well? If yes, jump here.
Prologue: The Log 📃🪵
First, let's clarify what we mean by a "log" in this context. Here, a log refers to an append-only collection of records, ordered by time.
Since the records are ordered by time, the log's record indices can be thought of as timestamps, with the convenient property of being decoupled from actual wall-clock time.
The log indices effectively behave as Lamport clock timestamps.
A Lamport clock is a logical counter used to establish causality between two events. Since it's decoupled from wall-clock time, it's used in distributed systems for ordering events.
So, why do we care about these kinds of logs? Why are they useful?
Log Usage: An example scenario
There's a teacher and a set of students in a classroom. The teacher wants to hold an elementary arithmetic lesson.
The teacher makes every student write down a particular initial number (e.g., 42). Next, the teacher plans to give a sequence of instructions to the students. The teacher may give the following kinds of instructions:
- Add x; where x is a number
- Sub x; where x is a number
- Mul x; where x is a number
- Div x; where x is a non-zero number
For every instruction, the students are to apply the instruction to their current number, calculate the new number, and write down the new number as their current number. The students are to continue this process for every instruction, until the teacher finishes giving instructions. They are then required to submit the final calculated number to the teacher.
So for instance, if the current number is 42, and the received instruction is Add 3, the student calculates:

result = Add 3 (current_number) = current_number + 3 = 42 + 3 = 45
current_number = result ;; 45

Next, if the student receives Div 5:

result = Div 5 (current_number) = current_number / 5 = 45 / 5 = 9
current_number = result ;; 9
Now, if all students start from the same initial number, and apply the same sequence of operations on their number, they are bound to arrive at the same result! So the teacher also turns this into a self-grading lesson by announcing the final number at the end of the class. If a student got the same final number, they scored full marks.
Notice that, to get full marks, the students must:
- Start from the same correct initial number
- Apply all the operations correctly, in the given order, in the pre-determined way, without random mistakes
In computer science terms, we can model the students as deterministic state machines, the students' current_number as their internal state, and the instructions as inputs to the state machines. From here, we can say the following:
If different identical deterministic state machines start from the same initial state, and receive the same set of inputs in the same order, they will end in the same state.
We call this the state machine replication principle.
State machine replication
Now there's a new problem: the last two rows of students can't hear the teacher properly. What do they do now? The teacher needs a solution that enables them to give the same set of instructions to the backbenchers without actually giving them the answers.
The solution here is to give the instructions in writing. However, in an exercise to foster teamwork between their students, the teacher delegates the instruction sharing task to the students.
The students come up with the following solution:
They first write down the instructions sequentially on sheets of paper, and then perform the calculations separately on their own private sheet. When they are done writing on a sheet of paper with the instructions, they only share the sheet containing the instructions. They never share their private sheet. After passing a sheet full of instructions, they start writing the subsequent instructions on a new sheet of paper.
The backbenchers are able to receive the same set of instructions in the same sequence through the sheets. They perform the necessary calculations on their own private sheet starting from the same initial number using the instructions from the received sheets.
If we inspect carefully, this mechanism of sharing the instructions through the sheets behaves like a log. The private sheets act as the internal states. The collection of sheets collectively act as a log.
Now, in our case, because the backbenchers receive the same set of instructions in the same sequence, they go through the same set of internal states in the same sequence and arrive at the same final state. They effectively replicate the front and middle-benchers. Since we can model students as state machines, we have effectively performed state machine replication with a log.
Finally, since the backbenchers receive the instructions through the log and not directly from the teacher, they lag behind a bit but eventually arrive at the same results. So we can say there is a replication lag.
These concepts directly translate to distributed systems. Consider this:
There is a database partition in a distributed database responsible for a certain subset of data. When any data manipulation queries are routed to it, it has to handle the queries. Instead of directly committing the effect of the queries to the underlying storage, it first writes the operations to be applied to the local storage, one by one, into a log called the "write-ahead log". Then it applies the operations from the write-ahead log to the local storage.
In case of a database failure, it can re-apply the operations in the write-ahead log from the last committed entry and arrive at the same state.
When this database partition needs to replicate itself to other follower partitions, it simply replicates the write-ahead log to the followers instead of copying over the entire materialized state. The followers can use the same write-ahead log to arrive at the same state as the leader partition.
Now, the follower partitions have to receive the write-ahead log over the network first. Only then can they apply the operations. As a result, they lag behind the leader. This is replication lag.
Asynchronous processing
Now in the same classroom scenario, what happens when a student is absent? Turns out they still need to do the assignment.
The backbenchers come to the rescue! They share the ordered sheets of instructions and the initial number with the student in distress. The student gleefully applies the instructions on the number, arrives at the same results and scores full marks.
However, notice what happened here: The student did the assignment in a completely out of sync or asynchronous way with respect to the other students.
Logs enable asynchronous processing of requests.
Message queues provide a convenient abstraction over logs to enable asynchronous processing. A server might not be able to synchronously handle all requests due to lack of resources. So instead, it can buffer the requests in a message queue. A different server can then pick up the requests one-by-one and handle them.
Now it's not necessary that all the requests have to be handled by the same server. Because the log is shared, different servers may choose to share the load with each other. In this case, the requests are distributed between the servers to load-balance the requests. (Provided that there is no causal dependency between the requests.)
For instance, a simple scheme might be: if there are N servers, the server for a particular request is decided with request.index % N.
If you want to read more about the usage of logs in distributed systems, read Jay Kreps' (co-creator of Apache Kafka) excellent blog post on this topic here.
Segmented Log 🪚🪵
It might come as a surprise, but we have already come across a segmented log in the previous example.
Introduction
In the previous example, the collection of sheets containing the instructions collectively behaved as a log. We can also say that the instruction log was segmented across the different sheets of paper.
Call the individual sheets of paper segments. The collection of sheets can now be called a segmented log.
Let's go back to the log. At the end of the day, a log is a sequential collection of elements. What's the simplest data structure we can use to implement this?
An array.
However, we need persistence. So let's use a file based abstraction instead.
We can quite literally map a file to a process's virtual memory address space using the mmap() system call and then use it like an array, but that's a topic for a different day.
Since our file based abstraction needs to support an append-only data structure, it internally writes sequentially to the end of the underlying file. Assume that this abstraction allows you to uniquely refer to any entry using its index.
Now, this setup has some problems:
- All entries are sequentially written to a single large file
- A single large file is difficult to store, move and copy
- A few bad sectors on the underlying disk can make the whole file unrecoverable. This can render all stored data unusable.
The logical next step is to split this abstraction across multiple smaller units. We call these smaller units segments.
In this solution:
- The record index range is split across smaller units called segments. The index ranges of different segments are non-overlapping.
- Each segment individually behaves like a log.
- For each segment, we maintain a segment entry. This segment entry stores the index range serviced by the segment, along with a handle to the underlying file.
- We keep the segment entries sorted by their starting index.
- The first n - 1 segments are called read segments. Their segment entries are stored in a vector called read_segments.
- The last segment is called the write segment. We assign its segment entry to write_segment.
Write behaviour:
- All writes go to the write_segment.
- Each segment has a threshold on its size.
- When the write_segment size exceeds its threshold:
  - We close the write_segment.
  - We reopen it as a read segment.
  - We push back the newly opened read segment's segment entry to the vector read_segments.
  - We create a new segment with its index range starting after the end of the previous write segment. This segment is assigned to write_segment.
Read behaviour (for reading a record at a particular index):
- Locate the segment where the index falls within the segment's index range. Look first in the read_segments vector, then fall back to write_segment.
- Read the record at the given index from the located segment.
Original description in the Apache Kafka paper
This section presents the segmented_log as described in the Apache Kafka paper.
Simple storage: Kafka has a very simple storage layout. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of approximately the same size (e.g., 1GB). Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. For better performance, we flush the segment files to disk only after a configurable number of messages have been published or a certain amount of time has elapsed. A message is only exposed to the consumers after it is flushed.
Unlike typical messaging systems, a message stored in Kafka doesn’t have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map the message ids to the actual message locations. Note that our message ids are increasing but not consecutive. To compute the id of the next message, we have to add the length of the current message to its id. From now on, we will use message ids and offsets interchangeably.
A consumer always consumes messages from a particular partition sequentially. If the consumer acknowledges a particular message offset, it implies that the consumer has received all messages prior to that offset in the partition. Under the covers, the consumer is issuing asynchronous pull requests to the broker to have a buffer of data ready for the application to consume. Each pull request contains the offset of the message from which the consumption begins and an acceptable number of bytes to fetch. Each broker keeps in memory a sorted list of offsets, including the offset of the first message in every segment file. The broker locates the segment file where the requested message resides by searching the offset list, and sends the data back to the consumer. After a consumer receives a message, it computes the offset of the next message to consume and uses it in the next pull request.
The layout of an Kafka log and the in-memory index is depicted in Figure 2. Each box shows the offset of a message.
The main difference here is that instead of referring to records with a simple index, we refer to them with a logical offset. This is important because the offset is dependent on record sizes. The offset of the next record has to be calculated as the sum of the current record's offset and the current record's size.
A segmented_log implementation
The code for this section is in this repository: https://github.com/arindas/laminarmq/ More specifically, in the storage module.
While I would love to discuss testing, benchmarking and profiling, this blog post is becoming quite lengthy. So, please look them up on the repository provided above.
Note: Some of the identifier names might be different on the repository. I have refactored the code sections here to improve readability on various devices. Also there are more comments here to make it easier to understand.
Implementation outline
Fig: Data organisation for persisting a segmented_log on a *nix file system.
A segmented log is a collection of read segments and a single write segment. Each "segment" is backed by a storage file on disk called "store". The offset of the first record in a segment is the base_offset.
The log is:
- "immutable", since only "append", "read" and "truncate" operations are allowed. It is not possible to update or delete records from the middle of the log.
- "segmented", since it is composed of segments, where each segment services records from a particular range of offsets.
All writes go to the write segment. A new record is written at write_segment.next_offset.
When we max out the capacity of the write segment, we close the write segment and reopen it as a read segment. The re-opened segment is added to the list of read segments. A new write segment is then created with base_offset equal to the next_offset of the previous write segment.
When reading from a particular offset, we linearly check which segment's offset range contains the given offset. If a segment capable of servicing a read from the given offset is found, we read from that segment. If no such segment is found among the read segments, we default to the write segment. The following scenarios may occur when reading from the write segment in this case:
- The write segment has synced the messages including the message at the given offset. In this case the record is read successfully and returned.
- The write segment hasn't synced the data at the given offset. In this case the read fails with a segment I/O error.
- If the offset is out of bounds of even the write segment, we return an "out of bounds" error.
Enhancements to the design to enable streaming writes
While the conventional segmented_log data structure is quite performant for a commit_log implementation, it still requires the following properties to hold true for the record being appended:
- We have the entire record in memory
- We know the record bytes' length and record bytes' checksum before the record is appended
It's not possible to know this information when the record bytes are read from an asynchronous stream of bytes. Without the enhancements, we would have to concatenate intermediate byte buffers to a vector. This would not only incur more allocations, but also slow down our system.
Hence, to accommodate this use case, we introduced an intermediate indexing layer to our design.
//! Index and position invariants across segmented_log
// segmented_log index invariants
segmented_log.lowest_index = segmented_log.read_segments[0].lowest_index
segmented_log.highest_index = segmented_log.write_segment.highest_index
// record position invariants in store
records[i+1].position = records[i].position + records[i].record_header.length
// segment index invariants in segmented_log
segments[i+1].base_index = segments[i].highest_index
= segments[i].index[index.len-1].index + 1
Fig: Data organisation for persisting a segmented_log on a *nix file system.
In the new design, instead of referring to records with a raw offset, we refer to them with indices. The index in each segment translates the record indices to raw file positions in the segment store file.
Now, the store append operation accepts an asynchronous stream of bytes instead of a contiguously laid out slice of bytes. We use this operation to write the record bytes, and while writing the record bytes, we calculate their length and checksum. Once we are done writing the record bytes to the store, we write the corresponding record_header (containing the checksum and length), position and index as an index_record in the segment index.
This provides two quality-of-life enhancements:
- Asynchronous streaming writes, without having to concatenate intermediate byte buffers
- Records can be accessed with simple, easy-to-use indices
Now, to prevent a malicious user from overloading our storage capacity and memory with a maliciously crafted request which infinitely loops over some data and sends it to our server, we provide an optional append_threshold parameter to all append operations. When provided, it prevents streaming append writes from writing more bytes than the provided append_threshold.
As we write to segments, the remaining segment capacity is used as the append_threshold. However, record bytes aren't guaranteed to be perfectly aligned to segment_capacity.
At the segment level, this requires us to keep a segment_overflow_capacity. All segment append operations now use:
append_threshold = segment_capacity - segment.size + segment_overflow_capacity
A good segment_overflow_capacity value could be segment_capacity / 2.
Component implementation
We now proceed in a bottom-up fashion to implement the entirety of an "indexed segmented log".
AsyncIndexedRead (trait)
If we notice carefully, there is a common thread between Index, Segment, and even SegmentedLog as a whole. Even though they are at different levels in the compositional hierarchy, they share some similar traits:
- They allow reading items from specific logical indices
- They have a notion of a highest index and a lowest index
- The read operation has to be asynchronous in nature, to support both in-memory and on-disk storage mechanisms
Let's formalize these notions:
/// Collection providing asynchronous read access to an indexed set of records (or
/// values).
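The actual trait body is elided here, so below is a minimal sketch capturing the three points above (using async fn in traits for brevity; the associated type bounds are illustrative and the repository's definition differs in details):

```rust
/// Collection providing asynchronous read access to an indexed set of records
/// (minimal sketch; bounds are illustrative).
pub trait AsyncIndexedRead {
    /// Error encountered during read operations.
    type ReadError: std::error::Error;

    /// Value (record) read from this collection.
    type Value;

    /// Type used to index into this collection.
    type Idx: Copy + Ord;

    /// Exclusive upper bound on readable indices.
    fn highest_index(&self) -> Self::Idx;

    /// Inclusive lower bound on readable indices.
    fn lowest_index(&self) -> Self::Idx;

    /// Asynchronously reads the value at the given index.
    async fn read(&self, idx: &Self::Idx) -> Result<Self::Value, Self::ReadError>;
}
```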
AsyncTruncate (trait)
Now, each of our components needs to support a "truncate" operation, where everything sequentially after a certain "mark" is removed. This notion can be expressed as a trait:
/// Trait representing a truncatable collection of records, which can be truncated after a
/// "mark".
AsyncConsume (trait)
Next, every abstraction related to storage needs to be safely closed to persist data, or be removed altogether. We call such operations "consume" operations. As usual, we codify this general notion with a trait:
/// Trait representing a collection that can be closed or removed entirely.
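A sketch, assuming the consume operations take the instance by value:

```rust
/// Collection that can be closed (persisting data) or removed entirely (sketch).
pub trait AsyncConsume {
    /// Error encountered during a consume operation.
    type ConsumeError: std::error::Error;

    /// Safely closes this collection, persisting any buffered data.
    async fn close(self) -> Result<(), Self::ConsumeError>;

    /// Removes this collection entirely, along with its backing storage.
    async fn remove(self) -> Result<(), Self::ConsumeError>;
}
```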
Sizable (trait)
Every entity capable of storing data needs a mechanism for measuring its storage footprint, i.e. its size in number of bytes.
/// Trait representing collections which have a measurable size in number of bytes.
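This one is tiny; a sketch:

```rust
/// Collection with a measurable size in number of bytes (sketch).
pub trait Sizable {
    /// Type used to represent sizes in bytes.
    type Size: Copy + Ord;

    /// Returns the size of this collection in bytes.
    fn size(&self) -> Self::Size;
}
```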
Storage (trait)
Finally, we need a mechanism to read and persist data. This mechanism needs to support reads at random positions and appends to the end.
"But Arindam!", you interject. "Such a mechanism exists! It's called a file.", you say triumphantly. And you wouldn't be wrong. However we have the following additional requirements:
- It has to be cross platorm and independent of async runtimes.
- It needs to provide a simple API for random reads without having to seek some pointer.
- It needs to support appending a stream of byte slices.
Alright, let's begin:
pub struct StreamUnexpectedLength;

// ... impl Display for StreamUnexpectedLength ...

/// Trait representing a read-append-truncate storage media.
// ... where's streaming append?
First, let's unpack what's going on here:
- We support a notion of Content: the slice of bytes that is read.
- We also have a notion of Position to identify the position of reads and appends.
- First, we have a simple append_slice() API that simply appends a given slice of bytes to the end of the storage. It returns the position at which the slice was written, along with the number of bytes written.
- Next, we have a read() API for reading a slice of bytes of a particular size from a particular position. It returns a Content, the associated type used to represent slices of bytes read from this storage.
- Every operation here is fallible. So errors and Result(s) are natural. However, what's up with StreamUnexpectedLength? Keep that in mind for now.
- Our storage can be truncated. We inherit from AsyncTruncate. We treat Position as the truncation mark.
- Our storage is also consumable. We inherit from AsyncConsume for close() and remove().
- Finally, our storage has a notion of size with Sizable. We use our Position type for representing sizes.
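Putting those points together, here is a sketch of what such a trait could look like, built on the trait sketches from the previous sections. The exact bounds and signatures in the repository differ; in particular, append_slice here returns the byte count as a plain usize to keep the sketch simple:

```rust
use std::ops::Deref;

/// Read-append-truncate storage media (sketch).
pub trait Storage:
    AsyncTruncate<Mark = Self::Position, TruncError = Self::Error>
    + AsyncConsume<ConsumeError = Self::Error>
    + Sizable<Size = Self::Position>
{
    /// Slice of bytes returned by reads.
    type Content: Deref<Target = [u8]>;

    /// Type used to address positions (and sizes) within this storage.
    type Position: Copy + Ord;

    /// Error type for storage operations.
    type Error: std::error::Error;

    /// Appends the given slice of bytes to the end of this storage; returns the
    /// position at which the slice was written along with the number of bytes written.
    async fn append_slice(&mut self, slice: &[u8]) -> Result<(Self::Position, usize), Self::Error>;

    /// Reads `size` bytes from the given `position`.
    async fn read(
        &self,
        position: &Self::Position,
        size: &Self::Position,
    ) -> Result<Self::Content, Self::Error>;
}
```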
Now, we need to support streaming appends with the existing methods.
Let's begin by asking ourselves, what exactly are the arguments here?
A stream of byte slices. How do we represent that?
Let's start with just a slice. Let's call it XBuf. The bound for that is simple enough:

XBuf: Deref<Target = [u8]>
Now we need a stream of these. Also note that we need the reading of a single item from the stream to also be fallible.
First, let's just consider a stream of XBuf. Let's call the stream X:

X: Stream<Item = XBuf>
Now, let's consider an error type XE. To make every read from the stream fallible, X needs the following bounds:

X: Stream<Item = Result<XBuf, XE>>
Now, our stream needs to be Unpin so that we can safely take a &mut reference to it in our function.
This blog post: https://blog.cloudflare.com/pin-and-unpin-in-rust/ goes into detail about why Pin and Unpin are necessary. Also, don't forget to consult the standard library documentation:
- pin module: https://doc.rust-lang.org/std/pin/index.html
- Pin struct: https://doc.rust-lang.org/std/pin/struct.Pin.html
- Unpin marker trait: https://doc.rust-lang.org/std/marker/trait.Unpin.html
Apart from our Stream argument, we also need an upper bound on the number of bytes to be written. A Stream can be infinite, but unfortunately, computer storage is not.
Using the above considerations, let us outline our function:
// ... inside Storage trait
async
When append_threshold is None, we attempt to exhaustively read the entire stream and write it to our storage. If it's Some(thresh), we only write up to thresh bytes.
Let's proceed with our implementation:
// ... inside Storage trait
async
We maintain a counter for the number of bytes already written: bytes_written. For every byte slice read from the stream, we check whether it can be accommodated in our storage in accordance with the append_threshold. If not, we error out.
We also keep the write position around in pos. It is simply the size of this storage before we append anything.
Now, let's try to append it:
// ... inside Storage::append

while let Some(buf) = buf_stream.next().await {
    // ...
}
That's reasonable. We append if possible, and propagate the error. Continuing...
// ... inside Storage::append

while let Some(buf) = buf_stream.next().await {
    // ...
}
So for every byte slice, we add the number of bytes in it to bytes_written if everything goes well. However, if anything goes wrong:
- We roll back all writes by truncating at the position before all writes, stored in pos.
- We return the error encountered.
Finally, once we exit the loop, we return the position at which the record stream was written, along with the total bytes written:
    } // ... end of: while let Some(buf) = buf_stream.next().await {...}

    Ok((pos, bytes_written))
} // ... end of Storage::append
We can coalesce the match blocks together by inlining all the results. Putting everything together:
/// Error to represent unexpected stream termination or overflow, i.e. a stream
/// of unexpected length.
pub struct StreamUnexpectedLength;

/// Trait representing a read-append-truncate storage media.
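For reference, here is a compilable sketch of the coalesced streaming append, written as a free function over the Storage sketch from earlier (the real version is a provided method on the trait itself). It assumes the error type can be built from the StreamUnexpectedLength sentinel via a From impl, and that append_threshold is measured in usize; both are simplifications.

```rust
use std::ops::Deref;
use futures::{Stream, StreamExt};

pub async fn append<S, XBuf, XE, X>(
    storage: &mut S,
    buf_stream: &mut X,
    append_threshold: Option<usize>,
) -> Result<(S::Position, usize), S::Error>
where
    S: Storage,
    S::Error: From<StreamUnexpectedLength>, // assumption made by this sketch
    XBuf: Deref<Target = [u8]>,
    X: Stream<Item = Result<XBuf, XE>> + Unpin,
{
    // write position: the size of this storage before we append anything
    let (mut bytes_written, pos) = (0_usize, storage.size());

    while let Some(buf) = buf_stream.next().await {
        let append_result = match (buf, append_threshold) {
            // the stream errored out mid-way: treat it as a stream of unexpected length
            (Err(_), _) => Err(S::Error::from(StreamUnexpectedLength)),
            // writing this slice would exceed the allowed threshold
            (Ok(buf), Some(thresh)) if bytes_written + buf.len() > thresh => {
                Err(S::Error::from(StreamUnexpectedLength))
            }
            (Ok(buf), _) => storage.append_slice(buf.deref()).await,
        };

        match append_result {
            Ok((_, slice_bytes_written)) => bytes_written += slice_bytes_written,
            Err(error) => {
                // roll back every write made by this call, then propagate the error
                storage.truncate(&pos).await?;
                return Err(error);
            }
        }
    }

    Ok((pos, bytes_written))
}
```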
Now, to answer one of the initial questions: we needed StreamUnexpectedLength as a sentinel error type to represent the case where the stream unexpectedly errors out while reading, or has more bytes in total than our append_threshold.
A sample Storage impl.
Let's explore a tokio::fs::File based implementation of Storage:
First, let's outline our struct:
TokioFile is an alias for tokio::fs::File. It's defined in a use directive in the source.
We need a buffered writer over our file to avoid hitting the file too many times for small writes. Since our workload will largely be composed of small writes and reads, this is important.
The need for RwLock will be clear in a moment.
Next, let's proceed with the constructor for this struct:
As you can see, for the underlying storage file, we enable the following flags:
- write: enables writing to the file
- append: new writes are appended (as opposed to truncating the file before writes)
- create: if the file doesn't exist, it's created
- read: enables reading from the file
Now, before we implement the Storage trait for StdSeekReadFileStorage, we need to implement Storage's inherited traits. Let's proceed one by one.
First, we have Sizable:
Next, we implement AsyncTruncate:
We also need AsyncConsume:
With the pre-requisites ready, let's proceed with our Storage impl:
Notice that the Storage trait uses a &self for Storage::read. This was to support idempotent operations.
In our case, we need to update the seek position of the file from behind a &self. So we need some interior mutability to achieve this. Hence the RwLock.
Hopefully, the need for the RwLock is clear now. In retrospect, we could also have used a Mutex, but using a RwLock keeps the option of multiple readers for read-only operations open.
Note that the read operation is still idempotent, as we restore the old file position after reading.
Record (struct)
Before we move on to the concept of a CommitLog, we need to abstract a much more fundamental aspect of our implementation: how do we represent the actual "records"?
Let's see... a record in the most general sense needs only two things:
- The actual value to be contained in the record
- Some metadata about the record
So, we express that directly:
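A sketch of the Record struct, generic over both the metadata and value types:

```rust
/// A record with a generic value and generic metadata (sketch).
pub struct Record<M, T> {
    /// Metadata associated with this record.
    pub metadata: M,

    /// The actual value contained in this record.
    pub value: T,
}
```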
CommitLog (trait)
Now, with the abstractions presented above, we are ready to express the notion of a CommitLog. The properties of a CommitLog are:
- It allows reading records from random indices
- Naturally, it has some index bounds
- It allows appending records which may contain a stream of byte slices as their value
- It can be truncated at a specific index
- It supports close() and remove() operations to safely persist or remove data respectively
All of these properties have already been represented with the traits above. We now use them to define the concept of a CommitLog:
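A sketch of how the previous traits compose into a CommitLog; the generic parameters, bounds and error plumbing in the repository are more involved:

```rust
use std::{ops::Deref, time::Duration};
use futures::Stream;

/// Commit-log abstraction assembled from the traits above (sketch).
pub trait CommitLog<M, T>:
    AsyncIndexedRead<Value = Record<M, T>>
    + AsyncTruncate<Mark = Self::Idx>
    + AsyncConsume
    + Sizable
{
    /// Error type for commit-log operations.
    type Error: std::error::Error;

    /// Appends a record whose value may be an asynchronous stream of byte slices;
    /// returns the index at which the record was appended.
    async fn append<XBuf, X, XE>(&mut self, record: Record<M, X>) -> Result<Self::Idx, Self::Error>
    where
        XBuf: Deref<Target = [u8]>,
        X: Stream<Item = Result<XBuf, XE>> + Unpin;

    /// Removes expired records, i.e. records older than the given expiry duration.
    /// Provided with a no-op default so that implementations may opt in.
    async fn remove_expired(&mut self, _expiry_duration: Duration) -> Result<(), Self::Error> {
        Ok(())
    }
}
```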
Optionally, a CommitLog implementation might need to remove records that are older by a certain measure of time. Let's call them expired records. So we provide a function for that, in case different implementations need it.
Index (struct)
Let's start with our first direct component of our indexed segmented log, the Index.
First, we need to answer two primary questions:
- What kind of data are we storing?
- In what layout will we store the said data?
So recall, at a high level, an Index is a logical mapping from indices to byte positions on storage.
So it at least has to store 2-tuples of the form: (record_index, record_position)
Now we need two additional data points:
- The number of bytes in the record, i.e. record_length
- A checksum of the contents of the record, e.g. crc32
These two data points are essential for verifying whether the record data is valid or corrupted on the storage media. ("storage media" = Storage trait impl.)
So we arrive at this 4-tuple: (checksum, length, index, position). Let's call this an IndexRecord.
Now, an Index stores index records sequentially. So:
index_record[i+1].index = index_record[i].index + 1
Now, all these 4-tuples need the exact same number of bytes to be stored. Let's call this size IRSZ. If the index records are laid out sequentially, every index record will be at a position which is an integral multiple of IRSZ:
## storage (Index)
(checksum, length, index, position) @ storage::position = 0
(checksum, length, index, position) @ storage::position = 1 x IRSZ
(checksum, length, index, position) @ storage::position = 2 x IRSZ
(checksum, length, index, position) @ storage::position = 3 x IRSZ
...
Note: the position in the tuple refers to the position of the actual Record in the Store. storage::position here refers to the position within the Index file (Storage impl).
Due to this property, the index can be derived from the position of the record itself. The number of records is simply:
len(Index) = size(Index) / IRSZ
Using this, we can conclude that storing the index in each IndexRecord is redundant.
However, an Index can start from an arbitrarily high index. Let's call this the base_index. So if we store a marker record of sorts that contains the base_index, and then store all the index records after it sequentially, we can say:
// simple row major address calculation
index_record_position_i = size(base_marker) + i * IRSZ
So now we can lay out our IndexRecord instances on storage as follows:
## storage (Index)
[base_index_marker] @ storage::position = 0
(checksum, length, position) @ storage::position = size(base_index_marker) + 0
(checksum, length, position) @ storage::position = size(base_index_marker) + 1 x IRSZ
(checksum, length, position) @ storage::position = size(base_index_marker) + 2 x IRSZ
(checksum, length, position) @ storage::position = size(base_index_marker) + 3 x IRSZ
...
Now, the number of records is calculated as:
// number of records in Index
len(Index) = (size(Index) - size(base_index_marker)) / IRSZ
Now let's finalize the bytewise layout on storage:
## IndexBaseMarker (size = 16 bytes)
┌───────────────────────────┬──────────────────────────┐
│ base_index: u64 ([u8; 8]) │ _padding: u64 ([u8; 8]) │
└───────────────────────────┴──────────────────────────┘
## IndexRecord (size = 16 bytes)
┌─────────────────────────┬───────────────────────┬─────────────────────────┐
│ checksum: u64 ([u8; 8]) │ length: u32 ([u8; 4]) │ position: u32 ([u8; 4]) │
└─────────────────────────┴───────────────────────┴─────────────────────────┘
We add padding to the IndexBaseMarker to keep it aligned with IndexRecord.
We represent these records as follows:
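Following the byte layouts above, the two record types can be sketched as plain structs (derives and visibility are illustrative):

```rust
/// Marker record storing the base index of an Index (padded to 16 bytes).
#[derive(Debug, Default, Clone, Copy)]
pub struct IndexBaseMarker {
    /// The base index of the Index this marker belongs to.
    pub base_index: u64,
    _padding: u64, // keeps the marker the same size as an IndexRecord
}

/// Index record mapping a record index to its location in the Store (16 bytes).
#[derive(Debug, Default, Clone, Copy)]
pub struct IndexRecord {
    /// Checksum (e.g. crc32) of the record bytes in the Store.
    pub checksum: u64,
    /// Number of bytes occupied by the record in the Store.
    pub length: u32,
    /// Position of the record bytes within the Store file.
    pub position: u32,
}
```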
Since we use 32 bits to represent the positions of records, a Segment can only be up to 4GiB in size (2^32 unique byte positions = 2^32 bytes = 4GiB). Practically speaking, segments generally don't exceed 1GB. If segments are too big, individual segments become difficult to move around. So this limit is not a problem.
We use binary encoding to store these records.
Now, we could use serde and bincode to serialize these records on Storage impls. However, since these records will be serialized and deserialized fairly often, I wanted to serialize in constant space, with a simple API.
First, let us generalize over both IndexBaseMarker and IndexRecord. We need to formalize an entity with the following properties:
- It has a known size at compile time
- It can be read from and written to any storage
We can express this directly:
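A sketch of such a trait, using the bytes crate's Buf/BufMut abstractions for the byte-level reads and writes (the repository's exact bounds may differ):

```rust
use bytes::{Buf, BufMut};

/// A record with a fixed-size serialized representation that can be written to
/// and read back from a byte buffer (sketch).
pub trait SizedRecord: Sized {
    /// Writes this record into the given buffer.
    fn write<W: BufMut>(&self, dest: &mut W) -> std::io::Result<()>;

    /// Reads a record of this type back from the given buffer.
    fn read<R: Buf>(source: &mut R) -> std::io::Result<Self>;
}
```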
Now we need a kind of SizedRecord that can be stored on a Storage impl. Let's call it PersistentSizedRecord:

/// Wrapper struct to enable `SizedRecord` impls to be stored on Storage impls.
///
/// REPR_SIZE is the number of bytes required to store the inner SizedRecord.
Next, we implement the SizedRecord trait for IndexBaseMarker and IndexRecord:
[ Quiz 💡 ]: We don't read or write the _padding bytes in our IndexBaseMarker SizedRecord impl. So how is it still aligned?
[ A ]: Remember that we pass in a const generic parameter REPR_SIZE when creating a PersistentSizedRecord. When writing or reading, we always read REPR_SIZE number of bytes, regardless of how we serialize or deserialize our IndexRecord or IndexBaseMarker. In this case, we just pass a const usize with value 16.
We also declare some useful constants to keep things consistent:
/// Extension used by backing files for Index instances.
pub const INDEX_FILE_EXTENSION: &str = "index";
/// Number of bytes required for storing the base marker.
pub const INDEX_BASE_MARKER_LENGTH: usize = 16;
/// Number of bytes required for storing the record header.
pub const INDEX_RECORD_LENGTH: usize = 16;
/// Lowest underlying storage position
pub const INDEX_BASE_POSITION: u64 = 0;
Before we proceed with our Index implementation, let us do a quick back-of-the-envelope estimate of how big Index files can be.
Every IndexRecord is 16 bytes. So for every Record we have 16 bytes. Let's assume that Record sizes are 1KB on average. Let's assume that Segment files are 1GB on average.
So we can calculate as follows:
1GB Segment == pow(10, 6) * 1KB Record
1 * 1KB Record => 1 * 16B IndexRecord
pow(10, 6) * 1KB Record => pow(10, 6) * 16B IndexRecord
=> 16MB Index
Or, 1GB Segment => 16MB Index (Result)
e.g. 10 * 1GB segment files => 10 * 16MB Index files = 160 MB overhead
1TB total data through 1000 segment files => 16GB overhead
Keep this calculation in mind as we proceed through our implementation.
With the groundwork ready, let's begin our Index implementation:
Why do we need an in-memory cache in the first place? Well, IndexRecord instances are fairly small and there are usually few of them (<= 1000) in an Index. A simple in-memory cache makes sense rather than hitting the storage every time. (We could probably mmap(), but this is simple enough.)
Alright then, why make it optional?
Recall that for 1TB of data with a 1KB record size, we end up having 16GB of Index overhead. Allocating this amount of memory is clearly not practical, since we expect our system to be able to handle this scale.
So we make caching IndexRecord instances optional. This enables us to decide which Index instances to cache based on access patterns.
For instance, we could maintain an LRUCache of the Index instances that are currently cached. When an Index outside of the LRUCache is accessed, we add it to the LRUCache. When an Index from within the LRUCache is accessed, we update the LRUCache accordingly. The LRUCache will have some maximum capacity, which decides the maximum number of Index instances that can be cached at the same time. We could replace the LRUCache with other kinds of cache (e.g. LFUCache) for different performance characteristics. The Index files are still persisted on storage, so there is no loss of data.
I wanted this implementation to handle 1TB of data on a Raspberry Pi 3B. Unfortunately, it has only 1GB of RAM. However, if we enforce a limit that only 10 Index instances are cached at a time (e.g. by setting the LRUCache max capacity to 10), that would be a 160MB overhead. That would make this implementation usable on an RPi 3B, albeit at the cost of some latency.
For storage, I can connect a 1TB external hard disk to the RPi 3B and proceed as usual.
Now, let's define some utilities for constructing Index instances.
Next, we define our constructors for Index:
Next, we define some functions for managing caching behaviour:
Some minor utilities for ease of implementation:
Now, we move on to the primary responsibilities of our Index.
First, let's implement a mechanism to read IndexRecord instances from our Index:
Next, we need a mechanism to append IndexRecord instances to our Index:
We need to be able to truncate our Index:
Finally, we define how to close or remove our Index:
Notice how all of the primary functions of our Index are supported by the traits we wrote earlier.
Store (struct)
Now that we have our Index ready, we can get started with our backing Store. Store is responsible for persisting the record data to Storage.
So remember that we have to validate the record bytes persisted using Store with the checksum and length? To make this easier to work with, we create a virtual RecordHeader. This virtual RecordHeader is never actually persisted; it is computed from the bytes to be written, or from the bytes that are read from the storage.
In a previous index-less segmented-log implementation, RecordHeader instances used to be persisted right before every record on the Store. Once we moved the record position, checksum and length metadata to the Index, it was no longer necessary to persist the RecordHeader.
We only need a constructor for RecordHeader:
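A sketch of the RecordHeader and its constructor; the hasher is kept generic here, matching the H: Hasher parameter we will see on Segment later:

```rust
use std::hash::Hasher;

/// Virtual record header: never persisted on its own, always computed from the
/// record bytes (sketch).
pub struct RecordHeader {
    pub checksum: u64,
    pub length: u32,
}

impl RecordHeader {
    /// Computes the header for the given record bytes using the given hasher type.
    pub fn compute<H: Hasher + Default>(record_bytes: &[u8]) -> Self {
        let mut hasher = H::default();
        hasher.write(record_bytes);

        Self {
            checksum: hasher.finish(),
            length: record_bytes.len() as u32,
        }
    }
}
```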
Now we can proceed with our Store implementation:
We also need a mechanism for constructing IndexRecord instances from RecordHeader instances once the record bytes are written to the store:
Store also has AsyncTruncate, AsyncConsume and Sizable trait impls, where it delegates the implementation to the underlying Storage impl.
Now that we have our Store and Index ready, we can move on to our Segment.
Segment (struct)
As we have discussed before, a Segment is the smallest unit in a SegmentedLog that can act as a CommitLog.
In our implementation, a Segment comprises an Index and a Store. Here's how it handles reads and appends:
- For reads, it first looks up the IndexRecord in the Index corresponding to the given record index. With the position, length and checksum present in the IndexRecord, it reads the serialized Record bytes from the Store. It then deserializes the bytes as necessary and returns the requested Record.
- For appends, it first serializes the given Record. Next, it writes the serialized bytes to the Store. Using the RecordHeader and position obtained from Store::append, it creates the IndexRecord and appends it to the Index.
Now that we know the needed behaviour, let's proceed with the implementation.
First, we represent the configuration schema for our Segment:
When these limits are crossed, a segment is considered "maxed out" and has to be rotated back as a read segment.
Next, we define our Segment struct:
I will clarify the generic parameters in a while. However, we have an additional requirement:
We want to enable records stored in the segmented-log, to contain the record index in the metadata.
In order to achieve this, we create a new struct MetaWithIdx and use it as follows:
Here's what I want to highlight:
- We create a struct MetaWithIdx to use as the metadata value for commit_log::Record.
- Next, we create a type alias commit_log::segmented_log::Record which uses the MetaWithIdx struct for its metadata.
Why am I saying all this? Well I did need to clarify the module structure a bit, but there's another reason.
Let's go back to our Segment struct and describe the different generic parameters:
- S: Storage impl used for the Index and Store
- M: Metadata type used as the generic parameter to the MetaWithIdx struct (this is why I needed to explain how MetaWithIdx fits in first)
- H: Hasher impl used for computing checksums
- Idx: Primitive type used for representing record indices (u32, usize etc.)
- Size: Type used to represent record sizes in bytes (u64, usize etc.)
- SERP: SerializationProvider impl used for serializing metadata
So here's what the SerializationProvider trait looks like:
/// Trait to represent a serialization provider.
It's used to generalize over different serde data formats.
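Since the trait body is elided above, here is a sketch of what a serialization provider generic over serde data formats could look like; the associated types and method set in the repository may differ:

```rust
use std::ops::Deref;
use serde::{de::DeserializeOwned, Serialize};

/// Serialization provider generalizing over serde data formats (sketch).
pub trait SerializationProvider {
    /// Serialization error type.
    type Error: std::error::Error;

    /// Byte container produced by serialization.
    type SerializedBytes: Deref<Target = [u8]>;

    /// Serializes the given value into bytes.
    fn serialize<T: Serialize>(value: &T) -> Result<Self::SerializedBytes, Self::Error>;

    /// Deserializes a value of type `T` from the given bytes.
    fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, Self::Error>;
}
```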
Now, we need a basic constructor for Segment:
Next, we express the conditions for when a segment is expired or maxed out.
Before we proceed with storing records in our segment, we need to formalize the byte layout for serialized records:
// byte layout for serialized record
┌───────────────────────────┬──────────────────────────────┬───────────────────────────────────────────────┐
│ metadata_len: u32 [u8; 4] │ metadata: [u8; metadata_len] │ value: [u8; record_length - metadata_len - 4] │
└───────────────────────────┴──────────────────────────────┴───────────────────────────────────────────────┘
├────────────────────────────────────────── [u8; record_length] ───────────────────────────────────────────┤
As you can see, a serialized record has the following parts:
- metadata_len: the number of bytes required to represent the serialized metadata. Stored as a u32 in 4 bytes.
- metadata: the metadata associated with the record. Stored in metadata_len bytes.
- value: the value contained in the record. Stored in the remaining record_length - metadata_len - 4 bytes.
With our byte layout ready, let's proceed with our Segment::append implementation:
That looks more involved than it actually is. Still, let's go through it once:
append()
- Validate the append index, and obtain it if not provided.
- Serialize the metadata, specifically the MetaWithIdx instance in the Record.
- Find the length of the serialized metadata_bytes as metadata_bytes_len.
- Serialize metadata_bytes_len to metadata_bytes_len_bytes.
- Create a sum type to generalize over serialized byte slices and record value byte slices.
- Chain the byte slices in a stream in the order [metadata_bytes_len_bytes, metadata_bytes, ...record.value].
- Call append_serialized_record() on the final chained stream of slices.

append_serialized_record()
- Copy the current highest_index to write_index.
- Obtain the remaining_store_capacity using the expression config.max_store_size - store.size(). The append_threshold is then the remaining capacity along with the overflow bytes allowed, i.e. remaining_store_capacity + config.max_store_overflow.
- Append the serialized stream of slices to the underlying Store instance with the computed append_threshold.
- Using the (position, record_header) obtained from store.append(), create the IndexRecord.
- Append the IndexRecord to the underlying Index instance.
- Return the index at which the serialized record was written (return write_index).
Next, we implement the AsyncIndexedRead trait for Segment using the same byte layout:
Again, let's summarize what's happening above:
- Read the IndexRecord at the given index idx from the underlying Index instance.
- Read the serialized record bytes using the IndexRecord from the underlying Store instance.
- Split and deserialize the serialized record bytes into metadata_bytes_len, metadata and the record value.
- Return a Record instance containing the read metadata and value.
Segment::append and the AsyncIndexedRead trait impl form the majority of the responsibilities of a Segment.
Next, we need to provide an API for managing Index caching on Segment instances:
As you can see, it simply exposes the caching API of the underlying Index.
When constructing our Segment, most of the time we will need to read a Segment with a given base_index from some storage media. Ideally, we want a mechanism that allows us to:
- Find the base indices of all the segments stored on some storage media
- Given a base_index, get the Storage trait impl. instances associated with the Segment having that base_index
Now, a Segment contains an Index and a Store. Each of them has a distinct underlying Storage trait impl. instance associated with it. However, they are still part of the same unit.
Let's create a struct SegmentStorage to express this notion:
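A sketch of this pairing:

```rust
/// Backing storage pair for a single Segment (sketch).
pub struct SegmentStorage<S> {
    /// Backing storage for the Segment's Store.
    pub store: S,

    /// Backing storage for the Segment's Index.
    pub index: S,
}
```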
Now, let's express our notion of the storage media that provides SegmentStorage instances:

/// Provides SegmentStorage for the Segment with the given base_index from some storage media.
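A sketch of the provider trait, using the Storage and SegmentStorage sketches from before (method names are illustrative):

```rust
/// Provides SegmentStorage instances addressed by segment base indices (sketch).
pub trait SegmentStorageProvider<S, Idx>
where
    S: Storage,
{
    /// Returns the base indices of all segments persisted on this storage media.
    async fn obtain_base_indices_of_stored_segments(&mut self) -> Result<Vec<Idx>, S::Error>;

    /// Obtains the SegmentStorage for the segment with the given base index,
    /// allocating it on the storage media if necessary.
    async fn obtain(&mut self, segment_base_index: &Idx) -> Result<SegmentStorage<S>, S::Error>;
}
```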
We rely on the SegmentStorageProvider for allocating files or other storage units for our Segment instances. The receivers are &mut since the operations presented here might need to manipulate the underlying storage media.
With SegmentStorageProvider, we can completely decouple the storage media from our Segment, and by extension, our SegmentedLog implementation.
Now let's go back to our Segment. Let's create a Segment constructor that uses the SegmentStorageProvider:
Next, we utilize the SegmentStorageProvider to provide an API for flushing data written to a Segment to the underlying storage media. The main idea behind flushing is to close and reopen the underlying storage handles. This is generally a consistent method of flushing data across different storage platforms. We implement it as follows:
Finally, we implement AsyncTruncate and AsyncConsume for our Segment:
SegmentedLog (struct)
With our underlying components in place, we are ready to encapsulate the segmented-log data-structure.
Similar to Segment, we need to represent the configuration schema of our SegmentedLog first:
/// Configuration for a SegmentedLog
Next, we express our notion of a segmented-log as the SegmentedLog struct:
The generic parameters are as follows:
- S: Storage trait impl. to be used as storage for the underlying Segment instances
- M: Metadata to be used as the parameter to MetaWithIdx in every Record
- Idx: Unsigned integer type to be used for record indices
- Size: Unsigned integer type to be used for storage sizes
- SERP: SerializationProvider trait impl.
- SSP: SegmentStorageProvider trait impl.
- C: Cache trait impl.
The Cache trait is from the crate generational-cache. It represents an abstract Cache, and is defined as follows:

/// A size bounded map, where certain existing entries are evicted to make space for new
/// entries.

/// An evicted value from cache.
Now, this is all fine and dandy, but you are probably wondering: "Why do we need a cache again?" Remember that if all Segment instances cache their Index, for every 1GB of record data we need 16MB of heap memory, assuming record sizes of 1KB. So we made Index caching optional to keep memory usage from exploding.
How do we decide which Segment instances are to cache their Index? We use another cache, segments_with_cached_index, to decide which Segment instances cache their Index. We can choose the cache type based on access patterns (LRU, LFU etc.).
Now, we don't need to store the Segment instances themselves in the Cache implementation. We can instead store the index of the Segment instance in the read_segments vector. Also, we don't need to store any explicit values in our Cache; just the keys will do. So our bound would be: Cache<usize, ()>.
However, there might be cases where the user wants all Segment instances to cache their Index. So we also make segments_with_cached_index optional.
Next, let's implement a constructor for our SegmentedLog:
Let's summarize the above method:
- We obtain the base indices of all the Segment instances persisted in the given SegmentStorageProvider instance in segment_base_indices.
- We split the base indices into read_segment_base_indices and write_segment_base_index. write_segment_base_index is the last element in segment_base_indices. If segment_base_indices is empty (meaning there are no Segment instances persisted), we use config.initial_index as the write_segment_base_index. The remaining base indices are read_segment_base_indices.
- We create the read Segment instances and the write Segment using their appropriate base indices. Read Segment instances are index-cached only if the num_index_cached_read_segments limit is not set. If this limit is set, we don't index-cache read Segment instances in this constructor. Instead, we index-cache them when they are referenced.
- We store the read Segment instances in a vector read_segments.
- The write Segment is always index-cached.
- We create a segments_with_cached_index Cache instance to keep track of which Segment instances are currently index-cached. We limit its capacity to only as much as necessary.
- With the read Segment vector, write Segment, config, segments_with_cached_index and segment_storage_provider, we create our SegmentedLog instance and return it.
Before we proceed further, let's define a couple of macros to make our life a bit easier:
/// Creates a new write Segment instance with the given base_index for
/// the given SegmentedLog instance
/// Consumes the given Segment instance with the given consume_method
/// (close() or remove())
/// Takes ownership of the write Segment instance from the given
/// SegmentedLog.
/// Obtains a reference to the write Segment in the given
/// SegmentedLog with the given ref_method.
/// (as_mut() or as_ref())
These macros are strictly meant for internal use.
With our groundwork ready, let's proceed with the read/write API for our SegmentedLog.
Now, for reads, we need to be able to read Record instances by their index in the SegmentedLog. This requires us to be able to resolve which Segment contains the Record with the given index.
We know that the Segment instances are sorted by their base_index and have non-overlapping index ranges. This enables us to do a binary search on the read_segments vector to check which Segment has the given index within its index range. If none of the read Segment instances contain this index, we default to the write_segment.
If the write_segment doesn't contain the index, its read API will error out.
Let's implement this behaviour:
pub type ResolvedSegmentMutResult<'a, S, M, H, Idx, SERP, C> =
;
pub type ResolvedSegmentResult<'a, S, M, H, Idx, SERP, C> =
;
Now we can implement AsyncIndexedRead for our SegmentedLog:
Notice that this API doesn't use any caching behaviour. This API has been designed to not contain any side effects and be perfectly idempotent in nature.
We need a different API to enable side effects like index-caching.
Let's introduce a new trait to achieve this:
/// Alternative to the AsyncIndexedRead trait where the invoker is guaranteed
/// to have exclusive access to the implementing instance.
Next, let's implement some structs and methods for controlling the caching behaviour:
With our caching behaviour implemented, we implement the AsyncIndexedExclusiveRead trait for our SegmentedLog:
There are some other methods to read Record instances efficiently for different workloads:
- read_seq: sequentially reads records in the segmented-log by iterating over the underlying segments in order. Avoids segment search overhead.
- read_seq_exclusive: read_seq with caching behaviour
- stream: returns a stream of Record instances within a given range of indices
- stream_unbounded: stream with the index range set to the entire range of the segmented-log
Read them on the repository in the SegmentedLog module.
Next, we need to prepare for our SegmentedLog::append implementation. The basic outline of append() is as follows:
- If the current write segment is maxed out, rotate it to a read segment, and create a new write segment that starts off where the old one left off.
- Append the record to the write segment.
So, we need to implement write segment rotation. Let's proceed:
A previous implementation used to directly close and re-open the write segment to flush it. This led to reading the index records multiple times when rotating segments. The new Segment::flush API avoids doing that, making the current rotate_new_write_segment implementation more efficient.
With this, we are ready to implement CommitLog::append for our SegmentedLog:
Exactly as discussed. Now let's implement the missing remove_expired_segments method:
Let's summarize what is going on above:
- Flush the write segment.
- Make a copy of the current highest_index as next_index. It is to be used as the base_index of the next write segment to be created.
- Take all segments (both read and write) into a single vector.
- These segments are sorted by both index and age. The ages are in descending order.
- We find the first segment in this vector that is young enough not to be considered expired.
- We split the vector into two parts: the ones to_remove and the ones to_keep. The ones to_keep start from the first non-expired segment. The older ones are the ones to_remove.
- We isolate the last segment from the segments to_keep as the write segment. If there are no segments to keep (i.e. to_keep is empty), we create a new write segment with base_index set to the next_index we stored earlier.
- We remove the segments to be removed (i.e. the ones in to_remove) from storage. We also remove their entries from the cache.
Next, let's see the AsyncTruncate trait impl. for SegmentedLog:
Let's summarize what is going on above:
- If the given index is out of bounds, error out.
- If the given index is contained within the write segment, truncate the write segment and call it a day.
- If none of the above conditions are true, continue on.
- Find the segment which contains the given index.
- Truncate that segment at the given index.
- Remove all segments that come after this segment; also remove their entries from the cache.
- Create a new write segment which has its base_index set to the highest_index of the truncated segment. Set it as the new write segment.
Finally, we have the AsyncConsume trait impl. for SegmentedLog:
/// Consumes all Segment instances in this SegmentedLog.
An example application using SegmentedLog
Let's summarize what we want to achieve here:
- An HTTP API server that provides RPC-like endpoints for a commit-log API
- On-disk persistence for the underlying commit log using tokio::fs based Storage and SegmentStorageProvider impls.
Recall that we already wrote a Storage impl using tokio::fs earlier here. Now we need a SegmentStorageProvider impl. However, could we do even better?
The mechanics of creating and maintaining a file hierarchy for storing segment store and index files will remain largely the same, even across different async runtimes and file implementations. What if we could also abstract that complexity away?
PathAddressedStorageProvider (trait)
A PathAddressedStorageProvider obtains Storage impl instances addressed by paths. We don't specify at this point where those paths belong (whether on a disk based fs, vfs, nfs file share etc.).
DiskBackedSegmentStorageProvider (struct)
DiskBackedSegmentStorageProvider uses a PathAddressedStorageProvider impl. instance to implement SegmentStorageProvider. The PathAddressedStorageProvider implementing instance is expected to use on-disk filesystem backed paths and, consequently, return Storage instances backed by the on-disk filesystem.
// ...
Next, we will flesh out the SegmentStorageProvider implementation in detail.
First, we have some standard file extensions for Segment Store and Index files:
pub const STORE_FILE_EXTENSION: &str = "store";
pub const INDEX_FILE_EXTENSION: &str = "index";
We maintain a mostly flat hierarchy for storing our files:
storage_directory/
├─ <segment_0_base_index>.store
├─ <segment_0_base_index>.index
├─ <segment_1_base_index>.store
├─ <segment_1_base_index>.index
...
Following this hierarchy, let's implement SegmentStorageProvider for our DiskBackedSegmentStorageProvider:
With these utilities in place we can proceed with our commit log server example.
laminarmq-tokio-commit-log-server (crate)
A simple persistent commit log server using the tokio runtime.
The code for this example can be found here.
This server exposes the following HTTP endpoints:
.route // obtain the index bounds
.route // obtain the record at given index
.route // append a new record at the end of the
// commit-log
.route // truncate the commit log
// expects JSON:
// { "truncate_index": <idx: number> }
// records starting from truncate_index
// are removed
Architecture outline for our commit-log server
As you can see, we divide the responsibility of the commit-log server between two halves:
- The axum client-facing web request handler: responsible for routing and parsing HTTP requests
- The commit-log request processing half: uses an on-disk persisted CommitLog impl instance to process the different commit-log API requests
In order to process commit-log requests, we run a dedicated request handler loop on its own single-threaded tokio runtime. The web client facing half forwards the parsed requests to the request processing half over a dedicated channel, collects the result, and responds back to the client.
To complete the loop, the client facing half also sends a channel send half resp_tx along with each request, while keeping the receive half resp_rx with itself. The request processing half sends back the result using the send half resp_tx it received.
We will be using axum for this example.
Now that we have an outline of our architecture, let's proceed with the implementation.
Request and Response types
Let's use Rust's excellent enums to represent our request and response types:
Why did we use structs for certain enum variants? Well, we will be using those structs later for parsing JSON requests in axum routes.
Now recall that we will be communicating between the axum server task and the commit-log request processing task. Let's define a Message type to encode the medium of communication.
type ResponseResult = ;
/// Unit of communication between the client facing task and the request
/// processing task
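A sketch of the Message type with the response channel folded in, assuming a tokio oneshot channel for sending the response back; the placeholder types stand in for the request/response enums defined above and a hypothetical error type:

```rust
use tokio::sync::oneshot;

// Placeholders for the request/response enums described above and a
// hypothetical error type; the real definitions live in the example crate.
pub struct AppRequest;
pub struct AppResponse;
pub struct AppError;

type ResponseResult = Result<AppResponse, AppError>;

/// Unit of communication between the client facing task and the request
/// processing task (sketch).
pub enum Message {
    /// A parsed request, along with the send half used to send the response back.
    Connection {
        request: AppRequest,
        resp_tx: oneshot::Sender<ResponseResult>,
    },
    /// Signals the request processing loop to exit.
    Terminate,
}
```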
Commit Log server config
We also need two configurable parameters. Let's define them in a struct:
/// Configuration for the commit-log request processing server.
Commit Log server request handler
With the pre-requisites ready, let's proceed with actually processing our commit-log requests.
First, we need a struct to manage commit-log server instances:
/// Abstraction to process commit-log requests
Here, CL is a type implementing the CommitLog trait.
There's also an error type and a few aliases to make life easier. Feel free to look them up in the repository.
Next, we define our request handler that maps every request to its corresponding response using the CommitLog impl instance:
Notice that we are directly passing in the Body to our CommitLog::append() without using to_bytes(). This is possible because Body implements Stream<Result<Bytes, _>>, which satisfies the trait bound Stream<Result<Deref<Target = [u8]>, _>>. This allows us to write the entire request body in a streaming manner without concatenating the intermediate (packet) buffers. (See CommitLog and Storage for a refresher.)
The above implementation is fairly straightforward: there is a one-to-one mapping between the request, the commit-log methods and the responses.
Commit Log server task management and orchestration
As discussed before, we run our commit-log server tasks and request handling loop in a single-threaded tokio runtime.
However, let's first derive a basic outline of the request handling loop. In the simplest form, it could be something as follows:
while let Some(message) = message_rx.recv().await {
    // ...
}
Notice that we explicitly match on Message::Connection so that we can exit the loop when we receive a Message::Terminate.
Now we want to service multiple connections concurrently. Sure. Does this work?
while let Some(message) = message_rx.recv().await {
    // ...
}
Almost. We just need to impose concurrency control. Let's do that:
while let Some(message) = message_rx.recv().await {
    // ...
}
Let us now look at the actual implementation:
Don't sweat the individual details too much. However, try to see how this implementation fleshes out the basic outline we derived a bit earlier.
Finally, we need to orchestrate the serve() function inside a single-threaded tokio runtime:
All this method does is set up the channel for receiving messages, spawn a thread, create a single-threaded runtime in it, and then call serve() within that single-threaded runtime.
within the single-threaded rt.
We return the JoinHandle and the channel send end Sender from this function. They allow us to join() the spawned thread and send Message instances to our CommitLogServer, respectively.
Client facing axum server
Let's now move on to the client facing end of our commit-log server. This side has three major responsibilities:
- Parse HTTP requests into appropriate AppRequest instances using the request path and body
- Send a Message::Connection containing the parsed AppRequest to the CommitLogServer
- Retrieve the response from the CommitLogServer using the connection's receive end and respond back to the user
Our axum app state simply needs to contain the message channel Sender. We also add a method to make enqueuing requests easier:
Our route handler functions will be mostly identical. I will show the read and append route handlers here. Feel free to read the rest of the route handlers here
// ...
async
async
// ...
Finally, we have our main() function for our binary:
async
Feel free to checkout the remaining sections of the commit-log server implementation here
Closing notes
This blog post discussed a segmented-log implementation, right from the theoretical foundations to a production-level library. At the end of the implementation, we explored an example commit-log server using our segmented-log implementation.
Read more about laminarmq milestones here.
References
We utilized the following resources as references for this blog post:
Lamport, Leslie. "Time, clocks, and the ordering of events in a distributed system." Concurrency: the Works of Leslie Lamport. 2019. 179-196. https://dl.acm.org/doi/pdf/10.1145/359545.359563
Jay Kreps. "The Log: What every software engineer should know about real-time data's unifying abstraction." LinkedIn engineering blog. 2013. https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
Kreps, Jay, Neha Narkhede, and Jun Rao. "Kafka: A distributed messaging system for log processing." Proceedings of the NetDB. Vol. 11. No. 2011. 2011. https://pages.cs.wisc.edu/~akella/CS744/F17/838-CloudPapers/Kafka.pdf