Skip to main content

ConnectorTypeK8S

class connector_types.connector_type_k8s.ConnectorTypeK8S

Interact with a kubernetes cluster.

This connector type enables you to interact with a remote kubernetes cluster. See the Official Python client library for kubernetes for a description of available APIs and methods.

Inputs

NameTypeDefaultDescription
apistrNoneThe API to use. E.g. "CoreV1Api". Either document or (api and method) must be specified
argslistNonePositional arguments to pass to the method call
certificatestrThe CA certificate of the kubernetes cluster
documentdictNoneA Kubernetes API object to apply. Either document or (api and method) must be specified
grantstrNonethe name of an oauth grant to use for authentication. Either token, key, or grant must be specified
hoststrThe hostname or IP of the kubernetes cluster to use
keydictNoneThe content of the key file of the service account to use. Either token, key, or grant must be specified
kwargsdictNoneKeyword arguments to pass to the method call
methodstrNoneThe method to call. E.g. "create_namespaced_deployment". Either document or (api and method) must be specified
parametersdictNoneDeprecated: Additional keyword parameters to pass to the method call
replaceboolFalseIf set to true, the resource will be replaced. Otherwise it will be patched, which means artifacts that have not changed will remain
tokenstrNoneThe bearer token to use for authentication. Either token, key, or grant must be specified

Outputs

NameTypeDefaultDescription
execution_idintThe ID of the connection execution
messagestrThe ended message for the connection. If the connection ended with an error, the message will contain information about what went wrong
resultdict
statusstrThe ended status for the connection. Either "success" or "error".

Constants

input_list = ['api', 'args', 'certificate', 'document', 'grant', 'host', 'key', 'kwargs', 'method', 'parameters', 'replace', 'token']output_list = ['result']version = 1

Methods

Example

import yaml
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
nginx_deployment_str = system.file('nginx-deployment.yaml').get_text_content()
nginx_deployment = yaml.safe_load(nginx_deployment_str)
this.connect(
connector_type='K8S',
token='*secret*',
host='my-kubernetes-cluster-master',
certificate='-----BEGIN CERTIFICATE-----\nMII...',
api='ExtensionsV1beta1Api',
method='create_namespaced_deployment',
kwargs={
'body': nginx_deployment,
},
)

return this.success('all done')