#include "precompile.h"
#include "sp_svc.h"
#include "sp_def.h"
#include "sp_ses.h"
#include "sp_tbs.h"
#include "sp_env.h"
#include "sp_bcm.h"
#include "sp_log.h"
#include "sp_uid.h"
#include "sp_var.h"
#include "SpBase.h"
#include "list.h"
#include "hash.h"
#include "array.h"
#include "refcnt.h"
#include "spinlock.h"
#include "strutil.h"
#include "fileutil.h"
#include "memutil.h"
#include "uuid4.h"
#include "dbgutil.h"
// NOTE(review): the next two directives carry no header name in the source as
// received (the <...> part appears to have been lost) - restore them from version control.
#include
#include

#define TAG SPBASE_TAG("tbs")

#define SESSION_BUCKET_SIZE 511
#define REGISTER_BUCKET_SIZE 127
#define RX_BUF_SIZE 1024
#define MSG_EVT_IDX 0
#define SRV_EVT_IDX 1
#define MAX_TIMEOUT 30000
#define HDR_LEN 4
#define CONN_INVALID_ID -1

#define MSG_STOP 0
#define MSG_UAC_ON_CONNECT 1
#define MSG_UAC_ON_CLOSE 2
#define MSG_UAC_ON_DESTROY 3
#define MSG_UAC_ON_ANS 4
#define MSG_ON_EVENT 5

#define RECV_STATE_HEADER 0 /* already recv header length */
#define RECV_STATE_BODY 1 /* recv body */

// data information 0x00 00 FF FF
#define PKT_TYPE_INFO 0
#define PKT_TYPE_SESSION 1
#define PKT_TYPE_SESSIONEND 2
#define PKT_TYPE_REQ 3
#define PKT_TYPE_REQACK 4
#define PKT_TYPE_SESSIONACK 5
#define PKT_TYPE_REGISTER_EVENT 6
#define PKT_TYPE_UNREGISTER_EVENT 7
#define PKT_TYPE_EVENT_DATA 8
#define PKT_TYPE_LOG_EVENT 9
#define PKT_TYPE_LOG_WARN 10
// add sys var get & set type by xkm@20150526
#define PKT_TYPE_SET_VAR_REQ 11 // len + type + tsx_id + varname + varvalue
#define PKT_TYPE_SET_VAR_ACK 12 // len + type + conn_id + tsx_id + errcode + errmsg
#define PKT_TYPE_GET_VAR_REQ 13 // len + type + tsx_id + varname
#define PKT_TYPE_GET_VAR_ACK 14 // len + type + conn_id + tsx_id + errcode + varvalue/errmsg

// control information 0xFF FF 00 00
#define PKT_TYPE_CONTROL_LINKCONTEXT (1 << 31)

/*
 * Holds the four link (trace) context identifiers of a packet, each in a wide
 * and a narrow (ANSI) form. Every non-null pointer is released with free() in
 * the destructor.
 *
 * NOTE(review): the class owns raw pointers but has implicit copy operations;
 * copying an instance would free the same buffers twice. Do not copy.
 */
class link_context
{
public:
    char* businessId; //32
    char* traceId; //32
    char* spanId; //16
    char* parentSpanId; //16
    wchar_t* wbusinessId; //32
    wchar_t* wtraceId; //32
    wchar_t* wspanId; //16
    wchar_t* wparentSpanId; //16

public:
    link_context()
    {
        businessId = traceId = spanId = parentSpanId = nullptr;
        wbusinessId = wtraceId = wspanId = wparentSpanId = nullptr;
    }

    ~link_context()
    {
        // free(NULL) is a no-op, so no per-pointer checks are needed.
        free(businessId);
        free(traceId);
        free(spanId);
        free(parentSpanId);
        free(wbusinessId);
        free(wtraceId);
        free(wspanId);
        free(wparentSpanId);
    }

    /* Each Init* stores a malloc'ed copy of its argument, only if the field is still unset. */
    void InitbusinessId(const std::string& t_businessId) { init_once(businessId, t_businessId); }
    void InittraceId(const std::string& t_traceId) { init_once(traceId, t_traceId); }
    void InitspanId(const std::string& t_spanId) { init_once(spanId, t_spanId); }
    void InitparentSpanId(const std::string& t_parentSpanId) { init_once(parentSpanId, t_parentSpanId); }

    /*
     * Each *WtoA returns the narrow string if already present; otherwise converts
     * the wide string (CP_ACP), caches the result in the narrow field and returns
     * it. Returns "None" when neither form exists or the conversion fails.
     */
    const char* businessIdWtoA() { return narrow_or_convert(businessId, wbusinessId); }
    const char* traceIdWtoA() { return narrow_or_convert(traceId, wtraceId); }
    const char* spanIdWtoA() { return narrow_or_convert(spanId, wspanId); }
    const char* parentSpanIdWtoA() { return narrow_or_convert(parentSpanId, wparentSpanId); }

private:
    /* Copies src into a fresh malloc'ed buffer and stores it in field, unless field is already set. */
    static void init_once(char*& field, const std::string& src)
    {
        if (nullptr != field)
            return;
        const size_t size = src.length() + 1;
        char* copy = (char*)malloc(size);
        if (nullptr == copy)
            return; // allocation failed: leave the field unset instead of writing through NULL
        ZeroMemory(copy, size);
        strcpy_s(copy, size, src.c_str());
        field = copy;
    }

    /* Shared implementation of the *WtoA accessors. */
    static const char* narrow_or_convert(char*& narrow, const wchar_t* wide)
    {
        if (narrow)
            return narrow;
        if (!wide)
            return "None";

        const int n = WideCharToMultiByte(CP_ACP, 0, wide, -1, NULL, 0, NULL, NULL);
        if (n <= 0)
            return "None"; // conversion failed; do not allocate a zero-sized buffer
        char* buf = (char*)malloc(n);
        if (!buf)
            return "None";
        if (WideCharToMultiByte(CP_ACP, 0, wide, -1, buf, n, NULL, NULL) == 0) {
            free(buf);
            return "None";
        }
        narrow = buf;
        return narrow;
    }
};

typedef struct cmd_entry cmd_entry;
typedef struct sub_session sub_session;
typedef struct reg_entry reg_entry;
typedef struct sock_connection sock_connection;

/* One posted message (MSG_*) with two integer-sized parameters, linked into sp_tbs_t::cmd_list. */
struct cmd_entry {
    struct list_head entry;
    int msg;
    param_size_t param1;
    param_size_t param2;
};

/* A session opened on behalf of a socket client; hashed by id in sock_connection::session_bucket. */
struct sub_session {
    struct hlist_node hentry;
    unsigned int id;
    unsigned int tsx_id;
    sp_ses_uac_t *uac;
    sock_connection *conn;
};

/* An event registration of a socket client; hashed by register_id in sock_connection::register_bucket. */
struct reg_entry {
    struct hlist_node hentry;
    unsigned int register_id;
    unsigned int entity_id;
    sp_uid_t uid;
    sp_bcm_listener_t *listener;
    sock_connection *conn;
};

/* State of one accepted client socket. Reference counted through ref_cnt. */
struct sock_connection {
    SOCKET fd;
    int idx; /* index into sp_tbs_t::conn / sp_tbs_t::conn_evt */
    int session_cnt;
    struct hlist_head session_bucket[SESSION_BUCKET_SIZE];
    int register_cnt;
    struct hlist_head register_bucket[REGISTER_BUCKET_SIZE];
    iobuffer_queue_t *tx_queue;
    iobuffer_t *rx_pkt;
    char rx_hdr_buf[RX_BUF_SIZE];
    int rx_hdr_buf_len;
    int rx_state;
    int rx_header_len;
    sp_tbs_t *tbs;
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
};

DECLARE_REF_COUNT_STATIC(sock_connection, sock_connection)

struct sp_tbs_t {
    SOCKET server_fd;
    unsigned long server_ip;
    unsigned short server_port;
    HANDLE conn_evt[MAXIMUM_WAIT_OBJECTS]; // 0: pending outgoing messages; 1: listening socket events; 2..MAX: per-connection socket events
    HANDLE ready_evt;
    int ready_evt_result;
    sock_connection *conn[MAXIMUM_WAIT_OBJECTS]; // slots 0 and 1 are empty
    int conn_cnt; // actual connection count + 2
    unsigned int sub_session_seq;
    struct list_head cmd_list;
    spinlock_t cmd_list_lock;
    sp_log_client_t *log_client;
    HANDLE work_thread;
    DWORD work_thread_id;
    sp_ses_mgr_t *ses_mgr;
    sp_var_client_t *var_client;
};

static void on_write(sp_tbs_t *tbs, sock_connection *conn);
static void on_accept(sp_tbs_t *tbs);
static void on_read(sp_tbs_t *tbs, sock_connection *conn);
static void on_close(sp_tbs_t *tbs, sock_connection *conn);
static void on_stop(sp_tbs_t *tbs);
static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt);
static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name);
static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name);
static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id);
static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt);
static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t *err_msg);
static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt);
static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
static void uac_on_ans(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t **ans_pkt);
static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session);
static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2);
static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session);

postLink g_postlink = NULL;

/* Installs the callback stored in g_postlink. */
void sp_tbs_setpostlink(postLink cur)
{
    g_postlink = cur;
}

/* Returns NULL for rc == 0, otherwise a fixed generic error text. */
static const wchar_t *get_err_msg(int rc)
{
    return rc == 0 ? NULL : L"please find the err msg in ErrorCode.h!";
}

/*
 * Returns NULL for rc == 0, otherwise "UserError=<rc>".
 * The text lives in a function-local static buffer that is overwritten by the next call.
 */
static const wchar_t *get_user_err_msg(int rc)
{
    static wchar_t msg[512];
    if (rc == 0)
        return NULL;
    memset(msg, 0, sizeof(msg));
    swprintf_s(msg, 512, L"UserError=%d", rc);
    return msg;
}

/*
 * Accepts one pending client on tbs->server_fd and registers it in the next
 * free slot of tbs->conn / tbs->conn_evt.
 * Returns 0 on success, Error_NetBroken if accept() fails, Error_Resource if
 * all MAXIMUM_WAIT_OBJECTS slots are in use (the new socket is closed).
 */
static int create_connection(sp_tbs_t *tbs)
{
    sock_connection *conn;
    SOCKET new_client;
    int i;
    BOOL opt;
    struct sockaddr_in from_addr;
    int from_len = sizeof(from_addr);

    new_client = accept(tbs->server_fd, (struct sockaddr*)&from_addr, &from_len);
    if (new_client == INVALID_SOCKET) {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("accept connection failed! %s %d", _GetFileName(__FILE__), __LINE__);
        return Error_NetBroken;
    }
    DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("accept from %s:%d", inet_ntoa(from_addr.sin_addr), ntohs(from_addr.sin_port));

    if (tbs->conn_cnt >= MAXIMUM_WAIT_OBJECTS) {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("exceed maximum wait objects, connection discard! %s %d", _GetFileName(__FILE__), __LINE__);
        WSAEventSelect(new_client, tbs->conn_evt[SRV_EVT_IDX], 0);
        closesocket(new_client);
        return Error_Resource;
    }

    conn = MALLOC_T(sock_connection);
    conn->idx = tbs->conn_cnt++;
    conn->fd = new_client;
    conn->rx_hdr_buf_len = 0;
    conn->tx_queue = iobuffer_queue_create();
    conn->rx_pkt = NULL;
    conn->session_cnt = 0;
    conn->register_cnt = 0;
    conn->tbs = tbs;
    conn->rx_state = RECV_STATE_HEADER;
    conn->rx_header_len = 0;
    for (i = 0; i < array_size(conn->session_bucket); ++i) {
        INIT_HLIST_HEAD(&conn->session_bucket[i]);
    }
    for (i = 0; i < array_size(conn->register_bucket); ++i) {
        INIT_HLIST_HEAD(&conn->register_bucket[i]);
    }
    REF_COUNT_INIT(&conn->ref_cnt);

    tbs->conn[conn->idx] = conn;
    tbs->conn_evt[conn->idx] = WSACreateEvent();

    opt = TRUE;
    setsockopt(new_client, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
    opt = TRUE;
    setsockopt(new_client, SOL_SOCKET, SO_DONTLINGER, (char*)&opt, sizeof(opt));
    WSAEventSelect(new_client, tbs->conn_evt[conn->idx], FD_WRITE | FD_READ | FD_CLOSE);

    DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("sock connection created!");
    return 0;
}

/*
 * Destructor passed to IMPLEMENT_REF_COUNT_STATIC below: closes the socket and
 * its event, removes the connection from the tbs tables by moving the last
 * entry into the freed slot, releases the queues and frees the connection.
 */
static void __destroy_connection(sock_connection *conn)
{
    sp_tbs_t *tbs = conn->tbs;

    if (conn->fd != INVALID_SOCKET) {
        WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
        closesocket(conn->fd);
        conn->fd = INVALID_SOCKET;
    }
    WSACloseEvent(tbs->conn_evt[conn->idx]);
    tbs->conn_evt[conn->idx] = NULL;
    tbs->conn[conn->idx] = NULL;

    /* keep the tables dense: the last entry takes over the vacated index */
    if (conn->idx != tbs->conn_cnt-1) {
        tbs->conn_evt[conn->idx] = tbs->conn_evt[tbs->conn_cnt-1];
        tbs->conn[conn->idx] = tbs->conn[tbs->conn_cnt-1];
        tbs->conn[conn->idx]->idx = conn->idx;
    }
    tbs->conn_cnt--;

    iobuffer_queue_destroy(conn->tx_queue);
    conn->tx_queue = NULL;
    if (conn->rx_pkt) {
        iobuffer_dec_ref(conn->rx_pkt);
        conn->rx_pkt = NULL;
    }
    free(conn);
    DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("sock connection destroy!");
}

IMPLEMENT_REF_COUNT_STATIC(sock_connection, sock_connection, ref_cnt, __destroy_connection)

/* Non-zero when the calling thread is the one recorded in tbs->work_thread_id. */
static __inline int is_worker_thread(sp_tbs_t *tbs)
{
    return GetCurrentThreadId() == tbs->work_thread_id;
}

/* Looks up the sub session with id conn_id on conn; returns NULL if there is none. */
static sub_session *find_sub_session(sock_connection *conn, int conn_id)
{
    sub_session *tpos;
    struct hlist_node *pos;
    int slot = ((unsigned int )conn_id) % SESSION_BUCKET_SIZE; // bugfix, assure slot positive

    hlist_for_each_entry(tpos, pos, &conn->session_bucket[slot], sub_session, hentry) {
        if (tpos->id == conn_id)
            return tpos;
    }
    return NULL;
}

/*
 * The __uac_on_* functions are the callbacks installed by create_sub_session()
 * and on_req(). Each runs its uac_on_* handler directly when already on the
 * worker thread and otherwise hands the event to post_msg().
 *
 * Pointers are passed to post_msg() as param_size_t (the parameter type of
 * post_msg); casting through int would truncate them on 64-bit builds.
 */
static void __uac_on_connect(sp_ses_uac_t *uac, int error, void *user_data)
{
    sub_session *session = (sub_session*)user_data;

    if (is_worker_thread(session->conn->tbs)) {
        uac_on_connect(session->conn->tbs, session->conn, session, error);
    } else {
        post_msg(session->conn->tbs, MSG_UAC_ON_CONNECT, (param_size_t)session, error);
    }
}

static void __uac_on_close(sp_ses_uac_t *uac, int error, void *user_data)
{
    sub_session *session = (sub_session*)user_data;

    if (is_worker_thread(session->conn->tbs)) {
        uac_on_close(session->conn->tbs, session->conn, session, error);
    } else {
        post_msg(session->conn->tbs, MSG_UAC_ON_CLOSE, (param_size_t)session, error);
    }
}

static void __uac_on_destroy(sp_ses_uac_t *uac, void *user_data)
{
    sub_session *session = (sub_session*)user_data;

    TOOLKIT_ASSERT(session->uac == NULL);
    if (is_worker_thread(session->conn->tbs)) {
        uac_on_destroy(session->conn->tbs, session->conn, session);
    } else {
        post_msg(session->conn->tbs, MSG_UAC_ON_DESTROY, (param_size_t)session, 0);
    }
}

/*
 * Answer callback of a request transaction (installed by on_req()).
 * Takes ownership of *ans_pkt, extracts error / user_error / optional message
 * and forwards the answer to uac_on_ans() (worker thread) or post_msg().
 * Closes and destroys the transaction when the answer is an error or the last one.
 */
static void __uac_on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
{
    sub_session *session = (sub_session*)user_data;
    iobuffer_t *pkt = NULL;
    int tsx_id;
    int user_error = 0;
    int slen = 0;
    char *msg_str = NULL;

    if (error) {
        /* reuse the caller's buffer when there is one, otherwise start from an empty packet */
        if (ans_pkt && *ans_pkt) {
            pkt = *ans_pkt;
            iobuffer_reset(pkt);
            *ans_pkt = NULL;
        } else {
            pkt = iobuffer_create(-1, -1);
        }
    } else {
        TOOLKIT_ASSERT(ans_pkt);
        pkt = *ans_pkt;
        *ans_pkt = NULL;
        iobuffer_read(pkt, IOBUF_T_I4, &error, NULL);
        iobuffer_read(pkt, IOBUF_T_I4, &user_error, NULL);
        if (error) {
            iobuffer_read(pkt, IOBUF_T_STR, NULL, &slen);
            if (slen) {
                msg_str = (char*)malloc(slen + 1);
                /* NOTE(review): the length is probed as IOBUF_T_STR but the payload is read as
                 * IOBUF_T_WSTR into a char buffer of slen + 1 bytes - confirm the intended type. */
                if (msg_str)
                    iobuffer_read(pkt, IOBUF_T_WSTR, msg_str, NULL);
            }
            iobuffer_reset(pkt);
        }
    }

    tsx_id = sp_tsx_uac_get_id(tsx);
    if (is_worker_thread(session->conn->tbs)) {
        uac_on_ans(session->conn->tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &pkt);
        if (pkt)
            iobuffer_dec_ref(pkt);
    } else {
        iobuffer_format_write_head(pkt, "s4444", msg_str, &end, &user_error, &error, &tsx_id);
        post_msg(session->conn->tbs, MSG_UAC_ON_ANS, (param_size_t)session, (param_size_t)pkt);
    }

    if (error || end) {
        sp_tsx_uac_close(tsx);
        sp_tsx_uac_destroy(tsx);
    }

    /* The message has been consumed by uac_on_ans() or serialized into pkt above
     * (assumes iobuffer_format_write_head copies the string); it was leaked before. */
    free(msg_str);
}

/*
 * Connect result of a sub session: acknowledges the session to the client and,
 * on failure, closes and destroys the uac.
 */
static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error)
{
    if (!error) {
        post_session_ack(tbs, conn, session->id, session->tsx_id, error, NULL);
        if(getReduceSpbaseLog())
            DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("sub session created ok! ses_id = %d, tsx_id = %d", session->id, session->tsx_id);
    } else {
        sp_ses_uac_t *uac = session->uac;
        post_session_ack(tbs, conn, CONN_INVALID_ID, session->tsx_id, error, get_err_msg(error));
        session->uac = NULL;
        DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("sub session created failed %d! ses_id = %d, tsx_id = %d", error, session->id, session->tsx_id);
        if (uac) {
            sp_ses_uac_close(uac);
            sp_ses_uac_destroy(uac);
        }
    }
}

/*
 * The uac of a sub session was closed: notifies the client with a session-end
 * packet and destroys the uac (closing it first unless error is Error_Closed).
 */
static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error)
{
    if (session->uac) {
        sp_ses_uac_t *uac = session->uac;
        DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("uac on close detected! id = %d, %s %d %d", session->id, _GetFileName(__FILE__), __LINE__, error);
        post_session_end(tbs, conn, session->id);
        session->uac = NULL;
        if (error != Error_Closed)
            sp_ses_uac_close(uac);
        sp_ses_uac_destroy(uac);
    }
}

/*
 * Sends a request acknowledgement to the client. The error text is msg when it
 * is non-empty, otherwise the text derived from user_error.
 */
static void uac_on_ans(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t **ans_pkt)
{
    if(msg == NULL || strlen(msg) == 0)
        post_req_ack(tbs, conn, session->id, tsx_id, end, error, get_user_err_msg(user_error), ans_pkt);
    else
        post_req_ack(tbs, conn, session->id, tsx_id, end, error, CSimpleStringA2W(msg).GetData(), ans_pkt);
}

/* The uac of a sub session was destroyed: releases the sub session itself. */
static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session)
{
    /* destroy_sub_session() frees session, so read the id for the log first */
    const unsigned int session_id = session->id;

    destroy_sub_session(tbs, conn, session);
    DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("destroy sub session(id:%d) uac %s %d", session_id, _GetFileName(__FILE__), __LINE__);
}

/*
 * Creates a sub session on conn for the named entity and starts an asynchronous
 * connect carrying "class_name" or "class_name::function_name".
 * Returns 0 and stores the session in *p_session on success, Error_NotExist if
 * the entity is unknown, otherwise the error of the uac create/connect call.
 */
static int create_sub_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name, sub_session **p_session)
{
    sp_env_t *env = sp_get_env();
    sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);

    if (ent) {
        int rc;
        sub_session *session = MALLOC_T(sub_session);
        sp_ses_uac_callback cb;
        iobuffer_t *pkt = NULL;
        char *param = NULL;

        session->id = tbs->sub_session_seq++;
        session->conn = conn;
        session->tsx_id = tsx_id;
        session->uac = NULL; /* defined value in case sp_ses_uac_create() fails without writing it */
        hlist_add_head(&session->hentry, &conn->session_bucket[session->id % SESSION_BUCKET_SIZE]);
        conn->session_cnt++;
        sock_connection_inc_ref(conn);

        cb.user_data = session;
        cb.on_close = &__uac_on_close;
        cb.on_destroy = &__uac_on_destroy;
        cb.on_connect = &__uac_on_connect;
        rc = sp_ses_uac_create(tbs->ses_mgr, ent->mod->cfg->idx, ent->cfg->idx, &cb, &session->uac);
        if (rc != 0) {
            goto on_error;
        }

        pkt = iobuffer_create(-1, -1);
        if (function_name == NULL || strlen(function_name) == 0) {
            param = class_name;
        } else {
            param = strdup_printf("%s::%s", class_name, function_name);
        }
        iobuffer_write(pkt, IOBUF_T_STR, param, -1);
        if (param != class_name) {
            FREE(param);
        }

        rc = sp_ses_uac_async_connect(session->uac, 10000, &pkt);
        if (pkt)
            iobuffer_dec_ref(pkt);
        if (rc != 0) {
            /* NOTE(review): session->uac is not destroyed on this path although the session
             * it refers to through cb.user_data is freed below - confirm who releases the uac. */
            goto on_error;
        }

on_error:
        if (rc != 0) {
            conn->session_cnt--;
            hlist_del(&session->hentry);
            sock_connection_dec_ref(conn);
            free(session);
        } else {
            *p_session = session;
        }
        return rc;
    } else {
        return Error_NotExist;
    }
}

/*
 * Unlinks the sub session from its connection, destroys a still attached uac,
 * drops the connection reference taken in create_sub_session() and frees the session.
 */
static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session)
{
    /* remembered for the log line: session is freed before it is written */
    const unsigned int session_id = session->id;

    conn->session_cnt--;
    hlist_del(&session->hentry);
    if (session->uac) {
        sp_ses_uac_t *uac = session->uac;
        session->uac = NULL;
        sp_ses_uac_destroy(uac);
    }
    sock_connection_dec_ref(conn);
    free(session);
    DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("destroy sub session(id:%d) %s %d", session_id, _GetFileName(__FILE__), __LINE__);
}

/*
 * The post_* functions below build one packet, prefix it with a length field
 * (buffer length minus 4 at that point), queue it on the connection's tx_queue
 * and call on_write(). They do nothing when the socket is already closed.
 */

/* Queues a PKT_TYPE_SESSIONACK packet: type, conn_id, tsx_id, err, err_msg. */
static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t *err_msg)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int type = PKT_TYPE_SESSIONACK;

        iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        on_write(tbs, conn);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send PKT_TYPE_SESSIONACK tsx_id=%d, err=%d", tsx_id, err);
    }
}

/*
 * Queues a PKT_TYPE_REQACK packet. When ack_pkt holds a buffer it is taken over
 * (and *ack_pkt set to NULL) and the header fields are written in front of its
 * content; otherwise a new buffer is created.
 */
static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt)
{
    if (conn->fd != INVALID_SOCKET) {
        iobuffer_t *pkt = NULL;
        int len;
        char bend = !!end;
        int type = PKT_TYPE_REQACK;

        if (ack_pkt) {
            pkt = *ack_pkt;
            *ack_pkt = NULL;
        }
        if (!pkt) {
            pkt = iobuffer_create(-1, -1);
        }
        iobuffer_format_write_head(pkt, "w41444", err_msg, &err, &bend, &tsx_id, &conn_id, &type);
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send PKT_TYPE_REQACK tsx_id=%d, err=%d, pktLen=%d", tsx_id, err, iobuffer_get_length(pkt));
        on_write(tbs, conn);
    }
}

/* Queues a PKT_TYPE_SET_VAR_ACK packet; conn_id is always CONN_INVALID_ID. */
static void post_set_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        int type = PKT_TYPE_SET_VAR_ACK;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int conn_id = CONN_INVALID_ID;

        iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
        len = iobuffer_get_length(pkt) - 4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send PKT_TYPE_SET_VAR_ACK tsx_id=%d, err=%d", tsx_id, err);
        on_write(tbs, conn);
    }
}

/* Queues a PKT_TYPE_GET_VAR_ACK packet whose text field is value when err == 0, else err_msg. */
static void post_get_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg, const wchar_t *value)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        int type = PKT_TYPE_GET_VAR_ACK;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int conn_id = CONN_INVALID_ID;

        iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
        len = iobuffer_get_length(pkt) - 4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send PKT_TYPE_GET_VAR_ACK tsx_id=%d, err=%d", tsx_id, err);
        on_write(tbs, conn);
    }
}

/* Queues a PKT_TYPE_SESSIONEND packet for conn_id. */
static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
{
    if (conn->fd != INVALID_SOCKET) {
        int len;
        iobuffer_t *pkt = iobuffer_create(-1, -1);
        int type = PKT_TYPE_SESSIONEND;

        iobuffer_format_write(pkt, "44", &type, &conn_id);
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(conn->tx_queue, pkt);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send PKT_TYPE_SESSIONEND");
        on_write(tbs, conn);
    }
}

/*
 * Handles a session request from the client. On success nothing is sent here
 * (the acknowledgement is sent by uac_on_connect()); on failure a session ack
 * with CONN_INVALID_ID and the error is sent.
 */
static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name)
{
    sub_session *session = NULL;
    int rc;

    rc = create_sub_session(tbs, conn, tsx_id, entity_name, function_name, class_name, &session);
    if (rc == 0) {
        if (getReduceSpbaseLog())
            DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("start create sub session!");
        //post_session_ack(tbs, conn, session->id, tsx_id, rc, NULL);
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create_sub_session failed! entity: %s, err = %d %s %d", entity_name, rc, _GetFileName(__FILE__), __LINE__);
        post_session_ack(tbs, conn, CONN_INVALID_ID, tsx_id, rc, get_err_msg(rc));
    }
}

/* Looks up the registration with register_id on conn; returns NULL if there is none. */
static reg_entry *find_reg_entry(sock_connection *conn, int register_id)
{
    reg_entry *tpos;
    struct hlist_node *pos;
    int slot = ((unsigned int)register_id) % REGISTER_BUCKET_SIZE; // bugfix, assure slot positive

    hlist_for_each_entry(tpos, pos, &conn->register_bucket[slot], reg_entry, hentry) {
        if (tpos->register_id == register_id) {
            return tpos;
        }
    }
    return NULL;
}

/*
 * Sends an event packet to the client owning entry (if its socket is open) by
 * prefixing *evt_pkt with tsx_id, conn_id, type and length. Always drops the
 * listener reference taken in __bcm_on_message_raw().
 */
static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt)
{
    if (entry->conn->fd != INVALID_SOCKET) {
        int type;
        int len;
        iobuffer_t *pkt = *evt_pkt;

        *evt_pkt = NULL;
        type = PKT_TYPE_EVENT_DATA;
        iobuffer_write_head(pkt, IOBUF_T_I4, &tsx_id, 0);
        iobuffer_write_head(pkt, IOBUF_T_I4, &conn_id, 0);
        iobuffer_write_head(pkt, IOBUF_T_I4, &type, 0);
        len = iobuffer_get_length(pkt)-4;
        iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
        iobuffer_queue_enqueue(entry->conn->tx_queue, pkt);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send PKT_TYPE_EVENT_DATA tsx_id=%d", tsx_id);
        on_write(tbs, entry->conn);
        //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("post event len:%d", len);
    }
    sp_bcm_listener_dec_ref(entry->listener); //@
}

/*
 * Listener callback for a raw broadcast message: takes ownership of *msg_pkt,
 * pins the listener and hands entry + packet to post_msg() as MSG_ON_EVENT.
 * Pointers are passed as param_size_t; casting through int would truncate them
 * on 64-bit builds.
 */
static void __bcm_on_message_raw(sp_bcm_listener_t *listener, int from_client_id, iobuffer_t **msg_pkt, void *user_data)
{
    reg_entry *entry = (reg_entry*)user_data;
    iobuffer_t *pkt = *msg_pkt;

    *msg_pkt = NULL;
    sp_bcm_listener_inc_ref(listener); //@
    post_msg(entry->conn->tbs, MSG_ON_EVENT, (param_size_t)entry, (param_size_t)pkt);
    if(getReduceSpbaseLog())
        DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("post event from_client_id:%d", from_client_id);
}

/* Listener destroy callback: unlinks the registration, drops its connection reference and frees it. */
static void __bcm_on_destroy(sp_bcm_listener_t *listener, void *user_data)
{
    reg_entry *entry = (reg_entry*)user_data;

    entry->conn->register_cnt--;
    hlist_del(&entry->hentry);
    sock_connection_dec_ref(entry->conn);
    DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("unregister event ok! register_id = %d", entry->register_id);
    free(entry);
}

/*
 * Registers the client for events of function_name on the named entity:
 * creates a reg_entry, a bcm listener for it and subscribes the listener.
 * Duplicate register ids and unknown entities are only logged.
 */
static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name)
{
    sp_env_t *env = sp_get_env();
    sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);

    if (ent) {
        int rc;
        sp_svc_t *svc = sp_ses_mgr_get_svc(tbs->ses_mgr);
        reg_entry *entry;
        sp_bcm_listener_cb cb;

        if (find_reg_entry(conn, register_id)) {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("register entry already exist, register id %d", register_id);
            return;
        }

        entry = ZALLOC_T(reg_entry);
        entry->register_id = register_id;
        entry->entity_id = ent->cfg->idx;
        hlist_add_head(&entry->hentry, &conn->register_bucket[((unsigned int)register_id) % REGISTER_BUCKET_SIZE]); // bugfix, assure index positive
        conn->register_cnt++;
        sock_connection_inc_ref(conn);
        entry->conn = conn;

        cb.on_message_raw = &__bcm_on_message_raw;
        cb.on_message = NULL;
        cb.on_destroy = &__bcm_on_destroy;
        cb.user_data = entry;
        rc = sp_bcm_listener_create(svc, entry->entity_id, function_name, &cb, &entry->listener);
        if (rc != 0) {
            goto on_error;
        }
        rc = sp_bcm_listener_subscribe(entry->listener, &entry->uid);
        if (rc != 0) {
            goto on_error;
        }

on_error:
        if (rc != 0) {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create bcm listener failed! entity = %s, register_id = %d", ent->cfg->name, register_id);
            conn->register_cnt--;
            hlist_del_init(&entry->hentry);
            /* NOTE(review): on_unregister_event() relies on sp_bcm_listener_destroy() leading to
             * __bcm_on_destroy(), which unlinks and frees the entry. If that also happens for the
             * call below, the entry is released twice - confirm against sp_bcm. */
            if (entry->listener) {
                sp_bcm_listener_destroy(entry->listener);
            }
            free(entry);
            sock_connection_dec_ref(conn);
        } else {
            DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("register event for %s ok! register_id = %d", ent->cfg->name, register_id);
        }
    } else {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("cannot find entity %s!", entity_name);
    }
}

/* Unsubscribes and destroys the listener of the given registration; unknown ids are only logged. */
static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id)
{
    reg_entry *entry = find_reg_entry(conn, register_id);

    if (entry) {
        if (sp_bcm_listener_unsubscribe(entry->listener) == 0) {
            sp_bcm_listener_destroy(entry->listener);
        } else {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("entry already unregistered! register_id = %d", register_id);
        }
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("%s, entry not found, registered_id = %d", __FUNCTION__, register_id);
    }
}

/* Forwards a client log message to the log client as Log_Event; a NULL msg is logged as "". */
static void on_log_event(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext &t_context)
{
    sp_log_client_logWithLink(tbs->log_client, Log_Event, severity_level, 0, user_code, 0, 0,
        NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg),
        t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
}

/* Forwards a client log message to the log client as Log_Warning; a NULL msg is logged as "". */
static void on_log_warn(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext& t_context)
{
    sp_log_client_logWithLink(tbs->log_client, Log_Warning, severity_level, 0, user_code, 0, 0,
        NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg),
        t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
}

/*
 * Handles a client request: creates a transaction on the sub session's uac and
 * sends the request asynchronously. Every failure is answered immediately with
 * a final request ack carrying the error.
 */
static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
{
    sub_session *session = find_sub_session(conn, conn_id);

    if (session) {
        if (session->uac) {
            int rc;
            sp_tsx_uac_t *tsx;
            sp_tsx_uac_callback cb;

            cb.user_data = session;
            cb.on_ans = &__uac_on_ans;
            cb.on_destroy = NULL;
            rc = sp_tsx_uac_create(session->uac, tsx_id, method_id, method_sig, &cb, &tsx);
            if (rc == 0) {
                rc = sp_tsx_uac_async_req(tsx, timeout, req_pkt);
                if (rc == 0) {
                    WLog_DBG(TAG, "send req: tsx_id=%d, method_id=%d, method_sig=%d, pkt_size=%d", tsx_id, method_id, method_sig,
                        *req_pkt == NULL ? 0 : iobuffer_get_length(*req_pkt));
                } else {
                    DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("async req tsx failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
                    post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL);
                    sp_tsx_uac_destroy(tsx);
                }
            } else {
                DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create req tsx failed! %s %d", _GetFileName(__FILE__), __LINE__);
                post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL);
            }
        } else {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("session already closed! %s %d", _GetFileName(__FILE__), __LINE__);
            post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_NetBroken, get_err_msg(Error_NetBroken), NULL);
        }
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
        post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_NotExist, get_err_msg(Error_NotExist), NULL);
    }
}

/* Handles a client info packet: forwards it through the sub session's uac; failures are only logged. */
static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt)
{
    sub_session *session = find_sub_session(conn, conn_id);

    if (session) {
        if (session->uac) {
            int rc = sp_ses_uac_send_info(session->uac, method_id, method_sig, info_pkt);
            if (rc != 0) {
                DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("uac send info failed! error = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
            } else {
                //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("send info: method_id=%d, method_sig=%d, pkt_size=%d", method_id, method_sig,
                //    *info_pkt == NULL ? 0 : iobuffer_get_length(*info_pkt));
            }
        } else {
            DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("session already closed! info ignored ! %s %d", _GetFileName(__FILE__), __LINE__);
        }
    } else {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
    }
}

/* Handles a client session-end packet: closes and destroys the uac of the sub session, if any. */
static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
{
    sub_session *session = find_sub_session(conn, conn_id);

    if (session) {
        if (session->uac) {
            sp_ses_uac_t *uac = session->uac;
            session->uac = NULL;
            if (sp_ses_uac_close(uac) == 0) {
                sp_ses_uac_destroy(uac);
            } else {
                DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("duplicated uac close!");
            }
        }
    } else {
        DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
    }
}

/* Closes the listening socket and then every client connection, last slot first. */
static void on_stop(sp_tbs_t *tbs)
{
    int i;

    WSAEventSelect(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], 0);
    closesocket(tbs->server_fd);
    tbs->server_fd = INVALID_SOCKET;
    for (i = tbs->conn_cnt-1; i > SRV_EVT_IDX; --i) {
        sock_connection *conn = tbs->conn[i];
        TOOLKIT_ASSERT(conn);
        on_close(tbs, conn);
    }
}

/*
 * Shuts one client connection down: closes the socket, closes/destroys the uac
 * of every sub session, unsubscribes/destroys every event listener and drops
 * one reference on the connection.
 */
static void on_close(sp_tbs_t *tbs, sock_connection *conn)
{
    int i;

    DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("socket on close is invoked!");
    if (conn->fd != INVALID_SOCKET) {
        WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
        closesocket(conn->fd);
        conn->fd = INVALID_SOCKET;
    }

    for (i = 0; i < SESSION_BUCKET_SIZE; ++i) {
        sub_session *tpos;
        struct hlist_node *pos, *n;
        hlist_for_each_entry_safe(tpos, pos, n, &conn->session_bucket[i], sub_session, hentry) {
            if (tpos->uac) {
                sp_ses_uac_t *uac = tpos->uac;
                tpos->uac = NULL;
                if (sp_ses_uac_close(uac) == 0) {
                    sp_ses_uac_destroy(uac);
                } else {
                    DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("duplicated uac close! %s %d", _GetFileName(__FILE__), __LINE__);
                }
            }
        }
    }

    for (i = 0; i < REGISTER_BUCKET_SIZE; ++i) {
        reg_entry *tpos;
        struct hlist_node *pos, *n;
        hlist_for_each_entry_safe(tpos, pos, n, &conn->register_bucket[i], reg_entry, hentry) {
            if (tpos->listener) {
                if (sp_bcm_listener_unsubscribe(tpos->listener) == 0) {
                    sp_bcm_listener_destroy(tpos->listener);
                }
            }
        }
    }

    sock_connection_dec_ref(conn);
}

/* Handles a set-variable request and answers with a set-var ack; a NULL name yields Error_Param. */
static void on_set_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name, char *value)
{
    int rc;

    if (name == NULL) {
        rc = Error_Param;
        post_set_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null");
        return;
    }
    rc = sp_var_client_set((sp_var_client_t*)tbs->var_client, name, value, 0);
    post_set_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc));
}

/*
 * Handles a get-variable request: reads the value (at most 260 bytes) under the
 * var client lock, converts it to wide characters and answers with a get-var
 * ack. A NULL name yields Error_Param.
 */
static void on_get_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name)
{
    int len = 260;
    int rc;
    char szTmp[260] = { 0 };
    wchar_t wszValue[260] = { 0 };

    if (name == NULL) {
        rc = Error_Param;
        post_get_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null", NULL);
        return;
    }

    rc = sp_var_client_lock(tbs->var_client);
    if (rc == 0) {
        rc = sp_var_client_get((sp_var_client_t*)tbs->var_client, name, szTmp, &len);
        /* unlock only what was actually locked */
        sp_var_client_unlock(tbs->var_client);
    }

    if (rc == 0) {
        len = 260;
        szTmp[sizeof(szTmp) - 1] = '\0'; /* the conversion below reads up to the terminator */
        MultiByteToWideChar(CP_ACP, 0, szTmp, -1, &wszValue[0], len);
        post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), wszValue);
    } else
        post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), NULL);
}

static void process_tcp_pkt(sp_tbs_t *tbs, sock_connection *conn, iobuffer_t **p_pkt)
{
    iobuffer_t *pkt = *p_pkt;
    int type;
    iobuffer_read(pkt, IOBUF_T_I4, &type, NULL);
    link_context cur;
    char linkStr[512] = "";
    bool withLinkContex = !!(type & PKT_TYPE_CONTROL_LINKCONTEXT);
    //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("process_tcp_pkt withLinkContex:%d", withLinkContex);
    int linkId = 0;
    type = type & 0x0000FFFF; //clear control part
    switch (type) {
    case PKT_TYPE_INFO:
        {
            int conn_id;
            int method_id;
            int method_sig;
            if
(withLinkContex) { iobuffer_format_read(pkt, "444wwww", &conn_id, &method_id, &method_sig, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); } else iobuffer_format_read(pkt, "444", &conn_id, &method_id, &method_sig); //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_INFO conn_id=%d method_id=%d method_sig=%d, %s", conn_id, method_id, method_sig, linkStr); on_info(tbs, conn, conn_id, method_id, method_sig, &pkt); } break; case PKT_TYPE_REQ: { int conn_id; int tsx_id; int method_id; int method_sig; int timeout; if (withLinkContex) { iobuffer_format_read(pkt, "44444wwww", &conn_id, &tsx_id, &method_id, &method_sig, &timeout, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); } else iobuffer_format_read(pkt, "44444", &conn_id, &tsx_id, &method_id, &method_sig, &timeout); //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_REQ conn_id=%d tsx_id=%d method_id=%d method_sig=%d timeout=%d, %s", conn_id, tsx_id, method_id, method_sig, timeout, linkStr); on_req(tbs, conn, conn_id, tsx_id, method_id, method_sig, timeout, &pkt); } break; case PKT_TYPE_SESSION: { int tsx_id; wchar_t *wentity_name; wchar_t *wfunction_name; wchar_t *wclass_name; char *entity_name; char *function_name; char *class_name; int n; if (withLinkContex) { iobuffer_format_read(pkt, "4wwwwwww", &tsx_id, 
&wentity_name, &wfunction_name, &wclass_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "4www", &tsx_id, &wentity_name, &wfunction_name, &wclass_name); n = WideCharToMultiByte(CP_ACP, 0, wentity_name, -1, NULL, 0, NULL, NULL); entity_name = (char *)malloc(n); WideCharToMultiByte(CP_ACP, 0, wentity_name, -1, entity_name, n, NULL, NULL); FREE(wentity_name); n = WideCharToMultiByte(CP_ACP, 0, wfunction_name, -1, NULL, 0, NULL, NULL); function_name = (char *)malloc(n); WideCharToMultiByte(CP_ACP, 0, wfunction_name, -1, function_name, n, NULL, NULL); FREE(wfunction_name); n = WideCharToMultiByte(CP_ACP, 0, wclass_name, -1, NULL, 0, NULL, NULL); class_name = (char *)malloc(n); WideCharToMultiByte(CP_ACP, 0, wclass_name, -1, class_name, n, NULL, NULL); FREE(wclass_name); DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSION tsx_id=%d entity_name=%s function_name=%s class_name=%s", tsx_id, entity_name, function_name, class_name); on_session(tbs, conn, tsx_id, entity_name, function_name, class_name); if (entity_name) free(entity_name); if (function_name) free(function_name); if (class_name) free(class_name); } break; case PKT_TYPE_SESSIONEND: { int conn_id; if (withLinkContex) { iobuffer_format_read(pkt, "4wwww", &conn_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "4", &conn_id); //DbgWithLink(LOG_LEVEL_INFO, 
LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_SESSIONEND conn_id=%d, %s", conn_id, linkStr); on_session_end(tbs, conn, conn_id); } break; case PKT_TYPE_REGISTER_EVENT: { int register_id; wchar_t *wentity_name; char *entity_name; wchar_t *wfunction_name; char *function_name; int n; if (withLinkContex) { iobuffer_format_read(pkt, "4wwwwww", ®ister_id, &wentity_name, &wfunction_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "4ww", ®ister_id, &wentity_name, &wfunction_name); n = WideCharToMultiByte(CP_ACP, 0, wentity_name, -1, NULL, 0, NULL, NULL); entity_name = (char *)malloc(n); WideCharToMultiByte(CP_ACP, 0, wentity_name, -1, entity_name, n, NULL, NULL); FREE(wentity_name); n = WideCharToMultiByte(CP_ACP, 0, wfunction_name, -1, NULL, 0, NULL, NULL); function_name = (char*)malloc(n); WideCharToMultiByte(CP_ACP, 0, wfunction_name, -1, function_name, n, NULL, NULL); FREE(wfunction_name); DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REGISTER_EVENT register_id=%d, entity_name=%s, function_name=%s", register_id, entity_name, function_name); on_register_event(tbs, conn, register_id, entity_name, function_name); if (entity_name) free(entity_name); if (function_name) free(function_name); } break; case PKT_TYPE_UNREGISTER_EVENT: { int register_id; if (withLinkContex) { iobuffer_format_read(pkt, "4wwww", ®ister_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", 
withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "4", ®ister_id); //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_UNREGISTER_EVENT register_id=%d, %s", register_id, linkStr); on_unregister_event(tbs, conn, register_id); } break; case PKT_TYPE_LOG_EVENT: { int severity_level; int user_code; wchar_t *wmsg = NULL; char *msg = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "44wwwww", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "44w", &severity_level, &user_code, &wmsg); if (wmsg) { int n = WideCharToMultiByte(CP_ACP, 0, wmsg, -1, NULL, 0, NULL, NULL); msg = (char*)malloc(n); WideCharToMultiByte(CP_ACP, 0, wmsg, -1, msg, n, NULL, NULL); FREE(wmsg); } //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_LOG_EVENT severity_level=%d, user_code=0x%08x, msg=%s, %s", severity_level, user_code, msg ? 
msg : "", linkStr); linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); on_log_event(tbs, conn, severity_level, user_code, msg, t_Context); if (msg) free(msg); } break; case PKT_TYPE_LOG_WARN: { int severity_level; int user_code; wchar_t *wmsg = NULL; char *msg = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "44wwwww", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "44w", &severity_level, &user_code, &wmsg); if (wmsg) { int n = WideCharToMultiByte(CP_ACP, 0, wmsg, -1, NULL, 0, NULL, NULL); msg = (char*)malloc(n); WideCharToMultiByte(CP_ACP, 0, wmsg, -1, msg, n, NULL, NULL); FREE(wmsg); } //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_LOG_WARN severity_level=%d, user_code=0x%08x, msg=%s, %s", severity_level, user_code, msg ? 
msg : "", linkStr); linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); on_log_warn(tbs, conn, severity_level, user_code, msg, t_Context); if (msg) free(msg); } break; case PKT_TYPE_SET_VAR_REQ: { int tsx_id = 0; wchar_t *wname = NULL; wchar_t *wvalue = NULL; char *name = NULL; char *value = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "4wwwwww", &tsx_id, &wname, &wvalue, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "4ww", &tsx_id, &wname, &wvalue, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if (wname) { int n = WideCharToMultiByte(CP_ACP, 0, wname, -1, NULL, 0, NULL, NULL); name = (char*)malloc(n); WideCharToMultiByte(CP_ACP, 0, wname, -1, name, n, NULL, NULL); FREE(wname); } if (wvalue) { int n = WideCharToMultiByte(CP_ACP, 0, wvalue, -1, NULL, 0, NULL, NULL); value = (char*)malloc(n); WideCharToMultiByte(CP_ACP, 0, wvalue, -1, value, n, NULL, NULL); FREE(wvalue); } DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_SET_VAR_REQ tsx_id=%d, name=%s, value=%s, %s" , tsx_id, name ? name : "", value ? 
value : "", linkStr); on_set_var_req(tbs, conn, tsx_id, name, value); if (name) free(name); if (value) free(value); } break; case PKT_TYPE_GET_VAR_REQ: { int tsx_id = 0; wchar_t *wname = NULL; char *name = NULL; if (withLinkContex) { iobuffer_format_read(pkt, "4wwwww", &tsx_id, &wname, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId); if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA()); sprintf_s(linkStr, sizeof(linkStr), "withLinkContex:%d linkcontext:%s-%s-%s-%s-%d", withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId); } else iobuffer_format_read(pkt, "4w", &tsx_id, &wname); if (wname) { int n = WideCharToMultiByte(CP_ACP, 0, wname, -1, NULL, 0, NULL, NULL); name = (char*)malloc(n); WideCharToMultiByte(CP_ACP, 0, wname, -1, name, n, NULL, NULL); FREE(wname); } DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv PKT_TYPE_GET_VAR_REQ tsx_id=%d, name=%s, %s", tsx_id, name ? name : "", linkStr); on_get_var_req(tbs, conn, tsx_id, name); if (name) free(name); } break; default: DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("recv unknown pkt type: %d", type); //assert(0); break; } *p_pkt = NULL; if (pkt) iobuffer_dec_ref(pkt); } static void on_read(sp_tbs_t *tbs, sock_connection *conn) { if (conn->rx_state == RECV_STATE_HEADER) { int t = recv(conn->fd, &conn->rx_hdr_buf[conn->rx_hdr_buf_len], RX_BUF_SIZE-conn->rx_hdr_buf_len, 0); if (t > 0) { int offset = 0; conn->rx_hdr_buf_len += t; while (conn->rx_hdr_buf_len-offset >= HDR_LEN) { iobuffer_t *pkt; memcpy(&conn->rx_header_len, conn->rx_hdr_buf+offset, HDR_LEN); conn->rx_header_len += 4; // fixed hdr not include type!!! 
offset += HDR_LEN; pkt = iobuffer_create(-1, conn->rx_header_len); if (conn->rx_hdr_buf_len-offset >= conn->rx_header_len) { memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_header_len); offset += conn->rx_header_len; iobuffer_push_count(pkt, conn->rx_header_len); process_tcp_pkt(tbs, conn, &pkt); if (pkt) iobuffer_dec_ref(pkt); } else { memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_hdr_buf_len-offset); iobuffer_push_count(pkt, conn->rx_hdr_buf_len-offset); offset = conn->rx_hdr_buf_len; TOOLKIT_ASSERT(conn->rx_pkt == NULL); conn->rx_pkt = pkt; conn->rx_state = RECV_STATE_BODY; } } if (offset == conn->rx_hdr_buf_len) { conn->rx_hdr_buf_len = 0; } else { if (offset > 0) { TOOLKIT_ASSERT(conn->rx_state == RECV_STATE_HEADER); memmove(&conn->rx_hdr_buf[0], &conn->rx_hdr_buf[offset], conn->rx_hdr_buf_len-offset); conn->rx_hdr_buf_len -= offset; } } } } else if (conn->rx_state == RECV_STATE_BODY) { int offset, t; TOOLKIT_ASSERT(conn->rx_pkt); offset = iobuffer_get_length(conn->rx_pkt); t = recv(conn->fd, iobuffer_data(conn->rx_pkt, -1), conn->rx_header_len-offset, 0); if (t > 0) { iobuffer_push_count(conn->rx_pkt, t); if (offset+t == conn->rx_header_len) { process_tcp_pkt(tbs, conn, &conn->rx_pkt); if (conn->rx_pkt) { iobuffer_dec_ref(conn->rx_pkt); conn->rx_pkt = NULL; } conn->rx_hdr_buf_len = 0; conn->rx_state = RECV_STATE_HEADER; } } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("cannot go here, bug! 
%s, %d", _GetFileName(__FILE__), __LINE__); TOOLKIT_ASSERT(0); } } static void on_write(sp_tbs_t *tbs, sock_connection *conn) { if (conn->fd == INVALID_SOCKET) return; while (iobuffer_queue_count(conn->tx_queue) > 0) { iobuffer_t *pkt = iobuffer_queue_head(conn->tx_queue); int sended = 0; int total = iobuffer_get_length(pkt); while (sended < total) { int t = send(conn->fd, iobuffer_data(pkt, 0), total-sended, 0); if (t > 0) { sended += t; iobuffer_pop_count(pkt, t); } else { break; } } if (sended < total) { break; } else { iobuffer_queue_deque(conn->tx_queue); iobuffer_dec_ref(pkt); } } } static void on_accept(sp_tbs_t *tbs) { int rc; rc = create_connection(tbs); if (rc != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create connection failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__); } else { DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create connection ok! %s %d", _GetFileName(__FILE__), __LINE__); } } static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2) { cmd_entry *e = MALLOC_T(cmd_entry); e->msg = msg; e->param1 = param1; e->param2 = param2; spinlock_enter(&tbs->cmd_list_lock, -1); list_add_tail(&e->entry, &tbs->cmd_list); spinlock_leave(&tbs->cmd_list_lock); ReleaseSemaphore(tbs->conn_evt[MSG_EVT_IDX], 1, NULL); } static unsigned int __stdcall server_proc(void *arg) { sp_tbs_t *tbs = (sp_tbs_t*)arg; struct sockaddr_in addr = {0}; int istop = 0; int fixed_conn; TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET); tbs->server_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (tbs->server_fd == INVALID_SOCKET) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create list fd socket failed! 
%s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError()); tbs->ready_evt_result = Error_Resource; goto on_error; } { long val = 1; setsockopt(tbs->server_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&val, sizeof(val)); } addr.sin_family = AF_INET; addr.sin_addr.s_addr = tbs->server_ip; addr.sin_port = tbs->server_port; if (bind(tbs->server_fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("bind address failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError()); tbs->ready_evt_result = Error_AlreadyExist; goto on_error; } if (listen(tbs->server_fd, 5) != 0) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("winsock::listen failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError()); tbs->ready_evt_result = Error_AlreadyExist; goto on_error; } TOOLKIT_ASSERT(tbs->conn_evt[SRV_EVT_IDX] == NULL); tbs->conn_evt[SRV_EVT_IDX] = WSACreateEvent(); WSAEventSelect(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], FD_ACCEPT | FD_CLOSE); tbs->conn_cnt++; DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("start server ok!"); tbs->ready_evt_result = 0; SetEvent(tbs->ready_evt); fixed_conn = tbs->conn_cnt; while (!istop || tbs->conn_cnt > fixed_conn) { DWORD evt_cnt = (DWORD)tbs->conn_cnt; DWORD dwRet = WaitForMultipleObjects(evt_cnt, &tbs->conn_evt[0], FALSE, MAX_TIMEOUT); if (dwRet == WAIT_TIMEOUT) { continue; } else if (dwRet == WAIT_OBJECT_0+MSG_EVT_IDX) { // cmd cmd_entry *e; spinlock_enter(&tbs->cmd_list_lock, -1); e = list_first_entry(&tbs->cmd_list, cmd_entry, entry); list_del(&e->entry); spinlock_leave(&tbs->cmd_list_lock); switch (e->msg) { case MSG_STOP: { on_stop(tbs); // ...stop istop++; } break; case MSG_UAC_ON_CONNECT: { sub_session *session = (sub_session*)e->param1; int error = e->param2; uac_on_connect(tbs, session->conn, session, error); } break; case MSG_UAC_ON_DESTROY: { sub_session *session = 
(sub_session*)e->param1; uac_on_destroy(tbs, session->conn, session); } break; case MSG_UAC_ON_CLOSE: { sub_session *session = (sub_session*)e->param1; int error = e->param2; uac_on_close(tbs, session->conn, session, error); } break; case MSG_UAC_ON_ANS: { sub_session *session = (sub_session*)e->param1; iobuffer_t *ans_pkt = (iobuffer_t*)e->param2; int tsx_id; int error; int end; int user_error; char* msg_str = NULL; iobuffer_format_read(ans_pkt, "4444s", &tsx_id, &error, &user_error, &end, &msg_str); uac_on_ans(tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &ans_pkt); if (ans_pkt) iobuffer_dec_ref(ans_pkt); if (msg_str) free(msg_str); } break; case MSG_ON_EVENT: { reg_entry *entry = (reg_entry*)e->param1; iobuffer_t *evt_pkt = (iobuffer_t *)e->param2; post_event(tbs, entry, 0, entry->register_id, &evt_pkt); if (evt_pkt) iobuffer_dec_ref(evt_pkt); } break; default: DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("unknown msg type! %s %d", _GetFileName(__FILE__), __LINE__); TOOLKIT_ASSERT(0); break; } free(e); } else if (dwRet == WAIT_OBJECT_0 + SRV_EVT_IDX) { // WSANETWORKEVENTS netevents; if (WSAEnumNetworkEvents(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], &netevents) != SOCKET_ERROR) { if (netevents.lNetworkEvents & FD_ACCEPT) { if (netevents.iErrorCode[FD_ACCEPT_BIT] == 0) { DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("FD_ACCEPT trigger!"); on_accept(tbs); } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("listen accept error bit! %s, %d", _GetFileName(__FILE__), __LINE__); } } if (netevents.lNetworkEvents & FD_CLOSE) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("listen socket closed detected! 
%s, %d", _GetFileName(__FILE__), __LINE__); break; } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("WSAEnumNetworkEvents failed, LastError=%d", WSAGetLastError()); } } else if (dwRet < WAIT_OBJECT_0 + evt_cnt) { WSANETWORKEVENTS netevents; int idx = dwRet - WAIT_OBJECT_0; sock_connection *conn = tbs->conn[idx]; TOOLKIT_ASSERT(conn); if (WSAEnumNetworkEvents(conn->fd, tbs->conn_evt[idx], &netevents) != SOCKET_ERROR) { if (netevents.lNetworkEvents & FD_READ) { if (netevents.iErrorCode[FD_READ_BIT] == 0) { on_read(tbs, conn); } } if (netevents.lNetworkEvents & FD_WRITE) { if (netevents.iErrorCode[FD_WRITE_BIT] == 0) { on_write(tbs, conn); } } if (netevents.lNetworkEvents & FD_CLOSE) { on_close(tbs, conn); } } } else { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("unexpected dwRet! %s %d", _GetFileName(__FILE__), __LINE__); break; } } DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("tbs server thread exit!"); on_error: if (tbs->server_fd != INVALID_SOCKET) { if (tbs->conn_evt[SRV_EVT_IDX]) WSAEventSelect(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], 0); closesocket(tbs->server_fd); tbs->server_fd = INVALID_SOCKET; } if (tbs->conn_evt[SRV_EVT_IDX]) { WSACloseEvent(tbs->conn_evt[SRV_EVT_IDX]); tbs->conn_evt[SRV_EVT_IDX] = NULL; tbs->conn_cnt--; } if (tbs->ready_evt_result) { SetEvent(tbs->ready_evt); } return 0; } int sp_tbs_create(sp_ses_mgr_t *ses_mgr, const char *ipaddr, unsigned short port, sp_tbs_t **p_tbs) { sp_tbs_t *tbs; sp_svc_t *svc = sp_ses_mgr_get_svc(ses_mgr); TOOLKIT_ASSERT(ipaddr); TOOLKIT_ASSERT(ses_mgr); tbs = ZALLOC_T(sp_tbs_t); tbs->conn_evt[0] = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL); tbs->ready_evt = CreateEventA(NULL, TRUE, FALSE, NULL); sp_log_client_create(svc, sp_svc_get_iom(svc), &tbs->log_client); tbs->conn_cnt = 1; tbs->server_fd = INVALID_SOCKET; tbs->server_ip = inet_addr(ipaddr); tbs->server_port = htons(port); tbs->sub_session_seq = (unsigned int)GetTickCount(); 
INIT_LIST_HEAD(&tbs->cmd_list); spinlock_init(&tbs->cmd_list_lock); tbs->ses_mgr = ses_mgr; sp_var_client_create(svc, &tbs->var_client); *p_tbs = tbs; return 0; } int sp_tbs_start(sp_tbs_t *tbs) { if (tbs->work_thread) { DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("may be already started! %s %d", _GetFileName(__FILE__), __LINE__); return Error_Duplication; } tbs->ready_evt_result = 0; ResetEvent(tbs->ready_evt); tbs->work_thread = (HANDLE)_beginthreadex(NULL, 0, &server_proc, tbs, 0, (unsigned*)&tbs->work_thread_id); if (!tbs->work_thread) { return Error_Resource; } WaitForSingleObject(tbs->ready_evt, INFINITE); if (tbs->ready_evt_result) { CloseHandle(tbs->work_thread); tbs->work_thread = NULL; } return tbs->ready_evt_result; } void sp_tbs_stop(sp_tbs_t *tbs) { TOOLKIT_ASSERT(tbs->work_thread); post_msg(tbs, MSG_STOP, 0, 0); WaitForSingleObject(tbs->work_thread, INFINITE); CloseHandle(tbs->work_thread); tbs->work_thread = NULL; } void sp_tbs_destroy(sp_tbs_t *tbs) { TOOLKIT_ASSERT(tbs->conn_cnt == 1); TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET); CloseHandle(tbs->conn_evt[0]); CloseHandle(tbs->ready_evt); sp_log_client_destroy(tbs->log_client); sp_var_client_destroy(tbs->var_client); free(tbs); }