How to complete a task asynchronously at a later point


#1

Hey guys.

I currently want to implement a simple demo that sends a message via Kafka and waits for a response message on Kafka before moving on. As the broker does not yet have message-receiving capabilities, I thought I would simply use a task handler to send the message and then do nothing. The response message will trigger the completion of the task later.

In code I do something like this:

  @ZeebeTaskListener(taskType = "payment")
  public void handleTaskA(final TasksClient client, final TaskEvent task) {
    // send a message via Kafka (or, more generally, Spring Cloud Stream; could also be RabbitMQ or similar)
    messageSender.send(
        new Message<RetrievePaymentCommandPayload>(...));
    // do NOTHING, so the workflow instance keeps waiting here
  }

Now we can listen for the messages on Kafka, e.g. with Spring Cloud Stream:

  @StreamListener(target = Sink.INPUT)
  public void messageReceived(String messageJson) throws IOException {
    Message<PaymentReceivedEvent> message = new ObjectMapper().readValue(
        messageJson, new TypeReference<Message<PaymentReceivedEvent>>() {});
    PaymentReceivedEvent evt = message.getPayload();

    // THIS IS NOT POSSIBLE: complete() needs the full TaskEvent, not just an id
    zeebeClients.tasks().complete(evt.getTaskId());
  }

I need a TaskEvent in order to complete the task. Why is that? Is there a reason that prevents completing the task with just an id? Or perhaps with a combination of two or more pieces of information?
Is there any way to reconstruct the event myself? What needs to be set on it and what is not needed?
Or would it be feasible to serialize the event to a string and reconstruct it from there?
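As a minimal sketch of the "stringify and reconstruct" idea, assuming the handler only needs a handful of fields to complete the task later: the hypothetical class TaskSnapshot round-trips those fields through a delimited string. The field names (topic, partitionId, position, taskKey, lockOwner) are illustrative assumptions, not the Zeebe client's TaskEvent, and which fields the broker really needs is exactly the open question.

```java
import java.util.Objects;

/**
 * Hypothetical snapshot of the fields a handler might keep in order to
 * complete a task later. This is NOT the Zeebe client's TaskEvent.
 */
final class TaskSnapshot {
    final String topic;
    final int partitionId;
    final long position;
    final long taskKey;
    final String lockOwner;

    TaskSnapshot(String topic, int partitionId, long position, long taskKey, String lockOwner) {
        this.topic = Objects.requireNonNull(topic);
        this.partitionId = partitionId;
        this.position = position;
        this.taskKey = taskKey;
        this.lockOwner = Objects.requireNonNull(lockOwner);
    }

    /** Encodes the snapshot as "topic|partitionId|position|taskKey|lockOwner". */
    String serialize() {
        if (topic.contains("|") || lockOwner.contains("|")) {
            throw new IllegalArgumentException("'|' is reserved as the field separator");
        }
        return String.join("|", topic, Integer.toString(partitionId),
                Long.toString(position), Long.toString(taskKey), lockOwner);
    }

    /** Parses a string produced by serialize(). */
    static TaskSnapshot deserialize(String encoded) {
        String[] parts = encoded.split("\\|", -1);
        if (parts.length != 5) {
            throw new IllegalArgumentException("expected 5 fields, got " + parts.length);
        }
        return new TaskSnapshot(parts[0], Integer.parseInt(parts[1]),
                Long.parseLong(parts[2]), Long.parseLong(parts[3]), parts[4]);
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof TaskSnapshot)) {
            return false;
        }
        TaskSnapshot that = (TaskSnapshot) other;
        return topic.equals(that.topic) && partitionId == that.partitionId
                && position == that.position && taskKey == that.taskKey
                && lockOwner.equals(that.lockOwner);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partitionId, position, taskKey, lockOwner);
    }
}
```

Such a string could travel inside the outgoing Kafka message and be echoed back in the response, which would keep the listener itself stateless; whether a completion built from it is accepted depends on what the broker requires.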

Happy to hear any thoughts, as this is currently the only way I see to implement waiting for any event (message, timer, …) from the outside.

Thanks
Bernd


#2

Philipp just pointed me in the right direction via Slack:

  • Remember the position of the event.
  • Combined with the topic, this is a unique identifier.
  • You can then re-read the event from Zeebe later on.

Open a topic subscription to find the event with the stored position, as done here: https://github.com/zeebe-io/zeebe-simple-monitor/blob/master/src/main/java/io/zeebe/zeebemonitor/rest/WorkflowInstanceResource.java#L86
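Philipp's suggestion can be sketched with an in-memory stand-in for a topic subscription. LoggedEvent and EventLog are hypothetical classes, not the Zeebe client API. The sketch also keys on the partition id, on the assumption that positions are only unique within one partition of a topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical event as delivered by a topic subscription. */
final class LoggedEvent {
    final String topic;
    final int partitionId;
    final long position;
    final String state;

    LoggedEvent(String topic, int partitionId, long position, String state) {
        this.topic = topic;
        this.partitionId = partitionId;
        this.position = position;
        this.state = state;
    }
}

/** Hypothetical stand-in for a topic subscription that replays events in log order. */
final class EventLog {
    private final List<LoggedEvent> events = new ArrayList<>();

    void append(LoggedEvent event) {
        events.add(event);
    }

    /**
     * Re-reads the log from the start until it reaches the event at the
     * remembered (topic, partition, position); empty if there is none.
     */
    Optional<LoggedEvent> findByPosition(String topic, int partitionId, long position) {
        for (LoggedEvent event : events) {
            if (event.topic.equals(topic) && event.partitionId == partitionId
                    && event.position == position) {
                return Optional.of(event);
            }
        }
        return Optional.empty();
    }
}
```

Every lookup replays events from the start, so its cost grows with the length of the log.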

Thanks a lot Philipp!!

I can post some code here as soon as I have it working.


#3

Here is my current (working) code: https://github.com/flowing/flowing-retail/blob/zeebe/kafka/java/order-zeebe/src/main/java/io/flowing/retail/kafka/order/adapter/ZeebeWorkarounds.java


#4

Hi Bernd,

I think this will be a better fit once Zeebe has BPMN message send/catch events.

You will send a message to one Kafka topic, consume the response from another one, and correlate it back to Zeebe.
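Roughly, what message correlation would take over from the integration can be sketched like this. MessageCorrelator, its methods, and matching on a message name plus a correlation key are assumptions for illustration, not an existing Zeebe API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of message correlation: a workflow instance waiting in a
 * message catch event is found by (message name, correlation key), so the
 * Kafka consumer only needs values that are already in the response message.
 */
final class MessageCorrelator {
    private final Map<List<String>, Long> waiting = new HashMap<>();

    /** Registers an instance that has reached a catch event for messageName. */
    void awaitMessage(String messageName, String correlationKey, long workflowInstanceKey) {
        waiting.put(Arrays.asList(messageName, correlationKey), workflowInstanceKey);
    }

    /** Called for each consumed response; returns the instance it resumes, if any. */
    OptionalLong correlate(String messageName, String correlationKey) {
        Long instance = waiting.remove(Arrays.asList(messageName, correlationKey));
        return instance == null ? OptionalLong.empty() : OptionalLong.of(instance);
    }
}
```

A real implementation would also have to decide what happens to a response that arrives before the instance is waiting for it; this sketch simply reports no match.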

Makes sense?

(Now whether that will be a single service task or a sequence of message send / catch (receive) is another story.)

Best,
Daniel


#5

Also: https://gist.github.com/meyerdan/6f9dd94192652977870346d09d310b20


#6

Definitely makes sense for this use case, once we have that. But I think we should still have a way of completing a task with only its id.

There is also a related discussion about whether send command/receive event should always be two activities (send and receive task) or whether they can be combined into one. We haven’t yet talked about how we see this in Zeebe…


#7

Too bad you cannot comment in gists :wink: Thanks for the proposal - let me comment on some key points:

ZeebeClient client = ...;
ZeebeTopicClient orderTopic = client.topic("order-processing");

It is probably a good idea to abstract the topic at the very beginning. This also makes it easy to inject, and the code using it doesn’t have to know the topic name at all.
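A minimal sketch of injecting the topic, assuming a hypothetical TopicClient interface bound to one topic (the actual ZeebeTopicClient methods may differ): the business code receives the client through its constructor and never mentions the topic name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client bound to a single topic; the real ZeebeTopicClient may differ. */
interface TopicClient {
    void startWorkflowInstance(String bpmnProcessId);
}

/** In-memory fake that records calls instead of talking to a broker. */
final class RecordingTopicClient implements TopicClient {
    private final String topic;
    final List<String> started = new ArrayList<>();

    RecordingTopicClient(String topic) {
        this.topic = topic;
    }

    @Override
    public void startWorkflowInstance(String bpmnProcessId) {
        started.add(topic + ":" + bpmnProcessId);
    }
}

/** Business code gets the topic client injected and never sees the topic name. */
final class OrderService {
    private final TopicClient orderTopic;

    OrderService(TopicClient orderTopic) {
        this.orderTopic = orderTopic;
    }

    void placeOrder() {
        orderTopic.startWorkflowInstance("orderProcess");
    }
}
```

The topic name then appears only where the client is created, e.g. in a Spring configuration class, and tests can pass a fake such as RecordingTopicClient.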

I would still love to have:

ZeebeTopicClient orderTopic = client.topic(DEFAULT); 

This is confusing: listTopics() gets you a Map of partitions?

Map<String, PartitionInfo> partitions = client.listTopics();
PartitionInfo partitionInfo = partitions.get("order-processing");

I thought a bit about it, and I think I like this way of making it clear that you send commands. It is very much in sync with the underlying concepts.

orderTopic.send(new StartWorkflowInstanceCommand("orderProcess").payload(...));

The downside is definitely that it will be harder for “workflow” people to understand it.
We should also think about how a user will discover the possible commands, as code completion does not help with that. Perhaps some kind of factory or static helper could help here?

orderTopic.send(Commands.startWorkflowInstanceCommand("orderProcess").payload(...));
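One way such a static helper could look, sketched with hypothetical command classes (Command, StartWorkflowInstanceCommand, CancelWorkflowInstanceCommand and Commands are illustrations, not an existing Zeebe API): typing "Commands." in an IDE lists every available command, while each command stays an explicit object that is sent to the topic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base type for commands sent to a topic. */
abstract class Command {
    private final Map<String, Object> payload = new HashMap<>();

    /** Fluent setter in the style of .payload(...) from the gist. */
    Command payload(String name, Object value) {
        payload.put(name, value);
        return this;
    }

    Map<String, Object> payload() {
        return Collections.unmodifiableMap(payload);
    }
}

final class StartWorkflowInstanceCommand extends Command {
    final String bpmnProcessId;

    StartWorkflowInstanceCommand(String bpmnProcessId) {
        this.bpmnProcessId = bpmnProcessId;
    }
}

final class CancelWorkflowInstanceCommand extends Command {
    final long workflowInstanceKey;

    CancelWorkflowInstanceCommand(long workflowInstanceKey) {
        this.workflowInstanceKey = workflowInstanceKey;
    }
}

/** Static entry point: typing "Commands." in an IDE lists every available command. */
final class Commands {
    private Commands() {
    }

    static StartWorkflowInstanceCommand startWorkflowInstanceCommand(String bpmnProcessId) {
        return new StartWorkflowInstanceCommand(bpmnProcessId);
    }

    static CancelWorkflowInstanceCommand cancelWorkflowInstanceCommand(long workflowInstanceKey) {
        return new CancelWorkflowInstanceCommand(workflowInstanceKey);
    }
}
```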

I definitely do not like a design where you have to store the task event yourself in order to complete it later asynchronously (https://gist.github.com/meyerdan/6f9dd94192652977870346d09d310b20#file-taskworkerasync-java). Why is that? What information do you need beyond the task id? We should at least provide a simple stringified identifier that can be used to restore the event with enough data to complete it. Actually, I would still prefer an API where you can simply complete by task id. What is the showstopper preventing that?
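Until the API supports completion by id, a client-side facade can at least hide the bookkeeping. This is a sketch with hypothetical types: LockedTask stands in for TaskEvent, and the Consumer passed to the constructor stands in for whatever call completes a full event.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for the locked TaskEvent a handler receives. */
final class LockedTask {
    final long key;
    final String lockOwner;

    LockedTask(long key, String lockOwner) {
        this.key = key;
        this.lockOwner = lockOwner;
    }
}

/**
 * Hypothetical client-side facade: remembers each locked task event so that
 * asynchronous code can complete it by key alone. The underlying "complete"
 * operation still receives the full event, as the current API requires.
 */
final class CompleteByKeyFacade {
    private final Map<Long, LockedTask> pending = new ConcurrentHashMap<>();
    private final Consumer<LockedTask> complete;

    CompleteByKeyFacade(Consumer<LockedTask> complete) {
        this.complete = complete;
    }

    /** Call from the task handler instead of completing right away. */
    void remember(LockedTask task) {
        pending.put(task.key, task);
    }

    /** Call from e.g. the Kafka listener; returns false if the key is unknown. */
    boolean complete(long taskKey) {
        LockedTask task = pending.remove(taskKey);
        if (task == null) {
            return false;
        }
        complete.accept(task);
        return true;
    }
}
```

The pending map lives in the memory of a single JVM, so it is lost on restart and does not help if the response is consumed by a different instance; this is the stateful-integration concern raised in this thread.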

And last comment:

new TaskHandler() {
  public void handle(TaskEvent taskEvent) {
    topicClient.send(new CompleteTaskCommand(taskEvent).payload(...));
  }
}

Where does the topicClient come from in https://gist.github.com/meyerdan/6f9dd94192652977870346d09d310b20#file-taskworkersync-java-L15?
And: while it is consistent, at this point sending a Command feels a bit weird compared to the current API.


#8

You have to look at a task worker in Zeebe as a stream processor: it reacts to the event “Task Locked and Available for processing”, runs its logic, potentially updates its state, and then submits the command “Complete Task”.
This is a different perspective from “Call a service”. We need to align on this, make it clearer, and document it properly.
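The stream-processor view can be sketched as a function from event to command. TaskLockedEvent, CompleteTaskCommand and PaymentWorker are hypothetical names that follow the wording of this post, not the Zeebe client API.

```java
/** Hypothetical event: "Task Locked and Available for processing". */
final class TaskLockedEvent {
    final long taskKey;
    final String orderId;

    TaskLockedEvent(long taskKey, String orderId) {
        this.taskKey = taskKey;
        this.orderId = orderId;
    }
}

/** Hypothetical command: "Complete Task", carrying the event it completes. */
final class CompleteTaskCommand {
    final TaskLockedEvent task;
    final String payload;

    CompleteTaskCommand(TaskLockedEvent task, String payload) {
        this.task = task;
        this.payload = payload;
    }
}

/** A task worker seen as a stream processor: event in, optional state update, command out. */
final class PaymentWorker {
    private long paymentsProcessed; // the worker's own state

    CompleteTaskCommand onTaskLocked(TaskLockedEvent event) {
        paymentsProcessed++; // update local state
        // Naive JSON building without escaping; good enough for a sketch.
        String result = "{\"orderId\":\"" + event.orderId + "\",\"paid\":true}";
        return new CompleteTaskCommand(event, result);
    }

    long paymentsProcessed() {
        return paymentsProcessed;
    }
}
```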

In the example you created, do you

  1. use Kafka as an asynchronous transport for “calling” services, or
  2. use Kafka as an event bus to which events are published and from which the workflow consumes the events it should react to?

Case 1) does not make sense in my opinion.
Case 2) makes sense, but here, if you build an integration with Zeebe, then that integration will be stateful and will have to do the correlation itself until Zeebe can do it.


#9

In the example it would be option 2, as I am not waiting for the ACK of the command but for a real event issued by another component.
That said, I do not see why option 1 should not make sense. Perhaps Kafka is misused as a messaging system in this case, but I see that happening at quite a few customers. And then you send a command and wait for the ACK; why not?

From my perspective, in both cases having to build a stateful integration is a showstopper for using Zeebe in this use case. Having to store state in order to use a state machine is really weird (and hard to do at scale). But I know that this is an intermediate stage until Zeebe can really do it itself.

But what I still do not understand: what is the problem with providing a completeTask(taskId) method? Is there a technical showstopper? Why do you want to avoid that use case at all costs?

And I think we could (or should) still discuss whether one service task in a BPMN process could serve as both send AND receive in one node. We discuss this every time we have some kind of async communication (messaging, Kafka) with Camunda BPM. But that might be an optimization for later, and it is not that important at the moment.


#10

It strikes me that we’re attempting to fit a square peg in a round hole here… and I agree with Daniel that this use case will work much more effectively once we have events - especially message events - in Zeebe.

I think this argument is very similar to the one about whether it should be possible to implement an activity in a Camunda BPM BPMN diagram using ActivityBehavior. ActivityBehavior is essentially private (it lives in an impl package) because there’s a better way to accomplish this in Camunda BPM: message events.

For now, in demos, it’s possible to hold a lock for a very long time and thus implement this functionality, but it isn’t perfect. Then again, demos don’t have to be perfect. :blush:

I do agree that we should be able to complete a task using a task id; that just makes sense.

Big picture, I think we need to let Zeebe evolve a bit more and wait until it has functionality such as message events before we spend a lot of time discussing the API.

-Ryan


#11

I talked with Sebastian on Skype earlier, and he explained to me in more detail why we cannot complete a task based on its task id at the moment.

When we complete a task, the event that we hand in is written to the log directly. That means it has to contain all relevant information. We do not look up the event or its attributes, as this would require reading back the event log, which is not very performant.
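A simplified model of that trade-off, assuming an append-only list of task records (TaskRecord, AppendOnlyLog, and the state names are hypothetical; real events carry more fields): completing with the full event is a pure append, while completing by key has to read the log back to find the latest record for that key.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical task record; real task events carry more fields. */
final class TaskRecord {
    final long key;
    final String state;
    final String type;
    final String lockOwner;
    final int retries;

    TaskRecord(long key, String state, String type, String lockOwner, int retries) {
        this.key = key;
        this.state = state;
        this.type = type;
        this.lockOwner = lockOwner;
        this.retries = retries;
    }

    /** Copies every field, changing only the state. */
    TaskRecord withState(String newState) {
        return new TaskRecord(key, newState, type, lockOwner, retries);
    }
}

/** Simplified append-only log illustrating the two ways to complete a task. */
final class AppendOnlyLog {
    private final List<TaskRecord> records = new ArrayList<>();
    int recordsRead; // counts reads so the cost of each approach is visible

    void append(TaskRecord record) {
        records.add(record);
    }

    /** Client hands in the full event: written without reading anything back. */
    void complete(TaskRecord lockedEvent) {
        append(lockedEvent.withState("COMPLETED"));
    }

    /** Completing by key alone: scan backwards for the latest record of that key. */
    boolean completeByKey(long key) {
        for (int i = records.size() - 1; i >= 0; i--) {
            recordsRead++;
            TaskRecord record = records.get(i);
            if (record.key == key) {
                if (!"LOCKED".equals(record.state)) {
                    return false; // already completed, or not yet locked
                }
                append(record.withState("COMPLETED"));
                return true;
            }
        }
        return false;
    }

    TaskRecord last() {
        return records.get(records.size() - 1);
    }
}
```

An index from key to latest record would avoid the scan, at the cost of maintaining extra state alongside the log.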

We will have to tackle questions like this when adding features such as message correlation. Let’s wait for that; maybe it can also help with complete(taskId), as this is basically just another correlation :slight_smile:


#12

Having the client store the event locally or find it in the topic is a bit like CQRS. For convenience, this kind of functionality should be integrated into the clients provided by Camunda. Ultimately, what users work with is the client API; what happens internally stays hidden, just as Camunda BPM hides RuntimeServiceImpl behind RuntimeService.

If one decides to always look up the event like Philipp showed, does that come with a performance penalty?