关于Linux IO多路复用那些事

关于I/O多路复用那些事

Q1:为什么需要I/O多路复用?[1]

A1:

  • TCP Socket只能实现一对一通信 ==> 同步阻塞(连接请求饥饿性):read阻塞直到请求数据传送过来并write

    🤔 如果没有I/O多路复用,那如何更好服务更多的用户?

    • 一个请求对应一个线程 ==> 维护一个线程池
    • 线程池的缺点:1)如何确定预先分配的线程池中线程的数量?如果分配少了,但连接数激增,可能会导致程序直接崩溃或者上下文切换的开销太大导致程序运行效率很低;如果分配过多,但连接数很少,会导致已分配的线程得不到利用而造成资源浪费
  • I/O(时分)多路复用:一个线程维护多个Socket


Q2:如何进行I/O多路复用?

A2:

  • 使用一个操作系统机制来轮询一组文件描述符,在Linux中,主要有三种操作:selectpollepoll

  • Linux(Unix)操作系统内核提供给用户态的多路复用系统调用(select、poll、epoll),进程可以通过一个系统调用函数从内核中获取多个事件


E1:Select系统调用

  • 函数签名[3-4]:

    1
    2
    3
    4
    5
    6
    7
    int select(
    int nfds, // 所允许设置的最大文件描述符值+1
    fd_set *readfds, // 读文件描述符集合(select返回后,readfds仅保留那些准备读取的文件描述符)
    fd_set *writefds, // 写文件描述符集合(select返回后,readfds仅保留那些准备写的文件描述符)
    fd_set *exceptfds, // 监控异常条件(exceptional conditions)
    struct timeval *timeout // timeval结构体,用来描述超时信息(全0表示立刻返回,为NULL表示无限等待)
    );
  • select()调用接触阻塞的条件:

    • 任意一个文件描述符准备好了
    • 调用被一个信号处理器所中断
    • 超时 [由参数timeout控制]
  • 与fd_set操作有关的宏:

    • **FD_ZERO()**:清空set
    • **FD_SET()**:将fd添加到set中
    • **FD_CLR()**:将fd从set中移除
    • **FD_ISSET()**:可以在调用select()之后,检查某一个文件描述符是否仍然在一个集合中,返回非0表示该文件描述符在集合中,否则返回0
  • pselect()

    • 函数签名:

      1
      2
      3
      4
      5
      int pselect(int nfds, fd_set *_Nullable restrict readfds,
      fd_set *_Nullable restrict writefds,
      fd_set *_Nullable restrict exceptfds,
      const struct timespec *_Nullable restrict timeout,
      const sigset_t *_Nullable restrict sigmask);
    • 提供额外一个信号掩码操作,允许更安全的调用select

    • 与select()的区别:

      (1) 超时参数的结构体不一样:在select()中是struct timeval(秒+毫秒),而在pselect()中是struct timespec(秒+纳秒)

      (2) 是否修改timeout值:select()可能会将timeout值更新为还剩下的时间,而pselect()则不会修改

      (3) sigmask参数

    • 与select()之间的等价表述:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      ready = pselect(nfds, &readfds, &writefds, &exceptfds,
      timeout, &sigmask);
      // Same as ---|:
      // v
      sigset_t origmask;

      pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
      ready = select(nfds, &readfds, &writefds, &exceptfds, timeout);
      pthread_sigmask(SIG_SETMASK, &origmask, NULL);
  • 🌰[2]

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <wait.h>
    #include <signal.h>
    #include <errno.h>
    #include <sys/select.h>
    #include <sys/time.h>
    #include <unistd.h>

    #define MAXBUF 256

    void child_process(void)
    {
    sleep(2); // make sure that server is started
    char msg[MAXBUF];
    struct sockaddr_in addr = {0};
    int n, sockfd, num=1;
    srandom(getpid());
    /* Create socket and connect to server */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(2000);
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    connect(sockfd, (struct sockaddr*)&addr, sizeof(addr));

    printf("child {%d} connected \n", getpid());
    while(1){ // dead loop
    int sl = (random() % 10) + 1;
    num++;
    sleep(sl);
    sprintf (msg, "Test message %d from client %d", num, getpid());
    n = write(sockfd, msg, strlen(msg)); /* Send message */
    }

    }

    int main()
    {
    char buffer[MAXBUF];
    int fds[5];
    struct sockaddr_in addr;
    struct sockaddr_in client;
    int addrlen, n,i,max=0;
    int sockfd, commfd;
    fd_set rset;
    for(i=0;i<5;i++)
    {
    if(fork() == 0)
    {
    child_process();
    exit(0);
    }
    }

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&addr, 0, sizeof (addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(2000);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(sockfd,(struct sockaddr*)&addr, sizeof(addr));
    listen (sockfd, 5);

    for (i=0;i<5;i++)
    {
    memset(&client, 0, sizeof (client));
    addrlen = sizeof(client);
    fds[i] = accept(sockfd,(struct sockaddr*)&client, &addrlen);
    if(fds[i] > max)
    max = fds[i];
    }

    while(1){
    FD_ZERO(&rset);
    for (i = 0; i< 5; i++ ) {
    FD_SET(fds[i],&rset);
    }

    puts("round again");
    select(max+1, &rset, NULL, NULL, NULL);

    for(i=0;i<5;i++) {
    if (FD_ISSET(fds[i], &rset)){ // 如果文件描述符在这个集合里,意味着该文件描述符可读
    memset(buffer,0,MAXBUF);
    read(fds[i], buffer, MAXBUF);
    puts(buffer);
    }
    }
    }
    return 0;
    }
  • 🤔 这里为什么需要传入nfds并告诉select()当前所有文件描述符中最大值+1?

    A: 与fd_set的实现有关:每个fd表示为一个比特,fd_set是一个32个整数的数组,共1024比特;我们进一步假设有5个文件描述符,但最高的文件描述符值是900,那么函数将会检查从0到900的比特而不需要检查到1024

小结

  • 在每一次调用之前,需要构造每一个集合(三个集合)
  • 函数逐位检查到最大文件描述符值 - 复杂度:O(n)
  • 需要在select调用之后检查文件描述符是否在select返回的集合中,如果在,则进行相应的处理
  • select最多只能监听1024个文件描述符(底层使用BitsMap实现)

E2:Poll系统调用

  • 函数签名[5]:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    int poll(
    struct pollfd *fds,
    nfds_t nfds, // fds数组元素个数!(注意这里与select()函数的区别)
    int timeout
    );
    int ppoll(
    struct pollfd *fds, nfds_t nfds,
    const struct timespec *_Nullable tmo_p,
    const sigset_t *_Nullable sigmask
    );
    /* struct pollfd */
    struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events */
    short revents; /* returned events */
    };

  • poll()调用阻塞接触逻辑与select()一样

  • 在events和revents(poll.h)中定义的比特:

    • POLLIN:有数据要读
    • POLLPRI:文件描述符存在一些异常,可能包括:TCP socket中有带外数据等
    • POLLOUT:可写
    • POLLRDHUP:流socket关闭连接 or 连接途中关闭写操作
    • POLLERR:错误条件 *
    • POLLHUP:挂起 *
    • POLLNVAL:无效请求,fd未打开 *

    ps: * 表示只会出现在revents中

  • 🌰 将上述select()示例代码进行相应的修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    #include <poll.h>
    ...
    struct pollfd pollfds[5];
    ...
    for (i=0;i<5;i++)
    {
    memset(&client, 0, sizeof (client));
    addrlen = sizeof(client);
    pollfds[i].fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
    pollfds[i].events = POLLIN;
    }
    sleep(1);
    while(1){
    puts("round again");
    poll(pollfds, 5, 50000);

    for(i=0;i<5;i++) {
    if (pollfds[i].revents & POLLIN){
    pollfds[i].revents = 0;
    memset(buffer,0,MAXBUF);
    read(pollfds[i].fd, buffer, MAXBUF);
    puts(buffer);
    }
    }
    }
    ...

    运行结果:

小结

**poll() 🆚 select(): **

  • poll()不需要使用者计算最大文件描述符值+1的值。

  • poll()对于大数值的文件描述符来说更有效。仍然假设最大值为900的五个文件描述符,poll()仅需遍历5次,而使用select()则需要遍历900次

  • select()的文件描述符集合是定长的(1024)

  • select()中,文件描述符集合在返回时重新构建,因此随后每一次调用时都需要重新初始化他们:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
     while(1){
    FD_ZERO(&rset);
    for (i = 0; i< 5; i++ ) {
    FD_SET(fds[i],&rset); // <================= 重新初始化!
    }

    puts("round again");
    select(max+1, &rset, NULL, NULL, NULL);

    for(i=0;i<5;i++) {
    if (FD_ISSET(fds[i], &rset)){ // 如果文件描述符在这个集合里,意味着该文件描述符可读
    memset(buffer,0,MAXBUF);
    read(fds[i], buffer, MAXBUF);
    puts(buffer);
    }
    }
    }

    poll()将输入(events字段)和输出(revents字段)分隔开,允许数组重新使用(这里可以理解为生产者消费者模式):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    while(1){
    puts("round again");
    poll(pollfds, 5, 50000); // <================== 生产revent

    for(i=0;i<5;i++) {
    if (pollfds[i].revents & POLLIN){
    pollfds[i].revents = 0; // <================= 消费了,清空revent
    memset(buffer,0,MAXBUF);
    read(pollfds[i].fd, buffer, MAXBUF);
    puts(buffer);
    }
    }
    }
  • select()更加具有可移植性,因为某些Unix系统不支持poll()

🤔 再深入思考一下,select()poll()最本质的区别其实是数据结构上的调整,select()使用了三个数组来分别维护读描述符、写描述符和异常处理描述符,而poll()仅使用了一个数组来维护文件描述符,但是使用了自定义的一个名为pollfds的结构体存储文件描述符信息,而该结构体中的event字段描述了文件描述符的类型

E3: Epoll系统调用

  • 函数签名[7-9]:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    #include <sys/epoll.h>

    int epoll_create(int size);
    int epoll_create1(int flags);

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *_Nullable event);

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *_Nullable sigmask);
    int epoll_pwait2(int epfd, struct epoll_event *events, int maxevents,
    const struct timespec *_Nullable timeout, const sigset_t *_Nullable sigmask);

    // event structure
    struct epoll_event {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };

    union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    };
  • epoll API核心概念是epoll实例,其是一个内核数据结构,从用户空间的角度来看,它可以被看做是两个列表的容器[6]:

    • interest list(有时也被称作epoll set):进程注册监视的文件描述符集合
    • ready list:已经准备好用于I/O操作的文件描述符集合,是interest list的一个子集
  • 创建并管理epoll实例的系统调用有:

    1. epoll_create(epoll_create1):在内核中创建一个上下文(i.e. 实例)
    2. epoll_ctl:从上下文中添加或者删除文件描述符
    3. epoll_wait: 等待上下文中的事件
  • 水平触发(Level-triggered,LT) 和 边缘触发(edge-triggered,ET

    • 两种事件触发机制

    • 假设一个场景:

      1. 在管道的方的文件描述符 (rfd) 已经被注册到epoll实例中
      2. 在管道另一个写方往管道中写入2KB的数据
      3. 调用epoll_wait(),其将返回rfd作为就绪文件描述符
      4. 管道读方从rfd中读入1KB数据
      5. 再次调用epoll_wait()

      如果在上述场景中rfd文件描述符添加了EPOLLET(边缘触发)的标志,那么在Step 5中的epoll_wait()将会被阻塞,尽管管道中还有1KB字节未读。原因在于边缘触发仅在被监控的文件描述符发生改变时才会传递事件;当使用一个水平触发机制时(默认,不指定EPOLLET),服务器端不断地从epoll_wait()调用中唤醒,直到内核缓冲区被读完才结束。这样能保证让我们知道有哪些数据需要读取。

  • 🌰 修改上述的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    #include <sys/epoll.h>
    ...

    struct epoll_event events[5];
    int epfd = epoll_create(10);
    ...
    ...
    for (i=0;i<5;i++)
    {
    static struct epoll_event ev;
    memset(&client, 0, sizeof (client));
    addrlen = sizeof(client);
    ev.data.fd = accept(sockfd,(struct sockaddr*)&client, &addrlen);
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); // 将事件添加到epoll实例中
    }

    while(1){
    puts("round again");
    nfds = epoll_wait(epfd, events, 5, 10000); // wait

    for(i=0;i<nfds;i++) { // 遍历就绪事件,就绪事件已更新到events中,取出相应的文件描述符
    memset(buffer,0,MAXBUF);
    read(events[i].data.fd, buffer, MAXBUF);
    puts(buffer);
    }
    }

小结

Epoll() 🆚 Select()/Poll()

  • select()/poll()仅提供一个API(当然也提供了一些宏定义),而epoll()提供了三个API

  • [10] select() poll() epoll()
    最大连接数 1024
    【数组】
    与操作系统有关
    【链表】
    与操作系统有关
    FD激增后带来的I/O效率问题 O(n) 线性下降 O(n) 线性下降 与活跃socket数据有关
    消息传递方式 内核需要将消息传递到用户空间,
    都需要内核拷贝动作
    select() 通过内核和用户空间共享一块内存来实现的
    mmap()文件映射内存加速与内核空间的消息传递
    遍历就绪文件描述符的方式 基于轮询机制 基于轮询机制 基于操作系统的I/O通知机制
  • epoll()仅支持linux,跨平台可能存在不兼容的问题

  1. I/O多路复用的三种实现 - 掘金 (juejin.cn)
  2. Linux – IO Multiplexing – Select vs Poll vs Epoll – Developers Area (devarea.com)
  3. select(2): synchronous I/O multiplexing - Linux man page (die.net)
  4. select(2) - Linux manual page (man7.org)
  5. poll(2) - Linux manual page (man7.org)
  6. https://man7.org/linux/man-pages/man7/epoll.7.html
  7. https://man7.org/linux/man-pages/man2/epoll_create.2.html
  8. https://man7.org/linux/man-pages/man2/epoll_ctl.2.html
  9. https://man7.org/linux/man-pages/man2/epoll_wait.2.html
  10. select、poll和epoll_epoll最大连接数_Cudi_的博客-CSDN博客

关于Linux IO多路复用那些事
http://bladchan.github.io/2023/07/10/关于Linux IO多路复用那些事/
作者
bladchan
发布于
2023年7月10日
许可协议