Relatively General .NET

Why Use DateTimeOffset

by Ardalis

posted on: January 17, 2022

Raise your hand if you've stored entity values in a database as DateTime. Ok, everybody has their hand up. You can put your hand down - you…

Deleting GitHub Actions artifacts using the GitHub REST API

by Gérald Barré

posted on: January 17, 2022

GitHub Actions supports sharing data between jobs in any workflow as artifacts. This is very convenient but the storage is limited (or you have to pay for it). If you use too much storage, you may get one of those notifications:You've used 75% of included services for GitHub Storage (GitHub Actions

re: Are You Sure You Want to Use MMAP in Your Database Management System?

by Oren Eini

posted on: January 14, 2022

I was pointed to this paper on Twitter: Are You Sure You Want to Use MMAP in Your Database Management System? As you can imagine, this is a topic near and dear to my heart, especially since I am currently writing the Implementing a file pager in Zig post series. I implemented the same low level mechanics using mmap; with mmap, I have fewer than 100 lines of code and can start building higher level concepts almost immediately. Writing my own pager is currently a 10 post series and the end doesn't seem to be in sight. I'm going to use this post to respond to the article. As a reminder, I'm the founder of RavenDB and I wrote Voron, an mmap based storage engine that has been running across hundreds of clients and literally tens of millions of instances in production. I am also writing a book about building a storage engine that uses mmap internally.

The paper itself does a great job of outlining the issues of using mmap as the buffer pool in a DBMS. What it doesn't cover, however, is the alternative. I will touch on specific points from the paper shortly, but I want to point out that the article compares apples to camels in its benchmarks and conclusions. Note that I don't necessarily disagree with some of the statements; mmap certainly has challenges that you need to deal with, but if you avoid it, you can't just wave away everything that it brings to the table. When building a database with mmap, the OS takes care of the following for you:

- Reading the data from disk
- Concurrency between different threads reading the same data
- Caching and buffer management
- Eviction of pages from memory
- Playing nice with other processes on the machine
- Tracking dirty pages and writing them to disk*

I put an asterisk on the last one because it probably requires your attention as well. If you aren't using mmap, on the other hand, you still need to handle all of those issues yourself. That is a key point that I believe isn't addressed in the paper.
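To make the mmap side of that list concrete, here is a minimal sketch (my own illustration, not Voron's code; `map_file` is an invented name) of why an mmap-based pager needs so little code: map the whole file once, and "reading a page" becomes pointer arithmetic while the OS does the rest.

```c
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Map an entire file read-only and return its base address (NULL on error).
// After this, getting page N is just: base + N * PAGE_SIZE.
void* map_file(const char* path, size_t* size_out) {
    int fd = open(path, O_RDONLY);
    if (fd < 0) return NULL;
    struct stat st;
    if (fstat(fd, &st) < 0) { close(fd); return NULL; }
    void* base = mmap(NULL, (size_t)st.st_size, PROT_READ, MAP_SHARED, fd, 0);
    close(fd); // the mapping keeps the file contents accessible
    *size_out = (size_t)st.st_size;
    return base == MAP_FAILED ? NULL : base;
}
```

Everything on the list above (read-ahead, caching, eviction, sharing with other processes) happens behind that one `mmap` call.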
Solving those issues properly (and efficiently) is a seriously challenging task. Given that you are building a specialized solution, you can probably do better than the generic mmap, but it will absolutely have a cost. That cost is both runtime overhead and increased development time. The comparison in the paper was done using the fio benchmark tool, which is great if you want to test your storage system, but is pretty much irrelevant if you are trying to benchmark a buffer pool. Consider the following:

```c
// pool.h
void* getPage(pool* p, long id);

// mmap_pool.c
void* getPage(pool* p, long id) {
    return p->base_ptr + (id * PAGE_SIZE);
}

// fio_pool.c
void* getPage(pool* p, long id) {
    // * lookup if the page is in memory
    // * if not, schedule its load
    //   ** block the current thread? return an error and have it try later?
    //   ** other threads that need this page should not initiate another load
    // * record that the page is used (to handle evictions later)
    // * record that this page is in use (to avoid evicting a page while it is in use)
}
```

For the mmap version, we need to compute the address of the page, and that is pretty much it. For the manual buffer pool, the list of tasks that we need to handle is long, and some of them require us to be thread safe. For example, if we handed a page to a transaction, we need to track that the page is in use; we cannot evict it until the transaction is done with it. That means we probably need atomic reference counting, which can have very high costs. There are other alternatives, of course, but they all have even higher degrees of complexity.
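To make that contention cost concrete, here is a minimal sketch (my own illustration, not code from the paper or from RavenDB; all names are invented) of pinning pages with an atomic reference count:

```c
#include <stdatomic.h>
#include <stdint.h>

// A page frame in a hypothetical manual buffer pool.
typedef struct {
    _Atomic uint32_t refs;   // pins currently holding this page in memory
    uint8_t data[4096];
} page_t;

// Pin the page so the eviction path cannot reclaim it.
static void page_pin(page_t* p) {
    // On a hot page every reader hits this same cache line,
    // which is exactly where the contention cost comes from.
    atomic_fetch_add_explicit(&p->refs, 1, memory_order_acquire);
}

// Unpin; the page becomes evictable only when refs drops back to zero.
static void page_unpin(page_t* p) {
    atomic_fetch_sub_explicit(&p->refs, 1, memory_order_release);
}
```

Every transaction touching a hot page (such as the B+Tree root, discussed next) bounces that one cache line between cores, an overhead the fio benchmark never pays.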
In practice, data access within a database isn't actually random, even if you are doing random reads. There are pages that are going to be referenced almost constantly; the root page of a B+Tree is a good example, since it is touched on every lookup. Under atomic reference counting, that page is going to be a bottleneck. Ignoring the overhead of buffer pool management means that you aren't actually comparing equivalent structures. I should also point out that I'm probably forgetting a few other tasks that the buffer pool needs to manage as well, which complicate its life significantly. Here is an example of such a buffer pool implementation from what is effectively a random GitHub repository. You can see what the code is trying to do here. The reason I point to it is that there is a mutex there (and I/O under the lock), which is fairly typical for many buffer pools. Not accounting for the overhead of buffer pool management seriously skews the results of the paper.

All of that said, I absolutely agree that mmap can be challenging. The paper outlines four different problems, which I want to address.

Problem #1 – Transactional safety

A database needs to know when data is persisted to disk. When using mmap, we explicitly give up that knowledge. That can be a challenge, but I don't see it as seriously different from the situation without mmap. Consider the manner in which Postgres works: it has its own buffer pool, and may modify pages as a result of a write. Postgres may need to evict modified pages to disk before the transaction that modified them commits. The overhead of managing that is just part of the challenge that we need to deal with.

For RavenDB, as the paper points out, we modify pages outside of the mmap memory. This is actually not done for the reason the paper describes. I don't actually care if the data is written to memory behind my back. What I care about is MVCC (a totally separate concern from buffer management).
The fact that I'm copying the modified data to the side means that I can support concurrent transactions with far greater ease. (In a similar fashion, Postgres handles MVCC by keeping multiple entries for the same row in the same page.) When the transaction commits and older transactions no longer need the old version of the data, I can push the data from the modified buffers to the mmap region. That tends to be fairly fast (given that I'm basically doing a memcpy(), which runs at memory speed), unless I have to page data in; more on that later.

The paper mentions the issue of the single writer in LMDB, and I wanted to point out that a single writer model is actually far more common than that (and again, not really related to the buffer pool issue). Off the top of my head, most embedded databases implement a single writer model:

- LMDB
- Voron (RavenDB's storage engine)
- LevelDB
- Lucene

The one I can think of that doesn't have a single writer is RocksDB (and there, allow_concurrent_memtable_write applies to writes to the memtable, not to file I/O). The reason this matters is that embedded systems can typically assume that all operations in a transaction will complete as a unit. Compare that to Postgres, where a transaction may span multiple network calls, so interleaving writes is a must. If we could avoid such concurrency, that would be far preferable. You can get additional concurrency by sharding writes, but that is usually not needed.

Problem #2 – I/O Stalls

The paper points out, quite correctly, that not having control over the I/O means that you may incur a page fault at any time. In particular, you may end up blocked on I/O without really noticing. This can be a killer, especially if you are holding a lock when you hit the page fault. Indeed, I consider this the most serious issue that you have to deal with in mmap based systems. In practice, however, the situation isn't so clear cut. Until quite recently, the state of asynchronous I/O on Linux was quite iffy.
Until the arrival of io_uring, certain operations that you expected to be async would block occasionally, ruining your day. The paper mentions that you can use async I/O to issue requests to load the next pages (non sequentially) from the disk when you are performing certain operations. You can do the same with mmap as well, and RavenDB does just that. When you start a scan on a B+Tree, RavenDB will ask the OS to ensure that the memory we are interested in is resident before we actually get to it. On Linux, this is done with a madvise(MADV_WILLNEED) call. That call may itself block, so we actually have a dedicated thread for handling such a scenario. In practice, this isn't really that different from how you'd handle it with async I/O.

Another consideration is the cost of mapping at the kernel level. I'm not talking about the I/O cost here: if you have many threads that are faulting pages, you'll run into contention on the page table lock. We have run into that before; it is considered an OS level bug, but it obviously has an impact on the database. In practice, however, the overhead of memory management is the same in most cases. Whether you are reading via mmap or allocating directly, you'll need to orchestrate things. Note that the same page table lock is also in effect if you are heavily allocating and freeing memory, since you're also modifying the process page table.

Problem #3 – Error Handling

Error handling is a serious concern for a database. The paper points out that databases such as SQL Server may run a checksum when reading data from disk. When you use a buffer pool, the boundary of reading from the disk is obvious, so you can easily validate what you read. Voron uses mmap exclusively, and we do have checksums. We validate a page the first time that we access it (there is an internal bitmap that tracks which pages have been validated). There isn't a big difference between the two approaches anyway.
We only check a given page once per run, because doing otherwise is meaningless. When you use read() to get data from the disk, you have no guarantee that the data wasn't served from a cache along the way. So you may validate that the data you read is "correct" while the on disk representation is broken. For that reason, we only do the check once, instead of on each access.

A far greater issue is I/O errors. What do you do when a read or a write fails? If you are using system calls, you get a return code and can react accordingly. If you are using mmap, the system will generate a SIGBUS that you'll have to (somehow) handle. For a database, dealing with I/O errors has a single correct answer: crash and then run recovery from scratch. If the I/O system has returned an error, there is no longer any way to know what its state is. See: fsync-gate. The only way to recover is to stop, reload everything (apply the WAL, run recovery, etc.) and get back into a stable state. SIGBUS isn't my cup of tea as a delivery mechanism, but handling an I/O error in place isn't really something that you do anyway, so just restarting the process ends up being more acceptable than you might initially think.

Problem #4 – Performance issues

The paper points out three common reasons for performance issues with mmap usage:

- page table contention
- single threaded page eviction
- TLB shootdowns

The first issue is something that I have run into in the past. It was a bug in the operating system, and it was fixed; there is no longer a single page table lock in either Windows or Linux. The single threaded eviction, on the other hand, is something that we never ran into. With Voron, we map the memory using MAP_SHARED, and most of the time the memory isn't dirty. If the system needs memory, it can reclaim a page by simply discarding the memory of an unmodified shared page. In this model, we typically see most of the memory as shared and clean.
So there isn't a lot of pressure to evict things, and eviction can happen on an as-needed basis. The TLB shootdown issue is also not something that we ever ran into as a problem. We have run terabyte range databases on a Raspberry Pi with 4GB of RAM and hammered it in benchmarks (far exceeding the memory capacity). The interesting thing here is that the B+Tree nature means that the upper tiers of the tree were already in memory, so we mostly ended up with a single page fault per request. To actually observe the cost of TLB shootdowns in a significant manner, you need:

- really fast I/O
- a working set that significantly exceeds memory
- no other work that needs to be done to process a request

In practice, if you have really fast I/O, you spent money on it, and you'll most likely have bought more RAM as well. And you typically need to do something with the data you read, which means that you won't notice the TLB shootdowns as much.

Finally, going back to how I started this post: the paper assumes that there are no other costs to skipping mmap and using direct I/O, and its benchmark doesn't account for those extra costs. For example, without mmap, who is doing evictions? In practice, that leads to the same sort of considerations that you have when dealing with mmap. This is especially the case with TLB shootdowns once we start talking about high memory traffic (which likely modifies page allocations for the process, leading to the same scenario).

The paper was quite interesting to read, and it presents a number of real problems that occur with mmap based systems, but I'm afraid that it doesn't present the alternatives properly and vastly underestimates both the costs and the complexity of not using mmap and writing your own buffer pool.

Working With Market Data Using Time Series in RavenDB

by Oren Eini

posted on: January 13, 2022

Kamran Ayub has another great article discussing how to utilize RavenDB to analyze crypto market data. In addition to the article, you can see the full source code for the sample application. There is even a really cool intro video for the application and article.

Implementing a file pager in Zig

by Oren Eini

posted on: January 12, 2022

Up to this point, we focused on reading data from the disk, and we can only do that up to a point: eventually we'll run out of memory (assuming that the database is bigger than memory, which is a pretty safe assumption). That means that we need to decide what to remove from memory. When we use mmap(), the OS gets to decide that for us; in fact, that is probably the best argument against using mmap(). The additional control we get from managing the memory ourselves is the chief reason to take the plunge and do it.

There are a lot of algorithms around managing memory. I really like this one because it is quite elegant, but it requires quite a lot of state to be maintained, especially when working with highly concurrent systems. Instead, I chose to look at the clock sweep algorithm. This is also implemented by PostgreSQL, and it is actually far simpler to work with. The idea is that for each page, we maintain a usage count. Each time that we get a page, we increment its usage count (up to a small limit). Each time we need to evict a page, we search for a page that can be evicted and has no recent usage; if it has usages, we decrement that value and repeat until we find something.

Our buffer management isn't actually dealing with pages, however; we are working with 2MB chunks instead. The principle is the same, but using bigger aggregates is advantageous given typical memory sizes these days. The first thing that we need to do is to modify the ChunkMetadata. I'm showing only the relevant changes:
```zig
pub const ChunkMetadata = packed union {
    raw: u64,
    v: packed struct {
        version: u30,
        tag: Tag,
        references: u29,
        usages: u3,
    },
};
```

The major change here is the introduction of the usages field. That is a 3-bit field (so in the 0..7 range), and we reduced the number of references a chunk can have to about 500 million (which should be sufficient, I believe). The idea is that each time we call addRef(), we'll increment the usages count, like so:

```zig
pub fn addRef(self: *ChunkMetadata) !void {
    comptime var maxReferences = std.math.maxInt(@TypeOf(self.v.references));
    if (@intCast(u64, self.v.references) + 1 > maxReferences) {
        return error.ChunkReferenceCountOverflow;
    }
    self.v.references += 1;
    self.v.version +%= 1;
    self.v.usages +|= 1; // saturating addition
}
```

Zig has a nice feature that I'm using here: saturating addition. In other words, if the value is incremented beyond its limit, it is clamped to the limit. That means that I don't have to worry about overflows, etc. I took a look at how this is implemented, and the compiler generates the following assembly for the expression x +| 100:

```asm
add   eax, 100
mov   ecx, -1
cmovb eax, ecx
```

This may look daunting, but I'll break it down.
First, we add 100 to the value, as you would expect (the value is currently in the EAX register). Then we store -1 (that is, 0xFFFFFFFF) in the ECX register. Finally, we use the CMOV instruction (CMOVB in the snippet is a variant of CMOV) to store ECX in EAX if the carry flag was set by the addition. As a bonus, this also avoids a branch in the code, which is great for performance.

One of the critical behaviors that we need to consider here is what the pager does when we access rare pages once and then never again. A really common scenario is a scan of some data for a rare query. For that reason, the usages behavior is a bit more complex than one might imagine. Let's explore this for a bit before moving on. Look at the following code; I marked the important lines with stars:
```zig
pub fn markLoaded(self: *FileChunks, chunk: u64) ![]align(mem.page_size) u8 {
    while (true) {
        var copy = self.chunks[chunk];
        var origin = copy;
        switch (copy.v.tag) {
            .Value => return error.ValueAlreadyExists,
            .Error => return error.ValueInErrorState,
            .Empty => return error.ValueIsNotLoading,
            .Loading => {},
        }
        copy.setTag(.Value);
        try copy.addRef(); // ownership by the pager
        try copy.addRef(); // ownership by the caller
        // ***********************
        copy.setUsagesFrom(origin);
        // ***********************
        if (self.chunks[chunk].tryUpdate(origin, copy)) {
            return self.getLoadedChunk(chunk);
        }
    }
}

fn trySetToEmpty(self: *FileChunks, index: u64) !void {
    while (true) {
        var modified = self.chunks[index];
        if (modified.getTag() != .Loading) {
            // someone else modified it while we were releasing the memory
            return error.ValueIsNotLoading;
        }
        var released = modified.reset(.Empty);
        // ***********************
        released.setUsages(1); // next time we load the chunk, we'll know to keep it around
        // ***********************
        if (self.chunks[index].tryUpdate(modified, released)) break;
    }
}
```

When we mark a chunk as loaded, we copy the usages from the current record. That starts out as zero, so in the rare-pages scenario we'll have a good reason to evict such pages soon. However, there is a twist: when we remove a chunk from memory, we also set its usage count to 1. That is an interesting issue. The chunk is not loaded, so why does it have a usage count? Because if we removed it from memory and then load it again, we want it to start with a higher usage count (and a lower chance of being evicted). In this manner, we are somewhat simulating the 2Q algorithm. Now, let's take a look at the actual reclaiming portion, shall we?
In the chunk metadata, we have the following behavior:

```zig
pub const RelciamOptions = enum {
    None,
    ReduceUsages,
    Reclaim,
};

pub fn reclaim(self: *ChunkMetadata) RelciamOptions {
    switch (self.v.tag) {
        .Value => {
            if (self.v.references == 1) {
                return .Reclaim;
            }
        },
        .Empty => {
            if (self.v.usages > 0) {
                return .ReduceUsages;
            }
        },
        else => {},
    }
    return .None;
}

pub fn reduceUsages(self: *ChunkMetadata) ChunkMetadata {
    return ChunkMetadata{ .v = .{
        .tag = self.v.tag,
        .version = self.v.version +% 1,
        .references = self.v.references,
        .usages = self.v.usages -| 1,
    } };
}
```

If we have a value and no outstanding references, we can reclaim it; if we don't have a value, we reduce the usage count anyway. The idea is that when we unload a page, we set its usages count to 1, and sweeps decay that value as time goes by. So if sweeps have run over an empty chunk that still had a usage count, we won't count that usage the next time we load it. Basically: after unloading a chunk, if we reload it soon, we'll keep it around longer the next time. But if it is only rarely loaded, we don't care, and we will forget that it was loaded previously. The process for actually reclaiming a chunk is shown here:
```zig
fn tryReclaimChunk(self: *FileChunks, result: *ReclaimResult) bool {
    var index = result.lastScannedIndex;
    while (true) {
        var copy = self.chunks[index];
        switch (copy.reclaim()) {
            .None => return false,
            .ReduceUsages => {
                var modified = copy.reduceUsages();
                if (self.chunks[index].tryUpdate(copy, modified))
                    return false;
                continue;
            },
            .Reclaim => {},
        }
        result.hasCandidates = true;
        var modified = copy.reduceUsages();
        if (modified.hasUsages()) {
            // update the reduced usage
            if (self.chunks[index].tryUpdate(copy, modified))
                return false;
        }
        modified = copy.reset(.Loading); // means that we own it for the duration...
        if (self.chunks[index].tryUpdate(copy, modified))
            return true;
    }
}
```

We look at a particular index in the chunks and check whether we can reclaim it. Following our previous behavior, there are a bunch of options here. We can either have an empty chunk that remembers its previous usage, which we can reduce, or we can actually try to reclaim the chunk. Of course, it isn't that simple, because even if we found a candidate for reclaiming, we still need to reduce its usages count. Only when it has no usages will we be able to actually start the removal process. Finally, we have the actual scanning of the chunks in the file, shown here:
```zig
pub const ReclaimResult = struct {
    lastScannedIndex: u64 = 0,
    reclaimedBytes: u64 = 0,
    reclaimed: bool = false,
    hasCandidates: bool = false,
};

pub fn reclaim(self: *FileChunks, startIndex: u64) !ReclaimResult {
    var result = ReclaimResult{
        .lastScannedIndex = if (startIndex < self.chunks.len) startIndex else 0,
    };
    while (result.lastScannedIndex < self.chunks.len) : (result.lastScannedIndex += 1) {
        if (self.tryReclaimChunk(&result) == false)
            continue;
        // at this point, the current chunk is owned by us, and no one else can use it...
        result.reclaimedBytes += ChunkSize;
        _ = try os.mmap(
            self.getLoadedChunk(result.lastScannedIndex).ptr,
            ChunkSize,
            os.PROT.NONE,
            os.MAP.ANONYMOUS | os.MAP.PRIVATE | os.MAP.FIXED,
            -1,
            0,
        );
        try self.trySetToEmpty(result.lastScannedIndex);
        result.reclaimed = true;
        result.lastScannedIndex += 1;
        break;
    }
    return result;
}
```

We scan the file from the provided start index and try to reclaim each chunk in turn. That may reduce the usages count, as we previously discussed. The process continues until we have found a chunk to reclaim. Note that we always claim just a single chunk and return. This is because the process is repeatable: we start from a given index and return the last index that we scanned. If the caller needs to free more than a single chunk, they can call us again, passing the last index that we scanned. That is why this is called the clock sweep algorithm: we are sweeping through the chunks that we have in the system, reaping them as needed.

The code so far is all in the same FileChunks instance, but the Pager actually deals with multiple files. How would that work? We start by adding some configuration options to the pager, telling it how much memory we are allowed to use:
```zig
pub const Options = struct {
    capacity: usize = 0,
    hardMemoryLimit: usize = 0,
    softMemoryLimit: usize = 0,
};
```

We have both soft and hard limits here because we want to give users the ability to say "don't use too much memory, unless you really have to". The problem is that otherwise, users get nervous when they see 99% memory utilization and want to keep some free. The point of soft and hard limits is that this gives us more flexibility, rather than setting a lower-than-needed limit and getting memory errors with gigabytes of RAM to spare.

In the Pager, we have the loadChunksToTransaction() that we looked at in the previous post. That is where we read the chunk from the file. We are going to modify this method so it reserves the memory budget before we actually allocate, like so:

```zig
{
    try self.reserveMemoryBudgetForChunk();
    errdefer self.releaseChunkMemoryBudget();

    // if null, someone else is loading...
    maybeBuffer = file.chunks.markLoading(chunkInFile) catch |err| switch (err) {
        error.ValueAlreadyExists => continue, // someone else is loading...
        else => return err,
    };
}
```

As you can see, we reserve the memory budget, then actually allocate the memory (inside markLoading()). If there is a failure, we release the budget reservation and report the error. To manage the memory budget, we need to add a few fields to the Pager:
```zig
options: Options,
usedMemory: std.atomic.Atomic(u64),
filesRelcaimSweeps: [8]std.atomic.Atomic(u64),

fn releaseChunkMemoryBudget(self: *Pager) void {
    _ = self.usedMemory.fetchSub(FileChunks.ChunkSize, .Release);
}
```

You can see that releaseChunkMemoryBudget() is pretty trivial: simply release the memory budget and move on. Things are a lot more complex when we need to reserve memory budget, however. Before we dive into that, I want to talk a bit about the filesRelcaimSweeps field, which is an interesting one. This is where we keep the last position that we scanned in the Pager (across all pages). But why is it an array? The answer is simple: the Pager struct is meant to be used from multiple threads at the same time, and under memory pressure we are likely to need to evict multiple chunks at once. To avoid having multiple threads scan the same range of the Pager looking for chunks to remove, I decided that we'll run several sweeps at the same time. On startup, we initialize them to random values, like so:

```zig
var rnd = std.rand.DefaultPrng.init(@bitCast(u64, std.time.timestamp()));
for (self.filesRelcaimSweeps) |*it| {
    rnd.fill(std.mem.asBytes(it));
}
```

In this manner, under load, each thread is likely to scan an independent portion of the Pager's memory, which should avoid competing on the same memory to evict.
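The per-thread sweep cursor idea can be modeled outside of Zig in a few lines. This is a C sketch under invented names (not the pager's code): each thread hashes its id to a slot, and each slot independently tracks where its scan left off.

```c
#include <stdatomic.h>
#include <stdint.h>

#define SWEEP_SLOTS 8  // parallel "clock hands", like filesRelcaimSweeps

// Each slot holds the next chunk index to scan. Threads pick a slot by
// their id, so concurrent evictions tend to sweep disjoint regions.
typedef struct {
    _Atomic uint64_t sweeps[SWEEP_SLOTS];
    uint64_t total_chunks;
} sweeper_t;

// Return this thread's current scan position and advance its cursor
// by the number of chunks it is about to scan.
static uint64_t sweeper_next(sweeper_t* s, uint64_t thread_id, uint64_t scanned) {
    _Atomic uint64_t* slot = &s->sweeps[thread_id % SWEEP_SLOTS];
    uint64_t start = atomic_fetch_add(slot, scanned);
    return start % s->total_chunks;  // wrap around the chunk array
}
```

Two threads mapping to different slots start their scans from unrelated positions, which is exactly the "independent portions" property described above.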
And with that behind us, let's see how we can actually use this to evict memory:

reserveMemoryBudgetForChunk.zig:

    fn reserveMemoryBudgetForChunk(self: *Pager) !void {
        // optimistically increment...
        if (self.usedMemory.fetchAdd(FileChunks.ChunkSize, .Release) < self.options.softMemoryLimit) {
            // simplest scenario
            return;
        }
        // we are *too* high, reduce the optimistic increment...
        _ = self.usedMemory.fetchSub(FileChunks.ChunkSize, .Release);
        // we need to release some memory...
        var pos = std.Thread.getCurrentId() % self.filesRelcaimSweeps.len;
        var scanStartIndex = self.filesRelcaimSweeps[pos].load(.Acquire) % (self.files.len * FileChunks.ChunksInFile);
        var hasCandidates = true;
        while (hasCandidates) {
            hasCandidates = false;
            var fileIdx = scanStartIndex / FileChunks.ChunksInFile;
            if (self.files[fileIdx]) |file| {
                var chunkInFile = scanStartIndex % FileChunks.ChunksInFile;
                var result = try file.chunks.reclaim(chunkInFile);
                _ = self.filesRelcaimSweeps[pos].fetchAdd(
                    // update the location of the _next_ scan
                    (result.lastScannedIndex - chunkInFile) + 1,
                    .Release,
                );
                if (result.reclaimed) {
                    // we released a chunk (and reserved it too, so usedMemory remains the same)
                    return;
                }
                if (result.hasCandidates)
                    hasCandidates = true;
            }
            // still searching, move to the next file
            scanStartIndex = (scanStartIndex / FileChunks.ChunksInFile) * FileChunks.ChunksInFile + FileChunks.ChunksInFile;
            _ = self.filesRelcaimSweeps[pos].fetchAdd(
                FileChunks.ChunksInFile - (scanStartIndex % FileChunks.ChunksInFile), // move to next file...
                .Release,
            );
        }
        // if we are here, we couldn't release any chunks
        var currentMem = self.usedMemory.load(.Acquire) + FileChunks.ChunkSize;
        if (currentMem > self.options.hardMemoryLimit) {
            return error.OutOfMemoryBudget;
        }
        // we are higher than the soft limit, but still okay...
        _ = self.usedMemory.fetchAdd(FileChunks.ChunkSize, .Release);
    }

There is a lot of code here, and quite a few comments, because this is chock-full of behavior. Let's dissect it in detail:

- We start by checking the memory budget to see if we are below the soft memory limit. We do this optimistically; if we fail, we reset the state and then start to scan the Pager for chunks we can release.
- We do that by picking one of the filesRelcaimSweeps values using the current thread id. In this way, different threads are likely to use different sweep positions and move them independently.
- We find the relevant file for the index and start scanning for chunks to release. We stop the process when one of the following happens:
  - We released a chunk, in which case we are successful and can return with glory.
  - We didn't find a chunk to release, but found candidates (whose usage count is too high to discard right now). In that case, we'll look for better options before coming back and discarding those candidates.
- If we are completely unable to find anything to release, we check whether we would exceed the hard memory limit and error out, or just accept the soft limit as, well… a soft limit, and allocate the budget anyway.

This happens as part of the process of loading chunks into the pager, so we only need to release a single chunk at a time. For that reason, we remember the position, so the next operation will start from where we left off. You can think of this as a giant set of hands scanning the range of chunks in memory as needed. There are actually a few things that we could implement to make this faster.
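Stepping back from the sweep itself, the budget accounting follows a simple pattern. Here is a hedged Python sketch of that pattern; the class, the chunk size, and the reclaim callback are my own illustrative stand-ins, and a lock plays the role of Zig's atomics:

```python
import threading

CHUNK_SIZE = 2 * 1024 * 1024  # an assumed chunk size, for illustration only

class MemoryBudget:
    """Soft/hard memory limits: optimistically charge a chunk, try to
    reclaim one if that puts us over the soft limit, and only fail when
    even the hard limit would be exceeded."""

    def __init__(self, soft, hard, try_reclaim):
        self.soft, self.hard = soft, hard
        self.used = 0
        self.lock = threading.Lock()    # stands in for the atomic ops
        self.try_reclaim = try_reclaim  # returns True if it evicted a chunk

    def reserve_chunk(self):
        with self.lock:
            if self.used + CHUNK_SIZE <= self.soft:
                self.used += CHUNK_SIZE  # simplest scenario
                return
        # over the soft limit: evicting one chunk frees exactly the budget
        # we want to reserve, so overall usage stays the same
        if self.try_reclaim():
            return
        with self.lock:
            if self.used + CHUNK_SIZE > self.hard:
                raise MemoryError("over the hard memory limit")
            self.used += CHUNK_SIZE  # accept the soft limit as... soft

    def release_chunk(self):
        with self.lock:
            self.used -= CHUNK_SIZE
```

The three outcomes mirror the Zig function: cheap success under the soft limit, success via eviction, and a hard failure only when nothing can be reclaimed and the hard limit would be crossed.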
For example, we always scan through all the chunks in a file. We could try to maintain a data structure that would tell us which chunks have a low enough usage count to consider, but that is actually complex (remember, we are concurrent). There is also another factor to consider. The ChunkMetadata is 64 bits in size, and a FileChunks struct contains an array of 4,096 such values, totaling 32KB. It is actually cheaper to scan through the entire array and do the relevant computation on each candidate than to try to be smart about it. I think that this is it for now; this post has certainly gone on for quite a while. In the next post in the series, I want to tackle writes. So far we have only looked at reads, but I think we already have all the relevant infrastructure at hand, so this should be simpler.

DateTime as a Value Object

by Ardalis

posted on: January 12, 2022

Value Objects are a key part of Domain-Driven Design and domain models created by this process. However, they're not used nearly as often by…Keep Reading →

Implementing a file pager in Zig

by Oren Eini

posted on: January 11, 2022

We have finally gotten to the point where we can ask the pager for a page of data (reminder: a page in this case is 8KB of data) and get it back. Here is what this looks like:

usage.zig:

    var page = try pager.getPage(&tx.loadedChuncks, 2, 1, tx.timeout);

There are a few things to note here. There is a new concept: the transaction. We don't want to have that concept at the pager level, so we just pass the relevant state to the pager to deal with. Basically, any transaction we have will need to carry a bit of state for the pager. The state we need at this point is minimal, just the set of chunks that the transaction references. You can also see that we provide a timeout for the getPage() call. What is that about? This leads to the second concern we need to consider: how are we expected to run this? If we call getPage() on a page (actually, the chunk containing that page) that isn't resident in memory, we'll need to go to the disk to read it. That can take a while, sometimes a long while. At a glance, this is one of those things that async/await was meant for. Since Zig supports async functions, that is certainly something we can do, but it is something that I want to be cautious about. Having explicit blocking is far easier to understand and debug, at least for now. This is especially true if we'll want to consume the pager API from a non-Zig target. That leads to an interesting issue, however. If a call to getPage() can block, how can we avoid blocking the thread? In most cases, we would like to avoid blocking, after all.
It would be simple to have a tryGetPage() method, which would not block (but would schedule the load of the page from disk), and then maybe register for a notification when it completes. If that sounds like async to you, that is because it is. The problem with this sort of approach is that you need to suspend execution somewhere in the middle of a transaction operation and continue when the data is loaded. Without async/await, you can't really do that. Well, you could try, but we have a lot of experience with managing state via callbacks, and that isn't going to work for anything beyond the simplest systems (see: node.js without async/await). There is one thing that we can do that would be both simple and effective, however: we can error if the page isn't in memory. That sounds like a pretty bad idea, no? How would that help us? Well, the concept is simple. If a transaction attempts to access a page that isn't resident in memory, we'll do the following operations:

- Schedule the chunk the page resides on to be loaded into memory.
- Return an error from the pager.
- Roll back the transaction.
- Keep the loadedChunks for that transaction active and wait for the chunk to be loaded.
- Re-run the transaction; now that the chunk is in memory, we can proceed further.

Each time we re-run the transaction, we make sure that the chunks it needs are in memory, eventually ensuring that all the required chunks are resident and we don't need to block. At the same time, the code that works with transactions doesn't need to care about blocking at all. We need to do the usual error handling, but that is required anyway. There is a single location where we need to deal with callbacks from the pager, so the blast radius of complexity is limited. For write transactions, for example, this is a very reasonable strategy. We assume that there is only a single thread writing at any given time.
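The abort-and-retry scheme above can be sketched end to end. This Python toy (all names here are mine, not the pager's) shows a transaction body that faults a page in, aborts, waits for the scheduled load, and re-runs until everything it needs is resident:

```python
class PageNotResident(Exception):
    """Raised by the pager when a page's chunk is not in memory.
    Raising it also (conceptually) schedules the chunk load."""
    def __init__(self, chunk):
        super().__init__(f"chunk {chunk} not resident")
        self.chunk = chunk

class ToyPager:
    def __init__(self):
        self.resident = set()

    def get_page(self, page, pages_in_chunk=256):
        chunk = page // pages_in_chunk
        if chunk not in self.resident:
            raise PageNotResident(chunk)  # load scheduled, caller must retry
        return ("page", page)

    def wait_for_chunk(self, chunk):
        self.resident.add(chunk)  # a real pager would block on the I/O here

def run_transaction(pager, body, max_attempts=16):
    """Run body(); on a page fault, roll back, wait for the chunk that was
    scheduled, and re-run the whole transaction from scratch."""
    for _ in range(max_attempts):
        try:
            return body()
        except PageNotResident as fault:
            pager.wait_for_chunk(fault.chunk)  # rollback would happen here too
    raise RuntimeError("transaction never got all its chunks resident")
```

Note how the body itself contains no blocking or callback logic at all; each re-run makes strictly more chunks resident, so the loop terminates once the working set is in memory.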
A transaction that is blocked because it needs to read a page from the disk can stall other pending transactions. By having it abort and retry later, we can keep the line moving. For read operations, on the other hand, that is likely not something you want to do. If I'm already streaming results to the caller, I can't just repeat the transaction. I'm not making any decisions at this point, just considering the various options and implications that we have to deal with at this early stage. Now, let's look at how getPage() is actually implemented, shall we?

getPage.zig:

    pub fn getPage(self: *Pager, loadedChunks: *ChunksSet, page: u64, count: u32, timeout: ?u64) ![]align(mem.page_size) u8 {
        var chunk = page / FileChunks.PagesInChunk;
        var end = (page + count) / FileChunks.PagesInChunk;
        // we ensure that any range is always within a single file...
        var fileNum = page / FileChunks.PagesInFile;
        var file = self.files[fileNum] orelse return error.FileIsNotLoaded;
        try self.loadChunksToTransaction(loadedChunks, file, chunk, end, timeout);
        if (loadedChunks.get(chunk) != null) {
            // explicitly go beyond the scope of the chunk, because we know the proper memory layout
            // and if the size exceeds a single chunk, we already loaded the next one(s)
            var ptr = file.chunks.ptr.ptr;
            var pageInFile = page % FileChunks.PagesInFile;
            if (@ptrToInt(ptr + ((pageInFile + count) * FileChunks.PageSize)) > file.chunks.ptrEnd) {
                return error.PageRequestIsOutOfBand;
            }
            var slice = ptr[(pageInFile * FileChunks.PageSize)..((pageInFile + count) * FileChunks.PageSize)];
            return slice;
        }
        return error.ChunkWasNotLoaded;
    }

There is a lot going on here, I know. We start by defining a set (a hash map from a 64-bit unsigned integer to a zero-sized value). The way this works in Zig is quite elegant, since we pay no memory cost for the values. The majority of the work is done in the loadChunksToTransaction() function, which we'll examine shortly, but you can see some interesting details already in getPage(). We assume that any range of pages we are asked for is always within a single file. The call to load the chunks actually puts them in the loadedChuncks argument. We verify that the chunk was loaded properly and then create a slice to return to the caller. Note that we may request more than a single page, and it is valid to ask for a range that spans multiple chunks. We validate that the range we return is within the memory range for the current file. Since we ensured that the chunks for a specific file are consecutive in memory, we can safely return a pointer that crosses multiple chunks without needing to think about it. There is another aspect of loadedChunks that we need to discuss.
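As an aside, the index arithmetic that getPage() leans on is worth spelling out. This Python sketch uses hypothetical layout constants standing in for the real values in FileChunks:

```python
PAGE_SIZE = 8 * 1024                  # 8KB pages, as in the series
PAGES_IN_CHUNK = 256                  # assumed: 2MB chunks
CHUNKS_IN_FILE = 4096                 # assumed: 8GB files
PAGES_IN_FILE = PAGES_IN_CHUNK * CHUNKS_IN_FILE

def locate(page):
    """Map a global page number to (file index, chunk within the file,
    byte offset within the file), via integer division and modulo."""
    file_num = page // PAGES_IN_FILE
    page_in_file = page % PAGES_IN_FILE
    chunk_in_file = page_in_file // PAGES_IN_CHUNK
    byte_offset = page_in_file * PAGE_SIZE
    return file_num, chunk_in_file, byte_offset
```

A request for `count` pages is only valid if `page` and `page + count - 1` map to the same file, which is the single-file assumption the Zig code makes before computing the slice.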
A transaction may use multiple pages from the same chunk, but we only need to load the chunk once. At the same time, we want to avoid adding a reference to the chunk once per loaded page. When we close the transaction, we need to release the references to these chunks, so we have to keep track of them. With that in mind, let's see how we actually load the chunks into memory. As a reminder, we have two actors working together here: the FileChunks is used to store the chunks in memory and the PagerRing is used for parallel I/O.

loadChunksToTransaction.zig:

    fn scheduleLoadingChunks(self: *Pager, loadedChunks: *ChunksSet, file: *PagerFile, start: u64, end: u64) !void {
        // function parameters are immutable in Zig, so advance a local copy
        var current = start;
        while (current <= end) : (current += 1) {
            if (loadedChunks.get(current) != null) {
                continue;
            }
            var chunkInFile = (current % FileChunks.ChunksInFile);
            var maybeBuffer = file.chunks.tryGet(chunkInFile) catch |err| switch (err) {
                error.ValueIsLoading => continue, // someone else is loading this, will get later
                else => return err,
            };
            if (maybeBuffer != null) {
                // a buffer exists, and we own a ref to it now, record it...
                try recordChunk(file, loadedChunks, current);
                continue;
            }
            {
                try self.reserveMemoryBudgetForChunk();
                errdefer self.releaseChunkMemoryBudget();
                // if null, someone else is loading...
                maybeBuffer = file.chunks.markLoading(chunkInFile) catch |err| switch (err) {
                    error.ValueAlreadyExists => continue, // someone else is loading...
                    else => return err,
                };
            }
            if (maybeBuffer) |buffer| {
                // now need to load it...
                var state = try self.allocator.create(AsyncLoadChunkState);
                errdefer self.allocator.destroy(state);
                state.* = .{ .chunk = chunkInFile, .file = file, .parent = self };
                try self.ring.submit(.{
                    .tag = .Read,
                    .fd = file.fd,
                    .buffer = buffer,
                    .offset = chunkInFile * FileChunks.ChunkSize,
                    .context = @ptrToInt(state),
                    .result = .{ .bytes = 0, .err = null },
                    .callback = completeLoad,
                });
            }
        }
    }

    fn loadChunksToTransaction(self: *Pager, loadedChunks: *ChunksSet, file: *PagerFile, start: u64, end: u64, timeout: ?u64) !void {
        try self.scheduleLoadingChunks(loadedChunks, file, start, end);
        // now we actually load the chunks to the transaction
        var currentGlobalChunk = start;
        while (currentGlobalChunk <= end) : (currentGlobalChunk += 1) {
            if (loadedChunks.get(currentGlobalChunk) != null) {
                continue;
            }
            var chunkInFile = (currentGlobalChunk % FileChunks.ChunksInFile);
            // we'll wait until we have the buffer
            _ = try file.chunks.getBlocking(chunkInFile, timeout);
            try recordChunk(file, loadedChunks, currentGlobalChunk);
        }
    }

That is a lot of code to throw at you, I know; let's dissect it in detail. In this method, we are working with chunks, not pages, and we may have multiple chunks to handle, which is why we have the while loops. We start by checking if the chunk is already in the transaction's loadedChunks. If it isn't, we compute the position of the chunk in the file (the chunk number we get from the caller is the global one, after all) and try to get it from the FileChunks. This is where things get interesting. When we call tryGet() for the current chunk, there are two scenarios in which we won't get a buffer back:

- The value is currently being loaded from the disk (some other transaction asked for it, probably). We don't need to do anything further other than wait for it to show up.
- Another transaction tried to load it, but got an error. At this point we just return the error; we don't try to do anything special here.
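The states a chunk moves through under tryGet(), markLoading() and markLoaded() can be sketched as a small state machine. This Python version uses my own naming, with a condition variable playing the role of getBlocking(), to show the single-loader race:

```python
import threading

EMPTY, LOADING, LOADED, FAILED = "empty", "loading", "loaded", "failed"

class ChunkSlot:
    """Load state for one chunk. Exactly one caller wins mark_loading()
    and becomes responsible for scheduling the disk read; everyone else
    just waits for a terminal state."""

    def __init__(self):
        self.state = EMPTY
        self.error = None
        self.cond = threading.Condition()

    def mark_loading(self):
        with self.cond:
            if self.state != EMPTY:
                return False  # lost the race: someone else loads (or loaded)
            self.state = LOADING
            return True

    def mark_loaded(self):
        with self.cond:
            self.state = LOADED
            self.cond.notify_all()  # wakes everyone blocked in get_blocking()

    def mark_load_error(self, err):
        with self.cond:
            self.state, self.error = FAILED, err
            self.cond.notify_all()  # future accesses will also see the error

    def get_blocking(self, timeout=None):
        with self.cond:
            if not self.cond.wait_for(lambda: self.state in (LOADED, FAILED), timeout):
                raise TimeoutError("chunk load timed out")
            if self.state == FAILED:
                raise IOError(self.error)
```

The losing side of the race does nothing further; it simply ends up in get_blocking() with everyone else, which is exactly the behavior described above.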
In general, there may be a lot of errors to consider here. We may have a temporary I/O issue, run out of memory, or hit something else transient. Or we may have an actual problem at hand (a bad sector on disk, corrupted data, etc.). Regardless, we aren't going to attempt any error recovery here. We'll just record the error, and any future attempt to access that chunk will also error. The proper way to recover at this point is to restart the pager. This assumes we have the other components of a database in play, so we'll re-run the journal files, apply recovery, etc. In short, I/O issues like these are critical errors and require a restart of the system to come back to a known state. If the tryGet() method returned without an error, there are still two options to consider. The call may have returned a value (in which case addRef() was called on the chunk internally); we can simply add it to the chunks we own and move on. If there isn't a value in memory, things start to get interesting. At this point we call markLoading(). We are basically racing to become the owner responsible for loading this chunk. If we win the race, we get the buffer back from the FileChunks and can schedule reading the relevant chunk from the disk. You'll note that we set the callback to completeLoad; we'll look into that shortly. If we lose the race (we didn't get a buffer back), then some other thread got the buffer and will schedule the read for us, so we are done. After we have either ensured that all the chunks are loaded or scheduled them to be loaded, we use getBlocking() to wait for all the relevant chunks to be available. Once that is done, we can safely return, and getPage() will complete the process, as we saw earlier. The only thing left to look at is the completeLoad function, which is about as basic as you can get:
completeLoad.zig:

    fn completeLoad(work: *PagerRing.Work) void {
        var state = @intToPtr(*AsyncLoadChunkState, work.context);
        defer state.parent.allocator.destroy(state);
        var err: ?anyerror = null;
        if (work.result.err) |e| {
            state.file.chunks.markLoadError(state.chunk, e) catch |e2| {
                err = e2;
            };
        } else {
            _ = state.file.chunks.markLoaded(state.chunk) catch |e3| {
                err = e3;
            };
        }
        if (err) |e| {
            std.log.err("Ownership violation for chunk: {}, file: {}." ++
                " Got error: {} when handling state {} ", .{
                state.chunk,
                state.file,
                e,
                work.result.err,
            });
        }
    }

Most of the function is about error handling. We register either the fact that we got an error reading from the disk or that we completed the load process, and maybe log something. In general, there isn't much that we need to do here. The act of calling markLoaded() will release any threads waiting in getBlocking(), after all. So the whole thing comes together quite nicely. With this done, we are mostly done on the reading side of the pager, and with this post as well. In my next post, I want to discuss how we should handle eviction of data. So far, we are just reading into memory, never releasing. We need to take care of that as well, of course. Once that is done, we can move on to the wonderful topic of handling writes and durability…

2021 Year in Review

by Ardalis

posted on: January 11, 2022

Time for a recap of stats and important (and not so important) milestones from 2021, the SECOND year of the COVID-19 pandemic (yeah we…Keep Reading →

Implementing a file pager in Zig

by Oren Eini

posted on: January 10, 2022

This is my 7th post in this series, and we are now starting to get into the really interesting bits. So far we have worked with the individual components, each doing just one thing, which makes it hard to see how they all fit together. In this post, we are going to start plugging things together. As a reminder, the whole point of this series of posts is to explore what we need to do in order to ask for a page (8KB, in our case) from the data file and work with it in memory. We have most of the pieces ready; let's put them together. The first thing we need to do is actually tie a file to the FileChunks structure that we previously created. This is as simple as the following structure:

PagerFile.zig:

    const PagerFile = struct {
        chunks: *FileChunks,
        fd: std.os.fd_t,

        pub fn init(allocator: std.mem.Allocator, file: []const u8) !PagerFile {
            var chunks = try FileChunks.init(allocator);
            errdefer chunks.deinit();
            var fd = try std.os.open(
                file,
                std.os.O.RDWR | std.os.O.CREAT | std.os.O.CLOEXEC | std.os.O.DIRECT | std.os.O.DSYNC,
                std.os.S.IRUSR | std.os.S.IWUSR,
            );
            return PagerFile{
                .chunks = chunks,
                .fd = fd,
            };
        }

        pub fn deinit(self: *PagerFile) void {
            self.chunks.deinit();
            std.os.close(self.fd);
        }
    };

We simply initialize the FileChunks and open the file, nothing else. Remember that a single pager is going to be responsible for multiple files. Furthermore, all the data structures we are dealing with now are meant for concurrent use. That means that we should be prepared for this type of code:
concurrent.zig:

    // on thread #1
    try pager.addFile("/data/orders/data.003");

    // on thread #2
    try pager.getPage(&tx.owned, 2034, 32, 15 * std.time.us_in_s);

When we use a managed language, this is actually fairly simple to handle. In an unmanaged language, those two lines of code are tough. Why is that? Let's look at the raw data members of the Pager structure, shall we?

Pager.zig:

    pub const Pager = struct {
        allocator: std.mem.Allocator,
        ring: *PagerRing,
        files: []PagerFile,
        filesCapacity: usize,
        pendingFree: PagerFileList,

        const PagerFileList = std.ArrayListUnmanaged([]PagerFile);
    }

Of particular interest to us is the files member: the list of files being managed by the pager, each with a maximum size of 8GB. The problem is that one thread may be accessing the list at the same time that another thread wants to increase its size. How would that work? The simplest option is to reallocate the array, but that will move it. What would the first thread be doing in that scenario? The good thing, from our point of view, is that we don't need to worry about concurrent modifications: only a single thread is allowed to modify the Pager's state at any given point in time. Trying to find a general solution to this problem leads down a rabbit hole. You get into hazard pointers, epoch-based GC and other fun stuff. Also, the call to getPage() is one of the most important ones in a database; anything we can do to reduce its cost will be done.
As such, we typically can't use any form of locking; a reader/writer lock can be a killer. Here is a good example of how that can happen. I thought about how to resolve this, and I decided against a generic solution. Instead, let's look at the actual behavior that we need. The files array is going to be accessed a lot, and it has one entry per 8GB of disk space that we use. That means it isn't going to experience any rapid growth, and it will only ever grow. We also only need to worry when we grow the physical backing store for this array; if we overprovision and use less than we allocated, that is perfectly fine. Each element in the array is 8 bytes in size, so if we allocated a single memory page (4KB) up front, we could store 512 file references in it. That represents 4TB(!) of data, so we could probably just accept the cost and not think about it. But databases with more than 4TB of disk size do exist, and we don't want such an artificial limit on us, do we? Instead, we'll use another approach. We start by allocating the array with a minimum size of 8 elements (sufficient until you get to 64GB). But what happens when we reach that size?
addFile.zig:

    pub fn addFile(self: *Pager, file: []const u8) !void {
        // caller holds a write lock; we don't care about readers, they can follow
        // either the new or the old pointer and are guaranteed to only access the old range
        if (self.files.len + 1 >= self.filesCapacity) {
            var nextCapacity = self.filesCapacity * 2;
            var newFiles = try self.allocator.alloc(PagerFile, nextCapacity);
            errdefer self.allocator.free(newFiles);
            try self.pendingFree.append(self.allocator, newFiles);
            std.mem.copy(PagerFile, newFiles, self.files);
            self.files.ptr = newFiles.ptr;
            self.filesCapacity = nextCapacity;
        }
        var cur = self.files.len;
        self.files.len += 1;
        self.files[cur] = try PagerFile.init(self.allocator, file);
    }

What we do here is cheat. We don't need to free the memory immediately: when we reach the limit of the array's size, we double it, copy the data over, and register the new allocation to be freed when we close the Pager. At that point, the caller is already required to ensure that there are no other users of the pager. Because we copy the values from the old array but keep it around, old readers may use either the old or the new array, and we don't actually care: both remain valid and accessible. In terms of wasted space, if our database grew from a single file to 128TB in one run, we would end up with an array of 16,384 elements (128KB in size). Along the way, we would have doubled the size of the array about a dozen times and "wasted" roughly 128KB in old buffers. That seems like a pretty reasonable cost for significantly reducing the complexity of concurrent access. Using this method, we avoid any sort of synchronization on the read side, which is certainly a plus. Here are the init() and deinit() calls for the Pager, to complete the picture:
Pager.zig:

    pub fn init(allocator: std.mem.Allocator, capacity: usize) !*Pager {
        var self = try allocator.create(Pager);
        errdefer allocator.destroy(self);
        self.allocator = allocator;
        self.filesCapacity = std.math.max(8, nextPowerOfTwo(capacity));
        var files = try allocator.alloc(PagerFile, self.filesCapacity);
        errdefer allocator.free(files);
        self.pendingFree = try PagerFileList.initCapacity(allocator, 1);
        errdefer self.pendingFree.deinit(allocator);
        self.ring = try PagerRing.init(allocator);
        errdefer self.ring.deinit();
        try self.pendingFree.append(allocator, files);
        self.files = files;
        self.files.len = 0; // no values yet...
        return self;
    }

    pub fn deinit(self: *Pager) void {
        for (self.files) |*file| {
            file.deinit();
        }
        while (self.pendingFree.popOrNull()) |toFree| {
            self.allocator.free(toFree);
        }
        self.pendingFree.deinit(self.allocator);
        self.ring.deinit();
        self.allocator.destroy(self);
    }

As you can see, we allocate a PagerRing for the pager, which will deal with the actual I/O. The actual disposal of the files arrays is managed through the pendingFree list; that is a small cost to pay to make adding a new file cheap. In the deinit() routine, note the distinction between releasing the files themselves (where we close the FileChunks, release the relevant memory, close the file handle, etc.) and releasing the arrays that hold them. I'm quite pleased with how this turned out: zero cost for reads (important) and negligible memory cost in most scenarios. In my next post, I'll get started with actually reading the data from disk and putting it in the pager. So far, this is a seven-post series, and we haven't completed even the first part. That simple scenario is surprisingly tricky.
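As a closing aside, the grow-without-free trick from addFile() can be sketched in Python. The bookkeeping is written out explicitly even though Python's GC would normally make it unnecessary; all names here are mine:

```python
class GrowOnlyTable:
    """A grow-only array of file handles: when capacity runs out we copy
    into a bigger array but keep every old array alive in pending_free
    until shutdown, so readers holding an old reference stay valid."""

    def __init__(self, capacity=8):
        self.slots = [None] * capacity
        self.count = 0
        self.pending_free = [self.slots]  # released only in deinit()

    def add(self, item):
        # single writer assumed, as in addFile(); readers are lock-free
        if self.count + 1 >= len(self.slots):
            bigger = [None] * (len(self.slots) * 2)
            bigger[:self.count] = self.slots[:self.count]
            self.pending_free.append(bigger)
            self.slots = bigger  # readers may still see the old array: fine
        self.slots[self.count] = item
        self.count += 1

    def deinit(self):
        self.pending_free.clear()  # the one place old arrays are released
```

The design choice is the same as in the post: trade a bounded amount of retained memory for completely synchronization-free reads.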