Am I missing something, or am I just not understanding epoll?

Full disclosure: I am a student, and this is an assignment. I have been working on it almost nonstop for more than a week (on top of the time I had already put in), and I cannot figure out what I am doing wrong. My server keeps hanging in epoll_wait after only a few recvs have completed (I say "only a few" because I expect a few GB of data and I only ever get a few tens of MB). I don't think the client is the problem, because it works fine against my other servers, including the multi-threaded one. Please take a quick look and let me know if anything jumps out at you as the cause of my problem.

The main idea of the client/server pair is to bombard the server with connections (10k+) and transfer data back and forth repeatedly. This epoll server starts having problems at around 2,000 connections, while my multi-threaded server handles the 10k target just fine.

I am NOT asking you to complete my assignment for me (I am almost done); I just need help figuring out what I'm doing wrong here. Thanks in advance for any help you can offer :)

1 #include "common.h" 2 #include <sys/epoll.h> 3 4 uint16_t ready[MAX_CONNS]; 5 uint16_t next; 6 pthread_mutex_t mutex; 7 8 void *worker_thread(void *param) { 9 int my_sock, pos; 10 struct conn_params *conn_ps = (struct conn_params *)param; 11 12 while (1) { 13 pthread_mutex_lock(&mutex); 14 15 while (1) { 16 if (next == MAX_CONNS) { 17 printf("balls\n"); 18 next = 4; 19 } 20 21 if (ready[next] != 0) { 22 pos = next; 23 my_sock = ready[pos]; 24 next++; 25 break; 26 } 27 } 28 29 pthread_mutex_unlock(&mutex); 30 /* handle recv/send */ 31 if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */ 32 shutdown(my_sock, SHUT_RDWR); 33 close(my_sock); 34 serv_stats.active_connections--; 35 } 36 ready[pos] = 0; 37 /* print_conn_stats(&conn_ps[my_sock]);*/ 38 } 39 } 40 41 void *add_client_thread(void *param) { 42 struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param; 43 struct sockaddr client; 44 struct epoll_event event; 45 socklen_t client_len; 46 int new_sock, ret; 47 char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV]; 48 49 bzero(&client, sizeof(struct sockaddr)); 50 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 51 52 while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) { 53 set_nonblock(new_sock); 54 event.data.fd = new_sock; 55 if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) { 56 perror("epoll_ctl"); 57 printf("%u\n", new_sock); 58 continue; 59 } 60 61 bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params)); 62 eat->conn_ps[new_sock].sock = new_sock; 63 if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) { 64 gai_strerror(ret); 65 } 66 67 update_server_stats(); 68 printf("added client\n"); 69 } 70 71 if (errno != EAGAIN) { 72 perror("Couldn't accept connection"); 73 } 74 75 pthread_exit(NULL); 76 } 77 78 int main(int argc, char **argv) { 79 char opt, *port = NULL; 80 struct addrinfo hints, *results, *p; 81 int listen_sock = new_tcp_sock(), nfds, i, ret; 82 int fd_epoll, next_avail = 4; 83 struct conn_params conn_ps[MAX_CONNS]; 84 struct epoll_event evs[MAX_CONNS]; 85 struct epoll_event event; 86 struct epoll_accept_thread eat; 87 pthread_t thread; 88 89 while ((opt = getopt(argc, argv, ":l:")) != -1) { 90 switch (opt) { 91 case 'l': /* port to listen on */ 92 port = optarg; 93 break; 94 case '?': /* unknown option */ 95 fprintf(stderr, "The option -%c is not supported.\n", opt); 96 exit(1); 97 case ':': /* required arg not supplied for option */ 98 fprintf(stderr, "The option -%c requires an argument.\n", opt); 99 exit(1); 100 } 101 } /* command line arg processing done */ 102 103 if (port == NULL) { 104 fprintf(stderr, "You must provide the port to listen on (-l).\n"); 105 exit(1); 106 } 107 108 signal(SIGINT, handle_interrupt); 109 110 bzero(&hints, sizeof(struct addrinfo)); 111 hints.ai_family = AF_INET; 112 hints.ai_socktype = SOCK_STREAM; 113 hints.ai_flags = AI_PASSIVE; 114 115 set_nonblock(listen_sock); 116 set_reuseaddr(listen_sock); 117 118 if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) { 119 gai_strerror(ret); 120 exit(1); 121 } 122 123 for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */ 124 if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) { 125 perror("Bind failed"); 126 } else { 127 break; 128 } 129 } 130 131 if (p == NULL) { /* we were unable to connect to anything */ 132 fprintf(stderr, "Unable to bind to the specified port. 
Exiting...\n"); 133 exit(1); 134 } 135 136 freeaddrinfo(results); 137 138 if (listen(listen_sock, 5) == -1) { 139 perror("Listen failed"); 140 exit(1); 141 } 142 143 /* everything is set up. method-specific code goes below */ 144 145 start_server_stats(); 146 next = 4; 147 148 if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) { 149 perror("epoll_create"); 150 exit(1); 151 } 152 153 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 154 event.data.fd = listen_sock; 155 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) { 156 perror("epoll_ctl"); 157 exit(1); 158 } 159 160 signal(SIGPIPE, SIG_IGN); 161 bzero(ready, MAX_CONNS * sizeof(uint16_t)); 162 pthread_mutex_init(&mutex, NULL); 163 164 for (i = 0; i < 5; i++) { /* five workers should be enough */ 165 pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps); 166 } 167 168 while (1) { 169 if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) { 170 continue; 171 } 172 for (i = 0; i < nfds; i++) { /* loop through all FDs */ 173 if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there an error or a hangup */ 174 /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/ 175 close(evs[i].data.fd); 176 continue; 177 } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */ 178 eat.listen_sock = listen_sock; 179 eat.fd_epoll = fd_epoll; 180 eat.conn_ps = conn_ps; 181 pthread_create(&thread, NULL, add_client_thread, (void *)&eat); 182 } else { /* inbound data */ 183 while (ready[next_avail] != 0) { 184 next_avail++; 185 186 if (next_avail == MAX_CONNS) { 187 next_avail = 4; 188 } 189 } 190 ready[next_avail] = evs[i].data.fd; 191 } /* end inbound data */ 192 } /* end iterating through FDs */ 193 } /* end epoll_wait loop */ 194 195 perror("epoll_wait"); 196 197 return 0; 198 } 

And here is the echo_recv function, since I assume someone will want to see it as well:

    int echo_recv(struct conn_params *conn_p, int single) {
        char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE];
        int nread, nwrite, nsent = 0, i;

        while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) {
            /* create buffer of MULTIPLIER(int) times what was received */
            for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) {
                memcpy(buffer+(nread*i), client_buf, nread);
            }

            /* send the created buffer */
            while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) {
                nsent += nwrite;
            }

            conn_p->total_recvd += nread; /* update our stats for this conn */
            conn_p->total_sent += nsent;  /* update our stats for this conn */
            serv_stats.total_recvd += nread;
            serv_stats.total_sent += nsent;
            nsent = 0;

            if (single) {
                return 1;
            }
        }

        if (nread == -1 && (errno & EAGAIN)) {
            return 1;
        }

        if (nread == -1) {
            perror("wtf?");
        }

        shutdown(conn_p->sock, SHUT_RDWR);
        close(conn_p->sock);

        return 0; /* recv failed */
    }
2 answers

Here are a few thoughts:

  • You should really look at how the shared ready array is accessed. In your worker threads you acquire the mutex to read it, but there are places where you modify it outside the lock; in addition, you never acquire that lock in your polling loop (the main thread), you just write to the array. That is wrong. See the sketch after this list.
  • You do not save the thread IDs of all your worker threads, so how do you propose to kill them (or wait for them to finish)? As a rule, you need to call pthread_join.
  • You create a separate thread to accept connections, but again you modify the shared epoll_accept_thread structure in that thread, and there is no locking around it.
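
Something along these lines is what I mean for the first point. This is only a minimal sketch: it assumes a ready array and MAX_CONNS like the ones in your common.h, and the names enqueue_ready/dequeue_ready (and the separate scan positions) are mine, not yours:

    #include <pthread.h>
    #include <stdint.h>

    #define MAX_CONNS 16000          /* assumed; use whatever common.h defines */

    static uint16_t ready[MAX_CONNS];
    static int next_avail = 4;       /* producer scan position */
    static int next_take  = 4;       /* consumer scan position */
    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

    /* producer side: called from the main epoll loop */
    static void enqueue_ready(int fd)
    {
        pthread_mutex_lock(&mutex);          /* the write needs the lock too */
        while (ready[next_avail] != 0) {
            if (++next_avail == MAX_CONNS)
                next_avail = 4;
        }
        ready[next_avail] = (uint16_t)fd;
        pthread_mutex_unlock(&mutex);
    }

    /* consumer side: called from a worker thread */
    static int dequeue_ready(void)
    {
        int fd;

        pthread_mutex_lock(&mutex);
        for (;;) {                           /* still a busy wait, but a safe one */
            if (ready[next_take] != 0)
                break;
            if (++next_take == MAX_CONNS)
                next_take = 4;
            pthread_mutex_unlock(&mutex);    /* give the producer a chance */
            pthread_mutex_lock(&mutex);
        }
        fd = ready[next_take];
        ready[next_take] = 0;                /* clear the slot under the lock */
        if (++next_take == MAX_CONNS)
            next_take = 4;
        pthread_mutex_unlock(&mutex);
        return fd;
    }

The consumer still spins when the queue is empty, which a condition variable would fix, but the point here is only that every read and write of the shared state happens under the same lock.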

I would fix all the synchronization problems first, and then look for whatever problems remain.


I wanted to post this as a comment on the answer above, but it grew much longer than a comment allows:

Try implementing a simple epoll-based server that is completely asynchronous. The steps (a bare-bones sketch follows the list):

  • Set up your listening socket...
  • Add it to epoll.
  • Enter the wait loop:
    • Check whether the event is on the listening socket or on a regular socket.
    • If it is on the listening socket, accept the connection, add the new socket to epoll, and go back to step 3.
    • If the event is a read event on a regular socket, read X bytes, save them in a write buffer, enable the write event for that socket in epoll, and go back to step 3.
    • If the event is a write event on a regular socket, write bytes from the buffer to the network; once the write buffer is empty, disable the write event, and go back to step 3.
    • If an error occurs, remove the socket from epoll.
  • There is no step 4: the program should just loop forever.
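
Here is a bare-bones sketch of what I mean. This is my own minimal version, not your code: a single thread, level-triggered epoll, echoing back whatever is read. The listen_on() helper and the fixed-size buffer table are simplifications I made up for the example, and real code would check every return value:

    #include <sys/epoll.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <string.h>

    #define MAX_EVENTS 64
    #define MAX_FDS    1024                       /* toy limit on fd numbers */

    static char   wbuf[MAX_FDS][4096];            /* per-connection write buffer */
    static size_t wlen[MAX_FDS], woff[MAX_FDS];

    static void set_nonblock(int fd)
    {
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    }

    static int listen_on(unsigned short port)     /* step 1: the listening socket */
    {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        struct sockaddr_in addr;

        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(port);
        bind(fd, (struct sockaddr *)&addr, sizeof(addr));
        listen(fd, SOMAXCONN);
        set_nonblock(fd);
        return fd;
    }

    int main(void)
    {
        int listen_sock = listen_on(9000);        /* the port is arbitrary */
        int epfd = epoll_create(1);
        struct epoll_event ev, evs[MAX_EVENTS];

        ev.events = EPOLLIN;                      /* level-triggered on purpose */
        ev.data.fd = listen_sock;
        epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ev);   /* step 2 */

        for (;;) {                                /* step 3: the wait loop */
            int i, n = epoll_wait(epfd, evs, MAX_EVENTS, -1);

            for (i = 0; i < n; i++) {
                int fd = evs[i].data.fd;

                if (evs[i].events & (EPOLLERR | EPOLLHUP)) {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);  /* on error, drop it */
                    close(fd);
                } else if (fd == listen_sock) {   /* new connection coming in */
                    int conn = accept(listen_sock, NULL, NULL);
                    if (conn == -1)
                        continue;
                    if (conn >= MAX_FDS) {        /* beyond our toy buffer table */
                        close(conn);
                        continue;
                    }
                    set_nonblock(conn);
                    ev.events = EPOLLIN;
                    ev.data.fd = conn;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev);
                } else if (evs[i].events & EPOLLIN) {          /* readable */
                    ssize_t nread = read(fd, wbuf[fd], sizeof(wbuf[fd]));
                    if (nread <= 0) {             /* EOF or error: drop it */
                        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                        close(fd);
                    } else {
                        wlen[fd] = (size_t)nread; /* save it in the write buffer */
                        woff[fd] = 0;
                        ev.events = EPOLLOUT;     /* enable the write event */
                        ev.data.fd = fd;
                        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
                    }
                } else if (evs[i].events & EPOLLOUT) {         /* writable */
                    ssize_t nw = write(fd, wbuf[fd] + woff[fd], wlen[fd] - woff[fd]);
                    if (nw > 0)
                        woff[fd] += (size_t)nw;
                    if (woff[fd] == wlen[fd]) {   /* buffer drained */
                        ev.events = EPOLLIN;      /* disable the write event */
                        ev.data.fd = fd;
                        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
                    }
                }
            }
        }                                         /* there is no step 4 */
    }

Once something this simple survives your 10k-connection client, you will know that any remaining problems come from the threading you layered on top.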

This should eliminate any complexity you have added with the threading, which may well be what is causing your problems. It puts epoll back into the same problem domain as select(), except that it is usually much faster. The whole idea of using an event mechanism is to be told when you can read/write, instead of setting the socket to non-blocking and repeatedly trying to read from/write to it.

You also never fully check the return value of write(), which can fail because of a SIGPIPE condition (I know you ignore the signal, but you still get values such as EAGAIN/EINTR in errno). A sketch of a more careful send loop is below.
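
For example, something like this handles partial sends and the interesting errno values explicitly (a sketch; send_all is my name, not a library function):

    #include <errno.h>
    #include <sys/types.h>
    #include <sys/socket.h>

    /* send_all: a sketch, not a library call */
    static ssize_t send_all(int sock, const char *buf, size_t len)
    {
        size_t sent = 0;

        while (sent < len) {
            ssize_t n = send(sock, buf + sent, len - sent, 0);
            if (n > 0) {
                sent += (size_t)n;
            } else if (n == -1 && errno == EINTR) {
                continue;                  /* interrupted: just retry */
            } else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                break;                     /* kernel buffer full: stop for now */
            } else {
                return -1;                 /* real error, EPIPE included */
            }
        }
        return (ssize_t)sent;
    }

Your echo_recv could then treat a -1 return as a dead connection, and a short count as "come back when epoll says the socket is writable again".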

Another thing I see is that you busy-loop inside your worker threads waiting for sockets to become ready. When you use select() or epoll, you are notified when something new arrives, so there is no need to busy-loop; the workers could sleep until there is work, as in the sketch below.
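
For example, the workers could sleep on a condition variable until the epoll thread hands them work. This sketch shows just the wait/signal handshake (the queue itself is elided, and all the names are mine):

    #include <pthread.h>

    static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  qcond = PTHREAD_COND_INITIALIZER;
    static int pending = 0;                /* stand-in for "queue not empty" */

    /* epoll thread: call after queueing a ready fd */
    static void signal_work(void)
    {
        pthread_mutex_lock(&qlock);
        pending++;
        pthread_cond_signal(&qcond);       /* wake one sleeping worker */
        pthread_mutex_unlock(&qlock);
    }

    /* worker thread: block until there is something to do */
    static void wait_for_work(void)
    {
        pthread_mutex_lock(&qlock);
        while (pending == 0)               /* loop guards against spurious wakeups */
            pthread_cond_wait(&qcond, &qlock);
        pending--;
        pthread_mutex_unlock(&qlock);
    }

pthread_cond_wait atomically releases the mutex while the worker sleeps, so the epoll thread can get in to queue work, and no CPU is burned while the queue is empty.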

I'm not quite sure what you are trying to accomplish, but your code is extremely inefficient.

What you could do, after getting the simple asynchronous example working with the steps described above, is run several worker threads that all wait (using epoll) for events, and have each of the threads handle different connections (still using what I posted above). A rough sketch follows.
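
Roughly this shape (a sketch, assuming the workers can share one epoll instance; handle_event() is a placeholder for the read/write logic from the earlier sketch):

    #include <sys/epoll.h>
    #include <pthread.h>

    #define NWORKERS   4
    #define MAX_EVENTS 64

    extern void handle_event(int epfd, struct epoll_event *ev);  /* placeholder */

    static void *worker(void *arg)
    {
        int epfd = *(int *)arg;
        struct epoll_event evs[MAX_EVENTS];

        for (;;) {
            int i, n = epoll_wait(epfd, evs, MAX_EVENTS, -1);
            for (i = 0; i < n; i++)
                handle_event(epfd, &evs[i]);   /* each thread handles what it got */
        }
        return NULL;
    }

    static void start_workers(int epfd)
    {
        pthread_t tids[NWORKERS];
        int i;

        for (i = 0; i < NWORKERS; i++)         /* keep the IDs so you can join later */
            pthread_create(&tids[i], NULL, worker, &epfd);
    }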

