Version: 9 - Germknödel

ConnectorTypeK8S

class connector_types.connector_type_k8s.ConnectorTypeK8S

Interact with a Kubernetes cluster.

This connector type enables you to interact with a remote Kubernetes cluster. See the official Python client library for Kubernetes for a description of the available APIs and methods.

Input Schema

  • host

    The hostname or IP address of the Kubernetes cluster to use.

    Type: string

  • certificate

    The CA certificate of the Kubernetes cluster.

    Type: string

  • document

    A Kubernetes API JSON object to apply. Either document or (api and method) must be specified.

  • api

    The API to use. E.g. "CoreV1Api". Either document or (api and method) must be specified.

    Type: anyOf

  • method

    The method to call. E.g. "create_namespaced_deployment". Either document or (api and method) must be specified.

    Type: anyOf

  • replace

    If set to true, the resource will be replaced. Otherwise it will be patched, which means artefacts that have not changed will remain.

    Type: boolean

  • args

    Positional arguments to pass to the method call.

    Type: array

  • kwargs

    Keyword arguments to pass to the method call.

    Type: object

    Additional Properties: True

    Pattern Properties:

    • .*

      Data

  • token

    The bearer token to use for authentication. Either token or key must be specified.

    Type: anyOf

  • key

    Type: anyOf

  • parameters

    DEPRECATED. Replaced by kwargs.
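
The input combinations described above can be checked before connecting. The following is a minimal sketch, not part of the connector: `check_k8s_inputs` is a hypothetical helper, and it assumes that "either ... or" means at least one of the alternatives must be present.

```python
def check_k8s_inputs(inputs: dict) -> list:
    """Return a list of problems found in a K8S connector input dict.

    Hypothetical helper for illustration; the connector performs its own
    validation, which may differ from this sketch.
    """
    problems = []

    # Either a document, or both an api and a method, must be given.
    has_document = inputs.get('document') is not None
    has_call = (
        inputs.get('api') is not None
        and inputs.get('method') is not None
    )
    if not (has_document or has_call):
        problems.append('either document or (api and method) must be specified')

    # Either a bearer token or a key must be given for authentication.
    if inputs.get('token') is None and inputs.get('key') is None:
        problems.append('either token or key must be specified')

    # parameters is deprecated in favour of kwargs.
    if 'parameters' in inputs:
        problems.append('parameters is deprecated, use kwargs instead')

    return problems
```

For example, `check_k8s_inputs({'api': 'CoreV1Api', 'method': 'list_namespace', 'token': '*secret*'})` returns an empty list, while an input dict containing only `api` yields two problems.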

Output Schema

  • result

    Data

Constants

ssl_context_inputs = ['check_hostname', 'client_cert', 'client_key', 'server_ca']

Example

import yaml
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # Read the deployment manifest and parse the YAML into a dict.
    nginx_deployment_str = system.file('nginx-deployment.yaml').get_text_content()
    nginx_deployment = yaml.safe_load(nginx_deployment_str)
    # Call the chosen API method on the cluster.
    this.connect(
        connector_type='K8S',
        token='*secret*',
        host='my-kubernetes-cluster-master',
        certificate='-----BEGIN CERTIFICATE-----\nMII...',
        api='ExtensionsV1beta1Api',
        method='create_namespaced_deployment',
        kwargs={
            # Assumption: 'default' is a placeholder; the client method requires a namespace.
            'namespace': 'default',
            'body': nginx_deployment,
        },
    )

    return this.success('all done')
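
The example above uses api and method. As a complement, the following sketch shows how the document and replace inputs might be used instead. It rests on several assumptions: `build_apply_inputs` is a hypothetical helper, the file name `nginx-deployment.json` is made up, and document is assumed to accept the parsed JSON object as a dict. The type annotations on `handler` are omitted so that the block runs outside a flow.

```python
import json

def build_apply_inputs(manifest_json: str, replace: bool = False) -> dict:
    """Build keyword arguments for this.connect() in document mode.

    Hypothetical helper; host, token and certificate are the placeholder
    values used elsewhere on this page.
    """
    return {
        'connector_type': 'K8S',
        'host': 'my-kubernetes-cluster-master',
        'token': '*secret*',
        'certificate': '-----BEGIN CERTIFICATE-----\nMII...',
        # Assumption: document takes the parsed Kubernetes API object.
        'document': json.loads(manifest_json),
        # False patches the resource, True replaces it.
        'replace': replace,
    }

def handler(system, this, inputs):
    # In a flow, system is a flow_api.System and this is a flow_api.Execution.
    manifest_json = system.file('nginx-deployment.json').get_text_content()
    this.connect(**build_apply_inputs(manifest_json))
    return this.success('all done')
```

Keeping the input construction in a separate function makes it possible to inspect or test the inputs without contacting a cluster.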