Zeebe Kafka Exporter Data

Hey everyone, I’m currently trying to create a spring service which is sort of a “Historian” for zeebe. In order to do this, I’m using the kafka exporter and creating a spring boot service which consumes the messages. currently I have it printing them but my plan eventually is to have them save relevant info into a mongodb.

EDIT
So I’ve looked into this further and the current issue I’m having is that I’m using the @KafkaListener annotation in my spring boot project, however this annotation requires the method its attached to to catch the payload message in a String. I’m now wondering what the best course of action is to deserialize this, currently i’m doing this:

@KafkaListener(id="workflow-instance", topics="zeebe-workflow")
public void consumeMessageWorkflow(String message) {
    GenericRecordDeserializer genericRecordDeserializer = new GenericRecordDeserializer();
    GenericRecord record = genericRecordDeserializer.deserialize("zeebe-workflow", message.getBytes());
    System.out.println("Zeebe Workflow Message: " + message);
}

However I’m seeing this each time the listener is called:

2020-03-30 14:49:26.597 ERROR 29943 --- [-instance-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = zeebe-workflow, partition = 0, leaderEpoch = 0, offset = 1139, CreateTime = 1585594095409, serialized key size = 8, serialized value size = 124, headers = RecordHeaders(headers = [RecordHeader(key = io.zeebe.exporter.kafka.serde.generic.GenericRecordDescriptorHeader, value = [87, 111, 114, 107, 102, 108, 111, 119, 73, 110, 115, 116, 97, 110, 99, 101, 82, 101, 99, 111, 114, 100])], isReadOnly = false), key =�Dz�@, value = 
order-process Ɓ�����(ϱ�����2Task_0cr2z188ϱ�����@a)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.zeebe.historian.service.SimpleConsumer.consumeMessageWorkflow(java.lang.String)' threw exception; nested exception is java.lang.UnsupportedOperationException: Cannot deserialize GenericRecord instances without specifying the descriptor header; nested exception is java.lang.UnsupportedOperationException: Cannot deserialize GenericRecord instances without specifying the descriptor header
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1774) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1766) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1605) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1007) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Caused by: java.lang.UnsupportedOperationException: Cannot deserialize GenericRecord instances without specifying the descriptor header
at io.zeebe.exporters.kafka.serde.generic.GenericRecordDeserializer.deserialize(GenericRecordDeserializer.java:90) ~[zeebe-kafka-exporter-serde-1.1.0.jar:1.1.0]
at com.zeebe.historian.service.SimpleConsumer.consumeMessageWorkflow(SimpleConsumer.java:24) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:326) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1728) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1711) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1666) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
... 9 common frames omitted

I know its looking for headers as well, however I’m not 100% sure what to give it for headers. Is this a kafka specific object or is it specific to these zeebe libraries?

So I found the solution, I added:
spring.kafka.consumer.key-deserializer=io.zeebe.exporters.kafka.serde.generic.GenericRecordDeserializer
spring.kafka.consumer.value-deserializer=io.zeebe.exporters.kafka.serde.generic.GenericRecordDeserializer

to my application.properties file and it now lets me use GenericRecord as the type for my listener

1 Like