Tuesday, February 17, 2015

Thread pools, event dispatching, and avoiding synchronization

Writing ad hoc threaded code is almost always more difficult than it is worth. You might be able to get it to work, but the next developer to touch your code will probably break it by accident, or find it so hard to deal with that they will want to rewrite it. So you want some kind of simplifying policy for developing threaded code. It needs to perform well (the primary reason for threading in the first place, other than dealing with blocking), require little mental overhead or boilerplate code, and ideally be usable by developers more junior than yourself.

Functional parallelism extracts only a small fraction of the available performance, at least in larger, "interesting" problems. Data parallelism is usually better at extracting more. For large online game servers, the ideal unit to parallelize across is the game entity. Once this is working, it generalizes to other types of entities as well.

Here's what I've come up with...

Generally, you want to avoid ticking entities as fast as possible. That makes a certain amount of sense on the client (one tick per graphics frame), where it provides the smoothest visual experience, but on the server it will consume 100% of the CPU without significantly improving the player's experience. One big downside is that you can't tell when you need to add more CPUs to your cluster. So I prefer event-driven models of computing: schedule an event to occur when there is something to do. In some cases you may want periodic events, but decide what rate is appropriate, and schedule only those. That way, you can easily see the intrinsic load on a machine rise as more work (more entities) is handled. So the core assumption is: we have a collection of entities, handling a stream of events.
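Here's a minimal sketch of the difference, using the JDK's ScheduledExecutorService as a stand-in for whatever scheduler your server actually uses. The "respawn" and "regen" events and their 50 ms / 20 ms timings are made up for illustration: the one-shot event fires once, when something needs to happen, and the periodic event runs at a rate you picked, not as fast as the CPU allows.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class EventScheduling {
    /** Runs for about 110 ms and returns {respawns, regenTicks}. Event names are hypothetical. */
    static int[] runDemo() {
        AtomicInteger respawns = new AtomicInteger();
        AtomicInteger regenTicks = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // One-shot event: something actually needs to happen 50 ms from now.
        scheduler.schedule(() -> { respawns.incrementAndGet(); }, 50, TimeUnit.MILLISECONDS);

        // Periodic event at a deliberately chosen rate (every 20 ms), not "as fast as possible".
        ScheduledFuture<?> regen = scheduler.scheduleAtFixedRate(
                () -> { regenTicks.incrementAndGet(); }, 0, 20, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(110);
            regen.cancel(false);
            scheduler.shutdown(); // by default, already-scheduled one-shot tasks still run
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Between events the pool threads sit idle, so CPU use tracks the real workload.
        return new int[] {respawns.get(), regenTicks.get()};
    }

    public static void main(String[] args) {
        int[] r = runDemo();
        System.out.println("respawns=" + r[0] + " regenTicks=" + r[1]);
    }
}
```

Because the threads only wake for scheduled work, adding entities (and therefore events) shows up directly as rising CPU load, which is the signal you need for capacity planning.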

We built an event dispatcher. It is responsible for distributing events (which usually arrive as messages) to their target entities. I've discussed a number of ways to route those messages and events in other posts. When an event is available, the system has the target entity consume it and runs the event handler written for that type of event.

You don't want to spawn a thread per entity. There could be thousands of entities per processor, and that would cause inefficient context switching, and bloat memory use for all those threads sitting around when only a few can run at a time. You also don't want to create and destroy threads every time an entity is created or destroyed. Instead, you want to be in control of the number of threads spawned regardless of the workload. That allows you to tune the number of threads for maximum performance and adjust to the current state of affairs in the data center. You create a thread pool ahead of time, then map the work to those threads.

It would be easy enough to use a Java Executor and Runnables, but this leads to an unwanted problem. If there are multiple events scheduled for a single entity, this naive arrangement might map two events for the same target to different threads. You would then need a full set of concurrency controls guarding every shared data structure, including the entity itself. That locking would counteract the benefit of running on two threads and make the arrangement worthless.

Instead, I created a separate event queue per entity, and a custom event dispatcher that pulls only a single event off an entity's queue at a time. In this better setup, the Runnables and the Executor manage entities (those with one or more events on their private queues), not individual events. Any entity can be chosen to run, but each entity works on only one event at a time. This makes more efficient use of the threads in the pool (no blocking between them), and removes the need for concurrency control anywhere except in the queues. Event handlers can now be written as if they were single threaded. This makes game logic development easier for more junior programmers, and for those less familiar with server code or with Java (e.g. client programmers).
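A minimal sketch of the per-entity queue idea, assuming a fixed-size thread pool from java.util.concurrent. The names (Entity, EntityDispatcher, post, runOne) are hypothetical, not the actual Quantum code. The trick is an AtomicBoolean per entity, so each entity is submitted to the pool at most once at a time, and each run handles exactly one event before handing the thread back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class EntityDispatcher {
    /** An entity with a private event queue; its handler is written as if single threaded. */
    static final class Entity<E> {
        private final Queue<E> inbox = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final Consumer<E> handler;
        Entity(Consumer<E> handler) { this.handler = handler; }
    }

    private final ExecutorService pool;

    EntityDispatcher(int threads) { this.pool = Executors.newFixedThreadPool(threads); }

    /** Queues an event for one entity; may be called from any thread. */
    <E> void post(Entity<E> entity, E event) {
        entity.inbox.add(event);
        schedule(entity);
    }

    /** Submits the entity (not the event) to the pool, at most once at a time. */
    private <E> void schedule(Entity<E> entity) {
        if (entity.scheduled.compareAndSet(false, true)) {
            pool.execute(() -> runOne(entity));
        }
    }

    /** Handles a single event, then frees the thread so other entities can run. */
    private <E> void runOne(Entity<E> entity) {
        try {
            E event = entity.inbox.poll();
            if (event != null) entity.handler.accept(event);
        } finally {
            entity.scheduled.set(false);
            // An event may have arrived after poll() but before set(false); don't strand it.
            if (!entity.inbox.isEmpty()) schedule(entity);
        }
    }

    void shutdown() { pool.shutdown(); }

    /** Demo state mutated with no locks at all. */
    static final class Counter {
        int handled;
        int lastSeq = -1;
        boolean inOrder = true;
    }

    /** Returns, per entity, the number of events handled (or -1 if any arrived out of order). */
    static int[] runDemo(int entityCount, int eventsPerEntity, int threads) {
        EntityDispatcher dispatcher = new EntityDispatcher(threads);
        CountDownLatch done = new CountDownLatch(entityCount * eventsPerEntity);
        List<Counter> states = new ArrayList<>();
        List<Entity<Integer>> entities = new ArrayList<>();
        for (int i = 0; i < entityCount; i++) {
            Counter c = new Counter();
            states.add(c);
            entities.add(new Entity<Integer>(seq -> {
                if (seq != c.lastSeq + 1) c.inOrder = false;
                c.lastSeq = seq;
                c.handled++;
                done.countDown(); // last action, so the waiting thread sees the writes above
            }));
        }
        for (int seq = 0; seq < eventsPerEntity; seq++)
            for (Entity<Integer> e : entities) dispatcher.post(e, seq);
        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            dispatcher.shutdown();
        }
        int[] result = new int[entityCount];
        for (int i = 0; i < entityCount; i++)
            result[i] = states.get(i).inOrder ? states.get(i).handled : -1;
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(runDemo(4, 1000, 3)));
    }
}
```

The demo handlers update plain int fields with no locks, and each entity still sees all of its events in the order they were posted. The re-check after clearing the flag is the subtle part: without it, an event posted between poll() and set(false) could sit in the queue with nothing scheduled to run it.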

Obviously, this only covers data owned by a single entity. Anything shared between entities still needs to be guarded. In general, you want to avoid that kind of sharing, especially in a large-scale game where entities might be spread across multiple machines. You can't share data between two entities in different processes on different machines (well, there is one way, but it requires fancy hardware or device drivers, and that is another story). So you will be building for the distributed case anyway. Why not do it the same way when the entities are local?

I found a number of posts about executors and event dispatching that touched on these ideas, but nothing official, and there seemed to be a lot of debate about good ways to do it. I'm here to say it worked great. I'd love to post the code for this; maybe when Quantum is available, you'll see some of it.

Monday, February 16, 2015

Dynamic Serialization of Messages

Call it message marshalling, streaming, or serialization: if you want to send a message between two processes, you have to convert the message data structure to a series of bytes and back. You might cast the message pointer to a byte pointer, take the size, and slam the bytes into the socket, but then you can't deal with different CPU architectures (byte ordering), there may be hidden padding between fields or unwanted fields, and there may be tighter representations on the wire than in memory. There are many reasons to have some kind of serialization system. The following approach has some of the best trade-offs I've come across. It was partially inspired by simple JSON parsers like LitJSON, and partially by a library called MessagePack, both of which discover the fields of a data structure automatically.
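For contrast, here is what the hand-written version looks like in Java for a hypothetical Health message. Each field is written explicitly in big-endian order, so the wire format has no padding and doesn't depend on how the JVM lays the object out in memory. The other side (a C# client, say) has to read the same fields in the same order and byte order. This is exactly the per-message ToStream/FromStream drudgery I'd rather not write.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

class WireOrder {
    /** Hypothetical message. The JVM decides its in-memory layout; we decide the wire layout. */
    static final class Health {
        short entityId;
        int hitPoints;
        boolean alive;
    }

    /** 2 + 4 + 1 = 7 bytes on the wire, big-endian, no padding. */
    static byte[] encode(Health h) {
        ByteBuffer buf = ByteBuffer.allocate(7).order(ByteOrder.BIG_ENDIAN);
        buf.putShort(h.entityId).putInt(h.hitPoints).put((byte) (h.alive ? 1 : 0));
        return buf.array();
    }

    /** The reader must use the same field order and byte order as the writer. */
    static Health decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
        Health h = new Health();
        h.entityId = buf.getShort();
        h.hitPoints = buf.getInt();
        h.alive = buf.get() != 0;
        return h;
    }

    public static void main(String[] args) {
        Health h = new Health();
        h.entityId = 7;
        h.hitPoints = 0x01020304;
        h.alive = true;
        System.out.println(Arrays.toString(encode(h))); // [0, 7, 1, 2, 3, 4, 1]
    }
}
```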

So I kind of hate the boilerplate and manual drudgery of ToStream/FromStream functions added to each message subclass. It always seemed like there should be an automatic way to implement that code. Google Protocol Buffers, Thrift, and others make you specify your data structures in a separate language, then run a compiler to generate the message classes and serialization code. That always seemed clumsy to me, and it meant extra work: excluding the generated files from source control, plus more custom steps in your Maven, makefile, or Visual Studio project files. Plus I always think of the messages as *my* code, not something generated. These personal preferences gave me the energy to come up with the idea; they are not necessarily a full justification for the result. In the end, the continuing justification is that it is super easy to maintain, and it has a very desirable side effect: it fixes a long-standing problem of mismatched protocols between client and server. So here's the outline (maybe I'll post the code as part of the Quantum system some day).

We have a large number of message classes, we are lazy and don't want to write serialization code, and we always had bugs where the server's version of a class didn't match the client's. The server is in Java, and the client is in C#. The byte order of the client and server may also differ. Ideally, we could just say Send(m), where m is a pointer to any object, and the message is sent. Here's how:
- Use introspection (Java calls it reflection) to determine the most derived class of m. If you already have a reflection-based serializer constructed for that type, use it; otherwise create one.
- To create one, walk each field of the type, construct a ReaderWriter instance for that field's type, and append it to a list for the message type. Do this recursively. ReaderWriter classes are created for each atomic type, and can be created for any custom type (like lists, dictionaries, or application classes that you think you can serialize more efficiently). Cache the result so you only do this once. You may want to do this for all message types ahead of time, but that is optional; you could do it on the first call to Send(m).
- As you send each message, find its serializer and hand it the message instance. The system walks the list of ReaderWriters and serializes the whole message. To make this work, the ReaderWriter classes must use reflection to access the field (reading it during send, and writing it during message arrival). This is pretty easy to do in Java and C#, or in interpreted languages like Python, Lua and Ruby. C++ is a special case where code generation or template and macro tricks would be needed. Or good old-fashioned boilerplate. Sigh.
- As a message arrives, allocate an instance of the type (again, using reflection), then look up the serializer and fill the instance in from the incoming byte buffer.
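Here's a minimal Java sketch of those steps, assuming messages are plain classes with a no-arg constructor and fields that are either a few atomic types (int, double, boolean, String) or other such classes. ReaderWriter, serializerFor, toBytes and fromBytes are illustrative names, and DataOutputStream/DataInputStream stand in for whatever wire encoding you actually use. Fields are sorted alphabetically, ignoring case, which anticipates the ordering rule used for the protocol checksums.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

class ReflectiveSerializer {
    @FunctionalInterface interface FieldWriter { void write(Object msg, DataOutputStream out) throws Exception; }
    @FunctionalInterface interface FieldReader { void read(Object msg, DataInputStream in) throws Exception; }

    /** One accessor pair per field: reads the field during send, writes it on arrival. */
    static final class ReaderWriter {
        final FieldWriter writer;
        final FieldReader reader;
        ReaderWriter(FieldWriter writer, FieldReader reader) { this.writer = writer; this.reader = reader; }
    }

    // Built once per message type, then reused for every message of that type.
    private static final Map<Class<?>, List<ReaderWriter>> CACHE = new ConcurrentHashMap<>();

    static List<ReaderWriter> serializerFor(Class<?> type) {
        List<ReaderWriter> cached = CACHE.get(type);
        if (cached != null) return cached;
        List<Field> fields = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) continue;
                f.setAccessible(true);
                fields.add(f);
            }
        }
        // Alphabetical, ignoring case, so declaration order and naming style don't matter.
        fields.sort(Comparator.comparing((Field f) -> f.getName().toLowerCase(Locale.ROOT)));
        List<ReaderWriter> rws = new ArrayList<>();
        for (Field f : fields) rws.add(readerWriterFor(f));
        CACHE.put(type, rws); // two racing threads would just build identical lists
        return rws;
    }

    private static ReaderWriter readerWriterFor(Field f) {
        Class<?> t = f.getType();
        if (t == int.class)
            return new ReaderWriter((m, out) -> out.writeInt(f.getInt(m)), (m, in) -> f.setInt(m, in.readInt()));
        if (t == double.class)
            return new ReaderWriter((m, out) -> out.writeDouble(f.getDouble(m)), (m, in) -> f.setDouble(m, in.readDouble()));
        if (t == boolean.class)
            return new ReaderWriter((m, out) -> out.writeBoolean(f.getBoolean(m)), (m, in) -> f.setBoolean(m, in.readBoolean()));
        if (t == String.class) // null strings are not handled in this sketch
            return new ReaderWriter((m, out) -> out.writeUTF((String) f.get(m)), (m, in) -> f.set(m, in.readUTF()));
        // Any other class: recurse into its fields (assumes non-null, exactly the declared type).
        return new ReaderWriter(
                (m, out) -> writeFields(t, f.get(m), out),
                (m, in) -> { Object child = newInstance(t); readFields(t, child, in); f.set(m, child); });
    }

    static void writeFields(Class<?> type, Object obj, DataOutputStream out) throws Exception {
        for (ReaderWriter rw : serializerFor(type)) rw.writer.write(obj, out);
    }

    static void readFields(Class<?> type, Object obj, DataInputStream in) throws Exception {
        for (ReaderWriter rw : serializerFor(type)) rw.reader.read(obj, in);
    }

    static Object newInstance(Class<?> type) throws ReflectiveOperationException {
        Constructor<?> ctor = type.getDeclaredConstructor(); // needs a no-arg constructor
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    /** Send side: serialize using the most derived class of msg. */
    static byte[] toBytes(Object msg) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeFields(msg.getClass(), msg, out);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    /** Receive side: allocate an instance reflectively, then fill it in from the bytes. */
    static <T> T fromBytes(Class<T> type, byte[] data) {
        try {
            T msg = type.cast(newInstance(type));
            readFields(type, msg, new DataInputStream(new ByteArrayInputStream(data)));
            return msg;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static class Vec { double x; double y; }
    static class Move { int entityId; String name = ""; boolean running; Vec target = new Vec(); }

    public static void main(String[] args) {
        Move m = new Move();
        m.entityId = 42; m.name = "orc"; m.running = true; m.target.y = -2.0;
        Move back = fromBytes(Move.class, toBytes(m));
        System.out.println(back.name + " " + back.entityId + " " + back.target.y);
    }
}
```

Nulls, collections, and object cycles are left out to keep it short; those are where custom ReaderWriter types would come in.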

This works fine across languages and architectures. There is a small performance hit from reading and setting fields via reflection, but it is not too bad, since you scan each type only once (at startup or on first use) and keep the accessor classes around, so nothing has to be recreated for each message.

Once you have all this metadata about each message type, it is easy to make a checksum that changes any time you modify a message class definition. That checksum can be sent when the client first connects to the server. If you connect to an old build, you get a warning or are disconnected. It can include enough detail that the warning tells you exactly which class doesn't match and which field is wrong. You may not want this debugging info in your shipping product (why make it easy for hackers by handing them your protocol description?), but the checksums could be retained. The checksum includes the message field names and types, so any change triggers a warning. We chose to sort the field serialization alphabetically, ignoring capitalization. That way, differences in field order between client and server didn't matter, and capitalization differences due to language naming conventions were ignored. Atomic types were mapped to their equivalents in each language.
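A sketch of the checksum, under a few assumptions: CRC32 from java.util.zip as the hash, a made-up table of language-neutral type names (i32, bool, and so on), and a plain "name:type" text description per class. Because names are lower-cased and sorted, a Java-style and a C#-style declaration of the same message produce the same checksum, while adding a field changes it. The diff helper is the debug-build extra that names the offending field.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.CRC32;

class ProtocolChecksum {
    // Hypothetical language-neutral names, so Java's int and C#'s int describe the same way.
    private static final Map<Class<?>, String> NEUTRAL = new HashMap<>();
    static {
        NEUTRAL.put(int.class, "i32");
        NEUTRAL.put(long.class, "i64");
        NEUTRAL.put(double.class, "f64");
        NEUTRAL.put(boolean.class, "bool");
        NEUTRAL.put(String.class, "string");
    }

    /** Canonical "name:type" list: lower-cased names, sorted, neutral type names. */
    static String describe(Class<?> type) {
        List<String> parts = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) continue;
                String t = NEUTRAL.getOrDefault(f.getType(), f.getType().getSimpleName().toLowerCase(Locale.ROOT));
                parts.add(f.getName().toLowerCase(Locale.ROOT) + ":" + t);
            }
        }
        Collections.sort(parts);
        return String.join(",", parts);
    }

    /** The value exchanged at connect time; shipping builds could send only this. */
    static long checksum(Class<?> type) {
        CRC32 crc = new CRC32();
        crc.update(describe(type).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Debug builds only: "name:type" entries present on one side but not the other. */
    static Set<String> diff(String localDescription, String remoteDescription) {
        Set<String> local = new TreeSet<>(Arrays.asList(localDescription.split(",")));
        Set<String> remote = new TreeSet<>(Arrays.asList(remoteDescription.split(",")));
        Set<String> result = new TreeSet<>(local);
        result.addAll(remote);
        local.retainAll(remote);
        result.removeAll(local); // symmetric difference
        return result;
    }

    static class MoveV1 { int entityId; boolean running; }
    static class MoveCSharpStyle { boolean Running; int EntityId; } // same message, other conventions
    static class MoveV2 { int entityId; boolean running; long timestamp; }

    public static void main(String[] args) {
        System.out.println(checksum(MoveV1.class) == checksum(MoveCSharpStyle.class)); // true
        System.out.println(diff(describe(MoveV1.class), describe(MoveV2.class)));   // [timestamp:i64]
    }
}
```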

Another consideration was to delay deserialization as long as possible. That way, intermediate processes (like an Edge Server) didn't need every message class compiled in: message buffers could be forwarded as byte buffers without paying deserialization/serialization costs. This also allowed deserialization to happen on the target thread of a multi-threaded event handling system.
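A minimal sketch of delayed deserialization, assuming a hypothetical frame layout of [typeId:i16][targetId:i64][length:i32][payload]. An edge server reads only the header, routes on targetId, and re-emits the payload untouched; the message object is only built when a handler on the target thread asks for it. The decoder is passed in as a Function, standing in for the reflection-based serializer.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

class LazyEnvelope {
    /** Hypothetical frame layout: [typeId:i16][targetId:i64][length:i32][payload bytes]. */
    static final class Envelope {
        final short typeId;
        final long targetId;
        final byte[] payload;   // opaque: an edge server never looks inside
        private Object decoded; // cached message; only touched by the target entity's thread

        Envelope(short typeId, long targetId, byte[] payload) {
            this.typeId = typeId;
            this.targetId = targetId;
            this.payload = payload;
        }

        /** Reads only the header; the payload stays as raw bytes. */
        static Envelope read(ByteBuffer frame) {
            short typeId = frame.getShort();
            long targetId = frame.getLong();
            byte[] payload = new byte[frame.getInt()];
            frame.get(payload);
            return new Envelope(typeId, targetId, payload);
        }

        /** Re-emits the frame unchanged, e.g. when forwarding to a game server. */
        ByteBuffer toFrame() {
            ByteBuffer out = ByteBuffer.allocate(2 + 8 + 4 + payload.length);
            out.putShort(typeId).putLong(targetId).putInt(payload.length).put(payload);
            out.flip();
            return out;
        }

        /** Deserializes on first use, ideally on the target entity's thread. */
        @SuppressWarnings("unchecked")
        <T> T message(Function<byte[], T> decoder) {
            if (decoded == null) decoded = decoder.apply(payload);
            return (T) decoded;
        }
    }

    public static void main(String[] args) {
        Envelope original = new Envelope((short) 5, 99L, new byte[] {10, 20, 30});
        Envelope atEdge = Envelope.read(original.toFrame()); // header only, payload untouched
        System.out.println(atEdge.targetId + " " + atEdge.toFrame().equals(original.toFrame()));
        Function<byte[], Integer> sumBytes = bytes -> { int s = 0; for (byte b : bytes) s += b; return s; };
        int sum = atEdge.message(sumBytes); // the "deserialization" happens here, not at the edge
        System.out.println(sum);
    }
}
```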

One necessary code overhead in this system is that the class type of each arriving message has to be pre-registered; otherwise we can't determine which constructor to run via reflection, or which reflection-based serializer to use (or construct, if this is its first use since the app started). We need a mapping between the message type identifier in the message header and the run-time type. This registration lets message type identifiers be compact (an enum or integer) instead of the type name as a string (which could be used for reflection lookup, but seemed like too much overhead per message send). It has a nice side effect: we know every type that will be used, so the protocol checking system can compute all the checksums ahead of time and verify them when a client connects (instead of waiting for the first instance of each message to detect a mismatch). We might have been able to use message handler registration to deduce this list of message types, and ignore any arriving messages that had no handlers.
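A minimal registry sketch, assuming integer type ids (an enum would work the same way). The class and method names are hypothetical. Keeping the ids in a TreeMap gives a stable list of every registered type, which is what lets all the checksums be computed before the first client connects.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MessageRegistry {
    private final Map<Integer, Class<?>> byId = new TreeMap<>(); // compact wire id -> run-time type
    private final Map<Class<?>, Integer> idOf = new HashMap<>();  // run-time type -> wire id

    /** Called once per message class at startup; duplicate ids or classes are a programming error. */
    void register(int typeId, Class<?> type) {
        if (byId.containsKey(typeId) || idOf.containsKey(type))
            throw new IllegalArgumentException("duplicate registration: " + typeId + " / " + type.getName());
        byId.put(typeId, type);
        idOf.put(type, typeId);
    }

    /** Send side: the id written into the message header. */
    int idFor(Object msg) {
        Integer id = idOf.get(msg.getClass());
        if (id == null) throw new IllegalArgumentException("unregistered message type " + msg.getClass().getName());
        return id;
    }

    /** Receive side: pick the constructor from the header id and allocate an empty message. */
    Object newMessage(int typeId) {
        Class<?> type = byId.get(typeId);
        if (type == null) throw new IllegalArgumentException("unknown message type id " + typeId);
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot construct " + type.getName(), e);
        }
    }

    /** Every type that can ever arrive, in id order, for up-front checksum computation. */
    List<Class<?>> registeredTypes() {
        return new ArrayList<>(byId.values());
    }

    static class Login { String user = ""; }
    static class Move { int entityId; }

    public static void main(String[] args) {
        MessageRegistry registry = new MessageRegistry();
        registry.register(1, Login.class);
        registry.register(2, Move.class);
        System.out.println(registry.idFor(new Move()) + " " + registry.newMessage(1).getClass().getSimpleName());
    }
}
```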

Some of these features exist in competing libraries, but not all of them were available when we built our system. For example, MessagePack didn't have checksums to validate type matching.