#include "precompile.h"
/* NOTE(review): the bare #include directives below lost their header names
 * (angle-bracket names appear to have been stripped from this copy of the
 * file); restore them from version control. */
#include
#include
#include "sp_svc.h"
#include "sp_def.h"
#include "sp_ses.h"
#include "sp_tbs.h"
#include "sp_env.h"
#include "sp_bcm.h"
#include "sp_log.h"
#include "sp_uid.h"
#include "sp_var.h"
#include "SpBase.h"
#include "toolkit.h"
#include "list.h"
#include "hash.h"
#include "array.h"
#include "refcnt.h"
#include "spinlock.h"
#include "strutil.h"
#include "fileutil.h"
#include "memutil.h"
#include "toolkit.h"
#include "uuid4.h"
#include "dbgutil.h"
#include
#include
#include
#include "unix/evtpoll.h"
#include "unix/core.h"

#define TAG SPBASE_TAG("tbs")

#define SESSION_BUCKET_SIZE 511
#define REGISTER_BUCKET_SIZE 127
#define RX_BUF_SIZE 1024
#define MSG_EVT_IDX (MAXIMUM_WAIT_OBJECTS + 1)
#define SRV_EVT_IDX 1
#define MAX_TIMEOUT 30000
#define HDR_LEN 4
#define CONN_INVALID_ID -1

/* messages posted to the worker thread via post_msg() */
#define MSG_STOP 0
#define MSG_UAC_ON_CONNECT 1
#define MSG_UAC_ON_CLOSE 2
#define MSG_UAC_ON_DESTROY 3
#define MSG_UAC_ON_ANS 4
#define MSG_ON_EVENT 5

#define RECV_STATE_HEADER 0 /* already recv header length */
#define RECV_STATE_BODY 1 /* recv body */

/* packet type codes carried in the low 16 bits of the type field */
#define PKT_TYPE_INFO 0
#define PKT_TYPE_SESSION 1
#define PKT_TYPE_SESSIONEND 2
#define PKT_TYPE_REQ 3
#define PKT_TYPE_REQACK 4
#define PKT_TYPE_SESSIONACK 5
#define PKT_TYPE_REGISTER_EVENT 6
#define PKT_TYPE_UNREGISTER_EVENT 7
#define PKT_TYPE_EVENT_DATA 8
#define PKT_TYPE_LOG_EVENT 9
#define PKT_TYPE_LOG_WARN 10
// add sys var get & set type by xkm@20150526
#define PKT_TYPE_SET_VAR_REQ 11 // len + type + tsx_id + varname + varvalue
#define PKT_TYPE_SET_VAR_ACK 12 // len + type + conn_id + tsx_id + errcode + errmsg
#define PKT_TYPE_GET_VAR_REQ 13 // len + type + tsx_id + varname
#define PKT_TYPE_GET_VAR_ACK 14 // len + type + conn_id + tsx_id + errcode + varvalue/errmsg
// control information 0xFF FF 00 00
#define PKT_TYPE_CONTROL_LINKCONTEXT (1 << 31)

/*
 * Trace ("link") context attached to a packet: business id, trace id, span id
 * and parent span id, each available as a narrow and/or a wide string.
 *
 * Every non-null pointer member is released with free() in the destructor, so
 * whoever assigns a member directly must hand over a free()-compatible
 * allocation. Because the object owns raw pointers, copying is disabled: a
 * member-wise copy would free the same buffers twice.
 */
class link_context
{
public:
    char* businessId;        //32
    char* traceId;           //32
    char* spanId;            //16
    char* parentSpanId;      //16
    wchar_t* wbusinessId;    //32
    wchar_t* wtraceId;       //32
    wchar_t* wspanId;        //16
    wchar_t* wparentSpanId;  //16

public:
    link_context()
        : businessId(nullptr), traceId(nullptr), spanId(nullptr), parentSpanId(nullptr)
        , wbusinessId(nullptr), wtraceId(nullptr), wspanId(nullptr), wparentSpanId(nullptr)
    {
    }

    ~link_context()
    {
        /* free(NULL) is a no-op, so no per-member null checks are needed */
        free(businessId);
        free(traceId);
        free(spanId);
        free(parentSpanId);
        free(wbusinessId);
        free(wtraceId);
        free(wspanId);
        free(wparentSpanId);
    }

    link_context(const link_context&) = delete;
    link_context& operator=(const link_context&) = delete;

    /* Each Init* stores a private copy of its argument, but only if the
     * corresponding narrow field has not been set yet. */
    void InitbusinessId(const std::string& t_businessId) { assign_once(businessId, t_businessId); }
    void InittraceId(const std::string& t_traceId) { assign_once(traceId, t_traceId); }
    void InitspanId(const std::string& t_spanId) { assign_once(spanId, t_spanId); }
    void InitparentSpanId(const std::string& t_parentSpanId) { assign_once(parentSpanId, t_parentSpanId); }

    /* Each *WtoA returns the narrow field, converting it from the wide field
     * on first use and caching the result; returns the literal "None" when
     * neither form is available or the conversion fails. */
    const char* businessIdWtoA() { return narrow_or_none(businessId, wbusinessId); }
    const char* traceIdWtoA() { return narrow_or_none(traceId, wtraceId); }
    const char* spanIdWtoA() { return narrow_or_none(spanId, wspanId); }
    const char* parentSpanIdWtoA() { return narrow_or_none(parentSpanId, wparentSpanId); }

private:
    /* Copy `src` into a freshly malloc'ed C string stored in `dst`, unless
     * `dst` is already set. On allocation failure `dst` stays null. */
    static void assign_once(char*& dst, const std::string& src)
    {
        if (dst != nullptr)
            return;
        const size_t size = src.length() + 1;
        char* copy = (char*)malloc(size);
        if (copy == nullptr)
            return;
        memcpy(copy, src.c_str(), size); /* c_str() includes the terminator */
        dst = copy;
    }

    /* Return `narrow` if set; otherwise try to fill it from `wide`. */
    static const char* narrow_or_none(char*& narrow, wchar_t* wide)
    {
        if (narrow)
            return narrow;
        if (wide) {
            int n = toolkit_wcs2mbs(wide, NULL, 0);
            if (n > 0) {
                /* One extra zeroed byte keeps the result terminated whether or
                 * not the length reported by toolkit_wcs2mbs counts the NUL. */
                char* buf = (char*)calloc((size_t)n + 1, 1);
                if (buf) {
                    n = toolkit_wcs2mbs(wide, buf, n);
                    if (n > 0) {
                        narrow = buf;
                        return narrow;
                    }
                    free(buf);
                }
            }
        }
        return "None";
    }
};

typedef struct cmd_entry cmd_entry;
typedef struct sub_session sub_session;
typedef struct reg_entry reg_entry;
typedef struct sock_connection sock_connection;

/* One message queued for the worker thread (see post_msg). */
struct cmd_entry {
    struct list_head entry;
    int msg;
    param_size_t param1;
    param_size_t param2;
};

/* One sub session multiplexed over a socket connection; hashed by id in
 * sock_connection::session_bucket. */
struct sub_session {
    struct hlist_node hentry;
    unsigned int id;
    unsigned int tsx_id;
    sp_ses_uac_t *uac;
    sock_connection *conn;
};

/* One event registration of a socket connection; hashed by register_id in
 * sock_connection::register_bucket. */
struct reg_entry {
    struct hlist_node hentry;
    unsigned int register_id;
    unsigned int entity_id;
    sp_uid_t uid;
    sp_bcm_listener_t *listener;
    sock_connection *conn;
};

/* Per-client connection state; reference counted via ref_cnt. */
struct sock_connection {
    SOCKET fd;
    int idx; /* index into sp_tbs_t::conn / conn_evt */
    int session_cnt;
    struct hlist_head session_bucket[SESSION_BUCKET_SIZE];
    int register_cnt;
    struct hlist_head register_bucket[REGISTER_BUCKET_SIZE];
    iobuffer_queue_t *tx_queue;
    iobuffer_t *rx_pkt;
    char rx_hdr_buf[RX_BUF_SIZE];
    int rx_hdr_buf_len;
    int rx_state; /* RECV_STATE_HEADER or RECV_STATE_BODY */
    int rx_header_len;
    sp_tbs_t *tbs;
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};

DECLARE_REF_COUNT_STATIC(sock_connection, sock_connection)

/* Server instance: listening socket, connection table and worker thread. */
struct sp_tbs_t {
    SOCKET server_fd;
    unsigned long server_ip;
    unsigned short server_port;
    HANDLE conn_evt[MAXIMUM_WAIT_OBJECTS];
    HANDLE ready_evt;
    int ready_evt_result;
    sock_connection *conn[MAXIMUM_WAIT_OBJECTS];
    int conn_cnt;
    unsigned int sub_session_seq; /* source of sub_session::id values */
    struct list_head cmd_list;
    spinlock_t cmd_list_lock;
    sp_log_client_t *log_client;
    HANDLE work_thread;
    DWORD work_thread_id;
    sp_ses_mgr_t *ses_mgr;
    sp_var_client_t *var_client;
    evtpoll_t* ep;
    int msg_fd;
};

static void on_write(sp_tbs_t *tbs, sock_connection *conn);
static void on_accept(sp_tbs_t *tbs);
static void on_read(sp_tbs_t *tbs, sock_connection *conn);
static void on_close(sp_tbs_t *tbs, sock_connection *conn);
static void on_stop(sp_tbs_t *tbs);
static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt);
static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name);
static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name);
static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id);
static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt);
static void post_req_ack_ex(sp_tbs_t* tbs, sock_connection* conn, int conn_id, int tsx_id, int end, int err, int uc, const wchar_t* err_msg, iobuffer_t** ack_pkt);
static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t*err_msg);
static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt);
static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session,
int error); static void uac_on_ans(sp_tbs_t* tbs, sock_connection* conn, sub_session* session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t** ans_pkt); static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session); static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2); static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session); postLink g_postlink = NULL; void sp_tbs_setpostlink(postLink cur) { g_postlink = cur; } static const wchar_t *get_err_msg(int rc) { static wchar_t wmsg[512]; char msg[512] = { '\0' }; memset(msg, 0, sizeof(msg)); sprintf_s(msg, 512, sp_strerror(rc)); toolkit_mbs2wcs(msg, wmsg, 512); return wmsg; } static const wchar_t *get_user_err_msg(int rc) { static wchar_t wmsg[512]; char msg[512] = { '\0' }; if (rc == 0) return NULL; memset(msg, 0, sizeof(msg)); sprintf_s(msg, 512, "UserError=%d", rc); toolkit_mbs2wcs(msg, wmsg, 512); return wmsg; } /* static const char *_GetFileName(const char *pszFilePath) { int i=strlen(pszFilePath); for( ; i>0 && pszFilePath[i-1]!='\\'; i--)NULL; return pszFilePath+i; } */ static int create_connection(sp_tbs_t *tbs) { sock_connection *conn; SOCKET new_client; int i; BOOL opt; struct sockaddr_in from_addr; int from_len = sizeof(from_addr); new_client = _accept(tbs->server_fd, (struct sockaddr*)&from_addr, &from_len); if (new_client == INVALID_SOCKET) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("accept connection failed! 
%s %d", _GetFileName(__FILE__), __LINE__); return Error_NetBroken; } if (make_fd_cloexec(new_client, 1) != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("make client cloexec failed"); closesocket(new_client); return Error_Unexpect; } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("accept from %s:%d", inet_ntoa(from_addr.sin_addr), ntohs(from_addr.sin_port)); if (tbs->conn_cnt >= MAXIMUM_WAIT_OBJECTS) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("exceed maximum wait objects, connection discard! %s %d", _GetFileName(__FILE__), __LINE__); closesocket(new_client); return Error_Resource; } conn = MALLOC_T(sock_connection); conn->idx = tbs->conn_cnt++; conn->fd = new_client; conn->rx_hdr_buf_len = 0; conn->tx_queue = iobuffer_queue_create(); conn->rx_pkt = NULL; conn->session_cnt = 0; conn->register_cnt = 0; conn->tbs = tbs; conn->rx_state = RECV_STATE_HEADER; conn->rx_header_len = 0; for (i = 0; i < array_size(conn->session_bucket); ++i) { INIT_HLIST_HEAD(&conn->session_bucket[i]); } for (i = 0; i < array_size(conn->register_bucket); ++i) { INIT_HLIST_HEAD(&conn->register_bucket[i]); } REF_COUNT_INIT(&conn->ref_cnt); tbs->conn[conn->idx] = conn; tbs->conn_evt[conn->idx] = WSACreateEvent(); opt = TRUE; setsockopt(new_client, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt)); opt = TRUE; setsockopt(new_client, SOL_SOCKET, SO_DONTLINGER, (char*)&opt, sizeof(opt)); //WSAEventSelect(new_client, tbs->conn_evt[conn->idx], FD_WRITE | FD_READ | FD_CLOSE); if (evtpoll_attach(tbs->ep, new_client)) { WLog_ERR(TAG, "%s: attach evtpoll for %d failed!", __FUNCTION__, new_client); goto on_error; } if (0 != evtpoll_subscribe(tbs->ep, EV_READ, new_client, (void*)(conn->idx), NULL)) { goto on_error_1; } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("sock connection created!"); return 0; on_error_1: evtpoll_detach(tbs->ep, new_client); on_error: if (new_client != INVALID_SOCKET) closesocket(new_client); FREE(conn); return Error_Unexpect; } static void 
__destroy_connection(sock_connection *conn)
{
    /* Final teardown of a connection; registered below as the destroy
     * function of the sock_connection reference count. Closes the socket if
     * it is still open, releases the connection's table slot and buffers,
     * then frees the object. */
    sp_tbs_t *tbs = conn->tbs;
    if (conn->fd != INVALID_SOCKET) {
        WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
        evtpoll_detach(tbs->ep, conn->fd);
        closesocket(conn->fd);
        conn->fd = INVALID_SOCKET;
    }
    WSACloseEvent(tbs->conn_evt[conn->idx]);
    tbs->conn_evt[conn->idx] = NULL;
    tbs->conn[conn->idx] = NULL;
    /* Keep the table dense: move the last entry into the freed slot. */
    /* NOTE(review): create_connection passes the connection's index as the
     * evtpoll subscription user data; the entry moved here gets a new idx but
     * its subscription is not updated in this function — confirm the event
     * loop does not rely on the stale value. */
    if (conn->idx != tbs->conn_cnt-1) {
        tbs->conn_evt[conn->idx] = tbs->conn_evt[tbs->conn_cnt-1];
        tbs->conn[conn->idx] = tbs->conn[tbs->conn_cnt-1];
        tbs->conn[conn->idx]->idx = conn->idx;
    }
    tbs->conn_cnt--;
    iobuffer_queue_destroy(conn->tx_queue);
    conn->tx_queue = NULL;
    if (conn->rx_pkt) {
        iobuffer_dec_ref(conn->rx_pkt);
        conn->rx_pkt = NULL;
    }
    free(conn);
    DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("sock connection destroy!");
}

IMPLEMENT_REF_COUNT_STATIC(sock_connection, sock_connection, ref_cnt, __destroy_connection)

/* Non-zero when the caller is running on the tbs worker thread. */
static __inline int is_worker_thread(sp_tbs_t *tbs)
{
    return GetCurrentThreadId() == tbs->work_thread_id;
}

/* Look up the sub session with id `conn_id` on `conn`; NULL if not found. */
static sub_session *find_sub_session(sock_connection *conn, int conn_id)
{
    sub_session *tpos;
    struct hlist_node *pos;
    int slot = ((unsigned int )conn_id) % SESSION_BUCKET_SIZE; // bugfix, assure slot positive
    hlist_for_each_entry(tpos, pos, &conn->session_bucket[slot], sub_session, hentry) {
        if (tpos->id == conn_id)
            return tpos;
    }
    return NULL;
}

/* UAC "connect finished" callback: handled inline on the worker thread,
 * otherwise forwarded to it as MSG_UAC_ON_CONNECT. */
static void __uac_on_connect(sp_ses_uac_t *uac, int error, void *user_data)
{
    sub_session *session = (sub_session*)user_data;
    if (is_worker_thread(session->conn->tbs)) {
        uac_on_connect(session->conn->tbs, session->conn, session, error);
    } else {
        post_msg(session->conn->tbs, MSG_UAC_ON_CONNECT, (param_size_t)session, error);
    }
}

/* UAC "closed" callback: handled inline on the worker thread, otherwise
 * forwarded to it as MSG_UAC_ON_CLOSE. */
static void __uac_on_close(sp_ses_uac_t *uac, int error, void *user_data)
{
    sub_session *session = (sub_session*)user_data;
    if (is_worker_thread(session->conn->tbs)) {
        uac_on_close(session->conn->tbs, session->conn, session, error);
    } else {
        post_msg(session->conn->tbs, MSG_UAC_ON_CLOSE, (param_size_t)session, error);
    }
}

static void
__uac_on_destroy(sp_ses_uac_t *uac, void *user_data) { sub_session *session = (sub_session*)user_data; TOOLKIT_ASSERT(session->uac == NULL); if (is_worker_thread(session->conn->tbs)) { uac_on_destroy(session->conn->tbs, session->conn, session); } else { post_msg(session->conn->tbs, MSG_UAC_ON_DESTROY, (param_size_t)session, 0); } } static void __uac_on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data) { sub_session *session = (sub_session*)user_data; iobuffer_t *pkt = NULL; int tsx_id; int user_error = 0; int slen = 0; char* msg_str = NULL; if (error) { if (ans_pkt) { pkt = *ans_pkt; iobuffer_reset(pkt); *ans_pkt = NULL; } else { pkt = iobuffer_create(-1, -1); } } else { TOOLKIT_ASSERT(ans_pkt); pkt = *ans_pkt; *ans_pkt = NULL; iobuffer_read(pkt, IOBUF_T_I4, &error, NULL); iobuffer_read(pkt, IOBUF_T_I4, &user_error, NULL); if (error) { iobuffer_read(pkt, IOBUF_T_STR, NULL, &slen); if (slen) { msg_str = (char*)malloc(slen + 1); iobuffer_read(pkt, IOBUF_T_WSTR, msg_str, NULL); } iobuffer_reset(pkt); } } tsx_id = sp_tsx_uac_get_id(tsx); if (is_worker_thread(session->conn->tbs)) { uac_on_ans(session->conn->tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &pkt); if (pkt) iobuffer_dec_ref(pkt); } else { iobuffer_format_write_head(pkt, "s4444", msg_str, &end, &user_error, &error, &tsx_id); post_msg(session->conn->tbs, MSG_UAC_ON_ANS, (param_size_t)session, (param_size_t)pkt); } if (error || end) { sp_tsx_uac_close(tsx); sp_tsx_uac_destroy(tsx); } } static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error) { WLog_DBG(TAG, "Enter %s with error: %d", __FUNCTION__, error); if (!error) { post_session_ack(tbs, conn, session->id, session->tsx_id, error, NULL); } else { sp_ses_uac_t *uac = session->uac; post_session_ack(tbs, conn, CONN_INVALID_ID, session->tsx_id, error, get_err_msg(error)); session->uac = NULL; DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("sub session created failed %d! 
ses_id = %d, tsx_id = %d", error, session->id, session->tsx_id); if (uac) { sp_ses_uac_close(uac); sp_ses_uac_destroy(uac); } } WLog_DBG(TAG, "Leave %s", __FUNCTION__); } static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error) { if (session->uac) { sp_ses_uac_t *uac = session->uac; post_session_end(tbs, conn, session->id); session->uac = NULL; if (error != Error_Closed) sp_ses_uac_close(uac); sp_ses_uac_destroy(uac); } } static void uac_on_ans(sp_tbs_t* tbs, sock_connection* conn, sub_session* session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t** ans_pkt) { if (msg == NULL || strlen(msg) == 0) post_req_ack_ex(tbs, conn, session->id, tsx_id, end, error, user_error, get_user_err_msg(user_error), ans_pkt); else post_req_ack_ex(tbs, conn, session->id, tsx_id, end, error, user_error, CSimpleStringA2W(msg).GetData(), ans_pkt); } static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session) { destroy_sub_session(tbs, conn, session); DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("destroy sub session(id:%d) uac %s %d", session->id, _GetFileName(__FILE__), __LINE__); } static int create_sub_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name, sub_session **p_session) { sp_env_t *env = sp_get_env(); sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name); if (ent) { int rc; sub_session *session = MALLOC_T(sub_session); sp_ses_uac_callback cb; iobuffer_t *pkt = NULL; char *param = NULL; session->id = tbs->sub_session_seq++; session->conn = conn; session->tsx_id = tsx_id; hlist_add_head(&session->hentry, &conn->session_bucket[session->id % SESSION_BUCKET_SIZE]); conn->session_cnt++; sock_connection_inc_ref(conn); cb.user_data = session; cb.on_close = &__uac_on_close; cb.on_destroy = &__uac_on_destroy; cb.on_connect = &__uac_on_connect; rc = sp_ses_uac_create(tbs->ses_mgr, ent->mod->cfg->idx, 
ent->cfg->idx, &cb, &session->uac); if (rc != 0) { goto on_error; } pkt = iobuffer_create(-1, -1); if (function_name == NULL || strlen(function_name) == 0) { param = class_name; } else { param = strdup_printf("%s::%s", class_name, function_name); } iobuffer_write(pkt, IOBUF_T_STR, param, -1); if (param != class_name) { FREE(param); } rc = sp_ses_uac_async_connect(session->uac, 10000, &pkt); if (pkt) iobuffer_dec_ref(pkt); if (rc != 0) { goto on_error; } on_error: if (rc != 0) { conn->session_cnt--; hlist_del(&session->hentry); sock_connection_dec_ref(conn); free(session); } else { *p_session = session; } return rc; } else { DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("cannot find entity from mod mgr by name: %s", entity_name); return Error_NotExist; } } static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session) { conn->session_cnt--; hlist_del(&session->hentry); if (session->uac) { sp_ses_uac_t *uac = session->uac; session->uac = NULL; sp_ses_uac_destroy(uac); } sock_connection_dec_ref(conn); free(session); DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("destroy sub session(id:%d) %s %d", session->id, _GetFileName(__FILE__), __LINE__); } static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t*err_msg) { if (conn->fd != INVALID_SOCKET) { int len; iobuffer_t *pkt = iobuffer_create(-1, -1); int type = PKT_TYPE_SESSIONACK; #if defined(_MSC_VER) iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg); #else iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err_msg); #endif //_MSC_VER len = iobuffer_get_length(pkt)-4; iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0); iobuffer_queue_enqueue(conn->tx_queue, pkt); on_write(tbs, conn); if (err != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("send PKT_TYPE_SESSIONACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err)); } } } static void post_req_ack_ex(sp_tbs_t* tbs, sock_connection* conn, int 
conn_id, int tsx_id, int end, int err, int uc, const wchar_t* err_msg, iobuffer_t** ack_pkt)
{
    /* Queue a PKT_TYPE_REQACK that also carries the user code `uc`.
     * Ownership of *ack_pkt (if any) is taken and the header fields are
     * prepended to it; a fresh packet is created when none is supplied.
     * Nothing is sent, and *ack_pkt is left untouched, when the socket is
     * already closed. */
    if (conn->fd != INVALID_SOCKET) {
        iobuffer_t* pkt = NULL;
        int len;
        char bend = !!end;
        int type = PKT_TYPE_REQACK;
        if (ack_pkt) {
            pkt = *ack_pkt;
            *ack_pkt = NULL;
        }
        if (!pkt) {
            pkt = iobuffer_create(-1, -1);
        }
        /* written at the head, so the arguments are listed in reverse of
         * their on-wire order */
#if defined(_MSC_VER)
        iobuffer_format_write_head(pkt, "w441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
#else
        iobuffer_format_write_head(pkt, "m441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
#endif //_MSC_VER
        len = iobuffer_get_length(pkt) - 4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        on_write(tbs, conn);
        if (err != 0) {
            DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_REQACK at EX tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
        }
    }
}

/* Same as post_req_ack_ex() with a user code of 0 (only the log text
 * differs). */
static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt)
{
    if (conn->fd != INVALID_SOCKET) {
        iobuffer_t *pkt = NULL;
        int len;
        char bend = !!end;
        int type = PKT_TYPE_REQACK;
        int uc = 0;
        if (ack_pkt) {
            pkt = *ack_pkt;
            *ack_pkt = NULL;
        }
        if (!pkt) {
            pkt = iobuffer_create(-1, -1);
        }
#if defined(_MSC_VER)
        iobuffer_format_write_head(pkt, "w441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
#else
        iobuffer_format_write_head(pkt, "m441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
#endif //_MSC_VER
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        on_write(tbs, conn);
        if (err != 0) {
            DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_REQACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
        }
    }
}

/* Queue a PKT_TYPE_SET_VAR_ACK (type, CONN_INVALID_ID, tsx_id, err, err_msg)
 * and try to send it. Nothing is sent when the socket is closed. */
static void post_set_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        int type = PKT_TYPE_SET_VAR_ACK;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int conn_id = CONN_INVALID_ID;
#if defined(_MSC_VER)
        iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
#else
        iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err_msg);
#endif //_MSC_VER
        len = iobuffer_get_length(pkt) - 4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        on_write(tbs, conn);
        if (err != 0) {
            DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_SET_VAR_ACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
        }
    }
}

/* Queue a PKT_TYPE_GET_VAR_ACK and try to send it. The trailing string is
 * `value` when err is 0 and `err_msg` otherwise. Nothing is sent when the
 * socket is closed. */
static void post_get_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg, const wchar_t*value)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        int type = PKT_TYPE_GET_VAR_ACK;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int conn_id = CONN_INVALID_ID;
#if defined(_MSC_VER)
        iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
#else
        iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
#endif //_MSC_VER
        len = iobuffer_get_length(pkt) - 4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        on_write(tbs, conn);
        if (err != 0) {
            DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_GET_VAR_ACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
        }
    }
}

/* Queue a PKT_TYPE_SESSIONEND (type, conn_id) and try to send it. Nothing is
 * sent when the socket is closed. */
static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int type = PKT_TYPE_SESSIONEND;
        iobuffer_format_write(pkt, "44", &type, &conn_id);
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("send PKT_TYPE_SESSIONEND");
        on_write(tbs, conn);
    }
}

/* Handle a session request from the client. On success the acknowledgement
 * is sent later from uac_on_connect(); on failure a negative SESSIONACK is
 * sent immediately. */
static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name)
{
    sub_session *session = NULL;
    int rc;
    rc = create_sub_session(tbs, conn, tsx_id, entity_name, function_name, class_name, &session);
    if (rc == 0) {
        DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("start create sub session!");
        //post_session_ack(tbs, conn, session->id, tsx_id, rc, NULL);
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create_sub_session failed! entity: %s, err = %d %s %d", entity_name, rc, _GetFileName(__FILE__), __LINE__);
        post_session_ack(tbs, conn, CONN_INVALID_ID, tsx_id, rc, get_err_msg(rc));
    }
}

/* Look up the event registration `register_id` on `conn`; NULL if absent. */
static reg_entry *find_reg_entry(sock_connection *conn, int register_id)
{
    reg_entry *tpos;
    struct hlist_node *pos;
    int slot = ((unsigned int)register_id) % REGISTER_BUCKET_SIZE; // bugfix, assure slot positive
    hlist_for_each_entry(tpos, pos, &conn->register_bucket[slot], reg_entry, hentry) {
        if (tpos->register_id == register_id) {
            return tpos;
        }
    }
    return NULL;
}

/* Forward a broadcast event to the client as PKT_TYPE_EVENT_DATA. Takes
 * ownership of *evt_pkt only when the socket is still open, and always
 * releases the listener reference taken in __bcm_on_message_raw(). */
static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt)
{
    if (entry->conn->fd != INVALID_SOCKET) {
        int type;
        int len;
        iobuffer_t *pkt = *evt_pkt;
        *evt_pkt = NULL;
        type = PKT_TYPE_EVENT_DATA;
        /* head writes are prepended, so fields are written last-to-first */
        iobuffer_write_head(pkt, IOBUF_T_I4, &tsx_id, 0);
        iobuffer_write_head(pkt, IOBUF_T_I4, &conn_id, 0);
        iobuffer_write_head(pkt, IOBUF_T_I4, &type, 0);
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(entry->conn->tx_queue, pkt);
        DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("send PKT_TYPE_EVENT_DATA tsx_id=%d", tsx_id);
        on_write(tbs, entry->conn);
    }
    sp_bcm_listener_dec_ref(entry->listener); //@
}

/* Listener callback for a raw broadcast message: takes the packet, pins the
 * listener with an extra reference and posts MSG_ON_EVENT to the worker. */
static void __bcm_on_message_raw(sp_bcm_listener_t *listener, int from_client_id, iobuffer_t **msg_pkt, void *user_data)
{
    reg_entry *entry = (reg_entry*)user_data;
    iobuffer_t *pkt = *msg_pkt;
    *msg_pkt = NULL;
    sp_bcm_listener_inc_ref(listener); //@
    post_msg(entry->conn->tbs, MSG_ON_EVENT, (param_size_t)entry, (param_size_t)pkt);
    DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("post event from_client_id:%d", from_client_id);
}

static void __bcm_on_destroy(sp_bcm_listener_t
*listener, void *user_data)
{
    /* Listener destroy callback: unlink the registration from its
     * connection, drop the connection reference it held and free it. */
    reg_entry *entry = (reg_entry*)user_data;
    entry->conn->register_cnt--;
    hlist_del(&entry->hentry);
    sock_connection_dec_ref(entry->conn);
    DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("unregister event ok! register_id = %d", entry->register_id);
    free(entry);
}

/* Handle a client request to subscribe to events of `entity_name`: create a
 * registration entry keyed by `register_id`, create a listener for it and
 * subscribe. Duplicate register ids and unknown entities are only logged. */
static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name)
{
    sp_env_t *env = sp_get_env();
    sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);
    if (ent) {
        int rc;
        sp_svc_t *svc = sp_ses_mgr_get_svc(tbs->ses_mgr);
        reg_entry *entry;
        sp_bcm_listener_cb cb;
        if (find_reg_entry(conn, register_id)) {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("register entry already exist, register id %d", register_id);
            return;
        }
        entry = ZALLOC_T(reg_entry);
        entry->register_id = register_id;
        entry->entity_id = ent->cfg->idx;
        hlist_add_head(&entry->hentry, &conn->register_bucket[((unsigned int)register_id) % REGISTER_BUCKET_SIZE]); // bugfix, assure index positive
        conn->register_cnt++;
        sock_connection_inc_ref(conn); /* the registration keeps the connection alive */
        entry->conn = conn;
        cb.on_message_raw = &__bcm_on_message_raw;
        cb.on_message = NULL;
        cb.on_destroy = &__bcm_on_destroy;
        cb.user_data = entry;
        rc = sp_bcm_listener_create(svc, entry->entity_id, function_name, &cb, &entry->listener);
        if (rc != 0) {
            goto on_error;
        }
        rc = sp_bcm_listener_subscribe(entry->listener, &entry->uid);
        if (rc != 0) {
            goto on_error;
        }
on_error:
        /* reached on success as well; rc decides which branch runs */
        if (rc != 0) {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create bcm listener failed! entity = %s, register_id = %d", ent->cfg->name, register_id);
            conn->register_cnt--;
            hlist_del_init(&entry->hentry);
            /* NOTE(review): if sp_bcm_listener_destroy invokes the on_destroy
             * callback (__bcm_on_destroy), the entry would be unlinked,
             * released and freed a second time by the statements around this
             * call — confirm against the listener implementation. */
            if (entry->listener) {
                sp_bcm_listener_destroy(entry->listener);
            }
            free(entry);
            sock_connection_dec_ref(conn);
        } else {
            DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("register event for %s ok! register_id = %d", ent->cfg->name, register_id);
        }
    } else {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("cannot find entity %s!", entity_name);
    }
}

/* Handle a client request to drop the registration `register_id`:
 * unsubscribe its listener and, if that succeeds, destroy it. The entry
 * itself is not freed in this function. */
static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id)
{
    reg_entry *entry = find_reg_entry(conn, register_id);
    if (entry) {
        if (sp_bcm_listener_unsubscribe(entry->listener) == 0) {
            sp_bcm_listener_destroy(entry->listener);
        } else {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("entry already unregistered! register_id = %d", register_id);
        }
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("%s, entry not found, registered_id = %d", __FUNCTION__, register_id);
    }
}

/* Forward a client log record to the log client as Log_Event, together with
 * the link context ids. A NULL message is logged as an empty string. */
static void on_log_event(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext &t_context)
{
    sp_log_client_logWithLink(tbs->log_client, Log_Event, severity_level, 0, user_code, 0, 0,
        NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg)
        , t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
}

/* Same as on_log_event(), but recorded as Log_Warning. */
static void on_log_warn(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext& t_context)
{
    sp_log_client_logWithLink(tbs->log_client, Log_Warning, severity_level, 0, user_code, 0, 0,
        NULL == msg ? "" : msg, NULL == msg ?
0 : strlen(msg) , t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData()); } static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt) { sub_session *session = find_sub_session(conn, conn_id); if (session) { if (session->uac) { int rc; sp_tsx_uac_t *tsx; sp_tsx_uac_callback cb; cb.user_data = session; cb.on_ans = &__uac_on_ans; cb.on_destroy = NULL; rc = sp_tsx_uac_create(session->uac, tsx_id, method_id, method_sig, &cb, &tsx); if (rc == 0) { rc = sp_tsx_uac_async_req(tsx, timeout, req_pkt); if (rc == 0) { WLog_DBG(TAG, "send req: tsx_id=%d, method_id=%d, method_sig=%d, pkt_size=%d", tsx_id, method_id, method_sig, *req_pkt == NULL ? 0 : iobuffer_get_length(*req_pkt)); } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("async req tsx failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__); post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL); sp_tsx_uac_destroy(tsx); } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create req tsx failed! %s %d", _GetFileName(__FILE__), __LINE__); post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL); } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("session already closed! %s %d", _GetFileName(__FILE__), __LINE__); post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_NetBroken, get_err_msg(Error_NetBroken), NULL); } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! 
%s %d", _GetFileName(__FILE__), __LINE__); post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_Break, get_err_msg(Error_Break), NULL); } } static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt) { sub_session *session = find_sub_session(conn, conn_id); if (session) { if (session->uac) { int rc = sp_ses_uac_send_info(session->uac, method_id, method_sig, info_pkt); if (rc != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("uac send info failed! error = %d %s %d", rc, _GetFileName(__FILE__), __LINE__); } else { //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("send info: method_id=%d, method_sig=%d, pkt_size=%d", method_id, method_sig, // *info_pkt == NULL ? 0 : iobuffer_get_length(*info_pkt)); } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("session already closed! info ignored ! %s %d", _GetFileName(__FILE__), __LINE__); } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__); } } static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id) { sub_session *session = find_sub_session(conn, conn_id); if (session) { if (session->uac) { sp_ses_uac_t *uac = session->uac; session->uac = NULL; if (sp_ses_uac_close(uac) == 0) { sp_ses_uac_destroy(uac); } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("duplicated uac close!"); } } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! 
%s %d", _GetFileName(__FILE__), __LINE__); } } static void on_stop(sp_tbs_t *tbs) { int i; WSAEventSelect(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], 0); closesocket(tbs->server_fd); tbs->server_fd = INVALID_SOCKET; for (i = tbs->conn_cnt-1; i > SRV_EVT_IDX; --i) { sock_connection *conn = tbs->conn[i]; TOOLKIT_ASSERT(conn); on_close(tbs, conn); } } static void on_close(sp_tbs_t *tbs, sock_connection *conn) { int i; WLog_WARN(TAG, "socket on close is invoked!"); if (conn->fd != INVALID_SOCKET) { WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0); closesocket(conn->fd); conn->fd = INVALID_SOCKET; } for (i = 0; i < SESSION_BUCKET_SIZE; ++i) { sub_session *tpos; struct hlist_node *pos, *n; hlist_for_each_entry_safe(tpos, pos, n, &conn->session_bucket[i], sub_session, hentry) { if (tpos->uac) { sp_ses_uac_t *uac = tpos->uac; tpos->uac = NULL; if (sp_ses_uac_close(uac) == 0) { sp_ses_uac_destroy(uac); } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("duplicated uac close! %s %d", _GetFileName(__FILE__), __LINE__); } } } } for (i = 0; i < REGISTER_BUCKET_SIZE; ++i) { reg_entry *tpos; struct hlist_node *pos, *n; hlist_for_each_entry_safe(tpos, pos, n, &conn->register_bucket[i], reg_entry, hentry) { if (tpos->listener) { if (sp_bcm_listener_unsubscribe(tpos->listener) == 0) { sp_bcm_listener_destroy(tpos->listener); } } } } sock_connection_dec_ref(conn); } static void on_set_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name, char *value) { int rc; if (name == NULL) { rc = Error_Param; post_set_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null"); return; } rc = sp_var_client_set((sp_var_client_t*)tbs->var_client, name, value, 0); post_set_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc)); } static void on_get_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name) { int len = 260; int rc; char szTmp[260] = { 0 }; wchar_t wszValue[260] = { 0 }; if (name == NULL) { rc = Error_Param; post_get_var_ack(tbs, conn, tsx_id, rc, L"param 
[name] is null", NULL); return; } rc = sp_var_client_lock(tbs->var_client); if (rc == 0) { rc = sp_var_client_get((sp_var_client_t*)tbs->var_client, name, szTmp, &len); } sp_var_client_unlock(tbs->var_client); if (rc == 0) { len = 260; toolkit_mbs2wcs(szTmp, &wszValue[0], len); post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), wszValue); } else post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), NULL); } static void process_tcp_pkt(sp_tbs_t *tbs, sock_connection *conn, iobuffer_t **p_pkt) { iobuffer_t *pkt = *p_pkt; int type; iobuffer_read(pkt, IOBUF_T_I4, &type, NULL); link_context cur; char linkStr[512] = ""; bool withLinkContex = !!(type & PKT_TYPE_CONTROL_LINKCONTEXT); int linkId = 0; type = type & 0x0000FFFF; //clear control part switch (type) { case PKT_TYPE_INFO: { int conn_id; int method_id; int method_sig; if (withLinkContex) { iobuffer_format_read(pkt, "444mmmm", &conn_id, &method_id, &method_sig, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); } else { iobuffer_format_read(pkt, "444", &conn_id, &method_id, &method_sig); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_INFO conn_id=%d method_id=%d method_sig=%d, %s", conn_id, method_id, method_sig, linkStr); on_info(tbs, conn, conn_id, method_id, method_sig, &pkt); } break; case PKT_TYPE_REQ: { int conn_id; int tsx_id; int method_id; int method_sig; int timeout; if (withLinkContex) { iobuffer_format_read(pkt, "44444mmmm", &conn_id, &tsx_id, &method_id, &method_sig, &timeout, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), 
cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); } else { iobuffer_format_read(pkt, "44444", &conn_id, &tsx_id, &method_id, &method_sig, &timeout); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REQ conn_id=%d tsx_id=%d method_id=%d method_sig=%d timeout=%d, %s", conn_id, tsx_id, method_id, method_sig, timeout, linkStr); on_req(tbs, conn, conn_id, tsx_id, method_id, method_sig, timeout, &pkt); } break; case PKT_TYPE_SESSION: { int tsx_id; wchar_t *wentity_name; wchar_t*wfunction_name; wchar_t*wclass_name; char *entity_name = NULL; char *function_name = NULL; char *class_name = NULL; int n; if (withLinkContex) { iobuffer_format_read(pkt, "4mmmmmmm", &tsx_id , &wentity_name, &wfunction_name, &wclass_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "4mmm", &tsx_id, &wentity_name, &wfunction_name, &wclass_name); } n = toolkit_wcs2mbs(wentity_name, NULL, 0); WLog_DBG(TAG, "wtm len for wentity_name: %d", n); if (wentity_name && n > 0) { entity_name = (char*)malloc(n); toolkit_wcs2mbs(wentity_name, entity_name, n); } FREE(wentity_name); n = toolkit_wcs2mbs(wfunction_name, NULL, 0); WLog_DBG(TAG, "wtm len for wfunction_name: %d", n); if (wfunction_name && n > 0) { function_name = (char*)malloc(n); toolkit_wcs2mbs(wfunction_name, function_name, n); } FREE(wfunction_name); n = toolkit_wcs2mbs(wclass_name, NULL, 0); WLog_DBG(TAG, "wtm len for wclass_name: %d", n); if (wclass_name && n > 0) { class_name = (char*)malloc(n); toolkit_wcs2mbs(wclass_name, class_name, n); } FREE(wclass_name); DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSION tsx_id=%d entity_name=%s 
function_name=%s class_name=%s", tsx_id, entity_name ? entity_name : "", function_name ? function_name : "" , class_name ? class_name : ""); on_session(tbs, conn, tsx_id, entity_name, function_name, class_name); if (entity_name) free(entity_name); if (function_name) free(function_name); if (class_name) free(class_name); } break; case PKT_TYPE_SESSIONEND: { int conn_id; if (withLinkContex) { iobuffer_format_read(pkt, "4mmmm", &conn_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_read(pkt, IOBUF_T_I4, &conn_id, NULL); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSIONEND conn_id=%d", conn_id); on_session_end(tbs, conn, conn_id); } break; case PKT_TYPE_REGISTER_EVENT: { int register_id; wchar_t* wentity_name; char* entity_name = NULL; wchar_t* wfunction_name; char *function_name = NULL; int n; if (withLinkContex) { iobuffer_format_read(pkt, "4mmmmmm", ®ister_id, &wentity_name, &wfunction_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "4mm", ®ister_id, &wentity_name, &wfunction_name); } n = toolkit_wcs2mbs(wentity_name, NULL, 0); if (wentity_name && n > 0) { entity_name = (char*)malloc(n); toolkit_wcs2mbs(wentity_name, entity_name, n); } FREE(wentity_name); n = toolkit_wcs2mbs(wfunction_name, NULL, 0); if (wfunction_name && n > 0) { 
function_name = (char*)malloc(n); toolkit_wcs2mbs(wfunction_name, function_name, n); } FREE(wfunction_name); DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REGISTER_EVENT register_id=%d, entity_name=%s, function_name=%s", register_id, entity_name ? entity_name : "", function_name ? function_name : ""); on_register_event(tbs, conn, register_id, entity_name, function_name); if (entity_name) free(entity_name); if (function_name) free(function_name); } break; case PKT_TYPE_UNREGISTER_EVENT: { int register_id; if (withLinkContex) { iobuffer_format_read(pkt, "4mmmm", ®ister_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "4", ®ister_id); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_UNREGISTER_EVENT register_id=%d", register_id); on_unregister_event(tbs, conn, register_id); } break; case PKT_TYPE_LOG_EVENT: { int severity_level; int user_code; wchar_t* wmsg = NULL; char *msg = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "44mmmmm", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "44m", &severity_level, &user_code, &wmsg); } if (wmsg) { int n = toolkit_wcs2mbs(wmsg, NULL, 0); const int rn = n > 0 ? 
n : 1; msg = (char*)malloc(rn); memset(msg, '\0', sizeof(char) * rn); toolkit_wcs2mbs(wmsg, msg, n); FREE(wmsg); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_LOG_EVENT severity_level=%d, user_code=0x%08x, msg=%s", severity_level, user_code, msg ? msg : ""); linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); on_log_event(tbs, conn, severity_level, user_code, msg, t_Context); if (msg) { free(msg); } } break; case PKT_TYPE_LOG_WARN: { int severity_level; int user_code; wchar_t* wmsg = NULL; char *msg = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "44mmmmm", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "44m", &severity_level, &user_code, &wmsg); } if (wmsg) { int n = toolkit_wcs2mbs(wmsg, NULL, 0); const int rn = n > 0 ? n : 1; msg = (char*)malloc(rn); memset(msg, '\0', sizeof(char) * rn); toolkit_wcs2mbs(wmsg, msg, n); FREE(wmsg); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_LOG_WARN severity_level=%d, user_code=0x%08x, msg=%s", severity_level, user_code, msg ? 
msg : ""); linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); on_log_warn(tbs, conn, severity_level, user_code, msg, t_Context); if (msg) free(msg); } break; case PKT_TYPE_SET_VAR_REQ: { int tsx_id = 0; wchar_t* wname = NULL; wchar_t* wvalue = NULL; char *name = NULL; char *value = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "4mmmmmm", &tsx_id, &wname, &wvalue, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "4mm", &tsx_id, &wname, &wvalue); } if (wname) { int n = toolkit_wcs2mbs(wname, NULL, 0); const int rn = n > 0 ? n : 1; name = (char*)malloc(rn); memset(name, '\0', sizeof(char)* rn); toolkit_wcs2mbs(wname, name, n); FREE(wname); } if (wvalue) { int n = toolkit_wcs2mbs(wvalue, NULL, 0); const int rn = n > 0 ? n : 1; value = (char*)malloc(rn); memset(value, '\0', sizeof(char)* rn); toolkit_wcs2mbs(wvalue, value, n); FREE(wvalue); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SET_VAR_REQ tsx_id=%d, name=%s, value=%s, %s" , tsx_id, name ? name : "", value ? 
value : "", linkStr); on_set_var_req(tbs, conn, tsx_id, name, value); if (name) free(name); if (value) free(value); } break; case PKT_TYPE_GET_VAR_REQ: { int tsx_id = 0; wchar_t*wname = NULL; char *name = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "4mmmmm", &tsx_id, &wname, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else { iobuffer_format_read(pkt, "4m", &tsx_id, &wname); } if (wname) { int n = toolkit_wcs2mbs(wname, NULL, 0); const int rn = n > 0 ? n : 1; name = (char*)malloc(rn); memset(name, '\0', sizeof(char)* rn); toolkit_wcs2mbs(wname, name, n); FREE(wname); } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_GET_VAR_REQ tsx_id=%d, name=%s, %s", tsx_id, name ? name : "", linkStr); on_get_var_req(tbs, conn, tsx_id, name); if (name) free(name); } break; default: DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("recv unknown pkt type: %d", type); //TOOLKIT_ASSERT(0); break; } *p_pkt = NULL; if (pkt) iobuffer_dec_ref(pkt); } static void on_read(sp_tbs_t *tbs, sock_connection *conn) { WLog_DBG(TAG, "Enter %s", __FUNCTION__); int t = -404; if (conn->rx_state == RECV_STATE_HEADER) { do { t = recv(conn->fd, &conn->rx_hdr_buf[conn->rx_hdr_buf_len], RX_BUF_SIZE - conn->rx_hdr_buf_len, 0); } while (t == -1 && errno == EINTR); if (t > 0) { int offset = 0; conn->rx_hdr_buf_len += t; while (conn->rx_hdr_buf_len-offset >= HDR_LEN) { iobuffer_t *pkt; memcpy(&conn->rx_header_len, conn->rx_hdr_buf+offset, HDR_LEN); conn->rx_header_len += 4; // fixed hdr not include type!!! 
WLog_DBG(TAG, "rx header len(with 4 sizeof(type)): %d", conn->rx_header_len);
                offset += HDR_LEN;
                /* One buffer per packet, created with rx_header_len as its size argument. */
                pkt = iobuffer_create(-1, conn->rx_header_len);
                if (conn->rx_hdr_buf_len-offset >= conn->rx_header_len) {
                    /* The whole packet is already buffered: copy it out and dispatch it now. */
                    memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_header_len);
                    offset += conn->rx_header_len;
                    iobuffer_push_count(pkt, conn->rx_header_len);
                    process_tcp_pkt(tbs, conn, &pkt);
                    if (pkt)
                        iobuffer_dec_ref(pkt);
                } else {
                    /* Only part of the packet arrived: park it and wait for the rest in BODY state. */
                    memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_hdr_buf_len-offset);
                    iobuffer_push_count(pkt, conn->rx_hdr_buf_len-offset);
                    offset = conn->rx_hdr_buf_len; /* everything buffered is consumed; this also ends the loop */
                    TOOLKIT_ASSERT(conn->rx_pkt == NULL);
                    conn->rx_pkt = pkt;
                    conn->rx_state = RECV_STATE_BODY;
                }
            }
            if (offset == conn->rx_hdr_buf_len) {
                conn->rx_hdr_buf_len = 0;
            } else {
                if (offset > 0) {
                    /* Fewer than HDR_LEN bytes remain: move them to the front for the next read. */
                    TOOLKIT_ASSERT(conn->rx_state == RECV_STATE_HEADER);
                    memmove(&conn->rx_hdr_buf[0], &conn->rx_hdr_buf[offset], conn->rx_hdr_buf_len-offset);
                    conn->rx_hdr_buf_len -= offset;
                }
            }
        }
    } else if (conn->rx_state == RECV_STATE_BODY) {
        int offset;
        TOOLKIT_ASSERT(conn->rx_pkt);
        /* Bytes of the parked packet received so far. */
        offset = iobuffer_get_length(conn->rx_pkt);
        do {
            t = recv(conn->fd, iobuffer_data(conn->rx_pkt, -1), conn->rx_header_len - offset, 0);
        } while (t == -1 && errno == EINTR);
        if (t > 0) {
            iobuffer_push_count(conn->rx_pkt, t);
            if (offset+t == conn->rx_header_len) {
                /* Packet complete: dispatch it and go back to header parsing. */
                process_tcp_pkt(tbs, conn, &conn->rx_pkt);
                if (conn->rx_pkt) {
                    iobuffer_dec_ref(conn->rx_pkt);
                    conn->rx_pkt = NULL;
                }
                conn->rx_hdr_buf_len = 0;
                conn->rx_state = RECV_STATE_HEADER;
            }
        }
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot go here, bug! file: %s, line: %d", _GetFileName(__FILE__), __LINE__);
        TOOLKIT_ASSERT(0);
    }
    /* t keeps its initial -404 when no recv ran, so neither branch below fires in that case. */
    if (t == 0) {
        on_close(tbs, conn);
    } else if (t == -1) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("connection read failed: %s", strerror(errno));
        on_close(tbs, conn);
    }
    WLog_DBG(TAG, "Leave %s", __FUNCTION__);
}

/*
 * Write handler for a client connection: sends queued packets from
 * conn->tx_queue in order. A packet is dequeued and released only once all
 * of its bytes have been sent; if send() stops making progress the packet
 * stays at the head of the queue with the sent bytes already popped.
 */
static void on_write(sp_tbs_t *tbs, sock_connection *conn)
{
    if (conn->fd == INVALID_SOCKET)
        return;
    WLog_DBG(TAG, "Enter %s", __FUNCTION__);
    int t = -404;
    while (iobuffer_queue_count(conn->tx_queue) > 0) {
        iobuffer_t *pkt = iobuffer_queue_head(conn->tx_queue);
        int sended = 0;
        int total = iobuffer_get_length(pkt);
        while (sended < total) {
            do {
                t = send(conn->fd, iobuffer_data(pkt, 0), total - sended, 0);
            } while (t < 0 && errno == EINTR);
            if (t > 0) {
                sended += t;
                iobuffer_pop_count(pkt, t);
            } else {
                /* send() returned 0 or failed with something other than EINTR: stop for now. */
                break;
            }
        }
        if (sended < total) {
            break;
        } else {
            iobuffer_queue_deque(conn->tx_queue);
            iobuffer_dec_ref(pkt);
        }
    }
    WLog_DBG(TAG, "Leave %s", __FUNCTION__);
}

/*
 * Accept handler for the listening socket: delegates to create_connection()
 * and logs the outcome.
 */
static void on_accept(sp_tbs_t *tbs)
{
    int rc;
    rc = create_connection(tbs);
    if (rc != 0) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("create connection failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
    } else {
        DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("create connection ok! 
%s %d", _GetFileName(__FILE__), __LINE__); } } static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2) { uint64_t wdata = 0; int rc; cmd_entry *e = MALLOC_T(cmd_entry); e->msg = msg; e->param1 = param1; e->param2 = param2; if (0 != evtpoll_subscribe(tbs->ep, EV_READ, tbs->msg_fd, (void*)(long)MSG_EVT_IDX, NULL)) { WLog_ERR(TAG, "tbs subscribe msg eventfd failed."); free(e); return; } spinlock_enter(&tbs->cmd_list_lock, -1); list_add_tail(&e->entry, &tbs->cmd_list); spinlock_leave(&tbs->cmd_list_lock); wdata = 1; do { rc = write(tbs->msg_fd, &wdata, sizeof wdata); } while (rc < 0 && rc == EINTR); if (rc == -1) { WLog_ERR(TAG, "write to eventfd failed: %d", errno); return; } } static unsigned int __stdcall server_proc(void *arg) { sp_tbs_t *tbs = (sp_tbs_t*)arg; struct sockaddr_in addr = {0}; int istop = 0; int fixed_conn; WLog_DBG(TAG, "enter server listern and deal business thread"); TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET); tbs->server_fd = _socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (tbs->server_fd == INVALID_SOCKET) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create list fd socket failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError()); tbs->ready_evt_result = Error_Resource; goto on_error; } if (make_fd_cloexec(tbs->server_fd, 1) != 0) goto on_error; { long val = 1; _setsockopt(tbs->server_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&val, sizeof(val)); } addr.sin_family = AF_INET; addr.sin_addr.s_addr = tbs->server_ip; addr.sin_port = tbs->server_port; if (_bind(tbs->server_fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("bind address failed! 
%s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError()); tbs->ready_evt_result = Error_AlreadyExist; goto on_error; } WLog_DBG(TAG, "to attach server fd: %d", tbs->server_fd); if (evtpoll_attach(tbs->ep, tbs->server_fd)) { WLog_ERR(TAG, "%s: server attach evtpoll for %d failed!", __FUNCTION__, tbs->server_fd); goto on_error; } if (0 != evtpoll_subscribe(tbs->ep, EV_ACCEPT, tbs->server_fd, (void*)SRV_EVT_IDX, NULL)) { WLog_ERR(TAG, "%s: server attach evtpoll for %d failed!", __FUNCTION__, tbs->server_fd); goto on_error; } if (_listen(tbs->server_fd, 5) != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("winsock::listen failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError()); tbs->ready_evt_result = Error_AlreadyExist; goto on_error; } tbs->conn_cnt++; DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("start tbs server ok!"); tbs->ready_evt_result = 0; SetEvent(tbs->ready_evt); fixed_conn = tbs->conn_cnt; while (!istop || tbs->conn_cnt > fixed_conn) { int i; const int evt_cnt = tbs->conn_cnt; struct epoll_event events[MAX_EPOLL_EVENT]; const int nfds = evtpoll_wait(tbs->ep, events, MAX_EPOLL_EVENT, MAX_TIMEOUT); if (nfds == TOOLKIT_ETIMEDOUT) { continue; } if (nfds < 0) { break; } for (i = 0; i < nfds; ++i) { struct epoll_event* pe = events + i; int n; const int wf = pe->events & EPOLLOUT ? 1 : 0; const int rf = pe->events & EPOLLIN ? 
1 : 0; void* pt = NULL; WLog_INFO(TAG, "poll events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd, wf, rf); n = evtpoll_deal(tbs->ep, pe, &pt, 0); WLog_DBG(TAG, "evtpoll deal returned: %d, %d", n, (long)(pt)); if (0 == n) { const int idx = (int)(long)(pt); if ((idx == SRV_EVT_IDX) && rf) { DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("FD_ACCEPT trigger!"); on_accept(tbs); } if (idx == MSG_EVT_IDX) { // cmd uint64_t rdata; cmd_entry* e; do { n = read(tbs->msg_fd, &rdata, sizeof rdata); } while (n < 0 && errno == EINTR); spinlock_enter(&tbs->cmd_list_lock, -1); e = list_first_entry(&tbs->cmd_list, cmd_entry, entry); list_del(&e->entry); spinlock_leave(&tbs->cmd_list_lock); WLog_DBG(TAG, "server receive cmd msg: %d", e->msg); switch (e->msg) { case MSG_STOP: { on_stop(tbs); // ...stop istop++; } break; case MSG_UAC_ON_CONNECT: { sub_session* session = (sub_session*)e->param1; const int error = e->param2; uac_on_connect(tbs, session->conn, session, error); } break; case MSG_UAC_ON_DESTROY: { sub_session* session = (sub_session*)e->param1; uac_on_destroy(tbs, session->conn, session); } break; case MSG_UAC_ON_CLOSE: { sub_session* session = (sub_session*)e->param1; int error = e->param2; uac_on_close(tbs, session->conn, session, error); } break; case MSG_UAC_ON_ANS: { sub_session* session = (sub_session*)e->param1; iobuffer_t* ans_pkt = (iobuffer_t*)e->param2; int tsx_id; int error; int end; int user_error; char* msg_str = NULL; iobuffer_format_read(ans_pkt, "4444s", &tsx_id, &error, &user_error, &end, &msg_str); uac_on_ans(tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &ans_pkt); if (ans_pkt) iobuffer_dec_ref(ans_pkt); if (msg_str) free(msg_str); } break; case MSG_ON_EVENT: { reg_entry* entry = (reg_entry*)e->param1; iobuffer_t* evt_pkt = (iobuffer_t*)e->param2; post_event(tbs, entry, 0, entry->register_id, &evt_pkt); if (evt_pkt) iobuffer_dec_ref(evt_pkt); } break; default: DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("unknown msg type! 
%s %d", _GetFileName(__FILE__), __LINE__); TOOLKIT_ASSERT(0); break; } free(e); } if (idx != SRV_EVT_IDX && idx < evt_cnt) { sock_connection* conn = tbs->conn[idx]; TOOLKIT_ASSERT(conn); if (rf) { on_read(tbs, conn); } else if (wf) { on_write(tbs, conn); } } } } } DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("tbs server thread exit!"); on_error: if (tbs->server_fd != INVALID_SOCKET) { evtpoll_unsubscribe(tbs->ep, EV_ACCEPT, tbs->server_fd, 0, NULL, NULL); evtpoll_detach(tbs->ep, tbs->server_fd); closesocket(tbs->server_fd); tbs->server_fd = INVALID_SOCKET; } if (tbs->conn_evt[SRV_EVT_IDX]) { WSACloseEvent(tbs->conn_evt[SRV_EVT_IDX]); tbs->conn_evt[SRV_EVT_IDX] = NULL; tbs->conn_cnt--; } if (tbs->ready_evt_result) { SetEvent(tbs->ready_evt); } return 0; } int sp_tbs_create(sp_ses_mgr_t *ses_mgr, const char *ipaddr, unsigned short port, sp_tbs_t **p_tbs) { sp_tbs_t *tbs; sp_svc_t *svc = sp_ses_mgr_get_svc(ses_mgr); WLog_DBG(TAG, "Enter %s", __FUNCTION__); TOOLKIT_ASSERT(ipaddr); TOOLKIT_ASSERT(ses_mgr); tbs = ZALLOC_T(sp_tbs_t); tbs->ep = evtpoll_create(); if (!tbs->ep) { goto on_error_0; } tbs->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); if (tbs->msg_fd == -1) { WLog_ERR(TAG, "create event fd failed: %d", errno); goto on_error_1; } tbs->conn_evt[0] = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL); tbs->ready_evt = CreateEventA(NULL, TRUE, FALSE, NULL); sp_log_client_create(svc, sp_svc_get_iom(svc), &tbs->log_client); tbs->conn_cnt = 1; tbs->server_fd = INVALID_SOCKET; tbs->server_ip = inet_addr(ipaddr); tbs->server_port = htons(port); tbs->sub_session_seq = (unsigned int)GetTickCount(); INIT_LIST_HEAD(&tbs->cmd_list); spinlock_init(&tbs->cmd_list_lock); tbs->ses_mgr = ses_mgr; sp_var_client_create(svc, &tbs->var_client); WLog_DBG(TAG, "to attach msg fd: %d", tbs->msg_fd); if (evtpoll_attach(tbs->ep, tbs->msg_fd) != 0) { WLog_ERR(TAG, "%s: attach evtpoll for %d failed!", __FUNCTION__, tbs->msg_fd); goto on_error_2; } *p_tbs = tbs; WLog_DBG(TAG, "Leave %s succ!", 
__FUNCTION__);
    return 0;

    /* Cleanup ladder: each label releases one resource and falls through to the next. */
on_error_2:
    close(tbs->msg_fd);
on_error_1:
    evtpoll_destroy(tbs->ep);
on_error_0:
    free(tbs);
    WLog_DBG(TAG, "Leave %s failed!", __FUNCTION__);
    return -1;
}

/*
 * Start the server thread and wait until it reports its start-up status.
 *
 * Returns Error_Duplication when a work thread already exists,
 * Error_Resource when the thread cannot be created, otherwise the value the
 * thread stored in tbs->ready_evt_result (0 means the server is running).
 */
int sp_tbs_start(sp_tbs_t *tbs)
{
    if (tbs->work_thread) {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("may be already started! %s %d", _GetFileName(__FILE__), __LINE__);
        return Error_Duplication;
    }
    /* Reset the handshake state before the thread can touch it. */
    tbs->ready_evt_result = 0;
    ResetEvent(tbs->ready_evt);
    tbs->work_thread = (HANDLE)_beginthreadex(NULL, 0, &server_proc, tbs, 0, (unsigned*)&tbs->work_thread_id);
    if (!tbs->work_thread) {
        return Error_Resource;
    }
    /* Blocks without a timeout until server_proc signals ready_evt. */
    WaitForSingleObject(tbs->ready_evt, INFINITE);
    if (tbs->ready_evt_result) {
        /* Start-up failed: drop the thread handle so a later start is possible. */
        CloseHandle(tbs->work_thread);
        tbs->work_thread = NULL;
    }
    WLog_DBG(TAG, "start tbs finished! %d", tbs->ready_evt_result);
    return tbs->ready_evt_result;
}

/*
 * Stop the server: post MSG_STOP to the server thread, wait for the thread
 * to exit and release its handle. The server must have been started.
 */
void sp_tbs_stop(sp_tbs_t *tbs)
{
    TOOLKIT_ASSERT(tbs->work_thread);
    post_msg(tbs, MSG_STOP, 0, 0);
    WaitForSingleObject(tbs->work_thread, INFINITE);
    CloseHandle(tbs->work_thread);
    tbs->work_thread = NULL;
}

/*
 * Release a server object. The asserts require that only the fixed slot is
 * left (conn_cnt == 1) and that the listening socket is already closed.
 */
void sp_tbs_destroy(sp_tbs_t *tbs)
{
    TOOLKIT_ASSERT(tbs->conn_cnt == 1);
    TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET);
    CloseHandle(tbs->conn_evt[0]);
    CloseHandle(tbs->ready_evt);
    sp_log_client_destroy(tbs->log_client);
    sp_var_client_destroy(tbs->var_client);
    /* Detach the message fd from the poller before closing it. */
    evtpoll_detach(tbs->ep, tbs->msg_fd);
    close(tbs->msg_fd);
    evtpoll_destroy(tbs->ep);
    free(tbs);
}