#include "precompile.h" #include "threadpool.h" #include "list.h" #include "spinlock.h" #include "refcnt.h" #include "modCheck.h" #include #include #ifndef _WIN32 #include /*for errno compactility*/ #endif #include #include #define TAG TOOLKIT_TAG("thread") #include "dbgutil.h" struct strand_task_entry { struct list_head entry; strand_t *strand; threadpool_workitem_proc f; threadpool_workitem_proc2 f2; void *arg; param_size_t param1; param_size_t param2; }; typedef struct strand_task_entry strand_task_entry; struct strand_t { struct list_head entry_list; int working; spinlock_t lock; DECLARE_REF_COUNT_MEMBER(ref_cnt); }; static strand_task_entry *make_task_entry(strand_t *strand, threadpool_workitem_proc f, threadpool_workitem_proc2 f2, void *arg, param_size_t param1, param_size_t param2) { strand_task_entry *task_entry = MALLOC_T(strand_task_entry); task_entry->strand = strand; task_entry->f = f; task_entry->f2 = f2; task_entry->arg = arg; task_entry->param1 = param1; task_entry->param2 = param2; if (strand) strand_inc_ref(strand); return task_entry; } static void destroy_task_entry(strand_task_entry *task_entry) { if (task_entry->strand) strand_dec_ref(task_entry->strand); free(task_entry); } TOOLKIT_API strand_t *strand_create() { strand_t *strand = MALLOC_T(strand_t); INIT_LIST_HEAD(&strand->entry_list); REF_COUNT_INIT(&strand->ref_cnt); spinlock_init(&strand->lock); strand->working = 0; return strand; } TOOLKIT_API void strand_lock(strand_t *strand) { spinlock_enter(&strand->lock, 0); } TOOLKIT_API void strand_unlock(strand_t *strand) { spinlock_leave(&strand->lock); } static void __strand_destroy(strand_t *strand) { TOOLKIT_ASSERT(list_empty(&strand->entry_list)); free(strand); } IMPLEMENT_REF_COUNT_MT(strand, strand_t, ref_cnt, __strand_destroy) static void sp_strand_enqueue(strand_t *strand, strand_task_entry *task_entry) { list_add_tail(&task_entry->entry, &strand->entry_list); } static strand_task_entry *sp_strand_dequeue(strand_t *strand) { strand_task_entry *task_entry = NULL; if (!list_empty(&strand->entry_list)) { task_entry = list_first_entry(&strand->entry_list, strand_task_entry, entry); list_del(&task_entry->entry); } return task_entry; } static void sp_strand_work(void *threadpool, strand_task_entry *task_entry) { strand_t *strand = task_entry->strand; if (strand) { strand_inc_ref(strand); strand_lock(strand); if (strand->working == 0) { strand->working = 1; while (task_entry) { strand_unlock(strand); if (task_entry->f) { task_entry->f(threadpool, task_entry->arg); } else if (task_entry->f2) { task_entry->f2(threadpool, task_entry->arg, task_entry->param1, task_entry->param2); } destroy_task_entry(task_entry); strand_lock(strand); task_entry = sp_strand_dequeue(strand); } strand->working = 0; } else { sp_strand_enqueue(strand, task_entry); } strand_unlock(strand); strand_dec_ref(strand); } else { if (task_entry->f) { task_entry->f(threadpool, task_entry->arg); } else if (task_entry->f2) { task_entry->f2(threadpool, task_entry->arg, task_entry->param1, task_entry->param2); } destroy_task_entry(task_entry); } } struct threadpool_t { LONG lstop; DWORD tmp_thread_ttl; // thread max free time that can hold, in millisecond DWORD num_fix_thread; // fixed threads in the pool DWORD max_tmp_thread; // max threads in the pool DWORD stack_size; volatile DWORD curr_fix_threads; // current fixed thread in the pool volatile DWORD curr_tmp_threads; // current active temp thread count HANDLE workitem_sem; volatile DWORD freethread_count; //HANDLE freethread_sem; CRITICAL_SECTION lock; threadpool_decorator_callback decorator_init_cb; threadpool_decorator_callback decorator_term_cb; void *decorator_cb_user_data; void *user_data; struct list_head workitem_list; uint32_t workitem_cnt; log_func log; }; static void threadpool_lock(threadpool_t *threadpool) { EnterCriticalSection(&threadpool->lock); } static void threadpool_unlock(threadpool_t *threadpool) { LeaveCriticalSection(&threadpool->lock); } static void __threadpool_log(threadpool_t *threadpool, const char *fmt, va_list arg) { int n; n = _vscprintf(fmt, arg); if (n > 0) { char *buf = _alloca(n+1); vsprintf(buf, fmt, arg); threadpool->log(threadpool, buf); } } static void threadpool_log(threadpool_t *threadpool, const char *fmt, ...) { if (threadpool->log) { va_list arg; va_start(arg, fmt); __threadpool_log(threadpool, fmt, arg); va_end(arg); } } static void enqueue_workitem(threadpool_t *threadpool, strand_task_entry *task_entry, int fifo) //add a workItem { threadpool_lock(threadpool); if (fifo) { list_add_tail(&task_entry->entry, &threadpool->workitem_list); } else { list_add(&task_entry->entry, &threadpool->workitem_list); } threadpool->workitem_cnt++; threadpool_unlock(threadpool); } static strand_task_entry* dequeue_workitem(threadpool_t *threadpool) //remove a workItem { strand_task_entry *task_entry; threadpool_lock(threadpool); task_entry = list_first_entry(&threadpool->workitem_list, strand_task_entry, entry); list_del(&task_entry->entry); threadpool->workitem_cnt--; threadpool_unlock(threadpool); return task_entry; } static unsigned int __stdcall fix_thread_proc(void *param) { threadpool_t *threadpool = (threadpool_t *)param; #ifdef _WIN32 toolkit_setThreadGroupByAssign(threadpool); #endif //_WIN32 if (threadpool->decorator_init_cb) threadpool->decorator_init_cb(threadpool, threadpool->decorator_cb_user_data); InterlockedIncrement(&threadpool->freethread_count); while (!threadpool->lstop) { DWORD ret = WaitForSingleObject(threadpool->workitem_sem, threadpool->lstop ? 0 : INFINITE); if (ret == WAIT_OBJECT_0) { InterlockedDecrement(&threadpool->freethread_count); { strand_task_entry *task_entry = dequeue_workitem(threadpool); sp_strand_work(threadpool, task_entry); } InterlockedIncrement(&threadpool->freethread_count); } else if (ret == WAIT_TIMEOUT) { break; } else { TOOLKIT_ASSERT(0); } } InterlockedDecrement(&threadpool->freethread_count); InterlockedDecrement((LONG*)&threadpool->curr_fix_threads); if (threadpool->decorator_term_cb) threadpool->decorator_term_cb(threadpool, threadpool->decorator_cb_user_data); return 0; } static unsigned int __stdcall tmp_thread_proc(void *param) { threadpool_t *threadpool = (threadpool_t *)param; WLog_DBG(TAG, "tmp_thread_proc enter id(%u)", GetCurrentThreadId()); #ifdef _WIN32 toolkit_setThreadGroupByAssign(threadpool); #endif //_WIN32 if (threadpool->decorator_init_cb) threadpool->decorator_init_cb(threadpool, threadpool->decorator_cb_user_data); InterlockedIncrement(&threadpool->freethread_count); while (!threadpool->lstop) { DWORD ret = 0; WLog_DBG(TAG, "to wait workitem sem and will to exec! id(%u)", GetCurrentThreadId()); ret = WaitForSingleObject(threadpool->workitem_sem, threadpool->lstop ? 0 : threadpool->tmp_thread_ttl); if (ret == WAIT_OBJECT_0) { InterlockedDecrement(&threadpool->freethread_count); { strand_task_entry* task_entry = dequeue_workitem(threadpool); WLog_DBG(TAG, "get a task entry and exec! id(%u)", GetCurrentThreadId()); sp_strand_work(threadpool, task_entry); WLog_DBG(TAG, "task entry done, to wait next task! id(%u)", GetCurrentThreadId()); } InterlockedIncrement(&threadpool->freethread_count); } else if (ret == WAIT_TIMEOUT) { WLog_DBG(TAG, "tmp thread ttl detected, begin exit!"); break; } else { TOOLKIT_ASSERT(0); } } InterlockedDecrement(&threadpool->freethread_count); InterlockedDecrement((LONG*)&threadpool->curr_tmp_threads); if (threadpool->decorator_term_cb) threadpool->decorator_term_cb(threadpool, threadpool->decorator_cb_user_data); WLog_DBG(TAG, "tmp_thread_proc leave! id(%u)", GetCurrentThreadId()); return 0; } TOOLKIT_API int threadpool_create(threadpool_t **p_threadpool) { threadpool_t *threadpool = ZALLOC_T(threadpool_t); InitializeCriticalSection(&threadpool->lock); threadpool->workitem_cnt = 0; *p_threadpool = threadpool; return 0; } TOOLKIT_API int threadpool_destroy(threadpool_t *threadpool) { DeleteCriticalSection(&threadpool->lock); free(threadpool); return 0; } TOOLKIT_API int threadpool_set_thread_decorator(threadpool_t *threadpool, threadpool_decorator_callback dc_init, threadpool_decorator_callback dc_term, void *user_data) { threadpool->decorator_init_cb = dc_init; threadpool->decorator_term_cb = dc_term; threadpool->decorator_cb_user_data = user_data; return 0; } TOOLKIT_API int threadpool_start(threadpool_t *threadpool, int num_fix_thread, int max_tmp_thread, int tmp_thread_ttl, int stack_size) { //TOOLKIT_ASSERT(threadpool->freethread_sem == NULL); TOOLKIT_ASSERT(threadpool->workitem_sem == NULL); threadpool->lstop = 0; threadpool->num_fix_thread = num_fix_thread; threadpool->max_tmp_thread = max_tmp_thread; threadpool->stack_size = stack_size; if (num_fix_thread == 0 && max_tmp_thread == 0) threadpool->max_tmp_thread = 1; threadpool->tmp_thread_ttl = tmp_thread_ttl; INIT_LIST_HEAD(&threadpool->workitem_list); threadpool->workitem_cnt = 0; threadpool->curr_fix_threads = 0; threadpool->curr_tmp_threads = 0; /*threadpool->freethread_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL); if (!threadpool->freethread_sem) return -1;*/ WLog_DBG(TAG, "%p: %d, %d", threadpool, num_fix_thread, max_tmp_thread); threadpool->workitem_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL); for (threadpool->curr_fix_threads = 0; threadpool->curr_fix_threads < threadpool->num_fix_thread; ++threadpool->curr_fix_threads) { DWORD dwCreationFlags = threadpool->stack_size ? STACK_SIZE_PARAM_IS_A_RESERVATION : 0; HANDLE hThread = NULL; #ifdef _WIN32 toolkit_setAssign(threadpool); #endif //_WIN32 hThread = (HANDLE)_beginthreadex(NULL, threadpool->stack_size, &fix_thread_proc, threadpool, dwCreationFlags, NULL); if (hThread) { CloseHandle(hThread); } else { return -1; } } return 0; } TOOLKIT_API int threadpool_stop(threadpool_t *threadpool) { DWORD i, t; InterlockedExchange((LONG*)&threadpool->lstop, 1); t = threadpool->curr_tmp_threads + threadpool->curr_fix_threads; for (i = 0; i < t; ++i) { strand_task_entry *task_entry = make_task_entry(NULL, NULL, NULL, NULL, 0, 0); // use empty item to awake blocked thread to exit enqueue_workitem(threadpool, task_entry, 1); ReleaseSemaphore(threadpool->workitem_sem, 1, NULL); } while (threadpool->curr_tmp_threads + threadpool->curr_fix_threads) Sleep(1); /* to see that any pending pWorkItem, so need to delete them(these items all are empty item. why?) */ while (WaitForSingleObject(threadpool->workitem_sem, 0) == WAIT_OBJECT_0) { //destory all the workItem strand_task_entry *task_entry = dequeue_workitem(threadpool); destroy_task_entry(task_entry); } return 0; } static int __threadpool_queue_workitem(threadpool_t *threadpool, strand_t *strand, threadpool_workitem_proc workitem, threadpool_workitem_proc2 workitem2, void *arg, param_size_t param1, param_size_t param2, int fifo) { strand_task_entry *task_entry = NULL; DWORD ret; if (!workitem && !workitem2) return -1; if (threadpool->lstop) return -1; WLog_DBG(TAG, "%p: __threadpool_queue_workitem fifo:%d", threadpool, fifo); task_entry = make_task_entry(strand, workitem, workitem2, arg, param1, param2); enqueue_workitem(threadpool, task_entry, fifo); ReleaseSemaphore(threadpool->workitem_sem, 1, NULL); if (threadpool->freethread_count >= threadpool->workitem_cnt) { WLog_DBG(TAG, "let free thread to exec %u %u", threadpool->freethread_count, threadpool->workitem_cnt); } else if (threadpool->curr_tmp_threads < threadpool->max_tmp_thread) { HANDLE hThread = NULL; #ifdef _WIN32 toolkit_setAssign(threadpool); #endif //_WIN32 hThread = (HANDLE)_beginthreadex(NULL, 0, &tmp_thread_proc, threadpool, 0, NULL); if (hThread) { WLog_DBG(TAG, "alloc a new thread ok!"); CloseHandle(hThread); InterlockedIncrement(&threadpool->curr_tmp_threads); WLog_DBG(TAG, "curr tmp thread: %d, max tmp thread: %d", threadpool->curr_tmp_threads, threadpool->max_tmp_thread); } else { WLog_DBG(TAG, "alloc a new thread failed, errno: %d, last error: %d", errno, GetLastError()); return -1; } } else { WLog_WARN(TAG, "curr tmp thread: %d, max tmp thread %d, curr fix threads: %d, max fix thread: %d, workitem count: %d" , threadpool->curr_tmp_threads, threadpool->max_tmp_thread, threadpool->curr_fix_threads, threadpool->num_fix_thread, threadpool->workitem_cnt); } return 0; } TOOLKIT_API int threadpool_queue_workitem(threadpool_t *threadpool, strand_t *strand, threadpool_workitem_proc workitem, void *arg) { return __threadpool_queue_workitem(threadpool, strand, workitem, NULL, arg, 0, 0, 1); } TOOLKIT_API int threadpool_queue_workitem2(threadpool_t *threadpool, strand_t *strand, threadpool_workitem_proc2 workitem, void *arg, param_size_t param1, param_size_t param2) { return __threadpool_queue_workitem(threadpool, strand, NULL, workitem, arg, param1, param2, 1); } TOOLKIT_API int threadpool_post_workitem_lifo(threadpool_t *threadpool, strand_t *strand, threadpool_workitem_proc workitem, void *arg) { return __threadpool_queue_workitem(threadpool, strand, workitem, NULL, arg, 0, 0, 0); } TOOLKIT_API int threadpool_post_workitem_lifo2(threadpool_t *threadpool, strand_t *strand, threadpool_workitem_proc2 workitem, void *arg, param_size_t param1, param_size_t param2) { return __threadpool_queue_workitem(threadpool, strand, NULL, workitem, arg, param1, param2, 0); } TOOLKIT_API void threadpool_set_user_data(threadpool_t *threadpool, void *user_data) { threadpool->user_data = user_data; } TOOLKIT_API void *threadpool_get_user_data(threadpool_t *threadpool) { return threadpool->user_data; } TOOLKIT_API void threadpool_set_log(threadpool_t *threadpool, log_func func) { threadpool->log = func; }