Is it possible to use the same epoll file descriptor among threads?

Is it possible to share the same epoll fd (not socket fd) among multiple threads? And if so, should each thread pass its own array of events to epoll_wait(2), or can the threads share one array?

For example:

    void *thread_func(void *thread_args) {
        // extract socket_fd, epoll_fd, &event, &events_array from
        // thread_args
        // epoll_wait() using epoll_fd and events_array received from main
        // now all threads would be using same epoll_fd and events array
    }

    void main( void ) {
        // create and bind to socket
        // create events_fd
        // allocate memory for events array
        // subscribe to events EPOLLIN and EPOLLET
        // pack the socket_fd, epoll_fd, &events, &events_array into
        // thread_args struct.
        // create multiple threads and pass thread_func and
        // same thread_args to all threads
    }

Or is it better to do it like this:

    void *thread_func(void *socket_fd) {
        // create events_fd
        // allocate memory for events array
        // subscribe to events EPOLLIN and EPOLLET
        // epoll_wait using own epoll_fd and events_array
        // now all threads would have a separate epoll_fd with
        // events populated on its own array
    }

    void main(void) {
        // create and bind to socket
        // create multiple threads and pass thread_func and socket_fd to
        // all threads
    }

Is there a good example of how to do this in C? The examples I have seen run the event loop in main() and spawn a new thread to process the request whenever an event is detected. What I want to do is create a fixed number of threads at the start of the program, and have each thread run its own event loop and process requests.

c linux pthreads epoll
1 answer

Is it possible to use the same epoll fd (not socket fd) among several threads?

Yes, it is safe: the epoll(7) interface is thread-safe. But you should be careful and at least use EPOLLET (edge-triggered mode, as opposed to the default level-triggered mode) to avoid spurious wakeups in the other threads. This is because level-triggered mode wakes up every waiting thread when a new event is available for processing. Since only one thread will end up handling it, most of the threads would be woken up unnecessarily.
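The difference between the two notification modes can be observed with a single descriptor. The following is a minimal sketch, not part of the original answer: `count_wakeups` is a hypothetical helper, and a pipe stands in for a client socket. It writes one byte, never reads it, and counts how many consecutive non-blocking epoll_wait(2) calls report the descriptor as ready.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper: counts how many of `polls` consecutive epoll_wait(2)
 * calls report a pipe as readable after one byte is written and never read.
 * Pass 0 for level-triggered mode or EPOLLET for edge-triggered mode.
 * Returns -1 if the setup fails. */
int count_wakeups(unsigned int extra_flags, int polls)
{
    int pipefd[2];
    if (pipe(pipefd) < 0)
        return -1;

    int epfd = epoll_create1(0);
    if (epfd < 0) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    struct epoll_event ev;
    ev.events = EPOLLIN | extra_flags;
    ev.data.fd = pipefd[0];

    int wakeups = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
        write(pipefd[1], "x", 1) == 1) {
        wakeups = 0;
        for (int i = 0; i < polls; i++) {
            struct epoll_event out;
            /* Timeout 0: return immediately if nothing is pending. */
            if (epoll_wait(epfd, &out, 1, 0) == 1)
                wakeups++;
        }
    }

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return wakeups;
}
```

In level-triggered mode the unread byte keeps the descriptor ready, so every call reports it; with EPOLLET only the first call does. With several threads blocked on the same epoll fd, that persistent readiness is what lets more than one thread be woken for the same data.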

If a shared epfd is used, should each thread pass its own events array to epoll_wait(), or can the threads share one events array?

Yes, you need a separate array of events in each thread, otherwise you will have race conditions and nasty things can happen. For example, one thread may still be iterating through the events returned by epoll_wait(2) and processing requests when another thread calls epoll_wait(2) with the same array; the events are then overwritten while the first thread is still reading them. Not good! You absolutely need a separate array for each thread.
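As a rough illustration of "one shared epoll fd, one events array per thread", here is a small sketch. Everything in it is an assumption made for demonstration purposes: `run_shared_epoll` and `shared_worker` are hypothetical names, pipes holding one byte each stand in for client sockets, and the 200 ms timeout exists only so that the demo terminates.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_THREADS 16
#define MAX_PIPES   32
#define MAX_EVENTS  16

static int shared_epfd;     /* one epoll instance used by all workers */
static atomic_int handled;  /* total bytes read across all workers */

static void *shared_worker(void *arg)
{
    (void) arg;
    /* Local array: each thread has its own copy on its own stack. */
    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        /* 200 ms timeout so the demo ends once no work is left. */
        int n = epoll_wait(shared_epfd, events, MAX_EVENTS, 200);
        if (n <= 0)
            break;
        for (int i = 0; i < n; i++) {
            char c;
            if (read(events[i].data.fd, &c, 1) == 1)
                atomic_fetch_add(&handled, 1);
        }
    }
    return NULL;
}

/* Hypothetical driver: registers `npipes` pipes (one byte each) on the shared
 * epoll fd and lets `nthreads` workers drain them. Returns the number of
 * bytes handled, or -1 on failure. */
int run_shared_epoll(int nthreads, int npipes)
{
    pthread_t tids[MAX_THREADS];
    int fds[MAX_PIPES][2];
    int created = 0, started = 0, ok;

    if (nthreads > MAX_THREADS || npipes > MAX_PIPES)
        return -1;
    if ((shared_epfd = epoll_create1(0)) < 0)
        return -1;
    atomic_store(&handled, 0);

    for (; created < npipes; created++) {
        struct epoll_event ev;
        if (pipe(fds[created]) < 0)
            break;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = fds[created][0];
        if (epoll_ctl(shared_epfd, EPOLL_CTL_ADD, fds[created][0], &ev) < 0 ||
            write(fds[created][1], "x", 1) != 1) {
            close(fds[created][0]);
            close(fds[created][1]);
            break;
        }
    }
    ok = (created == npipes);

    for (; ok && started < nthreads; started++) {
        if (pthread_create(&tids[started], NULL, shared_worker, NULL) != 0)
            break;
    }
    if (started < nthreads)
        ok = 0;

    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    for (int i = 0; i < created; i++) {
        close(fds[i][0]);
        close(fds[i][1]);
    }
    close(shared_epfd);
    return ok ? atomic_load(&handled) : -1;
}
```

Only the epoll fd and the counter are shared; the `events` array lives on each worker's stack, so one thread's epoll_wait(2) call can never overwrite events another thread is still iterating over.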

Assuming you have a separate array for each thread, either option (waiting on the same epoll fd, or using a separate epoll fd for each thread) will work equally well, but note that the semantics are different. With a globally shared epoll fd, every thread waits for a request from any client, because all clients are added to the same epoll fd. With a separate epoll fd for each thread, each thread is essentially responsible for a subset of the clients (those that were accepted by that thread).
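To make the second option concrete, here is a hedged sketch in which every worker owns a private epoll fd and therefore only ever sees its own descriptors. The names `struct worker`, `worker_main` and `run_private_epoll` are hypothetical, pipes holding one byte each stand in for accepted clients, and the 200 ms timeout is there only so that the demo terminates.

```c
#include <pthread.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_WORKERS 8
#define MAX_FDS     32
#define MAX_EVENTS  16

struct worker {
    pthread_t tid;
    int epfd;     /* epoll instance owned by this worker only */
    int handled;  /* bytes this worker has read */
};

static void *worker_main(void *arg)
{
    struct worker *w = arg;
    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        /* 200 ms timeout so the demo ends once no work is left. */
        int n = epoll_wait(w->epfd, events, MAX_EVENTS, 200);
        if (n <= 0)
            break;
        for (int i = 0; i < n; i++) {
            char c;
            if (read(events[i].data.fd, &c, 1) == 1)
                w->handled++;
        }
    }
    return NULL;
}

/* Hypothetical driver: worker i gets its own epoll fd watching
 * fds_per_worker[i] pipes, each holding one byte. Per-worker byte counts are
 * stored in handled_out. Returns 0 on success, -1 on failure. */
int run_private_epoll(const int *fds_per_worker, int nworkers, int *handled_out)
{
    struct worker workers[MAX_WORKERS];
    int open_fds[2 * MAX_FDS + MAX_WORKERS];
    int nopen = 0, started = 0, total = 0, rc = -1;

    if (nworkers > MAX_WORKERS)
        return -1;
    for (int i = 0; i < nworkers; i++)
        total += fds_per_worker[i];
    if (total > MAX_FDS)
        return -1;

    for (int i = 0; i < nworkers; i++) {
        workers[i].handled = 0;
        workers[i].epfd = epoll_create1(0);
        if (workers[i].epfd < 0)
            goto out;
        open_fds[nopen++] = workers[i].epfd;
        for (int j = 0; j < fds_per_worker[i]; j++) {
            int p[2];
            struct epoll_event ev;
            if (pipe(p) < 0)
                goto out;
            open_fds[nopen++] = p[0];
            open_fds[nopen++] = p[1];
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = p[0];
            if (epoll_ctl(workers[i].epfd, EPOLL_CTL_ADD, p[0], &ev) < 0 ||
                write(p[1], "x", 1) != 1)
                goto out;
        }
    }
    for (; started < nworkers; started++) {
        if (pthread_create(&workers[started].tid, NULL, worker_main,
                           &workers[started]) != 0)
            goto out;
    }
    rc = 0;
out:
    for (int i = 0; i < started; i++) {
        pthread_join(workers[i].tid, NULL);
        handled_out[i] = workers[i].handled;
    }
    while (nopen > 0)
        close(open_fds[--nopen]);
    return rc;
}
```

If one worker is given three descriptors and another only one, the first handles exactly three events and the second exactly one, no matter how idle the second is. That is the partitioning behavior described above.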

This may not matter for your system, or it may make a huge difference. For example, a thread may be unlucky enough to get a group of power users who make heavy and frequent requests, leaving that thread overloaded while other threads with less aggressive clients sit almost idle. Wouldn't that be unfair? On the other hand, you may want certain threads to deal only with a specific class of users, in which case it could make sense to have a different epoll fd for each thread. As usual, you need to consider both possibilities, weigh the trade-offs, think about your specific problem and make a decision.

The following is an example that uses a globally shared epoll fd. I didn't originally plan to write all of this, but one thing led to another, and, well, it was fun, and I think it may help you get started. It is an echo server that listens on port 3000 and has a pool of 20 threads that use epoll to concurrently accept new clients and serve requests.

    #include <stdio.h>
    #include <stdlib.h>
    #include <inttypes.h>
    #include <errno.h>
    #include <string.h>
    #include <pthread.h>
    #include <assert.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <sys/epoll.h>

    #define SERVERPORT 3000
    #define SERVERBACKLOG 10
    #define THREADSNO 20
    #define EVENTS_BUFF_SZ 256

    static int serversock;
    static int epoll_fd;
    static pthread_t threads[THREADSNO];

    /* Accepts one pending connection and registers it on the shared epoll fd. */
    int accept_new_client(void) {
        int clientsock;
        struct sockaddr_in addr;
        socklen_t addrlen = sizeof(addr);

        if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
            return -1;
        }

        char ip_buff[INET_ADDRSTRLEN+1];
        if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
            close(clientsock);
            return -1;
        }

        printf("*** [%p] Client connected from %s:%" PRIu16 "\n",
               (void *) pthread_self(), ip_buff, ntohs(addr.sin_port));

        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLET;
        epevent.data.fd = clientsock;

        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
            perror("epoll_ctl(2) failed attempting to add new client");
            close(clientsock);
            return -1;
        }

        return 0;
    }

    /* Reads one request (at most 511 bytes) and echoes it back to the client. */
    int handle_request(int clientfd) {
        char readbuff[512];
        struct sockaddr_in addr;
        socklen_t addrlen = sizeof(addr);
        ssize_t n;

        if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
            return -1;
        }

        if (n == 0) {
            /* Peer closed the connection; closing the fd also removes it from epoll */
            close(clientfd);
            return 0;
        }

        readbuff[n] = '\0';

        if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
            return -1;
        }

        char ip_buff[INET_ADDRSTRLEN+1];
        if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
            return -1;
        }

        printf("*** [%p] [%s:%" PRIu16 "] -> server: %s",
               (void *) pthread_self(), ip_buff, ntohs(addr.sin_port), readbuff);

        ssize_t sent;
        if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
            return -1;
        }

        readbuff[sent] = '\0';

        printf("*** [%p] server -> [%s:%" PRIu16 "]: %s",
               (void *) pthread_self(), ip_buff, ntohs(addr.sin_port), readbuff);

        return 0;
    }

    /* Event loop run by every thread; each thread owns its events buffer. */
    void *worker_thr(void *args) {
        struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
        if (events == NULL) {
            perror("malloc(3) failed when attempting to allocate events buffer");
            pthread_exit(NULL);
        }

        int events_cnt;
        while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
            int i;
            for (i = 0; i < events_cnt; i++) {
                assert(events[i].events & EPOLLIN);

                if (events[i].data.fd == serversock) {
                    if (accept_new_client() == -1) {
                        fprintf(stderr, "Error accepting new client: %s\n",
                                strerror(errno));
                    }
                } else {
                    if (handle_request(events[i].data.fd) == -1) {
                        fprintf(stderr, "Error handling request: %s\n",
                                strerror(errno));
                    }
                }
            }
        }

        if (events_cnt == 0) {
            fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
        } else {
            perror("epoll_wait(2) error");
        }

        free(events);

        return NULL;
    }

    int main(void) {
        if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
            perror("socket(2) failed");
            exit(EXIT_FAILURE);
        }

        struct sockaddr_in serveraddr;
        memset(&serveraddr, 0, sizeof(serveraddr));
        serveraddr.sin_family = AF_INET;
        serveraddr.sin_port = htons(SERVERPORT);
        serveraddr.sin_addr.s_addr = INADDR_ANY;

        if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
            perror("bind(2) failed");
            exit(EXIT_FAILURE);
        }

        if (listen(serversock, SERVERBACKLOG) < 0) {
            perror("listen(2) failed");
            exit(EXIT_FAILURE);
        }

        if ((epoll_fd = epoll_create(1)) < 0) {
            perror("epoll_create(2) failed");
            exit(EXIT_FAILURE);
        }

        struct epoll_event epevent;
        epevent.events = EPOLLIN | EPOLLET;
        epevent.data.fd = serversock;

        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
            perror("epoll_ctl(2) failed on main server socket");
            exit(EXIT_FAILURE);
        }

        int i;
        for (i = 0; i < THREADSNO; i++) {
            /* pthread_create(3) returns an error number and does not set errno */
            int err = pthread_create(&threads[i], NULL, worker_thr, NULL);
            if (err != 0) {
                fprintf(stderr, "pthread_create(3) failed: %s\n", strerror(err));
                exit(EXIT_FAILURE);
            }
        }

        /* main thread also contributes as worker thread */
        worker_thr(NULL);

        return 0;
    }

A few notes:

  • main() should return int, not void (as shown in your example).
  • Always deal with error return codes. It is very common to ignore them, and when things break it is hard to understand what happened.
  • The code assumes that no request is larger than 511 bytes (as can be seen from the buffer size in handle_request()). If a request is larger than this, some data may be left in the socket for a very long time, because epoll_wait(2) will not report it until a new event occurs on that file descriptor (because we are using EPOLLET). In the worst case, the client may never send any new data and will wait forever for a reply.
  • The code that prints the thread identifier for each request assumes that pthread_t is an opaque pointer type. Indeed, pthread_t is a pointer type on Linux, but it may be an integer type on other platforms, so this is not portable. However, that is probably not much of a problem, since epoll is Linux-specific, so the code is not portable anyway.
  • It is assumed that no other request from the same client arrives while a thread is still serving a request from that client. If a new request arrives in the meantime and another thread starts serving it, we have a race condition, and the client will not necessarily receive the echo messages in the same order it sent them (however, write(2) is atomic, so although the replies may be out of order, they will not be interleaved).
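The request-size limitation described in the third note is commonly handled by making the descriptor non-blocking and reading until the kernel reports that nothing is left. The following sketch illustrates that pattern and is not part of the server above: `set_nonblocking` and `drain_fd` are hypothetical helpers, and the place where a real server would process each chunk is marked only by a comment.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: switches fd to non-blocking mode. Returns 0 or -1. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Hypothetical helper: reads from a non-blocking fd until no data is left.
 * Returns the total number of bytes read, or -1 on a real error.
 * *eof is set to 1 if the peer closed the connection, 0 otherwise. */
ssize_t drain_fd(int fd, int *eof)
{
    char buf[512];
    ssize_t total = 0;

    *eof = 0;
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            /* A real server would process buf[0..n) here. */
            total += n;
            continue;
        }
        if (n == 0) {
            *eof = 1;           /* peer closed its end */
            return total;
        }
        if (errno == EINTR)
            continue;           /* interrupted by a signal: retry */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return total;       /* drained: wait for the next edge */
        return -1;
    }
}
```

Called once per EPOLLET notification on a descriptor prepared with `set_nonblocking`, a loop like this consumes everything that has arrived, so no data is left waiting for an event that may never come.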