Relatively General .NET

Working With Market Data Using Time Series in RavenDB

by Oren Eini

posted on: January 13, 2022

Kamran Ayub has another great article discussing how to utilize RavenDB to analyze crypto market data. In addition to the article, you can see the full source code for the sample application. There is even a really cool intro video for the application and article:

Implementing a file pager in Zig

by Oren Eini

posted on: January 12, 2022

Up to this point we have focused on reading data from the disk, and we can do that only up to a point. Eventually we'll run out of memory (assuming that the database is bigger than memory, which is a pretty safe assumption). That means that we need to decide what to remove from memory. When we use mmap(), the OS decides that for us. In fact, that is probably the best argument against using mmap(): the additional control we get from managing the memory ourselves is the chief reason to take the plunge and do it. There are a lot of algorithms for managing memory. I really like this one, because it is quite elegant. However, it requires quite a lot of state to be maintained, especially when working with highly concurrent systems. Instead, I chose to look at the clock sweep algorithm. This is also what PostgreSQL implements, and it is actually far simpler to work with. The idea is that for each page, we maintain a usage count. Each time we get a page, we increment its usage count (up to a small limit). Each time we need to evict a page, we search for one that can be evicted and has no recent usage. If it has usages, we decrement that value and repeat until we find something. Our buffer management isn't actually dealing with pages, however. We are working with 2MB chunks instead. The principle is the same, but using bigger aggregates is advantageous given typical memory sizes these days. The first thing that we need to do is to modify the ChunkMetadata. I'm showing only the relevant changes.
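Before we get to the Zig changes, here is a minimal sketch of the clock-sweep idea in Python. This is a toy model, not the pager's actual code: each slot carries a small saturating usage counter, and the sweeping hand decrements counters until it finds a slot with zero usage to evict.

```python
# Toy clock-sweep model: slots hold [key, usage] pairs; the hand
# decrements usage counts until it finds an evictable slot.
class ClockSweep:
    MAX_USAGE = 7  # mirrors a small 3-bit saturating counter

    def __init__(self, capacity):
        self.slots = [None] * capacity  # each slot: [key, usage] or None
        self.hand = 0

    def touch(self, index):
        """Record an access: bump the usage count, saturating at the max."""
        slot = self.slots[index]
        slot[1] = min(slot[1] + 1, self.MAX_USAGE)

    def evict(self):
        """Sweep until a free or zero-usage slot is found; return its index."""
        while True:
            slot = self.slots[self.hand]
            victim = self.hand
            self.hand = (self.hand + 1) % len(self.slots)
            if slot is None:
                return victim  # already free
            if slot[1] == 0:
                self.slots[victim] = None
                return victim
            slot[1] -= 1  # recently used: give it another lap
```

A frequently touched slot survives several sweeps (its counter must be decremented back to zero first), while a slot touched only once is evicted quickly, which is exactly the behavior we want for rare scans.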
```zig
pub const ChunkMetadata = packed union {
    raw: u64,
    v: packed struct {
        version: u30,
        tag: Tag,
        references: u29,
        usages: u3,
    },
};
```

The major change here is the introduction of the usages field. That is a 3-bit field (0..7 in range), and we reduced the number of references a chunk can have to about 500 million (which should be sufficient, I believe). The idea here is that each time we call addRef(), we'll increment the usages count, like so:

```zig
pub fn addRef(self: *ChunkMetadata) !void {
    comptime var maxReferences = std.math.maxInt(@TypeOf(self.v.references));
    if (@intCast(u64, self.v.references) + 1 > maxReferences) {
        return error.ChunkReferenceCountOverflow;
    }
    self.v.references += 1;
    self.v.version +%= 1;
    self.v.usages +|= 1; // saturating addition
}
```

Zig has a nice feature that I'm using here: saturating addition. In other words, if the value is incremented beyond its limit, it is clamped to the limit. That means that I don't have to worry about overflows, etc. I took a look at how this is implemented, and the compiler generates the following assembly for this code (x +| 100):

```asm
add   eax, 100
mov   ecx, -1
cmovb eax, ecx
```

This may look daunting, but I'll break it down.
First, we add 100 to the value, as you would expect (the value is currently in the EAX register). Then we store -1 (the value 0xFFFFFFFF) in the ECX register. Finally, we use the CMOV instruction (the CMOVB in the snippet is a variant of CMOV), telling it to store ECX in EAX if the carry flag was set by the addition. As a bonus, this also avoids a branch in the code, which is great for performance. One of the critical behaviors that we need to consider here is what the pager does when we access rare pages once and then never again. A really common scenario is a scan of some data for a rare query. For that reason, the usages behavior is a bit more complex than one might imagine. Let's explore this for a bit before moving on. Look at the following code; I marked the important lines with stars:
```zig
pub fn markLoaded(self: *FileChunks, chunk: u64) ![]align(mem.page_size) u8 {
    while (true) {
        var copy = self.chunks[chunk];
        var origin = copy;
        switch (copy.v.tag) {
            .Value => return error.ValueAlreadyExists,
            .Error => return error.ValueInErrorState,
            .Empty => return error.ValueIsNotLoading,
            .Loading => {},
        }
        copy.setTag(.Value);
        try copy.addRef(); // ownership by the pager
        try copy.addRef(); // ownership by the caller
        // ***********************
        copy.setUsagesFrom(origin);
        // ***********************
        if (self.chunks[chunk].tryUpdate(origin, copy)) {
            return self.getLoadedChunk(chunk);
        }
    }
}

fn trySetToEmpty(self: *FileChunks, index: u64) !void {
    while (true) {
        var modified = self.chunks[index];
        if (modified.getTag() != .Loading) {
            // someone else modified it while we were releasing the memory
            return error.ValueIsNotLoading;
        }
        var released = modified.reset(.Empty);
        // ***********************
        released.setUsages(1); // next time we load the chunk, we'll know to keep it around
        // ***********************
        if (self.chunks[index].tryUpdate(modified, released)) break;
    }
}
```

When we mark a chunk as loaded, we copy the usages from the current record. That starts out as zero, so in the scenario of accessing rare pages, we'll have a good reason to evict them soon. However, there is a twist: when we remove a chunk from memory, we also set its usage count to 1. That is an interesting issue. The chunk is not loaded, so why does it have a usage count? Because if we removed it from memory and then load it again, we want it to start with a higher usage count (and less chance of being evicted). In this manner, we are somewhat simulating the 2Q algorithm. Now, let's take a look at the actual reclaiming portion, shall we?
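Before diving into the reclaiming code, the usage-count lifecycle just described can be simulated in a few lines of Python. This is a toy model under my own naming (ChunkSlot and its methods are hypothetical, not the pager's API): loading inherits whatever usages the empty slot held, unloading deliberately leaves usages == 1 behind, and sweeps decay the count even on empty slots.

```python
# Toy model of the chunk usage-count lifecycle:
# a freshly loaded chunk inherits the usages left in its (empty) slot,
# and unloading a chunk deliberately leaves usages == 1 behind.
class ChunkSlot:
    def __init__(self):
        self.loaded = False
        self.usages = 0

    def load(self):
        self.loaded = True  # usages carried over from the empty slot

    def unload(self):
        self.loaded = False
        self.usages = 1  # remember it was resident once (2Q-like hint)

    def sweep(self):
        # the sweeper decays usages even on empty slots, so a chunk
        # that stays unloaded is eventually forgotten
        self.usages = max(self.usages - 1, 0)

slot = ChunkSlot()
slot.load()                 # first ever load: usages == 0, easy to evict
first_load = slot.usages
slot.unload()
slot.load()                 # reloaded soon after: starts warmer
quick_reload = slot.usages
```

A chunk reloaded soon after eviction starts with a nonzero count, while a chunk that sits unloaded through a sweep starts cold again, matching the "forget rarely used chunks" behavior described above.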
In the chunk metadata, we have the following behavior:

```zig
pub const RelciamOptions = enum {
    None,
    ReduceUsages,
    Reclaim,
};

pub fn reclaim(self: *ChunkMetadata) RelciamOptions {
    switch (self.v.tag) {
        .Value => {
            if (self.v.references == 1) {
                return .Reclaim;
            }
        },
        .Empty => {
            if (self.v.usages > 0) {
                return .ReduceUsages;
            }
        },
        else => {},
    }
    return .None;
}

pub fn reduceUsages(self: *ChunkMetadata) ChunkMetadata {
    return ChunkMetadata{ .v = .{
        .tag = self.v.tag,
        .version = self.v.version +% 1,
        .references = self.v.references,
        .usages = self.v.usages -| 1,
    } };
}
```

If we have a value and no outstanding references, we can reclaim it; if we don't have a value, we reduce the usage count anyway. The idea is that when we unload a page, we set its usages count to 1, and sweeps keep decaying that count on the empty chunk as time goes by. So if a sweep has already passed over an empty chunk that had a usage count, that count won't carry over to the next load. Basically, after unloading a chunk, if we reload it soon, we'll keep it around longer the next time. But if it is only rarely loaded, we don't care, and we forget that it was ever loaded. The process for actually reclaiming a chunk is shown here:
```zig
fn tryReclaimChunk(self: *FileChunks, result: *ReclaimResult) bool {
    var index = result.lastScannedIndex;
    while (true) {
        var copy = self.chunks[index];
        switch (copy.reclaim()) {
            .None => return false,
            .ReduceUsages => {
                var modified = copy.reduceUsages();
                if (self.chunks[index].tryUpdate(copy, modified)) return false;
                continue;
            },
            .Reclaim => {},
        }
        result.hasCandidates = true;
        var modified = copy.reduceUsages();
        if (modified.hasUsages()) { // update the reduced usage
            if (self.chunks[index].tryUpdate(copy, modified)) return false;
        }
        modified = copy.reset(.Loading); // means that we own it for the duration...
        if (self.chunks[index].tryUpdate(copy, modified)) return true;
    }
}
```

We look at a particular index in the chunks and check whether we can reclaim it. Following our previous behavior, there are a number of options here. We can either have an empty chunk that remembers its previous usage, which we can reduce, or we can actually try to reclaim the chunk. Of course, it isn't that simple, because even if we found a candidate for reclaiming, we still need to reduce its usages count. Only if it has no usages can we actually start the removal process. Finally, we have the actual scanning of the chunks in the file, shown here:
```zig
pub const ReclaimResult = struct {
    lastScannedIndex: u64 = 0,
    reclaimedBytes: u64 = 0,
    reclaimed: bool = false,
    hasCandidates: bool = false,
};

pub fn reclaim(self: *FileChunks, startIndex: u64) !ReclaimResult {
    var result = ReclaimResult{
        .lastScannedIndex = if (startIndex < self.chunks.len) startIndex else 0,
    };
    while (result.lastScannedIndex < self.chunks.len) : (result.lastScannedIndex += 1) {
        if (self.tryReclaimChunk(&result) == false) continue;
        // at this point, the current chunk is owned by us, and no one else can use it...
        result.reclaimedBytes += ChunkSize;
        _ = try os.mmap(
            self.getLoadedChunk(result.lastScannedIndex).ptr,
            ChunkSize,
            os.PROT.NONE,
            os.MAP.ANONYMOUS | os.MAP.PRIVATE | os.MAP.FIXED,
            -1,
            0,
        );
        try self.trySetToEmpty(result.lastScannedIndex);
        result.reclaimed = true;
        result.lastScannedIndex += 1;
        break;
    }
    return result;
}
```

We scan the file from the provided start index and try to reclaim each chunk in turn. That may reduce the usages count, as we previously discussed. The process continues until we have found a chunk to reclaim. Note that we always reclaim just a single chunk and return. That is because the process is repeatable: we start from a given index, and we return the last index that we scanned. If the caller needs to free more than a single chunk, they can call us again, passing the last index that we scanned. That is why this is called the clock sweep algorithm: we sweep through the chunks in the system, reaping them as needed. The code so far is all in the same FileChunks instance, but the Pager actually deals with multiple files. How would that work? We start by adding some configuration options to the pager, telling it how much memory we are allowed to use:
```zig
pub const Options = struct {
    capacity: usize = 0,
    hardMemoryLimit: usize = 0,
    softMemoryLimit: usize = 0,
};
```

We have both soft and hard limits here, because we want to give the users the ability to say "don't use too much memory, unless you really have to". The problem is that otherwise, users get nervous when they see 99% memory utilization and want to keep some free. The point of soft and hard limits is that this gives us more flexibility, rather than setting a lower-than-needed limit and getting memory errors with GBs of RAM to spare. In the Pager, we have the loadChunksToTransaction() method that we looked at in the previous post. That is where we read the chunk from the file. We are going to modify this method so it will reserve the memory budget before we actually allocate it, like so:

```zig
{
    try self.reserveMemoryBudgetForChunk();
    errdefer self.releaseChunkMemoryBudget();
    // if null, someone else is loading...
    maybeBuffer = file.chunks.markLoading(chunkInFile) catch |err| switch (err) {
        error.ValueAlreadyExists => continue, // someone else is loading...
        else => return err,
    };
}
```

As you can see, we reserve the memory budget, then actually allocate the memory (inside markLoading()). If there is a failure, we release the budget allocation and report the error. To manage the memory budget, we need to add a few fields to the Pager:
```zig
options: Options,
usedMemory: std.atomic.Atomic(u64),
filesRelcaimSweeps: [8]std.atomic.Atomic(u64),

fn releaseChunkMemoryBudget(self: *Pager) void {
    _ = self.usedMemory.fetchSub(FileChunks.ChunkSize, .Release);
}
```

You can see that releaseChunkMemoryBudget() is pretty trivial: simply release the memory budget and move on. Things are a lot more complex when we need to reserve memory budget, however. Before we dive into that, I want to talk a bit about the filesRelcaimSweeps field. That is an interesting one. This is where we keep the last position that we scanned in the Pager (across all files). However, why is it an array? The answer is simple. The Pager struct is meant to be used from multiple threads at the same time. Under memory pressure, we are likely to need to evict multiple chunks at once. In order to avoid multiple threads scanning the same range of the Pager to find chunks to remove, I decided that we'll have several sweeps going at the same time. On startup, we initialize them to random values, like so:

```zig
var rnd = std.rand.DefaultPrng.init(@bitCast(u64, std.time.timestamp()));
for (self.filesRelcaimSweeps) |*it| {
    rnd.fill(std.mem.asBytes(it));
}
```

In this manner, under load, each thread is likely to scan an independent portion of the Pager's memory, which should avoid competing over the same memory to evict.
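The idea of giving each thread its own sweep hand can be sketched in Python. This is a toy model (the real code uses atomics, and the names and sizes here are assumptions of the sketch, not the pager's): selecting a hand by thread id modulo the array size spreads concurrent evictions across disjoint regions.

```python
import threading

SWEEP_HANDS = 8
TOTAL_CHUNKS = 4096  # across all files; hypothetical size for the sketch

# each hand starts at a different "random" position; here we just spread them
hands = [(i * 997) % TOTAL_CHUNKS for i in range(SWEEP_HANDS)]

def pick_hand():
    """Pick a sweep hand based on the current thread id, as the pager does."""
    return threading.get_ident() % SWEEP_HANDS

def next_scan_start(advance):
    """Advance this thread's hand and return where its scan should begin."""
    pos = pick_hand()
    start = hands[pos] % TOTAL_CHUNKS
    hands[pos] += advance  # a monotonically growing counter, wrapped on use
    return start
```

Two threads with different ids will usually land on different hands, so each moves its own cursor forward without fighting the other over the same range of chunks.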
And with that behind us, let's see how we can actually use this to evict memory:

```zig
fn reserveMemoryBudgetForChunk(self: *Pager) !void {
    // optimistically increment...
    if (self.usedMemory.fetchAdd(FileChunks.ChunkSize, .Release) < self.options.softMemoryLimit) {
        // simplest scenario
        return;
    }
    // we are *too* high, reduce the optimistic increment...
    _ = self.usedMemory.fetchSub(FileChunks.ChunkSize, .Release);
    // we need to release some memory...
    var pos = std.Thread.getCurrentId() % self.filesRelcaimSweeps.len;
    var scanStartIndex = self.filesRelcaimSweeps[pos].load(.Acquire) %
        (self.files.len * FileChunks.ChunksInFile);
    var hasCandidates = true;
    while (hasCandidates) {
        hasCandidates = false;
        var fileIdx = scanStartIndex / FileChunks.ChunksInFile;
        if (self.files[fileIdx]) |file| {
            var chunkInFile = scanStartIndex % FileChunks.ChunksInFile;
            var result = try file.chunks.reclaim(chunkInFile);
            _ = self.filesRelcaimSweeps[pos].fetchAdd(
                // update the location of the _next_ scan
                (result.lastScannedIndex - chunkInFile) + 1,
                .Release,
            );
            if (result.reclaimed) {
                // we released a chunk (and reserved it too, so usedMemory remains the same)
                return;
            }
            if (result.hasCandidates) hasCandidates = true;
        }
        // still searching, move to the next file
        scanStartIndex = (scanStartIndex / FileChunks.ChunksInFile) * FileChunks.ChunksInFile +
            FileChunks.ChunksInFile;
        _ = self.filesRelcaimSweeps[pos].fetchAdd(
            FileChunks.ChunksInFile - (scanStartIndex % FileChunks.ChunksInFile), // move to next file...
            .Release,
        );
    }
    // if we are here, we couldn't release any chunks
    var currentMem = self.usedMemory.load(.Acquire) + FileChunks.ChunkSize;
    if (currentMem > self.options.hardMemoryLimit) {
        return error.OutOfMemoryBudget;
    }
    // we are higher than the soft limit, but still okay...
    _ = self.usedMemory.fetchAdd(FileChunks.ChunkSize, .Release);
}
```

There is a lot of code here, and quite a few comments, because this is chock-full of behavior. Let's dissect it in detail. We start by checking the memory budget to see whether we are below the soft memory limit. We do this optimistically, but if it fails, we reset the state and start to scan the Pager for chunks we can release. We do that by selecting one of the filesRelcaimSweeps values using the current thread id; this way, different threads are likely to use different values and advance them independently. We find the relevant file for the index and start scanning for chunks to release. We stop the process when one of the following happens: either we released a chunk, in which case we are successful and can return in glory, or we didn't find a chunk but found candidates (whose usage count is too high to discard). In the latter case, we'll look for better options before coming back to them and discarding them. If we are completely unable to find anything to release, we check whether we exceed the hard memory limit and error out, or just accept the soft limit as, well... a soft limit, and allocate the budget anyway. This happens as part of the process for loading chunks in the pager, so we only need to release a single chunk at a time. For that reason, we remember the state, so the next operation will start from where we left off. You can think about this as a giant set of hands scanning the range of chunks in memory as needed. There are actually a few things that we could implement to make this faster.
For example, we always scan through all the chunks in a file. We could try to maintain some data structure that would tell us which pages have a usage count worth considering, but that is actually complex (remember, we are concurrent). There is also another factor to consider. The ChunkMetadata is 64 bits in size, and a FileChunks struct contains an array of 4096 such values, totaling 32KB in size. It is actually cheaper to scan through the entire array and do the relevant computation on each candidate than to try to be smart about it. I think that this is it for now; this post has certainly gone on for quite a while. In the next post in the series, I want to tackle writes. So far we have only looked at reads, but I think we have all the relevant infrastructure at hand already, so this should be simpler.
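The soft/hard budget logic from reserveMemoryBudgetForChunk() can be sketched as a small state machine in Python. This is a toy model under assumed names: reclaim_one() stands in for the whole clock-sweep machinery, and Budget is not part of the pager's API.

```python
CHUNK_SIZE = 2 * 1024 * 1024  # 2MB chunks

class Budget:
    def __init__(self, soft, hard, reclaim_one):
        self.used = 0
        self.soft = soft
        self.hard = hard
        self.reclaim_one = reclaim_one  # callback: try to evict one chunk

    def reserve_chunk(self):
        # optimistic fast path: below the soft limit, just take the budget
        if self.used + CHUNK_SIZE <= self.soft:
            self.used += CHUNK_SIZE
            return True
        # over the soft limit: try to evict a chunk first
        if self.reclaim_one():
            # released one chunk and reserved one, so `used` stays the same
            return True
        # nothing to evict: the hard limit is the real wall
        if self.used + CHUNK_SIZE > self.hard:
            raise MemoryError("out of memory budget")
        self.used += CHUNK_SIZE  # the soft limit is, well... soft
        return True
```

With a soft limit of two chunks and a hard limit of three, the third reservation succeeds only by overshooting the soft limit, and the fourth fails outright once nothing can be reclaimed.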

DateTime as a Value Object

by Ardalis

posted on: January 12, 2022

Value Objects are a key part of Domain-Driven Design and domain models created by this process. However, they're not used nearly as often by…

Implementing a file pager in Zig

by Oren Eini

posted on: January 11, 2022

We have finally gotten to the point where we can ask the pager for a page of data (reminder: a page in this case is 8KB of data) and get it back. Here is what this looks like:

```zig
var page = try pager.getPage(&tx.loadedChuncks, 2, 1, tx.timeout);
```

There are a few things to note here. There is a new concept: the transaction. We don't want to have that concept at the pager level, so we just pass the relevant state to the pager to deal with. Basically, any transaction we have will need to carry a bit of state for the pager. The state we need at this point is minimal: just the list of chunks that are referenced by the pager. You can also see that we provide a timeout for the getPage() call. What is that about? That leads to the second concern we need to consider: how are we expected to run this? If we call getPage() on a page (actually, a chunk containing that page) that isn't resident in memory, we'll need to go to the disk to read it. That can take a while, sometimes a long while. At a glance, this is one of those things that async/await was meant for. Since Zig supports async functions, that is certainly something we could do, but it is something that I want to be cautious about. Having explicit blocking is far easier to understand and debug, at least for now. This is especially true if we want to consume the pager API from a non-Zig target. That leads to an interesting issue, however. If a call to getPage() can block, how can we avoid blocking the thread? In most cases, we would like to avoid blocking, after all.
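As a quick aside, the page-to-chunk arithmetic that the pager relies on can be sketched in Python. This assumes 8KB pages and 2MB chunks (so 256 pages per chunk), matching the sizes used throughout the series; the function names are mine, not the pager's.

```python
PAGE_SIZE = 8 * 1024          # 8KB pages
CHUNK_SIZE = 2 * 1024 * 1024  # 2MB chunks
PAGES_IN_CHUNK = CHUNK_SIZE // PAGE_SIZE  # 256

def pages_to_chunks(page, count):
    """Map a page range to the (first, last) chunks that must be resident."""
    first = page // PAGES_IN_CHUNK
    last = (page + count) // PAGES_IN_CHUNK  # mirrors the pager's `end` computation
    return first, last

def page_offset_in_chunk(page):
    """Byte offset of a page inside its chunk."""
    return (page % PAGES_IN_CHUNK) * PAGE_SIZE
```

For the call above, page 2 with a count of 1 lands entirely inside chunk 0, while a multi-hundred-page range can legitimately span several consecutive chunks.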
It would be simple to have a tryGetPage() method which will not block (but will schedule the load of the page from disk), and then maybe register for a notification for it. If that sounds like async to you, that is because it is. The problem with this sort of approach is that you need to suspend execution somewhere in the middle of a transaction operation and continue when the data is loaded. Without async/await, you can't really do that. Well, I mean, you could try, but we have a lot of experience with trying to manage state via callbacks, and that isn't really going to work for anything beyond the simplest systems (see: node.js without async/await). There is one thing that we can do that would be both simple and effective, however: we can error if the page isn't in memory. That sounds like a pretty bad idea, no? How would that help us? Well, the concept is simple. If a transaction attempts to access a page that isn't resident in memory, we'll do the following operations:

- Schedule the chunk the page resides on to be loaded into memory.
- Return an error from the pager.
- Roll back the transaction.
- Keep the loadedChunks for that transaction active and wait for the chunk to be loaded.
- Re-run the transaction; now the chunk is in memory and we can proceed further.

Each time that we re-run the transaction, we make sure that the chunks it needs are in memory, eventually ensuring that all the required chunks are resident and we don't need to block. At the same time, the code that works with transactions doesn't need to care about blocking, etc. We need to do the usual error handling, but that is required anyway. There is a single location where we need to deal with callbacks from the pager, so there is a limited blast radius of complexity. For write transactions, for example, this is a very reasonable strategy. We assume that there is only a single thread writing at a given time.
A transaction being blocked because it needs to read a page from the disk can stall other pending transactions. By having it abort and retry later, we can keep the line moving. For read operations, on the other hand, that is likely not something that you want to do. If I'm already streaming results to the caller, I can't just repeat the transaction. I'm not making any decisions at this point, just considering the various options and the implications that we have to deal with at this early stage. Now, let's look at how getPage() is actually implemented, shall we?

```zig
pub fn getPage(self: *Pager, loadedChunks: *ChunksSet, page: u64, count: u32, timeout: ?u64) ![]align(mem.page_size) u8 {
    var chunk = page / FileChunks.PagesInChunk;
    var end = (page + count) / FileChunks.PagesInChunk;
    // we ensure that any range is always within a single file...
    var fileNum = page / FileChunks.PagesInFile;
    var file = self.files[fileNum] orelse return error.FileIsNotLoaded;
    try self.loadChunksToTransaction(loadedChunks, file, chunk, end, timeout);
    if (loadedChunks.get(chunk) != null) {
        // explicitly go beyond the scope of the chunk, because we know the proper memory layout
        // and if the size exceeds a single chunk, we already loaded the next one(s)
        var ptr = file.chunks.ptr.ptr;
        var pageInFile = page % FileChunks.PagesInFile;
        if (@ptrToInt(ptr + ((pageInFile + count) * FileChunks.PageSize)) > file.chunks.ptrEnd) {
            return error.PageRequestIsOutOfBand;
        }
        var slice = ptr[(pageInFile * FileChunks.PageSize)..((pageInFile + count) * FileChunks.PageSize)];
        return slice;
    }
    return error.ChunkWasNotLoaded;
}
```

There is a lot going on here, I know. We start by defining a set (a hash map from a 64-bit unsigned integer to a zero-sized value). The way this works in Zig is quite elegant, since we pay no memory cost for the values here. The majority of the work is done in the loadChunksToTransaction() function, which we'll examine shortly, but you can see some interesting details already in getPage(). We assume that we have a page loaded and that any range of pages that we ask for is always within a single file. The call to load the chunks actually puts them in the loadedChunks argument. We verify that we loaded the chunk properly, and then we create a slice to return to the caller. Note that we may request more than a single page, and it is valid to ask for a range that spans multiple chunks. We validate that the range we return is within the memory range for the current file; we ensured that the chunks for a specific file are consecutive in memory, so we can safely return this pointer across multiple chunks without needing to think about it. There is another aspect of loadedChunks that we need to discuss.
A transaction may use multiple pages from the same chunk, but we only need to load the chunk once. By the same token, we can avoid adding a reference to the chunk multiple times for each loaded page. When we close the transaction, we need to release the reference for these chunks, so we need to keep track of them. With that in mind, let's see how we actually load the chunks into memory. As a reminder, we have two actors working together here: the FileChunks is used to store the chunks in memory, and the PagerRing is used for parallel I/O.

```zig
fn scheduleLoadingChunks(self: *Pager, loadedChunks: *ChunksSet, file: *PagerFile, start: u64, end: u64) !void {
    while (start <= end) : (start += 1) {
        if (loadedChunks.get(start) != null) {
            continue;
        }
        var chunkInFile = (start % FileChunks.ChunksInFile);
        var maybeBuffer = file.chunks.tryGet(chunkInFile) catch |err| switch (err) {
            error.ValueIsLoading => continue, // someone else is loading this, will get later
            else => return err,
        };
        if (maybeBuffer != null) {
            // a buffer exists, and we own a ref to it now, record it...
            try recordChunk(file, loadedChunks, start);
            continue;
        }
        {
            try self.reserveMemoryBudgetForChunk();
            errdefer self.releaseChunkMemoryBudget();
            // if null, someone else is loading...
            maybeBuffer = file.chunks.markLoading(chunkInFile) catch |err| switch (err) {
                error.ValueAlreadyExists => continue, // someone else is loading...
                else => return err,
            };
        }
        if (maybeBuffer) |buffer| {
            // now need to load it...
            var state = try self.allocator.create(AsyncLoadChunkState);
            errdefer self.allocator.destroy(state);
            state.* = .{ .chunk = chunkInFile, .file = file, .parent = self };
            try self.ring.submit(.{
                .tag = .Read,
                .fd = file.fd,
                .buffer = buffer,
                .offset = chunkInFile * FileChunks.ChunkSize,
                .context = @ptrToInt(state),
                .result = .{ .bytes = 0, .err = null },
                .callback = completeLoad,
            });
        }
    }
}

fn loadChunksToTransaction(self: *Pager, loadedChunks: *ChunksSet, file: *PagerFile, start: u64, end: u64, timeout: ?u64) !void {
    try self.scheduleLoadingChunks(loadedChunks, file, start, end);
    // now we actually load the chunks to the transaction
    var currentGlobalChunk = start;
    while (currentGlobalChunk <= end) : (currentGlobalChunk += 1) {
        if (loadedChunks.get(currentGlobalChunk) != null) {
            continue;
        }
        var chunkInFile = (currentGlobalChunk % FileChunks.ChunksInFile);
        // we'll wait until we have the buffer
        _ = try file.chunks.getBlocking(chunkInFile, timeout);
        try recordChunk(file, loadedChunks, currentGlobalChunk);
    }
}
```

That is a lot of code to throw at you, I know; let's dissect it in detail. In this method, we are working on chunks, not pages, and we may have multiple chunks to handle, which is why we have the while loops. We start by checking whether the chunk is already in the transaction's loadedChunks. If it isn't, we compute the position of the chunk in the file (the chunk number we get from the caller is the global one, after all) and try to get it from the FileChunks. This is where things get interesting. When we call tryGet() for the current chunk, we may get an error in two scenarios: the value is currently being loaded from the disk (some other transaction asked for it, probably), in which case we don't need to do anything further other than wait for it to show up; or another transaction tried to load it but got an error, in which case we just return the error. We don't try to do anything special here.
In general, there may be a lot of errors to consider here. We may have a temporary I/O issue, run out of memory, or hit something else transient. Or we may have an actual problem at hand (bad sector on disk, corrupted data, etc.). Regardless of the cause, we aren't going to try to do any error handling here. We'll just record the error, and any future attempt to access that chunk will also error. The proper way to recover at this point is to restart the pager. This assumes we have the other components of a database at play here, so we'll re-run the journal files, apply recovery, etc. In short, any I/O issue like that is a critical error and requires a restart of the system to come back to a known state. If the tryGet() method returned without an error, there are still two options to consider. The call may have returned a value (so we called addRef() on the chunk internally); we can simply add that to the chunks we own and move on. If there isn't a value in memory, things start to get interesting. At this point we call markLoading(). We are basically racing to be the owner loading this chunk. If we win the race, we get the buffer back from the FileChunks and can schedule reading the relevant chunk from the disk. You'll note that we set the callback to completeLoad; we'll look into that shortly. If we lose (we didn't get a buffer back), then some other thread got the buffer and will schedule the read for us, so we are done. After we have either ensured that all the chunks are loaded or scheduled them to be loaded, we use getBlocking() to wait for all the relevant chunks to be available. Once that is done, we can safely return, and getPage() will complete the process, as we saw earlier. The only thing left to look at is the completeLoad function, which is about as basic as you can get:
```zig
fn completeLoad(work: *PagerRing.Work) void {
    var state = @intToPtr(*AsyncLoadChunkState, work.context);
    defer state.parent.allocator.destroy(state);
    var err: ?anyerror = null;
    if (work.result.err) |e| {
        state.file.chunks.markLoadError(state.chunk, e) catch |e2| {
            err = e2;
        };
    } else {
        _ = state.file.chunks.markLoaded(state.chunk) catch |e3| {
            err = e3;
        };
    }
    if (err) |e| {
        std.log.err("Ownership violation for chunk: {}, file: {}." ++
            " Got error: {} when handling state {}", .{
            state.chunk,
            state.file,
            e,
            work.result.err,
        });
    }
}
```

Most of the function is about error handling. We register either the fact that we got an error reading from the disk or that we completed the load process, and maybe log something. In general, there isn't really much that we need to do here. The act of calling markLoaded() will release any threads waiting on getBlocking(), after all. So the whole thing comes together quite nicely.

With this done, we are mostly done with the reading side of the pager, and with this post as well. In my next post, I want to discuss how we should handle eviction of data. So far, we are just reading into memory, never releasing. We need to take care of that as well, of course. Once that is done, we can move to the wonderful topic of handling writes and durability…
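To make the markLoading() race concrete, here is a minimal C sketch of the claim-to-load idea, using a single atomic state per chunk. The names and states here are hypothetical simplifications; the real FileChunks also hands out buffers and tracks reference counts.

```c
#include <stdatomic.h>
#include <stdbool.h>

// Hypothetical chunk states, simplified from the pager described above.
enum chunk_state { CHUNK_EMPTY, CHUNK_LOADING, CHUNK_LOADED };

// Try to claim ownership of loading a chunk. Exactly one caller wins the
// race and must schedule the disk read; everyone else waits for completion.
static bool mark_loading(_Atomic int *state) {
    int expected = CHUNK_EMPTY;
    return atomic_compare_exchange_strong(state, &expected, CHUNK_LOADING);
}

// Called from the I/O completion callback to publish the chunk to waiters.
static void mark_loaded(_Atomic int *state) {
    atomic_store(state, CHUNK_LOADED);
}
```

The compare-and-exchange is what makes the race safe: losers see the state already moved off CHUNK_EMPTY and simply wait for the winner's completion callback.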

2021 Year in Review

by Ardalis

posted on: January 11, 2022

Time for a recap of stats and important (and not so important) milestones from 2021, the SECOND year of the COVID-19 pandemic (yeah we…

Implementing a file pager in Zig

by Oren Eini

posted on: January 10, 2022

This is my 7th post in this series, and we are now starting to get into the really interesting bits. So far we worked with the individual components, each of them doing just one thing, which makes it hard to see how they all fit together. In this post, we are going to start plugging things together. As a reminder, the whole point of this series of posts is to explore what we need to do in order to ask for a page (8KB, in our case) from the data file and work with it in memory. We have most of the pieces ready; let's put them together. The first thing that we need to do is tie a file to the FileChunks structure that we previously created. This is as simple as this structure:

```zig
const PagerFile = struct {
    chunks: *FileChunks,
    fd: std.os.fd_t,

    pub fn init(allocator: std.mem.Allocator, file: []const u8) !PagerFile {
        var chunks = try FileChunks.init(allocator);
        errdefer chunks.deinit();
        var fd = try std.os.open(
            file,
            std.os.O.RDWR | std.os.O.CREAT | std.os.O.CLOEXEC |
                std.os.O.DIRECT | std.os.O.DSYNC,
            std.os.S.IRUSR | std.os.S.IWUSR,
        );
        return PagerFile{
            .chunks = chunks,
            .fd = fd,
        };
    }

    pub fn deinit(self: *PagerFile) void {
        self.chunks.deinit();
        std.os.close(self.fd);
    }
};
```

We are simply initializing the FileChunks and opening the file, nothing else. Remember that a single pager is going to be responsible for multiple files. Furthermore, all the data structures that we are dealing with now are meant for concurrent use. That means that we should be prepared for this type of code:
```zig
// on thread #1
try pager.addFile("/data/orders/data.003");

// on thread #2
try pager.getPage(&tx.owned, 2034, 32, 15 * std.time.us_in_s);
```

When we use a managed language, that is actually fairly simple to work with. In an unmanaged language, those two lines of code are tough. Why is that? Let's look at the raw data members of the Pager structure, shall we?

```zig
pub const Pager = struct {
    allocator: std.mem.Allocator,
    ring: *PagerRing,
    files: []PagerFile,
    filesCapacity: usize,
    pendingFree: PagerFileList,

    const PagerFileList = std.ArrayListUnmanaged([]PagerFile);
};
```

Of particular interest to us is the files member. This is the list of files that are being managed by the pager. Each of them has a maximum size of 8GB. The problem is that we may have one thread accessing the list at the same time that another thread wants to increase its size. How would that work? The simplest option is to reallocate the array, but that will move it. What would the first thread be doing in that scenario? The good thing from our point of view is that we don't need to worry about concurrent modifications: there is only a single thread that is allowed to modify the Pager's state at any given point in time. Trying to find generic solutions for this problem leads down a rabbit hole. You get into hazard pointers, epoch GC and other fun stuff. Also, the call to getPage() is one of the most important ones in a database; anything that we can do to reduce its cost will be done.
As such, we can’t typically use any form of locking. A reader/writer lock can be a killer. Here is a good example for how that can happen. I thought about how to resolve this, and I decided against a generic solution, instead, let’s look at the actual behavior that we need. The files array is going to be accessed a lot, it has an entry per 8GB of disk space that we take. That means that it isn’t going to be experiencing any rapid growth. It is also only going to grow. We also need to worry only when we grow the physical backing store for this array, if we overprovision and use less than we need, that is perfectly fine. Each element in the array is 8 bytes in size, so if we allocate a single memory page (4KB) we can store 512 file references in it. That represents 4 TB(!) of data, so we can probably just accept the additional cost and allocate it and not think about it. Databases with > 4TB of disk size do exist, and we don’t want to have this artificial limit on us, do we? Instead, we can use another approach. We’ll start by allocating the array with a minimum size of 8 elements (sufficient until you get to 64GB). But what happens when we reach that size? This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. 
```zig
pub fn addFile(self: *Pager, file: []const u8) !void {
    // caller holds a write lock, we don't care about readers, they can follow
    // either the new or the old pointer and are guaranteed to only access the old range
    if (self.files.len + 1 >= self.filesCapacity) {
        var nextCapacity = self.filesCapacity * 2;
        var newFiles = try self.allocator.alloc(PagerFile, nextCapacity);
        errdefer self.allocator.free(newFiles);
        try self.pendingFree.append(self.allocator, newFiles);
        std.mem.copy(PagerFile, newFiles, self.files);
        self.files.ptr = newFiles.ptr;
        self.filesCapacity = nextCapacity;
    }
    var cur = self.files.len;
    self.files.len += 1;
    self.files[cur] = try PagerFile.init(self.allocator, file);
}
```

What we do here is cheat. We don't need to free the old memory immediately. When we reach the size limit of the array, we double its size, copy the data that we have, and register the new allocation to be freed when we close the Pager. At that point, the caller already has to ensure that there are no other users of the pager. Because we copy the values from the old array but keep it around, old readers may use the old or the new array, and we don't actually care: the memory remains valid and accessible. In terms of wasted space, if our database went from a single file to 128TB in one run, we'll end up with an array of 16,384 elements (whose size is 128KB). Along the way, we had to double the size of the array a dozen times, and we "waste" about 128KB of older, unused buffers. This seems like a pretty reasonable cost to significantly reduce the complexity of concurrent access. Using this method, we can avoid any sort of synchronization on the read side. That is certainly a plus. Here are the init() and deinit() calls for the Pager, to complete the picture:
```zig
pub fn init(allocator: std.mem.Allocator, capacity: usize) !*Pager {
    var self = try allocator.create(Pager);
    errdefer allocator.destroy(self);
    self.allocator = allocator;
    self.filesCapacity = std.math.max(8, nextPowerOfTwo(capacity));
    var files = try allocator.alloc(PagerFile, self.filesCapacity);
    errdefer allocator.free(files);
    self.pendingFree = try PagerFileList.initCapacity(allocator, 1);
    errdefer self.pendingFree.deinit(allocator);
    self.ring = try PagerRing.init(allocator);
    errdefer self.ring.deinit();
    try self.pendingFree.append(allocator, files);
    self.files = files;
    self.files.len = 0; // no values yet...
    return self;
}

pub fn deinit(self: *Pager) void {
    for (self.files) |*file| {
        file.deinit();
    }
    while (self.pendingFree.popOrNull()) |toFree| {
        self.allocator.free(toFree);
    }
    self.pendingFree.deinit(self.allocator);
    self.ring.deinit();
    self.allocator.destroy(self);
}
```

As you can see, we allocate a PagerRing for the pager, which will deal with the actual I/O. The actual disposal of the files array is managed using the pendingFree list. That is a small cost to pay to reduce the cost of adding a new file. In the deinit() routine, note that there is a distinction between releasing the files themselves (where we close the FileChunks, release the relevant memory, close the file handle, etc.) and releasing the arrays that hold the files. I'm quite pleased with how this turned out: zero cost for reads (important) and negligible memory cost for most scenarios. In my next post, I'll get started with actually reading the data from disk and putting it in the pager. So far, this is a seven-post series, and we haven't completed the first part. That simple scenario is surprisingly tricky.
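The allocate-double-and-defer-the-free trick above can be sketched outside of Zig as well. Here is a hedged C version — all names are made up for illustration, and unlike the post's Pager it uses a small fixed slab for the retired arrays instead of a growable list:

```c
#include <stdlib.h>
#include <string.h>

// Grow-only pointer list: readers may keep using a stale backing array,
// because retired arrays are only freed when the whole list is destroyed.
typedef struct {
    void **items;        // current backing array (readers may hold older ones)
    size_t len;
    size_t capacity;
    void **pending[64];  // retired arrays; 64 is an arbitrary sketch limit
    size_t pending_count;
} grow_only_list;

static int grow_only_init(grow_only_list *l) {
    l->capacity = 8; // minimum size, as in the post
    l->len = 0;
    l->pending_count = 0;
    l->items = malloc(l->capacity * sizeof(void *));
    return l->items ? 0 : -1;
}

static int grow_only_append(grow_only_list *l, void *item) {
    if (l->len + 1 >= l->capacity) {
        size_t next = l->capacity * 2;
        void **fresh = malloc(next * sizeof(void *));
        if (!fresh) return -1;
        memcpy(fresh, l->items, l->len * sizeof(void *));
        l->pending[l->pending_count++] = l->items; // defer the free
        l->items = fresh;
        l->capacity = next;
    }
    l->items[l->len++] = item;
    return 0;
}

static void grow_only_destroy(grow_only_list *l) {
    // only safe once no readers remain, mirroring the Pager's deinit()
    for (size_t i = 0; i < l->pending_count; i++)
        free(l->pending[i]);
    free(l->items);
}
```

A single-writer caller (the equivalent of addFile()) appends; concurrent readers never synchronize, because every pointer they could have observed stays valid until destroy.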

Using HTTP/3 (QUIC) in .NET

by Gérald Barré

posted on: January 10, 2022

#What's HTTP/3? HTTP/3 is a new version of HTTP. The HTTP/3 protocol is supported by most modern browsers and servers. This update should bring performance benefits, mostly for mobile users or unreliable connections. The main idea is to replace TCP with a new protocol, QUIC, which removes some issues of TCP

Teaching, learning and tearing your hair out

by Oren Eini

posted on: January 06, 2022

One of my favorite activities is teaching. I love explaining how things work and passing on knowledge. Another good way to pass the time is to learn, which can be a source of great joy and incredible frustration. Recently I had a conversation with a university student in the Computer Science track, talking about databases. As you might imagine, that is a topic that is near and dear to my heart, so I had a lot of interest in hearing from the student what kind of databases they have been exposed to and what impression they had of them. The student in question was familiar with MySQL from his courses and had run into PostgreSQL in some job interviews. He was very much in the PostgreSQL camp; that isn't an uncommon reaction, but the reason for it was interesting to me. In his course, they had to set up and configure MySQL from scratch. That was annoying and hard, especially since they used the command line to make queries to the database. In the PostgreSQL case, however, he was given access to a web front end to the database and was assigned tasks to complete. They could get started right away on the most important aspects of their task. When I'm teaching, part of the job is the data dump (here are the things you need to know), part of the job is to answer questions, make sure that they understand the material, etc. A crucial part of teaching is getting the students to do things, making sure that the students are actually exercising the new knowledge. In such cases, I noted that I provide them with the baseline and they need to complete just the parts that are missing, the core pieces. That is pretty much the same thing that the student ran into during their interview. In retrospect, for teaching, I think that this approach is a serious issue. One of the most important hurdles that I see for new developers is their ability to deal with problems.
Whether it is actually reading the errors, composing some mental model of what is going on, or just being able to dig deeper and understand what is happening. I'm not talking about experience, mind; I'm talking about the approach. If most of the tasks that they have dealt with so far were "fill in the missing pieces" exercises, they likely never had the experience of dealing with everything else. And in many cases, the issues aren't in the areas you are thinking of; they can be somewhere else completely. I remember many times when I had to do something and ran into a wall. That was incredibly frustrating, especially when the issue was somewhere completely orthogonal to what I was trying to do. A great example recently was having to figure out how to do cross compilation in a GitHub action using GCC. That took a lot of time, and all I wanted to do was call a single function from native code. As frustrating as that is, I think that there is a huge amount of value in those kinds of "side quests". That is especially true when someone is in the early stages of their career. Those are just the sort of hurdles that can teach you not only what is going on at your level but also about the ecosystem in general and the layers beneath you. A great example of the lack of such knowledge is a candidate in a recent interview who was asked: "Given a static HTML page and a domain name that was registered, what do you need to set up a website for that page on that domain?" The candidate had no idea what was actually involved (nothing about DNS, routing, servers, etc.). They were likely able to write an application using modern practices, but everything about pushing to production, what a website is, what a domain name or an IP is… nope. And that makes sense; they had never even run into something like that. On the other hand, I remember building websites using FTP and Apache / IIS in the 90s.
It wasn’t fun, but I had very little abstraction to deal with and was exposed to the workings of the engine. And that sort of thing matters, because you will need to understand details such as DNS propagation times and their impact on what your system is doing, for example.

Implementing a file pager in Zig

by Oren Eini

posted on: January 05, 2022

After implementing the memory management in the previous post, I set out to handle the actual I/O primitives that we need. As a reminder, we are separating the concerns here. We managed memory and reference counting in the previous post, and now I want to focus on how we can read and write from the disk in as efficient a manner as possible. Before we get to the guts of the code, I want to explain a bit about the environment that I have in mind. Most of the time, the pager is able to provide the requested page from memory directly. If it can't do that, it needs to consider the fact that there may be multiple threads trying to load that page. At the same time, while we are loading the page, we want to be free to do other things as well. I decided to implement the I/O routine using async I/O. Here is the rough sketch of the API I have in mind:
```zig
pub const BackgroundRing = struct {
    const IoRingQueueSize = 32;
    const WorkReqStack = std.atomic.Stack(Work);

    pub const CallbackFn = fn (*Work) void;

    pub const Work = struct {
        tag: enum { Read, Write },
        fd: std.os.fd_t,
        buffer: []u8,
        offset: u64,
        context: u64,
        result: struct {
            bytes: u64,
            err: ?i32,
        },
        callback: CallbackFn,
    };

    allocator: std.mem.Allocator,
    event_fd: i32,
    worker: std.Thread,
    background_error: ?anyerror,
    pending: WorkReqStack,
    running: bool,

    pub fn init(allocator: std.mem.Allocator) !*BackgroundRing {
        var self = try allocator.create(BackgroundRing);
        errdefer allocator.destroy(self);
        self.allocator = allocator;
        self.pending = WorkReqStack.init();
        self.event_fd = try std.os.eventfd(0, 0);
        self.running = true;
        self.worker = try std.Thread.spawn(.{}, background_worker_wrapper, .{self});
        return self;
    }

    pub fn deinit(self: *BackgroundRing) void {
        self.running = false;
        @fence(.Acquire);
        self.wake_worker() catch {};
        self.worker.join();
        while (self.pending.pop()) |node| {
            self.allocator.destroy(node);
        }
        std.os.close(self.event_fd);
        self.allocator.destroy(self);
    }

    pub fn submit(self: *BackgroundRing, work: Work) !void {
        var ptr = try self.allocator.create(WorkReqStack.Node);
        ptr.data = work;
        self.pending.push(ptr);
        try self.wake_worker();
    }

    fn background_worker_wrapper(self: *BackgroundRing) void {
        self.background_worker() catch |err| {
            self.background_error = err;
        };
    }

    fn background_worker(self: *BackgroundRing) !void {
        while (self.running) {
            try self.wait_for_work();
            while (self.pending.pop()) |node| {
                const bytes = switch (node.data.tag) {
                    .Read => std.os.pread(node.data.fd, node.data.buffer, node.data.offset),
                    .Write => std.os.pwrite(node.data.fd, node.data.buffer, node.data.offset),
                } catch |err| val: {
                    node.data.result.err = @errorToInt(err);
                    break :val 0;
                };
                node.data.result.bytes += bytes;
                if (bytes != 0 and bytes < node.data.buffer.len) {
                    node.data.offset += bytes;
                    node.data.buffer = node.data.buffer[bytes..];
                    self.pending.push(node); // retry...
                    continue;
                }
                node.data.callback(&node.data);
                self.allocator.destroy(node);
            }
        }
    }

    fn wake_worker(self: *BackgroundRing) !void {
        var w: u64 = 1;
        _ = try std.os.write(self.event_fd, std.mem.asBytes(&w));
    }

    fn wait_for_work(self: *BackgroundRing) !void {
        var val: u64 = undefined;
        _ = try std.os.read(self.event_fd, std.mem.asBytes(&val));
    }
};
```

The idea is simple: we use a struct to which we can submit work in an asynchronous manner. At some later point in time, the work will complete and our read or write will be done. At that point we'll be able to invoke the callback the user provided. The code above is about as simple as you can manage; it spawns a dedicated thread to manage the I/O and then just issues those operations directly. To save myself some headache, I'm using an eventfd as a synchronization mechanism. I don't strictly need it, but it will be useful down the road. In terms of the API, I can now write the following code:

```zig
var file = try std.fs.openFileAbsolute(filename, .{ .write = true, .read = true });
defer file.close();

try ring.submit(.{
    .tag = .Read,
    .buffer = &buffer,
    .offset = 0,
    .fd = file.handle,
    .context = @ptrToInt(&waiter),
    .callback = handle_completion,
    .result = .{ .bytes = 0, .err = null },
});
```

The basic idea is that the BackgroundRing struct doesn't manage file descriptors or buffers; it is strictly a way to manage I/O. The API is pretty piss poor as well, in terms of usability.
No one will ever want to write generic I/O routines using this method, but we aren't trying to do generic I/O; we are trying to ensure usability in a very constrained manner, inside the pager. About the only nice thing that we do in this implementation is handle partial reads and writes. If we got less than we were asked to read, we'll repeat the operation until we get to the end of the file or succeed. In terms of implementation, as well, this is a really bad idea. We look like we are doing async I/O, but we are actually just passing it all to a background thread that works off a queue. That means it will be very hard to make full use of the hardware's capabilities. But I think you can guess from the name that I'm not going to leave things like that. I'm going to use the new io_uring API in Linux to handle most of those concerns. The idea is that we'll allocate a command buffer in the kernel and allow the kernel to handle asynchronous execution of the I/O operations. We still retain the same rough structure, in that we are going to have a dedicated background thread to manage the commands. Amusingly enough, the io_uring API is meant to be used from a single thread, since otherwise you'd need to orchestrate writes to the ring buffer from multiple producers, which is much harder than a single-consumer, single-producer scenario. The use of io_uring is also why we are using the eventfd model. We register that file descriptor with the io_uring so it will let us know when an event completes. This also does double duty as the method we use to wake the background thread when we have more work for it. The most major change is inside the background worker, of course. Here is how it looks:
```zig
fn background_worker(self: *PagerRing) !void {
    var cqes = std.mem.zeroes([IoRingQueueSize]std.os.linux.io_uring_cqe);
    while (self.running) {
        try self.wait_for_work();
        var shouldWake: bool = false;
        while (self.pending.pop()) |node| {
            const op = switch (node.data.tag) {
                .Read => IO_Uring.read,
                .Write => IO_Uring.write,
            };
            _ = op(
                &self.ring,
                @ptrToInt(node),
                node.data.fd,
                node.data.buffer,
                node.data.offset,
            ) catch |e| switch (e) {
                error.SubmissionQueueFull => {
                    self.pending.push(node);
                    shouldWake = true;
                    break;
                },
                else => {
                    self.allocator.destroy(node);
                    return e;
                },
            };
        }
        _ = self.ring.submit() catch |err| switch (err) {
            error.CompletionQueueOvercommitted => shouldWake = true,
            error.SignalInterrupt => shouldWake = true,
            else => return err,
        };
        // now let's process the completed values
        const n = try self.ring.copy_cqes(cqes[0..], 0);
        for (cqes[0..n]) |cqe| {
            var node = @intToPtr(*WorkReqStack.Node, cqe.user_data);
            if (cqe.res > 0 and cqe.res < node.data.buffer.len) {
                // partial operation, need to resubmit
                var bytes = @intCast(usize, cqe.res);
                node.data.buffer = node.data.buffer[bytes..];
                node.data.offset += bytes;
                node.data.result.bytes += bytes;
                self.pending.push(node);
                shouldWake = true;
                continue;
            }
            defer self.allocator.destroy(node);
            if (cqe.res < 0) {
                node.data.result.err = -cqe.res;
            } else {
                node.data.result.bytes += @intCast(usize, cqe.res);
            }
            node.data.callback(&node.data);
        }
        if (shouldWake) {
            try self.wake_worker();
        }
    }
}
```

We create the ring in the init function (see the full code listing below) and in the background thread we simply wait for an event using the eventfd. When a caller submits some work, we register it on the io_uring and wait for it to complete (also using the eventfd). You can see that I'm handling some basic states (partial reads, full queues, etc.). The code itself ends up being pretty small.
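The partial-completion handling above (advance the buffer and offset, then resubmit) is the classic short-read loop. For comparison, here is the same policy written as a plain synchronous C helper built on pread() — a sketch, not code from the post:

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

// Read exactly `len` bytes at `offset`, retrying short reads, the same way
// the io_uring worker above resubmits partial completions. Returns the number
// of bytes actually read (less than len only at end of file), or -1 on error.
static ssize_t read_fully(int fd, char *buf, size_t len, off_t offset) {
    size_t done = 0;
    while (done < len) {
        ssize_t n = pread(fd, buf + done, len - done, offset + (off_t)done);
        if (n < 0) {
            if (errno == EINTR) continue; // transient interruption, retry
            return -1;                    // real error, report to caller
        }
        if (n == 0) break;                // end of file, partial result
        done += (size_t)n;                // short read: advance and resubmit
    }
    return (ssize_t)done;
}
```

The io_uring version does exactly this, just split across submissions: each partial CQE pushes the adjusted work item back onto the pending queue instead of looping in place.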
Once we are done with the operation, we let the user know about the completion. There are a few things here that are interesting to note. We are actually allowing interleaving of operations, so we may have many outstanding operations at any given point. We aren't trying to guarantee any kind of ordering between the operations, nor are we providing anything but the most bare-bones interface for the caller. Even so, there is quite a bit of power in the manner in which we are working here. We need to complete a few more components first, and then we can bring it all together… Here is the full listing of the PagerRing code. In the next post, I want to focus on the overall pager, where we are managing multiple files and how we work with all of them together. In particular, we want to understand how we manage the memory budget across all of them.
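As an addendum, the eventfd handshake that both versions of the ring rely on is tiny. Here is a hedged, Linux-only C sketch of the wake/wait pair (the function names are made up; they mirror wake_worker()/wait_for_work() above):

```c
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Submitter side: bump the eventfd counter to wake the worker thread.
static int wake_worker(int event_fd) {
    uint64_t one = 1;
    return write(event_fd, &one, sizeof(one)) == sizeof(one) ? 0 : -1;
}

// Worker side: read() blocks until the counter is non-zero, then atomically
// returns it and resets it to zero, so many wakes coalesce into one wait.
static int wait_for_work(int event_fd, uint64_t *wakeups) {
    return read(event_fd, wakeups, sizeof(*wakeups)) == sizeof(*wakeups) ? 0 : -1;
}
```

The coalescing behavior is what makes eventfd a good fit here: the worker drains the whole pending queue per wake, so it doesn't matter how many submissions happened while it was busy.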