logo

现在把mongod启动过程接着梳理一下。源码根据2.4版本分析。
函数调用

main->mongoDbMain->initAndListen->_initAndListen->listen

mongoDbMain里主要包含以下几个函数(分别对应一个线程):

setupSignalHandlers();  
processCommandLineOptions(std::vector<std::string>(argv, argv + argc));
mongo::runGlobalInitializersOrDie(argc, argv, envp);
CmdLine::censor(argc, argv);
startSignalProcessingThread();
dataFileSync.go();
StartupTest::runTests();
initAndListen(cmdLine.port);

主要是信号处理,初始化,命令行选项处理,数据刷盘,这里主要分析监听initAndListen

_initAndListen主要包含:

Client::initThread("initandlisten");//初始化initandlisten线程
FileAllocator::get()->start();//文件分配相关
dur::startup();//启动时根据journal数据恢复;直接让线程运行函数boost::thread t(durThread);
Module::initAll();//模块初始化
snapshotThread.go();
d.clientCursorMonitor.go();
PeriodicTask::theRunner->go();
startTTLBackgroundJob();
listen(listenPort);

这里主要看后面几个:
其中dataFileSync,snapshotThread, clientCursorMonitor, PeriodicTask, startTTLBackgroundJob
是基于BackgroundJob类线程。
snapshotThread.go();
初始化名称为”snapshotthread”线程,周期的统计cpu和writelock信息
开启—cpu选项才会有意义,暂不深究。
SnapshotThread 继承自 BackgroundJob ,调用基类BackgroundJob 类go函数,
该函数主要是用到 boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) );
创建线程;调用BackgroundJob::jobBody,jobBody主要调用run()函数,该函数是纯虚函数,最终调用的是
SnapshotThread::run(),兜了一大圈回来了。

d.clientCursorMonitor.go();
类似调用clientCursorMonitor::run() 统计超时的cursors

PeriodicTask::theRunner->go();
调用 void PeriodicTask::Runner::run(),里面主要是60秒为一周期调用taskDoWork(),纯虚函数,需要看派生类,
主要派生类是mongos里WriteBackManager嵌套类里面的Cleaner类,和连接池里面DBConnectionPool类。
这里看下DBConnectionPool类

void DBConnectionPool::taskDoWork() { 
    vector<DBClientBase*> toDelete;

    {
        // we need to get the connections inside the lock
        // but we can actually delete them outside
        scoped_lock lk( _mutex );
        for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) {
            i->second.getStaleConnections( toDelete );
        }
    }

    for ( size_t i=0; i<toDelete.size(); i++ ) {
        try {
            onDestroy( toDelete[i] );
            delete toDelete[i];
        }
        catch ( ... ) {
            // we don't care if there was a socket error
        }
    }
}

这里面是删除连接池超时的连接数(超过30分钟)。

startTTLBackgroundJob();
这个也简单

   void startTTLBackgroundJob() {
        TTLMonitor* ttl = new TTLMonitor();
        ttl->go();
    } 

最终看下 TTLMonitor的run函数就可以了,处理TTL索引数据,删除过期数据。

listen(listenPort);
重点分析下listen

    void listen(int port) {
        //testTheDb();
        MessageServer::Options options;
        options.port = port;
        options.ipList = cmdLine.bind_ip;

        MessageServer * server = createServer( options , new MyMessageHandler() );
        server->setAsTimeTracker();

        logStartup();
        startReplication();
        if ( !noHttpInterface )
            boost::thread web( boost::bind(&webServerThread, new RestAdminAccess() /* takes ownership */));

#if(TESTEXHAUST)
        boost::thread thr(testExhaust);
#endif
        server->run();
    }

其中startReplication()在MongoDB源码分析—mongod启动到同步数据过程分析了,主要看下面两行消息处理:

  MessageServer * server = createServer( options , new MyMessageHandler() ); 
  server->run();

createServer初始化

 MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ) {
        return new PortMessageServer( opts , handler );
    }

server->run()中为纯虚函数,调用PortMessageServer的run(),

void run() {
            initAndListen();
        }

调用Listener::initAndListen(),以linux为例

#if !defined(_WIN32)
    void Listener::initAndListen() {
    ...
   accepted( pnewSock , myConnectionNumber );
 }

往下看

 void Listener::accepted(boost::shared_ptr<Socket> psocket, long long connectionId ) {
        MessagingPort* port = new MessagingPort(psocket);
        port->setConnectionId( connectionId );
        acceptedMP( port );
    }

Listener::acceptedMP为虚函数,实际调用的是PortMessageServer的acceptedMP
继承关系 class PortMessageServer : public MessageServer , public Listener

virtual void acceptedMP(MessagingPort * p) {
                 ...
                pthread_t thread;
                HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, _handler);
                int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, himParam);
                ...
}

接着看handleIncomingMsg

static void* handleIncomingMsg(void* arg) {
               ...
                while ( ! inShutdown() ) {
                    m.reset();
                    p->psock->clearCounters();

                    if ( ! p->recv(m) ) {
                        if( !cmdLine.quiet ){
                            int conns = Listener::globalTicketHolder.used()-1;
                            const char* word = (conns == 1 ? " connection" : " connections");
                            log() << "end connection " << otherSide << " (" << conns << word << " now open)" << endl;
                        }
                        p->shutdown();
                        break;
                    }

                    handler->process( m , p.get() , le );
                    networkCounter.hit( p->psock->getBytesIn() , p->psock->getBytesOut() );
                }
           ...
}

主要就是一个循环了,主要看 handler->process( m , p.get() , le );,又是纯虚函数,实际调用db.cpp中MyMessageHandler的process。

virtual void process( Message& m , AbstractMessagingPort* port , LastError * le) {
            ...

                lastError.startRequest( m , le );

                DbResponse dbresponse;
                try {
                    assembleResponse( m, dbresponse, port->remote() );
                }
                catch ( const ClockSkewException & ) {
                    log() << "ClockSkewException - shutting down" << endl;
                    exitCleanly( EXIT_CLOCK_SKEW );
                }
}

主要就是assembleResponse了,处理各种类型消息。
总结一下,2.4版本消息处理流程:

listen(listenPort);->server->run(),PortMessageServer的run()->Listener::initAndListen()
-> accepted( pnewSock , myConnectionNumber );-> acceptedMP( port );实际是PortMessageServer的acceptedMP
->handleIncomingMsg-> handler->process( m , p.get() , le );实际MyMessageHandler的process。
->assembleResponse( m, dbresponse, port->remote() );

2.2版本略有不同:
handleIncomingMsg位置是threadRun。2.4版本的名称更清晰了。


ccj 于 2015-12-03 21:14 修改
0 回复
需要 登录 后方可回复, 如果你还没有账号你可以 注册 一个帐号。