I. Migrating MongoDB Data to MySQL
1. Export a .csv file with mongoexport
MongoDB ships with the mongoexport tool, which makes it easy to export data as CSV, JSON, and other formats:
mongoexport -h 127.0.0.1 -u username -p password -d userInfoDB (database name) -c regInfo (collection name) -f _id,field1,field2 --type=csv -o /tmp/mongoStore/userInfo.csv (output path)
Pick the fields to export as needed; the _id field can be omitted here.
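Before loading the exported file into MySQL, it can be worth confirming that the CSV header contains exactly the fields you asked mongoexport for. A minimal sketch (the file path, field names, and `check_csv_header` helper are illustrative, not part of the original workflow):

```python
import csv
import os
import tempfile

def check_csv_header(path, expected_fields):
    """Return True when the CSV header row matches the expected field list exactly."""
    with open(path, newline='', encoding='utf-8') as f:
        header = next(csv.reader(f))
    return header == expected_fields

# demo: write a tiny sample CSV and verify its header
fd, path = tempfile.mkstemp(suffix='.csv')
with os.fdopen(fd, 'w', encoding='utf-8', newline='') as f:
    f.write('regAuthority,orgNumber\nXXXX,091xxxx\n')
print(check_csv_header(path, ['regAuthority', 'orgNumber']))  # True
os.remove(path)
```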
2. Create the database and table
Design the table schema as needed. Note that the table's column order must correspond one-to-one with the keys in the first row of the CSV file, so it is best not to add an auto-increment id column yet; otherwise the values will land in the wrong columns after import.
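Since the table's columns must line up with the CSV header, one way to avoid hand-writing the DDL is to generate it from the header row. A rough sketch (the `create_table_sql` helper is illustrative; the single VARCHAR column type is an assumption, so adjust types to your actual data):

```python
def create_table_sql(table, header_line, coltype='VARCHAR(255)'):
    """Generate a CREATE TABLE statement whose column order mirrors the CSV header."""
    cols = [c.strip() for c in header_line.split(',')]
    col_defs = ',\n  '.join('`%s` %s' % (c, coltype) for c in cols)
    return 'CREATE TABLE `%s` (\n  %s\n) DEFAULT CHARSET=utf8;' % (table, col_defs)

# demo with the sample document's first three keys
print(create_table_sql('userInfo', 'regAuthority,orgNumber,termStart'))
```

An auto-increment id can still be added after the import with ALTER TABLE, once the data is in place.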
3. Create the SQL script that loads the CSV into MySQL
Create a file named load_csv.sql:
load data local infile '/tmp/mongoStore/userInfo.csv' (replace with your CSV path)
into table `userInfo` (replace with the new MySQL table name) character set utf8
fields terminated by ',' optionally enclosed by '"'
lines terminated by '\n'
ignore 1 lines;
Then execute the script with the mysql client:
mysql -uroot -pmysql -DuserInfoDB --default-character-set=utf8 --local-infile=1 < ~/load_csv.sql
With that, the data has been migrated to MySQL.
If the MongoDB documents contain many keys, you can obtain them as follows.
The tedious part is that you have to list every field both when exporting the CSV and when creating the MySQL table.
Here is a quick way to get the field list.
Fetch one document from MongoDB:
$ mongo
> use userInfoDB
> db.regInfo.find().limit(1)
{ "_id" : ObjectId("5ac3ac86af5b4e34af40xxxx"), "regAuthority" : "XXXX", "entranceName" : 1, "have_data_flag" : 1, "orgNumber" : "091xxxx", "termStart" : "2014-02-12", "businessScope" : "咨询"}
Copy the document into a Python interpreter and use the json module to get the list of keys
(the _id value is not valid JSON, so delete that pair first):
>>> import json
>>> s = """
... {"regAuthority" : "XXXX", "entranceName" : 1, "have_data_flag" : 1, "orgNumber" : "091xxxx", "termStart" : "2014-02-12", "businessScope" : "咨询"}"""
>>> s_dict = json.loads(s)
>>> list(s_dict.keys())
['regAuthority', 'entranceName', 'have_data_flag', 'orgNumber', 'termStart', 'businessScope']
That yields the full key list.
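From there, the key list can be joined straight into the -f argument for mongoexport (and reused for the table DDL), rather than typing the fields out by hand. A small convenience sketch:

```python
# keys recovered from the sample document above
keys = ['regAuthority', 'entranceName', 'have_data_flag',
        'orgNumber', 'termStart', 'businessScope']

# mongoexport takes a comma-separated field list after -f
field_arg = ','.join(keys)
print('-f ' + field_arg)
# -f regAuthority,entranceName,have_data_flag,orgNumber,termStart,businessScope
```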
II. Migrating MySQL Data to MongoDB with Python
1. Python modules
Install the required modules:
pip install pymysql
pip install pymongo
2. Script
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pymysql
import pymongo
import time

# connect to the MySQL database; DictCursor returns rows as dicts,
# which is the shape insert_many() expects
mysql = pymysql.connect(host='127.0.0.1', database='database',
                        user='username', password='password',
                        cursorclass=pymysql.cursors.DictCursor)
cursor = mysql.cursor()

# connect to MongoDB; credentials go in the URI
# (Database.authenticate() was removed in pymongo 4)
mongo = pymongo.MongoClient('mongodb://username:password@ip').database

# obtain the highest id in the MySQL table
cursor.execute('SELECT max(table_field) FROM table_name')
count = cursor.fetchall()[0]['max(table_field)']
print(count)

start_time = time.time()
# select from MySQL and insert into MongoDB 100 rows at a time
for i in range(0, count, 100):
    cursor.execute(
        'SELECT * FROM table_name WHERE table_field > %s AND table_field <= %s',
        (i, i + 100))
    submission = cursor.fetchall()
    if submission:
        # the collection is named after the MySQL table
        mongo.collection_name.insert_many(submission)

end_time = time.time()
deltatime = end_time - start_time
totalhour = int(deltatime / 3600)
totalminute = int((deltatime - totalhour * 3600) / 60)
totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
print("Data Migrate Finished, Total Time Consuming: %d Hour %d Minute %d Seconds"
      % (totalhour, totalminute, totalsecond))

cursor.close()
mysql.close()
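The batching logic in the script above can be exercised without a database. A minimal sketch that walks (low, high] id ranges in steps of 100, the same way the loop does (`iter_batches` and the in-memory row list are stand-ins for the SQL query):

```python
def iter_batches(count, size=100):
    """Yield (low, high] id ranges covering ids 1..count in steps of `size`."""
    for i in range(0, count, size):
        yield i, i + size

# simulate a table with ids 1..250
rows = list(range(1, 251))
migrated = []
for low, high in iter_batches(250):
    # stands in for: SELECT * FROM t WHERE id > low AND id <= high
    batch = [r for r in rows if low < r <= high]
    migrated.extend(batch)
print(len(migrated))  # 250
```

Every row is visited exactly once because the ranges are half-open and adjacent; the last range may extend past max(id), which is harmless.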
III. Importing MySQL Data into MongoDB with pandas
Requirement: import 700,000 rows of MySQL data into MongoDB with deduplication, and add a url field as the second column whose value is the same as the third column (phone).
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
from sqlalchemy import create_engine
from pymongo import MongoClient
import time

class MongoBase:
    def __init__(self, collection):
        self.collection = collection
        self.OpenDB()

    def read_mysql(self):
        # create the engine with sqlalchemy
        engine = create_engine(
            'mysql+pymysql://username:passwd@ip:port/dbname?charset=utf8')
        start = time.time()
        max_id = self.get_max_id()
        # read the table from MySQL into a DataFrame
        df1 = pd.read_sql(
            f'select primary_key,phone,plat_code,crawl_time,jrjt_del_dt '
            f'from test_info where primary_key>{max_id}', engine)
        end = time.time()
        print("rows fetched:", len(df1['phone']), "elapsed:", end - start)
        # deduplicate on phone, keeping the first occurrence
        df1.drop_duplicates('phone', keep='first', inplace=True)
        # insert a url column as the second column, with the same values as phone
        df1.insert(1, 'url', df1['phone'])
        return df1

    def OpenDB(self):
        self.con = MongoClient(host='127.0.0.1')  # set to your MongoDB host
        self.db = self.con[self.collection]
        self.collection = self.db['test']

    def closeDB(self):
        self.con.close()

    def get_max_id(self):
        # highest primary_key already migrated, or 0 for an empty collection
        doc = self.collection.find_one(sort=[('primary_key', -1)])
        return doc.get('primary_key') if doc else 0

if __name__ == '__main__':
    start = time.time()
    mongo = MongoBase('spider_data')
    df = mongo.read_mysql()
    # insert() is deprecated; insert_many() with one dict per row does the same job
    mongo.collection.insert_many(df.to_dict('records'))
    mongo.closeDB()
    end = time.time()
    print("total elapsed:", end - start)
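The dedup-and-insert-column step is easy to check on a small DataFrame before running against the full 700,000 rows. A sketch assuming pandas is installed (the three-row sample data is made up):

```python
import pandas as pd

# toy sample: two rows share the same phone
df = pd.DataFrame({'primary_key': [1, 2, 3],
                   'phone': ['138', '138', '139'],
                   'plat_code': ['a', 'b', 'c']})

df.drop_duplicates('phone', keep='first', inplace=True)  # keep the first of each phone
df.insert(1, 'url', df['phone'])  # url becomes the second column, copying phone

records = df.to_dict('records')   # one dict per row, ready for insert_many()
print(list(df.columns))  # ['primary_key', 'url', 'phone', 'plat_code']
print(len(records))      # 2
```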