Problems with Kafka Connector

Hi,

I’m on Zeebe 0.17.0 following the guide for Writing an Apache Kafka Connector for Zeebe experiencing some problems.

bin/connect-standalone.sh config/connect-standalone.properties config/zeebe-connector-source.properties

[2019-04-27 12:23:19,084] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser:109)

[2019-04-27 12:23:19,084] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser:110)

[2019-04-27 12:23:19,090] INFO Connecting to Zeebe broker at 'localhost:26500' (io.berndruecker.demo.kafka.connect.zeebe.ZeebeSourceTask:50)

[2019-04-27 12:23:19,091] INFO Created connector ZeebeSourceConnector (org.apache.kafka.connect.cli.ConnectStandalone:104)

[2019-04-27 12:23:19,366] INFO Subscribed to Zeebe at 'localhost:26500' for sending records (io.berndruecker.demo.kafka.connect.zeebe.ZeebeSourceTask:67)

[2019-04-27 12:23:19,366] INFO WorkerSourceTask{id=ZeebeSourceConnector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:199)

[2019-04-27 12:23:19,643] INFO WorkerSourceTask{id=ZeebeSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)

[2019-04-27 12:23:19,643] INFO WorkerSourceTask{id=ZeebeSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)

[2019-04-27 12:23:19,643] ERROR WorkerSourceTask{id=ZeebeSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)

java.lang.NullPointerException

at io.berndruecker.demo.kafka.connect.zeebe.ZeebeSourceTask.poll(ZeebeSourceTask.java:77)

at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)

at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

[2019-04-27 12:23:19,644] INFO Activated 1 jobs for worker KafkaConnector and job type sendMessage (io.zeebe.client.job.poller:97)

[2019-04-27 12:23:19,645] ERROR WorkerSourceTask{id=ZeebeSourceConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

[2019-04-27 12:23:19,649] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1136)

Did you configure the Kafka Topic(s) to listen to?

According to your exception this is null: https://github.com/berndruecker/kafka-connect-zeebe/blob/master/src/main/java/io/berndruecker/demo/kafka/connect/zeebe/ZeebeSourceTask.java#L77

This is set via “topics”, see example in https://github.com/berndruecker/kafka-connect-zeebe#source-zeebe-kafka

Hi Bernd,

I’m trying out your ZeebeSourceConnector. Here’s my config/zeebe-connector-source.properties file:

name=ZeebeSourceConnector
connector.class=io.berndruecker.demo.kafka.connect.zeebe.ZeebeSourceConnector

zeebeBrokerAddress=localhost:26500

topics=isk-activate

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Thanks for helping.

Jonas, there was a bug in reading the configuration. I just fixed that in master now - could you try again?
As I haven’t really used the Sink in my demos that slipped to the cracks - sorry.

Hi,

I built a new package and replaced the old jar, started up Kafka (kafka_2.12-2.1.0), Zeebe (zeebe-broker-0.17.0) and then started the Connector with some new errors:

[2019-04-29 09:18:34,857] ERROR Failed to start task ZeebeSourceConnector-0 (org.apache.kafka.connect.runtime.Worker:455)
java.lang.NoClassDefFoundError: io/zeebe/client/api/subscription/JobHandler
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:709)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
	at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:51)
	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:408)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:315)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:340)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:211)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException: io.zeebe.client.api.subscription.JobHandler
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 12 more

I could share the full log with you on Slack.

Btw: My intention was to try out the event source, not the sink. Am I wrong here?

Did you make sure that you use the “uber jar”? That’s the big one generated:

If you look into that via a ZIP tool of your choice the class should be contained within there:

Make sure this JAR is in the plugins directoy of Kafka, so that it can be picked up properly.

PS: I am not on slack as I am actually officially on leave right now :wink:

You’re right I copied the wrong JAR.

I’m now getting events on my topic:

[B@161e58ee

But they don’t make any sense, do they?

Depends on your definition of “sense” :wink: I understand why you see this - it is actually the byte array of your JSON. This is set here: https://github.com/berndruecker/kafka-connect-zeebe/blob/master/src/main/java/io/berndruecker/demo/kafka/connect/zeebe/ZeebeSourceTask.java#L73

I think you should see a proper JSON if you change it to:

    final SourceRecord record = new SourceRecord(null, null, topic, //
        Schema.STRING_SCHEMA, //
        collectedJob.getVariables()); 

Payload handling is actually something which might need a second thought for your environment anyway - as I am not sure what you want to send via Kafka records. Json? Avro? …?

I just thought I made some misstake - but if that’s the expected result I’m good. I was looking for something else (without reading the connector serialization code), that the Kafka event would be the set of variables (JSON formatted) defined in the scope of the process definition.

Now I’ll try the BPMN catch event and a Kafka connect sink.

EDIT: And yes it worked fine to change to Schema.STRING_SCHEMA.

Great! I actually changed the default to STRING_SCHEMA, as it might be less confusing for the beginning. Thanks for the feedback.

By the way: If you can share anything about your use case I would be interested :slight_smile: