使用python写一个迁移数据库的脚本
最近有一个小项目,是将SQL server中得数据迁移到远程的MySQL库中,因为字段名和数据表现方式都不尽相同,所以操作起来比较繁琐。为了以后能够更快地增加表,甚至能够从MySQL(或是其他什么数据库软件)迁移到SQL server中,所以打算做一个尽量能够通用的数据库迁移脚本。
连接mssqlserver和mariadb
得益于pymssql和pymsql的方法极其相似,可以极其简单地对数据库进行连接操作。
1
2
3 1pymssql.connect(ip, user, password, database)
2
3
当然,两者的其他操作也十分相似。
设计大致的框架
其实一个脚本,也没什么好设计的。但是为了以后可能存在的迭代更新乃至于增加的需求,秉着“程序员”的思考方式,还是稍微设计一下吧。
1、数据库connector
首先需求就是将数据从一个库移到另一个库,于是很容易就能想到,要先连接两个数据库。那么就先设计一个link_to_databases的模块吧。
这个模块有一个Connector的父类,以及两个分别对应MSSQLSERVER和MARIADB的两个Connector子类。超类中有conn连接和cursor指针两个成员变量,以及连接connect和释放资源disconnect两个成员函数。
子类中,为了方便成员变量的获取,将IP地址server、用户名user、密码password以及连接的需要连接的库database在初始化类实例的时候作为成员变量保存。
然后重写connect和disconnect,因为两者有稍微的不同。
Connector:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 1class Connector:
2 def __init__(self):
3 self.conn = None
4 self.cursor = None
5
6 def connect(self):
7 pass
8
9 def disconnect(self):
10 try:
11 if self.cursor:
12 self.cursor.close()
13 except Exception as err:
14 print(err)
15
16 try:
17 if self.conn:
18 self.conn.close()
19 except Exception as err:
20 print(err)
21
22
因为不想做太多的处理,所以此处就算便try…catch…了。
MSConnector:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1class MsConnector(Connector):
2 def __init__(self, mssql_server, mssql_user, mssql_password, mssql_database):
3 super(MsConnector, self).__init__()
4 self.server = mssql_server
5 self.user = mssql_user
6 self.password = mssql_password
7 self.database = mssql_database
8
9 def connect(self):
10 if self.cursor:
11 self.cursor.close()
12 self.cursor = None
13
14 if self.conn:
15 self.conn.close()
16 self.conn = None
17
18 self.conn = pymssql.connect(self.server, self.user, self.password, self.database)
19 self.cursor = self.conn.cursor(as_dict=True)
20
21
MariaConnector:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1class MariaConnector(Connector):
2 def __init__(self, mysql_server, mysql_user, mysql_password, mysql_database):
3 super(MariaConnector, self).__init__()
4 self.server = mysql_server
5 self.user = mysql_user
6 self.password = mysql_password
7 self.database = mysql_database
8
9 def connect(self):
10 if self.cursor:
11 self.cursor.close()
12 self.cursor = None
13
14 if self.conn and self.conn.open:
15 self.conn.close()
16 self.conn = None
17
18 self.conn = pymysql.connect(self.server, self.user, self.password, self.database)
19 self.cursor = self.conn.cursor()
20
21
这里其实没有做好抽象,不过并无大碍。另外,pymysql库中,判断连接是否还在,需要用open()来验证。
2、数据表models
各数据库的数据表暂时分开来写,MS的归MS,Maria的归Maria。
这里以MariaDB为例。
各表的model有各自的字段,这个不表了。这里谈谈超类的成员变量,以及为何需要这些变量。
MariaModel:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1class MariaModel:
2 def __init__(self):
3 self.column = ""
4 self.pk = ""
5 self.filter = ""
6
7 def update_from_db(self, queryset):
8 pass
9
10 def __str__(self):
11 return "MARIADB_MODEL - " + self.column
12
13 def get_field(self):
14 return []
15
16
上述,column是各自表的名称,因为相互对应的两个数据库中的数据表,表名完全不同。为了之后的通用性,使用该字段指向自己model的表名。
pk并非主键,而是用来指向表中唯一的标志,比如id或是uuid,因为在需求中,在迁移数据的过程中,如果遇到了id相同或是uuid相同的数据,则先删除后再添加。
filter是用来指代可以筛选的字段,比如时间日期或是其他,不过体现在项目中,只有日期。
update_from_db是用来将获取到的数据库的数据,保存到model之中,便于使用model统一操作。
get_field是返回整个表的字段,目前用于操作模块自动生成sql语句。
3、数据交换模块
将MSSQL server的model交换为MariaDB的model之后,才能进行对数据库的操作,所以接下来最重要的就是Translator模块。
Translator模块是需要反复使用的模块,为了提高效率,所以写了一个单例的装饰器。
1
2
3
4
5
6
7
8
9
10
11 1def singleton(cls):
2 instances = {}
3
4 def instance(*args, **kwargs):
5 if cls not in instances:
6 instances[cls] = cls(*args, **kwargs)
7 return instances[cls]
8
9 return instance
10
11
接着写一个简单的工厂类,用来自动调用不同的数据交换类(MS2MYSQL和MYSQL2MS)。
1
2
3
4
5
6
7
8
9 1class ModelTranslator:
2 @classmethod
3 def dispatch(cls, model):
4 if isinstance(model, sql_server_models.MsModel):
5 return getattr(MS2SQL(), str(type(model).__name__))(model)
6 elif isinstance(model, my_sql_models.MariaModel):
7 return getattr(SQL2MS(), str(type(model).__name__))(model)
8
9
最后就是交换数据了,编写这部分和上面的model类是最耗费时间和精力的体力劳动。因为有大量的字段需要定义并且赋值,而且大同小异,十分之无聊与枯燥,我于是写了几个脚本用于字符串的格式化,来省去写大量代码的时间。最后附上。
1
2
3
4
5
6
7
8
9 1@singleton
2class MS2SQL:
3 def Account(self, account: sql_server_models.Account):
4 pass
5 def Axe(self, axe: sql_server_models.Axe):
6 pass
7 ......
8
9
交换数据的方法很简单,就是一一对照赋值。但是我发现这边的速度非常慢,应该还有其他可以优化的方法。
4、数据库操作
对数据库的操作,按照此处的需求是,将MSSQLSERVER的数据取出,再删除MARIADB中相应的数据,再将取出的数据存入MARIADB。
此处的父类定义了一个connector的成员变量和get、delete、save三个成员方法,之后只要其他数据库的connector实现就行了。
get:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1def get(self, model_class, filters: list = None):
2 """
3 获取数据库数据
4 :param model_class: 表模型
5 :param filters: 过滤条件
6 :return: 数据集合
7 """
8 self.connector.connect()
9 if model_class:
10 table = model_class().column
11 filter_item = model_class().filter
12 if issubclass(model_class, MsModel):
13 sql = "SELECT * FROM [{0}]".format(table)
14 elif issubclass(model_class, MariaModel):
15 sql = "SELECT * FROM `{0}`".format(table)
16 else:
17 sql = ""
18
19 # filter 省略
20
21 try:
22 self.connector.cursor.execute(sql)
23 results = self.connector.cursor.fetchall()
24 self.connector.disconnect()
25 except Exception as err:
26 self.connector.disconnect()
27 return []
28 models = []
29 for query in results:
30 model = model_class()
31 model.update_from_db(query)
32 models.append(model)
33 return models
34 else:
35 self.connector.disconnect()
36 print("请输入需要读取的数据模型")
37 return []
38
39
可以很明显的看到,这里的操作会浪费大量的时间和空间,但是这里的效率还算比较快,并没有造成大量时间的浪费。
delete:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1def delete(self, model_class, filters: list = None, all_update=False):
2 """
3 删除数据
4 :param model_class: 数据表模型
5 :param filters: 条件
6 :param all_update: 是否完全更新(如果是,需要全部删除)
7 """
8 if not all_update and not model_class().pk:
9 return
10 self.connector.connect()
11 if model_class:
12 table = model_class().column
13 filter_item = model_class().filter
14 self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
15 self.connector.conn.commit()
16 sql = "DELETE FROM `{0}`".format(table)
17 if not all_update:
18 if sql and filter_item:
19 if len(filters) == 2:
20 if filters[0] and filters[1]:
21 sql = sql + " WHERE `{0}` >= '{1}' and `{0}` <= '{2}'".format(filter_item, filters[0],
22 filters[1])
23 elif filters[0] and not filters[1]:
24 sql = sql + " WHERE `{0}` >= '{1}'".format(filter_item, filters[0])
25 elif not filters[0] and filters[1]:
26 sql = sql + " WHERE `{0}` <= '{1}'".format(filter_item, filters[1])
27 elif len(filters) == 1:
28 sql = sql + " WHERE `{0}` >= '{1}'".format(filter_item, filters[0])
29 else:
30 pass
31 try:
32 self.connector.cursor.execute(sql)
33 self.connector.conn.commit()
34 self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
35 self.connector.conn.commit()
36 self.connector.disconnect()
37 except Exception as err:
38 self.connector.disconnect()
39 Logger.Logger.Instance("ERROR").exception(str(err))
40 else:
41 self.connector.disconnect()
42
43
这里的删除是MariaDB的connector做的重写。所以只针对MySQL。
save:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 1def save(self, model_class, context=None):
2 self.connector.connect()
3 if model_class:
4 table = model_class().column
5 fields = model_class().get_field()
6 if issubclass(model_class, MariaModel):
7 self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
8 self.connector.conn.commit()
9 for item in context:
10 sql_insert = "INSERT INTO `{0}`(".format(table)
11 for field in fields[:-1]:
12 sql_insert = sql_insert + "`{0}`,".format(field)
13 sql_insert = sql_insert + "`{0}`) VALUES (".format(fields[-1])
14 sql_check = "SELECT * FROM `{0}`".format(table)
15 try:
16 if isinstance(item, MariaModel):
17 if item.pk:
18 sql_check = sql_check + " WHERE `{0}`='{1}'".format(item.pk, getattr(item, item.pk))
19 count = self.connector.cursor.execute(sql_check)
20 if count:
21 sql_delete = "DELETE FROM `{0}` WHERE `{1}`='{2}'".format(table, item.pk,
22 getattr(item, item.pk))
23 self.connector.cursor.execute(sql_delete)
24 self.connector.conn.commit()
25 for field in fields[:-1]:
26 sql_insert = sql_insert + "'{0}',".format(getattr(item, field))
27 sql_insert = sql_insert + "'{0}')".format(getattr(item, fields[-1]))
28 self.connector.cursor.execute(sql_insert)
29 self.connector.conn.commit()
30 else:
31 Logger.Logger.Instance("ERROR").error("The model is wrong.")
32 except Exception as err:
33 Logger.Logger.Instance("ERROR").exception(str(err))
34 self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
35 self.connector.conn.commit()
36 self.connector.disconnect()
37 print("done")
38 else:
39 self.connector.disconnect()
40 else:
41 self.connector.disconnect()
42
43
从上面可以看出,所有的方法都是打开连接->处理->关闭连接,而非再同一个连接下完成,虽然按照需求只是重复了三次。
遇到的问题和注意点
1、 将所有的主键、外键改为UUID
这里遇到一些关于主键的问题,详细不谈了,在这里只做记录,对应MariaDB的系统是由Django完成的,所以在Django中将默认的ID换为UUID的方法如下:
1
2
3
4 1class SomeModel(models.Model):
2 uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
3
4
2、多张表相互引用时的问题
有两张以上的表互相作为外键或是引用时,删除或是更新甚至清空再保存,都会遇到一些问题,尤其是MySql(Maria DB)。
1
2
3 1Cannot add or update a child row: a foreign key constraint fails (database.table, CONSTRAINT XXX FOREIGN KEY (xxx) REFERENCES `xxx_table` (`id`))')
2
3
会产生类似于上述的错误。
这时候需要在运行sql之前(针对mysql和pymysql):
1
2
3
4 1self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
2self.connector.conn.commit()
3
4
以及在最后释放连接资源之前(针对mysql和pymysql):
1
2
3
4 1self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
2self.connector.conn.commit()
3
4
这样就可以基本解决问题。
3、对于搜索集不同的处理方式
pymysql和pymssql的搜索集的处理方式很不一样,因为pymysql返回的是一个set或是list,但是pymssql返回的结果集是一个字典。
1
2
3
4
5
6
7 1# 这个是pymysql
2self.id = queryset[0]
3
4# 这个是pymssql
5self.id = queryset.get("ID", "")
6
7
4、消除控制台warning
1
2
3
4 1import warnings
2warnings.filterwarnings('ignore')
3
4
5、遇到的数据库链接错误
在使用pymssql的时候,有事会产生这种异常
1
2
3
4
5 1conn = pymssql.connect(config.Host, config.User, config.Pass, config.Database)
2File "pymssql.pyx", line 641, in pymssql.connect (pymssql.c:10824)
3pymssql.OperationalError: (20017, b'DB-Lib error message 20017, severity 9:\nUnexpected EOF from the server (10.1.2.14:1433)\nDB-Lib error message 20002, severity 9:\nAdaptive Server connection failed (10.1.2.14:1433)\n')
4
5
这个问题的本质目前还不清楚,网上有人给出的解决方案是给一个显式编码
1
2
3 1conn = pymssql.connect(config.Host, config.User, config.Pass, config.Database,charset="UTF-8")
2
3
总结
之后我会补完代码的优化和运行效率的部分。目前项目的整体还不完善,而且运行速度十分有限,字段少的表中,25万条数据大概要1个半小时,而字段多(100以上)的表,4个小时才跑完8万条数据,说明在数据模块交换的地方耗费了大量的时间,如果能有所改进就好。
附
算是对一些小地方的补充,没什么意义,纯粹作为记录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1import re
2if __name__ == "__main__":
3 context = []
4 with open("demo.txt") as p:
5 text = p.readlines()
6 for line in text:
7 m = re.search("self.([a-zA-z0-9_]+) = \"\"", line)
8 if m:
9 item = m.groups()
10 context.append(item[0])
11
12 print(context)
13 print()
14 print()
15 conut = 0
16 result = ""
17 for item in context:
18 result = result + "\""
19 result = result + item + ": {" + str(conut) + "},\\t"
20 if conut == len(context) - 1:
21 result = result + "\".format(\n"
22 break
23 result = result + "\" \\\n"
24 conut = conut + 1
25 for item in context:
26 result = result + "self." + item + ",\n"
27 result = result + ")"
28 print(result)
29
30
这个脚本将形似于
1
2
3
4
5 1self.id = ""
2self.user_id = ""
3self.role_id = ""
4
5
的成员变量,转换为
1
2
3
4
5
6
7
8
9
10
11
12 1['id', 'user_id', 'role_id']
2
3
4"id: {0},\t" \
5"user_id: {1},\t" \
6"role_id: {2},\t".format(
7 self.id,
8 self.user_id,
9 self.role_id,
10)
11
12
将成员变量提取出来,用于完成update_from_db方法和__str__()方法。
对速率的优化
1、重要的修改
上面说到了,对超过20万条的数据,执行速度十分缓慢,而且通过测试,并不是如同在总结中所说的,是数据交换赋值的速度有问题,而是数据库插入操作的问题。对于多条数据,pymysql是推荐使用cursor.executemany(sql_str,values)来替代cursor.execute(sql_str)的,也不推荐用户自己拼接字符串。
所以应该使用cursor.executemany(sql_str,values)。结果显而易见,28万条数据的插入仅仅使用了1分钟便完成了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1def fast_save(self, model_class, context=None):
2 self.connector.connect()
3 if model_class:
4 table = model_class().column
5 fields = model_class().get_field()
6 if issubclass(model_class, MariaModel):
7 self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
8 self.connector.conn.commit()
9 sql_insert = "INSERT INTO `{0}`(".format(table)
10 for field in fields[:-1]:
11 sql_insert = sql_insert + "`{0}`,".format(field)
12 sql_insert = sql_insert + "`{0}`) VALUES (".format(fields[-1])
13 for field in fields[:-1]:
14 sql_insert = sql_insert + "%s,"
15 sql_insert = sql_insert + "%s);"
16 values = []
17 for item in context:
18 t = []
19 for field in fields:
20 t.append(getattr(item, field))
21 values.append(tuple(t))
22
23 self.connector.cursor.executemany(sql_insert, values)
24 self.connector.conn.commit()
25
26 self.connector.cursor.execute("SET FOREIGN_KEY_CHECKS = 1;")
27 self.connector.conn.commit()
28
29 self.connector.disconnect()
30 print("done")
31 else:
32 self.connector.disconnect()
33 else:
34 self.connector.disconnect()
35
36