一个简单的数据库迁移脚本(python实现)

释放双眼,带上耳机,听听看~!

使用python写一个迁移数据库的脚本

最近有一个小项目,是将SQL server中得数据迁移到远程的MySQL库中,因为字段名和数据表现方式都不尽相同,所以操作起来比较繁琐。为了以后能够更快地增加表,甚至能够从MySQL(或是其他什么数据库软件)迁移到SQL server中,所以打算做一个尽量能够通用的数据库迁移脚本。


连接mssqlserver和mariadb

得益于pymssql和pymsql的方法极其相似,可以极其简单地对数据库进行连接操作。


1
2
3
1pymssql.connect(ip, user, password, database)
2
3

当然,两者的其他操作也十分相似。


设计大致的框架

其实一个脚本,也没什么好设计的。但是为了以后可能存在的迭代更新乃至于增加的需求,秉着“程序员”的思考方式,还是稍微设计一下吧。

1、数据库connector

首先需求就是将数据从一个库移到另一个库,于是很容易就能想到,要先连接两个数据库。那么就先设计一个link_to_databases的模块吧。

这个模块有一个Connector的父类,以及两个分别对应MSSQLSERVER和MARIADB的两个Connector子类。超类中有conn连接和cursor指针两个成员变量,以及连接connect和释放资源disconnect两个成员函数。

子类中,为了方便成员变量的获取,将IP地址server、用户名user、密码password以及连接的需要连接的库database在初始化类实例的时候作为成员变量保存。

然后重写connect和disconnect,因为两者有稍微的不同。

Connector:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1class Connector:
2    def __init__(self):
3        self.conn = None
4        self.cursor = None
5
6    def connect(self):
7        pass
8
9    def disconnect(self):
10        try:
11            if self.cursor:
12                self.cursor.close()
13        except Exception as err:
14            print(err)
15
16        try:
17            if self.conn:
18                self.conn.close()
19        except Exception as err:
20            print(err)
21
22

因为不想做太多的处理,所以此处就算便try…catch…了。

MSConnector:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1class MsConnector(Connector):
2    def __init__(self, mssql_server, mssql_user, mssql_password, mssql_database):
3        super(MsConnector, self).__init__()
4        self.server = mssql_server
5        self.user = mssql_user
6        self.password = mssql_password
7        self.database = mssql_database
8
9    def connect(self):
10        if self.cursor:
11            self.cursor.close()
12            self.cursor = None
13
14        if self.conn:
15            self.conn.close()
16            self.conn = None
17
18        self.conn = pymssql.connect(self.server, self.user, self.password, self.database)
19        self.cursor = self.conn.cursor(as_dict=True)
20
21

MariaConnector:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1class MariaConnector(Connector):
2    def __init__(self, mysql_server, mysql_user, mysql_password, mysql_database):
3        super(MariaConnector, self).__init__()
4        self.server = mysql_server
5        self.user = mysql_user
6        self.password = mysql_password
7        self.database = mysql_database
8
9    def connect(self):
10        if self.cursor:
11            self.cursor.close()
12            self.cursor = None
13
14        if self.conn and self.conn.open:
15            self.conn.close()
16            self.conn = None
17
18        self.conn = pymysql.connect(self.server, self.user, self.password, self.database)
19        self.cursor = self.conn.cursor()
20
21

这里其实没有做好抽象,不过并无大碍。另外,pymysql库中,判断连接是否还在,需要用open()来验证。

2、数据表models

各数据库的数据表暂时分开来写,MS的归MS,Maria的归Maria。
这里以MariaDB为例。

各表的model有各自的字段,这个不表了。这里谈谈超类的成员变量,以及为何需要这些变量。
MariaModel:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1class MariaModel:
2    def __init__(self):
3        self.column = ""
4        self.pk = ""
5        self.filter = ""
6
7    def update_from_db(self, queryset):
8        pass
9
10    def __str__(self):
11        return "MARIADB_MODEL - " + self.column
12
13    def get_field(self):
14        return []
15
16

上述,column是各自表的名称,因为相互对应的两个数据库中的数据表,表名完全不同。为了之后的通用性,使用该字段指向自己model的表名。
pk并非主键,而是用来指向表中唯一的标志,比如id或是uuid,因为在需求中,在迁移数据的过程中,如果遇到了id相同或是uuid相同的数据,则先删除后再添加。
filter是用来指代可以筛选的字段,比如时间日期或是其他,不过体现在项目中,只有日期。

update_from_db是用来将获取到的数据库的数据,保存到model之中,便于使用model统一操作。
get_field是返回整个表的字段,目前用于操作模块自动生成sql语句。

3、数据交换模块

将MSSQL server的model交换为MariaDB的model之后,才能进行对数据库的操作,所以接下来最重要的就是Translator模块。

Translator模块是需要反复使用的模块,为了提高效率,所以写了一个单例的装饰器。


1
2
3
4
5
6
7
8
9
10
11
1def singleton(cls):
2    instances = {}
3
4    def instance(*args, **kwargs):
5        if cls not in instances:
6            instances[cls] = cls(*args, **kwargs)
7        return instances[cls]
8
9    return instance
10
11

接着写一个简单的工厂类,用来自动调用不同的数据交换类(MS2MYSQL和MYSQL2MS)。


1
2
3
4
5
6
7
8
9
1class ModelTranslator:
2    @classmethod
3    def dispatch(cls, model):
4        if isinstance(model, sql_server_models.MsModel):
5            return getattr(MS2SQL(), str(type(model).__name__))(model)
6        elif isinstance(model, my_sql_models.MariaModel):
7            return getattr(SQL2MS(), str(type(model).__name__))(model)
8
9

最后就是交换数据了,编写这部分和上面的model类是最耗费时间和精力的体力劳动。因为有大量的字段需要定义并且赋值,而且大同小异,十分之无聊与枯燥,我于是写了几个脚本用于字符串的格式化,来省去写大量代码的时间。最后附上。


1
2
3
4
5
6
7
8
9
1@singleton
2class MS2SQL:
3    def Account(self, account: sql_server_models.Account):
4       pass
5    def Axe(self, axe: sql_server_models.Axe):
6       pass
7    ......
8
9

交换数据的方法很简单,就是一一对照赋值。但是我发现这边的速度非常慢,应该还有其他可以优化的方法。

4、数据库操作

对数据库的操作,按照此处的需求是,将MSSQLSERVER的数据取出,再删除MARIADB中相应的数据,再将取出的数据存入MARIADB。

此处的父类定义了一个connector的成员变量和get、delete、save三个成员方法,之后只要其他数据库的connector实现就行了。

get:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1def get(self, model_class, filters: list = None):
2    """
3    获取数据库数据
4    :param model_class: 表模型
5    :param filters: 过滤条件
6    :return: 数据集合
7    """
8    self.connector.connect()
9    if model_class:
10        table = model_class().column
11        filter_item = model_class().filter
12        if issubclass(model_class, MsModel):
13            sql = "SELECT * FROM [{0}]".format(table)
14        elif issubclass(model_class, MariaModel):
15            sql = "SELECT * FROM `{0}`".format(table)
16        else:
17            sql = ""
18
19        # filter 省略
20
21        try:
22            self.connector.cursor.execute(sql)
23            results = self.connector.cursor.fetchall()
24            self.connector.disconnect()
25        except Exception as err:
26            self.connector.disconnect()
27            return []
28        models = []
29        for query in results:
30            model = model_class()
31            model.update_from_db(query)
32            models.append(model)
33        return models
34    else:
35        self.connector.disconnect()
36        print("请输入需要读取的数据模型")
37        return []
38
39

可以很明显的看到,这里的操作会浪费大量的时间和空间,但是这里的效率还算比较快,并没有造成大量时间的浪费。

delete:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1def delete(self, model_class, filters: list = None, all_update=False):
2    """
3    删除数据
4    :param model_class: 数据表模型
5    :param filters:  条件
6    :param all_update:  是否完全更新(如果是,需要全部删除)
7    """
8    if not all_update and not model_class().pk:
9        return
10    self.connector.connect()
11    if model_class:
12        table = model_class().column
13        filter_item = model_class().filter
14        self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
15        self.connector.conn.commit()
16        sql = "DELETE FROM `{0}`".format(table)
17        if not all_update:
18            if sql and filter_item:
19                if len(filters) == 2:
20                    if filters[0] and filters[1]:
21                        sql = sql + " WHERE `{0}` >= '{1}' and `{0}` <= '{2}'".format(filter_item, filters[0],
22                                                                                      filters[1])
23                    elif filters[0] and not filters[1]:
24                        sql = sql + " WHERE `{0}` >= '{1}'".format(filter_item, filters[0])
25                    elif not filters[0] and filters[1]:
26                        sql = sql + " WHERE `{0}` <= '{1}'".format(filter_item, filters[1])
27                elif len(filters) == 1:
28                    sql = sql + " WHERE `{0}` >= '{1}'".format(filter_item, filters[0])
29                else:
30                    pass
31        try:
32            self.connector.cursor.execute(sql)
33            self.connector.conn.commit()
34            self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
35            self.connector.conn.commit()
36            self.connector.disconnect()
37        except Exception as err:
38            self.connector.disconnect()
39            Logger.Logger.Instance("ERROR").exception(str(err))
40    else:
41        self.connector.disconnect()
42
43

这里的删除是MariaDB的connector做的重写。所以只针对MySQL。

save:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
1def save(self, model_class, context=None):
2    self.connector.connect()
3    if model_class:
4        table = model_class().column
5        fields = model_class().get_field()
6        if issubclass(model_class, MariaModel):
7            self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
8            self.connector.conn.commit()
9            for item in context:
10                sql_insert = "INSERT INTO `{0}`(".format(table)
11                for field in fields[:-1]:
12                    sql_insert = sql_insert + "`{0}`,".format(field)
13                sql_insert = sql_insert + "`{0}`) VALUES (".format(fields[-1])
14                sql_check = "SELECT * FROM `{0}`".format(table)
15                try:
16                    if isinstance(item, MariaModel):
17                        if item.pk:
18                            sql_check = sql_check + " WHERE `{0}`='{1}'".format(item.pk, getattr(item, item.pk))
19                            count = self.connector.cursor.execute(sql_check)
20                            if count:
21                                sql_delete = "DELETE FROM `{0}` WHERE `{1}`='{2}'".format(table, item.pk,
22                                                                                          getattr(item, item.pk))
23                                self.connector.cursor.execute(sql_delete)
24                                self.connector.conn.commit()
25                        for field in fields[:-1]:
26                            sql_insert = sql_insert + "'{0}',".format(getattr(item, field))
27                        sql_insert = sql_insert + "'{0}')".format(getattr(item, fields[-1]))
28                        self.connector.cursor.execute(sql_insert)
29                        self.connector.conn.commit()
30                    else:
31                        Logger.Logger.Instance("ERROR").error("The model is wrong.")
32                except Exception as err:
33                    Logger.Logger.Instance("ERROR").exception(str(err))
34            self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
35            self.connector.conn.commit()
36            self.connector.disconnect()
37            print("done")
38        else:
39            self.connector.disconnect()
40    else:
41        self.connector.disconnect()
42
43

从上面可以看出,所有的方法都是打开连接->处理->关闭连接,而非再同一个连接下完成,虽然按照需求只是重复了三次。


遇到的问题和注意点

1、 将所有的主键、外键改为UUID

这里遇到一些关于主键的问题,详细不谈了,在这里只做记录,对应MariaDB的系统是由Django完成的,所以在Django中将默认的ID换为UUID的方法如下:


1
2
3
4
1class SomeModel(models.Model):
2   uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
3
4

2、多张表相互引用时的问题

有两张以上的表互相作为外键或是引用时,删除或是更新甚至清空再保存,都会遇到一些问题,尤其是MySql(Maria DB)。


1
2
3
1Cannot add or update a child row: a foreign key constraint fails (database.table, CONSTRAINT XXX FOREIGN KEY (xxx) REFERENCES `xxx_table` (`id`))')
2
3

会产生类似于上述的错误。

这时候需要在运行sql之前(针对mysql和pymysql):


1
2
3
4
1self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
2self.connector.conn.commit()
3
4

以及在最后释放连接资源之前(针对mysql和pymysql):


1
2
3
4
1self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
2self.connector.conn.commit()
3
4

这样就可以基本解决问题。

3、对于搜索集不同的处理方式

pymysql和pymssql的搜索集的处理方式很不一样,因为pymysql返回的是一个set或是list,但是pymssql返回的结果集是一个字典。


1
2
3
4
5
6
7
1# 这个是pymysql
2self.id = queryset[0]
3
4# 这个是pymssql
5self.id = queryset.get("ID", "")
6
7

4、消除控制台warning


1
2
3
4
1import warnings
2warnings.filterwarnings('ignore')
3
4

5、遇到的数据库链接错误

在使用pymssql的时候,有事会产生这种异常


1
2
3
4
5
1conn = pymssql.connect(config.Host, config.User, config.Pass, config.Database)
2File "pymssql.pyx", line 641, in pymssql.connect (pymssql.c:10824)
3pymssql.OperationalError: (20017, b'DB-Lib error message 20017, severity 9:\nUnexpected EOF from the server (10.1.2.14:1433)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (10.1.2.14:1433)\n')
4
5

这个问题的本质目前还不清楚,网上有人给出的解决方案是给一个显式编码


1
2
3
1conn = pymssql.connect(config.Host, config.User, config.Pass, config.Database,charset="UTF-8")
2
3

总结

之后我会补完代码的优化和运行效率的部分。目前项目的整体还不完善,而且运行速度十分有限,字段少的表中,25万条数据大概要1个半小时,而字段多(100以上)的表,4个小时才跑完8万条数据,说明在数据模块交换的地方耗费了大量的时间,如果能有所改进就好。


算是对一些小地方的补充,没什么意义,纯粹作为记录。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1import re
2if __name__ == "__main__":
3    context = []
4    with open("demo.txt") as p:
5        text = p.readlines()
6        for line in text:
7            m = re.search("self.([a-zA-z0-9_]+) = \"\"", line)
8            if m:
9                item = m.groups()
10                context.append(item[0])
11
12    print(context)
13    print()
14    print()
15    conut = 0
16    result = ""
17    for item in context:
18        result = result + "\""
19        result = result + item + ": {" + str(conut) + "},\\t"
20        if conut == len(context) - 1:
21            result = result + "\".format(\n"
22            break
23        result = result + "\" \\\n"
24        conut = conut + 1
25    for item in context:
26        result = result + "self." + item + ",\n"
27    result = result + ")"
28    print(result)
29
30

这个脚本将形似于


1
2
3
4
5
1self.id = ""
2self.user_id = ""
3self.role_id = ""
4
5

的成员变量,转换为


1
2
3
4
5
6
7
8
9
10
11
12
1['id', 'user_id', 'role_id']
2
3
4"id: {0},\t" \
5"user_id: {1},\t" \
6"role_id: {2},\t".format(
7   self.id,
8   self.user_id,
9   self.role_id,
10)
11
12

将成员变量提取出来,用于完成update_from_db方法和__str__()方法。


对速率的优化

1、重要的修改

上面说到了,对超过20万条的数据,执行速度十分缓慢,而且通过测试,并不是如同在总结中所说的,是数据交换赋值的速度有问题,而是数据库插入操作的问题。对于多条数据,pymysql是推荐使用cursor.executemany(sql_str,values)来替代cursor.execute(sql_str)的,也不推荐用户自己拼接字符串。

所以应该使用cursor.executemany(sql_str,values)。结果显而易见,28万条数据的插入仅仅使用了1分钟便完成了。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1def fast_save(self, model_class, context=None):
2    self.connector.connect()
3    if model_class:
4        table = model_class().column
5        fields = model_class().get_field()
6        if issubclass(model_class, MariaModel):
7            self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
8            self.connector.conn.commit()
9            sql_insert = "INSERT INTO `{0}`(".format(table)
10            for field in fields[:-1]:
11                sql_insert = sql_insert + "`{0}`,".format(field)
12            sql_insert = sql_insert + "`{0}`) VALUES (".format(fields[-1])
13            for field in fields[:-1]:
14                sql_insert = sql_insert + "%s,"
15            sql_insert = sql_insert + "%s);"
16            values = []
17            for item in context:
18                t = []
19                for field in fields:
20                    t.append(getattr(item, field))
21                values.append(tuple(t))
22
23            self.connector.cursor.executemany(sql_insert, values)
24            self.connector.conn.commit()
25
26            self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
27            self.connector.conn.commit()
28
29            self.connector.disconnect()
30            print("done")
31        else:
32            self.connector.disconnect()
33    else:
34        self.connector.disconnect()
35
36

2、代码结构和逻辑的修改

对代码的优化

给TA打赏
共{{data.count}}人
人已打赏
安全技术

C++ 用libcurl库进行http通讯网络编程

2022-1-11 12:36:11

安全经验

jenkins+jmeter+ant(四)jenkins报告HTML

2021-10-11 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索