ConnectorTypeSQLPG
class connector_types.connector_type_sqlpg.ConnectorTypeSQLPG
Interact with a PostgreSQL database. This connector type supports the execute
,
fetch
, fetchrow
, and fetchval
commands. Each command expects an
SQL query and returns the status, list, record or field value respectively.
Consult the PostgreSQL SQL language documentation at https://www.postgresql.org/docs/12/tutorial-sql.html for more information.
Input Schema
-
schema_version
Type:
string
-
authentication
Type:
anyOf
Options: -
host
The remote hostname or IP address.
Type:
string
-
port
Type:
anyOf
Options: -
tls
If to connect using TLS/SSL.
Type:
anyOf
Options: -
mode
Type:
anyOf
Options: -
database
The database to use.
Type:
string
Output Schema
Example
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
postgres_server_version = this.connect(
connector_type='SQLPG',
authentication={
'authentication_method': 'username_password',
'username': '...',
'password': '...',
},
host='...',
database='...',
mode={
'mode_name': 'fetchval',
'query': 'SELECT version()',
},
).get('output_value')['result']
this.log(postgres_server_version=postgres_server_version)
return this.success('all done')
Insert data into a table
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
this.connect(
connector_type='SQLPG',
authentication={
'authentication_method': 'username_password',
'username': '...',
'password': '...',
},
host='...',
database='...',
mode={
'mode_name': 'execute',
'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
'params': ['foo', 42],
},
)
return this.success('all done')
Execute many
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
this.connect(
connector_type='SQLPG',
authentication={
'authentication_method': 'username_password',
'username': '...',
'password': '...',
},
host='...',
database='...',
mode={
'mode_name': 'executemany',
'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
'rows': [
['foo', 42],
['bar', 43],
],
'batch_size': 100,
},
)
return this.success('all done')
Transaction
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
results = this.connect(
connector_type='SQLPG',
authentication={
'authentication_method': 'username_password',
'username': '...',
'password': '...',
},
host='...',
database='...',
mode={
'mode_name': 'transaction',
'queries': [
{
'mode_name': 'execute',
'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
'params': ['foo', 42],
},
{
'mode_name': 'fetchval',
'query': 'SELECT last_insert_id()',
},
],
},
).get('output_value')['result']
this.log(results=results)
return this.success('all done')