Skip to main content
Version: 9 - Germknödel

ConnectorTypeAZUREAI

class connector_types.connector_type_azureai.ConnectorTypeAZUREAI

Call an Azure AI API.

See https://learn.microsoft.com/en-us/python/api/?view=azure-python&term=azure-ai

Input Schema

  • authentication

    The authentication method.

    Type: anyOf

  • endpoint

    Supported Cognitive Services endpoints (protocol and hostname, for example: https://westus2.api.cognitive.microsoft.com.

    Type: string

  • api_version

    The API version to use for the request.

    Type: anyOf

  • client

    The name of the Azure AI client to use. E.g. "documentanalysis".

    Type: string

  • method

    The name of the method to call on the collection. E.g. "begin_analyze_document".

    Type: string

  • kwargs

    Keyword arguments to pass to the method call.

    Type: object

    Additional Properties: True

    Pattern Properties:

    • .*

      Data

Output Schema

  • result

    Data

Constants

SUPPORTED_CLIENTS = ['documentanalysis', 'documentmodeladministration'] ssl_context_inputs = ['check_hostname', 'client_cert', 'client_key', 'server_ca']

Example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
analyze_result = this.connect(
connector_type='AZUREAI',
authentication={
'authentication_method': 'azure_ad',
client_id='<client_id>',
client_secret='<client_secret>',
tenant_id='<tenant_id>',
},
endpoint='<endpoint>',
api_version='<api_version>',
client='documentanalysis',
method='begin_analyze_document',
kwargs={
'model_id': '<model_id>',
'document': 'cloudomation:file-name.pdf',
'pages': '1-3,5-6',
'locale': 'de',
},
).get('output_value')['result']
this.log(analyze_result=analyze_result)
return this.success('all done')

More

Key authentication

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
analyze_result = this.connect(
connector_type='AZUREAI',
authentication={
'authentication_method': 'key',
'key'='<key>',
},
...
).get('output_value')['result']
this.log(analyze_result=analyze_result)
return this.success('all done')

Pass document bytes directly

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
document_bytes = b'...'
analyze_result = this.connect(
connector_type='AZUREAI',
authentication={
'authentication_method': 'azure_ad',
client_id='<client_id>',
client_secret='<client_secret>',
tenant_id='<tenant_id>',
},
endpoint='<endpoint>',
api_version='<api_version>',
client='documentanalysis',
method='begin_analyze_document',
kwargs={
'model_id': '<model_id>',
'document': document_bytes,
'pages': '1-3,5-6',
'locale': 'de',
},
).get('output_value')['result']
this.log(analyze_result=analyze_result)
return this.success('all done')