Wrappers
Wrappers encapsulate executeable resources. Executing a wrapped resource will instead run the wrapper, which can coordinate the execution of the wrapped resource.
Use Cases
Use wrappers to
- Make sure something is always executed before and/or after.
- Automatically retry an execution in case of an error. See Retries.
- Send notifications in case of an error. See Notifications.
- Provision resources on-the-fly when they are accessed.
- Cleanup resources after they were accessed.
- Execute a flow interactively. See Interactive Mode.
Concept
The following resource types can be wrapped:
- Flow
- Schedule
- Scheduler
- Connection
- Wrapper
In the example below we use a wrapped flow. The same concept also applies for all other resource types which can be wrapped.
When a user runs a flow which is wrapped, an execution of the wrapper script is started instead. The wrapper script is provided with the information of how to start the flow and its inputs. The wrapper script is then responsible for starting the flow which the user originally wanted to run.
The wrapper script can have arbitrary logic before / around / after starting the flow. The wrapper script can modify the inputs for the flow, it can modify the outputs of the flow, it can start the flow multiple times, or the wrapper can also decide not to start the flow at all.
Wrapping resources
Resources can be wrapped statically. This means the wrapper is attached to the definition of the resource and always applies when the resource is used. The caller does not need to know if there is a wrapper. This is useful when certain functionality should always apply when the resource is used.
Another method is to apply wrappers dynamically when using a resource. A caller can specify a wrapper for a particular child execution. The wrapper applies for this particular use of the resource. In other places the same resource can be used without the wrapper.
Static wrappers
To register a static wrapper on a resource, open the resource in the frontend, click "Add" and select the wrapper you want from the dropdown.
When multiple static wrappers are registered, they are executed from top to bottom. To change their order, simply drag and drop them.
In this case first the interactive
will be executed then the retry
You can configure the parameters of each static wrapper by clicking on the wrapper. They are then passed to the wrapper script next to the child
input_value.
The configration form of the static wrapper
Dynamic wrappers
To use a wrapper dynamically use the following syntax in your script:
this.using('<name-of-a-wrapper>').flow('<name-of-a-flow>')
# or
this.using('<name-of-a-wrapper>').connect('<name-of-a-connector>')
To use multiple dynamic wrappers
this.using(
'<name-of-a-wrapper>'
).using(
'<name-of-another-wrapper>'
).flow(
'<name-of-a-flow>'
)
Specify parameters for a wrapper
this.using(
'<name-of-a-wrapper>',
parameter='for-the-wrapper',
).flow(
'<name-of-a-flow>',
another_parameter='for-the-flow',
)
Multiple wrappers
It is possible to specify multiple static wrappers, multiple dynamic wrappers and both at the same time. The wrappers will be applied in the following order:
- dynamic wrappers in the order they were specified
- static wrappers in ascending
order
A chain of wrapped executions
# Flow 1
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
this.using(
'Dynamic Wrapper 1'
).using(
'Dynamic Wrapper 2'
).flow('Flow 2')
Static wrappers of Flow 2:
- Static Wrapper 1, order: 10
- Static Wrapper 2, order: 20
The following chain of executions will run when Flow 1
is started:
Examples
We provide several example wrappers as part of the Wrapper bundle.
Modify inputs
Modify the inputs of a wrapped execution
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
# child contains all information needed to start
# the child execution, including its input_value
output_value = this.child(
**child,
# we can override any input by specifying it later
parameter='overridden',
).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')
Remove an input passed to the wrapped execution
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
# child contains all information needed to start
# the child execution, including its input_value
# we can remove any input by removing it from the `child` dictionary
child.pop('parameter')
output_value = this.child(**child).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')
Modify outputs
Modify outputs returned by a wrapped execution
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
output_value = this.child(**child).get('output_value')
# we can modify and remove outputs as needed
output_value.pop('parameter-to-remove')
output_value['parameter'] = 'new-value'
# we need to store the final output_value in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')
Starting child more than once
Start a child execution in a loop until success
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
for _ in range(5): # let's try at most 5 times
try:
output_value = this.child(**child).get('output_value')
except flow_api.DependencyFailedError:
# let's wait a second before retrying
this.sleep(1)
continue
else:
break # child succeeded, let's exit the loop
else:
return this.error('child did not succeed after 5 tries')
this.save(output_value=output_value)
return this.success('all done')
Not starting the child
Only starting the child outside of working hours, otherwise return the last output_value
import datetime
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
input_value = this.get('input_value')
child = input_value['child']
now = datetime.datetime.today()
if now.hour < 9 or now.hour > 17:
# outside working hours. let's start the child
output_value = this.child(**child).get('output_value')
# store the output_value in a setting
system.setting('last-child-output').save(value=output_value)
else:
# within working hours, we do not start the child
# load the last output from the setting
output_value = system.setting('last-child-output').get('value')
this.save(output_value=output_value)
return this.success('all done')