import time
import sys
import io
import pymysql
import codecs
import binascii
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent, GtidEvent, HeartbeatLogEvent
from pymysqlreplication.row_event import DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent
from pymysqlreplication.event import RotateEvent
from pg_chameleon import sql_token
from os import remove
import re
[docs]class mysql_source(object):
    def __init__(self):
        """
            Class constructor, the method sets the class variables and configure the
            operating parameters from the args provided t the class.
        """
        self.statement_skip = ['BEGIN', 'COMMIT']
        self.schema_tables = {}
        self.schema_mappings = {}
        self.schema_loading = {}
        self.schema_list = []
        self.hexify_always = ['blob', 'tinyblob', 'mediumblob','longblob','binary','varbinary']
        self.spatial_datatypes = ['point','geometry','linestring','polygon', 'multipoint', 'multilinestring', 'geometrycollection']
        self.schema_only = {}
        self.gtid_mode = False
        self.gtid_enable = False
        self.copy_table_data = True
    def __del__(self):
        """
            Class destructor, tries to disconnect the mysql connection.
        """
        self.disconnect_db_unbuffered()
        self.disconnect_db_buffered()
    def __check_mysql_config(self):
        """
            The method check if the mysql configuration is compatible with the replica requirements.
            If all the configuration requirements are met then the return value is True.
            Otherwise is false.
            The parameters checked are:
            log_bin - ON if the binary log is enabled if we are on a vanilla mysql
            binlog_format - must be ROW , otherwise the replica won't get the data
            binlog_row_image - must be FULL, otherwise the row image will be incomplete
            The method checks the function AURORA_VERSION() and if the query doesn't error then skips the log_bin parameter check.
        """
        sql_aurora = """select AURORA_VERSION() ver;"""
        try:
            self.cursor_buffered.execute(sql_aurora)
            skip_log_bin_check = True
        except:
            skip_log_bin_check = False
        if self.gtid_enable:
            sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'gtid_mode';"""
            self.cursor_buffered.execute(sql_log_bin)
            variable_check = self.cursor_buffered.fetchone()
            if variable_check:
                gtid_mode = variable_check["Value"]
                if gtid_mode.upper() == 'ON':
                    self.gtid_mode = True
                    sql_uuid = """SHOW SLAVE STATUS;"""
                    self.cursor_buffered.execute(sql_uuid)
                    slave_status = self.cursor_buffered.fetchall()
                    if len(slave_status)>0:
                        gtid_set=slave_status[0]["Retrieved_Gtid_Set"]
                    else:
                        sql_uuid = """SHOW GLOBAL VARIABLES LIKE 'server_uuid';"""
                        self.cursor_buffered.execute(sql_uuid)
                        server_uuid = self.cursor_buffered.fetchone()
                        gtid_set = server_uuid["Value"]
                    self.gtid_uuid = gtid_set.split(':')[0]
            else:
                self.gtid_mode = False
        else:
            self.gtid_mode = False
        sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'log_bin';"""
        self.cursor_buffered.execute(sql_log_bin)
        variable_check = self.cursor_buffered.fetchone()
        log_bin = variable_check["Value"]
        sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'binlog_format';"""
        self.cursor_buffered.execute(sql_log_bin)
        variable_check = self.cursor_buffered.fetchone()
        binlog_format = variable_check["Value"]
        sql_log_bin = """SHOW GLOBAL VARIABLES LIKE 'binlog_row_image';"""
        self.cursor_buffered.execute(sql_log_bin)
        variable_check = self.cursor_buffered.fetchone()
        if variable_check:
            binlog_row_image = variable_check["Value"]
        else:
            binlog_row_image = 'FULL'
        if (log_bin.upper() == 'ON' or skip_log_bin_check) and binlog_format.upper() == 'ROW' and binlog_row_image.upper() == 'FULL':
            self.replica_possible = True
        else:
            self.replica_possible = False
            self.pg_engine.set_source_status("error")
            self.logger.error("The MySQL configuration does not allow the replica. Exiting now")
            self.logger.error("Source settings - log_bin %s, binlog_format %s, binlog_row_image %s" % (log_bin.upper(),  binlog_format.upper(), binlog_row_image.upper() ))
            self.logger.error("Mandatory settings - log_bin ON, binlog_format ROW, binlog_row_image FULL (only for MySQL 5.6+) ")
            sys.exit()
[docs]    def connect_db_buffered(self):
        """
            The method creates a new connection to the mysql database.
            The connection is made using the dictionary type cursor factory, which is buffered.
        """
        db_conn = self.source_config["db_conn"]
        db_conn = {key:str(value) for key, value in db_conn.items()}
        db_conn["port"] = int(db_conn["port"])
        db_conn["connect_timeout"] = int(db_conn["connect_timeout"])
        self.conn_buffered=pymysql.connect(
            host = db_conn["host"],
            user = db_conn["user"],
            port = db_conn["port"],
            password = db_conn["password"],
            charset = db_conn["charset"],
            connect_timeout = db_conn["connect_timeout"],
            cursorclass=pymysql.cursors.DictCursor
        )
        self.charset = db_conn["charset"]
        self.cursor_buffered = self.conn_buffered.cursor()
        self.cursor_buffered_fallback = self.conn_buffered.cursor() 
[docs]    def disconnect_db_buffered(self):
        """
            The method disconnects any connection  with dictionary type cursor from the mysql database.
        """
        try:
            self.conn_buffered.close()
        except:
            pass 
[docs]    def connect_db_unbuffered(self):
        """
            The method creates a new connection to the mysql database.
            The connection is made using the unbuffered cursor factory.
        """
        db_conn = self.source_config["db_conn"]
        db_conn = {key:str(value) for key, value in db_conn.items()}
        db_conn["port"] = int(db_conn["port"])
        db_conn["connect_timeout"] = int(db_conn["connect_timeout"])
        self.conn_unbuffered=pymysql.connect(
            host = db_conn["host"],
            user = db_conn["user"],
            port = db_conn["port"],
            password = db_conn["password"],
            charset = db_conn["charset"],
            connect_timeout = db_conn["connect_timeout"],
            cursorclass=pymysql.cursors.SSCursor
        )
        self.charset = db_conn["charset"]
        self.cursor_unbuffered = self.conn_unbuffered.cursor() 
[docs]    def disconnect_db_unbuffered(self):
        """
            The method disconnects any unbuffered connection from the mysql database.
        """
        try:
            self.conn_unbuffered.close()
        except:
            pass 
    def __build_skip_events(self):
        """
            The method builds a class attribute self.skip_events. The attribute is a dictionary with the tables and schemas listed under the three kind of skippable events  (insert,delete,update) using
            the configuration parameter skip_events.
        """
        self.skip_events = None
        if "skip_events" in  self.source_config:
            skip_events  = self.source_config["skip_events"]
            self.skip_events = {}
            if "insert" in skip_events:
                self.skip_events["insert"] = skip_events["insert"]
            else:
                self.skip_events["insert"] = []
            if "update" in skip_events:
                self.skip_events["update"] = skip_events["update"]
            else:
                self.skip_events["update"] = []
            if "delete" in skip_events:
                self.skip_events["delete"] = skip_events["delete"]
            else:
                self.skip_events["delete"] = []
    def __build_table_exceptions(self):
        """
            The method builds two dictionaries from the limit_tables and skip tables values set for the source.
            The dictionaries are intended to be used in the get_table_list to cleanup the list of tables per schema.
            The method manages the particular case of when the class variable self.tables is set.
            In that case only the specified tables in self.tables will be synced. Should limit_tables be already
            set, then the resulting list is the intersection of self.tables and limit_tables.
        """
        self.limit_tables = {}
        self.skip_tables = {}
        limit_tables = self.source_config["limit_tables"]
        skip_tables = self.source_config["skip_tables"]
        if self.tables !='*':
            tables = [table.strip() for table in self.tables.split(',')]
            if limit_tables:
                limit_schemas = [table.split('.')[0] for table in limit_tables]
                limit_tables = [table for table in tables if table in limit_tables or table.split('.')[0] not in limit_schemas]
            else:
                limit_tables = tables
            self.schema_only = {table.split('.')[0] for table in limit_tables}
        if limit_tables:
            table_limit = [table.split('.') for table in limit_tables]
            for table_list in table_limit:
                list_exclude = []
                try:
                    list_exclude = self.limit_tables[table_list[0]]
                    list_exclude.append(table_list[1])
                except KeyError:
                    try:
                        list_exclude.append(table_list[1])
                    except IndexError:
                        pass
                self.limit_tables[table_list[0]]  = list_exclude
        if skip_tables:
            table_skip = [table.split('.') for table in skip_tables]
            for table_list in table_skip:
                list_exclude = []
                try:
                    list_exclude = self.skip_tables[table_list[0]]
                    list_exclude.append(table_list[1])
                except KeyError:
                    try:
                        list_exclude.append(table_list[1])
                    except:
                        pass
                self.skip_tables[table_list[0]]  = list_exclude
[docs]    def get_table_list(self):
        """
            The method pulls the table list from the information_schema.
            The list is stored in a dictionary  which key is the table's schema.
        """
        sql_tables="""
            SELECT
                table_name as table_name
            FROM
                information_schema.TABLES
            WHERE
                    table_type='BASE TABLE'
                AND table_schema=%s
            ;
        """
        for schema in self.schema_list:
            self.cursor_buffered.execute(sql_tables, (schema))
            table_list = [table["table_name"] for table in self.cursor_buffered.fetchall()]
            try:
                limit_tables = self.limit_tables[schema]
                if len(limit_tables) > 0:
                    table_list = [table for table in table_list if table in limit_tables]
            except KeyError:
                pass
            try:
                skip_tables = self.skip_tables[schema]
                if len(skip_tables) > 0:
                    table_list = [table for table in table_list if table not in skip_tables]
            except KeyError:
                pass
            self.schema_tables[schema] = table_list 
[docs]    def create_destination_schemas(self):
        """
            Creates the loading schemas in the destination database and associated tables listed in the dictionary
            self.schema_tables.
            The method builds a dictionary which associates the destination schema to the loading schema.
            The loading_schema is named after the destination schema plus with the prefix _ and the _tmp suffix.
            As postgresql allows, by default up to 64  characters for an identifier, the original schema is truncated to 59 characters,
            in order to fit the maximum identifier's length.
            The mappings are stored in the class dictionary schema_loading.
            If the source parameter keep_existing_schema is set to true the method doesn't create the schemas.
            Instead assumes the schema and the tables are already there.
        """
        if self.keep_existing_schema:
            self.logger.debug("Keep existing schema is set to True. Skipping the schema creation." )
            for schema in self.schema_list:
                destination_schema = self.schema_mappings[schema]
                self.schema_loading[schema] = {'destination':destination_schema, 'loading':destination_schema}
        else:
            for schema in self.schema_list:
                destination_schema = self.schema_mappings[schema]
                loading_schema = "_%s_tmp" % destination_schema[0:59]
                self.schema_loading[schema] = {'destination':destination_schema, 'loading':loading_schema}
                self.logger.debug("Creating the loading schema %s." % loading_schema)
                self.pg_engine.create_database_schema(loading_schema)
                self.logger.debug("Creating the destination schema %s." % destination_schema)
                self.pg_engine.create_database_schema(destination_schema) 
[docs]    def drop_loading_schemas(self):
        """
            The method drops the loading schemas from the destination database.
            The drop is performed on the schemas generated in create_destination_schemas.
            The method assumes the class dictionary schema_loading is correctly set.
        """
        for schema in self.schema_loading:
            loading_schema = self.schema_loading[schema]["loading"]
            self.logger.debug("Dropping the schema %s." % loading_schema)
            self.pg_engine.drop_database_schema(loading_schema, True) 
[docs]    def create_destination_tables(self):
        """
            The method creates the destination tables in the loading schema.
            The tables names are looped using the values stored in the class dictionary schema_tables.
        """
        for schema in self.schema_tables:
            table_list = self.schema_tables[schema]
            for table in table_list:
                table_metadata = self.get_table_metadata(table, schema)
                self.pg_engine.create_table(table_metadata, table, schema, 'mysql') 
[docs]    def generate_select_statements(self, schema, table):
        """
            The generates the csv output and the statements output for the given schema and table.
            The method assumes there is a buffered database connection active.
            :param schema: the origin's schema
            :param table: the table name
            :return: the select list statements for the copy to csv and  the fallback to inserts.
            :rtype: dictionary
        """
        select_columns = {}
        sql_select="""
            SELECT
                CASE
                    WHEN
                        data_type IN ('"""+"','".join(self.hexify)+"""')
                    THEN
                        concat('hex(',column_name,')')
                    WHEN
                        data_type IN ('bit')
                    THEN
                        concat('cast(`',column_name,'` AS unsigned)')
                    WHEN
                        data_type IN ('datetime','timestamp','date')
                    THEN
                        concat('nullif(`',column_name,'`,cast("0000-00-00 00:00:00" as date))')
                    WHEN
                        data_type IN ('"""+"','".join(self.spatial_datatypes)+"""')
                    THEN
                        concat('ST_AsText(',column_name,')')
                ELSE
                    concat('cast(`',column_name,'` AS char CHARACTER SET """+ self.charset +""")')
                END
                AS select_csv,
                CASE
                    WHEN
                        data_type IN ('"""+"','".join(self.hexify)+"""')
                    THEN
                        concat('hex(',column_name,') AS','`',column_name,'`')
                    WHEN
                        data_type IN ('bit')
                    THEN
                        concat('cast(`',column_name,'` AS unsigned) AS','`',column_name,'`')
                    WHEN
                        data_type IN ('datetime','timestamp','date')
                    THEN
                        concat('nullif(`',column_name,'`,cast("0000-00-00 00:00:00" as date)) AS `',column_name,'`')
                    WHEN
                        data_type IN ('"""+"','".join(self.spatial_datatypes)+"""')
                    THEN
                        concat('ST_AsText(',column_name,') AS','`',column_name,'`')
                ELSE
                    concat('cast(`',column_name,'` AS char CHARACTER SET """+ self.charset +""") AS','`',column_name,'`')
                END
                AS select_stat,
                column_name as column_name
            FROM
                information_schema.COLUMNS
            WHERE
                table_schema=%s
                AND 	table_name=%s
            ORDER BY
                ordinal_position
            ;
        """
        self.cursor_buffered.execute(sql_select, (schema, table))
        select_data = self.cursor_buffered.fetchall()
        select_csv = ["COALESCE(REPLACE(%s, '\"', '\"\"'),'NULL') " % statement["select_csv"] for statement in select_data]
        select_stat = [statement["select_csv"] for statement in select_data]
        column_list = ['"%s"' % statement["column_name"] for statement in select_data]
        select_columns["select_csv"] = "REPLACE(CONCAT('\"',CONCAT_WS('\",\"',%s),'\"'),'\"NULL\"','NULL')" % ','.join(select_csv)
        select_columns["select_stat"]  = ','.join(select_stat)
        select_columns["column_list"]  = ','.join(column_list)
        return select_columns 
[docs]    def begin_tx(self):
        """
            The method sets the isolation level to repeatable read and begins a transaction
        """
        self.logger.debug("set isolation level")
        self.cursor_unbuffered.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
        self.logger.debug("beginning transaction")
        self.cursor_unbuffered.execute("BEGIN") 
[docs]    def end_tx(self):
        """
            The method ends the current transaction by rollback
            - We should never have changed source anyway
        """
        self.logger.debug("rolling back")
        self.cursor_unbuffered.execute("ROLLBACK") 
[docs]    def make_tx_snapshot(self, schema, table):
        """
            The method forces creation of transaction snapshot by making a read of
            one row from the source table and discarding it
        """
        self.logger.debug("reading and discarding 1 row from `%s`.`%s`" % (schema, table))
        self.cursor_unbuffered.execute("SELECT * FROM `%s`.`%s` LIMIT 1" % (schema, table)) 
[docs]    def lock_table(self, schema, table):
        """
            The method flushes the given table with read lock.
            The method assumes there is a database connection active.
            :param schema: the origin's schema
            :param table: the table name
        """
        self.logger.debug("locking the table `%s`.`%s`" % (schema, table) )
        sql_lock = "FLUSH TABLES `%s`.`%s` WITH READ LOCK;" %(schema, table)
        self.logger.debug("collecting the master's coordinates for table `%s`.`%s`" % (schema, table) )
        self.cursor_buffered.execute(sql_lock) 
[docs]    def unlock_tables(self):
        """
            The method unlocks all tables
        """
        self.logger.debug("unlocking the tables")
        sql_unlock = "UNLOCK TABLES;"
        self.cursor_buffered.execute(sql_unlock) 
[docs]    def get_master_coordinates(self):
        """
            The method gets the master's coordinates and return them stored in a dictionary.
            The method assumes there is a database connection active.
            :return: the master's log coordinates for the given table
            :rtype: dictionary
        """
        if not self.conn_buffered.open:
            self.connect_db_buffered()
        sql_master = "SHOW MASTER STATUS;"
        self.cursor_buffered.execute(sql_master)
        master_status = self.cursor_buffered.fetchall()
        return master_status 
[docs]    def copy_data(self, schema, table):
        """
            The method copy the data between the origin and destination table.
            The method locks the table read only mode and  gets the log coordinates which are returned to the calling method.
            :param schema: the origin's schema
            :param table: the table name
            :return: the log coordinates for the given table
            :rtype: dictionary
        """
        slice_insert = []
        loading_schema = self.schema_loading[schema]["loading"]
        self.connect_db_buffered()
        self.logger.debug("estimating rows in %s.%s" % (schema , table))
        sql_rows = """
            SELECT
                table_rows as table_rows,
                CASE
                    WHEN avg_row_length>0
                    then
                        round(({}/avg_row_length))
                ELSE
                    0
                END as copy_limit,
                transactions
            FROM
                information_schema.TABLES,
                information_schema.ENGINES
            WHERE
                    table_schema=%s
                AND	table_type='BASE TABLE'
                AND table_name=%s
                AND TABLES.engine = ENGINES.engine
            ;
        """
        sql_rows = sql_rows.format(self.copy_max_memory)
        self.cursor_buffered.execute(sql_rows, (schema, table))
        count_rows = self.cursor_buffered.fetchone()
        total_rows = count_rows["table_rows"]
        copy_limit = int(count_rows["copy_limit"])
        table_txs = count_rows["transactions"] == "YES"
        if copy_limit == 0:
            copy_limit = 1000000
        num_slices = int(total_rows//copy_limit)
        range_slices = list(range(num_slices+1))
        total_slices = len(range_slices)
        slice = range_slices[0]
        self.logger.debug("The table %s.%s will be copied in %s  estimated slice(s) of %s rows, using a transaction %s"  % (schema, table, total_slices, copy_limit, table_txs))
        out_file = '%s/%s_%s.csv' % (self.out_dir, schema, table )
        self.lock_table(schema, table)
        master_status = self.get_master_coordinates()
        select_columns = self.generate_select_statements(schema, table)
        csv_data = ""
        sql_csv = "SELECT %s as data FROM `%s`.`%s`;" % (select_columns["select_csv"], schema, table)
        column_list = select_columns["column_list"]
        self.logger.debug("Executing query for table %s.%s"  % (schema, table ))
        self.connect_db_unbuffered()
        if table_txs:
            self.begin_tx()
        self.cursor_unbuffered.execute(sql_csv)
        if table_txs:
            self.unlock_tables()
        while True:
            csv_results = self.cursor_unbuffered.fetchmany(copy_limit)
            if len(csv_results) == 0:
                break
            csv_data="\n".join(d[0] for d in csv_results )
            if self.copy_mode == 'direct':
                csv_file = io.StringIO()
                csv_file.write(csv_data)
                csv_file.seek(0)
            if self.copy_mode == 'file':
                csv_file = codecs.open(out_file, 'wb', self.charset)
                csv_file.write(csv_data)
                csv_file.close()
                csv_file = open(out_file, 'rb')
            try:
                self.pg_engine.copy_data(csv_file, loading_schema, table, column_list)
            except:
                self.logger.info("Table %s.%s error in PostgreSQL copy, saving slice number for the fallback to insert statements " %  (loading_schema, table ))
                slice_insert.append(slice)
            self.print_progress(slice+1,total_slices, schema, table)
            slice+=1
            csv_file.close()
        if len(slice_insert)>0:
            ins_arg={}
            ins_arg["slice_insert"] = slice_insert
            ins_arg["table"] = table
            ins_arg["schema"] = schema
            ins_arg["select_stat"] = select_columns["select_stat"]
            ins_arg["column_list"] = column_list
            ins_arg["copy_limit"] = copy_limit
            self.insert_table_data(ins_arg)
        if table_txs:
            self.end_tx()
        else:
            self.unlock_tables()
        self.cursor_unbuffered.close()
        self.disconnect_db_unbuffered()
        self.disconnect_db_buffered()
        try:
            remove(out_file)
        except:
            pass
        return master_status 
[docs]    def insert_table_data(self, ins_arg):
        """
            This method is a fallback procedure whether copy_table_data fails.
            The ins_args is a list with the informations required to run the select for building the insert
            statements and the slices's start and stop.
            The process is performed in memory and can take a very long time to complete.
            :param pg_engine: the postgresql engine
            :param ins_arg: the list with the insert arguments (slice_insert, schema, table, select_stat,column_list, copy_limit)
        """
        slice_insert= ins_arg["slice_insert"]
        table = ins_arg["table"]
        schema = ins_arg["schema"]
        select_stat = ins_arg["select_stat"]
        column_list = ins_arg["column_list"]
        copy_limit = ins_arg["copy_limit"]
        self.connect_db_unbuffered()
        loading_schema = self.schema_loading[schema]["loading"]
        num_insert = 1
        for slice in slice_insert:
            self.logger.info("Executing inserts in %s.%s. Slice %s. Rows per slice %s." %  (loading_schema, table, num_insert, copy_limit ,   ))
            offset = slice*copy_limit
            sql_fallback = "SELECT %s FROM `%s`.`%s` LIMIT %s, %s;" % (select_stat, schema, table, offset, copy_limit)
            self.cursor_unbuffered.execute(sql_fallback)
            insert_data =  self.cursor_unbuffered.fetchall()
            self.pg_engine.insert_data(loading_schema, table, insert_data , column_list)
            num_insert +=1 
[docs]    def print_progress (self, iteration, total, schema, table):
        """
            Print the copy progress in slices and estimated total slices.
            In order to reduce noise when the log level is info only the tables copied in multiple slices
            get the print progress.
            :param iteration: The slice number currently processed
            :param total: The estimated total slices
            :param table_name: The table name
        """
        if iteration>=total:
            total = iteration
        if total>1:
            self.logger.info("Table %s.%s copied %s slice of %s" % (schema, table, iteration, total))
        else:
            self.logger.debug("Table %s.%s copied %s slice of %s" % (schema, table, iteration, total)) 
    def __create_indices(self, schema, table):
        """
            The method copy the data between the origin and destination table.
            The method locks the table read only mode and  gets the log coordinates which are returned to the calling method.
            :param schema: the origin's schema
            :param table: the table name
            :return: the table and schema name with the primary key.
            :rtype: dictionary
        """
        loading_schema = self.schema_loading[schema]["loading"]
        self.connect_db_buffered()
        self.logger.debug("Creating indices on table %s.%s " % (schema, table))
        sql_index = """
            SELECT
            CASE WHEN index_name='PRIMARY'
            THEN
                index_name
            WHEN (
                    SELECT
                        count(1)
                    FROM
                        information_schema.statistics s
                    WHERE
                             s.index_name=t.index_name
                        AND  s.table_schema=t.table_schema
                )>1
            THEN
                concat(substring(index_name,1,59),'_',SUBSTRING(md5(uuid()),1,4))
            ELSE
                index_name
            END AS index_name,
            non_unique as non_unique,
            GROUP_CONCAT(column_name ORDER BY seq_in_index) as index_columns
            FROM
                information_schema.statistics t
            WHERE
                    table_schema=%s
                AND 	table_name=%s
                AND	index_type = 'BTREE'
            GROUP BY
                table_name,
                non_unique,
                index_name
            ;
        """
        self.cursor_buffered.execute(sql_index, (schema, table))
        index_data = self.cursor_buffered.fetchall()
        table_pkey = self.pg_engine.create_indices(loading_schema, table, index_data)
        self.disconnect_db_buffered()
        return table_pkey
    def __copy_tables(self):
        """
            The method copies the data between tables, from the mysql schema to the corresponding
            postgresql loading schema. Before the copy starts the table is locked and then the lock is released.
            If keep_existing_schema is true for the source then the tables are truncated before the copy,
            the indices are left in place and a REINDEX TABLE is executed after the copy.
        """
        for schema in self.schema_tables:
            loading_schema = self.schema_loading[schema]["loading"]
            destination_schema = self.schema_loading[schema]["destination"]
            table_list = self.schema_tables[schema]
            for table in table_list:
                self.logger.info("Copying the source table %s into %s.%s" %(table, loading_schema, table) )
                try:
                    if self.keep_existing_schema:
                        table_pkey = self.pg_engine.get_existing_pkey(destination_schema,table)
                        self.logger.info("Collecting constraints and indices from the destination table  %s.%s" %(destination_schema, table) )
                        self.pg_engine.collect_idx_cons(destination_schema,table)
                        self.logger.info("Removing constraints and indices from the destination table  %s.%s" %(destination_schema, table) )
                        self.pg_engine.cleanup_idx_cons(destination_schema,table)
                        self.logger.info("Truncating the table  %s.%s" %(destination_schema, table) )
                        self.pg_engine.truncate_table(destination_schema,table)
                        master_status = self.copy_data(schema, table)
                    else:
                        if self.copy_table_data:
                            master_status = self.copy_data(schema, table)
                        else:
                            master_status = self.get_master_coordinates()
                        table_pkey = self.__create_indices(schema, table)
                    self.pg_engine.store_table(destination_schema, table, table_pkey, master_status)
                    if self.keep_existing_schema:
                        #input("Press Enter to continue...")
                        self.logger.info("Adding constraint and indices to the destination table  %s.%s" %(destination_schema, table) )
                        self.pg_engine.create_idx_cons(destination_schema,table)
                except:
                    self.logger.info("Could not copy the table %s. Excluding it from the replica." %(table) )
                    raise
[docs]    def set_copy_max_memory(self):
        """
            The method sets the class variable self.copy_max_memory using the value stored in the
            source setting.
        """
        copy_max_memory = str(self.source_config["copy_max_memory"])[:-1]
        copy_scale = str(self.source_config["copy_max_memory"])[-1]
        try:
            int(copy_scale)
            copy_max_memory = self.source_config["copy_max_memory"]
        except:
            if copy_scale =='k':
                copy_max_memory = str(int(copy_max_memory)*1024)
            elif copy_scale =='M':
                copy_max_memory = str(int(copy_max_memory)*1024*1024)
            elif copy_scale =='G':
                copy_max_memory = str(int(copy_max_memory)*1024*1024*1024)
            else:
                print("**FATAL - invalid suffix in parameter copy_max_memory  (accepted values are (k)ilobytes, (M)egabytes, (G)igabytes.")
                sys.exit(3)
        self.copy_max_memory = copy_max_memory 
    def __init_read_replica(self):
        """
            The method calls the pre-steps required by the read replica method.

            It loads the source configuration into the class attributes, builds the list of data types
            requiring hexadecimal conversion, connects to MySQL and PostgreSQL, collects the schema mappings,
            prepares the connection settings for the replica stream and checks the MySQL configuration.
            If the GTID mode is active the first transaction id of the first executed GTID set is stored in self.start_xid.

            :return: the string "skip" if the MySQL connection fails and the setting on_error_read is not 'exit', otherwise None
            :rtype: string
        """
        self.replica_conn = {}
        self.source_config = self.sources[self.source]
        try:
            exit_on_error = self.source_config["on_error_read"] == 'exit'
        except KeyError:
            # a missing setting means the connection error is fatal
            exit_on_error = True
        self.my_server_id = self.source_config["my_server_id"]
        self.limit_tables = self.source_config["limit_tables"]
        self.skip_tables = self.source_config["skip_tables"]
        self.replica_batch_size = self.source_config["replica_batch_size"]
        self.sleep_loop = self.source_config["sleep_loop"]
        self.postgis_present = self.pg_engine.check_postgis()
        if self.postgis_present:
            self.hexify = self.hexify_always
        else:
            # without postgis the spatial data types are stored as hexadecimal strings as well
            self.hexify = self.hexify_always + self.spatial_datatypes
        try:
            self.connect_db_buffered()
        except Exception:
            # Only real errors are converted into a "skip".
            # KeyboardInterrupt and SystemExit must never be swallowed here.
            if exit_on_error:
                raise
            return "skip"
        self.pg_engine.connect_db()
        self.schema_mappings = self.pg_engine.get_schema_mappings()
        self.schema_replica = list(self.schema_mappings)
        db_conn = self.source_config["db_conn"]
        self.replica_conn["host"] = str(db_conn["host"])
        self.replica_conn["user"] = str(db_conn["user"])
        self.replica_conn["passwd"] = str(db_conn["password"])
        self.replica_conn["port"] = int(db_conn["port"])
        self.__build_table_exceptions()
        self.__build_skip_events()
        self.__check_mysql_config()
        if self.gtid_mode:
            master_data = self.get_master_coordinates()
            # "uuid:first-last[,...]" -> first
            self.start_xid = master_data[0]["Executed_Gtid_Set"].split(':')[1].split('-')[0]
    def __init_sync(self):
        """
            The method calls the common steps required to initialise the database connections and
            class attributes within sync_tables,refresh_schema and init_replica.
        """
        try:
            self.source_config = self.sources[self.source]
        except KeyError:
            self.logger.error("The source %s doesn't exists " % (self.source))
            sys.exit()
        self.out_dir = self.source_config["out_dir"]
        self.copy_mode = self.source_config["copy_mode"]
        self.pg_engine.lock_timeout = self.source_config["lock_timeout"]
        self.pg_engine.grant_select_to = self.source_config["grant_select_to"]
        if "keep_existing_schema" in self.sources[self.source]:
            self.keep_existing_schema = self.sources[self.source]["keep_existing_schema"]
        else:
            self.keep_existing_schema = False
        self.set_copy_max_memory()
        self.postgis_present = self.pg_engine.check_postgis()
        if self.postgis_present:
            self.hexify = self.hexify_always
        else:
            self.hexify = self.hexify_always + self.spatial_datatypes
        self.connect_db_buffered()
        self.pg_engine.connect_db()
        self.schema_mappings = self.pg_engine.get_schema_mappings()
        self.pg_engine.schema_tables = self.schema_tables
[docs]    def refresh_schema(self):
        """
            The method performs a sync for an entire schema within a source.
            The method works in a similar way like init_replica.
            The swap happens in a single transaction.
        """
        self.logger.debug("starting sync schema for source %s" % self.source)
        self.logger.debug("The schema affected is %s" % self.schema)
        self.__init_sync()
        self.__check_mysql_config()
        self.pg_engine.set_source_status("syncing")
        self.__build_table_exceptions()
        self.schema_list = [self.schema]
        self.get_table_list()
        self.create_destination_schemas()
        try:
            self.pg_engine.schema_loading = self.schema_loading
            self.pg_engine.schema_tables = self.schema_tables
            if self.keep_existing_schema:
                self.disconnect_db_buffered()
                self.__copy_tables()
            else:
                self.create_destination_tables()
                self.disconnect_db_buffered()
                self.__copy_tables()
                self.pg_engine.grant_select()
                self.pg_engine.swap_schemas()
                self.drop_loading_schemas()
            self.pg_engine.set_source_status("initialised")
            self.connect_db_buffered()
            master_end = self.get_master_coordinates()
            self.disconnect_db_buffered()
            self.pg_engine.set_source_highwatermark(master_end, consistent=False)
            self.pg_engine.cleanup_table_events()
            notifier_message = "refresh schema %s for source %s is complete" % (self.schema, self.source)
            self.notifier.send_message(notifier_message, 'info')
            self.logger.info(notifier_message)
        except:
            if not self.keep_existing_schema:
                self.drop_loading_schemas()
            self.pg_engine.set_source_status("error")
            notifier_message = "refresh schema %s for source %s failed" % (self.schema, self.source)
            self.notifier.send_message(notifier_message, 'critical')
            self.logger.critical(notifier_message)
            raise 
[docs]    def sync_tables(self):
        """
            The method performs a sync for specific tables.
            The method works in a similar way like init_replica except when swapping the relations.
            The tables are loaded into a temporary schema and the log coordinates are stored with the table
            in the replica catalogue. When the load is complete the method drops the existing table and changes the
            schema for the loaded tables to the destination schema.
            The swap happens in a single transaction.
        """
        self.logger.info("Starting sync tables for source %s" % self.source)
        self.__init_sync()
        self.__check_mysql_config()
        self.pg_engine.set_source_status("syncing")
        if self.tables == 'disabled':
            self.tables = self.pg_engine.get_tables_disabled ()
            if not self.tables:
                self.logger.info("There are no disabled tables to sync")
                return
        self.logger.debug("The tables affected are %s" % self.tables)
        self.__build_table_exceptions()
        self.schema_list = [schema for schema in self.schema_mappings if schema in self.schema_only]
        self.get_table_list()
        self.create_destination_schemas()
        try:
            self.pg_engine.schema_loading = self.schema_loading
            self.pg_engine.schema_tables = self.schema_tables
            if self.keep_existing_schema:
                self.disconnect_db_buffered()
                self.__copy_tables()
            else:
                self.create_destination_tables()
                self.disconnect_db_buffered()
                self.__copy_tables()
                self.pg_engine.grant_select()
                self.pg_engine.swap_tables()
                self.drop_loading_schemas()
            self.pg_engine.set_source_status("synced")
            self.connect_db_buffered()
            master_end = self.get_master_coordinates()
            self.disconnect_db_buffered()
            self.pg_engine.set_source_highwatermark(master_end, consistent=False)
            self.pg_engine.cleanup_table_events()
            notifier_message = "the sync for tables %s in source %s is complete" % (self.tables, self.source)
            self.notifier.send_message(notifier_message, 'info')
            self.logger.info(notifier_message)
        except:
            if not self.keep_existing_schema:
                self.drop_loading_schemas()
            self.pg_engine.set_source_status("error")
            notifier_message = "the sync for tables %s in source %s failed" % (self.tables, self.source)
            self.notifier.send_message(notifier_message, 'critical')
            self.logger.critical(notifier_message)
            raise 
    def __get_text_spatial(self,raw_data):
        """
            The method returns the text representation converted in postgresql format
            for the raw data point using the ST_AsText function and the regular expressions
            :param charset: The table's character set
            :param raw_data: The raw_data returned by the mysql-replication library
            :return: text representation converted in postgresql format
            :rtype: text
        """
        decoded_data=binascii.hexlify(raw_data)
        return decoded_data.decode()[8:]
[docs]    def get_table_type_map(self):
        """
            The method builds a dictionary with a key per each schema replicated.
            Each key maps a dictionary with the schema's tables stored as keys and the column/type mappings.
            The dictionary is used in the read_replica method, to determine whether a field requires hexadecimal conversion.
        """
        table_type_map = {}
        table_map = {}
        self.logger.debug("collecting table type map")
        for schema in self.schema_replica:
            sql_tables = """
                SELECT
                    t.table_schema as table_schema,
                    t.table_name as table_name,
                    SUBSTRING_INDEX(t.TABLE_COLLATION,'_',1) as character_set
                FROM
                    information_schema.TABLES t
                WHERE
                        table_type='BASE TABLE'
                    AND	table_schema=%s
                ;
            """
            self.cursor_buffered.execute(sql_tables, (schema, ))
            table_list = self.cursor_buffered.fetchall()
            for table in table_list:
                column_type = {}
                sql_columns = """
                    SELECT
                        column_name as column_name,
                        data_type as data_type
                    FROM
                        information_schema.COLUMNS
                    WHERE
                            table_schema=%s
                        AND table_name=%s
                    ORDER BY
                        ordinal_position
                    ;
                """
                table_charset = table["character_set"]
                self.cursor_buffered.execute(sql_columns, (table["table_schema"], table["table_name"]))
                column_data = self.cursor_buffered.fetchall()
                for column in column_data:
                    column_type[column["column_name"]] = column["data_type"]
                table_dict = {}
                table_dict["table_charset"] = table_charset
                table_dict["column_type"] = column_type
                table_map[table["table_name"]] = table_dict
            table_type_map[schema] = table_map
            table_map = {}
        return table_type_map 
    def __store_binlog_event(self, table, schema):
        """
        The private method returns whether the table event should be stored or not in the postgresql log replica.
        :param table: The table's name to check
        :param schema: The table's schema name
        :return: true if the table should be replicated, false if shouldn't
        :rtype: boolean
        """
        if self.tables_disabled:
            if  "%s.%s" % (schema, table) in self.tables_disabled:
                return False
        if schema in self.skip_tables:
            if table in self.skip_tables[schema]:
                return False
        if schema in self.limit_tables:
            if table in self.limit_tables[schema]:
                return True
            else:
                return False
        return True
    def __skip_event(self, table, schema, binlogevent):
        """
            The method tells whether the event should be skipped and which kind of row event it is.
            The dictionary self.skip_events is used for the check. The event is skipped if either the schema
            or the name schema.table is listed in self.skip_events for the event type.
            The binlogevent is expected to be a delete, update or write rows event. With any other
            event type the event name is never assigned and the method fails with an UnboundLocalError.

            :param table: The table's name to check
            :param schema: The table's schema name
            :param binlogevent: The binlog event to evaluate
            :return: list with first element a boolean and the second element the event type (delete, update or insert)
            :rtype: list
        """
        event_names = (
            (DeleteRowsEvent, "delete"),
            (UpdateRowsEvent, "update"),
            (WriteRowsEvent, "insert"),
        )
        for event_class, event_name in event_names:
            if isinstance(binlogevent, event_class):
                event = event_name
                break
        skip_event = False
        if self.skip_events and self.skip_events[event]:
            excluded = self.skip_events[event]
            table_name = "%s.%s" % (schema, table)
            skip_event = schema in excluded or table_name in excluded
        return [skip_event, event]
    def __build_gtid_set(self, gtid):
        """
            The method builds a gtid set using the current gtid and
        """
        new_set = None
        gtid_pack = []
        master_data= self.get_master_coordinates()
        if "Executed_Gtid_Set" in master_data[0]:
            gtid_set = master_data[0]["Executed_Gtid_Set"]
            gtid_list = gtid_set.split(",\n")
            for gtid_item in gtid_list:
                if gtid_item.split(':')[0] in gtid:
                    gtid_old = gtid_item.split(':')
                    gtid_new = "%s:%s-%s" % (gtid_old[0],gtid_old[1].split('-')[0],gtid[gtid_old[0]])
                    gtid_pack.append(gtid_new)
                else:
                    gtid_pack.append(gtid_item)
            new_set = ",\n".join(gtid_pack)
        return new_set
    def __decode_dic_keys(self, dic_encoded):
        """
        Private method to recursively decode the dictionary keys  and values into strings.
        This is used fixing the the json data types in the __read_replica_stream method because
        at moment the mysql-replication library returns the keys of the json data types as binary values in python3.
        :param dic_encoded: The dictionary with the encoded keys
        :return: The dictionary with the decoded keys
        :rtype: dictionary
        """
        dic_decoded = {}
        lst_decode = []
        if isinstance(dic_encoded, list):
            for item in dic_encoded:
                lst_decode.append(self.__decode_dic_keys(item))
            return lst_decode
        elif not isinstance(dic_encoded, dict):
            try:
                return dic_encoded.decode("UTF-8")
            except AttributeError:
                return dic_encoded
        else:
            for key, value in dic_encoded.items():
                try:
                    dic_decoded[key.decode("UTF-8")] = self.__decode_dic_keys(value)
                except AttributeError:
                    dic_decoded[key] = self.__decode_dic_keys(value)
        return dic_decoded
    def __read_replica_stream(self, batch_data):
        """
        Stream the replica using the batch data. This method evaluates the different events streamed from MySQL
        and manages them accordingly. The BinLogStreamReader function is called with the only_events parameter which
        restricts the event type received by the streamer.
        The events managed are the following.
        GtidEvent which updates the last gtid seen per server uuid. If the batch is flagged for closing the loop ends.
        RotateEvent which happens whether mysql restarts or the binary log file changes.
        HeartbeatLogEvent which closes the batch if there are rows not yet written or the binary log file is changed.
        QueryEvent which happens when a new row image comes in (BEGIN statement) or a DDL is executed.
        The statements listed in self.statement_skip are always skipped. The DDL is parsed using the sql_token class.
        [Write,Update,Delete]RowEvents are the row images pulled from the mysql replica.
        The for loop reads the row events, builds the dictionary carrying informations like the destination schema,
        the binlog coordinates and store them into the group_insert list.
        When the number of events reaches the replica_batch_size the group_insert is written into PostgreSQL and
        the batch is flagged for closing. The stream is still read until an event which ends the loop comes in
        or there are no more events available.
        The update row event stores in a separate key event_before the row image before the update. This is required
        to allow updates where the primary key is updated as well.
        Each row event is scanned for data types requiring conversion (hex string, json, spatial).
        Rows and DDL for tables listed as inconsistent are discarded until the table's consistent point is reached.
        The stream is always closed when the method exits, even if an exception is raised.

        :param batch_data: The list with the master's batch data.
        :return: list with the master's data (binlog name, binlog position, last event timestamp and gtid read from the mysql replica stream) and the flag close_batch
        :rtype: list
        """
        size_insert=0
        sql_tokeniser = sql_token()
        table_type_map = self.get_table_type_map()
        inc_tables = self.pg_engine.get_inconsistent_tables()
        self.tables_disabled = self.pg_engine.get_tables_disabled(format='list')
        close_batch = False
        master_data = {}
        group_insert = []
        next_gtid = {}
        id_batch = batch_data[0][0]
        log_file = batch_data[0][1]
        log_position = batch_data[0][2]
        log_table = batch_data[0][3]
        # The current binlog file is normally set by the first RotateEvent.
        # Starting from the batch's file avoids an unbound variable if a query or row event arrives first.
        binlogfile = log_file
        master_data["log_table"] = log_table
        if self.gtid_mode:
            gtid_position = batch_data[0][4]
            gtid_pack = gtid_position.split(",\n")
            for gtid in gtid_pack:
                gtid = gtid.split(':')
                next_gtid[gtid[0]] = gtid[1].split("-")[-1]
            # built once after the loop: each call queries the master's coordinates
            gtid_set = self.__build_gtid_set(next_gtid)
        else:
            gtid_set = None
        stream_connected = False
        my_stream = BinLogStreamReader(
            connection_settings = self.replica_conn,
            server_id = self.my_server_id,
            only_events = [RotateEvent, DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, QueryEvent, GtidEvent, HeartbeatLogEvent],
            log_file = log_file,
            log_pos = log_position,
            auto_position = gtid_set,
            resume_stream = True,
            only_schemas = self.schema_replica,
            slave_heartbeat = self.sleep_loop,
        )
        if gtid_set:
            self.logger.debug("GTID ENABLED - gtid: %s. id_batch: %s " % (gtid_set, id_batch))
        else:
            self.logger.debug("GTID DISABLED - log_file %s, log_position %s. id_batch: %s " % (log_file, log_position, id_batch))
        try:
            for binlogevent in my_stream:
                if isinstance(binlogevent, GtidEvent):
                    if close_batch:
                        break
                    gtid = binlogevent.gtid.split(':')
                    next_gtid[gtid[0]] = gtid[1]
                    master_data["gtid"] = next_gtid
                elif isinstance(binlogevent, RotateEvent):
                    event_time = binlogevent.timestamp
                    binlogfile = binlogevent.next_binlog
                    position = binlogevent.position
                    self.logger.debug("ROTATE EVENT - binlogfile %s, position %s. " % (binlogfile, position))
                    if (log_file != binlogfile and stream_connected) or len(group_insert)>0:
                        close_batch = True
                    master_data["File"]=binlogfile
                    master_data["Position"]=position
                    master_data["Time"]=event_time
                    master_data["gtid"] = next_gtid
                    stream_connected = True
                    if close_batch:
                        break
                elif isinstance(binlogevent, HeartbeatLogEvent):
                    self.logger.debug("HEARTBEAT EVENT - binlogfile %s " % (binlogevent.ident,))
                    if len(group_insert)>0 or log_file != binlogevent.ident:
                        self.logger.debug("WRITING ROWS - binlogfile %s " % (binlogevent.ident,))
                        master_data["File"] = binlogevent.ident
                        close_batch = True
                        break
                elif isinstance(binlogevent, QueryEvent):
                    event_time = binlogevent.timestamp
                    try:
                        schema_query = binlogevent.schema.decode()
                    except (AttributeError, UnicodeDecodeError):
                        # the schema is already a string or cannot be decoded, use it as it is
                        schema_query = binlogevent.schema
                    if binlogevent.query.strip().upper() not in self.statement_skip and schema_query in self.schema_mappings:
                        close_batch=True
                        destination_schema = self.schema_mappings[schema_query]
                        log_position = binlogevent.packet.log_pos
                        master_data["File"] = binlogfile
                        master_data["Position"] = log_position
                        master_data["Time"] = event_time
                        master_data["gtid"] = next_gtid
                        # the rows collected so far must be stored before the DDL
                        if len(group_insert)>0:
                            self.pg_engine.write_batch(group_insert)
                            group_insert=[]
                        self.logger.info("QUERY EVENT - binlogfile %s, position %s.\n--------\n%s\n-------- " % (binlogfile, log_position, binlogevent.query))
                        sql_tokeniser.parse_sql(binlogevent.query)
                        for token in sql_tokeniser.tokenised:
                            write_ddl = True
                            table_name = token["name"]
                            store_query = self.__store_binlog_event(table_name, schema_query)
                            if store_query:
                                table_key_dic = "%s.%s" % (destination_schema, table_name)
                                if table_key_dic in inc_tables:
                                    # the DDL is stored only if the table's consistent point is reached
                                    write_ddl = False
                                    log_seq = int(log_file.split('.')[1])
                                    log_pos = int(log_position)
                                    table_dic = inc_tables[table_key_dic]
                                    if log_seq > table_dic["log_seq"]:
                                        write_ddl = True
                                    elif log_seq == table_dic["log_seq"] and log_pos >= table_dic["log_pos"]:
                                        write_ddl = True
                                    if write_ddl:
                                        self.logger.info("CONSISTENT POINT FOR TABLE %s REACHED  - binlogfile %s, position %s" % (table_key_dic, binlogfile, log_position))
                                        self.pg_engine.set_consistent_table(table_name, destination_schema)
                                        inc_tables = self.pg_engine.get_inconsistent_tables()
                                if write_ddl:
                                    event_time = binlogevent.timestamp
                                    self.logger.debug("TOKEN: %s" % (token))
                                    if len(token)>0:
                                        query_data={
                                            "binlog":log_file,
                                            "logpos":log_position,
                                            "schema": destination_schema,
                                            "batch_id":id_batch,
                                            "log_table":log_table
                                        }
                                        self.pg_engine.write_ddl(token, query_data, destination_schema)
                        sql_tokeniser.reset_lists()
                    if close_batch:
                        # the stream is closed by the finally clause
                        return [master_data, close_batch]
                else:
                    for row in binlogevent.rows:
                        event_after={}
                        event_before={}
                        add_row = True
                        log_file=binlogfile
                        log_position=binlogevent.packet.log_pos
                        table_name=binlogevent.table
                        event_time=binlogevent.timestamp
                        schema_row = binlogevent.schema
                        destination_schema = self.schema_mappings[schema_row]
                        table_key_dic = "%s.%s" % (destination_schema, table_name)
                        store_row = self.__store_binlog_event(table_name, schema_row)
                        skip_event = self.__skip_event(table_name, schema_row, binlogevent)
                        if store_row and not skip_event[0]:
                            if table_key_dic in inc_tables:
                                # the row is stored only if the table's consistent point is reached
                                table_consistent = False
                                log_seq = int(log_file.split('.')[1])
                                log_pos = int(log_position)
                                table_dic = inc_tables[table_key_dic]
                                if log_seq > table_dic["log_seq"]:
                                    table_consistent = True
                                elif log_seq == table_dic["log_seq"] and log_pos >= table_dic["log_pos"]:
                                    table_consistent = True
                                    self.logger.info("CONSISTENT POINT FOR TABLE %s REACHED  - binlogfile %s, position %s" % (table_key_dic, binlogfile, log_position))
                                if table_consistent:
                                    add_row = True
                                    self.pg_engine.set_consistent_table(table_name, destination_schema)
                                    inc_tables = self.pg_engine.get_inconsistent_tables()
                                else:
                                    add_row = False
                            column_map = table_type_map[schema_row][table_name]["column_type"]
                            global_data={
                                                "binlog":log_file,
                                                "logpos":log_position,
                                                "schema": destination_schema,
                                                "table": table_name,
                                                "batch_id":id_batch,
                                                "log_table":log_table,
                                                "event_time":event_time
                                            }
                            if add_row:
                                if skip_event[1] == "delete":
                                    global_data["action"] = "delete"
                                    event_after=row["values"]
                                elif skip_event[1] == "update":
                                    global_data["action"] = "update"
                                    event_after=row["after_values"]
                                    event_before=row["before_values"]
                                elif skip_event[1] == "insert":
                                    global_data["action"] = "insert"
                                    event_after=row["values"]
                                # each image is converted using its own values
                                self.__convert_row_image(event_after, column_map, table_name)
                                self.__convert_row_image(event_before, column_map, table_name)
                                event_insert={"global_data":global_data,"event_after":event_after,  "event_before":event_before}
                                size_insert += len(str(event_insert))
                                group_insert.append(event_insert)
                            master_data["File"]=log_file
                            master_data["Position"]=log_position
                            master_data["Time"]=event_time
                            master_data["gtid"] = next_gtid
                            if len(group_insert)>=self.replica_batch_size:
                                self.logger.info("Max rows per batch reached. Writing %s. rows. Size in bytes: %s " % (len(group_insert), size_insert))
                                self.logger.debug("Master coordinates: %s" % (master_data, ))
                                self.pg_engine.write_batch(group_insert)
                                size_insert=0
                                group_insert=[]
                                close_batch=True
        finally:
            # release the replication connection on every exit path, exceptions included
            my_stream.close()
        if len(group_insert)>0:
            self.logger.debug("writing the last %s events" % (len(group_insert), ))
            self.pg_engine.write_batch(group_insert)
            close_batch=True
        return [master_data, close_batch]

    def __convert_row_image(self, row_image, column_map, table_name):
        """
        Private method which converts in place the values of a row image into the format stored in PostgreSQL.
        The columns with a type listed in self.hexify are converted to a hexadecimal string (empty bytes
        become an empty string), the json columns have their keys and values decoded and the spatial columns
        not listed in self.hexify are converted with __get_text_spatial.
        The columns missing from the column map are managed as text.

        :param row_image: The dictionary with the column names and the values of the row image
        :param column_map: The dictionary with the column names and the data types of the table
        :param table_name: The table's name, used for logging
        """
        for column_name in row_image:
            try:
                column_type=column_map[column_name]
            except KeyError:
                self.logger.debug("Detected inconsistent structure for the table  %s. The replay may fail. " % (table_name))
                column_type = 'text'
            column_value = row_image[column_name]
            if column_type in self.hexify and column_value:
                row_image[column_name] = binascii.hexlify(column_value).decode()
            elif column_type in self.hexify and isinstance(column_value, bytes):
                row_image[column_name] = ''
            elif column_type == 'json':
                row_image[column_name] = self.__decode_dic_keys(column_value)
            elif column_type in self.spatial_datatypes and column_value:
                row_image[column_name] = self.__get_text_spatial(column_value)
[docs]    def read_replica(self):
        """
            The method gets the batch data from PostgreSQL.
            If the batch data is not empty then method read_replica_stream is executed to get the rows from
            the mysql replica stored into the PostgreSQL database.
            When the method exits the replica_data list is decomposed in the master_data (log name, position and last event's timestamp).
            If the flag close_batch is set then the master status is saved in PostgreSQL the batch id  returned by the method is
            is saved in the class variable id_batch.
            This variable is used to determine whether the old batch should be closed or not.
            If the variable is not empty then the previous batch gets closed with a simple update of the processed flag.
        """
        skip = self.__init_read_replica()
        if skip:
            self.logger.warning("Couldn't connect to the source database for reading the replica. Ignoring.")
        else:
            self.pg_engine.set_source_status("running")
            replica_paused = self.pg_engine.get_replica_paused()
            if replica_paused:
                self.logger.info("Read replica is paused")
                self.pg_engine.set_read_paused(True)
            else:
                self.pg_engine.set_read_paused(False)
                batch_data = self.pg_engine.get_batch_data()
                if len(batch_data)>0:
                    id_batch=batch_data[0][0]
                    self.logger.debug("Batch data %s " % (batch_data))
                    replica_data=self.__read_replica_stream(batch_data)
                    master_data=replica_data[0]
                    close_batch=replica_data[1]
                    if "gtid" in master_data:
                        master_data["Executed_Gtid_Set"] = self.__build_gtid_set(master_data["gtid"])
                    else:
                        master_data["Executed_Gtid_Set"] = ""
                    if close_batch:
                        self.master_status=[master_data]
                        self.logger.debug("trying to save the master data...")
                        next_id_batch=self.pg_engine.save_master_status(self.master_status)
                        if next_id_batch:
                            self.logger.debug("new batch created, saving id_batch %s in class variable" % (id_batch))
                            self.id_batch=id_batch
                        else:
                            self.logger.debug("batch not saved. using old id_batch %s" % (self.id_batch))
                        if self.id_batch:
                            self.logger.debug("updating processed flag for id_batch %s", (id_batch))
                            self.pg_engine.set_batch_processed(id_batch)
                            self.id_batch=None
                self.pg_engine.keep_existing_schema = self.keep_existing_schema
                self.pg_engine.check_source_consistent()
            self.disconnect_db_buffered() 
[docs]    def init_replica(self):
        """
            The method performs a full init replica for the given source
        """
        self.logger.debug("starting init replica for source %s" % self.source)
        self.__init_sync()
        self.__check_mysql_config()
        master_start = self.get_master_coordinates()
        self.pg_engine.set_source_status("initialising")
        self.pg_engine.cleanup_source_tables()
        self.schema_list = [schema for schema in self.schema_mappings]
        self.__build_table_exceptions()
        self.get_table_list()
        self.create_destination_schemas()
        try:
            self.pg_engine.insert_source_timings()
            self.pg_engine.schema_loading = self.schema_loading
            if self.keep_existing_schema:
                self.disconnect_db_buffered()
                self.__copy_tables()
            else:
                self.create_destination_tables()
                self.disconnect_db_buffered()
                self.__copy_tables()
                self.pg_engine.grant_select()
                self.pg_engine.swap_schemas()
                self.drop_loading_schemas()
            self.pg_engine.clean_batch_data()
            self.pg_engine.save_master_status(master_start)
            self.pg_engine.set_source_status("initialised")
            self.connect_db_buffered()
            master_end = self.get_master_coordinates()
            self.disconnect_db_buffered()
            self.pg_engine.set_source_highwatermark(master_end, consistent=False)
            notifier_message = "init replica for source %s is complete" % self.source
            self.notifier.send_message(notifier_message, 'info')
            self.logger.info(notifier_message)
        except:
            if not self.keep_existing_schema:
                self.drop_loading_schemas()
            self.pg_engine.set_source_status("error")
            notifier_message = "init replica for source %s failed" % self.source
            self.logger.critical(notifier_message)
            self.notifier.send_message(notifier_message, 'critical')
            raise