Mongodb源码分析–日志及持久化

释放双眼,带上耳机,听听看~!

在本系列的第一篇文章(主函数入口)中,介绍了mongodb会在系统启动同时,初始化了日志持久化服务,该功能貌似是1.7版本后引入到系统中的,主要用于解决因系统宕机时,内存中的数据未写入磁盘而造成的数据丢失。其机制主要是通过log方式定时将操作日志(如cud操作等)记录到db的journal文件夹下,这样当系统再次重启时从该文件夹下恢复丢失的(内存)数据。也就是在_initAndListen()函数体(db.cpp文件第511行)中下面这一行代码:

   dur::startup();

    今天就以这个函数为起点,看一下mongodb的日志持久化的流程,及实现方式。

    在Mongodb中,提供持久化的类一般都以dur开头,比如下面几个:

  dur.cpp:封装持久化主要方法和实现,以便外部使用
dur_commitjob.cpp:持久化任务工作(单元),封装延时队列TaskQueue<D>
 ,操作集合vector<shared_ptr<DurOp>>等
dur_journal.cpp:提供日志文件/路径,创建,遍历等操作 dur_journalformat.h:日志文件格式定义
dur_preplogbuffer.cpp:构造用于输出的日志buffer
dur_recover.h:日志恢复类(后台任务方式BackgroupJob)
dur_stats.h:统计类,包括提交
/
同步数据次数等
dur_writetodatafiles.cpp:封装写入数据文件mongofile方法
durop.h:持久化操作类,提供序列化,创建操作(FileCreatedOp),DROP操作(DropDbOp)

 

    首先我们看一下dur::startup()方法实现(dur.cpp),如下:        


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
1
2/*
3* at startup, recover, and then start the journal threads 
4*/
5    
6void startup() {  
7   if( !cmdLine.dur ) 
8/*
9判断命令行启动参数是否为持久化
10*/
11           return;  
12       DurableInterface::enableDurability();
13//对持久化变量 _impl 设置为DurableImpl方式
14
15       journalMakeDir();
16/*
17构造日志文件所要存储的路径:dur_journal.cpp
18*/
19       try{  
20           recover(); 
21/*
22从上一次系统crash中恢复数据日志信息:dur_recover.cpp
23*/
24       }  
25       catch(...) {  
26           log() &lt;&lt;&quot;exception during recovery&quot; &lt;&lt; endl;  
27           throw;  
28       }  
29       preallocateFiles();  
30       boost::thread t(durThrea);  
31    }  
32

     注意:上面的DurableInterface,因为mongodb使用类似接口方式,从而约定不同的持久化方式实现,如下:


1
2
3
4
5
6
7
8
9
1  
2class DurableInterface : boost::noncopyable {  
3    virtual void* writingPtr(void *x, unsigned len) = 0;  
4    virtual void createdFile(string filename, unsigned long long len) = 0;  
5    virtual void declareWriteIntent(void *x, unsigned len) = 0;  
6    virtual void * writingAtOffset(void *buf, unsigned ofs, unsigned len) = 0;  
7    ....  
8   }  
9

    接口定义了写文件的方式及方法等等。  
并且mongodb包括了两种实现方式,即:


1
2
3
4
5
6
7
8
9
10
11
12
13
1    
2class NonDurableImpl : public DurableInterface{ 
3/*
4非持久化,基于内存临时存储
5*/
6    }  
7    
8class DurableImpl : public DurableInterface { 
9/*
10持久化,支持磁盘存储
11*/
12    }
13

 
再回到startup函数最后一行:boost::thread t(durThread);
该行代码会创建一个线程来运行durThread方法,该方法就是持久化线程,如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
1void durThread() {  
2    Client::initThread(&quot;dur&quot;);  
3    
4const int HowOftenToGroupCommitMs = 90;
5/*
6多少时间提交一组信息,单位:毫秒
7*/
8    
9//注:commitJob对象用于封装并执行提交一组操作
10
11    while( !inShutdown() ) {  
12        sleepmillis(10);  
13        CodeBlock::Within w(durThreadMain);
14/*
15定义代码块锁,该设计很讨巧,接下来会介绍
16*/
17        try {  
18            int millis = HowOftenToGroupCommitMs;  
19            {  
20                stats.rotate();
21//统计最新的_lastRotate信息
22
23                {  
24                    Timer t;
25/*
26声明定时器
27*/
28                    
29/*
30遍历日志文件夹下的文件并更新文件的“最新更新时间”标志位并移除无效或关闭之前使用的日志文件:dur_journal.cpp
31*/
32                    journalRotate();  
33                    millis -= t.millis();
34/*
35线程睡眠时间为90减去遍历时间
36*/
37                    assert( millis &lt;= HowOftenToGroupCommitMs );  
38                    if( millis &lt; 5 )  
39                        millis = 5;  
40                }  
41                
42// we do this in a couple blocks, which makes it a tiny bit faster (only a little) on throughput,  
43                
44// but is likely also less spiky on our cpu usage, which is good:
45
46                sleepmillis(millis/2);  
47                
48//从commitJob的defer任务队列中获取任务并执行,详情参见: taskqueue.h的invoke() 和 dur_commitjob.cpp 的  
49                
50// Writes::D::go(const Writes::D&amp; d)方法(用于非延迟写入信息操作)
51
52                commitJob.wi()._deferred.invoke();  
53                 
54                sleepmillis(millis/2);  
55                
56//按mongodb开发者的理解,通过将休眠时间减少一半(millis/2)并紧跟着继续从队列中取任务,  
57                
58//以此小幅提升读取队列系统的吞吐量
59
60                commitJob.wi()._deferred.invoke();  
61            }  
62            go(); 
63//执行提交一组信息操作
64
65        }  
66        catch(std::exception&amp; e) {
67/*
68服务如果突然crash
69*/
70            log() &lt;&lt; &quot;exception in durThread causing immediate shutdown: &quot; &lt;&lt; e.what() &lt;&lt; endl;  
71            abort(); 
72// based on myTerminate()
73
74        }  
75    }  
76    cc().shutdown();
77//关闭当前线程,Client::initThread(&quot;dur&quot;)
78
79}  
80

       下面是go()的实现代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
1        
2static void go() {  
3            if( !commitJob.hasWritten() ){ 
4/*
5hasWritten一般在CUD操作时会变为true,后面会加以介绍
6*/
7                commitJob.notifyCommitted();
8/*
9发送信息已存储到磁盘的通知
10*/
11                return;  
12            }  
13            {  
14                readlocktry lk(&quot;&quot;, 1000);
15/*
16声明读锁
17*/
18                if( lk.got() ) {  
19                    groupCommit();
20/*
21提交一组操作
22*/
23                    return;  
24                }  
25            }  
26            
27// 当未取到读锁时,可能获取读锁比较慢,则直接使用写锁,不过写锁会用更多的RAM
28
29            writelock lk;  
30            groupCommit();  
31        }  
32
33
34 
35/*
36* locking: in read lock when called. 
37*/
38        
39static void _groupCommit() {  
40            stats.curr-&gt;_commits++;
41/*
42提交次数加1
43*/
44            ......            
45            
46//预定义页对齐的日志缓存对象,该对象对会commitJob.ops()的返回值(该返回值类型vector&lt; shared_ptr&lt;DurOp&gt; &gt;)进行对象序列化  
47            
48//
49=并保存到commitJob._ab中,供下面方法调用,位于dur_preplogbuffer.cpp--&gt;_PREPLOGBUFFER()方法                    
50
51            PREPLOGBUFFER();  
52            
53// todo : write to the journal outside locks, as this write can be slow.  
54            
55//        however, be careful then about remapprivateview as that cannot be done  
56            
57//        if new writes are then pending in the private maps.
58
59            WRITETOJOURNAL(commitJob._ab);
60/*
61写入journal信息,最终操作位于dur_journal.cpp的 Journal::journal(const AlignedBuilder&amp; b)方法
62*/
63            
64// data is now in the journal, which is sufficient for acknowledging getLastError.  
65            
66//
67 (ok to crash after that)
68
69            commitJob.notifyCommitted();  
70            WRITETODATAFILES();
71/*
72写信息到mongofile文件中
73*/
74            commitJob.reset();
75/*
76重置当前任务操作
77*/
78            
79// REMAPPRIVATEVIEW  
80            
81// remapping 私有视图必须在 WRITETODATAFILES 方法之后调用,否则无法读出新写入的数据
82
83            DEV assert( !commitJob.hasWritten() );  
84            if( !dbMutex.isWriteLocked() ) {  
85                
86// this needs done in a write lock (as there is a short window during remapping when each view  
87                
88// might not exist) thus we do it on the next acquisition of that instead of here (there is no  
89                
90// rush if you aren&#x27;t writing anyway -- but it must happen, if it is done, before any uncommitted  
91                
92// writes occur).  If desired, perhpas this can be eliminated on posix as it may be that the remap  
93                
94// is race-free there.  
95                
96//  
97
98                dbMutex._remapPrivateViewRequested = true;  
99            }  
100            else {  
101                stats.curr-&gt;_commitsInWriteLock++;  
102                
103// however, if we are already write locked, we must do it now -- up the call tree someone  
104                
105// may do a write without a new lock acquisition.  this can happen when MongoMMF::close() calls  
106                
107// this method when a file (and its views) is about to go away.  
108                
109//  
110
111                REMAPPRIVATEVIEW();  
112            }  
113        }  
114

      
到这里只是知道mongodb会定时从任务队列中获取相应任务并统一写入,写入journal和mongofile文件后再重置任务队列及递增相应统计计数信息(如privateMapBytes用于REMAPPRIVATEVIEW)。
但任务队列中的操作信息又是如何生成的呢?这个比较简单,我们只要看一下相应的cud数据操作时的代码即可,这里以插入(insert)数据为例:
我们找到pdfile.cpp文件的插入记录方法,如下(1467行):


1
2
3
4
5
6
1
2   DiskLoc DataFileMgr::insert(const char *ns, const void *obuf, int len, bool god, const BSONElement &amp;writeId, bool mayAddIndex) {  
3    ......  
4    r = (Record*) getDur().writingPtr(r, lenWHdr);
5//位于1588行
6

     该方法用于将客户端提交的数据(信息)写入到持久化队列(defer)中去,如下(按函数调用顺序):


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1void* DurableImpl::writingPtr(void *x, unsigned len) {  
2        void *p = x;  
3        declareWriteIntent(p, len);  
4        return p;  
5}  
6
7void DurableImpl::declareWriteIntent(void *p, unsigned len) {  
8     commitJob.note(p, len);  
9}  
10
11void CommitJob::note(void* p, int len) {  
12     DEV dbMutex.assertWriteLocked();  
13     dassert( cmdLine.dur );  
14     if( !_wi._alreadyNoted.checkAndSet(p, len) ) {  
15         MemoryMappedFile::makeWritable(p, len);
16/*
17设置可写入mmap文件的信息
18*/
19         if( !_hasWritten ) {  
20             assert( !dbMutex._remapPrivateViewRequested );  
21             
22// 设置写信息标志位, 用于进行_groupCommit(上面提到)时进行判断
23
24             _hasWritten = true;  
25         }  
26         ......  
27         
28// 向defer任务队列中加入操作信息
29
30         _wi.insertWriteIntent(p, len);  
31         wassert( _wi._writes.size() &lt;  2000000 );  
32         assert(  _wi._writes.size() &lt; 20000000 );  
33         ......  
34}  
35
36

    其中insertWriteIntent方法定义如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1    
2void insertWriteIntent(void* p, int len) {  
3        D d;  
4        d.p = p;
5/*
6操作记录record类型
7*/
8        d.len = len;
9/*
10记录长度
11*/
12        _deferred.defer(d);
13/*
14延期任务队列:TaskQueue&lt;D&gt;类型
15*/
16    }
17

     到这里总结一下,mongodb在启动时,专门初始化一个线程不断循环(除非应用crash掉),用于在一定时间周期内来从defer队列中获取要持久化的数据并写入到磁盘的journal(日志)和mongofile(数据)处,当然因为它不是在用户添加记录时就写到磁盘上,所以按mongodb开发者说,它不会造成性能上的损耗,因为看过代码发现,当进行CUD操作时,记录(Record类型)都被放入到defer队列中以供延时批量(groupcommit)提交写入,但相信其中时间周期参数是个要认真考量的参数,系统为90毫秒,如果该值更低的话,可能会造成频繁磁盘操作,过高又会造成系统宕机时数据丢失过多。
最后对文中那个mongodb设置很计巧的代码做一下简要分析,代码如下:    

    CodeBlock::Within w(durThreadMain);

    它的作为就是一个对多线程访问指定代码块加锁的功能,其类定义如下(位于race.h):  


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

2class CodeBlock {  
3        volatile int n;  
4        unsigned tid;  
5        void fail() {  
6            log() &lt;&lt; &quot;synchronization (race condition) failure&quot; &lt;&lt; endl;  
7            printStackTrace();  
8            abort();
9/**/
10        }  
11        void enter() {  
12            if( ++n != 1 ) fail(); 
13/*
14当已有线程执行该代码块时,则执行fail
15*/
16
17#if defined(_WIN32)
18            tid = GetCurrentThreadId();  
19
20#endif
21        }  
22        void leave() { 
23/*
24只有调用 leave 操作,才会--n,即在线程执行完该代码块时调用
25*/
26            if( --n != 0 ) fail();  
27        }  
28    public:  
29        CodeBlock() : n(0) { }  
30        class Within {  
31            CodeBlock&amp; _s;  
32        public:  
33            Within(CodeBlock&amp; s) : _s(s) { _s.enter(); }  
34            ~Within() { _s.leave(); }  
35        };  
36        void assertWithin() {  
37            assert( n == 1 );  
38
39#if
40 defined(_WIN32)
41            assert( GetCurrentThreadId() == tid );  
42
43#endif
44        }  
45    };  
46      
47#else  
48
49

      
通过其内部类Within的构造函数和析构函数,分别调用了_s.enter,_s.leave()方法,这样只要在一个代码块之前定义一个该类实例,则从下一行开始到codeblock结束之后,该进程内只允许一个线程执行该代码块,呵呵。

 

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense 技巧提示100条

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索