How to quickly migrate/back up data at the table level between homogeneous or heterogeneous databases

 

Unlike database-level backup and restore, which is a heavyweight operation, backing up and restoring one or several tables, or synchronizing data across instances or between heterogeneous databases, is one of the problems a DBA deals with most often. How to do it simply, quickly and efficiently is a question worth thinking about.

 

Table-level backup and restore on the same instance

If the source and target tables are on the same instance, table-level backup and restore can be done purely in SQL:

  • In MySQL: create table target_table like source_table; insert into target_table select * from source_table;
  • In SQL Server: select * into target_table from source_table;
  • In PostgreSQL: create table target_table as table source_table;

Under normal circumstances, both MySQL and SQL Server can run these statements across databases, i.e. the source and target tables do not need to be in the same database; PostgreSQL cannot perform cross-database operations directly.
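For example, copying a table across databases on the same MySQL instance only requires qualifying the table names with the database name. A minimal sketch (db01, db02 and my_table are placeholder names):

-- MySQL: cross-database copy on the same instance
CREATE TABLE db02.my_table LIKE db01.my_table;
INSERT INTO db02.my_table SELECT * FROM db01.my_table;

-- SQL Server: the equivalent uses three-part names
-- SELECT * INTO db02.dbo.my_table FROM db01.dbo.my_table;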

 

Table-level backup and restore across instances

If the source and target tables are not on the same instance, none of the methods above can back up a single table or multiple tables. The available options are:

  • 1. Table-level logical backup and restore: mysqldump in MySQL, bcp import/export in SQL Server, pg_dump in PostgreSQL, and so on
  • 2. MySQL can copy a table's data files directly to the target instance, provided innodb_file_per_table is enabled
  • 3. Custom code, similar in function to an ETL job

File-level table backup and restore in MySQL

Implementing table-level migration/backup at the file level in MySQL:

-- The source table is in db01, the target table in db02
-- Create an empty table with the same structure in db02
USE db02;
CREATE TABLE my_table
(
    c1 BIGINT AUTO_INCREMENT PRIMARY key,
    c2 VARCHAR(100),
    c3 VARCHAR(100),
    c4 VARCHAR(100),
    c5 DATETIME(6)
);

-- Discard the target table's .ibd data file
USE db02;
alter table my_table discard tablespace;
 
-- Lock the source table so it cannot be modified
USE db01;
flush table my_table for EXPORT; 

-- Then copy the source table's .cfg and .ibd files into the target database directory:
cp ./db01/my_table.cfg ./db02/
cp ./db01/my_table.ibd ./db02/

-- Unlock the source table
USE db01;
unlock TABLES;

-- Change the owner of the copied files to the mysql user
chown mysql.mysql ./db02/my_table.ibd ./db02/my_table.cfg

-- Import the tablespace into the target table
USE db02;
alter table my_table import TABLESPACE; 

USE db02;
SELECT * FROM my_table LIMIT 100;

Custom code for table-level synchronization between homogeneous/heterogeneous databases

Table synchronization is not limited to databases of the same type, nor to a simple table-to-table backup: the target may be a heterogeneous database, and the source may be the result of a join query that is exported/backed up into another database. Custom code is therefore the approach that adapts to the widest range of situations.
The rough idea of the implementation: configure a source (in practice a query, source_select_sql) and a target table (in practice an insert statement), specify the database type of the source (MySQL/PostgreSQL/SQLServer) and of the target (MySQL/PostgreSQL/SQLServer) together with the batching parameters, then read data from the source (a table or a query) in batches and write it to the target table in batches. This makes table-level synchronization between homogeneous or heterogeneous databases fast and efficient.

The code below implements concurrent table-level data backup/migration between any two of MySQL/PostgreSQL/SQLServer. In this demo, ignoring network transfer, copying from database A to database B on a single MySQL instance imports roughly 5,000 rows per second.

Code implementation

import base64
import datetime
import uuid
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Process
from multiprocessing import Pool
import psycopg2
import psycopg2.extras  # register_uuid() used below lives in psycopg2.extras
import pymssql
import pymysql
from enum import Enum
import logging


logger = logging.getLogger('data-sync-log')
logger.setLevel(level=logging.INFO)
formatter = logging.Formatter('%(asctime)s -%(name)s -%(levelname)s -%(message)s')
# handler = logging.FileHandler("data-sync-log-{0}.txt".format(time.strftime("%Y-%m-%d-%H-%M-%S")))
# handler.setFormatter(formatter)
# logger.addHandler(handler)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)


class DataBaseType(Enum):
    MSSQL = 1
    POSTGRESQL = 2
    MYSQL = 3


def get_postgre_conn(conn):
    # conn = psycopg2.connect(**conn, as_dict=True)
    try:
        conn = psycopg2.connect(**conn)
        return conn
    except Exception:
        raise


def get_mssql_conn(conn):
    # conn = pymssql.connect(**conn, as_dict=True)
    try:
        conn = pymssql.connect(**conn)
        return conn
    except Exception:
        raise


def get_mysql_conn(conn):
    # conn = pymssql.connect(**conn, as_dict=True)
    try:
        conn = pymysql.connect(**conn)
        return conn
    except Exception:
        raise


def get_db_conn(db_type, conn):
    if db_type.name == 'MSSQL':
        return get_mssql_conn(conn)
    elif db_type.name == 'POSTGRESQL':
        return get_postgre_conn(conn)
    elif db_type.name == 'MYSQL':
        return get_mysql_conn(conn)
    return None


def get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp,read_page_size=50000):
    source_conn = get_db_conn(source_server_type, source_server)
    # for pymssql, execute() does not materialize the whole result set on the client;
    # fetchmany() returns at most read_page_size rows per call
    with source_conn.cursor() as source_cursor:
        source_cursor.execute(source_select_sql.format(last_sync_timestamp))
        while True:
            list_result = source_cursor.fetchmany(read_page_size)
            if list_result:
                yield list_result
                list_result.clear()
            else:
                break


def get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_page_size=50000):
    source_conn = get_db_conn(source_server_type, source_server)
    v_last_sync_timestamp = last_sync_timestamp
    list_result = []
    #for pymysql, the execute will fetch all the result from server to client one time, so paging is necessary
    with source_conn.cursor() as source_cursor:
        current_start_no = 1
        while True:
            source_cursor.execute(source_select_sql.format((current_start_no-1) * read_page_size,read_page_size))
            #print(source_select_sql.format((current_start_no-1) * read_page_size,read_page_size))
            result = source_cursor.fetchmany(read_page_size)
            if len(result)>0:
                current_start_no = current_start_no + 1
                for row in result:
                    list_result.append(row)
                    if len(list_result) == read_page_size:
                        yield list_result
                        list_result.clear()
                if len(list_result) > 0:
                    yield list_result
                    list_result.clear()
            else:
                break


def get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size=50000):
    source_conn = get_db_conn(source_server_type, source_server)
    # target_conn.set_session(autocommit=True)
    # psycopg2.extras.register_uuid()
    '''
    page_index = 0
    para_col = '(' + ''.join('%s,' for x in range(col_size))
    para_col = para_col[:-1] + ')'
    '''
    v_last_execute_timestamp = last_sync_timestamp
    v_current_timestamp = datetime.datetime.utcnow()
    list_result = []
    with source_conn.cursor(name="client_cursor") as source_cursor:
        # for a PostgreSQL data source, use a named (server-side) cursor:
        # without it, psycopg2 pulls the whole result set to the client after execute(),
        # so iterating the named cursor keeps memory usage bounded
        source_cursor.execute(source_select_sql.format(v_last_execute_timestamp))
        for row in source_cursor:
            list_result.append(row)
            if len(list_result) == read_batch_size:
                yield list_result
                list_result.clear()
        if len(list_result) > 0:
            yield list_result


def get_data_from_datasource(source_server, source_server_type, source_select_sql, last_sync_timestamp, read_batch_size):
    if source_server_type == DataBaseType.MYSQL:
        return get_data_from_mysql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
                                   read_batch_size)
    if source_server_type == DataBaseType.POSTGRESQL:
        return get_data_from_postgresql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
                                        read_batch_size)
    if source_server_type == DataBaseType.MSSQL:
        return get_data_from_mssql(source_server, source_server_type, source_select_sql, last_sync_timestamp,
                                   read_batch_size)


def execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name,  col_size, list_datasource, insert_batch_size):
    batch_size_index = 0
    para_col = '(' + ''.join('%s,' for x in range(col_size))
    para_col = para_col[:-1] + ')'
    conn = get_db_conn(target_server_type, target_server)
    conn.set_session(autocommit=True)
    psycopg2.extras.register_uuid()
    with conn.cursor() as cursor:
        while batch_size_index * insert_batch_size < len(list_datasource):
            try:
                star_pos = batch_size_index * insert_batch_size
                end_pos = (batch_size_index + 1) * insert_batch_size
                args_str = ','.join( cursor.mogrify(para_col, x).decode('utf-8') for x in tuple(list_datasource[star_pos: end_pos]))
                cursor.execute(target_insert_statement.format(args_str))
                logger.info(
                    str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,
                                                                                                len(list_datasource[
                                                                                                    star_pos:end_pos])))
                batch_size_index = batch_size_index + 1
                conn.commit()
            except Exception as err:
                raise err
    conn.close()


def execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name, col_size,list_datasource, batch_size):
    batch_size_index = 0
    para_col = '(' + ''.join('%s,' for x in range(col_size))
    para_col = para_col[:-1] + ')'
    conn = get_db_conn(target_server_type, target_server)
    with conn.cursor() as cursor:
        while batch_size_index * batch_size < len(list_datasource):
            try:
                cursor.executemany(target_insert_statement, list_datasource[batch_size_index * batch_size: (batch_size_index + 1) * batch_size])
                logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
                batch_size_index = batch_size_index + 1
                conn.commit()
            except Exception as err:
                raise err
    conn.close()


def execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name, col_size, list_datasource, batch_size):
    batch_size_index = 0
    conn = get_db_conn(target_server_type, target_server)
    with conn.cursor() as cursor:
        while batch_size_index * batch_size < len(list_datasource):
            try:
                # pymssql's bulk_copy() takes a table name rather than an INSERT statement,
                # so for an MSSQL target, target_insert_statement should be the bare table name
                conn.bulk_copy(table_name=target_insert_statement,
                               elements=list_datasource[batch_size_index * batch_size: (batch_size_index + 1) * batch_size],
                               column_ids=list(range(1, col_size + 1)))
                logger.info(str(datetime.datetime.now()) + ' {0} --->{1} rows batch size finish'.format(target_table_name,len(list_datasource[batch_size_index * batch_size:(batch_size_index + 1) * batch_size])))
                batch_size_index = batch_size_index + 1
                conn.commit()
            except Exception as err:
                raise err
    conn.close()


def execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size):
    if target_server_type == DataBaseType.MYSQL:
        execute_batch_insert2mysql(target_server_type, target_server, target_insert_statement, target_table_name,
                                   col_size, list_data_source, insert_batch_size)
    elif target_server_type == DataBaseType.POSTGRESQL:
        execute_batch_insert2postgresql(target_server_type, target_server, target_insert_statement, target_table_name,
                                        col_size, list_data_source, insert_batch_size)
    elif target_server_type == DataBaseType.MSSQL:
        execute_batch_insert2mssql(target_server_type, target_server, target_insert_statement, target_table_name,
                                   col_size, list_data_source, insert_batch_size)


def sync_data_from_source2target(source_server, source_server_type, source_select_sql,
                                 target_server, target_server_type, target_table_name, col_size,
                                 target_insert_statement,
                                 sync_type='FULL', insert_batch_size=5000, read_batch_size=50000):
    # target_conn = get_db_conn(target_server_type, target_server)
    # target_conn.set_session(autocommit=True)
    # psycopg2.extras.register_uuid()
    '''
    page_index = 0
    para_col = '(' + ''.join('%s,' for x in range(col_size))
    para_col = para_col[:-1] + ')'
    '''
    v_last_execute_timestamp = datetime.datetime.min
    v_current_timestamp = datetime.datetime.utcnow()

    if sync_type == 'Delta':
        last_sync_info = get_last_sync_position(target_server, target_server_type, target_table_name)
        log_id = last_sync_info[0]
        v_last_execute_timestamp = last_sync_info[2]

    for list_data_source in get_data_from_datasource(source_server, source_server_type, source_select_sql,  v_last_execute_timestamp, read_batch_size):
        execute_batch_insert_to_target(target_server, target_server_type, target_insert_statement, target_table_name, col_size, list_data_source, insert_batch_size)

    if sync_type == 'Delta':
        update_last_sync_position(target_server, log_id, target_server_type, v_current_timestamp)


def get_last_sync_position(conn, db_type, table_name):
    conn = get_db_conn(db_type, conn)
    with conn.cursor() as cursor:
        cursor.execute('''select id,position_type,position_value from data_sync_position_log where table_name = %s ''',
                       [table_name])
        result = cursor.fetchone()
    conn.close()
    return result


def update_last_sync_position(conn, id, db_type, position_value):
    conn = get_db_conn(db_type, conn)
    last_sync_timestamp = None
    with conn.cursor() as cursor:
        # print('update data_sync_position_log set position_value = %s, last_update =  now()  where id = %s ',[position_value,id])
        cursor.execute('update data_sync_position_log set position_value = %s, last_update =  now()  where id = %s ',
                       [position_value, id])
    conn.commit()
    conn.close()
    return last_sync_timestamp


def sync_data_from_source2target_worker(para_dict):
    '''
    Run one sync task per entry in para_dict, each in its own worker process.
    :param para_dict: mapping of task name -> sync parameters
    :return:
    '''
    list_future = []
    with ProcessPoolExecutor(max_workers=10) as executor:
        try:
            # submit() hands each sync task to the process pool and returns immediately without blocking
            for key, val in para_dict.items():
                future = executor.submit(sync_data_from_source2target,
                                         val['source_server'],
                                         val['source_server_type'],
                                         val['source_select_sql'],
                                         val['target_server'],
                                         val['target_server_type'],
                                         val['target_table_name'],
                                         val['col_size'],
                                         val['target_insert_statement'],
                                         val['sync_type'],
                                         val['insert_batch_size'],
                                         val['read_page_size'])
                list_future.append(future)
        except Exception as ex:
            raise Exception('startup process exception: ' + str(ex))
        try:
            # iterate over the futures and call result() to collect each process's outcome or re-raise its exception
            for future in list_future:
                future.result()
        except Exception as ex:
            raise Exception('process execute exception: ' + str(ex))



def mysql_2_mysql():
    para_dict = {}
    sync_type = 'Full'  # Full or Delta
    insert_batch_size = 5000
    read_page_size = 50000
    # mysql to mysql case
    source_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****', 'password': '******'}
    target_conn = {'host': '***.***.***.***', 'database': 'XXX', 'port': ****, 'user': '****', 'password': '******'}
    para_dict['test_mytable'] = {
        'source_server': source_conn,
        'source_server_type': DataBaseType.MYSQL,
        'source_select_sql': ''' SELECT c1,c2,c3,c4,c5 FROM my_table limit {0},{1}; ''',
        'target_table_name': 'my_table',
        'target_server': target_conn,
        'target_server_type': DataBaseType.MYSQL,
        # col_size was originally meant for generating the insert statement automatically;
        # writing the statement by hand is just as easy, so this parameter is deprecated
        'col_size': 7,
        'target_insert_statement': '''INSERT INTO my_table (c1,c2,c3,c4,c5) VALUES (%s,%s,%s,%s,%s);''',
        'insert_batch_size': insert_batch_size,
        'read_page_size': read_page_size,
        # Delta (incremental) sync was the original goal, but in some cases it cannot work:
        # updates or deletes on the source table make it impossible to pinpoint the changed rows,
        # so this parameter is deprecated as well
        'sync_type': 'FULL'}
    sync_data_from_source2target_worker(para_dict)


if __name__ == "__main__":
    mysql_2_mysql()
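The Delta branch above relies on a bookkeeping table named data_sync_position_log on the target side, whose definition is not included in the script. A minimal sketch inferred from the columns the code reads and updates (MySQL flavor assumed; adjust the types for PostgreSQL/SQL Server):

CREATE TABLE data_sync_position_log
(
    id             BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name     VARCHAR(200) NOT NULL,
    position_type  VARCHAR(50),
    position_value DATETIME(6),
    last_update    DATETIME(6)
);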

 

Summary

1. Table-level logical backup and restore, the traditional route: mysqldump in MySQL, bcp import/export in SQL Server, pg_dump in PostgreSQL, and so on.
This approach is the most universal — every database has such a tool and the export/import performance is decent — but its drawbacks are equally clear: it cannot work across heterogeneous databases, and it cannot handle more complex cases such as backing up or exporting the result of a join query into a different kind of database.

2. With innodb_file_per_table enabled, MySQL can back up a table's data files directly to the target instance using flush table ... for EXPORT, then attach them with alter table my_table import TABLESPACE;. This method only works for MySQL; it is simple, fast and efficient, but PostgreSQL and SQL Server have no comparable option.

3. Custom code, ETL-style.
This is the most flexible approach: it works across instances, across table structures (the source can be a result set while the target is a table), and between heterogeneous databases, with simple and clear configuration. When there are multiple objects to migrate, backups/migrations can run concurrently in multiple processes/threads, so both flexibility and throughput are assured. The drawback is that you have to write and maintain the code yourself.

 
