SpAsyncWait.cpp 13 KB


  1. #include "stdafx.h"
  2. #include "SpBase.h"
  3. #include "SpMisc.h"
  4. #include "SpAsyncWait.h"
  5. #include "SpEntity.h"
  6. #include "sp_env.h"
  7. #include "sp_cfg.h"
  8. //
  9. // SpAsyncWait
  10. //
  11. SpAsyncWait::SpAsyncWait(SpEntity *pEntity)
  12. : m_sem(::CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL))
  13. , m_total_pkt(0)
  14. , m_bCancel(0)
  15. , m_bEnd(0)
  16. ,m_pCallback(NULL)
  17. , m_pEntity(pEntity)
  18. , m_evt_cancel(::CreateEventA(NULL, TRUE, FALSE, NULL))
  19. , m_curr_pkt(NULL)
  20. , m_callback_strand(strand_create())
  21. ,m_pRefCount(NULL)
  22. {
  23. //Dbg("SpAsyncWait()");
  24. m_pRefCount = new LONG(1);
  25. INIT_LIST_HEAD(&m_pending_pkt_list);
  26. INIT_LIST_HEAD(&m_pending_callback_list);
  27. spinlock_init(&m_lock);
  28. }
  29. SpAsyncWait::~SpAsyncWait()
  30. {
  31. //Dbg("~SpAsyncWait()");
  32. if (m_sem)
  33. CloseHandle(m_sem);
  34. if (m_evt_cancel)
  35. CloseHandle(m_evt_cancel);
  36. pending_pkt *pos, *n;
  37. list_for_each_entry_safe(pos, n, &m_pending_pkt_list, pending_pkt, entry) {
  38. list_del_init(&pos->entry);
  39. delete pos;
  40. }
  41. strand_destroy(m_callback_strand);
  42. if (m_curr_pkt)
  43. delete m_curr_pkt;
  44. }
  45. ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD dwTimeout)
  46. {
  47. DWORD dwUserError = 0;
  48. return WaitAnswer(dwUserError, dwTimeout);
  49. }
  50. ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD &dwUserError, DWORD dwTimeout)
  51. {
  52. ErrorCodeEnum Error;
  53. dwUserError = 0;
  54. HANDLE hs[] = {m_sem, m_evt_cancel};
  55. if (dwTimeout == INFINITE)
  56. dwTimeout = 10000;
  57. DWORD dwRet = ::WaitForMultipleObjects(2, &hs[0], FALSE, dwTimeout);
  58. Dbg("WaitAnswer return: 0x%08X", dwRet);
  59. if (dwRet == WAIT_TIMEOUT)
  60. {
  61. Error = Error_TimeOut;
  62. }
  63. else if (dwRet == WAIT_OBJECT_0)
  64. {
  65. if (m_curr_pkt)
  66. {
  67. delete m_curr_pkt;
  68. m_curr_pkt = NULL;
  69. }
  70. spinlock_enter(&m_lock, -1);
  71. m_curr_pkt = list_first_entry(&m_pending_pkt_list, pending_pkt, entry);
  72. list_del_init(&m_curr_pkt->entry);
  73. spinlock_leave(&m_lock);
  74. Dbg("SpTranslateError(%d)", m_curr_pkt->dwSysError);
  75. Error = SpTranslateError(m_curr_pkt->dwSysError);
  76. dwUserError = m_curr_pkt->dwUserError;
  77. }
  78. else if (dwRet == WAIT_OBJECT_0+1)
  79. {
  80. Error = Error_Cancel;
  81. }
  82. else
  83. {
  84. Error = Error_Unexpect;
  85. }
  86. OnAnswerWaited(Error);
  87. return Error;
  88. }
  89. ErrorCodeEnum WaitMultiAnswers(CAutoArray<IAsynWaitSp *> arrAsynWaits, DWORD &dwRetIndex, bool bWaitAll, DWORD dwTimeout)
  90. {
  91. if (dwTimeout == INFINITE)
  92. dwTimeout = 10000;
  93. int nCount = arrAsynWaits.GetCount();
  94. if (nCount == 0)
  95. return Error_Param;
  96. else if (nCount >32)
  97. return Error_Overflow;
  98. HANDLE *pHandles = NULL;
  99. if (bWaitAll)
  100. pHandles = new HANDLE[nCount];
  101. else
  102. pHandles = new HANDLE[nCount*2];
  103. for(int i=0; i<nCount; i++)
  104. {
  105. auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
  106. assert(pAsynWait != NULL);
  107. pHandles[i] = pAsynWait->m_sem;
  108. if (!bWaitAll)
  109. pHandles[i+nCount] = pAsynWait->m_evt_cancel;
  110. }
  111. dwRetIndex = -1;
  112. ErrorCodeEnum Error = Error_Succeed;
  113. DWORD dwRet = ::WaitForMultipleObjects(bWaitAll ? nCount : nCount*2, pHandles, bWaitAll, dwTimeout);
  114. if (dwRet == WAIT_TIMEOUT)
  115. {
  116. Error = Error_TimeOut;
  117. }
  118. else if (dwRet >= WAIT_OBJECT_0 && dwRet < WAIT_OBJECT_0 + nCount )
  119. {
  120. dwRetIndex = dwRet - WAIT_OBJECT_0;
  121. for(int i=0; i<nCount; i++)
  122. {
  123. if (bWaitAll || dwRetIndex == i)
  124. {
  125. auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
  126. assert(pAsynWait != NULL);
  127. if (pAsynWait->m_curr_pkt)
  128. {
  129. delete pAsynWait->m_curr_pkt;
  130. pAsynWait->m_curr_pkt = NULL;
  131. }
  132. spinlock_enter(&pAsynWait->m_lock, -1);
  133. pAsynWait->m_curr_pkt = list_first_entry(&pAsynWait->m_pending_pkt_list, SpAsyncWait::pending_pkt, entry);
  134. list_del_init(&pAsynWait->m_curr_pkt->entry);
  135. spinlock_leave(&pAsynWait->m_lock);
  136. Error = SpTranslateError(pAsynWait->m_curr_pkt->dwSysError);
  137. }
  138. }
  139. }
  140. else if (dwRet >= WAIT_OBJECT_0+nCount && dwRet < WAIT_OBJECT_0 + nCount*2 )
  141. {
  142. Error = Error_Cancel;
  143. dwRetIndex = dwRet - WAIT_OBJECT_0 - nCount;
  144. }
  145. else
  146. {
  147. Error = Error_Unexpect;
  148. }
  149. return Error;
  150. }
  151. ErrorCodeEnum SpAsyncWait::CancelWait()
  152. {
  153. m_bCancel = TRUE;
  154. Close();
  155. SetEvent(m_evt_cancel);
  156. return Error_Succeed;
  157. }
  158. bool SpAsyncWait::IsPending()
  159. {
  160. return m_total_pkt == 0;
  161. }
  162. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd)
  163. {
  164. DWORD dwUserError = 0;
  165. return AsyncGetAnswer(ReceiveBuffer, bEnd, dwUserError);
  166. }
  167. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd, DWORD &dwUserError)
  168. {
  169. dwUserError = 0;
  170. if (m_bCancel)
  171. {
  172. return Error_Cancel;
  173. }
  174. else
  175. {
  176. if (m_curr_pkt)
  177. {
  178. if (m_curr_pkt->pkt)
  179. {
  180. int read_state;
  181. read_state = iobuffer_get_read_state(m_curr_pkt->pkt);
  182. {
  183. int size = iobuffer_get_length(m_curr_pkt->pkt);
  184. ReceiveBuffer.Init(size);
  185. if (size)
  186. iobuffer_read(m_curr_pkt->pkt, IOBUF_T_BUF, &ReceiveBuffer[0], &size);
  187. }
  188. iobuffer_restore_read_state(m_curr_pkt->pkt, read_state);
  189. }
  190. bEnd = !!m_curr_pkt->end;
  191. dwUserError = m_curr_pkt->dwUserError;
  192. return SpTranslateError(m_curr_pkt->dwSysError);
  193. }
  194. }
  195. return Error_Unexpect;
  196. }
  197. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer()
  198. {
  199. if (m_bCancel) {
  200. return Error_Cancel;
  201. } else {
  202. if (m_curr_pkt) {
  203. return SpTranslateError(m_curr_pkt->dwSysError);
  204. }
  205. }
  206. return Error_Unexpect;
  207. }
  208. void SpAsyncWait::ReceiveAnsPkt(int error, int end, iobuffer_t **ans_pkt, bool bReadUserCode)
  209. {
  210. Dbg("==> %s: error:%d, end:%d", __FUNCTION__, error, end);
  211. pending_pkt *p = new pending_pkt();
  212. ZeroMemory(p, sizeof(pending_pkt));
  213. p->end = end;
  214. if (ans_pkt) {
  215. p->pkt = *ans_pkt;
  216. *ans_pkt = NULL;
  217. } else {
  218. p->pkt = NULL;
  219. }
  220. p->dwSysError = error;
  221. if (!p->dwSysError)
  222. {
  223. if (p->pkt)
  224. {
  225. iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwSysError, 0); // sys error
  226. if (bReadUserCode)
  227. iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwUserError, 0); // user error
  228. }
  229. }
  230. Dbg("dwSysError: %d", p->dwSysError);
  231. if (end || error)
  232. {
  233. m_bEnd = TRUE;
  234. }
  235. spinlock_enter(&m_lock, -1);
  236. list_add_tail(&p->entry, &m_pending_pkt_list);
  237. m_total_pkt++;
  238. if (m_pCallback) { // does set callback
  239. wait_callback_entry *wce = new wait_callback_entry();
  240. wce->cnt++;
  241. IncrementRef();
  242. list_add_tail(&wce->entry, &m_pending_callback_list);
  243. threadpool_queue_workitem2(
  244. sp_svc_get_threadpool(m_pEntity->get_svc())
  245. , m_callback_strand
  246. , &__threadpool_on_callback
  247. , this
  248. , (param_size_t)wce
  249. , NULL);
  250. }
  251. spinlock_leave(&m_lock);
  252. ReleaseSemaphore(m_sem, 1, NULL);
  253. }
  254. CSmartPointer<IReleasable> SpAsyncWait::GetCallContext()
  255. {
  256. CSmartPointer<IReleasable> ret;
  257. spinlock_enter(&m_lock, -1);
  258. ret = m_pCallbackContext;
  259. spinlock_leave(&m_lock);
  260. return ret;
  261. }
  262. bool SpAsyncWait::GetCallback(CSmartPointer<ICallbackListener> &pCallbackListener,CSmartPointer<IReleasable> &pContext)
  263. {
  264. bool ok = false;
  265. spinlock_enter(&m_lock, -1);
  266. if (m_pCallback) {
  267. pCallbackListener = CSmartPointer<ICallbackListener>(m_pCallback);
  268. pContext = m_pCallbackContext;
  269. ok = true;
  270. }
  271. spinlock_leave(&m_lock);
  272. return ok;
  273. }
  274. void SpAsyncWait::SetCallback(ICallbackListener *pCallback,IReleasable *pContext)
  275. {
  276. spinlock_enter(&m_lock, -1);
  277. if (pCallback) { // install callback
  278. assert(m_pCallback == NULL);
  279. m_pCallback = pCallback;
  280. m_pCallbackContext.Attach(pContext);
  281. IncrementRef(); // xkm@20150115
  282. int cnt = 0;
  283. pending_pkt *pos;
  284. list_for_each_entry(pos, &m_pending_pkt_list, pending_pkt, entry) {
  285. cnt++;
  286. }
  287. // if package already return , callback instantly
  288. if (cnt >0)
  289. {
  290. wait_callback_entry *wce = new wait_callback_entry();
  291. wce->cnt = cnt;
  292. IncrementRef();
  293. list_add_tail(&wce->entry, &m_pending_callback_list);
  294. threadpool_queue_workitem2(
  295. sp_svc_get_threadpool(m_pEntity->get_svc())
  296. , m_callback_strand
  297. , &__threadpool_on_callback
  298. , this
  299. , (param_size_t)wce
  300. , NULL);
  301. }
  302. } else { // cancel callback
  303. if (m_pCallback != NULL) // xkm@20150115
  304. DecrementRef();
  305. m_pCallback = NULL; // {bug} should set NULL
  306. wait_callback_entry *pos;
  307. list_for_each_entry(pos, &m_pending_callback_list, wait_callback_entry, entry) {
  308. pos->cancel++;
  309. }
  310. }
  311. spinlock_leave(&m_lock);
  312. }
  313. void SpAsyncWait::threadpool_on_callback(SpAsyncWait::wait_callback_entry *wce)
  314. {
  315. spinlock_enter(&m_lock, -1);
  316. if (!wce->cancel) {
  317. for (int i = 0; i < wce->cnt; ++i) {
  318. ErrorCodeEnum Error = WaitAnswer(INFINITE);
  319. if (Error != Error_Cancel || !m_bCancel) {
  320. m_pCallback->OnAnswer(this);
  321. } else {
  322. break; // user cancel
  323. }
  324. }
  325. }
  326. list_del(&wce->entry);
  327. spinlock_leave(&m_lock);
  328. DecrementRef();
  329. delete wce;
  330. if (m_bEnd) {
  331. // ¶ÔÓ¦SetCallbackʱIncrementRef()
  332. m_pCallback = NULL;
  333. DecrementRef(); // xkm@20150115
  334. }
  335. }
  336. void SpAsyncWait::__threadpool_on_callback(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
  337. {
  338. SpAsyncWait *pThis = static_cast<SpAsyncWait*>(arg);
  339. wait_callback_entry *wce = (wait_callback_entry *)param1;
  340. pThis->threadpool_on_callback(wce);
  341. }
  342. //
  343. // SpAsyncWaitRPC
  344. //
  345. SpAsyncWaitRPC::SpAsyncWaitRPC( SpEntity *pEntity, iobuffer_t **req_pkt, int call_type ) : SpAsyncWait(pEntity), m_req_pkt(*req_pkt), m_call_type(call_type), m_rpc(NULL)
  346. {
  347. //Dbg("SpAsyncWaitRPC()");
  348. //LOG_FUNCTION();
  349. *req_pkt = NULL;
  350. }
  351. SpAsyncWaitRPC::~SpAsyncWaitRPC()
  352. {
  353. //Dbg("~SpAsyncWaitRPC()");
  354. //LOG_FUNCTION();
  355. if (m_req_pkt)
  356. iobuffer_dec_ref(m_req_pkt);
  357. if (m_rpc)
  358. sp_rpc_client_destroy(m_rpc);
  359. }
  360. ErrorCodeEnum SpAsyncWaitRPC::Begin(const void *arg)
  361. {
  362. int rc;
  363. sp_rpc_client_callback cb;
  364. cb.on_ans = &__on_ans;
  365. cb.on_destroy = NULL;
  366. cb.user_data = this;
  367. if (arg == NULL)
  368. rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, m_call_type, &cb, &m_rpc);
  369. else
  370. {
  371. // find entity by name
  372. auto pEnv = sp_get_env();
  373. auto pEntity = sp_cfg_get_entity_by_name(pEnv->cfg, (const char*)arg);
  374. if (pEntity == NULL)
  375. return Error_NotExist;
  376. rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), pEntity->mod->idx, pEntity->idx, m_call_type, &cb, &m_rpc);
  377. }
  378. if (rc != 0)
  379. return SpTranslateError(rc);
  380. IncrementRef();
  381. rc = sp_rpc_client_async_call(m_rpc, &m_req_pkt);
  382. if (rc != 0) {
  383. sp_rpc_client_destroy(m_rpc);
  384. m_rpc = NULL;
  385. DecrementRef();
  386. }
  387. return SpTranslateError(rc);
  388. }
  389. ErrorCodeEnum SpAsyncWaitRPC::Close()
  390. {
  391. int rc;
  392. if (m_rpc) {
  393. rc = sp_rpc_client_close(m_rpc);
  394. } else {
  395. rc = Error_NotInit;
  396. }
  397. return SpTranslateError(rc);
  398. }
  399. void SpAsyncWaitRPC::__on_ans( sp_rpc_client_t *client, int error, iobuffer_t **ans_pkt, void *user_data )
  400. {
  401. SpAsyncWaitRPC *pThis = static_cast<SpAsyncWaitRPC*>(user_data);
  402. pThis->on_ans(error, ans_pkt);
  403. }
  404. void SpAsyncWaitRPC::on_ans( int error, iobuffer_t **ans_pkt )
  405. {
  406. ReceiveAnsPkt(error, 1, ans_pkt);
  407. DecrementRef(); // xkm@20150115
  408. }
  409. //
  410. // SpAsyncWaitTsx
  411. //
  412. SpAsyncWaitTsx::SpAsyncWaitTsx(SpEntity *pEntity, sp_ses_uac_t *uac, int timeout, int method_id, int method_sig, CAutoBuffer Buffer, int tsx_id)
  413. : SpAsyncWait(pEntity), m_tsx(NULL), m_timeout(timeout), m_method_id(method_id), m_method_sig(method_sig), m_req_pkt(NULL), m_tsx_id(tsx_id)
  414. {
  415. //Dbg("SpAsyncWaitTsx()");
  416. //LOG_FUNCTION();
  417. int rc;
  418. sp_tsx_uac_callback cb;
  419. cb.on_ans = &__on_ans;
  420. cb.on_destroy = NULL;
  421. cb.user_data = this;
  422. rc = sp_tsx_uac_create(uac, m_tsx_id, m_method_id, m_method_sig, &cb, &m_tsx);
  423. if (rc != 0) {
  424. m_tsx = NULL;
  425. } else {
  426. m_req_pkt = iobuffer_create(-1, Buffer.GetCount() + 32);
  427. if (Buffer.GetCount() > 0) {
  428. iobuffer_write(m_req_pkt, IOBUF_T_BUF, &Buffer[0], Buffer.GetCount());
  429. }
  430. }
  431. }
  432. SpAsyncWaitTsx::~SpAsyncWaitTsx()
  433. {
  434. //Dbg("~SpAsyncWaitTsx()");
  435. //LOG_FUNCTION();
  436. if (m_tsx) {
  437. sp_tsx_uac_close(m_tsx);
  438. sp_tsx_uac_destroy(m_tsx);
  439. }
  440. if (m_req_pkt)
  441. iobuffer_dec_ref(m_req_pkt);
  442. }
  443. ErrorCodeEnum SpAsyncWaitTsx::Begin(const void *arg)
  444. {
  445. int rc;
  446. if (!m_tsx)
  447. return Error_NetBroken;
  448. IncrementRef(); // xkm@20150115
  449. rc = sp_tsx_uac_async_req(m_tsx, m_timeout, &m_req_pkt);
  450. if (rc != 0) {
  451. sp_tsx_uac_destroy(m_tsx);
  452. m_tsx = NULL;
  453. DecrementRef(); // xkm@20150115
  454. }
  455. return SpTranslateError(rc);
  456. }
  457. ErrorCodeEnum SpAsyncWaitTsx::Close()
  458. {
  459. int rc;
  460. if (m_tsx) {
  461. rc = sp_tsx_uac_close(m_tsx);
  462. } else {
  463. rc = Error_Unexpect;
  464. }
  465. return SpTranslateError(rc);
  466. }
  467. ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD dwTimeout)
  468. {
  469. // use constructor timeout first
  470. DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
  471. return SpAsyncWait::WaitAnswer(dwWaitTime);
  472. }
  473. ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD &dwUserError, DWORD dwTimeout)
  474. {
  475. // use constructor timeout first
  476. DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
  477. return SpAsyncWait::WaitAnswer(dwUserError, dwWaitTime);
  478. }
  479. void SpAsyncWaitTsx::__on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
  480. {
  481. SpAsyncWaitTsx *pThis = static_cast<SpAsyncWaitTsx*>(user_data);
  482. pThis->on_ans(error, end, ans_pkt);
  483. }
  484. void SpAsyncWaitTsx::on_ans(int error, int end, iobuffer_t **ans_pkt)
  485. {
  486. ReceiveAnsPkt(error, end, ans_pkt, true);
  487. if (error || end) {
  488. Close();
  489. DecrementRef(); // xkm@20150115
  490. }
  491. }