ChargenServer服务器启动流程
在使用TCPServer时,用户需要注册connectionCallback_,messageCallback_,writeCompleteCallback_三个事件句柄,每当TCPServer中新构造一个新Acceptor对象。其构造函数会先初始化Channel成员对象acceptChannel_(loop, acceptSocket_.fd()),然后调用acceptChannel_.setReadCallback(boost::bind(&Acceptor::handleRead, this))将Acceptor的成员函数绑定为其Channel成员的事件句柄,并在TCPServer::start()调用acceptChannel_.enableReading()向loop登记轮询。并在接下来调用acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));将TcpServer::newConnection赋值给Acceptor::newConnectionCallback_[newConnectionCallback_在Acceptor::handleRead回调]。
当TCPServer的Acceptor成员接收到一个新的连接请求后最终调用TcpServer::newConnectionCallback_函数,产生新的TcpConnection对象。在其构造函数中初始化成员Channel成员对象channel_(new Channel(loop, sockfd)),续而调用channel_->setReadCallback,channel_->setWriteCallback等将TcpConnection的成员函数绑定为其Channel的事件回调,并在随后调用ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn))向loop登记轮询。最后将TcpServer::connectionCallback_\messageCallback_\writeCompleteCallback_赋值给TcpConnection对应函数对象[分别在Acceptor::handleWrite/handleRead回调]。
其中,HttpProxy对象封装了http服务器的所有行为,用户需要实现onConnection,onMessage及onWriteComplete三个回调函数,并在构造函数中将三个函数注册为TcpServer对象的事件句柄。此处看似违背了线程安全,在构造函数中暴露this指针,实际上由于设计时不考虑跨线程使用HttpProxy的实例,并且在代码是现实强行限制了对象使用的线程,所以不存在线程安全方面的隐患。启动Tcp服务器的时序图如图6-X所示:

图1 启动Tcp服务器的时序图(错误:“登记事件轮训”非函数返回)
Tcp服务启动实际上是线程的事件循环启动,网络服务器的事件循环实际上是不断在while循环体的开始使用epoll系统调用轮训监听的套接字,查看是否有事件发生,若果有事件,则调用注册的事件句柄处理,事件处理时序图如图6-X所示:

图2 Tcp事件处理时序图(错误:“发送数据”非同步调用;epoll非函数返回,而是EventLoop的自调用)
函数调用流程:
>EventLoop loop;// 构造线程事件循环对象
| >poller_(Poller::newDefaultPoller(this)) // 初始化poller_,构造newDefaultPoller对象
| >wakeupChannel_(new Channel(this, wakeupFd_)) // 初始化wakeupChannel_,构造一个Channel对象
| >wakeupChannel_->setReadCallback(boost::bind(&EventLoop::handleRead, this)); // 为wakeupChannel_注册读事件回调
| >wakeupChannel_->enableReading() // Channel对象核心函数,将Channel对象在事件循环中登记,使Channel对象可读
| | >events_ |= kReadEvent; // 设置事件
| | >update(); // 注册wakeupChannel_对象到Poller的std::map<int, Channel*>容器
| | | >loop_->updateChannel(this);
| | | >poller_->updateChannel(channel);
| | | | // 将socket文件描述符添加struct pollfd数组,以便poll()函数遍历
| | | | >struct pollfd pfd;
| | | | >pfd.fd = channel->fd();
| | | | | >pfd.events = static_cast<short>(channel->events());
| | | | | >pfd.revents = 0;
| | | | | >pollfds_.push_back(pfd); // pollfds_为std::vector<struct pollfd>数组
| | | | | //
| | | | | >channels_[pfd.fd] = channel; // 将channel对象放入std::map<int, Channel*>
| | | | | |
>ChargenServer server(&loop, listenAddr, true); //构造用户服务器对象
| >server_(loop, listenAddr, "ChargenServer") // 初始化TCPServer server_对象
| | >acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)) // 初始化Acceptor acceptor_对象, 用于处理连接操作
| | | >acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())) // 初始化Socket acceptSocket_对象,创建非阻塞套接字
| | | >acceptChannel_(loop, acceptSocket_.fd()) // 为acceptSocket_绑定Channel对象,处理其IO操作
| | | | >loop_(loop) //初始化loop_指针
| | | | >fd_(fd__) // 初始化socket文件描述符
| | | // 绑定ip地址和端口
| | | >acceptSocket_.setReuseAddr(true);
| | | >acceptSocket_.setReusePort(reuseport);
| | | >acceptSocket_.bindAddress(listenAddr);
| | | //
| | | >acceptChannel_.setReadCallback(boost::bind(&Acceptor::handleRead, this)); // 为acceptChannel_绑定读事件回调
| | | | >readCallback_=cb //设置Channel acceptChannel_对象读回调成员readCallback_
| | | | >其中,Acceptor::handleRead函数
| | | | | >int connfd = acceptSocket_.accept(&peerAddr); // 回调handleRead函数意味着,有新连接请求,则需要调用::accept
| | | | | >newConnectionCallback_(connfd, peerAddr); // 调用Acceptor的连接请求回调
| | >connectionCallback_(defaultConnectionCallback) //为TCPServer server_对象绑定默认连接回调
| | >messageCallback_(defaultMessageCallback) //为TCPServer server_对象绑定默认消息回调
| | | >其中,defaultMessageCallback函数只负责写日志:
| | >acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2)); //为Acceptor acceptor_对象设置新连接回调newConnectionCallback_
| | | >其中,TcpServer::newConnection函数处理新连接请求:
| | | | >TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr)); // 构造新连接对象
| | | | | >socket_(new Socket(sockfd)) //初始化TCP连接的socket描述符
| | | | | >channel_(new Channel(loop, sockfd)) //为socket绑定Channel对象
| | | | | >localAddr_(localAddr),peerAddr_(peerAddr) // 记录连接两端的地址
| | | | | // 设置TcpConnection::channel_对象的事件处理函数。可以看出,实际上channel_调用事件回调的处理核心全部来自其父对象TcpConnection,使用boost::bind()原因是使回调函数不受普通成员函数的限制
| | | | | >channel_->setReadCallback(boost::bind(&TcpConnection::handleRead, this, _1));
| | | | | | >其中,TcpConnection::handleRead函数:
| | | | | | | >messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); // messageCallback_由TcpConnection::setMessageCallback()绑定
| | | | | >channel_->setWriteCallback(boost::bind(&TcpConnection::handleWrite, this));
| | | | | | >其中,TcpConnection::handleWrite函数:
| | | | | | | >sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes()); // 当socket可写时,调用sockets::write发送数据
| | | | | | | >loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this())); // 且在发送成功后调用发送完成回调
| | | | | >channel_->setCloseCallback(boost::bind(&TcpConnection::handleClose, this));
| | | | | | >其中, TcpConnection::handleClose函数:
| | | | | | | >connectionCallback_(guardThis);
| | | | | | | >closeCallback_(guardThis);????
| | | | | >channel_->setErrorCallback(boost::bind(&TcpConnection::handleError, this));
| | | | | | >其中, TcpConnection::handleError负责写日志
| | | | | //
| | | | >connections_[connName] = conn; // 新连接加入连接队列
| | | | // 为TcpConnection conn对象绑定连接,消息,发送,关闭回调。
| | | | >conn->setConnectionCallback(connectionCallback_); //设置TcpServer::connectionCallback_ -> TcpConnection::connectionCallback_
| | | | >conn->setMessageCallback(messageCallback_); //设置TcpServer::messageCallback_ -> TcpConnection::messageCallback_
| | | | >conn->setWriteCompleteCallback(writeCompleteCallback_); //设置TcpServer::writeCompleteCallback_ -> TcpConnection::writeCompleteCallback_
| | | | >conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, _1)); //设置TcpServer::closeCallback_ -> TcpConnection::closeCallback_
| | | | | >其中,TcpServer::removeConnection ????
| | | | //
| | | | >ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn)); //调用EventLoop::runInLoop(const Functor& cb)函数的目的是当使用线程池启动多个事件循环线程时,将connectEstablished函数投递到其他事件循环线程调用,分摊负载
| | | | | >其中,TcpConnection::connectEstablished函数:
| | | | | | >channel_->enableReading(); //向loop登记新连接的channel对象
| | | | | | >connectionCallback_(shared_from_this());
| >server_.setConnectionCallback(boost::bind(&ChargenServer::onConnection, this, _1)); //为TcpServer注册connectionCallback_成员,连接相关(建立和关闭)事件句柄
| >server_.setMessageCallback(boost::bind(&ChargenServer::onMessage, this, _1, _2, _3)); //为TcpServer注册messageCallback_成员,可读事件句柄
| >server_.setWriteCompleteCallback(boost::bind(&ChargenServer::onWriteComplete, this, _1)); //为TcpServer注册writeCompleteCallback_成员,可写事件句柄
>server.start(); // 启动TCP服务器,实际上就是打开监听套接字
| >threadPool_->start(threadInitCallback_); // 启动线程池,如果需要的话,主要取决于numThreads_。当numThreads_==0时,线程池不启动
| >loop_->runInLoop(boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
| | 其中,Acceptor::listen函数:
| | >acceptSocket_.listen(); // 开始监听
| | | >sockets::listenOrDie(sockfd_);
| | >acceptChannel_.enableReading(); //向loop登记新连接的channel对象
>loop.loop();
| >while (!quit_){
| | >pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // 调用poller对象pool方法,获得有事件发生的socket描述符对应Channel数组
| | >for (ChannelList::iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it){ // 遍历被**的Channel数组
| | | >currentActiveChannel_ = *it;
| | | >currentActiveChannel_->handleEvent(pollReturnTime_); // 调用处理事件的句柄
| | | | >handleEventWithGuard(receiveTime);
| | | | | >if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
| | | | | | >closeCallback_();
| | | | | >if (revents_ & POLLNVAL)
| | | | | >if (revents_ & (POLLERR | POLLNVAL))
| | | | | | >errorCallback_();
| | | | | >if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
| | | | | | >readCallback_(receiveTime);
| | | | | >if (revents_ & POLLOUT)
| | | | | | >writeCallback_();
| | }
| }