epoll 有水平触发 Level-triggered(LT) 和边沿触发 edge-triggered(ET) 两种模式。
假设有如下过程:
- 注册 pipe 文件描述符的读端(rfd)到 epoll 上
- 在 pipe 的写端写入 2KB 的数据
- 因为 rfd 文件描述符已经准备好读,epoll_wait 返回
- 从 rfd 中读取 1KB 数据
- 程序再次运行到 epoll_wait
如果为边沿触发(ET)模式,程序将会阻塞在 epoll_wait 上,即使有剩余数据也不会返回,意味着读不到剩余的 1KB 数据。但如果采用水平触发(LT),epoll_wait 函数将会再次返回。
如果采用边沿触发(ET)模式,建议:
- 使用非阻塞文件描述符
- 调用 read 或 write 直到 errno==EAGAIN
当 close 文件描述符时,该描述符会被自动的从 epoll 监听集中移除。
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64;} epoll_data_t;struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */};
epoll_wait 使用该结构体数组返回就绪的文件描述符集合,这样就不用遍历所有的文件描述符。
源码
#include#include #include #include #include #include #include #include #include #include #define MAXEVENTS 64/* 设置 socket 为非阻塞,必须先获取再设置 */static int make_socket_non_blocking(int sfd){ int flags, s; flags = fcntl(sfd, F_GETFL, 0); if (flags == -1) { perror ("fcntl"); return -1; } flags |= O_NONBLOCK; s = fcntl(sfd, F_SETFL, flags); if (s == -1) { perror("fcntl"); return -1; } return 0;}/* 创建并绑定 socket */static int create_and_bind(char *port){ struct addrinfo hints; struct addrinfo *result, *rp; int s, sfd; memset(&hints, 0, sizeof (struct addrinfo)); hints.ai_family = AF_UNSPEC; /* Return IPv4 and IPv6 choices */ hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */ hints.ai_flags = AI_PASSIVE; /* All interfaces */ s = getaddrinfo(NULL, port, &hints, &result); if (s != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror (s)); return -1; } for (rp = result; rp != NULL; rp = rp->ai_next) { /* 创建 socket */ sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sfd == -1) { continue; } /* 绑定 socket 到地址 */ s = bind(sfd, rp->ai_addr, rp->ai_addrlen); if (s == 0) { /* We managed to bind successfully! */ break; } close(sfd); } if (rp == NULL) { fprintf (stderr, "Could not bind\n"); return -1; } freeaddrinfo(result); return sfd;}int main(int argc, char *argv[]){ int sfd, efd, s; struct epoll_event event; struct epoll_event *events; if (argc != 2) { fprintf(stderr, "Usage: %s [port]\n", argv[0]); exit(EXIT_FAILURE); } /* 创建并绑定 socket */ sfd = create_and_bind(argv[1]); if (sfd == -1) { abort(); } /* 设置 socket 为非阻塞 */ s = make_socket_non_blocking(sfd); if (s == -1) { abort(); } /* 监听 socket */ s = listen(sfd, SOMAXCONN); if (s == -1) { perror("listen"); abort(); } efd = epoll_create1(0); if (efd == -1) { perror("epoll_create"); abort(); } event.data.fd = sfd; event.events = EPOLLIN | EPOLLET; // 读事件 | 边沿触发 // 添加 sfd(监听 socket) 到 epoll s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event); if (s == -1) { perror("epoll_ctl"); abort(); } /* Buffer where events are returned */ events = calloc(MAXEVENTS, sizeof(event)); /* The event loop */ while (1) { int n, i; if ((n = epoll_wait(efd, events, MAXEVENTS, -1)) == -1) { perror("epoll_wait"); abort(); } for (i = 0; i < n; i++) { if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { /* socket 出错 | socket 挂起 | socket 读未准备好 * 为什么 socket 读为准备好也有问题? */ fprintf(stderr, "epoll error\n"); close(events[i].data.fd); continue; } else if (sfd == events[i].data.fd) { /* 监听 socket 事件通知,意味着一个或者多个连接到来 */ while (1) { struct sockaddr in_addr; socklen_t in_len; int infd; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; in_len = sizeof(in_addr); infd = accept(sfd, &in_addr, &in_len); if (infd == -1) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /* 所有连接请求都已处理完成 */ break; } else { perror ("accept"); break; } } s = getnameinfo(&in_addr, in_len, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV); if (s == 0) { printf("Accepted connection on descriptor %d " "(host=%s, port=%s)\n", infd, hbuf, sbuf); } /* 将 socket 转为非阻塞 */ s = make_socket_non_blocking(infd); if (s == -1) { abort(); } /* 将新的连接放入 epoll 中 */ event.data.fd = infd; event.events = EPOLLIN | EPOLLET; // 读事件 | 边沿触发 s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event); if (s == -1) { perror ("epoll_ctl"); abort(); } } continue; } else { int done = 0; /* socket 已经准备好读。在 EPOLLET 边沿触发模式下,必须将缓冲区中的数据都读取完, * 否则剩下的数据等不到通知 */ while (1) { ssize_t count; char buf[1024]; count = read(events[i].data.fd, buf, sizeof(buf)); if (count == -1) { /* errno == EAGAIN 意味着所有数据都已读取完成,此时可以跳出循环 */ if (errno != EAGAIN) { perror ("read"); done = 1; } break; } else if (count == 0) { /* count == 0,意味着对方关闭连接 */ done = 1; break; } /* 将缓冲区中的数据写入 socket */ s = write(events[i].data.fd, buf, count); if (s == -1) { perror("write"); abort(); } } if (done) { printf("Closed connection on descriptor %d\n", events[i].data.fd); /* 将 socket 关闭时,同时也会将该 socket 从 epoll 监听中移除 */ close(events[i].data.fd); } } } } free(events); close(sfd); return EXIT_SUCCESS;}
参考资料
1.
2.man epoll