实现一个简单的多路IO模型

用centos实现一个简单的IO(epoll)模型,给出详细步骤,

网上现成的:epoll 入门简明教程(一):认识 epoll
https://www.jianshu.com/p/67c988f750df

img

1、创建一个 socket,并将其设置为非阻塞模式。可以使用 socket() 函数创建一个 socket,并使用 fcntl() 函数将其设置为非阻塞模式。

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
    perror("socket() failed");
    exit(EXIT_FAILURE);
}

int flags = fcntl(sockfd, F_GETFL, 0);
if (flags < 0) {
    perror("fcntl(F_GETFL) failed");
    exit(EXIT_FAILURE);
}

if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
    perror("fcntl(F_SETFL) failed");
    exit(EXIT_FAILURE);
}

2、绑定 socket 到一个本地地址和端口。可以使用 bind() 函数绑定 socket 到本地地址和端口。

struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(8080);

if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
    perror("bind() failed");
    exit(EXIT_FAILURE);
}

3、将 socket 设置为监听模式。可以使用 listen() 函数将 socket 设置为监听模式。

if (listen(sockfd, SOMAXCONN) < 0) {
    perror("listen() failed");
    exit(EXIT_FAILURE);
}

4、创建一个 epoll 对象。可以使用 epoll_create() 函数创建一个 epoll 对象。

int epfd = epoll_create(1);
if (epfd < 0) {
    perror("epoll_create() failed");
    exit(EXIT_FAILURE);
}

5、将 socket 加入到 epoll 对象中。可以使用 epoll_ctl() 函数将 socket 加入到 epoll 对象中。

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;

if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) < 0) {
    perror("epoll_ctl(EPOLL_CTL_ADD) failed");
    exit(EXIT_FAILURE);
}

6、创建一个事件结构体数组,并使用 epoll_wait() 函数等待事件的发生。当事件发生时,epoll_wait() 函数会返回,并将事件信息存储在事件结构体数组中。

#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];

while (1) {
    int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
    if (nfds < 0) {
        perror("epoll_wait() failed");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < nfds; i++) {
        if (events[i].data.fd == sockfd) {
            // 处理新的连接
        } else {
            // 处理已连接的 socket
        }
    }
}

7、处理事件。当事件发生时,可以检查事件结构体数组中的信息,并根据需要执行相应的操作。

if (events[i].events & EPOLLIN) {
    // 有数据可读
    char buf[1024];
    int n = read(clientfd, buf, sizeof(buf));
    if (n < 0) {
        perror("read() failed");
        close(clientfd);
        continue;
    } else if (n == 0) {
        // 客户端已关闭连接
        close(clientfd);
        continue;
    }
    // 将数据写回给客户端
    write(clientfd, buf, n);
}

8、关闭 socket 和 epoll 对象。在程序结束时,应该使用 close() 函数关闭 socket 和 epoll 对象。

close(sockfd);
close(epfd);

1.创建 epoll 句柄:首入需要使用 epoll_create 函数创建一个 epoll 句柄。这个句柄是 epoll 管理的主要数据结构,用于维护文件描述符的事件。
2.注册文件描述符:使用 epoll_ctl 函数可以将文件描述符注册到 epoll 句柄中。这样,当文件描述符上有事件发生时,epoll 就会捕获到这个事件。
3.等待事件:使用 epoll_wait 函数可以阻塞等待事件的发生。这个函数会挂起进程的执行,直到有事件发生或者超时。
4.处理事件:当 epoll_wait 函数返回时,表示有事件发生。可以使用一个循环来处理所有发生的事件。每个事件都会包含一个文件描述符和事件类型信息,可以根据这些信息来执行相应的操作。
5.关闭 epoll 句柄:最后,使用 close 函数关闭 epoll 句柄即可。

使用 epoll 实现一个简单的 IO 模型的步骤如下:

  1. 创建一个 epoll 句柄:使用 epoll_create 或 epoll_create1 函数创建一个 epoll 句柄。

  2. 创建套接字并绑定地址:使用 socket 函数创建一个套接字,并使用 bind 函数将其绑定到一个地址上。

  3. 设置套接字为监听状态:使用 listen 函数将套接字设置为监听状态。

  4. 将套接字添加到 epoll 句柄中:使用 epoll_ctl 函数将套接字添加到 epoll 句柄中。

  5. 调用 epoll_wait 函数等待事件:使用 epoll_wait 函数等待事件,如果有事件发生,则返回。

  6. 处理事件:当事件发生时,检查事件的类型,并执行相应的操作。

  7. 关闭套接字:使用 close 函数关闭套接字。

下面是一个示例代码,该代码实现了一个简单的 epoll 服务器,该服务器只能同时处理一个客户端连接:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define MAX_EVENTS 10
#define BUF_SIZE 1024

int main(int argc, char *argv[])
{
    int sockfd, new_sockfd;
    socklen_t addrlen;
    struct sockaddr_in server_addr, client_addr;
    char buffer[BUF_SIZE];
    int nbytes;

    // 创建 epoll 句柄
    int epfd = epoll_create1(0);
    if (epfd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    // 创建套接字并绑定地址
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8888);

    if (bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind");
        exit(EXIT_FAILURE);
    }

    // 设置套接字为监听状态
    if (listen(sockfd, 5) == -1) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    // 将套接字添加到 epoll 句柄中
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
        perror("epoll_ctl");
        exit(EXIT_FAILURE);
    }

    // 定义用于存储事件的数组
    struct epoll_event events[MAX_EVENTS];

    while (1) {
        // 调用 epoll_wait 函数等待事件
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }

                // 遍历所有发生的事件
        for (int i = 0; i < nfds; i++) {
            // 如果有新的客户端连接,则接受连接
            if (events[i].data.fd == sockfd) {
                new_sockfd = accept(sockfd, (struct sockaddr *)&client_addr, &addrlen);
                if (new_sockfd == -1) {
                    perror("accept");
                    exit(EXIT_FAILURE);
                }

                printf("Accepted connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

                // 将新的客户端套接字添加到 epoll 句柄中
                ev.events = EPOLLIN;
                ev.data.fd = new_sockfd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, new_sockfd, &ev) == -1) {
                    perror("epoll_ctl");
                    exit(EXIT_FAILURE);
                }
            }
            // 如果客户端套接字有数据可读,则读取数据
            else {
                nbytes = read(events[i].data.fd, buffer, BUF_SIZE);
                if (nbytes == 0) {
                    printf("Closed connection from %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
                    close(events[i].data.fd);
                }
                else {
                    printf("Received message: %s\n", buffer);
                }
            }
        }
    }

    close(sockfd);

    return 0;
}

该代码实现了一个简单的 epoll 服务器,该服务器只能同时处理一个客户端连接。这个示例代码只是一个简单的示例,在实际应用中,还需要考虑更多的细节,如错误处理、超时处理等。

这个解析笔记可以帮助你理解:Linux/Centos Epoll 原理解析,链接:https://zhuanlan.zhihu.com/p/354939854
下面是示例代码:

#include  <unistd.h>
#include  <sys/types.h>       /* basic system data types */
#include  <sys/socket.h>      /* basic socket definitions */
#include  <netinet/in.h>      /* sockaddr_in{} and other Internet defns */
#include  <arpa/inet.h>       /* inet(3) functions */
#include <sys/epoll.h> /* epoll function */
#include <fcntl.h>     /* nonblocking */
#include <sys/resource.h> /*setrlimit */

#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>



#define MAXEPOLLSIZE 10000
#define MAXLINE 10240
int handle(int connfd);
int setnonblocking(int sockfd)
{
    if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1) {
        return -1;
    }
    return 0;
}

int main(int argc, char **argv)
{
    int  servPort = 6888;
    int listenq = 1024;

    int listenfd, connfd, kdpfd, nfds, n, nread, curfds,acceptCount = 0;
    struct sockaddr_in servaddr, cliaddr;
    socklen_t socklen = sizeof(struct sockaddr_in);
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    struct rlimit rt;
    char buf[MAXLINE];

    /* 设置每个进程允许打开的最大文件数 */
    rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;
    if (setrlimit(RLIMIT_NOFILE, &rt) == -1) 
    {
        perror("setrlimit error");
        return -1;
    }


    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET; 
    servaddr.sin_addr.s_addr = htonl (INADDR_ANY);
    servaddr.sin_port = htons (servPort);

    listenfd = socket(AF_INET, SOCK_STREAM, 0); 
    if (listenfd == -1) {
        perror("can't create socket file");
        return -1;
    }

    int opt = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    if (setnonblocking(listenfd) < 0) {
        perror("setnonblock error");
    }

    if (bind(listenfd, (struct sockaddr *) &servaddr, sizeof(struct sockaddr)) == -1) 
    {
        perror("bind error");
        return -1;
    } 
    if (listen(listenfd, listenq) == -1) 
    {
        perror("listen error");
        return -1;
    }
    /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */
    kdpfd = epoll_create(MAXEPOLLSIZE);
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listenfd;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listenfd, &ev) < 0) 
    {
        fprintf(stderr, "epoll set insertion error: fd=%d\n", listenfd);
        return -1;
    }
    curfds = 1;

    printf("epollserver startup,port %d, max connection is %d, backlog is %d\n", servPort, MAXEPOLLSIZE, listenq);

    for (;;) {
        /* 等待有事件发生 */
        nfds = epoll_wait(kdpfd, events, curfds, -1);
        if (nfds == -1)
        {
            perror("epoll_wait");
            continue;
        }
        /* 处理所有事件 */
        for (n = 0; n < nfds; ++n)
        {
            if (events[n].data.fd == listenfd) 
            {
                connfd = accept(listenfd, (struct sockaddr *)&cliaddr,&socklen);
                if (connfd < 0) 
                {
                    perror("accept error");
                    continue;
                }

                sprintf(buf, "accept form %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
                printf("%d:%s", ++acceptCount, buf);

                if (curfds >= MAXEPOLLSIZE) {
                    fprintf(stderr, "too many connection, more than %d\n", MAXEPOLLSIZE);
                    close(connfd);
                    continue;
                } 
                if (setnonblocking(connfd) < 0) {
                    perror("setnonblocking error");
                }
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = connfd;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
                {
                    fprintf(stderr, "add socket '%d' to epoll failed: %s\n", connfd, strerror(errno));
                    return -1;
                }
                curfds++;
                continue;
            } 
            // 处理客户端请求
            if (handle(events[n].data.fd) < 0) {
                epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
                curfds--;


            }
        }
    }
    close(listenfd);
    return 0;
}
int handle(int connfd) {
    int nread;
    char buf[MAXLINE];
    nread = read(connfd, buf, MAXLINE);//读取客户端socket流

    if (nread == 0) {
        printf("client close the connection\n");//nread==0为客户端正常调用函数closesocket
        close(connfd);
        return -1;
    } 
    if (nread < 0) {
        perror("read error");//nread<0为客户端杀进程
        close(connfd);
        return -1;
    }    
    write(connfd, buf, nread);//响应客户端  
    return 0;
}