Version: 9 - Germknödel

Flows

Flows contain your automation logic in the form of flow scripts. With them, you define what you want to automate and how to do so. This makes them your most important resource in Engine.

The flow script is written in Python. Executions of flows will process the handler function of the flow script.

Depending on the complexity of your automated process, it can consist of one or many flows. Executions of flows can start executions of other flows and pass inputs and outputs between each other.

important

You don't need to know Python to use Engine. Engine provides many features that make it possible to run and monitor flows without coding experience (e.g. Dependency Visualisation, Script Visualisation, Execution Live Monitor).

Creating or modifying flows, on the other hand, requires some knowledge of Python.

tip

Small flows with a well-defined scope can be reused by many automated processes. We recommend breaking automated processes down into several flows which automate specific tasks. In addition to simplifying reuse, this also makes it easier to maintain and extend your automated processes.

Working with Flows

You can create flows in the User Interface by pressing the "Create" button and selecting "Flow":

The buttons to create a flow

The new flow is opened and you can directly modify the script.

The flow script editor provided in the Engine user interface comes with built-in Linting and supports Breakpoints and Step Through. If you prefer, you can also use an offline editor of your choice. You can synchronise your offline work with Engine using the Git Integration.

Engine does not store older versions of your flows when making changes. Make sure to have a backup of your flows if you make significant changes. Or even better: use the Git Integration.

Clicking the "Run" button in the flow view will create an execution of the flow and automatically redirect you to the execution view. Read more about executing flows in the documentation on Development and Productive Mode.

Interaction between Flows

Parent-child relationships

  • Executions of flows can create executions of other flows.
  • When an execution creates another execution, a parent-child relation is established between them.
  • Each execution is represented in a tree structure, where it can have only one parent and may have multiple children.
  • Executions without a parent form the trunk of the tree and appear as top-level executions in the Execution Live Monitor.
important

Parent-child relationships are independent of dependencies. In practice, however, dependencies often align with parent-child relationships.

Executions of flows can create executions of other flows.

example

Create a child execution.

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    my_flow = this.flow('my_other_flow_name')
    return this.success('all done')

When an execution creates another execution, a parent-child relation is established between them:

The parent execution lists all children in the "Children" tab

The child execution has a button to navigate to the parent

These relationships are primarily used to track logical and hierarchical connections between executions, aiding in monitoring, debugging, and traceability across flows.
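
The tree structure described above can be modelled in a few lines of plain Python. This is only an illustrative sketch and not Engine's data model: the `ExecutionNode` class, its fields, and the execution names are hypothetical and are not part of `flow_api`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass(eq=False)
class ExecutionNode:
    """Hypothetical stand-in for an execution; not part of flow_api."""
    name: str
    parent: Optional[ExecutionNode] = None
    children: list[ExecutionNode] = field(default_factory=list)

    def create_child(self, name: str) -> ExecutionNode:
        # Creating an execution from another one establishes the relation.
        child = ExecutionNode(name=name, parent=self)
        self.children.append(child)
        return child

    def top_level(self) -> ExecutionNode:
        # Walk up until an execution without a parent is reached.
        node = self
        while node.parent is not None:
            node = node.parent
        return node


main = ExecutionNode("main-process")
calculator = main.create_child("calculator")
messenger = main.create_child("messenger")
helper = calculator.create_child("helper")

print([child.name for child in main.children])
print(helper.top_level().name)
```

Each node stores exactly one parent reference and a list of children, which mirrors the "one parent, many children" rule. The node without a parent corresponds to a top-level execution.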

Dependencies

  • Executions of flows can be dependent on other executions.
  • When an execution waits for another execution, a dependency relation is established between them.
  • Dependencies are represented as a directed graph, where each execution can depend on multiple others and, in turn, can be a dependency for multiple executions.
  • Dependencies are shown in the dependency visualization in each execution (see more here).
important

The dependency visualization is designed to simplify understanding execution order and coordination.

While it can include parent-child relationships, it is not a full representation of parent-child hierarchies.

It currently only shows the dependencies of the currently opened execution and (if it exists) one level up of dependee executions (see more here).

Take a look at the flow scripts below to learn how to create a dependency relation between executions in different ways.

example

A dependency relation between executions can be created implicitly:

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    child_execution = this.flow(
        'another-flow',
        run=True  # Main execution has to wait for this child execution
    )
    return this.success('All done.')

Alternatively, the child execution can be created first and waited for later:

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    child_execution = this.flow(
        'another-flow',
        run=False  # Main execution does not wait for the child execution (not a dependency)
    )
    child_execution.run(wait=True)  # Wait for the child execution later on, creates the dependency
    return this.success('All done.')

A dependency relation between executions can be created explicitly:

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    child_execution = this.flow(
        'another-flow',
        run=False  # Main execution does not wait for the child execution (not a dependency)
    )
    this.dependency(child_execution)  # Dependency is later defined explicitly
    return this.success('All done.')

To add dependencies to executions started by other executions:

In this example, the current execution waits for the other executions in the loop one by one. It starts with the first execution found. If that one fails, the current execution also fails immediately. If the first execution succeeds, it moves on to wait for the next one, and so on until all are processed.

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # find an execution
    for execution in system.executions(filter_=...):
        this.dependency(execution)  # execution could be created by some other flow
    return this.success('All done.')

It is also possible to wait for all executions at once:

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    this.dependency(*system.executions(filter_=...))
    return this.success('All done.')

The table below provides a summary of key conceptual differences between parent-child relationships and dependencies.

| Aspect | Parent-Child | Dependency |
| --- | --- | --- |
| Establishment | Established when an execution (parent) creates a new execution (child) | Established when an execution waits for one or more other executions |
| Purpose | Tracks creation/origin of executions | Defines execution order and coordination |
| Structure | Tree (1 parent, many children) | Directed graph (many-to-many) |
| Directionality | One-way (creator → created) | One-way (dependent → dependee) |
| Multiplicity | One parent, multiple children | Multiple dependencies and dependees |
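
The many-to-many structure of dependencies can be sketched as a small directed graph in plain Python. This is a hypothetical model for illustration only: the `dependencies` mapping, both helper functions, and the execution names are assumptions and do not reflect how Engine stores dependencies.

```python
from __future__ import annotations

from collections import defaultdict

# Hypothetical model: maps each execution name to the executions it waits for.
dependencies = defaultdict(set)


def add_dependency(dependent: str, dependee: str) -> None:
    """Record that `dependent` waits for `dependee`."""
    dependencies[dependent].add(dependee)


def dependents_of(dependee: str) -> set[str]:
    """Return every execution that waits for `dependee`."""
    return {
        name for name, waits_for in dependencies.items() if dependee in waits_for
    }


# One execution can wait for several others ...
add_dependency("report", "fetch-data")
add_dependency("report", "load-config")
# ... and one execution can be waited for by several others.
add_dependency("cleanup", "fetch-data")

print(sorted(dependencies["report"]))
print(sorted(dependents_of("fetch-data")))
```

Unlike the parent-child tree, nothing here restricts an execution to a single incoming or outgoing edge, which is the difference the "Structure" and "Multiplicity" rows describe.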

Validating flow existence

Referencing a flow does not validate its existence. Even if a flow with the provided name or ID does not exist, the Flow API will not throw an error.

important

For example:

this.log(system.flow('some-name')) 

This will log <flow some-name>, even if the flow 'some-name' does not exist or if 'some-name' does not refer to an actual flow.

Therefore, it is necessary to explicitly validate the existence of a flow before performing updates or modifications through the Flow API.

example

How to access and validate if the flow actually exists

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    flow = system.flow('example_flow_name')
    if flow.exists():
        this.log("flow exists!")
        return this.success('all done')
    else:
        return this.error("flow does not exist!")
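
When a flow script references several flows, the same check can be wrapped in a small helper. The sketch below is an assumption-laden example: `require_flows` is a hypothetical function, not part of the Flow API, and it relies only on `system.flow()` and `exists()` as used in the example above.

```python
def require_flows(system, *names):
    """Return flow references for `names`, raising if any flow is missing.

    Hypothetical helper: it only calls `system.flow(name)` and `exists()`.
    """
    flows = {name: system.flow(name) for name in names}
    missing = [name for name, flow in flows.items() if not flow.exists()]
    if missing:
        raise LookupError(f"flows do not exist: {', '.join(missing)}")
    return flows
```

Inside a handler, this could be called as `flows = require_flows(system, 'Calculator', 'Messenger')` before any of the flows is updated or started.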

Passing values

Interactions between flows can be leveraged to build large and complex processes from small and maintainable elements.

Let’s look at how you can break a process into parts. Note that we pass values between flows multiple times, and one flow’s result influences what the other flow does.

example 1: passing values

Flow 1 - Main Process

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    initial_value = 42

    new_value = this.flow(
        'Calculator',
        name='calculate new value',
        initial_value=initial_value,  # passing initial_value to the flow 'Calculator'
    ).get('output_value')['result']  # getting 'result' field of the output_value of the flow 'Calculator'

    this.flow(
        'Messenger',
        name='create message with result',
        message_value=new_value,  # passing new_value to the flow 'Messenger'
    )
    return this.success('all done')

Flow 2 - Calculator

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # getting initial_value from the 'initial_value' field of the input_value
    initial_value = inputs['initial_value']

    # doing some (not very) complex calculations
    result = initial_value * 1

    # saving result into the 'result' field of the output_value
    this.set_output(result=result)

    return this.success('all done')

Flow 3 - Messenger

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # getting message_value from the 'message_value' field of the input_value
    message_value = inputs['message_value']

    # creating message
    this.message(
        subject='Message',
        message=f' The value is: {message_value}',
    )

    return this.success('all done')

In the previous example we passed numbers as values. It is also possible to pass Cloudomation Resources.

example 2: passing Cloudomation resources

Flow 1 - Main Process

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # get a Cloudomation resource
    setting = system.setting('my-setting')

    # pass the Cloudomation resource to the flow
    this.flow(
        'Setting Writer',
        name='write setting',
        setting=setting,
    )
    return this.success('all done')

Flow 2 - Setting Writer

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # get the passed Cloudomation resource
    setting = inputs['setting']

    # write the setting
    setting.save(value='my value')

    return this.success('all done')

note

These examples are, of course, very simple, and it might be easier to implement all functionality within a single flow. Breaking processes into smaller flows becomes increasingly relevant as complexity grows and when considering future scalability.

Handling Errors

If a child execution fails, it will raise an exception, causing the parent execution to fail as well. To avoid the parent failing, you can catch the exception:

example

Create a child execution and handle errors.

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    try:
        my_flow = this.flow('my_other_flow_name')
    except flow_api.DependencyFailedError:
        this.log('my_other_flow_name failed')
    else:
        this.log('my_other_flow_name succeeded')
    return this.success('all done')

Please refer to Exceptions for details about error handling.

Immediate execution control

The run parameter controls whether the child execution is started immediately.

  • True: The flow runs immediately.
  • False: The flow execution is created but not started.

The example below demonstrates this behavior. The script creates a child execution of the flow another-flow. Because the run parameter is set to False, the execution status is initially "Paused" until the execution is started explicitly, here with child_execution.run(wait=True).

example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # Create child execution but do not run it yet
    child_execution = this.flow(
        'another-flow',
        run=False  # Execution is not started immediately
    )
    # Start execution later
    child_execution.run(wait=True)
    this.log('Flow executed after manual start.')
    return this.success('Manual execution started successfully.')

Synchronous execution

Synchronous execution ensures that the script waits for the child execution to complete before proceeding. In the example below, since the parent execution waits for the child execution to finish, it is possible to determine whether the child execution completed successfully.

To control whether the interaction between flows is synchronous or asynchronous, use the wait parameter of the run() method.

This parameter is passed when starting a child execution (e.g., run(wait=False)) and it determines whether the parent execution will wait immediately for a child execution to complete after starting it.

If wait=True the parent will pause and wait for the specific child to finish executing before continuing with the next line of code in the parent. This creates a dependency between them and causes the parent execution to behave synchronously with the child.

example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    child_execution = this.flow(
        'sync_flow',
        wait=True  # Ensures that the parent waits for the child execution to finish
    )
    if child_execution.get('status') != 'ENDED_SUCCESS':
        return this.error('Child flow execution failed!')

    this.log('Synchronous execution completed successfully!')
    return this.success('Flow executed synchronously')

Asynchronous execution

Asynchronous execution enables the parent execution to continue without waiting for the child execution to complete. This can be done via the wait=False parameter.

If wait=False the parent will not wait for the child execution to complete immediately. It allows the child to run asynchronously, while the parent execution continues from the subsequent lines of code.

example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    async_execution = this.flow(
        'async_flow',
        wait=False  # Proceed without waiting for the child execution
    )
    this.log('Async flow triggered.')
    return this.success('Flow executed asynchronously')

Wait for multiple executions to end

When triggering multiple asynchronous executions, you can wait for all of them to complete by using the return_when parameter with the wait_for() method on the execution.

Unlike the wait parameter, return_when gives you control after multiple asynchronous flows have been started.

This provides more flexibility in managing and customizing the waiting behavior for multiple concurrent executions. return_when only matters if you have already started child flows asynchronously (with wait=False) and want to control further execution based on their status.

example

wait=False starts both child flows asynchronously without waiting for them to finish. this.wait_for(..., return_when='ALL_ENDED') controls when the parent resumes execution. In this case, the parent waits until both flows finish.

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    async_child_1 = this.flow('flow_1', wait=False)
    async_child_2 = this.flow('sync_flow', wait=False)

    this.wait_for(
        async_child_1,
        async_child_2,
        return_when='ALL_ENDED'  # Options: ALL_ENDED, ALL_SUCCEEDED, FIRST_ENDED
    )
    this.log('All children executions have completed.')
    return this.success('Synchronized all async flows successfully')
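
For readers who know Python's standard library, the idea behind return_when is similar to `concurrent.futures.wait`. The sketch below is an analogy only and does not use the Flow API: the `task` function and its timings are made up, and the mapping of `FIRST_COMPLETED` to FIRST_ENDED and `ALL_COMPLETED` to ALL_ENDED is an assumption about comparable behavior, not a statement about how Engine is implemented.

```python
import time
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait,
)


def task(name: str, seconds: float) -> str:
    # Stand-in for a child execution that takes some time to end.
    time.sleep(seconds)
    return name


with ThreadPoolExecutor(max_workers=2) as pool:
    # Both tasks are started without waiting, comparable to wait=False.
    fast = pool.submit(task, "fast", 0.05)
    slow = pool.submit(task, "slow", 0.5)

    # Comparable to return_when='FIRST_ENDED': resume once any task has ended.
    done_first, pending = wait([fast, slow], return_when=FIRST_COMPLETED)

    # Comparable to return_when='ALL_ENDED': resume once every task has ended.
    done_all, still_pending = wait([fast, slow], return_when=ALL_COMPLETED)

print(sorted(future.result() for future in done_all))
```

In both cases the tasks are started first and the waiting condition is chosen afterwards, which is the separation between wait=False and return_when described above.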

Interaction with other services: connectors

In order to connect with services outside of the Engine platform, you can create ad-hoc connections directly from a flow script using the this.connect() method, as shown in the example below.

This allows you to define and trigger a connection to an external system on the fly without relying on a preconfigured connector.

Creating a connection execution from your flow is simple:

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    my_connection = this.connect(
        connector_type='GIT',
        command='get',
        repository_url='https://example.com/path/to/repo.git',
    )
    # read the outputs of the connection and log them
    this.log(my_connection.get('output_value'))

    return this.success('all done')

Alternatively, you can use an existing connector to create a connection. When you create a connection using a connector, the base configuration stored in the connector is applied automatically.

However, it's also possible to override any of the connector's stored configuration by providing parameters directly in the flow script. These inline parameters will take precedence over the corresponding values defined in the connector for that specific execution, providing flexibility to adjust behavior dynamically without modifying the connector itself.
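
The precedence rule can be pictured as a dictionary merge in which later values win. The sketch below is a plain-Python illustration and not the Flow API: the `ref` key and both dictionaries are hypothetical and only stand in for a stored connector configuration and the parameters passed inline.

```python
# Hypothetical base configuration, as it might be stored in a connector.
connector_config = {
    "connector_type": "GIT",
    "repository_url": "https://example.com/path/to/repo.git",
    "ref": "main",
}

# Hypothetical parameters supplied inline for one specific execution.
inline_params = {"ref": "develop"}

# Later entries win, so inline parameters override the stored values.
effective_config = {**connector_config, **inline_params}

print(effective_config["ref"])
print(connector_config["ref"])
```

The stored configuration is left untouched by the merge, which matches the statement that the override applies only to the specific execution.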

To explore the different types of connectors available, see the Connector Types.

Specifying location inheritance

The location_inheritance option determines how location associations are handled when a flow is executed.

Location inheritance options

  • created_by: The flow execution inherits the same location as the creating identity.
  • default: Defaults to the workspace-level "Default Project" or "Workspace" for records that cannot belong to a project.
  • wrapped_resource: This location is tied to the innermost wrapped resource (specific to executions based on a resource, not ad-hoc).
  • resource: The location is associated with the resource on which the execution is based (if applicable).

When location_inheritance is unset for an execution, the default behavior depends on the identity that triggered the execution. If the execution is triggered by any identity type other than an execution, the default inheritance is wrapped_resource. If the triggering identity is an execution, the default inheritance is created_by.
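
The fallback rule can be written down as a small decision function. This is a sketch of the rule as described above, not Engine code: the function name, its parameters, and the `VALID_OPTIONS` set are hypothetical.

```python
from typing import Optional

# The four options listed above.
VALID_OPTIONS = {"created_by", "default", "wrapped_resource", "resource"}


def effective_location_inheritance(
    explicit: Optional[str], triggered_by_execution: bool
) -> str:
    """Resolve the option following the rules described above (illustrative only)."""
    if explicit is not None:
        if explicit not in VALID_OPTIONS:
            raise ValueError(f"unknown location_inheritance: {explicit!r}")
        return explicit
    # Unset: the fallback depends on which identity triggered the execution.
    return "created_by" if triggered_by_execution else "wrapped_resource"


print(effective_location_inheritance(None, triggered_by_execution=True))
print(effective_location_inheritance(None, triggered_by_execution=False))
print(effective_location_inheritance("default", triggered_by_execution=True))
```

An explicitly set value always takes priority. The identity type is only consulted when the option is unset.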

Use cases for inheritance options

The example scenarios below show when to use each location inheritance type.

created_by

Use this type when a workflow is initiated by a manual trigger (user identity), and the flows must run within the same project or bundle associated with the user’s workspace settings.

example

child_execution = this.flow(
    'archive-log-flow'
).run(
    location_inheritance='created_by'
)

default

Use this type when you want to ensure the execution gets processed within the workspace-wide default location, regardless of the triggering identity.

example

child_execution = this.flow(
    'archive-log-flow'
).run(
    location_inheritance='default'
)

wrapped_resource

Use this type when the execution is based on a deeply wrapped resource and you need the location inheritance to maintain consistency with that specific resource.

example

child_execution = this.flow(
    'api-data-processing-flow'
).run(
    location_inheritance='wrapped_resource'
)

resource

Use this type when the execution is based on a resource and must inherit the location of the specific resource (e.g., a custom object) rather than another context.

example

child_execution = this.flow(
    'equipment-operations-flow'
).run(
    location_inheritance='resource'
)

Learn More

Connector Types
Third-Party Python Modules
Executions
Webhooks