Hey everyone, I’m currently trying to create a spring service which is sort of a “Historian” for zeebe. In order to do this, I’m using the kafka exporter and creating a spring boot service which consumes the messages. currently I have it printing them but my plan eventually is to have them save relevant info into a mongodb.
EDIT
So I’ve looked into this further and the current issue I’m having is that I’m using the @KafkaListener annotation in my spring boot project, however this annotation requires the method its attached to to catch the payload message in a String. I’m now wondering what the best course of action is to deserialize this, currently i’m doing this:
@KafkaListener(id="workflow-instance", topics="zeebe-workflow")
public void consumeMessageWorkflow(String message) {
GenericRecordDeserializer genericRecordDeserializer = new GenericRecordDeserializer();
GenericRecord record = genericRecordDeserializer.deserialize("zeebe-workflow", message.getBytes());
System.out.println("Zeebe Workflow Message: " + message);
}
However I’m seeing this each time the listener is called:
2020-03-30 14:49:26.597 ERROR 29943 --- [-instance-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = zeebe-workflow, partition = 0, leaderEpoch = 0, offset = 1139, CreateTime = 1585594095409, serialized key size = 8, serialized value size = 124, headers = RecordHeaders(headers = [RecordHeader(key = io.zeebe.exporter.kafka.serde.generic.GenericRecordDescriptorHeader, value = [87, 111, 114, 107, 102, 108, 111, 119, 73, 110, 115, 116, 97, 110, 99, 101, 82, 101, 99, 111, 114, 100])], isReadOnly = false), key =�Dz�@, value =
order-process Ɓ�����(ϱ�����2Task_0cr2z188ϱ�����@a)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.zeebe.historian.service.SimpleConsumer.consumeMessageWorkflow(java.lang.String)' threw exception; nested exception is java.lang.UnsupportedOperationException: Cannot deserialize GenericRecord instances without specifying the descriptor header; nested exception is java.lang.UnsupportedOperationException: Cannot deserialize GenericRecord instances without specifying the descriptor header
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1774) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1766) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1605) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1257) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1007) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.lang.UnsupportedOperationException: Cannot deserialize GenericRecord instances without specifying the descriptor header
at io.zeebe.exporters.kafka.serde.generic.GenericRecordDeserializer.deserialize(GenericRecordDeserializer.java:90) ~[zeebe-kafka-exporter-serde-1.1.0.jar:1.1.0]
at com.zeebe.historian.service.SimpleConsumer.consumeMessageWorkflow(SimpleConsumer.java:24) ~[classes/:na]
at jdk.internal.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:326) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1728) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1711) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1666) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
... 9 common frames omitted
I know its looking for headers as well, however I’m not 100% sure what to give it for headers. Is this a kafka specific object or is it specific to these zeebe libraries?