Here is a piece of code in which a segmentation fault occurs (`perror` is never called):
/* Excerpt of the allocation in async() shown further below.
 * NOTE(review): perror() only prints a message; execution continues, and in
 * async() the very next statement dereferences job even when it is NULL. */
job = malloc(sizeof(task_t)); if(job == NULL) perror("malloc");
More precisely, gdb reports that the segfault happens inside `_int_malloc`, an internal glibc routine called by `malloc`.
Since `malloc` is called concurrently from several threads, I initially suspected that this could be the problem. I am using glibc 2.19.
Data Structures:
/*
 * Shared types for the future, thread-pool and task modules.
 */

/* Worker handle; struct rv_thread itself is defined at the end of this block. */
typedef struct rv_thread thread_wrapper_t;

/* Completion flag plus the mutex/condvar that wait_future() and complete()
 * use around it. */
struct future {
    pthread_cond_t  wait;
    pthread_mutex_t mutex;
    long            completed;   /* 0 = pending, non-zero = done */
};
typedef struct future future_t;

/* Work item: fun is invoked with the worker handle and data, after which
 * f is completed. */
struct task {
    future_t *f;
    void     *data;
    void     *(*fun)(thread_wrapper_t *, void *);
};
typedef struct task task_t;

/* Pool state shared by the workers: the queue tasks are submitted to. */
typedef struct {
    queue_t *queue;
} pool_worker_t;

/* Handle returned by async(); refers to the submitted task. */
typedef struct {
    task_t *t;
} sfuture_t;

/* Per-worker state: the pool this worker pulls tasks from. */
struct rv_thread {
    pool_worker_t *pool;
};
Next, the future implementation:
/*
 * create_future: allocate a pending future (completed == 0) with its mutex
 * and condition variable initialised.
 *
 * Returns the new future; never NULL. If allocation or pthread
 * initialisation fails, the error is reported and the process aborts.
 * Continuing (as before) would only dereference a NULL pointer.
 *
 * NOTE(review): no destroy/free function for futures is visible here, so
 * every future appears to be leaked - confirm ownership with the callers.
 */
future_t * create_future(void)
{
    future_t *new_f = malloc(sizeof *new_f);
    if (new_f == NULL) {
        perror("malloc");
        abort();
    }

    new_f->completed = 0;

    /* pthread init functions return an error number rather than setting errno. */
    if (pthread_mutex_init(&new_f->mutex, NULL) != 0) {
        fputs("create_future: pthread_mutex_init failed\n", stderr);
        abort();
    }
    if (pthread_cond_init(&new_f->wait, NULL) != 0) {
        fputs("create_future: pthread_cond_init failed\n", stderr);
        abort();
    }
    return new_f;
}

/*
 * wait_future: block the calling thread until complete(f) has run.
 * The predicate is re-checked in a loop to tolerate spurious wake-ups.
 * Always returns 0.
 */
int wait_future(future_t * f)
{
    pthread_mutex_lock(&f->mutex);
    while (!f->completed) {
        pthread_cond_wait(&f->wait, &f->mutex);
    }
    pthread_mutex_unlock(&f->mutex);
    return 0;
}

/*
 * complete: mark f as completed and wake every thread blocked in
 * wait_future(f). The flag is written under the mutex, so a waiter
 * cannot miss the update between its check and pthread_cond_wait().
 */
void complete(future_t * f)
{
    pthread_mutex_lock(&f->mutex);
    f->completed = 1;
    pthread_mutex_unlock(&f->mutex);
    pthread_cond_broadcast(&f->wait);
}
The thread pool itself:
pool_worker_t * create_work_pool(int threads) { pool_worker_t * new_p = malloc(sizeof(pool_worker_t)); if(new_p == NULL) perror("malloc"); threads = 1; new_p->queue = create_queue(); int i; for (i = 0; i < threads; i++){ thread_wrapper_t * w = malloc(sizeof(thread_wrapper_t)); if(w == NULL) perror("malloc"); w->pool = new_p; pthread_t n; pthread_create(&n, NULL, work, w); } return new_p; } task_t * try_get_new_task(thread_wrapper_t * thr) { task_t * t = NULL; try_dequeue(thr->pool->queue, t); return t; } void submit_job(pool_worker_t * p, task_t * t) { enqueue(p->queue, t); } void * work(void * data) { thread_wrapper_t * thr = (thread_wrapper_t *) data; while (1){ task_t * t = NULL; while ((t = (task_t *) try_get_new_task(thr)) == NULL); future_t * f = t->f; (*(t->fun))(thr,t->data); complete(f); } pthread_exit(NULL); }
And finally, task.c:
pool_worker_t * create_tpool() { return (create_work_pool(8)); } sfuture_t * async(pool_worker_t * p, thread_wrapper_t * thr, void * (*fun)(thread_wrapper_t *, void *), void * data) { task_t * job = NULL; job = malloc(sizeof(task_t)); if(job == NULL) perror("malloc"); job->data = data; job->fun = fun; job->f = create_future(); submit_job(p, job); sfuture_t * new_t = malloc(sizeof(sfuture_t)); if(new_t == NULL) perror("malloc"); new_t->t = job; return (new_t); } void mywait(thread_wrapper_t * thr, sfuture_t * sf) { if (sf == NULL) return; if (thr != NULL) { while (!sf->t->f->completed) { task_t * t_n = try_get_new_task(thr); if (t_n != NULL) { future_t * f = t_n->f; (*(t_n->fun))(thr,t_n->data); complete(f); } } return; } wait_future(sf->t->f); return ; }
The queue is a non-blocking (lock-free) liblfds 6.1.1 queue, accessed through these macros:
/*
 * Thin wrappers around the liblfds 6.1.1 queue stored in the lq member of q.
 *
 * Each macro expands to a single statement (do { ... } while (0)), so it is
 * safe inside an unbraced if/else. Every macro argument is parenthesised.
 */

/*
 * enqueue(q, t): push item t onto q. If lfds611_queue_enqueue() reports
 * failure (returns 0), fall back to lfds611_queue_guaranteed_enqueue().
 * t is evaluated exactly once and passed to the library as a void *.
 */
#define enqueue(q, t)                                                   \
    do {                                                                \
        void *enqueue_item_ = (t);                                      \
        if (!lfds611_queue_enqueue((q)->lq, enqueue_item_)) {           \
            lfds611_queue_guaranteed_enqueue((q)->lq, enqueue_item_);   \
        }                                                               \
    } while (0)

/*
 * try_dequeue(q, t): try to pop one item from q into the lvalue t. The
 * library's return value is discarded. Callers initialise t to NULL and test
 * for NULL afterwards, presumably relying on t being left untouched when the
 * queue is empty. t should be a void * lvalue, because &(t) is passed where
 * the library expects a void **.
 */
#define try_dequeue(q, t)                                               \
    do {                                                                \
        (void)lfds611_queue_dequeue((q)->lq, &(t));                     \
    } while (0)
The problem occurs whenever the number of `async` calls is very large.
Valgrind Output:
Process terminating with default action of signal 11 (SIGSEGV) ==12022== Bad permissions for mapped region at address 0x5AF9FF8 ==12022== at 0x4C28737: malloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)