12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006 |
- #include "precompile.h"
- #include "sp_ses.h"
- #include "sp_def.h"
- #include "sp_svc.h"
- #include "sp_dbg_export.h"
- #include "sp_mod.h"
- #include "sp_env.h"
- #include "SpBase.h"
- #include "memutil.h"
- #include "refcnt.h"
- #include "list.h"
- #include "jhash.h"
- #include "hashset.h"
- #include "spinlock.h"
- #include "sp_logwithlinkforc.h"
- #include "dbgutil.h"
- #define TAG SPBASE_TAG("sp_ses")
- #define CONN_BUCKET_SIZE 127
- #define TSX_BUCKET_SIZE 511
- #define MAX_REDIRECT 10
- #define SES_CMD_REQ 0 /* uac -> uas two way call */
- #define SES_CMD_TMPANS 1 /* uas -> uac middle response */
- #define SES_CMD_ANS 2 /* uas -> uac the end response */
- #define SES_CMD_INFO 3 /* uac -> uas one way call */
- #define SES_CMD_CONNACK 4 /* uas -> uac */
- #define SES_CMD_FINC 5 /* uac -> uas session close */
- #define SES_CMD_FINS 6 /* uas -> uac session close */
- #define SES_CMD_CONN 7 /* uac -> uas */
- #define SES_CMD_ERRC 8 /* uac -> uas, session broken */
- #define SES_CMD_ERRS 9 /* uas -> uac, session broken */
- #define SES_CMD_REDIRECT 10 /* uas -> uac, session redirect */
- struct sp_ses_mgr_t
- {
- struct hlist_head uac_buckets[CONN_BUCKET_SIZE];
- struct hlist_head uas_buckets[CONN_BUCKET_SIZE];
- struct hlist_head uac_tsx_buckets[TSX_BUCKET_SIZE];
- struct hlist_head uas_tsx_buckets[TSX_BUCKET_SIZE];
- sp_ses_mgr_callback_t cb;
- sp_svc_t *svc;
- int ses_cnt;
- CRITICAL_SECTION lock;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_ses_mgr, sp_ses_mgr_t)
- struct sp_ses_uac_t
- {
- struct hlist_node hentry; // element of mgr->uac_buckets
- sp_ses_uac_callback cb;
- sp_ses_mgr_t *mgr;
- int remote_epid;
- int remote_svc_id;
- unsigned int conn_id;
- int state;
- int error;
- int tsx_cnt;
- int redirect_cnt;
- y2k_time_t begin_time;
- y2k_time_t state_begin_time;
- spinlock_t lock;
- timer_entry *op_timer;
- struct list_head tsx_list; // list of sp_tsx_uac_t->entry
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_ses_uac, sp_ses_uac_t)
- struct sp_tsx_uac_t
- {
- struct hlist_node hentry; // element of mgr->uac_tsx_buckets
- struct list_head entry; // element of sp_ses_uac_t->tsx_list
- int state;
- int tsx_id;
- int method_id;
- int method_sig;
- spinlock_t lock;
- sp_tsx_uac_callback cb;
- sp_ses_uac_t *uac;
- timer_entry *op_timer;
- strand_t *strand;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_tsx_uac, sp_tsx_uac_t)
- typedef struct method_strand_t {
- int type;
- strand_t *strand;
- struct hlist_node node;
- }method_strand_t;
- struct sp_ses_uas_t
- {
- struct hlist_node hentry; // element of mgr->uas_buckets
- sp_ses_uas_callback cb;
- sp_ses_mgr_t *mgr;
- int remote_epid;
- int remote_svc_id;
- int conn_id;
- int state;
- int error;
- int tsx_cnt;
- y2k_time_t begin_time;
- y2k_time_t state_begin_time;
- spinlock_t lock;
- strand_t *strand;
- hashset_t *method_strand;
- struct list_head tsx_list; // list of sp_tsx_uas_t->entry
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_ses_uas, sp_ses_uas_t)
- struct sp_tsx_uas_t
- {
- struct hlist_node hentry; // element of mgr->uas_tsx_buckets
- struct list_head entry; // element of sp_ses_uas_t->tsx_list
- int state;
- int tsx_id;
- spinlock_t lock;
- sp_tsx_uas_callback cb;
- sp_ses_uas_t *uas;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_tsx_uas, sp_tsx_uas_t)
- static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id);
- static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt);
- static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt);
- static void uac_process_errs(sp_ses_uac_t *uac, int error);
- static void uas_process_errc(sp_ses_uas_t *uas, int error);
- static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt);
- static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
- static void uac_trigger(sp_ses_uac_t *uac, int error, int connect);
- static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error);
- static void uas_trigger(sp_ses_uas_t *uas, int error);
- static __inline void mgr_lock(sp_ses_mgr_t *mgr)
- {
- EnterCriticalSection(&mgr->lock);
- }
- static __inline void mgr_unlock(sp_ses_mgr_t *mgr)
- {
- LeaveCriticalSection(&mgr->lock);
- }
- static sp_ses_uac_t *mgr_find_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- int slot;
- sp_ses_uac_t *tpos;
- struct hlist_node *pos;
- slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
- hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[slot], sp_ses_uac_t, hentry) {
- if (tpos->conn_id == conn_id)
- return tpos;
- }
- return NULL;
- }
- static sp_ses_uas_t *mgr_find_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- int slot;
- sp_ses_uas_t *tpos;
- struct hlist_node *pos;
- slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
- hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[slot], sp_ses_uas_t, hentry) {
- if (tpos->conn_id == conn_id)
- return tpos;
- }
- return NULL;
- }
- sp_ses_info_t* sp_ses_mgr_get_ses_info(sp_ses_mgr_t *mgr, int *cnt)
- {
- sp_ses_info_t* ret = NULL;
- int index = 0;
- int i = 0;
-
- *cnt = mgr->ses_cnt;
-
- if (*cnt <=0)
- return NULL;
- ret = (sp_ses_info_t*) malloc(sizeof(sp_ses_info_t)*(*cnt));
- for(i=0; i<CONN_BUCKET_SIZE; i++)
- {
- sp_ses_uac_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[i], sp_ses_uac_t, hentry)
- {
- ret[index].from_svc_id = sp_svc_get_id(mgr->svc);
- ret[index].to_svc_id = tpos->remote_svc_id;
- ret[index].begin_time = tpos->begin_time;
- ret[index].state_begin_time = tpos->state_begin_time;
- ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3;
- index++;
- }
- }
- for(i=0; i<CONN_BUCKET_SIZE; i++)
- {
- sp_ses_uas_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[i], sp_ses_uas_t, hentry)
- {
- ret[index].from_svc_id = tpos->remote_svc_id;
- ret[index].to_svc_id = sp_svc_get_id(mgr->svc);
- ret[index].begin_time = tpos->begin_time;
- ret[index].state_begin_time = tpos->state_begin_time;
- ret[index].state = tpos->state == SP_SES_STATE_CONNECTED ? 1 : 3;
- index++;
- }
- }
- TOOLKIT_ASSERT(index == *cnt);
- return ret;
- }
- static sp_tsx_uac_t *mgr_find_tsx_uac(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id)
- {
- int slot;
- sp_tsx_uac_t *tpos;
- struct hlist_node *pos;
- slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
- hlist_for_each_entry(tpos, pos, &mgr->uac_tsx_buckets[slot], sp_tsx_uac_t, hentry) {
- if (tpos->uac->remote_svc_id == svc_id && tpos->uac->conn_id == conn_id && tpos->tsx_id == tsx_id)
- return tpos;
- }
- return NULL;
- }
- static sp_tsx_uas_t *mgr_find_tsx_uas(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id)
- {
- int slot;
- sp_tsx_uas_t *tpos;
- struct hlist_node *pos;
- slot = jhash_3words(svc_id, conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
- hlist_for_each_entry(tpos, pos, &mgr->uas_tsx_buckets[slot], sp_tsx_uas_t, hentry) {
- if (tpos->uas->remote_svc_id == svc_id && tpos->uas->conn_id == conn_id && tpos->tsx_id == tsx_id)
- return tpos;
- }
- return NULL;
- }
- static void mgr_process_connack(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- sp_ses_uac_t *uac;
- mgr_lock(mgr);
- uac = mgr_find_uac(mgr, epid, svc_id, conn_id);
- if (uac)
- sp_ses_uac_inc_ref(uac); // lock
- mgr_unlock(mgr);
- if (!uac) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "connack find no peer!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
- return;
- }
- uac_process_connack(uac, epid, svc_id, conn_id);
- sp_ses_uac_dec_ref(uac); // unlock
- }
- static void mgr_process_redirect(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt)
- {
- sp_ses_uac_t *uac;
- mgr_lock(mgr);
- uac = mgr_find_uac(mgr, epid, svc_id, conn_id);
- if (uac)
- sp_ses_uac_inc_ref(uac); // lock
- mgr_unlock(mgr);
- if (!uac) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "redirect cannot find uac!");
- return;
- }
- uac_process_redirect(uac, epid, svc_id, conn_id, redirect_ent_id, p_pkt);
- sp_ses_uac_dec_ref(uac); // unlock
- }
- static void mgr_process_ans(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int end, int tsx_id, iobuffer_t **ans_pkt)
- {
- sp_tsx_uac_t *tsx;
- mgr_lock(mgr);
- tsx = mgr_find_tsx_uac(mgr, epid, svc_id, conn_id, tsx_id);
- if (tsx)
- sp_tsx_uac_inc_ref(tsx); // lock
- mgr_unlock(mgr);
- if (!tsx) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "ack find no peer!");
- return;
- }
- tsx_uac_process_ans(tsx, end, ans_pkt);
- sp_tsx_uac_dec_ref(tsx); // unlock
- }
- static void mgr_process_errs(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error)
- {
- sp_ses_uac_t *uac;
- mgr_lock(mgr);
- uac = mgr_find_uac(mgr, epid, svc_id, conn_id);
- if (uac)
- sp_ses_uac_inc_ref(uac); // @ lock uac no delete
- mgr_unlock(mgr);
- if (uac) {
- uac_process_errs(uac, error);
- sp_ses_uac_dec_ref(uac); // @ unlock
- }
- }
- static void mgr_process_errc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int error)
- {
- sp_ses_uas_t *uas;
- mgr_lock(mgr);
- uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
- if (uas)
- sp_ses_uas_inc_ref(uas); // @ lock uac no delete
- mgr_unlock(mgr);
- if (uas) {
- uas_process_errc(uas, error);
- sp_ses_uas_dec_ref(uas); // @ unlock
- }
- }
- static void mgr_process_fins(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- mgr_process_errs(mgr, epid, svc_id, conn_id, Error_PeerClose);
- }
- static void __threadpool_mgr_process_conn(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)arg;
- iobuffer_t *conn_pkt = (iobuffer_t*)param1;
- int epid;
- int svc_id;
- int conn_id;
- int redirect_target = 0;
- int rc;
- int read_state;
- sp_rsn_context_t from_rsn;
- sp_rsn_context_t rsn;
- iobuffer_read(conn_pkt, IOBUF_T_I4, &epid, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I4, &conn_id, 0);
- read_state = iobuffer_get_read_state(conn_pkt);
- WLog_DBG(TAG, "begin on_accept, from epid:%d, svc_id:%d, conn_id:%d", epid, svc_id, conn_id);
- iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
- iobuffer_read(conn_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
- sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
- sp_svc_push_runserial_context(mgr->svc, &rsn);
- rc = mgr->cb.on_accept(mgr, epid, svc_id, conn_id, &conn_pkt, &redirect_target, mgr->cb.user_data);
- sp_svc_pop_runserial_context(mgr->svc);
- if (rc) {
- if (redirect_target) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn from svc_id %d redirect to %d!", svc_id, redirect_target);
- iobuffer_restore_read_state(conn_pkt, read_state);
- iobuffer_write_head(conn_pkt, IOBUF_T_I4, &redirect_target, 0);
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_REDIRECT, conn_id, &conn_pkt);
- } else {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn from svc_id %d reject! rc: %d", svc_id, rc);
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- }
- } else {
- DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "on accept ok!");
- }
- if (conn_pkt) {
- iobuffer_dec_ref(conn_pkt);
- }
- sp_ses_mgr_dec_ref(mgr); // @
- }
- static void mgr_process_conn(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, iobuffer_t **conn_pkt)
- {
- sp_ses_uas_t *uas;
- int err;
- sp_entity_t *ent;
- mgr_lock(mgr);
- uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
- mgr_unlock(mgr);
- ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, sp_svc_get_id(mgr->svc));
- if (ent) {
- if (ent->state == EntityState_Lost) {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_Losted;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "reject connection because of lost state!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- }
- } else {
- TOOLKIT_ASSERT(0);// never go here
- }
- if (uas) { // already exist, duplicate
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_Duplication;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn duplicate!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- }
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &conn_id, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &epid, 0);
- sp_ses_mgr_inc_ref(mgr); // @
- err = threadpool_queue_workitem2(sp_svc_get_threadpool(mgr->svc), NULL, &__threadpool_mgr_process_conn, mgr, (param_size_t)*conn_pkt, 0);
- if (err)
- {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_PeerReject;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- sp_ses_mgr_dec_ref(mgr); // @
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "conn from svc_id %d reject!", svc_id);
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- } else {
- *conn_pkt = NULL;
- }
- }
- static void mgr_process_info(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt)
- {
- sp_ses_uas_t *uas;
- mgr_lock(mgr);
- uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
- if (uas) {
- sp_ses_uas_inc_ref(uas); // @
- }
- mgr_unlock(mgr);
- if (!uas) {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_NotExist;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "cannot find uas session!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- }
- uas_process_info(uas, method_id, method_sig, info_pkt);
- sp_ses_uas_dec_ref(uas); // @
- }
- static void mgr_process_req(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
- {
- sp_ses_uas_t *uas;
- mgr_lock(mgr);
- uas = mgr_find_uas(mgr, epid, svc_id, conn_id);
- if (uas) {
- sp_ses_uas_inc_ref(uas); // @
- }
- mgr_unlock(mgr);
- if (!uas) {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- int rc = Error_NotExist;
- iobuffer_write(pkt, IOBUF_T_I4, &rc, 0);
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "cannot find uas session!");
- sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRS, conn_id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- return;
- }
- uas_process_req(uas, tsx_id, method_id, method_sig, timeout, req_pkt);
- sp_ses_uas_dec_ref(uas); // @
- }
- static void mgr_process_finc(sp_ses_mgr_t *mgr, int epid, int svc_id, int conn_id)
- {
- mgr_process_errc(mgr, epid, svc_id, conn_id, Error_PeerClose);
- }
- static int mgr_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data;
- int cmd_type;
- int tsx_id;
- int method_id;
- int method_sig;
- int timeout;
- int redirect_ent_id;
- int rc = 0;
- WLog_DBG(TAG, "sp_ses::mgr_on_pkt: epid:%d, svc_id: %d, pkt_type:0x%08X, pkt_id: %d, mgr:%d", epid, svc_id, pkt_type, pkt_id, SP_GET_TYPE(pkt_type));
- TOOLKIT_ASSERT(SP_GET_PKT_TYPE(pkt_type) == SP_PKT_SES);
- cmd_type = SP_GET_TYPE(pkt_type);
- switch (cmd_type) {
- case SES_CMD_INFO:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL);
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL);
- mgr_process_info(mgr, epid, svc_id, pkt_id, method_id, method_sig, p_pkt);
- break;
- case SES_CMD_CONN:
- mgr_process_conn(mgr, epid, svc_id, pkt_id, p_pkt);
- break;
- case SES_CMD_CONNACK:
- mgr_process_connack(mgr, epid, svc_id, pkt_id);
- break;
- case SES_CMD_FINC:
- mgr_process_finc(mgr, epid, svc_id, pkt_id);
- break;
- case SES_CMD_FINS:
- mgr_process_fins(mgr, epid, svc_id, pkt_id);
- break;
- case SES_CMD_ERRC:
- mgr_process_errc(mgr, epid, svc_id, pkt_id, Error_NetBroken);
- break;
- case SES_CMD_ERRS:
- if (p_pkt != NULL && *p_pkt != NULL)
- {
- iobuffer_read(*p_pkt, IOBUF_T_I4, &rc, NULL);
- iobuffer_dec_ref(*p_pkt);
- *p_pkt = NULL;
- }
- else
- rc = Error_NetBroken;
- mgr_process_errs(mgr, epid, svc_id, pkt_id, rc);
- break;
- case SES_CMD_REQ:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_id, NULL);
- iobuffer_read(*p_pkt, IOBUF_T_I4, &method_sig, NULL);
- iobuffer_read(*p_pkt, IOBUF_T_I4, &timeout, NULL);
- mgr_process_req(mgr, epid, svc_id, pkt_id, tsx_id, method_id, method_sig, timeout, p_pkt);
- break;
- case SES_CMD_TMPANS:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
- mgr_process_ans(mgr, epid, svc_id, pkt_id, 0, tsx_id, p_pkt);
- break;
- case SES_CMD_ANS:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &tsx_id, NULL);
- mgr_process_ans(mgr, epid, svc_id, pkt_id, 1, tsx_id, p_pkt);
- break;
- case SES_CMD_REDIRECT:
- iobuffer_read(*p_pkt, IOBUF_T_I4, &redirect_ent_id, NULL);
- mgr_process_redirect(mgr, epid, svc_id, pkt_id, redirect_ent_id, p_pkt);
- break;
- default:
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "recv unknown ses pkt!");
- TOOLKIT_ASSERT(0);
- break;
- }
- return FALSE;
- }
- static void mgr_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
- {
- sp_ses_mgr_t *mgr = (sp_ses_mgr_t*)user_data;
- if (state == BUS_STATE_OFF) {
- int i;
- mgr_lock(mgr);
- for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
- sp_ses_uas_t *tpos;
- struct hlist_node *pos, *n;
- hlist_for_each_entry_safe(tpos, pos, n, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) {
- if (epid == tpos->remote_epid) {
- uas_process_errc(tpos, Error_NetBroken);
- }
- }
- }
- for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
- sp_ses_uac_t *tpos;
- struct hlist_node *pos, *n;
- hlist_for_each_entry_safe(tpos, pos, n, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) {
- if (epid == tpos->remote_epid) {
- uac_process_errs(tpos, Error_NetBroken);
- }
- }
- }
- mgr_unlock(mgr);
- }
- }
- int sp_ses_mgr_create(sp_svc_t *svc, sp_ses_mgr_callback_t *cb, sp_ses_mgr_t **p_mgr)
- {
- sp_ses_mgr_t *mgr = MALLOC_T(sp_ses_mgr_t);
- int i;
- mgr->ses_cnt = 0;
- mgr->svc = svc;
- memcpy(&mgr->cb, cb, sizeof(sp_ses_mgr_callback_t));
- REF_COUNT_INIT(&mgr->ref_cnt);
- for (i = 0;i < CONN_BUCKET_SIZE; ++i) {
- INIT_HLIST_HEAD(&mgr->uac_buckets[i]);
- INIT_HLIST_HEAD(&mgr->uas_buckets[i]);
- }
- for (i = 0; i < TSX_BUCKET_SIZE; ++i) {
- INIT_HLIST_HEAD(&mgr->uac_tsx_buckets[i]);
- INIT_HLIST_HEAD(&mgr->uas_tsx_buckets[i]);
- }
- InitializeCriticalSection(&mgr->lock);
- *p_mgr = mgr;
- return 0;
- }
- void sp_ses_mgr_destroy(sp_ses_mgr_t *mgr)
- {
- sp_ses_mgr_dec_ref(mgr);
- }
- int sp_ses_mgr_cancel_all(sp_ses_mgr_t *mgr)
- {
- int i;
- mgr_lock(mgr);
- for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
- sp_ses_uas_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->uas_buckets[i], sp_ses_uas_t, hentry) {
- sp_ses_uas_close(tpos);
- }
- }
- for (i = 0; i < CONN_BUCKET_SIZE; ++i) {
- sp_ses_uac_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->uac_buckets[i], sp_ses_uac_t, hentry) {
- sp_ses_uac_close(tpos);
- }
- }
- mgr_unlock(mgr);
- return 0;
- }
- int sp_ses_mgr_get_conn_cnt(sp_ses_mgr_t *mgr)
- {
- return mgr->ses_cnt;
- }
- int sp_ses_mgr_start(sp_ses_mgr_t *mgr)
- {
- int rc;
- rc = sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES, &mgr_on_pkt, mgr);
- if (rc == 0)
- rc = sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr);
- return rc;
- }
- int sp_ses_mgr_stop(sp_ses_mgr_t *mgr)
- {
- int rc;
- rc = sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_SES);
- if (rc == 0)
- rc = sp_svc_remove_sys_handler(mgr->svc, (int)mgr);
- return rc;
- }
- sp_svc_t *sp_ses_mgr_get_svc(sp_ses_mgr_t *mgr)
- {
- return mgr->svc;
- }
- static void __sp_ses_mgr_destroy(sp_ses_mgr_t *mgr)
- {
- if (mgr->cb.on_destroy)
- mgr->cb.on_destroy(mgr, mgr->cb.user_data);
- TOOLKIT_ASSERT(mgr->ses_cnt == 0);
- DeleteCriticalSection(&mgr->lock);
- free(mgr);
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_mgr, sp_ses_mgr_t, ref_cnt, __sp_ses_mgr_destroy)
- static __inline void uac_lock(sp_ses_uac_t *ses)
- {
- spinlock_enter(&ses->lock, -1);
- }
- static __inline void uac_unlock(sp_ses_uac_t *ses)
- {
- spinlock_leave(&ses->lock);
- }
- static void uac_process_connack(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id)
- {
- int trigger = 0;
- uac_lock(uac);
- if (uac->state == SP_SES_STATE_CONNECTING) {
- uac->state = SP_SES_STATE_CONNECTED;
- uac->state_begin_time = y2k_time_now();
- if (uac->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
- uac->op_timer = NULL;
- }
- trigger++;
- } else {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "current state is not connecting!");
- sp_svc_post(uac->mgr->svc, epid, svc_id, SP_PKT_SES|SES_CMD_ERRC, conn_id, NULL);
- }
- if (trigger)
- uac_trigger(uac, 0, 1);
- uac_unlock(uac);
- }
- static void uac_process_redirect(sp_ses_uac_t *uac, int epid, int svc_id, int conn_id, int redirect_ent_id, iobuffer_t **p_pkt)
- {
- uac_lock(uac);
- if (uac->state == SP_SES_STATE_CONNECTING) {
- ++uac->redirect_cnt;
- if (uac->redirect_cnt >= MAX_REDIRECT) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac has exceed max_direct value, maybe redirection enter end loop!");
- if (uac->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
- uac->op_timer = NULL;
- }
- uac->state = SP_SES_STATE_ERROR;
- uac->state_begin_time = y2k_time_now();
- uac_trigger(uac, Error_NetBroken, 1);
- } else {
- sp_entity_t *ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, redirect_ent_id);
- if (ent->state == EntityState_Killed || ent->state == EntityState_Lost || ent->state == EntityState_NoStart) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "redirected failed! redirect ent state invalid! redirect id:%d", redirect_ent_id);
- if (uac->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
- uac->op_timer = NULL;
- }
- uac->state = SP_SES_STATE_ERROR;
- uac->state_begin_time = y2k_time_now();
- uac_trigger(uac, Error_InvalidState, 1);
- } else {
- uac->remote_epid = ent->mod->cfg->idx;
- uac->remote_svc_id = ent->cfg->idx;
- sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, conn_id, p_pkt);
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "svc_post, epid:%d, svc_id:%d", uac->remote_epid, uac->remote_svc_id);
- }
- }
- }
- uac_unlock(uac);
- }
- static void uac_process_errs(sp_ses_uac_t *uac, int error)
- {
- int trigger = 0;
- int connect = 0;
- uac_lock(uac);
- if (uac->state == SP_SES_STATE_INIT) {
- uac->state = SP_SES_STATE_ERROR;
- uac->state_begin_time = y2k_time_now();
- uac->error = error;
- } else if (uac->state == SP_SES_STATE_CONNECTING) {
- trigger++;
- uac->state = SP_SES_STATE_ERROR;
- uac->state_begin_time = y2k_time_now();
- uac->error = error;
- connect ++;
- if (uac->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
- uac->op_timer = NULL;
- }
- } else if (uac->state == SP_SES_STATE_CONNECTED) {
- sp_tsx_uac_t *pos, *n;
- uac->state = SP_SES_STATE_ERROR;
- uac->state_begin_time = y2k_time_now();
- uac->error = error;
- list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) {
- tsx_uac_process_errs(pos, error);
- }
- trigger++;
- }
- if (trigger)
- uac_trigger(uac, error, connect);
- uac_unlock(uac);
- }
- static void __threadpool_uac_trigger_close(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_ses_uac_t *uac = (sp_ses_uac_t*)arg;
- sp_ses_mgr_t *mgr = uac->mgr;
-
- int error = (int)param1;
- sp_uid_t rsn = sp_svc_new_runserial(mgr->svc);
- sp_rsn_context_t rsn_ctx;
- sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx);
- sp_svc_push_runserial_context(mgr->svc, &rsn_ctx);
- uac->cb.on_close(uac, error, uac->cb.user_data);
- sp_ses_uac_dec_ref(uac); // @
- sp_svc_pop_runserial_context(mgr->svc);
- }
- static void uac_trigger(sp_ses_uac_t *uac, int error, int connect)
- {
- if (connect) {
- uac->cb.on_connect(uac, error, uac->cb.user_data);
- sp_ses_uac_dec_ref(uac); // @
- } else {
- if (sp_iom_get_poll_thread_id(sp_svc_get_iom(uac->mgr->svc)) == GetCurrentThreadId()) { //
- int rc;
- sp_ses_uac_inc_ref(uac); // @
- rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uac->mgr->svc), NULL, &__threadpool_uac_trigger_close, uac, error, 0);
- if (rc != 0) {
- sp_ses_uac_dec_ref(uac); // @
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "thread pool queue uac_trirget_close event failed!");
- }
- } else {
- uac->cb.on_close(uac, error, uac->cb.user_data);
- }
- }
- }
- static void uac_on_timer(timer_queue_t *q, timer_entry *timer, int err)
- {
- sp_ses_uac_t *uac = (sp_ses_uac_t*)timer->user_data;
- if (!err && uac->op_timer == timer && uac->state == SP_SES_STATE_CONNECTING) {
- int trigger = 0;
- uac_lock(uac);
- if (uac->op_timer == timer) {
- if (uac->state == SP_SES_STATE_CONNECTING) {
- uac->state = SP_SES_STATE_ERROR; // time out
- uac->state_begin_time = y2k_time_now();
- trigger ++;
- }
- uac->op_timer = NULL;
- }
- if (trigger)
- uac_trigger(uac, Error_TimeOut, 1);
- uac_unlock(uac);
- }
- sp_ses_uac_dec_ref(uac); // @2
- free(timer);
- }
- int sp_ses_uac_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, sp_ses_uac_callback *cb, sp_ses_uac_t **p_ses)
- {
- sp_ses_uac_t *ses;
- int slot;
- if (!cb || !cb->on_connect)
- return Error_Param;
- ses = MALLOC_T(sp_ses_uac_t);
- ses->mgr = mgr;
- ses->conn_id = sp_env_new_id(sp_get_env());
- ses->remote_svc_id = remote_svc_id;
- ses->remote_epid = remote_epid;
- ses->state = SP_SES_STATE_INIT;
- ses->state_begin_time = y2k_time_now();
- ses->error = 0;
- ses->op_timer = NULL;
- ses->tsx_cnt = 0;
- ses->begin_time = y2k_time_now();
- ses->redirect_cnt = 0;
- memcpy(&ses->cb, cb, sizeof(sp_ses_uac_callback));
- INIT_HLIST_NODE(&ses->hentry);
- REF_COUNT_INIT(&ses->ref_cnt);
- spinlock_init(&ses->lock);
- INIT_LIST_HEAD(&ses->tsx_list);
- sp_ses_mgr_inc_ref(mgr);
- slot = ses->conn_id % CONN_BUCKET_SIZE;
- sp_ses_uac_inc_ref(ses);
- mgr_lock(mgr);
- hlist_add_head(&ses->hentry, &mgr->uac_buckets[slot]);
- mgr->ses_cnt++;
- mgr_unlock(mgr);
- *p_ses = ses;
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uac created!");
- return 0;
- }
- void sp_ses_uac_destroy(sp_ses_uac_t *uac)
- {
- TOOLKIT_ASSERT(uac->state == SP_SES_STATE_TERM);
- mgr_lock(uac->mgr);
- uac->mgr->ses_cnt--;
- hlist_del_init(&uac->hentry);
- mgr_unlock(uac->mgr);
- sp_ses_uac_dec_ref(uac);
- sp_ses_uac_dec_ref(uac);
- }
- int sp_ses_uac_async_connect(sp_ses_uac_t *uac, int timeout, iobuffer_t **conn_pkt)
- {
- int rc;
- if (uac->state != SP_SES_STATE_INIT)
- return Error_Unexpect;
- sp_ses_uac_inc_ref(uac);
- uac_lock(uac);
- if (uac->state == SP_SES_STATE_INIT) {
- sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
- sp_ses_uac_inc_ref(uac); // @1
- if (rsn_ctx) {
- iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
- } else {
- u__int64_t rsn = 0;
- int t = 0;
- iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0);
- iobuffer_write_head(*conn_pkt, IOBUF_T_I4, &t, 0);
- }
- rc = sp_svc_send(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_CONN, uac->conn_id, conn_pkt);
- WLog_DBG(TAG, "svc_send, epid:%d, svc_id:%d, rc: %d", uac->remote_epid, uac->remote_svc_id, rc);
- if (rc == 0) {
- if (timeout >= 0) {
- uac->op_timer = MALLOC_T(timer_entry);
- uac->op_timer->cb = &uac_on_timer;
- uac->op_timer->user_data = uac;
- sp_ses_uac_inc_ref(uac); // @2
- rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, (unsigned int)timeout);
- if (rc != 0) {
- iobuffer_pop_count(*conn_pkt, 24);
- sp_ses_uac_dec_ref(uac); // @2
- sp_ses_uac_dec_ref(uac); // @1
- free(uac->op_timer);
- uac->op_timer = NULL;
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "post conn, create timer failed!");
- }
- }
- } else {
- iobuffer_pop_count(*conn_pkt, 24);
- sp_ses_uac_dec_ref(uac); // @1
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac issue async connect failed, post msg failed, may be remote party offline!");
- rc = Error_IO;
- }
- } else {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac state is not INIT !");
- rc = Error_Unexpect;
- }
- uac->state = rc ? SP_SES_STATE_ERROR : SP_SES_STATE_CONNECTING;
- uac->state_begin_time = y2k_time_now();
- uac_unlock(uac);
- sp_ses_uac_dec_ref(uac);
- return rc;
- }
- int sp_ses_uac_close(sp_ses_uac_t *uac)
- {
- int rc = 0;
- int trigger = 0;
- int connected = 0;
- DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "sp_ses_uac_close invoked!");
- sp_ses_uac_inc_ref(uac);
- uac_lock(uac);
- if (uac->state == SP_SES_STATE_INIT) {
- uac->state = SP_SES_STATE_TERM;
- uac->state_begin_time = y2k_time_now();
- } else if (uac->state == SP_SES_STATE_CONNECTING) {
- trigger++;
- uac->state = SP_SES_STATE_TERM;
- uac->state_begin_time = y2k_time_now();
- if (uac->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(uac->mgr->svc), uac->op_timer, 1);
- uac->op_timer = NULL;
- }
- connected++;
- } else if (uac->state == SP_SES_STATE_CONNECTED) {
- sp_tsx_uac_t *pos, *n;
- uac->state = SP_SES_STATE_TERM;
- uac->state_begin_time = y2k_time_now();
- list_for_each_entry_safe(pos, n, &uac->tsx_list, sp_tsx_uac_t, entry) {
- sp_tsx_uac_close(pos);
- }
- trigger++;
- } else if (uac->state == SP_SES_STATE_ERROR) {
- uac->state = SP_SES_STATE_TERM;
- uac->state_begin_time = y2k_time_now();
- } else {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac->state = %d", uac->state);
- rc = Error_Duplication;
- }
- if (trigger)
- uac_trigger(uac, Error_Closed, connected);
- uac_unlock(uac);
- if (rc == 0) {
- sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_FINC, uac->conn_id, NULL);
- }
- sp_ses_uac_dec_ref(uac);
- return rc;
- }
- int sp_ses_uac_get_state(sp_ses_uac_t *uac)
- {
- return uac->state;
- }
- int sp_ses_uac_send_info(sp_ses_uac_t *uac, int method_id, int method_sig, iobuffer_t **info_pkt)
- {
- int rc;
- if (uac->state == SP_SES_STATE_CONNECTED) {
- sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
- if (rsn_ctx) {
- iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
- } else {
- u__int64_t rsn = 0;
- int t = 0;
- iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &t, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &t, 0);
- }
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0);
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_id, 0);
- rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_INFO, uac->conn_id, info_pkt);
- WLog_INFO(TAG, "sp_ses_uac_send_info from %d", uac->remote_epid);
- } else {
- DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "send info failed, uac is not in connected state!");
- rc = Error_IO;
- }
- return rc;
- }
- int sp_ses_uac_get_remote_epid(sp_ses_uac_t *uac)
- {
- return uac->remote_epid;
- }
- int sp_ses_uac_get_remote_svc_id(sp_ses_uac_t *uac)
- {
- return uac->remote_svc_id;
- }
- int sp_ses_uac_get_conn_id(sp_ses_uac_t *uac)
- {
- return uac->conn_id;
- }
- sp_ses_mgr_t *sp_ses_uac_get_mgr(sp_ses_uac_t *uac)
- {
- return uac->mgr;
- }
- int sp_ses_uac_get_tsx_cnt(sp_ses_uac_t *uac)
- {
- return uac->tsx_cnt;
- }
- static void __sp_ses_uac_destroy(sp_ses_uac_t *uac)
- {
- if (uac->cb.on_destroy)
- uac->cb.on_destroy(uac, uac->cb.user_data);
- sp_ses_mgr_dec_ref(uac->mgr);
- free(uac);
- DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uac destroyed!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uac, sp_ses_uac_t, ref_cnt, __sp_ses_uac_destroy)
- static __inline void tsx_uac_lock(sp_tsx_uac_t *tsx)
- {
- spinlock_enter(&tsx->lock, -1);
- }
- static __inline void tsx_uac_unlock(sp_tsx_uac_t *tsx)
- {
- spinlock_leave(&tsx->lock);
- }
- static void tsx_uac_trigger(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt)
- {
- iobuffer_t *pkt = ans_pkt ? *ans_pkt : NULL;
- tsx->cb.on_ans(tsx, error, end, ans_pkt, tsx->cb.user_data);
- if (error || end)
- sp_tsx_uac_dec_ref(tsx); // @
- }
- static void tsx_uac_on_timer(timer_queue_t *q, timer_entry *timer, int err)
- {
- sp_tsx_uac_t *tsx = (sp_tsx_uac_t*)timer->user_data;
- if (!err && tsx->op_timer == timer && tsx->state == SP_TSX_UAC_STATE_REQ) {
- int trigger = 0;
- tsx_uac_lock(tsx);
- if (tsx->op_timer == timer) {
- if (tsx->state == SP_TSX_UAC_STATE_REQ) {
- tsx->state = SP_TSX_UAC_STATE_ERROR; // time out
- trigger ++;
- }
- tsx->op_timer = NULL;
- }
- if (trigger)
- tsx_uac_trigger(tsx, Error_TimeOut, 1, NULL);
- tsx_uac_unlock(tsx);
- }
- sp_tsx_uac_dec_ref(tsx); // @2
- free(timer);
- }
- static void tsx_uac_process_ans(sp_tsx_uac_t *tsx, int end, iobuffer_t **ans_pkt)
- {
- int trigger = 0;
- tsx_uac_lock(tsx);
- if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
- if (tsx->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
- tsx->op_timer = NULL;
- }
- if (end) {
- tsx->state = SP_TSX_UAC_STATE_ANS;
- } else if (tsx->state == SP_TSX_UAC_STATE_INIT) {
- tsx->state = SP_TSX_UAC_STATE_TMPANS;
- }
- trigger++;
- } else {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "tsx uac current state cannot recv ack!");
- sp_svc_post(tsx->uac->mgr->svc, tsx->uac->remote_epid, tsx->uac->remote_svc_id, SP_PKT_SES|SES_CMD_ERRC, tsx->uac->conn_id, NULL);
- WLog_INFO(TAG, "tsx_uac_process_ans remote_epid:%d, remote_svc_id:%d", tsx->uac->remote_epid, tsx->uac->remote_svc_id);
- }
- if (trigger)
- tsx_uac_trigger(tsx, 0, end, ans_pkt);
- tsx_uac_unlock(tsx);
- }
- static void tsx_uac_process_errs(sp_tsx_uac_t *tsx, int error)
- {
- int trigger = 0;
- sp_tsx_uac_inc_ref(tsx);
- tsx_uac_lock(tsx);
- if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
- trigger++;
- if (tsx->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
- tsx->op_timer = NULL;
- }
- }
- if (tsx->state != SP_TSX_UAC_STATE_TERM)
- tsx->state = SP_TSX_UAC_STATE_ERROR;
- if (trigger)
- tsx_uac_trigger(tsx, error, 1, NULL);
- tsx_uac_unlock(tsx);
- sp_tsx_uac_dec_ref(tsx);
- }
- int sp_tsx_uac_create(sp_ses_uac_t *uac, int tsx_id, int method_id, int method_sig, sp_tsx_uac_callback *cb, sp_tsx_uac_t **p_tsx)
- {
- sp_tsx_uac_t *tsx;
- unsigned int slot;
- if (!cb || !cb->on_ans)
- return Error_Param;
- switch ( uac->state ) {
- case SP_SES_STATE_INIT:
- case SP_SES_STATE_CONNECTING:
- return Error_NotInit;
- case SP_SES_STATE_TERM:
- case SP_SES_STATE_ERROR:
- return Error_NetBroken;
- }
- tsx = MALLOC_T(sp_tsx_uac_t);
- tsx->state = SP_TSX_UAC_STATE_INIT;
- tsx->tsx_id = tsx_id;
- tsx->method_id = method_id;
- tsx->method_sig = method_sig;
- tsx->uac = uac;
- tsx->op_timer = NULL;
- memcpy(&tsx->cb, cb, sizeof(sp_tsx_uac_callback));
- spinlock_init(&tsx->lock);
- INIT_HLIST_NODE(&tsx->hentry);
- REF_COUNT_INIT(&tsx->ref_cnt);
- tsx->strand = strand_create();
- sp_ses_uac_inc_ref(uac);
- sp_tsx_uac_inc_ref(tsx);
- uac_lock(uac);
- list_add_tail(&tsx->entry, &uac->tsx_list);
- uac->tsx_cnt++;
- uac_unlock(uac);
- slot = jhash_3words(uac->remote_svc_id, uac->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
- sp_tsx_uac_inc_ref(tsx);
- mgr_lock(uac->mgr);
- hlist_add_head(&tsx->hentry, &uac->mgr->uac_tsx_buckets[slot]);
- mgr_unlock(uac->mgr);
- *p_tsx = tsx;
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uac created!");
- return 0;
- }
- void sp_tsx_uac_destroy(sp_tsx_uac_t *tsx)
- {
- sp_ses_uac_t *uac = tsx->uac;
- TOOLKIT_ASSERT(tsx->state == SP_TSX_UAC_STATE_TERM);
- uac_lock(uac);
- uac->tsx_cnt--;
- list_del_init(&tsx->entry);
- uac_unlock(uac);
- sp_tsx_uac_dec_ref(tsx);
- mgr_lock(uac->mgr);
- hlist_del_init(&tsx->hentry);
- mgr_unlock(uac->mgr);
- sp_tsx_uac_dec_ref(tsx);
- sp_tsx_uac_dec_ref(tsx);
- }
- int sp_tsx_uac_async_req(sp_tsx_uac_t *tsx, int timeout, iobuffer_t **req_pkt)
- {
- int rc;
- sp_ses_uac_t *uac;
- if (tsx->state != SP_TSX_UAC_STATE_INIT)
- return Error_Unexpect;
- uac = tsx->uac;
- if (uac->state != SP_SES_STATE_CONNECTED)
- return Error_NetBroken;
- tsx_uac_lock(tsx);
- if (tsx->state == SP_TSX_UAC_STATE_INIT) {
- sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uac->mgr->svc);
- if (rsn_ctx) {
- iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
- } else {
- u__int64_t rsn = 0;
- int t = 0;
- iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &t, 0);
- }
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_sig, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->method_id, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &tsx->tsx_id, 0);
- sp_tsx_uac_inc_ref(tsx); // @1
- rc = sp_svc_post(uac->mgr->svc, uac->remote_epid, uac->remote_svc_id, SP_PKT_SES|SES_CMD_REQ, uac->conn_id, req_pkt);
- WLog_INFO(TAG, "sp_tsx_uac_async_req remote_epid:%d, remote_svc_id:%d, method_sig:%d, method_id:%d", uac->remote_epid, uac->remote_svc_id, tsx->method_sig, tsx->method_id);
- if (rc == 0) {
- if (timeout >= 0) {
- tsx->op_timer = MALLOC_T(timer_entry);
- tsx->op_timer->cb = &tsx_uac_on_timer;
- tsx->op_timer->user_data = tsx;
- sp_tsx_uac_inc_ref(tsx); // @2
- rc = sp_iom_schedule_timer(sp_svc_get_iom(uac->mgr->svc), tsx->op_timer, (unsigned int)timeout);
- if (rc != 0) {
- sp_tsx_uac_dec_ref(tsx); // @2
- sp_tsx_uac_dec_ref(tsx); // @1
- free(tsx->op_timer);
- tsx->op_timer = NULL;
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uac tsx async_req, create timer failed!");
- }
- }
- } else {
- sp_tsx_uac_dec_ref(tsx); // @1
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "tsx uac issue async req failed, may be remote party offline!");
- rc = Error_IO;
- }
- } else {
- rc = Error_Unexpect;
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "tsx uac state is not INIT!");
- }
- tsx->state = rc ? SP_TSX_UAC_STATE_ERROR : SP_TSX_UAC_STATE_REQ;
- tsx_uac_unlock(tsx);
- return rc;
- }
- int sp_tsx_uac_close(sp_tsx_uac_t *tsx)
- {
- int trigger = 0;
- tsx_uac_lock(tsx);
- if (tsx->state == SP_TSX_UAC_STATE_REQ || tsx->state == SP_TSX_UAC_STATE_TMPANS) {
- trigger++;
- if (tsx->op_timer) {
- sp_iom_cancel_timer(sp_svc_get_iom(tsx->uac->mgr->svc), tsx->op_timer, 1);
- tsx->op_timer = NULL;
- }
- }
- if (tsx->state != SP_TSX_UAC_STATE_TERM)
- tsx->state = SP_TSX_UAC_STATE_TERM;
- if (trigger)
- tsx_uac_trigger(tsx, Error_Closed, 1, NULL);
- tsx_uac_unlock(tsx);
- return 0;
- }
- sp_ses_uac_t *sp_tsx_uac_get_session(sp_tsx_uac_t *tsx)
- {
- return tsx->uac;
- }
- int sp_tsx_uac_get_state(sp_tsx_uac_t *tsx)
- {
- return tsx->state;
- }
- int sp_tsx_uac_get_id(sp_tsx_uac_t *tsx)
- {
- return tsx->tsx_id;
- }
- static void __sp_tsx_uac_destroy(sp_tsx_uac_t *tsx)
- {
- if (tsx->cb.on_destroy)
- tsx->cb.on_destroy(tsx, tsx->cb.user_data);
- sp_ses_uac_dec_ref(tsx->uac);
- strand_destroy(tsx->strand);
- free(tsx);
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uac destroyed!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uac, sp_tsx_uac_t, ref_cnt, __sp_tsx_uac_destroy)
- static __inline void uas_lock(sp_ses_uas_t *uas)
- {
- spinlock_enter(&uas->lock, -1);
- }
- static __inline void uas_unlock(sp_ses_uas_t *uas)
- {
- spinlock_leave(&uas->lock);
- }
- static const void* method_strand_getkey(const void *obj)
- {
- method_strand_t *ms = (method_strand_t *)obj;
- return (const void*)ms->type;
- }
- static unsigned int method_strand_hash(const void *key)
- {
- return (unsigned int)key;
- }
- static int method_strand_cmpkey(const void *key_x, const void *key_y)
- {
- return (int)key_x - (int)key_y;
- }
- static __inline method_strand_t* create_method_strand(int type)
- {
- method_strand_t *ms = MALLOC_T(method_strand_t);
- ms->strand = strand_create();
- ms->type = type;
- INIT_HLIST_NODE(&ms->node);
- return ms;
- }
- static __inline void delete_method_strand(method_strand_t *ms)
- {
- strand_destroy(ms->strand);
- free(ms);
- }
- static void uas_process_errc(sp_ses_uas_t *uas, int error)
- {
- int trigger = 0;
- uas_lock(uas);
- if (uas->state == SP_SES_STATE_INIT) {
- trigger++;
- uas->state = SP_SES_STATE_ERROR;
- uas->state_begin_time = y2k_time_now();
- uas->error = error;
- } else if (uas->state == SP_SES_STATE_CONNECTED) {
- trigger++;
- uas->state = SP_SES_STATE_ERROR;
- uas->state_begin_time = y2k_time_now();
- uas->error = error;
- }
- uas_unlock(uas);
- if (trigger)
- uas_trigger(uas, error);
- }
- static void __threadpool_uas_trigger(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
- sp_ses_mgr_t *mgr = uas->mgr;
- int error = (int)param1;
- sp_uid_t rsn = sp_svc_new_runserial(mgr->svc);
- sp_rsn_context_t rsn_ctx;
- sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_FRAMEWORK, &rsn_ctx);
- sp_svc_push_runserial_context(mgr->svc, &rsn_ctx);
- if (uas->cb.on_close) {
- if (error != Error_Closed) {
- sp_ses_uas_close(uas);
- }
- uas->cb.on_close(uas, error, uas->cb.user_data);
- }
- sp_ses_uas_dec_ref(uas); // @
- sp_svc_pop_runserial_context(mgr->svc);
- }
- static void uas_trigger(sp_ses_uas_t *uas, int error)
- {
- int rc;
- sp_ses_uas_inc_ref(uas); // @
- rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), uas->strand, &__threadpool_uas_trigger, uas, error, 0);
- if (rc != 0) {
- sp_ses_uas_dec_ref(uas); // @
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "uas_trigger, queue work item failed!");
- }
- }
- static strand_t *uas_get_strand(sp_ses_uas_t *uas, int method_id)
- {
- method_strand_t *ms;
- uas_lock(uas);
- ms = hashset_find(uas->method_strand, (const void *)method_id);
- if (!ms) {
- ms = create_method_strand(method_id);
- hashset_add(uas->method_strand, ms);
- }
- uas_unlock(uas);
- return ms->strand;
- }
- static void __threadpool_uas_process_info(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
- sp_ses_mgr_t *mgr = uas->mgr;
- int method_id = (int)param1;
- int method_sig;
- iobuffer_t *info_pkt = (iobuffer_t*)param2;
- iobuffer_read(info_pkt, IOBUF_T_I4, &method_sig, 0);
- if (uas->state == SP_SES_STATE_CONNECTED ||
- (uas->state == SP_SES_STATE_ERROR && uas->error == Error_PeerClose)) {
- sp_rsn_context_t from_rsn;
- sp_rsn_context_t rsn;
- iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
- iobuffer_read(info_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
- iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
- iobuffer_read(info_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
- sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
- sp_svc_push_runserial_context(mgr->svc, &rsn);
- uas->cb.on_info(uas, method_id, method_sig, &info_pkt, uas->cb.user_data);
- sp_svc_pop_runserial_context(mgr->svc);
- }
- if (info_pkt)
- iobuffer_dec_ref(info_pkt);
- sp_ses_uas_dec_ref(uas);//@
- }
- static strand_t *uas_decide_strand(sp_ses_uas_t *uas, int method_id, int overlap)
- {
- strand_t *strand = NULL;
- if (uas->strand) {
- if (overlap) {
- strand = NULL;
- } else {
- strand = uas->strand;
- }
- } else {
- if (!overlap) {
- strand = uas_get_strand(uas, method_id);
- }
- }
- return strand;
- }
- static void uas_process_info(sp_ses_uas_t *uas, int method_id, int method_sig, iobuffer_t **info_pkt)
- {
- int rc;
- int overlap;
- rc = (*uas->cb.get_method_attr)(uas, method_id, method_sig, &overlap, uas->cb.user_data);
- if (rc == 0) {
- strand_t *strand = uas_decide_strand(uas, method_id, overlap);
- iobuffer_write_head(*info_pkt, IOBUF_T_I4, &method_sig, 0);
- sp_ses_uas_inc_ref(uas); //@
- rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_info, uas, method_id, (param_size_t)*info_pkt);
- if (rc != 0) {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "process info queue work item failed!");
- sp_ses_uas_dec_ref(uas); //@
- } else {
- *info_pkt = NULL;
- }
- } else {
- DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "method signature not match: method_id=%d, method_sig=%d, error=%d", method_id, method_sig, rc);
- }
- }
- static void uas_process_req_reply_error(sp_ses_uas_t *uas, int tsx_id, int err)
- {
- int v = 0;
- char* tmpStr = "";
- iobuffer_t *ans_pkt = iobuffer_create(-1, -1);
- TOOLKIT_ASSERT(err != 0);
- iobuffer_write(ans_pkt, IOBUF_T_I4, &err, 0); // sys error
-
- iobuffer_write(ans_pkt, IOBUF_T_I4, &v, 0); // user error
- iobuffer_write(ans_pkt, IOBUF_T_STR, tmpStr, 0);
- iobuffer_write_head(ans_pkt, IOBUF_T_I4, &tsx_id, 0);
- sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_ANS, uas->conn_id, &ans_pkt);
- WLog_INFO(TAG, "uas_process_req_reply_error remote_epid:%d, remote_svc_id:%d", uas->remote_epid, uas->remote_svc_id);
- if (ans_pkt)
- iobuffer_dec_ref(ans_pkt);
- }
- static void __threadpool_uas_process_req(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_ses_uas_t *uas = (sp_ses_uas_t*)arg;
- sp_ses_mgr_t *mgr = uas->mgr;
- int tsx_id = (int)param1;
- int method_id;
- int method_sig;
- int timeout;
- iobuffer_t *req_pkt = (iobuffer_t*)param2;
- /** 这个数据是准的,不然实体层不会找到对应的接口序号和签名,不一定,这些数据是过来的时候才塞进去的 [Gifur@2022624]*/
- //WLog_DBG(TAG, "read method info");
- iobuffer_read(req_pkt, IOBUF_T_I4, &method_id, 0);
- iobuffer_read(req_pkt, IOBUF_T_I4, &method_sig, 0);
- iobuffer_read(req_pkt, IOBUF_T_I4, &timeout, 0);
- if (uas->state == SP_SES_STATE_CONNECTED) {
- sp_rsn_context_t from_rsn;
- sp_rsn_context_t rsn;
- iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.depth, 0);
- iobuffer_read(req_pkt, IOBUF_T_I4, &from_rsn.original_type, 0);
- iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.current_rsn, 0);
- iobuffer_read(req_pkt, IOBUF_T_I8, &from_rsn.previous_rsn, 0);
- sp_rsn_context_init_downstream(sp_svc_new_runserial(mgr->svc), &from_rsn, &rsn);
- sp_svc_push_runserial_context(mgr->svc, &rsn);
- uas->cb.on_req(uas, tsx_id, method_id, method_sig, timeout, &req_pkt, uas->cb.user_data);
- sp_svc_pop_runserial_context(mgr->svc);
- } else {
- uas_process_req_reply_error(uas, tsx_id, Error_InvalidState);
- }
- if (req_pkt)
- iobuffer_dec_ref(req_pkt);
- sp_ses_uas_dec_ref(uas); //@
- }
- static void uas_process_req(sp_ses_uas_t *uas, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
- {
- int rc;
- int overlap;
- rc = uas->cb.get_method_attr(uas, method_id, method_sig, &overlap, uas->cb.user_data);
- if (rc == 0) {
- strand_t *strand = uas_decide_strand(uas, method_id, overlap);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &timeout, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_sig, 0);
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &method_id, 0);
- sp_ses_uas_inc_ref(uas); //@
- rc = threadpool_queue_workitem2(sp_svc_get_threadpool(uas->mgr->svc), strand, &__threadpool_uas_process_req, uas, tsx_id, (param_size_t)*req_pkt);
- if (rc != 0) {
- uas_process_req_reply_error(uas, tsx_id, rc);
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "process req queue work item failed!");
- sp_ses_uas_dec_ref(uas); //@
- } else {
- *req_pkt = NULL;
- }
- } else {
- uas_process_req_reply_error(uas, tsx_id, rc);
- DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "method signature not match: tsx_id=%d, method_id=%d, method_sig=%d, error=%d", tsx_id, method_id, method_sig, rc);
- }
- }
- int sp_ses_uas_create(sp_ses_mgr_t *mgr, int remote_epid, int remote_svc_id, int conn_id, int overlap, sp_ses_uas_callback *cb, sp_ses_uas_t **p_uas)
- {
- unsigned int slot;
- sp_ses_uas_t *uas;
- if (!cb || !cb->on_req || !cb->on_info)
- return Error_Param;
- uas = MALLOC_T(sp_ses_uas_t);
- uas->mgr = mgr;
- uas->conn_id = conn_id;
- uas->remote_epid = remote_epid;
- uas->remote_svc_id = remote_svc_id;
- spinlock_init(&uas->lock);
- INIT_HLIST_NODE(&uas->hentry);
- memcpy(&uas->cb, cb, sizeof(sp_ses_uas_callback));
- uas->state = SP_SES_STATE_INIT;
- uas->state_begin_time = y2k_time_now();
- uas->error = 0;
- uas->tsx_cnt = 0;
- uas->begin_time = y2k_time_now();
- uas->strand = overlap ? NULL : strand_create();
- INIT_LIST_HEAD(&uas->tsx_list);
- REF_COUNT_INIT(&uas->ref_cnt);
- uas->method_strand = overlap ? hashset_create(offset_of(method_strand_t, node), &method_strand_getkey, &method_strand_hash, &method_strand_cmpkey) : NULL;
- sp_ses_mgr_inc_ref(mgr);
- slot = ((unsigned int)conn_id) % CONN_BUCKET_SIZE;
- sp_ses_uas_inc_ref(uas);
- mgr_lock(mgr);
- hlist_add_head(&uas->hentry, &mgr->uas_buckets[slot]);
- mgr->ses_cnt++;
- mgr_unlock(mgr);
- *p_uas = uas;
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uas created!");
- return 0;
- }
- void sp_ses_uas_destroy(sp_ses_uas_t *uas)
- {
- TOOLKIT_ASSERT(uas->state == SP_SES_STATE_TERM);
- mgr_lock(uas->mgr);
- uas->mgr->ses_cnt--;
- hlist_del_init(&uas->hentry);
- mgr_unlock(uas->mgr);
- sp_ses_uas_dec_ref(uas);
- sp_ses_uas_dec_ref(uas);
- }
- int sp_ses_uas_close(sp_ses_uas_t *uas)
- {
- int rc = 0;
- int trigger = 0;
- sp_ses_uas_inc_ref(uas);
- uas_lock(uas);
- if (uas->state == SP_SES_STATE_INIT) {
- uas->state = SP_SES_STATE_TERM;
- uas->state_begin_time = y2k_time_now();
- trigger++;
- } else if (uas->state == SP_SES_STATE_CONNECTED) {
- sp_tsx_uas_t *pos, *n;
- uas->state = SP_SES_STATE_TERM;
- uas->state_begin_time = y2k_time_now();
- trigger++;
- list_for_each_entry_safe(pos, n, &uas->tsx_list, sp_tsx_uas_t, entry) {
- sp_tsx_uas_close(pos);
- }
- } else if (uas->state == SP_SES_STATE_ERROR) {
- uas->state = SP_SES_STATE_TERM;
- uas->state_begin_time = y2k_time_now();
- } else if (uas->state == SP_SES_STATE_TERM) {
- rc = Error_Duplication;
- } else {
- rc = Error_Unexpect;
- }
- if (trigger)
- uas_trigger(uas, Error_Closed);
- uas_unlock(uas);
- if (rc == 0) {
- sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_FINS, uas->conn_id, NULL);
- }
-
- sp_ses_uas_dec_ref(uas);
- return rc;
- }
- int sp_ses_uas_get_remote_epid(sp_ses_uas_t *uas)
- {
- return uas->remote_epid;
- }
- int sp_ses_uas_get_remote_svc_id(sp_ses_uas_t *uas)
- {
- return uas->remote_svc_id;
- }
- int sp_ses_uas_get_conn_id(sp_ses_uas_t *uas)
- {
- return uas->conn_id;
- }
- sp_ses_mgr_t *sp_ses_uas_get_mgr(sp_ses_uas_t *uas)
- {
- return uas->mgr;
- }
- int sp_ses_uas_get_state(sp_ses_uas_t *uas)
- {
- return uas->state;
- }
- int sp_ses_uas_get_tsx_cnt(sp_ses_uas_t *uas)
- {
- return uas->tsx_cnt;
- }
- int sp_ses_uas_accept(sp_ses_uas_t *uas)
- {
- int rc;
- uas_lock(uas);
- if (uas->state == SP_SES_STATE_INIT) {
- uas->state = SP_SES_STATE_CONNECTED;
- uas->state_begin_time = y2k_time_now();
- rc = Error_Succeed;
- } else if (uas->state == SP_SES_STATE_CONNECTED) {
- rc = Error_Duplication;
- } else {
- rc = Error_NetBroken;
- }
- uas_unlock(uas);
- if (rc == 0) {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- sp_rsn_context_t *rsn_ctx = sp_svc_get_runserial_context(uas->mgr->svc);
- if (rsn_ctx) {
- iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
- iobuffer_write_head(pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
- } else {
- u__int64_t rsn = 0;
- int t = 0;
- iobuffer_write_head(pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(pkt, IOBUF_T_I8, &rsn, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
- }
- rc = sp_svc_post(uas->mgr->svc, uas->remote_epid, uas->remote_svc_id, SP_PKT_SES|SES_CMD_CONNACK, uas->conn_id, &pkt);
- if (pkt) {
- iobuffer_dec_ref(pkt);
- }
- }
- return rc;
- }
- static void __sp_ses_uas_destroy(sp_ses_uas_t *uas)
- {
- if (uas->cb.on_destroy) {
- uas->cb.on_destroy(uas, uas->cb.user_data);
- }
- sp_ses_mgr_dec_ref(uas->mgr);
- if (uas->strand)
- strand_destroy(uas->strand);
- if (uas->method_strand) {
- method_strand_t *tpos;
- struct hlist_node *pos, *n;
- int slot;
- hashset_for_each_safe(tpos, pos, n, slot, uas->method_strand, method_strand_t) {
- hashset_remove(uas->method_strand, (void*)tpos->type);
- delete_method_strand(tpos);
- }
- hashset_destroy(uas->method_strand);
- }
- free(uas);
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "ses uas destroy!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_ses_uas, sp_ses_uas_t, ref_cnt, __sp_ses_uas_destroy)
- static __inline void tsx_uas_lock(sp_tsx_uas_t *tsx)
- {
- spinlock_enter(&tsx->lock, -1);
- }
- static __inline void tsx_uas_unlock(sp_tsx_uas_t *tsx)
- {
- spinlock_leave(&tsx->lock);
- }
- int sp_tsx_uas_create(sp_ses_uas_t *uas, int tsx_id, sp_tsx_uas_callback *cb, sp_tsx_uas_t **p_tsx)
- {
- sp_tsx_uas_t *tsx = MALLOC_T(sp_tsx_uas_t);
- unsigned int slot;
- tsx->state = SP_TSX_UAS_STATE_INIT;
- uas->state_begin_time = y2k_time_now();
- tsx->uas = uas;
- tsx->tsx_id = tsx_id;
- spinlock_init(&tsx->lock);
- memcpy(&tsx->cb, cb, sizeof(sp_tsx_uas_callback));
- INIT_HLIST_NODE(&tsx->hentry);
- REF_COUNT_INIT(&tsx->ref_cnt);
- sp_ses_uas_inc_ref(uas);
- uas_lock(uas);
- sp_tsx_uas_inc_ref(tsx);
- list_add_tail(&tsx->entry, &uas->tsx_list);
- uas->tsx_cnt++;
- uas_unlock(uas);
- slot = jhash_3words(uas->remote_svc_id, uas->conn_id, tsx_id, 0) % TSX_BUCKET_SIZE;
- mgr_lock(uas->mgr);
- sp_tsx_uas_inc_ref(tsx);
- hlist_add_head(&tsx->hentry, &uas->mgr->uas_tsx_buckets[slot]);
- mgr_unlock(uas->mgr);
- *p_tsx = tsx;
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uas created!");
- return 0;
- }
- void sp_tsx_uas_destroy(sp_tsx_uas_t *tsx)
- {
- sp_ses_uas_t *uas = tsx->uas;
- TOOLKIT_ASSERT(tsx->state == SP_TSX_UAS_STATE_TERM);
- uas_lock(uas);
- uas->tsx_cnt--;
- list_del_init(&tsx->entry);
- uas_unlock(uas);
- sp_tsx_uas_dec_ref(tsx);
- mgr_lock(uas->mgr);
- hlist_del_init(&tsx->hentry);
- mgr_unlock(uas->mgr);
- sp_tsx_uas_dec_ref(tsx);
- sp_tsx_uas_dec_ref(tsx);
- }
- sp_ses_uas_t *sp_tsx_uas_get_session(sp_tsx_uas_t *tsx)
- {
- return tsx->uas;
- }
- int sp_tsx_uas_get_state(sp_tsx_uas_t *tsx)
- {
- return tsx->state;
- }
- int sp_tsx_uas_get_id(sp_tsx_uas_t *tsx)
- {
- return tsx->tsx_id;
- }
- int sp_tsx_uas_close(sp_tsx_uas_t *tsx)
- {
- int rc;
- tsx_uas_lock(tsx);
- if (tsx->state != SP_TSX_UAS_STATE_TERM) {
- tsx->state = SP_TSX_UAS_STATE_TERM;
- rc = 0;
- } else {
- rc = Error_Duplication;
- }
- tsx_uas_unlock(tsx);
- return rc;
- }
- int sp_tsx_uas_answer(sp_tsx_uas_t *tsx, int end, iobuffer_t **ans_pkt)
- {
- int rc;
- int trigger = 0;
- tsx_uas_lock(tsx);
- if (tsx->state == SP_TSX_UAS_STATE_INIT) {
- trigger++;
- rc = 0;
- tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS;
- } else if (tsx->state == SP_TSX_UAS_STATE_TMPANS) {
- trigger++;
- rc = 0;
- tsx->state = end ? SP_TSX_UAS_STATE_ANS : SP_TSX_UAS_STATE_TMPANS;
- } else {
- rc = Error_Bug;
- }
- tsx_uas_unlock(tsx);
- if (trigger) {
- iobuffer_write_head(*ans_pkt, IOBUF_T_I4, &tsx->tsx_id, 0);
- rc = sp_svc_post(tsx->uas->mgr->svc, tsx->uas->remote_epid, tsx->uas->remote_svc_id, SP_PKT_SES | (end ? SES_CMD_ANS : SES_CMD_TMPANS), tsx->uas->conn_id, ans_pkt);
- }
- return rc;
- }
- static void __sp_tsx_uas_destroy(sp_tsx_uas_t *tsx)
- {
- if (tsx->cb.on_destroy)
- tsx->cb.on_destroy(tsx, tsx->cb.user_data);
- sp_ses_uas_dec_ref(tsx->uas);
- free(tsx);
- //DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "tsx uas destroyed!");
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_tsx_uas, sp_tsx_uas_t, ref_cnt, __sp_tsx_uas_destroy)
|