|
- #include "stdafx.h"
- #include "SpBase.h"
- #include "SpMisc.h"
- #include "SpAsyncWait.h"
- #include "SpEntity.h"
- #include "sp_env.h"
- #include "sp_cfg.h"
- #include "dbgutil.h"
- //
- // SpAsyncWait
- //
- SpAsyncWait::SpAsyncWait(SpEntity *pEntity)
- : m_sem(::CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL))
- , m_total_pkt(0)
- , m_bCancel(0)
- , m_bEnd(0)
- ,m_pCallback(NULL)
- , m_pEntity(pEntity)
- , m_evt_cancel(::CreateEventA(NULL, TRUE, FALSE, NULL))
- , m_curr_pkt(NULL)
- , m_callback_strand(strand_create())
- ,m_pRefCount(NULL)
- {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWait()");
- m_pRefCount = new LONG(1);
- INIT_LIST_HEAD(&m_pending_pkt_list);
- INIT_LIST_HEAD(&m_pending_callback_list);
- spinlock_init(&m_lock);
- }
- SpAsyncWait::~SpAsyncWait()
- {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWait()");
- if (m_sem)
- CloseHandle(m_sem);
- if (m_evt_cancel)
- CloseHandle(m_evt_cancel);
- pending_pkt *pos, *n;
- list_for_each_entry_safe(pos, n, &m_pending_pkt_list, pending_pkt, entry) {
- list_del_init(&pos->entry);
- delete pos;
- }
- strand_destroy(m_callback_strand);
- if (m_curr_pkt)
- delete m_curr_pkt;
- }
- ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD dwTimeout)
- {
- DWORD dwUserError = 0;
- CSimpleString str;
- return WaitAnswer(dwUserError, str, dwTimeout);
- }
- ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD &dwUserError, CSimpleString &str, DWORD dwTimeout)
- {
- ErrorCodeEnum Error;
- dwUserError = 0;
- HANDLE hs[] = {m_sem, m_evt_cancel};
- if (dwTimeout == INFINITE)
- dwTimeout = 10000;
- DWORD dwRet = ::WaitForMultipleObjects(2, &hs[0], FALSE, dwTimeout);
- if (dwRet == WAIT_TIMEOUT)
- {
- Error = Error_TimeOut;
- }
- else if (dwRet == WAIT_OBJECT_0)
- {
- if (m_curr_pkt)
- {
- delete m_curr_pkt;
- m_curr_pkt = NULL;
- }
- spinlock_enter(&m_lock, -1);
- m_curr_pkt = list_first_entry(&m_pending_pkt_list, pending_pkt, entry);
- list_del_init(&m_curr_pkt->entry);
- spinlock_leave(&m_lock);
- Error = SpTranslateError(m_curr_pkt->dwSysError);
- dwUserError = m_curr_pkt->dwUserError;
- str = m_curr_pkt->str;
- }
- else if (dwRet == WAIT_OBJECT_0+1)
- {
- Error = Error_Cancel;
- }
- else
- {
- Error = Error_Unexpect;
- }
- OnAnswerWaited(Error);
- return Error;
- }
- ErrorCodeEnum WaitMultiAnswers(CAutoArray<IAsynWaitSp *> arrAsynWaits, DWORD &dwRetIndex, bool bWaitAll, DWORD dwTimeout)
- {
- if (dwTimeout == INFINITE)
- dwTimeout = 10000;
- int nCount = arrAsynWaits.GetCount();
- if (nCount == 0)
- return Error_Param;
- else if (nCount >32)
- return Error_Overflow;
- HANDLE *pHandles = NULL;
- if (bWaitAll)
- pHandles = new HANDLE[nCount];
- else
- pHandles = new HANDLE[nCount*2];
- for(int i=0; i<nCount; i++)
- {
- auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
- TOOLKIT_ASSERT(pAsynWait != NULL);
- pHandles[i] = pAsynWait->m_sem;
- if (!bWaitAll)
- pHandles[i+nCount] = pAsynWait->m_evt_cancel;
- }
- dwRetIndex = -1;
- ErrorCodeEnum Error = Error_Succeed;
-
- DWORD dwRet = ::WaitForMultipleObjects(bWaitAll ? nCount : nCount*2, pHandles, bWaitAll, dwTimeout);
- if (dwRet == WAIT_TIMEOUT)
- {
- Error = Error_TimeOut;
- }
- else if (dwRet >= WAIT_OBJECT_0 && dwRet < WAIT_OBJECT_0 + nCount )
- {
- dwRetIndex = dwRet - WAIT_OBJECT_0;
- for(int i=0; i<nCount; i++)
- {
- if (bWaitAll || dwRetIndex == i)
- {
- auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
- TOOLKIT_ASSERT(pAsynWait != NULL);
- if (pAsynWait->m_curr_pkt)
- {
- delete pAsynWait->m_curr_pkt;
- pAsynWait->m_curr_pkt = NULL;
- }
- spinlock_enter(&pAsynWait->m_lock, -1);
- pAsynWait->m_curr_pkt = list_first_entry(&pAsynWait->m_pending_pkt_list, SpAsyncWait::pending_pkt, entry);
- list_del_init(&pAsynWait->m_curr_pkt->entry);
- spinlock_leave(&pAsynWait->m_lock);
- Error = SpTranslateError(pAsynWait->m_curr_pkt->dwSysError);
- }
- }
- }
- else if (dwRet >= WAIT_OBJECT_0+nCount && dwRet < WAIT_OBJECT_0 + nCount*2 )
- {
- Error = Error_Cancel;
- dwRetIndex = dwRet - WAIT_OBJECT_0 - nCount;
- }
- else
- {
- Error = Error_Unexpect;
- }
- return Error;
- }
- ErrorCodeEnum SpAsyncWait::CancelWait()
- {
- m_bCancel = TRUE;
- Close();
- SetEvent(m_evt_cancel);
- return Error_Succeed;
- }
- bool SpAsyncWait::IsPending()
- {
- return m_total_pkt == 0;
- }
- ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd)
- {
- DWORD dwUserError = 0;
- CSimpleString str;
- return AsyncGetAnswer(ReceiveBuffer, bEnd, dwUserError, str);
- }
- ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd, DWORD &dwUserError, CSimpleString& str)
- {
- dwUserError = 0;
- if (m_bCancel)
- {
- return Error_Cancel;
- }
- else
- {
- if (m_curr_pkt)
- {
- if (m_curr_pkt->pkt)
- {
- int read_state;
- read_state = iobuffer_get_read_state(m_curr_pkt->pkt);
- {
- int size = iobuffer_get_length(m_curr_pkt->pkt);
- ReceiveBuffer.Init(size);
- if (size)
- iobuffer_read(m_curr_pkt->pkt, IOBUF_T_BUF, &ReceiveBuffer[0], &size);
- }
- iobuffer_restore_read_state(m_curr_pkt->pkt, read_state);
- }
- bEnd = !!m_curr_pkt->end;
- dwUserError = m_curr_pkt->dwUserError;
- str = m_curr_pkt->str;
-
- return SpTranslateError(m_curr_pkt->dwSysError);
- }
- }
- return Error_Unexpect;
- }
- ErrorCodeEnum SpAsyncWait::AsyncGetAnswer()
- {
- if (m_bCancel) {
- return Error_Cancel;
- } else {
- if (m_curr_pkt) {
- return SpTranslateError(m_curr_pkt->dwSysError);
- }
- }
- return Error_Unexpect;
- }
- ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(DWORD& dwUserError, CSimpleString& str)
- {
- CAutoBuffer receiveBuffer;
- bool bEnd = false;
- return AsyncGetAnswer(receiveBuffer, bEnd, dwUserError, str);
- }
- ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(DWORD& dwUserError)
- {
- CAutoBuffer receiveBuffer;
- CSimpleString str;
- bool bEnd = false;
- return AsyncGetAnswer(receiveBuffer, bEnd, dwUserError, str);
- }
- void SpAsyncWait::ReceiveAnsPkt(int error, int end, iobuffer_t **ans_pkt, bool bReadUserCode)
- {
- pending_pkt *p = new pending_pkt();
- ZeroMemory(p, sizeof(pending_pkt));
- p->end = end;
- if (ans_pkt) {
- p->pkt = *ans_pkt;
- *ans_pkt = NULL;
- } else {
- p->pkt = NULL;
- }
- p->dwSysError = error;
- if (!p->dwSysError)
- {
- if (p->pkt)
- {
- iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwSysError, 0); // sys error
- if (bReadUserCode)
- {
- iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwUserError, 0); // user error
- if (p->dwUserError)
- {
- char* tmp = NULL;
- int slen;
- iobuffer_read(p->pkt, IOBUF_T_STR, NULL, &slen);
- tmp = (char*)malloc(slen + 1);
- iobuffer_read(p->pkt, IOBUF_T_STR, tmp, NULL);
- p->str = tmp;
- if (tmp)
- free(tmp);
- }
-
-
- }
- }
- }
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("dwSysError: %d, dwUserCode: %d", p->dwSysError, p->dwUserError);
- if (end || error)
- {
- m_bEnd = TRUE;
- }
- spinlock_enter(&m_lock, -1);
- list_add_tail(&p->entry, &m_pending_pkt_list);
- m_total_pkt++;
- if (m_pCallback) { // does set callback
- wait_callback_entry *wce = new wait_callback_entry();
- wce->cnt++;
- IncrementRef();
- list_add_tail(&wce->entry, &m_pending_callback_list);
- threadpool_queue_workitem2(
- sp_svc_get_threadpool(m_pEntity->get_svc())
- , m_callback_strand
- , &__threadpool_on_callback
- , this
- , (param_size_t)wce
- , NULL);
- } else {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("callback is not existed !");
- }
- spinlock_leave(&m_lock);
- ReleaseSemaphore(m_sem, 1, NULL);
- }
- CSmartPointer<IReleasable> SpAsyncWait::GetCallContext()
- {
- CSmartPointer<IReleasable> ret;
- spinlock_enter(&m_lock, -1);
- ret = m_pCallbackContext;
- spinlock_leave(&m_lock);
- return ret;
- }
- bool SpAsyncWait::GetCallback(CSmartPointer<ICallbackListener> &pCallbackListener,CSmartPointer<IReleasable> &pContext)
- {
- bool ok = false;
- spinlock_enter(&m_lock, -1);
- if (m_pCallback) {
- pCallbackListener = CSmartPointer<ICallbackListener>(m_pCallback);
- pContext = m_pCallbackContext;
- ok = true;
- }
- spinlock_leave(&m_lock);
- return ok;
- }
- void SpAsyncWait::SetCallback(ICallbackListener *pCallback,IReleasable *pContext)
- {
- spinlock_enter(&m_lock, -1);
- if (pCallback) { // install callback
- TOOLKIT_ASSERT(m_pCallback == NULL);
- m_pCallback = pCallback;
- m_pCallbackContext.Attach(pContext);
- IncrementRef(); // xkm@20150115
- int cnt = 0;
- pending_pkt *pos;
- list_for_each_entry(pos, &m_pending_pkt_list, pending_pkt, entry) {
- cnt++;
- }
- // if package already return , callback instantly
- if (cnt >0)
- {
- wait_callback_entry *wce = new wait_callback_entry();
- wce->cnt = cnt;
- IncrementRef();
- list_add_tail(&wce->entry, &m_pending_callback_list);
- threadpool_queue_workitem2(
- sp_svc_get_threadpool(m_pEntity->get_svc())
- , m_callback_strand
- , &__threadpool_on_callback
- , this
- , (param_size_t)wce
- , NULL);
- }
- } else { // cancel callback
- if (m_pCallback != NULL) // xkm@20150115
- DecrementRef();
- m_pCallback = NULL; // {bug} should set NULL
- wait_callback_entry *pos;
- list_for_each_entry(pos, &m_pending_callback_list, wait_callback_entry, entry) {
- pos->cancel++;
- }
- }
- spinlock_leave(&m_lock);
- }
- void SpAsyncWait::threadpool_on_callback(SpAsyncWait::wait_callback_entry *wce)
- {
- spinlock_enter(&m_lock, -1);
- if (!wce->cancel) {
- for (int i = 0; i < wce->cnt; ++i) {
- ErrorCodeEnum Error = WaitAnswer(INFINITE);
- if (Error != Error_Cancel || !m_bCancel) {
- m_pCallback->OnAnswer(this);
- } else {
- break; // user cancel
- }
- }
- }
- list_del(&wce->entry);
- spinlock_leave(&m_lock);
- DecrementRef();
- delete wce;
- if (m_bEnd) {
- m_pCallback = NULL;
- DecrementRef(); // xkm@20150115
- }
- }
- void SpAsyncWait::__threadpool_on_callback(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- SpAsyncWait *pThis = static_cast<SpAsyncWait*>(arg);
- wait_callback_entry *wce = (wait_callback_entry *)param1;
- pThis->threadpool_on_callback(wce);
- }
- //
- // SpAsyncWaitRPC
- //
- SpAsyncWaitRPC::SpAsyncWaitRPC( SpEntity *pEntity, iobuffer_t **req_pkt, int call_type, bool fetch_user)
- : SpAsyncWait(pEntity), m_req_pkt(*req_pkt), m_call_type(call_type), m_rpc(NULL),m_user(fetch_user)
- {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWaitRPC()");
- //LOG_FUNCTION();
- *req_pkt = NULL;
- }
- SpAsyncWaitRPC::~SpAsyncWaitRPC()
- {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWaitRPC()");
- //LOG_FUNCTION();
- if (m_req_pkt)
- iobuffer_dec_ref(m_req_pkt);
- if (m_rpc)
- sp_rpc_client_destroy(m_rpc);
- }
- ErrorCodeEnum SpAsyncWaitRPC::Begin(const void *arg)
- {
- int rc;
- sp_rpc_client_callback cb;
- cb.on_ans = &__on_ans;
- cb.on_destroy = NULL;
- cb.user_data = this;
- if (arg == NULL)
- rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, m_call_type, &cb, &m_rpc);
- else
- {
- // find entity by name
- auto pEnv = sp_get_env();
- auto pEntity = sp_cfg_get_entity_by_name(pEnv->cfg, (const char*)arg);
- if (pEntity == NULL)
- return Error_NotExist;
- rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), pEntity->mod->idx, pEntity->idx, m_call_type, &cb, &m_rpc);
- }
- if (rc != 0)
- return SpTranslateError(rc);
- IncrementRef();
- rc = sp_rpc_client_async_call(m_rpc, &m_req_pkt);
- if (rc != 0) {
- sp_rpc_client_destroy(m_rpc);
- m_rpc = NULL;
- DecrementRef();
- }
- return SpTranslateError(rc);
- }
- ErrorCodeEnum SpAsyncWaitRPC::Close()
- {
- int rc;
- if (m_rpc) {
- rc = sp_rpc_client_close(m_rpc);
- } else {
- rc = Error_NotInit;
- }
- return SpTranslateError(rc);
- }
- void SpAsyncWaitRPC::__on_ans( sp_rpc_client_t *client, int error, iobuffer_t **ans_pkt, void *user_data )
- {
- SpAsyncWaitRPC *pThis = static_cast<SpAsyncWaitRPC*>(user_data);
- pThis->on_ans(error, ans_pkt);
- }
- void SpAsyncWaitRPC::on_ans( int error, iobuffer_t **ans_pkt )
- {
- ReceiveAnsPkt(error, 1, ans_pkt, m_user);
- DecrementRef(); // xkm@20150115
- }
- //
- // SpAsyncWaitTsx
- //
- SpAsyncWaitTsx::SpAsyncWaitTsx(SpEntity *pEntity, sp_ses_uac_t *uac, int timeout, int method_id, int method_sig, CAutoBuffer Buffer, int tsx_id)
- : SpAsyncWait(pEntity), m_tsx(NULL), m_timeout(timeout), m_method_id(method_id), m_method_sig(method_sig), m_req_pkt(NULL), m_tsx_id(tsx_id)
- {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWaitTsx()");
- //LOG_FUNCTION();
- int rc;
- sp_tsx_uac_callback cb;
- cb.on_ans = &__on_ans;
- cb.on_destroy = NULL;
- cb.user_data = this;
- rc = sp_tsx_uac_create(uac, m_tsx_id, m_method_id, m_method_sig, &cb, &m_tsx);
- if (rc != 0) {
- m_tsx = NULL;
- } else {
- m_req_pkt = iobuffer_create(-1, Buffer.GetCount() + 32);
- if (Buffer.GetCount() > 0) {
- iobuffer_write(m_req_pkt, IOBUF_T_BUF, &Buffer[0], Buffer.GetCount());
- }
- }
- }
- SpAsyncWaitTsx::~SpAsyncWaitTsx()
- {
- //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWaitTsx()");
- //LOG_FUNCTION();
- if (m_tsx) {
- sp_tsx_uac_close(m_tsx);
- sp_tsx_uac_destroy(m_tsx);
- }
- if (m_req_pkt)
- iobuffer_dec_ref(m_req_pkt);
- }
- ErrorCodeEnum SpAsyncWaitTsx::Begin(const void *arg)
- {
- int rc;
- if (!m_tsx)
- return Error_NetBroken;
- IncrementRef(); // xkm@20150115
- rc = sp_tsx_uac_async_req(m_tsx, m_timeout, &m_req_pkt);
- if (rc != 0) {
- sp_tsx_uac_destroy(m_tsx);
- m_tsx = NULL;
- DecrementRef(); // xkm@20150115
- }
- return SpTranslateError(rc);
- }
- ErrorCodeEnum SpAsyncWaitTsx::Close()
- {
- int rc;
- if (m_tsx) {
- rc = sp_tsx_uac_close(m_tsx);
- } else {
- rc = Error_Unexpect;
- }
- return SpTranslateError(rc);
- }
- ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD dwTimeout)
- {
- // use constructor timeout first
- DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
- return SpAsyncWait::WaitAnswer(dwWaitTime);
- }
- ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD &dwUserError, CSimpleString& str, DWORD dwTimeout)
- {
- // use constructor timeout first
- DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
- return SpAsyncWait::WaitAnswer(dwUserError, str, dwWaitTime);
- }
- void SpAsyncWaitTsx::__on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
- {
- SpAsyncWaitTsx *pThis = static_cast<SpAsyncWaitTsx*>(user_data);
- pThis->on_ans(error, end, ans_pkt);
- }
- void SpAsyncWaitTsx::on_ans(int error, int end, iobuffer_t **ans_pkt)
- {
- ReceiveAnsPkt(error, end, ans_pkt, true);
- if (error || end) {
- Close();
- DecrementRef(); // xkm@20150115
- }
- }
- void SpAsyncWaitTsx::setLinkContext(linkContext& tmpContext)
- {
- if (m_req_pkt) {
- iobuffer_set_linkInfo(m_req_pkt, tmpContext.bussinessId.GetData(), tmpContext.traceId.GetData(), tmpContext.spanId.GetData(), tmpContext.parentSpanId.GetData());
- }
- }
|