Worker contact point in a clustered environment


#1

Hi,

I’m taking my first steps with Zeebe clustering. I’m a newbie in clustering scenarios (so apologies in advance if this is a trivial topic for some of you).

Let me describe my scenario:

  1. I’m on Zeebe 0.16.4 (I will update to 0.17.0 soon).

  2. I configured a couple of nodes locally on my PC to do some testing:

    #node 0
    [cluster]
    nodeId = 0
    partitionsCount = 2
    replicationFactor = 2
    clusterSize = 2

    #node 1 with port offset 1
    [cluster]
    nodeId = 1
    partitionsCount = 2
    replicationFactor = 2
    clusterSize = 2
    initialContactPoints = [ "127.0.0.1:26502" ]

  3. A BPMN diagram is correctly deployed with a couple of tasks.

  4. I’ve got a worker up and running whose job types are the ones declared in each diagram task. The worker app client is contacting node 0 on port 26500 successfully.

My question: how can the worker app contact a different node if node 0 is down? I had a look at the API documentation and I see that I can obtain the cluster topology too. But even with that option, having the topology client-side means coupling the topology to the client code.
Am I missing something? Is the worker client app notified of its contact node going down by some implicit mechanism? Should I use a load balancer, point the worker client at it, and delegate the routing to the load balancer?
I suppose I could also start one worker client app per node…

Thanks a lot

Simone


#2

Hi @spulci,

thanks for trying out Zeebe.

First of all, if you want to set up a cluster you should choose an odd number of nodes, otherwise you get no real benefit from it. This means you should use a node count of 3, 5, 7, etc.
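For example, a minimal three-node layout could look like the following (a sketch adapted from your two-node config; the contact-point port is carried over from your setup and the port offsets are assumptions):

    #node 0
    [cluster]
    nodeId = 0
    partitionsCount = 2
    replicationFactor = 3
    clusterSize = 3

    #node 1 with port offset 1
    [cluster]
    nodeId = 1
    partitionsCount = 2
    replicationFactor = 3
    clusterSize = 3
    initialContactPoints = [ "127.0.0.1:26502" ]

    #node 2 with port offset 2
    [cluster]
    nodeId = 2
    partitionsCount = 2
    replicationFactor = 3
    clusterSize = 3
    initialContactPoints = [ "127.0.0.1:26502" ]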

The reason for this is that we use a RAFT implementation as the consensus protocol to provide consistency. In RAFT you need a quorum to commit events and make configuration changes. The quorum is defined as quorum = floor(nodeCount / 2) + 1. This means if you have an even node count like 2, you will have a quorum of 2. If one of the nodes then goes down you can’t reach your quorum, so in the end you can’t commit any events (you have a fault tolerance of zero). To get reliability you need at least 3 nodes, where you still have a quorum of 2. If one node then fails, the cluster is still available (you have a fault tolerance of one).
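The arithmetic above can be sketched as follows (a self-contained illustration of the quorum formula, not Zeebe code):

```java
public class Quorum {

    // Raft quorum: the majority needed to commit, floor(nodeCount / 2) + 1.
    // Java integer division already floors for positive values.
    static int quorum(int nodeCount) {
        return nodeCount / 2 + 1;
    }

    // How many nodes may fail while a quorum can still be reached.
    static int faultTolerance(int nodeCount) {
        return nodeCount - quorum(nodeCount);
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.printf("nodes=%d quorum=%d faultTolerance=%d%n",
                    n, quorum(n), faultTolerance(n));
        }
        // nodes=2 -> quorum=2, faultTolerance=0 (one failure stalls the cluster)
        // nodes=3 -> quorum=2, faultTolerance=1
    }
}
```

Note that going from 2 to 3 nodes keeps the quorum at 2 but buys you one node of fault tolerance, while going from 3 to 4 raises the quorum without adding any tolerance, which is why odd counts are recommended.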

How can the worker app contact a different node if node 0 is down?

The Zeebe client (the worker is part of it) contacts the gateway (embedded or standalone), and the gateway then routes the requests to the different brokers and partitions depending on the topology.
So you don’t need to handle the topology in the client yourself.

Even if the gateway can connect to the remaining broker in your setup, it can’t activate further jobs, because there is no quorum.

Hope this helps!
Do not hesitate to ask more questions about this.

Greets
Chris


#3

Hi @Zelldon,

first of all many thanks for your reply! It’s helping me a lot to better understand RAFT consensus.

The gateway’s role is now clear :slight_smile:. It’s acting as a request router to the underlying network of nodes. I’m going to try an odd number of nodes, thanks a lot!
Be sure that, if needed, I’ll come back with more questions too :smiley:

Thanks again!!!

Simone


#4

@Zelldon a couple more questions:

  1. What happens if the embedded gateway is down in a cluster of broker nodes? Should I deploy the gateway externally in a high-availability scenario?

  2. OK, I configured a cluster with 5 nodes, 5 partitions, and a replication factor of 3. It runs great on my machine. The gateway is set to true on broker instance 0, and nodes 1 to 4 have a port offset to guarantee that each node uses a different set of five ports.
    Everything runs smoothly… but when I start up a client with a couple of worker methods implemented to do the job of two tasks with two different types (the business logic is trivial: increment the payload by one unit and pass it on), I get these two exceptions:

Node 0 exception ("second" is the type name of one of my worker methods):

11:44:46.711 [io.zeebe.gateway.impl.broker.BrokerRequestManager] [gateway-zb-actors-0] WARN io.zeebe.gateway - Failed to activate jobs for type second from partition 3
io.zeebe.transport.RequestTimeoutException: Request timed out after PT15S
at io.zeebe.transport.impl.sender.OutgoingRequest.timeout(OutgoingRequest.java:151) ~[zb-transport-0.16.4.jar:0.16.4]
at io.zeebe.transport.impl.sender.Sender.onTimerExpiry(Sender.java:483) ~[zb-transport-0.16.4.jar:0.16.4]
at org.agrona.DeadlineTimerWheel.poll(DeadlineTimerWheel.java:284) ~[agrona-0.9.34.jar:0.9.34]
at io.zeebe.transport.impl.sender.Sender.processTimeouts(Sender.java:114) ~[zb-transport-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorJob.invoke(ActorJob.java:90) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorJob.execute(ActorJob.java:53) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorTask.execute(ActorTask.java:189) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorThread.executeCurrentTask(ActorThread.java:154) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorThread.doWork(ActorThread.java:135) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorThread.run(ActorThread.java:112) [zb-util-0.16.4.jar:0.16.4]

Node 4 exception:

11:44:03.505 [zb-stream-processor] [0.0.0.0:26541-zb-actors-1] ERROR io.zeebe.broker.workflow.repository - Error on pushing deployment to partition 1. Retry request.
io.zeebe.transport.RequestTimeoutException: Request timed out after PT15S
at io.zeebe.transport.impl.sender.OutgoingRequest.timeout(OutgoingRequest.java:151) ~[zb-transport-0.16.4.jar:0.16.4]
at io.zeebe.transport.impl.sender.Sender.onTimerExpiry(Sender.java:483) ~[zb-transport-0.16.4.jar:0.16.4]
at org.agrona.DeadlineTimerWheel.poll(DeadlineTimerWheel.java:284) ~[agrona-0.9.34.jar:0.9.34]
at io.zeebe.transport.impl.sender.Sender.processTimeouts(Sender.java:114) ~[zb-transport-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorJob.invoke(ActorJob.java:90) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorJob.execute(ActorJob.java:53) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorTask.execute(ActorTask.java:189) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorThread.executeCurrentTask(ActorThread.java:154) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorThread.doWork(ActorThread.java:135) [zb-util-0.16.4.jar:0.16.4]
at io.zeebe.util.sched.ActorThread.run(ActorThread.java:112) [zb-util-0.16.4.jar:0.16.4]

A few times the client generates this exception too:

2019-05-10 11:49:15.366 WARN 11385 — [pool-2-thread-1] io.zeebe.client.job.worker : Worker worker-name failed to handle job with key 219780 of type first, sending fail command to broker

java.lang.reflect.InvocationTargetException: null
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.zeebe.spring.client.bean.MethodInfo.invoke(MethodInfo.java:31)
at io.zeebe.spring.client.config.processor.ZeebeWorkerPostProcessor.lambda$null$1(ZeebeWorkerPostProcessor.java:49)
at io.zeebe.client.impl.subscription.JobRunnableFactory.executeJob(JobRunnableFactory.java:42)
at io.zeebe.client.impl.subscription.JobRunnableFactory.lambda$create$0(JobRunnableFactory.java:37)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.zeebe.client.cmd.ClientStatusException: Command rejected with code ‘COMPLETE’: Expected to complete job with key ‘219780’, but no such job was found
at io.zeebe.client.impl.ZeebeClientFutureImpl.transformExecutionException(ZeebeClientFutureImpl.java:93)
at io.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:50)
at io.zeebe.spring.example.WorkerApplication.handleFirstStressJob(WorkerApplication.java:56)
… 14 common frames omitted
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: Command rejected with code ‘COMPLETE’: Expected to complete job with key ‘219780’, but no such job was found
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at io.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:48)
… 15 common frames omitted
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Command rejected with code ‘COMPLETE’: Expected to complete job with key ‘219780’, but no such job was found
at io.grpc.Status.asRuntimeException(Status.java:532)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:434)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:699)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
… 3 common frames omitted

It seems like a worker is sometimes triggered to do some job (identified by a key)… but another thread has apparently already completed that job in a previous successful run. Am I right? Should this be considered a normal warning in a clustered environment? Can I mitigate this behaviour by fine-tuning the worker’s "timeout" and "bufferSize" properties, as well as the cluster itself?


#5

Hey @spulci,

What happens if the embedded gateway is down in a cluster of broker nodes? Should I deploy the gateway externally in an high reliability scenario?

I’m not sure if I get the question right. I assume you mean a standalone gateway?

Maybe the docs help; they say the following:

The gateway is stateless and sessionless, and gateways can be added as necessary for load balancing and high availability.

Regarding your second question:

What is your configured timeout? It seems that some workers try to complete a job which is already completed.

Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: Command rejected with code ‘COMPLETE’: Expected to complete job with key ‘219780’, but no such job was found
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at io.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:48)
… 15 common frames omitted

Hope this helps.

Greets
Chris


#6

@Zelldon - I think the idea is that multiple gateways (embedded or standalone) should be clustered behind a Consul DNS (https://www.consul.io/discovery.html) or similar, so that my clients are configured with a single static address for the “Zeebe broker” but don’t choke if one of the gateways in the cluster dies?


#7

Hey @jwulf,

this sounds reasonable to me :slight_smile:

Greets
Chris


#8

You hit the nail on the head, @jwulf: this is the scenario I want to implement. BTW, thanks to both of you for your replies, greatly appreciated :sunny:

Thanks,

Simone