Supporting multi-type Kafka topics in .NET
If you are using confluent-kafka-dotnet in your .NET project to communicate with a Kafka server, you are probably aware of this issue: at the time of writing, the package still doesn't support a single topic with multiple message types. As the author mentions, when it was initially designed both the producer and the consumer needed to be strongly typed.
So we decided to follow the workaround described in the issue, with a small tweak: create the consumer as IConsumer<string, GenericRecord> rather than IConsumer<string, byte[]>. A GenericRecord is essentially a set of name-to-value pairs under the hood, so messages of different types can all be deserialized into it.
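As a rough sketch of that setup (the broker address, group id and topic name below are placeholders), the consumer can be built with the generic Avro deserializer so that every value comes back as a GenericRecord:

using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "http://localhost:8081" });

using var consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "my-consumer-group"
    })
    // AvroDeserializer is async-only, so wrap it for the synchronous Consume loop.
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();

consumer.Subscribe("my-topic");
var result = consumer.Consume();
GenericRecord value = result.Message.Value; // can hold any record type produced to the topic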
However, the problem is that the GenericRecord type doesn't give me many options for retrieving a field value beyond the TryGetValue(string fieldName, out object result) method. Ideally I want to get a field value via a property, like message.FirstName, instead of TryGetValue("firstName", out object result), so you can probably imagine how easily this ends up a mess with nasty strings everywhere.
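To make that concrete, every field lookup on a GenericRecord ends up looking something like this (the variable and field names are just illustrations):

// genericRecord is a GenericRecord value consumed from the topic
if (genericRecord.TryGetValue("firstName", out object firstName))
{
    Console.WriteLine((string)firstName);
}
// versus the strongly typed access I would rather write: message.FirstName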
Solution
After consulting with the author, here is the accepted solution:
- Serialize GenericRecord instance back to bytes, then
- Deserialize the bytes into SpecificRecord
Serialize GenericRecord to bytes
To serialize a GenericRecord to bytes, you need to install the Confluent.SchemaRegistry.Serdes.Avro package, which gives me access to both the generic AvroSerializer<T> and AvroDeserializer<T>. Following is an example of how to serialize a GenericRecord to a byte array.
var serializerConfigs = new AvroSerializerConfig
{
    SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
    AutoRegisterSchemas = false
};
var serializer = new AvroSerializer<GenericRecord>(_schemaRegistry, serializerConfigs);

// topic is the destination topic name; the TopicRecord strategy needs it to build the subject name.
var bytes = await serializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topic));
The AvroSerializer<T> constructor takes two arguments: an instance of ISchemaRegistryClient, so it can download the schema from the remote registry if it isn't already in the local cache, and an optional instance of AvroSerializerConfig. The config has a few properties, and as you may notice in the example above, I set AutoRegisterSchemas to false, which prevents my serializer from registering schemas at run time, and SubjectNameStrategy to TopicRecord (in my case) to make sure it resolves the correct schema by matching against the one stored in the remote schema registry.
Deserialize to a SpecificRecord
The deserializer is less exciting. Similarly, it takes two arguments: an instance of ISchemaRegistryClient and an instance of AvroDeserializerConfig.
var deserializer = new AvroDeserializer<MySpecificRecord>(_schemaRegistry);
var mySpecificRecord = await deserializer.DeserializeAsync(bytes, false, new SerializationContext());
Here is what it all looks like when put together.
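A rough end-to-end sketch (MySpecificRecord stands in for one of the generated Avro types, and the schema-name check is just one way a consumer might decide which type a message is):

var serializer = new AvroSerializer<GenericRecord>(_schemaRegistry, new AvroSerializerConfig
{
    SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
    AutoRegisterSchemas = false
});
var deserializer = new AvroDeserializer<MySpecificRecord>(_schemaRegistry);

var result = consumer.Consume();
var context = new SerializationContext(MessageComponentType.Value, result.Topic);

// Route by the record's schema full name, then round-trip GenericRecord -> bytes -> SpecificRecord.
if (result.Message.Value.Schema.Fullname == MySpecificRecord._SCHEMA.Fullname)
{
    var bytes = await serializer.SerializeAsync(result.Message.Value, context);
    MySpecificRecord mySpecificRecord = await deserializer.DeserializeAsync(bytes, false, context);
    // handle mySpecificRecord ...
}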
Send messages in different types to a single topic
With the above solution, but the process reversed (first serialize a specific record into bytes, then deserialize the bytes into a GenericRecord), I can create a single generic producer that sends messages of different types to a single topic.
Following is an example of what it looks like.
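A sketch of such a producer (the class name MultiTypeProducer and its members are placeholders; any generated Avro type implementing ISpecificRecord can be passed in):

using System.Threading.Tasks;
using Avro.Generic;
using Avro.Specific;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public class MultiTypeProducer
{
    private readonly ISchemaRegistryClient _schemaRegistry;
    private readonly IProducer<string, GenericRecord> _producer;
    private readonly AvroDeserializer<GenericRecord> _toGeneric;
    private readonly AvroSerializerConfig _serializerConfig = new AvroSerializerConfig
    {
        SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
        AutoRegisterSchemas = false
    };

    public MultiTypeProducer(ISchemaRegistryClient schemaRegistry, ProducerConfig config)
    {
        _schemaRegistry = schemaRegistry;
        _toGeneric = new AvroDeserializer<GenericRecord>(schemaRegistry);
        _producer = new ProducerBuilder<string, GenericRecord>(config)
            .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, _serializerConfig))
            .Build();
    }

    public async Task ProduceAsync<T>(string topic, string key, T message) where T : ISpecificRecord
    {
        var context = new SerializationContext(MessageComponentType.Value, topic);

        // Specific record -> bytes -> GenericRecord (the reverse of the consumer side),
        // so a single IProducer<string, GenericRecord> can send any message type.
        var bytes = await new AvroSerializer<T>(_schemaRegistry, _serializerConfig)
            .SerializeAsync(message, context);
        var genericValue = await _toGeneric.DeserializeAsync(bytes, false, context);

        await _producer.ProduceAsync(topic, new Message<string, GenericRecord> { Key = key, Value = genericValue });
    }
}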