logo

Source

MongoDB Replication NetworkInterface

Replication是MongoDB一套非常复杂的功能,功能包括数据同步,选举,心跳维护等。涉及到与其他MongoD进程通讯。RPC的封装相对也比较重要,作为这些功能实现的基础。

Replication相关的对象角色都被封装在ReplicationCoordinatorImpl对象中:

repl::ReplicationCoordinatorImpl* replCoord =
    new repl::ReplicationCoordinatorImpl(
        getGlobalReplSettings(),
        new repl::ReplicationCoordinatorExternalStateImpl,
        new repl::NetworkInterfaceImpl,
        new repl::TopologyCoordinatorImpl(Seconds(repl::maxSyncSourceLagSecs)),
        static_cast(curTimeMillis64()));

本篇先从网路部分开始介绍NetworkInterfaceImpl

ReplicationExecutor::NetworkInterface

接口,定义了startCommand cancelCommand runCallbackWithGlobalExclusiveLock等方法,抽象Command的处理逻辑。同时还有start wait等一些线程管理接口,重点关注在command和callback相关的实现。

首先来说明下连接池的管理:

NetworkInterfaceImpl::ConnectionPool::ConnectionInfo

集成DBClientConnection外,还有一个creationDate成员,记录连接创建的时间,后面会根据这个时间来消耗过期的Connection。

NetworkInterfaceImpl::ConnectionPool::ConnectionPtr

保存的并不是Connection指针,而是一个list::iterator。构造该对象时,只需要指定HostAddress即可,而不是需要真正的Connection对象等。ConnectionPtr会根据HostAddress从Pool中获取(acquireConnection)相应的迭代器。

NetworkInterfaceImpl::ConnectionPool

核心类,连接池管理,不同与一般的连接池只维护同一个地址的多个连接。此连接池维护的是不同的地址,对应的多个连接,所有的空闲连接都存放在_connections成员中,使用中的连接存放在_inUseConnections,两个成员都被_mutex保护。调用者只要使用两个函数acquireConnectionreleaseConnection,需要连接时调用acquireConnection,使用结束后通过releaseConnection归还。

acquireConnection

作用是获取Connection,如果Pool中没有,则创建一个Connection。首先从刚才说的_connections中根据参数传入的HostAddress查找到相应的ConnectionList。

获取到ConnectionList后,先会去尝试清理掉过期的Connection(参考cleanUpOlderThan_inlock实现)。然后取出ConnectionList中的第一个Connection放入到_inUseConnections中。

如果没有找到相应HostAddress的连接,或者连接过期被释放,甚至连接已经不可用。则重新构建连接,并且发送鉴权信息,保证连接的合法性,且可用。

cleanUpOlderThan_inlock

这里的处理比较暴力,个人认为并不是非常合理。简单讲就是找到从创建开始30秒的链接去杀掉销毁。如果能修改成只销毁长时间没有被使用过的链接,效果会更好。明明是在长期使用的连接,也因为30秒的问题而被销毁。

Sock::isStillConnected

判断连接是否可用的,采用了socket pool的方式来判断,用非阻塞的方式查看Socket的POLLIN事件,如果Socket中存在数据,或者任何其他的错误事件被触发,则说明连接状态不可用,返回False。

Sock::connect

因为connect timeout是受syncookies影响,timeout时间会非常的久,所以要在创建另一个线程中进行connect操作,原线程wait指定的时间,超时则放弃。这里的实现比较重,需要开辟新的线程,比较好的做法是使用epoll非阻塞的方法,可以参考解决方案[Stackoverflow][1]。

releaseConnection

_inUseConnections再放回到_connections中,在List中的位置是头部。

NetworkInterfaceImpl

现在可以回头来继续说明NetworkInterfaceImpl

NetworkInterfaceImpl虽然通过ConnectionPool管理了Socket,自身则来管理了一个线程池,线程数量最多51,最少也维护了1个线程。command任务都提交到_pending成员中,工作线程从中获取到Command任务对象来执行,生产者消费者模式。工作线程的最大空闲时间也是30S,如果30S时间内,线程空闲数量小于_peding数量,则自动销毁资源。

每个提交上来的任务,都有一个cbHandle,作为Callback的句柄参数。onFinish任务处理完成,或者被Cancel的话,都会调用。对于Cancel需要说明,只能取消处于pending状态的任务。

_runCommand

有了前面的ConnectionPool,这里就简单了很多,只需要从中拿到连接对象,发送Command消息即可。如果异常情况出现,比如连接失败,则通过捕获异常来反馈。虽然ConnectionPool没有对连接数量做出上限控制,但NetworkInterfaceImpl控制了线程数量,所以连接数基本也是可控的。理论来说最多的连接数量等于地址数量*51,但实际情况远达不到。

[1]: http://stackoverflow.com/questions/2597608/c-socket-connection-timeout

0 回复
需要 登录 后方可回复, 如果你还没有账号你可以 注册 一个帐号。