在本系列的第一篇文章(主函数入口)中,介绍了mongodb会在系统启动同时,初始化了日志持久化服务,该功能貌似是1.7版本后引入到系统中的,主要用于解决因系统宕机时,内存中的数据未写入磁盘而造成的数据丢失。其机制主要是通过log方式定时将操作日志(如cud操作等)记录到db的journal文件夹下,这样当系统再次重启时从该文件夹下恢复丢失的(内存)数据。也就是在_initAndListen()函数体(db.cpp文件第511行)中下面这一行代码:
dur::startup();
今天就以这个函数为起点,看一下mongodb的日志持久化的流程,及实现方式。
在Mongodb中,提供持久化的类一般都以dur开头,比如下面几个:
dur.cpp:封装持久化主要方法和实现,以便外部使用
dur_commitjob.cpp:持久化任务工作(单元),封装延时队列TaskQueue<D>
,操作集合vector<shared_ptr<DurOp>>等
dur_journal.cpp:提供日志文件/路径,创建,遍历等操作 dur_journalformat.h:日志文件格式定义
dur_preplogbuffer.cpp:构造用于输出的日志buffer
dur_recover.h:日志恢复类(后台任务方式BackgroupJob)
dur_stats.h:统计类,包括提交
/
同步数据次数等
dur_writetodatafiles.cpp:封装写入数据文件mongofile方法
durop.h:持久化操作类,提供序列化,创建操作(FileCreatedOp),DROP操作(DropDbOp)
首先我们看一下dur::startup()方法实现(dur.cpp),如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1
2/*
3* at startup, recover, and then start the journal threads
4*/
5
6void startup() {
7 if( !cmdLine.dur )
8/*
9判断命令行启动参数是否为持久化
10*/
11 return;
12 DurableInterface::enableDurability();
13//对持久化变量 _impl 设置为DurableImpl方式
14
15 journalMakeDir();
16/*
17构造日志文件所要存储的路径:dur_journal.cpp
18*/
19 try{
20 recover();
21/*
22从上一次系统crash中恢复数据日志信息:dur_recover.cpp
23*/
24 }
25 catch(...) {
26 log() <<"exception during recovery" << endl;
27 throw;
28 }
29 preallocateFiles();
30 boost::thread t(durThrea);
31 }
32
注意:上面的DurableInterface,因为mongodb使用类似接口方式,从而约定不同的持久化方式实现,如下:
1
2
3
4
5
6
7
8
9 1
2class DurableInterface : boost::noncopyable {
3 virtual void* writingPtr(void *x, unsigned len) = 0;
4 virtual void createdFile(string filename, unsigned long long len) = 0;
5 virtual void declareWriteIntent(void *x, unsigned len) = 0;
6 virtual void * writingAtOffset(void *buf, unsigned ofs, unsigned len) = 0;
7 ....
8 }
9
接口定义了写文件的方式及方法等等。
并且mongodb包括了两种实现方式,即:
1
2
3
4
5
6
7
8
9
10
11
12
13 1
2class NonDurableImpl : public DurableInterface{
3/*
4非持久化,基于内存临时存储
5*/
6 }
7
8class DurableImpl : public DurableInterface {
9/*
10持久化,支持磁盘存储
11*/
12 }
13
再回到startup函数最后一行:boost::thread t(durThread);
该行代码会创建一个线程来运行durThread方法,该方法就是持久化线程,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 1void durThread() {
2 Client::initThread("dur");
3
4const int HowOftenToGroupCommitMs = 90;
5/*
6多少时间提交一组信息,单位:毫秒
7*/
8
9//注:commitJob对象用于封装并执行提交一组操作
10
11 while( !inShutdown() ) {
12 sleepmillis(10);
13 CodeBlock::Within w(durThreadMain);
14/*
15定义代码块锁,该设计很讨巧,接下来会介绍
16*/
17 try {
18 int millis = HowOftenToGroupCommitMs;
19 {
20 stats.rotate();
21//统计最新的_lastRotate信息
22
23 {
24 Timer t;
25/*
26声明定时器
27*/
28
29/*
30遍历日志文件夹下的文件并更新文件的“最新更新时间”标志位并移除无效或关闭之前使用的日志文件:dur_journal.cpp
31*/
32 journalRotate();
33 millis -= t.millis();
34/*
35线程睡眠时间为90减去遍历时间
36*/
37 assert( millis <= HowOftenToGroupCommitMs );
38 if( millis < 5 )
39 millis = 5;
40 }
41
42// we do this in a couple blocks, which makes it a tiny bit faster (only a little) on throughput,
43
44// but is likely also less spiky on our cpu usage, which is good:
45
46 sleepmillis(millis/2);
47
48//从commitJob的defer任务队列中获取任务并执行,详情参见: taskqueue.h的invoke() 和 dur_commitjob.cpp 的
49
50// Writes::D::go(const Writes::D& d)方法(用于非延迟写入信息操作)
51
52 commitJob.wi()._deferred.invoke();
53
54 sleepmillis(millis/2);
55
56//按mongodb开发者的理解,通过将休眠时间减少一半(millis/2)并紧跟着继续从队列中取任务,
57
58//以此小幅提升读取队列系统的吞吐量
59
60 commitJob.wi()._deferred.invoke();
61 }
62 go();
63//执行提交一组信息操作
64
65 }
66 catch(std::exception& e) {
67/*
68服务如果突然crash
69*/
70 log() << "exception in durThread causing immediate shutdown: " << e.what() << endl;
71 abort();
72// based on myTerminate()
73
74 }
75 }
76 cc().shutdown();
77//关闭当前线程,Client::initThread("dur")
78
79}
80
下面是go()的实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 1
2static void go() {
3 if( !commitJob.hasWritten() ){
4/*
5hasWritten一般在CUD操作时会变为true,后面会加以介绍
6*/
7 commitJob.notifyCommitted();
8/*
9发送信息已存储到磁盘的通知
10*/
11 return;
12 }
13 {
14 readlocktry lk("", 1000);
15/*
16声明读锁
17*/
18 if( lk.got() ) {
19 groupCommit();
20/*
21提交一组操作
22*/
23 return;
24 }
25 }
26
27// 当未取到读锁时,可能获取读锁比较慢,则直接使用写锁,不过写锁会用更多的RAM
28
29 writelock lk;
30 groupCommit();
31 }
32
33
34
35/*
36* locking: in read lock when called.
37*/
38
39static void _groupCommit() {
40 stats.curr->_commits++;
41/*
42提交次数加1
43*/
44 ......
45
46//预定义页对齐的日志缓存对象,该对象对会commitJob.ops()的返回值(该返回值类型vector< shared_ptr<DurOp> >)进行对象序列化
47
48//
49=并保存到commitJob._ab中,供下面方法调用,位于dur_preplogbuffer.cpp-->_PREPLOGBUFFER()方法
50
51 PREPLOGBUFFER();
52
53// todo : write to the journal outside locks, as this write can be slow.
54
55// however, be careful then about remapprivateview as that cannot be done
56
57// if new writes are then pending in the private maps.
58
59 WRITETOJOURNAL(commitJob._ab);
60/*
61写入journal信息,最终操作位于dur_journal.cpp的 Journal::journal(const AlignedBuilder& b)方法
62*/
63
64// data is now in the journal, which is sufficient for acknowledging getLastError.
65
66//
67 (ok to crash after that)
68
69 commitJob.notifyCommitted();
70 WRITETODATAFILES();
71/*
72写信息到mongofile文件中
73*/
74 commitJob.reset();
75/*
76重置当前任务操作
77*/
78
79// REMAPPRIVATEVIEW
80
81// remapping 私有视图必须在 WRITETODATAFILES 方法之后调用,否则无法读出新写入的数据
82
83 DEV assert( !commitJob.hasWritten() );
84 if( !dbMutex.isWriteLocked() ) {
85
86// this needs done in a write lock (as there is a short window during remapping when each view
87
88// might not exist) thus we do it on the next acquisition of that instead of here (there is no
89
90// rush if you aren't writing anyway -- but it must happen, if it is done, before any uncommitted
91
92// writes occur). If desired, perhpas this can be eliminated on posix as it may be that the remap
93
94// is race-free there.
95
96//
97
98 dbMutex._remapPrivateViewRequested = true;
99 }
100 else {
101 stats.curr->_commitsInWriteLock++;
102
103// however, if we are already write locked, we must do it now -- up the call tree someone
104
105// may do a write without a new lock acquisition. this can happen when MongoMMF::close() calls
106
107// this method when a file (and its views) is about to go away.
108
109//
110
111 REMAPPRIVATEVIEW();
112 }
113 }
114
到这里只是知道mongodb会定时从任务队列中获取相应任务并统一写入,写入journal和mongofile文件后再重置任务队列及递增相应统计计数信息(如privateMapBytes用于REMAPPRIVATEVIEW)。
但任务队列中的操作信息又是如何生成的呢?这个比较简单,我们只要看一下相应的cud数据操作时的代码即可,这里以插入(insert)数据为例:
我们找到pdfile.cpp文件的插入记录方法,如下(1467行):
1
2
3
4
5
6 1
2 DiskLoc DataFileMgr::insert(const char *ns, const void *obuf, int len, bool god, const BSONElement &writeId, bool mayAddIndex) {
3 ......
4 r = (Record*) getDur().writingPtr(r, lenWHdr);
5//位于1588行
6
该方法用于将客户端提交的数据(信息)写入到持久化队列(defer)中去,如下(按函数调用顺序):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1void* DurableImpl::writingPtr(void *x, unsigned len) {
2 void *p = x;
3 declareWriteIntent(p, len);
4 return p;
5}
6
7void DurableImpl::declareWriteIntent(void *p, unsigned len) {
8 commitJob.note(p, len);
9}
10
11void CommitJob::note(void* p, int len) {
12 DEV dbMutex.assertWriteLocked();
13 dassert( cmdLine.dur );
14 if( !_wi._alreadyNoted.checkAndSet(p, len) ) {
15 MemoryMappedFile::makeWritable(p, len);
16/*
17设置可写入mmap文件的信息
18*/
19 if( !_hasWritten ) {
20 assert( !dbMutex._remapPrivateViewRequested );
21
22// 设置写信息标志位, 用于进行_groupCommit(上面提到)时进行判断
23
24 _hasWritten = true;
25 }
26 ......
27
28// 向defer任务队列中加入操作信息
29
30 _wi.insertWriteIntent(p, len);
31 wassert( _wi._writes.size() < 2000000 );
32 assert( _wi._writes.size() < 20000000 );
33 ......
34}
35
36
其中insertWriteIntent方法定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1
2void insertWriteIntent(void* p, int len) {
3 D d;
4 d.p = p;
5/*
6操作记录record类型
7*/
8 d.len = len;
9/*
10记录长度
11*/
12 _deferred.defer(d);
13/*
14延期任务队列:TaskQueue<D>类型
15*/
16 }
17
到这里总结一下,mongodb在启动时,专门初始化一个线程不断循环(除非应用crash掉),用于在一定时间周期内来从defer队列中获取要持久化的数据并写入到磁盘的journal(日志)和mongofile(数据)处,当然因为它不是在用户添加记录时就写到磁盘上,所以按mongodb开发者说,它不会造成性能上的损耗,因为看过代码发现,当进行CUD操作时,记录(Record类型)都被放入到defer队列中以供延时批量(groupcommit)提交写入,但相信其中时间周期参数是个要认真考量的参数,系统为90毫秒,如果该值更低的话,可能会造成频繁磁盘操作,过高又会造成系统宕机时数据丢失过多。
最后对文中那个mongodb设置很计巧的代码做一下简要分析,代码如下:
CodeBlock::Within w(durThreadMain);
它的作为就是一个对多线程访问指定代码块加锁的功能,其类定义如下(位于race.h):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 1
2class CodeBlock {
3 volatile int n;
4 unsigned tid;
5 void fail() {
6 log() << "synchronization (race condition) failure" << endl;
7 printStackTrace();
8 abort();
9/**/
10 }
11 void enter() {
12 if( ++n != 1 ) fail();
13/*
14当已有线程执行该代码块时,则执行fail
15*/
16
17#if defined(_WIN32)
18 tid = GetCurrentThreadId();
19
20#endif
21 }
22 void leave() {
23/*
24只有调用 leave 操作,才会--n,即在线程执行完该代码块时调用
25*/
26 if( --n != 0 ) fail();
27 }
28 public:
29 CodeBlock() : n(0) { }
30 class Within {
31 CodeBlock& _s;
32 public:
33 Within(CodeBlock& s) : _s(s) { _s.enter(); }
34 ~Within() { _s.leave(); }
35 };
36 void assertWithin() {
37 assert( n == 1 );
38
39#if
40 defined(_WIN32)
41 assert( GetCurrentThreadId() == tid );
42
43#endif
44 }
45 };
46
47#else
48
49
通过其内部类Within的构造函数和析构函数,分别调用了_s.enter,_s.leave()方法,这样只要在一个代码块之前定义一个该类实例,则从下一行开始到codeblock结束之后,该进程内只允许一个线程执行该代码块,呵呵。