用户空间与内核空间

现代操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 Linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到 0xBFFFFFFF),供各个进程使用,称为用户空间。

现代的网络服务的主流已经完成从 CPU 密集型到 IO 密集型的转变,所以服务端程序对 I/O 的处理必不可少,而一旦操作 I/O 则必定要在用户态和内核态之间来回切换。

I/O 模型

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)
  • 非阻塞 I/O (Nonblocking I/O)
  • I/O 多路复用 (I/O multiplexing)
  • 信号驱动 I/O (Signal driven I/O)
  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区
  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

在描述这块内容的诸多书籍中,很多都只说笼统的概念,我们将问题具体化,暂时只考虑服务器端的网络I/O情形。我们假定目前的情形是服务器已经在监听用户请求,建立连接后服务器调用read()函数等待读取用户发送过来的数据流,之后将接收到的数据打印出来。

所以服务器端简单是这样的流程:建立连接 -> 监听请求 -> 等待用户数据 -> 打印数据。我们总结网络通信中的等待:

  1. 建立连接时等待对方的ACK包(TCP)。

  2. 等待客户端请求(HTTP)。

  3. 输入等待:服务器用户数据到达内核缓冲区(read函数等待)。

  4. 输出等待:用户端等待缓冲区有足够空间可以输入(write函数等待)。

另外为了能够解释清楚网络I/O模型,还需要了解一些基础。对服务器而言,打印出用户输入的字符串(printf函数)和从网络中获取数据(read函数)需要单独来看。服务器首先accept用户连接请求后首先调用read函数等待数据,这里的read函数是系统调用,运行于内核态,使用的也是内核地址空间,并且从网络中取得的数据需要先写入到内核缓冲区。当read系统调用获取到数据后将这些数据再复制到用户地址空间的用户缓冲区中,之后返回到用户态执行printf函数打印字符串。我们需要明确两点:

read执行在内核态且数据流先读入内核缓冲区;printf运行于用户态,打印的数据会先从内核缓冲区复制到进程的用户缓冲区,之后打印出来。

printf函数一定是在read函数已经准备好数据之后才能执行,但read函数作为I/O操作通常需要等待而触发阻塞。调用read函数的是服务器进程,一旦被read调用阻塞,整个服务器在获取到用户数据前都不能接受任何其他用户的请求(单进程/线程)。

有了上面的基础,我们就可以介绍下面五种网络I/O模型。

阻塞 I/O

阻塞表示一旦调用I/O函数必须等整个I/O完成才返回。正如上面提到的那种情形,当服务器调用了read函数之后,如果不是立即接收到数据,服务器进程会被阻塞,之后一直在等待用户数据到达,用户数据到达后首先会写进内核缓冲区,之后内核缓冲区数据复制到用户进程(服务器进程)缓冲区。完成了上述所有的工作后,才会把执行权限返回给用户(从内核态 -> 用户态)。

很显然,阻塞式I/O的效率实在太低,如果用户输入数据迟迟不到的话,整个服务器就会一直被阻塞(单进程/线程)。为了不影响服务器接收其他进程的连接,我们可以考虑多进程模型,这样当服务器建立连接后为连接的用户创建新线程,新线程即使是使用阻塞式I/O也仅仅是这一个线程被阻塞,不会影响服务器等待接收新的连接。

多线程模型下,主线程等待用户请求,用户有请求到达时创建新线程。新线程负责具体的工作,即使是因为调用了read函数被阻塞也不会影响服务器。我们还可以进一步优化创建连接池和线程池以减小频繁调用I/O接口的开销。但新问题随之产生,每个新线程或者进程(加入使用对进程模型)都会占用大量系统资源,除此之外过多的线程和进程在调度方面开销也会大很对,所以这种模型并不适合大并发量。

非阻塞 I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK 标志位,从而把 socket 设置成 Non-blocking。

1
fcntl(fd, F_SETFL, O_NONBLOCK);

当对一个 Non-blocking socket 执行读操作时,流程是这个样子:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,Non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

阻塞和非阻塞最大的区别在于调用I/O系统调用后,是等整个I/O过程完成再把操作权限返回给用户还是会立即返回。

非阻塞I/O在调用后会立即返回,用户进程对返回的返回值判断以区分是否完成了I/O。如果返回大于0表示完成了数据读取,返回值即读取的字节数;返回0表示连接已经正常断开;返回-1表示错误,接下来用户进程会不停地询问kernel是否准备完毕。

非阻塞I/O虽然不再会完全阻塞用户进程,但实际上由于用户进程需要不停地询问kernel是否准备完数据,所以整体效率依旧非常低,不适合做并发。

下一节我们要讲的 I/O 多路复用需要和 Non-blocking I/O 配合才能发挥出最大的威力!

I/O 多路复用

前面已经论述了多进程、多进程模型会因为开销巨大和调度困难而导致并不能承受高并发量。但不适用这种模型的话,无论是阻塞还是非阻塞方式都会导致整个服务器停滞。

所以对于大并发量,我们需要一种代理模型可以帮助我们集中去管理所有的socket连接,一旦某个socket数据到达了就执行其对应的用户进程,I/O多路复用就是这么一种模型。Linux下I/O多路复用的系统调用有select,poll和epoll,但从本质上来讲他们都是同步I/O范畴。

select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O 事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:

(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。

(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。

(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。

(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

select & poll

相关接口:

1
2
3
4
5
int select (int maxfd, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout);
FD_ZERO(int fd, fd_set* fds) //清空集合
FD_SET(int fd, fd_set* fds) //将给定的描述符加入集合
FD_ISSET(int fd, fd_set* fds) //将给定的描述符从文件中删除
FD_CLR(int fd, fd_set* fds) //判断指定描述符是否在集合中

参数:

1
2
3
4
5
maxfd:当前最大文件描述符的值+1(≠ MAX_CONN)。
readfds:指向读文件队列集合(fd_set)的指针。
writefds:同上,指向读集合的指针。
writefds:同上,指向错误集合的指针。
timeout:指向timeval结构指针,用于设置超时。

其他:

判断和操作对象为set_fd集合,集合大小为单个进程可打开的最大文件数1024或2048(可重新编译内核修改但不建议)。

select 是 epoll 之前 Linux 使用的 I/O 事件驱动技术。

select整体流程如下:

  1. 使用copy_from_user从用户空间拷贝fd_set到内核空间

  2. 注册回调函数__pollwait

    __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

  3. 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll).poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

  4. 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout使调用select的进程(也就是current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。

  5. 把fd_set从内核空间拷贝到用户空间。

理解 select 的关键在于理解 fd_set,为说明方便,取 fd_set 长度为 1 字节,fd_set 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fd_set 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000
  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第 5 位置为 1)
  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011
  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待
  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fd_set) 的值。假设服务器上 sizeof(fd_set)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set 的大小调整可突破 select 可监控的文件描述符上限
  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fd_set 进行 FD_ISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO 最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数
  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET 判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈
  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大
  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

相关接口:

1
int poll(struct pollfd *fds, unsigned int nfds, int timeout);

结构体定义:

1
2
3
4
5
struct pollfd{
	int fd; // 文件描述符
	short events; // 等到的事件
	short revents; // 实际发生的事件
}

参数:

1
2
3
fds:指向pollfd结构体数组的指针nfdspollfd数组当前已被使用的最大下标timeout:等待毫秒数。

其他:

判断和操作对象是元素为pollfd类型的数组,数组大小自己设定,即为最大连接数。

epoll

原理

epoll 是 Linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的 等待消息准备好 I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 错误预估了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的: 返回的活跃连接 == select(全部待监控的连接) 。

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被 高频 二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

相关接口:

1
2
3
int epoll_create(int size); // 创建epoll句柄
int epoll_ctl(int epfd, int op, int fd, structepoll_event *event); // 事件注册函数
int epoll_wait(int epfd, struct epoll_event * events,int maxevents, int timeout);

结构体定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
struct epoll_event{
    __uint32_t events;
    epoll_data_t data;
};
typedef union epoll_data{
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
}epoll_data_t;

参数:

  • size:用来告诉内核要监听的数目。
  • epfd:epoll函数的返回值。
  • op:表示动作(EPOLL_CTL_ADD/EPOLL_CTL_FD/EPOLL_CTL_DEL)。
  • fd:需要监听的fd。
  • events:指向epoll_event的指针,该结构记录监听的事件。
  • maxevents:告诉内核events的大小。

其中,epoll_create 创建一个 epoll 实例并返回 epollfd;epoll_ctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epoll_wait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epoll_wait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

1
2
3
#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epoll_ctl 相对来说就是非频繁调用的,而 epoll_wait 则是会被高频调用的。所以 epoll 利用 epoll_ctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epoll_wait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epoll_ctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候会完成关键的一步:该 fd 会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为: ep_poll_callback ,这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epoll_wait 实际上就是去检查 rdllist 双向链表中是否有就绪的 fd,当 rdllist 为空(无就绪 fd)时挂起当前进程,直到 rdllist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

API

int epoll_create(int size):

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

从slab缓存中创建一个eventpoll对象,并且创建一个匿名的fd跟fd对应的file对象, 而eventpoll对象保存在struct file结构的private指针中,并且返回,该fd对应的file operations只是实现了poll跟release操作

创建eventpoll对象的初始化操作

获取当前用户信息,是不是root,最大监听fd数目等并且保存到eventpoll对象中初始化等待队列,初始化就绪链表,初始化红黑树的头结点

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):

epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值.

第二个参数表示动作,用三个宏来表示:

1
2
3
EPOLL_CTL_ADD:注册新的fd到epfd中
EPOLL_CTL_MOD:修改已经注册的fd的监听事件
EPOLL_CTL_DEL:从epfd中删除一个fd

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

1
2
3
4
struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
  • EPOLLRDHUP: 表示对端调用close()关闭socket连接或调用shutdown(SHUT_WR)关闭对端的写。

有几点需要注意:

  • 对于EPOLLERR和EPOLLHUP,不需要在epoll_event时针对fd作设置,一样也会触发;
  • EPOLLRDHUP实测在对端关闭时会触发,需要注意的是:
    • 对EPOLLRDHUP的处理应该放在EPOLLIN和EPOLLOUT前面,处理方式应该 是close掉相应的fd后,作其他应用层的清理动作;
    • 如果采用的是LT触发模式,且没有close相应的fd, EPOLLRDHUP会持续被触发;
    • EPOLLRDHUP想要被触发,需要显式地在epoll_ctl调用时设置在events中;
  • 对于EPOLLOUT:有写需要时才通过epoll_ctl添加相应fd,不然在LT模式下会频繁触发;
  • 对于写操作,大部分情况下都处于可写状态,可先直接调用write来发送数据,直到返回 EAGAIN后再使能EPOLLOUT,待触发后再继续write。

如何判断对端关闭:

  • 优先使用上面介绍的EPOLLRDHUP;
  • 使用EPOLLIN, 然后调用read, 此时返回的ssize_t类型结果为0;
  • 对端关闭包括:ctrl + c, kill, kill -9。

将epoll_event结构拷贝到内核空间中,并且判断加入的fd是否支持poll结构(epoll,poll,selectI/O多路复用必须支持poll操作). 并且从epfd->file->privatedata获取event_poll对象,根据op区分是添加删除还是修改, 首先在eventpoll结构中的红黑树查找是否已经存在了相对应的fd,没找到* 就支持插入操作,否则报重复的错误.

相对应的修改,删除比较简单就不啰嗦了

插入操作时,会创建一个与fd对应的epitem结构,并且初始化相关成员,比如保存监听的fd跟file结构之类的.

重要的是指定了调用poll_wait时的回调函数用于数据就绪时唤醒进程,(其内部,初始化设备的等待队列,将该进程注册到等待队列)完成这一步,我们的epitem就跟这个socket关联起来了, 当它有状态变化时, 会通过ep_poll_callback()来通知. 最后调用加入的fd的file operation->poll函数(最后会调用poll_wait操作)用于完成注册操作.最后将epitem结构添加到红黑树中

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

计算睡眠时间(如果有),判断eventpoll对象的链表是否为空,不为空那就干活不睡眠.并且初始化一个等待队列,把自己挂上去,设置自己的进程状态 为可睡眠状态.判断是否有信号到来(有的话直接被中断醒来,),如果啥事都没有那就调用schedule_timeout进行睡眠,如果超时或者被唤醒,首先从自己初始化的等待队列删除,然后开始拷贝资源给用户空间了.拷贝资源则是先把就绪事件链表转移到中间链表,然后挨个遍历拷贝到用户空间,并且挨个判断其是否为水平触发,是的话再次插入到就绪链表

工作模式

LT模式状态时,主线程正在epoll_wait等待事件时,请求到了,epoll_wait返回后没有去处理请求(recv),那么下次epoll_wait时此请求还是会返回(立刻返回了);而ET模式状态下,这次没处理,下次epoll_wait时将不返回(所以我们应该每次一定要处理).

Level Triggered (LT) 水平触发
  1. socket接收缓冲区不为空 有数据可读 读事件一直触发

  2. socket发送缓冲区不满 可以继续写入数据 写事件一直触发

符合思维习惯,epoll_wait返回的事件就是socket的状态

LT的处理过程:

  1. accept一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
  2. 当EPOLLIN事件到达时,read fd中的数据并处理
  3. 当需要写出数据时,把数据write到fd中;
  4. 当EPOLLOUT事件到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT事件.如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT事件.

水平触发的问题:不必要的唤醒

  1. 内核:收到一个新建连接的请求
  2. 内核:由于 “惊群效应” ,唤醒两个正在 epoll_wait() 的线程 A 和线程 B
  3. 线程A:epoll_wait() 返回
  4. 线程B:epoll_wait() 返回
  5. 线程A:执行 accept() 并且成功
  6. 线程B:执行 accept() 失败,accept() 返回 EAGAIN

epoll LT惊群的发生:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 否则会阻塞在IO系统调用,导致没有机会再epoll
set_socket_nonblocking(sd);
epfd = epoll_create(64);
event.data.fd = sd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sd, &event);
while (1) {
    epoll_wait(epfd, events, 64, xx);
    ... // 危险区域!如果有共享同一个epfd的进程/线程调用epoll_wait,它们也将会被唤醒!
    // 这个accept将会有多个进程/线程调用,如果并发请求数很少,那么将仅有几个进程会成功:
    // 1. 假设accept队列中有n个请求,则仅有n个进程能成功,其它将全部返回EAGAIN (Resource temporarily unavailable)
    // 2. 如果n很大(即增加请求负载),虽然返回EAGAIN的比率会降低,但这些进程也并不一定取到了epoll_wait返回当下的那个预期的请求。
    csd = accept(sd, &in_addr, &in_len);
    ...
}

再看一遍LT的描述“如果事件来了,不管来了几个,只要仍然有未处理的事件,epoll都会通知你。”,显然,epoll_wait刚刚取到事件的时候的时候,不可能马上就调用accept去处理,事实上,逻辑在epoll_wait函数调用的ep_poll中还没返回的,这个时候,显然符合“仍然有未处理的事件”这个条件,显然这个时候为了实现这个语义,需要做的就是通知别的同样阻塞在同一个epoll句柄睡眠队列上的进程!在实现上,这个语义由两点来保证:

保证1:在LT模式下,“就绪链表”上取出的epi上报完事件后会重新加回“就绪链表”; 保证2:如果“就绪链表”不为空,且此时有进程阻塞在同一个epoll句柄的睡眠队列上,则唤醒它。

epoll LT模式下有进程被不必要唤醒,这一点并不是内核无意而为之的,内核肯定是知道这件事的,这个并不像之前accept惊群那样算是内核的一个缺陷。epoll LT模式只是提供了一种模式,误用这种模式将会造成类似惊群那样的效应。但是不管怎么说,为了讨论上的方便,后面我们姑且将这种效应称作epoll LT惊群吧。

Edge Triggered (ET) 边沿触发
  1. socket的接收缓冲区状态变化时触发读事件,即空的接收缓冲区刚接收到数据时触发读事件

  2. socket的发送缓冲区状态变化时触发写事件,即满的缓冲区刚空出空间时触发读事件

仅在状态变化时触发事件

ET的处理过程:

  1. accept一个一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
  2. 当EPOLLIN事件到达时,read fd中的数据并处理,read需要一直读,直到返回EAGAIN为止
  3. 当需要写出数据时,把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
  4. 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据全部写完,或者write返回EAGAIN

边缘触发的问题:不必要的唤醒以及饥饿

不必要的唤醒:

  1. 内核:收到第一个连接请求。线程 A 和 线程 B 两个线程都在 epoll_wait() 上等待。由于采用边缘触发模式,所以只有一个线程会收到通知。这里假定线程 A 收到通知
  2. 线程A:epoll_wait() 返回
  3. 线程A:调用 accpet() 并且成功
  4. 内核:此时 accept queue 为空,所以将边缘触发的 socket 的状态从可读置成不可读
  5. 内核:收到第二个建连请求
  6. 内核:此时,由于线程 A 还在执行 accept() 处理,只剩下线程 B 在等待 epoll_wait(),于是唤醒线程 B
  7. 线程A:继续执行 accept() 直到返回 EAGAIN
  8. 线程B:执行 accept(),并返回 EAGAIN,此时线程 B 可能有点困惑(“明明通知我有事件,结果却返回 EAGAIN”)
  9. 线程A:再次执行 accept(),这次终于返回 EAGAIN

饥饿:

  1. 内核:接收到两个建连请求。线程 A 和 线程 B 两个线程都在等在 epoll_wait()。由于采用边缘触发模式,只有一个线程会被唤醒,我们这里假定线程 A 先被唤醒
  2. 线程A:epoll_wait() 返回
  3. 线程A:调用 accpet() 并且成功
  4. 内核:收到第三个建连请求。由于线程 A 还没有处理完(没有返回 EAGAIN),当前 socket 还处于可读的状态,由于是边缘触发模式,所有不会产生新的事件
  5. 线程A:继续执行 accept() 希望返回 EAGAIN 再进入 epoll_wait() 等待,然而它又 accept() 成功并处理了一个新连接
  6. 内核:又收到了第四个建连请求
  7. 线程A:又继续执行 accept(),结果又返回成功

由于epi entry的callback即ep_poll_callback所做的事情仅仅是将该epi自身加入到epoll句柄的“就绪链表”,同时唤醒在epoll句柄睡眠队列上的task,所以这里并不对事件的细节进行计数,比如说,如果ep_poll_callback在将一个epi加入“就绪链表”之前发现它已经在“就绪链表”了,那么就不会再次添加,因此可以说,一个epi可能pending了多个事件,注意到这点非常重要!

一个epi上pending多个事件,这个在LT模式下没有任何问题,因为获取事件的epi总是会被重新添加回“就绪链表”,那么如果还有事件,在下次check的时候总会取到。然而对于ET模式,仅仅将epi从“就绪链表”删除并将事件本身上报后就返回了,因此如果该epi里还有事件,则只能等待再次发生事件,进而调用ep_poll_callback时将该epi加入“就绪队列”。这意味着什么?

这意味着,应用程序,即epoll_wait的调用进程必须自己在获取事件后将其处理干净后方可再次调用epoll_wait,否则epoll_wait不会返回,而是必须等到下次产生事件的时候方可返回。这会导致事件堆积,所以一般会死循环一直拉取事件,直到拉取不到了再返回。

  1. 对于读操作,如果read没有一次读完buff数据,下一次将得不到就绪通知(ET特性),造成buff中数据无法读出,除非有新数据到达。 解决方法:将套接字设置为非阻塞,用while循环包住read,只要buff中有数据,就一直读。一直读到产生EAGIN错误。

  2. 对于写操作主要因为ET模式下非阻塞需要我们考虑如何将用户要求写的数据写完。 解决方法:只要buff还有空间且用户请求写的数据还未写完,就一直写。

区别

边缘触发会比条件触发更高效一些,因为边缘触发不会让同一个文件描述符多次被处理,比如有些文件描述符已经不需要再读写了,但是在条件触发下每次都会返回,而边缘触发只会返回一次。

ET模式下每次write或read需要循环write或read直到返回EAGAIN错误。以读操作为例,这是因为ET模式只在socket描述符状态发生变化时才触发事件,如果不一次把socket内核缓冲区的数据读完,会导致socket内核缓冲区中即使还有一部分数据,该socket的可读事件也不会被触发.

根据上面的讨论,若ET模式下使用阻塞IO,则程序一定会阻塞在最后一次write或read操作,因此说ET模式下一定要使用非阻塞IO

对端close

对端close时,如果接收缓冲区内已无数据,则走tcp四次挥手流程,发送FIN 包,此时本端会触发事件如下: EPOLLRDHUP (需要主动在epoll_ctal时加入events) EPOLLIN EPOLLOUT

此时应优先处理EPOLLRDHUP,它明确表明对端已经关闭,处理时close相应fd后,无需再继续处理其他事件;

如果不处理EPOLLRDHUP的话,也可以处理EPOLLIN事件,此时read返回0, 同样表明对端已经关闭;

如果以上两个事件都没有处理,而是在EPOLLOUT事件里又向fd写了数据,数据只是写入到本地tcp发送缓冲区,此时write调用会返回成功,但是紧接着epoll_wait又会返回如下事件组合:

1
EPOLLERR EPOLLHUP EPOLLIN EPOLLOUT POLLRDHUP (需要主动在epoll_ctal时加入events

可以看到相比之前多了EPOLLERR和EPOLLHUP,是因为之前收到了对端close时发送的FIN 包,此时再给对端发送数据,对端会返回RST包。

如果在收到RST包后,又向对端发送数据,会收到sigpipe异常,其默认处理是终止当前进程,此时可通过忽略此异常解决,忽略后write会返回

1
-1, erron =32, Broken pipe:c signal(SIGPIPE, SIG_IGN);

Broker pipie这个异常,说到底是应用层没有对相应的fd在收到对端关闭通知时,作正确的处理所致,它并不是tcp/ip通讯层面的问题。

下图可以看到发送了FIN包

对端close(kill, kill -9)时,如果接收缓冲区内还有数据,不会发送FIN包,而是发送RST,此时本端:

收到RST后的第一次写操作,写失败,errno = 104, Connection reset by peer; 之后将触发下列事件:

1
EPOLLIN EPOLLOUT EPOLLHUP EPOLLRDHUP(需要主动在epoll_ctal时加入events

收到RST后的第二次及后序的写操作,写失败,在忽略了SIGPIPE后,erron =32, Broken pipe;

收到RST后的读操作:errno = 104, Connection reset by peer

下面可以看到发送了RST包:

epoll源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
/*
 *  fs/eventpoll.c (Efficient event retrieval implementation)
 *  Copyright (C) 2001,...,2009     Davide Libenzi
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  Davide Libenzi <davidel@xmailserver.org>
 *
 */
/*
 * 在深入了解epoll的实现之前, 先来了解内核的3个方面.
 * 1. 等待队列 waitqueue
 * 我们简单解释一下等待队列:
 * 队列头(wait_queue_head_t)往往是资源生产者,
 * 队列成员(wait_queue_t)往往是资源消费者,
 * 当头的资源ready后, 会逐个执行每个成员指定的回调函数,
 * 来通知它们资源已经ready了, 等待队列大致就这个意思.
 * 2. 内核的poll机制
 * 被Poll的fd, 必须在实现上支持内核的Poll技术,
 * 比如fd是某个字符设备,或者是个socket, 它必须实现
 * file_operations中的poll操作, 给自己分配有一个等待队列头.
 * 主动poll fd的某个进程必须分配一个等待队列成员, 添加到
 * fd的对待队列里面去, 并指定资源ready时的回调函数.
 * 用socket做例子, 它必须有实现一个poll操作, 这个Poll是
 * 发起轮询的代码必须主动调用的, 该函数中必须调用poll_wait(),
 * poll_wait会将发起者作为等待队列成员加入到socket的等待队列中去.
 * 这样socket发生状态变化时可以通过队列头逐个通知所有关心它的进程.
 * 这一点必须很清楚的理解, 否则会想不明白epoll是如何
 * 得知fd的状态发生变化的.
 * 3. epollfd本身也是个fd, 所以它本身也可以被epoll,
 * 可以猜测一下它是不是可以无限嵌套epoll下去...
 *
 * epoll基本上就是使用了上面的1,2点来完成.
 * 可见epoll本身并没有给内核引入什么特别复杂或者高深的技术,
 * 只不过是已有功能的重新组合, 达到了超过select的效果.
 */
/*
 * 相关的其它内核知识:
 * 1. fd我们知道是文件描述符, 在内核态, 与之对应的是struct file结构,
 * 可以看作是内核态的文件描述符.
 * 2. spinlock, 自旋锁, 必须要非常小心使用的锁,
 * 尤其是调用spin_lock_irqsave()的时候, 中断关闭, 不会发生进程调度,
 * 被保护的资源其它CPU也无法访问. 这个锁是很强力的, 所以只能锁一些
 * 非常轻量级的操作.
 * 3. 引用计数在内核中是非常重要的概念,
 * 内核代码里面经常有些release, free释放资源的函数几乎不加任何锁,
 * 这是因为这些函数往往是在对象的引用计数变成0时被调用,
 * 既然没有进程在使用在这些对象, 自然也不需要加锁.
 * struct file 是持有引用计数的.
 */
/* --- epoll相关的数据结构 --- */
/*
 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
 */
/* 每创建一个epollfd, 内核就会分配一个eventpoll与之对应, 可以说是
 * 内核态的epollfd. */
struct eventpoll {
    /* Protect the this structure access */
    spinlock_t lock;
    /*
     * This mutex is used to ensure that files are not removed
     * while epoll is using them. This is held during the event
     * collection loop, the file cleanup path, the epoll file exit
     * code and the ctl operations.
     */
    /* 添加, 修改或者删除监听fd的时候, 以及epoll_wait返回, 向用户空间
     * 传递数据时都会持有这个互斥锁, 所以在用户空间可以放心的在多个线程
     * 中同时执行epoll相关的操作, 内核级已经做了保护. */
    struct mutex mtx;
    /* Wait queue used by sys_epoll_wait() */
    /* 调用epoll_wait()时, 我们就是"睡"在了这个等待队列上... */
    wait_queue_head_t wq;
    /* Wait queue used by file->poll() */
    /* 这个用于epollfd本事被poll的时候... */
    wait_queue_head_t poll_wait;
    /* List of ready file descriptors */
    /* 所有已经ready的epitem都在这个链表里面 */
    struct list_head rdllist;
    /* RB tree root used to store monitored fd structs */
    /* 所有要监听的epitem都在这里 */
    struct rb_root rbr;
    /*
        这是一个单链表链接着所有的struct epitem当event转移到用户空间时
     */
     * This is a single linked list that chains all the "struct epitem" that
     * happened while transfering ready events to userspace w/out
     * holding ->lock.
     */
    struct epitem *ovflist;
    /* The user that created the eventpoll descriptor */
    /* 这里保存了一些用户变量, 比如fd监听数量的最大值等等 */
    struct user_struct *user;
};
/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 */
/* epitem 表示一个被监听的fd */
struct epitem {
    /* RB tree node used to link this structure to the eventpoll RB tree */
    /* rb_node, 当使用epoll_ctl()将一批fds加入到某个epollfd时, 内核会分配
     * 一批的epitem与fds们对应, 而且它们以rb_tree的形式组织起来, tree的root
     * 保存在epollfd, 也就是struct eventpoll中.
     * 在这里使用rb_tree的原因我认为是提高查找,插入以及删除的速度.
     * rb_tree对以上3个操作都具有O(lgN)的时间复杂度 */
    struct rb_node rbn;
    /* List header used to link this structure to the eventpoll ready list */
    /* 链表节点, 所有已经ready的epitem都会被链到eventpoll的rdllist中 */
    struct list_head rdllink;
    /*
     * Works together "struct eventpoll"->ovflist in keeping the
     * single linked chain of items.
     */
    /* 这个在代码中再解释... */
    struct epitem *next;
    /* The file descriptor information this item refers to */
    /* epitem对应的fd和struct file */
    struct epoll_filefd ffd;
    /* Number of active wait queue attached to poll operations */
    int nwait;
    /* List containing poll wait queues */
    struct list_head pwqlist;
    /* The "container" of this item */
    /* 当前epitem属于哪个eventpoll */
    struct eventpoll *ep;
    /* List header used to link this item to the "struct file" items list */
    struct list_head fllink;
    /* The structure that describe the interested events and the source fd */
    /* 当前的epitem关系哪些events, 这个数据是调用epoll_ctl时从用户态传递过来 */
    struct epoll_event event;
};
struct epoll_filefd {
    struct file *file;
    int fd;
};
/* poll所用到的钩子Wait structure used by the poll hooks */
struct eppoll_entry {
    /* List header used to link this structure to the "struct epitem" */
    struct list_head llink;
    /* The "base" pointer is set to the container "struct epitem" */
    struct epitem *base;
    /*
     * Wait queue item that will be linked to the target file wait
     * queue head.
     */
    wait_queue_t wait;
    /* The wait queue head that linked the "wait" wait queue item */
    wait_queue_head_t *whead;
};
/* Wrapper struct used by poll queueing */
struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};
/* Used by the ep_send_events() function as callback private data */
struct ep_send_events_data {
    int maxevents;
    struct epoll_event __user *events;
};

/* --- 代码注释 --- */
/* 你没看错, 这就是epoll_create()的真身, 基本啥也不干直接调用epoll_create1了,
 * 另外你也可以发现, size这个参数其实是没有任何用处的... */
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
/* 这才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error;
    struct eventpoll *ep = NULL;//主描述符
    /* Check the EPOLL_* constant for consistency.  */
    /* 这句没啥用处... */
    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
    /* 对于epoll来讲, 目前唯一有效的FLAG就是CLOEXEC */
    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    /*
     * Create the internal data structure ("struct eventpoll").
     */
    /* 分配一个struct eventpoll, 分配和初始化细节我们随后深聊~ */
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    /* 这里是创建一个匿名fd, 说起来就话长了...长话短说:
     * epollfd本身并不存在一个真正的文件与之对应, 所以内核需要创建一个
     * "虚拟"的文件, 并为之分配真正的struct file结构, 而且有真正的fd.
     * 这里2个参数比较关键:
     * eventpoll_fops, fops就是file operations, 就是当你对这个文件(这里是虚拟的)进行操作(比如读)时,
     * fops里面的函数指针指向真正的操作实现, 类似C++里面虚函数和子类的概念.
     * epoll只实现了poll和release(就是close)操作, 其它文件系统操作都有VFS全权处理了.
     * ep, ep就是struct epollevent, 它会作为一个私有数据保存在struct file的private指针里面.
     * 其实说白了, 就是为了能通过fd找到struct file, 通过struct file能找到eventpoll结构.
     * 如果懂一点Linux下字符设备驱动开发, 这里应该是很好理解的,
     * 推荐阅读 <Linux device driver 3rd>
     */
    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
    if (error < 0)
        ep_free(ep);
    return error;
}
/*
* 创建好epollfd后, 接下来我们要往里面添加fd咯
* 来看epoll_ctl
* epfd 就是epollfd
* op ADD,MOD,DEL
* fd 需要监听的描述符
* event 我们关心的events
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    error = -EFAULT;
    /*
     * 错误处理以及从用户空间将epoll_event结构copy到内核空间.
     */
    if (ep_op_has_event(op) &&
    copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto error_return;
    /* Get the "struct file *" for the eventpoll file */
    /* 取得struct file结构, epfd既然是真正的fd, 那么内核空间
     * 就会有与之对于的一个struct file结构
     * 这个结构在epoll_create1()中, 由函数anon_inode_getfd()分配 */
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto error_return;
    /* Get the "struct file *" for the target file */
    /* 我们需要监听的fd, 它当然也有个struct file结构, 上下2个不要搞混了哦 */
    tfile = fget(fd);
    if (!tfile)
        goto error_fput;
    /* The target file descriptor must support poll */
    error = -EPERM;
    /* 如果监听的文件不支持poll, 那就没辙了.
     * 你知道什么情况下, 文件会不支持poll吗?
     */
    if (!tfile->f_op || !tfile->f_op->poll)
        goto error_tgt_fput;
    /*
     * We have to check that the file structure underneath the file descriptor
     * the user passed to us _is_ an eventpoll file. And also we do not permit
     * adding an epoll file descriptor inside itself.
     */
    error = -EINVAL;
    /* epoll不能自己监听自己... */
    if (file == tfile || !is_file_epoll(file))
        goto error_tgt_fput;
    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    /* 取到我们的eventpoll结构, 来自与epoll_create1()中的分配 */
    ep = file->private_data;
    /* 接下来的操作有可能修改数据结构内容, 锁之~ */
    mutex_lock(&ep->mtx);
    /*
     * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
     * above, we can be sure to be able to use the item looked up by
     * ep_find() till we release the mutex.
     */
    /* 对于每一个监听的fd, 内核都有分配一个epitem结构,
     * 而且我们也知道, epoll是不允许重复添加fd的,
     * 所以我们首先查找该fd是不是已经存在了.
     * ep_find()其实就是RBTREE查找, 跟C++STL的map差不多一回事, O(lgn)的时间复杂度.
     */
    epi = ep_find(ep, tfile, fd);
    error = -EINVAL;
    switch (op) {
        /* 首先我们关心添加 */
    case EPOLL_CTL_ADD:
        if (!epi) {
            /* 之前的find没有找到有效的epitem, 证明是第一次插入, 接受!
             * 这里我们可以知道, POLLERR和POLLHUP事件内核总是会关心的
             * */
            epds.events |= POLLERR | POLLHUP;
            /* rbtree插入, 详情见ep_insert()的分析
             * 其实我觉得这里有insert的话, 之前的find应该
             * 是可以省掉的... */
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            /* 找到了!? 重复添加! */
            error = -EEXIST;
        break;
        /* 删除和修改操作都比较简单 */
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    mutex_unlock(&ep->mtx);
error_tgt_fput:
    fput(tfile);
error_fput:
    fput(file);
error_return:
    return error;
}
/* 分配一个eventpoll结构 */
static int ep_alloc(struct eventpoll **pep)
{
    int error;
    struct user_struct *user;
    struct eventpoll *ep;
    /* 获取当前用户的一些信息, 比如是不是root啦, 最大监听fd数目啦 */
    user = get_current_user();
    error = -ENOMEM;
    ep = kzalloc(sizeof(*ep), GFP_KERNEL);
    if (unlikely(!ep))
        goto free_uid;
    /* 这些都是初始化啦 */
    spin_lock_init(&ep->lock);
    mutex_init(&ep->mtx);
    init_waitqueue_head(&ep->wq);//初始化自己睡在的等待队列
    init_waitqueue_head(&ep->poll_wait);//初始化
    INIT_LIST_HEAD(&ep->rdllist);//初始化就绪链表
    ep->rbr = RB_ROOT;
    ep->ovflist = EP_UNACTIVE_PTR;
    ep->user = user;
    *pep = ep;
    return 0;
free_uid:
    free_uid(user);
    return error;
}
/*
 * Must be called with "mtx" held.
 */
/*
 * ep_insert()在epoll_ctl()中被调用, 完成往epollfd里面添加一个监听fd的工作
 * tfile是fd在内核态的struct file结构
 */
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
         struct file *tfile, int fd)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    struct epitem *epi;
    struct ep_pqueue epq;
    /* 查看是否达到当前用户的最大监听数 */
    if (unlikely(atomic_read(&ep->user->epoll_watches) >=
         max_user_watches))
        return -ENOSPC;
    /* 从著名的slab中分配一个epitem */
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;
    /* Item initialization follow here ... */
    /* 这些都是相关成员的初始化... */
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    /* 这里保存了我们需要监听的文件fd和它的file结构 */
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    /* 这个指针的初值不是NULL哦... */
    epi->next = EP_UNACTIVE_PTR;
    /* Initialize the poll table using the queue callback */
    /* 好, 我们终于要进入到poll的正题了 */
    epq.epi = epi;
    /* 初始化一个poll_table
     * 其实就是指定调用poll_wait(注意不是epoll_wait!!!)时的回调函数,和我们关心哪些events,
     * ep_ptable_queue_proc()就是我们的回调啦, 初值是所有event都关心 */
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    /*
     * Attach the item to the poll hooks and get current event bits.
     * We can safely use the file* here because its usage count has
     * been increased by the caller of this function. Note that after
     * this operation completes, the poll callback can start hitting
     * the new item.
     */
    /* 这一部很关键, 也比较难懂, 完全是内核的poll机制导致的...
     * 首先, f_op->poll()一般来说只是个wrapper, 它会调用真正的poll实现,
     * 拿UDP的socket来举例, 这里就是这样的调用流程: f_op->poll(), sock_poll(),
     * udp_poll(), datagram_poll(), sock_poll_wait(), 最后调用到我们上面指定的
     * ep_ptable_queue_proc()这个回调函数...(好深的调用路径...).
     * 完成这一步, 我们的epitem就跟这个socket关联起来了, 当它有状态变化时,
     * 会通过ep_poll_callback()来通知.
     * 最后, 这个函数还会查询当前的fd是不是已经有啥event已经ready了, 有的话
     * 会将event返回. */
    revents = tfile->f_op->poll(tfile, &epq.pt);
    /*
     * We have to check if something went wrong during the poll wait queue
     * install process. Namely an allocation for a wait queue failed due
     * high memory pressure.
     */
    error = -ENOMEM;
    if (epi->nwait < 0)
        goto error_unregister;
    /* Add the current item to the list of active epoll hook for this file */
    /* 这个就是每个文件会将所有监听自己的epitem链起来 */
    spin_lock(&tfile->f_lock);
    list_add_tail(&epi->fllink, &tfile->f_ep_links);
    spin_unlock(&tfile->f_lock);
    /*
     * Add the current item to the RB tree. All RB tree operations are
     * protected by "mtx", and ep_insert() is called with "mtx" held.
     */
    /* 都搞定后, 将epitem插入到对应的eventpoll中去 */
    ep_rbtree_insert(ep, epi);
    /* We have to drop the new item inside our item list to keep track of it */
    spin_lock_irqsave(&ep->lock, flags);
    /* If the file is already "ready" we drop it inside the ready list */
    /* 到达这里后, 如果我们监听的fd已经有事件发生, 那就要处理一下 */
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
        /* 将当前的epitem加入到ready list中去 */
        list_add_tail(&epi->rdllink, &ep->rdllist);
        /* Notify waiting tasks that events are available */
        /* 谁在epoll_wait, 就唤醒它... */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        /* 谁在epoll当前的epollfd, 也唤醒它... */
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);
    atomic_inc(&ep->user->epoll_watches);
    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 0;
error_unregister:
    ep_unregister_pollwait(ep, epi);
    /*
     * We need to do this because an event could have been arrived on some
     * allocated wait queue. Note that we don't care about the ep->ovflist
     * list, since that is used/cleaned only inside a section bound by "mtx".
     * And ep_insert() is called with "mtx" held.
     */
    spin_lock_irqsave(&ep->lock, flags);
    if (ep_is_linked(&epi->rdllink))
        list_del_init(&epi->rdllink);
    spin_unlock_irqrestore(&ep->lock, flags);
    kmem_cache_free(epi_cache, epi);
    return error;
}
/*
 * This is the callback that is used to add our wait queue to the
 * target file wakeup lists.
 */
/*
 * 该函数在调用f_op->poll()时会被调用.
 * 也就是epoll主动poll某个fd时, 用来将epitem与指定的fd关联起来的.
 * 关联的办法就是使用等待队列(waitqueue)
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;
    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        /* 初始化等待队列, 指定ep_poll_callback为唤醒时的回调函数,
         * 当我们监听的fd发生状态改变时, 也就是队列头被唤醒时,
         * 指定的回调函数将会被调用. */
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        /* 将刚分配的等待队列成员加入到头中, 头是由fd持有的 */
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        /* nwait记录了当前epitem加入到了多少个等待队列中,
         * 我认为这个值最大也只会是1... */
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}
/*
 * This is the callback that is passed to the wait queue wakeup
 * machanism. It is called by the stored file descriptors when they
 * have events to report.
 */
/*
 * 这个是关键性的回调函数, 当我们监听的fd发生状态改变时, 它会被调用.
 * 参数key被当作一个unsigned long整数使用, 携带的是events.
 */
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = ep_item_from_wait(wait);//从等待队列获取epitem.需要知道哪个进程挂载到这个设备
    struct eventpoll *ep = epi->ep;//获取
    spin_lock_irqsave(&ep->lock, flags);
    /*
     * If the event mask does not contain any poll(2) event, we consider the
     * descriptor to be disabled. This condition is likely the effect of the
     * EPOLLONESHOT bit that disables the descriptor when an event is received,
     * until the next EPOLL_CTL_MOD will be issued.
     */
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;
    /*
     * Check the events coming with the callback. At this stage, not
     * every device reports the events in the "key" parameter of the
     * callback. We need to be able to handle both cases here, hence the
     * test for "key" != NULL before the event match test.
     */
    /* 没有我们关心的event... */
    if (key && !((unsigned long) key & epi->event.events))
        goto out_unlock;
    /*
     * If we are trasfering events to userspace, we can hold no locks
     * (because we're accessing user memory, and because of linux f_op->poll()
     * semantics). All the events that happens during that period of time are
     * chained in ep->ovflist and requeued later on.
     */
    /*
     * 这里看起来可能有点费解, 其实干的事情比较简单:
     * 如果该callback被调用的同时, epoll_wait()已经返回了,
     * 也就是说, 此刻应用程序有可能已经在循环获取events,
     * 这种情况下, 内核将此刻发生event的epitem用一个单独的链表
     * 链起来, 不发给应用程序, 也不丢弃, 而是在下一次epoll_wait
     * 时返回给用户.
     */
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
        }
        goto out_unlock;
    }
    /* If this file is already in the ready list we exit soon */
    /* 将当前的epitem放入ready list */
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);
    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
    /* 唤醒epoll_wait... */
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    /* 如果epollfd也在被poll, 那就唤醒队列里面的所有成员. */
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);
    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 1;
}
/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep;
    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;
    /* Verify that the area passed by the user is writeable */
    /* 这个地方有必要说明一下:
     * 内核对应用程序采取的策略是"绝对不信任",
     * 所以内核跟应用程序之间的数据交互大都是copy, 不允许(也时候也是不能...)指针引用.
     * epoll_wait()需要内核返回数据给用户空间, 内存由用户程序提供,
     * 所以内核会用一些手段来验证这一段内存空间是不是有效的.
     */
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
        error = -EFAULT;
        goto error_return;
    }
    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    /* 获取epollfd的struct file, epollfd也是文件嘛 */
    file = fget(epfd);
    if (!file)
        goto error_return;
    /*
     * We have to check that the file structure underneath the fd
     * the user passed to us _is_ an eventpoll file.
     */
    error = -EINVAL;
    /* 检查一下它是不是一个真正的epollfd... */
    if (!is_file_epoll(file))
        goto error_fput;
    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    /* 获取eventpoll结构 */
    ep = file->private_data;
    /* Time to fish for events ... */
    /* OK, 睡觉, 等待事件到来~~ */
    error = ep_poll(ep, events, maxevents, timeout);
error_fput:
    fput(file);
error_return:
    return error;
}
/* 这个函数真正将执行epoll_wait的进程带入睡眠状态... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;//等待队列
    /*
     * Calculate the timeout by checking for the "infinite" value (-1)
     * and the overflow condition. The passed timeout is in milliseconds,
     * that why (t * HZ) / 1000.
     */
    /* 计算睡觉时间, 毫秒要转换为HZ */
    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
    spin_lock_irqsave(&ep->lock, flags);
    res = 0;
    /* 如果ready list不为空, 就不睡了, 直接干活... */
    if (list_empty(&ep->rdllist)) {
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        /* OK, 初始化一个等待队列, 准备直接把自己挂起,
         * 注意current是一个宏, 代表当前进程 */
        init_waitqueue_entry(&wait, current);//初始化等待队列,wait表示当前进程
        __add_wait_queue_exclusive(&ep->wq, &wait);//挂载到ep结构的等待队列
        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            /* 将当前进程设置位睡眠, 但是可以被信号唤醒的状态,
             * 注意这个设置是"将来时", 我们此刻还没睡! */
            set_current_state(TASK_INTERRUPTIBLE);
            /* 如果这个时候, ready list里面有成员了,
             * 或者睡眠时间已经过了, 就直接不睡了... */
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            /* 如果有信号产生, 也起床... */
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
            /* 啥事都没有,解锁, 睡觉... */
            spin_unlock_irqrestore(&ep->lock, flags);
            /* jtimeout这个时间后, 会被唤醒,
             * ep_poll_callback()如果此时被调用,
             * 那么我们就会直接被唤醒, 不用等时间了...
             * 再次强调一下ep_poll_callback()的调用时机是由被监听的fd
             * 的具体实现, 比如socket或者某个设备驱动来决定的,
             * 因为等待队列头是他们持有的, epoll和当前进程
             * 只是单纯的等待...
             **/
            jtimeout = schedule_timeout(jtimeout);//睡觉
            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);
        /* OK 我们醒来了... */
        set_current_state(TASK_RUNNING);
    }
    /* Is it worth to try to dig for events ? */
    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    spin_unlock_irqrestore(&ep->lock, flags);
    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    /* 如果一切正常, 有event发生, 就开始准备数据copy给用户空间了... */
    if (!res && eavail &&
    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
        goto retry;
    return res;
}
/* 这个简单, 我们直奔下一个... */
static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;
    esed.maxevents = maxevents;
    esed.events = events;
    return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *  the scan code, to call f_op->poll(). Also allows for
 *  O(NumReady) performance.
 *
 * @ep: Pointer to the epoll private data structure.
 * @sproc: Pointer to the scan callback.
 * @priv: Private opaque data passed to the @sproc callback.
 *
 * Returns: The same integer error code returned by the @sproc callback.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
              int (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
              void *priv)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);
    /*
     * We need to lock this because we could be hit by
     * eventpoll_release_file() and epoll_ctl().
     */
    mutex_lock(&ep->mtx);
    /*
     * Steal the ready list, and re-init the original one to the
     * empty list. Also, set ep->ovflist to NULL so that events
     * happening while looping w/out locks, are not lost. We cannot
     * have the poll callback to queue directly on ep->rdllist,
     * because we want the "sproc" callback to be able to do it
     * in a lockless way.
     */
    spin_lock_irqsave(&ep->lock, flags);
    /* 这一步要注意, 首先, 所有监听到events的epitem都链到rdllist上了,
     * 但是这一步之后, 所有的epitem都转移到了txlist上, 而rdllist被清空了,
     * 要注意哦, rdllist已经被清空了! */
    list_splice_init(&ep->rdllist, &txlist);
    /* ovflist, 在ep_poll_callback()里面我解释过, 此时此刻我们不希望
     * 有新的event加入到ready list中了, 保存后下次再处理... */
    ep->ovflist = NULL;
    spin_unlock_irqrestore(&ep->lock, flags);
    /*
     * Now call the callback function.
     */
    /* 在这个回调函数里面处理每个epitem
     * sproc 就是 ep_send_events_proc, 下面会注释到. */
    error = (*sproc)(ep, &txlist, priv);
    spin_lock_irqsave(&ep->lock, flags);
    /*
     * During the time we spent inside the "sproc" callback, some
     * other events might have been queued by the poll callback.
     * We re-insert them inside the main ready-list here.
     */
    /* 现在我们来处理ovflist, 这些epitem都是我们在传递数据给用户空间时
     * 监听到了事件. */
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
        /*
         * We need to check if the item is already in the list.
         * During the "sproc" callback execution time, items are
         * queued into ->ovflist but the "txlist" might already
         * contain them, and the list_splice() below takes care of them.
         */
        /* 将这些直接放入readylist */
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
    }
    /*
     * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
     * releasing the lock, events will be queued in the normal way inside
     * ep->rdllist.
     */
    ep->ovflist = EP_UNACTIVE_PTR;
    /*
     * Quickly re-inject items left on "txlist".
     */
    /* 上一次没有处理完的epitem, 重新插入到ready list */
    list_splice(&txlist, &ep->rdllist);
    /* ready list不为空, 直接唤醒... */
    if (!list_empty(&ep->rdllist)) {
        /*
         * Wake up (if active) both the eventpoll wait list and
         * the ->poll() wait list (delayed after we release the lock).
         */
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);
    mutex_unlock(&ep->mtx);
    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return error;
}
/* 该函数作为callbakc在ep_scan_ready_list()中被调用
 * head是一个链表, 包含了已经ready的epitem,
 * 这个不是eventpoll里面的ready list, 而是上面函数中的txlist.
 */
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
               void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;
    /*
     * We can loop without lock because we are passed a task private list.
     * Items cannot vanish during the loop because ep_scan_ready_list() is
     * holding "mtx" during this call.
     */
    /* 扫描整个链表... */
    for (eventcnt = 0, uevent = esed->events;
     !list_empty(head) && eventcnt < esed->maxevents;) {
        /* 取出第一个成员 */
        epi = list_first_entry(head, struct epitem, rdllink);
        /* 然后从链表里面移除 */
        list_del_init(&epi->rdllink);
        /* 读取events,
         * 注意events我们ep_poll_callback()里面已经取过一次了, 为啥还要再取?
         * 1. 我们当然希望能拿到此刻的最新数据, events是会变的~
         * 2. 不是所有的poll实现, 都通过等待队列传递了events, 有可能某些驱动压根没传
         * 必须主动去读取. */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
            epi->event.events;
        if (revents) {
            /* 将当前的事件和用户传入的数据都copy给用户空间,
             * 就是epoll_wait()后应用程序能读到的那一堆数据. */
            if (__put_user(revents, &uevent->events) ||
            __put_user(epi->event.data, &uevent->data)) {
                list_add(&epi->rdllink, head);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {
                /* 嘿嘿, EPOLLET和非ET的区别就在这一步之差呀~
                 * 如果是ET, epitem是不会再进入到readly list,
                 * 除非fd再次发生了状态改变, ep_poll_callback被调用.
                 * 如果是非ET, 不管你还有没有有效的事件或者数据,
                 * 都会被重新插入到ready list, 再下一次epoll_wait
                 * 时, 会立即返回, 并通知给用户空间. 当然如果这个
                 * 被监听的fds确实没事件也没数据了, epoll_wait会返回一个0,
                 * 空转一次.
                 */
                list_add_tail(&epi->rdllink, &ep->rdllist);
            }
        }
    }
    return eventcnt;
}
/* ep_free在epollfd被close时调用,
 * 释放一些资源而已, 比较简单 */
static void ep_free(struct eventpoll *ep)
{
    struct rb_node *rbp;
    struct epitem *epi;
    /* We need to release all tasks waiting for these file */
    if (waitqueue_active(&ep->poll_wait))
        ep_poll_safewake(&ep->poll_wait);
    /*
     * We need to lock this because we could be hit by
     * eventpoll_release_file() while we're freeing the "struct eventpoll".
     * We do not need to hold "ep->mtx" here because the epoll file
     * is on the way to be removed and no one has references to it
     * anymore. The only hit might come from eventpoll_release_file() but
     * holding "epmutex" is sufficent here.
     */
    mutex_lock(&epmutex);
    /*
     * Walks through the whole tree by unregistering poll callbacks.
     */
    for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
        epi = rb_entry(rbp, struct epitem, rbn);
        ep_unregister_pollwait(ep, epi);
    }
    /*
     * Walks through the whole tree by freeing each "struct epitem". At this
     * point we are sure no poll callbacks will be lingering around, and also by
     * holding "epmutex" we can be sure that no file cleanup code will hit
     * us during this operation. So we can avoid the lock on "ep->lock".
     */
    /* 之所以在关闭epollfd之前不需要调用epoll_ctl移除已经添加的fd,
     * 是因为这里已经做了... */
    while ((rbp = rb_first(&ep->rbr)) != NULL) {
        epi = rb_entry(rbp, struct epitem, rbn);
        ep_remove(ep, epi);
    }
    mutex_unlock(&epmutex);
    mutex_destroy(&ep->mtx);
    free_uid(ep->user);
    kfree(ep);
}
/* File callbacks that implement the eventpoll file behaviour */
static const struct file_operations eventpoll_fops = {
    .release    = ep_eventpoll_release,
    .poll        = ep_eventpoll_poll
};
/* Fast test to see if the file is an evenpoll file */
static inline int is_file_epoll(struct file *f)
{
    return f->f_op == &eventpoll_fops;
}
/* OK, eventpoll我认为比较重要的函数都注释完了... */

区别

epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。

对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。

对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果)

对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

参考:

https://www.zhihu.com/question/30772664 Go netpoller 原生网络模型之源码全面解析 为何 epoll 的 ET 模式一定要设置为非阻塞IO