Skip to main content
Version: 11 - Zimtschnecke

ConnectorTypeAZURE

class connector_types.connector_type_azure.ConnectorTypeAZURE

Call the Azure API.

See https://learn.microsoft.com/en-us/python/api/?view=azure-python

Client resource exposes resource_groups, resources, providers, tags, operations, provider_resource_types (ResourceManagementClient) and deployments, deployment_operations (DeploymentsMgmtClient from azure-mgmt-resource-deployments) so existing code works unchanged with azure-mgmt-resource v25+.

Input Schema

  • schema_version = '10.0'

    Type: string

  • authentication

    Type: anyOf

  • subscription_id

    The Azure subscription ID.

    Required for ARM/management clients (compute, network, resource, storage, appcontainers, containerregistry). Not used for Key Vault data-plane clients.

    Type: string

  • vault_url

    Required when using Key Vault clients (keyvault_secrets, keyvault_certificates, keyvault_keys). Example: "https://my-vault.vault.azure.net/".

    Type: string

  • mode

    Type: anyOf

Output Schema

Constants

SUPPORTED_CLIENTS = ['appcontainers', 'compute', 'containerregistry', 'keyvault_certificates', 'keyvault_keys', 'keyvault_secrets', 'network', 'resource', 'storage']

Example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
virtual_machine = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
subscription_id='...',
mode={
'mode_name': 'call_client_collection_method',
'client': 'compute',
'collection': 'virtual_machines',
'method': 'begin_create_or_update',
'kwargs': {
'resource_group_name': '...',
'vm_name': '...',
'parameters': ...
},
},
).get('output_value')['result']
this.log(vm_id=virtual_machine['id'])
return this.success('all done')

Azure Container Apps (management-plane)

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
apps = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
subscription_id='...',
mode={
'mode_name': 'call_client_collection_method',
'client': 'appcontainers',
'collection': 'container_apps',
'method': 'list_by_resource_group',
'kwargs': {'resource_group_name': '...'},
},
).get('output_value')['result']
this.log(apps=apps)
return this.success('all done')

Azure Container Registry (management-plane)

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
registries = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
subscription_id='...',
mode={
'mode_name': 'call_client_collection_method',
'client': 'containerregistry',
'collection': 'registries',
'method': 'list',
},
).get('output_value')['result']
this.log(registries=registries)
return this.success('all done')

Key Vault Secrets (data-plane)

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
secret = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
vault_url='https://my-vault.vault.azure.net/',
mode={
'mode_name': 'call_client_collection_method',
'client': 'keyvault_secrets',
'collection': '',
'method': 'get_secret',
'kwargs': {'name': 'my-secret'},
},
).get('output_value')['result']
this.log(secret=secret)
return this.success('all done')

Key Vault Certificates (data-plane)

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
cert = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
vault_url='https://my-vault.vault.azure.net/',
mode={
'mode_name': 'call_client_collection_method',
'client': 'keyvault_certificates',
'collection': '',
'method': 'get_certificate',
'kwargs': {'certificate_name': 'my-cert'},
},
).get('output_value')['result']
this.log(cert=cert)
return this.success('all done')

Key Vault Keys (data-plane)

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
key = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
vault_url='https://my-vault.vault.azure.net/',
mode={
'mode_name': 'call_client_collection_method',
'client': 'keyvault_keys',
'collection': '',
'method': 'get_key',
'kwargs': {'name': 'my-key'},
},
).get('output_value')['result']
this.log(key=key)
return this.success('all done')