import time
import sys
import io
import pymysql
import codecs
import binascii
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent, GtidEvent, HeartbeatLogEvent
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
from pymysqlreplication.event import RotateEvent
from pg_chameleon import sql_token
from os import remove
import re
class mysql_source(object):
def __init__(self):
"""
Class constructor, the method sets the class variables and configures the
operating parameters from the arguments provided to the class.
"""
self.statement_skip = ['BEGIN', 'COMMIT']
self.schema_tables = {}
self.schema_mappings = {}
self.schema_loading = {}
self.schema_list = []
self.hexify_always = ['blob', 'tinyblob', 'mediumblob','longblob','binary','varbinary']
self.spatial_datatypes = ['point','geometry','linestring','polygon', 'multipoint', 'multilinestring', 'geometrycollection']
self.schema_only = {}
self.gtid_mode = False
self.gtid_enable = False
self.copy_table_data = True
def __del__(self):
"""
Class destructor, tries to disconnect the mysql connection.
"""
self.disconnect_db_unbuffered()
self.disconnect_db_buffered()
def __check_mysql_config(self):
"""
The method checks whether the mysql configuration is compatible with the replica requirements.
If all the configuration requirements are met the class attribute replica_possible is set to True.
Otherwise it is set to False and the replica process is aborted.
The parameters checked are:
log_bin - must be ON, i.e. the binary log is enabled, when running on a vanilla mysql
binlog_format - must be ROW, otherwise the replica won't get the data
binlog_row_image - must be FULL, otherwise the row image will be incomplete
The method first runs the function AURORA_VERSION(); if the query doesn't error the log_bin check is skipped because the source is an Aurora instance.
"""
sql_aurora = """select AURORA_VERSION() ver;"""
try:
self.cursor_buffered.execute(sql_aurora)
skip_log_bin_check = True
except:
skip_log_bin_check = False
if self.gtid_enable:
sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'gtid_mode';"""
self.cursor_buffered.execute(sql_log_bin)
variable_check = self.cursor_buffered.fetchone()
if variable_check:
gtid_mode = variable_check["Value"]
if gtid_mode.upper() == 'ON':
self.gtid_mode = True
sql_uuid = """SHOW SLAVE STATUS;"""
self.cursor_buffered.execute(sql_uuid)
slave_status = self.cursor_buffered.fetchall()
if len(slave_status)>0:
gtid_set=slave_status[0]["Retrieved_Gtid_Set"]
else:
sql_uuid = """SHOW GLOBAL VARIABLES LIKE 'server_uuid';"""
self.cursor_buffered.execute(sql_uuid)
server_uuid = self.cursor_buffered.fetchone()
gtid_set = server_uuid["Value"]
self.gtid_uuid = gtid_set.split(':')[0]
else:
self.gtid_mode = False
else:
self.gtid_mode = False
sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'log_bin';"""
self.cursor_buffered.execute(sql_log_bin)
variable_check = self.cursor_buffered.fetchone()
log_bin = variable_check["Value"]
sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'binlog_format';"""
self.cursor_buffered.execute(sql_log_bin)
variable_check = self.cursor_buffered.fetchone()
binlog_format = variable_check["Value"]
sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'binlog_row_image';"""
self.cursor_buffered.execute(sql_log_bin)
variable_check = self.cursor_buffered.fetchone()
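# binlog_row_image exists only from MySQL 5.6 onwards; when the variable is
# missing the row image is assumed to be FULL.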
if variable_check:
binlog_row_image = variable_check["Value"]
else:
binlog_row_image = 'FULL'
if (log_bin.upper() == 'ON' or skip_log_bin_check) and binlog_format.upper() == 'ROW' and binlog_row_image.upper() == 'FULL':
self.replica_possible = True
else:
self.replica_possible = False
self.pg_engine.set_source_status("error")
self.logger.error("The MySQL configuration does not allow the replica. Exiting now")
self.logger.error("Source settings - log_bin %s, binlog_format %s, binlog_row_image %s" % (log_bin.upper(), binlog_format.upper(), binlog_row_image.upper() ))
self.logger.error("Mandatory settings - log_bin ON, binlog_format ROW, binlog_row_image FULL (only for MySQL 5.6+) ")
sys.exit()
def connect_db_buffered(self):
"""
The method creates a new connection to the mysql database.
The connection is made using the dictionary type cursor factory, which is buffered.
"""
db_conn = self.source_config["db_conn"]
db_conn = {key:str(value) for key, value in db_conn.items()}
db_conn["port"] = int(db_conn["port"])
db_conn["connect_timeout"] = int(db_conn["connect_timeout"])
self.conn_buffered=pymysql.connect(
host = db_conn["host"],
user = db_conn["user"],
port = db_conn["port"],
password = db_conn["password"],
charset = db_conn["charset"],
connect_timeout = db_conn["connect_timeout"],
cursorclass=pymysql.cursors.DictCursor
)
self.charset = db_conn["charset"]
self.cursor_buffered = self.conn_buffered.cursor()
self.cursor_buffered_fallback = self.conn_buffered.cursor()
def disconnect_db_buffered(self):
"""
The method disconnects any connection with dictionary type cursor from the mysql database.
"""
try:
self.conn_buffered.close()
except:
pass
def connect_db_unbuffered(self):
"""
The method creates a new connection to the mysql database.
The connection is made using the unbuffered cursor factory.
"""
db_conn = self.source_config["db_conn"]
db_conn = {key:str(value) for key, value in db_conn.items()}
db_conn["port"] = int(db_conn["port"])
db_conn["connect_timeout"] = int(db_conn["connect_timeout"])
self.conn_unbuffered=pymysql.connect(
host = db_conn["host"],
user = db_conn["user"],
port = db_conn["port"],
password = db_conn["password"],
charset = db_conn["charset"],
connect_timeout = db_conn["connect_timeout"],
cursorclass=pymysql.cursors.SSCursor
)
self.charset = db_conn["charset"]
self.cursor_unbuffered = self.conn_unbuffered.cursor()
def disconnect_db_unbuffered(self):
"""
The method disconnects any unbuffered connection from the mysql database.
"""
try:
self.conn_unbuffered.close()
except:
pass
def __build_skip_events(self):
"""
The method builds the class attribute self.skip_events. The attribute is a dictionary with the tables and schemas listed under the three kinds of skippable events (insert, delete, update),
built from the configuration parameter skip_events.
"""
self.skip_events = None
if "skip_events" in self.source_config:
skip_events = self.source_config["skip_events"]
self.skip_events = {}
if "insert" in skip_events:
self.skip_events["insert"] = skip_events["insert"]
else:
self.skip_events["insert"] = []
if "update" in skip_events:
self.skip_events["update"] = skip_events["update"]
else:
self.skip_events["update"] = []
if "delete" in skip_events:
self.skip_events["delete"] = skip_events["delete"]
else:
self.skip_events["delete"] = []
def __build_table_exceptions(self):
"""
The method builds two dictionaries from the limit_tables and skip_tables values set for the source.
The dictionaries are intended to be used in get_table_list to clean up the list of tables per schema.
The method handles the particular case where the class variable self.tables is set.
In that case only the tables specified in self.tables will be synced. Should limit_tables be already
set, then the resulting list is the intersection of self.tables and limit_tables.
"""
self.limit_tables = {}
self.skip_tables = {}
limit_tables = self.source_config["limit_tables"]
skip_tables = self.source_config["skip_tables"]
if self.tables !='*':
tables = [table.strip() for table in self.tables.split(',')]
if limit_tables:
limit_schemas = [table.split('.')[0] for table in limit_tables]
limit_tables = [table for table in tables if table in limit_tables or table.split('.')[0] not in limit_schemas]
else:
limit_tables = tables
self.schema_only = {table.split('.')[0] for table in limit_tables}
if limit_tables:
table_limit = [table.split('.') for table in limit_tables]
for table_list in table_limit:
list_exclude = []
try:
list_exclude = self.limit_tables[table_list[0]]
list_exclude.append(table_list[1])
except KeyError:
try:
list_exclude.append(table_list[1])
except IndexError:
pass
self.limit_tables[table_list[0]] = list_exclude
if skip_tables:
table_skip = [table.split('.') for table in skip_tables]
for table_list in table_skip:
list_exclude = []
try:
list_exclude = self.skip_tables[table_list[0]]
list_exclude.append(table_list[1])
except KeyError:
try:
list_exclude.append(table_list[1])
except:
pass
self.skip_tables[table_list[0]] = list_exclude
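# Illustrative result with hypothetical settings:
#   limit_tables: ['sakila.actor', 'sakila.film'] -> self.limit_tables = {'sakila': ['actor', 'film']}
#   skip_tables:  ['sakila.payment']              -> self.skip_tables  = {'sakila': ['payment']}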
def get_table_list(self):
"""
The method pulls the table list from the information_schema.
The list is stored in a dictionary whose key is the table's schema.
"""
sql_tables="""
SELECT
table_name as table_name
FROM
information_schema.TABLES
WHERE
table_type='BASE TABLE'
AND table_schema=%s
;
"""
for schema in self.schema_list:
self.cursor_buffered.execute(sql_tables, (schema, ))
table_list = [table["table_name"] for table in self.cursor_buffered.fetchall()]
try:
limit_tables = self.limit_tables[schema]
if len(limit_tables) > 0:
table_list = [table for table in table_list if table in limit_tables]
except KeyError:
pass
try:
skip_tables = self.skip_tables[schema]
if len(skip_tables) > 0:
table_list = [table for table in table_list if table not in skip_tables]
except KeyError:
pass
self.schema_tables[schema] = table_list
def create_destination_schemas(self):
"""
Creates the loading and the destination schemas in the destination database for each origin schema in self.schema_list.
The method builds a dictionary which associates the destination schema to the loading schema.
The loading schema is named after the destination schema, with a leading underscore and the suffix _tmp.
As postgresql allows by default up to 64 characters for an identifier, the original schema name is truncated to 59 characters,
in order to fit the maximum identifier length.
The mappings are stored in the class dictionary schema_loading.
If the source parameter keep_existing_schema is set to true the method doesn't create the schemas.
Instead it assumes the schemas and the tables are already in place.
"""
if self.keep_existing_schema:
self.logger.debug("Keep existing schema is set to True. Skipping the schema creation." )
for schema in self.schema_list:
destination_schema = self.schema_mappings[schema]
self.schema_loading[schema] = {'destination':destination_schema, 'loading':destination_schema}
else:
for schema in self.schema_list:
destination_schema = self.schema_mappings[schema]
loading_schema = "_%s_tmp" % destination_schema[0:59]
self.schema_loading[schema] = {'destination':destination_schema, 'loading':loading_schema}
self.logger.debug("Creating the loading schema %s." % loading_schema)
self.pg_engine.create_database_schema(loading_schema)
self.logger.debug("Creating the destination schema %s." % destination_schema)
self.pg_engine.create_database_schema(destination_schema)
def drop_loading_schemas(self):
"""
The method drops the loading schemas from the destination database.
The drop is performed on the schemas generated in create_destination_schemas.
The method assumes the class dictionary schema_loading is correctly set.
"""
for schema in self.schema_loading:
loading_schema = self.schema_loading[schema]["loading"]
self.logger.debug("Dropping the schema %s." % loading_schema)
self.pg_engine.drop_database_schema(loading_schema, True)
def create_destination_tables(self):
"""
The method creates the destination tables in the loading schema.
The table names are looped over using the values stored in the class dictionary schema_tables.
"""
for schema in self.schema_tables:
table_list = self.schema_tables[schema]
for table in table_list:
table_metadata = self.get_table_metadata(table, schema)
self.pg_engine.create_table(table_metadata, table, schema, 'mysql')
def generate_select_statements(self, schema, table):
"""
The method generates the csv output and the statements output for the given schema and table.
The method assumes there is a buffered database connection active.
:param schema: the origin's schema
:param table: the table name
:return: the select list statements for the copy to csv and the fallback to inserts.
:rtype: dictionary
"""
select_columns = {}
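# The CASE expressions below build, for each column, either hex() for the types in
# self.hexify, a cast to unsigned for bit, nullif() against the zero date for temporal
# types, ST_AsText() for the spatial types, or a plain cast to char in the connection charset.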
sql_select="""
SELECT
CASE
WHEN
data_type IN ('"""+"','".join(self.hexify)+"""')
THEN
concat('hex(',column_name,')')
WHEN
data_type IN ('bit')
THEN
concat('cast(`',column_name,'` AS unsigned)')
WHEN
data_type IN ('datetime','timestamp','date')
THEN
concat('nullif(`',column_name,'`,cast("0000-00-00 00:00:00" as date))')
WHEN
data_type IN ('"""+"','".join(self.spatial_datatypes)+"""')
THEN
concat('ST_AsText(',column_name,')')
ELSE
concat('cast(`',column_name,'` AS char CHARACTER SET """+ self.charset +""")')
END
AS select_csv,
CASE
WHEN
data_type IN ('"""+"','".join(self.hexify)+"""')
THEN
concat('hex(',column_name,') AS','`',column_name,'`')
WHEN
data_type IN ('bit')
THEN
concat('cast(`',column_name,'` AS unsigned) AS','`',column_name,'`')
WHEN
data_type IN ('datetime','timestamp','date')
THEN
concat('nullif(`',column_name,'`,cast("0000-00-00 00:00:00" as date)) AS `',column_name,'`')
WHEN
data_type IN ('"""+"','".join(self.spatial_datatypes)+"""')
THEN
concat('ST_AsText(',column_name,') AS','`',column_name,'`')
ELSE
concat('cast(`',column_name,'` AS char CHARACTER SET """+ self.charset +""") AS','`',column_name,'`')
END
AS select_stat,
column_name as column_name
FROM
information_schema.COLUMNS
WHERE
table_schema=%s
AND table_name=%s
ORDER BY
ordinal_position
;
"""
self.cursor_buffered.execute(sql_select, (schema, table))
select_data = self.cursor_buffered.fetchall()
select_csv = ["COALESCE(REPLACE(%s, '\"', '\"\"'),'NULL') " % statement["select_csv"] for statement in select_data]
select_stat = [statement["select_csv"] for statement in select_data]
column_list = ['"%s"' % statement["column_name"] for statement in select_data]
select_columns["select_csv"] = "REPLACE(CONCAT('\"',CONCAT_WS('\",\"',%s),'\"'),'\"NULL\"','NULL')" % ','.join(select_csv)
select_columns["select_stat"] = ','.join(select_stat)
select_columns["column_list"] = ','.join(column_list)
return select_columns
def begin_tx(self):
"""
The method sets the isolation level to repeatable read and begins a transaction
"""
self.logger.debug("set isolation level")
self.cursor_unbuffered.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
self.logger.debug("beginning transaction")
self.cursor_unbuffered.execute("BEGIN")
def end_tx(self):
"""
The method ends the current transaction with a rollback,
as the source should never have been changed anyway.
"""
self.logger.debug("rolling back")
self.cursor_unbuffered.execute("ROLLBACK")
def make_tx_snapshot(self, schema, table):
"""
The method forces the creation of a transaction snapshot by reading
one row from the source table and discarding it.
"""
self.logger.debug("reading and discarding 1 row from `%s`.`%s`" % (schema, table))
self.cursor_unbuffered.execute("SELECT * FROM `%s`.`%s` LIMIT 1" % (schema, table))
def lock_table(self, schema, table):
"""
The method flushes the given table with read lock.
The method assumes there is a database connection active.
:param schema: the origin's schema
:param table: the table name
"""
self.logger.debug("locking the table `%s`.`%s`" % (schema, table) )
sql_lock = "FLUSH TABLES `%s`.`%s` WITH READ LOCK;" %(schema, table)
self.logger.debug("collecting the master's coordinates for table `%s`.`%s`" % (schema, table) )
self.cursor_buffered.execute(sql_lock)
def unlock_tables(self):
"""
The method unlocks all tables
"""
self.logger.debug("unlocking the tables")
sql_unlock = "UNLOCK TABLES;"
self.cursor_buffered.execute(sql_unlock)
def get_master_coordinates(self):
"""
The method gets the master's coordinates and returns them stored in a dictionary.
The method assumes there is a database connection active.
:return: the master's log coordinates
:rtype: dictionary
"""
if not self.conn_buffered.open:
self.connect_db_buffered()
sql_master = "SHOW MASTER STATUS;"
self.cursor_buffered.execute(sql_master)
master_status = self.cursor_buffered.fetchall()
return master_status
def copy_data(self, schema, table):
"""
The method copies the data between the origin and destination table.
The method locks the table in read only mode and gets the log coordinates which are returned to the calling method.
:param schema: the origin's schema
:param table: the table name
:return: the log coordinates for the given table
:rtype: dictionary
"""
slice_insert = []
loading_schema = self.schema_loading[schema]["loading"]
self.connect_db_buffered()
self.logger.debug("estimating rows in %s.%s" % (schema , table))
sql_rows = """
SELECT
table_rows as table_rows,
CASE
WHEN avg_row_length>0
then
round(({}/avg_row_length))
ELSE
0
END as copy_limit,
transactions
FROM
information_schema.TABLES,
information_schema.ENGINES
WHERE
table_schema=%s
AND table_type='BASE TABLE'
AND table_name=%s
AND TABLES.engine = ENGINES.engine
;
"""
sql_rows = sql_rows.format(self.copy_max_memory)
self.cursor_buffered.execute(sql_rows, (schema, table))
count_rows = self.cursor_buffered.fetchone()
total_rows = count_rows["table_rows"]
copy_limit = int(count_rows["copy_limit"])
table_txs = count_rows["transactions"] == "YES"
if copy_limit == 0:
copy_limit = 1000000
num_slices = int(total_rows//copy_limit)
range_slices = list(range(num_slices+1))
total_slices = len(range_slices)
slice = range_slices[0]
self.logger.debug("The table %s.%s will be copied in %s estimated slice(s) of %s rows, using a transaction %s" % (schema, table, total_slices, copy_limit, table_txs))
out_file = '%s/%s_%s.csv' % (self.out_dir, schema, table )
self.lock_table(schema, table)
master_status = self.get_master_coordinates()
select_columns = self.generate_select_statements(schema, table)
csv_data = ""
sql_csv = "SELECT %s as data FROM `%s`.`%s`;" % (select_columns["select_csv"], schema, table)
column_list = select_columns["column_list"]
self.logger.debug("Executing query for table %s.%s" % (schema, table ))
self.connect_db_unbuffered()
if table_txs:
self.begin_tx()
self.cursor_unbuffered.execute(sql_csv)
if table_txs:
self.unlock_tables()
while True:
csv_results = self.cursor_unbuffered.fetchmany(copy_limit)
if len(csv_results) == 0:
break
csv_data="\n".join(d[0] for d in csv_results )
if self.copy_mode == 'direct':
csv_file = io.StringIO()
csv_file.write(csv_data)
csv_file.seek(0)
if self.copy_mode == 'file':
csv_file = codecs.open(out_file, 'wb', self.charset)
csv_file.write(csv_data)
csv_file.close()
csv_file = open(out_file, 'rb')
try:
self.pg_engine.copy_data(csv_file, loading_schema, table, column_list)
except:
self.logger.info("Table %s.%s error in PostgreSQL copy, saving slice number for the fallback to insert statements " % (loading_schema, table ))
slice_insert.append(slice)
self.print_progress(slice+1,total_slices, schema, table)
slice+=1
csv_file.close()
if len(slice_insert)>0:
ins_arg={}
ins_arg["slice_insert"] = slice_insert
ins_arg["table"] = table
ins_arg["schema"] = schema
ins_arg["select_stat"] = select_columns["select_stat"]
ins_arg["column_list"] = column_list
ins_arg["copy_limit"] = copy_limit
self.insert_table_data(ins_arg)
if table_txs:
self.end_tx()
else:
self.unlock_tables()
self.cursor_unbuffered.close()
self.disconnect_db_unbuffered()
self.disconnect_db_buffered()
try:
remove(out_file)
except:
pass
return master_status
def insert_table_data(self, ins_arg):
"""
This method is a fallback procedure used when the copy in copy_data fails.
The ins_arg parameter is a dictionary with the information required to run the selects for building the insert
statements and the slices to process.
The process is performed in memory and can take a very long time to complete.
:param ins_arg: the dictionary with the insert arguments (slice_insert, schema, table, select_stat, column_list, copy_limit)
"""
slice_insert= ins_arg["slice_insert"]
table = ins_arg["table"]
schema = ins_arg["schema"]
select_stat = ins_arg["select_stat"]
column_list = ins_arg["column_list"]
copy_limit = ins_arg["copy_limit"]
self.connect_db_unbuffered()
loading_schema = self.schema_loading[schema]["loading"]
num_insert = 1
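# Each failed slice is re-read from MySQL with a LIMIT offset, copy_limit window
# and replayed on PostgreSQL using insert statements instead of COPY.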
for slice in slice_insert:
self.logger.info("Executing inserts in %s.%s. Slice %s. Rows per slice %s." % (loading_schema, table, num_insert, copy_limit , ))
offset = slice*copy_limit
sql_fallback = "SELECT %s FROM `%s`.`%s` LIMIT %s, %s;" % (select_stat, schema, table, offset, copy_limit)
self.cursor_unbuffered.execute(sql_fallback)
insert_data = self.cursor_unbuffered.fetchall()
self.pg_engine.insert_data(loading_schema, table, insert_data , column_list)
num_insert +=1
def print_progress (self, iteration, total, schema, table):
"""
Print the copy progress in slices and estimated total slices.
In order to reduce noise, when the log level is info only the tables copied in multiple slices
get the progress printed.
:param iteration: The slice number currently processed
:param total: The estimated total slices
:param schema: The schema name
:param table: The table name
"""
if iteration>=total:
total = iteration
if total>1:
self.logger.info("Table %s.%s copied %s slice of %s" % (schema, table, iteration, total))
else:
self.logger.debug("Table %s.%s copied %s slice of %s" % (schema, table, iteration, total))
def __create_indices(self, schema, table):
"""
The method creates the indices on the given table in the loading schema,
using the index definitions collected from information_schema.statistics.
:param schema: the origin's schema
:param table: the table name
:return: the table's primary key, as returned by pg_engine.create_indices
:rtype: dictionary
"""
loading_schema = self.schema_loading[schema]["loading"]
self.connect_db_buffered()
self.logger.debug("Creating indices on table %s.%s " % (schema, table))
sql_index = """
SELECT
CASE WHEN index_name='PRIMARY'
THEN
index_name
WHEN (
SELECT
count(1)
FROM
information_schema.statistics s
WHERE
s.index_name=t.index_name
AND s.table_schema=t.table_schema
)>1
THEN
concat(substring(index_name,1,59),'_',SUBSTRING(md5(uuid()),1,4))
ELSE
index_name
END AS index_name,
non_unique as non_unique,
GROUP_CONCAT(column_name ORDER BY seq_in_index) as index_columns
FROM
information_schema.statistics t
WHERE
table_schema=%s
AND table_name=%s
AND index_type = 'BTREE'
GROUP BY
table_name,
non_unique,
index_name
;
"""
self.cursor_buffered.execute(sql_index, (schema, table))
index_data = self.cursor_buffered.fetchall()
table_pkey = self.pg_engine.create_indices(loading_schema, table, index_data)
self.disconnect_db_buffered()
return table_pkey
def __copy_tables(self):
"""
The method copies the data between tables, from the mysql schema to the corresponding
postgresql loading schema. Before the copy starts the table is locked and the lock is released once the copy can proceed safely.
If keep_existing_schema is true for the source then the destination tables are truncated before the copy;
their constraints and indices are dropped beforehand and recreated once the copy is complete.
"""
for schema in self.schema_tables:
loading_schema = self.schema_loading[schema]["loading"]
destination_schema = self.schema_loading[schema]["destination"]
table_list = self.schema_tables[schema]
for table in table_list:
self.logger.info("Copying the source table %s into %s.%s" %(table, loading_schema, table) )
try:
if self.keep_existing_schema:
table_pkey = self.pg_engine.get_existing_pkey(destination_schema,table)
self.logger.info("Collecting constraints and indices from the destination table %s.%s" %(destination_schema, table) )
self.pg_engine.collect_idx_cons(destination_schema,table)
self.logger.info("Removing constraints and indices from the destination table %s.%s" %(destination_schema, table) )
self.pg_engine.cleanup_idx_cons(destination_schema,table)
self.logger.info("Truncating the table %s.%s" %(destination_schema, table) )
self.pg_engine.truncate_table(destination_schema,table)
master_status = self.copy_data(schema, table)
else:
if self.copy_table_data:
master_status = self.copy_data(schema, table)
else:
master_status = self.get_master_coordinates()
table_pkey = self.__create_indices(schema, table)
self.pg_engine.store_table(destination_schema, table, table_pkey, master_status)
if self.keep_existing_schema:
#input("Press Enter to continue...")
self.logger.info("Adding constraint and indices to the destination table %s.%s" %(destination_schema, table) )
self.pg_engine.create_idx_cons(destination_schema,table)
except:
self.logger.info("Could not copy the table %s. Excluding it from the replica." %(table) )
raise
def set_copy_max_memory(self):
"""
The method sets the class variable self.copy_max_memory using the value stored in the
source setting.
"""
copy_max_memory = str(self.source_config["copy_max_memory"])[:-1]
copy_scale = str(self.source_config["copy_max_memory"])[-1]
try:
int(copy_scale)
copy_max_memory = self.source_config["copy_max_memory"]
except:
if copy_scale =='k':
copy_max_memory = str(int(copy_max_memory)*1024)
elif copy_scale =='M':
copy_max_memory = str(int(copy_max_memory)*1024*1024)
elif copy_scale =='G':
copy_max_memory = str(int(copy_max_memory)*1024*1024*1024)
else:
print("**FATAL - invalid suffix in parameter copy_max_memory (accepted values are (k)ilobytes, (M)egabytes, (G)igabytes.")
sys.exit(3)
self.copy_max_memory = copy_max_memory
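# Example with a hypothetical setting: copy_max_memory: 300M becomes 314572800 bytes,
# while a value ending with a digit (e.g. 300000000) is used as is.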
def __init_read_replica(self):
"""
The method performs the preliminary steps required by the read replica method.
"""
self.replica_conn = {}
self.source_config = self.sources[self.source]
try:
exit_on_error = True if self.source_config["on_error_read"]=='exit' else False
except KeyError:
exit_on_error = True
self.my_server_id = self.source_config["my_server_id"]
self.limit_tables = self.source_config["limit_tables"]
self.skip_tables = self.source_config["skip_tables"]
self.replica_batch_size = self.source_config["replica_batch_size"]
self.sleep_loop = self.source_config["sleep_loop"]
self.postgis_present = self.pg_engine.check_postgis()
if self.postgis_present:
self.hexify = self.hexify_always
else:
self.hexify = self.hexify_always + self.spatial_datatypes
try:
self.connect_db_buffered()
except:
if exit_on_error:
raise
else:
return "skip"
self.pg_engine.connect_db()
self.schema_mappings = self.pg_engine.get_schema_mappings()
self.schema_replica = [schema for schema in self.schema_mappings]
db_conn = self.source_config["db_conn"]
self.replica_conn["host"] = str(db_conn["host"])
self.replica_conn["user"] = str(db_conn["user"])
self.replica_conn["passwd"] = str(db_conn["password"])
self.replica_conn["port"] = int(db_conn["port"])
self.__build_table_exceptions()
self.__build_skip_events()
self.__check_mysql_config()
if self.gtid_mode:
master_data = self.get_master_coordinates()
self.start_xid = master_data[0]["Executed_Gtid_Set"].split(':')[1].split('-')[0]
def __init_sync(self):
"""
The method performs the common steps required to initialise the database connections and
class attributes within sync_tables, refresh_schema and init_replica.
"""
try:
self.source_config = self.sources[self.source]
except KeyError:
self.logger.error("The source %s doesn't exists " % (self.source))
sys.exit()
self.out_dir = self.source_config["out_dir"]
self.copy_mode = self.source_config["copy_mode"]
self.pg_engine.lock_timeout = self.source_config["lock_timeout"]
self.pg_engine.grant_select_to = self.source_config["grant_select_to"]
if "keep_existing_schema" in self.sources[self.source]:
self.keep_existing_schema = self.sources[self.source]["keep_existing_schema"]
else:
self.keep_existing_schema = False
self.set_copy_max_memory()
self.postgis_present = self.pg_engine.check_postgis()
if self.postgis_present:
self.hexify = self.hexify_always
else:
self.hexify = self.hexify_always + self.spatial_datatypes
self.connect_db_buffered()
self.pg_engine.connect_db()
self.schema_mappings = self.pg_engine.get_schema_mappings()
self.pg_engine.schema_tables = self.schema_tables
def refresh_schema(self):
"""
The method performs a sync for an entire schema within a source.
The method works in a similar way to init_replica.
The swap happens in a single transaction.
"""
self.logger.debug("starting sync schema for source %s" % self.source)
self.logger.debug("The schema affected is %s" % self.schema)
self.__init_sync()
self.__check_mysql_config()
self.pg_engine.set_source_status("syncing")
self.__build_table_exceptions()
self.schema_list = [self.schema]
self.get_table_list()
self.create_destination_schemas()
try:
self.pg_engine.schema_loading = self.schema_loading
self.pg_engine.schema_tables = self.schema_tables
if self.keep_existing_schema:
self.disconnect_db_buffered()
self.__copy_tables()
else:
self.create_destination_tables()
self.disconnect_db_buffered()
self.__copy_tables()
self.pg_engine.grant_select()
self.pg_engine.swap_schemas()
self.drop_loading_schemas()
self.pg_engine.set_source_status("initialised")
self.connect_db_buffered()
master_end = self.get_master_coordinates()
self.disconnect_db_buffered()
self.pg_engine.set_source_highwatermark(master_end, consistent=False)
self.pg_engine.cleanup_table_events()
notifier_message = "refresh schema %s for source %s is complete" % (self.schema, self.source)
self.notifier.send_message(notifier_message, 'info')
self.logger.info(notifier_message)
except:
if not self.keep_existing_schema:
self.drop_loading_schemas()
self.pg_engine.set_source_status("error")
notifier_message = "refresh schema %s for source %s failed" % (self.schema, self.source)
self.notifier.send_message(notifier_message, 'critical')
self.logger.critical(notifier_message)
raise
def sync_tables(self):
"""
The method performs a sync for specific tables.
The method works in a similar way to init_replica except for the swap of the relations.
The tables are loaded into a temporary schema and the log coordinates are stored with the table
in the replica catalogue. When the load is complete the method drops the existing table and changes the
schema for the loaded tables to the destination schema.
The swap happens in a single transaction.
"""
self.logger.info("Starting sync tables for source %s" % self.source)
self.__init_sync()
self.__check_mysql_config()
self.pg_engine.set_source_status("syncing")
if self.tables == 'disabled':
self.tables = self.pg_engine.get_tables_disabled ()
if not self.tables:
self.logger.info("There are no disabled tables to sync")
return
self.logger.debug("The tables affected are %s" % self.tables)
self.__build_table_exceptions()
self.schema_list = [schema for schema in self.schema_mappings if schema in self.schema_only]
self.get_table_list()
self.create_destination_schemas()
try:
self.pg_engine.schema_loading = self.schema_loading
self.pg_engine.schema_tables = self.schema_tables
if self.keep_existing_schema:
self.disconnect_db_buffered()
self.__copy_tables()
else:
self.create_destination_tables()
self.disconnect_db_buffered()
self.__copy_tables()
self.pg_engine.grant_select()
self.pg_engine.swap_tables()
self.drop_loading_schemas()
self.pg_engine.set_source_status("synced")
self.connect_db_buffered()
master_end = self.get_master_coordinates()
self.disconnect_db_buffered()
self.pg_engine.set_source_highwatermark(master_end, consistent=False)
self.pg_engine.cleanup_table_events()
notifier_message = "the sync for tables %s in source %s is complete" % (self.tables, self.source)
self.notifier.send_message(notifier_message, 'info')
self.logger.info(notifier_message)
except:
if not self.keep_existing_schema:
self.drop_loading_schemas()
self.pg_engine.set_source_status("error")
notifier_message = "the sync for tables %s in source %s failed" % (self.tables, self.source)
self.notifier.send_message(notifier_message, 'critical')
self.logger.critical(notifier_message)
raise
def __get_text_spatial(self,raw_data):
"""
The method returns the spatial data in a text format readable by postgresql.
The raw value is hexlified and the leading four bytes, MySQL's SRID prefix, are stripped,
leaving the hex encoded WKB representation of the geometry.
:param raw_data: The raw_data returned by the mysql-replication library
:return: hex encoded WKB representation of the spatial data
:rtype: text
"""
decoded_data=binascii.hexlify(raw_data)
return decoded_data.decode()[8:]
def get_table_type_map(self):
"""
The method builds a dictionary with a key for each replicated schema.
Each key maps a dictionary with the schema's tables stored as keys and the column/type mappings as values.
The dictionary is used in the read_replica method to determine whether a field requires hexadecimal conversion.
"""
table_type_map = {}
table_map = {}
self.logger.debug("collecting table type map")
for schema in self.schema_replica:
sql_tables = """
SELECT
t.table_schema as table_schema,
t.table_name as table_name,
SUBSTRING_INDEX(t.TABLE_COLLATION,'_',1) as character_set
FROM
information_schema.TABLES t
WHERE
table_type='BASE TABLE'
AND table_schema=%s
;
"""
self.cursor_buffered.execute(sql_tables, (schema, ))
table_list = self.cursor_buffered.fetchall()
for table in table_list:
column_type = {}
sql_columns = """
SELECT
column_name as column_name,
data_type as data_type
FROM
information_schema.COLUMNS
WHERE
table_schema=%s
AND table_name=%s
ORDER BY
ordinal_position
;
"""
table_charset = table["character_set"]
self.cursor_buffered.execute(sql_columns, (table["table_schema"], table["table_name"]))
column_data = self.cursor_buffered.fetchall()
for column in column_data:
column_type[column["column_name"]] = column["data_type"]
table_dict = {}
table_dict["table_charset"] = table_charset
table_dict["column_type"] = column_type
table_map[table["table_name"]] = table_dict
table_type_map[schema] = table_map
table_map = {}
return table_type_map
def __store_binlog_event(self, table, schema):
"""
The private method returns whether the table event should be stored or not in the postgresql log replica.
:param table: The table's name to check
:param schema: The table's schema name
:return: true if the table should be replicated, false if it shouldn't
:rtype: boolean
"""
if self.tables_disabled:
if "%s.%s" % (schema, table) in self.tables_disabled:
return False
if schema in self.skip_tables:
if table in self.skip_tables[schema]:
return False
if schema in self.limit_tables:
if table in self.limit_tables[schema]:
return True
else:
return False
return True
def __skip_event(self, table, schema, binlogevent):
"""
The method returns whether the event should be skipped or not.
The dictionary self.skip_events is used for the check.
:param table: The table's name to check
:param schema: The table's schema name
:param binlogevent: The binlog event to evaluate
:return: list with first element a boolean and the second element the event type
:rtype: list
"""
if isinstance(binlogevent, DeleteRowsEvent):
event = "delete"
elif isinstance(binlogevent, UpdateRowsEvent):
event = "update"
elif isinstance(binlogevent, WriteRowsEvent):
event = "insert"
skip_event = False
if self.skip_events:
if self.skip_events[event]:
table_name = "%s.%s" % (schema, table)
if schema in self.skip_events[event] or table_name in self.skip_events[event]:
skip_event = True
return [skip_event, event]
def __build_gtid_set(self, gtid):
"""
The method builds a gtid set by merging the given gtid positions into the master's Executed_Gtid_Set.
:param gtid: a dictionary mapping each server uuid to its last transaction id
:return: the rebuilt gtid set
:rtype: text
"""
new_set = None
gtid_pack = []
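# Example of the rewrite performed below (hypothetical values): a master
# Executed_Gtid_Set of '3e11fa47-71ca-11e1-9e33-c80aa9429562:1-19' combined with
# gtid {'3e11fa47-71ca-11e1-9e33-c80aa9429562': '23'} yields
# '3e11fa47-71ca-11e1-9e33-c80aa9429562:1-23'.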
master_data= self.get_master_coordinates()
if "Executed_Gtid_Set" in master_data[0]:
gtid_set = master_data[0]["Executed_Gtid_Set"]
gtid_list = gtid_set.split(",\n")
for gtid_item in gtid_list:
if gtid_item.split(':')[0] in gtid:
gtid_old = gtid_item.split(':')
gtid_new = "%s:%s-%s" % (gtid_old[0],gtid_old[1].split('-')[0],gtid[gtid_old[0]])
gtid_pack.append(gtid_new)
else:
gtid_pack.append(gtid_item)
new_set = ",\n".join(gtid_pack)
return new_set
def __decode_dic_keys(self, dic_encoded):
"""
Private method to recursively decode the dictionary keys and values into strings.
This is used to fix the json data types in the __read_replica_stream method because,
at the moment, the mysql-replication library returns the keys of the json data types as binary values in python3.
:param dic_encoded: The dictionary with the encoded keys
:return: The dictionary with the decoded keys
:rtype: dictionary
"""
dic_decoded = {}
lst_decode = []
if isinstance(dic_encoded, list):
for item in dic_encoded:
lst_decode.append(self.__decode_dic_keys(item))
return lst_decode
elif not isinstance(dic_encoded, dict):
try:
return dic_encoded.decode("UTF-8")
except AttributeError:
return dic_encoded
else:
for key, value in dic_encoded.items():
try:
dic_decoded[key.decode("UTF-8")] = self.__decode_dic_keys(value)
except AttributeError:
dic_decoded[key] = self.__decode_dic_keys(value)
return dic_decoded
def __read_replica_stream(self, batch_data):
"""
Stream the replica using the batch data. This method evaluates the different events streamed from MySQL
and manages them accordingly. The BinLogStreamReader class is instantiated with the only_events parameter, which
restricts the event types received by the streamer.
The events managed are the following.
RotateEvent which happens when mysql restarts or the binary log file changes.
QueryEvent which happens when a new row image comes in (BEGIN statement) or a DDL is executed.
The BEGIN is always skipped. The DDL is parsed using the sql_token class.
[Write,Update,Delete]RowEvents are the row images pulled from the mysql replica.
The RotateEvent and the QueryEvent cause the batch to be closed.
The for loop reads the row events, builds the dictionary carrying information such as the destination schema,
the binlog coordinates, and stores them into the group_insert list.
When the number of events exceeds the replica_batch_size the group_insert is written into PostgreSQL.
The batch is not closed in that case and the method exits only if there are no more rows available in the stream.
Therefore the replica_batch_size is just the maximum size of the single insert and of the replayed batch on PostgreSQL.
The binlog switch or a captured DDL determines whether a batch is closed and processed.
The update row event stores the row image before the update in a separate key, event_before. This is required
to allow updates where the primary key is changed as well.
Each row event is scanned for data types requiring conversion to hex string.
:param batch_data: The list with the master's batch data.
:return: the batch's data composed by binlog name, binlog position and last event timestamp read from the mysql replica stream.
:rtype: dictionary
"""
size_insert=0
sql_tokeniser = sql_token()
table_type_map = self.get_table_type_map()
inc_tables = self.pg_engine.get_inconsistent_tables()
self.tables_disabled = self.pg_engine.get_tables_disabled(format='list')
close_batch = False
master_data = {}
group_insert = []
next_gtid = {}
id_batch = batch_data[0][0]
log_file = batch_data[0][1]
log_position = batch_data[0][2]
log_table = batch_data[0][3]
master_data["log_table"] = log_table
if self.gtid_mode:
gtid_position = batch_data[0][4]
gtid_pack = gtid_position.split(",\n")
for gtid in gtid_pack:
gtid = gtid.split(':')
next_gtid[gtid[0]] = gtid[1].split("-")[-1]
gtid_set = self.__build_gtid_set(next_gtid)
else:
gtid_set = None
stream_connected = False
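# Resume the stream from the rebuilt GTID set when GTID is enabled (auto_position),
# otherwise from the stored binlog file and position.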
my_stream = BinLogStreamReader(
connection_settings = self.replica_conn,
server_id = self.my_server_id,
only_events = [RotateEvent, DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, QueryEvent, GtidEvent, HeartbeatLogEvent],
log_file = log_file,
log_pos = log_position,
auto_position = gtid_set,
resume_stream = True,
only_schemas = self.schema_replica,
slave_heartbeat = self.sleep_loop,
)
if gtid_set:
self.logger.debug("GTID ENABLED - gtid: %s. id_batch: %s " % (gtid_set, id_batch))
else:
self.logger.debug("GTID DISABLED - log_file %s, log_position %s. id_batch: %s " % (log_file, log_position, id_batch))
for binlogevent in my_stream:
if isinstance(binlogevent, GtidEvent):
if close_batch:
break
gtid = binlogevent.gtid.split(':')
next_gtid[gtid[0]] = gtid[1]
master_data["gtid"] = next_gtid
elif isinstance(binlogevent, RotateEvent):
event_time = binlogevent.timestamp
binlogfile = binlogevent.next_binlog
position = binlogevent.position
self.logger.debug("ROTATE EVENT - binlogfile %s, position %s. " % (binlogfile, position))
if (log_file != binlogfile and stream_connected) or len(group_insert)>0:
close_batch = True
master_data["File"]=binlogfile
master_data["Position"]=position
master_data["Time"]=event_time
master_data["gtid"] = next_gtid
stream_connected = True
if close_batch:
break
elif isinstance(binlogevent, HeartbeatLogEvent):
self.logger.debug("HEARTBEAT EVENT - binlogfile %s " % (binlogevent.ident,))
if len(group_insert)>0 or log_file != binlogevent.ident:
self.logger.debug("WRITING ROWS - binlogfile %s " % (binlogevent.ident,))
master_data["File"] = binlogevent.ident
close_batch = True
break
elif isinstance(binlogevent, QueryEvent):
event_time = binlogevent.timestamp
try:
schema_query = binlogevent.schema.decode()
except:
schema_query = binlogevent.schema
if binlogevent.query.strip().upper() not in self.statement_skip and schema_query in self.schema_mappings:
close_batch=True
destination_schema = self.schema_mappings[schema_query]
log_position = binlogevent.packet.log_pos
master_data["File"] = binlogfile
master_data["Position"] = log_position
master_data["Time"] = event_time
master_data["gtid"] = next_gtid
if len(group_insert)>0:
self.pg_engine.write_batch(group_insert)
group_insert=[]
self.logger.info("QUERY EVENT - binlogfile %s, position %s.\n--------\n%s\n-------- " % (binlogfile, log_position, binlogevent.query))
sql_tokeniser.parse_sql(binlogevent.query)
for token in sql_tokeniser.tokenised:
write_ddl = True
table_name = token["name"]
store_query = self.__store_binlog_event(table_name, schema_query)
if store_query:
table_key_dic = "%s.%s" % (destination_schema, table_name)
if table_key_dic in inc_tables:
write_ddl = False
log_seq = int(log_file.split('.')[1])
log_pos = int(log_position)
table_dic = inc_tables[table_key_dic]
if log_seq > table_dic["log_seq"]:
write_ddl = True
elif log_seq == table_dic["log_seq"] and log_pos >= table_dic["log_pos"]:
write_ddl = True
if write_ddl:
self.logger.info("CONSISTENT POINT FOR TABLE %s REACHED - binlogfile %s, position %s" % (table_key_dic, binlogfile, log_position))
self.pg_engine.set_consistent_table(table_name, destination_schema)
inc_tables = self.pg_engine.get_inconsistent_tables()
if write_ddl:
event_time = binlogevent.timestamp
self.logger.debug("TOKEN: %s" % (token))
if len(token)>0:
query_data={
"binlog":log_file,
"logpos":log_position,
"schema": destination_schema,
"batch_id":id_batch,
"log_table":log_table
}
self.pg_engine.write_ddl(token, query_data, destination_schema)
sql_tokeniser.reset_lists()
if close_batch:
my_stream.close()
return [master_data, close_batch]
else:
for row in binlogevent.rows:
event_after={}
event_before={}
event_insert = {}
add_row = True
log_file=binlogfile
log_position=binlogevent.packet.log_pos
table_name=binlogevent.table
event_time=binlogevent.timestamp
schema_row = binlogevent.schema
destination_schema = self.schema_mappings[schema_row]
table_key_dic = "%s.%s" % (destination_schema, table_name)
store_row = self.__store_binlog_event(table_name, schema_row)
skip_event = self.__skip_event(table_name, schema_row, binlogevent)
if store_row and not skip_event[0]:
if table_key_dic in inc_tables:
table_consistent = False
log_seq = int(log_file.split('.')[1])
log_pos = int(log_position)
table_dic = inc_tables[table_key_dic]
if log_seq > table_dic["log_seq"]:
table_consistent = True
elif log_seq == table_dic["log_seq"] and log_pos >= table_dic["log_pos"]:
table_consistent = True
self.logger.info("CONSISTENT POINT FOR TABLE %s REACHED - binlogfile %s, position %s" % (table_key_dic, binlogfile, log_position))
if table_consistent:
add_row = True
self.pg_engine.set_consistent_table(table_name, destination_schema)
inc_tables = self.pg_engine.get_inconsistent_tables()
else:
add_row = False
column_map = table_type_map[schema_row][table_name]["column_type"]
table_charset = table_type_map[schema_row][table_name]["table_charset"]
global_data={
"binlog":log_file,
"logpos":log_position,
"schema": destination_schema,
"table": table_name,
"batch_id":id_batch,
"log_table":log_table,
"event_time":event_time
}
if add_row:
if skip_event[1] == "delete":
global_data["action"] = "delete"
event_after=row["values"]
elif skip_event[1] == "update":
global_data["action"] = "update"
event_after=row["after_values"]
event_before=row["before_values"]
elif skip_event[1] == "insert":
global_data["action"] = "insert"
event_after=row["values"]
for column_name in event_after:
try:
column_type=column_map[column_name]
except KeyError:
self.logger.debug("Detected inconsistent structure for the table %s. The replay may fail. " % (table_name))
column_type = 'text'
if column_type in self.hexify and event_after[column_name]:
event_after[column_name]=binascii.hexlify(event_after[column_name]).decode()
elif column_type in self.hexify and isinstance(event_after[column_name], bytes):
event_after[column_name] = ''
elif column_type == 'json':
event_after[column_name] = self.__decode_dic_keys(event_after[column_name])
elif column_type in self.spatial_datatypes and event_after[column_name]:
event_after[column_name] = self.__get_text_spatial(event_after[column_name])
for column_name in event_before:
try:
column_type=column_map[column_name]
except KeyError:
self.logger.debug("Detected inconsistent structure for the table %s. The replay may fail. " % (table_name))
column_type = 'text'
if column_type in self.hexify and event_before[column_name]:
event_before[column_name]=binascii.hexlify(event_before[column_name]).decode()
elif column_type in self.hexify and isinstance(event_before[column_name], bytes):
event_before[column_name] = ''
elif column_type == 'json':
event_before[column_name] = self.__decode_dic_keys(event_before[column_name])
elif column_type in self.spatial_datatypes and event_before[column_name]:
event_before[column_name] = self.__get_text_spatial(event_before[column_name])
event_insert={"global_data":global_data,"event_after":event_after, "event_before":event_before}
size_insert += len(str(event_insert))
group_insert.append(event_insert)
master_data["File"]=log_file
master_data["Position"]=log_position
master_data["Time"]=event_time
master_data["gtid"] = next_gtid
if len(group_insert)>=self.replica_batch_size:
self.logger.info("Max rows per batch reached. Writing %s. rows. Size in bytes: %s " % (len(group_insert), size_insert))
self.logger.debug("Master coordinates: %s" % (master_data, ))
self.pg_engine.write_batch(group_insert)
size_insert=0
group_insert=[]
close_batch=True
my_stream.close()
if len(group_insert)>0:
self.logger.debug("writing the last %s events" % (len(group_insert), ))
self.pg_engine.write_batch(group_insert)
close_batch=True
return [master_data, close_batch]
def read_replica(self):
"""
The method gets the batch data from PostgreSQL.
If the batch data is not empty then the method __read_replica_stream is executed to get the rows from
the mysql replica stored into the PostgreSQL database.
When the method exits the replica_data list is decomposed into the master_data (log name, position and last event's timestamp).
If the flag close_batch is set then the master status is saved in PostgreSQL and the batch id returned by the method
is saved in the class variable id_batch.
This variable is used to determine whether the old batch should be closed or not.
If the variable is not empty then the previous batch gets closed with a simple update of the processed flag.
"""
skip = self.__init_read_replica()
if skip:
self.logger.warning("Couldn't connect to the source database for reading the replica. Ignoring.")
else:
self.pg_engine.set_source_status("running")
replica_paused = self.pg_engine.get_replica_paused()
if replica_paused:
self.logger.info("Read replica is paused")
self.pg_engine.set_read_paused(True)
else:
self.pg_engine.set_read_paused(False)
batch_data = self.pg_engine.get_batch_data()
if len(batch_data)>0:
id_batch=batch_data[0][0]
self.logger.debug("Batch data %s " % (batch_data))
replica_data=self.__read_replica_stream(batch_data)
master_data=replica_data[0]
close_batch=replica_data[1]
if "gtid" in master_data:
master_data["Executed_Gtid_Set"] = self.__build_gtid_set(master_data["gtid"])
else:
master_data["Executed_Gtid_Set"] = ""
if close_batch:
self.master_status=[master_data]
self.logger.debug("trying to save the master data...")
next_id_batch=self.pg_engine.save_master_status(self.master_status)
if next_id_batch:
self.logger.debug("new batch created, saving id_batch %s in class variable" % (id_batch))
self.id_batch=id_batch
else:
self.logger.debug("batch not saved. using old id_batch %s" % (self.id_batch))
if self.id_batch:
self.logger.debug("updating processed flag for id_batch %s", (id_batch))
self.pg_engine.set_batch_processed(id_batch)
self.id_batch=None
self.pg_engine.keep_existing_schema = self.keep_existing_schema
self.pg_engine.check_source_consistent()
self.disconnect_db_buffered()
def init_replica(self):
"""
The method performs a full init replica for the given source
"""
self.logger.debug("starting init replica for source %s" % self.source)
self.__init_sync()
self.__check_mysql_config()
master_start = self.get_master_coordinates()
self.pg_engine.set_source_status("initialising")
self.pg_engine.cleanup_source_tables()
self.schema_list = [schema for schema in self.schema_mappings]
self.__build_table_exceptions()
self.get_table_list()
self.create_destination_schemas()
try:
self.pg_engine.insert_source_timings()
self.pg_engine.schema_loading = self.schema_loading
if self.keep_existing_schema:
self.disconnect_db_buffered()
self.__copy_tables()
else:
self.create_destination_tables()
self.disconnect_db_buffered()
self.__copy_tables()
self.pg_engine.grant_select()
self.pg_engine.swap_schemas()
self.drop_loading_schemas()
self.pg_engine.clean_batch_data()
self.pg_engine.save_master_status(master_start)
self.pg_engine.set_source_status("initialised")
self.connect_db_buffered()
master_end = self.get_master_coordinates()
self.disconnect_db_buffered()
self.pg_engine.set_source_highwatermark(master_end, consistent=False)
notifier_message = "init replica for source %s is complete" % self.source
self.notifier.send_message(notifier_message, 'info')
self.logger.info(notifier_message)
except:
if not self.keep_existing_schema:
self.drop_loading_schemas()
self.pg_engine.set_source_status("error")
notifier_message = "init replica for source %s failed" % self.source
self.logger.critical(notifier_message)
self.notifier.send_message(notifier_message, 'critical')
raise