Skip to main content
Version: 11 - TBD

Wrappers

Wrappers encapsulate executeable resources. Executing a wrapped resource will instead run the wrapper, which can coordinate the execution of the wrapped resource.

Use Cases

Use wrappers to

  • Make sure something is always executed before and/or after.
  • Automatically retry an execution in case of an error. See Retries.
  • Send notifications in case of an error. See Notifications.
  • Provision resources on-the-fly when they are accessed.
  • Cleanup resources after they were accessed.
  • Execute a flow interactively. See Interactive Mode.

Concept

The following resource types can be wrapped:

  • Flow
  • Schedule
  • Scheduler
  • Connection
  • Wrapper
note

In the example below we use a wrapped flow. The same concept also applies for all other resource types which can be wrapped.

When a user runs a flow which is wrapped, an execution of the wrapper script is started instead. The wrapper script is provided with the information of how to start the flow and its inputs. The wrapper script is then responsible for starting the flow which the user originally wanted to run.

The wrapper script can have arbitrary logic before / around / after starting the flow. The wrapper script can modify the inputs for the flow, it can modify the outputs of the flow, it can start the flow multiple times, or the wrapper can also decide not to start the flow at all.

Wrapping Resources

Resources can be wrapped statically. This means the wrapper is attached to the definition of the resource and always applies when the resource is used. The caller does not need to know if there is a wrapper. This is useful when certain functionality should always apply when the resource is used.

Another method is to apply wrappers dynamically when using a resource. A caller can specify a wrapper for a particular child execution. The wrapper applies for this particular use of the resource. In other places the same resource can be used without the wrapper.

Static Wrappers

To register a static wrapper on a resource, open the resource in the frontend, click "Add" and select the wrapper you want from the dropdown.

When multiple static wrappers are registered, they are executed from top to bottom. To change their order, simply drag and drop them.

In this case first the interactive will be executed then the retry

You can configure the parameters of each static wrapper by clicking on the wrapper. They are then passed to the wrapper script next to the child input_value.

The configration form of the static wrapper

Dynamic Wrappers

To use a wrapper dynamically use the following syntax in your script:

this.using('<name-of-a-wrapper>').flow('<name-of-a-flow>')
# or
this.using('<name-of-a-wrapper>').connect('<name-of-a-connector>')

To use multiple dynamic wrappers

this.using(
'<name-of-a-wrapper>'
).using(
'<name-of-another-wrapper>'
).flow(
'<name-of-a-flow>'
)

Specify parameters for a wrapper

this.using(
'<name-of-a-wrapper>',
parameter='for-the-wrapper',
).flow(
'<name-of-a-flow>',
another_parameter='for-the-flow',
)

Multiple Wrappers

It is possible to specify multiple static wrappers, multiple dynamic wrappers and both at the same time. The wrappers will be applied in the following order:

  1. dynamic wrappers in the order they were specified
  2. static wrappers in ascending order
example

A chain of wrapped executions

# Flow 1
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
this.using(
'Dynamic Wrapper 1'
).using(
'Dynamic Wrapper 2'
).flow('Flow 2')

Static wrappers of Flow 2:

  • Static Wrapper 1, order: 10
  • Static Wrapper 2, order: 20

The following chain of executions will run when Flow 1 is started:

Examples

We provide several example wrappers as part of the Wrapper bundle.

Modify Inputs

example

Modify the inputs of a wrapped execution

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
# child contains all information needed to start
# the child execution, including its input_value
output_value = this.child(
**child,
# we can override any input by specifying it later
parameter='overridden',
).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')
example

Remove an input passed to the wrapped execution

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
# child contains all information needed to start
# the child execution, including its input_value
# we can remove any input by removing it from the `child` dictionary
child.pop('parameter')
output_value = this.child(**child).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')

Modify Outputs

example

Modify outputs returned by a wrapped execution

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
output_value = this.child(**child).get('output_value')
# we can modify and remove outputs as needed
output_value.pop('parameter-to-remove')
output_value['parameter'] = 'new-value'
# we need to store the final output_value in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')

Starting Child More than Once

example

Start a child execution in a loop until success

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
for _ in range(5): # let's try at most 5 times
try:
output_value = this.child(**child).get('output_value')
except flow_api.DependencyFailedError:
# let's wait a second before retrying
this.sleep(1)
continue
else:
break # child succeeded, let's exit the loop
else:
return this.error('child did not succeed after 5 tries')
this.save(output_value=output_value)
return this.success('all done')

Not Starting the Child

example

Only starting the child outside of working hours, otherwise return the last output_value

import datetime
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
now = datetime.datetime.today()
if now.hour < 9 or now.hour > 17:
# outside working hours. let's start the child
output_value = this.child(**child).get('output_value')
# store the output_value in a setting
system.setting('last-child-output').save(value=output_value)
else:
# within working hours, we do not start the child
# load the last output from the setting
output_value = system.setting('last-child-output').get('value')
this.save(output_value=output_value)
return this.success('all done')

Defining Where the Child Runs

Let's build on the previous example to see how you can specify the project in which the child execution should run.

This is done by defining the project_inheritance key in the init parameter of the child.

project_inheritance can be on of:

  • parent_execution - the child execution runs in the same project as that of the parent_execution,
  • resource - the child execution runs in the same project where the child resource is located,
  • explicit - no inheritance, the project_id of the project where the child runs needs to be set explicitly in the init.
example

Specify in which project the child runs

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
child = inputs['child']
# child contains all information needed to start
# the child execution, including its input_value

# we specify the project_inheritance
project_inheritance = 'explicit'
project_id = system.project('my project').get('id')

child['init']['project_inheritance'] = project_inheritance
child['init']['project_id'] = project_id

output_value = this.child(
**child,
).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')

Learn More

Notifications
Retries
Interactive Mode
Caching
Wrapper bundle