RavenDB supports a dedicated batch processing mode, using the notion of subscriptions. A subscription is simply a way to register a query with the database and have the database send the subscriber the documents that match the query.
The previous sentence is taken directly from the Inside RavenDB book, and it is a good intro for the topic. A subscription is a way to process documents that match a query. A good example might be to run various business processes as a result of data changes. Let’s assume that we have a bank, and a new customer was registered. We need to run plenty of such processes (Know Your Customer, Anti Money Laundering, Credit Score, in-house estimation, credit limits & authorization, etc).
A typical subscription query would then be:
from Customers where Onboarded = false
And then we can register to that subscription. At this point, the database will start sending us all the customers that haven’t been onboarded yet. This is a persistent query, so restarts and failures are handled properly. And the key aspect is that RavenDB will push the matching documents to the subscription worker. RavenDB will handle batching of the results, ensure that we can process humongous amounts of data safely and easily and, in general, remove a lot of hassle from backend processing.
Up until RavenDB 5.3, however, a subscription was defined to be a singleton. In other words, at any given point, only a single subscription worker could be running. That is enforced by the server and helps make it much easier to reason about processing documents. One comment that we got is that this is great if the processing that we are doing is internal, but if there is a need to make a remote call to a potentially slow service, that can be an issue.
For example, consider the following worker code:
worker.Run(async batch =>
{
    using var session = batch.OpenAsyncSession();
    foreach (var item in batch.Items)
    {
        Customer customer = item.Result;
        // Remote call: a slow response here stalls the rest of the batch
        customer.CreditScore = await CheckCreditScore(customer);
    }
    await session.SaveChangesAsync();
});
What happens when the CheckCreditScore() is slow? We are halting processing for everything. In some cases, it is only particular customers that are slow, and we absolutely want to process them in parallel. However, RavenDB did not allow that.
In RavenDB 5.3, we are bringing concurrent subscriptions to the table. When you create the subscription worker, you can define it with a Concurrent mode, like so:
var subscription = store.Subscriptions.GetSubscriptionWorker<Customer>(
    new SubscriptionWorkerOptions("NewCustomers")
    {
        Strategy = SubscriptionOpeningStrategy.Concurrent
    });
When you have done that, RavenDB will allow multiple concurrent workers to run at the same time, processing batches in parallel. That means that a single slow customer will not halt your entire processing pipeline.
In general, I would like you to think about this flag as just removing a limitation. Previously we blocked you from an operation, and now you can run freely. However…
We didn’t decide to limit your capabilities just because we like doing that. One of the key aspects of subscriptions is that they offer reliable processing of documents. If an exception has been thrown when processing a batch, RavenDB will resend the batch to the worker again, until processing is successful. If we handed a batch of documents to process to a worker, and that worker crashed without letting us know, we need to make sure that the next client to connect will start processing from the last acknowledged batch.
It turns out that adding concurrency and the ability for workers to work completely independently of one another makes such promises a lot harder to implement.
There is also another aspect that we have to consider. When we have just a single worker, certain concurrency issues never happen, but when we allow you to run concurrently, we have to deal with them.
Consider the subscription above, running on two workers. We handed a new customer document to Worker A, which started processing it. While Worker A is processing the document, that document has changed. That means that it needs to be processed again by the subscription. We have Worker B available and ready, but if we allow such a scenario, we risk getting a race between the workers, working on the same document.
We could punt that to the user and ask them to ensure that this is something that they handle, but that isn’t the philosophy of RavenDB. Instead, we have implemented the following behavior for concurrent subscriptions:
When the server sends a batch of documents to a worker, that worker “checks them out”. Until that worker signals the server that the batch has been either processed or failed, we’ll not send those documents out to other workers, even if they have been modified. Once a batch is acknowledged as processed, we’ll scan all the documents in that batch and see if we need to schedule them for the next batch, because they were missed while they were checked out.
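To make the check-out rule concrete, here is a minimal in-memory sketch of it. This is not RavenDB's server code: the names `DocumentChanged`, `SendBatch` and `Acknowledge`, and the collections behind them, are hypothetical and exist only to illustrate the behavior described above. Only the "processed" acknowledgment is modeled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of the check-out rule; not RavenDB's implementation.
var pending = new Queue<string>();                  // documents waiting to be sent
var queued = new HashSet<string>();                 // ids currently sitting in `pending`
var checkedOut = new Dictionary<string, string>();  // document id -> worker holding it
var modifiedWhileOut = new HashSet<string>();       // changed while checked out

// A document matching the subscription query was created or modified.
void DocumentChanged(string id)
{
    if (checkedOut.ContainsKey(id))
        modifiedWhileOut.Add(id);   // remember it, but do not hand it to another worker
    else if (queued.Add(id))
        pending.Enqueue(id);
}

// Hand up to `size` documents to a worker and mark them as checked out.
List<string> SendBatch(string worker, int size)
{
    var batch = new List<string>();
    while (batch.Count < size && pending.Count > 0)
    {
        var id = pending.Dequeue();
        queued.Remove(id);
        checkedOut[id] = worker;
        batch.Add(id);
    }
    return batch;
}

// The worker acknowledged the batch as processed: release its documents and
// reschedule the ones that were modified while they were checked out.
void Acknowledge(List<string> batch)
{
    foreach (var id in batch)
    {
        checkedOut.Remove(id);
        if (modifiedWhileOut.Remove(id))
            DocumentChanged(id);
    }
}

DocumentChanged("customers/1");
DocumentChanged("customers/2");
var batchA = SendBatch("A", 1);    // worker A takes customers/1
DocumentChanged("customers/1");    // modified while A is still working on it
var batchB = SendBatch("B", 10);   // worker B only receives customers/2
Acknowledge(batchA);               // customers/1 is scheduled again
var next = SendBatch("A", 10);
Console.WriteLine(string.Join(", ", next)); // prints: customers/1
```

The point of the sketch is that the modification made during processing is never lost and never raced: it is parked until the holder acknowledges the batch, and only then goes back into the queue.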
That means that from the perspective of the user, they can write code knowing that only a single subscription worker will run on a given document at a time. This is a very powerful promise and can significantly simplify the complexity of building your systems. A single worker that is stalling will not prevent the other workers from making progress. There aren’t any timeouts to deal with. If you have a process that may take a long time, as long as the worker is alive and functioning (maintaining the TCP connection to the server), the server will consider the documents that the worker is processing as checked out.
Concurrent subscriptions require you to opt in, using the Concurrent flag. All workers for a subscription must agree to run in concurrent mode. This is to ensure that there aren’t any workers that expect a purely serial work model. If you aren’t setting this flag, you’ll keep getting the usual serial behavior of subscriptions. We require opting in to this behavior because it violates an important guarantee of subscriptions: that you’ll process the documents in the order in which they were modified. With concurrent workers, that is no longer the case.
The first worker to connect to a subscription will determine if it will run in concurrent mode or serial mode. Any new worker trying to run on that subscription needs to be concurrent (if the first one was concurrent) and no concurrent worker can join a subscription that has a serial worker active. This is a transient setting, it is important to note. When the last worker is shut down, the subscription state is reset, and then you can connect a worker for the first time again (which will then be able to set the mode of the subscription).
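The connection rule can be sketched as a tiny state machine. This is a simplified, hypothetical model (`TryConnect` and `Disconnect` are illustrative names, not RavenDB APIs), and it assumes that a serial subscription accepts exactly one worker; other serial opening strategies, such as a new worker replacing the current one, are not modeled.

```csharp
using System;

// Hypothetical model of the mode negotiation; not RavenDB's implementation.
int activeWorkers = 0;
bool isConcurrent = false;   // only meaningful while activeWorkers > 0

bool TryConnect(bool concurrent)
{
    // With workers already connected, a newcomer is accepted only if both the
    // subscription and the newcomer are concurrent.
    if (activeWorkers > 0 && !(isConcurrent && concurrent))
        return false;

    if (activeWorkers == 0)
        isConcurrent = concurrent;   // the first worker decides the mode

    activeWorkers++;
    return true;
}

void Disconnect()
{
    if (activeWorkers > 0)
        activeWorkers--;             // at zero, the next worker decides the mode again
}

bool first = TryConnect(concurrent: true);        // true: sets concurrent mode
bool second = TryConnect(concurrent: true);       // true: joins the concurrent subscription
bool serial = TryConnect(concurrent: false);      // false: serial worker cannot join
Disconnect();
Disconnect();
bool afterReset = TryConnect(concurrent: false);  // true: the mode was reset, now serial
bool late = TryConnect(concurrent: true);         // false: a serial worker is active
Console.WriteLine($"{first} {second} {serial} {afterReset} {late}");
```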
You can see in the benchmark image on the right the impact of adding concurrent workers when there is a non trivial processing time. It is important to note that the concurrent part of the concurrent subscriptions is the fact that the workers are running in parallel. We are still sending batches of documents for each worker independently and then waiting for confirmation. If you have no significant processing time for a batch, you’ll not see a significant improvement in processing time (the server side cost of processing the documents, sending the batch, etc is related to the total number of documents, and won’t be impacted).
Concurrent subscriptions are available in RavenDB 5.3 (due to be released by mid November) and will be available in the Professional and Enterprise editions of RavenDB.
Recording a short screencast video can be a very effective way to provide a demo to stakeholders or show how a bug can be reproduced. Follow…
Uploading files may take time. Users want visibility into what's happening, so it is good to show them the progress. The simplest way to do it is to show a progress bar. In this post, we'll use the InputFile component to upload files and some custom code to show the progress bar.
Following my previous posts, about the use after free bug that was found in a pull request, I put a lot of effort into testing the validity of the fix. As it turned out, my code was right and the fix worked properly. However, I uncovered a race condition in the .NET MemoryCache implementation. Here is what I got when I put the system under load:
Unhandled exception. System.ArgumentException: Unable to sort because the IComparer.Compare() method returns inconsistent results. Either a value does not compare equal to itself, or one value repeatedly compared to another value yields different results. IComparer: 'System.Comparison`1[Microsoft.Extensions.Caching.Memory.CacheEntry]'.
at System.Collections.Generic.ArraySortHelper`1.Sort(Span`1 keys, Comparison`1 comparer) in System.Private.CoreLib.dll:token 0x60066cd+0x1d
at System.Collections.Generic.List`1.Sort(Comparison`1 comparison) in System.Private.CoreLib.dll:token 0x600688b+0x3
at Microsoft.Extensions.Caching.Memory.MemoryCache.g__ExpirePriorityBucket|27_0(Int64& removedSize, Int64 removalSizeTarget, Func`2 computeEntrySize, List`1 entriesToRemove, List`1 priorityEntries) in Microsoft.Extensions.Caching.Memory.dll:token 0x6000061+0x21
at Microsoft.Extensions.Caching.Memory.MemoryCache.Compact(Int64 removalSizeTarget, Func`2 computeEntrySize) in Microsoft.Extensions.Caching.Memory.dll:token 0x600005b+0xff
at Microsoft.Extensions.Caching.Memory.MemoryCache.OvercapacityCompaction(MemoryCache cache) in Microsoft.Extensions.Caching.Memory.dll:token 0x6000059+0xad
at System.Threading.ThreadPoolWorkQueue.Dispatch() in System.Private.CoreLib.dll:token 0x6002b7c+0x110
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() in System.Private.CoreLib.dll:token 0x6002c66+0x190
There are a few interesting things here. First, we can see that this killed the process. This isn’t just an error; it is an error from a background thread that ended up unhandled and killed everything. That is a nope for server applications. The second issue is that the error is strange. What exactly is going on here?
Here is the relevant piece of code that throws the error, inside the MemoryCache:
priorityEntries.Sort((e1, e2) => e1.LastAccessed.CompareTo(e2.LastAccessed));
This is a really interesting line, because of what it does. The priorityEntries is a local list of cache entries, which we need to sort by the last access, to figure out what we can evict. What can go wrong here?
Well, the MemoryCache is a naturally concurrent instance, of course. What happens when we have an access to the entry in question? We’ll update the LastAccessed value. And if we do this just right, we may give the sort function different values for the same cache entry, leading to this problem.
The bug in question was in place for as long as I can track the MemoryCache codebase. In the best case scenario, it will cause evictions of data that shouldn’t be evicted. Not a major issue, and unlikely to be noticed. But if it fails in this manner, it will kill the process, so very likely to be noticed.
My test scenario had a lot of concurrency and a very small cache (leading to a lot of evictions / compactions). I’m guessing that is why I’m able to trigger this so easily.
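One general way to keep a comparer consistent when the sort key can change underneath it is to copy the keys once and sort by the copy. The sketch below illustrates that idea only; it is not the MemoryCache code, and I am not claiming this is how the issue was addressed in .NET. The `lastAccessed` array and `OrderForEviction` function are hypothetical stand-ins for the cache entries and the eviction ordering.

```csharp
using System;
using System.Linq;

// Hypothetical stand-in for cache entries: index = entry id, value = last access time.
// In a real cache, other threads may update these values at any moment.
long[] lastAccessed = { 50, 10, 40, 20, 30 };

// Copy the timestamps once, then sort entry ids by the copy. The sort only ever
// sees the snapshot, so it stays consistent even if `live` changes mid-sort.
int[] OrderForEviction(long[] live)
{
    long[] snapshot = (long[])live.Clone();
    int[] ids = Enumerable.Range(0, snapshot.Length).ToArray();
    Array.Sort(snapshot, ids);   // sorts the keys and reorders ids to match
    return ids;
}

int[] order = OrderForEviction(lastAccessed);
Console.WriteLine(string.Join(", ", order)); // prints: 1, 3, 4, 2, 0
```

The trade-off is that the eviction order may be slightly stale by the time it is used, which is acceptable for an eviction heuristic and far better than an unhandled exception on a background thread.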
When you use a cache, you need to take into account several factors about the cache. There are several workload patterns that can cause the cache to turn into a liability, instead of an asset. One of the most common scenarios where you can pay heavily for the cache, but not benefit much, is when you have a sequential access pattern that exceeds the size of the cache.
Consider the following scenario:
var cache = new MyCache(100); // max items: 100
for (var i = 0; i < 1_000_000; i++)
{
    DoSomething(cache, i % 128);
}
In this case, the size is set to 100, but the keys are sequential in the range of 0 .. 127. With a least recently used eviction policy, we are basically guaranteed to never have a cache hit. What is the impact of such a cache, however?
Well, it will keep the references alive for longer, so they will end up in Gen2. On eviction, they will take longer to be discarded. In other words, adding a cache here will increase the amount of memory that is being used, cause higher CPU utilization (the GC has to do more work) and won’t add any performance benefit at all. Removing the cache, on the other hand, will reduce both memory utilization and CPU costs. This can be completely unintuitive at first glance, but it is a real scenario, and sadly something that we had experienced many times in RavenDB 3.x editions. In fact, a lot of the design of RavenDB 4.x was about fixing those kinds of issues.
Whenever you design a cache, you should consider what sort of adversity you have to face. Considering your users as adversaries, intentionally trying to break your software, is a good mindset to have. You get to avoid many pitfalls this way.
There are many other caching anti patterns. For example, if you are using a distributed cache, the pattern of accesses to the cache may be more expensive than reading from the source. You have many (fast) queries to answer a value, instead of one (somewhat slower) remote call. The network cost is typically huge, but discounted (see: Fallacies of Distributed Computing).
But for an in memory cache, it is easy to forget that a cache that is overloaded is just a memory hog, not providing much value at all. In the previous posts, I discussed how I should use a buffer pool in conjunction with the cache. That is done because of this particular scenario: if the cache is overloaded, and we discard values, we want to at least avoid doing additional allocations.
In many ways, a cache is a really complex piece of software. There has been a lot of research into it.
Here is another non-intuitive result. Instead of using the least recently used (or least frequently used) policy, select a value at random and evict it. Your performance is going to be better.
Why is that? Look at the code above, and let’s assume that I’m evicting a random value in the 25% least frequently used items. The fact that I’m doing that randomly means that there is a higher likelihood that some values will remain in the cache, even after they “should” have expired. And by the time I come back to them, they would be useful in the cache, instead of predictably evicted.
In many databases, the cache management takes a huge part of the complexity. You usually have multiple levels of caches, and policies that move them between one another. I really liked this post, discussing the Postgres algorithm in great detail. It also covers some aspects of nearly hostile behavior that the cache has to guard against, to avoid pathological performance drops.
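A small simulation makes both observations easy to check. The sketch below is an assumption-laden illustration, not the `MyCache` from the sample: it replays the same cyclic key pattern (keys 0 .. 127, capacity 100) against a strict LRU policy and against uniform random eviction. Note that the random policy here picks a victim from the whole cache, which is simpler than the "random among the 25% least frequently used" variant mentioned above. `LruHits` and `RandomHits` are hypothetical helper names.

```csharp
using System;
using System.Collections.Generic;

// Count cache hits for a cyclic key pattern under strict LRU eviction.
int LruHits(int capacity, int keyRange, int accesses)
{
    var order = new LinkedList<int>();   // front = most recently used
    var nodes = new Dictionary<int, LinkedListNode<int>>();
    int hits = 0;
    for (int i = 0; i < accesses; i++)
    {
        int key = i % keyRange;
        if (nodes.TryGetValue(key, out var node))
        {
            hits++;
            order.Remove(node);          // move the entry to the front
            order.AddFirst(node);
            continue;
        }
        if (nodes.Count == capacity)
        {
            int victim = order.Last.Value;   // least recently used key
            order.RemoveLast();
            nodes.Remove(victim);
        }
        nodes[key] = order.AddFirst(key);
    }
    return hits;
}

// Same access pattern, but the victim is chosen uniformly at random.
int RandomHits(int capacity, int keyRange, int accesses, int seed)
{
    var rng = new Random(seed);
    var slots = new List<int>(capacity);     // cached keys, in arbitrary order
    var index = new Dictionary<int, int>();  // key -> position in slots
    int hits = 0;
    for (int i = 0; i < accesses; i++)
    {
        int key = i % keyRange;
        if (index.ContainsKey(key)) { hits++; continue; }
        if (slots.Count == capacity)
        {
            int pos = rng.Next(capacity);    // overwrite a random slot
            index.Remove(slots[pos]);
            slots[pos] = key;
            index[key] = pos;
        }
        else
        {
            index[key] = slots.Count;
            slots.Add(key);
        }
    }
    return hits;
}

int lruHitCount = LruHits(100, 128, 1_000_000);
int randomHitCount = RandomHits(100, 128, 1_000_000, seed: 42);
Console.WriteLine($"LRU hits: {lruHitCount}, random eviction hits: {randomHitCount}");
```

Under LRU, each key is evicted before it comes around again (127 other keys are touched in between, more than the cache can hold), so the hit count is zero. Random eviction lets a share of the keys survive until their next access, so it reports a non-zero hit count for the same workload.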
After presenting the issue of how to return items to the array pool without creating a use after free bug, I asked you how you would fix that. There are several ways to try to do that: you can use a reference counting scheme, or try to use locking, etc. All of those options are expensive. What is worse, they are expensive on a routine basis, not just for the code path that frees the buffer.
Instead, I changed the way we are handling returning the buffer. Take a look at the following code:
public class ReturnBuffer
{
    public byte[] Buffer;
    ~ReturnBuffer()
    {
        if (Buffer != null)
        {
            ArrayPool<byte>.Shared.Return(Buffer);
            Buffer = null;
        }
    }
}
private static ConditionalWeakTable<object, object> _joinLifetimes = new();
private void EvictionCallback(object key, object value, EvictionReason reason, object state)
{
    _joinLifetimes.Add(value, new ReturnBuffer { Buffer = (byte[])value });
}
This may require some explanation. I’m using a ConditionalWeakTable here, which was added to the runtime to enable dynamic properties on objects. Basically, it creates a table that you can look up by an object (the key) to get the value attached to it. The most important feature is that the runtime ensures that the lifetime of the associated value matches the lifetime of the key object. In other words, when we add the buffer in the eviction callback, we ensure that the ReturnBuffer we register will live at least as long as the buffer.
That means that we can let the GC do the verification job. We’ll now return the buffer back to the pool only after the GC has ensured that there are no outstanding references to it. Not a lot of code, and an elegant solution. This also ensures that we are only paying the cost on eviction (likely rare), and not all the time.
If you're writing blog posts, GitHub content, and/or Stack Overflow questions and answers using Markdown, it's often helpful to show code…
In my previous post, I discussed a bug that was brought up in code review, a bug that made me go into near panic mode. Here is the issue:
public byte[] ComputeHash(string file)
{
    return (byte[])_cache.Get(file) ?? ComputeHashAndPutInCache(file);
}
private byte[] ComputeHashAndPutInCache(string file)
{
    byte[] hash = ArrayPool<byte>.Shared.Rent(32);
    HashTheFile(file, hash); // actual work that we are caching
    _cache.Set(file, hash, new MemoryCacheEntryOptions
    {
        Size = 32,
        PostEvictionCallbacks =
        {
            new PostEvictionCallbackRegistration
            {
                EvictionCallback = EvictionCallback
            }
        }
    });
    return hash;
}
private void EvictionCallback(object key, object value, EvictionReason reason, object state)
{
    // Bug: a caller of ComputeHash() may still be holding this buffer
    ArrayPool<byte>.Shared.Return((byte[])value);
}
In order to understand this bug, you have to take into account multiple concurrent threads at the same time. Look at the ComputeHashAndPutInCache() method, where we register an eviction callback for the item in the cache. When we evict the item, we return the buffer to the buffer pool.
We want to avoid allocating memory, so that is certainly something desirable, no? However, consider what happens if I have a thread in ComputeHash(), getting a value from the cache. Before I have a chance to look at the value, however, the cache will decide to evict it. At which point the eviction callback will run.
We’ll return the buffer back to the buffer pool, where it may be used again by something else. I am also using this buffer to do other things from the caller of the ComputeHash() call. This is a somewhat convoluted use after free issue, basically.
And I find this a super scary bug because of its effects:
Randomly, and rarely, some buffer will contain the wrong data, leading to wrong results (but hard to track it down).
Trying to find such a bug after the fact, however, is nearly impossible.
Most of the debugging techniques (repeating the operation for a particular value) will make it go away (the cache will keep the value and not evict it).
In short, this is a recipe for a really nasty debug session and an impossible to resolve bug. All from code that looks very much like an innocent bystander.
Now, I can obviously fix it by not using the array pool, but that may cause me to allocate more memory than I should. How would you approach fixing this issue?