Epoll IO with worker threads in C

I am writing a small server that will receive data from several sources and process it. The sources and the amount of data are significant, but nothing that epoll should not be able to handle quite well. However, all the received data must be parsed and run through a large number of tests, which is time consuming and will block a single thread despite the epoll multiplexing. Basically the pattern should be something like this: the IO loop receives data and bundles it into a job, sends it to the first available thread in the pool, the job is processed by the worker, and the result is passed back to the IO loop for writing to a file.

I have decided to go for one IO thread and N worker threads. The IO thread for accepting tcp connections and reading data is easy to implement using the example given at: http://linux.die.net/man/7/epoll

A thread pool is also usually simple enough to implement, but I am struggling to combine the epoll loop with a thread pool. I cannot find any "best practice" for using epoll with a worker pool online, although there are quite a few questions on the same topic.

So here are some questions I hope someone can help me answer:

  • Could (and should) eventfd be used as a mechanism for two-way synchronization between the IO thread and all the workers? For instance, is it a good idea for each worker thread to have its own epoll routine waiting on a shared eventfd (with a struct pointer containing data/info about the job), i.e. using eventfd as a job queue somehow? Perhaps with another eventfd to pass results back into the IO thread from the multiple worker threads?
  • After the IO thread is signaled that there is more data on a socket, should the actual recv take place on the IO thread, or should the worker recv the data on its own so as not to block the IO thread while parsing data frames, etc.? In that case, how can I guarantee safety, e.g. in case recv reads 1.5 frames of data in one worker thread and another worker thread receives the last 0.5 frames of data from the same connection?
  • If the worker thread pool is implemented through mutexes and such, will waiting for locks block the IO thread if N+1 threads try to use the same lock?
  • Are there any good practice patterns for how to build a worker thread pool around epoll with two-way communication (i.e. both from IO to workers and back)?

EDIT: Could one possible solution be to update a ring buffer from the IO loop, and after the update pass the ring buffer index to the workers through a pipe shared by all workers (thus giving away control of that index to the first worker that reads the index off the pipe), let the worker own that index until the end of processing, and then send the index number back to the IO thread through the pipe again, thereby giving back control?
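Roughly, I picture something like this (all names here are made up, just to make the ownership handoff concrete):

int job_pipe[2], done_pipe[2];        /* both created with pipe() at startup */

/* IO thread side: after writing a frame into ring slot 'idx' */
void give_slot_to_workers(int idx)
{
    /* writes of <= PIPE_BUF bytes are atomic, so indices never interleave */
    write(job_pipe[1], &idx, sizeof idx);
}

/* Worker side: whoever reads the index owns that slot until it writes it back */
void *worker(void *arg)
{
    int idx;
    while (read(job_pipe[0], &idx, sizeof idx) == sizeof idx) {
        process_slot(idx);                       /* made-up processing function */
        write(done_pipe[1], &idx, sizeof idx);   /* hand ownership back to IO thread */
    }
    return NULL;
}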

My application is Linux-only, so I can use Linux-only functionality in order to achieve this in the most elegant way possible. Cross-platform support is not needed, but performance and thread safety are.

+8
c multithreading linux posix epoll
3 answers

When running this model, because we only know the packet size once we have fully received the packet, unfortunately we cannot offload the receive itself to a worker thread. Instead, the best we can do is a dedicated thread to receive the data, which will have to pass pointers to fully received packets.

The data itself is probably best held in a circular buffer, but we will want a separate buffer for each input source (if we receive a partial packet we can then continue receiving from other sources without splitting the data up). The remaining question is how to inform the workers when a new packet is ready, and to give them a pointer to the data in said packet. Because there is little data here, just some pointers, the most elegant way to do this is with POSIX message queues. These provide the ability for multiple senders and multiple receivers to write and read messages, always ensuring every message is received, and by exactly one thread.
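For illustration, such a queue might be created along these lines (a sketch only; the queue name, depth and the JobMsg layout are assumptions that anticipate the fields discussed below, and it uses the DataSource structure described just after):

#include <mqueue.h>
#include <fcntl.h>

/* Hypothetical job descriptor: a couple of pointers plus a length -
 * exactly the "little data" the queue needs to carry per packet. */
struct JobMsg {
    struct DataSource *Source;  /* which source the packet belongs to */
    char              *Packet;  /* start of the packet in that source's buffer */
    int                Size;    /* packet length in bytes */
};

/* Open (or create) the shared queue; the name "/job_queue" is arbitrary.
 * Link with -lrt on older glibc. */
mqd_t open_job_queue(void)
{
    struct mq_attr attr = {0};
    attr.mq_maxmsg  = 10;                     /* within the default msg_max limit */
    attr.mq_msgsize = sizeof(struct JobMsg);  /* one small descriptor per message */
    return mq_open("/job_queue", O_CREAT | O_RDWR, 0600, &attr);
}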

You will need a structure similar to the one below for each data source; I shall now go through the fields' purposes.

struct DataSource
{
    int   SourceFD;
    char  DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
    char *LatestPacket;
    char *CurrentLocation;
    int   SizeLeft;
};

SourceFD is obviously the file descriptor of the data stream in question. DataBuffer is where packet contents are held while being processed; it is a circular buffer. The LatestPacket pointer is used to temporarily hold a pointer to the most recent packet in case we receive a partial packet and move onto another source before passing the packet off. CurrentLocation stores where the latest packet ends, so that we know where to place the next one, or where to carry on in case of a partial receive. SizeLeft is the room remaining in the buffer; it is used to tell whether we can fit the next packet or need to circle back around to the beginning.

Thus, the receiving function will effectively do the following (a rough C sketch follows the list):

  • Copy the contents of the packet into the buffer
  • Move CurrentLocation to point to the end of the packet
  • Update SizeLeft to account for the now reduced buffer space
  • If we cannot fit the packet at the end of the buffer, we cycle around to the beginning
  • If there is no room there either, we try again a little later, moving on to another source in the meantime
  • If we receive a partial packet, leave the LatestPacket pointer pointing at the start of the packet and move on to another source until the rest arrives
  • Send a message via the POSIX message queue to a worker thread so it can process the data; the message contains a pointer to the DataSource structure so the worker can operate on it, plus a pointer to the packet it is working on and that packet's size, both of which can be calculated when we receive the packet
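Put together with the DataSource structure above and the JobMsg descriptor and queue from the earlier sketch, the receive path might look roughly like this (packet_complete() is an assumed framing helper, and the wrap-around handling is only hinted at):

/* Simplified sketch of the receive path; error handling is mostly omitted. */
void receive_from(struct DataSource *src, mqd_t jobq)
{
    ssize_t n = recv(src->SourceFD, src->CurrentLocation, src->SizeLeft, 0);
    if (n <= 0)
        return;                                      /* would block, or peer closed */

    src->CurrentLocation += n;
    __sync_fetch_and_sub(&src->SizeLeft, (int)n);    /* claim buffer space atomically */

    if (packet_complete(src)) {                      /* assumed: checks framing */
        struct JobMsg msg = {
            .Source = src,
            .Packet = src->LatestPacket,
            .Size   = (int)(src->CurrentLocation - src->LatestPacket),
        };
        mq_send(jobq, (const char *)&msg, sizeof msg, 0);  /* hand the job to a worker */
        src->LatestPacket = src->CurrentLocation;          /* next packet starts here */
    }
    /* If SizeLeft cannot hold the next packet, wrap CurrentLocation back to the
     * start of DataBuffer, or retry this source a little later, as in the list above. */
}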

The worker thread will do its processing using the received pointers and then increase SizeLeft so the receiver thread knows it can carry on filling the buffer. The atomic builtins will be needed to work on the size value in the struct so we don't get race conditions on that field (as it may be written by a worker and the IO thread simultaneously, causing lost updates; see my comment below). They are listed here and are simple and extremely useful: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html

Now that I have given some general background, I will address the points raised specifically:

  • Using eventfd as a synchronization mechanism is largely a bad idea: you will find yourself using a fair amount of unneeded CPU time, and it is very hard to get the synchronization right. In particular, if you have multiple threads picking up the same file descriptor you could have major problems. It is in effect a nasty hack that will work sometimes, but is no real substitute for proper synchronization.
  • It is also a bad idea to try to offload the receive, as described above; you can get around the problem with complex IPC, but frankly it is unlikely that the receive IO will take enough time to stall your application. Your IO is also likely to be much slower than the CPU, so receiving with multiple threads will gain little (this assumes you do not have, say, several 10-gigabit network cards).
  • Using mutexes or locks is a silly idea here; it is better suited to lock-free coding given the low amount of (simultaneously) shared data: you are really just handing out work and data. This will also boost the receive thread's performance and make your app far more scalable. Using the functions mentioned here http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html you can do this nicely and easily. If you did do it this way, you would need a semaphore; it could be unlocked every time a packet is received and locked by each thread starting a job, to dynamically allow more threads if more packets are ready, which would be far less overhead than mutexes.
  • There is not much special to do with the thread pool: you spawn a lot of threads and have them all block on mq_receive on the data message queue, waiting for messages. When they are done, they send their result back to the main thread, which adds the result message queue's descriptor to its epoll list. It can then receive results that way; this is simple and very efficient for small data like pointers. It will also use little CPU and not force the main thread to waste time managing workers (a sketch of such a worker loop follows this list).
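Reusing the JobMsg descriptor and queue handles from the sketches above, one pool thread then reduces to a small loop (process_packet() and struct worker_args are assumed, application-specific names):

struct worker_args { mqd_t jobq; mqd_t resultq; };   /* handles opened at startup */

void *worker_main(void *arg)
{
    struct worker_args *w = arg;
    struct JobMsg msg;

    for (;;) {
        if (mq_receive(w->jobq, (char *)&msg, sizeof msg, NULL) < 0)
            break;                                /* queue closed / fatal error */

        process_packet(msg.Packet, msg.Size);     /* the long-running analysis */

        /* give the buffer space back to the receive thread, atomically */
        __sync_fetch_and_add(&msg.Source->SizeLeft, msg.Size);

        /* post the result descriptor; the result queue's fd sits in the
         * main thread's epoll set, so it wakes up immediately */
        mq_send(w->resultq, (const char *)&msg, sizeof msg, 0);
    }
    return NULL;
}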

Finally, your edit is fairly sensible, except for the fact that, as I have suggested, message queues are far better than pipes here because they very efficiently signal events, guarantee a full message read, and provide automatic framing.

I hope this helps; however, it is late, so if I missed anything or you have questions, feel free to comment for clarification or more explanation.

+3

In my tests, one epoll instance per thread outperformed complicated threading models by far. If listener sockets are added to all epoll instances, the workers would simply accept(2), and the winner would be awarded the connection and process it for its lifetime.

Your workers might look something like this:

for (;;) {
    nfds = epoll_wait(worker->efd, &evs, 1024, -1);

    for (i = 0; i < nfds; i++)
        ((struct socket_context*)evs[i].data.ptr)->handler(
            evs[i].data.ptr,
            evs[i].events);
}

And each file descriptor added to an epoll instance can have a struct socket_context associated with it:

void listener_handler(struct socket_context* ctx, int ev)
{
    /* allocate a context for the new connection */
    struct socket_context* conn = calloc(1, sizeof(*conn));

    conn->fd = accept(ctx->fd, NULL, NULL);
    conn->handler = conn_handler;

    /* add to calling worker epoll instance or implement some form
     * of load balancing */
}

void conn_handler(struct socket_context* ctx, int ev)
{
    /* read all available data and process. if incomplete, stash
     * data in ctx and continue next time handler is called */
}

void dummy_handler(struct socket_context* ctx, int ev)
{
    /* handle exit condition async by adding a pipe with its
     * own handler */
}
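To make the setup concrete, a minimal sketch of what this assumes (the struct layout and worker spawning are illustrative; error handling is omitted):

#include <sys/epoll.h>

struct socket_context {
    int fd;
    void (*handler)(struct socket_context *ctx, int ev);
    /* per-connection parse state, buffers, etc. would live here */
};

struct worker { int efd; /* ... */ };

/* Give every worker its own epoll instance and register the shared
 * listening socket in each of them, so any idle worker can accept(). */
void setup_worker(struct worker *w, struct socket_context *listener)
{
    struct epoll_event ev;

    w->efd = epoll_create1(0);

    ev.events   = EPOLLIN;          /* the listener stays level-triggered */
    ev.data.ptr = listener;         /* listener->handler == listener_handler */
    epoll_ctl(w->efd, EPOLL_CTL_ADD, listener->fd, &ev);
}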

I like this strategy because:

  • very simple design;
  • all threads are identical;
  • workers and connections are isolated; they do not step on each other's toes or call read(2) in the wrong worker;
  • no locks are required (the kernel takes care of synchronization on accept(2));
  • the load is somewhat naturally balanced, since no busy worker will actively contend on accept(2).

And some notes about epoll:

  • use edge-triggered mode, non-blocking sockets, and always read until EAGAIN (a read-loop sketch follows this list);
  • avoid the dup(2) family of calls to spare yourself some surprises (epoll registers file descriptors, but actually tracks file descriptions);
  • you can epoll_ctl(2) other threads' epoll instances safely;
  • use a large struct epoll_event buffer for epoll_wait(2) to avoid starvation.
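As mentioned in the first note, with EPOLLET the readiness notification arrives only once per state change, so a handler typically drains the socket before returning. A possible body for the conn_handler above (a sketch; buffer handling is application-specific):

#include <errno.h>
#include <unistd.h>

void conn_handler(struct socket_context *ctx, int ev)
{
    char buf[4096];
    ssize_t n;

    for (;;) {
        n = read(ctx->fd, buf, sizeof buf);
        if (n > 0) {
            /* feed bytes to the protocol parser; stash partial frames in ctx */
            continue;
        }
        if (n == 0) {
            /* peer closed: remove fd from epoll and free ctx */
            break;
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            break;      /* fully drained: wait for the next edge */
        break;          /* other errors: treat as closed */
    }
}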

Some other notes:

  • use accept4(2) to save a system call;
  • use one thread per core (1 per physical core if CPU-bound, or 1 per logical core if I/O-bound);
  • poll(2) / select(2) is likely faster if the number of connections is small.

Hope this helps.

+4

I am posting the same answer I gave to another question: I want to wait on both a file descriptor and a mutex, what is the recommended way to do this?

============================================================

This is a very common problem, especially when you are developing network server programs. The main loop of most Linux server-side programs will look like this:

epoll_add(serv_sock);
while (1) {
    ret = epoll_wait();
    foreach (ret as fd) {
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

This is a single-threaded (main thread) epoll-based server. The problem is that it is single-threaded, not multi-threaded: it requires that proc() never blocks or runs for a significant amount of time (say, 10 ms for common cases).

If proc () will work for a long time, WE NEED A LOT OF THREADS and execute proc () in a separate thread (worker thread).

We can submit a task to the worker thread without blocking the main thread by using a mutex-based message queue, which is fast enough.
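A mutex-plus-condition-variable queue is enough here, since the lock is only held for a couple of pointer updates (a sketch with hypothetical names; initialize the mutex and condvar with pthread_mutex_init()/pthread_cond_init() at startup):

#include <pthread.h>

struct task { struct task *next; /* fd, request payload, etc. */ };

struct task_queue {
    pthread_mutex_t lock;
    pthread_cond_t  ready;
    struct task    *head, *tail;
};

/* Main thread: enqueue without blocking for long. */
void queue_push(struct task_queue *q, struct task *t)
{
    pthread_mutex_lock(&q->lock);
    t->next = NULL;
    if (q->tail) q->tail->next = t; else q->head = t;
    q->tail = t;
    pthread_cond_signal(&q->ready);
    pthread_mutex_unlock(&q->lock);
}

/* Worker thread: block until a task is available. */
struct task *queue_pop(struct task_queue *q)
{
    pthread_mutex_lock(&q->lock);
    while (!q->head)
        pthread_cond_wait(&q->ready, &q->lock);
    struct task *t = q->head;
    q->head = t->next;
    if (!q->head) q->tail = NULL;
    pthread_mutex_unlock(&q->lock);
    return t;
}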

Then we need a way to get the task's result back from the worker thread. How? Say we simply check the message queue directly, before or after epoll_wait(). However, that check can only run after epoll_wait() returns, and epoll_wait() usually blocks for its full timeout (say, 10 ms in common cases) if all the file descriptors it waits on are inactive.

For a server, 10 ms is quite a long time! Can we signal epoll_wait() to return immediately when a task result is generated?

Yes! I will describe how this is done in one of my open source projects.

Create a pipe shared by all the worker threads, and have epoll wait on the read end of that pipe as well. Once a task result is generated, the worker thread writes one byte into the pipe, and epoll_wait() will then return almost immediately; a Linux pipe has 5 to 20 microseconds of latency.
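In plain terms, the mechanism is just this (a sketch; names are illustrative, and the pipe's read end is assumed to be O_NONBLOCK and registered in the main thread's epoll set):

#include <unistd.h>

int notify_pipe[2];     /* created once with pipe(notify_pipe) at startup */

/* Worker thread: after pushing a result onto the result queue,
 * wake the main thread's epoll_wait() immediately. */
void notify_main_thread(void)
{
    char one = 1;
    write(notify_pipe[1], &one, 1);
}

/* Main thread: when epoll reports notify_pipe[0] readable, drain it
 * and then pop all pending results from the result queue. */
void drain_notifications(void)
{
    char buf[256];
    while (read(notify_pipe[0], buf, sizeof buf) > 0)
        ;   /* the byte values do not matter, only the wakeup */
}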

In my SSDB project (a Redis-protocol-compatible on-disk NoSQL database), I created a SelectableQueue for passing messages between the main thread and the worker threads. Just as its name says, SelectableQueue has a file descriptor that epoll can wait on.

SelectableQueue: https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

Usage in the main thread:

epoll_add(serv_sock);
epoll_add(queue->fd());
while (1) {
    ret = epoll_wait();
    foreach (ret as fd) {
        if (fd is worker_thread) {
            sock, resp = worker->pop_result();
            sock.send(resp);
        }
        if (fd is client_socket) {
            req = fd.read();
            worker->add_task(fd, req);
        }
    }
}

Usage in the worker thread:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);
0
