Unable to filter variables being sent to Kafka connector

Hi,

I’m having trouble filtering variables being sent to Kafka as mentioned in GitHub - camunda-community-hub/kafka-connect-zeebe: Kafka Connector for Zeebe.io. The message which gets posted on my kafka topic is -
‘{
“bpmnProcessId”: “toKafka”,
“customHeaders”: {
“sendVars”: “testID, key”,
“topic”: “pong3”
},
“deadline”: 1583748375824,
“elementId”: “Task_13zltv1”,
“elementInstanceKey”: 2251799813687492,
“key”: 2251799813687493,
“retries”: 3,
“type”: “testService”,
“variables”: “{}”,
“variablesAsMap”: {},
“worker”: “kafka-connector-1”,
“workflowDefinitionVersion”: 1,
“workflowInstanceKey”: 2251799813687486,
“workflowKey”: 2251799813687421
}’

The filtering seems to happen only inside ‘variables’ and ‘variablesAsMaps’ fields of the message. Is there a way I can get rid of all the other keys like ‘bpmnProcessId’, ‘workflowInstanceKey’, ‘workflowKey’ etc so that my message becomes something as shown below?
‘{
“type”: “testService”,
“variables”: “{}”,
“variablesAsMap”: {}
}’

Thanks in advance.

Hi @KaranChauhan01,

This is not an issue with “filtering variables”.

This is the job metadata. The variables are the key:values in the variables field.

Sorry, no one-line answer from me.

There may be a way to accomplish what you are trying to do (format the record being sent to Kafka) through configuration, either on the Kafka side or in the connector.

Someone else may have the answer, but I would start by checking out the connector source code and opening it in IntelliJ, or whatever you use as your IDE, and going through it to understand how it works. The answers are always in the source code.

Maybe this? https://github.com/zeebe-io/kafka-connect-zeebe/blob/master/config/quickstart-zeebe-source.properties#L22

and this? https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas

This blog shows the idea behind the construction of the SourceRecord in the Connector: https://zeebe.io/blog/2018/12/writing-an-apache-kafka-connector-for-zeebe/

This looks like the point in the connector where this is currently implemented: https://github.com/zeebe-io/kafka-connect-zeebe/blob/master/src/main/java/io/zeebe/kafka/connect/source/ZeebeSourceTask.java#L76

I would dig into that code to see how it does it, and how it determines what to do, and where (if at all) it looks for configuration.

HTH
Josh

Thanks @jwulf for the prompt reply. I had already gone through the source code before posting the query. I just wanted to be sure that my understanding of ‘filter variables’ was in sync with the functionality and probably find someone who has already faced and fixed what I am facing. The links you have shared seem promising. Thanks for that. :slight_smile:

1 Like

Hey Karan. Josh is right - best configure that custom transformation using Kafka Connect means (as far as I know https://docs.confluent.io/current/connect/transforms/index.html). We already discussed if we probably should switch to it also for own stuff - to get rid of the bespoke JsonPath handling: https://github.com/zeebe-io/kafka-connect-zeebe/issues/18. Happy to hear your thoughts…
Best
Bernd