Zeebe: JobWorker subscription failing to read message

I get the error below when the JobWorker reads a message:
io.zeebe.client.cmd.InternalClientException: Failed to deserialize json 'orchestration-pipeline-2' to 'Map<String, Object>'

This is because the BPMN process id is populated as a string but is being deserialized as a Map.
I'm not explicitly setting any custom headers.
Environment:
Windows 10
Client version: <zeebe.version>0.17.0</zeebe.version>
broker: zeebe-broker-0.22.0-alpha1

Code snippet:

  1. Starting the flow:
    @StreamListener(target = Sink.INPUT, condition = "(headers['messageType']?:'')=='eventAccepted'")
    public void eventReceived(String messageJson) throws JsonParseException, JsonMappingException, IOException {
        ObjectMapper mapper = new ObjectMapper();

        // Parse the incoming message, then convert it to a Map to use as workflow variables
        Event<String> event = mapper.readValue(messageJson, new TypeReference<Event<String>>() {});
        String payload = mapper.writeValueAsString(event);
        Map<String, Object> response = mapper.readValue(payload, new TypeReference<Map<String, Object>>() {});

        // and kick off a new flow instance
        System.out.println("New event accepted, start flow. " + payload);
        zeebe.newCreateInstanceCommand() //
                .bpmnProcessId("orchestration-pipeline-2") //
                //.bpmnProcessId("orchestration-pipeline") //
                .latestVersion() //
                .variables(response) //
                .send().join();
    }

  2. Subscription:
    @PostConstruct
    public void subscribe() {
        try {
            // Open a worker for jobs of type "maas" (matches the BPMN taskDefinition type)
            subscription = zeebe.newWorker()
                    .jobType("maas")
                    //.jobType("Model-Service-2")
                    .handler(this)
                    .timeout(Duration.ofSeconds(5))
                    .open();
            System.out.println("opened subscription");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void handle(JobClient client, ActivatedJob job) {
        try {
            //String payload = job.getVariables();
            Map<String, Object> variables = job.getVariablesAsMap();
            String traceId = (String) variables.get("traceId");
            String sender = (String) variables.get("sender");
            System.out.println("modalService: traceId: " + traceId + " sender: " + sender);
        } catch (Exception e) {
            throw new RuntimeException("Could not parse payload: " + e.getMessage(), e);
        }
        // Only reached when the variables were read without an exception
        client.newCompleteCommand(job.getKey()).send().join();
    }

  3. BPMN file:

    <bpmn:process id="orchestration-pipeline-2" name="orchestration" isExecutable="true">
      <bpmn:startEvent id="eventAccepted" name="accept event">
        <bpmn:outgoing>SequenceFlow_05vz11o</bpmn:outgoing>
      </bpmn:startEvent>
      <bpmn:serviceTask id="maas" name="maas">
        <bpmn:extensionElements>
          <zeebe:taskDefinition type="maas" retries="3" />
        </bpmn:extensionElements>
        <bpmn:incoming>SequenceFlow_05vz11o</bpmn:incoming>
        <bpmn:outgoing>SequenceFlow_0n4oyva</bpmn:outgoing>
      </bpmn:serviceTask>
      <bpmn:sequenceFlow id="SequenceFlow_05vz11o" sourceRef="eventAccepted" targetRef="maas" />
      <bpmn:serviceTask id="finsiher" name="finish">
        <bpmn:extensionElements>
          <zeebe:taskDefinition type="finisher" />
        </bpmn:extensionElements>
        <bpmn:incoming>SequenceFlow_0n4oyva</bpmn:incoming>
        <bpmn:outgoing>SequenceFlow_0pp57s3</bpmn:outgoing>
      </bpmn:serviceTask>
      <bpmn:sequenceFlow id="SequenceFlow_0n4oyva" sourceRef="maas" targetRef="finsiher" />
      <bpmn:endEvent id="EndEvent_0qztqyb">
        <bpmn:incoming>SequenceFlow_0pp57s3</bpmn:incoming>
      </bpmn:endEvent>
      <bpmn:sequenceFlow id="SequenceFlow_0pp57s3" sourceRef="finsiher" targetRef="EndEvent_0qztqyb" />
    </bpmn:process>
    <bpmndi:BPMNDiagram id="BPMNDiagram_1">
      <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="orchestration-pipeline-2">
        <bpmndi:BPMNShape id="StartEvent_1ea8h7z_di" bpmnElement="eventAccepted">
          <dc:Bounds x="172" y="102" width="36" height="36" />
          <bpmndi:BPMNLabel>
            <dc:Bounds x="158" y="145" width="64" height="14" />
          </bpmndi:BPMNLabel>
        </bpmndi:BPMNShape>
        <bpmndi:BPMNShape id="ServiceTask_1yqkh01_di" bpmnElement="maas">
          <dc:Bounds x="260" y="80" width="100" height="80" />
        </bpmndi:BPMNShape>
        <bpmndi:BPMNEdge id="SequenceFlow_05vz11o_di" bpmnElement="SequenceFlow_05vz11o">
          <di:waypoint x="208" y="120" />
          <di:waypoint x="260" y="120" />
        </bpmndi:BPMNEdge>
        <bpmndi:BPMNShape id="ServiceTask_19o7gx5_di" bpmnElement="finsiher">
          <dc:Bounds x="420" y="80" width="100" height="80" />
        </bpmndi:BPMNShape>
        <bpmndi:BPMNEdge id="SequenceFlow_0n4oyva_di" bpmnElement="SequenceFlow_0n4oyva">
          <di:waypoint x="360" y="120" />
          <di:waypoint x="420" y="120" />
        </bpmndi:BPMNEdge>
        <bpmndi:BPMNShape id="EndEvent_0qztqyb_di" bpmnElement="EndEvent_0qztqyb">
          <dc:Bounds x="582" y="102" width="36" height="36" />
        </bpmndi:BPMNShape>
        <bpmndi:BPMNEdge id="SequenceFlow_0pp57s3_di" bpmnElement="SequenceFlow_0pp57s3">
          <di:waypoint x="520" y="120" />
          <di:waypoint x="582" y="120" />
        </bpmndi:BPMNEdge>
      </bpmndi:BPMNPlane>
    </bpmndi:BPMNDiagram>
    </bpmn:definitions>

Exception stack:

io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
at io.grpc.Status.asRuntimeException(Status.java:532) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:434) ~[grpc-stub-1.19.0.jar:1.19.0]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:531) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.19.0.jar:1.19.0]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: io.zeebe.client.cmd.InternalClientException: Failed to deserialize json 'orchestration-pipeline-2' to 'Map<String, Object>'
at io.zeebe.client.impl.ZeebeObjectMapper.fromJsonAsMap(ZeebeObjectMapper.java:45) ~[zeebe-client-java-0.17.0.jar:0.17.0]
at io.zeebe.client.impl.response.ActivatedJobImpl.<init>(ActivatedJobImpl.java:45) ~[zeebe-client-java-0.17.0.jar:0.17.0]
at io.zeebe.client.impl.subscription.JobPoller.lambda$onNext$0(JobPoller.java:76) ~[zeebe-client-java-0.17.0.jar:0.17.0]
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[na:na]
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1621) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[na:na]
at io.zeebe.client.impl.subscription.JobPoller.onNext(JobPoller.java:77) ~[zeebe-client-java-0.17.0.jar:0.17.0]
at io.zeebe.client.impl.subscription.JobPoller.onNext(JobPoller.java:29) ~[zeebe-client-java-0.17.0.jar:0.17.0]
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:421) ~[grpc-stub-1.19.0.jar:1.19.0]
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33) ~[grpc-core-1.19.0.jar:1.19.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519) ~[grpc-core-1.19.0.jar:1.19.0]
... 5 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'orchestration': was expecting ('true', 'false' or 'null')
at [Source: (String)"orchestration-pipeline-2"; line: 1, column: 14]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:673) ~[jackson-core-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835) ~[jackson-core-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1889) ~[jackson-core-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:747) ~[jackson-core-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4129) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988) ~[jackson-databind-2.9.5.jar:2.9.5]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3011) ~[jackson-databind-2.9.5.jar:2.9.5]
at io.zeebe.client.impl.ZeebeObjectMapper.fromJsonAsMap(ZeebeObjectMapper.java:42) ~[zeebe-client-java-0.17.0.jar:0.17.0]
... 21 common frames omitted

Any insight into what I'm missing would be helpful.

Hi @vmysore, use the same client version as the broker.
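
The mismatch in the original post (client 0.17.0 against broker zeebe-broker-0.22.0-alpha1) is easy to overlook because the client version sits in a Maven property. As a minimal sketch, a startup guard like the one below could fail fast on such a mismatch. The class and method names are hypothetical, both versions are passed in as plain strings (no Zeebe API for querying the broker version is assumed), and treating "same major.minor" as the compatibility rule is an assumption made for illustration, not an official Zeebe policy.

```java
// Hypothetical helper, not part of the Zeebe client.
class VersionGuard {

    // Extracts "major.minor" from strings such as "0.21.1" or "0.22.0-alpha1".
    static String majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version format: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    // Assumption: client and broker are treated as matching when major.minor are equal.
    static boolean sameRelease(String clientVersion, String brokerVersion) {
        return majorMinor(clientVersion).equals(majorMinor(brokerVersion));
    }

    public static void main(String[] args) {
        // Versions from the original post
        System.out.println(sameRelease("0.17.0", "0.22.0-alpha1")); // prints "false"
        // Versions from the follow-up post
        System.out.println(sameRelease("0.21.1", "0.21.1"));        // prints "true"
    }
}
```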


Hi @jwulf,
Thanks for the assistance. I see the exception below on startup of the broker. Both the client and the broker are on version 0.21.1. Do I need to override any default configuration?

2019-11-14 09:59:31.869 [exporter] [localhost:26501-zb-fs-workers-1] INFO com.hazelcast.core.LifecycleService - [192.168.56.1]:5701 [dev] [3.11] [192.168.56.1]:5701 is STARTED
Nov 14, 2019 9:59:36 AM io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
io.netty.handler.codec.http2.Http2Exception: HTTP/2 client preface string missing or corrupt. Hex dump for received bytes: 16030100f7010000f303039ee1325a64b89c949485735424
at io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.readClientPrefaceString(Http2ConnectionHandler.java:306)
at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:239)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)

The exception is observed when we try to register the BPMN workflow.

Code snippet:
@Value("${zeebe.brokerContactPoint}")
private String zeebeBrokerContactPoint;

@Bean
public ZeebeClient zeebe() {
    System.out.println("Connect to Zeebe at '" + zeebeBrokerContactPoint + "'");

    // Cannot yet use Spring Zeebe in current alpha
    ZeebeClient zeebeClient = ZeebeClient.newClientBuilder() //
            .brokerContactPoint(zeebeBrokerContactPoint) //
            .build();

    // Trigger deployment
    zeebeClient.newDeployCommand() //
            //.addResourceFromClasspath("work-flow.yaml") //
            .addResourceFromClasspath("maas.bpmn") //
            .send().join();

    return zeebeClient;
}

dependency:
<zeebe.version>0.21.1</zeebe.version>

jdk: 13

2019-11-14 10:07:08.124 [partition-1-processor] [localhost:26501-zb-actors-0] INFO io.zeebe.logstreams - Recovered state of partition 1 from snapshot at position -1
Nov 14, 2019 10:08:04 AM io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
io.netty.handler.codec.http2.Http2Exception: HTTP/2 client preface string missing or corrupt. Hex dump for received bytes: 16030100f7010000f303039c315eb9e263e526f03f2b3e1a
at io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.readClientPrefaceString(Http2ConnectionHandler.java:306)
at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:239)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)

Sounds like TLS. See here (deep link):


@vmysore have you tried using Spring Zeebe for the client? Since you are using Spring Cloud Streams, it might be a good fit, so it would be great if you gave it a try. If you use the Spring modules, you can disable TLS with:

zeebe.client.security.plaintext=true

Thanks @jwulf. I don't see the error after disabling TLS on the client side.
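
For anyone who finds this thread from the same broker log: the hex dump in the `Http2Exception` already hints at the cause. It starts with `16 03 01`, which is how a TLS handshake record begins (content type 0x16, followed by a 0x03xx protocol version), whereas a plaintext HTTP/2 client would first send the connection preface `PRI * HTTP/2.0`. In other words, a TLS-enabled client was talking to a gateway that expected plaintext. Below is a minimal sketch in plain Java that classifies such a dump. It uses no Zeebe or Netty classes, and `PrefaceCheck` and its method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical diagnostic helper, not part of Zeebe, gRPC or Netty.
class PrefaceCheck {

    // Bytes a plaintext HTTP/2 client sends first (RFC 7540, section 3.5).
    static final byte[] HTTP2_PREFACE =
            "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(StandardCharsets.US_ASCII);

    // Converts a hex string such as "160301" into raw bytes.
    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // A TLS record carrying a handshake starts with 0x16, then version 0x03 0x0X.
    static boolean looksLikeTlsHandshake(byte[] b) {
        return b.length >= 3 && b[0] == 0x16 && b[1] == 0x03;
    }

    // True when the bytes begin with the complete HTTP/2 client preface.
    static boolean startsWithHttp2Preface(byte[] b) {
        if (b.length < HTTP2_PREFACE.length) {
            return false;
        }
        for (int i = 0; i < HTTP2_PREFACE.length; i++) {
            if (b[i] != HTTP2_PREFACE[i]) {
                return false;
            }
        }
        return true;
    }

    static String classify(String hexDump) {
        byte[] bytes = fromHex(hexDump);
        if (looksLikeTlsHandshake(bytes)) {
            return "TLS handshake";
        }
        if (startsWithHttp2Preface(bytes)) {
            return "HTTP/2 preface";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // Hex dump copied from the first broker log above
        System.out.println(classify("16030100f7010000f303039ee1325a64b89c949485735424"));
        // prints "TLS handshake"
    }
}
```

If you are on the plain Java client rather than Spring Zeebe, the counterpart of the property above should, as far as I know, be `usePlaintext()` on the client builder; check the client documentation for your version before relying on it.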

Thanks @salaboy. We’re using spring cloud streams.