Supporting multi-type Kafka topics in .NET

Zhongming Chen
2 min read · Aug 14, 2020


If you are using confluent-kafka-dotnet in your .NET project to communicate with a Kafka cluster, then you are probably aware of this issue: at the time of writing, the package still doesn’t support a single topic with multiple message types. As the author explains, both the producer and the consumer were designed to be strongly typed from the start.

So we decided to follow the workaround described in the issue, with a small tweak: create the consumer as IConsumer<string, GenericRecord> rather than IConsumer<string, byte[]>. A GenericRecord is essentially managed via a Dictionary<string, object> under the hood, so messages of different types can all be deserialized into key-value pairs.

Create a GenericRecord consumer
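
A minimal sketch of that consumer setup, assuming placeholder values for the schema registry URL, broker address, group id and topic name:

using System;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-consumer-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// AvroDeserializer<T> is async-only, so it is wrapped with AsSyncOverAsync()
// to satisfy the synchronous IDeserializer<T> that the consumer expects.
IConsumer<string, GenericRecord> consumer = new ConsumerBuilder<string, GenericRecord>(consumerConfig)
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();

consumer.Subscribe("my-multi-type-topic");

Every ConsumeResult<string, GenericRecord> returned by consumer.Consume() then carries the dictionary-backed record, regardless of which message type was produced.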

However, the problem with that is that GenericRecord doesn’t give me many options for retrieving field values beyond the TryGetValue(string fieldName, out object result) method. Ideally I want to read a field through a property, for example message.FirstName, instead of TryGetValue("firstName", out object result). You can probably imagine how quickly this ends up a mess of magic strings everywhere.
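
To make the contrast concrete, here is a small sketch; the firstName field and the helper method are hypothetical, purely for illustration:

using Avro.Generic;

public static class GenericRecordExtensions
{
    // With GenericRecord, every field read is a string-keyed lookup plus a cast...
    public static string GetFirstName(this GenericRecord record)
    {
        return record.TryGetValue("firstName", out object value)
            ? (string)value
            : null;
    }

    // ...whereas a class generated from the same schema would simply expose message.FirstName.
}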

Solution

After consulting with the author, here is the accepted solution:

  1. Serialize GenericRecord instance back to bytes, then
  2. Deserialize the bytes into SpecificRecord

Serialize GenericRecord to bytes

To serialize a GenericRecord to bytes, you need to install the Confluent.SchemaRegistry.Serdes.Avro package, which gives access to both the generic AvroSerializer<T> and AvroDeserializer<T>.
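
If you manage packages from the command line, it can be added with the .NET CLI:

dotnet add package Confluent.SchemaRegistry.Serdes.Avro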

The following is an example of how to serialize a GenericRecord to a byte array:

var serializerConfigs = new AvroSerializerConfig
{
    SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
    AutoRegisterSchemas = false
};
var serializer = new AvroSerializer<GenericRecord>(_schemaRegistry, serializerConfigs);

// The TopicRecord subject name strategy derives the subject from the topic name,
// so pass the topic through the serialization context (assuming message is the
// ConsumeResult<string, GenericRecord> returned by the consumer).
var bytes = await serializer.SerializeAsync(
    message.Value,
    new SerializationContext(MessageComponentType.Value, message.Topic));

The AvroSerializer<T> constructor takes two arguments. The first is an instance of ISchemaRegistryClient, so the serializer can download the schema from the remote registry if it isn’t already in the local cache. The second is an optional AvroSerializerConfig instance. It has a few properties, and you may notice that in the example above I set AutoRegisterSchemas to false, which prevents the serializer from registering schemas at run time, and set SubjectNameStrategy to TopicRecord (in my case) so it resolves the correct schema by matching against the schema stored in the remote schema registry.

Deserialize to a SpecificRecord

The deserializer is less exciting. Similarly, it takes two arguments: an instance of ISchemaRegistryClient and an optional instance of AvroDeserializerConfig.

var deserializer = new AvroDeserializer<MySpecificRecord>(_schemaRegistry);

var mySpecificRecord = await deserializer.DeserializeAsync(bytes, false, new SerializationContext());

Here is what it looks like when putting it all together.
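
The sketch below is one way to wire those two steps together, assuming two hypothetical generated SpecificRecord classes (CustomerCreated and CustomerDeleted under a com.example namespace) and routing on the record’s full schema name:

using System;
using System.Threading.Tasks;
using Avro.Generic;
using Avro.Specific;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public class MultiTypeMessageHandler
{
    private readonly ISchemaRegistryClient _schemaRegistry;
    private readonly AvroSerializer<GenericRecord> _serializer;

    public MultiTypeMessageHandler(ISchemaRegistryClient schemaRegistry)
    {
        _schemaRegistry = schemaRegistry;
        _serializer = new AvroSerializer<GenericRecord>(schemaRegistry, new AvroSerializerConfig
        {
            SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
            AutoRegisterSchemas = false
        });
    }

    public async Task<object> HandleAsync(ConsumeResult<string, GenericRecord> result)
    {
        // 1. Serialize the GenericRecord back to bytes.
        var context = new SerializationContext(MessageComponentType.Value, result.Topic);
        byte[] bytes = await _serializer.SerializeAsync(result.Message.Value, context);

        // 2. Route on the record's full schema name and deserialize into the matching
        //    SpecificRecord. CustomerCreated and CustomerDeleted are hypothetical generated classes.
        switch (result.Message.Value.Schema.Fullname)
        {
            case "com.example.CustomerCreated":
                return await DeserializeAsync<CustomerCreated>(bytes, context);
            case "com.example.CustomerDeleted":
                return await DeserializeAsync<CustomerDeleted>(bytes, context);
            default:
                throw new InvalidOperationException(
                    $"Unknown record type {result.Message.Value.Schema.Fullname}");
        }
    }

    private async Task<T> DeserializeAsync<T>(byte[] bytes, SerializationContext context)
        where T : ISpecificRecord
    {
        var deserializer = new AvroDeserializer<T>(_schemaRegistry);
        return await deserializer.DeserializeAsync(bytes, isNull: false, context);
    }
}

In a real service you would probably cache one AvroDeserializer<T> per message type rather than creating it for every message.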

Send messages of different types to a single topic

With the above solution, but reversing the process (first serialize a specific record into bytes, then deserialize the bytes into a GenericRecord), I can create a single generic producer that sends messages of different types to a single topic.

The following is an example of what that looks like.
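
A minimal sketch of such a producer, assuming a placeholder broker address and message types that are classes generated from your Avro schemas (i.e. ISpecificRecord implementations):

using System.Threading.Tasks;
using Avro.Generic;
using Avro.Specific;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public class MultiTypeProducer
{
    private readonly ISchemaRegistryClient _schemaRegistry;
    private readonly IProducer<string, GenericRecord> _producer;

    public MultiTypeProducer(ISchemaRegistryClient schemaRegistry)
    {
        _schemaRegistry = schemaRegistry;
        _producer = new ProducerBuilder<string, GenericRecord>(
                new ProducerConfig { BootstrapServers = "localhost:9092" })
            .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, new AvroSerializerConfig
            {
                SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
                AutoRegisterSchemas = false
            }))
            .Build();
    }

    public async Task ProduceAsync<T>(string topic, string key, T message) where T : ISpecificRecord
    {
        var context = new SerializationContext(MessageComponentType.Value, topic);

        // 1. Serialize the specific record into bytes.
        var specificSerializer = new AvroSerializer<T>(_schemaRegistry, new AvroSerializerConfig
        {
            SubjectNameStrategy = SubjectNameStrategy.TopicRecord
        });
        byte[] bytes = await specificSerializer.SerializeAsync(message, context);

        // 2. Deserialize the bytes into a GenericRecord.
        var genericDeserializer = new AvroDeserializer<GenericRecord>(_schemaRegistry);
        GenericRecord generic = await genericDeserializer.DeserializeAsync(bytes, isNull: false, context);

        // 3. Produce the GenericRecord to the single multi-type topic.
        await _producer.ProduceAsync(topic, new Message<string, GenericRecord> { Key = key, Value = generic });
    }
}

Calling producer.ProduceAsync("my-multi-type-topic", key, someSpecificRecord) then works for any generated type, and all messages land on the same topic under their own TopicRecord subjects.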
