One of the high costs that we have right now in my Redis Clone is strings. That is actually a bit misleading; take a look here:
Strings take 12.57% of the runtime, but there is also the GC Wait, where we need to clean up after them. That means that the manner in which we are working is pretty inefficient.
Our test scenario right now also involves solely GET and SET requests; there are no deletions, expirations, etc. I mention that because we need to consider what we’ll replace the strings with.
The simplest option is to replace the strings with byte arrays, but those are still managed memory and incur the costs associated with the GC. We can pool those byte arrays, but then we have an important question to answer: how do we know when a buffer is no longer used?
Consider the following set of events:
Time | Thread #1 | Thread #2
1    | SET abc   |
2    |           | GET abc
3    | SET abc   |
4    |           | Use the buffer we got on #2
In this case, thread #2 is accessing the value buffer, but we have replaced that buffer in the meantime. We need to let thread #2 keep using its buffer until it is done with it.
This little tidbit puts us right back at concurrent manual memory management, which is scary. We can do things in a slightly different manner, however. We can take advantage of the GC to support us, like so:
using System.Buffers;

public class ReusableBuffer
{
    public byte[] Buffer;
    public int Length;

    public Span<byte> Span => new Span<byte>(Buffer, 0, Length);

    public ReusableBuffer(byte[] buffer, int length)
    {
        Buffer = buffer;
        Length = length;
    }

    public override bool Equals(object? obj)
    {
        if (obj is not ReusableBuffer o)
            return false;
        return o.Span.SequenceEqual(Span);
    }

    public override int GetHashCode()
    {
        var hc = new HashCode();
        hc.AddBytes(Span);
        return hc.ToHashCode();
    }

    // when the GC collects this instance, return the buffer to the pool
    ~ReusableBuffer()
    {
        ArrayPool<byte>.Shared.Return(Buffer);
    }
}
The idea is pretty simple. We have a class that holds a buffer, and when the GC notices that it is no longer in use, it will return its buffer to the pool. In other words, we rely on the GC to solve this (really hard) problem for us. Moving the cost to the finalizer means that we don’t have to worry about it ourselves; otherwise, we would have to jump through a lot of hoops.
The ReusableBuffer class also implements GetHashCode() / Equals() which allow us to use it as a key in the dictionary.
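For example, storing and looking up a value might look roughly like this. This is a minimal sketch of my own (the dictionary shape, the Wrap helper and the sample keys are illustrative, not the exact code from this post), assuming the ReusableBuffer class above:

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Text;

// the buffers are rented from the pool and wrapped in ReusableBuffer,
// which compares by content, so it can serve as the dictionary key
var state = new ConcurrentDictionary<ReusableBuffer, ReusableBuffer>();

ReusableBuffer Wrap(ReadOnlySpan<byte> data)
{
    var rented = ArrayPool<byte>.Shared.Rent(data.Length);
    data.CopyTo(rented);
    return new ReusableBuffer(rented, data.Length);
}

// SET abc 123
state[Wrap(Encoding.UTF8.GetBytes("abc"))] = Wrap(Encoding.UTF8.GetBytes("123"));

// GET abc - a different ReusableBuffer instance with the same bytes finds the entry,
// and the GC / finalizer will eventually return the rented arrays to the pool
if (state.TryGetValue(Wrap(Encoding.UTF8.GetBytes("abc")), out var value))
{
    Console.WriteLine(Encoding.UTF8.GetString(value.Span));
}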
Now that we have the backing store for keys and values, let’s see how we can read from and write to the network. I’m going to go back to the ConcurrentDictionary implementation for now, so I’ll handle only a single concept at a time.
Before, we used StreamReader / StreamWriter to do the work; now we’ll use PipeReader / PipeWriter from System.IO.Pipelines. That allows us to work with the raw bytes directly, and it is meant for high performance scenarios.
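The setup is straightforward. Here is a minimal sketch of what I mean (my own illustration, using the System.IO.Pipelines package; the variable names are mine, not from the actual code):

using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;

var listener = new TcpListener(IPAddress.Any, 6379);
listener.Start();
var tcp = listener.AcceptTcpClient();
var stream = tcp.GetStream();

// instead of StreamReader / StreamWriter over the NetworkStream, we get a
// PipeReader / PipeWriter and work with ReadOnlySequence<byte> directly
PipeReader netReader = PipeReader.Create(stream);
PipeWriter netWriter = PipeWriter.Create(stream);

ReadResult result = await netReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer; // raw bytes, no string allocations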
I wrote the code twice, once using the reusable buffer model and once using PipeReader / PipeWriter while still allocating strings. I was surprised to see that my fancy reusable buffers were within 1% of the performance of the (much simpler) strings implementation. That is 1% in the wrong direction, by the way.
On my machine, the buffer based system was 165K ops/second while the strings based one was 166K ops/sec.
Here is the complete source code for the reusable buffer based approach. And to compare, here is the string based one. The string based one is about 50% shorter in terms of lines of code.
I’m guessing that the allocation pattern is really good for the kind of heuristics that the GC does. We either have long term objects (in the cache) or very short term ones.
It’s worth pointing out that the parsing of the commands from the network isn’t using strings. Only the keys and values are translated to strings; the rest is handled using raw bytes.
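For instance, dispatching on the command name can be done directly against the raw bytes. Here is a tiny sketch of the idea (my own illustration using C# 11 u8 literals, not the code from this repository):

using System;
using System.IO;

Dispatch("GET"u8);

// match the command name without ever materializing a string;
// only keys and values get copied / converted later, if at all
static void Dispatch(ReadOnlySpan<byte> cmd)
{
    if (cmd.SequenceEqual("GET"u8))
    {
        // handle GET ...
    }
    else if (cmd.SequenceEqual("SET"u8))
    {
        // handle SET ...
    }
    else
    {
        throw new InvalidDataException("Unknown command");
    }
}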
Here is what the code looks like for the string version under the profiler:
And here is the same thing using the reusable buffer:
There are a few interesting things to note here. The cost of ExecCommand is almost twice as high as the previous attempt. Digging deeper, I believe that the fault is here:
var buffer = ArrayPool<byte>.Shared.Rent((int)cmds[2].Length);
cmds[2].CopyTo(buffer);
var val = new ReusableBuffer(buffer, (int)cmds[2].Length);

Item newItem;
ReusableBuffer key;
if (_state.TryGetValue(_reusable, out var item))
{
    // can reuse key buffer
    newItem = new Item(item.Key, val);
    key = item.Key;
}
else
{
    var keyBuffer = ArrayPool<byte>.Shared.Rent((int)cmds[1].Length);
    cmds[1].CopyTo(keyBuffer);
    key = new ReusableBuffer(keyBuffer, (int)cmds[1].Length);
    newItem = new Item(key, val);
}
_state[key] = newItem;
WriteMissing();
This is the piece of code that is responsible for setting an item in the dictionary. Note, however, that we are doing a read for every write. The idea here is that if we do a SET on an existing item, we can avoid allocating the buffer for the key again and reuse the existing one.
However, that piece of code is in the critical path for this benchmark and it is quite costly. I changed it to always do the allocations, and we got a fairly consistent 1% – 3% improvement over the string version. Here is what this looks like:
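That simplified version looks roughly like the following sketch (reconstructed from the fragment above and reusing its names; the exact committed code may differ):

// always allocate: no read-before-write, just rent buffers for the key and
// the value and overwrite whatever entry may already be there
var keyBuffer = ArrayPool<byte>.Shared.Rent((int)cmds[1].Length);
cmds[1].CopyTo(keyBuffer);
var key = new ReusableBuffer(keyBuffer, (int)cmds[1].Length);

var valBuffer = ArrayPool<byte>.Shared.Rent((int)cmds[2].Length);
cmds[2].CopyTo(valBuffer);
var val = new ReusableBuffer(valBuffer, (int)cmds[2].Length);

_state[key] = new Item(key, val);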
In other words, here is the current performance table (under the profiler):
1.57 ms - String based
1.79 ms - Reusable buffer based (reduce memory usage)
1.04 ms - Reusable buffer (optimized lookup)
All of those numbers are under the profiler, and on my development machine. Let’s see what we get when I’m running them on the production instances, shall we?
String based – 1,602,728.75 ops/sec
Reusable buffer (with reducing memory code) – 1,866,676.53 ops/sec
Reusable buffer (optimized lookup) – 1,756,930.64 ops/sec
Those results do not match what we see on my development machine. The likely reason is that the number of operations is high enough, and the load sufficiently big, that we are seeing a much bigger impact from the memory optimization at scale.
That is the only conclusion I can draw from the fact that the memory reduction code, which adds costs, is actually able to process more requests per second under such load.
Now that I’m done with the low hanging fruit, I decided to shift the Redis implementation to use System.IO.Pipelines. That is a high performance I/O API that is meant specifically for servers that need to eke every bit of performance out of the system.
The API is a bit different, but it follows a very logical pattern and makes a lot of sense. Here is the main loop of handling commands from a client:
public async Task HandleConnection()
{
    while (true)
    {
        var result = await _netReader.ReadAsync();
        var (consumed, examined) = ParseNetworkData(result);
        _netReader.AdvanceTo(consumed, examined);
        await _netWriter.FlushAsync();
    }
}
The idea is that we get a buffer from the network, we read everything (including pipelined commands) and then we flush to the client. The more interesting things happen when we start processing the actual commands, because now we aren’t utilizing StreamReader but PipeReader. So we are working at the level of bytes, not strings.
Here is what this (roughly) looks like; I’m not showing the whole thing because I want to focus on the issue that I ran into:
(SequencePosition Consumed, SequencePosition Examined) ParseNetworkData(ReadResult result)
{
    var reader = new SequenceReader<byte>(result.Buffer);
    while (true)
    {
        _cmds.Clear();

        if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
            return (reader.Consumed, reader.Position);

        if (line.Length == 0 || line[0] != '*' || line[line.Length - 1] != '\r')
            ThrowBadBuffer(result.Buffer);

        if (Utf8Parser.TryParse(line.Slice(1), out int argc, out int bytesConsumed) == false ||
            bytesConsumed + 2 != line.Length) // account for the * and \r
            ThrowBadBuffer(result.Buffer);

        for (int i = 0; i < argc; i++)
        {
            // **** redacted - reading cmd to _cmds buffer
        }

        ExecCommand(_cmds);
    }
}
The code is reading from the buffer, parsing the Redis format and then executing the commands. It supports multiple commands in the same buffer (pipelining) and it has absolutely atrocious performance.
Yes, the super speedy API that is significantly harder to get right (compared to the ease of working with strings) is far slower. And by far slower I mean the following, on my development machine:
The previous version clocks at around 126,017.72 operations per second.
This version clocks at less than 100 operations per second.
Yes, you read that right: less than one hundred operations per second, compared to over a hundred thousand for the unoptimized version.
That was… surprising, as you can imagine.
I actually wrote the implementation twice, using different approaches, trying to figure out what I was doing wrong. Surely, it can’t be that bad.
I took a look at the profiler output, to try to figure out what is going on:
It says, quite clearly, that the implementation is super bad, no? Except that this is what you are supposed to be using. So what is going on?
The underlying problem is actually fairly simple and relates to how the Pipelines API achieves its performance. Instead of doing small calls, you are expected to get a buffer and process that. Once you are done processing the buffer you can indicate what amount of data you consumed, and then you can issue another call.
However, there is a difference between consumed data and examined data. Consider the following data:
*3
$3
SET
$15
memtier-2818567
$256
xxxxxxxxxx ... xxxxxx
*2
$3
GET
$15
memtier-7689405
*2
$3
GET
$15
memt
What you can see here are pipelined commands, with 335 bytes in the buffer. We’ll process all of those commands in a single hit, except… look at the end of the buffer. What do we have there?
We have a partial command. In other words, we are expected to execute a GET with a key size of 15 bytes, but we only have the first 4 bytes of the key here. That is actually expected and fine. We consumed all the bytes up to that partial command (thus letting the PipeReader know that we are done with them). The problem is that when we issue a call now, we’ll get that remaining portion (which we didn’t consume), but we aren’t ready to process it. Data is missing. We indicate that to the PipeReader using the examined portion, so the PipeReader knows that it needs to read more from the network.
However… my code has a subtle bug. It will report that it examined only up to the end of the data it consumed, not the end of the buffer. In other words, we tell the PipeReader that we consumed some portion of the buffer, and examined some more, but there are still bytes in the buffer that are neither consumed nor examined. That means that when we issue the read call, expecting to get data from the network, we’ll actually get the same buffer again, to do the exact same processing.
Eventually, we’ll have more data in the buffer from the other side, so the correctness of the solution isn’t impacted. But it will kill your performance.
The fix is really simple: we need to tell the PipeReader that we examined the entire buffer, so that it will not busy wait, but will instead wait for more data from the network. Here is the bug fix:
9c9
< return (reader.Consumed, reader.Position);
---
> return (reader.Consumed, result.Buffer.End);
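For context, here is roughly how those two positions flow back into the PipeReader, as a sketch based on the HandleConnection loop shown earlier:

// "consumed" releases the bytes we are fully done with; "examined" tells the
// PipeReader how far we looked. By reporting the end of the buffer as examined,
// the next ReadAsync() waits for new data from the network instead of handing
// us the same partial command again in a tight loop.
var result = await _netReader.ReadAsync();
var (consumed, examined) = ParseNetworkData(result); // examined == result.Buffer.End
_netReader.AdvanceTo(consumed, examined);
await _netWriter.FlushAsync();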
With that change in place, we can hit 187,104.21 operations per second! That is about 50% better, which is awesome. I haven’t profiled things properly yet, because I also want to address another issue: how are we going to deal with the data from the network? More on that in my next post.
I’m inordinately fond of the Fallacies of Distributed Computing, these are a set of common (false) assumptions that people make when building distributed systems, to their sorrow.
Today I want to talk about one of those fallacies:
There is one administrator.
I like to add the term competent in there as well.
A pretty significant amount of time in the development of RavenDB was dedicated to addressing that issue. For example, RavenDB has a lot of code and behavior around externalizing metrics, both its own and those of the underlying system.
That is a duplication of effort, surely. Let’s consider the simplest stuff, such as CPU, memory and I/O resource utilization. RavenDB makes sure to track those values, plot them in the user interface and expose that to external monitoring systems.
All of those have better metrics sources. You can ask the OS directly about those details, and it will likely give you far better answers (with more details) than RavenDB can.
There have been numerous times where detailed monitoring from the systems that RavenDB runs on was the thing that allowed us to figure out what is going on. Having the underlying hardware tell us in detail about its status is wonderful. Plug that into a monitoring system so you can see trends and I’m overjoyed.
So why did we bother investing all this effort to add support for this to RavenDB? We would rather have the source data, not whatever we expose outside. RavenDB runs on a wide variety of hardware and software systems. By necessity, whatever we can provide is only a partial view.
The answer to that is that we cannot assume that the administrator has set up such monitoring. Nor can we assume that they are able to.
For example, the system may be running on a container in an environment where the people we talk to have no actual access to the host machine to pull production details.
Having a significant investment in a self-contained set of diagnostics means that we aren’t limited to whatever the admin has set up (and has the permissions to view), but have a consistent experience digging into issues.
And since we have our own self-contained diagnostics, we can push them out to create a debug package for offline analysis, or even take active actions in response to the state of the system.
If we were relying on external monitoring, we would need to integrate that, each and every time. The amount of work (and quality of the result) in such an endeavor is huge.
We build RavenDB to last in production, and part of that is that it needs to be able to survive even outside of the hothouse environment.
After achieving 1.25 million ops/sec, I decided to see what would happen if I changed the code to support pipelining. That ended up being quite involved, because I needed to both keep track of all the incoming work and send the work to multiple locations. The code itself is garbage, in my opinion. It is worth it only insofar as it points me in the right direction in terms of the overall architecture. You can read it below, but it is a bit complex. We read from the client as much as we are able, then we send it to each of the dedicated threads to run it.
In terms of performance, it is actually slower than the previous iteration (by about 20%!), but it serves a very important purpose: it makes it easy to tell where the costs are.
Take a look at the following profiler result:
You can see that we are spending a lot of time in I/O and in string processing. The GC time is also quite significant.
Conversely, when we actually process the commands from the clients, we are spending most of the time simply idling.
I want to tackle this in stages. The first part is to stop using strings all over the place. The next stage after that will likely be to change the I/O model.
For now, here is where we stand:
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Text;
using System.Threading.Channels;

var listener = new TcpListener(System.Net.IPAddress.Any, 6379);
listener.Start();

ShardedDictionary _state = new(Environment.ProcessorCount / 2);

while (true)
{
    var tcp = listener.AcceptTcpClient();
    var stream = tcp.GetStream();
    var client = new Client(tcp, new StreamReader(stream), new StreamWriter(stream)
    {
        AutoFlush = true
    }, _state);
    var _ = client.ReadAsync();
}

class Client
{
    public readonly TcpClient Tcp;
    public readonly StreamReader Reader;
    public readonly StreamWriter Writer;
    public readonly ShardedDictionary Dic;

    public struct Command
    {
        public string Key;
        public string? Value;
        public bool Completed;
    }

    private List<string> _args = new();
    private Task<string?> _nextLine;
    private Command[] _commands = Array.Empty<Command>();
    private int _commandsLength = 0;
    private StringBuilder _buffer = new();
    private int _shardFactor;

    public Client(TcpClient tcp, StreamReader reader, StreamWriter writer, ShardedDictionary dic)
    {
        Tcp = tcp;
        Reader = reader;
        Writer = writer;
        Dic = dic;
        _shardFactor = dic.Factor;
    }

    public async Task ReadAsync()
    {
        try
        {
            while (true)
            {
                if (_buffer.Length != 0)
                {
                    await Writer.WriteAsync(_buffer);
                    _buffer.Length = 0;
                }
                var lineTask = _nextLine ?? Reader.ReadLineAsync();
                if (lineTask.IsCompleted == false)
                {
                    if (_commandsLength != 0)
                    {
                        _nextLine = lineTask;
                        Dic.Enqueue(this, Math.Abs(_commands[0].Key.GetHashCode()) % _shardFactor);
                        return;
                    }
                }
                var line = await lineTask;
                _nextLine = null;
                if (line == null)
                {
                    using (Tcp) // done reading...
                    {
                        return;
                    }
                }
                await ReadCommand(line);
                AddCommand();
            }
        }
        catch (Exception e)
        {
            await HandleError(e);
        }
    }

    private async Task ReadCommand(string line)
    {
        _args.Clear();
        if (line[0] != '*')
            throw new InvalidDataException("Cannot understand arg batch: " + line);
        var argsv = int.Parse(line.Substring(1));
        for (int i = 0; i < argsv; i++)
        {
            line = await Reader.ReadLineAsync() ?? string.Empty;
            if (line[0] != '$')
                throw new InvalidDataException("Cannot understand arg length: " + line);
            var argLen = int.Parse(line.Substring(1));
            line = await Reader.ReadLineAsync() ?? string.Empty;
            if (line.Length != argLen)
                throw new InvalidDataException("Wrong arg length expected " + argLen + " got: " + line);
            _args.Add(line);
        }
    }

    private void AddCommand()
    {
        if (_commandsLength >= _commands.Length)
        {
            Array.Resize(ref _commands, _commands.Length + 8);
        }
        ref Command cmd = ref _commands[_commandsLength++];
        cmd.Completed = false;
        switch (_args[0])
        {
            case "GET":
                cmd.Key = _args[1];
                cmd.Value = null;
                break;
            case "SET":
                cmd.Key = _args[1];
                cmd.Value = _args[2];
                break;
            default:
                throw new ArgumentOutOfRangeException("Unknown command: " + _args[0]);
        }
    }

    public async Task NextAsync()
    {
        try
        {
            WriteToBuffer();
            await ReadAsync();
        }
        catch (Exception e)
        {
            await HandleError(e);
        }
    }

    private void WriteToBuffer()
    {
        for (int i = 0; i < _commandsLength; i++)
        {
            ref Command cmd = ref _commands[i];
            if (cmd.Value == null)
            {
                _buffer.Append("$-1\r\n");
            }
            else
            {
                _buffer.Append($"${cmd.Value.Length}\r\n{cmd.Value}\r\n");
            }
        }
        _commandsLength = 0;
    }

    public async Task HandleError(Exception e)
    {
        using (Tcp)
        {
            try
            {
                string? line;
                var errReader = new StringReader(e.ToString());
                while ((line = errReader.ReadLine()) != null)
                {
                    await Writer.WriteAsync("-");
                    await Writer.WriteLineAsync(line);
                }
                await Writer.FlushAsync();
            }
            catch (Exception)
            {
                // nothing we can do
            }
        }
    }

    internal void Execute(Dictionary<string, string> localDic, int index)
    {
        int? next = null;
        for (int i = 0; i < _commandsLength; i++)
        {
            ref var cmd = ref _commands[i];
            var cur = Math.Abs(cmd.Key.GetHashCode()) % _shardFactor;
            if (cur == index) // match
            {
                cmd.Completed = true;
                if (cmd.Value != null)
                {
                    localDic[cmd.Key] = cmd.Value;
                }
                else
                {
                    localDic.TryGetValue(cmd.Key, out cmd.Value);
                }
            }
            else if (cmd.Completed == false)
            {
                next = cur;
            }
        }
        if (next != null)
        {
            Dic.Enqueue(this, next.Value);
        }
        else
        {
            _ = NextAsync();
        }
    }
}

class ShardedDictionary
{
    Dictionary<string, string>[] _dics;
    BlockingCollection<Client>[] _workers;

    public int Factor => _dics.Length;

    public ShardedDictionary(int shardingFactor)
    {
        _dics = new Dictionary<string, string>[shardingFactor];
        _workers = new BlockingCollection<Client>[shardingFactor];

        for (int i = 0; i < shardingFactor; i++)
        {
            var dic = new Dictionary<string, string>();
            var worker = new BlockingCollection<Client>();
            _dics[i] = dic;
            _workers[i] = worker;
            var index = i;
            // readers
            new Thread(() =>
            {
                ExecWorker(dic, index, worker);
            })
            {
                IsBackground = true,
            }.Start();
        }
    }

    private static void ExecWorker(Dictionary<string, string> dic, int index, BlockingCollection<Client> worker)
    {
        while (true)
        {
            worker.Take().Execute(dic, index);
        }
    }

    public void Enqueue(Client c, int index)
    {
        _workers[index].Add(c);
    }
}