Thread pool: recommendations on optimization and correctness

Hello all. For some time I worked on code that created threads as needed, but I decided that a thread pool would be a simpler and more efficient solution. It is built on a queue I wrote, which uses condition-variable waits for enqueue and dequeue. The only reason I'm posting is that I now get strange errors at random places in my code that never happened before I switched to the thread pool, and they go away when I add some debug print statements. If my code starts working because of print statements, that sounds like a memory problem, probably stack corruption caused by some bad threading code.

I thought the first place to look would be the thread pool itself, for correctness and thread safety. Here are the three main functions. threadstart is the function in which each thread waits on a dequeue, and threadpool_init spawns the threads. The last one, threadpool_q_workitem, queues a work item. The q_enq function is what signals the condition variables that wake the waiting threads so they can dequeue.

    void *threadstart(void *arg)
    {
        threadpool_t *tp = (threadpool_t *)arg;

        while (1) {
            workitem_t *work = q_dq(tp->workqueue);
            if (work == NULL)
                break;
            (*work->action)(work->arg);
            free(work);
        }
        pthread_exit(NULL);
    }

    threadpool_t *threadpool_init(int max_threads, int max_workload)
    {
        threadpool_t *tp;
        pthread_attr_t attr;
        register int i = 0;
        int rc = 0;

        ASSERT(max_threads > 0 && max_workload > 0);

        tp = malloc(sizeof(threadpool_t));
        tp->max_threads = max_threads;
        tp->threads = calloc(max_threads, sizeof(pthread_t));
        tp->workqueue = q_init(max_workload);

        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        pthread_attr_setschedpolicy(&attr, SCHED_RR);

        for (; i < max_threads; i++) {
            rc = pthread_create(&tp->threads[i], &attr, threadstart, tp);
            /* worry about errors creating threads later :( */
            if (rc)
                printf("Error creating threadpool thread %d [%d]\r\n", i, rc);
        }
        pthread_attr_destroy(&attr);
        return tp;
    }

    void threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg)
    {
        workitem_t *item;

        ASSERT(tp != NULL);
        item = malloc(sizeof(workitem_t));
        item->action = action;
        item->arg = arg;
        q_enq(tp->workqueue, (void *)item);
    }
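The listing uses several types that the post never shows. As a guess at their shape, inferred only from how the fields are used above, they might look like the sketch below. The return type of action_f, the struct tags, and the two small helper functions are assumptions for illustration; the real definitions may differ.

```c
#include <pthread.h>

/* Hypothetical definitions, inferred from field usage in the post. */
typedef void (*action_f)(void *arg);  /* work callback; return type assumed */

typedef struct workitem {
    action_f action;   /* function a worker thread runs */
    void *arg;         /* argument handed to action */
} workitem_t;

struct queue;          /* bounded queue behind q_init/q_enq/q_dq */

typedef struct threadpool {
    int max_threads;          /* number of worker threads */
    pthread_t *threads;       /* array of max_threads thread handles */
    struct queue *workqueue;  /* pending work items */
} threadpool_t;

/* Example action (hypothetical): increments the int that arg points to. */
void increment_action(void *arg)
{
    ++*(int *)arg;
}

/* Builds one work item and runs it the same way threadstart does. */
int demo_run_item(void)
{
    int counter = 0;
    workitem_t item = { increment_action, &counter };

    (*item.action)(item.arg);
    return counter;
}
```

Whatever arg points to has to stay valid until a worker has run the item, since the pool only stores the pointer.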

EDIT: queue functions

    void q_enq(struct queue *q, void *data)
    {
        struct timeval now;
        struct timespec timeout;

        pthread_mutex_lock(q->mut);
        while (q->full) {
            gettimeofday(&now, (struct timezone *)0);
            timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
            timeout.tv_nsec = now.tv_usec * 1000;
            pthread_cond_timedwait(q->notfull, q->mut, &timeout);
        }
        q->buffer[q->tail++] = data;
        if (q->tail == q->num) {
            q->tail = 0;
        }
        if (q->head == q->tail) {
            q->full = 1;
        }
        q->empty = 0;
        pthread_mutex_unlock(q->mut);
        pthread_cond_signal(q->notempty);
    }

    void *q_dq(struct queue *q)
    {
        void *data;
        int rc;
        struct timeval now;
        struct timespec timeout;

        pthread_mutex_lock(q->mut);
        while (q->empty) {
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + Q_TIMEOUT;
            timeout.tv_nsec = now.tv_usec * 1000;
            if (q->finished) {
                pthread_mutex_unlock(q->mut);
                return NULL;
            }
            rc = pthread_cond_timedwait(q->notempty, q->mut, &timeout);
            if (q->finished) {
                pthread_mutex_unlock(q->mut);
                return NULL;
            }
        }
        data = q->buffer[q->head++];
        if (q->head == q->num) {
            q->head = 0;
        }
        if (q->head == q->tail) {
            q->empty = 1;
        }
        q->full = 0;
        pthread_mutex_unlock(q->mut);
        pthread_cond_signal(q->notfull);
        return data;
    }
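Both queue functions build the absolute timeout for pthread_cond_timedwait by hand from gettimeofday. As a minimal sketch, the same computation can be factored into one helper based on clock_gettime(CLOCK_REALTIME), which is the clock a default-initialized condition variable measures against. The name deadline_after is hypothetical and not part of the original code.

```c
#define _POSIX_C_SOURCE 200809L
#include <time.h>

/* Hypothetical helper: returns the absolute CLOCK_REALTIME time that lies
 * `seconds` from now, in the form pthread_cond_timedwait expects. */
struct timespec deadline_after(int seconds)
{
    struct timespec ts;

    clock_gettime(CLOCK_REALTIME, &ts);  /* tv_nsec is already below 1e9 */
    ts.tv_sec += seconds;
    return ts;
}
```

Inside the wait loops this would replace the three timeout lines with something like `timeout = deadline_after(Q_TIMEOUT);` before the call to pthread_cond_timedwait.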
2 answers

I think you should do something like this:

    void q_enq(struct queue *q, void *data)
    {
        int next;

        // wait until there is room
        do {
            pthread_mutex_lock(q->mut);
            next = q->tail + 1;
            if (next == q->num) {
                next = 0;
            }
            // still room to add (one slot is always left unused)
            if (q->head != next)
                break;
            pthread_mutex_unlock(q->mut);
            sched_yield();
        } while (1);

        // store at the current tail, then advance it, so that q_dq
        // (which reads buffer[head] before advancing head) sees the item
        q->buffer[q->tail] = data;
        q->tail = next;

        // signal consumer and unlock mutex
        pthread_cond_signal(q->notempty);
        pthread_mutex_unlock(q->mut);
    }

    void *q_dq(struct queue *q)
    {
        void *data;

        pthread_mutex_lock(q->mut);
        // while empty wait
        while (q->tail == q->head) {
            pthread_cond_wait(q->notempty, q->mut);
        }
        // get next and wrap
        data = q->buffer[q->head++];
        if (q->head == q->num) {
            q->head = 0;
        }
        pthread_mutex_unlock(q->mut);
        return data;
    }
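The worker loop in the question stops when q_dq returns NULL, and a dequeue that only waits for data never takes that path. Below is a minimal sketch of one way to keep a shutdown path while waiting on condition variables in both directions. The struct layout, the fixed capacity, and the names ring_t, ring_put, ring_get, ring_finish and ring_demo are all assumptions for illustration, not the original queue.

```c
#include <pthread.h>
#include <stddef.h>

#define RING_SLOTS 4   /* one slot stays unused to tell full from empty */

typedef struct {
    void *buffer[RING_SLOTS];
    int head, tail, finished;
    pthread_mutex_t mut;
    pthread_cond_t notempty, notfull;
} ring_t;

void ring_init(ring_t *r)
{
    r->head = r->tail = r->finished = 0;
    pthread_mutex_init(&r->mut, NULL);
    pthread_cond_init(&r->notempty, NULL);
    pthread_cond_init(&r->notfull, NULL);
}

/* Blocks while the ring is full. Items must be non-NULL. */
void ring_put(ring_t *r, void *data)
{
    pthread_mutex_lock(&r->mut);
    while ((r->tail + 1) % RING_SLOTS == r->head)
        pthread_cond_wait(&r->notfull, &r->mut);
    r->buffer[r->tail] = data;
    r->tail = (r->tail + 1) % RING_SLOTS;
    pthread_cond_signal(&r->notempty);
    pthread_mutex_unlock(&r->mut);
}

/* Blocks while empty; returns NULL once finished and fully drained. */
void *ring_get(ring_t *r)
{
    void *data = NULL;

    pthread_mutex_lock(&r->mut);
    while (r->head == r->tail && !r->finished)
        pthread_cond_wait(&r->notempty, &r->mut);
    if (r->head != r->tail) {
        data = r->buffer[r->head];
        r->head = (r->head + 1) % RING_SLOTS;
        pthread_cond_signal(&r->notfull);
    }
    pthread_mutex_unlock(&r->mut);
    return data;
}

/* Sets the flag and wakes every waiting consumer so it can observe it. */
void ring_finish(ring_t *r)
{
    pthread_mutex_lock(&r->mut);
    r->finished = 1;
    pthread_cond_broadcast(&r->notempty);
    pthread_mutex_unlock(&r->mut);
}

/* Demo state: two workers add up the integers the caller pushes. */
static ring_t demo_ring;
static long demo_sum;
static pthread_mutex_t demo_sum_mut = PTHREAD_MUTEX_INITIALIZER;

static void *demo_worker(void *arg)
{
    void *item;

    (void)arg;
    while ((item = ring_get(&demo_ring)) != NULL) {
        pthread_mutex_lock(&demo_sum_mut);
        demo_sum += *(int *)item;
        pthread_mutex_unlock(&demo_sum_mut);
    }
    return NULL;
}

/* Pushes 1..100 through the ring and returns the sum the workers computed. */
long ring_demo(void)
{
    static int values[100];
    pthread_t workers[2];

    ring_init(&demo_ring);
    demo_sum = 0;
    for (int i = 0; i < 2; i++)
        pthread_create(&workers[i], NULL, demo_worker, NULL);
    for (int i = 0; i < 100; i++) {
        values[i] = i + 1;
        ring_put(&demo_ring, &values[i]);
    }
    ring_finish(&demo_ring);
    for (int i = 0; i < 2; i++)
        pthread_join(workers[i], NULL);
    return demo_sum;
}
```

The broadcast in ring_finish matters: a single signal would wake only one of the blocked workers, and the others would keep waiting on an empty queue.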

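You call pthread_attr_destroy very early. Consider not calling pthread_attr_destroy until it is time to destroy the pool, i.e. keep attr around for the life of the threads. Note that on Linux the pthread_attr_init(3) manual page states that destroying an attributes object has no effect on threads that were created with it, so this is a precaution rather than a confirmed cause of the errors.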
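A minimal sketch of the ordering suggested here, with joinable threads: attr is initialized, used for every pthread_create call, and destroyed only after the workers have been joined. The worker function mark_done and the name attr_lifetime_demo are hypothetical, and the thread count is arbitrary.

```c
#include <pthread.h>

#define DEMO_THREADS 4

/* Hypothetical worker: marks its own slot as done. */
static void *mark_done(void *arg)
{
    *(int *)arg = 1;
    return NULL;
}

/* Creates joinable threads, joins them, then destroys attr.
 * Returns how many workers ran, or -1 if attr could not be initialized. */
int attr_lifetime_demo(void)
{
    pthread_t threads[DEMO_THREADS];
    int done[DEMO_THREADS] = {0};
    pthread_attr_t attr;
    int created, count = 0;

    if (pthread_attr_init(&attr) != 0)
        return -1;
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* Stop at the first failure so only created threads are joined. */
    for (created = 0; created < DEMO_THREADS; created++)
        if (pthread_create(&threads[created], &attr, mark_done,
                           &done[created]) != 0)
            break;

    for (int i = 0; i < created; i++) {
        pthread_join(threads[i], NULL);
        count += done[i];
    }

    pthread_attr_destroy(&attr);  /* workers are gone; attr no longer needed */
    return count;
}
```

Joining before returning also keeps the done array alive for as long as any worker can still write to it.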
