MongoDB provides many commands; the full list is at http://docs.mongodb.org/manual/reference/command/. While developing mongosync recently, I studied oplog parsing in detail, so below I share the internal command applyOps, which is used mainly during synchronization:

applyOps (internal)

Parameters:

  • operations (array) – an array of operations to perform.
  • preCondition (array) – Optional. Defines one or more conditions that the destination must meet before applying the entries from the array. Use ns to specify a namespace, q to specify a query, and res to specify the result that the query should match. You may specify zero, one, or many preCondition documents.

applyOps provides a way to apply entries from an oplog created by replica set members and master instances in a master/slave deployment. applyOps is primarily an internal command to support sharding functionality, and has the following prototype form:

db.runCommand( { applyOps: [ <operations> ], preCondition: [ { ns: <namespace>, q: <query>, res: <result> } ] } )

applyOps applies the oplog entries from the array to the mongod instance. The preCondition array provides the ability to specify conditions that must be true in order to apply the oplog entries.

You can specify as many preCondition sets as needed. If you specify the ns option, applyOps will only apply oplog entries for the collection described by that namespace. You may also specify a query in the q field with a corresponding expected result in the res field that must match in order to apply the oplog entry.

Warning: This command obtains a global write lock and will block other operations until it has completed.

Worked examples:

1. Find a single oplog entry for test.t (after first emptying the test.t collection) and use applyOps to replay it.
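For reference, the insert entry fetched in the transcript below has the standard oplog shape, roughly like this (the ts and h values here are illustrative, not from my oplog):

{
    "ts" : Timestamp(1365913506000, 1),     // when the op happened
    "h" : NumberLong("99854255531695219"),  // unique id of the op
    "v" : 2,                                // oplog format version
    "op" : "i",                             // 'i' = insert
    "ns" : "test.t",                        // namespace the op applies to
    "o" : { "_id" : ObjectId("5163ce2e56e97a973b7876a1"), "name" : "tt", "val" : 0 }
}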

tom:PRIMARY> var ops=[]
tom:PRIMARY> ops.push(db.oplog.rs.findOne({ns:"test.t"}))
1
tom:PRIMARY> use test
switched to db test
tom:PRIMARY> db.runCommand({applyOps:ops,ns:"test.t"})
{ "applied" : 1, "results" : [ true ], "ok" : 1 }
tom:PRIMARY> db.t.find()
{ "_id" : ObjectId("5163ce2e56e97a973b7876a1"), "name" : "tt", "val" : 0 }

The replay succeeded and the document is visible again.

2. Replay multiple oplog entries.

tom:PRIMARY> use local
switched to db local
tom:PRIMARY> var c= db.oplog.rs.find({ns:"test.t",ts:{$gt:Timestamp(1365913506000,1)}})
tom:PRIMARY> var ops=[]
tom:PRIMARY> while(c.hasNext()) ops.push(c.next())
73458
tom:PRIMARY> db.runCommand({applyOps:ops,ns:"test.t"})
{
    "applied" : 73458,
    "results" : [
        true,
        true,
        true,
        ...
        true
    ],
    "ok" : 1
}
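Replaying all 73,458 entries in one call means the global write lock (see the warning above) is held for the entire batch. A minimal sketch of chunked replay, trading whole-batch atomicity for shorter lock hold times (the batch size of 1000 is arbitrary):

var local = db.getSiblingDB("local");
var c = local.oplog.rs.find({ns: "test.t", ts: {$gt: Timestamp(1365913506000, 1)}});
var batch = [];
while (c.hasNext()) {
    batch.push(c.next());
    if (batch.length == 1000) {          // flush a full batch
        db.runCommand({applyOps: batch});
        batch = [];
    }
}
if (batch.length > 0)                    // flush the remainder
    db.runCommand({applyOps: batch});

Each db.runCommand call is still applied as one atomic unit on secondaries (as the source below shows, the command re-logs itself as a single op); only the batch as a whole loses atomicity.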

3. Using preCondition

tom:PRIMARY> db.runCommand({ applyOps: [ { ts: Timestamp (1365913250000,1848), h: 99854255531695219, v: 2, op: "i", ns: "test.task_3", o: { _id: ObjectId('516a2ea274d6427a83455d7f'), name: "ccj", v: 1847.0 } } ], preCondition : [ { ns: "test.task_3" } ] })
{
    "errmsg" : "exception: invalid parameter: expected an object (res)",
    "code" : 10065,
    "ok" : 0
}
tom:PRIMARY> db.runCommand({ applyOps: [ { ts: Timestamp (1365913250000,1848), h: 99854255531695219, v: 2, op: "i", ns: "test.task_3", o: { _id: ObjectId('516a2ea274d6427a83455d7f'), name: "ccj", v: 1847.0 } } ], preCondition : [ { ns: "test.task_3",q:{name:"ccj"},res:{name:"c"} } ] })
{
    "got" : {
        "_id" : ObjectId("516a2ea274d6427a83455d7f"),
        "name" : "ccj",
        "v" : 1847
    },
    "whatFailed" : {
        "ns" : "test.task_3",
        "q" : {
            "name" : "ccj"
        },
        "res" : {
            "name" : 1
        }
    },
    "errmsg" : "pre-condition failed",
    "ok" : 0
}
tom:PRIMARY> db.runCommand({ applyOps: [ { ts: Timestamp (1365913250000,1848), h: 99854255531695219, v: 2, op: "i", ns: "test.task_3", o: { _id: ObjectId('516a2ea274d6427a83455d7f'), name: "ccj", v: 1847.0 } } ], preCondition : [ { ns: "test.task_3",q:{name:"ccj"},res:{name:"ccj"} } ] })
{ "applied" : 1, "results" : [ true ], "ok" : 1 }
tom:PRIMARY> db.runCommand({ applyOps: [ { ts: Timestamp (1365913250000,1848), h: 99854255531695219, v: 2, op: "i", ns: "test.task_3", o: { _id: ObjectId('516a2ea274d6427a83455d7f'), name: "ccj", v: 1847.0 } } ], preCondition : [ { ns: "test.task_3",q:{name:"ccj"},res:{} } ] })
{ "applied" : 1, "results" : [ true ], "ok" : 1 }
tom:PRIMARY> db.runCommand({ applyOps: [ { ts: Timestamp (1365913250000,1848), h: 99854255531695219, v: 2, op: "i", ns: "test.task_3", o: { _id: ObjectId('516a2ea274d6427a83455d7f'), name: "ccj", v: 1847.0 } } ], preCondition : [ { ns: "test.task_3",q:{},res:{} } ] })
{ "applied" : 1, "results" : [ true ], "ok" : 1 }
tom:PRIMARY> 

The first errmsg was puzzling at first; it turns out that all three fields of a preCondition document must be supplied. ns specifies the namespace to check, q is a query run against that namespace, and res must match the document the query returns; if it does not, the command fails with "errmsg" : "pre-condition failed". Note that res is evaluated with the server's query matcher rather than compared for exact equality, which is why an empty res (and even an empty q) still passes in the last two runs above.
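Since res goes through the query matcher, query operators work in it too; a hypothetical guarded replay (the $gt threshold is made up for illustration):

var op = db.getSiblingDB("local").oplog.rs.findOne({ns: "test.task_3"});
db.runCommand({
    applyOps: [ op ],
    // apply only if the document named "ccj" currently has v greater than 1000
    preCondition: [ { ns: "test.task_3", q: { name: "ccj" }, res: { v: { $gt: 1000 } } } ]
});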

Source code analysis: the applyOps command is implemented at the end of src/mongo/db/oplog.cpp.

   class ApplyOpsCmd : public Command {
    public:
        virtual bool slaveOk() const { return false; }
        virtual LockType locktype() const { return WRITE; }
        virtual bool lockGlobally() const { return true; } // SERVER-4328 todo : is global ok or does this take a long time? i believe multiple ns used so locking individually requires more analysis
        ApplyOpsCmd() : Command( "applyOps" ) {}
        virtual void help( stringstream &help ) const {
            help << "internal (sharding)\n{ applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }";
        }
        virtual bool run(const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {

            if ( cmdObj.firstElement().type() != Array ) {
                errmsg = "ops has to be an array";
                return false;
            }

            BSONObj ops = cmdObj.firstElement().Obj();

            {
                // check input
                BSONObjIterator i( ops );
                while ( i.more() ) {
                    BSONElement e = i.next();
                    if ( e.type() == Object )
                        continue;
                    errmsg = "op not an object: ";
                    errmsg += e.fieldName();
                    return false;
                }
            }

            if ( cmdObj["preCondition"].type() == Array ) {
                BSONObjIterator i( cmdObj["preCondition"].Obj() );
                while ( i.more() ) {
                    BSONObj f = i.next().Obj();

                    BSONObj realres = db.findOne( f["ns"].String() , f["q"].Obj() );

                    Matcher m( f["res"].Obj() );
                    if ( ! m.matches( realres ) ) {
                        result.append( "got" , realres );
                        result.append( "whatFailed" , f );
                        errmsg = "pre-condition failed";
                        return false;
                    }
                }
            }

            // apply
            int num = 0;
            int errors = 0;

            BSONObjIterator i( ops );
            BSONArrayBuilder ab;

            while ( i.more() ) {
                BSONElement e = i.next();
                const BSONObj& temp = e.Obj();

                Client::Context ctx( temp["ns"].String() ); // this handles security
                bool failed = applyOperation_inlock( temp , false );
                ab.append(!failed);
                if ( failed )
                    errors++;

                num++;
            }

            result.append( "applied" , num );
            result.append( "results" , ab.arr() );

            if ( ! fromRepl ) {
                // We want this applied atomically on slaves
                // so we re-wrap without the pre-condition for speed

                string tempNS = str::stream() << dbname << ".$cmd";

                logOp( "c" , tempNS.c_str() , cmdObj.firstElement().wrap() );
            }

            return errors == 0;
        }

        DBDirectClient db;

    } applyOpsCmd;

}

The command mostly validates its input and checks the preConditions; note also that when it is not called from replication (fromRepl is false), it re-logs itself through logOp as a single 'c' op so that secondaries apply the whole batch atomically. The real work is delegated to applyOperation_inlock, which is the heart of applyOps:

/** @param fromRepl false if from ApplyOpsCmd
    @return true if an update should have happened and the document DNE; see replset initial sync code.
 */
bool applyOperation_inlock(const BSONObj& op, bool fromRepl, bool convertUpdateToUpsert) {
    LOG(6) << "applying op: " << op << endl;
    bool failedUpdate = false;

    OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;

    const char *names[] = { "o", "ns", "op", "b" };
    BSONElement fields[4];
    op.getFields(4, names, fields);

    BSONObj o;
    if( fields[0].isABSONObj() )
        o = fields[0].embeddedObject();

    const char *ns = fields[1].valuestrsafe();

    Lock::assertWriteLocked(ns);

    NamespaceDetails *nsd = nsdetails(ns);

    // operation type -- see logOp() comments for types
    const char *opType = fields[2].valuestrsafe();

    if ( *opType == 'i' ) {
        opCounters->gotInsert();

        const char *p = strchr(ns, '.');
        if ( p && strcmp(p, ".system.indexes") == 0 ) {
            // updates aren't allowed for indexes -- so we will do a regular insert. if index already
            // exists, that is ok.
            theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
        }
        else {
            // do upserts for inserts as we might get replayed more than once
            OpDebug debug;
            BSONElement _id;
            if( !o.getObjectID(_id) ) {
                /* No _id.  This will be very slow. */
                Timer t;
                updateObjectsForReplication(ns, o, o, true, false, false, debug, false,
                                            QueryPlanSelectionPolicy::idElseNatural() );
                if( t.millis() >= 2 ) {
                    RARELY OCCASIONALLY log() << "warning, repl doing slow updates (no _id field) for " << ns << endl;
                }
            }
            else {
                // probably don't need this since all replicated colls have _id indexes now
                // but keep it just in case
                RARELY if ( nsd && !nsd->isCapped() ) { ensureHaveIdIndex(ns); }

                /* todo : it may be better to do an insert here, and then catch the dup key exception and do update
                          then.  very few upserts will not be inserts...
                          */
                BSONObjBuilder b;
                b.append(_id);
                updateObjectsForReplication(ns, o, b.done(), true, false, false , debug, false,
                                            QueryPlanSelectionPolicy::idElseNatural() );
            }
        }
    }
    else if ( *opType == 'u' ) {
        opCounters->gotUpdate();

        // probably don't need this since all replicated colls have _id indexes now
        // but keep it just in case
        RARELY if ( nsd && !nsd->isCapped() ) { ensureHaveIdIndex(ns); }

        OpDebug debug;
        BSONObj updateCriteria = op.getObjectField("o2");
        bool upsert = fields[3].booleanSafe() || convertUpdateToUpsert;
        UpdateResult ur =
            updateObjectsForReplication(ns,
                                        o,
                                        updateCriteria,
                                        upsert,
                                        /*multi*/ false,
                                        /*logop*/ false,
                                        debug,
                                        /*fromMigrate*/ false,
                                        QueryPlanSelectionPolicy::idElseNatural() );

        if( ur.num == 0 ) {
            if( ur.mod ) {
                if( updateCriteria.nFields() == 1 ) {
                    // was a simple { _id : ... } update criteria
                    failedUpdate = true;
                    log() << "replication failed to apply update: " << op.toString() << endl;
                }
                // need to check to see if it isn't present so we can set failedUpdate correctly.
                // note that adds some overhead for this extra check in some cases, such as an updateCriteria
                // of the form
                //   { _id:..., { x : {$size:...} }
                // thus this is not ideal.
                else {
                    if (nsd == NULL ||
                        (nsd->findIdIndex() >= 0 && Helpers::findById(nsd, updateCriteria).isNull()) ||
                        // capped collections won't have an _id index
                        (nsd->findIdIndex() < 0 && Helpers::findOne(ns, updateCriteria, false).isNull())) {
                        failedUpdate = true;
                        log() << "replication couldn't find doc: " << op.toString() << endl;
                    }

                    // Otherwise, it's present; zero objects were updated because of additional specifiers
                    // in the query for idempotence
                }
            }
            else { 
                // this could happen benignly on an oplog duplicate replay of an upsert
                // (because we are idempotent), 
                // if a regular non-mod update fails the item is (presumably) missing.
                if( !upsert ) {
                    failedUpdate = true;
                    log() << "replication update of non-mod failed: " << op.toString() << endl;
                }
            }
        }
    }
    else if ( *opType == 'd' ) {
        opCounters->gotDelete();
        if ( opType[1] == 0 )
            deleteObjects(ns, o, /*justOne*/ fields[3].booleanSafe());
        else
            verify( opType[1] == 'b' ); // "db" advertisement
    }
    else if ( *opType == 'c' ) {
        opCounters->gotCommand();
        BufBuilder bb;
        BSONObjBuilder ob;
        _runCommands(ns, o, bb, ob, true, 0);
    }
    else if ( *opType == 'n' ) {
        // no op
    }
    else {
        throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) );
    }
    return failedUpdate;
}

This function does the actual interpretation of an oplog entry, dispatching on the operation type: insert ('i'), update ('u'), delete ('d'), command ('c'), or no-op ('n'). That completes the walk-through of applyOps.
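For reference, the entry shapes each branch expects, with illustrative values; o, ns, op, and b are the fields read at the top of the function, and update entries additionally carry o2:

// insert: o is the document itself
{ "op" : "i", "ns" : "test.t", "o" : { "_id" : 1, "name" : "tt" } }
// update: o2 is the match criteria, o the modifier; b requests an upsert
{ "op" : "u", "ns" : "test.t", "o2" : { "_id" : 1 }, "o" : { "$set" : { "name" : "t2" } }, "b" : true }
// delete: b means justOne
{ "op" : "d", "ns" : "test.t", "o" : { "_id" : 1 }, "b" : true }
// command: ns is "<db>.$cmd"
{ "op" : "c", "ns" : "test.$cmd", "o" : { "drop" : "t" } }
// no-op
{ "op" : "n", "ns" : "", "o" : { "msg" : "noop" } }

Note in the 'i' branch above that inserts are deliberately replayed as upserts, so replaying the same entry more than once stays idempotent.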

Aside

Today, while testing the mongooplogsync tool, I was using a branch that replays the oplog via the applyOps command (other branches use their own op-apply logic). During the test this came up:

{ applied: 1, results: [ false ], errmsg: "", ok: 0.0 }

Clearly the apply had failed. Inspecting the oplog showed a large number of update ops, and since this tool does incremental sync, the collection's earlier data had never been imported; after importing the data, everything ran normally. Digging further into the source to see why failing update ops produce this error: in the ApplyOpsCmd class the call looks like this:

bool failed = applyOperation_inlock( temp , false );
ab.append(!failed);
...
result.append( "results" , ab.arr() );

results is the array built in ab. Since the return value of applyOperation_inlock means failedUpdate, which is normally false, ab normally holds true and results comes back all true; so the failure had to come from failedUpdate. The second argument, false, tells applyOperation_inlock that it was invoked from ApplyOpsCmd, and the third parameter, convertUpdateToUpsert, is not passed at all, so it defaults to false. That is exactly the problem: the update is not converted to an upsert, so with no pre-existing data the update fails. Also, SyncTail::syncApply in rs_sync.cpp contains these lines:

// For non-initial-sync, we convert updates to upserts
// to suppress errors when replaying oplog entries.
bool ok = !applyOperation_inlock(op, true, convertUpdateToUpsert);

And the corresponding setting in multiSyncApply:

void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
    initializeWriterThread();

    // convert update operations only for 2.2.1 or greater, because we need guaranteed
    // idempotent operations for this to work.  See SERVER-6825
    bool convertUpdatesToUpserts = theReplSet->oplogVersion > 1 ? true : false;
    ...
}

So in any non-initial sync, MongoDB replication converts updates to upserts to prevent errors while replaying the oplog. For an analysis of replication itself, see http://nosqldb.org/topic/514e6d9505c3fa4d47017da6
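To reproduce the failure mode from this aside, a sketch (assuming a replica set and a throwaway collection test.t): generate an update oplog entry, delete the target document, and replay the entry through applyOps; with the document absent and no upsert conversion, the command fails the way shown at the top of this section:

var test = db.getSiblingDB("test");
test.t.insert({_id: 100, x: 1});
test.t.update({_id: 100}, {$set: {x: 2}});   // writes a 'u' entry to the oplog
var op = db.getSiblingDB("local").oplog.rs
           .find({ns: "test.t", op: "u"}).sort({$natural: -1}).limit(1).next();
test.t.remove({_id: 100});                   // now the target document DNE
test.runCommand({applyOps: [op]});
// expected, per the output above: { applied: 1, results: [ false ], errmsg: "", ok: 0 }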
