Version: 10 - Vanillekipferl

ConnectorTypeSQLPG

class connector_types.connector_type_sqlpg.ConnectorTypeSQLPG

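Interact with a PostgreSQL database. This connector type supports the execute, fetch, fetchrow, and fetchval commands. Each command expects an SQL query and returns, respectively, the command status, a list of records, a single record, or a single field value. The examples on this page also use the executemany and transaction modes.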

Consult the PostgreSQL SQL language documentation at https://www.postgresql.org/docs/12/tutorial-sql.html for more information.
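The single-query commands in the examples on this page share one shape: a `mode` dictionary with `mode_name`, `query`, and optionally `params`. The following is a minimal sketch of a builder for that dictionary. `make_mode` and `SINGLE_QUERY_MODES` are hypothetical names, not part of `flow_api`, and the sketch assumes that `fetch` and `fetchrow` accept the same keys as the `execute` and `fetchval` examples.

```python
# Hypothetical helper, not part of flow_api.
SINGLE_QUERY_MODES = ('execute', 'fetch', 'fetchrow', 'fetchval')


def make_mode(mode_name, query, params=None):
    """Build the `mode` input for a single-query command."""
    if mode_name not in SINGLE_QUERY_MODES:
        raise ValueError(f'unsupported mode_name: {mode_name!r}')
    mode = {'mode_name': mode_name, 'query': query}
    if params is not None:
        # Positional values for the $1, $2, ... placeholders in the query.
        mode['params'] = list(params)
    return mode
```

The result could then be passed as the `mode` argument, for example `mode=make_mode('fetchrow', 'SELECT * FROM mytab WHERE int_col = $1', [42])`.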

Input Schema

  • schema_version

    Type: string

  • authentication

    Type: anyOf

  • host

    The remote hostname or IP address.

    Type: string

  • port

    Type: anyOf

  • tls

    Whether to connect using TLS/SSL.

    Type: anyOf

  • mode

    Type: anyOf

  • database

    The database to use.

    Type: string

Output Schema

  • result

    Data

Example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # fetchval returns a single field value, here the server version string
    postgres_server_version = this.connect(
        connector_type='SQLPG',
        authentication={
            'authentication_method': 'username_password',
            'username': '...',
            'password': '...',
        },
        host='...',
        database='...',
        mode={
            'mode_name': 'fetchval',
            'query': 'SELECT version()',
        },
    ).get('output_value')['result']
    this.log(postgres_server_version=postgres_server_version)
    return this.success('all done')
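Every example on this page repeats the same connection inputs and varies only `mode`. One way to keep them in a single place is sketched below. `sqlpg_connection` and `sqlpg_call` are hypothetical helpers, not part of `flow_api`; the sketch only assembles the keyword arguments shown in the examples and assumes they can be passed to `this.connect` with `**` unpacking.

```python
def sqlpg_connection(host, database, username, password):
    """Return the connection-related inputs shared by every SQLPG call."""
    return {
        'connector_type': 'SQLPG',
        'authentication': {
            'authentication_method': 'username_password',
            'username': username,
            'password': password,
        },
        'host': host,
        'database': database,
    }


def sqlpg_call(connection, mode):
    """Combine the shared connection inputs with a per-call mode."""
    # A new dict is returned, so `connection` can be reused for other modes.
    return {**connection, 'mode': mode}
```

Inside a handler this might be used as `this.connect(**sqlpg_call(connection, {'mode_name': 'fetchval', 'query': 'SELECT version()'}))`.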

Insert data into a table

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    this.connect(
        connector_type='SQLPG',
        authentication={
            'authentication_method': 'username_password',
            'username': '...',
            'password': '...',
        },
        host='...',
        database='...',
        mode={
            'mode_name': 'execute',
            # $1 and $2 are filled from params, in order
            'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
            'params': ['foo', 42],
        },
    )
    return this.success('all done')

Execute many

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    this.connect(
        connector_type='SQLPG',
        authentication={
            'authentication_method': 'username_password',
            'username': '...',
            'password': '...',
        },
        host='...',
        database='...',
        mode={
            'mode_name': 'executemany',
            'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
            # one inner list per row to insert
            'rows': [
                ['foo', 42],
                ['bar', 43],
            ],
        },
    )
    return this.success('all done')
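The `rows` input above is a list of lists whose order matches the `$1`, `$2` placeholders. When the data arrives as dictionaries, it has to be flattened into that shape first. A minimal sketch, where `rows_from_records` is a hypothetical helper and the record keys are illustrative:

```python
def rows_from_records(records, columns):
    """Turn a list of dicts into the list-of-lists shape used by `rows`."""
    # The order of `columns` must match the placeholder order in the query.
    return [[record[column] for column in columns] for record in records]


records = [
    {'str_col': 'foo', 'int_col': 42},
    {'str_col': 'bar', 'int_col': 43},
]
rows = rows_from_records(records, ['str_col', 'int_col'])
print(rows)  # [['foo', 42], ['bar', 43]]
```

A missing key raises `KeyError`, which surfaces malformed records before anything is sent to the database.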

Transaction

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    results = this.connect(
        connector_type='SQLPG',
        authentication={
            'authentication_method': 'username_password',
            'username': '...',
            'password': '...',
        },
        host='...',
        database='...',
        mode={
            'mode_name': 'transaction',
            'queries': [
                {
                    'mode_name': 'execute',
                    'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
                    'params': ['foo', 42],
                },
                {
                    'mode_name': 'fetchval',
                    # PostgreSQL has no last_insert_id(); lastval() returns the
                    # most recent sequence value of this session and assumes
                    # mytab has a serial or identity column.
                    'query': 'SELECT lastval()',
                },
            ],
        },
    ).get('output_value')['result']
    this.log(results=results)
    return this.success('all done')