#include "precompile.h" #include "sp_ses.h" #include "sp_def.h" #include "sp_svc.h" #include "sp_dbg_export.h" #include "sp_mod.h" #include "sp_env.h" #include "SpBase.h" #include "memutil.h" #include "refcnt.h" #include "list.h" #include "jhash.h" #include "hashset.h" #include "spinlock.h" #include "sp_logwithlinkforc.h" #include "dbgutil.h" #define TAG SPBASE_TAG("sp_ses") #define CONN_BUCKET_SIZE 127 #define TSX_BUCKET_SIZE 511 #define MAX_REDIRECT 10 #define SES_CMD_REQ 0 /* uac -> uas two way call */ #define SES_CMD_TMPANS 1 /* uas -> uac middle response */ #define SES_CMD_ANS 2 /* uas -> uac the end response */ #define SES_CMD_INFO 3 /* uac -> uas one way call */ #define SES_CMD_CONNACK 4 /* uas -> uac */ #define SES_CMD_FINC 5 /* uac -> uas session close */ #define SES_CMD_FINS 6 /* uas -> uac session close */ #define SES_CMD_CONN 7 /* uac -> uas */ #define SES_CMD_ERRC 8 /* uac -> uas, session broken */ #define SES_CMD_ERRS 9 /* uas -> uac, session broken */ #define SES_CMD_REDIRECT 10 /* uas -> uac, session redirect */ struct sp_ses_mgr_t { struct hlist_head uac_buckets[CONN_BUCKET_SIZE]; struct hlist_head uas_buckets[CONN_BUCKET_SIZE]; struct hlist_head uac_tsx_buckets[TSX_BUCKET_SIZE]; struct hlist_head uas_tsx_buckets[TSX_BUCKET_SIZE]; sp_ses_mgr_callback_t cb; sp_svc_t *svc; int ses_cnt; CRITICAL_SECTION lock; DECLARE_REF_COUNT_MEMBER(ref_cnt); }; DECLARE_REF_COUNT_STATIC(sp_ses_mgr, sp_ses_mgr_t) struct sp_ses_uac_t { struct hlist_node hentry; // element of mgr->uac_buckets sp_ses_uac_callback cb; sp_ses_mgr_t *mgr; int remote_epid; int remote_svc_id; unsigned int conn_id; int state; int error; int tsx_cnt; int redirect_cnt; y2k_time_t begin_time; y2k_time_t state_begin_time; spinlock_t lock; timer_entry *op_timer; struct list_head tsx_list; // list of sp_tsx_uac_t->entry DECLARE_REF_COUNT_MEMBER(ref_cnt); }; DECLARE_REF_COUNT_STATIC(sp_ses_uac, sp_ses_uac_t) struct sp_tsx_uac_t { struct hlist_node hentry; // element of mgr->uac_tsx_buckets struct list_head entry; // element of sp_ses_uac_t->tsx_list int state; int tsx_id; int method_id; int method_sig; spinlock_t lock; sp_tsx_uac_callback cb; sp_ses_uac_t *uac; timer_entry *op_timer; strand_t *strand; DECLARE_REF_COUNT_MEMBER(ref_cnt); }; DECLARE_REF_COUNT_STATIC(sp_tsx_uac, sp_tsx_uac_t) typedef struct method_strand_t { int type; strand_t *strand; struct hlist_node node; }method_strand_t; struct sp_ses_uas_t { struct hlist_node hentry; // element of mgr->uas_buckets sp_ses_uas_callback cb; sp_ses_mgr_t *mgr; int remote_epid; int remote_svc_id; int conn_id; int state; int error; int tsx_cnt; y2k_time_t begin_time; y2k_time_t state_begin_time; spinlock_t lock; strand_t *strand; hashset_t *method_strand; struct list_head tsx_list; // list of sp_tsx_uas_t->entry DECLARE_REF_COUNT_MEMBER(ref_cnt); }; DECLARE_REF_COUNT_STATIC(sp_ses_uas, sp_ses_uas_t) struct sp_tsx_uas_t { struct hlist_node hentry; // element of mgr->uas_tsx_buckets struct list_head entry; // element of sp_ses_uas_t->tsx_list int state; int tsx_id; spinlock_t lock; sp_tsx_uas_callback cb; sp_ses_uas_t *uas; DECLARE_REF_COUNT_MEMBER(ref_cnt); }; DECLARE_REF_COUNT_STATIC(sp_tsx_uas, sp_tsx_uas_t) static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id); static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt); static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt); static void uac_process_errs(sp_ses_uac_t *uac, int error); static void uas_process_errc(sp_ses_uas_t *uas, int error); static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt); static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt); static void uac_trigger(sp_ses_uac_t *uac, int error, int connect); static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error); static void uas_trigger(sp_ses_uas_t *uas, int error); static __inline void mgr_lock(sp_ses_mgr_t *mgr) { EnterCriticalSection(&mgr->lock); } static __inline void mgr_unlock(sp_ses_mgr_t *mgr) { LeaveCriticalSection(&mgr->lock); } static sp_ses_uac_t *mgr_find_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id) { int slot; sp_ses_uac_t *tpos; struct hlist_node *pos; slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE; hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[slot], sp_ses_uac_t, hentry) { if (tpos->conn_id == conn_id) return tpos; } return NULL; } static sp_ses_uas_t *mgr_find_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id) { int slot; sp_ses_uas_t *tpos; struct hlist_node *pos; slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE; hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[slot], sp_ses_uas_t, hentry) { if (tpos->conn_id == conn_id) return tpos; } return NULL; } sp_ses_info_t* sp_ses_mgr_get_ses_info(sp_ses_mgr_t *mgr, int *cnt) { sp_ses_info_t* ret = NULL; int index = 0; int i = 0; *cnt = mgr->ses_cnt; if (*cnt <=0) return NULL; ret = (sp_ses_info_t*) malloc(sizeof(sp_ses_info_t)*(*cnt)); for(i=0; iuac_buckets[i], sp_ses_uac_t, hentry) { ret[index].from_svc_id = sp_svc_get_id(mgr->svc); ret[index].to_svc_id = tpos->remote_svc_id; ret[index].begin_time = tpos->begin_time; ret[index].state_begin_time = tpos->state_begin_time; ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3; index++; } } for(i=0; iuas_buckets[i], sp_ses_uas_t, hentry) { ret[index].from_svc_id = tpos->remote_svc_id; ret[index].to_svc_id = sp_svc_get_id(mgr->svc); ret[index].begin_time = tpos->begin_time; ret[index].state_begin_time = tpos->state_begin_time; ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3; index++; } } TOOLKIT_ASSERT(index == *cnt); return ret; } static sp_tsx_uac_t *mgr_find_tsx_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id) { int slot; sp_tsx_uac_t *tpos; struct hlist_node *pos; slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE; hlist_for_each_entry(tpos, pos, &mgr->uac_tsx_buckets[slot], sp_tsx_uac_t, hentry) { if (tpos->uac->remote_svc_id == svc_id && tpos->uac->conn_id == conn_id && tpos->tsx_id == tsx_id) return tpos; } return NULL; } static sp_tsx_uas_t *mgr_find_tsx_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id) { int slot; sp_tsx_uas_t *tpos; struct hlist_node *pos; slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE; hlist_for_each_entry(tpos, pos, &mgr->uas_tsx_buckets[slot], sp_tsx_uas_t, hentry) { if (tpos->uas->remote_svc_id == svc_id && tpos->uas->conn_id == conn_id && tpos->tsx_id == tsx_id) return tpos; } return NULL; } static void mgr_process_connack(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id) { sp_ses_uac_t *uac; mgr_lock(mgr); uac = mgr_find_uac(mgr, epid, svc_id, conn_id); if (uac) sp_ses_uac_inc_ref(uac); // lock mgr_unlock(mgr); if (!uac) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "connack find no peer!"); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL); return; } uac_process_connack(uac, epid, svc_id, conn_id); sp_ses_uac_dec_ref(uac); // unlock } static void mgr_process_redirect(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt) { sp_ses_uac_t *uac; mgr_lock(mgr); uac = mgr_find_uac(mgr, epid, svc_id, conn_id); if (uac) sp_ses_uac_inc_ref(uac); // lock mgr_unlock(mgr); if (!uac) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "redirect cannot find uac!"); return; } uac_process_redirect(uac, epid, svc_id, conn_id, redirect_ent_id, p_pkt); sp_ses_uac_dec_ref(uac); // unlock } static void mgr_process_ans(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int end, int tsx_id, iobuffer_t **ans_pkt) { sp_tsx_uac_t *tsx; mgr_lock(mgr); tsx = mgr_find_tsx_uac(mgr, epid, svc_id, conn_id, tsx_id); if (tsx) sp_tsx_uac_inc_ref(tsx); // lock mgr_unlock(mgr); if (!tsx) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "ack find no peer!"); return; } tsx_uac_process_ans(tsx, end, ans_pkt); sp_tsx_uac_dec_ref(tsx); // unlock } static void mgr_process_errs(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error) { sp_ses_uac_t *uac; mgr_lock(mgr); uac = mgr_find_uac(mgr, epid, svc_id, conn_id); if (uac) sp_ses_uac_inc_ref(uac); // @ lock uac no delete mgr_unlock(mgr); if (uac) { uac_process_errs(uac, error); sp_ses_uac_dec_ref(uac); // @ unlock } } static void mgr_process_errc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error) { sp_ses_uas_t *uas; mgr_lock(mgr); uas = mgr_find_uas(mgr, epid, svc_id, conn_id); if (uas) sp_ses_uas_inc_ref(uas); // @ lock uac no delete mgr_unlock(mgr); if (uas) { uas_process_errc(uas, error); sp_ses_uas_dec_ref(uas); // @ unlock } } static void mgr_process_fins(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id) { mgr_process_errs(mgr, epid, svc_id, conn_id, Error_PeerClose); } static void __threadpool_mgr_process_conn(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2) { sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)arg; iobuffer_t *conn_pkt = (iobuffer_t*)param1; int epid; int svc_id; int conn_id; int redirect_target = 0; int rc; int read_state; sp_rsn_context_t from_rsn; sp_rsn_context_t rsn; iobuffer_read(conn_pkt, IOBUF_T_I4, &epid, 0); iobuffer_read(conn_pkt, IOBUF_T_I4, &svc_id, 0); iobuffer_read(conn_pkt, IOBUF_T_I4, &conn_id, 0); read_state = iobuffer_get_read_state(conn_pkt); WLog_DBG(TAG, "begin on_accept, from epid:%d, svc_id:%d, conn_id:%d", epid, svc_id, conn_id); iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.depth, 0); iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.original_type, 0); iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0); iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0); sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn); sp_svc_push_runserial_context(mgr->svc, &rsn); rc = mgr->cb.on_accept(mgr, epid, svc_id, conn_id, &conn_pkt, &redirect_target, mgr->cb.user_data); sp_svc_pop_runserial_context(mgr->svc); if (rc) { if (redirect_target) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn from svc_id %d redirect to %d!", svc_id, redirect_target); iobuffer_restore_read_state(conn_pkt, read_state); iobuffer_write_head(conn_pkt, IOBUF_T_I4, &redirect_target, 0); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_REDIRECT, conn_id, &conn_pkt); } else { iobuffer_t *pkt = iobuffer_create(-1, -1); iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn from svc_id %d reject! rc: %d", svc_id, rc); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt); if (pkt) iobuffer_dec_ref(pkt); } } else { DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "on accept ok!"); } if (conn_pkt) { iobuffer_dec_ref(conn_pkt); } sp_ses_mgr_dec_ref(mgr); // @ } static void mgr_process_conn(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, iobuffer_t **conn_pkt) { sp_ses_uas_t *uas; int err; sp_entity_t *ent; mgr_lock(mgr); uas = mgr_find_uas(mgr, epid, svc_id, conn_id); mgr_unlock(mgr); ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, sp_svc_get_id(mgr->svc)); if (ent) { if (ent->state == EntityState_Lost) { iobuffer_t *pkt = iobuffer_create(-1, -1); int rc = Error_Losted; iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "reject connection because of lost state!"); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt); if (pkt) iobuffer_dec_ref(pkt); return; } } else { TOOLKIT_ASSERT(0);// never go here } if (uas) { // already exist, duplicate iobuffer_t *pkt = iobuffer_create(-1, -1); int rc = Error_Duplication; iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn duplicate!"); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt); if (pkt) iobuffer_dec_ref(pkt); return; } iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &conn_id, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &svc_id, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &epid, 0); sp_ses_mgr_inc_ref(mgr); // @ err = threadpool_queue_workitem2(sp_svc_get_threadpool(mgr->svc), NULL, &__threadpool_mgr_process_conn, mgr, (param_size_t)*conn_pkt, 0); if (err) { iobuffer_t *pkt = iobuffer_create(-1, -1); int rc = Error_PeerReject; iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); sp_ses_mgr_dec_ref(mgr); // @ DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn from svc_id %d reject!", svc_id); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt); if (pkt) iobuffer_dec_ref(pkt); return; } else { *conn_pkt = NULL; } } static void mgr_process_info(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt) { sp_ses_uas_t *uas; mgr_lock(mgr); uas = mgr_find_uas(mgr, epid, svc_id, conn_id); if (uas) { sp_ses_uas_inc_ref(uas); // @ } mgr_unlock(mgr); if (!uas) { iobuffer_t *pkt = iobuffer_create(-1, -1); int rc = Error_NotExist; iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "cannot find uas session!"); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt); if (pkt) iobuffer_dec_ref(pkt); return; } uas_process_info(uas, method_id, method_sig, info_pkt); sp_ses_uas_dec_ref(uas); // @ } static void mgr_process_req(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt) { sp_ses_uas_t *uas; mgr_lock(mgr); uas = mgr_find_uas(mgr, epid, svc_id, conn_id); if (uas) { sp_ses_uas_inc_ref(uas); // @ } mgr_unlock(mgr); if (!uas) { iobuffer_t *pkt = iobuffer_create(-1, -1); int rc = Error_NotExist; iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "cannot find uas session!"); sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt); if (pkt) iobuffer_dec_ref(pkt); return; } uas_process_req(uas, tsx_id, method_id, method_sig, timeout, req_pkt); sp_ses_uas_dec_ref(uas); // @ } static void mgr_process_finc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id) { mgr_process_errc(mgr, epid, svc_id, conn_id, Error_PeerClose); } static int mgr_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data) { sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data; int cmd_type; int tsx_id; int method_id; int method_sig; int timeout; int redirect_ent_id; int rc = 0; WLog_DBG(TAG, "sp_ses::mgr_on_pkt: epid:%d, svc_id: %d, pkt_type:0x%08X, pkt_id: %d, mgr:%d", epid, svc_id, pkt_type, pkt_id, SP_GET_TYPE(pkt_type)); TOOLKIT_ASSERT(SP_GET_PKT_TYPE(pkt_type) == SP_PKT_SES); cmd_type = SP_GET_TYPE(pkt_type); switch (cmd_type) { case SES_CMD_INFO: iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL); iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL); mgr_process_info(mgr, epid, svc_id, pkt_id, method_id, method_sig, p_pkt); break; case SES_CMD_CONN: mgr_process_conn(mgr, epid, svc_id, pkt_id, p_pkt); break; case SES_CMD_CONNACK: mgr_process_connack(mgr, epid, svc_id, pkt_id); break; case SES_CMD_FINC: mgr_process_finc(mgr, epid, svc_id, pkt_id); break; case SES_CMD_FINS: mgr_process_fins(mgr, epid, svc_id, pkt_id); break; case SES_CMD_ERRC: mgr_process_errc(mgr, epid, svc_id, pkt_id, Error_NetBroken); break; case SES_CMD_ERRS: if (p_pkt != NULL && *p_pkt != NULL) { iobuffer_read(*p_pkt, IOBUF_T_I4, &rc, NULL); iobuffer_dec_ref(*p_pkt); *p_pkt = NULL; } else rc = Error_NetBroken; mgr_process_errs(mgr, epid, svc_id, pkt_id, rc); break; case SES_CMD_REQ: iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL); iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL); iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL); iobuffer_read(*p_pkt, IOBUF_T_I4, &timeout, NULL); mgr_process_req(mgr, epid, svc_id, pkt_id, tsx_id, method_id, method_sig, timeout, p_pkt); break; case SES_CMD_TMPANS: iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL); mgr_process_ans(mgr, epid, svc_id, pkt_id, 0, tsx_id, p_pkt); break; case SES_CMD_ANS: iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL); mgr_process_ans(mgr, epid, svc_id, pkt_id, 1, tsx_id, p_pkt); break; case SES_CMD_REDIRECT: iobuffer_read(*p_pkt, IOBUF_T_I4, &redirect_ent_id, NULL); mgr_process_redirect(mgr, epid, svc_id, pkt_id, redirect_ent_id, p_pkt); break; default: DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "recv unknown ses pkt!"); TOOLKIT_ASSERT(0); break; } return FALSE; } static void mgr_on_sys(sp_svc_t *svc,int epid, int state, void *user_data) { sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data; if (state == BUS_STATE_OFF) { int i; mgr_lock(mgr); for (i = 0; i < CONN_BUCKET_SIZE; ++i) { sp_ses_uas_t *tpos; struct hlist_node *pos, *n; hlist_for_each_entry_safe(tpos, pos, n, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) { if (epid == tpos->remote_epid) { uas_process_errc(tpos, Error_NetBroken); } } } for (i = 0; i < CONN_BUCKET_SIZE; ++i) { sp_ses_uac_t *tpos; struct hlist_node *pos, *n; hlist_for_each_entry_safe(tpos, pos, n, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) { if (epid == tpos->remote_epid) { uac_process_errs(tpos, Error_NetBroken); } } } mgr_unlock(mgr); } } int sp_ses_mgr_create(sp_svc_t *svc, sp_ses_mgr_callback_t *cb, sp_ses_mgr_t **p_mgr) { sp_ses_mgr_t *mgr = MALLOC_T(sp_ses_mgr_t); int i; mgr->ses_cnt = 0; mgr->svc = svc; memcpy(&mgr->cb, cb, sizeof(sp_ses_mgr_callback_t)); REF_COUNT_INIT(&mgr->ref_cnt); for (i = 0;i < CONN_BUCKET_SIZE; ++i) { INIT_HLIST_HEAD(&mgr->uac_buckets[i]); INIT_HLIST_HEAD(&mgr->uas_buckets[i]); } for (i = 0; i < TSX_BUCKET_SIZE; ++i) { INIT_HLIST_HEAD(&mgr->uac_tsx_buckets[i]); INIT_HLIST_HEAD(&mgr->uas_tsx_buckets[i]); } InitializeCriticalSection(&mgr->lock); *p_mgr = mgr; return 0; } void sp_ses_mgr_destroy(sp_ses_mgr_t *mgr) { sp_ses_mgr_dec_ref(mgr); } int sp_ses_mgr_cancel_all(sp_ses_mgr_t *mgr) { int i; mgr_lock(mgr); for (i = 0; i < CONN_BUCKET_SIZE; ++i) { sp_ses_uas_t *tpos; struct hlist_node *pos; hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) { sp_ses_uas_close(tpos); } } for (i = 0; i < CONN_BUCKET_SIZE; ++i) { sp_ses_uac_t *tpos; struct hlist_node *pos; hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) { sp_ses_uac_close(tpos); } } mgr_unlock(mgr); return 0; } int sp_ses_mgr_get_conn_cnt(sp_ses_mgr_t *mgr) { return mgr->ses_cnt; } int sp_ses_mgr_start(sp_ses_mgr_t *mgr) { int rc; rc = sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES, &mgr_on_pkt, mgr); if (rc == 0) rc = sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr); return rc; } int sp_ses_mgr_stop(sp_ses_mgr_t *mgr) { int rc; rc = sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES); if (rc == 0) rc = sp_svc_remove_sys_handler(mgr->svc, (int)mgr); return rc; } sp_svc_t *sp_ses_mgr_get_svc(sp_ses_mgr_t *mgr) { return mgr->svc; } static void __sp_ses_mgr_destroy(sp_ses_mgr_t *mgr) { if (mgr->cb.on_destroy) mgr->cb.on_destroy(mgr, mgr->cb.user_data); TOOLKIT_ASSERT(mgr->ses_cnt == 0); DeleteCriticalSection(&mgr->lock); free(mgr); } IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_mgr, sp_ses_mgr_t, ref_cnt, __sp_ses_mgr_destroy) static __inline void uac_lock(sp_ses_uac_t *ses) { spinlock_enter(&ses->lock, -1); } static __inline void uac_unlock(sp_ses_uac_t *ses) { spinlock_leave(&ses->lock); } static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id) { int trigger = 0; uac_lock(uac); if (uac->state == SP_SES_STATE_CONNECTING) { uac->state = SP_SES_STATE_CONNECTED; uac->state_begin_time = y2k_time_now(); if (uac->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1); uac->op_timer = NULL; } trigger++; } else { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "current state is not connecting!"); sp_svc_post(uac->mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL); } if (trigger) uac_trigger(uac, 0, 1); uac_unlock(uac); } static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt) { uac_lock(uac); if (uac->state == SP_SES_STATE_CONNECTING) { ++uac->redirect_cnt; if (uac->redirect_cnt >= MAX_REDIRECT) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac has exceed max_direct value, maybe redirection enter end loop!"); if (uac->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1); uac->op_timer = NULL; } uac->state = SP_SES_STATE_ERROR; uac->state_begin_time = y2k_time_now(); uac_trigger(uac, Error_NetBroken, 1); } else { sp_entity_t *ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, redirect_ent_id); if (ent->state == EntityState_Killed || ent->state == EntityState_Lost || ent->state == EntityState_NoStart) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "redirected failed! redirect ent state invalid! redirect id:%d", redirect_ent_id); if (uac->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1); uac->op_timer = NULL; } uac->state = SP_SES_STATE_ERROR; uac->state_begin_time = y2k_time_now(); uac_trigger(uac, Error_InvalidState, 1); } else { uac->remote_epid = ent->mod->cfg->idx; uac->remote_svc_id = ent->cfg->idx; sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, conn_id, p_pkt); //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "svc_post, epid:%d, svc_id:%d", uac->remote_epid, uac->remote_svc_id); } } } uac_unlock(uac); } static void uac_process_errs(sp_ses_uac_t *uac, int error) { int trigger = 0; int connect = 0; uac_lock(uac); if (uac->state == SP_SES_STATE_INIT) { uac->state = SP_SES_STATE_ERROR; uac->state_begin_time = y2k_time_now(); uac->error = error; } else if (uac->state == SP_SES_STATE_CONNECTING) { trigger++; uac->state = SP_SES_STATE_ERROR; uac->state_begin_time = y2k_time_now(); uac->error = error; connect ++; if (uac->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1); uac->op_timer = NULL; } } else if (uac->state == SP_SES_STATE_CONNECTED) { sp_tsx_uac_t *pos, *n; uac->state = SP_SES_STATE_ERROR; uac->state_begin_time = y2k_time_now(); uac->error = error; list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) { tsx_uac_process_errs(pos, error); } trigger++; } if (trigger) uac_trigger(uac, error, connect); uac_unlock(uac); } static void __threadpool_uac_trigger_close(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2) { sp_ses_uac_t *uac = (sp_ses_uac_t*)arg; sp_ses_mgr_t *mgr = uac->mgr; int error = (int)param1; sp_uid_t rsn = sp_svc_new_runserial(mgr->svc); sp_rsn_context_t rsn_ctx; sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx); sp_svc_push_runserial_context(mgr->svc, &rsn_ctx); uac->cb.on_close(uac, error, uac->cb.user_data); sp_ses_uac_dec_ref(uac); // @ sp_svc_pop_runserial_context(mgr->svc); } static void uac_trigger(sp_ses_uac_t *uac, int error, int connect) { if (connect) { uac->cb.on_connect(uac, error, uac->cb.user_data); sp_ses_uac_dec_ref(uac); // @ } else { if (sp_iom_get_poll_thread_id(sp_svc_get_iom(uac->mgr->svc)) == GetCurrentThreadId()) { // int rc; sp_ses_uac_inc_ref(uac); // @ rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uac->mgr->svc), NULL, &__threadpool_uac_trigger_close, uac, error, 0); if (rc != 0) { sp_ses_uac_dec_ref(uac); // @ DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "thread pool queue uac_trirget_close event failed!"); } } else { uac->cb.on_close(uac, error, uac->cb.user_data); } } } static void uac_on_timer(timer_queue_t *q, timer_entry *timer, int err) { sp_ses_uac_t *uac = (sp_ses_uac_t*)timer->user_data; if (!err && uac->op_timer == timer && uac->state == SP_SES_STATE_CONNECTING) { int trigger = 0; uac_lock(uac); if (uac->op_timer == timer) { if (uac->state == SP_SES_STATE_CONNECTING) { uac->state = SP_SES_STATE_ERROR; // time out uac->state_begin_time = y2k_time_now(); trigger ++; } uac->op_timer = NULL; } if (trigger) uac_trigger(uac, Error_TimeOut, 1); uac_unlock(uac); } sp_ses_uac_dec_ref(uac); // @2 free(timer); } int sp_ses_uac_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, sp_ses_uac_callback *cb, sp_ses_uac_t **p_ses) { sp_ses_uac_t *ses; int slot; if (!cb || !cb->on_connect) return Error_Param; ses = MALLOC_T(sp_ses_uac_t); ses->mgr = mgr; ses->conn_id = sp_env_new_id(sp_get_env()); ses->remote_svc_id = remote_svc_id; ses->remote_epid = remote_epid; ses->state = SP_SES_STATE_INIT; ses->state_begin_time = y2k_time_now(); ses->error = 0; ses->op_timer = NULL; ses->tsx_cnt = 0; ses->begin_time = y2k_time_now(); ses->redirect_cnt = 0; memcpy(&ses->cb, cb, sizeof(sp_ses_uac_callback)); INIT_HLIST_NODE(&ses->hentry); REF_COUNT_INIT(&ses->ref_cnt); spinlock_init(&ses->lock); INIT_LIST_HEAD(&ses->tsx_list); sp_ses_mgr_inc_ref(mgr); slot = ses->conn_id % CONN_BUCKET_SIZE; sp_ses_uac_inc_ref(ses); mgr_lock(mgr); hlist_add_head(&ses->hentry, &mgr->uac_buckets[slot]); mgr->ses_cnt++; mgr_unlock(mgr); *p_ses = ses; //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uac created!"); return 0; } void sp_ses_uac_destroy(sp_ses_uac_t *uac) { TOOLKIT_ASSERT(uac->state == SP_SES_STATE_TERM); mgr_lock(uac->mgr); uac->mgr->ses_cnt--; hlist_del_init(&uac->hentry); mgr_unlock(uac->mgr); sp_ses_uac_dec_ref(uac); sp_ses_uac_dec_ref(uac); } int sp_ses_uac_async_connect(sp_ses_uac_t *uac, int timeout, iobuffer_t **conn_pkt) { int rc; if (uac->state != SP_SES_STATE_INIT) return Error_Unexpect; sp_ses_uac_inc_ref(uac); uac_lock(uac); if (uac->state == SP_SES_STATE_INIT) { sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc); sp_ses_uac_inc_ref(uac); // @1 if (rsn_ctx) { iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0); } else { u__int64_t rsn = 0; int t = 0; iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0); iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0); } rc = sp_svc_send(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, uac->conn_id, conn_pkt); WLog_DBG(TAG, "svc_send, epid:%d, svc_id:%d, rc: %d", uac->remote_epid, uac->remote_svc_id, rc); if (rc == 0) { if (timeout >= 0) { uac->op_timer = MALLOC_T(timer_entry); uac->op_timer->cb = &uac_on_timer; uac->op_timer->user_data = uac; sp_ses_uac_inc_ref(uac); // @2 rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, (unsigned int)timeout); if (rc != 0) { iobuffer_pop_count(*conn_pkt, 24); sp_ses_uac_dec_ref(uac); // @2 sp_ses_uac_dec_ref(uac); // @1 free(uac->op_timer); uac->op_timer = NULL; DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "post conn, create timer failed!"); } } } else { iobuffer_pop_count(*conn_pkt, 24); sp_ses_uac_dec_ref(uac); // @1 DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac issue async connect failed, post msg failed, may be remote party offline!"); rc = Error_IO; } } else { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac state is not INIT !"); rc = Error_Unexpect; } uac->state = rc ? SP_SES_STATE_ERROR : SP_SES_STATE_CONNECTING; uac->state_begin_time = y2k_time_now(); uac_unlock(uac); sp_ses_uac_dec_ref(uac); return rc; } int sp_ses_uac_close(sp_ses_uac_t *uac) { int rc = 0; int trigger = 0; int connected = 0; DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "sp_ses_uac_close invoked!"); sp_ses_uac_inc_ref(uac); uac_lock(uac); if (uac->state == SP_SES_STATE_INIT) { uac->state = SP_SES_STATE_TERM; uac->state_begin_time = y2k_time_now(); } else if (uac->state == SP_SES_STATE_CONNECTING) { trigger++; uac->state = SP_SES_STATE_TERM; uac->state_begin_time = y2k_time_now(); if (uac->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1); uac->op_timer = NULL; } connected++; } else if (uac->state == SP_SES_STATE_CONNECTED) { sp_tsx_uac_t *pos, *n; uac->state = SP_SES_STATE_TERM; uac->state_begin_time = y2k_time_now(); list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) { sp_tsx_uac_close(pos); } trigger++; } else if (uac->state == SP_SES_STATE_ERROR) { uac->state = SP_SES_STATE_TERM; uac->state_begin_time = y2k_time_now(); } else { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac->state = %d", uac->state); rc = Error_Duplication; } if (trigger) uac_trigger(uac, Error_Closed, connected); uac_unlock(uac); if (rc == 0) { sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_FINC, uac->conn_id, NULL); } sp_ses_uac_dec_ref(uac); return rc; } int sp_ses_uac_get_state(sp_ses_uac_t *uac) { return uac->state; } int sp_ses_uac_send_info(sp_ses_uac_t *uac, int method_id, int method_sig, iobuffer_t **info_pkt) { int rc; if (uac->state == SP_SES_STATE_CONNECTED) { sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc); if (rsn_ctx) { iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0); } else { u__int64_t rsn = 0; int t = 0; iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I4, &t, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I4, &t, 0); } iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0); iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_id, 0); rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_INFO, uac->conn_id, info_pkt); WLog_INFO(TAG, "sp_ses_uac_send_info from %d", uac->remote_epid); } else { DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "send info failed, uac is not in connected state!"); rc = Error_IO; } return rc; } int sp_ses_uac_get_remote_epid(sp_ses_uac_t *uac) { return uac->remote_epid; } int sp_ses_uac_get_remote_svc_id(sp_ses_uac_t *uac) { return uac->remote_svc_id; } int sp_ses_uac_get_conn_id(sp_ses_uac_t *uac) { return uac->conn_id; } sp_ses_mgr_t *sp_ses_uac_get_mgr(sp_ses_uac_t *uac) { return uac->mgr; } int sp_ses_uac_get_tsx_cnt(sp_ses_uac_t *uac) { return uac->tsx_cnt; } static void __sp_ses_uac_destroy(sp_ses_uac_t *uac) { if (uac->cb.on_destroy) uac->cb.on_destroy(uac, uac->cb.user_data); sp_ses_mgr_dec_ref(uac->mgr); free(uac); DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uac destroyed!"); } IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uac, sp_ses_uac_t, ref_cnt, __sp_ses_uac_destroy) static __inline void tsx_uac_lock(sp_tsx_uac_t *tsx) { spinlock_enter(&tsx->lock, -1); } static __inline void tsx_uac_unlock(sp_tsx_uac_t *tsx) { spinlock_leave(&tsx->lock); } static void tsx_uac_trigger(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt) { iobuffer_t *pkt = ans_pkt ? *ans_pkt : NULL; tsx->cb.on_ans(tsx, error, end, ans_pkt, tsx->cb.user_data); if (error || end) sp_tsx_uac_dec_ref(tsx); // @ } static void tsx_uac_on_timer(timer_queue_t *q, timer_entry *timer, int err) { sp_tsx_uac_t *tsx = (sp_tsx_uac_t*)timer->user_data; if (!err && tsx->op_timer == timer && tsx->state == SP_TSX_UAC_STATE_REQ) { int trigger = 0; tsx_uac_lock(tsx); if (tsx->op_timer == timer) { if (tsx->state == SP_TSX_UAC_STATE_REQ) { tsx->state = SP_TSX_UAC_STATE_ERROR; // time out trigger ++; } tsx->op_timer = NULL; } if (trigger) tsx_uac_trigger(tsx, Error_TimeOut, 1, NULL); tsx_uac_unlock(tsx); } sp_tsx_uac_dec_ref(tsx); // @2 free(timer); } static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt) { int trigger = 0; tsx_uac_lock(tsx); if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) { if (tsx->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1); tsx->op_timer = NULL; } if (end) { tsx->state = SP_TSX_UAC_STATE_ANS; } else if (tsx->state == SP_TSX_UAC_STATE_INIT) { tsx->state = SP_TSX_UAC_STATE_TMPANS; } trigger++; } else { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "tsx uac current state cannot recv ack!"); sp_svc_post(tsx->uac->mgr->svc, tsx->uac->remote_epid, tsx->uac->remote_svc_id, SP_PKT_SES|SES_CMD_ERRC, tsx->uac->conn_id, NULL); WLog_INFO(TAG, "tsx_uac_process_ans remote_epid:%d, remote_svc_id:%d", tsx->uac->remote_epid, tsx->uac->remote_svc_id); } if (trigger) tsx_uac_trigger(tsx, 0, end, ans_pkt); tsx_uac_unlock(tsx); } static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error) { int trigger = 0; sp_tsx_uac_inc_ref(tsx); tsx_uac_lock(tsx); if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) { trigger++; if (tsx->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1); tsx->op_timer = NULL; } } if (tsx->state != SP_TSX_UAC_STATE_TERM) tsx->state = SP_TSX_UAC_STATE_ERROR; if (trigger) tsx_uac_trigger(tsx, error, 1, NULL); tsx_uac_unlock(tsx); sp_tsx_uac_dec_ref(tsx); } int sp_tsx_uac_create(sp_ses_uac_t *uac, int tsx_id, int method_id, int method_sig, sp_tsx_uac_callback *cb, sp_tsx_uac_t **p_tsx) { sp_tsx_uac_t *tsx; unsigned int slot; if (!cb || !cb->on_ans) return Error_Param; switch ( uac->state ) { case SP_SES_STATE_INIT: case SP_SES_STATE_CONNECTING: return Error_NotInit; case SP_SES_STATE_TERM: case SP_SES_STATE_ERROR: return Error_NetBroken; } tsx = MALLOC_T(sp_tsx_uac_t); tsx->state = SP_TSX_UAC_STATE_INIT; tsx->tsx_id = tsx_id; tsx->method_id = method_id; tsx->method_sig = method_sig; tsx->uac = uac; tsx->op_timer = NULL; memcpy(&tsx->cb, cb, sizeof(sp_tsx_uac_callback)); spinlock_init(&tsx->lock); INIT_HLIST_NODE(&tsx->hentry); REF_COUNT_INIT(&tsx->ref_cnt); tsx->strand = strand_create(); sp_ses_uac_inc_ref(uac); sp_tsx_uac_inc_ref(tsx); uac_lock(uac); list_add_tail(&tsx->entry, &uac->tsx_list); uac->tsx_cnt++; uac_unlock(uac); slot = jhash_3words(uac->remote_svc_id, uac->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE; sp_tsx_uac_inc_ref(tsx); mgr_lock(uac->mgr); hlist_add_head(&tsx->hentry, &uac->mgr->uac_tsx_buckets[slot]); mgr_unlock(uac->mgr); *p_tsx = tsx; //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uac created!"); return 0; } void sp_tsx_uac_destroy(sp_tsx_uac_t *tsx) { sp_ses_uac_t *uac = tsx->uac; TOOLKIT_ASSERT(tsx->state == SP_TSX_UAC_STATE_TERM); uac_lock(uac); uac->tsx_cnt--; list_del_init(&tsx->entry); uac_unlock(uac); sp_tsx_uac_dec_ref(tsx); mgr_lock(uac->mgr); hlist_del_init(&tsx->hentry); mgr_unlock(uac->mgr); sp_tsx_uac_dec_ref(tsx); sp_tsx_uac_dec_ref(tsx); } int sp_tsx_uac_async_req(sp_tsx_uac_t *tsx, int timeout, iobuffer_t **req_pkt) { int rc; sp_ses_uac_t *uac; if (tsx->state != SP_TSX_UAC_STATE_INIT) return Error_Unexpect; uac = tsx->uac; if (uac->state != SP_SES_STATE_CONNECTED) return Error_NetBroken; tsx_uac_lock(tsx); if (tsx->state == SP_TSX_UAC_STATE_INIT) { sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc); if (rsn_ctx) { iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0); } else { u__int64_t rsn = 0; int t = 0; iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0); } iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_sig, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_id, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->tsx_id, 0); sp_tsx_uac_inc_ref(tsx); // @1 rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_REQ, uac->conn_id, req_pkt); WLog_INFO(TAG, "sp_tsx_uac_async_req remote_epid:%d, remote_svc_id:%d, method_sig:%d, method_id:%d", uac->remote_epid, uac->remote_svc_id, tsx->method_sig, tsx->method_id); if (rc == 0) { if (timeout >= 0) { tsx->op_timer = MALLOC_T(timer_entry); tsx->op_timer->cb = &tsx_uac_on_timer; tsx->op_timer->user_data = tsx; sp_tsx_uac_inc_ref(tsx); // @2 rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), tsx->op_timer, (unsigned int)timeout); if (rc != 0) { sp_tsx_uac_dec_ref(tsx); // @2 sp_tsx_uac_dec_ref(tsx); // @1 free(tsx->op_timer); tsx->op_timer = NULL; DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac tsx async_req, create timer failed!"); } } } else { sp_tsx_uac_dec_ref(tsx); // @1 DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "tsx uac issue async req failed, may be remote party offline!"); rc = Error_IO; } } else { rc = Error_Unexpect; DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "tsx uac state is not INIT!"); } tsx->state = rc ? SP_TSX_UAC_STATE_ERROR : SP_TSX_UAC_STATE_REQ; tsx_uac_unlock(tsx); return rc; } int sp_tsx_uac_close(sp_tsx_uac_t *tsx) { int trigger = 0; tsx_uac_lock(tsx); if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) { trigger++; if (tsx->op_timer) { sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1); tsx->op_timer = NULL; } } if (tsx->state != SP_TSX_UAC_STATE_TERM) tsx->state = SP_TSX_UAC_STATE_TERM; if (trigger) tsx_uac_trigger(tsx, Error_Closed, 1, NULL); tsx_uac_unlock(tsx); return 0; } sp_ses_uac_t *sp_tsx_uac_get_session(sp_tsx_uac_t *tsx) { return tsx->uac; } int sp_tsx_uac_get_state(sp_tsx_uac_t *tsx) { return tsx->state; } int sp_tsx_uac_get_id(sp_tsx_uac_t *tsx) { return tsx->tsx_id; } static void __sp_tsx_uac_destroy(sp_tsx_uac_t *tsx) { if (tsx->cb.on_destroy) tsx->cb.on_destroy(tsx, tsx->cb.user_data); sp_ses_uac_dec_ref(tsx->uac); strand_destroy(tsx->strand); free(tsx); //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uac destroyed!"); } IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uac, sp_tsx_uac_t, ref_cnt, __sp_tsx_uac_destroy) static __inline void uas_lock(sp_ses_uas_t *uas) { spinlock_enter(&uas->lock, -1); } static __inline void uas_unlock(sp_ses_uas_t *uas) { spinlock_leave(&uas->lock); } static const void* method_strand_getkey(const void *obj) { method_strand_t *ms = (method_strand_t *)obj; return (const void*)ms->type; } static unsigned int method_strand_hash(const void *key) { return (unsigned int)key; } static int method_strand_cmpkey(const void *key_x, const void *key_y) { return (int)key_x - (int)key_y; } static __inline method_strand_t* create_method_strand(int type) { method_strand_t *ms = MALLOC_T(method_strand_t); ms->strand = strand_create(); ms->type = type; INIT_HLIST_NODE(&ms->node); return ms; } static __inline void delete_method_strand(method_strand_t *ms) { strand_destroy(ms->strand); free(ms); } static void uas_process_errc(sp_ses_uas_t *uas, int error) { int trigger = 0; uas_lock(uas); if (uas->state == SP_SES_STATE_INIT) { trigger++; uas->state = SP_SES_STATE_ERROR; uas->state_begin_time = y2k_time_now(); uas->error = error; } else if (uas->state == SP_SES_STATE_CONNECTED) { trigger++; uas->state = SP_SES_STATE_ERROR; uas->state_begin_time = y2k_time_now(); uas->error = error; } uas_unlock(uas); if (trigger) uas_trigger(uas, error); } static void __threadpool_uas_trigger(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2) { sp_ses_uas_t *uas = (sp_ses_uas_t*)arg; sp_ses_mgr_t *mgr = uas->mgr; int error = (int)param1; sp_uid_t rsn = sp_svc_new_runserial(mgr->svc); sp_rsn_context_t rsn_ctx; sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx); sp_svc_push_runserial_context(mgr->svc, &rsn_ctx); if (uas->cb.on_close) { if (error != Error_Closed) { sp_ses_uas_close(uas); } uas->cb.on_close(uas, error, uas->cb.user_data); } sp_ses_uas_dec_ref(uas); // @ sp_svc_pop_runserial_context(mgr->svc); } static void uas_trigger(sp_ses_uas_t *uas, int error) { int rc; sp_ses_uas_inc_ref(uas); // @ rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), uas->strand, &__threadpool_uas_trigger, uas, error, 0); if (rc != 0) { sp_ses_uas_dec_ref(uas); // @ DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uas_trigger, queue work item failed!"); } } static strand_t *uas_get_strand(sp_ses_uas_t *uas, int method_id) { method_strand_t *ms; uas_lock(uas); ms = hashset_find(uas->method_strand, (const void *)method_id); if (!ms) { ms = create_method_strand(method_id); hashset_add(uas->method_strand, ms); } uas_unlock(uas); return ms->strand; } static void __threadpool_uas_process_info(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2) { sp_ses_uas_t *uas = (sp_ses_uas_t*)arg; sp_ses_mgr_t *mgr = uas->mgr; int method_id = (int)param1; int method_sig; iobuffer_t *info_pkt = (iobuffer_t*)param2; iobuffer_read(info_pkt, IOBUF_T_I4, &method_sig, 0); if (uas->state == SP_SES_STATE_CONNECTED || (uas->state == SP_SES_STATE_ERROR && uas->error == Error_PeerClose)) { sp_rsn_context_t from_rsn; sp_rsn_context_t rsn; iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.depth, 0); iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.original_type, 0); iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0); iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0); sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn); sp_svc_push_runserial_context(mgr->svc, &rsn); uas->cb.on_info(uas, method_id, method_sig, &info_pkt, uas->cb.user_data); sp_svc_pop_runserial_context(mgr->svc); } if (info_pkt) iobuffer_dec_ref(info_pkt); sp_ses_uas_dec_ref(uas);//@ } static strand_t *uas_decide_strand(sp_ses_uas_t *uas, int method_id, int overlap) { strand_t *strand = NULL; if (uas->strand) { if (overlap) { strand = NULL; } else { strand = uas->strand; } } else { if (!overlap) { strand = uas_get_strand(uas, method_id); } } return strand; } static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt) { int rc; int overlap; rc = (*uas->cb.get_method_attr)(uas, method_id, method_sig, &overlap, uas->cb.user_data); if (rc == 0) { strand_t *strand = uas_decide_strand(uas, method_id, overlap); iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0); sp_ses_uas_inc_ref(uas); //@ rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_info, uas, method_id, (param_size_t)*info_pkt); if (rc != 0) { DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "process info queue work item failed!"); sp_ses_uas_dec_ref(uas); //@ } else { *info_pkt = NULL; } } else { DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "method signature not match: method_id=%d, method_sig=%d, error=%d", method_id, method_sig, rc); } } static void uas_process_req_reply_error(sp_ses_uas_t *uas, int tsx_id, int err) { int v = 0; char* tmpStr = ""; iobuffer_t *ans_pkt = iobuffer_create(-1, -1); TOOLKIT_ASSERT(err != 0); iobuffer_write(ans_pkt, IOBUF_T_I4, &err, 0); // sys error iobuffer_write(ans_pkt, IOBUF_T_I4, &v, 0); // user error iobuffer_write(ans_pkt, IOBUF_T_STR, tmpStr, 0); iobuffer_write_head(ans_pkt, IOBUF_T_I4, &tsx_id, 0); sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_ANS, uas->conn_id, &ans_pkt); WLog_INFO(TAG, "uas_process_req_reply_error remote_epid:%d, remote_svc_id:%d", uas->remote_epid, uas->remote_svc_id); if (ans_pkt) iobuffer_dec_ref(ans_pkt); } static void __threadpool_uas_process_req(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2) { sp_ses_uas_t *uas = (sp_ses_uas_t*)arg; sp_ses_mgr_t *mgr = uas->mgr; int tsx_id = (int)param1; int method_id; int method_sig; int timeout; iobuffer_t *req_pkt = (iobuffer_t*)param2; /** 这个数据是准的,不然实体层不会找到对应的接口序号和签名,不一定,这些数据是过来的时候才塞进去的 [Gifur@2022624]*/ //WLog_DBG(TAG, "read method info"); iobuffer_read(req_pkt, IOBUF_T_I4, &method_id, 0); iobuffer_read(req_pkt, IOBUF_T_I4, &method_sig, 0); iobuffer_read(req_pkt, IOBUF_T_I4, &timeout, 0); if (uas->state == SP_SES_STATE_CONNECTED) { sp_rsn_context_t from_rsn; sp_rsn_context_t rsn; iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.depth, 0); iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.original_type, 0); iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0); iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0); sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn); sp_svc_push_runserial_context(mgr->svc, &rsn); uas->cb.on_req(uas, tsx_id, method_id, method_sig, timeout, &req_pkt, uas->cb.user_data); sp_svc_pop_runserial_context(mgr->svc); } else { uas_process_req_reply_error(uas, tsx_id, Error_InvalidState); } if (req_pkt) iobuffer_dec_ref(req_pkt); sp_ses_uas_dec_ref(uas); //@ } static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt) { int rc; int overlap; rc = uas->cb.get_method_attr(uas, method_id, method_sig, &overlap, uas->cb.user_data); if (rc == 0) { strand_t *strand = uas_decide_strand(uas, method_id, overlap); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_sig, 0); iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_id, 0); sp_ses_uas_inc_ref(uas); //@ rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_req, uas, tsx_id, (param_size_t)*req_pkt); if (rc != 0) { uas_process_req_reply_error(uas, tsx_id, rc); DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "process req queue work item failed!"); sp_ses_uas_dec_ref(uas); //@ } else { *req_pkt = NULL; } } else { uas_process_req_reply_error(uas, tsx_id, rc); DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "method signature not match: tsx_id=%d, method_id=%d, method_sig=%d, error=%d", tsx_id, method_id, method_sig, rc); } } int sp_ses_uas_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, int conn_id, int overlap, sp_ses_uas_callback *cb, sp_ses_uas_t **p_uas) { unsigned int slot; sp_ses_uas_t *uas; if (!cb || !cb->on_req || !cb->on_info) return Error_Param; uas = MALLOC_T(sp_ses_uas_t); uas->mgr = mgr; uas->conn_id = conn_id; uas->remote_epid = remote_epid; uas->remote_svc_id = remote_svc_id; spinlock_init(&uas->lock); INIT_HLIST_NODE(&uas->hentry); memcpy(&uas->cb, cb, sizeof(sp_ses_uas_callback)); uas->state = SP_SES_STATE_INIT; uas->state_begin_time = y2k_time_now(); uas->error = 0; uas->tsx_cnt = 0; uas->begin_time = y2k_time_now(); uas->strand = overlap ? NULL : strand_create(); INIT_LIST_HEAD(&uas->tsx_list); REF_COUNT_INIT(&uas->ref_cnt); uas->method_strand = overlap ? hashset_create(offset_of(method_strand_t, node), &method_strand_getkey, &method_strand_hash, &method_strand_cmpkey) : NULL; sp_ses_mgr_inc_ref(mgr); slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE; sp_ses_uas_inc_ref(uas); mgr_lock(mgr); hlist_add_head(&uas->hentry, &mgr->uas_buckets[slot]); mgr->ses_cnt++; mgr_unlock(mgr); *p_uas = uas; //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uas created!"); return 0; } void sp_ses_uas_destroy(sp_ses_uas_t *uas) { TOOLKIT_ASSERT(uas->state == SP_SES_STATE_TERM); mgr_lock(uas->mgr); uas->mgr->ses_cnt--; hlist_del_init(&uas->hentry); mgr_unlock(uas->mgr); sp_ses_uas_dec_ref(uas); sp_ses_uas_dec_ref(uas); } int sp_ses_uas_close(sp_ses_uas_t *uas) { int rc = 0; int trigger = 0; sp_ses_uas_inc_ref(uas); uas_lock(uas); if (uas->state == SP_SES_STATE_INIT) { uas->state = SP_SES_STATE_TERM; uas->state_begin_time = y2k_time_now(); trigger++; } else if (uas->state == SP_SES_STATE_CONNECTED) { sp_tsx_uas_t *pos, *n; uas->state = SP_SES_STATE_TERM; uas->state_begin_time = y2k_time_now(); trigger++; list_for_each_entry_safe(pos, n, &uas->tsx_list, sp_tsx_uas_t, entry) { sp_tsx_uas_close(pos); } } else if (uas->state == SP_SES_STATE_ERROR) { uas->state = SP_SES_STATE_TERM; uas->state_begin_time = y2k_time_now(); } else if (uas->state == SP_SES_STATE_TERM) { rc = Error_Duplication; } else { rc = Error_Unexpect; } if (trigger) uas_trigger(uas, Error_Closed); uas_unlock(uas); if (rc == 0) { sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_FINS, uas->conn_id, NULL); } sp_ses_uas_dec_ref(uas); return rc; } int sp_ses_uas_get_remote_epid(sp_ses_uas_t *uas) { return uas->remote_epid; } int sp_ses_uas_get_remote_svc_id(sp_ses_uas_t *uas) { return uas->remote_svc_id; } int sp_ses_uas_get_conn_id(sp_ses_uas_t *uas) { return uas->conn_id; } sp_ses_mgr_t *sp_ses_uas_get_mgr(sp_ses_uas_t *uas) { return uas->mgr; } int sp_ses_uas_get_state(sp_ses_uas_t *uas) { return uas->state; } int sp_ses_uas_get_tsx_cnt(sp_ses_uas_t *uas) { return uas->tsx_cnt; } int sp_ses_uas_accept(sp_ses_uas_t *uas) { int rc; uas_lock(uas); if (uas->state == SP_SES_STATE_INIT) { uas->state = SP_SES_STATE_CONNECTED; uas->state_begin_time = y2k_time_now(); rc = Error_Succeed; } else if (uas->state == SP_SES_STATE_CONNECTED) { rc = Error_Duplication; } else { rc = Error_NetBroken; } uas_unlock(uas); if (rc == 0) { iobuffer_t *pkt = iobuffer_create(-1, -1); sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uas->mgr->svc); if (rsn_ctx) { iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0); iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0); iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0); iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->depth, 0); } else { u__int64_t rsn = 0; int t = 0; iobuffer_write_head(pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(pkt, IOBUF_T_I8, &rsn, 0); iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); } rc = sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_CONNACK, uas->conn_id, &pkt); if (pkt) { iobuffer_dec_ref(pkt); } } return rc; } static void __sp_ses_uas_destroy(sp_ses_uas_t *uas) { if (uas->cb.on_destroy) { uas->cb.on_destroy(uas, uas->cb.user_data); } sp_ses_mgr_dec_ref(uas->mgr); if (uas->strand) strand_destroy(uas->strand); if (uas->method_strand) { method_strand_t *tpos; struct hlist_node *pos, *n; int slot; hashset_for_each_safe(tpos, pos, n, slot, uas->method_strand, method_strand_t) { hashset_remove(uas->method_strand, (void*)tpos->type); delete_method_strand(tpos); } hashset_destroy(uas->method_strand); } free(uas); //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uas destroy!"); } IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uas, sp_ses_uas_t, ref_cnt, __sp_ses_uas_destroy) static __inline void tsx_uas_lock(sp_tsx_uas_t *tsx) { spinlock_enter(&tsx->lock, -1); } static __inline void tsx_uas_unlock(sp_tsx_uas_t *tsx) { spinlock_leave(&tsx->lock); } int sp_tsx_uas_create(sp_ses_uas_t *uas, int tsx_id, sp_tsx_uas_callback *cb, sp_tsx_uas_t **p_tsx) { sp_tsx_uas_t *tsx = MALLOC_T(sp_tsx_uas_t); unsigned int slot; tsx->state = SP_TSX_UAS_STATE_INIT; uas->state_begin_time = y2k_time_now(); tsx->uas = uas; tsx->tsx_id = tsx_id; spinlock_init(&tsx->lock); memcpy(&tsx->cb, cb, sizeof(sp_tsx_uas_callback)); INIT_HLIST_NODE(&tsx->hentry); REF_COUNT_INIT(&tsx->ref_cnt); sp_ses_uas_inc_ref(uas); uas_lock(uas); sp_tsx_uas_inc_ref(tsx); list_add_tail(&tsx->entry, &uas->tsx_list); uas->tsx_cnt++; uas_unlock(uas); slot = jhash_3words(uas->remote_svc_id, uas->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE; mgr_lock(uas->mgr); sp_tsx_uas_inc_ref(tsx); hlist_add_head(&tsx->hentry, &uas->mgr->uas_tsx_buckets[slot]); mgr_unlock(uas->mgr); *p_tsx = tsx; //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uas created!"); return 0; } void sp_tsx_uas_destroy(sp_tsx_uas_t *tsx) { sp_ses_uas_t *uas = tsx->uas; TOOLKIT_ASSERT(tsx->state == SP_TSX_UAS_STATE_TERM); uas_lock(uas); uas->tsx_cnt--; list_del_init(&tsx->entry); uas_unlock(uas); sp_tsx_uas_dec_ref(tsx); mgr_lock(uas->mgr); hlist_del_init(&tsx->hentry); mgr_unlock(uas->mgr); sp_tsx_uas_dec_ref(tsx); sp_tsx_uas_dec_ref(tsx); } sp_ses_uas_t *sp_tsx_uas_get_session(sp_tsx_uas_t *tsx) { return tsx->uas; } int sp_tsx_uas_get_state(sp_tsx_uas_t *tsx) { return tsx->state; } int sp_tsx_uas_get_id(sp_tsx_uas_t *tsx) { return tsx->tsx_id; } int sp_tsx_uas_close(sp_tsx_uas_t *tsx) { int rc; tsx_uas_lock(tsx); if (tsx->state != SP_TSX_UAS_STATE_TERM) { tsx->state = SP_TSX_UAS_STATE_TERM; rc = 0; } else { rc = Error_Duplication; } tsx_uas_unlock(tsx); return rc; } int sp_tsx_uas_answer(sp_tsx_uas_t *tsx, int end, iobuffer_t **ans_pkt) { int rc; int trigger = 0; tsx_uas_lock(tsx); if (tsx->state == SP_TSX_UAS_STATE_INIT) { trigger++; rc = 0; tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS; } else if (tsx->state == SP_TSX_UAS_STATE_TMPANS) { trigger++; rc = 0; tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS; } else { rc = Error_Bug; } tsx_uas_unlock(tsx); if (trigger) { iobuffer_write_head(*ans_pkt, IOBUF_T_I4, &tsx->tsx_id, 0); rc = sp_svc_post(tsx->uas->mgr->svc, tsx->uas->remote_epid, tsx->uas->remote_svc_id, SP_PKT_SES | (end ? SES_CMD_ANS : SES_CMD_TMPANS), tsx->uas->conn_id, ans_pkt); } return rc; } static void __sp_tsx_uas_destroy(sp_tsx_uas_t *tsx) { if (tsx->cb.on_destroy) tsx->cb.on_destroy(tsx, tsx->cb.user_data); sp_ses_uas_dec_ref(tsx->uas); free(tsx); //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uas destroyed!"); } IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uas, sp_tsx_uas_t, ref_cnt, __sp_tsx_uas_destroy)