pg_lib API documentation

class pg_lib.pg_connection(global_config)[source]

Bases: object


Connects to PostgreSQL using the parameters stored in pg_pars, which is built by adding the key dbname to the self.pg_conn dictionary. The connection and cursor created by this method are used throughout the procedure, except by the replay process, which uses a dedicated connection and cursor. After connecting, the method creates a database cursor and sets the session to autocommit.


Connects to PostgreSQL using the parameters stored in pg_pars, which is built by adding the key dbname to the self.pg_conn dictionary. After connecting, the method creates a database cursor and sets the session to autocommit. This method creates the additional connection and cursor used by the replay process.
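A minimal sketch of how pg_pars could be assembled and the session switched to autocommit. The helpers build_pg_pars and open_autocommit_connection are hypothetical and not part of pg_lib; connect stands for a callable such as psycopg2.connect and is passed in so that the sketch does not need a running server.

```python
def build_pg_pars(pg_conn, dbname):
    """Return a copy of the connection dictionary with the dbname key added."""
    pg_pars = dict(pg_conn)  # copy, so the original dictionary is left untouched
    pg_pars["dbname"] = dbname
    return pg_pars


def open_autocommit_connection(connect, pg_conn, dbname):
    """Open a connection, enable autocommit and create a cursor.

    `connect` is assumed to behave like psycopg2.connect.
    """
    connection = connect(**build_pg_pars(pg_conn, dbname))
    connection.set_session(autocommit=True)  # psycopg2 connection method
    cursor = connection.cursor()
    return connection, cursor
```

With psycopg2 installed, the call would look like open_autocommit_connection(psycopg2.connect, pg_conn, "my_database"), where the database name is a placeholder.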


The method disconnects from the main database connection.


The method disconnects from the replay database connection.

class pg_lib.pg_encoder(skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, encoding='utf-8', default=None)[source]

Bases: json.encoder.JSONEncoder


Return a JSON string representation of a Python data structure.

>>> JSONEncoder().encode({"foo": ["bar", "baz"]})
'{"foo": ["bar", "baz"]}'
item_separator = ', '
iterencode(o, _one_shot=False)

Encode the given object and yield each string representation as available.

For example:

for chunk in JSONEncoder().iterencode(bigobject):
    mysocket.write(chunk)
key_separator = ': '
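
pg_encoder extends json.JSONEncoder, but the listing above does not show what it overrides. As an illustration only, the sketch below assumes a subclass that overrides default() to serialise values the json module rejects (dates, times and Decimal). The class name example_encoder and the chosen conversions are assumptions, not the pg_lib implementation.

```python
import datetime
import decimal
import json


class example_encoder(json.JSONEncoder):
    """Hypothetical encoder: converts dates, times and decimals to strings."""

    def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        if isinstance(obj, decimal.Decimal):
            return str(obj)
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(obj)


row = {"id": 1, "created": datetime.date(2017, 1, 31), "price": decimal.Decimal("9.90")}
encoded = json.dumps(row, cls=example_encoder)
```

The encoder is selected through the cls argument of json.dumps, so the calling code does not need to convert the values itself.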
class pg_lib.pg_engine(global_config, table_metadata, table_file, logger, sql_dir='sql/')[source]

Bases: object

The class pg_engine manages the replica initialisation and execution on the PostgreSQL side.

The class connects to the database when instantiated and sets up several class attributes used by the replica. In particular, the class dictionary type_dictionary is used to map the MySQL types to the equivalent PostgreSQL types. Unlike pgloader, which allows the type mapping to be configured, the dictionary is hardcoded, as the mapping is an effort to keep the replica running as smoothly as possible. The class manages the replica catalogue upgrade using the current catalogue version self.cat_version and the list of migrations self.cat_sql.

If the catalogue version stored in sch_chameleon.v_version is different from the value stored in self.cat_version, then the method upgrade_service_schema() is executed.
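
The hardcoded type mapping can be pictured as a plain dictionary lookup. The sketch below is illustrative only: the entries shown and the helper map_type are assumptions, and the real type_dictionary in pg_lib may contain different pairs.

```python
# Hypothetical excerpt of a MySQL -> PostgreSQL type mapping.
type_dictionary = {
    "integer": "integer",
    "mediumint": "bigint",
    "tinyint": "integer",
    "datetime": "timestamp without time zone",
    "longtext": "text",
    "blob": "bytea",
    "double": "double precision",
}


def map_type(mysql_type):
    """Return the PostgreSQL type for a MySQL type name (case-insensitive)."""
    try:
        return type_dictionary[mysql_type.lower()]
    except KeyError:
        raise ValueError("no mapping defined for MySQL type %r" % mysql_type)
```

Failing loudly on an unknown type is a design choice of this sketch; it keeps an unmapped column from silently receiving a wrong type.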

add_foreign_keys(source_name, fk_metadata)[source]

The method creates the foreign keys extracted from the MySQL catalogue. The keys are initially created as invalid and then validated. If an error occurs, it is written to the log destination.

  • source_name – the source name, required to determine the destination schema
  • fk_metadata – the foreign keys metadata extracted from mysql’s information schema
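
The two-step approach, creating each key as NOT VALID and validating it afterwards, could be sketched as follows. The layout of the fk_metadata entries (table_name, constraint_name, fk_cols, ref_table, ref_cols) and the helper build_fk_statements are assumptions made for the example.

```python
def build_fk_statements(dest_schema, fk_metadata):
    """Return (create, validate) statement pairs for each foreign key."""
    statements = []
    for fk in fk_metadata:
        table = '"%s"."%s"' % (dest_schema, fk["table_name"])
        ref_table = '"%s"."%s"' % (dest_schema, fk["ref_table"])
        # Step 1: add the constraint without checking the existing rows.
        create = (
            'ALTER TABLE %s ADD CONSTRAINT "%s" FOREIGN KEY (%s) '
            "REFERENCES %s (%s) NOT VALID;"
            % (table, fk["constraint_name"], fk["fk_cols"], ref_table, fk["ref_cols"])
        )
        # Step 2: check the existing rows in a separate statement.
        validate = 'ALTER TABLE %s VALIDATE CONSTRAINT "%s";' % (
            table,
            fk["constraint_name"],
        )
        statements.append((create, validate))
    return statements
```

Keeping the two statements separate means a validation failure can be logged for a single key without preventing the remaining keys from being created.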
add_source(source_name, dest_schema)[source]

The method adds a new source to the replica catalogue. If the source name is already present, an error message is emitted and no further action is taken.

  • source_name – The source name stored in the configuration parameter source_name.
  • source_schema – The source schema on MySQL. The field is not used except when migrating the catalogue to the newer version 2.0.x.


The method builds the ALTER TABLE statement from the token data. The function currently supports the following statements: DROP TABLE, ADD COLUMN, CHANGE, MODIFY.

CHANGE and MODIFY are a potential source of breakage for the replica because of MySQL’s implicit fallback data types. For better understanding please have a look to

Parameters:token – A dictionary with the tokenised sql statement
Returns:query the DDL query in the PostgreSQL dialect
Return type:string

The method builds the enum DDL using the token data. The PostgreSQL system catalogue is queried to determine whether the enum exists and needs to be altered. The alter is not written to the replica log table but executed as a single statement, as PostgreSQL does not allow the alter to be part of a multi-command SQL statement.

Parameters:enm_dic – a dictionary with the enumeration details
Returns:a dictionary with the pre_alter and post_alter statements (e.g. pre alter create type, post alter drop type)
Return type:dictionary

The method loops over the list l_pkeys and builds a new list with the statements for pkeys


The method iterates over the list l_tables and builds a new list with the statements for tables


The function checks whether a reindex is running and waits for the given number of seconds.


The method checks whether the schema sch_chameleon exists.

Returns:count from information_schema.schemata
Return type:integer

Removes the replica batch data for the given source id. The method is used to clean up incomplete batch data after a crash or an unclean restart of the replica.

copy_data(table, csv_file, my_tables={})[source]

The method copies the data into PostgreSQL using psycopg2’s copy_expert. The csv_file is a file-like object which can be either a CSV file or a string IO object, according to the configuration parameter copy_mode.

  • table – the table name, used to get the table’s metadata out of my_tables
  • csv_file – file like object with the table’s data stored in CSV format
  • my_tables – table’s metadata dictionary
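
A hedged sketch of the bulk load with psycopg2's cursor.copy_expert, using an in-memory io.StringIO as the file-like object. The helpers rows_to_csv and copy_rows, their arguments and the COPY options are assumptions; the cursor is passed in, so any object exposing copy_expert(sql, file) works.

```python
import csv
import io


def rows_to_csv(rows):
    """Serialise the rows into an in-memory CSV file-like object."""
    csv_file = io.StringIO()
    csv.writer(csv_file, lineterminator="\n").writerows(rows)
    csv_file.seek(0)  # rewind so the reader starts from the first row
    return csv_file


def copy_rows(cursor, table, columns, rows):
    """Load the rows into the table with COPY ... FROM STDIN."""
    column_list = ", ".join('"%s"' % column for column in columns)
    sql_copy = 'COPY "%s" (%s) FROM STDIN WITH (FORMAT csv)' % (table, column_list)
    # copy_expert streams the file-like object to the server.
    cursor.copy_expert(sql_copy, rows_to_csv(rows))
```

The same copy_rows call works with a file opened on disk in place of the StringIO object, which mirrors the two alternatives described for copy_mode.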

The method creates the indices using the DDL stored in the class list self.idx_ddl.


The method drops and creates the destination schema. It also sets the search_path for the cursor to the destination schema.


The method installs the replica service schema sch_chameleon.


The method executes the index DDL read from the table t_index_def. The method is used when resyncing the replica for recreating the indices after the bulk load.


The method loops through the list table_ddl and executes the creation scripts. No index is created in this method.


The method removes the events from the log table for a specific table and source. It is used to clean up any residual event for a synced table in the replica_engine’s sync_table method.


The method drops the primary key for the table. As tables without primary key cannot be replicated the method calls unregister_table to remove the table from the replica set. The drop constraint statement is not built from the token but generated from the information_schema.

Parameters:token – the tokenised query for drop primary key

The method removes the service schema discarding all the replica references. The replicated tables are kept in place though.


Drops the source from the replication catalogue discarding any replica reference.

Parameters:source_name – The source name stored in the configuration parameter source_name.


The method executes the index drop statements read from the table t_index_def. The method is used when resyncing the replica for removing the indices before the bulk load.


The method drops the tables present in the table_ddl


The method builds the DDL using the tokenised SQL stored in token. The supported commands are: RENAME TABLE, DROP TABLE, TRUNCATE, CREATE TABLE, ALTER TABLE, DROP PRIMARY KEY.

Parameters:token – A dictionary with the tokenised sql statement
Returns:query the DDL query in the PostgreSQL dialect
Return type:string
generate_default_statements(table, column, create_column=None)[source]

The method gets the default value associated with the table and column, removing the cast.

  • table – The table name
  • column – The column name

Returns:the statements for dropping and creating the default value on the affected table
Return type:dictionary


The method updates the batch status to started for the given source_id and returns the batch information.

Returns:psycopg2 fetchall results without any manipulation
Return type:psycopg2 tuple
get_data_type(column, table)[source]

The method determines whether the specified type has to be overridden or not.

  • column – the column dictionary extracted from the information_schema or built in the sql_parser class
  • table – the table name

Returns:the PostgreSQL converted column type

Return type:



The method collects the tables in an inconsistent state. The information is stored in a dictionary whose key is the table’s name. The dictionary is used in the read replica loop to determine whether the table’s modifications should be ignored because the table is in an inconsistent state.

Returns:a dictionary with the tables in inconsistent state and their snapshot coordinates.
Return type:dictionary

The method inserts in the table t_index_def the create and drop statements for the tables affected by the resync replica.


The method gets the service schema version by querying the view sch_chameleon.v_version. The try-except is used in order to get a valid value “base” if the view is missing. This happens only if the schema upgrade is performed from very early versions of pg_chameleon.

Returns:the catalogue version
Return type:string
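
The try-except fallback described above might look like the sketch below. The helper get_schema_version, the use of SELECT * and the assumption that the version sits in the first column are illustrative; the cursor is any object with execute() and fetchone().

```python
def get_schema_version(cursor):
    """Return the catalogue version, or "base" when the view cannot be read."""
    try:
        cursor.execute("SELECT * FROM sch_chameleon.v_version;")
        # Assumption: the version is the first column of the first row.
        return cursor.fetchone()[0]
    except Exception:
        # Assumption: a failure here means the view does not exist yet.
        # With psycopg2 a narrower psycopg2.Error would be the usual choice.
        return "base"
```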

Gets the source status using the source name. Possible values are:

ready: the source is registered but the init_replica is not yet done.

initialising: init_replica is initialising

initialised: init_replica finished and the replica process is ready to start

stopped: the replica process is stopped

running: the replica process is running

Parameters:source_name (string) – The source name stored in the configuration parameter source_name.
Returns:source_status extracted from PostgreSQL
Return type:string

The method lists the sources with their running status and any lag.

Returns:psycopg2 fetchall results
Return type:psycopg2 tuple

Fallback method for the batch insert. Each row event is processed individually and any problematic row is discarded into the table t_discarded_rows. The row is encoded in base64 in order to prevent any encoding or type issue.

Parameters:group_insert – the event data built in mysql_engine
insert_data(table, insert_data, my_tables={})[source]

The method is a fallback procedure for when the copy method fails. The procedure performs a row-by-row insert, which is very slow but able to skip the rows with problematic data (e.g. encoding issues).

  • table – the table name, used to get the table’s metadata out of my_tables
  • insert_data – the table’s row data to insert
  • my_tables – table’s metadata dictionary
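
The row-by-row fallback can be illustrated with a runnable stand-in. The sketch below uses Python's built-in sqlite3 module instead of PostgreSQL so that it runs without a server; the table t_test, the helper insert_rows_individually and the NOT NULL violation used as "problematic data" are all assumptions for the example.

```python
import sqlite3


def insert_rows_individually(connection, sql_insert, rows):
    """Insert rows one at a time and return the rows that were rejected."""
    discarded = []
    for row in rows:
        try:
            connection.execute(sql_insert, row)
        except sqlite3.Error:
            # The failing row is set aside; the loop continues with the next one.
            discarded.append(row)
    connection.commit()
    return discarded


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE t_test (id integer PRIMARY KEY, name text NOT NULL)")
rows = [(1, "first"), (2, None), (3, "third")]
discarded = insert_rows_individually(
    connection, "INSERT INTO t_test (id, name) VALUES (?, ?)", rows
)
stored = connection.execute("SELECT id FROM t_test ORDER BY id").fetchall()
```

One statement per row is what makes the procedure slow, and it is also what allows a single bad row to be skipped without losing the rest of the table.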

The method calls the function fn_process_batch with the parameters batch size and the id_source.

The plpgsql function returns true if there are still rows to process. When all rows are replayed the method exits.

Parameters:replica_batch_size – the max rows to process in a single function call.
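
The replay loop described above could be sketched as follows. The schema qualification sch_chameleon.fn_process_batch, the argument order and the helper replay_until_done are assumptions; the cursor is any object with execute() and fetchone().

```python
def replay_until_done(cursor, replica_batch_size, id_source):
    """Call fn_process_batch until it reports no more rows; return the call count."""
    calls = 0
    while True:
        cursor.execute(
            "SELECT sch_chameleon.fn_process_batch(%s, %s);",
            (replica_batch_size, id_source),
        )
        calls += 1
        still_rows = cursor.fetchone()[0]
        if not still_rows:
            # The function returned false: every row has been replayed.
            return calls
```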

The method resets the sequences to the maximum value available in the associated table.

Parameters:source_name – the source name, required to determine the destination schema

save_discarded_row(row_data, batch_id)[source]

The method saves the discarded row in the table t_discarded_row along with the id_batch. The row is encoded in base64 as the t_row_data is a text field.

  • row_data – the row data dictionary
  • batch_id – the id batch where the row belongs
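
A small sketch of the base64 step, which turns a row dictionary into plain ASCII text that fits a text column whatever the original encoding. The helpers encode_row and decode_row, and the use of str() to serialise the dictionary, are assumptions.

```python
import base64


def encode_row(row_data):
    """Return the row dictionary as base64 text, safe to store in a text field."""
    raw = str(row_data).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_row(encoded):
    """Return the text representation stored by encode_row."""
    return base64.b64decode(encoded).decode("utf-8")
```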
save_master_status(master_status, cleanup=False)[source]

This method saves the master data determining which log table should be used in the next batch.

The method also performs a cleanup of the logged events if the cleanup parameter is true.

  • master_status – the master data with the binlogfile and the log position
  • cleanup – if true cleans the not replayed batches. This is useful when resyncing a replica.
set_application_name(action='', conn_type='main')[source]

The method sets the application name in the replica using the variable self.pg_conn.global_conf.source_name, making it simpler to find the replication processes. If the source name is not set, a generic PGCHAMELEON name is used.
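
The fallback to a generic name can be sketched as below. The label format and the helper build_application_name are hypothetical; only the PGCHAMELEON fallback comes from the description above.

```python
def build_application_name(source_name=None, action="", conn_type="main"):
    """Return a label for the connection; the format is an assumption."""
    prefix = source_name if source_name else "PGCHAMELEON"
    parts = [prefix, conn_type]
    if action:
        parts.append(action)
    return " - ".join(parts)


# With a psycopg2 cursor the label could then be applied with:
#   cursor.execute("SET application_name = %s;", (build_application_name(),))
```

Once set, the value appears in the application_name column of pg_stat_activity, which is where the replication processes can be looked up.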


The method updates the flag b_processed and sets the processed timestamp for the given batch id. The event ids are aggregated into the table t_batch_events used by the replay function.

Parameters:id_batch – the id batch to set as processed

The method sets the binlog name and position to NULL for the given table. When the table is marked consistent, the read replica loop reads and saves the table’s row images.

Parameters:table – the table name

The method sets the search path for the connection.


Sets the source status for the source_name and sets the two class attributes i_id_source and dest_schema.

Parameters:source_status – The source status to be set.

The method saves the table name along with the primary key definition in the table t_replica_tables. This is required in order to let the replay procedure know which primary key to use when replaying the updates and deletes. If the table has no primary key, it is not stored. A table without a primary key is copied and its indices are created like any other table. However, the replica doesn’t work for tables without a primary key.

If the class variable master status is set then the master’s coordinates are saved along with the table. This happens in general when a table is added to the replica or the data is refreshed with sync_tables.

Parameters:table_name – the table name to store in the table t_replica_tables

The method truncates the tables listed in t_index_def. In order to minimise the risk of a lock chain, the truncate is preceded by a set lock_timeout = 10 seconds. If the lock is not acquired in that time, the procedure falls back to a delete and vacuum.
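
A hedged sketch of the truncate-with-fallback logic. The helper truncate_or_delete and the exact statements are assumptions; the cursor is any object with execute(), and the session is assumed to be in autocommit, since VACUUM cannot run inside a transaction block.

```python
def truncate_or_delete(cursor, schema, table):
    """Try a TRUNCATE with a lock timeout; fall back to DELETE and VACUUM."""
    qualified = '"%s"."%s"' % (schema, table)
    try:
        cursor.execute("SET lock_timeout = '10s';")
        cursor.execute("TRUNCATE TABLE %s;" % qualified)
        return "truncate"
    except Exception:
        # Assumption: the error is the lock timeout expiring. With psycopg2
        # a narrower psycopg2.Error would be the usual choice.
        cursor.execute("DELETE FROM %s;" % qualified)
        cursor.execute("VACUUM %s;" % qualified)
        return "delete"
```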


This method is used when a table has its primary key dropped on MySQL. The table name is removed from the replication catalogue and the table is renamed. This way any dependency on the table (e.g. views, functions) is preserved but the replica is stopped.

Parameters:table_name – the table name to remove from t_replica_tables

The method upgrades the service schema to the latest version using the upgrade files if required.

The method uses the install_script flag to determine whether an upgrade file should be applied. The variable cat_version stores the schema version. Each element in the class list cat_sql stores the scripts in the upgrade directory along with the catalogue version associated with the install script.

If the current catalogue version stored in cat_version is equal to the script version the install is skipped but the variable install_script is set to true. This way any following install script is executed to upgrade the catalogue to the higher version.

The hardcoded 0.7 version is required because that version introduced the multi source feature. As initially the destination schema was not stored in the migration catalogue, the post migration update is required to save this information in the replica catalogue.
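
The install_script flag logic can be sketched as a single pass over the ordered list of migrations. The layout of the cat_sql entries (dictionaries with version and script keys), the file names in the usage example and the helper scripts_to_apply are assumptions.

```python
def scripts_to_apply(cat_version, cat_sql):
    """Return the upgrade scripts that follow the current catalogue version."""
    install_script = False
    selected = []
    for migration in cat_sql:
        if install_script:
            selected.append(migration["script"])
        elif migration["version"] == cat_version:
            # The script matching the current version is skipped, but the
            # flag makes every following script eligible for installation.
            install_script = True
    return selected


# Hypothetical migration list, ordered from the oldest to the newest version.
cat_sql = [
    {"version": "base", "script": "create_schema.sql"},
    {"version": "0.1", "script": "upgrade/cat_0.1.sql"},
    {"version": "0.2", "script": "upgrade/cat_0.2.sql"},
]
pending = scripts_to_apply("0.1", cat_sql)
```

Because the flag only flips when the current version is found, the approach relies on cat_sql being ordered by version.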


Main method for adding the batch data in the log tables. The row data from group_insert are mogrified in CSV format and stored in the string like object csv_file.

psycopg2’s copy expert is used to store the event data in PostgreSQL.

Should any error occur, the procedure falls back to insert_batch.

Parameters:group_insert – the event data built in mysql_engine
write_ddl(token, query_data)[source]

The method writes the DDL built from the tokenised sql into PostgreSQL.

  • token – the tokenised query
  • query_data – query’s metadata (schema, binlog, etc.)