Worker stops processing messages after reaching max capacity

Hi,
My Zeebe worker is logging this message:
04:32:46.751 | zeebe | [log] INFO: Worker at max capacity - log has 33, a capacity of 32, and a minimum job batch size of 0.
After that, it does not process any further messages.
There is not much load on the worker (only manual testing), but I do have a very large number of variables.
I was expecting the worker to process what it has and then accept more jobs, but that is not happening, even after waiting for a long time.
If I restart the worker, the new worker receives the jobs and everything continues as planned.
What am I doing wrong / what is happening?
How can I increase the log size and the worker's ability to take on jobs with a large number of parameters?

Thanks
Eetay


@Eetay hi there… can you please share more information? Which version of Zeebe are you using? What’s your topology? What client are you using for your workers?

Thanks

This looks like the zeebe-node ZBBatchWorker? What version is it? Can you share the worker code?

Thanks guys!
My broker is the Docker image “camunda/zeebe:0.22.1”.
I have just one client, built on Node.js 12, using zeebe-node version 0.23.0-alpha.4.
I’m running a docker-compose file on a 2xlarge machine, based on the latest master of “zeebe-docker-compose/simple-monitor”.
Changes made to the zeebe-broker config: I have enabled the Elasticsearch exporter.

I shared more of the code with @jwulf in a LinkedIn message,
but the main task handling is below.
Eetay


zbc.createWorker('my-first-worker', 'log', async (job, complete) => {
        console.log('LOG-IN', {type: job.type, elementId: job.elementId, variables: job.variables})
        const out = {[job.elementId]: job.variables, count: `${job.elementId}|${job.variables.count}` }
        console.log('LOG-OUT', out)
        amq.send(amq.client,'test',JSON.stringify({jobKey:job.key, variables: out})) // loop "log" jobs completion via activemq for test/evaluation purpose
})

amq.subscribe(amq.channel, 'test', async (body) => {
        try {
                const out = JSON.parse(body)
                console.log(`COMPLETE JOB ${out.jobKey}`)
                await zbc.completeJob(out) // complete "log" jobs here
        } catch (e) {
                console.log('COMPLETE EXCEPTION', body, e)
        }
})

If you are completing the job elsewhere, you need to call complete.forwarded() in the worker handler to release worker capacity. This is described in the documentation here.

zbc.createWorker('my-first-worker', 'log', async (job, complete) => {
  console.log('LOG-IN', {type: job.type, elementId: job.elementId, variables: job.variables})
  const out = {[job.elementId]: job.variables, count: `${job.elementId}|${job.variables.count}` }
  console.log('LOG-OUT', out)
  amq.send(amq.client,'test',JSON.stringify({jobKey:job.key, variables: out})) // loop "log" jobs completion via activemq for test/evaluation purpose
  complete.forwarded() // release worker capacity; the job itself is completed elsewhere
})

Thanks much for your quick reply!
I suspected it was something I’m not handling correctly.

One more question:
Does the order of the calls to completeJob() and complete.forwarded() matter?

Anyway, thanks a lot for the quick response
Zeebe looks really great! Kudos!


No.

Next, call complete.forwarded() in your job worker handler. This has no side-effect with the broker - so nothing is communicated to Zeebe. The job is still out there with your worker as far as Zeebe is concerned. What this call does is release worker capacity to request more jobs.
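
To make the capacity bookkeeping concrete, here is a toy model of it. This is only a sketch and not the zeebe-node implementation: ToyWorker, activate() and the counter names are hypothetical, and the capacity of 32 is taken from the log line in the question. It shows why a handler that never releases capacity stops receiving jobs, and why a purely local release does not depend on when the job is completed with the broker.

```javascript
// Toy model only: NOT the zeebe-node implementation.
// ToyWorker, activate() and activeJobs are hypothetical names for illustration.
class ToyWorker {
  constructor(maxActiveJobs) {
    this.maxActiveJobs = maxActiveJobs
    this.activeJobs = 0
  }

  // Accept up to `count` jobs, limited by free capacity.
  // Returns how many jobs were actually accepted.
  activate(count) {
    const free = Math.max(0, this.maxActiveJobs - this.activeJobs)
    const accepted = Math.min(count, free)
    this.activeJobs += accepted
    return accepted
  }

  // Local bookkeeping only, in the spirit of complete.forwarded():
  // nothing is sent to the broker, one slot of capacity is freed.
  forwarded() {
    this.activeJobs = Math.max(0, this.activeJobs - 1)
  }
}

// Handler that never releases capacity: the worker fills up and stalls.
const stuck = new ToyWorker(32)
const stuckFirst = stuck.activate(40)
const stuckSecond = stuck.activate(40)

// Handler that releases capacity for every job it hands off elsewhere.
const healthy = new ToyWorker(32)
const healthyFirst = healthy.activate(40)
for (let i = 0; i < healthyFirst; i++) healthy.forwarded()
const healthySecond = healthy.activate(40)

console.log({ stuckFirst, stuckSecond, healthyFirst, healthySecond })
// prints { stuckFirst: 32, stuckSecond: 0, healthyFirst: 32, healthySecond: 32 }
```

In this model the release step never talks to the broker, so it can happen before or after the real completeJob() call without changing the outcome.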