Questions on GoLang client - especially payload handling

Hey guys.

I am currently building an example using the Go client for Zeebe and am struggling a bit. Here is a list of my current questions and some feedback:

1. Refactoring?

Is a refactoring planned, as with the Java client? Or does the “go way” of doing things simply feel a bit weird for an old Java guy?

2. Payload Handling

I do not understand the payload handling in the Go client. The payload is simple JSON, but it seems I cannot easily access it as JSON. The examples use a map only:

    payload := make(map[string]interface{})
    payload["orderId"] = "31243"

    instance := zbc.NewWorkflowInstance("payment", -1, payload)

or

    subscription, err := zbClient.JobSubscription("default-topic", "SomeWorker", "charge-credit-card", 1000, 32,
        func(client zbsubscribe.ZeebeAPI, event *zbsubscriptions.SubscriptionEvent) {
            job, err := event.GetJob()
            if err != nil {
                panic(err)
            }
            payload, err := job.GetPayload()

In the latter case I worked around this limitation by unpacking the payload myself:

        msgpack.Unmarshal(job.Payload, &payloadJson)

But is that really the intention?

And because of all the “if err != nil” checks, it might be a huge benefit to provide some shortcut methods like job.getPayloadAsJson() or the like (I am not looking at consistency at this point, just expressing my idea).

If I combine this with additional code to work around the payload/map limitation, I end up with something nobody can read (and I am actually working on an example of how easy it is ;-)).

3. Empty error messages

This code:

    payload := "{somevalidJson}"
    instance := zbc.NewWorkflowInstance("payment", -1, payload)
    msg, err := zbClient.CreateWorkflowInstance("default-topic", instance)

    if err != nil {
        fmt.Fprintf(w, "Bam: %s", err)
    }

This results in an error, but without any error message, so it is really hard to tell what is wrong. It seems to be the payload, since the code above works, but I can only guess.

4. Get Started / Example

The Get Started Guide is really a bit too short, and what is missing is a running example somewhere. This might be part of https://github.com/zeebe-io/zbc-go/issues/185 - a vital one to get Go people on board.

That’s everything from me so far.

Thanks in advance and cheers
Bernd

Hey Bernd,

Thank you very much for this extensive feedback. I will try to answer your questions from my perspective.

  1. Yes, we have a major overhaul coming soon which should improve things on several fronts. What do you mean by “a bit weird”? Go in many ways follows the “old school” Unix philosophy, which is quite different from the Java world, so yes, things may seem a bit different.

  2. I’m not sure if I fully understand your question here. What do you mean by “access it as JSON”? JSON itself is not a data structure with an access pattern but a serialization format (a series of bytes that represents data), and as such you can marshal or unmarshal it to different data structures: maps, slices or user-defined data types (which would be the preferred way of building production-grade services). I’ll try to elaborate on what I mean later on. I’m well aware that the examples and tests around payload handling are a bit primitive for non-Go people, so I will try to explain how I would build a service with some job workers.

  3. & 4. We are aware of those things, as you might have already seen on our issue tracker. Due to the new direction and the prioritization of the things we are currently working on, I can’t make any promises about when those tickets will be resolved.

In the examples you provided you get the payload as a map, more exactly a map with the signature map[string]interface{}. This means that the keys of this map are strings and the values are interface{}. Every type in Go implements at least one interface, namely the empty interface interface{}, so the values of the map can be of any type. If we think about it (and check the JSON grammar in the spec), we can conclude that map[string]interface{} is a superset of what a JSON object can express, so representing a JSON tree with it is perfectly valid.
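
To make the map representation concrete, here is a minimal sketch that uses only the standard library's encoding/json (not the Zeebe client or msgpack). The helper decodeToMap and the sample document are made up for illustration. It shows that a JSON object fits into map[string]interface{}, and that each value needs a type assertion before it can be used.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeToMap is a hypothetical helper: it unmarshals a JSON object into
// the generic map type discussed above.
func decodeToMap(raw string) (map[string]interface{}, error) {
	payload := make(map[string]interface{})
	if err := json.Unmarshal([]byte(raw), &payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	payload, err := decodeToMap(`{"orderId":"31243","amount":12.5,"items":["a","b"]}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}

	// Every value is an interface{}; assert the concrete type before use.
	orderID, ok := payload["orderId"].(string)
	fmt.Println(orderID, ok)

	// encoding/json decodes JSON numbers into float64 by default.
	amount, ok := payload["amount"].(float64)
	fmt.Println(amount, ok)

	// In the two-value form a wrong assertion does not panic; ok is false.
	_, ok = payload["amount"].(int)
	fmt.Println(ok)
}
```

The two-value form of the assertion is what keeps a worker from panicking when a field has an unexpected type.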

Therefore (if I understood you correctly), the following would be equivalent:

  payload := job.GetPayload() 
  payload := job.GetPayloadAsJSON()

Now, the question is: if it’s there, should we use it in production? My answer is definitely no, because we lose type checking, and since the payload is an object coming from I/O, we should always keep type checks.
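
As a rough illustration of what typed decoding buys you, the sketch below decodes into a struct and gets an error when a field has the wrong type. It uses encoding/json as a stand-in for the msgpack decoding the client does, and OrderPayload and decodeOrder are hypothetical names, not part of the client.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderPayload is a hypothetical typed payload, used only for illustration.
type OrderPayload struct {
	OrderID string  `json:"orderId"`
	Amount  float64 `json:"amount"`
}

// decodeOrder returns an error if a field does not have the declared type.
func decodeOrder(raw []byte) (OrderPayload, error) {
	var p OrderPayload
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	good := []byte(`{"orderId":"31243","amount":12.5}`)
	bad := []byte(`{"orderId":"31243","amount":"twelve"}`)

	p, err := decodeOrder(good)
	fmt.Printf("%+v %v\n", p, err)

	// The string "twelve" cannot be decoded into a float64 field, so the
	// mismatch surfaces as an error at the I/O boundary.
	_, err = decodeOrder(bad)
	fmt.Println(err != nil)
}
```

With a map, the same bad payload would decode without complaint and the problem would only show up later, at the first type assertion.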

I wrote a small example of how I think a production-grade service should be built. The example can be found here. Let me try to explain it. We have the following process:

First, we have to create a client and connect to Zeebe. In the example, I use a special feature of Go: init functions. These are special package-level functions which run after the package’s variables have been initialized and before main, and they are used for variable initialization, checking/fixing the program’s state, registering, running one-time computations, etc.
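
For readers new to Go, here is a small standalone sketch of that ordering, independent of Zeebe. The names startupLog, greeting and buildGreeting are made up for illustration.

```go
package main

import "fmt"

// startupLog records the order in which the initialization steps run.
var startupLog []string

// greeting is a package-level variable; its initializer runs before init.
var greeting = buildGreeting()

func buildGreeting() string {
	startupLog = append(startupLog, "package variable")
	return "hello"
}

// init runs after package-level variables are set and before main.
func init() {
	startupLog = append(startupLog, "init")
}

func main() {
	startupLog = append(startupLog, "main")
	fmt.Println(greeting, startupLog)
}
```

This is why the client created in init is already available when main starts its subscriptions.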

var zbClient *zbc.Client

func init() {
   client, err := zbc.NewClient(Addr)
   if err != nil { log.Fatal(err) }
   zbClient = client
}

Now, once we have our client we can start our program resources:


func main() {
   _, err := zbClient.CreateWorkflowFromFile(Topic, zbcommon.BpmnXml, ProcessDefinitionPath)
   if err != nil { log.Fatal(err) }

   fooSub, err := zbClient.JobSubscription(Topic, "fooWorker", "foo", 30000, 30, new(FooJobHandler).Handle)
   if err != nil { log.Fatal(err) }
   fooSub.StartAsync()

   barSub, err := zbClient.JobSubscription(Topic, "fooWorker", "bar", 30000, 30, new(BarJobHandler).Handle)
   if err != nil { log.Fatal(err) }
   barSub.StartAsync()

   foobarSub, err := zbClient.JobSubscription(Topic, "fooWorker", "foobar", 30000, 30, new(FooBarJobHandler).Handle)
   if err != nil { log.Fatal(err) }
   foobarSub.StartAsync()

   http.HandleFunc("/create/instance", createInstance)
   log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", ServerInterface, ServerPort), nil))
}

We have deployed our workflow and started all the workers that our process definition requires. You may notice that I’m using an approach similar to the one in Java: we define our worker types and implement a Handle function which executes every job for a given task. You can use this or just plain functions. After that we start an HTTP server with one GET endpoint, /create/instance.

Now, if we look into the process definition you can see that there we have some ioMapping records for our first foo task. Those mappings are the following:

        <zeebe:ioMapping>
          <zeebe:input source="$.a" target="$.foo" />
          <zeebe:input source="$.name" target="$.name" />
        </zeebe:ioMapping>

This is important to keep in mind during payload handling. Now let’s create our workflow instance with a payload, but first let’s look at how we define the payload so that we preserve type checking.

type CreateWorkflowPayload struct {
   ID   int    `msgpack:"a"`
   Name string `msgpack:"name"`
}

and the payload for the foo job worker:

type FooJobPayload struct {
   ID    int    `msgpack:"foo"`
   Name  string `msgpack:"name"`
   Value int    `msgpack:"value"`
}

If we look into the implementation of a handler for that endpoint it looks like this:

func createInstance(w http.ResponseWriter, r *http.Request) {
   name := r.URL.Query().Get("name")
   
   /// ... more code in example ...
   
   payload := &CreateWorkflowPayload{ID: 1, Name: name}
   instance := zbc.NewWorkflowInstance("demoProcess", zbc.LatestVersion, payload)

   _, err := zbClient.CreateWorkflowInstance(Topic, instance)
   if err != nil {
      log.Printf("[http] InternalServerError %d - /create/instance?name=%s - %s\n", http.StatusInternalServerError, name, err)
      http.Error(w, err.Error(), http.StatusInternalServerError)
      return
   }

    /// ... more code in example ...
}

Now, if we send a GET request to our HTTP server, the handler will create a new payload with ID=1 and Name=<GETQueryParameter>. Example:

curl http://localhost:4567/create/instance\?name\=myWorkflowInstance

If we look into our server log, we can see something like this:

Great! Let’s look into how these prints are achieved. To implement a Zeebe job handler, we need a function with the following signature:

type FooJobHandler struct {
   // NOTE: atomic values which this handler might use
}

func (handler *FooJobHandler) Handle(client zbsubscribe.ZeebeAPI, event *zbsubscriptions.SubscriptionEvent) {
   var payload FooJobPayload
   event.LoadPayload(&payload)

   log.Printf("[worker::foo] %+v\n", payload)

   payload.Name = "fooWasHere"
   payload.Value += 1

   event.UpdatePayload(&payload)
   client.CompleteJob(event)
}

We initially created our payload as a CreateWorkflowPayload object and now we are reading it as a FooJobPayload object - wat? The reason for this is ioMapping! A couple of things to note here:

  • Technically we can use the same object if there is no ioMapping
  • The msgpack:"..." tags on the payload objects have to match the ioMapping from the process definition
  • Handling the payload boils down to two actions: LoadPayload from the event into the user-defined object, and UpdatePayload on the event before sending it to the broker
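
The interplay between the tags and the ioMapping can be sketched without a broker. The example below is only a simulation under stated assumptions: it uses encoding/json tags instead of msgpack tags, and applyInputMapping is a hypothetical stand-in for what the broker does with the two input mappings shown earlier ($.a to $.foo, $.name to $.name).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same shapes as the payload types above, with json tags instead of
// msgpack tags so the sketch runs with the standard library only.
type CreateWorkflowPayload struct {
	ID   int    `json:"a"`
	Name string `json:"name"`
}

type FooJobPayload struct {
	ID    int    `json:"foo"`
	Name  string `json:"name"`
	Value int    `json:"value"`
}

// applyInputMapping is a hypothetical stand-in for the broker's ioMapping:
// it copies each source key to its target key.
func applyInputMapping(in map[string]interface{}, mapping map[string]string) map[string]interface{} {
	out := make(map[string]interface{})
	for source, target := range mapping {
		if v, ok := in[source]; ok {
			out[target] = v
		}
	}
	return out
}

// roundTrip encodes the start payload, applies the mapping, and decodes the
// result into the worker's payload type.
func roundTrip(start CreateWorkflowPayload) (FooJobPayload, error) {
	var job FooJobPayload
	raw, err := json.Marshal(start)
	if err != nil {
		return job, err
	}
	generic := make(map[string]interface{})
	if err := json.Unmarshal(raw, &generic); err != nil {
		return job, err
	}
	mapping := map[string]string{"a": "foo", "name": "name"}
	mapped, err := json.Marshal(applyInputMapping(generic, mapping))
	if err != nil {
		return job, err
	}
	err = json.Unmarshal(mapped, &job)
	return job, err
}

func main() {
	job, err := roundTrip(CreateWorkflowPayload{ID: 1, Name: "myWorkflowInstance"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("%+v\n", job)
}
```

If the tag on FooJobPayload.ID were "a" instead of "foo", the field would silently stay at its zero value after the mapping, which is the mismatch the second bullet warns about.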

Why do we have both? Each approach has pros and cons, and they would generally be used in different stages of development. The typed way of dealing with the payload is safer, more readable and more maintainable, but puts a bit more effort on the development team because they have to define ahead of time what the payload will look like. The other way, map[string]interface{} / JSON, is less safe and more cumbersome but puts fewer constraints on the development team, so it’s useful in the prototyping stage since it will always give you all the data (without type-assertion) that arrives at your worker.

You can find this example on the develop branch, so don’t forget to check out that branch in your GOPATH before you try this out. I hope I made some things a bit clearer. If not, feel free to talk to me on Slack.

p.s. When in doubt, always type it and never panic. :wink:

Hi Sam.

Thanks a lot for this example - that really helps and should be posted somewhere accessible - best with parts of the description of this post as a readme :slight_smile:

For the payload: Why not have JSON as a String? I actually do something pretty similar to what you do, but in the HTTP handler I would like to pass the JSON as I get it right into the workflow instance. No conversion, no data type checking - just pass the JSON payload as it is. How can I do this?
Having a simple query parameter is a rather rare case, to be honest :wink:

Thanks again
Cheers
Bernd

Hey Bernd,

The easiest way to do this would be:

    payload := make(map[string]interface{})
    if err := json.Unmarshal([]byte(`{"a":"Unicorn","name":"Pickle"}`), &payload); err != nil {
        log.Fatal(err)
    }
    instance := zbc.NewWorkflowInstance("demoProcess", zbc.LatestVersion, payload)

Thanks!
Another question: How can I adjust the number of retries when indicating that a job has failed?

In Java:
client.newFailCommand(job).retries(job.getRetries()-1).send();

In go?

Is there a good place where I can search for these API methods? I haven’t found any parameter to achieve this while looking through the sources…

Thanks again
Bernd

I withdraw the last question, as retries are decremented by default, which is fine for me at the moment.

I wondered how I can manually increase retries (e.g. as an operator) like in Java
client.topicClient().jobClient().newUpdateRetriesCommand(event) - but I don’t need that right away, so I can very well wait for a revamped GoLang client.

Just FYI - working with this code for the moment: https://github.com/flowing/flowing-retail/blob/master/rest/go/payment-zeebe/v3/main.go