|
- #include "precompile.h"
- #include "sp_ses.h"
- #include "sp_def.h"
- #include "sp_svc.h"
- #include "sp_dbg_export.h"
- #include "sp_mod.h"
- #include "sp_env.h"
- #include "SpBase.h"
- #include "memutil.h"
- #include "refcnt.h"
- #include "list.h"
- #include "jhash.h"
- #include "hashset.h"
- #include "spinlock.h"
- #define CONN_BUCKET_SIZE 127 /* bucket count of the uac/uas session hash tables; sessions are slotted by conn_id % CONN_BUCKET_SIZE */
- #define TSX_BUCKET_SIZE 511 /* bucket count of the uac/uas transaction hash tables; slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE */
- #define MAX_REDIRECT 10 /* a connecting uac is failed once its redirect_cnt reaches this value */
- #define SES_CMD_REQ 0 /* uac -> uas: two-way call (request that expects an answer) */
- #define SES_CMD_TMPANS 1 /* uas -> uac: interim (non-final) answer */
- #define SES_CMD_ANS 2 /* uas -> uac: final answer */
- #define SES_CMD_INFO 3 /* uac -> uas: one-way call */
- #define SES_CMD_CONNACK 4 /* uas -> uac: connect acknowledged */
- #define SES_CMD_FINC 5 /* uac -> uas: session close */
- #define SES_CMD_FINS 6 /* uas -> uac: session close */
- #define SES_CMD_CONN 7 /* uac -> uas: connect request */
- #define SES_CMD_ERRC 8 /* uac -> uas: session broken */
- #define SES_CMD_ERRS 9 /* uas -> uac: session broken, or connect rejected (may carry an int error code) */
- #define SES_CMD_REDIRECT 10 /* uas -> uac: session redirect */
- struct sp_ses_mgr_t /* Session manager: holds the session and transaction hash tables attached to one service. */
- {
- struct hlist_head uac_buckets[CONN_BUCKET_SIZE]; // chains of sp_ses_uac_t, slotted by conn_id % CONN_BUCKET_SIZE
- struct hlist_head uas_buckets[CONN_BUCKET_SIZE]; // chains of sp_ses_uas_t, slotted by conn_id % CONN_BUCKET_SIZE
- struct hlist_head uac_tsx_buckets[TSX_BUCKET_SIZE]; // chains of sp_tsx_uac_t, slotted by jhash_3words(svc_id, conn_id, tsx_id, 0)
- struct hlist_head uas_tsx_buckets[TSX_BUCKET_SIZE]; // chains of sp_tsx_uas_t, slotted the same way
- sp_ses_mgr_callback_t cb; // copy of the callbacks passed to sp_ses_mgr_create (on_accept, on_destroy, user_data)
- sp_svc_t *svc; // service used to send/post packets and to reach the iom and thread pool
- int ses_cnt; // number of registered sessions; updated under `lock` together with the bucket lists
- CRITICAL_SECTION lock; // taken via mgr_lock()/mgr_unlock() around table lookups and updates
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // reference counter; the last release runs __sp_ses_mgr_destroy
- };
- DECLARE_REF_COUNT_STATIC(sp_ses_mgr, sp_ses_mgr_t) // presumably declares sp_ses_mgr_inc_ref/sp_ses_mgr_dec_ref - macro is defined elsewhere
- struct sp_ses_uac_t /* Client-side (connecting) end of a session. */
- {
- struct hlist_node hentry; // element of mgr->uac_buckets
- sp_ses_uac_callback cb; // copy of the callbacks given to sp_ses_uac_create (on_connect is mandatory there)
- sp_ses_mgr_t *mgr; // manager this session is registered on; a manager reference is held until __sp_ses_uac_destroy
- int remote_epid; // endpoint id of the peer; rewritten when a redirect is followed
- int remote_svc_id; // service id of the peer; rewritten when a redirect is followed
- unsigned int conn_id; // id obtained from sp_env_new_id(); key of the uac_buckets lookup
- int state; // one of SP_SES_STATE_*
- int error; // error code recorded by uac_process_errs
- int tsx_cnt; // value reported by sp_ses_uac_get_tsx_cnt
- int redirect_cnt; // redirects followed so far, compared against MAX_REDIRECT
- y2k_time_t begin_time; // creation time
- y2k_time_t state_begin_time; // time of the most recent state change
- spinlock_t lock; // taken via uac_lock()/uac_unlock() around state transitions
- timer_entry *op_timer; // pending connect-timeout timer, or NULL
- struct list_head tsx_list; // list of sp_tsx_uac_t->entry
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // reference counter; the last release runs __sp_ses_uac_destroy
- };
- DECLARE_REF_COUNT_STATIC(sp_ses_uac, sp_ses_uac_t) // presumably declares sp_ses_uac_inc_ref/sp_ses_uac_dec_ref - macro is defined elsewhere
- struct sp_tsx_uac_t /* Client-side transaction (one request and its answers) running on a sp_ses_uac_t. */
- {
- struct hlist_node hentry; // element of mgr->uac_tsx_buckets
- struct list_head entry; // element of sp_ses_uac_t->tsx_list
- int state; // one of SP_TSX_UAC_STATE_*
- int tsx_id; // transaction id, matched against the id carried by answer packets
- int method_id;
- int method_sig;
- spinlock_t lock; // taken via tsx_uac_lock()/tsx_uac_unlock()
- sp_tsx_uac_callback cb; // on_ans callback and its user_data
- sp_ses_uac_t *uac; // session this transaction belongs to
- timer_entry *op_timer; // pending request-timeout timer, or NULL
- strand_t *strand;
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // reference counter
- };
- DECLARE_REF_COUNT_STATIC(sp_tsx_uac, sp_tsx_uac_t) // presumably declares sp_tsx_uac_inc_ref/sp_tsx_uac_dec_ref - macro is defined elsewhere
- typedef struct method_strand_t { // pairs an int `type` with a strand; NOTE(review): presumably the element type of sp_ses_uas_t->method_strand - confirm
- int type;
- strand_t *strand;
- struct hlist_node node; // hlist linkage
- }method_strand_t;
- struct sp_ses_uas_t /* Server-side (accepting) end of a session. */
- {
- struct hlist_node hentry; // element of mgr->uas_buckets
- sp_ses_uas_callback cb; // session callbacks
- sp_ses_mgr_t *mgr; // manager this session is registered on
- int remote_epid; // endpoint id of the peer; compared against the endpoint that went off-bus in mgr_on_sys
- int remote_svc_id; // service id of the peer
- int conn_id; // connection id chosen by the peer; key of the uas_buckets lookup
- int state; // one of SP_SES_STATE_*
- int error;
- int tsx_cnt;
- y2k_time_t begin_time; // reported as begin_time by sp_ses_mgr_get_ses_info
- y2k_time_t state_begin_time; // reported as state_begin_time by sp_ses_mgr_get_ses_info
- spinlock_t lock;
- strand_t *strand;
- hashset_t *method_strand;
- struct list_head tsx_list; // list of sp_tsx_uas_t->entry
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // reference counter
- };
- DECLARE_REF_COUNT_STATIC(sp_ses_uas, sp_ses_uas_t) // presumably declares sp_ses_uas_inc_ref/sp_ses_uas_dec_ref - macro is defined elsewhere
- struct sp_tsx_uas_t /* Server-side transaction running on a sp_ses_uas_t. */
- {
- struct hlist_node hentry; // element of mgr->uas_tsx_buckets
- struct list_head entry; // element of sp_ses_uas_t->tsx_list
- int state;
- int tsx_id; // transaction id, part of the uas_tsx_buckets lookup key
- spinlock_t lock;
- sp_tsx_uas_callback cb; // transaction callbacks
- sp_ses_uas_t *uas; // session this transaction belongs to
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // reference counter
- };
- DECLARE_REF_COUNT_STATIC(sp_tsx_uas, sp_tsx_uas_t) // presumably declares sp_tsx_uas_inc_ref/sp_tsx_uas_dec_ref - macro is defined elsewhere
- static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id); // forward declarations: these static handlers are called before their definitions
- static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt);
- static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt); // end != 0 marks the final answer
- static void uac_process_errs(sp_ses_uac_t *uac, int error);
- static void uas_process_errc(sp_ses_uas_t *uas, int error);
- static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt);
- static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
- static void uac_trigger(sp_ses_uac_t *uac, int error, int connect); // connect != 0 selects on_connect, otherwise on_close
- static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error);
- static void uas_trigger(sp_ses_uas_t *uas, int error);
- /* Enter the manager-wide critical section (mgr->lock). */
- static __inline void mgr_lock(sp_ses_mgr_t *mgr)
- {
-     CRITICAL_SECTION *cs = &mgr->lock;
-     EnterCriticalSection(cs);
- }
- /* Leave the manager-wide critical section (mgr->lock). */
- static __inline void mgr_unlock(sp_ses_mgr_t *mgr)
- {
-     CRITICAL_SECTION *cs = &mgr->lock;
-     LeaveCriticalSection(cs);
- }
- /*
-  * Find the client-side session registered under conn_id.
-  * Only conn_id is compared; epid and svc_id are accepted but not used.
-  * The reference count is not touched. Returns NULL when nothing matches.
-  */
- static sp_ses_uac_t *mgr_find_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
-     struct hlist_head *bucket = &mgr->uac_buckets[((unsigned int)conn_id) % CONN_BUCKET_SIZE];
-     struct hlist_node *cursor;
-     sp_ses_uac_t *ses;
-     hlist_for_each_entry(ses, cursor, bucket, sp_ses_uac_t, hentry) {
-         if (ses->conn_id == conn_id) {
-             return ses;
-         }
-     }
-     return NULL;
- }
- /*
-  * Find the server-side session registered under conn_id.
-  * Only conn_id is compared; epid and svc_id are accepted but not used.
-  * The reference count is not touched. Returns NULL when nothing matches.
-  */
- static sp_ses_uas_t *mgr_find_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
-     struct hlist_head *bucket = &mgr->uas_buckets[((unsigned int)conn_id) % CONN_BUCKET_SIZE];
-     struct hlist_node *cursor;
-     sp_ses_uas_t *ses;
-     hlist_for_each_entry(ses, cursor, bucket, sp_ses_uas_t, hentry) {
-         if (ses->conn_id == conn_id) {
-             return ses;
-         }
-     }
-     return NULL;
- }
- /*
-  * Take a snapshot of every session registered on the manager.
-  *
-  * mgr  manager to inspect.
-  * cnt  out: number of entries in the returned array (0 when NULL is returned).
-  *
-  * Returns a malloc()ed array of *cnt entries that the caller must free(), or
-  * NULL when there are no sessions or the allocation fails. Client-side
-  * sessions are listed first (local service as "from"), then server-side
-  * sessions (local service as "to"). state is 1 for SP_SES_STATE_CONNECTED
-  * and 3 for anything else.
-  *
-  * The manager lock is held for the whole walk so the count and the bucket
-  * lists cannot change underneath us, and writes are bounded by the allocated
-  * capacity.
-  */
- sp_ses_info_t* sp_ses_mgr_get_ses_info(sp_ses_mgr_t *mgr, int *cnt)
- {
-     sp_ses_info_t *ret = NULL;
-     int capacity;
-     int index = 0;
-     int i;
-
-     mgr_lock(mgr);
-     capacity = mgr->ses_cnt;
-     if (capacity > 0)
-         ret = (sp_ses_info_t*) malloc(sizeof(sp_ses_info_t) * (size_t)capacity);
-     if (ret) {
-         for (i = 0; i < CONN_BUCKET_SIZE && index < capacity; i++)
-         {
-             sp_ses_uac_t *tpos;
-             struct hlist_node *pos;
-             hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[i], sp_ses_uac_t, hentry)
-             {
-                 if (index >= capacity)
-                     break; /* never write past the allocation */
-                 ret[index].from_svc_id = sp_svc_get_id(mgr->svc);
-                 ret[index].to_svc_id = tpos->remote_svc_id;
-                 ret[index].begin_time = tpos->begin_time;
-                 ret[index].state_begin_time = tpos->state_begin_time;
-                 ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3;
-                 index++;
-             }
-         }
-         for (i = 0; i < CONN_BUCKET_SIZE && index < capacity; i++)
-         {
-             sp_ses_uas_t *tpos;
-             struct hlist_node *pos;
-             hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[i], sp_ses_uas_t, hentry)
-             {
-                 if (index >= capacity)
-                     break; /* never write past the allocation */
-                 ret[index].from_svc_id = tpos->remote_svc_id;
-                 ret[index].to_svc_id = sp_svc_get_id(mgr->svc);
-                 ret[index].begin_time = tpos->begin_time;
-                 ret[index].state_begin_time = tpos->state_begin_time;
-                 ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3;
-                 index++;
-             }
-         }
-         assert(index == capacity);
-     }
-     mgr_unlock(mgr);
-     *cnt = index; /* report what was actually filled in */
-     return ret;
- }
- /*
-  * Find the client-side transaction identified by (svc_id, conn_id, tsx_id).
-  * epid is accepted but not used. The reference count is not touched.
-  * Returns NULL when nothing matches.
-  */
- static sp_tsx_uac_t *mgr_find_tsx_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id)
- {
-     struct hlist_node *cursor;
-     sp_tsx_uac_t *tsx;
-     int bucket = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
-     hlist_for_each_entry(tsx, cursor, &mgr->uac_tsx_buckets[bucket], sp_tsx_uac_t, hentry) {
-         sp_ses_uac_t *owner = tsx->uac;
-         if (owner->remote_svc_id == svc_id && owner->conn_id == conn_id && tsx->tsx_id == tsx_id) {
-             return tsx;
-         }
-     }
-     return NULL;
- }
- /*
-  * Find the server-side transaction identified by (svc_id, conn_id, tsx_id).
-  * epid is accepted but not used. The reference count is not touched.
-  * Returns NULL when nothing matches.
-  */
- static sp_tsx_uas_t *mgr_find_tsx_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id)
- {
-     struct hlist_node *cursor;
-     sp_tsx_uas_t *tsx;
-     int bucket = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
-     hlist_for_each_entry(tsx, cursor, &mgr->uas_tsx_buckets[bucket], sp_tsx_uas_t, hentry) {
-         sp_ses_uas_t *owner = tsx->uas;
-         if (owner->remote_svc_id == svc_id && owner->conn_id == conn_id && tsx->tsx_id == tsx_id) {
-             return tsx;
-         }
-     }
-     return NULL;
- }
- /*
-  * Route a SES_CMD_CONNACK to the client session it belongs to.
-  * The session is pinned with a reference while it is processed outside the
-  * manager lock. If no session is registered under conn_id, an ERRC is posted
-  * back to the sender.
-  */
- static void mgr_process_connack(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
-     sp_ses_uac_t *ses;
-     mgr_lock(mgr);
-     ses = mgr_find_uac(mgr, epid, svc_id, conn_id);
-     if (ses != NULL) {
-         sp_ses_uac_inc_ref(ses); /* pin */
-     }
-     mgr_unlock(mgr);
-     if (ses != NULL) {
-         uac_process_connack(ses, epid, svc_id, conn_id);
-         sp_ses_uac_dec_ref(ses); /* unpin */
-     } else {
-         sp_dbg_warn("connack find no peer!");
-         sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
-     }
- }
- /*
-  * Route a SES_CMD_REDIRECT to the client session it belongs to.
-  * The session is pinned with a reference while it is processed outside the
-  * manager lock. An unknown conn_id is only logged.
-  */
- static void mgr_process_redirect(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt)
- {
-     sp_ses_uac_t *ses;
-     mgr_lock(mgr);
-     ses = mgr_find_uac(mgr, epid, svc_id, conn_id);
-     if (ses != NULL) {
-         sp_ses_uac_inc_ref(ses); /* pin */
-     }
-     mgr_unlock(mgr);
-     if (ses != NULL) {
-         uac_process_redirect(ses, epid, svc_id, conn_id, redirect_ent_id, p_pkt);
-         sp_ses_uac_dec_ref(ses); /* unpin */
-     } else {
-         sp_dbg_warn("redirect cannot find uac!");
-     }
- }
- /*
-  * Route an answer packet (interim when end == 0, final otherwise) to the
-  * client transaction identified by (svc_id, conn_id, tsx_id).
-  * The transaction is pinned with a reference while it is processed outside
-  * the manager lock. An unknown transaction is only logged; no ERRC is sent.
-  */
- static void mgr_process_ans(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int end, int tsx_id, iobuffer_t **ans_pkt)
- {
-     sp_tsx_uac_t *target;
-     mgr_lock(mgr);
-     target = mgr_find_tsx_uac(mgr, epid, svc_id, conn_id, tsx_id);
-     if (target != NULL) {
-         sp_tsx_uac_inc_ref(target); /* pin */
-     }
-     mgr_unlock(mgr);
-     if (target != NULL) {
-         tsx_uac_process_ans(target, end, ans_pkt);
-         sp_tsx_uac_dec_ref(target); /* unpin */
-     } else {
-         sp_dbg_warn("ack find no peer!");
-     }
- }
- /*
-  * Deliver a server-reported error to the client session registered under
-  * conn_id. An unknown conn_id is silently ignored.
-  */
- static void mgr_process_errs(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error)
- {
-     sp_ses_uac_t *ses;
-     mgr_lock(mgr);
-     ses = mgr_find_uac(mgr, epid, svc_id, conn_id);
-     if (ses != NULL) {
-         sp_ses_uac_inc_ref(ses); /* keep the session alive once the lock is dropped */
-     }
-     mgr_unlock(mgr);
-     if (ses == NULL) {
-         return;
-     }
-     uac_process_errs(ses, error);
-     sp_ses_uac_dec_ref(ses);
- }
- /*
-  * Deliver a client-reported error to the server session registered under
-  * conn_id. An unknown conn_id is silently ignored.
-  */
- static void mgr_process_errc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error)
- {
-     sp_ses_uas_t *ses;
-     mgr_lock(mgr);
-     ses = mgr_find_uas(mgr, epid, svc_id, conn_id);
-     if (ses != NULL) {
-         sp_ses_uas_inc_ref(ses); /* keep the session alive once the lock is dropped */
-     }
-     mgr_unlock(mgr);
-     if (ses == NULL) {
-         return;
-     }
-     uas_process_errc(ses, error);
-     sp_ses_uas_dec_ref(ses);
- }
- static void mgr_process_fins(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- mgr_process_errs(mgr, epid, svc_id, conn_id, Error_PeerClose);
- }
- static void __threadpool_mgr_process_conn(threadpool_t *threadpool, void *arg, int param1, int param2) /* Thread-pool work item queued by mgr_process_conn: runs the on_accept callback for an incoming connect and, when it returns non-zero, posts either a redirect or a rejection. arg is the manager (one reference is released at the end); param1 carries the connect packet pointer. */
- {
- sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)arg;
- iobuffer_t *conn_pkt = (iobuffer_t*)param1; // packet pointer passed through an int by mgr_process_conn
- int epid;
- int svc_id;
- int conn_id;
- int redirect_target = 0;
- int rc;
- int read_state;
- sp_rsn_context_t from_rsn;
- sp_rsn_context_t rsn;
- iobuffer_read(conn_pkt, IOBUF_T_I4, &epid, 0); // epid, svc_id, conn_id were prepended by mgr_process_conn
- iobuffer_read(conn_pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I4, &conn_id, 0);
- read_state = iobuffer_get_read_state(conn_pkt); // remember the position right after that header, used if the connect is redirected
- sp_dbg_info("begin on_accept, from epid:%d, svc_id:%d, conn_id:%d", epid, svc_id, conn_id);
- iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.depth, 0); // run-serial context written by the connecting side
- iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
- sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
- sp_svc_push_runserial_context(mgr->svc, &rsn); // the context is pushed only for the duration of the callback
- rc = mgr->cb.on_accept(mgr, epid, svc_id, conn_id, &conn_pkt, &redirect_target, mgr->cb.user_data);
- sp_svc_pop_runserial_context(mgr->svc);
- if (rc) { // non-zero: the connection was not accepted
- if (redirect_target) {
- sp_dbg_warn("conn from svc_id %d redirect to %d!", svc_id, redirect_target);
- iobuffer_restore_read_state(conn_pkt, read_state); // rewind to just after the header so the run-serial fields are forwarded again
- iobuffer_write_head(conn_pkt, IOBUF_T_I4, &redirect_target, 0); // the receiver reads this id first (see SES_CMD_REDIRECT in mgr_on_pkt)
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_REDIRECT, conn_id, &conn_pkt);
- } else {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0); // the rejection carries the callback's return code
- sp_dbg_warn("conn from svc_id %d reject! rc: %d", svc_id, rc);
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt) // presumably sp_svc_post clears the pointer when it takes the packet - confirm
- iobuffer_dec_ref(pkt);
- }
- } else {
- sp_dbg_info("on accept ok!");
- }
- if (conn_pkt) { // release the packet if nobody downstream cleared the pointer
- iobuffer_dec_ref(conn_pkt);
- }
- sp_ses_mgr_dec_ref(mgr); // @ balances the sp_ses_mgr_inc_ref done in mgr_process_conn before queuing
- }
- static void mgr_process_conn(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, iobuffer_t **conn_pkt) /* Handle an incoming SES_CMD_CONN: reject it with ERRS when the local entity is lost, the conn_id is already registered, or the work item cannot be queued; otherwise hand the packet to __threadpool_mgr_process_conn. On a successful hand-off *conn_pkt is set to NULL. */
- {
- sp_ses_uas_t *uas;
- int err;
- sp_entity_t *ent;
- mgr_lock(mgr);
- uas = mgr_find_uas(mgr, epid, svc_id, conn_id); // only tested against NULL below, so no reference is taken
- mgr_unlock(mgr);
- ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, sp_svc_get_id(mgr->svc)); // entity record looked up by the local service id
- if (ent) {
- if (ent->state == EntityState_Lost) {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_Losted;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- sp_dbg_warn("reject connection because of lost state!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- }
- } else {
- assert(0);// never go here
- }
- if (uas) { // already exist, duplicate
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_Duplication;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- sp_dbg_warn("conn duplicate!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- }
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &conn_id, 0); // prepend conn_id, svc_id, epid so the work item reads them back as epid, svc_id, conn_id
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &epid, 0);
- sp_ses_mgr_inc_ref(mgr); // @ keeps the manager alive until the work item has run
- err = threadpool_queue_workitem2(sp_svc_get_threadpool(mgr->svc), NULL, &__threadpool_mgr_process_conn, mgr, (int)*conn_pkt, 0); // NOTE(review): the packet pointer is squeezed into an int - confirm this is only built for 32-bit targets
- if (err)
- {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_PeerReject;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- sp_ses_mgr_dec_ref(mgr); // @ the work item will never run, so drop its reference here
- sp_dbg_warn("conn from svc_id %d reject!", svc_id);
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- } else {
- *conn_pkt = NULL; // the queued work item now owns the packet
- }
- }
- /*
-  * Route a one-way call (SES_CMD_INFO) to the server session registered
-  * under conn_id. If there is no such session, an ERRS carrying
-  * Error_NotExist is posted back to the sender.
-  */
- static void mgr_process_info(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt)
- {
-     sp_ses_uas_t *ses;
-     mgr_lock(mgr);
-     ses = mgr_find_uas(mgr, epid, svc_id, conn_id);
-     if (ses != NULL)
-         sp_ses_uas_inc_ref(ses); /* pin while processing outside the lock */
-     mgr_unlock(mgr);
-     if (ses != NULL) {
-         uas_process_info(ses, method_id, method_sig, info_pkt);
-         sp_ses_uas_dec_ref(ses);
-     } else {
-         int rc = Error_NotExist;
-         iobuffer_t *reply = iobuffer_create(-1, -1);
-         iobuffer_write(reply, IOBUF_T_I4, &rc, 0);
-         sp_dbg_warn("cannot find uas session!");
-         sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &reply);
-         if (reply)
-             iobuffer_dec_ref(reply);
-     }
- }
- /*
-  * Route a two-way call (SES_CMD_REQ) to the server session registered
-  * under conn_id. If there is no such session, an ERRS carrying
-  * Error_NotExist is posted back to the sender.
-  */
- static void mgr_process_req(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
- {
-     sp_ses_uas_t *ses;
-     mgr_lock(mgr);
-     ses = mgr_find_uas(mgr, epid, svc_id, conn_id);
-     if (ses != NULL)
-         sp_ses_uas_inc_ref(ses); /* pin while processing outside the lock */
-     mgr_unlock(mgr);
-     if (ses != NULL) {
-         uas_process_req(ses, tsx_id, method_id, method_sig, timeout, req_pkt);
-         sp_ses_uas_dec_ref(ses);
-     } else {
-         int rc = Error_NotExist;
-         iobuffer_t *reply = iobuffer_create(-1, -1);
-         iobuffer_write(reply, IOBUF_T_I4, &rc, 0);
-         sp_dbg_warn("cannot find uas session!");
-         sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &reply);
-         if (reply)
-             iobuffer_dec_ref(reply);
-     }
- }
- static void mgr_process_finc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- mgr_process_errc(mgr, epid, svc_id, conn_id, Error_PeerClose);
- }
- static int mgr_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data) /* Packet handler registered for SP_PKT_SES by sp_ses_mgr_start: decodes the session command from pkt_type, reads the command-specific header fields from *p_pkt and dispatches to the matching mgr_process_* routine. pkt_id is used as the connection id. Always returns FALSE. */
- {
- sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data;
- int cmd_type;
- int tsx_id;
- int method_id;
- int method_sig;
- int timeout;
- int redirect_ent_id;
- int rc = 0;
- assert(SP_GET_PKT_TYPE(pkt_type) == SP_PKT_SES);
- cmd_type = SP_GET_TYPE(pkt_type);
- switch (cmd_type) {
- case SES_CMD_INFO:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL); // header order matches sp_ses_uac_send_info: method_id, then method_sig
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL);
- mgr_process_info(mgr, epid, svc_id, pkt_id, method_id, method_sig, p_pkt);
- break;
- case SES_CMD_CONN:
- mgr_process_conn(mgr, epid, svc_id, pkt_id, p_pkt);
- break;
- case SES_CMD_CONNACK:
- mgr_process_connack(mgr, epid, svc_id, pkt_id);
- break;
- case SES_CMD_FINC:
- mgr_process_finc(mgr, epid, svc_id, pkt_id);
- break;
- case SES_CMD_FINS:
- mgr_process_fins(mgr, epid, svc_id, pkt_id);
- break;
- case SES_CMD_ERRC:
- mgr_process_errc(mgr, epid, svc_id, pkt_id, Error_NetBroken); // ERRC carries no payload here; it is always reported as Error_NetBroken
- break;
- case SES_CMD_ERRS:
- if (p_pkt != NULL && *p_pkt != NULL) // the error code payload is optional
- {
- iobuffer_read(*p_pkt, IOBUF_T_I4, &rc, NULL);
- iobuffer_dec_ref(*p_pkt); // the packet is released here and the caller's pointer cleared
- *p_pkt = NULL;
- }
- else
- rc = Error_NetBroken; // no payload: fall back to Error_NetBroken
- mgr_process_errs(mgr, epid, svc_id, pkt_id, rc);
- break;
- case SES_CMD_REQ:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL); // header: tsx_id, method_id, method_sig, timeout
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL);
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL);
- iobuffer_read(*p_pkt, IOBUF_T_I4, &timeout, NULL);
- mgr_process_req(mgr, epid, svc_id, pkt_id, tsx_id, method_id, method_sig, timeout, p_pkt);
- break;
- case SES_CMD_TMPANS:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
- mgr_process_ans(mgr, epid, svc_id, pkt_id, 0, tsx_id, p_pkt); // end = 0: interim answer
- break;
- case SES_CMD_ANS:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
- mgr_process_ans(mgr, epid, svc_id, pkt_id, 1, tsx_id, p_pkt); // end = 1: final answer
- break;
- case SES_CMD_REDIRECT:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &redirect_ent_id, NULL); // id prepended by the redirecting side
- mgr_process_redirect(mgr, epid, svc_id, pkt_id, redirect_ent_id, p_pkt);
- break;
- default:
- sp_dbg_warn("recv unknown ses pkt!");
- assert(0);
- break;
- }
- return FALSE;
- }
- /*
-  * Bus-state handler registered by sp_ses_mgr_start.
-  * When endpoint `epid` reports BUS_STATE_OFF, every server-side session and
-  * then every client-side session whose remote_epid equals it is failed with
-  * Error_NetBroken. The manager lock is held across the whole sweep.
-  * Other states are ignored.
-  */
- static void mgr_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
- {
-     sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data;
-     int slot;
-     if (state != BUS_STATE_OFF)
-         return;
-     mgr_lock(mgr);
-     for (slot = 0; slot < CONN_BUCKET_SIZE; ++slot) {
-         sp_ses_uas_t *ses;
-         struct hlist_node *cur, *next;
-         hlist_for_each_entry_safe(ses, cur, next, &mgr->uas_buckets[slot], sp_ses_uas_t, hentry) {
-             if (ses->remote_epid == epid) {
-                 uas_process_errc(ses, Error_NetBroken);
-             }
-         }
-     }
-     for (slot = 0; slot < CONN_BUCKET_SIZE; ++slot) {
-         sp_ses_uac_t *ses;
-         struct hlist_node *cur, *next;
-         hlist_for_each_entry_safe(ses, cur, next, &mgr->uac_buckets[slot], sp_ses_uac_t, hentry) {
-             if (ses->remote_epid == epid) {
-                 uac_process_errs(ses, Error_NetBroken);
-             }
-         }
-     }
-     mgr_unlock(mgr);
- }
- /*
-  * Allocate and initialise a session manager bound to `svc`.
-  *
-  * svc    service the manager will use; stored, not copied.
-  * cb     callback table; copied into the manager.
-  * p_mgr  out: the new manager.
-  *
-  * Always returns 0. The allocation result is not checked here.
-  */
- int sp_ses_mgr_create(sp_svc_t *svc, sp_ses_mgr_callback_t *cb, sp_ses_mgr_t **p_mgr)
- {
-     int slot;
-     sp_ses_mgr_t *created = MALLOC_T(sp_ses_mgr_t);
-     created->ses_cnt = 0;
-     created->svc = svc;
-     memcpy(&created->cb, cb, sizeof(created->cb));
-     REF_COUNT_INIT(&created->ref_cnt);
-     /* empty session tables */
-     for (slot = 0; slot < CONN_BUCKET_SIZE; ++slot) {
-         INIT_HLIST_HEAD(&created->uac_buckets[slot]);
-         INIT_HLIST_HEAD(&created->uas_buckets[slot]);
-     }
-     /* empty transaction tables */
-     for (slot = 0; slot < TSX_BUCKET_SIZE; ++slot) {
-         INIT_HLIST_HEAD(&created->uac_tsx_buckets[slot]);
-         INIT_HLIST_HEAD(&created->uas_tsx_buckets[slot]);
-     }
-     InitializeCriticalSection(&created->lock);
-     *p_mgr = created;
-     return 0;
- }
- /*
-  * Drop one reference on the manager. The manager is torn down by
-  * __sp_ses_mgr_destroy once the last reference is gone.
-  */
- void sp_ses_mgr_destroy(sp_ses_mgr_t *mgr)
- {
-     sp_ses_mgr_t *target = mgr;
-     sp_ses_mgr_dec_ref(target);
- }
- /*
-  * Close every session registered on the manager: server-side sessions first,
-  * then client-side sessions. The manager lock is held throughout.
-  *
-  * The _safe iterator is used because closing a session may run user
-  * callbacks; if a callback unlinks or releases the session being visited,
-  * the plain iterator would follow a cleared or freed node. The next node is
-  * therefore fetched before the close call, as mgr_on_sys already does.
-  *
-  * Always returns 0.
-  */
- int sp_ses_mgr_cancel_all(sp_ses_mgr_t *mgr)
- {
-     int i;
-     mgr_lock(mgr);
-     for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
-         sp_ses_uas_t *tpos;
-         struct hlist_node *pos, *n;
-         hlist_for_each_entry_safe(tpos, pos, n, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) {
-             sp_ses_uas_close(tpos);
-         }
-     }
-     for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
-         sp_ses_uac_t *tpos;
-         struct hlist_node *pos, *n;
-         hlist_for_each_entry_safe(tpos, pos, n, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) {
-             sp_ses_uac_close(tpos);
-         }
-     }
-     mgr_unlock(mgr);
-     return 0;
- }
- /* Return the manager's current session count. The value is read without taking the manager lock. */
- int sp_ses_mgr_get_conn_cnt(sp_ses_mgr_t *mgr)
- {
-     int count = mgr->ses_cnt;
-     return count;
- }
- /*
-  * Register the manager's packet handler (for SP_PKT_SES) and, if that
-  * succeeds, its bus-state handler on the service. The manager pointer
-  * doubles as the handler key.
-  * Returns 0 on success, otherwise the code of the registration that failed.
-  */
- int sp_ses_mgr_start(sp_ses_mgr_t *mgr)
- {
-     int rc = sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES, &mgr_on_pkt, mgr);
-     if (rc != 0)
-         return rc;
-     return sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr);
- }
- /*
-  * Unregister the manager's packet handler and, if that succeeds, its
-  * bus-state handler from the service.
-  * Returns 0 on success, otherwise the code of the removal that failed.
-  */
- int sp_ses_mgr_stop(sp_ses_mgr_t *mgr)
- {
-     int rc = sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES);
-     if (rc != 0)
-         return rc;
-     return sp_svc_remove_sys_handler(mgr->svc, (int)mgr);
- }
- /* Return the service the manager was created on. */
- sp_svc_t *sp_ses_mgr_get_svc(sp_ses_mgr_t *mgr)
- {
-     sp_svc_t *service = mgr->svc;
-     return service;
- }
- /*
-  * Final teardown, named as the destructor in the ref-count implementation
-  * macro below: notifies on_destroy (when set), then releases the critical
-  * section and the manager itself. All sessions are expected to be gone.
-  */
- static void __sp_ses_mgr_destroy(sp_ses_mgr_t *mgr)
- {
-     if (mgr->cb.on_destroy != NULL) {
-         mgr->cb.on_destroy(mgr, mgr->cb.user_data);
-     }
-     assert(mgr->ses_cnt == 0);
-     DeleteCriticalSection(&mgr->lock);
-     free(mgr);
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_mgr, sp_ses_mgr_t, ref_cnt, __sp_ses_mgr_destroy)
- /* Take the client session's spinlock; -1 is passed as the second argument of spinlock_enter. */
- static __inline void uac_lock(sp_ses_uac_t *ses)
- {
-     spinlock_t *guard = &ses->lock;
-     spinlock_enter(guard, -1);
- }
- /* Release the client session's spinlock. */
- static __inline void uac_unlock(sp_ses_uac_t *ses)
- {
-     spinlock_t *guard = &ses->lock;
-     spinlock_leave(guard);
- }
- /*
-  * Handle a connect acknowledgement for a client session.
-  * CONNECTING -> CONNECTED: the connect timer is cancelled and on_connect is
-  * fired with error 0. In any other state the ack is unexpected and an ERRC
-  * is posted back to the sender. Runs under the session lock.
-  */
- static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id)
- {
-     uac_lock(uac);
-     if (uac->state != SP_SES_STATE_CONNECTING) {
-         sp_dbg_warn("current state is not connecting!");
-         sp_svc_post(uac->mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
-     } else {
-         uac->state = SP_SES_STATE_CONNECTED;
-         uac->state_begin_time = y2k_time_now();
-         if (uac->op_timer != NULL) {
-             sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
-             uac->op_timer = NULL;
-         }
-         uac_trigger(uac, 0, 1);
-     }
-     uac_unlock(uac);
- }
- /*
-  * Handle a redirect for a connecting client session.
-  *
-  * uac              session being connected; ignored unless it is CONNECTING.
-  * conn_id          connection id reused for the re-sent connect.
-  * redirect_ent_id  index of the entity the peer asked us to connect to.
-  * p_pkt            connect packet to forward to the new target.
-  * epid, svc_id     accepted but not used.
-  *
-  * The connect is failed through on_connect when
-  *   - the redirect count reaches MAX_REDIRECT (Error_NetBroken), or
-  *   - the target entity is unknown, killed, lost or not started
-  *     (Error_InvalidState).
-  * Otherwise the session is re-pointed at the target and the connect packet
-  * is posted there. Runs under the session lock.
-  */
- static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt)
- {
-     uac_lock(uac);
-     if (uac->state == SP_SES_STATE_CONNECTING) {
-         ++uac->redirect_cnt;
-         if (uac->redirect_cnt >= MAX_REDIRECT) {
-             sp_dbg_warn("uac has exceed max_direct value, maybe redirection enter end loop!");
-             if (uac->op_timer) {
-                 sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
-                 uac->op_timer = NULL;
-             }
-             uac->state = SP_SES_STATE_ERROR;
-             uac->state_begin_time = y2k_time_now();
-             uac_trigger(uac, Error_NetBroken, 1);
-         } else {
-             sp_entity_t *ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, redirect_ent_id);
-             /* The id comes from the peer, so the lookup may fail; treat an
-              * unknown entity like one that is in an unusable state. */
-             if (!ent || ent->state == EntityState_Killed || ent->state == EntityState_Lost || ent->state == EntityState_NoStart) {
-                 sp_dbg_warn("redirected failed! redirect ent state invalid! redirect id:%d", redirect_ent_id);
-                 if (uac->op_timer) {
-                     sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
-                     uac->op_timer = NULL;
-                 }
-                 uac->state = SP_SES_STATE_ERROR;
-                 uac->state_begin_time = y2k_time_now();
-                 uac_trigger(uac, Error_InvalidState, 1);
-             } else {
-                 uac->remote_epid = ent->mod->cfg->idx;
-                 uac->remote_svc_id = ent->cfg->idx;
-                 sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, conn_id, p_pkt);
-                 sp_dbg_info("svc_post, epid:%d, svc_id:%d", uac->remote_epid, uac->remote_svc_id);
-             }
-         }
-     }
-     uac_unlock(uac);
- }
- /*
-  * Move a client session into SP_SES_STATE_ERROR because of `error`.
-  *   INIT       -> ERROR, nobody is notified.
-  *   CONNECTING -> ERROR, connect timer cancelled, on_connect notified.
-  *   CONNECTED  -> ERROR, every transaction is failed, then on_close is notified.
-  * Any other state is left untouched. Runs under the session lock.
-  */
- static void uac_process_errs(sp_ses_uac_t *uac, int error)
- {
-     int notify = 0;
-     int via_connect = 0;
-     int prev;
-     uac_lock(uac);
-     prev = uac->state;
-     if (prev == SP_SES_STATE_INIT || prev == SP_SES_STATE_CONNECTING || prev == SP_SES_STATE_CONNECTED) {
-         /* the three live states share the same bookkeeping */
-         uac->state = SP_SES_STATE_ERROR;
-         uac->state_begin_time = y2k_time_now();
-         uac->error = error;
-     }
-     if (prev == SP_SES_STATE_CONNECTING) {
-         notify = 1;
-         via_connect = 1;
-         if (uac->op_timer != NULL) {
-             sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
-             uac->op_timer = NULL;
-         }
-     } else if (prev == SP_SES_STATE_CONNECTED) {
-         sp_tsx_uac_t *cur, *next;
-         list_for_each_entry_safe(cur, next, &uac->tsx_list, sp_tsx_uac_t, entry) {
-             tsx_uac_process_errs(cur, error);
-         }
-         notify = 1;
-     }
-     if (notify) {
-         uac_trigger(uac, error, via_connect);
-     }
-     uac_unlock(uac);
- }
- /*
-  * Thread-pool work item queued by uac_trigger: delivers on_close for a client
-  * session inside a fresh framework run-serial context.
-  *
-  * arg     the session; the reference taken by uac_trigger is released here.
-  * param1  error code handed to on_close.
-  *
-  * The service pointer is captured up front: releasing the session reference
-  * may destroy the session, which in turn releases its manager reference, so
-  * neither `uac` nor `mgr` may be dereferenced after the dec_ref.
-  */
- static void __threadpool_uac_trigger_close(threadpool_t *threadpool, void *arg, int param1, int param2)
- {
-     sp_ses_uac_t *uac = (sp_ses_uac_t*)arg;
-     sp_svc_t *svc = uac->mgr->svc;
-
-     int error = param1;
-     sp_uid_t rsn = sp_svc_new_runserial(svc);
-     sp_rsn_context_t rsn_ctx;
-     sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx);
-     sp_svc_push_runserial_context(svc, &rsn_ctx);
-     uac->cb.on_close(uac, error, uac->cb.user_data);
-     sp_ses_uac_dec_ref(uac); // @ balances the inc_ref done before queuing
-     sp_svc_pop_runserial_context(svc);
- }
- /*
-  * Notify the session's user.
-  *   connect != 0: call on_connect and release one session reference.
-  *   connect == 0: call on_close. When running on the iom poll thread the
-  *                 callback is deferred to the thread pool (holding a
-  *                 reference until it has run); otherwise it is called inline.
-  */
- static void uac_trigger(sp_ses_uac_t *uac, int error, int connect)
- {
-     int rc;
-     if (connect) {
-         uac->cb.on_connect(uac, error, uac->cb.user_data);
-         sp_ses_uac_dec_ref(uac); // @
-         return;
-     }
-     if (sp_iom_get_poll_thread_id(sp_svc_get_iom(uac->mgr->svc)) != GetCurrentThreadId()) {
-         uac->cb.on_close(uac, error, uac->cb.user_data);
-         return;
-     }
-     sp_ses_uac_inc_ref(uac); // @ held for the queued work item
-     rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uac->mgr->svc), NULL, &__threadpool_uac_trigger_close, uac, error, 0);
-     if (rc != 0) {
-         sp_ses_uac_dec_ref(uac); // @ work item will never run
-         sp_dbg_warn("thread pool queue uac_trirget_close event failed!");
-     }
- }
- static void uac_on_timer(timer_queue_t *q, timer_entry *timer, int err) /* Connect-timeout timer callback installed by sp_ses_uac_async_connect. If the timer fired (err == 0), is still the session's current timer and the session is still CONNECTING, the session moves to ERROR and on_connect is notified with Error_TimeOut. In every case the timer's session reference is released and the timer entry freed. */
- {
- sp_ses_uac_t *uac = (sp_ses_uac_t*)timer->user_data;
- if (!err && uac->op_timer == timer && uac->state == SP_SES_STATE_CONNECTING) { // unlocked pre-check; both conditions are re-tested under the lock
- int trigger = 0;
- uac_lock(uac);
- if (uac->op_timer == timer) {
- if (uac->state == SP_SES_STATE_CONNECTING) {
- uac->state = SP_SES_STATE_ERROR; // time out
- uac->state_begin_time = y2k_time_now();
- trigger ++;
- }
- uac->op_timer = NULL; // the entry is freed below, so it must not stay reachable
- }
- if (trigger)
- uac_trigger(uac, Error_TimeOut, 1); // connect = 1: reported through on_connect
- uac_unlock(uac);
- }
- sp_ses_uac_dec_ref(uac); // @2 reference taken when the timer was scheduled
- free(timer);
- }
- /*
-  * Create a client-side session towards (remote_epid, remote_svc_id) and
-  * register it on the manager.
-  *
-  * cb     callback table, copied into the session; on_connect is mandatory.
-  * p_ses  out: the new session, in SP_SES_STATE_INIT.
-  *
-  * Returns 0 on success, Error_Param when cb or cb->on_connect is missing.
-  * The session takes a reference on the manager and gets one extra reference
-  * on itself on top of REF_COUNT_INIT; sp_ses_uac_destroy releases two.
-  */
- int sp_ses_uac_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, sp_ses_uac_callback *cb, sp_ses_uac_t **p_ses)
- {
-     sp_ses_uac_t *uac;
-     struct hlist_head *bucket;
-     if (cb == NULL || cb->on_connect == NULL)
-         return Error_Param;
-     uac = MALLOC_T(sp_ses_uac_t);
-     /* identity */
-     uac->mgr = mgr;
-     uac->conn_id = sp_env_new_id(sp_get_env());
-     uac->remote_svc_id = remote_svc_id;
-     uac->remote_epid = remote_epid;
-     /* state machine */
-     uac->state = SP_SES_STATE_INIT;
-     uac->state_begin_time = y2k_time_now();
-     uac->error = 0;
-     uac->op_timer = NULL;
-     uac->tsx_cnt = 0;
-     uac->begin_time = y2k_time_now();
-     uac->redirect_cnt = 0;
-     memcpy(&uac->cb, cb, sizeof(uac->cb));
-     /* containers, lock and reference counter */
-     INIT_HLIST_NODE(&uac->hentry);
-     REF_COUNT_INIT(&uac->ref_cnt);
-     spinlock_init(&uac->lock);
-     INIT_LIST_HEAD(&uac->tsx_list);
-     sp_ses_mgr_inc_ref(mgr);
-     bucket = &mgr->uac_buckets[uac->conn_id % CONN_BUCKET_SIZE];
-     sp_ses_uac_inc_ref(uac);
-     mgr_lock(mgr);
-     hlist_add_head(&uac->hentry, bucket);
-     mgr->ses_cnt++;
-     mgr_unlock(mgr);
-     *p_ses = uac;
-     sp_dbg_info("ses uac created!");
-     return 0;
- }
- /*
-  * Unregister a terminated client session from its manager and release two
-  * references on it. The session must already be in SP_SES_STATE_TERM
-  * (asserted).
-  */
- void sp_ses_uac_destroy(sp_ses_uac_t *uac)
- {
-     sp_ses_mgr_t *owner = uac->mgr;
-     assert(uac->state == SP_SES_STATE_TERM);
-     mgr_lock(owner);
-     owner->ses_cnt--;
-     hlist_del_init(&uac->hentry);
-     mgr_unlock(owner);
-     /* two releases, matching the two references set up in sp_ses_uac_create */
-     sp_ses_uac_dec_ref(uac);
-     sp_ses_uac_dec_ref(uac);
- }
- /*
-  * Start connecting a client session.
-  *
-  * uac       session in SP_SES_STATE_INIT.
-  * timeout   connect timeout handed to the timer; a negative value means no
-  *           timer is armed.
-  * conn_pkt  connect payload; a 24-byte run-serial header (depth,
-  *           original_type, current_rsn, previous_rsn) is prepended before
-  *           sending and popped again on failure when the packet is still
-  *           available.
-  *
-  * Returns 0 when the connect was sent (state becomes CONNECTING; the outcome
-  * is reported through on_connect), Error_Unexpect when the session is not in
-  * INIT, Error_IO when the send failed, or the timer scheduling error.
-  *
-  * The state is only rewritten when this call actually performed the attempt.
-  * A caller that loses the race on the locked re-check gets Error_Unexpect
-  * and leaves the session's current state alone.
-  */
- int sp_ses_uac_async_connect(sp_ses_uac_t *uac, int timeout, iobuffer_t **conn_pkt)
- {
-     int rc;
-     if (uac->state != SP_SES_STATE_INIT)
-         return Error_Unexpect;
-     sp_ses_uac_inc_ref(uac);
-     uac_lock(uac);
-     if (uac->state == SP_SES_STATE_INIT) {
-         sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
-         sp_ses_uac_inc_ref(uac); // @1 released when on_connect is delivered
-         if (rsn_ctx) {
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
-         } else {
-             /* no active run-serial context: send an all-zero header */
-             unsigned __int64 rsn = 0;
-             int t = 0;
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0);
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0);
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0);
-             iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0);
-         }
-         rc = sp_svc_send(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, uac->conn_id, conn_pkt);
-         sp_dbg_info("svc_send, epid:%d, svc_id:%d", uac->remote_epid, uac->remote_svc_id);
-         if (rc == 0) {
-             if (timeout >= 0) {
-                 uac->op_timer = MALLOC_T(timer_entry);
-                 uac->op_timer->cb = &uac_on_timer;
-                 uac->op_timer->user_data = uac;
-                 sp_ses_uac_inc_ref(uac); // @2 released by uac_on_timer
-                 rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, (unsigned int)timeout);
-                 if (rc != 0) {
-                     /* NOTE(review): the send succeeded, so the packet may
-                      * already have been consumed - only strip the header if
-                      * the caller still holds it. */
-                     if (*conn_pkt)
-                         iobuffer_pop_count(*conn_pkt, 24);
-                     sp_ses_uac_dec_ref(uac); // @2
-                     sp_ses_uac_dec_ref(uac); // @1
-                     free(uac->op_timer);
-                     uac->op_timer = NULL;
-                     sp_dbg_warn("post conn, create timer failed!");
-                 }
-             }
-         } else {
-             iobuffer_pop_count(*conn_pkt, 24);
-             sp_ses_uac_dec_ref(uac); // @1
-             sp_dbg_warn("uac issue async connect failed, post msg failed, may be remote party offline!");
-             rc = Error_IO;
-         }
-         uac->state = rc ? SP_SES_STATE_ERROR : SP_SES_STATE_CONNECTING;
-         uac->state_begin_time = y2k_time_now();
-     } else {
-         sp_dbg_warn("uac state is not INIT !");
-         rc = Error_Unexpect;
-     }
-     uac_unlock(uac);
-     sp_ses_uac_dec_ref(uac);
-     return rc;
- }
- int sp_ses_uac_close(sp_ses_uac_t *uac) /* Close a client session: moves it to SP_SES_STATE_TERM from INIT, CONNECTING, CONNECTED or ERROR, notifies the user where needed, and posts SES_CMD_FINC to the peer. Returns 0 on success, Error_Duplication when the session is in any other state (nothing is posted then). */
- {
- int rc = 0;
- int trigger = 0;
- int connected = 0;
- sp_dbg_info("sp_ses_uac_close invoked!");
- sp_ses_uac_inc_ref(uac); // keep the session alive for the duration of this call
- uac_lock(uac);
- if (uac->state == SP_SES_STATE_INIT) {
- uac->state = SP_SES_STATE_TERM; // never connected: no callback
- uac->state_begin_time = y2k_time_now();
- } else if (uac->state == SP_SES_STATE_CONNECTING) {
- trigger++;
- uac->state = SP_SES_STATE_TERM;
- uac->state_begin_time = y2k_time_now();
- if (uac->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
- uac->op_timer = NULL;
- }
- connected++; // despite the name this selects the on_connect path of uac_trigger (pending connect is failed with Error_Closed)
- } else if (uac->state == SP_SES_STATE_CONNECTED) {
- sp_tsx_uac_t *pos, *n;
- uac->state = SP_SES_STATE_TERM;
- uac->state_begin_time = y2k_time_now();
- list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) { // close every transaction still attached to the session
- sp_tsx_uac_close(pos);
- }
- trigger++; // connected stays 0, so on_close is used
- } else if (uac->state == SP_SES_STATE_ERROR) {
- uac->state = SP_SES_STATE_TERM; // no callback on this path
- uac->state_begin_time = y2k_time_now();
- } else {
- sp_dbg_warn("uac->state = %d", uac->state);
- rc = Error_Duplication;
- }
- if (trigger)
- uac_trigger(uac, Error_Closed, connected);
- uac_unlock(uac);
- if (rc == 0) {
- sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_FINC, uac->conn_id, NULL); // tell the peer the session is closed; posted after the lock is dropped
- }
- sp_ses_uac_dec_ref(uac);
- return rc;
- }
- /* Return the session's current SP_SES_STATE_* value. The session lock is not taken. */
- int sp_ses_uac_get_state(sp_ses_uac_t *uac)
- {
-     int current = uac->state;
-     return current;
- }
- /*
-  * Post a one-way call (SES_CMD_INFO) on a connected client session.
-  *
-  * The packet is prefixed, front to back, with: method_id, method_sig, then
-  * the run-serial header (depth, original_type, current_rsn, previous_rsn).
-  * The header is all zeroes when there is no active run-serial context.
-  *
-  * Returns the result of sp_svc_post, or Error_IO when the session is not in
-  * SP_SES_STATE_CONNECTED (the packet is left untouched in that case).
-  */
- int sp_ses_uac_send_info(sp_ses_uac_t *uac, int method_id, int method_sig, iobuffer_t **info_pkt)
- {
-     sp_rsn_context_t *ctx;
-     if (uac->state != SP_SES_STATE_CONNECTED) {
-         sp_dbg_info("send info failed, uac is not in connected state!");
-         return Error_IO;
-     }
-     ctx = sp_svc_get_runserial_context(uac->mgr->svc);
-     /* write_head prepends, so fields are pushed in reverse wire order */
-     if (ctx != NULL) {
-         iobuffer_write_head(*info_pkt, IOBUF_T_I8, &ctx->previous_rsn, 0);
-         iobuffer_write_head(*info_pkt, IOBUF_T_I8, &ctx->current_rsn, 0);
-         iobuffer_write_head(*info_pkt, IOBUF_T_I4, &ctx->original_type, 0);
-         iobuffer_write_head(*info_pkt, IOBUF_T_I4, &ctx->depth, 0);
-     } else {
-         unsigned __int64 zero_rsn = 0;
-         int zero = 0;
-         iobuffer_write_head(*info_pkt, IOBUF_T_I8, &zero_rsn, 0);
-         iobuffer_write_head(*info_pkt, IOBUF_T_I8, &zero_rsn, 0);
-         iobuffer_write_head(*info_pkt, IOBUF_T_I4, &zero, 0);
-         iobuffer_write_head(*info_pkt, IOBUF_T_I4, &zero, 0);
-     }
-     iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0);
-     iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_id, 0);
-     return sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_INFO, uac->conn_id, info_pkt);
- }
- /* Return the endpoint id of the session's peer. */
- int sp_ses_uac_get_remote_epid(sp_ses_uac_t *uac)
- {
-     int peer_epid = uac->remote_epid;
-     return peer_epid;
- }
- /* Return the service id of the session's peer. */
- int sp_ses_uac_get_remote_svc_id(sp_ses_uac_t *uac)
- {
-     int peer_svc = uac->remote_svc_id;
-     return peer_svc;
- }
- /* Return the session's connection id (stored unsigned, reported as int). */
- int sp_ses_uac_get_conn_id(sp_ses_uac_t *uac)
- {
-     int id = uac->conn_id;
-     return id;
- }
- /* Return the manager the session is registered on. No reference is added. */
- sp_ses_mgr_t *sp_ses_uac_get_mgr(sp_ses_uac_t *uac)
- {
-     sp_ses_mgr_t *owner = uac->mgr;
-     return owner;
- }
- /* Return the session's transaction counter. */
- int sp_ses_uac_get_tsx_cnt(sp_ses_uac_t *uac)
- {
-     int count = uac->tsx_cnt;
-     return count;
- }
- /*
-  * Final teardown, named as the destructor in the ref-count implementation
-  * macro below: notifies on_destroy (when set), gives back the manager
-  * reference taken in sp_ses_uac_create and frees the session.
-  */
- static void __sp_ses_uac_destroy(sp_ses_uac_t *uac)
- {
-     if (uac->cb.on_destroy != NULL) {
-         uac->cb.on_destroy(uac, uac->cb.user_data);
-     }
-     sp_ses_mgr_dec_ref(uac->mgr);
-     free(uac);
-     sp_dbg_info("ses uac destroyed!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uac, sp_ses_uac_t, ref_cnt, __sp_ses_uac_destroy)
- /* Take the client transaction's spinlock; -1 is passed as the second argument of spinlock_enter. */
- static __inline void tsx_uac_lock(sp_tsx_uac_t *tsx)
- {
-     spinlock_t *guard = &tsx->lock;
-     spinlock_enter(guard, -1);
- }
- /* Release the client transaction's spinlock. */
- static __inline void tsx_uac_unlock(sp_tsx_uac_t *tsx)
- {
-     spinlock_t *guard = &tsx->lock;
-     spinlock_leave(guard);
- }
- /*
-  * Deliver an answer or an error to the transaction's on_ans callback.
-  *
-  * error    0 for a normal answer, otherwise the failure code.
-  * end      non-zero when this is the last notification for the transaction.
-  * ans_pkt  answer packet, passed through to the callback; may be NULL.
-  *
-  * When the notification is terminal (error or end), one transaction
-  * reference is released afterwards.
-  */
- static void tsx_uac_trigger(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt)
- {
-     tsx->cb.on_ans(tsx, error, end, ans_pkt, tsx->cb.user_data);
-     if (error || end)
-         sp_tsx_uac_dec_ref(tsx); // @
- }
- // Timer callback armed by sp_tsx_uac_async_req for the request timeout.
- // timer->user_data carries the transaction. Whatever the outcome, the
- // reference tagged "@2" is released and the timer entry is freed here.
- // NOTE(review): the first state/op_timer test runs without the lock and is
- // only re-validated once the lock is held; if it fails, op_timer is left
- // untouched even though the timer entry is freed below - confirm intended.
- static void tsx_uac_on_timer(timer_queue_t *q, timer_entry *timer, int err)
- {
-     sp_tsx_uac_t *tsx = (sp_tsx_uac_t*)timer->user_data;
-     int candidate = !err && tsx->op_timer == timer && tsx->state == SP_TSX_UAC_STATE_REQ;
-     if (candidate) {
-         int timed_out = 0;
-         tsx_uac_lock(tsx);
-         if (tsx->op_timer == timer) {
-             if (tsx->state == SP_TSX_UAC_STATE_REQ) {
-                 tsx->state = SP_TSX_UAC_STATE_ERROR; // time out
-                 timed_out = 1;
-             }
-             tsx->op_timer = NULL;
-         }
-         if (timed_out) {
-             tsx_uac_trigger(tsx, Error_TimeOut, 1, NULL);
-         }
-         tsx_uac_unlock(tsx);
-     }
-     sp_tsx_uac_dec_ref(tsx); // @2
-     free(timer);
- }
- // Handles an answer packet for a UAC transaction.
- //   end     - non-zero for the final answer, zero for an intermediate one
- //   ans_pkt - answer packet, handed through to the owner's on_ans callback
- // Only valid while the transaction is in REQ or TMPANS state: the pending
- // timeout timer (if any) is cancelled, the state advances to ANS (final)
- // or TMPANS (intermediate) and the owner is notified. In any other state
- // a warning is logged and SES_CMD_ERRC is posted to the peer.
- static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt)
- {
-     int trigger = 0;
-     tsx_uac_lock(tsx);
-     if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
-         if (tsx->op_timer) {
-             sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
-             tsx->op_timer = NULL;
-         }
-         // The previous code tested for STATE_INIT here, which can never hold
-         // inside this branch, so an intermediate answer left the state at REQ.
-         tsx->state = end ? SP_TSX_UAC_STATE_ANS : SP_TSX_UAC_STATE_TMPANS;
-         trigger++;
-     } else {
-         sp_dbg_warn("tsx uac current state cannot recv ack!");
-         sp_svc_post(tsx->uac->mgr->svc, tsx->uac->remote_epid, tsx->uac->remote_svc_id, SP_PKT_SES|SES_CMD_ERRC, tsx->uac->conn_id, NULL);
-     }
-     if (trigger)
-         tsx_uac_trigger(tsx, 0, end, ans_pkt);
-     tsx_uac_unlock(tsx);
- }
- static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error)
- {
- int trigger = 0;
- sp_tsx_uac_inc_ref(tsx);
- tsx_uac_lock(tsx);
- if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
- trigger++;
- if (tsx->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
- tsx->op_timer = NULL;
- }
- }
- if (tsx->state != SP_TSX_UAC_STATE_TERM)
- tsx->state = SP_TSX_UAC_STATE_ERROR;
- if (trigger)
- tsx_uac_trigger(tsx, error, 1, NULL);
- tsx_uac_unlock(tsx);
- sp_tsx_uac_dec_ref(tsx);
- }
- // Creates a UAC transaction on session `uac` and registers it both in the
- // session's tsx_list and in the manager's uac_tsx_buckets hash.
- //   cb     - callbacks; cb and cb->on_ans are required
- //   p_tsx  - receives the new transaction
- // Returns 0 on success, Error_Param for missing callbacks, Error_NotInit
- // when the session is INIT/CONNECTING and Error_NetBroken when it is
- // TERM/ERROR.
- int sp_tsx_uac_create(sp_ses_uac_t *uac, int tsx_id, int method_id, int method_sig, sp_tsx_uac_callback *cb, sp_tsx_uac_t **p_tsx)
- {
-     sp_tsx_uac_t *tsx;
-     unsigned int slot;
-     int ses_state;
-     if (cb == NULL || cb->on_ans == NULL) {
-         return Error_Param;
-     }
-     // Sample the session state once (unlocked), as the original switch did.
-     ses_state = uac->state;
-     if (ses_state == SP_SES_STATE_INIT || ses_state == SP_SES_STATE_CONNECTING) {
-         return Error_NotInit;
-     }
-     if (ses_state == SP_SES_STATE_TERM || ses_state == SP_SES_STATE_ERROR) {
-         return Error_NetBroken;
-     }
-     tsx = MALLOC_T(sp_tsx_uac_t);
-     tsx->state = SP_TSX_UAC_STATE_INIT;
-     tsx->tsx_id = tsx_id;
-     tsx->method_id = method_id;
-     tsx->method_sig = method_sig;
-     tsx->uac = uac;
-     tsx->op_timer = NULL;
-     memcpy(&tsx->cb, cb, sizeof(sp_tsx_uac_callback));
-     spinlock_init(&tsx->lock);
-     INIT_HLIST_NODE(&tsx->hentry);
-     REF_COUNT_INIT(&tsx->ref_cnt);
-     tsx->strand = strand_create();
-     // The transaction holds one reference on its session.
-     sp_ses_uac_inc_ref(uac);
-     // One transaction reference for membership in the session's list ...
-     sp_tsx_uac_inc_ref(tsx);
-     uac_lock(uac);
-     list_add_tail(&tsx->entry, &uac->tsx_list);
-     uac->tsx_cnt++;
-     uac_unlock(uac);
-     // ... and one for membership in the manager's hash table.
-     slot = jhash_3words(uac->remote_svc_id, uac->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
-     sp_tsx_uac_inc_ref(tsx);
-     mgr_lock(uac->mgr);
-     hlist_add_head(&tsx->hentry, &uac->mgr->uac_tsx_buckets[slot]);
-     mgr_unlock(uac->mgr);
-     *p_tsx = tsx;
-     sp_dbg_info("tsx uac created!");
-     return 0;
- }
- // Unregisters a closed (TERM) UAC transaction from its session list and
- // from the manager hash, releasing three references in total: one after
- // each unlink and one more at the end (the counterparts of the two
- // inc_ref calls plus REF_COUNT_INIT in sp_tsx_uac_create).
- void sp_tsx_uac_destroy(sp_tsx_uac_t *tsx)
- {
-     sp_ses_uac_t *uac = tsx->uac;
-     assert(tsx->state == SP_TSX_UAC_STATE_TERM);
-     // Detach from the session.
-     uac_lock(uac);
-     uac->tsx_cnt--;
-     list_del_init(&tsx->entry);
-     uac_unlock(uac);
-     sp_tsx_uac_dec_ref(tsx);
-     // Detach from the manager's transaction hash.
-     mgr_lock(uac->mgr);
-     hlist_del_init(&tsx->hentry);
-     mgr_unlock(uac->mgr);
-     sp_tsx_uac_dec_ref(tsx);
-     // Final release.
-     sp_tsx_uac_dec_ref(tsx);
- }
- // Sends the request of a UAC transaction and, when timeout >= 0, arms a
- // timeout timer for it.
- //   timeout - passed to the peer in the header and, if >= 0, to the timer
- //   req_pkt - request packet; the run-serial context, timeout, method
- //             signature, method id and transaction id are written in
- //             front of it before it is posted
- // Returns 0 on success, Error_Unexpect if the transaction is not in INIT
- // state, Error_NetBroken if the session is not connected, Error_IO if the
- // post fails, or the timer scheduling error.
- // The state moves to REQ on success and to ERROR when this call's own send
- // or timer setup fails. If the locked re-check finds that the transaction
- // already left INIT, the state is left untouched (previously it was
- // overwritten with ERROR, clobbering a state set by another caller).
- int sp_tsx_uac_async_req(sp_tsx_uac_t *tsx, int timeout, iobuffer_t **req_pkt)
- {
-     int rc;
-     sp_ses_uac_t *uac;
-     if (tsx->state != SP_TSX_UAC_STATE_INIT)
-         return Error_Unexpect;
-     uac = tsx->uac;
-     if (uac->state != SP_SES_STATE_CONNECTED)
-         return Error_NetBroken;
-     tsx_uac_lock(tsx);
-     if (tsx->state == SP_TSX_UAC_STATE_INIT) {
-         sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
-         if (rsn_ctx) {
-             iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
-             iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
-             iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
-             iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
-         } else {
-             // No run-serial context on this thread: send an all-zero one.
-             unsigned __int64 rsn = 0;
-             int t = 0;
-             iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0);
-             iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0);
-             iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0);
-             iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0);
-         }
-         iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0);
-         iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_sig, 0);
-         iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_id, 0);
-         iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->tsx_id, 0);
-         sp_tsx_uac_inc_ref(tsx); // @1
-         rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_REQ, uac->conn_id, req_pkt);
-         if (rc == 0) {
-             if (timeout >= 0) {
-                 tsx->op_timer = MALLOC_T(timer_entry);
-                 tsx->op_timer->cb = &tsx_uac_on_timer;
-                 tsx->op_timer->user_data = tsx;
-                 sp_tsx_uac_inc_ref(tsx); // @2
-                 rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), tsx->op_timer, (unsigned int)timeout);
-                 if (rc != 0) {
-                     sp_tsx_uac_dec_ref(tsx); // @2
-                     sp_tsx_uac_dec_ref(tsx); // @1
-                     free(tsx->op_timer);
-                     tsx->op_timer = NULL;
-                     sp_dbg_warn("uac tsx async_req, create timer failed!");
-                 }
-             }
-         } else {
-             sp_tsx_uac_dec_ref(tsx); // @1
-             sp_dbg_warn("tsx uac issue async req failed, may be remote party offline!");
-             rc = Error_IO;
-         }
-         // Only the caller that actually owned the INIT -> REQ transition
-         // records the outcome.
-         tsx->state = rc ? SP_TSX_UAC_STATE_ERROR : SP_TSX_UAC_STATE_REQ;
-     } else {
-         rc = Error_Unexpect;
-         sp_dbg_warn("tsx uac state is not INIT!");
-     }
-     tsx_uac_unlock(tsx);
-     return rc;
- }
- // Moves a UAC transaction to TERM state. If a request was still
- // outstanding (REQ/TMPANS) its timer is cancelled and the owner receives
- // a final Error_Closed notification. Always returns 0.
- int sp_tsx_uac_close(sp_tsx_uac_t *tsx)
- {
-     int notify;
-     tsx_uac_lock(tsx);
-     notify = (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS);
-     if (notify && tsx->op_timer) {
-         sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
-         tsx->op_timer = NULL;
-     }
-     if (tsx->state != SP_TSX_UAC_STATE_TERM) {
-         tsx->state = SP_TSX_UAC_STATE_TERM;
-     }
-     if (notify) {
-         tsx_uac_trigger(tsx, Error_Closed, 1, NULL);
-     }
-     tsx_uac_unlock(tsx);
-     return 0;
- }
- // Returns the UAC session this transaction was created on (no reference taken).
- sp_ses_uac_t *sp_tsx_uac_get_session(sp_tsx_uac_t *tsx)
- {
-     sp_ses_uac_t *session = tsx->uac;
-     return session;
- }
- // Returns the transaction's current state value (read without the lock).
- int sp_tsx_uac_get_state(sp_tsx_uac_t *tsx)
- {
-     int current = tsx->state;
-     return current;
- }
- // Returns the transaction id given to sp_tsx_uac_create.
- int sp_tsx_uac_get_id(sp_tsx_uac_t *tsx)
- {
-     int id = tsx->tsx_id;
-     return id;
- }
- // Destroy routine for a UAC transaction; it is registered with
- // IMPLEMENT_REF_COUNT_MT_STATIC on the line following this function.
- static void __sp_tsx_uac_destroy(sp_tsx_uac_t *tsx)
- {
-     // Let the owner release its user_data first.
-     if (tsx->cb.on_destroy != NULL) {
-         tsx->cb.on_destroy(tsx, tsx->cb.user_data);
-     }
-     // Give back the session reference taken in sp_tsx_uac_create.
-     sp_ses_uac_dec_ref(tsx->uac);
-     strand_destroy(tsx->strand);
-     free(tsx);
-     sp_dbg_info("tsx uac destroyed!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uac, sp_tsx_uac_t, ref_cnt, __sp_tsx_uac_destroy)
- // Enters the per-session spinlock of a UAS session.
- // The -1 is handed to spinlock_enter as-is (presumably "no spin limit" - TODO confirm).
- static __inline void uas_lock(sp_ses_uas_t *uas)
- {
-     spinlock_enter(&(uas->lock), -1);
- }
- // Leaves the per-session spinlock taken by uas_lock.
- static __inline void uas_unlock(sp_ses_uas_t *uas)
- {
-     spinlock_leave(&(uas->lock));
- }
- // Hashset key extractor: the key of a method_strand_t is its integer
- // `type`, carried inside a pointer-sized value.
- static const void* method_strand_getkey(const void *obj)
- {
-     method_strand_t *entry = (method_strand_t *)obj;
-     const void *key = (const void*)entry->type;
-     return key;
- }
- // Hashset hash function: the key value itself, converted to unsigned int.
- static unsigned int method_strand_hash(const void *key)
- {
-     unsigned int hash = (unsigned int)key;
-     return hash;
- }
- // Hashset key comparator for method_strand_t keys (integers carried in
- // pointer-sized values). Returns a negative value, zero or a positive value
- // when key_x is less than, equal to or greater than key_y.
- // The keys are compared rather than subtracted: `x - y` overflows (undefined
- // behaviour for signed int) when the operands are far apart.
- static int method_strand_cmpkey(const void *key_x, const void *key_y)
- {
-     int lhs = (int)key_x;
-     int rhs = (int)key_y;
-     return (lhs > rhs) - (lhs < rhs);
- }
- // Allocates a method_strand_t for method `type` with a freshly created
- // strand and an initialised (unlinked) hash node.
- static __inline method_strand_t* create_method_strand(int type)
- {
-     method_strand_t *entry = MALLOC_T(method_strand_t);
-     entry->strand = strand_create();
-     entry->type = type;
-     INIT_HLIST_NODE(&entry->node);
-     return entry;
- }
- // Destroys the strand owned by `ms` and frees the entry itself.
- // The caller is expected to have removed it from the hashset already.
- static __inline void delete_method_strand(method_strand_t *ms)
- {
-     strand_t *owned = ms->strand;
-     strand_destroy(owned);
-     free(ms);
- }
- static void uas_process_errc(sp_ses_uas_t *uas, int error)
- {
- int trigger = 0;
- uas_lock(uas);
- if (uas->state == SP_SES_STATE_INIT) {
- trigger++;
- uas->state = SP_SES_STATE_ERROR;
- uas->state_begin_time = y2k_time_now();
- uas->error = error;
- } else if (uas->state == SP_SES_STATE_CONNECTED) {
- trigger++;
- uas->state = SP_SES_STATE_ERROR;
- uas->state_begin_time = y2k_time_now();
- uas->error = error;
- }
- uas_unlock(uas);
- if (trigger)
- uas_trigger(uas, error);
- }
- // Thread-pool work item queued by uas_trigger: runs the session's on_close
- // callback inside a fresh framework-originated run-serial context.
- //   arg    - the UAS session (carries the reference taken in uas_trigger)
- //   param1 - error code to report
- // If an on_close callback is set and the error is not Error_Closed, the
- // session is closed via sp_ses_uas_close before the callback runs.
- static void __threadpool_uas_trigger(threadpool_t *threadpool, void *arg, int param1, int param2)
- {
-     sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
-     sp_ses_mgr_t *mgr = uas->mgr;
-     int error = param1;
-     sp_uid_t rsn = sp_svc_new_runserial(mgr->svc);
-     sp_rsn_context_t rsn_ctx;
-     sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx);
-     sp_svc_push_runserial_context(mgr->svc, &rsn_ctx);
-     if (uas->cb.on_close) {
-         if (error != Error_Closed) {
-             sp_ses_uas_close(uas);
-         }
-         uas->cb.on_close(uas, error, uas->cb.user_data);
-     }
-     // Pop the context before dropping our reference: releasing the last
-     // session reference also releases the session's manager reference, so
-     // `mgr` must not be dereferenced afterwards. This matches the order
-     // used by the other __threadpool_uas_* work items.
-     sp_svc_pop_runserial_context(mgr->svc);
-     sp_ses_uas_dec_ref(uas); // @
- }
- // Queues __threadpool_uas_trigger on the session's strand so that the
- // close/error notification is delivered from the thread pool.
- // A session reference is taken for the work item; it is given back here
- // if queuing fails, otherwise by the work item itself.
- static void uas_trigger(sp_ses_uas_t *uas, int error)
- {
-     threadpool_t *pool;
-     sp_ses_uas_inc_ref(uas); // @
-     pool = sp_svc_get_threadpool(uas->mgr->svc);
-     if (threadpool_queue_workitem2(pool, uas->strand, &__threadpool_uas_trigger, uas, error, 0) != 0) {
-         sp_ses_uas_dec_ref(uas); // @
-         sp_dbg_warn("uas_trigger, queue work item failed!");
-     }
- }
- // Looks up the strand dedicated to `method_id` in the session's
- // method_strand hashset, creating and inserting one on first use.
- // The lookup/insert is done under the session lock.
- static strand_t *uas_get_strand(sp_ses_uas_t *uas, int method_id)
- {
-     method_strand_t *entry;
-     uas_lock(uas);
-     entry = hashset_find(uas->method_strand, (const void *)method_id);
-     if (entry == NULL) {
-         entry = create_method_strand(method_id);
-         hashset_add(uas->method_strand, entry);
-     }
-     uas_unlock(uas);
-     return entry->strand;
- }
- // Thread-pool work item queued by uas_process_info.
- //   arg    - the UAS session (carries a reference, released at the end)
- //   param1 - method id
- //   param2 - the info packet pointer, passed through an int
- // The method signature is read back from the packet head. The packet is
- // delivered to on_info only while the session is CONNECTED, or in ERROR
- // state caused by Error_PeerClose; the sender's run-serial context is read
- // from the packet and a downstream context is pushed around the callback.
- // The packet is released here unless the callback cleared the pointer.
- static void __threadpool_uas_process_info(threadpool_t *threadpool, void *arg, int param1, int param2)
- {
-     sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
-     sp_ses_mgr_t *mgr = uas->mgr;
-     iobuffer_t *info_pkt = (iobuffer_t*)param2;
-     int method_id = param1;
-     int method_sig;
-     int deliverable;
-     iobuffer_read(info_pkt, IOBUF_T_I4, &method_sig, 0);
-     deliverable = uas->state == SP_SES_STATE_CONNECTED ||
-         (uas->state == SP_SES_STATE_ERROR && uas->error == Error_PeerClose);
-     if (deliverable) {
-         sp_rsn_context_t from_rsn;
-         sp_rsn_context_t rsn;
-         iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
-         iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
-         iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
-         iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
-         sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
-         sp_svc_push_runserial_context(mgr->svc, &rsn);
-         uas->cb.on_info(uas, method_id, method_sig, &info_pkt, uas->cb.user_data);
-         sp_svc_pop_runserial_context(mgr->svc);
-     }
-     if (info_pkt != NULL) {
-         iobuffer_dec_ref(info_pkt);
-     }
-     sp_ses_uas_dec_ref(uas);//@
- }
- // Chooses the strand a request/info for `method_id` is queued on:
- //   - overlapping methods get no strand (NULL);
- //   - otherwise the session-wide strand is used when the session has one;
- //   - otherwise the per-method strand from uas_get_strand.
- static strand_t *uas_decide_strand(sp_ses_uas_t *uas, int method_id, int overlap)
- {
-     if (overlap) {
-         return NULL;
-     }
-     if (uas->strand) {
-         return uas->strand;
-     }
-     return uas_get_strand(uas, method_id);
- }
- // Dispatches an incoming one-way (info) call to the thread pool.
- // The method's attributes are queried via cb.get_method_attr; on failure
- // the mismatch is logged and the packet is left with the caller. Otherwise
- // the method signature is written in front of the packet and the packet
- // is queued to __threadpool_uas_process_info together with a session
- // reference. On a successful queue *info_pkt is set to NULL (ownership
- // moves to the work item); on failure the reference is given back.
- static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt)
- {
-     int rc;
-     int overlap;
-     strand_t *strand;
-     rc = (*uas->cb.get_method_attr)(uas, method_id, method_sig, &overlap, uas->cb.user_data);
-     if (rc != 0) {
-         sp_dbg_error("method signature not match: method_id=%d, method_sig=%d, error=%d", method_id, method_sig, rc);
-         return;
-     }
-     strand = uas_decide_strand(uas, method_id, overlap);
-     iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0);
-     sp_ses_uas_inc_ref(uas); //@
-     rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_info, uas, method_id, (int)*info_pkt);
-     if (rc == 0) {
-         *info_pkt = NULL;
-     } else {
-         sp_dbg_warn("process info queue work item failed!");
-         sp_ses_uas_dec_ref(uas); //@
-     }
- }
- // Answers request `tsx_id` with a final SES_CMD_ANS packet that carries
- // only a system error (`err`, must be non-zero) and a zero user error,
- // preceded by the transaction id. The packet is released here unless
- // sp_svc_post cleared the pointer.
- static void uas_process_req_reply_error(sp_ses_uas_t *uas, int tsx_id, int err)
- {
-     int user_err = 0;
-     iobuffer_t *reply = iobuffer_create(-1, -1);
-     assert(err != 0);
-     iobuffer_write(reply, IOBUF_T_I4, &err, 0);      // sys error
-     iobuffer_write(reply, IOBUF_T_I4, &user_err, 0); // user error
-     iobuffer_write_head(reply, IOBUF_T_I4, &tsx_id, 0);
-     sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_ANS, uas->conn_id, &reply);
-     if (reply != NULL) {
-         iobuffer_dec_ref(reply);
-     }
- }
- // Thread-pool work item queued by uas_process_req.
- //   arg    - the UAS session (carries a reference, released at the end)
- //   param1 - transaction id
- //   param2 - the request packet pointer, passed through an int
- // Method id, signature and timeout are read back from the packet head.
- // While the session is CONNECTED the sender's run-serial context is read,
- // a downstream context is pushed and on_req is invoked; in any other
- // state the request is answered with Error_InvalidState.
- // The packet is released here unless the callback cleared the pointer.
- static void __threadpool_uas_process_req(threadpool_t *threadpool, void *arg, int param1, int param2)
- {
-     sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
-     sp_ses_mgr_t *mgr = uas->mgr;
-     iobuffer_t *req_pkt = (iobuffer_t*)param2;
-     int tsx_id = param1;
-     int method_id;
-     int method_sig;
-     int timeout;
-     iobuffer_read(req_pkt, IOBUF_T_I4, &method_id, 0);
-     iobuffer_read(req_pkt, IOBUF_T_I4, &method_sig, 0);
-     iobuffer_read(req_pkt, IOBUF_T_I4, &timeout, 0);
-     if (uas->state != SP_SES_STATE_CONNECTED) {
-         uas_process_req_reply_error(uas, tsx_id, Error_InvalidState);
-     } else {
-         sp_rsn_context_t from_rsn;
-         sp_rsn_context_t rsn;
-         iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
-         iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
-         iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
-         iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
-         sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
-         sp_svc_push_runserial_context(mgr->svc, &rsn);
-         uas->cb.on_req(uas, tsx_id, method_id, method_sig, timeout, &req_pkt, uas->cb.user_data);
-         sp_svc_pop_runserial_context(mgr->svc);
-     }
-     if (req_pkt != NULL) {
-         iobuffer_dec_ref(req_pkt);
-     }
-     sp_ses_uas_dec_ref(uas); //@
- }
- // Dispatches an incoming two-way request to the thread pool.
- // The method's attributes are queried via cb.get_method_attr; on failure
- // an error answer is sent to the peer and the mismatch is logged.
- // Otherwise timeout, method signature and method id are written in front
- // of the packet and it is queued to __threadpool_uas_process_req with a
- // session reference. On a successful queue *req_pkt is set to NULL; on
- // failure an error answer is sent and the reference is given back.
- static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
- {
-     int rc;
-     int overlap;
-     strand_t *strand;
-     rc = uas->cb.get_method_attr(uas, method_id, method_sig, &overlap, uas->cb.user_data);
-     if (rc != 0) {
-         uas_process_req_reply_error(uas, tsx_id, rc);
-         sp_dbg_error("method signature not match: tsx_id=%d, method_id=%d, method_sig=%d, error=%d", tsx_id, method_id, method_sig, rc);
-         return;
-     }
-     strand = uas_decide_strand(uas, method_id, overlap);
-     iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0);
-     iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_sig, 0);
-     iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_id, 0);
-     sp_ses_uas_inc_ref(uas); //@
-     rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_req, uas, tsx_id, (int)*req_pkt);
-     if (rc == 0) {
-         *req_pkt = NULL;
-     } else {
-         uas_process_req_reply_error(uas, tsx_id, rc);
-         sp_dbg_warn("process req queue work item failed!");
-         sp_ses_uas_dec_ref(uas); //@
-     }
- }
- // Creates a UAS session in INIT state and registers it in the manager's
- // uas_buckets hash (slot = conn_id modulo CONN_BUCKET_SIZE).
- //   overlap - zero: the session gets one strand of its own;
- //             non-zero: no session strand, a per-method strand hashset
- //             is created instead
- //   cb      - callbacks; cb, cb->on_req and cb->on_info are required
- //   p_uas   - receives the new session
- // Returns 0 on success or Error_Param when a required callback is missing.
- // The session takes one reference on the manager, and one extra session
- // reference is taken for the hash-table membership.
- int sp_ses_uas_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, int conn_id, int overlap, sp_ses_uas_callback *cb, sp_ses_uas_t **p_uas)
- {
-     unsigned int slot;
-     sp_ses_uas_t *uas;
-     if (cb == NULL || cb->on_req == NULL || cb->on_info == NULL) {
-         return Error_Param;
-     }
-     uas = MALLOC_T(sp_ses_uas_t);
-     uas->mgr = mgr;
-     uas->conn_id = conn_id;
-     uas->remote_epid = remote_epid;
-     uas->remote_svc_id = remote_svc_id;
-     spinlock_init(&uas->lock);
-     INIT_HLIST_NODE(&uas->hentry);
-     memcpy(&uas->cb, cb, sizeof(sp_ses_uas_callback));
-     uas->state = SP_SES_STATE_INIT;
-     uas->state_begin_time = y2k_time_now();
-     uas->error = 0;
-     uas->tsx_cnt = 0;
-     uas->begin_time = y2k_time_now();
-     if (overlap) {
-         uas->strand = NULL;
-     } else {
-         uas->strand = strand_create();
-     }
-     INIT_LIST_HEAD(&uas->tsx_list);
-     REF_COUNT_INIT(&uas->ref_cnt);
-     if (overlap) {
-         uas->method_strand = hashset_create(offset_of(method_strand_t, node), &method_strand_getkey, &method_strand_hash, &method_strand_cmpkey);
-     } else {
-         uas->method_strand = NULL;
-     }
-     sp_ses_mgr_inc_ref(mgr);
-     slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
-     sp_ses_uas_inc_ref(uas);
-     mgr_lock(mgr);
-     hlist_add_head(&uas->hentry, &mgr->uas_buckets[slot]);
-     mgr->ses_cnt++;
-     mgr_unlock(mgr);
-     *p_uas = uas;
-     sp_dbg_info("ses uas created!");
-     return 0;
- }
- // Unregisters a closed (TERM) UAS session from the manager's hash and
- // releases two references: the one held for the hash membership and one
- // more (the counterparts of sp_ses_uas_inc_ref and REF_COUNT_INIT in
- // sp_ses_uas_create).
- void sp_ses_uas_destroy(sp_ses_uas_t *uas)
- {
-     sp_ses_mgr_t *mgr = uas->mgr;
-     assert(uas->state == SP_SES_STATE_TERM);
-     mgr_lock(mgr);
-     mgr->ses_cnt--;
-     hlist_del_init(&uas->hentry);
-     mgr_unlock(mgr);
-     sp_ses_uas_dec_ref(uas);
-     sp_ses_uas_dec_ref(uas);
- }
- // Closes a UAS session.
- //   INIT      -> TERM, close notification queued
- //   CONNECTED -> TERM, close notification queued, every transaction in
- //                tsx_list is closed
- //   ERROR     -> TERM, no notification
- //   TERM      -> Error_Duplication
- //   other     -> Error_Unexpect
- // On success SES_CMD_FINS is posted to the peer after the lock is released.
- // A temporary session reference is held for the duration of the call.
- int sp_ses_uas_close(sp_ses_uas_t *uas)
- {
-     int rc = 0;
-     int notify = 0;
-     sp_ses_uas_inc_ref(uas);
-     uas_lock(uas);
-     switch (uas->state) {
-     case SP_SES_STATE_INIT:
-         uas->state = SP_SES_STATE_TERM;
-         uas->state_begin_time = y2k_time_now();
-         notify = 1;
-         break;
-     case SP_SES_STATE_CONNECTED: {
-         sp_tsx_uas_t *pos, *n;
-         uas->state = SP_SES_STATE_TERM;
-         uas->state_begin_time = y2k_time_now();
-         notify = 1;
-         list_for_each_entry_safe(pos, n, &uas->tsx_list, sp_tsx_uas_t, entry) {
-             sp_tsx_uas_close(pos);
-         }
-         break;
-     }
-     case SP_SES_STATE_ERROR:
-         uas->state = SP_SES_STATE_TERM;
-         uas->state_begin_time = y2k_time_now();
-         break;
-     case SP_SES_STATE_TERM:
-         rc = Error_Duplication;
-         break;
-     default:
-         rc = Error_Unexpect;
-         break;
-     }
-     if (notify) {
-         uas_trigger(uas, Error_Closed);
-     }
-     uas_unlock(uas);
-     if (rc == 0) {
-         sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_FINS, uas->conn_id, NULL);
-     }
-     sp_ses_uas_dec_ref(uas);
-     return rc;
- }
- // Returns the remote endpoint id stored on this UAS session.
- int sp_ses_uas_get_remote_epid(sp_ses_uas_t *uas)
- {
-     int epid = uas->remote_epid;
-     return epid;
- }
- // Returns the remote service id stored on this UAS session.
- int sp_ses_uas_get_remote_svc_id(sp_ses_uas_t *uas)
- {
-     int svc_id = uas->remote_svc_id;
-     return svc_id;
- }
- // Returns the connection id this UAS session was created with.
- int sp_ses_uas_get_conn_id(sp_ses_uas_t *uas)
- {
-     return (uas->conn_id);
- }
- // Returns the session manager pointer stored on this UAS session.
- // No reference is taken on the manager here.
- sp_ses_mgr_t *sp_ses_uas_get_mgr(sp_ses_uas_t *uas)
- {
-     sp_ses_mgr_t *owner = uas->mgr;
-     return owner;
- }
- // Returns the session's current state value (read without the lock).
- int sp_ses_uas_get_state(sp_ses_uas_t *uas)
- {
-     int current = uas->state;
-     return current;
- }
- // Returns the session's tsx_cnt counter (read without taking the session lock).
- int sp_ses_uas_get_tsx_cnt(sp_ses_uas_t *uas)
- {
-     int count = uas->tsx_cnt;
-     return count;
- }
- // Accepts a pending UAS session: INIT -> CONNECTED, then a SES_CMD_CONNACK
- // packet carrying the current run-serial context (all zero when the
- // thread has none) is posted to the peer.
- // Returns the result of the post on success, Error_Duplication when the
- // session is already CONNECTED, Error_NetBroken for any other state.
- int sp_ses_uas_accept(sp_ses_uas_t *uas)
- {
-     int rc;
-     iobuffer_t *pkt;
-     sp_rsn_context_t *rsn_ctx;
-     uas_lock(uas);
-     switch (uas->state) {
-     case SP_SES_STATE_INIT:
-         uas->state = SP_SES_STATE_CONNECTED;
-         uas->state_begin_time = y2k_time_now();
-         rc = Error_Succeed;
-         break;
-     case SP_SES_STATE_CONNECTED:
-         rc = Error_Duplication;
-         break;
-     default:
-         rc = Error_NetBroken;
-         break;
-     }
-     uas_unlock(uas);
-     if (rc != 0) {
-         return rc;
-     }
-     pkt = iobuffer_create(-1, -1);
-     rsn_ctx = sp_svc_get_runserial_context(uas->mgr->svc);
-     if (rsn_ctx != NULL) {
-         iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
-         iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
-         iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
-         iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
-     } else {
-         unsigned __int64 zero_rsn = 0;
-         int zero = 0;
-         iobuffer_write_head(pkt, IOBUF_T_I8, &zero_rsn, 0);
-         iobuffer_write_head(pkt, IOBUF_T_I8, &zero_rsn, 0);
-         iobuffer_write_head(pkt, IOBUF_T_I4, &zero, 0);
-         iobuffer_write_head(pkt, IOBUF_T_I4, &zero, 0);
-     }
-     rc = sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_CONNACK, uas->conn_id, &pkt);
-     if (pkt != NULL) {
-         iobuffer_dec_ref(pkt);
-     }
-     return rc;
- }
- // Destroy routine for a UAS session; it is registered with
- // IMPLEMENT_REF_COUNT_MT_STATIC on the line following this function.
- // Order: owner callback, manager reference, session strand, every
- // per-method strand plus the hashset holding them, then the session.
- static void __sp_ses_uas_destroy(sp_ses_uas_t *uas)
- {
-     if (uas->cb.on_destroy != NULL) {
-         uas->cb.on_destroy(uas, uas->cb.user_data);
-     }
-     sp_ses_mgr_dec_ref(uas->mgr);
-     if (uas->strand != NULL) {
-         strand_destroy(uas->strand);
-     }
-     if (uas->method_strand != NULL) {
-         method_strand_t *tpos;
-         struct hlist_node *pos, *n;
-         int slot;
-         // Safe iteration: each entry is removed and freed while walking.
-         hashset_for_each_safe(tpos, pos, n, slot, uas->method_strand, method_strand_t) {
-             hashset_remove(uas->method_strand, (void*)tpos->type);
-             delete_method_strand(tpos);
-         }
-         hashset_destroy(uas->method_strand);
-     }
-     free(uas);
-     sp_dbg_info("ses uas destroy!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uas, sp_ses_uas_t, ref_cnt, __sp_ses_uas_destroy)
- // Enters the per-transaction spinlock of a UAS transaction.
- // The -1 is handed to spinlock_enter as-is (presumably "no spin limit" - TODO confirm).
- static __inline void tsx_uas_lock(sp_tsx_uas_t *tsx)
- {
-     spinlock_enter(&(tsx->lock), -1);
- }
- // Leaves the per-transaction spinlock taken by tsx_uas_lock.
- static __inline void tsx_uas_unlock(sp_tsx_uas_t *tsx)
- {
-     spinlock_leave(&(tsx->lock));
- }
- // Creates a UAS transaction in INIT state on session `uas` and registers
- // it both in the session's tsx_list and in the manager's uas_tsx_buckets
- // hash. One reference is taken on the session, and one transaction
- // reference is taken for each of the two containers.
- //   cb    - callbacks, copied into the transaction (not checked for NULL)
- //   p_tsx - receives the new transaction
- // Always returns 0.
- int sp_tsx_uas_create(sp_ses_uas_t *uas, int tsx_id, sp_tsx_uas_callback *cb, sp_tsx_uas_t **p_tsx)
- {
-     sp_tsx_uas_t *tsx = MALLOC_T(sp_tsx_uas_t);
-     unsigned int slot;
-     tsx->state = SP_TSX_UAS_STATE_INIT;
-     // NOTE(review): this updates the *session's* state_begin_time, and does
-     // so outside uas_lock; sp_tsx_uac_create has no counterpart. Possibly
-     // meant for the transaction - confirm before changing.
-     uas->state_begin_time = y2k_time_now();
-     tsx->uas = uas;
-     tsx->tsx_id = tsx_id;
-     spinlock_init(&tsx->lock);
-     memcpy(&tsx->cb, cb, sizeof(sp_tsx_uas_callback));
-     INIT_HLIST_NODE(&tsx->hentry);
-     REF_COUNT_INIT(&tsx->ref_cnt);
-     sp_ses_uas_inc_ref(uas);
-     // Session list membership.
-     uas_lock(uas);
-     sp_tsx_uas_inc_ref(tsx);
-     list_add_tail(&tsx->entry, &uas->tsx_list);
-     uas->tsx_cnt++;
-     uas_unlock(uas);
-     // Manager hash membership.
-     slot = jhash_3words(uas->remote_svc_id, uas->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
-     mgr_lock(uas->mgr);
-     sp_tsx_uas_inc_ref(tsx);
-     hlist_add_head(&tsx->hentry, &uas->mgr->uas_tsx_buckets[slot]);
-     mgr_unlock(uas->mgr);
-     *p_tsx = tsx;
-     sp_dbg_info("tsx uas created!");
-     return 0;
- }
- // Unregisters a closed (TERM) UAS transaction from its session list and
- // from the manager hash, releasing three references in total: one after
- // each unlink and one more at the end (the counterparts of the two
- // inc_ref calls plus REF_COUNT_INIT in sp_tsx_uas_create).
- void sp_tsx_uas_destroy(sp_tsx_uas_t *tsx)
- {
-     sp_ses_uas_t *uas = tsx->uas;
-     assert(tsx->state == SP_TSX_UAS_STATE_TERM);
-     // Detach from the session.
-     uas_lock(uas);
-     uas->tsx_cnt--;
-     list_del_init(&tsx->entry);
-     uas_unlock(uas);
-     sp_tsx_uas_dec_ref(tsx);
-     // Detach from the manager's transaction hash.
-     mgr_lock(uas->mgr);
-     hlist_del_init(&tsx->hentry);
-     mgr_unlock(uas->mgr);
-     sp_tsx_uas_dec_ref(tsx);
-     // Final release.
-     sp_tsx_uas_dec_ref(tsx);
- }
- // Returns the UAS session this transaction was created on (no reference taken).
- sp_ses_uas_t *sp_tsx_uas_get_session(sp_tsx_uas_t *tsx)
- {
-     sp_ses_uas_t *session = tsx->uas;
-     return session;
- }
- // Returns the transaction's current state value (read without the lock).
- int sp_tsx_uas_get_state(sp_tsx_uas_t *tsx)
- {
-     int current = tsx->state;
-     return current;
- }
- // Returns the transaction id given to sp_tsx_uas_create.
- int sp_tsx_uas_get_id(sp_tsx_uas_t *tsx)
- {
-     int id = tsx->tsx_id;
-     return id;
- }
- // Moves a UAS transaction to TERM state.
- // Returns 0 when the state changed, Error_Duplication when the
- // transaction was already TERM.
- int sp_tsx_uas_close(sp_tsx_uas_t *tsx)
- {
-     int rc = Error_Duplication;
-     tsx_uas_lock(tsx);
-     if (tsx->state != SP_TSX_UAS_STATE_TERM) {
-         tsx->state = SP_TSX_UAS_STATE_TERM;
-         rc = 0;
-     }
-     tsx_uas_unlock(tsx);
-     return rc;
- }
- // Sends an answer for a UAS transaction.
- //   end     - non-zero: final answer (SES_CMD_ANS, state -> ANS);
- //             zero: intermediate answer (SES_CMD_TMPANS, state -> TMPANS)
- //   ans_pkt - answer packet; the transaction id is written in front of it
- // Allowed only from INIT or TMPANS state; any other state yields
- // Error_Bug and nothing is sent. Otherwise returns the sp_svc_post result.
- // The post itself happens after the transaction lock is released.
- int sp_tsx_uas_answer(sp_tsx_uas_t *tsx, int end, iobuffer_t **ans_pkt)
- {
-     int rc;
-     int send = 0;
-     tsx_uas_lock(tsx);
-     if (tsx->state == SP_TSX_UAS_STATE_INIT || tsx->state == SP_TSX_UAS_STATE_TMPANS) {
-         send = 1;
-         rc = 0;
-         tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS;
-     } else {
-         rc = Error_Bug;
-     }
-     tsx_uas_unlock(tsx);
-     if (!send) {
-         return rc;
-     }
-     iobuffer_write_head(*ans_pkt, IOBUF_T_I4, &tsx->tsx_id, 0);
-     rc = sp_svc_post(tsx->uas->mgr->svc, tsx->uas->remote_epid, tsx->uas->remote_svc_id, SP_PKT_SES | (end ? SES_CMD_ANS : SES_CMD_TMPANS), tsx->uas->conn_id, ans_pkt);
-     return rc;
- }
- // Destroy routine for a UAS transaction; it is registered with
- // IMPLEMENT_REF_COUNT_MT_STATIC on the line following this function.
- static void __sp_tsx_uas_destroy(sp_tsx_uas_t *tsx)
- {
-     // Let the owner release its user_data first.
-     if (tsx->cb.on_destroy != NULL) {
-         tsx->cb.on_destroy(tsx, tsx->cb.user_data);
-     }
-     // Give back the session reference taken in sp_tsx_uas_create.
-     sp_ses_uas_dec_ref(tsx->uas);
-     free(tsx);
-     sp_dbg_info("tsx uas destroyed!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uas, sp_tsx_uas_t, ref_cnt, __sp_tsx_uas_destroy)
|