sp_ses.c 52 KB


  1. #include "precompile.h"
  2. #include "sp_ses.h"
  3. #include "sp_def.h"
  4. #include "sp_svc.h"
  5. #include "sp_dbg_export.h"
  6. #include "sp_mod.h"
  7. #include "sp_env.h"
  8. #include "SpBase.h"
  9. #include "memutil.h"
  10. #include "refcnt.h"
  11. #include "list.h"
  12. #include "jhash.h"
  13. #include "hashset.h"
  14. #include "spinlock.h"
/* Number of hash buckets for the session tables (uac_buckets / uas_buckets). */
#define CONN_BUCKET_SIZE 127
/* Number of hash buckets for the transaction tables. */
#define TSX_BUCKET_SIZE 511
/* A connecting uac whose redirect count reaches this value is failed. */
#define MAX_REDIRECT 10
/* Session command codes; they are OR-ed with SP_PKT_SES to build a packet type. */
#define SES_CMD_REQ 0 /* uac -> uas, two-way call */
#define SES_CMD_TMPANS 1 /* uas -> uac, intermediate response */
#define SES_CMD_ANS 2 /* uas -> uac, final response */
#define SES_CMD_INFO 3 /* uac -> uas, one-way call */
#define SES_CMD_CONNACK 4 /* uas -> uac */
#define SES_CMD_FINC 5 /* uac -> uas, session close */
#define SES_CMD_FINS 6 /* uas -> uac, session close */
#define SES_CMD_CONN 7 /* uac -> uas */
#define SES_CMD_ERRC 8 /* uac -> uas, session broken */
#define SES_CMD_ERRS 9 /* uas -> uac, session broken */
#define SES_CMD_REDIRECT 10 /* uas -> uac, session redirect */
/*
 * Session manager: owns the hash tables of client (uac) and server (uas)
 * sessions and of their transactions for one service.
 */
struct sp_ses_mgr_t
{
    struct hlist_head uac_buckets[CONN_BUCKET_SIZE]; /* uac sessions, slot = conn_id % CONN_BUCKET_SIZE */
    struct hlist_head uas_buckets[CONN_BUCKET_SIZE]; /* uas sessions, slot = conn_id % CONN_BUCKET_SIZE */
    struct hlist_head uac_tsx_buckets[TSX_BUCKET_SIZE]; /* uac transactions, slot = jhash of (svc_id, conn_id, tsx_id) */
    struct hlist_head uas_tsx_buckets[TSX_BUCKET_SIZE]; /* uas transactions, same hashing */
    sp_ses_mgr_callback_t cb; /* copied from the caller in sp_ses_mgr_create */
    sp_svc_t *svc; /* service used to post/send packets and register handlers */
    int ses_cnt; /* number of live sessions; updated under lock */
    CRITICAL_SECTION lock; /* taken via mgr_lock / mgr_unlock */
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};
DECLARE_REF_COUNT_STATIC(sp_ses_mgr, sp_ses_mgr_t)
/* Client-side (uac) session. */
struct sp_ses_uac_t
{
    struct hlist_node hentry; // element of mgr->uac_buckets
    sp_ses_uac_callback cb; // copied from the caller in sp_ses_uac_create
    sp_ses_mgr_t *mgr; // owning manager; a reference is taken at creation
    int remote_epid; // peer endpoint id; rewritten when a redirect is followed
    int remote_svc_id; // peer service id; rewritten when a redirect is followed
    unsigned int conn_id; // assigned from sp_env_new_id at creation
    int state; // one of SP_SES_STATE_*
    int error; // error recorded when the session enters the error state
    int tsx_cnt;
    int redirect_cnt; // redirects received so far, compared against MAX_REDIRECT
    y2k_time_t begin_time; // set at creation
    y2k_time_t state_begin_time; // refreshed on every state change
    spinlock_t lock; // taken via uac_lock / uac_unlock
    timer_entry *op_timer; // connect timeout timer, NULL when none is pending
    struct list_head tsx_list; // list of sp_tsx_uac_t->entry
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};
DECLARE_REF_COUNT_STATIC(sp_ses_uac, sp_ses_uac_t)
/* Client-side (uac) transaction belonging to one sp_ses_uac_t. */
struct sp_tsx_uac_t
{
    struct hlist_node hentry; // element of mgr->uac_tsx_buckets
    struct list_head entry; // element of sp_ses_uac_t->tsx_list
    int state;
    int tsx_id; // matched, with the session's svc/conn ids, by mgr_find_tsx_uac
    int method_id;
    int method_sig;
    spinlock_t lock;
    sp_tsx_uac_callback cb;
    sp_ses_uac_t *uac; // owning session
    timer_entry *op_timer;
    strand_t *strand;
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};
DECLARE_REF_COUNT_STATIC(sp_tsx_uac, sp_tsx_uac_t)
/*
 * Pairs an integer type with a strand and carries an hlist node for linking.
 * NOTE(review): presumably the element type of sp_ses_uas_t->method_strand;
 * confirm against the code that populates that hashset.
 */
typedef struct method_strand_t {
    int type;
    strand_t *strand;
    struct hlist_node node;
}method_strand_t;
/* Server-side (uas) session. */
struct sp_ses_uas_t
{
    struct hlist_node hentry; // element of mgr->uas_buckets
    sp_ses_uas_callback cb;
    sp_ses_mgr_t *mgr; // owning manager
    int remote_epid; // peer endpoint id; compared against epid in mgr_on_sys
    int remote_svc_id; // peer service id
    int conn_id; // connection id matched by mgr_find_uas
    int state; // one of SP_SES_STATE_*
    int error;
    int tsx_cnt;
    y2k_time_t begin_time;
    y2k_time_t state_begin_time;
    spinlock_t lock;
    strand_t *strand;
    hashset_t *method_strand; // NOTE(review): presumably holds method_strand_t entries - confirm
    struct list_head tsx_list; // list of sp_tsx_uas_t->entry
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};
DECLARE_REF_COUNT_STATIC(sp_ses_uas, sp_ses_uas_t)
/* Server-side (uas) transaction belonging to one sp_ses_uas_t. */
struct sp_tsx_uas_t
{
    struct hlist_node hentry; // element of mgr->uas_tsx_buckets
    struct list_head entry; // element of sp_ses_uas_t->tsx_list
    int state;
    int tsx_id; // matched, with the session's svc/conn ids, by mgr_find_tsx_uas
    spinlock_t lock;
    sp_tsx_uas_callback cb;
    sp_ses_uas_t *uas; // owning session
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};
DECLARE_REF_COUNT_STATIC(sp_tsx_uas, sp_tsx_uas_t)
/*
 * Forward declarations of the per-session / per-transaction handlers that the
 * manager-level dispatch functions below call before their definitions appear.
 */
static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id);
static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt);
static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt);
static void uac_process_errs(sp_ses_uac_t *uac, int error);
static void uas_process_errc(sp_ses_uas_t *uas, int error);
static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt);
static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
static void uac_trigger(sp_ses_uac_t *uac, int error, int connect);
static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error);
static void uas_trigger(sp_ses_uas_t *uas, int error);
/* Enter the manager's critical section. */
static __inline void mgr_lock(sp_ses_mgr_t *mgr)
{
    EnterCriticalSection(&mgr->lock);
}
/* Leave the manager's critical section. */
static __inline void mgr_unlock(sp_ses_mgr_t *mgr)
{
    LeaveCriticalSection(&mgr->lock);
}
  133. static sp_ses_uac_t *mgr_find_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
  134. {
  135. int slot;
  136. sp_ses_uac_t *tpos;
  137. struct hlist_node *pos;
  138. slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
  139. hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[slot], sp_ses_uac_t, hentry) {
  140. if (tpos->conn_id == conn_id)
  141. return tpos;
  142. }
  143. return NULL;
  144. }
  145. static sp_ses_uas_t *mgr_find_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
  146. {
  147. int slot;
  148. sp_ses_uas_t *tpos;
  149. struct hlist_node *pos;
  150. slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
  151. hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[slot], sp_ses_uas_t, hentry) {
  152. if (tpos->conn_id == conn_id)
  153. return tpos;
  154. }
  155. return NULL;
  156. }
  157. sp_ses_info_t* sp_ses_mgr_get_ses_info(sp_ses_mgr_t *mgr, int *cnt)
  158. {
  159. sp_ses_info_t* ret = NULL;
  160. int index = 0;
  161. int i = 0;
  162. *cnt = mgr->ses_cnt;
  163. if (*cnt <=0)
  164. return NULL;
  165. ret = (sp_ses_info_t*) malloc(sizeof(sp_ses_info_t)*(*cnt));
  166. for(i=0; i<CONN_BUCKET_SIZE; i++)
  167. {
  168. sp_ses_uac_t *tpos;
  169. struct hlist_node *pos;
  170. hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[i], sp_ses_uac_t, hentry)
  171. {
  172. ret[index].from_svc_id = sp_svc_get_id(mgr->svc);
  173. ret[index].to_svc_id = tpos->remote_svc_id;
  174. ret[index].begin_time = tpos->begin_time;
  175. ret[index].state_begin_time = tpos->state_begin_time;
  176. ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3;
  177. index++;
  178. }
  179. }
  180. for(i=0; i<CONN_BUCKET_SIZE; i++)
  181. {
  182. sp_ses_uas_t *tpos;
  183. struct hlist_node *pos;
  184. hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[i], sp_ses_uas_t, hentry)
  185. {
  186. ret[index].from_svc_id = tpos->remote_svc_id;
  187. ret[index].to_svc_id = sp_svc_get_id(mgr->svc);
  188. ret[index].begin_time = tpos->begin_time;
  189. ret[index].state_begin_time = tpos->state_begin_time;
  190. ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3;
  191. index++;
  192. }
  193. }
  194. assert(index == *cnt);
  195. return ret;
  196. }
  197. static sp_tsx_uac_t *mgr_find_tsx_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id)
  198. {
  199. int slot;
  200. sp_tsx_uac_t *tpos;
  201. struct hlist_node *pos;
  202. slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
  203. hlist_for_each_entry(tpos, pos, &mgr->uac_tsx_buckets[slot], sp_tsx_uac_t, hentry) {
  204. if (tpos->uac->remote_svc_id == svc_id && tpos->uac->conn_id == conn_id && tpos->tsx_id == tsx_id)
  205. return tpos;
  206. }
  207. return NULL;
  208. }
  209. static sp_tsx_uas_t *mgr_find_tsx_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id)
  210. {
  211. int slot;
  212. sp_tsx_uas_t *tpos;
  213. struct hlist_node *pos;
  214. slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
  215. hlist_for_each_entry(tpos, pos, &mgr->uas_tsx_buckets[slot], sp_tsx_uas_t, hentry) {
  216. if (tpos->uas->remote_svc_id == svc_id && tpos->uas->conn_id == conn_id && tpos->tsx_id == tsx_id)
  217. return tpos;
  218. }
  219. return NULL;
  220. }
  221. static void mgr_process_connack(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
  222. {
  223. sp_ses_uac_t *uac;
  224. mgr_lock(mgr);
  225. uac = mgr_find_uac(mgr, epid, svc_id, conn_id);
  226. if (uac)
  227. sp_ses_uac_inc_ref(uac); // lock
  228. mgr_unlock(mgr);
  229. if (!uac) {
  230. sp_dbg_warn("connack find no peer!");
  231. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
  232. return;
  233. }
  234. uac_process_connack(uac, epid, svc_id, conn_id);
  235. sp_ses_uac_dec_ref(uac); // unlock
  236. }
  237. static void mgr_process_redirect(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt)
  238. {
  239. sp_ses_uac_t *uac;
  240. mgr_lock(mgr);
  241. uac = mgr_find_uac(mgr, epid, svc_id, conn_id);
  242. if (uac)
  243. sp_ses_uac_inc_ref(uac); // lock
  244. mgr_unlock(mgr);
  245. if (!uac) {
  246. sp_dbg_warn("redirect cannot find uac!");
  247. return;
  248. }
  249. uac_process_redirect(uac, epid, svc_id, conn_id, redirect_ent_id, p_pkt);
  250. sp_ses_uac_dec_ref(uac); // unlock
  251. }
  252. static void mgr_process_ans(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int end, int tsx_id, iobuffer_t **ans_pkt)
  253. {
  254. sp_tsx_uac_t *tsx;
  255. mgr_lock(mgr);
  256. tsx = mgr_find_tsx_uac(mgr, epid, svc_id, conn_id, tsx_id);
  257. if (tsx)
  258. sp_tsx_uac_inc_ref(tsx); // lock
  259. mgr_unlock(mgr);
  260. if (!tsx) {
  261. sp_dbg_warn("ack find no peer!");
  262. //sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
  263. return;
  264. }
  265. tsx_uac_process_ans(tsx, end, ans_pkt);
  266. sp_tsx_uac_dec_ref(tsx); // unlock
  267. }
  268. static void mgr_process_errs(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error)
  269. {
  270. sp_ses_uac_t *uac;
  271. mgr_lock(mgr);
  272. uac = mgr_find_uac(mgr, epid, svc_id, conn_id);
  273. if (uac)
  274. sp_ses_uac_inc_ref(uac); // @ lock uac no delete
  275. mgr_unlock(mgr);
  276. if (uac) {
  277. uac_process_errs(uac, error);
  278. sp_ses_uac_dec_ref(uac); // @ unlock
  279. }
  280. }
  281. static void mgr_process_errc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error)
  282. {
  283. sp_ses_uas_t *uas;
  284. mgr_lock(mgr);
  285. uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
  286. if (uas)
  287. sp_ses_uas_inc_ref(uas); // @ lock uac no delete
  288. mgr_unlock(mgr);
  289. if (uas) {
  290. uas_process_errc(uas, error);
  291. sp_ses_uas_dec_ref(uas); // @ unlock
  292. }
  293. }
/* A FINS (server-side close) is handled as an ERRS carrying Error_PeerClose. */
static void mgr_process_fins(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
{
    mgr_process_errs(mgr, epid, svc_id, conn_id, Error_PeerClose);
}
  298. static void __threadpool_mgr_process_conn(threadpool_t *threadpool, void *arg, int param1, int param2)
  299. {
  300. sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)arg;
  301. iobuffer_t *conn_pkt = (iobuffer_t*)param1;
  302. int epid;
  303. int svc_id;
  304. int conn_id;
  305. int redirect_target = 0;
  306. int rc;
  307. int read_state;
  308. sp_rsn_context_t from_rsn;
  309. sp_rsn_context_t rsn;
  310. iobuffer_read(conn_pkt, IOBUF_T_I4, &epid, 0);
  311. iobuffer_read(conn_pkt, IOBUF_T_I4, &svc_id, 0);
  312. iobuffer_read(conn_pkt, IOBUF_T_I4, &conn_id, 0);
  313. read_state = iobuffer_get_read_state(conn_pkt);
  314. sp_dbg_info("begin on_accept, from epid:%d, svc_id:%d, conn_id:%d", epid, svc_id, conn_id);
  315. iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
  316. iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
  317. iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
  318. iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
  319. sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
  320. sp_svc_push_runserial_context(mgr->svc, &rsn);
  321. rc = mgr->cb.on_accept(mgr, epid, svc_id, conn_id, &conn_pkt, &redirect_target, mgr->cb.user_data);
  322. sp_svc_pop_runserial_context(mgr->svc);
  323. if (rc) {
  324. if (redirect_target) {
  325. sp_dbg_warn("conn from svc_id %d redirect to %d!", svc_id, redirect_target);
  326. iobuffer_restore_read_state(conn_pkt, read_state);
  327. iobuffer_write_head(conn_pkt, IOBUF_T_I4, &redirect_target, 0);
  328. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_REDIRECT, conn_id, &conn_pkt);
  329. } else {
  330. iobuffer_t *pkt = iobuffer_create(-1, -1);
  331. iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
  332. sp_dbg_warn("conn from svc_id %d reject! rc: %d", svc_id, rc);
  333. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
  334. if (pkt)
  335. iobuffer_dec_ref(pkt);
  336. }
  337. } else {
  338. sp_dbg_info("on accept ok!");
  339. }
  340. if (conn_pkt) {
  341. iobuffer_dec_ref(conn_pkt);
  342. }
  343. sp_ses_mgr_dec_ref(mgr); // @
  344. }
  345. static void mgr_process_conn(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, iobuffer_t **conn_pkt)
  346. {
  347. sp_ses_uas_t *uas;
  348. int err;
  349. sp_entity_t *ent;
  350. mgr_lock(mgr);
  351. uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
  352. mgr_unlock(mgr);
  353. ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, sp_svc_get_id(mgr->svc));
  354. if (ent) {
  355. if (ent->state == EntityState_Lost) {
  356. iobuffer_t *pkt = iobuffer_create(-1, -1);
  357. int rc = Error_Losted;
  358. iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
  359. sp_dbg_warn("reject connection because of lost state!");
  360. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
  361. if (pkt)
  362. iobuffer_dec_ref(pkt);
  363. return;
  364. }
  365. } else {
  366. assert(0);// never go here
  367. }
  368. if (uas) { // already exist, duplicate
  369. iobuffer_t *pkt = iobuffer_create(-1, -1);
  370. int rc = Error_Duplication;
  371. iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
  372. sp_dbg_warn("conn duplicate!");
  373. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
  374. if (pkt)
  375. iobuffer_dec_ref(pkt);
  376. return;
  377. }
  378. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &conn_id, 0);
  379. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &svc_id, 0);
  380. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &epid, 0);
  381. sp_ses_mgr_inc_ref(mgr); // @
  382. err = threadpool_queue_workitem2(sp_svc_get_threadpool(mgr->svc), NULL, &__threadpool_mgr_process_conn, mgr, (int)*conn_pkt, 0);
  383. if (err)
  384. {
  385. iobuffer_t *pkt = iobuffer_create(-1, -1);
  386. int rc = Error_PeerReject;
  387. iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
  388. sp_ses_mgr_dec_ref(mgr); // @
  389. sp_dbg_warn("conn from svc_id %d reject!", svc_id);
  390. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
  391. if (pkt)
  392. iobuffer_dec_ref(pkt);
  393. return;
  394. } else {
  395. *conn_pkt = NULL;
  396. }
  397. }
  398. static void mgr_process_info(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt)
  399. {
  400. sp_ses_uas_t *uas;
  401. mgr_lock(mgr);
  402. uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
  403. if (uas) {
  404. sp_ses_uas_inc_ref(uas); // @
  405. }
  406. mgr_unlock(mgr);
  407. if (!uas) {
  408. iobuffer_t *pkt = iobuffer_create(-1, -1);
  409. int rc = Error_NotExist;
  410. iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
  411. sp_dbg_warn("cannot find uas session!");
  412. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
  413. if (pkt)
  414. iobuffer_dec_ref(pkt);
  415. return;
  416. }
  417. uas_process_info(uas, method_id, method_sig, info_pkt);
  418. sp_ses_uas_dec_ref(uas); // @
  419. }
  420. static void mgr_process_req(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
  421. {
  422. sp_ses_uas_t *uas;
  423. mgr_lock(mgr);
  424. uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
  425. if (uas) {
  426. sp_ses_uas_inc_ref(uas); // @
  427. }
  428. mgr_unlock(mgr);
  429. if (!uas) {
  430. iobuffer_t *pkt = iobuffer_create(-1, -1);
  431. int rc = Error_NotExist;
  432. iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
  433. sp_dbg_warn("cannot find uas session!");
  434. sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
  435. if (pkt)
  436. iobuffer_dec_ref(pkt);
  437. return;
  438. }
  439. uas_process_req(uas, tsx_id, method_id, method_sig, timeout, req_pkt);
  440. sp_ses_uas_dec_ref(uas); // @
  441. }
/* A FINC (client-side close) is handled as an ERRC carrying Error_PeerClose. */
static void mgr_process_finc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
{
    mgr_process_errc(mgr, epid, svc_id, conn_id, Error_PeerClose);
}
  446. static int mgr_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
  447. {
  448. sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data;
  449. int cmd_type;
  450. int tsx_id;
  451. int method_id;
  452. int method_sig;
  453. int timeout;
  454. int redirect_ent_id;
  455. int rc = 0;
  456. assert(SP_GET_PKT_TYPE(pkt_type) == SP_PKT_SES);
  457. cmd_type = SP_GET_TYPE(pkt_type);
  458. switch (cmd_type) {
  459. case SES_CMD_INFO:
  460. iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL);
  461. iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL);
  462. mgr_process_info(mgr, epid, svc_id, pkt_id, method_id, method_sig, p_pkt);
  463. break;
  464. case SES_CMD_CONN:
  465. mgr_process_conn(mgr, epid, svc_id, pkt_id, p_pkt);
  466. break;
  467. case SES_CMD_CONNACK:
  468. mgr_process_connack(mgr, epid, svc_id, pkt_id);
  469. break;
  470. case SES_CMD_FINC:
  471. mgr_process_finc(mgr, epid, svc_id, pkt_id);
  472. break;
  473. case SES_CMD_FINS:
  474. mgr_process_fins(mgr, epid, svc_id, pkt_id);
  475. break;
  476. case SES_CMD_ERRC:
  477. mgr_process_errc(mgr, epid, svc_id, pkt_id, Error_NetBroken);
  478. break;
  479. case SES_CMD_ERRS:
  480. if (p_pkt != NULL && *p_pkt != NULL)
  481. {
  482. iobuffer_read(*p_pkt, IOBUF_T_I4, &rc, NULL);
  483. iobuffer_dec_ref(*p_pkt);
  484. *p_pkt = NULL;
  485. }
  486. else
  487. rc = Error_NetBroken;
  488. mgr_process_errs(mgr, epid, svc_id, pkt_id, rc);
  489. break;
  490. case SES_CMD_REQ:
  491. iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
  492. iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL);
  493. iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL);
  494. iobuffer_read(*p_pkt, IOBUF_T_I4, &timeout, NULL);
  495. mgr_process_req(mgr, epid, svc_id, pkt_id, tsx_id, method_id, method_sig, timeout, p_pkt);
  496. break;
  497. case SES_CMD_TMPANS:
  498. iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
  499. mgr_process_ans(mgr, epid, svc_id, pkt_id, 0, tsx_id, p_pkt);
  500. break;
  501. case SES_CMD_ANS:
  502. iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
  503. mgr_process_ans(mgr, epid, svc_id, pkt_id, 1, tsx_id, p_pkt);
  504. break;
  505. case SES_CMD_REDIRECT:
  506. iobuffer_read(*p_pkt, IOBUF_T_I4, &redirect_ent_id, NULL);
  507. mgr_process_redirect(mgr, epid, svc_id, pkt_id, redirect_ent_id, p_pkt);
  508. break;
  509. default:
  510. sp_dbg_warn("recv unknown ses pkt!");
  511. assert(0);
  512. break;
  513. }
  514. return FALSE;
  515. }
  516. static void mgr_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
  517. {
  518. sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data;
  519. if (state == BUS_STATE_OFF) {
  520. int i;
  521. mgr_lock(mgr);
  522. for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
  523. sp_ses_uas_t *tpos;
  524. struct hlist_node *pos, *n;
  525. hlist_for_each_entry_safe(tpos, pos, n, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) {
  526. if (epid == tpos->remote_epid) {
  527. uas_process_errc(tpos, Error_NetBroken);
  528. }
  529. }
  530. }
  531. for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
  532. sp_ses_uac_t *tpos;
  533. struct hlist_node *pos, *n;
  534. hlist_for_each_entry_safe(tpos, pos, n, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) {
  535. if (epid == tpos->remote_epid) {
  536. uac_process_errs(tpos, Error_NetBroken);
  537. }
  538. }
  539. }
  540. mgr_unlock(mgr);
  541. }
  542. }
  543. int sp_ses_mgr_create(sp_svc_t *svc, sp_ses_mgr_callback_t *cb, sp_ses_mgr_t **p_mgr)
  544. {
  545. sp_ses_mgr_t *mgr = MALLOC_T(sp_ses_mgr_t);
  546. int i;
  547. mgr->ses_cnt = 0;
  548. mgr->svc = svc;
  549. memcpy(&mgr->cb, cb, sizeof(sp_ses_mgr_callback_t));
  550. REF_COUNT_INIT(&mgr->ref_cnt);
  551. for (i = 0;i < CONN_BUCKET_SIZE; ++i) {
  552. INIT_HLIST_HEAD(&mgr->uac_buckets[i]);
  553. INIT_HLIST_HEAD(&mgr->uas_buckets[i]);
  554. }
  555. for (i = 0; i < TSX_BUCKET_SIZE; ++i) {
  556. INIT_HLIST_HEAD(&mgr->uac_tsx_buckets[i]);
  557. INIT_HLIST_HEAD(&mgr->uas_tsx_buckets[i]);
  558. }
  559. InitializeCriticalSection(&mgr->lock);
  560. *p_mgr = mgr;
  561. return 0;
  562. }
/*
 * Drop one reference on the manager. The reference-count implementation for
 * sp_ses_mgr is declared with __sp_ses_mgr_destroy as its destroy function.
 */
void sp_ses_mgr_destroy(sp_ses_mgr_t *mgr)
{
    sp_ses_mgr_dec_ref(mgr);
}
  567. int sp_ses_mgr_cancel_all(sp_ses_mgr_t *mgr)
  568. {
  569. int i;
  570. mgr_lock(mgr);
  571. for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
  572. sp_ses_uas_t *tpos;
  573. struct hlist_node *pos;
  574. hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) {
  575. sp_ses_uas_close(tpos);
  576. }
  577. }
  578. for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
  579. sp_ses_uac_t *tpos;
  580. struct hlist_node *pos;
  581. hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) {
  582. sp_ses_uac_close(tpos);
  583. }
  584. }
  585. mgr_unlock(mgr);
  586. return 0;
  587. }
/* Return the manager's current session counter (read without taking the lock). */
int sp_ses_mgr_get_conn_cnt(sp_ses_mgr_t *mgr)
{
    return mgr->ses_cnt;
}
  592. int sp_ses_mgr_start(sp_ses_mgr_t *mgr)
  593. {
  594. int rc;
  595. rc = sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES, &mgr_on_pkt, mgr);
  596. if (rc == 0)
  597. rc = sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr);
  598. return rc;
  599. }
  600. int sp_ses_mgr_stop(sp_ses_mgr_t *mgr)
  601. {
  602. int rc;
  603. rc = sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES);
  604. if (rc == 0)
  605. rc = sp_svc_remove_sys_handler(mgr->svc, (int)mgr);
  606. return rc;
  607. }
/* Return the service this manager was created with. */
sp_svc_t *sp_ses_mgr_get_svc(sp_ses_mgr_t *mgr)
{
    return mgr->svc;
}
  612. static void __sp_ses_mgr_destroy(sp_ses_mgr_t *mgr)
  613. {
  614. if (mgr->cb.on_destroy)
  615. mgr->cb.on_destroy(mgr, mgr->cb.user_data);
  616. assert(mgr->ses_cnt == 0);
  617. DeleteCriticalSection(&mgr->lock);
  618. free(mgr);
  619. }
  620. IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_mgr, sp_ses_mgr_t, ref_cnt, __sp_ses_mgr_destroy)
/* Acquire the client session's spinlock (spinlock_enter is called with -1). */
static __inline void uac_lock(sp_ses_uac_t *ses)
{
    spinlock_enter(&ses->lock, -1);
}
/* Release the client session's spinlock. */
static __inline void uac_unlock(sp_ses_uac_t *ses)
{
    spinlock_leave(&ses->lock);
}
  629. static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id)
  630. {
  631. int trigger = 0;
  632. uac_lock(uac);
  633. if (uac->state == SP_SES_STATE_CONNECTING) {
  634. uac->state = SP_SES_STATE_CONNECTED;
  635. uac->state_begin_time = y2k_time_now();
  636. if (uac->op_timer) {
  637. sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
  638. uac->op_timer = NULL;
  639. }
  640. trigger++;
  641. } else {
  642. sp_dbg_warn("current state is not connecting!");
  643. sp_svc_post(uac->mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
  644. }
  645. if (trigger)
  646. uac_trigger(uac, 0, 1);
  647. uac_unlock(uac);
  648. }
  649. static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt)
  650. {
  651. uac_lock(uac);
  652. if (uac->state == SP_SES_STATE_CONNECTING) {
  653. ++uac->redirect_cnt;
  654. if (uac->redirect_cnt >= MAX_REDIRECT) {
  655. sp_dbg_warn("uac has exceed max_direct value, maybe redirection enter end loop!");
  656. if (uac->op_timer) {
  657. sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
  658. uac->op_timer = NULL;
  659. }
  660. uac->state = SP_SES_STATE_ERROR;
  661. uac->state_begin_time = y2k_time_now();
  662. uac_trigger(uac, Error_NetBroken, 1);
  663. } else {
  664. sp_entity_t *ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, redirect_ent_id);
  665. if (ent->state == EntityState_Killed || ent->state == EntityState_Lost || ent->state == EntityState_NoStart) {
  666. sp_dbg_warn("redirected failed! redirect ent state invalid! redirect id:%d", redirect_ent_id);
  667. if (uac->op_timer) {
  668. sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
  669. uac->op_timer = NULL;
  670. }
  671. uac->state = SP_SES_STATE_ERROR;
  672. uac->state_begin_time = y2k_time_now();
  673. uac_trigger(uac, Error_InvalidState, 1);
  674. } else {
  675. uac->remote_epid = ent->mod->cfg->idx;
  676. uac->remote_svc_id = ent->cfg->idx;
  677. sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, conn_id, p_pkt);
  678. sp_dbg_info("svc_post, epid:%d, svc_id:%d", uac->remote_epid, uac->remote_svc_id);
  679. }
  680. }
  681. }
  682. uac_unlock(uac);
  683. }
  684. static void uac_process_errs(sp_ses_uac_t *uac, int error)
  685. {
  686. int trigger = 0;
  687. int connect = 0;
  688. uac_lock(uac);
  689. if (uac->state == SP_SES_STATE_INIT) {
  690. uac->state = SP_SES_STATE_ERROR;
  691. uac->state_begin_time = y2k_time_now();
  692. uac->error = error;
  693. } else if (uac->state == SP_SES_STATE_CONNECTING) {
  694. trigger++;
  695. uac->state = SP_SES_STATE_ERROR;
  696. uac->state_begin_time = y2k_time_now();
  697. uac->error = error;
  698. connect ++;
  699. if (uac->op_timer) {
  700. sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
  701. uac->op_timer = NULL;
  702. }
  703. } else if (uac->state == SP_SES_STATE_CONNECTED) {
  704. sp_tsx_uac_t *pos, *n;
  705. uac->state = SP_SES_STATE_ERROR;
  706. uac->state_begin_time = y2k_time_now();
  707. uac->error = error;
  708. list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) {
  709. tsx_uac_process_errs(pos, error);
  710. }
  711. trigger++;
  712. }
  713. if (trigger)
  714. uac_trigger(uac, error, connect);
  715. uac_unlock(uac);
  716. }
  717. static void __threadpool_uac_trigger_close(threadpool_t *threadpool, void *arg, int param1, int param2)
  718. {
  719. sp_ses_uac_t *uac = (sp_ses_uac_t*)arg;
  720. sp_ses_mgr_t *mgr = uac->mgr;
  721. int error = param1;
  722. sp_uid_t rsn = sp_svc_new_runserial(mgr->svc);
  723. sp_rsn_context_t rsn_ctx;
  724. sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx);
  725. sp_svc_push_runserial_context(mgr->svc, &rsn_ctx);
  726. uac->cb.on_close(uac, error, uac->cb.user_data);
  727. sp_ses_uac_dec_ref(uac); // @
  728. sp_svc_pop_runserial_context(mgr->svc);
  729. }
  730. static void uac_trigger(sp_ses_uac_t *uac, int error, int connect)
  731. {
  732. if (connect) {
  733. uac->cb.on_connect(uac, error, uac->cb.user_data);
  734. sp_ses_uac_dec_ref(uac); // @
  735. } else {
  736. if (sp_iom_get_poll_thread_id(sp_svc_get_iom(uac->mgr->svc)) == GetCurrentThreadId()) { //
  737. int rc;
  738. sp_ses_uac_inc_ref(uac); // @
  739. rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uac->mgr->svc), NULL, &__threadpool_uac_trigger_close, uac, error, 0);
  740. if (rc != 0) {
  741. sp_ses_uac_dec_ref(uac); // @
  742. sp_dbg_warn("thread pool queue uac_trirget_close event failed!");
  743. }
  744. } else {
  745. uac->cb.on_close(uac, error, uac->cb.user_data);
  746. }
  747. }
  748. }
  749. static void uac_on_timer(timer_queue_t *q, timer_entry *timer, int err)
  750. {
  751. sp_ses_uac_t *uac = (sp_ses_uac_t*)timer->user_data;
  752. if (!err && uac->op_timer == timer && uac->state == SP_SES_STATE_CONNECTING) {
  753. int trigger = 0;
  754. uac_lock(uac);
  755. if (uac->op_timer == timer) {
  756. if (uac->state == SP_SES_STATE_CONNECTING) {
  757. uac->state = SP_SES_STATE_ERROR; // time out
  758. uac->state_begin_time = y2k_time_now();
  759. trigger ++;
  760. }
  761. uac->op_timer = NULL;
  762. }
  763. if (trigger)
  764. uac_trigger(uac, Error_TimeOut, 1);
  765. uac_unlock(uac);
  766. }
  767. sp_ses_uac_dec_ref(uac); // @2
  768. free(timer);
  769. }
  770. int sp_ses_uac_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, sp_ses_uac_callback *cb, sp_ses_uac_t **p_ses)
  771. {
  772. sp_ses_uac_t *ses;
  773. int slot;
  774. if (!cb || !cb->on_connect)
  775. return Error_Param;
  776. ses = MALLOC_T(sp_ses_uac_t);
  777. ses->mgr = mgr;
  778. ses->conn_id = sp_env_new_id(sp_get_env());
  779. ses->remote_svc_id = remote_svc_id;
  780. ses->remote_epid = remote_epid;
  781. ses->state = SP_SES_STATE_INIT;
  782. ses->state_begin_time = y2k_time_now();
  783. ses->error = 0;
  784. ses->op_timer = NULL;
  785. ses->tsx_cnt = 0;
  786. ses->begin_time = y2k_time_now();
  787. ses->redirect_cnt = 0;
  788. memcpy(&ses->cb, cb, sizeof(sp_ses_uac_callback));
  789. INIT_HLIST_NODE(&ses->hentry);
  790. REF_COUNT_INIT(&ses->ref_cnt);
  791. spinlock_init(&ses->lock);
  792. INIT_LIST_HEAD(&ses->tsx_list);
  793. sp_ses_mgr_inc_ref(mgr);
  794. slot = ses->conn_id % CONN_BUCKET_SIZE;
  795. sp_ses_uac_inc_ref(ses);
  796. mgr_lock(mgr);
  797. hlist_add_head(&ses->hentry, &mgr->uac_buckets[slot]);
  798. mgr->ses_cnt++;
  799. mgr_unlock(mgr);
  800. *p_ses = ses;
  801. sp_dbg_info("ses uac created!");
  802. return 0;
  803. }
  804. void sp_ses_uac_destroy(sp_ses_uac_t *uac)
  805. {
  806. assert(uac->state == SP_SES_STATE_TERM);
  807. mgr_lock(uac->mgr);
  808. uac->mgr->ses_cnt--;
  809. hlist_del_init(&uac->hentry);
  810. mgr_unlock(uac->mgr);
  811. sp_ses_uac_dec_ref(uac);
  812. sp_ses_uac_dec_ref(uac);
  813. }
  814. int sp_ses_uac_async_connect(sp_ses_uac_t *uac, int timeout, iobuffer_t **conn_pkt)
  815. {
  816. int rc;
  817. if (uac->state != SP_SES_STATE_INIT)
  818. return Error_Unexpect;
  819. sp_ses_uac_inc_ref(uac);
  820. uac_lock(uac);
  821. if (uac->state == SP_SES_STATE_INIT) {
  822. sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
  823. sp_ses_uac_inc_ref(uac); // @1
  824. if (rsn_ctx) {
  825. iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
  826. iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
  827. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
  828. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
  829. } else {
  830. unsigned __int64 rsn = 0;
  831. int t = 0;
  832. iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0);
  833. iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0);
  834. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0);
  835. iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0);
  836. }
  837. rc = sp_svc_send(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, uac->conn_id, conn_pkt);
  838. sp_dbg_info("svc_send, epid:%d, svc_id:%d", uac->remote_epid, uac->remote_svc_id);
  839. if (rc == 0) {
  840. if (timeout >= 0) {
  841. uac->op_timer = MALLOC_T(timer_entry);
  842. uac->op_timer->cb = &uac_on_timer;
  843. uac->op_timer->user_data = uac;
  844. sp_ses_uac_inc_ref(uac); // @2
  845. rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, (unsigned int)timeout);
  846. if (rc != 0) {
  847. iobuffer_pop_count(*conn_pkt, 24);
  848. sp_ses_uac_dec_ref(uac); // @2
  849. sp_ses_uac_dec_ref(uac); // @1
  850. free(uac->op_timer);
  851. uac->op_timer = NULL;
  852. sp_dbg_warn("post conn, create timer failed!");
  853. }
  854. }
  855. } else {
  856. iobuffer_pop_count(*conn_pkt, 24);
  857. sp_ses_uac_dec_ref(uac); // @1
  858. sp_dbg_warn("uac issue async connect failed, post msg failed, may be remote party offline!");
  859. rc = Error_IO;
  860. }
  861. } else {
  862. sp_dbg_warn("uac state is not INIT !");
  863. rc = Error_Unexpect;
  864. }
  865. uac->state = rc ? SP_SES_STATE_ERROR : SP_SES_STATE_CONNECTING;
  866. uac->state_begin_time = y2k_time_now();
  867. uac_unlock(uac);
  868. sp_ses_uac_dec_ref(uac);
  869. return rc;
  870. }
  871. int sp_ses_uac_close(sp_ses_uac_t *uac)
  872. {
  873. int rc = 0;
  874. int trigger = 0;
  875. int connected = 0;
  876. sp_dbg_info("sp_ses_uac_close invoked!");
  877. sp_ses_uac_inc_ref(uac);
  878. uac_lock(uac);
  879. if (uac->state == SP_SES_STATE_INIT) {
  880. uac->state = SP_SES_STATE_TERM;
  881. uac->state_begin_time = y2k_time_now();
  882. } else if (uac->state == SP_SES_STATE_CONNECTING) {
  883. trigger++;
  884. uac->state = SP_SES_STATE_TERM;
  885. uac->state_begin_time = y2k_time_now();
  886. if (uac->op_timer) {
  887. sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
  888. uac->op_timer = NULL;
  889. }
  890. connected++;
  891. } else if (uac->state == SP_SES_STATE_CONNECTED) {
  892. sp_tsx_uac_t *pos, *n;
  893. uac->state = SP_SES_STATE_TERM;
  894. uac->state_begin_time = y2k_time_now();
  895. list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) {
  896. sp_tsx_uac_close(pos);
  897. }
  898. trigger++;
  899. } else if (uac->state == SP_SES_STATE_ERROR) {
  900. uac->state = SP_SES_STATE_TERM;
  901. uac->state_begin_time = y2k_time_now();
  902. } else {
  903. sp_dbg_warn("uac->state = %d", uac->state);
  904. rc = Error_Duplication;
  905. }
  906. if (trigger)
  907. uac_trigger(uac, Error_Closed, connected);
  908. uac_unlock(uac);
  909. if (rc == 0) {
  910. sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_FINC, uac->conn_id, NULL);
  911. }
  912. sp_ses_uac_dec_ref(uac);
  913. return rc;
  914. }
  915. int sp_ses_uac_get_state(sp_ses_uac_t *uac)
  916. {
  917. return uac->state;
  918. }
  919. int sp_ses_uac_send_info(sp_ses_uac_t *uac, int method_id, int method_sig, iobuffer_t **info_pkt)
  920. {
  921. int rc;
  922. if (uac->state == SP_SES_STATE_CONNECTED) {
  923. sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
  924. if (rsn_ctx) {
  925. iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
  926. iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
  927. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
  928. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
  929. } else {
  930. unsigned __int64 rsn = 0;
  931. int t = 0;
  932. iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn, 0);
  933. iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn, 0);
  934. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &t, 0);
  935. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &t, 0);
  936. }
  937. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0);
  938. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_id, 0);
  939. rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_INFO, uac->conn_id, info_pkt);
  940. } else {
  941. sp_dbg_info("send info failed, uac is not in connected state!");
  942. rc = Error_IO;
  943. }
  944. return rc;
  945. }
  946. int sp_ses_uac_get_remote_epid(sp_ses_uac_t *uac)
  947. {
  948. return uac->remote_epid;
  949. }
  950. int sp_ses_uac_get_remote_svc_id(sp_ses_uac_t *uac)
  951. {
  952. return uac->remote_svc_id;
  953. }
  954. int sp_ses_uac_get_conn_id(sp_ses_uac_t *uac)
  955. {
  956. return uac->conn_id;
  957. }
  958. sp_ses_mgr_t *sp_ses_uac_get_mgr(sp_ses_uac_t *uac)
  959. {
  960. return uac->mgr;
  961. }
  962. int sp_ses_uac_get_tsx_cnt(sp_ses_uac_t *uac)
  963. {
  964. return uac->tsx_cnt;
  965. }
  966. static void __sp_ses_uac_destroy(sp_ses_uac_t *uac)
  967. {
  968. if (uac->cb.on_destroy)
  969. uac->cb.on_destroy(uac, uac->cb.user_data);
  970. sp_ses_mgr_dec_ref(uac->mgr);
  971. free(uac);
  972. sp_dbg_info("ses uac destroyed!");
  973. }
  974. IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uac, sp_ses_uac_t, ref_cnt, __sp_ses_uac_destroy)
  975. static __inline void tsx_uac_lock(sp_tsx_uac_t *tsx)
  976. {
  977. spinlock_enter(&tsx->lock, -1);
  978. }
  979. static __inline void tsx_uac_unlock(sp_tsx_uac_t *tsx)
  980. {
  981. spinlock_leave(&tsx->lock);
  982. }
  983. static void tsx_uac_trigger(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt)
  984. {
  985. iobuffer_t *pkt = ans_pkt ? *ans_pkt : NULL;
  986. tsx->cb.on_ans(tsx, error, end, ans_pkt, tsx->cb.user_data);
  987. if (error || end)
  988. sp_tsx_uac_dec_ref(tsx); // @
  989. }
  990. static void tsx_uac_on_timer(timer_queue_t *q, timer_entry *timer, int err)
  991. {
  992. sp_tsx_uac_t *tsx = (sp_tsx_uac_t*)timer->user_data;
  993. if (!err && tsx->op_timer == timer && tsx->state == SP_TSX_UAC_STATE_REQ) {
  994. int trigger = 0;
  995. tsx_uac_lock(tsx);
  996. if (tsx->op_timer == timer) {
  997. if (tsx->state == SP_TSX_UAC_STATE_REQ) {
  998. tsx->state = SP_TSX_UAC_STATE_ERROR; // time out
  999. trigger ++;
  1000. }
  1001. tsx->op_timer = NULL;
  1002. }
  1003. if (trigger)
  1004. tsx_uac_trigger(tsx, Error_TimeOut, 1, NULL);
  1005. tsx_uac_unlock(tsx);
  1006. }
  1007. sp_tsx_uac_dec_ref(tsx); // @2
  1008. free(timer);
  1009. }
  1010. static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt)
  1011. {
  1012. int trigger = 0;
  1013. tsx_uac_lock(tsx);
  1014. if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
  1015. if (tsx->op_timer) {
  1016. sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
  1017. tsx->op_timer = NULL;
  1018. }
  1019. if (end) {
  1020. tsx->state = SP_TSX_UAC_STATE_ANS;
  1021. } else if (tsx->state == SP_TSX_UAC_STATE_INIT) {
  1022. tsx->state = SP_TSX_UAC_STATE_TMPANS;
  1023. }
  1024. trigger++;
  1025. } else {
  1026. sp_dbg_warn("tsx uac current state cannot recv ack!");
  1027. sp_svc_post(tsx->uac->mgr->svc, tsx->uac->remote_epid, tsx->uac->remote_svc_id, SP_PKT_SES|SES_CMD_ERRC, tsx->uac->conn_id, NULL);
  1028. }
  1029. if (trigger)
  1030. tsx_uac_trigger(tsx, 0, end, ans_pkt);
  1031. tsx_uac_unlock(tsx);
  1032. }
  1033. static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error)
  1034. {
  1035. int trigger = 0;
  1036. sp_tsx_uac_inc_ref(tsx);
  1037. tsx_uac_lock(tsx);
  1038. if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
  1039. trigger++;
  1040. if (tsx->op_timer) {
  1041. sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
  1042. tsx->op_timer = NULL;
  1043. }
  1044. }
  1045. if (tsx->state != SP_TSX_UAC_STATE_TERM)
  1046. tsx->state = SP_TSX_UAC_STATE_ERROR;
  1047. if (trigger)
  1048. tsx_uac_trigger(tsx, error, 1, NULL);
  1049. tsx_uac_unlock(tsx);
  1050. sp_tsx_uac_dec_ref(tsx);
  1051. }
  1052. int sp_tsx_uac_create(sp_ses_uac_t *uac, int tsx_id, int method_id, int method_sig, sp_tsx_uac_callback *cb, sp_tsx_uac_t **p_tsx)
  1053. {
  1054. sp_tsx_uac_t *tsx;
  1055. unsigned int slot;
  1056. if (!cb || !cb->on_ans)
  1057. return Error_Param;
  1058. switch ( uac->state ) {
  1059. case SP_SES_STATE_INIT:
  1060. case SP_SES_STATE_CONNECTING:
  1061. return Error_NotInit;
  1062. case SP_SES_STATE_TERM:
  1063. case SP_SES_STATE_ERROR:
  1064. return Error_NetBroken;
  1065. }
  1066. tsx = MALLOC_T(sp_tsx_uac_t);
  1067. tsx->state = SP_TSX_UAC_STATE_INIT;
  1068. tsx->tsx_id = tsx_id;
  1069. tsx->method_id = method_id;
  1070. tsx->method_sig = method_sig;
  1071. tsx->uac = uac;
  1072. tsx->op_timer = NULL;
  1073. memcpy(&tsx->cb, cb, sizeof(sp_tsx_uac_callback));
  1074. spinlock_init(&tsx->lock);
  1075. INIT_HLIST_NODE(&tsx->hentry);
  1076. REF_COUNT_INIT(&tsx->ref_cnt);
  1077. tsx->strand = strand_create();
  1078. sp_ses_uac_inc_ref(uac);
  1079. sp_tsx_uac_inc_ref(tsx);
  1080. uac_lock(uac);
  1081. list_add_tail(&tsx->entry, &uac->tsx_list);
  1082. uac->tsx_cnt++;
  1083. uac_unlock(uac);
  1084. slot = jhash_3words(uac->remote_svc_id, uac->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
  1085. sp_tsx_uac_inc_ref(tsx);
  1086. mgr_lock(uac->mgr);
  1087. hlist_add_head(&tsx->hentry, &uac->mgr->uac_tsx_buckets[slot]);
  1088. mgr_unlock(uac->mgr);
  1089. *p_tsx = tsx;
  1090. sp_dbg_info("tsx uac created!");
  1091. return 0;
  1092. }
  1093. void sp_tsx_uac_destroy(sp_tsx_uac_t *tsx)
  1094. {
  1095. sp_ses_uac_t *uac = tsx->uac;
  1096. assert(tsx->state == SP_TSX_UAC_STATE_TERM);
  1097. uac_lock(uac);
  1098. uac->tsx_cnt--;
  1099. list_del_init(&tsx->entry);
  1100. uac_unlock(uac);
  1101. sp_tsx_uac_dec_ref(tsx);
  1102. mgr_lock(uac->mgr);
  1103. hlist_del_init(&tsx->hentry);
  1104. mgr_unlock(uac->mgr);
  1105. sp_tsx_uac_dec_ref(tsx);
  1106. sp_tsx_uac_dec_ref(tsx);
  1107. }
  1108. int sp_tsx_uac_async_req(sp_tsx_uac_t *tsx, int timeout, iobuffer_t **req_pkt)
  1109. {
  1110. int rc;
  1111. sp_ses_uac_t *uac;
  1112. if (tsx->state != SP_TSX_UAC_STATE_INIT)
  1113. return Error_Unexpect;
  1114. uac = tsx->uac;
  1115. if (uac->state != SP_SES_STATE_CONNECTED)
  1116. return Error_NetBroken;
  1117. tsx_uac_lock(tsx);
  1118. if (tsx->state == SP_TSX_UAC_STATE_INIT) {
  1119. sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
  1120. if (rsn_ctx) {
  1121. iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
  1122. iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
  1123. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
  1124. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
  1125. } else {
  1126. unsigned __int64 rsn = 0;
  1127. int t = 0;
  1128. iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0);
  1129. iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0);
  1130. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0);
  1131. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0);
  1132. }
  1133. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0);
  1134. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_sig, 0);
  1135. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_id, 0);
  1136. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->tsx_id, 0);
  1137. sp_tsx_uac_inc_ref(tsx); // @1
  1138. rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_REQ, uac->conn_id, req_pkt);
  1139. if (rc == 0) {
  1140. if (timeout >= 0) {
  1141. tsx->op_timer = MALLOC_T(timer_entry);
  1142. tsx->op_timer->cb = &tsx_uac_on_timer;
  1143. tsx->op_timer->user_data = tsx;
  1144. sp_tsx_uac_inc_ref(tsx); // @2
  1145. rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), tsx->op_timer, (unsigned int)timeout);
  1146. if (rc != 0) {
  1147. sp_tsx_uac_dec_ref(tsx); // @2
  1148. sp_tsx_uac_dec_ref(tsx); // @1
  1149. free(tsx->op_timer);
  1150. tsx->op_timer = NULL;
  1151. sp_dbg_warn("uac tsx async_req, create timer failed!");
  1152. }
  1153. }
  1154. } else {
  1155. sp_tsx_uac_dec_ref(tsx); // @1
  1156. sp_dbg_warn("tsx uac issue async req failed, may be remote party offline!");
  1157. rc = Error_IO;
  1158. }
  1159. } else {
  1160. rc = Error_Unexpect;
  1161. sp_dbg_warn("tsx uac state is not INIT!");
  1162. }
  1163. tsx->state = rc ? SP_TSX_UAC_STATE_ERROR : SP_TSX_UAC_STATE_REQ;
  1164. tsx_uac_unlock(tsx);
  1165. return rc;
  1166. }
  1167. int sp_tsx_uac_close(sp_tsx_uac_t *tsx)
  1168. {
  1169. int trigger = 0;
  1170. tsx_uac_lock(tsx);
  1171. if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
  1172. trigger++;
  1173. if (tsx->op_timer) {
  1174. sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
  1175. tsx->op_timer = NULL;
  1176. }
  1177. }
  1178. if (tsx->state != SP_TSX_UAC_STATE_TERM)
  1179. tsx->state = SP_TSX_UAC_STATE_TERM;
  1180. if (trigger)
  1181. tsx_uac_trigger(tsx, Error_Closed, 1, NULL);
  1182. tsx_uac_unlock(tsx);
  1183. return 0;
  1184. }
  1185. sp_ses_uac_t *sp_tsx_uac_get_session(sp_tsx_uac_t *tsx)
  1186. {
  1187. return tsx->uac;
  1188. }
  1189. int sp_tsx_uac_get_state(sp_tsx_uac_t *tsx)
  1190. {
  1191. return tsx->state;
  1192. }
  1193. int sp_tsx_uac_get_id(sp_tsx_uac_t *tsx)
  1194. {
  1195. return tsx->tsx_id;
  1196. }
  1197. static void __sp_tsx_uac_destroy(sp_tsx_uac_t *tsx)
  1198. {
  1199. if (tsx->cb.on_destroy)
  1200. tsx->cb.on_destroy(tsx, tsx->cb.user_data);
  1201. sp_ses_uac_dec_ref(tsx->uac);
  1202. strand_destroy(tsx->strand);
  1203. free(tsx);
  1204. sp_dbg_info("tsx uac destroyed!");
  1205. }
  1206. IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uac, sp_tsx_uac_t, ref_cnt, __sp_tsx_uac_destroy)
  1207. static __inline void uas_lock(sp_ses_uas_t *uas)
  1208. {
  1209. spinlock_enter(&uas->lock, -1);
  1210. }
  1211. static __inline void uas_unlock(sp_ses_uas_t *uas)
  1212. {
  1213. spinlock_leave(&uas->lock);
  1214. }
  1215. static const void* method_strand_getkey(const void *obj)
  1216. {
  1217. method_strand_t *ms = (method_strand_t *)obj;
  1218. return (const void*)ms->type;
  1219. }
  /* Hashset hash function: the key, an integer carried in a pointer, is its own hash. */
  static unsigned int method_strand_hash(const void *key)
  {
      size_t raw = (size_t)key;

      return (unsigned int)raw;
  }
  /*
   * Hashset key comparison for integer keys carried in pointers.
   *
   * Returns a negative value, zero or a positive value when key_x is less
   * than, equal to or greater than key_y.  The keys are compared instead of
   * subtracted because a signed subtraction can overflow, which is undefined
   * behaviour and can report the wrong sign.
   */
  static int method_strand_cmpkey(const void *key_x, const void *key_y)
  {
      int x = (int)(size_t)key_x;
      int y = (int)(size_t)key_y;

      return (x > y) - (x < y);
  }
  1228. static __inline method_strand_t* create_method_strand(int type)
  1229. {
  1230. method_strand_t *ms = MALLOC_T(method_strand_t);
  1231. ms->strand = strand_create();
  1232. ms->type = type;
  1233. INIT_HLIST_NODE(&ms->node);
  1234. return ms;
  1235. }
  1236. static __inline void delete_method_strand(method_strand_t *ms)
  1237. {
  1238. strand_destroy(ms->strand);
  1239. free(ms);
  1240. }
  1241. static void uas_process_errc(sp_ses_uas_t *uas, int error)
  1242. {
  1243. int trigger = 0;
  1244. uas_lock(uas);
  1245. if (uas->state == SP_SES_STATE_INIT) {
  1246. trigger++;
  1247. uas->state = SP_SES_STATE_ERROR;
  1248. uas->state_begin_time = y2k_time_now();
  1249. uas->error = error;
  1250. } else if (uas->state == SP_SES_STATE_CONNECTED) {
  1251. trigger++;
  1252. uas->state = SP_SES_STATE_ERROR;
  1253. uas->state_begin_time = y2k_time_now();
  1254. uas->error = error;
  1255. }
  1256. uas_unlock(uas);
  1257. if (trigger)
  1258. uas_trigger(uas, error);
  1259. }
  1260. static void __threadpool_uas_trigger(threadpool_t *threadpool, void *arg, int param1, int param2)
  1261. {
  1262. sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
  1263. sp_ses_mgr_t *mgr = uas->mgr;
  1264. int error = param1;
  1265. sp_uid_t rsn = sp_svc_new_runserial(mgr->svc);
  1266. sp_rsn_context_t rsn_ctx;
  1267. sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx);
  1268. sp_svc_push_runserial_context(mgr->svc, &rsn_ctx);
  1269. if (uas->cb.on_close) {
  1270. if (error != Error_Closed) {
  1271. sp_ses_uas_close(uas);
  1272. }
  1273. uas->cb.on_close(uas, error, uas->cb.user_data);
  1274. }
  1275. sp_ses_uas_dec_ref(uas); // @
  1276. sp_svc_pop_runserial_context(mgr->svc);
  1277. }
  1278. static void uas_trigger(sp_ses_uas_t *uas, int error)
  1279. {
  1280. int rc;
  1281. sp_ses_uas_inc_ref(uas); // @
  1282. rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), uas->strand, &__threadpool_uas_trigger, uas, error, 0);
  1283. if (rc != 0) {
  1284. sp_ses_uas_dec_ref(uas); // @
  1285. sp_dbg_warn("uas_trigger, queue work item failed!");
  1286. }
  1287. }
  1288. static strand_t *uas_get_strand(sp_ses_uas_t *uas, int method_id)
  1289. {
  1290. method_strand_t *ms;
  1291. uas_lock(uas);
  1292. ms = hashset_find(uas->method_strand, (const void *)method_id);
  1293. if (!ms) {
  1294. ms = create_method_strand(method_id);
  1295. hashset_add(uas->method_strand, ms);
  1296. }
  1297. uas_unlock(uas);
  1298. return ms->strand;
  1299. }
  1300. static void __threadpool_uas_process_info(threadpool_t *threadpool, void *arg, int param1, int param2)
  1301. {
  1302. sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
  1303. sp_ses_mgr_t *mgr = uas->mgr;
  1304. int method_id = param1;
  1305. int method_sig;
  1306. iobuffer_t *info_pkt = (iobuffer_t*)param2;
  1307. iobuffer_read(info_pkt, IOBUF_T_I4, &method_sig, 0);
  1308. if (uas->state == SP_SES_STATE_CONNECTED ||
  1309. (uas->state == SP_SES_STATE_ERROR && uas->error == Error_PeerClose)) {
  1310. sp_rsn_context_t from_rsn;
  1311. sp_rsn_context_t rsn;
  1312. iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
  1313. iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
  1314. iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
  1315. iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
  1316. sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
  1317. sp_svc_push_runserial_context(mgr->svc, &rsn);
  1318. uas->cb.on_info(uas, method_id, method_sig, &info_pkt, uas->cb.user_data);
  1319. sp_svc_pop_runserial_context(mgr->svc);
  1320. }
  1321. if (info_pkt)
  1322. iobuffer_dec_ref(info_pkt);
  1323. sp_ses_uas_dec_ref(uas);//@
  1324. }
  1325. static strand_t *uas_decide_strand(sp_ses_uas_t *uas, int method_id, int overlap)
  1326. {
  1327. strand_t *strand = NULL;
  1328. if (uas->strand) {
  1329. if (overlap) {
  1330. strand = NULL;
  1331. } else {
  1332. strand = uas->strand;
  1333. }
  1334. } else {
  1335. if (!overlap) {
  1336. strand = uas_get_strand(uas, method_id);
  1337. }
  1338. }
  1339. return strand;
  1340. }
  1341. static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt)
  1342. {
  1343. int rc;
  1344. int overlap;
  1345. rc = (*uas->cb.get_method_attr)(uas, method_id, method_sig, &overlap, uas->cb.user_data);
  1346. if (rc == 0) {
  1347. strand_t *strand = uas_decide_strand(uas, method_id, overlap);
  1348. iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0);
  1349. sp_ses_uas_inc_ref(uas); //@
  1350. rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_info, uas, method_id, (int)*info_pkt);
  1351. if (rc != 0) {
  1352. sp_dbg_warn("process info queue work item failed!");
  1353. sp_ses_uas_dec_ref(uas); //@
  1354. } else {
  1355. *info_pkt = NULL;
  1356. }
  1357. } else {
  1358. //sp_dbg_warn("method %d may be changed!", method_id);
  1359. sp_dbg_error("method signature not match: method_id=%d, method_sig=%d, error=%d", method_id, method_sig, rc);
  1360. }
  1361. }
  1362. static void uas_process_req_reply_error(sp_ses_uas_t *uas, int tsx_id, int err)
  1363. {
  1364. int v = 0;
  1365. iobuffer_t *ans_pkt = iobuffer_create(-1, -1);
  1366. assert(err != 0);
  1367. iobuffer_write(ans_pkt, IOBUF_T_I4, &err, 0); // sys error
  1368. iobuffer_write(ans_pkt, IOBUF_T_I4, &v, 0); // user error
  1369. iobuffer_write_head(ans_pkt, IOBUF_T_I4, &tsx_id, 0);
  1370. sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_ANS, uas->conn_id, &ans_pkt);
  1371. if (ans_pkt)
  1372. iobuffer_dec_ref(ans_pkt);
  1373. }
  1374. static void __threadpool_uas_process_req(threadpool_t *threadpool, void *arg, int param1, int param2)
  1375. {
  1376. sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
  1377. sp_ses_mgr_t *mgr = uas->mgr;
  1378. int tsx_id = param1;
  1379. int method_id;
  1380. int method_sig;
  1381. int timeout;
  1382. iobuffer_t *req_pkt = (iobuffer_t*)param2;
  1383. iobuffer_read(req_pkt, IOBUF_T_I4, &method_id, 0);
  1384. iobuffer_read(req_pkt, IOBUF_T_I4, &method_sig, 0);
  1385. iobuffer_read(req_pkt, IOBUF_T_I4, &timeout, 0);
  1386. if (uas->state == SP_SES_STATE_CONNECTED) {
  1387. sp_rsn_context_t from_rsn;
  1388. sp_rsn_context_t rsn;
  1389. iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
  1390. iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
  1391. iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
  1392. iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
  1393. sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
  1394. sp_svc_push_runserial_context(mgr->svc, &rsn);
  1395. uas->cb.on_req(uas, tsx_id, method_id, method_sig, timeout, &req_pkt, uas->cb.user_data);
  1396. sp_svc_pop_runserial_context(mgr->svc);
  1397. } else {
  1398. uas_process_req_reply_error(uas, tsx_id, Error_InvalidState);
  1399. }
  1400. if (req_pkt)
  1401. iobuffer_dec_ref(req_pkt);
  1402. sp_ses_uas_dec_ref(uas); //@
  1403. }
  1404. static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
  1405. {
  1406. int rc;
  1407. int overlap;
  1408. rc = uas->cb.get_method_attr(uas, method_id, method_sig, &overlap, uas->cb.user_data);
  1409. if (rc == 0) {
  1410. strand_t *strand = uas_decide_strand(uas, method_id, overlap);
  1411. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0);
  1412. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_sig, 0);
  1413. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_id, 0);
  1414. sp_ses_uas_inc_ref(uas); //@
  1415. rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_req, uas, tsx_id, (int)*req_pkt);
  1416. if (rc != 0) {
  1417. uas_process_req_reply_error(uas, tsx_id, rc);
  1418. sp_dbg_warn("process req queue work item failed!");
  1419. sp_ses_uas_dec_ref(uas); //@
  1420. } else {
  1421. *req_pkt = NULL;
  1422. }
  1423. } else {
  1424. uas_process_req_reply_error(uas, tsx_id, rc);
  1425. sp_dbg_error("method signature not match: tsx_id=%d, method_id=%d, method_sig=%d, error=%d", tsx_id, method_id, method_sig, rc);
  1426. }
  1427. }
  1428. int sp_ses_uas_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, int conn_id, int overlap, sp_ses_uas_callback *cb, sp_ses_uas_t **p_uas)
  1429. {
  1430. unsigned int slot;
  1431. sp_ses_uas_t *uas;
  1432. if (!cb || !cb->on_req || !cb->on_info)
  1433. return Error_Param;
  1434. uas = MALLOC_T(sp_ses_uas_t);
  1435. uas->mgr = mgr;
  1436. uas->conn_id = conn_id;
  1437. uas->remote_epid = remote_epid;
  1438. uas->remote_svc_id = remote_svc_id;
  1439. spinlock_init(&uas->lock);
  1440. INIT_HLIST_NODE(&uas->hentry);
  1441. memcpy(&uas->cb, cb, sizeof(sp_ses_uas_callback));
  1442. uas->state = SP_SES_STATE_INIT;
  1443. uas->state_begin_time = y2k_time_now();
  1444. uas->error = 0;
  1445. uas->tsx_cnt = 0;
  1446. uas->begin_time = y2k_time_now();
  1447. uas->strand = overlap ? NULL : strand_create();
  1448. INIT_LIST_HEAD(&uas->tsx_list);
  1449. REF_COUNT_INIT(&uas->ref_cnt);
  1450. uas->method_strand = overlap ? hashset_create(offset_of(method_strand_t, node), &method_strand_getkey, &method_strand_hash, &method_strand_cmpkey) : NULL;
  1451. sp_ses_mgr_inc_ref(mgr);
  1452. slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
  1453. sp_ses_uas_inc_ref(uas);
  1454. mgr_lock(mgr);
  1455. hlist_add_head(&uas->hentry, &mgr->uas_buckets[slot]);
  1456. mgr->ses_cnt++;
  1457. mgr_unlock(mgr);
  1458. *p_uas = uas;
  1459. sp_dbg_info("ses uas created!");
  1460. return 0;
  1461. }
  1462. void sp_ses_uas_destroy(sp_ses_uas_t *uas)
  1463. {
  1464. assert(uas->state == SP_SES_STATE_TERM);
  1465. mgr_lock(uas->mgr);
  1466. uas->mgr->ses_cnt--;
  1467. hlist_del_init(&uas->hentry);
  1468. mgr_unlock(uas->mgr);
  1469. sp_ses_uas_dec_ref(uas);
  1470. sp_ses_uas_dec_ref(uas);
  1471. }
  1472. int sp_ses_uas_close(sp_ses_uas_t *uas)
  1473. {
  1474. int rc = 0;
  1475. int trigger = 0;
  1476. sp_ses_uas_inc_ref(uas);
  1477. uas_lock(uas);
  1478. if (uas->state == SP_SES_STATE_INIT) {
  1479. uas->state = SP_SES_STATE_TERM;
  1480. uas->state_begin_time = y2k_time_now();
  1481. trigger++;
  1482. } else if (uas->state == SP_SES_STATE_CONNECTED) {
  1483. sp_tsx_uas_t *pos, *n;
  1484. uas->state = SP_SES_STATE_TERM;
  1485. uas->state_begin_time = y2k_time_now();
  1486. trigger++;
  1487. list_for_each_entry_safe(pos, n, &uas->tsx_list, sp_tsx_uas_t, entry) {
  1488. sp_tsx_uas_close(pos);
  1489. }
  1490. } else if (uas->state == SP_SES_STATE_ERROR) {
  1491. uas->state = SP_SES_STATE_TERM;
  1492. uas->state_begin_time = y2k_time_now();
  1493. } else if (uas->state == SP_SES_STATE_TERM) {
  1494. rc = Error_Duplication;
  1495. } else {
  1496. rc = Error_Unexpect;
  1497. }
  1498. if (trigger)
  1499. uas_trigger(uas, Error_Closed);
  1500. uas_unlock(uas);
  1501. if (rc == 0) {
  1502. sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_FINS, uas->conn_id, NULL);
  1503. }
  1504. sp_ses_uas_dec_ref(uas);
  1505. return rc;
  1506. }
  1507. int sp_ses_uas_get_remote_epid(sp_ses_uas_t *uas)
  1508. {
  1509. return uas->remote_epid;
  1510. }
  1511. int sp_ses_uas_get_remote_svc_id(sp_ses_uas_t *uas)
  1512. {
  1513. return uas->remote_svc_id;
  1514. }
  1515. int sp_ses_uas_get_conn_id(sp_ses_uas_t *uas)
  1516. {
  1517. return uas->conn_id;
  1518. }
  1519. sp_ses_mgr_t *sp_ses_uas_get_mgr(sp_ses_uas_t *uas)
  1520. {
  1521. return uas->mgr;
  1522. }
  1523. int sp_ses_uas_get_state(sp_ses_uas_t *uas)
  1524. {
  1525. return uas->state;
  1526. }
  1527. int sp_ses_uas_get_tsx_cnt(sp_ses_uas_t *uas)
  1528. {
  1529. return uas->tsx_cnt;
  1530. }
  1531. int sp_ses_uas_accept(sp_ses_uas_t *uas)
  1532. {
  1533. int rc;
  1534. uas_lock(uas);
  1535. if (uas->state == SP_SES_STATE_INIT) {
  1536. uas->state = SP_SES_STATE_CONNECTED;
  1537. uas->state_begin_time = y2k_time_now();
  1538. rc = Error_Succeed;
  1539. } else if (uas->state == SP_SES_STATE_CONNECTED) {
  1540. rc = Error_Duplication;
  1541. } else {
  1542. rc = Error_NetBroken;
  1543. }
  1544. uas_unlock(uas);
  1545. if (rc == 0) {
  1546. iobuffer_t *pkt = iobuffer_create(-1, -1);
  1547. sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uas->mgr->svc);
  1548. if (rsn_ctx) {
  1549. iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
  1550. iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
  1551. iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
  1552. iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
  1553. } else {
  1554. unsigned __int64 rsn = 0;
  1555. int t = 0;
  1556. iobuffer_write_head(pkt, IOBUF_T_I8, &rsn, 0);
  1557. iobuffer_write_head(pkt, IOBUF_T_I8, &rsn, 0);
  1558. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  1559. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  1560. }
  1561. rc = sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_CONNACK, uas->conn_id, &pkt);
  1562. if (pkt) {
  1563. iobuffer_dec_ref(pkt);
  1564. }
  1565. }
  1566. return rc;
  1567. }
  1568. static void __sp_ses_uas_destroy(sp_ses_uas_t *uas)
  1569. {
  1570. if (uas->cb.on_destroy) {
  1571. uas->cb.on_destroy(uas, uas->cb.user_data);
  1572. }
  1573. sp_ses_mgr_dec_ref(uas->mgr);
  1574. if (uas->strand)
  1575. strand_destroy(uas->strand);
  1576. if (uas->method_strand) {
  1577. method_strand_t *tpos;
  1578. struct hlist_node *pos, *n;
  1579. int slot;
  1580. hashset_for_each_safe(tpos, pos, n, slot, uas->method_strand, method_strand_t) {
  1581. hashset_remove(uas->method_strand, (void*)tpos->type);
  1582. delete_method_strand(tpos);
  1583. }
  1584. hashset_destroy(uas->method_strand);
  1585. }
  1586. free(uas);
  1587. sp_dbg_info("ses uas destroy!");
  1588. }
  1589. IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uas, sp_ses_uas_t, ref_cnt, __sp_ses_uas_destroy)
  1590. static __inline void tsx_uas_lock(sp_tsx_uas_t *tsx)
  1591. {
  1592. spinlock_enter(&tsx->lock, -1);
  1593. }
  1594. static __inline void tsx_uas_unlock(sp_tsx_uas_t *tsx)
  1595. {
  1596. spinlock_leave(&tsx->lock);
  1597. }
  1598. int sp_tsx_uas_create(sp_ses_uas_t *uas, int tsx_id, sp_tsx_uas_callback *cb, sp_tsx_uas_t **p_tsx)
  1599. {
  1600. sp_tsx_uas_t *tsx = MALLOC_T(sp_tsx_uas_t);
  1601. unsigned int slot;
  1602. tsx->state = SP_TSX_UAS_STATE_INIT;
  1603. uas->state_begin_time = y2k_time_now();
  1604. tsx->uas = uas;
  1605. tsx->tsx_id = tsx_id;
  1606. spinlock_init(&tsx->lock);
  1607. memcpy(&tsx->cb, cb, sizeof(sp_tsx_uas_callback));
  1608. INIT_HLIST_NODE(&tsx->hentry);
  1609. REF_COUNT_INIT(&tsx->ref_cnt);
  1610. sp_ses_uas_inc_ref(uas);
  1611. uas_lock(uas);
  1612. sp_tsx_uas_inc_ref(tsx);
  1613. list_add_tail(&tsx->entry, &uas->tsx_list);
  1614. uas->tsx_cnt++;
  1615. uas_unlock(uas);
  1616. slot = jhash_3words(uas->remote_svc_id, uas->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
  1617. mgr_lock(uas->mgr);
  1618. sp_tsx_uas_inc_ref(tsx);
  1619. hlist_add_head(&tsx->hentry, &uas->mgr->uas_tsx_buckets[slot]);
  1620. mgr_unlock(uas->mgr);
  1621. *p_tsx = tsx;
  1622. sp_dbg_info("tsx uas created!");
  1623. return 0;
  1624. }
  1625. void sp_tsx_uas_destroy(sp_tsx_uas_t *tsx)
  1626. {
  1627. sp_ses_uas_t *uas = tsx->uas;
  1628. assert(tsx->state == SP_TSX_UAS_STATE_TERM);
  1629. uas_lock(uas);
  1630. uas->tsx_cnt--;
  1631. list_del_init(&tsx->entry);
  1632. uas_unlock(uas);
  1633. sp_tsx_uas_dec_ref(tsx);
  1634. mgr_lock(uas->mgr);
  1635. hlist_del_init(&tsx->hentry);
  1636. mgr_unlock(uas->mgr);
  1637. sp_tsx_uas_dec_ref(tsx);
  1638. sp_tsx_uas_dec_ref(tsx);
  1639. }
  1640. sp_ses_uas_t *sp_tsx_uas_get_session(sp_tsx_uas_t *tsx)
  1641. {
  1642. return tsx->uas;
  1643. }
  1644. int sp_tsx_uas_get_state(sp_tsx_uas_t *tsx)
  1645. {
  1646. return tsx->state;
  1647. }
  1648. int sp_tsx_uas_get_id(sp_tsx_uas_t *tsx)
  1649. {
  1650. return tsx->tsx_id;
  1651. }
  1652. int sp_tsx_uas_close(sp_tsx_uas_t *tsx)
  1653. {
  1654. int rc;
  1655. tsx_uas_lock(tsx);
  1656. if (tsx->state != SP_TSX_UAS_STATE_TERM) {
  1657. tsx->state = SP_TSX_UAS_STATE_TERM;
  1658. rc = 0;
  1659. } else {
  1660. rc = Error_Duplication;
  1661. }
  1662. tsx_uas_unlock(tsx);
  1663. return rc;
  1664. }
  1665. int sp_tsx_uas_answer(sp_tsx_uas_t *tsx, int end, iobuffer_t **ans_pkt)
  1666. {
  1667. int rc;
  1668. int trigger = 0;
  1669. tsx_uas_lock(tsx);
  1670. if (tsx->state == SP_TSX_UAS_STATE_INIT) {
  1671. trigger++;
  1672. rc = 0;
  1673. tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS;
  1674. } else if (tsx->state == SP_TSX_UAS_STATE_TMPANS) {
  1675. trigger++;
  1676. rc = 0;
  1677. tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS;
  1678. } else {
  1679. rc = Error_Bug;
  1680. }
  1681. tsx_uas_unlock(tsx);
  1682. if (trigger) {
  1683. iobuffer_write_head(*ans_pkt, IOBUF_T_I4, &tsx->tsx_id, 0);
  1684. rc = sp_svc_post(tsx->uas->mgr->svc, tsx->uas->remote_epid, tsx->uas->remote_svc_id, SP_PKT_SES | (end ? SES_CMD_ANS : SES_CMD_TMPANS), tsx->uas->conn_id, ans_pkt);
  1685. }
  1686. return rc;
  1687. }
  1688. static void __sp_tsx_uas_destroy(sp_tsx_uas_t *tsx)
  1689. {
  1690. if (tsx->cb.on_destroy)
  1691. tsx->cb.on_destroy(tsx, tsx->cb.user_data);
  1692. sp_ses_uas_dec_ref(tsx->uas);
  1693. free(tsx);
  1694. sp_dbg_info("tsx uas destroyed!");
  1695. }
  1696. IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uas, sp_tsx_uas_t, ref_cnt, __sp_tsx_uas_destroy)