IO多路复用模型

IO多路复用就是用一个线程/进程处理多个TCP以减少系统开销。

IO多路复用模型有三种:select(1024),poll(数千),epoll(百万)。

重点概念:

  • 网络通讯的读事件:

    1. 已连接的队列中已经有准备好的socket(有新的客户端连接上来)
    2. 接收缓存中有数据可以读(对端发送的报文已到达)
    3. tcp连接已断开(对端调用close()函数关闭了连接)
  • 网络通讯的写事件:

    发送缓冲区没有满,可以写入数据(可以向对端发送报文)

1. select模型

1
2
//select模型需要调用select函数
int select(int fds,fd_set *readfds, fd_set *writefds,fd_set* exceptfds,struct timeval*timeout);

select():允许程序同时监视多个文件描述符,检测他们的变化(如数据可读可写),从而高效的管理多个I/O操作,而不需要单独为每个操作创建独立的线程或进程,同时也支持非阻塞IO。

  • fd_set:这个结构体是用来存储链接的集合,本质是一个32位的整形数组,所以共有$4832=1024$bitmap位

    其中提供了四个宏来操作位图:

    1. void FD_CLR(int fd,fd_set *set):将对应socket从位图中删除
    2. int FD_ISSET(int fd,fd_set *set):判断socket是否在位图中,如果不在返回0,如果在返回大于0的值
    3. void FD_SET(int fd,fd_set *set):将socket加入到对应位图中
    4. FD_ZERO(fd_set* set):初始化位图,将其中1024个位置都置为0
    1
    2
    3
    fd_set readfds; //需要监视的读事件的socket集合,大小为16字节(1024位)的bitmap
    FD_ZERO(&readfds); //初始化readfds
    FD_SET(listensock,&readfds);//把服务端监听的socket加入bitmap
  • timeval:是超时时间的结构体:

    成员有:tv_sectv_usec分别是超时的秒和微秒

    1
    2
    3
    struct timeval timeout;
    timeout.tv_sec = 10;//秒
    timeout.tv_usec = 0;//微秒
  • fds:实际操作的bitmap的大小+1,告诉要操作的bitmap

  • readfds:需要监视的读的bitmap

  • writefds:需要监视的写的bitmap

  • exceptfds:需要监视的异常的bitmap,select也可以监视普通IO,在监视普通IO时多用网络编程中不常用。

  • timeout:超时时间阈值设置

    1. 如果为NULL,那么select会无限等待直到至少有一个文件描述符就绪
    2. 如果两个参数都设置为0,select会立即返回用于轮询
    3. 设置具体时间后,在这段时间内select会等待文件描述符就绪,时间结束会返回超时报错。

使用:

1
2
3
4
int maxfd = listensock;//获取readfds中实际的最大值
fd_set tmpfds = readfds;//因为在select中位图会被操作修改所以这里要复制一份再传入
int infds = select(maxfd+1,&tmpfds,NULL,NULL,&timeout);
//tmpfds即告诉需要监视的读的bitmap,这里会修改bitmap所以需要复制一份

select返回值:

  • 如果是小于零则调用失败

  • 如果是等于0则是超时

  • 如果成功调用则是返回成功发生事件的个数

    具体使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    for (int eventfd=0;eventfd<=maxfd;eventfd++){//循环遍历bitmap
    if(FD_ISSET(eventfd,&tmpfds)==0)continue;//如果当前位置为0则没任何事件

    //如果是listen发生了事件则是说明有客户端连接上来了(已经有准备好的socket)
    if(eventfd == listensock){
    struct sockaddr_in client;
    socklen_t len =sizof(client);
    int clientsock = accept(listensock,(struct sockaddr*)&client,&len);
    if(listensock < 0){
    perror("accept() faild");
    continue;
    }
    std::cout << "accept client(socket ="<< clientsock<<") ok"<<std::endl;
    FD_SET(clientsock,&readfds); //把新连接上来的客户端标志位设置为1
    if(maxfd < clientsock) maxfd = clientsock;//更新maxfd
    }else{//如果是客户端连接上的socket有事件,表示接收缓存中有数据可以读,或者有客户端断开连接
    char buffer[1024];
    memset(buffer,0,sizeof(buffer));
    if(recv(eventfd,buffer,sizeof(buffer),0)<=0){
    std::cout << "client(eventfd = "<< eventfd<<")disconnected"<<std::endl;
    close(eventfd);
    FD_CLR(eventfd,&readfds);

    if(eventfd == maxfd)
    for(int ii =maxfd;ii>0;ii--)
    if(FD_ISSET(ii,&readfds)){
    maxfd = ii;
    break;
    }
    }else{
    //如果有报文发送过来
    std::cout << "recv(eventfd="<< eventfd<<"):"<<buffer<<std::endl;
    send(eventfd,buffer,strlen(buffer),0);//暂时把报文发送回去
    }
    }
    }
  • select-写事件:

    • 如果tcp发送缓存区没有满,那么,socket连接是可写的。
    • 一般来说,发送缓冲区不容易被填满。
    • 如果发送数据量太大,或者网络带宽不够,发送缓冲区有填满的可能性。
    • 综上所述,一般业务中不需要关心监听写事件
  • select-水平触发:

    • select()监视的socket如果发生了事件,select()会返回(通知应用程序处理事件)。
    • 如果事件没有被处理,再次调用select()的时候会立即再通知。
  • select-压力测试:大概可以处理12w个事务

  • select-存在的问题:

    • 采用轮询方式扫描bitmap,性能会随着socket数量增多而下降
    • 每次调用select(),需要拷贝bitmap
    • bitmap的大小(单个进程/线程打开的socket数量)由FD_SETSIZE宏设置,默认是1024个,可以修改,但是效率将更低。

2. poll模型

poll模型大体与select有些相似,再poll模型中要使用pollfd来存放需要监视的socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
pollfd fds[1024];

//pollfd结构体具体如下:
struct pollfd{
int fd;//要监视的socket
short events; //
short revents; //当有事件发生时会修改此成员
}
//初始化使用以下方法:
for(int i = 0;i<1024;i++)
fds[i].fd = -1;

//让poll监视事件
fds[listensock].fd = listensock;
fds[listensock].events = POLLIN;//读时间
/*fds[listensock].events =
基本只用得到:
POLLIN 读事件
POLLOUT 写事件
如果既需要读事件也需要写事件,可以如下使用;
fds[listensock].events = POLLIN|POLLOUT;
*/

/*
详解:
events和revents共同可用:
读事件:
POLLIN 读普通或优先级数据
POLLRDNORM 读普通数据
POLLRDBAND 读优先级数据
POLLPRI 读高优先级数据
写事件
POLLOUT 写普通数据
POLLWRNORM 写普通数据
POLLWRBAND 写优先级数据

仅revents可用:
异常:
POLLERR 错误
POLLHUP 挂起
POLLNAV 文件描述符未打开
*/

poll的具体使用

1
int poll(struct pollfd* nfds,nfds_t,int timeout);
  • nfds:结构体数组地址
  • nfds_t:结构体最大数+1
  • timeout:超时时间(ms)

具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//初始化服务器监听端口获取listensock

pollfd fds[1024];

//初始化数组
for(int i=0;i <=1024;i++)
fds[i].fd=-1;

//让poll监视listensock事件此处位置可以自由选择,也可以让监听事件顶格排布,也可以直接按数值位置排取决于自己
fds[listensock].fd = listensock;
fds[listensock].events = POLLIN;

int maxfds = listensock; //需要监视sock的实际大小

while(true){
int infds = poll(fds,maxfd+1,10000);

//如果infds < 0,表示调用失败
if(infds < 0)
{
perror("poll()");
break;
}

//如果infds = 0为超时
if(infds == 0)
{
perror("poll(),timeout");
break;
}
//大于0时是有事件发生
for(int eventfd = 0;eventfd<=maxfd;eventfd++){
if(fds[eventfd].fd < 0)continue;//忽略负fd
if(fds[eventfd].events & POLLIN == 0)continue;//没有读事件,跳过
if(eventfd == listensock){
struct sockaddr_in client;
socklen_t len = sizeof(client);
int clientsock = accept(listensock,(struct sockaddr*)&client,&len);
if(clientsock < 0){perror("accept() failed");continue;}

printf("accept client(socket = %d)ok.\n",clientsock);

//修改clientsock位置元素
fds[clientsock].fd =clientsock;
fds[clientsock].events = POLLIN;

if(maxfd<clientsock) maxfd = clientsock;
}else{

char buffer[1024];
memset(buffer,0,sizof(buffer));
if(recv(eventfd,buffer,sizeof(buffer),0)<=0){
//如果客户端连接已断开
printf("client(eventfd= %d) disconnected.\n",eventfd);

close(eventfd);
fds[eventfd].fd=-1;
//重新计算maxfd
if(eventfd == maxfd)
for(int i=0;i>0;i--)
if(fds[i].fd!=-1)
{
maxfd = i;
break;
}
}else{
//如果有报文
printf("recv(eventfd = %d):%s\n",eventfd,buffer);
send(eventfd,buffer,strlen(buffer),0);
}
}

}

}
  • poll模型缺点:
    • 在程序中,poll的数据结构是数组,传入内核后转换成了链表
    • 每调用一次select()需要拷贝两次bitmap,poll拷贝一次结构体数组
    • poll监视没有1024的显示,但是同样是遍历的方式,所以依旧是监视的socket越多,效率越低

3. epoll模型

epoll模型解决了上面两个模型的痛点,再效率方面有了极大提升,根据以下介绍顺序逐步解释并且使用epoll

epoll的使用:

1
int epoll_create(); //创建epoll句柄

epoll_create()在linux2.6.8版本之前有一个int size的参数,在此之后就没有了被忽略掉了,填任意大于0的数即可

epoll的数据结构epoll_event:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct epoll_event{
uint32_t events; //事件
epoll_data_t data;//用户数据变量
};
/*
epoll的events与poll类似有:

EPOLLIN:可读事件。当接收缓冲区有数据可读时触发,例如套接字有新数据或管道写端关闭。
EPOLLOUT:可写事件。当发送缓冲区有空间可写时触发,适用于非阻塞连接完成或数据发送。
实际开发中以上两个常用,下列不常用

EPOLLERR:错误事件。文件描述符发生错误时自动触发。
EPOLLHUP:挂起事件。通常表示连接被对端关闭。
EPOLLRDHUP:对端关闭事件。用于检测 TCP 半关闭状态。
EPOLLPRI:紧急数据事件。适用于带外数据(如 TCP 紧急数据)。
EPOLLET:边缘触发模式,仅在状态变化时触发,适合高性能场景。
EPOLLONESHOT:单次触发模式,事件触发后需手动重新注册。
*/
/*用户数据变量的具体定义如下
typedef union epoll_data{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
这里使用了哪种数据类型后续使用时就需要使用哪种
*/

epoll_event的使用:

1
2
3
4
//为服务端的listensock准备可读事件
epoll_event ev; //声明事件的数据结构
ev.data.fd = listensock;//指定事件的自定义数据,会随着epoll_event一起返回。
ev.events =EPOLLIN;//让epoll监视listensock事件

epoll_ctl的使用

1
2
3
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);//epoll_ctl定义
//oll_ctl的使用
epoll_ctl(epollfd,EPOLL_CTL_ADD,listensock,&ev);//把需要监视的socket加入epollfd
  • epfd:为创建的epoll句柄
  • op:为操作类型:
    • EPOLL_CTL_ADD:添加
    • EPOLL_CTL_DEL:删除
    • EPOLL_CTL_MOD:修改
  • fd:需要操作的目标文件符
  • event:需要监控的事件类型

最后声明一组epoll_events来接收返回事件:

1
epoll_event evs[10];//存放epoll返回的事件

后续使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
while(true){
int infds = epoll_wait(epollfd,evs,10,-1);

//返回失败
if(infds < 0 ){
perror("epoll()failed");
break;
}

//超时
if(infds == 0){
perror("epoll() timeout");
break;
}

for(int ii =0;ii<=infds;ii++){//遍历返回数组
if(evs[ii].data.fd == listensock){

struct sockaddr_in client;
socklen_t len = sizeof(client);
int clientsock = accept(listensock,(struct sockaddr*)&client,&len);

printf("accept client(socket = %d)ok.\n",clientsock);

//为新客户端准备可读事件
ev.data.fd =clientsock;
ev.events = EPOLLIN;
epoll_ctl(epollfd,EPOLL_CTL_ADD,clientsock,&ev);
}else{
//客户端有事件
char buffer[1024];
memset(buffer,0,sizeof(buffer));
if(recv(evs[ii].data.fd,buffer,sizeof(buffer),0)<=0)
{
printf("client(eventfd = %d) disconnected.\n",evs[ii].data.fd);
close(evs[ii].data.fd); //关闭客户端的socket
//如果socket被关闭了会自动从epoll句柄中被删除,以下代码不必使用
//epoll_ctl(epollfd,EPOLL_CTL_DEL,evs[ii].data.fd,0);
}else{
printf("recv(eventfd = %d):%S\n",evs[ii].data.fd,buffer);
send(evs[i].data.fd,buffer,strlen(buffer),0);
}
}
}

}