Press ESC to close

muduo网络库复习

muduo网络库

计算机网络之间的相互通信,是互联网最为基础的组成部分之一。在当前,借助 TCP/IP、HTTP 等协议,两台计算机实现数据通信并非困难的事情。但计算机之间的通信并非零成本,它需要消耗一定的计算机资源。因此,如何以最小的计算机资源维护更多的通信连接数量,成为网络技术领域中的一个重要课题。

在这方面,muduo 网络库表现出色,能够高效地管理计算机资源,维持大量的通信连接。其源码的编码思路精巧,设计模式科学合理,对于我来说,具有极高的学习价值,值得深入研究与学习.

1.五种常见I/O模型简介

在这之前需要了解计算机操作系统常见的五种I/O模型分别是:

1..阻塞I/O:当计算机进程运行到读写命令时,也就是调用阻塞 I/O 函数时,操作系统会将该线程或进程置于等待状态。在进行网络数据接收时,网络数据需要一定时间从发送端传输到接收端。在数据到达之前,执行接收操作的线程就会被阻塞。这时该进程是阻塞状态,直到数据准备完成套接字或文件描述符就绪是,该进程被唤醒,继续执行.这样会使程序运行变慢,这样的方式编程简单,但是性能不高.

2.非阻塞I/O:字面意思,与阻塞I/O相对,当执行I/O函数时进程不会阻塞而是直接向下运行,当处理别的事情一段时间后再次执行一次I/O函数,这个过程是通过I/O函数的返回值来控制的,以linux为例,当recv返回值>0时表示正常读取,若等于零表示对端关闭,若小于零则根据错误码来判断读取情况.非阻塞I/O的优点是可以充分利用cpu的资源,使用于需要高性能的场景,缺点是编码困难,增加cpu开销,因为需要不断轮询.

3.信号驱动I/O:信号驱动I/O的特点是I/O数据准备交给内核完成,通过注册一个信号与信号处理程序,当内核完成数据准备后发出信号软中断,来让用户执行I/O拷贝操作,优点是能够及时响应 I/O 事件,当 I/O 操作准备好时,应用程序可以立即得到通知并进行处理,降低 CPU 开销,与非阻塞 I/O 的轮询方式相比,信号驱动 I/O 不需要频繁地检查 I/O 状态,减少了 CPU 的无效开销,提高了 CPU 的利用率。

4.异步I/O:异步 I/O 允许应用程序在发起 I/O 请求后,不用等待 I/O 操作完成就可以继续执行其他任务。当 I/O 操作完成时,系统会通过特定的机制通知应用程序,应用程序再进行后续处理。特点是需要调用特定的异步I/O接口.

5.多路转接:多路转接是一个进程同时监控多个 I/O 文件描述符的状态变化,当其中一个或多个文件描述符准备好进行 I/O 操作时,进程能够得到通知并进行相应处理,而不必为每个文件描述符单独使用一个进程或线程来等待 I/O 操作完成,而这次学习的muduo网络库便是使用多路转接的方案.

2.linux下的epoll网络编程

我们一般的Linux网络编程使用Tcp服务器的步骤如下:

1.首先是创建套接字,

2.再绑定ip端口,设置监听套接字

3.再创建epoll模型,设置需要监听的事件

4.设置需要执行的业务代码,

5.启动服务器.

代码如下:

SOCK.hpp用于将套接字的初始化操作封装起来:

1 #pragma once

2

3 #include

4 #include

5 #include

6 #include

7 #include

8 #include

9 #include

10 #include

11 #include "log.hpp"

12

13 enum

14 {

15 USAGE_ERR = 1,//枚举变量用于退出码

16 SOCKET_ERR,

17 BIND_ERR,

18 LISTEN_ERR,

19 EPOLL_CREATE_ERR

20 };

21 class Sock

22 {

23 const static int backlog = 32;//listen函数的第二个参数,接口等待队列的最大长度

24

25 public:

26 static int Socket()//创建监听套接字

27 {

28 int _listen = socket(AF_INET, SOCK_STREAM, 0);

29 if (_listen < 0)

30 {

31 logmessage(FATAL, "create listencosk is fail");

32 exit(SOCKET_ERR);

33 }

34 logmessage(NORMAL, "create listensock is success : %d", _listen);

35 int opt=1;

36 setsockopt(_listen,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));//这里用于设置接口复用

37 return _listen;

38 }

39

40

41 static void Bind(int sock, int port)//将接口套接字与端口绑定

42 {

43 struct sockaddr_in local;

44 memset(&local, 0, sizeof(local));

45 local.sin_family = AF_INET;

46 local.sin_port = htons(port);

47 local.sin_addr.s_addr = INADDR_ANY;//上面用于初始化本地服务器属性

48 if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)//将端口与套接字绑定

49 {

50 logmessage(FATAL, "bind is fial");

51 exit(BIND_ERR);

52 }

53 logmessage(NORMAL, "bind is success");

54 }

55 static void Listen(int sock)//设置套接字监听

56 {

57 if (listen(sock, backlog) < 0)

58 {

59 logmessage(FATAL, "listen is error");

60 exit(LISTEN_ERR);

61 }

62 logmessage(NORMAL, "listen is success");

63 }

64 static int Accept(int listen, std::string *clientip, uint16_t *clientport)//使用监听套接字返回一个新的传输套接字

65 {

66 struct sockaddr_in peer;

67 socklen_t len = sizeof(peer);

68 int sock = accept(listen, (struct sockaddr *)&peer, &len);

69 if (sock < 0)

70 {

71 logmessage(FATAL, "accept is error");

72 }

73 else

74 {

75 logmessage(NORMAL, "accept is success get new sock: %d", sock);

76 *clientport = ntohs(peer.sin_port);//传入传出行参数用于返回远程端的接口与ip地址

77 *clientip = inet_ntoa(peer.sin_addr);

78 }

79 return sock;

80 }

81 };

EpollServer.hpp用于服务器编码:这是一个向客户端发送回显的服务器

1 #pragma once

2

3 #include "Sock.hpp"

4 #include

5 #include

6

7 namespace epoll_ns

8 {

9 static const int default_port = 8080;//默认端口

10 static const int size = 128;//epoll——create的默认参数,一般设置为非负数即可

11 static const int default_value = -1;//套接字的默认状态

12 static const int default_num = 64;//事件数量

13 using func = std::function;//回调函数

14 class EpollServer

15 {

16 public:

17 EpollServer(func _func, int num = default_num, uint16_t port = default_port) : func_(_func), _revt(nullptr), _num(num), _port(port), _epfd(-1), _listensock(-1){};

18 void Init()

19 {

20 _listensock = Sock::Socket();

21 Sock::Bind(_listensock, _port);

22 Sock::Listen(_listensock);//创建和初始化监听套接字

23

24 _epfd = epoll_create(size);//创建epoll模型

25 if (_epfd < 0)

26 {

27 logmessage(FATAL, "epoll create is error: %s", strerror(errno));//意外日志

28 exit(EPOLL_CREATE_ERR);

29 }

30 struct epoll_event ev;//设置epoll需要监听的时间,用于初始化epoll——ctl函数使用

31 ev.events = EPOLLIN;

32 ev.data.fd = _listensock;

33 epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);

34 _revt = new struct epoll_event[_num];//初始化io就绪事件队列,用于做epoll——wait函数参数

35 };

36 void handler(int num)

37 {

38 logmessage(NORMAL, "event have in");

39 for (int i = 0; i < num; i++)

40 {

41 if (_revt->data.fd == _listensock && _revt->events & EPOLLIN)//就绪的事件是一个litensock的读事件

42 {

43 uint16_t client_port;

44 std::string client_ip;

45 int n = Sock::Accept(_listensock, &client_ip, &client_port);

46 if (n > 0)

47 {

48 logmessage(NORMAL, "Accept is success");

49 struct epoll_event evt;

50 evt.events = EPOLLIN;

51 evt.data.fd = n;

52 epoll_ctl(_epfd, EPOLL_CTL_ADD, n, &evt);//将accept函数返回的套接字使用epoll——ctl函数注册进epoll模型中

53 }

54 else if (n < 0)

55 {

56 continue;//一个时间出现意外,跳过执行下一个事件

57 }

58 }

59 else if (_revt->events & EPOLLIN)//一个普通的套接字事件

60 {

61 char buffer[1024];

62 int n = recv(_revt->data.fd, buffer, sizeof(buffer) - 1, 0);

63 if (n == 0)

64 {

65 logmessage(WARNING, "client is close fd");

66 epoll_ctl(_epfd, EPOLL_CTL_DEL, _revt->data.fd, nullptr);//表示对面客户端已经关闭,epoll就不需要关注这个事件了

67 close(_revt->data.fd);//关闭文件描述符,先从epoll中删除正在关闭

68 }

69 else if (n < 0)

70 {

71 logmessage(FATAL, "recv is fail error:%s", strerror(errno));

72 epoll_ctl(_epfd, EPOLL_CTL_DEL, _revt->data.fd, nullptr);

73 close(_revt->data.fd);//同理上面

74 }

75 else if (n > 0)

76 {

77 buffer[n] = 0;

78 logmessage(DEBUG, "buffer:%s", buffer);

79 std::string respon = func_(buffer);//回调函数

80 send(_revt->data.fd, respon.c_str(), respon.size(), 0);//发送

81 }

82 }

83 }

84 logmessage(NORMAL, "event have out");

85 }

86 void Start()

87 {

88 for (;;)//服务器死循环

89 {

90 int n = epoll_wait(_epfd, _revt, _num, 1000);//epoll函数工作完成后输出型参数得到就绪事件队列

91 switch (n)

92 {

93 case 0:

94 logmessage(NORMAL, "timeout...");

95 break;

96 case -1:

97 logmessage(WARNING, "epoll_wait is fail code:%d,error:%s", errno, strerror(errno));

98 break;

99 default:

100 logmessage(NORMAL, "have event is ready");

101 handler(n);//就绪事件合法以后进入处理事件阶段,就绪事件队列作为一个类的成员变量,在任意函数内都可以使用

102 break;

103 }

104 }

105 };

106 ~EpollServer()

107 {

108 if (_listensock == default_value)

109 close(_listensock);

110 if (_epfd != default_value)

111 close(_epfd);

112 if (_revt)

113 delete[] _revt;

114 };

115

116 private:

117 int _epfd;

118 int _listensock;

119 uint16_t _port;

120 struct epoll_event *_revt;

121 int _num;

122 func func_;

123 };

124 }

测试代码:

#include "epoll_server.hpp"

#include

using namespace epoll_ns;

using namespace std;

void Usage(std::string proc)

{

std::cerr << "Usage:\n\t" << proc << "port" << std::endl;

}

std::string function1(std::string str)

{

return "I'am server: "+str;

}

int main(int argc, char *argv[])

{

if (argc != 2)

{

Usage(argv[0]);

exit(USAGE_ERR);

}

uint16_t port=atoi(argv[1]);

unique_ptr esvr(new EpollServer(function1));

esvr->Init();

esvr->Start();

return 0;

}

以上可以看出linux提供的系统调用已经可以满足网络数据的传输,而muduo网络库则对epoll进行了更进一步的封装,并且加入了线程池的设计使其性能更好。

3.使用muduo网络库完成服务器编程

以下是使用muduo完成客户端数据回显,可以看出代码量大大减少.

1 #include //包含头文件

2 #include

3 #include

4 #include

5 using namespace std;

6 using namespace muduo;

7 using namespace muduo::net;

8

9 class ChatServer

10 {

11 public:

12 ChatServer(EventLoop *loop,

13 const InetAddress &listenAddr,

14 const string &nameArg) : _server(loop, listenAddr, nameArg), _loop(loop)

15 {

16 // 注册连接事件回调

17 _server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, placeholders::_1));

18 // 注册读写事件回调

19 _server.setMessageCallback(std::bind(&ChatServer::onMessage, this, placeholders::_1, placeholders::_2, placeholders::_3));

20 // 设置线程数量

21 _server.setThreadNum(4);

22 }

23 void start(){

24 _server.start();

25 }

26

27 private:

28 // 处理用户的连接

29 void onConnection(const TcpConnectionPtr &conn)

30 {

31 cout << conn->peerAddress().toIpPort() << "->" << conn->localAddress().toIpPort() << "is" << (conn->connected() ? "UP" : "DOWN") << endl;

32 }

33 // 处理用户的读写事件

34 void onMessage(const TcpConnectionPtr &conn, Buffer *buffer, Timestamp time)

35 {

36 string buf=buffer->retrieveAllAsString();

37 cout<<"recv data:"<

38 conn->send(buf);

39 }

40 TcpServer _server;

41 EventLoop *_loop;

42 };

43 int main(){

44 EventLoop *loop=new EventLoop();

45 InetAddress addr("127.0.0.1",8080);

46 ChatServer _ser(loop,addr,"ChatServer");

47 _ser.start();

48 loop->loop();

49 }

4.reactor模型简介

muduo使用reactor模型:

reactor 模型是一种事件驱动的 I/O 多路复用模型,广泛应用于高性能网络服务器中。它将 I/O 事件的处理分为几个关键部分,每个部分负责不同的职责。以下为 reactor 模型的主要组成部分及其功能:

4.1.事件循环Event Loop

EventLoop 是整个 Reactor 模型的核心,负责监听和分发事件。 它使用 I/O 多路复用机制epoll监听多个文件描述符上的事件。 当有事件发生时,唤醒事件循环并调用相应的事件处理器。 管理回调任务队列,确保异步任务能够及时执行.对应muduo代码:

muduo库中eventloop被设计成池化结构,每一个loop对应一个线程,其中eventloop主要的成员为poller和channellist,当poller检测到对应套接字事件触发时,会将对应的channel加入到,eventloop的channellist中,而eventloop会不断检测channellist中的事件执行channel中的回调函数,并触发用户设置的回调函数.

1 #pragma once

2 #include

3 #include

4 #include

5 #include

6 #include

7 #include

8 #include

9 #include "CurrentThread.h"

10 #include "nocopy.h"

11 #include"TimeStamp.h"

12 #include"Poller.h"

13 // #pragma once

14

15 // #include

16 // #include

17 // #include

18 // #include

19 // #include

20

21 // #include "nocopy.h"

22 // #include "TimeStamp.h"

23 // #include "CurrentThread.h"

24

25 class Channel;

26 class Poller;

27

28

29 class EventLoop : nocopyable

30 {

31 public:

32 using Functor = std::function;

33 EventLoop() ;

34

35 ~EventLoop();

36

37

38 void loop();

39

40 void quit();

41

42 TimeStamp pollReturnTime() const { return pollReturnTime_; }

43 // 在当前loop中执行cb

44 void runInLoop(Functor cb);

45

46 // 把cb放入队列中,唤醒loop所在的线程,执行cb

47 void queueInLoop(Functor cb);

48

49

50 // 唤醒loop所在的线程

51 void wakeup();

52

53 // poller中的方法

54 void updateChannel(Channel *channel);

55

56 void removeChannel(Channel *channel);

57

58 bool hasChannel(Channel *channel);

59

60 // 判断loop对象是否在当前线程

61 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

62

63 private:

64 void handleRead();

65 // 唤醒

66 void doPendingFunctors();

67 // 执行函数列表回调

68 using ChanneList = std::vector;

69

70 std::atomic_bool looping_;

71 std::atomic_bool quit_; // 原子操作

72 std::atomic_bool callingPendingFunctions_;

73 const pid_t threadId_; // 记录当前线程id

74 TimeStamp pollReturnTime_;

75

76 std::unique_ptr poller_;

77 int wakeup_; // 当loop获取到一个新用户的channel使用轮询唤醒一个subloop处理channel

78 std::unique_ptr wakeupChannel_;

79 Poller::ChannelList activeChannels_;

80 std::vector pendingFunctors_;

81 std::mutex mutex_; // 互斥锁,同步pending的多线程操作

82 };

4.2.事件源

Event Source 事件源是任何可以产生事件的对象,比如文件描述符、套接字,我们网络编程一般指套接字。 作用是将事件注册到事件循环中,等待事件的发生。 当事件发生时,触发相应的回调函数。muduo库中为channel。如下:

channel主要封装了fd套接字,event感兴趣的事件,revent表示poller返回的具体发生的事件,和handler执行相应的回调函数。

1 #pragma once

2

3 #include "nocopy.h"

4 #include "TimeStamp.h"

5

6 #include

7 #include

8

9 class EventLoop;

10 class Channel : nocopyable

11 {

12 public:

13 typedef std::function EventCallBack;

14 typedef std::function ReadEventCallBack;

15 Channel(EventLoop *loop, int fd);

16 // 事件处理函数

17 void handlerEvevt(TimeStamp recvTime);

18 void handlerEventWithGuard(TimeStamp receiveTime);

19

20 void ReadCallBack(ReadEventCallBack cb);

21 void writeCallBack(EventCallBack cb);

22 void errorCallBack(EventCallBack cb);

23 void closeCallBack(EventCallBack cb);

24 void tie(const std::shared_ptr &obj);

25 // 当channel改变fd中的event后,更新epoll里相应的事件

26

27 int fd() const;

28 int events() const;

29 void set_events(int revt) { revents_ = revt; };

30

31 bool isNoneEvent() const;

32

33 // 设置fd对应状态

34 void enableReading();

35 void disableReading();

36 void enableWriting();

37 void disableWriting();

38

39 void disableAll();

40

41 bool isWriting() const;

42 bool isReading() const;

43

44 void setindex(int idx);

45 int index();

46 // 在Channel所属的EventLoop中,删除当前的Channel

47 void remove();

48 ~Channel();

49

50 private:

51 void update();

52

53 static const int KnoneEvent_;

54 static const int KReadEvent_;

55 static const int KWriteEvent_;

56 EventLoop *loop_; // 事件循环

57 const int fd_; // poller监听对象

58 int events_; // 注册事件

59 int revents_; // poller返回具体发生的事件

60 int index_;

61

62 std::weak_ptr tie_;

63 bool tied_;

64

65 EventCallBack writeCallBack_;

66 EventCallBack closeCallBack_;

67 ReadEventCallBack readCallBack_;

68 EventCallBack errorCallBack_;

69 };

4.3. 事件处理器 Event Handler

事件处理器是具体的事件处理逻辑,通常以回调函数的形式实现。 根据事件类型,读事件、写事件、错误事件执行相应的操作。网络中就对应数据新建,读取、写入、关闭连接操作。

muduo中代码:

muduo中将channel分为两类一类是普通读写事件,另一类是链接建立事件

1 #pragma once

2 #include "nocopy.h"

3 #include "Socket.h"

4 #include "Channel.h"

5

6 #include

7 class EventLoop;

8 class InetAddress;

9 class Acceptor : nocopyable

10 {

11 public:

12 using NewConnectionCallback = std::function;

13 Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport);

14 ~Acceptor();

15 void setNewConnectionCallback(const NewConnectionCallback &cb)

16 {

17 newConnectionCallback_ = cb;

18 }

19 bool listenning() const { return listenning_; }

20 void listen();

21 private:

22 void headleRead();

23 EventLoop *loop_; // 这里是mainloop

24 Socket acceptSocket_;

25 Channel acceptChannel_;

26 NewConnectionCallback newConnectionCallback_;

27 bool listenning_;

28 };

1 #pragma once

2 #include "nocopy.h"

3 #include

4 #include

5 #include

6 #include "InetAddress.h"

7 #include "Callbacks.h"

8 #include "Buffer.h"

9 #include "TimeStamp.h"

10

11 /**

12 * TcpServer => Acceptor => 有一个新用户链接,通过Acceptor拿到connfd

13 *

14 * Tcpconnection通过设置回调=>将回调设置给Channel

15 *

16 * 然后Channel即将回调注册到Poller上,Poller监听到Channel的相应事件后进行回调操作

17 *

18 */

19 class Channel;

20 class EventLoop;

21 class Socket;

22 class TcpConnection : nocopyable, public std::enable_shared_from_this

23 {

24 public:

25 TcpConnection(EventLoop *loop, const std::string &name, int socket, const InetAddress &localAddr,

26 const InetAddress &peerAddr);

27 ~TcpConnection();

28 EventLoop *getLoop() const

29 {

30 return loop_;

31 }

32 const std::string &name() const { return name_; }

33 const InetAddress &localAddr() const { return localAddr_; }

34 const InetAddress &peerAddr() const { return peerAddr_; }

35 bool connected() const { return state_ == kConnected; }

36 void send(const void *meaasge, int len);

37 void shutdown();

38

39 void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }

40

41 void setMessagecallback(const MessageCallback &cb) { messageCallback_ = cb; }

42

43 void setWriteComplateCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

44

45 void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; }

46

47 void setHighWriteMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark)

48 {

49 highWaterMarkCallback_ = cb;

50 highWaterMark_ = highWaterMark;

51 }

52

53 // 建立于销毁链接

54 void connectionEstablished();

55 void connectionDestoryed();

56 void send(const std::string &buf);

57 void shutdownInLoop();

58 void sendInLoop(const void *message, size_t len);

59

60 private:

61 enum StateE

62 {

63 kDisconnected,

64 kConnecting,

65 kConnected,

66 kDisconnecting

67 };

68

69 void setstate(StateE state) { state_ = state; }

70 void handleRead(TimeStamp receiveTime);

71 void handleWrite();

72 void handleClose();

73 void handleError();

74

75 EventLoop *loop_; // 这里的Loop一定是subloop而不是basicloop

76 const std::string name_;

77 std::atomic_int state_;

78 bool reading_;

79

80 std::unique_ptr socket_;

81 std::unique_ptr channel_;

82

83 const InetAddress localAddr_;

84 const InetAddress peerAddr_;

85

86 ConnectionCallback connectionCallback_;

87 MessageCallback messageCallback_;

88 WriteCompleteCallback writeCompleteCallback_;

89 HighWaterMarkCallback highWaterMarkCallback_;

90 CloseCallback closeCallback_;

91 size_t highWaterMark_;

92

93 Buffer inputBuffer_;

94 Buffer outputBuffer_;

95 };

4.4. 多路复用器Poller

负责管理多个文件描述符,并使用高效的 I/O 多路复用机制来监听这些文件描述符上的事件。 提供统一的接口来注册、修改和删除文件描述符的事件监听。 使用底层的系统调用 epoll_wait来高效地检测事件。

1 #pragma once

2 #include

3 #include

4 #include"TimeStamp.h"

5 #include"nocopy.h"

6 class EventLoop;

7 class Channel;

8 class Poller : nocopyable

9 {

10 public:

11 using ChannelList = std::vector;

12 Poller(EventLoop *loop) ;

13 virtual ~Poller() ;

14 // 给所有的IO复用保留统一的接口

15 virtual TimeStamp poll(int timeoutMs, ChannelList *activeChannells) = 0;

16 virtual void updateChannel(Channel *channel) = 0;

17 virtual void removeChannel(Channel *channel) = 0;

18 // 判断channel是否在当前Poller中

19 bool hasChannel(Channel *channel) const;

20

21 // EventLoop获取默认的IO复用实现

22 static Poller *newDefaultPoller(EventLoop *loop);

23

24 protected:

25 using ChannelMap = std::unordered_map;

26 ChannelMap channels_;

27

28 private:

29 EventLoop *ownLoop_; // 定义poller所属的事件循环

30 };

5.对reactor各个部分进行分析

首先我们提到Channel事件源,它主要封装Fd和events和Revents还有一组回调函数,而这个Fd表示就是需要往Poller上注册的文件描述符, 而Rvents表示要往poller上感兴趣的事件, Revents表示Poller返回的具体发生的时间根据相应的事件执行相应的回调。 Channel有两种种类,一种是监听套节字的AcceptorChannel,另一种是普通的用户多写事件的ConnectionChannel,这两种Channel统一注册进 Loop中相应的Poller中, 而Poller的底层是一个Epoll,当epoll返回后,可以通过Fd找到Channel调用相应的回调, 之后需要Eventloop将Channel与Poller相互联系起来。

Poller也就是多路复用器,他其中封装了一个Map键值对,其中的Key对应fd,Value对应Channel对象,若检测到哪个Fd有时间发生,则找到对应的Channel进一步调用Channel中的相应的回调函数,将这个Channel写入EventLoop中,Channel与Poller相互独立,他们不能直接相互通信, 所以需要EventLoop将Channel和Poller联系起来一起工作。

接下来介绍 eventloop Eventloop中有一个chanellist。这其中包含着所有的channel, Event Loop不断通过循环检测channel list来判断哪一个 fd的事件准备就绪,而Channellist事件是否就绪是由poller来写入的。所以整个事件的流程就是,一个用户建立好的链接被封装成channel,然后将这个Channel注册进poller,当这个事件就绪时, Poller的epoll返回一个事件就绪的文件描述符或者套接字,然后poller通过这个套接字找到这个channel,再将这个Channel在eventloop中的ChannelList的中改为活跃事件已发生,那么Event Loop就会执行channel的回调,直到触发用户所设置的回调

而muduo网络库中针对每个, Event Loop都会安排一个子线程单独的来运行,,就是EventLoopThread是事件循环线程池,可以通过get next loop算法获取下一个subLoop,而新创建的线程池中的子线程是不工作的,而用户只需要初始化一个 MainLoop来派生剩下所有的子线程下的Loop,所以需要有一个wakeUpFd来唤醒线程使loop来工作, muduo库中也将Wake Up封装成了一个WakeupChannel并注册进了Poller,并且设置为监听读操作,当需要唤醒一个子线程时,只需要向这个wakeUp中写入一个数据,就会直接进行回调,唤醒这个子线程,运行普通用户读写操作的eventlLoop循环。

Acceptor主要封装listenFd的相关操作,主要是创建Listen绑定Listen打包成AcceptorChannel注册给Poller,然后通过mainLoop监听,而tcpConnection表示一个成功连接的客户端套接字,它封装了一个SocketFd,并封装成了一个Channel注册进了Poller,负责发送和接收数据,

最终就是tcpServer。他管理了所有组件,包括AcceptorChannel, EventLoopThreadPool,当Acceptor得到一个新用户链接之后,会将新用户打包封装成一个tcpConnection,并设置回调函数,然后再调用 getSubLoop获取一个子线程,然后通过Map保存所有链接。

6.梳理muduo库Tcp编程的工作流程

6.1监听流程

当有新用户链接时,底层的epoll会触发readcallback回调

而这个回调是由headleread绑定的如图:

而这个headleRead调用newConnectionCallback

而newconnection是由setnewconnection设置的

而setnewConnection由TcpServer传入绑定的newConnection

而newconnection最后将新建的套接字打包成TcpConnection进入subLoop运行

6.2数据读写流程

当有数据通信时,同样的epoll底层调用相应的回调,

从TcpConnection中可以看到,channel中绑定了对应读写的handRead和headWrite

点开其中一个可以看到,分别是读数据处理和写数据处理,而messageCallback是再acceptorChannel注册时设置过的。

6.3链接异常或关闭流程

若链接断开以后或有异常则调用shutDown。底层的channel会响应closeCallBack

同样的绑定的是hand回调函数

关闭channel向poller注册的事件,并执行connectionCallback提示连接关闭,,再次调用closeCallback

closeCallback由TcpConnection的setClose设置

所以调用的是removeConnection知道关闭连接