Event-based automations

Hi there,

I would like to leverage event-based communication to trigger automations.
I know Corteza has a message bus … but the docs are not enough to properly set it up.

so how would I set it up and use it with automations?
also, if I want to extend it to an external message bus, like Kafka for example … is that possible?

thanks

Hi @munawir,

we currently do not have a queue consumer that would talk directly to Kafka.

But! We do run the messagebus system in production in an event orchestration architecture in many different scenarios.

We do not have experience with Kafka (yet!), but I quickly googled for an alternative and there is an HTTP REST proxy for Kafka (Confluent REST Proxy API Reference | Confluent Documentation), so I’m basing this example on that premise.
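For reference, a produce call through the Confluent REST Proxy is just an HTTP POST. Here is a minimal sketch in Go (not part of the original example); the proxy address localhost:8082 and the topic corteza_events are placeholders you would replace with your own:

// Minimal sketch: produce a message to Kafka through the Confluent REST Proxy.
// Assumptions (not from the thread): proxy on localhost:8082, topic "corteza_events".
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// REST Proxy v2 expects the records wrapped in a "records" array.
	payload := []byte(`{"records":[{"value":{"key":"foo","value":"bar"}}]}`)

	req, err := http.NewRequest(
		"POST",
		"http://localhost:8082/topics/corteza_events", // placeholder proxy + topic
		bytes.NewReader(payload),
	)
	if err != nil {
		panic(err)
	}
	// content type is part of the REST Proxy v2 contract for JSON-encoded values
	req.Header.Set("Content-Type", "application/vnd.kafka.json.v2+json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("proxy responded with", resp.Status)
}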

This is an example scenario you could use in that direction:

  1. Create a queue which we will read from
  2. Write a custom payload to this queue
  3. Read from queue and parse the payload to our “Any” expression
  4. Create an HTTP request to an arbitrary Kafka REST API server
  5. Create a mock API endpoint in Integration Gateway
  6. Preview the sent data in Integration Gateway Profiler

Corteza subsystems used in this example:

Workflows:
queue_wf.zip (2.0 KB)

Create a queue

Write to queue

This is a workflow we will use just to manually trigger the write to queue.

Note the payload used:

toJSON([
  {"key":"foo", "value": "bar"},
  {"key":"bar", "value": "baz"}
])
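For reference, that expression evaluates to a JSON string along the lines of:

[{"key":"foo","value":"bar"},{"key":"bar","value":"baz"}]

and that string is what ends up as the queue message payload.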

Read from queue

Here we parse the queue payload and create a custom HTTP request to Kafka (in our example an endpoint we will create for this purpose only).

Create a mock Kafka endpoint

All you need to do now is trigger the Queue Write workflow, and a request will be sent to our local server on the /api/gateway/example_kafka endpoint.

The resulting request can be seen in the Integration Gateway profiler (if you have it enabled; see more at Route Profiler :: Corteza Docs).
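If you just want to sanity-check that the mock endpoint and the profiler are wired up, you can also hit the endpoint directly, outside of the workflow. A small sketch; the host/port below is a placeholder for your own Corteza server:

// Sketch: manually POST a test payload to the mock Integration Gateway endpoint
// so the request shows up in the Route Profiler. Adjust the host/port to your setup.
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	const endpoint = "http://localhost/api/gateway/example_kafka" // placeholder host

	body := bytes.NewReader([]byte(`[{"key":"foo","value":"bar"}]`))

	resp, err := http.Post(endpoint, "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("gateway responded with", resp.Status)
}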

Preview the request


Hope that helped, and thanks for kicking my butt into making this post; it will serve as a basis for an example in the official docs.


can’t wait to see this in the docs … thanks @peter

so to validate my understanding of the big picture here
the message bus operates inside the Corteza ecosystem, and to push its messages outside (to Kafka, RabbitMQ, etc.) I need to read them and push them using HTTP requests

I have a couple of questions here

  • would pushing each message to an external message bus over HTTP cause a performance issue?
    for example, if I have Module X and the write-to-queue workflow runs on AfterUpdate: if I update 10K records of that module, I get 10K runs of write-to-queue, 10K runs of read-from-queue and 10K HTTP requests that push the messages … so my system would get a total of 40K HTTP requests in roughly the same time frame … hope this is clear.
  • what are the characteristics of the message bus?
    • will the message bus write each message to the DB to keep it until the message is consumed?
    • can I consume a message multiple times? or does it get deleted after it is consumed?
  • lastly, any input on how we can properly monitor each message pushed to and consumed from the bus?

let me know if you need more explanation … thanks

the message bus operates inside the Corteza ecosystem, and to push its messages outside (to Kafka, RabbitMQ, etc.) I need to read them and push them using HTTP requests

yes, I used an HTTP request because we have (so far) no workflow function that natively pushes to Kafka (which would probably be just the same thing, wrapped in a useful helper)

would pushing each message to an external message bus over HTTP cause a performance issue?

not a big performance hit, but slightly bigger than using a native Kafka writer (i.e. a messagebus Kafka consumer)

for example, if I have Module X and the write-to-queue workflow runs on AfterUpdate: if I update 10K records of that module, I get 10K runs of write-to-queue, 10K runs of read-from-queue and 10K HTTP requests that push the messages … so my system would get a total of 40K HTTP requests in roughly the same time frame … hope this is clear.

with 10k record updates you would only get 10k HTTP requests, as every push to queue amounts to 1 read from queue, which means 1 push via HTTP

will the message bus write each message to the DB to keep it until the message is consumed?

queues have an internal in-memory mechanism (not the DB), and yes, they keep listening until the message is consumed
you cannot consume the same message multiple times, and there is no profiler/list tool of any kind so far

The messagebus is a subsystem that keeps growing and will eventually have all these features, but time and priority limitations prevent us from further upgrades (short-term).
If you wish to implement the Kafka messagebus consumer (which wouldn't be that involved a task), you can create a PR.
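For anyone considering that PR: the Kafka-facing half is largely what a client library already provides. A rough sketch of just that half using github.com/segmentio/kafka-go; the broker address and topic are placeholders, and the wiring into Corteza's messagebus consumer interface is intentionally left out:

// Rough sketch of the Kafka-facing side of a hypothetical messagebus consumer:
// take a payload (already read from the Corteza queue) and push it to Kafka.
// Broker address and topic are placeholders; the actual Corteza consumer
// interface is not shown here.
package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker
		Topic: "corteza_events",            // placeholder topic
	}
	defer w.Close()

	// the payload would come from the Corteza queue message
	payload := []byte(`[{"key":"foo","value":"bar"},{"key":"bar","value":"baz"}]`)

	if err := w.WriteMessages(context.Background(), kafka.Message{Value: payload}); err != nil {
		panic(err)
	}
}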

If you have more time-sensitive deadlines, you can sponsor the development of the monitoring tool/Kafka writer, or arrange official support with @mia.arh


valuable input, @peter

so far we have discussed forwarding the message to the external message bus (Kafka) after we write it to the internal message bus (Corteza) … what if we want to connect the Corteza server directly to the external message bus (Kafka) … how would we approach that?

also, reading messages from the external message bus (Kafka), so I can make use of external systems' events in Corteza

the idea here is to make the Corteza server part of my ecosystem, not just a 3rd-party service

a follow-up question here (maybe a more general one, too!)

why did you guys make a custom message bus when there are plenty of well-known open-source solutions out there? is it due to some performance constraints?

For what we needed the internal event bus to be, there was no need to use something more complex than a list we can register things onto
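To illustrate that point (this is not Corteza's actual code, just the idea boiled down to a toy sketch): an in-process event bus can be as small as a map of handler lists.

// Toy illustration of the "list we can register things onto" idea.
// Not Corteza's implementation; just the concept in a few lines.
package main

import "fmt"

type handler func(payload string)

type bus struct {
	handlers map[string][]handler // topic -> registered handlers
}

func newBus() *bus {
	return &bus{handlers: map[string][]handler{}}
}

// Subscribe appends a handler to the topic's list.
func (b *bus) Subscribe(topic string, h handler) {
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish calls every handler registered for the topic.
func (b *bus) Publish(topic, payload string) {
	for _, h := range b.handlers[topic] {
		h(payload)
	}
}

func main() {
	b := newBus()
	b.Subscribe("record.updated", func(p string) { fmt.Println("got:", p) })
	b.Publish("record.updated", `{"key":"foo","value":"bar"}`)
}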


Hi @tjerman, in extension to the message queue workflow example: I tried to extract a specific item from the JSON received in the queue and do further processing, like calling an HTTP request with that value. But I am unable to do so; when I try to turn the value into an expression, it prints as-is (e.g. I wanted to extract the key (foo) from the event info and call an HTTP request).

Any suggestions on this?

Output:

INFO	workflow	event info: {"key":"foo","value":"bar"}
INFO	workflow	target info: event[0]["key"]