MySQL和MongoDB数据相互迁移

释放双眼,带上耳机,听听看~!

一、迁移MongoDB数据到MySQL

1、使用mongoexport导出.csv文件
mongodb自带mongoexport工具,可便捷导出csv、json等格式数据:


1
2
3
1mongoexport -h 127.0.0.1 -u username -p password -d userInfoDB(数据库名称) -c regInfo(集合名称) -f _id,字段1,字段2 --type=csv -o /tmp/mongoStore/userInfo.csv(保存路径)
2
3

根据个人需要选择要导出的字段,此处可以无须导出_id字段

2、新建数据库和数据表
按照个人需要设计数据表结构。此处注意数据表的字段顺序必须一一对应于csv文件中首行的key,所以对应数据表可以暂时先不用设置自增id,否则数据导入后数据表字段对应的值会混乱。

3、新建csv导入mysql的sql脚本
创建load_csv.sql文件


1
2
3
4
5
6
7
1load data local infile '/tmp/mongoStore/userInfo.csv'(修改为指定csv文件路径)
2into table `userInfo`(修改为mysql中新建数据表名称) character set utf8
3fields terminated by ',' optionally enclosed by '"'
4lines terminated by '\n'
5ignore 1 lines;
6
7

执行以下mysql load sql命令


1
2
3
1mysql -uroot -pmysql -DuserInfoDB --default-character-set=utf8 --local-infile=1 < ~/load_csv.sql
2
3

这样数据就从迁移到了mysql

如果mongodb中键比较多,可以通过如下方式获取keys
比较麻烦的点在于导出csv文件的时候要选择字段和新建mysql表的时候也要写全字段。
如下方式可以比较快的获取字段列表:
从mongodb获取一条数据


1
2
3
4
5
6
1$ mongo
2> use userInfoDB
3> db.regInfo.find().limit(1)
4{ "_id" : ObjectId("5ac3ac86af5b4e34af40xxxx"), "regAuthority" : "XXXX", "entranceName" : 1, "have_data_flag" : 1, "orgNumber" : "091xxxx", "termStart" : "2014-02-12", "businessScope" : "咨询"}
5
6

以上数据复制到python解释器中,使用python命令获取所有key的列表
(_id的值不符合dict格式,此处删掉)


1
2
3
4
5
6
7
8
1>>> import json
2>>> s = """
3... {"regAuthority" : "XXXX", "entranceName" : 1, "have_data_flag" : 1, "orgNumber" : "091xxxx", "termStart" : "2014-02-12", "businessScope" : "咨询"}"""
4>>> s_dict = json.loads(s)
5>>> s_dict.keys()
6['have_data_flag', 'termStart', 'regAuthority', 'orgNumber', 'entranceName', 'businessScope']
7
8

这样就获得了keys

二、Python迁移MySQL数据到MongoDB

1、Python模块
执行如下命令即可:


1
2
3
4
1    pip install pymysql
2    pip install pymongo
3
4

2、脚本内容


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1#!/usr/bin/env python
2#-*- coding:utf-8 -*-
3import pymysql,pymongo,time
4# connect to mysql database
5mysql = pymysql.connect(host='127.0.0.1', database='database', user='username', password='password')
6cursor = mysql.cursor()
7
8#connect to mongodb and obtain total lines in mysql
9mongo = pymongo.MongoClient('mongodb://ip').database
10mongo.authenticate('username',password='password')
11
12cursor.execute('SELECT max(table_field) FROM table_name')
13countlines = cursor.fetchall()
14count = countlines[0]['max(table_field)']
15
16#count = 300
17print(count)
18
19i = 0
20j = 100
21start_time = time.time()
22#select from mysql to insert mongodb by 100 lines.
23for i in range(0,count,100):
24    #print(a,b)
25    #print(i)
26    #print('SELECT * FROM quiz_submission where quiz_submission_id > %d and  quiz_submission_id <= %d' %(i,j))
27    submission = mysql.query(f'SELECT * FROM table_name where table_field > %d and  table_field <= %d' %(i,j))
28    #print(submission)
29    if submission:
30        #collection_name like mysql table_name
31        mongo.collection_name.insert_many(submission)
32    else:
33        i +=100
34        j +=100
35        continue
36    i +=100
37    j +=100
38end_time = time.time()
39deltatime = end_time - start_time
40totalhour = int(deltatime / 3600)
41totalminute = int((deltatime - totalhour * 3600) / 60)
42totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
43#print(migrate data total time consuming.)
44print("Data Migrate Finished,Total Time Consuming: %d Hour %d Minute %d Seconds" %(totalhour,totalminute,totalsecond))
45
46cursor.close()
47mysql.close()
48
49

三、使用pandas将MySQL的数据导入MongoDB

需求:把mysql的70万条数据导入到mongodb并去重,同时在第二列加入一个url字段,字段的值和第三列的值一样


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import pandas as pd
5from sqlalchemy import create_engine
6from pymongo import MongoClient
7import json
8import time
9
10class MongoBase:
11    def __init__(self, collection):
12        self.collection = collection
13        self.OpenDB()
14
15    def read_mysql(self):
16        engine = create_engine(
17          'mysql+pymysql://usernmae:passwd@ip:port/dbname?charset=utf8')  # 用sqlalchemy创建引擎
18        start=time.time()
19        max_id=self.get_max_id()
20        df1 = pd.read_sql(f'select primary_key,phone,plat_code,crawl_time,jrjt_del_dt from test_info where primary_key>{max_id}', engine)  # 从数据库中读取表存为DataFrame
21        end = time.time()
22        print("查询完毕条数",len(df1['phone']),"用时",end-start)
23        df1.drop_duplicates('phone', keep='first', inplace=True)   #保持phone第一列位置不变
24        df1.insert(1, 'url', df1['phone'])  #在phone列后面插入一个url列,值和phone的值一样
25        return df1
26
27    def OpenDB(self):
28        self.con = MongoClient(host=host)
29        self.db = self.con[self.collection]
30        self.collection = self.db['test']
31
32    def closeDB(self):
33        self.con.close()
34
35    def get_max_id(self):
36        max_id = self.collection.find().sort([('primary_key', -1)]).limit(1)[0]
37        if max_id:
38            return max_id.get("primary_key")
39
40if __name__ == '__main__':
41    start=time.time()
42    mongo = MongoBase('spider_data')
43    df =mongo.read_mysql()
44    mongo.collection.insert(json.loads(df.T.to_json()).values())
45    mongo.closeDB()
46    end=time.time()
47    print("运行完成所用时",end-start)
48
49

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索