logo

已有一篇帖子分析了 mongod 的启动过程（http://nosqldb.org/topic/514bcd3c05c3fa4d470037a1），但其中还有一些关键的地方没有分析到。下面分析从 mongod 启动到开始同步数据的过程（本地没有数据时会进行初始化同步）。函数调用流程如下：

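main -> mongoDbMain -> initAndListen -> _initAndListen -> listen -> startReplication ->
startReplSets（在新线程中运行）-> theReplSet->go() -> ReplSetImpl::_go() -> ReplSetImpl::startThreads -> startSyncThread（在新线程中运行）-> ReplSetImpl::syncThread
-> ReplSetImpl::_syncThread -> 本地无数据时调用 syncDoInitialSync（初始化同步），有数据时调用 replset::SyncTail::oplogApplication（继续应用 oplog）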

代码:

  void listen(int port) {
        //testTheDb();
        MessageServer::Options options;
        options.port = port;
        options.ipList = cmdLine.bind_ip;

        MessageServer * server = createServer( options , new MyMessageHandler() );
        server->setAsTimeTracker();

        startReplication();
        if ( !noHttpInterface )
            boost::thread web( boost::bind(&webServerThread, new RestAdminAccess() /* takes ownership */));

#if(TESTEXHAUST)
        boost::thread thr(testExhaust);
#endif
        server->run();
    }


void startReplication() {
    /* if we are going to be a replica set, we aren't doing other forms of replication. */
    if( !cmdLine._replSet.empty() ) {
        if( replSettings.slave || replSettings.master ) {
            log() << "***" << endl;
            log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl;
            log() << "***" << endl;
        }
        newRepl();

        replSet = true;
        ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet);
        boost::thread t( boost::bind( &startReplSets, replSetCmdline) );

        return;
    }

    oldRepl();

    if( !replSettings.slave && !replSettings.master )
        return;

    {
        Lock::GlobalWrite lk;
        replLocalAuth();
    }

    if ( replSettings.slave ) {
        verify( replSettings.slave == SimpleSlave );
        log(1) << "slave=true" << endl;
        boost::thread repl_thread(replSlaveThread);
    }

    if ( replSettings.master ) {
        log(1) << "master=true" << endl;
        replSettings.master = true;
        createOplog();
        boost::thread t(replMasterThread);
    }

    while( replSettings.fastsync ) // don't allow writes until we've set up from log
        sleepmillis( 50 );
}




 /* Entry point of the "rsStart" thread forked during startup by startReplication().
    It can run quite a while looking for config; once found, a separate thread takes
    over as ReplSetImpl::Manager and this thread terminates.

    @param replSetCmdline  parsed --replSet value used to construct theReplSet.
                           It is not freed here.  NOTE(review): presumably the
                           ReplSet may keep references into it — confirm before
                           adding a delete.

    Any exception is logged and, if theReplSet already exists, theReplSet->fatal()
    is called.  The client registered with Client::initThread() is shut down on
    every path out of this function.
 */
    void startReplSets(ReplSetCmdline *replSetCmdline) {
        Client::initThread("rsStart");
        try {
            verify( theReplSet == 0 );
            if( replSetCmdline == 0 ) {
                // No command line: replica-set mode must not have been enabled.
                verify(!replSet);
            }
            else {
                replLocalAuth();
                (theReplSet = new ReplSet(*replSetCmdline))->go();
            }
        }
        catch(const std::exception& e) {
            log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
            if( theReplSet )
                theReplSet->fatal();
        }
        catch(...) {
            // A thread entry point must not let exceptions escape (that would terminate mongod).
            log() << "replSet caught unknown exception in startReplSets thread" << rsLog;
            if( theReplSet )
                theReplSet->fatal();
        }
        // Pairs with Client::initThread() above; previously skipped on the
        // null-command-line early return.
        cc().shutdown();
    }



 /* call after constructing to start - returns fairly quickly after launching its threads */
        void go() { _go(); }




   /* Starts this replica set member after construction; returns fairly quickly
      once its threads are launched.

      Sequence: load the last op time written to the local oplog; switch to
      RS_STARTUP2; start the worker threads; call newReplUp().  If the oplog
      cannot be queried, the error is logged, the process waits 30 seconds and
      then calls dbexit(EXIT_REPLICATION_ERROR). */
    void ReplSetImpl::_go() {
        try {
            loadLastOpTimeWritten();
        }
        catch(std::exception& loadError) {
            log() << "replSet error fatal couldn't query the local " << rsoplog << " collection.  Terminating mongod after 30 seconds." << rsLog;
            log() << loadError.what() << rsLog;
            sleepsecs(30);
            dbexit( EXIT_REPLICATION_ERROR );
            return;
        }

        // The threads are started while the member is in STARTUP2.
        changeState(MemberState::RS_STARTUP2);
        startThreads();
        newReplUp();    // defined in oplog.cpp
    }



/** Called during replica set startup; the caller expects it to return fairly quickly.
    Note the ReplSet object is only created once a config is available, so this does
    not run until the set has been initiated.

    Forks the Manager task and queues an initial state check on it.  Members that
    are not arbiters additionally get the sync thread (startSyncThread), the
    BackgroundSync producer and notifier threads, and the `ghost` task.
    Member heartbeats are started in ReplSetImpl::initFromConfig, not here.
*/
void ReplSetImpl::startThreads() {
    task::fork(mgr);
    // Bind the check to this object's own Manager (the one just forked) rather
    // than reaching through the global theReplSet.
    mgr->send( boost::bind(&Manager::msgCheckNewState, mgr) );

    // Arbiters start none of the sync threads below.
    if (myConfig().arbiterOnly) {
        return;
    }

    boost::thread t(startSyncThread);

    replset::BackgroundSync* sync = replset::BackgroundSync::get();
    boost::thread producer(boost::bind(&replset::BackgroundSync::producerThread, sync));
    boost::thread notifier(boost::bind(&replset::BackgroundSync::notifierThread, sync));

    task::fork(ghost);
}


  /** Thread entry point for the replica set sync thread (launched from
      ReplSetImpl::startThreads).  Only one such thread is expected: a second
      call logs an error and fails verify().  Registers the thread as client
      "rsSync", marks it as the sync thread, runs theReplSet->syncThread(), and
      shuts the client down when that returns.
      NOTE(review): the start counter is not synchronized; fine only if this is
      never entered concurrently — confirm. */
  void startSyncThread() {
        static int timesStarted;    // static storage, so it starts at zero
        if( timesStarted != 0 ) {
            log() << "replSet ERROR : more than one sync thread?" << rsLog;
            verify( timesStarted == 0 );    // always fails here, reporting the duplicate
        }
        ++timesStarted;

        Client::initThread("rsSync");
        // Makes isSyncThread() true for this thread; not used much, but the
        // secondary create-index code relies on it.
        cc().iAmSyncThread();
        replLocalAuth();
        theReplSet->syncThread();
        cc().shutdown();
    }


 /** Main loop of the sync thread; it returns only when this member is an arbiter.

     Each pass:
     - no _self yet (no valid config): wait 20 s and retry;
     - arbiter: stop;
     - otherwise run one _syncThread() pass.
     Exceptions from a pass are reported through the heartbeat message and followed
     by a back-off: 10 s for DBException, 60 s for anything else.  Each completed
     pass ends with a 1 s pause. */
 void ReplSetImpl::syncThread() {
        while( 1 ) {
            // After a reconfig this member may no longer be in the set, so check
            // that we are in it (and not an arbiter) before syncing from others.
            if( ! _self ) {
                log() << "replSet warning did not receive a valid config yet, sleeping 20 seconds " << rsLog;
                sleepsecs(20);
                continue;
            }
            if( myConfig().arbiterOnly ) {
                return;
            }

            // A pass must start without any lock held.
            fassert(16113, !Lock::isLocked());

            try {
                _syncThread();
            }
            catch(DBException& e) {
                sethbmsg(str::stream() << "syncThread: " << e.toString());
                sleepsecs(10);
            }
            catch(const std::exception& e) {
                // Keep the exception text instead of collapsing it into the
                // generic message used for unknown exceptions below.
                sethbmsg(str::stream() << "unexpected exception in syncThread(): " << e.what());
                // TODO : SET NOT SECONDARY here?
                sleepsecs(60);
            }
            catch(...) {
                sethbmsg("unexpected exception in syncThread()");
                // TODO : SET NOT SECONDARY here?
                sleepsecs(60);
            }
            sleepsecs(1);
        }
    }



void ReplSetImpl::_syncThread() {
        StateBox::SP sp = box.get();
        if( sp.state.primary() ) {
            sleepsecs(1);
            return;
        }
        if( _blockSync || sp.state.fatal() || sp.state.startup() ) {
            sleepsecs(5);
            return;
        }

        /* do we have anything at all? */
        if( lastOpTimeWritten.isNull() ) {
            syncDoInitialSync();
            return; // _syncThread will be recalled, starts from top again in case sync failed.
        }

        /* we have some data.  continue tailing. */
        replset::SyncTail tail(replset::BackgroundSync::get());
        tail.oplogApplication();
    }

/** Calls createOplog(), then tries _syncDoInitialSync() up to maxFailedAttempts
    (10) times.  Each DBException is published via the heartbeat message along
    with the number of attempts left, followed by a 30 s pause.  If every
    attempt fails, fassert 16233 fires. */
void ReplSetImpl::syncDoInitialSync() {
    static const int maxFailedAttempts = 10;
    createOplog();

    int failedAttempts = 0;
    bool synced = false;
    while ( !synced && failedAttempts < maxFailedAttempts ) {
        try {
            _syncDoInitialSync();
            synced = true;
        }
        catch(DBException& e) {
            ++failedAttempts;
            const int remaining = maxFailedAttempts - failedAttempts;
            str::stream msg;
            msg << "initial sync exception: " << e.toString() << " " << remaining << " attempts remaining";
            sethbmsg(msg, 0);
            sleepsecs(30);
        }
    }
    fassert( 16233, failedAttempts < maxFailedAttempts );
}
0 回复
需要 登录 后方可回复, 如果你还没有账号你可以 注册 一个帐号。