#include "stdafx.h"
#include "SpBase.h"
#include "SpMisc.h"
#include "SpAsyncWait.h"
#include "SpEntity.h"

#include "sp_env.h"
#include "sp_cfg.h"
#include "dbgutil.h"

// NOTE(review): the text this revision was prepared from had its line breaks
// collapsed and every `<...>` template-argument list stripped. The template
// arguments below (static_cast/dynamic_cast targets, CSmartPointer, CAutoArray)
// and the two loop headers in WaitMultiAnswers were restored from the
// surrounding declarations. Verify them against SpAsyncWait.h / SpBase.h.

//
// SpAsyncWait
//

// Builds a wait object bound to pEntity.
// Creates the answer semaphore (initial count 0), the manual-reset cancel
// event and the callback strand, sets the reference count to 1 and initialises
// the two pending lists and the spinlock.
SpAsyncWait::SpAsyncWait(SpEntity *pEntity)
    : m_sem(::CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL))
    , m_total_pkt(0)
    , m_bCancel(0)
    , m_bEnd(0)
    , m_pCallback(NULL)
    , m_pEntity(pEntity)
    , m_evt_cancel(::CreateEventA(NULL, TRUE, FALSE, NULL))
    , m_curr_pkt(NULL)
    , m_callback_strand(strand_create())
    , m_pRefCount(NULL)
{
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWait()");
    m_pRefCount = new LONG(1);
    INIT_LIST_HEAD(&m_pending_pkt_list);
    INIT_LIST_HEAD(&m_pending_callback_list);
    spinlock_init(&m_lock);
}

// Closes the semaphore and the cancel event, deletes every packet still queued
// in m_pending_pkt_list, destroys the callback strand and deletes the packet
// currently held in m_curr_pkt.
SpAsyncWait::~SpAsyncWait()
{
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWait()");
    if (m_sem)
        CloseHandle(m_sem);
    if (m_evt_cancel)
        CloseHandle(m_evt_cancel);

    pending_pkt *pos, *n;
    list_for_each_entry_safe(pos, n, &m_pending_pkt_list, pending_pkt, entry) {
        list_del_init(&pos->entry);
        delete pos;
    }

    strand_destroy(m_callback_strand);

    if (m_curr_pkt)
        delete m_curr_pkt;
}

// Convenience overload: waits for the next answer and discards the user error
// code and message.
ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD dwTimeout)
{
    DWORD dwUserError = 0;
    CSimpleString str;
    return WaitAnswer(dwUserError, str, dwTimeout);
}

// Blocks until an answer packet is available, the wait is cancelled, or the
// timeout elapses.
//
// dwUserError  [out] user error code of the received packet (0 otherwise).
// str          [out] message string of the received packet.
// dwTimeout    wait time in milliseconds; INFINITE is replaced by 10000.
//
// Returns Error_TimeOut on timeout, Error_Cancel when the cancel event fired,
// Error_Unexpect for any other wait result, otherwise the translated system
// error of the dequeued packet. OnAnswerWaited() is invoked with the result in
// every case.
ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD &dwUserError, CSimpleString &str, DWORD dwTimeout)
{
    ErrorCodeEnum Error;
    dwUserError = 0;

    HANDLE hs[] = {m_sem, m_evt_cancel};
    if (dwTimeout == INFINITE)
        dwTimeout = 10000;

    DWORD dwRet = ::WaitForMultipleObjects(2, &hs[0], FALSE, dwTimeout);
    if (dwRet == WAIT_TIMEOUT) {
        Error = Error_TimeOut;
    } else if (dwRet == WAIT_OBJECT_0) {
        // The semaphore was signalled: drop the previous packet and dequeue
        // the oldest pending one.
        if (m_curr_pkt) {
            delete m_curr_pkt;
            m_curr_pkt = NULL;
        }
        spinlock_enter(&m_lock, -1);
        m_curr_pkt = list_first_entry(&m_pending_pkt_list, pending_pkt, entry);
        list_del_init(&m_curr_pkt->entry);
        spinlock_leave(&m_lock);

        Error = SpTranslateError(m_curr_pkt->dwSysError);
        dwUserError = m_curr_pkt->dwUserError;
        str = m_curr_pkt->str;
    } else if (dwRet == WAIT_OBJECT_0 + 1) {
        Error = Error_Cancel;
    } else {
        Error = Error_Unexpect;
    }

    OnAnswerWaited(Error);
    return Error;
}

// Waits on several SpAsyncWait objects at once.
//
// arrAsynWaits  the waits to observe (1..32 entries).
// dwRetIndex    [out] index of the wait that was signalled or cancelled;
//               (DWORD)-1 when none was.
// bWaitAll      true: wait until every answer semaphore is signalled (cancel
//               events are not observed); false: return on the first answer
//               or the first cancel event.
// dwTimeout     milliseconds; INFINITE is replaced by 10000.
//
// Returns Error_Param for an empty array, Error_Overflow for more than 32
// entries, Error_TimeOut, Error_Cancel, Error_Unexpect, or the translated
// system error of the last packet dequeued.
//
// NOTE(review): the element type of arrAsynWaits, the cast used to obtain the
// SpAsyncWait pointer and the condition guarding the dequeue loop were
// reconstructed (see the note at the top of the file) - confirm.
ErrorCodeEnum WaitMultiAnswers(CAutoArray<IAsynWaitSp*> arrAsynWaits, DWORD &dwRetIndex, bool bWaitAll, DWORD dwTimeout)
{
    // Largest number of waits accepted. In wait-any mode each wait contributes
    // two handles, so the handle array never exceeds 64 entries.
    const int kMaxWaits = 32;

    if (dwTimeout == INFINITE)
        dwTimeout = 10000;

    int nCount = arrAsynWaits.GetCount();
    if (nCount == 0)
        return Error_Param;
    else if (nCount > kMaxWaits)
        return Error_Overflow;

    // Fixed-size stack storage: the previous heap array was never released.
    HANDLE handles[kMaxWaits * 2];

    for (int i = 0; i < nCount; i++) {
        SpAsyncWait *pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
        TOOLKIT_ASSERT(pAsynWait != NULL);
        handles[i] = pAsynWait->m_sem;
        if (!bWaitAll)
            handles[i + nCount] = pAsynWait->m_evt_cancel;
    }

    dwRetIndex = (DWORD)-1;
    ErrorCodeEnum Error = Error_Succeed;

    DWORD dwRet = ::WaitForMultipleObjects(bWaitAll ? nCount : nCount * 2, handles, bWaitAll, dwTimeout);
    if (dwRet == WAIT_TIMEOUT) {
        Error = Error_TimeOut;
    } else if (dwRet >= WAIT_OBJECT_0 && dwRet < WAIT_OBJECT_0 + nCount) {
        dwRetIndex = dwRet - WAIT_OBJECT_0;
        for (int i = 0; i < nCount; i++) {
            // Only waits whose semaphore was actually acquired have a packet
            // to dequeue: all of them in wait-all mode, otherwise just the
            // signalled one.
            if (bWaitAll || i == (int)dwRetIndex) {
                SpAsyncWait *pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
                TOOLKIT_ASSERT(pAsynWait != NULL);
                if (pAsynWait->m_curr_pkt) {
                    delete pAsynWait->m_curr_pkt;
                    pAsynWait->m_curr_pkt = NULL;
                }
                spinlock_enter(&pAsynWait->m_lock, -1);
                pAsynWait->m_curr_pkt = list_first_entry(&pAsynWait->m_pending_pkt_list, SpAsyncWait::pending_pkt, entry);
                list_del_init(&pAsynWait->m_curr_pkt->entry);
                spinlock_leave(&pAsynWait->m_lock);
                Error = SpTranslateError(pAsynWait->m_curr_pkt->dwSysError);
            }
        }
    } else if (dwRet >= WAIT_OBJECT_0 + nCount && dwRet < WAIT_OBJECT_0 + nCount * 2) {
        Error = Error_Cancel;
        dwRetIndex = dwRet - WAIT_OBJECT_0 - nCount;
    } else {
        Error = Error_Unexpect;
    }

    return Error;
}

// Marks the wait as cancelled, closes the underlying request via Close() and
// signals the cancel event so blocked waiters return Error_Cancel.
ErrorCodeEnum SpAsyncWait::CancelWait()
{
    m_bCancel = TRUE;
    Close();
    SetEvent(m_evt_cancel);
    return Error_Succeed;
}

// Returns true while no answer packet has been received (m_total_pkt is 0).
bool SpAsyncWait::IsPending()
{
    return m_total_pkt == 0;
}

// Overload that discards the user error code and message.
ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd)
{
    DWORD dwUserError = 0;
    CSimpleString str;
    return AsyncGetAnswer(ReceiveBuffer, bEnd, dwUserError, str);
}

// Copies the contents of the current packet (the one dequeued by the last
// successful wait) to the caller.
//
// ReceiveBuffer  [out] remaining payload of the packet; the packet's read
//                position is saved before and restored after the copy, so the
//                call can be repeated.
// bEnd           [out] whether the packet was flagged as the last one.
// dwUserError    [out] user error code (reset to 0 on entry).
// str            [out] message string.
//
// Returns Error_Cancel when the wait was cancelled, Error_Unexpect when there
// is no current packet, otherwise the packet's translated system error.
// The outputs other than dwUserError are left untouched on the error paths.
ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd, DWORD &dwUserError, CSimpleString& str)
{
    dwUserError = 0;

    if (m_bCancel)
        return Error_Cancel;

    if (!m_curr_pkt)
        return Error_Unexpect;

    if (m_curr_pkt->pkt) {
        int read_state = iobuffer_get_read_state(m_curr_pkt->pkt);
        {
            int size = iobuffer_get_length(m_curr_pkt->pkt);
            ReceiveBuffer.Init(size);
            if (size)
                iobuffer_read(m_curr_pkt->pkt, IOBUF_T_BUF, &ReceiveBuffer[0], &size);
        }
        iobuffer_restore_read_state(m_curr_pkt->pkt, read_state);
    }

    bEnd = !!m_curr_pkt->end;
    dwUserError = m_curr_pkt->dwUserError;
    str = m_curr_pkt->str;
    return SpTranslateError(m_curr_pkt->dwSysError);
}

// Returns only the status of the current packet: Error_Cancel when cancelled,
// Error_Unexpect when there is no current packet, otherwise the packet's
// translated system error.
ErrorCodeEnum SpAsyncWait::AsyncGetAnswer()
{
    if (m_bCancel)
        return Error_Cancel;

    if (m_curr_pkt)
        return SpTranslateError(m_curr_pkt->dwSysError);

    return Error_Unexpect;
}

// Overload that discards the payload and the end flag.
ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(DWORD& dwUserError, CSimpleString& str)
{
    CAutoBuffer receiveBuffer;
    bool bEnd = false;
    return AsyncGetAnswer(receiveBuffer, bEnd, dwUserError, str);
}

// Overload that discards the payload, the end flag and the message.
ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(DWORD& dwUserError)
{
    CAutoBuffer receiveBuffer;
    CSimpleString str;
    bool bEnd = false;
    return AsyncGetAnswer(receiveBuffer, bEnd, dwUserError, str);
}

// Queues an incoming answer packet and wakes up waiters / schedules callbacks.
//
// error          transport-level error; when non-zero the packet body is not
//                parsed.
// end            non-zero when this is the last packet of the exchange.
// ans_pkt        in/out: when non-NULL the buffer is taken over and *ans_pkt
//                is set to NULL.
// bReadUserCode  when true, a user error code (and, if that is non-zero, a
//                message string) is read after the system error.
void SpAsyncWait::ReceiveAnsPkt(int error, int end, iobuffer_t **ans_pkt, bool bReadUserCode)
{
    pending_pkt *p = new pending_pkt();
    // NOTE(review): this zero-fills the whole struct, including p->str, right
    // after construction. Kept as-is because the struct definition is not
    // visible here - confirm this is safe for CSimpleString.
    ZeroMemory(p, sizeof(pending_pkt));
    p->end = end;
    if (ans_pkt) {
        p->pkt = *ans_pkt;
        *ans_pkt = NULL;
    } else {
        p->pkt = NULL;
    }
    p->dwSysError = error;

    if (!p->dwSysError && p->pkt) {
        iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwSysError, 0); // sys error
        if (bReadUserCode) {
            iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwUserError, 0); // user error
            if (p->dwUserError) {
                // First read queries the string length, second one fetches it.
                int slen = 0;
                iobuffer_read(p->pkt, IOBUF_T_STR, NULL, &slen);
                char *tmp = (slen >= 0) ? (char*)malloc(slen + 1) : NULL;
                if (tmp) {
                    // Only touch the buffer once the allocation is known to
                    // have succeeded; start from an empty string in case the
                    // read writes nothing.
                    tmp[0] = '\0';
                    iobuffer_read(p->pkt, IOBUF_T_STR, tmp, NULL);
                    p->str = tmp;
                    free(tmp);
                }
            }
        }
    }

    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("dwSysError: %d, dwUserCode: %d", p->dwSysError, p->dwUserError);

    spinlock_enter(&m_lock, -1);
    // Set the end flag under the lock, together with queueing the callback
    // entry, so that a concurrently finishing callback work item never sees
    // "ended" without also seeing the entry for the final packet.
    if (end || error) {
        m_bEnd = TRUE;
    }
    list_add_tail(&p->entry, &m_pending_pkt_list);
    m_total_pkt++;
    if (m_pCallback) { // does set callback
        wait_callback_entry *wce = new wait_callback_entry();
        wce->cnt++;
        IncrementRef(); // reference held by the queued work item
        list_add_tail(&wce->entry, &m_pending_callback_list);
        threadpool_queue_workitem2(
            sp_svc_get_threadpool(m_pEntity->get_svc())
            , m_callback_strand
            , &__threadpool_on_callback
            , this
            , (param_size_t)wce
            , NULL);
    } else {
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("callback is not existed !");
    }
    spinlock_leave(&m_lock);

    ReleaseSemaphore(m_sem, 1, NULL);
}

// Returns the context object that was attached by SetCallback().
CSmartPointer<IReleasable> SpAsyncWait::GetCallContext()
{
    CSmartPointer<IReleasable> ret;
    spinlock_enter(&m_lock, -1);
    ret = m_pCallbackContext;
    spinlock_leave(&m_lock);
    return ret;
}

// Fetches the installed callback listener and its context.
// Returns false (outputs untouched) when no callback is installed.
bool SpAsyncWait::GetCallback(CSmartPointer<ICallbackListener> &pCallbackListener, CSmartPointer<IReleasable> &pContext)
{
    bool ok = false;
    spinlock_enter(&m_lock, -1);
    if (m_pCallback) {
        pCallbackListener = CSmartPointer<ICallbackListener>(m_pCallback);
        pContext = m_pCallbackContext;
        ok = true;
    }
    spinlock_leave(&m_lock);
    return ok;
}

// Installs (pCallback != NULL) or removes (pCallback == NULL) the answer
// callback.
//
// Installing takes one reference on this object and attaches pContext. If
// packets are already pending, a single work item is queued that delivers all
// of them. Removing drops that reference and flags every queued callback work
// item as cancelled.
void SpAsyncWait::SetCallback(ICallbackListener *pCallback, IReleasable *pContext)
{
    spinlock_enter(&m_lock, -1);
    if (pCallback) { // install callback
        TOOLKIT_ASSERT(m_pCallback == NULL);
        m_pCallback = pCallback;
        m_pCallbackContext.Attach(pContext);
        IncrementRef(); // xkm@20150115

        int cnt = 0;
        pending_pkt *pos;
        list_for_each_entry(pos, &m_pending_pkt_list, pending_pkt, entry) {
            cnt++;
        }
        // if package already return , callback instantly
        if (cnt > 0) {
            wait_callback_entry *wce = new wait_callback_entry();
            wce->cnt = cnt;
            IncrementRef(); // reference held by the queued work item
            list_add_tail(&wce->entry, &m_pending_callback_list);
            threadpool_queue_workitem2(
                sp_svc_get_threadpool(m_pEntity->get_svc())
                , m_callback_strand
                , &__threadpool_on_callback
                , this
                , (param_size_t)wce
                , NULL);
        }
    } else { // cancel callback
        if (m_pCallback != NULL) // xkm@20150115
            DecrementRef();
        m_pCallback = NULL; // {bug} should set NULL
        wait_callback_entry *pos;
        list_for_each_entry(pos, &m_pending_callback_list, wait_callback_entry, entry) {
            pos->cancel++;
        }
    }
    spinlock_leave(&m_lock);
}

// Body of a queued callback work item: dequeues wce->cnt answers and reports
// each of them to the installed listener, unless the entry was cancelled.
//
// Reference handling: the reference taken when the work item was queued is
// always dropped, and it is dropped last because it may be the final one.
// The reference taken by SetCallback() is dropped here only once - when the
// exchange has ended, the callback is still installed and no further callback
// work items are queued.
void SpAsyncWait::threadpool_on_callback(SpAsyncWait::wait_callback_entry *wce)
{
    bool bReleaseCallbackRef = false;

    // NOTE(review): WaitAnswer() and the listener run with m_lock held, as in
    // the previous revision; this presumably relies on the spinlock being
    // re-entrant - confirm.
    spinlock_enter(&m_lock, -1);
    if (!wce->cancel) {
        for (int i = 0; i < wce->cnt; ++i) {
            ErrorCodeEnum Error = WaitAnswer(INFINITE);
            if (Error == Error_Cancel && m_bCancel)
                break; // user cancel
            if (m_pCallback == NULL)
                break; // listener was removed while this entry was running
            m_pCallback->OnAnswer(this);
        }
    }
    list_del(&wce->entry);

    if (m_bEnd && m_pCallback != NULL) {
        // Keep the listener installed while later work items still have
        // answers to deliver.
        int remaining = 0;
        wait_callback_entry *pos;
        list_for_each_entry(pos, &m_pending_callback_list, wait_callback_entry, entry) {
            remaining++;
        }
        if (remaining == 0) {
            m_pCallback = NULL;
            bReleaseCallbackRef = true;
        }
    }
    spinlock_leave(&m_lock);

    delete wce;

    if (bReleaseCallbackRef)
        DecrementRef(); // xkm@20150115
    DecrementRef(); // work item reference; nothing may touch members after this
}

// Thread-pool trampoline: arg is the SpAsyncWait, param1 the callback entry.
void SpAsyncWait::__threadpool_on_callback(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
{
    SpAsyncWait *pThis = static_cast<SpAsyncWait*>(arg);
    wait_callback_entry *wce = (wait_callback_entry *)param1;
    pThis->threadpool_on_callback(wce);
}

//
// SpAsyncWaitRPC
//

// Takes over the request buffer (*req_pkt is set to NULL) and remembers the
// call type and whether a user error code should be read from the answer.
SpAsyncWaitRPC::SpAsyncWaitRPC(SpEntity *pEntity, iobuffer_t **req_pkt, int call_type, bool fetch_user)
    : SpAsyncWait(pEntity), m_req_pkt(*req_pkt), m_call_type(call_type), m_rpc(NULL), m_user(fetch_user)
{
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWaitRPC()");
    //LOG_FUNCTION();
    *req_pkt = NULL;
}

// Releases the request buffer (if still owned) and destroys the RPC client.
SpAsyncWaitRPC::~SpAsyncWaitRPC()
{
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWaitRPC()");
    //LOG_FUNCTION();
    if (m_req_pkt)
        iobuffer_dec_ref(m_req_pkt);
    if (m_rpc)
        sp_rpc_client_destroy(m_rpc);
}

// Creates the RPC client and issues the asynchronous call.
//
// arg  NULL to address the shell service (SP_SHELL_MOD_ID / SP_SHELL_SVC_ID),
//      otherwise a NUL-terminated entity name that is looked up in the
//      configuration.
//
// Returns Error_NotExist when the named entity is unknown, otherwise the
// translated result of client creation / call submission. A reference on this
// object is held while the call is in flight and dropped again if submission
// fails.
ErrorCodeEnum SpAsyncWaitRPC::Begin(const void *arg)
{
    int rc;
    sp_rpc_client_callback cb;
    cb.on_ans = &__on_ans;
    cb.on_destroy = NULL;
    cb.user_data = this;

    if (arg == NULL) {
        rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, m_call_type, &cb, &m_rpc);
    } else {
        // find entity by name
        auto pEnv = sp_get_env();
        auto pEntity = sp_cfg_get_entity_by_name(pEnv->cfg, static_cast<const char*>(arg));
        if (pEntity == NULL)
            return Error_NotExist;
        rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), pEntity->mod->idx, pEntity->idx, m_call_type, &cb, &m_rpc);
    }
    if (rc != 0)
        return SpTranslateError(rc);

    IncrementRef();
    rc = sp_rpc_client_async_call(m_rpc, &m_req_pkt);
    if (rc != 0) {
        sp_rpc_client_destroy(m_rpc);
        m_rpc = NULL;
        DecrementRef();
    }
    return SpTranslateError(rc);
}

// Closes the RPC client, if one exists.
// NOTE(review): when there is no client, Error_NotInit is passed through
// SpTranslateError() like a low-level return code; behaviour kept unchanged -
// confirm that the translation yields the intended value.
ErrorCodeEnum SpAsyncWaitRPC::Close()
{
    int rc;
    if (m_rpc) {
        rc = sp_rpc_client_close(m_rpc);
    } else {
        rc = Error_NotInit;
    }
    return SpTranslateError(rc);
}

// RPC client trampoline: user_data is the SpAsyncWaitRPC that issued the call.
void SpAsyncWaitRPC::__on_ans(sp_rpc_client_t *client, int error, iobuffer_t **ans_pkt, void *user_data)
{
    SpAsyncWaitRPC *pThis = static_cast<SpAsyncWaitRPC*>(user_data);
    pThis->on_ans(error, ans_pkt);
}

// Queues the answer, always flagged as the last packet, and drops the
// reference taken in Begin().
void SpAsyncWaitRPC::on_ans(int error, iobuffer_t **ans_pkt)
{
    ReceiveAnsPkt(error, 1, ans_pkt, m_user);
    DecrementRef(); // xkm@20150115
}

//
// SpAsyncWaitTsx
//

// Creates the transaction on the given session and copies Buffer into a new
// request buffer. If transaction creation fails, m_tsx stays NULL and Begin()
// will report Error_NetBroken.
SpAsyncWaitTsx::SpAsyncWaitTsx(SpEntity *pEntity, sp_ses_uac_t *uac, int timeout, int method_id, int method_sig, CAutoBuffer Buffer, int tsx_id)
    : SpAsyncWait(pEntity), m_tsx(NULL), m_timeout(timeout), m_method_id(method_id), m_method_sig(method_sig), m_req_pkt(NULL), m_tsx_id(tsx_id)
{
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWaitTsx()");
    //LOG_FUNCTION();
    int rc;
    sp_tsx_uac_callback cb;
    cb.on_ans = &__on_ans;
    cb.on_destroy = NULL;
    cb.user_data = this;

    rc = sp_tsx_uac_create(uac, m_tsx_id, m_method_id, m_method_sig, &cb, &m_tsx);
    if (rc != 0) {
        m_tsx = NULL;
    } else {
        m_req_pkt = iobuffer_create(-1, Buffer.GetCount() + 32);
        if (Buffer.GetCount() > 0) {
            iobuffer_write(m_req_pkt, IOBUF_T_BUF, &Buffer[0], Buffer.GetCount());
        }
    }
}

// Closes and destroys the transaction and releases the request buffer if it
// is still owned.
SpAsyncWaitTsx::~SpAsyncWaitTsx()
{
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWaitTsx()");
    //LOG_FUNCTION();
    if (m_tsx) {
        sp_tsx_uac_close(m_tsx);
        sp_tsx_uac_destroy(m_tsx);
    }
    if (m_req_pkt)
        iobuffer_dec_ref(m_req_pkt);
}

// Submits the request. arg is not used.
// Returns Error_NetBroken when the transaction could not be created in the
// constructor, otherwise the translated submission result. A reference on
// this object is held while the request is in flight and dropped again if
// submission fails.
ErrorCodeEnum SpAsyncWaitTsx::Begin(const void *arg)
{
    int rc;
    if (!m_tsx)
        return Error_NetBroken;

    IncrementRef(); // xkm@20150115
    rc = sp_tsx_uac_async_req(m_tsx, m_timeout, &m_req_pkt);
    if (rc != 0) {
        sp_tsx_uac_destroy(m_tsx);
        m_tsx = NULL;
        DecrementRef(); // xkm@20150115
    }
    return SpTranslateError(rc);
}

// Closes the transaction, if one exists.
// NOTE(review): when there is no transaction, Error_Unexpect is passed through
// SpTranslateError() like a low-level return code; behaviour kept unchanged -
// confirm that the translation yields the intended value.
ErrorCodeEnum SpAsyncWaitTsx::Close()
{
    int rc;
    if (m_tsx) {
        rc = sp_tsx_uac_close(m_tsx);
    } else {
        rc = Error_Unexpect;
    }
    return SpTranslateError(rc);
}

// Waits for the next answer; a positive constructor timeout takes precedence
// over dwTimeout.
ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD dwTimeout)
{
    // use constructor timeout first
    DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
    return SpAsyncWait::WaitAnswer(dwWaitTime);
}

// Same as above, additionally returning the user error code and message.
ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD &dwUserError, CSimpleString& str, DWORD dwTimeout)
{
    // use constructor timeout first
    DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
    return SpAsyncWait::WaitAnswer(dwUserError, str, dwWaitTime);
}

// Transaction trampoline: user_data is the SpAsyncWaitTsx that owns the
// transaction.
void SpAsyncWaitTsx::__on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
{
    SpAsyncWaitTsx *pThis = static_cast<SpAsyncWaitTsx*>(user_data);
    pThis->on_ans(error, end, ans_pkt);
}

// Queues the answer (user error code is always read). On error or on the last
// packet the transaction is closed and the reference taken in Begin() is
// dropped.
void SpAsyncWaitTsx::on_ans(int error, int end, iobuffer_t **ans_pkt)
{
    ReceiveAnsPkt(error, end, ans_pkt, true);
    if (error || end) {
        Close();
        DecrementRef(); // xkm@20150115
    }
}

// Stamps the link/trace identifiers of tmpContext onto the request buffer, if
// the request buffer exists.
void SpAsyncWaitTsx::setLinkContext(linkContext& tmpContext)
{
    if (m_req_pkt) {
        iobuffer_set_linkInfo(m_req_pkt,
            tmpContext.bussinessId.GetData(),
            tmpContext.traceId.GetData(),
            tmpContext.spanId.GetData(),
            tmpContext.parentSpanId.GetData());
    }
}