Skip to main content
Version: 9 - Germknödel

Flow Triggering, Inheritance, and Execution Options

Cloudomation scripts allow you to trigger other flows and manage their execution seamlessly. Below are examples you can integrate into your flow scripts based on different settings and operational requirements.

Running a flow

You can execute flows using the "Run" button in the flow API. Running a flow will create a unique execution of it. This execution can be monitored via the "Latest Executions" button at the top right corner of the user interface.

The "Run" button

The "Latest Executions" button

To view all executions, click 'Show All' in the top-right corner of the pop-up window that appears after selecting 'Latest Executions'.

It is also possible to execute a flow using the flow script. The flow() method is used to execute another flow. This provides flexibility in specifying the flow's ID or name and customizing execution options like location_inheritance.

example
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
child_execution = this.flow(
'target_flow_name', # Replace with your flow's name
location_inheritance='created_by' # Optional: Use the same location as the identity which creates the record
)
response = child_execution.get('output_value') # Retrieving outputs from the child flow
this.log(f'Received response: {response}')
return this.success('Flow execution completed')

Specifying location inheritance

The location_inheritance option in Cloudomation's flow_api.Execution.flow() method determines how location associations are handled when a flow is executed. Additionally, this method provides flexibility with parameters for roles, orphan children, savepoint retention, priorities, and more.

Location inheritance options

  • created_by: The flow execution inherits the same location as the creating identity.
  • default: Defaults to the workspace-level "Default Project" or "Workspace" for records that cannot belong to a project.
  • wrapped_resource: This location is tied to the innermost wrapped resource (specific to executions based on a resource, not ad-hoc).
  • resource: The location is associated with the resource on which the execution is based (if applicable).

When location_inheritance is unset for an execution, the default behavior depends on the identity that triggered the execution. If the execution is triggered by any identity type other than an execution, the default inheritance is wrapped_resource. If the triggering identity is an execution, the default inheritance is created_by.

Use cases for inheritance options

It might be helpful to go through the below example scenarios to understand when to use which location inheritance type

created_by

Use this type when a workflow is initiated by a manual trigger (user identity), and the flows must run within the same project or bundle associated with the user’s workspace settings.

example
child_execution = this.flow(
'billing-report-flow',
location_inheritance='created_by'
)

default

Use this type when you want to ensure the execution gets processed within the workspace-wide default location, regardless of the triggering identity.

example
child_execution = this.flow(
'archive-log-flow',
location_inheritance='default'
)

wrapped_resource

Use this type when the execution is based on a deeply wrapped resource and you need the location inheritance to maintain consistency with that specific resource.

example
child_execution = this.flow(
'api-data-processing-flow',
location_inheritance='wrapped_resource'
)

resource

Use this type when the execution is based on a resource and must inherit the location of the specific resource (e.g., a custom object) rather than another context.

example
child_execution = this.flow(
'equipment-operations-flow',
location_inheritance='resource'
)

Synchronous and asynchronous execution of flows

Synchronous execution

Synchronous execution ensures that the script waits for the child execution to complete before proceeding. In the example below, since the parent flow waits for the child flow to finish, it is possible to determine whether the child execution completed successfully.

example
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
child_execution = this.flow(
'sync_flow',
wait=True # Ensures that the parent waits for the child execution to finish
)
if child_execution.get('status') != 'ENDED_SUCCESS':
return this.error('Child flow execution failed!')

this.log('Synchronous execution completed successfully!')
return this.success('Flow executed synchronously')

Asynchronous execution

Asynchronous execution enables the parent flow to continue without waiting for the child flow to complete. This can be done via the wait=False parameter.

example
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
async_execution = this.flow(
'async_flow',
wait=False # Proceed without waiting for the child execution
)
this.log(f'Async flow triggered.')
return this.success('Flow executed asynchronously')

Wait for multiple executions to end

In cases where you trigger several asynchronous executions, you can wait until they all finish.

example
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
async_child_1 = this.flow('flow_1', wait=False)
async_child_2 = this.flow('flow_2', wait=False)

this.wait_for(
async_child_1,
async_child_2,
return_when='ALL_ENDED' # Options: ALL_ENDED, ALL_SUCCEEDED, FIRST_ENDED
)
this.log('All children executions have completed.')
return this.success('Synchronized all async flows successfully')

Immediate execution control

The run parameter allows the user to control whether the flow should instantly run or not.

  • True: The flow runs immediately.
  • False: The flow execution is created but not started.

In the below example it is possible to observe this behavior. The script runs the flow child_flow_name which creates a child execution of it. Since the run parameter is set to False the execution status will be initially "Paused" until a manual start.

example
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
# Create child execution but do not run it yet
child_execution = this.flow(
'child_flow_name',
run=False # Execution is not started immediately
)
# Start execution later
child_execution.run(wait=True) # Without this line, the status of the execution for child_flow_name will remain stuck in 'Paused'.
this.log('Flow executed after manual start.')
return this.success('Manual execution started successfully.')