After choosing which member to sync from, the secondary pulls the oplog and loops through three steps:

1. Applies the op.
2. Writes the op to its own oplog (also local.oplog.rs).
3. Requests the next op.

How does the primary know how far the secondary has synced?



How does the primary know where the secondary has synced to? Well, the secondary is querying the primary's oplog for more results. So if the secondary requests an op written at 3pm, the primary knows the secondary has replicated all ops written before 3pm.

So the flow goes:

1. Do a write on the primary.
2. The write is written to the oplog on the primary, with a field "ts" saying the write occurred at time t.
3. {getLastError:1, w:2} is called on the primary. The primary has done the write, so it is just waiting for one more server to get the write (w:2).
4. The secondary queries the oplog on the primary and gets the op.
5. The secondary applies the op from time t.
6. The secondary requests ops with {ts: {$gt: t}} from the primary's oplog.
7. The primary records that this secondary has applied up to t, because it is requesting ops > t.
8. getLastError notices that the primary and secondary both have the write, so w:2 is satisfied and it returns.
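The eight steps above can be sketched in a toy model. This is a hypothetical illustration (the class and method names are mine, not MongoDB's API) of the key trick: the primary infers a secondary's progress purely from the timestamp in the secondary's oplog query, which is what lets w:2 be satisfied.

```python
class Primary:
    """Toy primary: an oplog plus per-secondary replication progress."""

    def __init__(self):
        self.oplog = []        # list of (ts, op)
        self.synced_to = {}    # secondary id -> last ts known replicated

    def write(self, ts, op):
        self.oplog.append((ts, op))

    def query_oplog(self, secondary_id, after_ts):
        # A query for {ts: {$gt: after_ts}} tells the primary that this
        # secondary has already replicated everything up to after_ts.
        self.synced_to[secondary_id] = after_ts
        return [(ts, op) for ts, op in self.oplog if ts > after_ts]

    def w_satisfied(self, ts, w):
        # The primary itself counts as one acknowledgment.
        acks = 1 + sum(1 for t in self.synced_to.values() if t >= ts)
        return acks >= w

primary = Primary()
primary.write(1, {"insert": "doc1"})
print(primary.w_satisfied(1, 2))             # False: not replicated yet
ops = primary.query_oplog("secondaryA", 0)   # secondary pulls the op at ts=1
# secondary applies the op, then asks for {ts: {$gt: 1}}:
primary.query_oplog("secondaryA", 1)
print(primary.w_satisfied(1, 2))             # True: w:2 is now satisfied
```

Note that the secondary never sends an explicit acknowledgment; requesting ops > t is the acknowledgment.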





A requests ops (write operations) from B, and B in turn talks to C to complete A's request:

    A <====> B <====> C
             B <----> C

<====> is a "real" sync connection. <----> is a "ghost" connection: B connects to C on A's behalf.



The seven steps:

1. Check the oplog. If it is not empty, this node does not initial sync; it just starts syncing normally. If the oplog is empty, initial sync is necessary; continue to step 2.
2. Get the latest oplog time from the source member; call this time start.
3. Clone all of the data from the source member to the destination member.
4. Build indexes on the destination. (In 2.0 index builds were part of the cloning step; in 2.2 indexes are built after the data is cloned.)
5. Get the latest oplog time from the sync target, which is called minValid.
6. Apply the sync target's oplog from start to minValid.
7. Become a "normal" member (transition into the SECONDARY state).

My understanding: the oplog entries between start and minValid are the ops that arrived during the clone and have not yet been applied, i.e. the part still missing for eventual consistency; applying them is essentially an oplog replay.
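The seven steps can be condensed into a toy sketch. Dicts stand in for real collections and oplogs here; the names and structure are illustrative, not MongoDB's.

```python
def initial_sync(source, dest):
    # Step 1: if our oplog is non-empty, no initial sync is needed.
    if dest["oplog"]:
        return "normal sync"
    # Step 2: latest op time on the sync source, called "start".
    start = source["oplog"][-1]["ts"]
    # Step 3: clone all data from the source.
    dest["data"] = dict(source["data"])
    # Step 4: build indexes on the destination (omitted in this sketch).
    # Step 5: latest op time on the sync source again, called "minValid"
    # (it may have advanced while we were cloning).
    min_valid = source["oplog"][-1]["ts"]
    # Step 6: replay the ops between start and minValid.
    for op in source["oplog"]:
        if start <= op["ts"] <= min_valid:
            dest["data"][op["key"]] = op["value"]
    # Step 7: transition into SECONDARY state.
    dest["state"] = "SECONDARY"
    return "initial sync done"
```

The replay in step 6 is the "oplog replay" mentioned above: it catches the destination up on writes that happened while the clone was in progress.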


/**
 * Do the initial sync for this member.  There are several steps to this process:
 *
 *     1. Record start time.
 *     2. Clone.
 *     3. Set minValid1 to sync target's latest op time.
 *     4. Apply ops from start to minValid1, fetching missing docs as needed.
 *     5. Set minValid2 to sync target's latest op time.
 *     6. Apply ops from minValid1 to minValid2.
 *     7. Build indexes.
 *     8. Set minValid3 to sync target's latest op time.
 *     9. Apply ops from minValid2 to minValid3.
 *
 * At that point, initial sync is finished.  Note that the oplog from the sync
 * target is applied three times: step 4, 6, and 8.  4 may involve refetching,
 * 6 should not.  By the end of 6, this member should have consistent data.
 * 8 is "cosmetic," it is only to get this member closer to the latest op time
 * before it can transition to secondary state.
 */

The data-cloning process:

for each db on sourceServer:
    for each collection in db:
        for each doc in db.collection.find():
            insert doc into the same collection on the destination
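The pseudocode above can be made runnable with nested dicts standing in for servers (server -> db -> collection -> list of docs); this is an illustration only, not a driver API.

```python
def clone(source_server, dest_server):
    """Copy every document from source_server into dest_server via inserts."""
    for db_name, db in source_server.items():
        for coll_name, docs in db.items():
            for doc in docs:                        # db.collection.find()
                dest_server.setdefault(db_name, {}) \
                           .setdefault(coll_name, []) \
                           .append(dict(doc))       # re-insert on the destination
    return dest_server

src = {"test": {"users": [{"_id": 1}, {"_id": 2}]}}
print(clone(src, {}))
```

Because everything arrives as an insert, the destination's data files are written densely, which is exactly the compaction benefit described next.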


Benefit: the data ends up more compact, saving disk space, because every operation on the destination is an insert. Note that the padding factor will be set to 1.

Downside: syncing this way is slow. Taking a write lock with fsync+lock and copying the data files directly is faster.


Which member to sync from (who to sync from)


By default, the member syncs from the closest member of the set that is either the primary or another secondary with more recent oplog entries. This prevents two secondaries from syncing from each other.


As in the log line mentioned in the previous article: [rsSync] replSet syncing to:

Here "syncing to" actually means "syncing from"; the wording is kept for backward-compatibility reasons, as Kristina Chodorow put it, "Backwards compatibility sucks".

A replica set member picks the nearest node (by ping time) using the following algorithm:

    for each member that is healthy:
        if member[state] == PRIMARY
            add to set of possible sync targets
        if member[lastOpTimeWritten] > our[lastOpTimeWritten]
            add to set of possible sync targets

    sync target = member with the min ping time from the possible sync targets
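The selection rules above translate directly into Python. The member fields here are illustrative stand-ins for heartbeat data, not MongoDB's actual structures.

```python
def choose_sync_target(members, my_last_op_time):
    """Pick the lowest-ping member that is primary or ahead of us."""
    candidates = [
        m for m in members
        if m["healthy"] and (
            m["state"] == "PRIMARY"
            or m["lastOpTimeWritten"] > my_last_op_time
        )
    ]
    if not candidates:
        return None
    # nearest candidate by ping time wins
    return min(candidates, key=lambda m: m["ping"])

members = [
    {"name": "A", "state": "PRIMARY",   "healthy": True, "lastOpTimeWritten": 100, "ping": 20},
    {"name": "B", "state": "SECONDARY", "healthy": True, "lastOpTimeWritten": 99,  "ping": 5},
    {"name": "C", "state": "SECONDARY", "healthy": True, "lastOpTimeWritten": 80,  "ping": 1},
]
# C has the lowest ping but is behind us, so B (ahead of us, next-lowest ping) wins:
print(choose_sync_target(members, my_last_op_time=90)["name"])  # B
```

This also shows why two secondaries cannot sync from each other: each is only a candidate for the other if it is strictly ahead, which cannot hold in both directions.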

The definition of "healthy" varies across versions, but its purpose is always to find members that are functioning normally. In 2.0, the check also took slave delay into account.

Running db.adminCommand({replSetGetStatus:1}) or rs.status() on a secondary shows the member's current status; the syncingTo field gives this secondary's sync source.


The sync source can also be set manually:

db.adminCommand( { replSetSyncFrom: "[hostname]:[port]" } )




Member* ReplSetImpl::getMemberToSyncTo() {
    lock lk(this);

    bool buildIndexes = true;

    // if we have a target we've requested to sync from, use it
    if (_forceSyncTarget) {
        Member* target = _forceSyncTarget;
        _forceSyncTarget = 0;
        sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0);
        return target;
    }

    Member* primary = const_cast<Member*>(box.getPrimary());

    // wait for 2N pings before choosing a sync target
    if (_cfg) {
        int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings;

        if (needMorePings > 0) {
            OCCASIONALLY log() << "waiting for " << needMorePings
                               << " pings from other members before syncing" << endl;
            return NULL;
        }

        buildIndexes = myConfig().buildIndexes;

        // If we are only allowed to sync from the primary, return that
        if (!_cfg->chainingAllowed()) {
            // Returns NULL if we cannot reach the primary
            return primary;
        }
    }

    // find the member with the lowest ping time that has more data than me

    // Find primary's oplog time. Reject sync candidates that are more than
    // maxSlackDurationSeconds seconds behind.
    OpTime primaryOpTime;
    static const unsigned maxSlackDurationSeconds = 10 * 60; // 10 minutes
    if (primary)
        primaryOpTime = primary->hbinfo().opTime;
    else
        // choose a time that will exclude no candidates, since we don't see a primary
        primaryOpTime = OpTime(maxSlackDurationSeconds, 0);

    if ( primaryOpTime.getSecs() < maxSlackDurationSeconds ) {
        // erh - I think this means there was just a new election
        // and we don't yet know the new primary's optime
        primaryOpTime = OpTime(maxSlackDurationSeconds, 0);
    }

    OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSlackDurationSeconds, 0);

    Member *closest = 0;
    time_t now = 0;

    // Make two attempts. The first attempt, we ignore those nodes with
    // slave delay higher than our own. The second attempt includes such
    // nodes, in case those are the only ones we can reach.
    // This loop attempts to set 'closest'.
    for (int attempts = 0; attempts < 2; ++attempts) {
        for (Member *m = _members.head(); m; m = m->next()) {
            if (!m->hbinfo().up())
                continue;
            // make sure members with buildIndexes sync from other members w/indexes
            if (buildIndexes && !m->config().buildIndexes)
                continue;

            if (!m->state().readable())
                continue;

            if (m->state() == MemberState::RS_SECONDARY) {
                // only consider secondaries that are ahead of where we are
                if (m->hbinfo().opTime <= lastOpTimeWritten)
                    continue;
                // omit secondaries that are excessively behind, on the first attempt at least.
                if (attempts == 0 &&
                    m->hbinfo().opTime < oldestSyncOpTime)
                    continue;
            }

            // omit nodes that are more latent than anything we've already considered
            if (closest &&
                (m->hbinfo().ping > closest->hbinfo().ping))
                continue;

            if (attempts == 0 &&
                (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) {
                continue; // skip this one in the first attempt
            }

            // if this member is on the veto list, check whether the veto is
            // still in effect; if it is, skip this member.
            map<string,time_t>::iterator vetoed = _veto.find(m->fullName());
            if (vetoed != _veto.end()) {
                // Do some veto housekeeping
                if (now == 0) {
                    now = time(0);
                }

                if (vetoed->second >= now) {
                    if (time(0) % 5 == 0) {
                        log() << "replSet not trying to sync from " << (*vetoed).first
                              << ", it is vetoed for " << ((*vetoed).second - now)
                              << " more seconds" << rsLog;
                    }
                    continue;
                }
                // fall through, this is a valid candidate now
            }
            // This candidate has passed all tests; set 'closest'
            closest = m;
        }
        if (closest) break; // no need for second attempt
    }

    if (!closest) {
        return NULL;
    }

    sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);

    return closest;
}




