// SpAsyncWait.h
  1. #pragma once
  2. #include "SpBase.h"
  3. #include "sp_rpc.h"
  4. #include "sp_ses.h"
  5. #include "sp_svc.h"
  6. #include "sp_def.h"
  7. #include "sp_env.h"
  8. #include "spinlock.h"
  9. #include "list.h"
  10. class SpEntity;
  11. class SpAsyncWait : public IAsynWaitSp
  12. {
  13. public:
  14. SpAsyncWait(SpEntity *m_pEntity);
  15. virtual ~SpAsyncWait();
  16. // IAsyncWaitSp
  17. virtual const char *GetFuctionName() { return ""; }
  18. virtual ErrorCodeEnum WaitAnswer(DWORD dwTimeout);
  19. virtual ErrorCodeEnum WaitAnswer(DWORD &dwUserError, CSimpleString& str, DWORD dwTimeout);
  20. virtual ErrorCodeEnum CancelWait();
  21. virtual bool IsPending();
  22. virtual DWORD GetMessageID() { return m_dwMessageID; }
  23. virtual ErrorCodeEnum AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd);
  24. virtual ErrorCodeEnum AsyncGetAnswer();
  25. virtual ErrorCodeEnum AsyncGetAnswer(CAutoBuffer &ReceiveBuffer, bool &bEnd, DWORD &dwUserError, CSimpleString& str);
  26. virtual ErrorCodeEnum AsyncGetAnswer(DWORD& dwUserError, CSimpleString& str);
  27. virtual ErrorCodeEnum AsyncGetAnswer(DWORD& dwUserError);
  28. virtual CSmartPointer<IReleasable> GetCallContext();
  29. virtual bool GetCallback(CSmartPointer<ICallbackListener> &pCallbackListener,CSmartPointer<IReleasable> &pContext);
  30. virtual void SetCallback(ICallbackListener *pCallback,IReleasable *pContext=NULL);
  31. virtual ErrorCodeEnum GetExpireTime(DWORD &dwWholeTime,DWORD &dwLeftTime) { return Error_NotImpl; }
  32. virtual ErrorCodeEnum Begin(const void *arg = NULL) { return Error_NotImpl; }
  33. virtual ErrorCodeEnum Close() { return Error_NotImpl; }
  34. virtual void OnAnswerWaited(ErrorCodeEnum Error) { }
  35. void SetMessageID(DWORD dwMessageID) { m_dwMessageID = dwMessageID; }
  36. void ReceiveAnsPkt(int error, int end, iobuffer_t **ans_pkt, bool bReadUserCode = false);
  37. LONG IncrementRef()
  38. {
  39. return InterlockedIncrement(m_pRefCount);
  40. }
  41. LONG DecrementRef() {
  42. LONG ret = InterlockedDecrement(m_pRefCount);
  43. if (ret == 0) {
  44. delete m_pRefCount; // put delete here to avoid release it twice in case of CSmartPointer
  45. delete this;
  46. }
  47. return ret;
  48. }
  49. // return m_pRefCount as CSmartPointer ref count param, so it could be released by CSmartPointer
  50. // hence m_pRefCount should not be release in ~SpAsyncWait()
  51. LONG* GetRefCountPtr()
  52. {
  53. return m_pRefCount;
  54. }
  55. protected:
  56. LONG *m_pRefCount; // could be passed into CSmartPointer by GetRefCountPtr(), so release it carefully
  57. BOOL m_bEnd;
  58. BOOL m_bCancel;
  59. HANDLE m_sem;
  60. HANDLE m_evt_cancel;
  61. spinlock_t m_lock;
  62. CSmartPointer<IReleasable> m_pCallbackContext;
  63. ICallbackListener *m_pCallback;
  64. struct pending_pkt {
  65. struct list_head entry;
  66. iobuffer_t *pkt;
  67. DWORD dwSysError;
  68. DWORD dwUserError;
  69. CSimpleString str;
  70. int end;
  71. ~pending_pkt() {
  72. if (pkt)
  73. iobuffer_dec_ref(pkt);
  74. }
  75. };
  76. struct list_head m_pending_pkt_list;
  77. pending_pkt *m_curr_pkt;
  78. int m_total_pkt;
  79. struct wait_callback_entry {
  80. struct list_head entry;
  81. int cancel;
  82. int cnt;
  83. wait_callback_entry() : cancel(0), cnt(0) {}
  84. };
  85. struct list_head m_pending_callback_list;
  86. strand_t *m_callback_strand;
  87. void threadpool_on_callback(struct wait_callback_entry *wce);
  88. static void __threadpool_on_callback(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2);
  89. DWORD m_dwMessageID;
  90. SpEntity *m_pEntity;
  91. friend ErrorCodeEnum WaitMultiAnswers(CAutoArray<IAsynWaitSp *> arrAsynWaits, DWORD &dwRetIndex, bool bWaitAll, DWORD dwTimeout);
  92. };
  93. class SpAsyncWaitRPC : public SpAsyncWait
  94. {
  95. public:
  96. SpAsyncWaitRPC(SpEntity *pEntity, iobuffer_t **req_pkt, int call_type, bool fetch_user = false);
  97. virtual ~SpAsyncWaitRPC();
  98. virtual ErrorCodeEnum Begin(const void *arg = NULL);
  99. virtual ErrorCodeEnum Close();
  100. static void __on_ans(sp_rpc_client_t *client, int error, iobuffer_t **ans_pkt, void *user_data);
  101. void on_ans(int error, iobuffer_t **ans_pkt);
  102. private:
  103. int m_call_type;
  104. bool m_user;
  105. iobuffer_t *m_req_pkt;
  106. sp_rpc_client_t *m_rpc;
  107. };
  108. class SpAsyncWaitTsx : public SpAsyncWait
  109. {
  110. public:
  111. SpAsyncWaitTsx(SpEntity *pEntity, sp_ses_uac_t *uac, int timeout, int method_id, int method_sig, CAutoBuffer Buffer, int tsx_id);
  112. virtual ~SpAsyncWaitTsx();
  113. virtual ErrorCodeEnum Begin(const void *arg = NULL);
  114. virtual ErrorCodeEnum Close();
  115. virtual ErrorCodeEnum WaitAnswer(DWORD dwTimeout);
  116. virtual ErrorCodeEnum WaitAnswer(DWORD &dwUserError, CSimpleString& str, DWORD dwTimeout);
  117. static void __on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data);
  118. void on_ans(int error, int end, iobuffer_t **ans_pkt);
  119. void setLinkContext(linkContext& tmpContext);
  120. private:
  121. iobuffer_t *m_req_pkt;
  122. int m_method_id;
  123. int m_method_sig;
  124. int m_timeout;
  125. int m_tsx_id;
  126. sp_tsx_uac_t *m_tsx;
  127. };