Workflow payload maintenance / custom data

Hello zeebe-team,

I've read all the documentation, given your demos a try, and I try to stay up to date, simply to be ready to use Zeebe once it is released :slight_smile:. I'm thinking about Camunda-related situations in our software that we solved in the past and how we would solve them in the context of Zeebe. Doing so, I came across the following:

We’ve implemented our own user task list and even our own process list for a couple of reasons. One main reason is to make user tasks and processes searchable by custom data which is not stored in process variables. This data might be completely different for each process definition or user task, so the information is context-specific. We use Camunda’s event stream to keep our lists up to date and load the custom data during event processing.

In Zeebe there is no historic data available for random access. One could implement an event log subscription which fills a database, as Camunda’s DbHistoryEventHandler does. This would be similar to what we do to build our own process and task lists. So from this perspective our current approach matches Zeebe well.

But to be as scalable as Zeebe is, it is necessary to avoid an RDBMS, as you do. There are several possibilities for doing so (NoSQL, indexes, etc.), but whatever we choose, it leads to two competing data stores which might get out of sync. For processes and tasks, Zeebe’s log is the primary data store, but what about the custom data? I would prefer to have only one primary data store and to be able to recreate any derived copy (for searching) if necessary.

A possible solution would be to set/update the payload of a process instance to store the custom data. This might be in the context of a certain task, of the process instance itself, or of any other scope of the process. But to store current data for an upcoming task, an execution listener would be necessary to set the payload. Otherwise the new payload could only be set on completing the task, which might be far in the future, and any new task would receive outdated data.

But there are also disadvantages for storing custom data in the payload:

  • For data related to certain tasks you would either create a shadow structure in the payload mirroring the process’s structure, or Zeebe would have to introduce a concept of local payloads, similar to local variables in Camunda.
  • A lot of redundant data: I don’t know if this is true, but is it necessary to update the entire payload on completing a task? What about partial updates? What about concurrent updates? Is there an OptimisticLockingException for payload updates from parallel executions?

What I was also thinking about is publishing “custom” events. These events could be used to store custom data in Zeebe as part of the event’s payload. This would reduce redundancy but would require merging those events to get the complete view of the custom data. Merging happens when updating the secondary storage, so this should not be a problem. But I think this is also something Zeebe will have to be able to do once local/partial payloads are possible.

What do you think about this, and what are your thoughts on payload maintenance? Do you have any concepts ready but not yet implemented? It would be nice to start a discussion to get an idea of where the journey is heading.

Cheers,
Stephan

Hi Stephan,

Hey, cool that you found the time to try out Zeebe!

You wrote: “In Zeebe there is no historic data available for random access. One could implement an event log subscription which fills a database, as Camunda’s DbHistoryEventHandler does. This would be similar to what we do to build our own process and task lists. So from this perspective our current approach matches Zeebe well.”

Great, yes, that would be the idea.

To be able to give a better answer to the points that follow, it would help me if you could explain a bit better what the “custom data” consists of. For example, is its lifetime scoped to the lifetime of a process instance?

Best,
Daniel

Here are some examples:

For processes:

  • context information like customer name & ID or product name & ID concerning that process instance. In general: any related information by which the process should be findable (by business users, operators, or the software itself) or which is needed to display it to a clerk/operator/manager.
  • transaction ID: if the process is about automation and the triggering system provides this information.
  • if an incident occurs we usually start a separate process which assists the operator in solving the issue; for those processes we store information about the causing process/task/execution.
  • for hierarchical processes we store the root parent’s ID, because often this is the context the user is interested in.
  • usually we show a “status” which reflects milestones within the process. This helps users get an idea of the current state. This status may also change without any action in the process (e.g. if a service task communicates with another system asynchronously, we also report “sent”, “received”, “delayed”, “retried” and so on, although these technical details are not part of the BPMN model).

For user tasks:

  • similar information as for processes, but related to the user task: any information that enables the clerk to decide whether or not to claim the task without reviewing the details of the task/process.
  • if a user task is aborted, we track the reason (which boundary event caused the cancellation) so users can see why a task disappeared (if they look into historic data; yes, we provide that for the sake of traceability and statistics).

Most of the data we store is loaded/updated on processing Camunda events:

  • a process started/ended
  • a user task is created/delegated/resolved/completed/aborted
  • a throwing intermediate event occurred
  • a catching event occurred

So the lifetime is limited to a user task or a process instance. Storing data beyond this might be interesting to avoid redundancy (e.g. two different process instances for the same customer), but it would introduce more complexity for reconstructing the current state within the lifetime of a process instance (at which log position was the last update to this customer’s data?). But on the other hand: how is that different from reconstructing the status when a process instance’s lifetime is a year?

What we do not do at the moment, but want to do, is to store similar information for multi-instance tasks/sub-processes. Especially if processes are not used for automation but as higher-level processes, those multi-instance executions might be inspected by users exploring the individual iterations. For example, to answer questions like ‘why was this particular multi-instance iteration processed in this way?’ or ‘what was the result of a DMN decision evaluated for a certain multi-instance iteration?’.

All of this could be covered by scoped payloads, analogous to process instance variable scopes. The questions I ask myself are:

  • Should the payload be used for that?
  • If using the process’s payload, I want to avoid influencing regular process instance variables. On the other hand, sometimes that information is already available as a process instance variable anyway.
  • Should there be custom events to avoid mixing up the workflow’s payload and the “custom data payload”?
  • Can those custom events belong to the process instance?
  • In any case, this would be much easier if the payload could be scoped to the current execution of the process instance; at least easier for me, though probably not for Zeebe. Otherwise the payload might grow and grow. Remember: context information for a task which is completed is (often) not required any more.
  • Does a Zeebe workflow always store the entire payload? That can’t be the case: if a task completes by setting the payload, the task would always have to set the entire payload, and the payload loaded on task creation might have changed in the meantime. So there must be some sort of “merge” mechanism in Zeebe, and therefore Zeebe must be able to reconstruct the payload based on previous events. This is the same mechanism that would be needed to fulfil the requirements of my custom data events.

A lot of thoughts. I hope some of them match what you have in mind :slight_smile:

Cheers,
Stephan

Hi Stephan,

I’ll try to answer your questions in the following. However, I am not sure I understand your requirements and use cases 100%. If you have the impression that my answers are not so helpful, then I suggest we discuss one specific case, i.e. one specific piece of information you would like to display, and how it could be handled in Zeebe and connected systems. That would reduce the complexity of the discussion a bit and we may be able to generalize from that.

Regarding “Should the payload be used for that?”: yes, I think you can use the workflow payload to store contextual data.

Regarding keeping contextual data out of the regular process instance variables: I’m not sure I understand this. What is the difference between regular variables and contextual data? Why do they have to be separated?

Regarding custom events to avoid mixing up the workflow payload and the “custom data payload”: same as above, what would be the benefit of this separation? Eventually, we may implement a concept of custom events, but for a different purpose: message correlation and event handling in general, i.e. a custom event would be the trigger of a workflow event.

Regarding whether those custom events can belong to the process instance: this can only be answered once we understand whether such custom events are needed for your purpose.

Regarding scoping the payload to the current execution, and whether a Zeebe workflow always stores the entire payload: there are two important concepts here:

  1. Scoping of payload: In Zeebe’s current feature set, a workflow instance (the root scope of a process) has a payload, and task instances have payloads (as child scopes of the process). The task payload is similar to local variables of a task in Camunda BPM.
  2. Input/output mappings for tasks (and in the future embedded subprocesses, etc.): input mappings allow you to generate a task payload from the parent scope’s payload. Output mappings allow you to update the parent scope’s payload. That way, you can control how long certain parts of the payload should live. This is very similar to input/output mappings in Camunda BPM.
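
To make the mapping idea a bit more tangible, here is a minimal, purely conceptual sketch in plain Java (maps instead of Zeebe’s actual JSON payloads; the method names are illustrative and not part of any Zeebe API):

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch only: illustrates what input/output mappings do,
// not how Zeebe implements or configures them.
public class IoMappingSketch {

    // Input mapping: derive the task payload from the parent scope's payload
    // by copying only the entries the task actually needs.
    static Map<String, Object> applyInputMapping(Map<String, Object> parentPayload,
                                                 String... inputKeys) {
        Map<String, Object> taskPayload = new HashMap<>();
        for (String key : inputKeys) {
            if (parentPayload.containsKey(key)) {
                taskPayload.put(key, parentPayload.get(key));
            }
        }
        return taskPayload;
    }

    // Output mapping: merge selected entries of the completed task's payload
    // back into the parent scope; everything else ends with the task's scope.
    static void applyOutputMapping(Map<String, Object> parentPayload,
                                   Map<String, Object> taskPayload,
                                   String... outputKeys) {
        for (String key : outputKeys) {
            if (taskPayload.containsKey(key)) {
                parentPayload.put(key, taskPayload.get(key));
            }
        }
    }
}
```

That way, context data that is only relevant while a task is active never has to end up in the workflow instance’s root payload.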

A third concept will be required when we implement the BPMN parallel gateway (or any other construct that introduces concurrency in a scope). Then, we will have tasks that are executed concurrently and that may update the parent scope’s payload (via output mappings) concurrently. We haven’t worked out the details of this concept yet, but we are pretty sure it will work in such a way that the individual branches of workflow execution each maintain a copy of the scope payload, and there will be a merging mechanism when the concurrent branches are joined.

Does that help you?

Best regards,
Thorben

Hi Thorben,

The difference I mean is the usage: “process variables” are meant to be accessed from parts of the BPMN model to control the flow of the process (e.g. expressions for conditions, multi-instance collections, etc.). All the other data is context needed by the tasks but not by the process itself (even though it is transported in process variables/payload).
My idea was to separate those “scopes” so they cannot be mixed up unintentionally. When using the payload for both types of data, it is up to the developer to keep this in mind and avoid conflicting changes. I prefer solutions where the developer is supported in such situations, because they are usually a source of hidden bugs (e.g. there is no compiler which tells you that you are overwriting data needed by an expression in the process). But as we talk about it, I see that the process’s payload and the mapping to task payloads can be used for storing additional context data.

Maybe my differentiation is based on the fact that we usually use the same database for storing application data as the Camunda engine uses, so there are separate tables for both kinds of data. If the process engine is remote and there is no shared database (as with Zeebe), then there are two stores, and this is what makes me feel uncomfortable: two storages that might get out of sync. This is the main reason why I would like to store context data in the payload: at least enough to recreate a process list index or a user task list index (or any other list based on process execution information) and everything needed to be able to complete tasks.

If it is OK to use the payload, then execution listeners (or task listeners) are required, and also the ability to update the payload outside of a direct response to a Zeebe action (compare RuntimeService.setVariable*). I’ll argue this in response to your suggestion:

Example 1, about RuntimeService.setVariable* and execution listeners/task listeners:

“Status” information may be updated on task completion, several times within a task (e.g. asynchronous communication), and at certain points of the process (e.g. intermediate throw events). This “status” usually does not match the BPMN’s granularity; it is more of a bird’s-eye view (depending on what kind of process/task it is).

We built a process for microservice orchestration of deprovisioning a contract, and we had two kinds of “status” information: the main status reflected that bird’s-eye perspective (which subsystem is deprovisioning at the moment). The secondary status showed details about the asynchronous communication with this microservice (was the initial request accepted, did the service respond in time, did an operator trigger a retry, etc.), for which we used the RuntimeService.setVariable* methods.
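
Just to illustrate, a rough sketch of how we maintain such a secondary status today (only RuntimeService.setVariable is the real Camunda API; variable names and callback methods are made-up examples, not our actual code):

```java
import org.camunda.bpm.engine.RuntimeService;

// Rough sketch: the secondary "status" is updated outside of task completions,
// e.g. from callbacks of the asynchronous communication with the microservice.
public class DeprovisioningStatusUpdater {

    private final RuntimeService runtimeService;

    public DeprovisioningStatusUpdater(RuntimeService runtimeService) {
        this.runtimeService = runtimeService;
    }

    // Called when the microservice acknowledges the deprovisioning request.
    public void onRequestAccepted(String executionId) {
        runtimeService.setVariable(executionId, "secondaryStatus", "accepted");
    }

    // Called when no response arrived within the expected time window.
    public void onResponseDelayed(String executionId) {
        runtimeService.setVariable(executionId, "secondaryStatus", "delayed");
    }

    // Called when an operator triggers a retry of the request.
    public void onRetryTriggered(String executionId) {
        runtimeService.setVariable(executionId, "secondaryStatus", "retrying");
    }
}
```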

Example 2, about execution listeners/task listeners:

This is a special situation: we want to store information concerning a payload context which is already gone, namely the task’s payload after the task was aborted by a boundary event. When using a database this is not an issue, because I can join that information. I guess it is not possible (even in Zeebe) to change a scoped payload after that scope has been closed. Is there actually a dedicated “close” on scoped payloads?

I hope I could make some of my use cases more concrete.

Cheers,
Stephan

Hi Stephan,

Thanks for the clarification, that helps me understand. Let’s focus on example 1 for now, where we have a status that is some kind of aggregated state of a workflow instance. Contrary to my previous post, I believe that managing such data in an external system is better than doing it in Zeebe.

That is because Zeebe will probably never have as powerful data manipulation and querying capabilities as Camunda BPM has, since Camunda BPM is based on a relational database (good for querying and manipulating arbitrary data) whereas Zeebe is built on the log stream concept (good at sequential processing).

The way you would build this with Zeebe is via a component that connects Zeebe and the secondary system that stores the status data (e.g. Elasticsearch). This connecting component embeds the Zeebe client and opens a subscription to all of Zeebe’s workflow-related events. On every event that is of interest to you, this component would update the status in the secondary system. On workflow instance completion or cancellation, you would remove that state (if required), etc.
The Zeebe client provides at-least-once invocation semantics for the handling of every subscribed event, so you are guaranteed to not miss a single one.
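
To sketch what such a connecting component could look like (the interfaces below are stand-ins for the real Zeebe client subscription API and the secondary store; names and event types are assumptions, not the actual API):

```java
import java.util.Map;
import java.util.function.Consumer;

// Sketch of the connecting component: consume Zeebe's workflow-related events
// and project the ones of interest into a secondary store (e.g. Elasticsearch).
// ZeebeEventStream and StatusIndex are stand-ins, not the actual Zeebe client API.
public class StatusProjector {

    // Stand-in for a topic subscription delivering workflow events in log order.
    interface ZeebeEventStream {
        void onEvent(Consumer<WorkflowEvent> handler);
    }

    // Minimal event shape needed for this sketch.
    static class WorkflowEvent {
        long position;                // position in Zeebe's log
        long workflowInstanceKey;     // which workflow instance the event belongs to
        String type;                  // e.g. "WORKFLOW_INSTANCE_CREATED", "TASK_COMPLETED"
        Map<String, Object> payload;  // event payload, e.g. containing a "status" field
    }

    // Stand-in for the secondary system queried by the process/task lists.
    interface StatusIndex {
        void upsertStatus(long workflowInstanceKey, String status, long eventPosition);
        void remove(long workflowInstanceKey);
    }

    public void connect(ZeebeEventStream stream, StatusIndex index) {
        stream.onEvent(event -> {
            if ("WORKFLOW_INSTANCE_CREATED".equals(event.type)) {
                index.upsertStatus(event.workflowInstanceKey, "started", event.position);
            } else if ("TASK_COMPLETED".equals(event.type)) {
                // Project the piece of the payload that represents the aggregated status.
                Object status = event.payload.get("status");
                index.upsertStatus(event.workflowInstanceKey, String.valueOf(status), event.position);
            } else if ("WORKFLOW_INSTANCE_COMPLETED".equals(event.type)
                    || "WORKFLOW_INSTANCE_CANCELED".equals(event.type)) {
                index.remove(event.workflowInstanceKey);  // drop the state if no longer required
            }
            // All other events are ignored by this projection.
        });
    }
}
```

Because delivery is at-least-once, the index update should be idempotent; storing the event position alongside the status (as above) makes it easy to ignore duplicates.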

Of course, the logic determining which events are of interest must be encoded in this connecting component. Still, the BPMN XML could be used to encode the configuration per process (e.g. you could have an extension property on every element that is relevant to the status).

It is correct that you then have two systems that manage data that is related. I see two cases for inconsistencies:

  1. Zeebe is ahead of the secondary system. For example, a task has already been completed but the secondary system has not recognized this, because it was not notified via the subscription yet.
  2. There is data loss in the secondary system.

For case 1, I don’t think this is a problem for our example. Eventually the state of the secondary system will be up-to-date. In reality, this should be a short amount of time that shouldn’t be noticeable to humans. Also, the secondary system can maintain a timestamp indicating how old the data on a certain workflow instance is, to make that transparent to the user.

For case 2, you will need backups of the secondary system that contain the status data as well as the position of the last event that was received and processed via the Zeebe subscription. Via the Zeebe client, it is possible to “rewind” subscriptions to any previous event. On recovery, you can then rewind the subscription to the position of the last backup and reprocess all later events to recover the latest state. If all backups were lost, you can rewind to the beginning. In the future, we will have a feature that cleans up events in Zeebe after a certain time, when they are no longer required, in order to recover disk space. Then the rewind approach is limited to that window.
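
A minimal sketch of that recovery flow (again with stand-in interfaces; openAtPosition is an assumption standing in for whatever rewind mechanism the client exposes):

```java
// Sketch of the backup/rewind recovery idea. The interfaces are stand-ins,
// not the actual Zeebe client or backup API.
public class SubscriptionRecovery {

    interface StatusIndexBackup {
        // Restores the secondary store and returns the log position of the
        // last Zeebe event that had been applied when the backup was taken.
        long restoreAndGetLastAppliedPosition();
    }

    interface RewindableSubscription {
        // Reopens the subscription at the given log position; 0 means
        // "replay from the beginning of the log".
        void openAtPosition(long position);
    }

    public void recover(StatusIndexBackup backup, RewindableSubscription subscription) {
        long lastApplied = backup.restoreAndGetLastAppliedPosition();

        // Reprocess everything after the backup. Because delivery is
        // at-least-once, the handler must tolerate seeing an event twice.
        subscription.openAtPosition(lastApplied);
    }
}
```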

For the second example you gave, having the data in a secondary system allows you to define how long you keep which data, so it should not be an issue to keep data for longer than the lifetime of a workflow instance.

Does that make sense?

Cheers,
Thorben

Addition: if the status can be updated by non-workflow events (as you write, several times within a task, e.g. asynchronous communication), I can imagine that publishing custom events to a Zeebe topic could make sense. That would allow you to integrate custom sources of workflow-related data into Zeebe’s event stream and to apply the rewind idea in this case as well.

Building such a connecting component that fills Elasticsearch is exactly what I’m trying to do. But as you may know, an Elasticsearch index is considered “unstable” and should not be used as a primary data store. I guess one might never actually see data corruption, but who wants to rely on that? The recommendation is to be able to rebuild the index if needed and to store the data in some other system which is immune to data corruption. At the moment this data store consists of the Camunda history tables and our own tables. But a simple “Zeebe to Camunda history tables” subscriber/converter breaks the scalability of the entire system, because Zeebe’s load would also be imposed on the system filling the tables.

In addition, the features of our software change over time, and sometimes we need to add more data to the index or change the way we store data in the index to build new features. In those situations we also need to rebuild the index.

An advantage of storing the history in another system would be that the data in the secondary system represents a snapshot of the progress in Zeebe; reconstructing this status from the Zeebe log might take more time. But then there are three storages of the same data, just for different purposes (processing = Zeebe, reliable storage = ?, searching = Elasticsearch). All of them have different reliability characteristics, have to stay in sync, and also have to be operated.

Especially in the context of writing common business processing software, I often need Camunda’s history tables to fulfill requirements of our use cases. For this, Elasticsearch would fit perfectly. But filling an Elasticsearch index might be slower than Zeebe handing tasks to the workers. This might lead to repeated rejections of a task (if it is even possible to determine whether the index is in sync with Zeebe) or to tasks processing outdated data.
To avoid those situations it would be better to store everything needed to fulfill the tasks in Zeebe’s payload. Doing so avoids any mismatch. If a current Camunda situation would require querying the history tables to fulfill some functionality, then in Zeebe an execution listener (maybe script-based and running on the Zeebe node) would have to store that information in the payload at the moment the information is generated. So I think it’s better to use Elasticsearch only for non-time-critical situations like user task lists, process lists in UIs, and statistics for data mining.

This is exactly my point.

To sum up: I’m not sure what’s the best way. I always try to reduce the complexity of the overall system, and if it is (reasonably) easy, I would prefer to avoid another storage. On the other hand, I don’t know whether storing everything in the payload covers all situations in which I currently use the history or my own tables. I think I will have to review all the processes I built in the past and see whether it would be possible.

Cheers,
Stephan