skip to content
Relatively General .NET

Implementing a file pager in Zig

by Oren Eini

posted on: January 24, 2022

Databases will typically divide their data files into pages, and manage all data in pages. As you can imagine, the notion of pages is pretty important to a database. Up to this point, we worked with pages, but not really. A page was just a buffer that we could read and write to. It took me 13 posts to manage to read and write pages from disk, but that is just the beginning. We need to add some structure for the pages. The usual way to do that is to have a page header, which contains some metadata about the page. That leads to an interesting question, how do we manage the page metadata? One way to do that is to reserve a part of the page for this purpose, like so:   This is probably the simplest option (and what we use in Voron), but I don’t like it very much. It makes certain operations harder to deal with, because the usable size of the page is no longer a power of 2. Assuming we have 8KB pages and a page header of 64 bytes, that means that we can use just 8,128 bytes in the page. A lot of data structures are far easier to use if we can assume that the size is a power of 2. Another thing to remember is that the underlying pager that we use operates on the level of 2 MB chunks. In other words, reading a value from the same 2MB range is as cheap as reading a value from the same 8KB page. For that reason, we can utilize the following approach, we’ll reserve the first section of the chunk for all the page headers. Here is what this will look like:   A chunk is 2MB in size, and pages are 8KB. That gives us 256 pages in a chunk. If we reserve the first page for page headers, that gives us 32 bytes for each page header. That is quite a lot, to be honest, which is great, because having more space on the header gives us a lot of options down the road. An important consideration to pay attention to is that we read from the disk at 2MB chunks, but when we write, the writes happen at the page boundary. A change on Page #3, for example, means that we have to change both the header page and then page #3 as separate operations. If we were using an embedded page header, we’ll have just a single write to deal with. In practice, I don’t think this is a major issue. We are doing asynchronous writes and write coalescing anyway. For that matter, writes are actually done lazily (and the critical path goes to the Write Ahead Log anyway) so I don’t think this aspect matters so much. At any rate, let’s get some idea about what the page header will look like: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub const PageTag = enum(u8) { none = 0, freeSpaceBitmap = 1, metadata = 2, multiPage = 3, }; pub const PageHeader = extern struct { tag: PageTag, reservedAlignment: [3]u8, data: extern union { fresSpaceBitmap: extern struct { freePages: u32 }, multiPage: extern struct { pagesCount: u32 }, reserved: [20]u8, }, checksum: [8]u8, pub fn getPageCount(self: *PageHeader) u32 { return switch (self.tag) { .none, .metadata => 1, .freeSpaceBitmap => pager.PagesInFile / @bitSizeOf(u8) / pager.PageSize, // 128 .multiPage => self.data.multiPage.pagesCount, }; } }; comptime { if (@sizeOf(PageHeader) != 32) { @compileError(std.fmt.comptimePrint("The PageHeader must be exactly 32 bytes in size but was {}", .{@sizeOf(PageHeader)})); } } view raw PageHeader.zig hosted with ❤ by GitHub I should note that this is merely a skeleton of the page header, just showing its structure. Over time, we’ll add a lot more details, but this is sufficient to understand the direction I’m going. A page header is denoted by a tag, which determines what type of page this is. Based on the type of the page, we can set various fields inside the header. You can also see the getPageCount(), which is a good demonstration of how we work with the page header. I need to have fine-grained control over the bytes in the header, so I’m setting them up in this manner. Zig currently has a number of bugs related to packed / extern structs and unions, which make this a bit harder than expected, sadly. I’m waiting for the self hosted compiler, which will hopefully fix those issues. With the PageHeader struct in place, we can start making use of this, like so: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub const Page = struct { page: u64, header: *PageHeader, buffer: []u8, }; fn getPage(p: *pager.Pager, loadedChunks: *pager.Pager.ChunksSet, page: u64, options: pager.GetPageOptions) !Page { const headerPage = (page / pager.PagesInChunk) * pager.PagesInChunk; const headerBytes = try p.getPage(loadedChunks, headerPage, 1, options); const pageIndexInChunk = page % pager.PagesInChunk; const headerEntries: []PageHeader = std.mem.bytesAsSlice(PageHeader, headerBytes); const pageHeader: *PageHeader = &headerEntries[pageIndexInChunk]; const pageCount = pageHeader.getPageCount(); const buffer = try p.getPage(loadedChunks, page, pageCount, options); return Page{ .page = page, .header = pageHeader, .buffer = buffer }; } view raw getPage.zig hosted with ❤ by GitHub There isn’t much going on here, which is great, but it does deserve some explanation. We first get the first page in the chunk for the page that was requested. For example, if we wanted to get page # 4, we get page #0, if we wanted to get page # 300, we’ll get page #256.  That page is the metadata page for the chunk, the only thing that it contains is the headers for all the pages in the chunk in question (including itself, of course). The reason that this matters is that the header for the page is used to compute the number of pages that we need to return. As you’ll note, the API we have here does not provide a way for the caller to say how many pages should be returned. We need to figure it out on our own. To do that, however, we need to store that information somewhere. That is where the page header comes into place. We get the page header of the page we care about from the metadata page of the chunk and compute the number of pages to get from the pager. Then we return to the caller the page header and the page buffer as a single unit. Of course, this presupposes that the values got to the pages somehow. That leads us to other directions, however, dealing with page allocations. You can already see some hints of that in the code above, where we use freeSpaceBitmap. I’ll dedicate my next post to this topic.

Awaiting an async void method in .NET

by Gérald Barré

posted on: January 24, 2022

async void methods are not a good way to define async methods. You should return a Task or ValueTask instead. The main point is to be able to await the method. But, is it possible to await an async void method? I don't say you should use async void in your code, I'm just questioning the point of no

Implementing a file pager in Zig

by Oren Eini

posted on: January 21, 2022

In the last post, we figured out how we can find what pages we should flush to disk. In this one, I want to talk about how we can actually do the flushing. Once a page is marked as ready for flushing, we need to start an async process in which we write it to the disk. While that process is running, we need to ensure that there are no further writes happening on that page. We want to be able to handle flushing writes in the background with as little fuss as we possibly can, while at the same time maximizing parallelism. That turns out to be a significant challenge. Our previous write strategy simply gathered all the writes in memory and then flushed them to disk. Now we need to have an ongoing process. In addition, we also want to handle the scenario where we flush everything in memory for certain scenarios, such as checkpoints or orderly shutdown. In short, the approach we previously took isn’t going to be sufficient for our needs. For that reason, I want to clear the board and start from scratch. As a reminder, here is the API that we provide: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters { var writer = pager.acquireWriter(); defer writer.release(); writer.write(pageNum, pageBuffer); } view raw usage.zig hosted with ❤ by GitHub The rules for this API are fairly simple. You acquire the writer and then may call the write() method any number of times. The writer will aggregate the writes in memory, and will use write behind to write the data to disk in the background. In other words, we aren’t necessarily going to write to the disk when write() is called, only to our own buffer. Note that the writer is working using pages, not chunks, even though the buffer pool we use operates on chunks. In addition to the write() method, we have a flush() which pushes everything that is currently in the writer to the disk. That one is meant to be called when we want to create a checkpoint or when we do an orderly shutdown. The flush() call starts an asynchronous process that we can wait upon, so we aren’t going to stall on checkpoints.  While we write a page to disk, we’ll reject any writes to that page. This is important, because to do otherwise will mean that we may send messed up data to disk. We also need to handle any and all sequences of writes. For example, consider: write(2, [1 page]);  // write a single page #2 write(4, [4 pages]);// write 4 pages from page #4 (so pages: 4,5,6,7) Now, we may actually get additional writes as well: write(6, [3 pages]); write(4, [1 page]); In other words, there are no requirements from the caller about what should be called and in what order. The caller may write a set of pages and then repurpose them again. In this case, we expect that the end result is that we’ll issue the following writes to the disk: write(2, 1 page); write(4, 1 page); write(6, 3 pages); Note that we have page #5, it was written in the first call to page #4 (when it used 4 pages), however, with the update of page #4 to be a single page, the data in page #5 is no longer relevant and we can skip writing it to disk. This is an effect of working with pages. The data doesn’t make sense outside of a page boundary, after all, so we should only write meaningful data. After spending a lot of time thinking about it, I decided to go with the following behavior: The Pager’s writer will manage the write buffers, flushing to the disk when it is time. Callers can decide to let writes happen in the background or to wait for them. Here is the new state of the Writer: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub const Writer = struct { lock: std.Thread.Mutex, cache: Lru, parent: *Pager, inflight: collections.SortedList(*WriteState, sortWriteState), completed: ?*WriteState, backgroundWrites: std.atomic.Atomic(u32), }; view raw Writer.zig hosted with ❤ by GitHub There are a few interesting fields here: The lock is there to ensure that we have only a single writer. As a reminder, the actual I/O is asynchronous. The cache implements an Lru with a twist, I improved on that since the last post, more on this later. The parent field is the owning pager, obviously. The last three fields (inflight, completed and backgroundWrites) are all about handling the asynchronous writes for us. Let’s take a look at how we handle the state for a particular write and then we’ll be able to discuss the implementation in depth: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters const WriteState = struct { page: u64, count: u32, writer: *Writer, chunks: ChunksSet = .{}, err: ?anyerror = null, list: *Lru.List = undefined, next: ?*WriteState = null, }; view raw WriteState.zig hosted with ❤ by GitHub On the surface, the WriteState is pretty simple. It contains the page we are writing and how many pages are involved in a particular write, but there are also quite a few additional fields that are… strange. The writer field is used when we are returning from a callback, the chunks are used to record which chunks are relevant for this particular write (remember, the Writer operates at the page level, while the Pager operates at the chunk level). Because we deal with individual pages, we have to ensure that each one of them has its own reference for the chunk in question. That also has the effect of pinning any dirty pages in memory until they are flushed to disk. The err field holds whether there was an error (and what was it) when writing to the disk. The list field is used in conjunction with the write behind policy and will be discussed shortly. We use the next field to create a single linked list with the head being the completed field on the Writer. With that in mind, let’s look at how the write() method is implemented: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn write(self: *Writer, page: u64, buffer: []u8) !void { std.debug.assert(buffer.len % FileChunks.PageSize == 0); const allocator = self.parent.allocator; const count = @intCast(u32, buffer.len / FileChunks.PageSize); _ = try self.drainCompleted(); try self.ensureNotWrittenTo(page, count); if (self.cache.get(page)) |ws| { // can just update the buffer... if (ws.count == count) { var cachedBuffer = try self.parent.getPage(&ws.chunks, page, count, .{ .forWrites = true }); std.mem.copy(u8, cachedBuffer, buffer); return; } if (self.cache.remove(allocator, page)) |removed| { // previous was the wrong size, so can just discard try removed.deinit(self); } } { const state = try allocator.create(WriteState); errdefer allocator.destroy(state); state.* = .{ .writer = self, .page = page, .count = count }; var cachedBuffer = try self.parent.getPage(&state.chunks, page, count, .{ .forWrites = true }); std.mem.copy(u8, cachedBuffer, buffer); try self.cache.push(allocator, state); } while (self.cache.evictOverflow(allocator)) |ws| { try self.writePageToDisk(ws); } } view raw write.zig hosted with ❤ by GitHub We start by draining any completed writes. It is important to detect any errors and to release the references from the chunks we locked during the write portion. We treat any error during the write process as fatal and abort immediately. At this point, we don’t know what the state of the system is. The only safe way to handle this is to shutdown and run recovery. The next step is ensureNotWrittenTo() call. This is required because we may be in the process of writing a page to the disk, and we cannot modify the memory for that page (or pages) while they are being written to the disk. This is a recoverable error, by the way. The caller should wait until the pending writes are complete and retry. We then check in the cache if the page is already there. If it is, we have to consider the potential for a mismatch in the sizes (described earlier in this post), when we have a previous write that spanned three pages but the current one only applies to a single page. Since the page boundary is important, we can just discard the previous write and continue. For the common case of a write to a page that was previously written to, however, we can simply copy the data from the provided buffer to the right page in memory and move on. Most of the hot pages should fit into this code path. We then allocate the state for the current write, get the page (or pages) and copy the data from the provided buffer to our own memory. We also, crucially, push the new state into the writer’s cache. In the previous post that action would evict a page that we need to handle. In the current code, that is handled at the epilog of the method. We iterate over the cache and evict any pages that don’t fit into the cache any longer. This is because a single write may evict multiple pages. If we have space for 32KB, and we currently have 4 separate pages in the cache, we are full. Adding a write for 2 pages, however, will need to evict two items out of the cache. Another aspect that I changed from the last post is that I’m no longer requiring that the current page will not be evicted immediately. I mentioned that I changed the Lru’s implementation significantly.  Previously it was fairly generic, but it turns out that it is better to specialize it specifically for our needs. Here is the current version: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters const Lru = struct { const List = struct { list: collections.DoublyLinkedList(*WriteState) = .{}, used: usize = 0, max: usize, }; young: List, old: List, map: std.AutoHashMapUnmanaged(u64, *collections.DoublyLinkedList(*WriteState).Node) = .{}, }; view raw Lru.zig hosted with ❤ by GitHub This Lru is implemented using a generational approach. The idea is that we can specify the sizes of the young and old generations independently. A new page that is added to the cache is written to the young generation. If there is another write to the page, it will be moved to the old generation. If the size of the data in each generation is too big, we use a simple Least Recently Used to evict items. An item that expired on the old generation will be moved to the young generation, while an item that expires from the young generation will be evicted completely. Here is how this looks like in code: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn get(self: *Lru, page: u64) ?*WriteState { if (self.map.get(page)) |existing| { const item = existing.item; if (item.list != &self.old) { item.list.used -= item.count * FileChunks.PageSize; item.list.list.evict(existing); item.list = &self.old; self.old.used += item.count * FileChunks.PageSize; } self.old.list.moveFirst(existing); return existing.item; } return null; } pub fn push(self: *Lru, allocator: std.mem.Allocator, item: *WriteState) !void { std.debug.assert(self.map.get(item.page) == null); var node = try self.young.list.prepend(allocator, item); item.list = &self.young; self.young.used += item.count * FileChunks.PageSize; try self.map.put(allocator, item.page, node); } pub fn evictOverflow(self: *Lru, allocator: std.mem.Allocator) ?*WriteState { if (self.old.used > self.old.max) { if (self.old.list.evictLast()) |e| { self.young.list.moveFirst(e); self.young.used += e.item.count * FileChunks.PageSize; self.old.used -= e.item.count * FileChunks.PageSize; e.item.list = &self.young; } } if (self.young.used > self.young.max) { if (self.young.list.popLast(allocator)) |e| { self.young.used -= e.count * FileChunks.PageSize; return e; } } return null; } view raw Lru.imp.zig hosted with ❤ by GitHub This is the same rough algorithm used by MySQL. The advantage here is that there is O(1) cost for the entire process, but at the same time, busy pages actually have a good chance of not being written to the disk (which is ideal, since if they are busy, they keep changing). To give some context to the sizes involved, we may decide that the old generation contains 192 MB and the young generation is 64 MB. A page write will remain in the young generation until we have another 64 MB (8,192 individual page writes), if there were any writes to it again in that time frame, it will go to the old generation. Only if we didn't have any writes to it in that time frame will we flush it to disk. Once a page was moved to the old generation, it has to go through a buffer of 256 MB (32K page writes) before it will go to the disk. Hot pages should be safe from spurious writes, especially since we’ll write them to the disk anyway whenever we do a checkpoint. When a page is evicted from the cache, the writePageToDisk() method is called on it, there is a bit of work going on here: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters fn writePageToDisk(self: *Writer, ws: *WriteState) !void { // merge backward while (ws.page > 0 and try self.tryMergePages(ws, ws.page - 1)) {} // merge forward while (try self.tryMergePages(ws, ws.page + ws.count)) {} try self.inflight.add(self.parent.allocator, ws); var pageBuffer = try self.parent.getPage(&ws.chunks, ws.page, ws.count, .{}); _ = self.backgroundWrites.fetchAdd(1, .Release); var fileIdx = ws.page / FileChunks.PagesInFile; var file = self.parent.files[fileIdx] orelse return error.FileIsNotLoaded; self.parent.ring.submit(.{ .tag = .Write, .fd = file.fd, .buffer = pageBuffer, .offset = (ws.page % FileChunks.PagesInFile), .context = @ptrToInt(ws), .callback = completeFlushCallback, }) catch |e| { // we aren't removing the inflight record, an error here is probably // fatal, and we already merged pages, so we can't revert the state self.backgroundWrites.fetchSub(1, .Release); return e; }; } fn tryMergePages(self: *Writer, cur: *WriteState, page: u64) !bool { var curFileIdx = cur.page / FileChunks.PagesInFile; var fileIdx = page / FileChunks.PagesInFile; if (curFileIdx != fileIdx) return false; if (self.cache.remove(self.parent.allocator, page)) |p| { cur.page = std.math.min(cur.page, page); cur.count += p.count; // move chunk ownership var it = p.chunks.keyIterator(); while (it.next()) |c| { try cur.chunks.put(self.parent.allocator, c.*, {}); } try p.deinit(self); return true; } return false; } view raw writePageToDisk.zig hosted with ❤ by GitHub An important observation about writing to the disk is that coalescing writes can have a huge impact on performance. Instead of having to go to the disk multiple times, we can schlepp the data there just once. The amount of the data we write is actually far less significant than what you may expect. You can see that the first thing that we do is to try to merge the pages (both forward and backward). If we find a candidate, we expand our own WriteState and release the one that subsumed. This is an important aspect, because the writes we send to the disk don’t necessarily match the old / young generations division. If we evicted a page that is adjacent to another modified page, we’ll write them both to the disk. That is even if the second page is in the old generation (and that will evict the page from the write cache). In practice, databases exhibit a lot of locality, so I don’t expect that to be an actual issue. The last thing we do before actually submitting the I/O operation is to register the write in the inflight record. This is basically a sorted list (by the first page for that write) which we’ll check on any future writes. We also increment the number of background writes we have so we’ll know to wait for them to complete. Note that if we fail to submit the I/O, we decrement the number of backgroundWrites, we use this value to wait for async operations (for example, during shutdown). And since such errors are probably fatal, we are heading that way soon. The actual I/O happens in the background and we’ll call the callback function when that is done (either successfully or with an error). So far, all the code we saw was single threaded and protected by the lock mutex. The callback, on the other hand, is running without any such protections. Furthermore, the callback thread is running all the notifications we have. We want to do as little as possible there. Here is what I ended up with: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters fn completeFlushCallback(work: *PagerRing.Work) void { // this is called from background thread, without the lock! // we need to do minimal amount of work here var ws = @intToPtr(*WriteState, work.context); var self = ws.writer; while (true) { ws.next = @atomicLoad(?*WriteState, &self.completed, .Acquire); const result = @cmpxchgWeak(?*WriteState, &self.completed, ws.next, ws, .AcqRel, .Acquire); if (result == null) { break; } // failed to update, need to retry } // now can wake any waiters... _ = self.backgroundWrites.fetchSub(1, .Release); std.Thread.Futex.wake(&self.backgroundWrites, std.math.maxInt(u32)); } view raw completeFlushCallback.zig hosted with ❤ by GitHub We register the completed write in the completed linked list and wake any pending waiters. Any actual processing, of course, is moved to the part of the process where we actually run under lock and don’t have to worry about concurrency and multi threading. The actual behavior around completing writes is implemented in the drainCompleted() call that is part of the write() call. This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters fn drainCompleted(self: *Writer) !bool { var hasAny = false; while (@atomicLoad(?*WriteState, &self.completed, .Acquire)) |done| { if (@cmpxchgWeak(?*WriteState, &self.completed, done, done.next, .AcqRel, .Acquire)) |_| { // failed to update, need to retry continue; } hasAny = true; _ = self.inflight.remove(done); if (done.err) |err| { done.deinit(self) catch { // return the original error, if it happened }; return err; // we abort on the first error } try done.deinit(self); } return hasAny; } view raw drainCompleted.zig hosted with ❤ by GitHub Even though we are under lock, the I/O completion may compete with us on the completed list, so we’re using lock free methods to iterate through the list of completed actions. The actual behavior is straightforward, we remove the write from the list of inflight writes and return an error if the write had a problem. Note that we have to deal with potentially nested errors as well.  Aside from calling this as part of the write call(), we may also want to explicitly wait for all the pending operations to complete. This is done using the waitForWrites() call: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn waitForWrites(self: *Writer, timeout: ?u64) !bool { var hasAny = false; while (true) { if (try self.drainCompleted()) { hasAny = true; } var pending = self.backgroundWrites.load(.Acquire); if (pending == 0) return hasAny; try std.Thread.Futex.wait(&self.backgroundWrites, pending, timeout); } } view raw waitForWrites.zig hosted with ❤ by GitHub We drain all the completed writes and then wait for any pending ones to complete as well. We repeat this until there are no more backgroundWrites. Note that we run this under the lock, so we know that there can never be additional writes happening. In special circumstances, we may want to ask the writer to push everything to the disk, we do that by calling the checkpoint() method: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn checkpoint(self: *Writer, timeout: ?u64) !void { // first, clear all pending writes to disk... while (self.cache.evictYoungest()) |ws| { try self.writePageToDisk(ws); } // now wait for them to complete _ = try self.waitForWrites(timeout); } view raw checkpoint.zig hosted with ❤ by GitHub There is almost nothing there, I’m happy to say, we are almost done. We evict pages from the cache from the youngest to the oldest, using the same write coalescing as before. Given write locality, that is likely to produce good results. We complete the process by waiting for all those writes to complete, and we are done. There is just one last topic that I want to cover: Shutting  down: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn deinit(self: *Writer, allocator: std.mem.Allocator) void { while (true) { self.waitForWrites(null) catch { // if we are here, the waitForWrites returned an error // however, we still need to drain the whole queue of async // operations, otherwise, we may have a memory leak. // an error at this stage on the write isn't something that we // *can* handle, and we are tearing down the writer anyway continue; }; break; } // we just discard the pending writes at this point, caller should either // call checkpoint() or will run recovery on startup while (self.cache.evictYoungest(allocator)) |e| { e.deinit(self) catch { // nothing that we can do about this }; } self.cache.deinit(allocator); self.inflight.deinit(allocator); } view raw deinit.zig hosted with ❤ by GitHub Shutting down turns out to be a fairly involved process. This is especially the case if we are shutting down because of an error. We need to wait until all the pending writes have been completed (to do otherwise is to invite Use-After-Free bugs). That is why we call the waitForWrites() until it completes successfully. At worst, each time this is called will process a single write. On shutdown, we can’t really start writing to the disk (after all, we may be shutting down because of the disk). We just discard the data in this case. That is something that we are okay to do, because we assume that any important data can be restored when we run recovery. This is one of those cases where the fact that we are building a database makes our life easier. We don’t need to try very hard to persist to disk, we can assume that as a given. I’m pretty happy with how all this turned out. The full code for the pager is now at around 1,500 lines of code. I think that at this point we are pretty much at or near what you’ll need to replace the mmap() based pager that I demonstrated earlier in this series. This is the sort of code that needs a metric ton of tests, of course, and a lot of actual experience throwing stuff at it. The nice thing about this approach is that this is entirely in our hands. We can now implement whatever policies we want. The actual behavior is more or less done, but we can play with the exact mechanics and what will trigger what at will. In some cases, that can give us a great deal of power and flexibility.

Don’t assume the result of read()

by Oren Eini

posted on: January 20, 2022

I read this post and it took me very little time to spot a pretty nasty bug. Here is the relevant section: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters loop { let read_count = encrypted_file.read(&mut buffer)?; if read_count == BUFFER_LEN { let plaintext = stream_decryptor .decrypt_next(buffer.as_slice()) .map_err(|err| anyhow!("Decrypting large file: {}", err))?; dist_file.write(&plaintext)?; } else if read_count == 0 { break; } else { let plaintext = stream_decryptor .decrypt_last(&buffer[..read_count]) .map_err(|err| anyhow!("Decrypting large file: {}", err))?; dist_file.write(&plaintext)?; break; } } view raw bug.rs hosted with ❤ by GitHub The bug is on line #4. The code assumes that a call to read() will return less than the requested number of bytes only at the end of the file. The problem with that approach is that this is explicitly documented to not work this way: It is not an error if the returned value n is smaller than the buffer size, even when the reader is not at the end of the stream yet. This may happen for example because fewer bytes are actually available right now (e. g. being close to end-of-file) or because read() was interrupted by a signal. This is a super common error in many cases. And in the vast majority of the cases, that would work. Except when it wouldn’t. The underlying implementation of File::read() will call read() or ReadFile(). ReadFile() (Windows) is documented to read as much as you requested, unless you hit the end of file. The read() call, on Unix, is documented to allow returning less than requested: It is not an error if this number is smaller than the number of bytes requested Aside from signals, the file system is free to do a partial read if it has some of the data in memory and some not. I’m not sure if this is implemented in this manner, but it is allowed to do so. And the results for the code above in this case are absolutely catastrophic (decryption will fail, encryption will emit partial information with no error, etc). I’m writing this blog post because reading the code made the error jump at me. Was bitten by this assumption too many times.

Generate SSH RSA Key Pairs on Windows with WSL

by Ardalis

posted on: January 20, 2022

Secure Shell Protocol (SSH) keys provide an alternate way to authenticate with many services like GitHub. Creating them on Windows is simple…Keep Reading →

Implementing a file pager in Zig

by Oren Eini

posted on: January 19, 2022

In the previous post I outlined some ideas about how to implement a more efficient write behind. The idea is that whenever we write pages to the Pager, we’ll not trigger an immediate write to the disk. Instead, we’ll keep the data in memory and only write to the disk when we hit a certain threshold. In many write scenarios, there are certain pages that are modified a lot (like the root page in a B+Tree) and pages that are modified rarely (a leaf page that got entries and will not be modified again). There is no point in writing the popular page to the disk, we’ll likely get another write to them shortly anyway. That calls to a Least Frequently Modified approach. We don’t need to use a more complex approach, (like the Clock Sweep algorithm we use for the Pager), because we don’t have to deal with the same scenarios. There are not likely to be cases similar to scans, which throws a lot of complexities of buffer pool implementations. Writes operations are far more predictable in general and follow a pretty strict power law distribution. The task is simple: we have the list of pages that were modified, and at capacity, we’ll select some to send to the disk. The question is how to make that decision. The simplest option is to go with the least recently used model. That is trivial to implement, The idea is that we have the following sequence of writes (assuming we have a capacity of 4 pages): 1, 2, 3, 2, 3, 1, 2, 3, 4, 2, 1, 2, 3, 4, 4, 2, 1, 6 In this case, the page that will be selected for eviction is #3, since it wasn’t modified the longest. The other alternative is to use least frequently used, in which case we have the following frequency table: Page Usages 1 4 2 5 3 4 4 3 6 1 In this case, we’ll want to select page #4 for eviction. Since it is the one least used. (We don't consider #6 because it is the one we just inserted). I can make arguments for both sides, to be frank. It makes sense that the least frequently used is going to be the most relevant, right? The problem is that we need to also account for decaying usage over time. What do I mean by this? We may have a page that is very hot, it gets used a lot for a certain period of time. After that point, however, it is no longer being written to, but because it was frequently used, it will take a long time to evict from the pool. A good example of such a scenario is when we have a B+Tree and we are inserting values in ascending orders. All the values for the tree are going to be placed in the same page, so if we have a lot of inserts, that page is going to be hot. Once it is full, however, we’ll start using another page as the target and then the page will reside in memory until some other page will have more usage.  A good discussion of least frequency used implementation is in this blog post. A nice way to deal with the issue of decaying priorities over time in an LFU setup is to use the following formula to compute the priority of the pages: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub const NodePriority = struct { prevAccess: u64, deltaSum : u64, usages : u64, priority : u64 fn computePriority(self: *NodePriority) void { var now = @intCast(u64, std.time.milliTimestamp()); if(self.prevAccess == 0) { self.prevAccess = now - std.time.ms_per_s * 60; } var delta = now - self.prevAccess; self.deltaSum += delta; self.usages += 1; self.priority = now - (self.deltaSum / self.usages); } }; view raw priority.zig hosted with ❤ by GitHub The idea is that we compute the priority of the page based on the last access, so we are very close to the most recently used option. However, note that we compute the distance between accesses to the page. A page that is infrequently accessed will have low usages and a high delta sum. That will reduce its priority. Conversely, a page that is heavily used will have a low delta sum and high usage, so its value will be near the top. Another option is to go with another clock sweep option. In this case, we use a simple least recently used model, but we keep count of the frequency of usages. In that case, if we have to evict a page that is heavily used, we can reduce its usages and give it another chance. The advantage here is that this is a far simpler model to work with, but gives roughly the same results. Another option we have is to use the midpoint insertion LRU. There is also another consideration to take. The I/O cost isn’t linear. If I’m writing page #3 to disk, it is basically free from my perspective to write nearby pages. It is the same exact cost, after all, so why not do that? We’ll need to write our own doubly linked list. The Zig’s standard library only contains a single linked list. It doesn’t take long to write such a data structure, but it is fun to do so. I absolutely get why implementing linked lists used to be such a common practice in interviews: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn DoublyLinkedList(comptime T: type) type { return struct { const Self = @This(); pub const Node = struct { next: ?*Node, prev: ?*Node, item: T, pub fn removeFromSiblings(node: *Node) void { if (node.prev) |prev| { prev.next = node.next; } if (node.next) |next| { next.next = node.prev; } } }; head: ?*Node = null, tail: ?*Node = null, pub fn deinit(self: *Self, allocator: std.mem.Allocator) void { while (self.head) |n| { self.head = n.next; allocator.destroy(n); } } pub fn prepend(self: *Self, allocator: std.mem.Allocator, item: T) !*Node { var node = try allocator.create(Node); errdefer allocator.destroy(node); node.* = .{ .next = self.head, .prev = null, .item = item }; if (self.head) |head| { head.prev = node; } if (self.tail == null) { self.tail = node; } self.head = node; return node; } pub fn append(self: *Self, allocator: std.mem.Allocator, item: T) !*Node { var node = try allocator.create(Node); errdefer allocator.destroy(node); node.* = .{ .next = null, .prev = self.tail, .item = item }; if (self.tail) |tail| { tail.next = node; } if (self.head == null) { self.head = node; } self.tail = node; return node; } pub fn moveFirst(self: *Self, node: *Node) void { if (self.head == node) return; if (self.tail == node) { self.tail = node.prev; } node.removeFromSiblings(); if (self.head) |h| { h.prev = node; } node.next = self.head; self.head = node; node.prev = null; } pub fn moveLast(self: *Self, node: *Node) void { if (self.tail == node) return; if (self.head == node) { self.head = node.next; } node.removeFromSiblings(); if (self.tail) |t| { t.next = node; } node.prev = self.tail; self.tail = node; node.next = null; } pub fn remove(self: *Self, allocator: std.mem.Allocator, node: *Node) void { defer allocator.destroy(node); node.removeFromSiblings(); if (self.head == node) { self.head = node.next; } if (self.tail == node) { self.tail = node.prev; } } pub const IteratorDirection = enum { forward, backward }; pub const Iterator = struct { it: ?*Node, direction: IteratorDirection, pub fn next(self: *Iterator) ?T { if (self.it) |cur| { self.it = if (self.direction == .forward) cur.next else cur.prev; return cur.item; } return null; } }; pub fn iterate(self: *Self, direction: IteratorDirection) Iterator { return .{ .direction = direction, .it = if (direction == .forward) self.head else self.tail, }; } pub fn popLast(self: *Self, allocator: std.mem.Allocator) ?T { if (self.tail) |t| { var item = t.item; self.remove(allocator, t); return item; } return null; } pub fn popFirst(self: *Self, allocator: std.mem.Allocator) ?T { if (self.head) |h| { var item = h.item; self.remove(allocator, h); return item; } return null; } }; } view raw DoublyLinkedList.zig hosted with ❤ by GitHub There isn’t really much to discuss here, to be honest. There is a bunch of code here, but it is fairly simple. I just had to implement a few operations. The code itself is straightforward. It is a lot more interesting when we see it being used to implement the LRU: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn Lru(comptime T: type) type { return struct { const Self = @This(); const Item = struct { val: T, usage: usize }; list: DoublyLinkedList(Item) = .{}, map: std.AutoHashMapUnmanaged(T, *DoublyLinkedList(Item).Node) = .{}, capacity: usize, pub fn init(capacity: usize) Self { return .{ .capacity = std.math.max(capacity, 2) }; } pub fn deinit(self: *Self, allocator: std.mem.Allocator) void { self.list.deinit(allocator); self.map.deinit(allocator); } pub fn push(self: *Self, allocator: std.mem.Allocator, val: T) !?T { if (self.map.get(val)) |existing| { existing.item.usage += 1; self.list.moveFirst(existing); return null; } var node = try self.list.prepend(allocator, .{ .val = val, .usage = 1 }); try self.map.put(allocator, val, node); if (self.map.count() <= self.capacity) return null; while (self.list.tail) |last| { if (last.item.usage > 1 or last == node) { // let's give it another try... last.item.usage /= 2; self.list.moveFirst(last); continue; } var dropped = last.item.val; self.list.remove(allocator, last); _ = self.map.remove(dropped); return dropped; } unreachable; } }; } view raw Lru.zig hosted with ❤ by GitHub The push() method is where it all happens. We have two options here: We have a page already inside the LRU. In that case, we increment its usage counter and move it to the front of the list. This is a new page, so we have to add it to the LRU. If there is enough capacity, we can just add it to the front of the list and be done with it. However, things get interesting when we are at capacity. At that point, we actually need to select a page to evict. How can we do that? We scan the end of the list (the oldest page) and check its usage. If it has more than a single usage, we half its usage counter and move it to the front. We continue to work on the tail in this manner. In essence, high usage counter will get reset rather quickly, but this will still give us a fairly balanced approach, with more popular pages remaining in the pool for longer. When we evict a page, we can return it back to the caller, which can then write it to the disk. Of course, you probably don’t want to just write a single page. We need to check if we have additional pages nearby, so we can consolidate all of them at once to the disk. I’ll touch on that in my next post.

Implementing a file pager in Zig

by Oren Eini

posted on: January 18, 2022

In the last blog post I presented the manner in which the Pager can write data to disk. Here is a reminder: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters var w = pager.acquireWriter(); defer w.release(); try w.write(0, buffer); try w.flushWrites(); view raw usage.zig hosted with ❤ by GitHub We acquire the writer (and a lock on it), call write on the pages we want to write on (passing the buffer to write on them), and finalize the process by calling flushWrites() to actually write the data to disk. As a reminder, we assume that the caller of the Pager is responsible for coordination. While we are writing to a specific page, it is the responsibility of the caller to ensure that there are no reads to that page. The API above is intentionally simplistic , it doesn’t give us a lot of knobs to play with. But that is sufficient to do some fairly sophisticated things. One of the interesting observations is that we split the process of updating the data file into discrete steps. There is the part in which we are updating the in memory data, which allows other threads to immediately observe it (since they’ll read the new details from the Pager’s cache). Separately, there is the portion in which we write to the disk. The reason that I built the API in this manner is that it provides me with the flexibility to make decisions. Here are some of the things that I can do with the current structure: I can decide not to write the data to the disk. If the amount of modified pages is small (very common if I’m continuously modifying the same set of pages) I can skip the I/O costs entirely and do everything in memory. Flushing the data to disk can be done in an asynchronous manner. In fact, it is already done in an asynchronous manner, but we are waiting for it to complete. That isn’t actually required. The way the Pager works, we deposit the writes in the pager, and at some future point the Pager will persist them to disk. The durability aspect of a database is not reliant on the Pager, it is a property of the Write Ahead Log, usually. If I wanted to implement a more sophisticated approach for writing to the disk, I could implement a least recently used cache for the written pages. When the number of pages in memory exceeds a certain size, we’ll start writing the oldest to disk. That keeps the most used pages in memory and avoids needless I/O.  At certain points, we can ask the Pager to flush everything to the disk, this gives us a checkpoint, where we can safely trim the Write Ahead Log. A good place to do that is whenever we reach the file size limit of the log and need to create a new one. So far, by the way, you’ll notice that I’m not actually talking about durability, just writing to the disk. The durability aspect is coming from something we did long ago, but didn’t really pay attention to. Let’s look at how we are opening files, shall we: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters const PagerFile = struct { chunks: *FileChunks, fd: std.os.fd_t, pub fn init(allocator: std.mem.Allocator, file: []const u8) !PagerFile { var chunks = try FileChunks.init(allocator); errdefer chunks.deinit(); var fd = try std.os.open( file, std.os.O.RDWR | std.os.O.CREAT | std.os.O.CLOEXEC | std.os.O.DIRECT | std.os.O.DSYNC, std.os.S.IRUSR | std.os.S.IWUSR, ); return PagerFile{ .chunks = chunks, .fd = fd, }; } pub fn deinit(self: *PagerFile) void { self.chunks.deinit(); std.os.close(self.fd); } }; view raw PagerFile.zig hosted with ❤ by GitHub Take a look at the flags that we pass to the open() command, we are asking the OS to use direct I/O (bypassing the buffer pool, since we’ll use our own) as well as using DSYNC write mode. The two together means that the write will skip any buffering / caching along the way and hit the disk in a durable manner. The fact that we are using async I/O means that we need to ensure that the buffers we write are not modified while we are saving them. As we currently have the API, there is a strong boundary for consistency. We acquire the writer, write whatever pages we need and flush immediately. A more complex system would be needed to manage higher performance levels. The issue is that in order to do that, we have to give up a level of control. Instead of knowing exactly where something will happen, we can have a more sophisticated approach, but  we’ll need to be aware that we don’t really know at which point the data will be persisted. At this point, however, there is a good reason to ask, do we even need to write durably? If we are limiting the consistency of the data to specific times requested by the caller (such as when we replace the Write Ahead Log), we can just call fsync() at the appropriate times, no? That would allow us to use buffered writes from most I/O. I don’t think that this would be a good idea. Remember that we are using multiple files. If we’ll use buffered I/O and fsync(), we’ll need to issue multiple fsync() calls, which can be quite expensive. It also means higher memory usage on the system because of the file system cache, for data we determine is no longer in memory. It is simpler to use direct I/O for the whole thing, after all. In the next post, I’m going to show how to implement a more sophisticated write-behind algorithm and discuss some of the implications of such a design.

Guard Clauses and Exceptions or Validation?

by Ardalis

posted on: January 18, 2022

Guard Clauses provide an elegant way to ensure code inputs are valid, typically by throwing exceptions. Validation provides a solution to a…Keep Reading →

Implementing a file pager in Zig

by Oren Eini

posted on: January 17, 2022

At long last, we are now at the point where we can write data back to the disk.  Before we can do that, however, we need to figure out what sort of writes we want to allow. The idea that I have in mind for the Pager is to follow the same path as Voron does. In other words, for writes, we can make the following assumptions: There is only a single thread doing writes to the pager. There are no readers for the writes until the write is completed. There is a distinction between writing the data to the pager and writing the data to the disk. Let’s break those assumptions apart and see what they bring to the table. The fact that we can assume only a single writer thread at any given point is pretty important. It means that the level of complexity that we have to face is greatly reduced. In the same sense, the fact that we don’t need to deal with concurrent readers or any consistency boundary for the data while it is being written will greatly simplify things for us. Finally, we make a distinction between writing to the pager and writing to the disk. Writing to the disk is _slow_, so we want to avoid doing that at any critical areas and push that to the background. Finally, there is another aspect to consider. Internally, the Pager works with 2MB chunks, but to the outside world, it is using 8KB pages. When we write, we always write at the 8KB pages, not chunks. How would that work for the Pager? The Pager itself is concurrent, but we only allow a single writer at a time, we can achieve this by centralizing all the write activities in the Writer struct, like so: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters // inside Pager's struct... pub fn acquireWriter(self: *Pager) *Writer { self.writer.lock.lock(); return &self.writer; } pub const Writer = struct { writeResults: std.ArrayListUnmanaged(anyerror), writtenPages: std.AutoArrayHashMapUnmanaged(u64, u32), loadedChunksForWrites: ChunksSet, writeFlushCompleted: std.atomic.Atomic(u32), lock: std.Thread.Mutex, parent: *Pager, pub fn release(self: *Writer) void { self.lock.unlock(); } } view raw Writer.zig hosted with ❤ by GitHub For now, I’m going to ignore the fields in the Writer struct, we’ll touch on them in detail later. In order to use the writer, you need to acquire it, write as many pages as you need, then release it. Here is a usage example: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters var w = pager.acquireWriter(); defer w.release(); try w.write(0, buffer); try w.flushWrites(); view raw usage.zig hosted with ❤ by GitHub The basic idea is fairly simple. With the writer, we operate at the page boundary to write as many pages as we need, once we are done, the call to flushWrites() persists the data to disk and then we can release the writer. Let’s dig a bit deeper and see how that works, shall we? This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn write(self: *Writer, page: u64, buffer: []align(mem.page_size) u8) !void { std.debug.assert(buffer.len % FileChunks.PageSize == 0); var count = @intCast(u32, buffer.len / FileChunks.PageSize); var dst = try self.parent.getPage(&self.loadedChunksForWrites, page, count, .{ .forWrites = true }); std.mem.copy(u8, dst, buffer); try self.writtenPages.put(self.parent.allocator, page, count); } view raw write.zig hosted with ❤ by GitHub The write() call is about as basic as you can get. We use the getPage() function to get the right page, memcpy the data and that is about it, right? There are only two other things here that are important: We record which chunk (the 2MB chunk of memory, out of which we carve the 8KB pages) at the writer’s level, is using the loadedChunksForWrites value. We remember which pages we wrote to using the writtenPages hash table. This is intentionally bare bones, because that is actually sufficient for our needs. The  fact that we remember which chunks we loaded (and keep a reference to them) will prevent us from reclaiming them, so even though we just wrote to memory, another thread can get the data and start using it without waiting for the disk. Of course, we still need to hit the disk eventually, that is what flushWrites() is about. This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters pub fn flushWrites(self: *Writer) !void { var count = self.writtenPages.count(); if (count == 0) return; var tmp = try self.parent.allocator.alloc(u64, count); defer self.parent.allocator.free(tmp); for (self.writtenPages.keys()) |i, p| { tmp[i] = p; } try self.writeResults.ensureTotalCapacity(self.parent.allocator, count); std.sort.sort(u64, tmp, {}, comptime std.sort.asc(u64)); var page = tmp[0]; var fileIdx = page / FileChunks.PagesInFile; var len = self.writtenPages.get(page) orelse unreachable; var index: usize = 1; while (index < tmp.len) : (index += 1) { if (page + len == tmp[index] and fileIdx == (tmp[index] / FileChunks.PagesInFile)) { // consecutive and in same file, just increase... len += self.writtenPages.get(page) orelse unreachable; continue; } try self.writePageToDisk(page, len); } // last one... try self.writePageToDisk(page, len); try self.waitForAllDiskWritesToComplete(); } view raw flushWrites.zig hosted with ❤ by GitHub There is a lot that is going on here, let’s break it up. We start by allocating a temporary array and copying the keys from the writtenPages hash table to it. We then sort the array. This is done so we’ll be able to process the writes in a sequential manner, which is likely to be faster, even with async I/O. We then scan the list of pages in order, trying to merge writes together. The idea is to issue the minimum number of write calls. Finally, we’ll wait for all the writes to complete. Okay, maybe it isn’t that complex.  There is a bunch of code here, but it is mostly straightforward. Note that we also prepare the writeResults list to accept the results of the write to the disk. As for writing to the disk, this is done using the PagerRing we previously looked at: This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters fn writePageToDisk(self: *Writer, page: u64, count: u32) !void { var fileIdx = page / FileChunks.PagesInFile; var file = self.parent.files[fileIdx] orelse return error.FileIsNotLoaded; var pageBuffer = try self.parent.getPage(&self.loadedChunksForWrites, page, count, .{}); _ = self.writeFlushCompleted.fetchAdd(1, .Release); try self.parent.ring.submit(.{ .tag = .Write, .fd = file.fd, .buffer = pageBuffer, .offset = page % FileChunks.PagesInFile, .context = @ptrToInt(self), .callback = completeFlush, }); } fn completeFlush(work: *PagerRing.Work) void { var self = @intToPtr(*Pager.Writer, work.context); if (work.result.err) |err| { // need to report the error before releasing waiters self.writeResults.appendAssumeCapacity(err); } if (self.writeFlushCompleted.fetchSub(1, .Release) == 1) { std.Thread.Futex.wake(&self.writeFlushCompleted, std.math.maxInt(u32)); } } view raw writePageToDisk.zig hosted with ❤ by GitHub To write a buffer to the disk, we simply get the buffer from the Pager (reusing all the work we did in getPage()), increment the number of outstanding writes and then submit the work for the ring for processing. We setup the completeFlush as the callback function on completion. The PagerRing will call us when it is done writing to the disk. If there is an error, we’ll record it and reduce the number of outstanding writes. If there are no more outstanding writes, we’ll wake any waiters. That part is handled in the waitForAllDiskWritesToComplete(). This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters Show hidden characters fn waitForAllDiskWritesToComplete(self: *Writer) !void { while (true) { var cur = self.writeFlushCompleted.load(.Acquire); if (cur == 0) // no outstanding writes... break; try std.Thread.Futex.wait(&self.writeFlushCompleted, cur, null); } self.writtenPages.shrinkRetainingCapacity(0); var it = self.loadedChunksForWrites.keyIterator(); while (it.next()) |chunk| { try self.parent.releaseChunk(chunk.*); } self.loadedChunksForWrites.clearRetainingCapacity(); defer self.writeResults.clearRetainingCapacity(); if (self.writeResults.items.len != 0) { return self.writeResults.items[0]; } } view raw waitForAllDiskWritesToComplete.zig hosted with ❤ by GitHub We start by waiting for the outstanding writes to complete, waiting if needed. Then we can reset the state of the Writer. We start by resetting the written pages and then iterate over all the loaded chunks and release them. After the call, the Pager may decide to remove them from memory. This is fine, since they were already written to disk. Except… if there was an error. You might have noticed that we are gathering the errors on each individual write operation we send, but we are actually only looking at the first one. For that matter, we clear the state of the Writer regardless if there were errors or not. In general, an I/O error from the disk is not something that is recoverable. What you can do at this stage is to raise the error higher and run whatever recovery you have on startup. In the next post, I’m going to be talking about durability and the overall expected performance of the system under this sort of model.