SpAsyncWait.cpp 12 KB


  1. #include "stdafx.h"
  2. #include "SpBase.h"
  3. #include "SpMisc.h"
  4. #include "SpAsyncWait.h"
  5. #include "SpEntity.h"
  6. #include "sp_env.h"
  7. #include "sp_cfg.h"
  8. //
  9. // SpAsyncWait
  10. //
  11. SpAsyncWait::SpAsyncWait(SpEntity *pEntity) : m_sem(::CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL)), m_total_pkt(0), m_bCancel(0), m_bEnd(0),
  12. m_pCallback(NULL), m_pEntity(pEntity), m_evt_cancel(::CreateEventA(NULL, TRUE, FALSE, NULL)), m_curr_pkt(NULL), m_callback_strand(strand_create()),m_pRefCount(NULL)
  13. {
  14. //Dbg("SpAsyncWait()");
  15. m_pRefCount = new LONG(1);
  16. INIT_LIST_HEAD(&m_pending_pkt_list);
  17. INIT_LIST_HEAD(&m_pending_callback_list);
  18. spinlock_init(&m_lock);
  19. }
  20. SpAsyncWait::~SpAsyncWait()
  21. {
  22. //Dbg("~SpAsyncWait()");
  23. if (m_sem)
  24. CloseHandle(m_sem);
  25. if (m_evt_cancel)
  26. CloseHandle(m_evt_cancel);
  27. pending_pkt *pos, *n;
  28. list_for_each_entry_safe(pos, n, &m_pending_pkt_list, pending_pkt, entry) {
  29. list_del_init(&pos->entry);
  30. delete pos;
  31. }
  32. strand_destroy(m_callback_strand);
  33. if (m_curr_pkt)
  34. delete m_curr_pkt;
  35. }
  36. ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD dwTimeout)
  37. {
  38. DWORD dwUserError = 0;
  39. return WaitAnswer(dwUserError, dwTimeout);
  40. }
  41. ErrorCodeEnum SpAsyncWait::WaitAnswer(DWORD &dwUserError, DWORD dwTimeout)
  42. {
  43. ErrorCodeEnum Error;
  44. dwUserError = 0;
  45. HANDLE hs[] = {m_sem, m_evt_cancel};
  46. if (dwTimeout == INFINITE)
  47. dwTimeout = 10000;
  48. DWORD dwRet = ::WaitForMultipleObjects(2, &hs[0], FALSE, dwTimeout);
  49. if (dwRet == WAIT_TIMEOUT)
  50. {
  51. Error = Error_TimeOut;
  52. }
  53. else if (dwRet == WAIT_OBJECT_0)
  54. {
  55. if (m_curr_pkt)
  56. {
  57. delete m_curr_pkt;
  58. m_curr_pkt = NULL;
  59. }
  60. spinlock_enter(&m_lock, -1);
  61. m_curr_pkt = list_first_entry(&m_pending_pkt_list, pending_pkt, entry);
  62. list_del_init(&m_curr_pkt->entry);
  63. spinlock_leave(&m_lock);
  64. Error = SpTranslateError(m_curr_pkt->dwSysError);
  65. dwUserError = m_curr_pkt->dwUserError;
  66. }
  67. else if (dwRet == WAIT_OBJECT_0+1)
  68. {
  69. Error = Error_Cancel;
  70. }
  71. else
  72. {
  73. Error = Error_Unexpect;
  74. }
  75. OnAnswerWaited(Error);
  76. return Error;
  77. }
  78. ErrorCodeEnum WaitMultiAnswers(CAutoArray<IAsynWaitSp *> arrAsynWaits, DWORD &dwRetIndex, bool bWaitAll, DWORD dwTimeout)
  79. {
  80. if (dwTimeout == INFINITE)
  81. dwTimeout = 10000;
  82. int nCount = arrAsynWaits.GetCount();
  83. if (nCount == 0)
  84. return Error_Param;
  85. else if (nCount >32)
  86. return Error_Overflow;
  87. HANDLE *pHandles = NULL;
  88. if (bWaitAll)
  89. pHandles = new HANDLE[nCount];
  90. else
  91. pHandles = new HANDLE[nCount*2];
  92. for(int i=0; i<nCount; i++)
  93. {
  94. auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
  95. assert(pAsynWait != NULL);
  96. pHandles[i] = pAsynWait->m_sem;
  97. if (!bWaitAll)
  98. pHandles[i+nCount] = pAsynWait->m_evt_cancel;
  99. }
  100. dwRetIndex = -1;
  101. ErrorCodeEnum Error = Error_Succeed;
  102. DWORD dwRet = ::WaitForMultipleObjects(bWaitAll ? nCount : nCount*2, pHandles, bWaitAll, dwTimeout);
  103. if (dwRet == WAIT_TIMEOUT)
  104. {
  105. Error = Error_TimeOut;
  106. }
  107. else if (dwRet >= WAIT_OBJECT_0 && dwRet < WAIT_OBJECT_0 + nCount )
  108. {
  109. dwRetIndex = dwRet - WAIT_OBJECT_0;
  110. for(int i=0; i<nCount; i++)
  111. {
  112. if (bWaitAll || dwRetIndex == i)
  113. {
  114. auto pAsynWait = dynamic_cast<SpAsyncWait*>(arrAsynWaits[i]);
  115. assert(pAsynWait != NULL);
  116. if (pAsynWait->m_curr_pkt)
  117. {
  118. delete pAsynWait->m_curr_pkt;
  119. pAsynWait->m_curr_pkt = NULL;
  120. }
  121. spinlock_enter(&pAsynWait->m_lock, -1);
  122. pAsynWait->m_curr_pkt = list_first_entry(&pAsynWait->m_pending_pkt_list, SpAsyncWait::pending_pkt, entry);
  123. list_del_init(&pAsynWait->m_curr_pkt->entry);
  124. spinlock_leave(&pAsynWait->m_lock);
  125. Error = SpTranslateError(pAsynWait->m_curr_pkt->dwSysError);
  126. }
  127. }
  128. }
  129. else if (dwRet >= WAIT_OBJECT_0+nCount && dwRet < WAIT_OBJECT_0 + nCount*2 )
  130. {
  131. Error = Error_Cancel;
  132. dwRetIndex = dwRet - WAIT_OBJECT_0 - nCount;
  133. }
  134. else
  135. {
  136. Error = Error_Unexpect;
  137. }
  138. return Error;
  139. }
  140. ErrorCodeEnum SpAsyncWait::CancelWait()
  141. {
  142. m_bCancel = TRUE;
  143. Close();
  144. SetEvent(m_evt_cancel);
  145. return Error_Succeed;
  146. }
  147. bool SpAsyncWait::IsPending()
  148. {
  149. return m_total_pkt == 0;
  150. }
  151. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd)
  152. {
  153. DWORD dwUserError = 0;
  154. return AsyncGetAnswer(ReceiveBuffer, bEnd, dwUserError);
  155. }
  156. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd, DWORD &dwUserError)
  157. {
  158. dwUserError = 0;
  159. if (m_bCancel)
  160. {
  161. return Error_Cancel;
  162. }
  163. else
  164. {
  165. if (m_curr_pkt)
  166. {
  167. if (m_curr_pkt->pkt)
  168. {
  169. int read_state;
  170. read_state = iobuffer_get_read_state(m_curr_pkt->pkt);
  171. {
  172. int size = iobuffer_get_length(m_curr_pkt->pkt);
  173. ReceiveBuffer.Init(size);
  174. if (size)
  175. iobuffer_read(m_curr_pkt->pkt, IOBUF_T_BUF, &ReceiveBuffer[0], &size);
  176. }
  177. iobuffer_restore_read_state(m_curr_pkt->pkt, read_state);
  178. }
  179. bEnd = !!m_curr_pkt->end;
  180. dwUserError = m_curr_pkt->dwUserError;
  181. return SpTranslateError(m_curr_pkt->dwSysError);
  182. }
  183. }
  184. return Error_Unexpect;
  185. }
  186. ErrorCodeEnum SpAsyncWait::AsyncGetAnswer()
  187. {
  188. if (m_bCancel) {
  189. return Error_Cancel;
  190. } else {
  191. if (m_curr_pkt) {
  192. return SpTranslateError(m_curr_pkt->dwSysError);
  193. }
  194. }
  195. return Error_Unexpect;
  196. }
  197. void SpAsyncWait::ReceiveAnsPkt(int error, int end, iobuffer_t **ans_pkt, bool bReadUserCode)
  198. {
  199. pending_pkt *p = new pending_pkt();
  200. ZeroMemory(p, sizeof(pending_pkt));
  201. p->end = end;
  202. if (ans_pkt) {
  203. p->pkt = *ans_pkt;
  204. *ans_pkt = NULL;
  205. } else {
  206. p->pkt = NULL;
  207. }
  208. p->dwSysError = error;
  209. if (!p->dwSysError)
  210. {
  211. if (p->pkt)
  212. {
  213. iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwSysError, 0); // sys error
  214. if (bReadUserCode)
  215. iobuffer_read(p->pkt, IOBUF_T_I4, &p->dwUserError, 0); // user error
  216. }
  217. }
  218. if (end || error)
  219. {
  220. m_bEnd = TRUE;
  221. }
  222. spinlock_enter(&m_lock, -1);
  223. list_add_tail(&p->entry, &m_pending_pkt_list);
  224. m_total_pkt++;
  225. if (m_pCallback) { // does set callback
  226. wait_callback_entry *wce = new wait_callback_entry();
  227. wce->cnt++;
  228. IncrementRef();
  229. list_add_tail(&wce->entry, &m_pending_callback_list);
  230. threadpool_queue_workitem2(sp_svc_get_threadpool(m_pEntity->get_svc()), m_callback_strand, &__threadpool_on_callback, this, (int)wce, NULL);
  231. }
  232. spinlock_leave(&m_lock);
  233. ReleaseSemaphore(m_sem, 1, NULL);
  234. }
  235. CSmartPointer<IReleasable> SpAsyncWait::GetCallContext()
  236. {
  237. CSmartPointer<IReleasable> ret;
  238. spinlock_enter(&m_lock, -1);
  239. ret = m_pCallbackContext;
  240. spinlock_leave(&m_lock);
  241. return ret;
  242. }
  243. bool SpAsyncWait::GetCallback(CSmartPointer<ICallbackListener> &pCallbackListener,CSmartPointer<IReleasable> &pContext)
  244. {
  245. bool ok = false;
  246. spinlock_enter(&m_lock, -1);
  247. if (m_pCallback) {
  248. pCallbackListener = CSmartPointer<ICallbackListener>(m_pCallback);
  249. pContext = m_pCallbackContext;
  250. ok = true;
  251. }
  252. spinlock_leave(&m_lock);
  253. return ok;
  254. }
  255. void SpAsyncWait::SetCallback(ICallbackListener *pCallback,IReleasable *pContext)
  256. {
  257. spinlock_enter(&m_lock, -1);
  258. if (pCallback) { // install callback
  259. assert(m_pCallback == NULL);
  260. m_pCallback = pCallback;
  261. m_pCallbackContext.Attach(pContext);
  262. IncrementRef(); // xkm@20150115
  263. int cnt = 0;
  264. pending_pkt *pos;
  265. list_for_each_entry(pos, &m_pending_pkt_list, pending_pkt, entry) {
  266. cnt++;
  267. }
  268. // if package already return , callback instantly
  269. if (cnt >0)
  270. {
  271. wait_callback_entry *wce = new wait_callback_entry();
  272. wce->cnt = cnt;
  273. IncrementRef();
  274. list_add_tail(&wce->entry, &m_pending_callback_list);
  275. threadpool_queue_workitem2(sp_svc_get_threadpool(m_pEntity->get_svc()), m_callback_strand, &__threadpool_on_callback, this, (int)wce, NULL);
  276. }
  277. } else { // cancel callback
  278. if (m_pCallback != NULL) // xkm@20150115
  279. DecrementRef();
  280. m_pCallback = NULL; // {bug} should set NULL
  281. wait_callback_entry *pos;
  282. list_for_each_entry(pos, &m_pending_callback_list, wait_callback_entry, entry) {
  283. pos->cancel++;
  284. }
  285. }
  286. spinlock_leave(&m_lock);
  287. }
  288. void SpAsyncWait::threadpool_on_callback(SpAsyncWait::wait_callback_entry *wce)
  289. {
  290. spinlock_enter(&m_lock, -1);
  291. if (!wce->cancel) {
  292. for (int i = 0; i < wce->cnt; ++i) {
  293. ErrorCodeEnum Error = WaitAnswer(INFINITE);
  294. if (Error != Error_Cancel || !m_bCancel) {
  295. m_pCallback->OnAnswer(this);
  296. } else {
  297. break; // user cancel
  298. }
  299. }
  300. }
  301. list_del(&wce->entry);
  302. spinlock_leave(&m_lock);
  303. DecrementRef();
  304. delete wce;
  305. if (m_bEnd) {
  306. // ¶ÔÓ¦SetCallbackʱIncrementRef()
  307. m_pCallback = NULL;
  308. DecrementRef(); // xkm@20150115
  309. }
  310. }
  311. void SpAsyncWait::__threadpool_on_callback(threadpool_t *threadpool, void *arg, int param1, int param2)
  312. {
  313. SpAsyncWait *pThis = static_cast<SpAsyncWait*>(arg);
  314. wait_callback_entry *wce = (wait_callback_entry *)param1;
  315. pThis->threadpool_on_callback(wce);
  316. }
  317. //
  318. // SpAsyncWaitRPC
  319. //
  320. SpAsyncWaitRPC::SpAsyncWaitRPC( SpEntity *pEntity, iobuffer_t **req_pkt, int call_type ) : SpAsyncWait(pEntity), m_req_pkt(*req_pkt), m_call_type(call_type), m_rpc(NULL)
  321. {
  322. //Dbg("SpAsyncWaitRPC()");
  323. //LOG_FUNCTION();
  324. *req_pkt = NULL;
  325. }
  326. SpAsyncWaitRPC::~SpAsyncWaitRPC()
  327. {
  328. //Dbg("~SpAsyncWaitRPC()");
  329. //LOG_FUNCTION();
  330. if (m_req_pkt)
  331. iobuffer_dec_ref(m_req_pkt);
  332. if (m_rpc)
  333. sp_rpc_client_destroy(m_rpc);
  334. }
  335. ErrorCodeEnum SpAsyncWaitRPC::Begin(const void *arg)
  336. {
  337. int rc;
  338. sp_rpc_client_callback cb;
  339. cb.on_ans = &__on_ans;
  340. cb.on_destroy = NULL;
  341. cb.user_data = this;
  342. if (arg == NULL)
  343. rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, m_call_type, &cb, &m_rpc);
  344. else
  345. {
  346. // find entity by name
  347. auto pEnv = sp_get_env();
  348. auto pEntity = sp_cfg_get_entity_by_name(pEnv->cfg, (const char*)arg);
  349. if (pEntity == NULL)
  350. return Error_NotExist;
  351. rc = sp_rpc_client_create(m_pEntity->get_rpc_mgr(), pEntity->mod->idx, pEntity->idx, m_call_type, &cb, &m_rpc);
  352. }
  353. if (rc != 0)
  354. return SpTranslateError(rc);
  355. IncrementRef();
  356. rc = sp_rpc_client_async_call(m_rpc, &m_req_pkt);
  357. if (rc != 0) {
  358. sp_rpc_client_destroy(m_rpc);
  359. m_rpc = NULL;
  360. DecrementRef();
  361. }
  362. return SpTranslateError(rc);
  363. }
  364. ErrorCodeEnum SpAsyncWaitRPC::Close()
  365. {
  366. int rc;
  367. if (m_rpc) {
  368. rc = sp_rpc_client_close(m_rpc);
  369. } else {
  370. rc = Error_NotInit;
  371. }
  372. return SpTranslateError(rc);
  373. }
  374. void SpAsyncWaitRPC::__on_ans( sp_rpc_client_t *client, int error, iobuffer_t **ans_pkt, void *user_data )
  375. {
  376. SpAsyncWaitRPC *pThis = static_cast<SpAsyncWaitRPC*>(user_data);
  377. pThis->on_ans(error, ans_pkt);
  378. }
  379. void SpAsyncWaitRPC::on_ans( int error, iobuffer_t **ans_pkt )
  380. {
  381. ReceiveAnsPkt(error, 1, ans_pkt);
  382. DecrementRef(); // xkm@20150115
  383. }
  384. //
  385. // SpAsyncWaitTsx
  386. //
  387. SpAsyncWaitTsx::SpAsyncWaitTsx(SpEntity *pEntity, sp_ses_uac_t *uac, int timeout, int method_id, int method_sig, CAutoBuffer Buffer, int tsx_id)
  388. : SpAsyncWait(pEntity), m_tsx(NULL), m_timeout(timeout), m_method_id(method_id), m_method_sig(method_sig), m_req_pkt(NULL), m_tsx_id(tsx_id)
  389. {
  390. //Dbg("SpAsyncWaitTsx()");
  391. //LOG_FUNCTION();
  392. int rc;
  393. sp_tsx_uac_callback cb;
  394. cb.on_ans = &__on_ans;
  395. cb.on_destroy = NULL;
  396. cb.user_data = this;
  397. rc = sp_tsx_uac_create(uac, m_tsx_id, m_method_id, m_method_sig, &cb, &m_tsx);
  398. if (rc != 0) {
  399. m_tsx = NULL;
  400. } else {
  401. m_req_pkt = iobuffer_create(-1, Buffer.GetCount() + 32);
  402. if (Buffer.GetCount() > 0) {
  403. iobuffer_write(m_req_pkt, IOBUF_T_BUF, &Buffer[0], Buffer.GetCount());
  404. }
  405. }
  406. }
  407. SpAsyncWaitTsx::~SpAsyncWaitTsx()
  408. {
  409. //Dbg("~SpAsyncWaitTsx()");
  410. //LOG_FUNCTION();
  411. if (m_tsx) {
  412. sp_tsx_uac_close(m_tsx);
  413. sp_tsx_uac_destroy(m_tsx);
  414. }
  415. if (m_req_pkt)
  416. iobuffer_dec_ref(m_req_pkt);
  417. }
  418. ErrorCodeEnum SpAsyncWaitTsx::Begin(const void *arg)
  419. {
  420. int rc;
  421. if (!m_tsx)
  422. return Error_NetBroken;
  423. IncrementRef(); // xkm@20150115
  424. rc = sp_tsx_uac_async_req(m_tsx, m_timeout, &m_req_pkt);
  425. if (rc != 0) {
  426. sp_tsx_uac_destroy(m_tsx);
  427. m_tsx = NULL;
  428. DecrementRef(); // xkm@20150115
  429. }
  430. return SpTranslateError(rc);
  431. }
  432. ErrorCodeEnum SpAsyncWaitTsx::Close()
  433. {
  434. int rc;
  435. if (m_tsx) {
  436. rc = sp_tsx_uac_close(m_tsx);
  437. } else {
  438. rc = Error_Unexpect;
  439. }
  440. return SpTranslateError(rc);
  441. }
  442. ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD dwTimeout)
  443. {
  444. // use constructor timeout first
  445. DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
  446. return SpAsyncWait::WaitAnswer(dwWaitTime);
  447. }
  448. ErrorCodeEnum SpAsyncWaitTsx::WaitAnswer(DWORD &dwUserError, DWORD dwTimeout)
  449. {
  450. // use constructor timeout first
  451. DWORD dwWaitTime = m_timeout > 0 ? m_timeout : dwTimeout;
  452. return SpAsyncWait::WaitAnswer(dwUserError, dwWaitTime);
  453. }
  454. void SpAsyncWaitTsx::__on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
  455. {
  456. SpAsyncWaitTsx *pThis = static_cast<SpAsyncWaitTsx*>(user_data);
  457. pThis->on_ans(error, end, ans_pkt);
  458. }
  459. void SpAsyncWaitTsx::on_ans(int error, int end, iobuffer_t **ans_pkt)
  460. {
  461. ReceiveAnsPkt(error, end, ans_pkt, true);
  462. if (error || end) {
  463. Close();
  464. DecrementRef(); // xkm@20150115
  465. }
  466. }