Contents

Kafka Protobuf Deserializer

Protocol buffers (aka. protobuf for short) is very efficient serialization format. Usually implementations provided by library maintainers (i.e. Protobuf ports for various frameworks) are optimal and are only getting better with time. Recently I needed to use Confluent Kafka’s implementation of protobuf deserializer and to my astonishment, deserialization operation is not optimal. Deserialization operation the one that my project needed the most as we were producing messages in off-work times but main system is consuming them in high-traffic periods. Let’s see how we might remedy this issue.

This article is about Apache Kafka and Confluent’s implementation of Protocol Buffers deserializer. It’s not endorsed nor advertised by any of them. This is written for pure educational purposes. Copyrighted names, technologies and symbols belong to appropriate entities.

Deserializer

Luckily for us, Nuget for Confluent.Kafka Confluent.Kafka package is open source and can be found here. Indeed, upon inspection of ProtobufDeserializer.cs we can suspect that our culprit are:

  • data.ToArray();
  • using (var stream = new MemoryStream(array))
  • using (var reader = new BinaryReader(stream))

These design decision were probably caused by the fact that there was no convenient and official API for reading data from Span/ReadOnlySpan. There is few unofficial ones, you can read about my approach here.

Fortunately for us, Confluent made Kafka to be really nicely composable so we can provide our own deserializer. The only thing we need to read upon deserialization are:

  1. Magic byte (zero) to indicate a message with Confluent Platform framing
  2. 4 byte schema ID. Since this is not needed as we know deserialized type’s metadata (this can be taken from generic type) - we can safely ignore this portion of data (but stream pointer needs to move)
  3. Unsigned/Signed variable-length encoded integers (Varint) that denotes array length - and then subsequently reading these indices
  4. Reading payload itself - this can be delegated to underlying Protobuf library (by Google in my/Confluent’s case)

So our Deserialize method can look like that:

public T? Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
    if (isNull) return null;

    if (data.Length < 6)
        throw new InvalidDataException(
            "Expecting data framing of length 6 bytes or more but total data size is " +
            $"{data.Length} bytes");

    var spanReader = new SpanBinaryReader(data);

    var magicByte = spanReader.ReadByte();
    if (magicByte != MagicByte)
        throw new InvalidDataException(
            $"Expecting message {context.Component} with Confluent Schema Registry framing." +
            $"Magic byte was {magicByte}, expecting {MagicByte}");
    
    // A schema is not required to deserialize protobuf messages since the serialized data includes 
    // tag and type information, which is enough for the IMessage<T> implementation to deserialize 
    // the data (even if the schema has evolved). 
    // Schema Id is thus unused. Just advancing by 4 bytes is enough    
    spanReader.Seek(4); //var _schemaId = IPAddress.NetworkToHostOrder(spanReader.ReadInt32());
    
    // Read the index array length, then all of the indices. These are not needed, 
    // but parsing them is the easiest way to seek to the start of the serialized data
    // because they are varints.
    var indicesLength = _useDeprecatedFormat 
        ? (int)spanReader.ReadUnsignedVarint() 
        : spanReader.ReadVarint();

    for (int i = 0; i < indicesLength; ++i)
        if (_useDeprecatedFormat)
            spanReader.ReadUnsignedVarint();
        else
            spanReader.ReadVarint();
    
    return _parser.ParseFrom(spanReader.Remaining());
}

Full implementation can be found here. During optimizations I’ve noticed that Confluent’s implementation only implementsIAsyncDeserializer<>whereas implementingIDeserializer<>should be sufficient - we are not doing any async work there.

Benchmarks

Unit benchmarks

This benchmark tests what the performance and memory footprint of every approach are. Full results are located in the same file, let me just present excerpt for problem size equal to 10 (pay no attention to NonAlloc* benchmarks, they we just there for tests):

MethodMean [ns]ErrorRatioGen 0Gen 1Allocated [B]
Confluent12087.8117.931.008.27790.167852003
EfficientAsync4995.752.320.410.9689-6080
EfficientSync4838.480.180.400.8545-5360

Especially after presenting same data on interactive chart:

one can clearly see the trend that Confluent’s implementation is adding (unnecessary) allocations and partially due to that they are significantly slower. Moreover, my synchronous version is only slightly (“negligibly”) faster than my async counterpart. But since there is really no point of using async here - synchronous deserializer might be our variant of choice.

Full operation benchmarks

Ok we see where this is going. Performance benefits are visible but one can argue that they might not be significant. After all, Kafka internally allocates a lot of things (message itself, TopicPartitionOffset, ConsumeResult etc.). They all clearly would dwarf performance benefits we’ve just obtained. Let’s measure that. Here I tried to recreate the whole pipeline that Confluent’s Kafka performs upon message deserialization. These are the results:

MethodMean [ns]ErrorRatioGen 0Gen 1Allocated [B]
Create1049.518.401.000.99370.00196240
Confluent8116.392.927.658.66700.122154403
EfficientAsync5137.896.984.851.3504-8480
EfficientSync4993.034.524.711.2360-7760

Create benchmark is there just to demonstrate amount of memory/performance that deserialize operation would have without any calls to Protobuf deserializer. Again, the difference both in performance and memory allocations is clear…

Summary

We’ve demonstrated that eliminating allocations allows us to harvest low hanging fruits in performance realm. We are using this deserializer on production and it seem’s fine. I did not prepare any Nuget package with that solution but can provide one if the need arrives. I filed an issue and offered a pull request. So far a claim was offered that at some point in the future this might be implemented. Fingers crossed. Upvote 👍 if you care to include my change in official release.
Subscribe to this issue on Github to stay tuned for more info 🖖.

Sources