SpAsyncWait.cpp 14 KB


  1. #include "stdafx.h"
  2. #include "SpBase.h"
  3. #include "SpMisc.h"
  4. #include "SpAsyncWait.h"
  5. #include "SpEntity.h"
  6. #include "sp_env.h"
  7. #include "sp_cfg.h"
  8. #include "dbgutil.h"
  9. //
  10. // SpAsyncWait
  11. //
  12. SpAsyncWait::SpAsyncWait(SpEntity *pEntity)
  13. : m_sem(::CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL))
  14. , m_total_pkt(0)
  15. , m_bCancel(0)
  16. , m_bEnd(0)
  17. ,m_pCallback(NULL)
  18. , m_pEntity(pEntity)
  19. , m_evt_cancel(::CreateEventA(NULL, TRUE, FALSE, NULL))
  20. , m_curr_pkt(NULL)
  21. , m_callback_strand(strand_create())
  22. ,m_pRefCount(NULL)
  23. {
  24. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWait()");
  25. m_pRefCount = new LONG(1);
  26. INIT_LIST_HEAD(&m_pending_pkt_list);
  27. INIT_LIST_HEAD(&m_pending_callback_list);
  28. spinlock_init(&m_lock);
  29. }
  30. SpAsyncWait::~SpAsyncWait()
  31. {
  32. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWait()");
  33. if (m_sem)
  34. CloseHandle(m_sem);
  35. if (m_evt_cancel)
  36. CloseHandle(m_evt_cancel);
  37. pending_pkt *pos, *n;
  38. list_for_each_entry_safe(pos, n, &m_pending_pkt_list, pending_pkt, entry) {
  39. list_del_init(&pos->entry);
  40. delete pos;
  41. }
  42. strand_destroy(m_callback_strand);
  43. if (m_curr_pkt)
  44. delete m_curr_pkt;
  45. }
  46. ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD dwTimeout)
  47. {
  48. DWORD dwUserError = 0;
  49. CSimpleString str;
  50. return WaitAnswer(dwUserError, str, dwTimeout);
  51. }
  52. ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD &dwUserError, CSimpleString &str, DWORD dwTimeout)
  53. {
  54. ErrorCodeEnum Error;
  55. dwUserError = 0;
  56. HANDLE hs[] = {m_sem, m_evt_cancel};
  57. if (dwTimeout == INFINITE)
  58. dwTimeout = 10000;
  59. DWORD dwRet = ::WaitForMultipleObjects(2, &hs[0], FALSE, dwTimeout);
  60. if (dwRet == WAIT_TIMEOUT)
  61. {
  62. Error = Error_TimeOut;
  63. }
  64. else if (dwRet == WAIT_OBJECT_0)
  65. {
  66. if (m_curr_pkt)
  67. {
  68. delete m_curr_pkt;
  69. m_curr_pkt = NULL;
  70. }
  71. spinlock_enter(&m_lock, -1);
  72. m_curr_pkt = list_first_entry(&m_pending_pkt_list, pending_pkt, entry);
  73. list_del_init(&m_curr_pkt->entry);
  74. spinlock_leave(&m_lock);
  75. Error = SpTranslateError(m_curr_pkt->dwSysError);
  76. dwUserError = m_curr_pkt->dwUserError;
  77. str = m_curr_pkt->str;
  78. }
  79. else if (dwRet == WAIT_OBJECT_0+1)
  80. {
  81. Error = Error_Cancel;
  82. }
  83. else
  84. {
  85. Error = Error_Unexpect;
  86. }
  87. OnAnswerWaited(Error);
  88. return Error;
  89. }
  90. ErrorCodeEnum WaitMultiAnswers(CAutoArray<IAsynWaitSp *> arrAsynWaits, DWORD &dwRetIndex, bool bWaitAll, DWORD dwTimeout)
  91. {
  92. if (dwTimeout == INFINITE)
  93. dwTimeout = 10000;
  94. int nCount = arrAsynWaits.GetCount();
  95. if (nCount == 0)
  96. return Error_Param;
  97. else if (nCount >32)
  98. return Error_Overflow;
  99. HANDLE *pHandles = NULL;
  100. if (bWaitAll)
  101. pHandles = new HANDLE[nCount];
  102. else
  103. pHandles = new HANDLE[nCount*2];
  104. for(int i=0; i<nCount; i++)
  105. {
  106. auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
  107. TOOLKIT_ASSERT(pAsynWait != NULL);
  108. pHandles[i] = pAsynWait->m_sem;
  109. if (!bWaitAll)
  110. pHandles[i+nCount] = pAsynWait->m_evt_cancel;
  111. }
  112. dwRetIndex = -1;
  113. ErrorCodeEnum Error = Error_Succeed;
  114. DWORD dwRet = ::WaitForMultipleObjects(bWaitAll ? nCount : nCount*2, pHandles, bWaitAll, dwTimeout);
  115. if (dwRet == WAIT_TIMEOUT)
  116. {
  117. Error = Error_TimeOut;
  118. }
  119. else if (dwRet >= WAIT_OBJECT_0 && dwRet < WAIT_OBJECT_0 + nCount )
  120. {
  121. dwRetIndex = dwRet - WAIT_OBJECT_0;
  122. for(int i=0; i<nCount; i++)
  123. {
  124. if (bWaitAll || dwRetIndex == i)
  125. {
  126. auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
  127. TOOLKIT_ASSERT(pAsynWait != NULL);
  128. if (pAsynWait->m_curr_pkt)
  129. {
  130. delete pAsynWait->m_curr_pkt;
  131. pAsynWait->m_curr_pkt = NULL;
  132. }
  133. spinlock_enter(&pAsynWait->m_lock, -1);
  134. pAsynWait->m_curr_pkt = list_first_entry(&pAsynWait->m_pending_pkt_list, SpAsyncWait::pending_pkt, entry);
  135. list_del_init(&pAsynWait->m_curr_pkt->entry);
  136. spinlock_leave(&pAsynWait->m_lock);
  137. Error = SpTranslateError(pAsynWait->m_curr_pkt->dwSysError);
  138. }
  139. }
  140. }
  141. else if (dwRet >= WAIT_OBJECT_0+nCount && dwRet < WAIT_OBJECT_0 + nCount*2 )
  142. {
  143. Error = Error_Cancel;
  144. dwRetIndex = dwRet - WAIT_OBJECT_0 - nCount;
  145. }
  146. else
  147. {
  148. Error = Error_Unexpect;
  149. }
  150. return Error;
  151. }
  152. ErrorCodeEnum SpAsyncWait::CancelWait()
  153. {
  154. m_bCancel = TRUE;
  155. Close();
  156. SetEvent(m_evt_cancel);
  157. return Error_Succeed;
  158. }
  159. bool SpAsyncWait::IsPending()
  160. {
  161. return m_total_pkt == 0;
  162. }
  163. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd)
  164. {
  165. DWORD dwUserError = 0;
  166. CSimpleString str;
  167. return AsyncGetAnswer(ReceiveBuffer, bEnd, dwUserError, str);
  168. }
  169. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd, DWORD &dwUserError, CSimpleString& str)
  170. {
  171. dwUserError = 0;
  172. if (m_bCancel)
  173. {
  174. return Error_Cancel;
  175. }
  176. else
  177. {
  178. if (m_curr_pkt)
  179. {
  180. if (m_curr_pkt->pkt)
  181. {
  182. int read_state;
  183. read_state = iobuffer_get_read_state(m_curr_pkt->pkt);
  184. {
  185. int size = iobuffer_get_length(m_curr_pkt->pkt);
  186. ReceiveBuffer.Init(size);
  187. if (size)
  188. iobuffer_read(m_curr_pkt->pkt, IOBUF_T_BUF, &ReceiveBuffer[0], &size);
  189. }
  190. iobuffer_restore_read_state(m_curr_pkt->pkt, read_state);
  191. }
  192. bEnd = !!m_curr_pkt->end;
  193. dwUserError = m_curr_pkt->dwUserError;
  194. str = m_curr_pkt->str;
  195. return SpTranslateError(m_curr_pkt->dwSysError);
  196. }
  197. }
  198. return Error_Unexpect;
  199. }
  200. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer()
  201. {
  202. if (m_bCancel) {
  203. return Error_Cancel;
  204. } else {
  205. if (m_curr_pkt) {
  206. return SpTranslateError(m_curr_pkt->dwSysError);
  207. }
  208. }
  209. return Error_Unexpect;
  210. }
  211. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(DWORD& dwUserError, CSimpleString& str)
  212. {
  213. CAutoBuffer receiveBuffer;
  214. bool bEnd = false;
  215. return AsyncGetAnswer(receiveBuffer, bEnd, dwUserError, str);
  216. }
  217. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(DWORD& dwUserError)
  218. {
  219. CAutoBuffer receiveBuffer;
  220. CSimpleString str;
  221. bool bEnd = false;
  222. return AsyncGetAnswer(receiveBuffer, bEnd, dwUserError, str);
  223. }
  224. void SpAsyncWait::ReceiveAnsPkt(int error, int end, iobuffer_t **ans_pkt, bool bReadUserCode)
  225. {
  226. pending_pkt *p = new pending_pkt();
  227. ZeroMemory(p, sizeof(pending_pkt));
  228. p->end = end;
  229. if (ans_pkt) {
  230. p->pkt = *ans_pkt;
  231. *ans_pkt = NULL;
  232. } else {
  233. p->pkt = NULL;
  234. }
  235. p->dwSysError = error;
  236. if (!p->dwSysError)
  237. {
  238. if (p->pkt)
  239. {
  240. iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwSysError, 0); // sys error
  241. if (bReadUserCode)
  242. {
  243. iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwUserError, 0); // user error
  244. if (p->dwUserError)
  245. {
  246. char* tmp = NULL;
  247. int slen;
  248. iobuffer_read(p->pkt, IOBUF_T_STR, NULL, &slen);
  249. tmp = (char*)malloc(slen + 1);
  250. iobuffer_read(p->pkt, IOBUF_T_STR, tmp, NULL);
  251. p->str = tmp;
  252. if (tmp)
  253. free(tmp);
  254. }
  255. }
  256. }
  257. }
  258. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("dwSysError: %d, dwUserCode: %d", p->dwSysError, p->dwUserError);
  259. if (end || error)
  260. {
  261. m_bEnd = TRUE;
  262. }
  263. spinlock_enter(&m_lock, -1);
  264. list_add_tail(&p->entry, &m_pending_pkt_list);
  265. m_total_pkt++;
  266. if (m_pCallback) { // does set callback
  267. wait_callback_entry *wce = new wait_callback_entry();
  268. wce->cnt++;
  269. IncrementRef();
  270. list_add_tail(&wce->entry, &m_pending_callback_list);
  271. threadpool_queue_workitem2(
  272. sp_svc_get_threadpool(m_pEntity->get_svc())
  273. , m_callback_strand
  274. , &__threadpool_on_callback
  275. , this
  276. , (param_size_t)wce
  277. , NULL);
  278. } else {
  279. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("callback is not existed !");
  280. }
  281. spinlock_leave(&m_lock);
  282. ReleaseSemaphore(m_sem, 1, NULL);
  283. }
  284. CSmartPointer<IReleasable> SpAsyncWait::GetCallContext()
  285. {
  286. CSmartPointer<IReleasable> ret;
  287. spinlock_enter(&m_lock, -1);
  288. ret = m_pCallbackContext;
  289. spinlock_leave(&m_lock);
  290. return ret;
  291. }
  292. bool SpAsyncWait::GetCallback(CSmartPointer<ICallbackListener> &pCallbackListener,CSmartPointer<IReleasable> &pContext)
  293. {
  294. bool ok = false;
  295. spinlock_enter(&m_lock, -1);
  296. if (m_pCallback) {
  297. pCallbackListener = CSmartPointer<ICallbackListener>(m_pCallback);
  298. pContext = m_pCallbackContext;
  299. ok = true;
  300. }
  301. spinlock_leave(&m_lock);
  302. return ok;
  303. }
  304. void SpAsyncWait::SetCallback(ICallbackListener *pCallback,IReleasable *pContext)
  305. {
  306. spinlock_enter(&m_lock, -1);
  307. if (pCallback) { // install callback
  308. TOOLKIT_ASSERT(m_pCallback == NULL);
  309. m_pCallback = pCallback;
  310. m_pCallbackContext.Attach(pContext);
  311. IncrementRef(); // xkm@20150115
  312. int cnt = 0;
  313. pending_pkt *pos;
  314. list_for_each_entry(pos, &m_pending_pkt_list, pending_pkt, entry) {
  315. cnt++;
  316. }
  317. // if package already return , callback instantly
  318. if (cnt >0)
  319. {
  320. wait_callback_entry *wce = new wait_callback_entry();
  321. wce->cnt = cnt;
  322. IncrementRef();
  323. list_add_tail(&wce->entry, &m_pending_callback_list);
  324. threadpool_queue_workitem2(
  325. sp_svc_get_threadpool(m_pEntity->get_svc())
  326. , m_callback_strand
  327. , &__threadpool_on_callback
  328. , this
  329. , (param_size_t)wce
  330. , NULL);
  331. }
  332. } else { // cancel callback
  333. if (m_pCallback != NULL) // xkm@20150115
  334. DecrementRef();
  335. m_pCallback = NULL; // {bug} should set NULL
  336. wait_callback_entry *pos;
  337. list_for_each_entry(pos, &m_pending_callback_list, wait_callback_entry, entry) {
  338. pos->cancel++;
  339. }
  340. }
  341. spinlock_leave(&m_lock);
  342. }
  343. void SpAsyncWait::threadpool_on_callback(SpAsyncWait::wait_callback_entry *wce)
  344. {
  345. spinlock_enter(&m_lock, -1);
  346. if (!wce->cancel) {
  347. for (int i = 0; i < wce->cnt; ++i) {
  348. ErrorCodeEnum Error = WaitAnswer(INFINITE);
  349. if (Error != Error_Cancel || !m_bCancel) {
  350. m_pCallback->OnAnswer(this);
  351. } else {
  352. break; // user cancel
  353. }
  354. }
  355. }
  356. list_del(&wce->entry);
  357. spinlock_leave(&m_lock);
  358. DecrementRef();
  359. delete wce;
  360. if (m_bEnd) {
  361. m_pCallback = NULL;
  362. DecrementRef(); // xkm@20150115
  363. }
  364. }
  365. void SpAsyncWait::__threadpool_on_callback(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
  366. {
  367. SpAsyncWait *pThis = static_cast<SpAsyncWait*>(arg);
  368. wait_callback_entry *wce = (wait_callback_entry *)param1;
  369. pThis->threadpool_on_callback(wce);
  370. }
  371. //
  372. // SpAsyncWaitRPC
  373. //
  374. SpAsyncWaitRPC::SpAsyncWaitRPC( SpEntity *pEntity, iobuffer_t **req_pkt, int call_type, bool fetch_user)
  375. : SpAsyncWait(pEntity), m_req_pkt(*req_pkt), m_call_type(call_type), m_rpc(NULL),m_user(fetch_user)
  376. {
  377. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWaitRPC()");
  378. //LOG_FUNCTION();
  379. *req_pkt = NULL;
  380. }
  381. SpAsyncWaitRPC::~SpAsyncWaitRPC()
  382. {
  383. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWaitRPC()");
  384. //LOG_FUNCTION();
  385. if (m_req_pkt)
  386. iobuffer_dec_ref(m_req_pkt);
  387. if (m_rpc)
  388. sp_rpc_client_destroy(m_rpc);
  389. }
  390. ErrorCodeEnum SpAsyncWaitRPC::Begin(const void *arg)
  391. {
  392. int rc;
  393. sp_rpc_client_callback cb;
  394. cb.on_ans = &__on_ans;
  395. cb.on_destroy = NULL;
  396. cb.user_data = this;
  397. if (arg == NULL)
  398. rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, m_call_type, &cb, &m_rpc);
  399. else
  400. {
  401. // find entity by name
  402. auto pEnv = sp_get_env();
  403. auto pEntity = sp_cfg_get_entity_by_name(pEnv->cfg, (const char*)arg);
  404. if (pEntity == NULL)
  405. return Error_NotExist;
  406. rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), pEntity->mod->idx, pEntity->idx, m_call_type, &cb, &m_rpc);
  407. }
  408. if (rc != 0)
  409. return SpTranslateError(rc);
  410. IncrementRef();
  411. rc = sp_rpc_client_async_call(m_rpc, &m_req_pkt);
  412. if (rc != 0) {
  413. sp_rpc_client_destroy(m_rpc);
  414. m_rpc = NULL;
  415. DecrementRef();
  416. }
  417. return SpTranslateError(rc);
  418. }
  419. ErrorCodeEnum SpAsyncWaitRPC::Close()
  420. {
  421. int rc;
  422. if (m_rpc) {
  423. rc = sp_rpc_client_close(m_rpc);
  424. } else {
  425. rc = Error_NotInit;
  426. }
  427. return SpTranslateError(rc);
  428. }
  429. void SpAsyncWaitRPC::__on_ans( sp_rpc_client_t *client, int error, iobuffer_t **ans_pkt, void *user_data )
  430. {
  431. SpAsyncWaitRPC *pThis = static_cast<SpAsyncWaitRPC*>(user_data);
  432. pThis->on_ans(error, ans_pkt);
  433. }
  434. void SpAsyncWaitRPC::on_ans( int error, iobuffer_t **ans_pkt )
  435. {
  436. ReceiveAnsPkt(error, 1, ans_pkt, m_user);
  437. DecrementRef(); // xkm@20150115
  438. }
  439. //
  440. // SpAsyncWaitTsx
  441. //
  442. SpAsyncWaitTsx::SpAsyncWaitTsx(SpEntity *pEntity, sp_ses_uac_t *uac, int timeout, int method_id, int method_sig, CAutoBuffer Buffer, int tsx_id)
  443. : SpAsyncWait(pEntity), m_tsx(NULL), m_timeout(timeout), m_method_id(method_id), m_method_sig(method_sig), m_req_pkt(NULL), m_tsx_id(tsx_id)
  444. {
  445. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("SpAsyncWaitTsx()");
  446. //LOG_FUNCTION();
  447. int rc;
  448. sp_tsx_uac_callback cb;
  449. cb.on_ans = &__on_ans;
  450. cb.on_destroy = NULL;
  451. cb.user_data = this;
  452. rc = sp_tsx_uac_create(uac, m_tsx_id, m_method_id, m_method_sig, &cb, &m_tsx);
  453. if (rc != 0) {
  454. m_tsx = NULL;
  455. } else {
  456. m_req_pkt = iobuffer_create(-1, Buffer.GetCount() + 32);
  457. if (Buffer.GetCount() > 0) {
  458. iobuffer_write(m_req_pkt, IOBUF_T_BUF, &Buffer[0], Buffer.GetCount());
  459. }
  460. }
  461. }
  462. SpAsyncWaitTsx::~SpAsyncWaitTsx()
  463. {
  464. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("~SpAsyncWaitTsx()");
  465. //LOG_FUNCTION();
  466. if (m_tsx) {
  467. sp_tsx_uac_close(m_tsx);
  468. sp_tsx_uac_destroy(m_tsx);
  469. }
  470. if (m_req_pkt)
  471. iobuffer_dec_ref(m_req_pkt);
  472. }
  473. ErrorCodeEnum SpAsyncWaitTsx::Begin(const void *arg)
  474. {
  475. int rc;
  476. if (!m_tsx)
  477. return Error_NetBroken;
  478. IncrementRef(); // xkm@20150115
  479. rc = sp_tsx_uac_async_req(m_tsx, m_timeout, &m_req_pkt);
  480. if (rc != 0) {
  481. sp_tsx_uac_destroy(m_tsx);
  482. m_tsx = NULL;
  483. DecrementRef(); // xkm@20150115
  484. }
  485. return SpTranslateError(rc);
  486. }
  487. ErrorCodeEnum SpAsyncWaitTsx::Close()
  488. {
  489. int rc;
  490. if (m_tsx) {
  491. rc = sp_tsx_uac_close(m_tsx);
  492. } else {
  493. rc = Error_Unexpect;
  494. }
  495. return SpTranslateError(rc);
  496. }
  497. ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD dwTimeout)
  498. {
  499. // use constructor timeout first
  500. DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
  501. return SpAsyncWait::WaitAnswer(dwWaitTime);
  502. }
  503. ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD &dwUserError, CSimpleString& str, DWORD dwTimeout)
  504. {
  505. // use constructor timeout first
  506. DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
  507. return SpAsyncWait::WaitAnswer(dwUserError, str, dwWaitTime);
  508. }
  509. void SpAsyncWaitTsx::__on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
  510. {
  511. SpAsyncWaitTsx *pThis = static_cast<SpAsyncWaitTsx*>(user_data);
  512. pThis->on_ans(error, end, ans_pkt);
  513. }
  514. void SpAsyncWaitTsx::on_ans(int error, int end, iobuffer_t **ans_pkt)
  515. {
  516. ReceiveAnsPkt(error, end, ans_pkt, true);
  517. if (error || end) {
  518. Close();
  519. DecrementRef(); // xkm@20150115
  520. }
  521. }
  522. void SpAsyncWaitTsx::setLinkContext(linkContext& tmpContext)
  523. {
  524. if (m_req_pkt) {
  525. iobuffer_set_linkInfo(m_req_pkt, tmpContext.bussinessId.GetData(), tmpContext.traceId.GetData(), tmpContext.spanId.GetData(), tmpContext.parentSpanId.GetData());
  526. }
  527. }