ConnectorTypeK8S
class connector_types.connector_type_k8s.ConnectorTypeK8S
Interact with a Kubernetes cluster.
This connector type enables you to interact with a remote Kubernetes cluster. See the official Python client library for Kubernetes for a description of the available APIs and methods.
Input Schema
- host
  The hostname or IP of the Kubernetes cluster to use.
  Type: string
- certificate
  The CA certificate of the Kubernetes cluster.
  Type: string
- document
  A Kubernetes API JSON object to apply. Either document or (api and method) must be specified.
- api
  The API to use. E.g. "CoreV1Api". Either document or (api and method) must be specified.
  Type: anyOf
- method
  The method to call. E.g. "create_namespaced_deployment". Either document or (api and method) must be specified.
  Type: anyOf
- replace
  If set to true, the resource will be replaced. Otherwise it will be patched, which means artefacts that have not changed will remain.
  Type: boolean
- args
  Positional arguments to pass to the method call.
  Type: array
- kwargs
  Keyword arguments to pass to the method call.
  Type: object
  Additional Properties: True
  Pattern Properties:
  - .*
    Data
- token
  The bearer token to use for authentication. Either token or key must be specified.
  Type: anyOf
- key
  Type: anyOf
- parameters
  DEPRECATED. Replaced by kwargs.
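The either/or rules in the input schema can be checked before the connector is called. The following is a minimal sketch, not part of the connector: `check_k8s_inputs` is a hypothetical helper, and it only checks that at least one alternative of each rule is present, since the schema text does not say whether the alternatives are mutually exclusive.

```python
def check_k8s_inputs(inputs: dict) -> list:
    """Return a list of problems found in a dict of connector inputs.

    Hypothetical helper for illustration; not part of the connector.
    """
    problems = []

    # Rule 1: either `document`, or both `api` and `method`.
    has_document = inputs.get('document') is not None
    has_call = (
        inputs.get('api') is not None
        and inputs.get('method') is not None
    )
    if not (has_document or has_call):
        problems.append('specify either document or both api and method')

    # Rule 2: either `token` or `key`.
    if inputs.get('token') is None and inputs.get('key') is None:
        problems.append('specify either token or key')

    # `parameters` is documented as deprecated in favour of `kwargs`.
    if 'parameters' in inputs:
        problems.append('parameters is deprecated, use kwargs instead')

    return problems


# Example: a complete api/method call with token authentication.
print(check_k8s_inputs({
    'host': 'my-kubernetes-cluster-master',
    'token': '*secret*',
    'api': 'CoreV1Api',
    'method': 'list_namespace',
}))
```

An empty list means that none of the documented rules is violated; it does not guarantee that the cluster will accept the request.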
Output Schema
- result
  Data
Constants
ssl_context_inputs = ['check_hostname', 'client_cert', 'client_key', 'server_ca']
Example
import yaml
import flow_api


def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # Load the deployment manifest from a file and parse the YAML.
    nginx_deployment_str = system.file('nginx-deployment.yaml').get_text_content()
    nginx_deployment = yaml.safe_load(nginx_deployment_str)
    # Call the given method of the given API, passing the manifest as `body`.
    this.connect(
        connector_type='K8S',
        token='*secret*',
        host='my-kubernetes-cluster-master',
        certificate='-----BEGIN CERTIFICATE-----\nMII...',
        api='ExtensionsV1beta1Api',
        method='create_namespaced_deployment',
        kwargs={
            'body': nginx_deployment,
        },
    )
    return this.success('all done')
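A Kubernetes object could presumably also be applied through the `document` input instead of naming an API and a method. The following is a minimal sketch under stated assumptions: it assumes that `document` accepts the object as a Python dict, the namespace name `demo` is hypothetical, and the `flow_api` type annotations of the example above are omitted so that the snippet has no imports.

```python
# Sketch only: assumes `document` accepts a dict describing the object.
def handler(system, this, inputs):
    # A small Kubernetes object, written inline instead of loaded from a file.
    # The name 'demo' is a placeholder.
    namespace = {
        'apiVersion': 'v1',
        'kind': 'Namespace',
        'metadata': {'name': 'demo'},
    }
    this.connect(
        connector_type='K8S',
        token='*secret*',
        host='my-kubernetes-cluster-master',
        certificate='-----BEGIN CERTIFICATE-----\nMII...',
        document=namespace,  # used instead of `api` and `method`
        replace=False,       # patch the resource rather than replace it
    )
    return this.success('all done')
```

With `replace=False` the resource is patched, so, as described in the input schema, artefacts that have not changed remain.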