Hello to all. For some time I worked on some code that created threads if necessary, but decided that creating a thread pool would be a simpler and more efficient solution. It is implemented with the queue that I did, which has conditional expectations for the sequence and dismissal. The only reason I'm posting this is because I get some strange errors randomly throughout my code that never happened before going to threadpool, which go away when I add some debug-print statements. If my code starts to work due to print statements, it sounds like a memory problem, and the stack is probably due to some bad stream code.
I thought that the first place to look would be in the streaming space for the correctness and safety of streams. Here are three main features. Threadstart is a function in which each thread is waiting on a dequeue, and an init function that spawns threads. The last is what queues the work item. The q_enq function is what will signal conditional variables that wake up certain threads and then deactivate.
void * threadstart(void *arg) { threadpool_t * tp = (threadpool_t*)arg; while (1) { workitem_t *work = q_dq(tp->workqueue); if (work == NULL) break; (*work->action)(work->arg); free(work); } pthread_exit(NULL); }; threadpool_t * threadpool_init(int max_threads, int max_workload) { threadpool_t *tp; pthread_attr_t attr; register int i=0; int rc =0; ASSERT(max_threads > 0 && max_workload > 0); tp = malloc(sizeof(threadpool_t)); tp->max_threads = max_threads; tp->threads = calloc(max_threads, sizeof(pthread_t)); tp->workqueue = q_init(max_workload); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_attr_setschedpolicy(&attr, SCHED_RR); for (; i < max_threads; i++) { rc = pthread_create(&tp->threads[i], &attr, threadstart, tp); if (rc) printf("Error creating threadpool thread %d [%d]\r\n", i, rc); } pthread_attr_destroy(&attr); return tp; } void threadpool_q_workitem(threadpool_t *tp, action_f action, void *arg) { workitem_t *item; ASSERT(tp != NULL); item = malloc(sizeof(workitem_t)); item->action = action; item->arg = arg; q_enq(tp->workqueue, (void*)item); };
EDIT: queue functions
void q_enq(struct queue *q, void *data) { struct timeval now; struct timespec timeout; pthread_mutex_lock(q->mut); while (q->full) { gettimeofday(&now, (struct timezone *)0); timeout.tv_sec = now.tv_sec + Q_TIMEOUT; timeout.tv_nsec = now.tv_usec * 1000; pthread_cond_timedwait(q->notfull, q->mut, &timeout); } q->buffer[q->tail++] = data; if (q->tail == q->num) { q->tail = 0; } if (q->head == q->tail) { q->full = 1; } q->empty = 0; pthread_mutex_unlock(q->mut); pthread_cond_signal(q->notempty); } void *q_dq(struct queue *q) { void *data; int rc; struct timeval now; struct timespec timeout; pthread_mutex_lock(q->mut); while (q->empty) { gettimeofday(&now, NULL); timeout.tv_sec = now.tv_sec + Q_TIMEOUT; timeout.tv_nsec = now.tv_usec * 1000; if (q->finished) { pthread_mutex_unlock(q->mut); return NULL; } rc = pthread_cond_timedwait(q->notempty, q->mut, &timeout); if (q->finished) { pthread_mutex_unlock(q->mut); return NULL; } } data = q->buffer[q->head++]; if (q->head == q->num) { q->head = 0; } if (q->head == q->tail) { q->empty = 1; } q->full = 0; pthread_mutex_unlock(q->mut); pthread_cond_signal(q->notfull); return data; }
source share