ConnectorTypeAZURE
class connector_types.connector_type_azure.ConnectorTypeAZURE
Call the Azure API.
See https://learn.microsoft.com/en-us/python/api/?view=azure-python
Client resource exposes resource_groups, resources, providers, tags, operations,
provider_resource_types (ResourceManagementClient) and deployments, deployment_operations
(DeploymentsMgmtClient from azure-mgmt-resource-deployments) so existing code works
unchanged with azure-mgmt-resource v25+.
Input Schema
-
schema_version = '10.0'Type:
string -
authenticationType:
anyOfOptions: -
subscription_idThe Azure subscription ID.
Required for ARM/management clients (compute, network, resource, storage, appcontainers, containerregistry). Not used for Key Vault data-plane clients.
Type:
string -
vault_urlRequired when using Key Vault clients (keyvault_secrets, keyvault_certificates, keyvault_keys). Example: "https://my-vault.vault.azure.net/".
Type:
string -
modeType:
anyOfOptions:
Output Schema
Constants
SUPPORTED_CLIENTS = ['appcontainers', 'compute', 'containerregistry', 'keyvault_certificates', 'keyvault_keys', 'keyvault_secrets', 'network', 'resource', 'storage']Example
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
virtual_machine = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
subscription_id='...',
mode={
'mode_name': 'call_client_collection_method',
'client': 'compute',
'collection': 'virtual_machines',
'method': 'begin_create_or_update',
'kwargs': {
'resource_group_name': '...',
'vm_name': '...',
'parameters': ...
},
},
).get('output_value')['result']
this.log(vm_id=virtual_machine['id'])
return this.success('all done')
Azure Container Apps (management-plane)
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
apps = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
subscription_id='...',
mode={
'mode_name': 'call_client_collection_method',
'client': 'appcontainers',
'collection': 'container_apps',
'method': 'list_by_resource_group',
'kwargs': {'resource_group_name': '...'},
},
).get('output_value')['result']
this.log(apps=apps)
return this.success('all done')
Azure Container Registry (management-plane)
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
registries = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
subscription_id='...',
mode={
'mode_name': 'call_client_collection_method',
'client': 'containerregistry',
'collection': 'registries',
'method': 'list',
},
).get('output_value')['result']
this.log(registries=registries)
return this.success('all done')
Key Vault Secrets (data-plane)
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
secret = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
vault_url='https://my-vault.vault.azure.net/',
mode={
'mode_name': 'call_client_collection_method',
'client': 'keyvault_secrets',
'collection': '',
'method': 'get_secret',
'kwargs': {'name': 'my-secret'},
},
).get('output_value')['result']
this.log(secret=secret)
return this.success('all done')
Key Vault Certificates (data-plane)
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
cert = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
vault_url='https://my-vault.vault.azure.net/',
mode={
'mode_name': 'call_client_collection_method',
'client': 'keyvault_certificates',
'collection': '',
'method': 'get_certificate',
'kwargs': {'certificate_name': 'my-cert'},
},
).get('output_value')['result']
this.log(cert=cert)
return this.success('all done')
Key Vault Keys (data-plane)
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
key = this.connect(
connector_type='AZURE',
authentication={
'authentication_method': 'azure_ad',
'client_id': '...',
'client_secret': '...',
'tenant_id': '...',
},
vault_url='https://my-vault.vault.azure.net/',
mode={
'mode_name': 'call_client_collection_method',
'client': 'keyvault_keys',
'collection': '',
'method': 'get_key',
'kwargs': {'name': 'my-key'},
},
).get('output_value')['result']
this.log(key=key)
return this.success('all done')