sp_tbs_unix.cpp 63 KB


  1. #include "precompile.h"
  2. #include <sys/eventfd.h>
  3. #include <sys/stat.h>
  4. #include "sp_svc.h"
  5. #include "sp_def.h"
  6. #include "sp_ses.h"
  7. #include "sp_tbs.h"
  8. #include "sp_env.h"
  9. #include "sp_bcm.h"
  10. #include "sp_log.h"
  11. #include "sp_uid.h"
  12. #include "sp_var.h"
  13. #include "SpBase.h"
  14. #include "toolkit.h"
  15. #include "list.h"
  16. #include "hash.h"
  17. #include "array.h"
  18. #include "refcnt.h"
  19. #include "spinlock.h"
  20. #include "strutil.h"
  21. #include "fileutil.h"
  22. #include "memutil.h"
  23. #include "toolkit.h"
  24. #include "charset.h"
  25. #include "uuid4.h"
  26. #include "dbgutil.h"
  27. #include <winpr/winsock.h>
  28. #include <winpr/string.h>
  29. #include <winpr/sysinfo.h>
  30. #include "unix/evtpoll.h"
  31. #include "unix/core.h"
  // Log tag passed to the WLog_* macros in this file.
  32. #define TAG SPBASE_TAG("tbs")
  // Number of hash buckets in sock_connection::session_bucket / register_bucket.
  33. #define SESSION_BUCKET_SIZE 511
  34. #define REGISTER_BUCKET_SIZE 127
  // Size in bytes of sock_connection::rx_hdr_buf.
  35. #define RX_BUF_SIZE 1024
  36. #define MSG_EVT_IDX (MAXIMUM_WAIT_OBJECTS + 1)
  37. #define SRV_EVT_IDX 1
  38. #define MAX_TIMEOUT 30000
  39. #define HDR_LEN 4
  // Connection id sent in acks that do not belong to an established sub session.
  40. #define CONN_INVALID_ID -1
  // Message codes; the MSG_UAC_* and MSG_ON_EVENT values are handed to post_msg()
  // by the callbacks in this file when they run off the worker thread.
  41. #define MSG_STOP 0
  42. #define MSG_UAC_ON_CONNECT 1
  43. #define MSG_UAC_ON_CLOSE 2
  44. #define MSG_UAC_ON_DESTROY 3
  45. #define MSG_UAC_ON_ANS 4
  46. #define MSG_ON_EVENT 5
  // Values of sock_connection::rx_state.
  47. #define RECV_STATE_HEADER 0 /* already recv header length */
  48. #define RECV_STATE_BODY 1 /* recv body */
  // Packet type codes; the post_* helpers in this file write the ACK / END /
  // EVENT_DATA values into the packets they queue for sending.
  49. #define PKT_TYPE_INFO 0
  50. #define PKT_TYPE_SESSION 1
  51. #define PKT_TYPE_SESSIONEND 2
  52. #define PKT_TYPE_REQ 3
  53. #define PKT_TYPE_REQACK 4
  54. #define PKT_TYPE_SESSIONACK 5
  55. #define PKT_TYPE_REGISTER_EVENT 6
  56. #define PKT_TYPE_UNREGISTER_EVENT 7
  57. #define PKT_TYPE_EVENT_DATA 8
  58. #define PKT_TYPE_LOG_EVENT 9
  59. #define PKT_TYPE_LOG_WARN 10
  60. // add sys var get & set type by xkm@20150526
  61. #define PKT_TYPE_SET_VAR_REQ 11 // len + type + tsx_id + varname + varvalue
  62. #define PKT_TYPE_SET_VAR_ACK 12 // len + type + conn_id + tsx_id + errcode + errmsg
  63. #define PKT_TYPE_GET_VAR_REQ 13 // len + type + tsx_id + varname
  64. #define PKT_TYPE_GET_VAR_ACK 14 // len + type + conn_id + tsx_id + errcode + varvalue/errmsg
  65. // Control information 0xFF FF 00 00
  66. #define PKT_TYPE_CONTROL_LINKCONTEXT (1 << 31)
  67. class link_context {
  68. public:
  69. char* businessId; //32
  70. char* traceId; //32
  71. char* spanId; //16
  72. char* parentSpanId; //16
  73. wchar_t* wbusinessId; //32
  74. wchar_t* wtraceId; //32
  75. wchar_t* wspanId; //16
  76. wchar_t* wparentSpanId; //16
  77. //重复代码,需优化
  78. public:
  79. link_context() {
  80. businessId = traceId = spanId = parentSpanId = nullptr;
  81. wbusinessId = wtraceId = wspanId = wparentSpanId = nullptr;
  82. }
  83. ~link_context() {
  84. if (businessId)
  85. free(businessId);
  86. if (traceId)
  87. free(traceId);
  88. if (spanId)
  89. free(spanId);
  90. if (parentSpanId)
  91. free(parentSpanId);
  92. if (wbusinessId)
  93. free(wbusinessId);
  94. if (wtraceId)
  95. free(wtraceId);
  96. if (wspanId)
  97. free(wspanId);
  98. if (wparentSpanId)
  99. free(wparentSpanId);
  100. }
  101. void InitbusinessId(const std::string t_businessId)
  102. {
  103. if (nullptr == businessId) {
  104. businessId = (char*)malloc(t_businessId.length() + 1);
  105. ZeroMemory(businessId, t_businessId.length() + 1);
  106. strcpy_s(businessId, t_businessId.length() + 1, t_businessId.c_str());
  107. }
  108. }
  109. void InittraceId(const std::string t_traceId)
  110. {
  111. if (nullptr == traceId) {
  112. traceId = (char*)malloc(t_traceId.length() + 1);
  113. ZeroMemory(traceId, t_traceId.length() + 1);
  114. strcpy_s(traceId, t_traceId.length() + 1, t_traceId.c_str());
  115. }
  116. }
  117. void InitspanId(const std::string t_spanId)
  118. {
  119. if (nullptr == spanId) {
  120. spanId = (char*)malloc(t_spanId.length() + 1);
  121. ZeroMemory(spanId, t_spanId.length() + 1);
  122. strcpy_s(spanId, t_spanId.length() + 1, t_spanId.c_str());
  123. }
  124. }
  125. void InitparentSpanId(const std::string t_parentSpanId)
  126. {
  127. if (nullptr == parentSpanId) {
  128. parentSpanId = (char*)malloc(t_parentSpanId.length() + 1);
  129. ZeroMemory(parentSpanId, t_parentSpanId.length() + 1);
  130. strcpy_s(parentSpanId, t_parentSpanId.length() + 1, t_parentSpanId.c_str());
  131. }
  132. }
  133. const char* businessIdWtoA()
  134. {
  135. if (businessId) {
  136. return businessId;
  137. } else if (wbusinessId) {
  138. int n = toolkit_wcs2mbs(wbusinessId, NULL, 0);
  139. if (n > 0) {
  140. businessId = (char*)malloc(n);
  141. n = toolkit_wcs2mbs(wbusinessId, businessId, n);
  142. if (n <= 0) {
  143. free(businessId);
  144. businessId = NULL;
  145. } else {
  146. return businessId;
  147. }
  148. }
  149. }
  150. return "None";
  151. }
  152. const char* traceIdWtoA()
  153. {
  154. if (traceId) {
  155. return traceId;
  156. }
  157. else if (wtraceId)
  158. {
  159. int n = toolkit_wcs2mbs(wtraceId, NULL, 0);
  160. if (n > 0) {
  161. traceId = (char*)malloc(n);
  162. n = toolkit_wcs2mbs(wtraceId, traceId, n);
  163. if (n <= 0) {
  164. free(traceId);
  165. traceId = NULL;
  166. } else {
  167. return traceId;
  168. }
  169. }
  170. }
  171. return "None";
  172. }
  173. const char* spanIdWtoA()
  174. {
  175. if (spanId) {
  176. return spanId;
  177. } else if (wspanId)
  178. {
  179. int n = toolkit_wcs2mbs(wspanId, NULL, 0);
  180. if (n > 0) {
  181. spanId = (char*)malloc(n);
  182. n = toolkit_wcs2mbs(wspanId, spanId, n);
  183. if (n <= 0) {
  184. free(spanId);
  185. spanId = NULL;
  186. } else {
  187. return spanId;
  188. }
  189. }
  190. }
  191. return "None";
  192. }
  193. const char* parentSpanIdWtoA()
  194. {
  195. if (parentSpanId) {
  196. return parentSpanId;
  197. }
  198. else if (wparentSpanId)
  199. {
  200. int n = toolkit_wcs2mbs(wparentSpanId, NULL, 0);
  201. if (n > 0) {
  202. parentSpanId = (char*)malloc(n);
  203. n = toolkit_wcs2mbs(wparentSpanId, parentSpanId, n);
  204. if (n <= 0) {
  205. free(parentSpanId);
  206. parentSpanId = NULL;
  207. } else {
  208. return parentSpanId;
  209. }
  210. }
  211. }
  212. return "None";
  213. }
  214. };
  // Forward typedefs for the file-local structs defined below; sub_session and
  // reg_entry hold a sock_connection pointer before that struct is defined.
  215. typedef struct cmd_entry cmd_entry;
  216. typedef struct sub_session sub_session;
  217. typedef struct reg_entry reg_entry;
  218. typedef struct sock_connection sock_connection;
  // One command record: a message code plus two parameters.
  // NOTE(review): presumably queued on sp_tbs_t::cmd_list by post_msg(), whose
  // definition is not visible here - confirm.
  219. struct cmd_entry {
  220. struct list_head entry; // list linkage
  221. int msg; // message code
  222. param_size_t param1; // first message parameter
  223. param_size_t param2; // second message parameter
  224. };
  // A sub session opened on behalf of a socket client (see create_sub_session()).
  225. struct sub_session {
  226. struct hlist_node hentry; // linkage in sock_connection::session_bucket
  227. unsigned int id; // taken from sp_tbs_t::sub_session_seq; sent to the client as conn_id
  228. unsigned int tsx_id; // transaction id of the session request, echoed in the session ack
  229. sp_ses_uac_t *uac; // session client created with sp_ses_uac_create()
  230. sock_connection *conn; // owning connection; a reference is taken while the session exists
  231. };
  // One event registration made by a socket client (see on_register_event()).
  232. struct reg_entry {
  233. struct hlist_node hentry; // linkage in sock_connection::register_bucket
  234. unsigned int register_id; // id chosen by the client; key of the hash lookup
  235. unsigned int entity_id; // ent->cfg->idx of the entity being listened to
  236. sp_uid_t uid; // passed to sp_bcm_listener_subscribe()
  237. sp_bcm_listener_t *listener; // listener created with sp_bcm_listener_create()
  238. sock_connection *conn; // owning connection; a reference is taken while the entry exists
  239. };
  // State of one accepted client socket (created by create_connection()).
  240. struct sock_connection {
  241. SOCKET fd; // client socket; INVALID_SOCKET once closed
  242. int idx; // index of this connection in sp_tbs_t::conn / sp_tbs_t::conn_evt
  243. int session_cnt; // number of sub sessions linked in session_bucket
  244. struct hlist_head session_bucket[SESSION_BUCKET_SIZE]; // sub sessions hashed by id
  245. int register_cnt; // number of entries linked in register_bucket
  246. struct hlist_head register_bucket[REGISTER_BUCKET_SIZE]; // registrations hashed by register_id
  247. iobuffer_queue_t *tx_queue; // outgoing packets queued by the post_* helpers
  248. iobuffer_t *rx_pkt; // NULL at creation; released on destroy if still set
  249. char rx_hdr_buf[RX_BUF_SIZE];
  250. int rx_hdr_buf_len; // starts at 0
  251. int rx_state; // RECV_STATE_HEADER or RECV_STATE_BODY
  252. int rx_header_len; // starts at 0
  253. sp_tbs_t *tbs; // owning tbs instance
  254. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  255. };
  // NOTE(review): presumably declares the sock_connection_inc_ref() /
  // sock_connection_dec_ref() helpers used below - confirm against refcnt.h.
  256. DECLARE_REF_COUNT_STATIC(sock_connection, sock_connection)
  // Top-level state shared by all connections handled in this file.
  257. struct sp_tbs_t
  258. {
  259. SOCKET server_fd; // listening socket; create_connection() accepts from it
  260. unsigned long server_ip;
  261. unsigned short server_port;
  262. HANDLE conn_evt[MAXIMUM_WAIT_OBJECTS]; // per-connection event, same index as conn[]
  263. HANDLE ready_evt;
  264. int ready_evt_result;
  265. sock_connection *conn[MAXIMUM_WAIT_OBJECTS]; // live connections, packed in [0, conn_cnt)
  266. int conn_cnt; // number of used slots in conn[] / conn_evt[]
  267. unsigned int sub_session_seq; // source of sub_session::id values
  268. struct list_head cmd_list;
  269. spinlock_t cmd_list_lock;
  270. sp_log_client_t *log_client;
  271. HANDLE work_thread;
  272. DWORD work_thread_id; // compared with the caller's thread id in is_worker_thread()
  273. sp_ses_mgr_t *ses_mgr; // session manager used to create uacs and to obtain the svc
  274. sp_var_client_t *var_client;
  275. evtpoll_t* ep; // event poller the client sockets are attached to
  276. int msg_fd;
  277. };
  // Forward declarations of file-local functions; several are called before
  // their definitions (e.g. on_write() from the post_* helpers).
  278. static void on_write(sp_tbs_t *tbs, sock_connection *conn);
  279. static void on_accept(sp_tbs_t *tbs);
  280. static void on_read(sp_tbs_t *tbs, sock_connection *conn);
  281. static void on_close(sp_tbs_t *tbs, sock_connection *conn);
  282. static void on_stop(sp_tbs_t *tbs);
  // Handlers, one per kind of incoming client request.
  283. static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
  284. static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt);
  285. static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
  286. static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name);
  287. static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name);
  288. static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id);
  // Helpers that build a packet and queue it on the connection's tx_queue.
  289. static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
  290. static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt);
  291. static void post_req_ack_ex(sp_tbs_t* tbs, sock_connection* conn, int conn_id, int tsx_id, int end, int err, int uc, const wchar_t* err_msg, iobuffer_t** ack_pkt);
  292. static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t*err_msg);
  293. static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt);
  // Worker-thread side of the uac callbacks (the __uac_on_* functions dispatch to these).
  294. static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
  295. static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
  296. static void uac_on_ans(sp_tbs_t* tbs, sock_connection* conn, sub_session* session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t** ans_pkt);
  297. static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session);
  // Used by callbacks that run off the worker thread to hand work over to it.
  298. static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2);
  299. static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session);
  300. postLink g_postlink = NULL;
  301. void sp_tbs_setpostlink(postLink cur)
  302. {
  303. g_postlink = cur;
  304. }
  305. static const wchar_t *get_err_msg(int rc)
  306. {
  307. static wchar_t wmsg[512];
  308. char msg[512] = { '\0' };
  309. memset(msg, 0, sizeof(msg));
  310. sprintf_s(msg, 512, sp_strerror(rc));
  311. toolkit_mbs2wcs(msg, wmsg, 512);
  312. return wmsg;
  313. }
  314. static const wchar_t *get_user_err_msg(int rc)
  315. {
  316. static wchar_t wmsg[512];
  317. char msg[512] = { '\0' };
  318. if (rc == 0)
  319. return NULL;
  320. memset(msg, 0, sizeof(msg));
  321. sprintf_s(msg, 512, "UserError=%d", rc);
  322. toolkit_mbs2wcs(msg, wmsg, 512);
  323. return wmsg;
  324. }
  325. static int create_connection(sp_tbs_t *tbs)
  326. {
  327. sock_connection *conn;
  328. SOCKET new_client;
  329. int i;
  330. BOOL opt;
  331. struct sockaddr_in from_addr;
  332. int from_len = sizeof(from_addr);
  333. new_client = _accept(tbs->server_fd, (struct sockaddr*)&from_addr, &from_len);
  334. if (new_client == INVALID_SOCKET) {
  335. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("accept connection failed! %s %d", _GetFileName(__FILE__), __LINE__);
  336. return Error_NetBroken;
  337. }
  338. if (make_fd_cloexec(new_client, 1) != 0) {
  339. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("make client cloexec failed");
  340. closesocket(new_client);
  341. return Error_Unexpect;
  342. }
  343. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("accept from %s:%d", inet_ntoa(from_addr.sin_addr), ntohs(from_addr.sin_port));
  344. if (tbs->conn_cnt >= MAXIMUM_WAIT_OBJECTS) {
  345. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("exceed maximum wait objects, connection discard! %s %d", _GetFileName(__FILE__), __LINE__);
  346. closesocket(new_client);
  347. return Error_Resource;
  348. }
  349. conn = MALLOC_T(sock_connection);
  350. conn->idx = tbs->conn_cnt++;
  351. conn->fd = new_client;
  352. conn->rx_hdr_buf_len = 0;
  353. conn->tx_queue = iobuffer_queue_create();
  354. conn->rx_pkt = NULL;
  355. conn->session_cnt = 0;
  356. conn->register_cnt = 0;
  357. conn->tbs = tbs;
  358. conn->rx_state = RECV_STATE_HEADER;
  359. conn->rx_header_len = 0;
  360. for (i = 0; i < array_size(conn->session_bucket); ++i) {
  361. INIT_HLIST_HEAD(&conn->session_bucket[i]);
  362. }
  363. for (i = 0; i < array_size(conn->register_bucket); ++i) {
  364. INIT_HLIST_HEAD(&conn->register_bucket[i]);
  365. }
  366. REF_COUNT_INIT(&conn->ref_cnt);
  367. tbs->conn[conn->idx] = conn;
  368. tbs->conn_evt[conn->idx] = WSACreateEvent();
  369. opt = TRUE;
  370. setsockopt(new_client, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
  371. opt = TRUE;
  372. setsockopt(new_client, SOL_SOCKET, SO_DONTLINGER, (char*)&opt, sizeof(opt));
  373. //WSAEventSelect(new_client, tbs->conn_evt[conn->idx], FD_WRITE | FD_READ | FD_CLOSE);
  374. if (evtpoll_attach(tbs->ep, new_client)) {
  375. WLog_ERR(TAG, "%s: attach evtpoll for %d failed!", __FUNCTION__, new_client);
  376. goto on_error;
  377. }
  378. if (0 != evtpoll_subscribe(tbs->ep, EV_READ, new_client, (void*)(conn->idx), NULL)) {
  379. goto on_error_1;
  380. }
  381. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("sock connection created!");
  382. return 0;
  383. on_error_1:
  384. evtpoll_detach(tbs->ep, new_client);
  385. on_error:
  386. if (new_client != INVALID_SOCKET)
  387. closesocket(new_client);
  388. FREE(conn);
  389. return Error_Unexpect;
  390. }
  391. static void __destroy_connection(sock_connection *conn)
  392. {
  393. sp_tbs_t *tbs = conn->tbs;
  394. if (conn->fd != INVALID_SOCKET) {
  395. WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
  396. evtpoll_detach(tbs->ep, conn->fd);
  397. closesocket(conn->fd);
  398. conn->fd = INVALID_SOCKET;
  399. }
  400. WSACloseEvent(tbs->conn_evt[conn->idx]);
  401. tbs->conn_evt[conn->idx] = NULL;
  402. tbs->conn[conn->idx] = NULL;
  403. if (conn->idx != tbs->conn_cnt-1) {
  404. tbs->conn_evt[conn->idx] = tbs->conn_evt[tbs->conn_cnt-1];
  405. tbs->conn[conn->idx] = tbs->conn[tbs->conn_cnt-1];
  406. tbs->conn[conn->idx]->idx = conn->idx;
  407. }
  408. tbs->conn_cnt--;
  409. iobuffer_queue_destroy(conn->tx_queue);
  410. conn->tx_queue = NULL;
  411. if (conn->rx_pkt) {
  412. iobuffer_dec_ref(conn->rx_pkt);
  413. conn->rx_pkt = NULL;
  414. }
  415. free(conn);
  416. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("sock connection destroy!");
  417. }
  418. IMPLEMENT_REF_COUNT_STATIC(sock_connection, sock_connection, ref_cnt, __destroy_connection)
  419. static __inline int is_worker_thread(sp_tbs_t *tbs)
  420. {
  421. return GetCurrentThreadId() == tbs->work_thread_id;
  422. }
  423. static sub_session *find_sub_session(sock_connection *conn, int conn_id)
  424. {
  425. sub_session *tpos;
  426. struct hlist_node *pos;
  427. int slot = ((unsigned int )conn_id) % SESSION_BUCKET_SIZE; // bugfix, assure slot positive
  428. hlist_for_each_entry(tpos, pos, &conn->session_bucket[slot], sub_session, hentry) {
  429. if (tpos->id == conn_id)
  430. return tpos;
  431. }
  432. return NULL;
  433. }
  434. static void __uac_on_connect(sp_ses_uac_t *uac, int error, void *user_data)
  435. {
  436. sub_session *session = (sub_session*)user_data;
  437. if (is_worker_thread(session->conn->tbs)) {
  438. uac_on_connect(session->conn->tbs, session->conn, session, error);
  439. } else {
  440. post_msg(session->conn->tbs, MSG_UAC_ON_CONNECT, (param_size_t)session, error);
  441. }
  442. }
  443. static void __uac_on_close(sp_ses_uac_t *uac, int error, void *user_data)
  444. {
  445. sub_session *session = (sub_session*)user_data;
  446. if (is_worker_thread(session->conn->tbs)) {
  447. uac_on_close(session->conn->tbs, session->conn, session, error);
  448. } else {
  449. post_msg(session->conn->tbs, MSG_UAC_ON_CLOSE, (param_size_t)session, error);
  450. }
  451. }
  452. static void __uac_on_destroy(sp_ses_uac_t *uac, void *user_data)
  453. {
  454. sub_session *session = (sub_session*)user_data;
  455. TOOLKIT_ASSERT(session->uac == NULL);
  456. if (is_worker_thread(session->conn->tbs)) {
  457. uac_on_destroy(session->conn->tbs, session->conn, session);
  458. } else {
  459. post_msg(session->conn->tbs, MSG_UAC_ON_DESTROY, (param_size_t)session, 0);
  460. }
  461. }
  462. static void __uac_on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
  463. {
  464. sub_session *session = (sub_session*)user_data;
  465. iobuffer_t *pkt = NULL;
  466. int tsx_id;
  467. int user_error = 0;
  468. int slen = 0;
  469. char* msg_str = NULL;
  470. if (error)
  471. {
  472. if (ans_pkt)
  473. {
  474. pkt = *ans_pkt;
  475. iobuffer_reset(pkt);
  476. *ans_pkt = NULL;
  477. }
  478. else
  479. {
  480. pkt = iobuffer_create(-1, -1);
  481. }
  482. }
  483. else
  484. {
  485. TOOLKIT_ASSERT(ans_pkt);
  486. pkt = *ans_pkt;
  487. *ans_pkt = NULL;
  488. iobuffer_read(pkt, IOBUF_T_I4, &error, NULL);
  489. iobuffer_read(pkt, IOBUF_T_I4, &user_error, NULL);
  490. if (error)
  491. {
  492. iobuffer_read(pkt, IOBUF_T_STR, NULL, &slen);
  493. if (slen) {
  494. msg_str = (char*)malloc(slen + 1);
  495. iobuffer_read(pkt, IOBUF_T_WSTR, msg_str, NULL);
  496. }
  497. iobuffer_reset(pkt);
  498. }
  499. }
  500. tsx_id = sp_tsx_uac_get_id(tsx);
  501. if (is_worker_thread(session->conn->tbs))
  502. {
  503. uac_on_ans(session->conn->tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &pkt);
  504. if (pkt)
  505. iobuffer_dec_ref(pkt);
  506. }
  507. else
  508. {
  509. iobuffer_format_write_head(pkt, "s4444", msg_str, &end, &user_error, &error, &tsx_id);
  510. post_msg(session->conn->tbs, MSG_UAC_ON_ANS, (param_size_t)session, (param_size_t)pkt);
  511. }
  512. if (error || end)
  513. {
  514. sp_tsx_uac_close(tsx);
  515. sp_tsx_uac_destroy(tsx);
  516. }
  517. }
  518. static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error)
  519. {
  520. WLog_DBG(TAG, "Enter %s with error: %d", __FUNCTION__, error);
  521. if (!error) {
  522. post_session_ack(tbs, conn, session->id, session->tsx_id, error, NULL);
  523. } else {
  524. sp_ses_uac_t *uac = session->uac;
  525. post_session_ack(tbs, conn, CONN_INVALID_ID, session->tsx_id, error, get_err_msg(error));
  526. session->uac = NULL;
  527. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("sub session created failed %d! ses_id = %d, tsx_id = %d", error, session->id, session->tsx_id);
  528. if (uac) {
  529. sp_ses_uac_close(uac);
  530. sp_ses_uac_destroy(uac);
  531. }
  532. }
  533. WLog_DBG(TAG, "Leave %s", __FUNCTION__);
  534. }
  535. static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error)
  536. {
  537. if (session->uac) {
  538. sp_ses_uac_t *uac = session->uac;
  539. post_session_end(tbs, conn, session->id);
  540. session->uac = NULL;
  541. if (error != Error_Closed)
  542. sp_ses_uac_close(uac);
  543. sp_ses_uac_destroy(uac);
  544. }
  545. }
  546. static void uac_on_ans(sp_tbs_t* tbs, sock_connection* conn, sub_session* session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t** ans_pkt)
  547. {
  548. if (msg == NULL || strlen(msg) == 0)
  549. post_req_ack_ex(tbs, conn, session->id, tsx_id, end, error, user_error, get_user_err_msg(user_error), ans_pkt);
  550. else
  551. post_req_ack_ex(tbs, conn, session->id, tsx_id, end, error, user_error, CSimpleStringA2W(msg).GetData(), ans_pkt);
  552. }
  553. static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session)
  554. {
  555. destroy_sub_session(tbs, conn, session);
  556. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("destroy sub session(id:%d) uac %s %d", session->id, _GetFileName(__FILE__), __LINE__);
  557. }
  558. static int create_sub_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name, sub_session **p_session)
  559. {
  560. sp_env_t *env = sp_get_env();
  561. sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);
  562. if (ent) {
  563. int rc;
  564. sub_session *session = MALLOC_T(sub_session);
  565. sp_ses_uac_callback cb;
  566. iobuffer_t *pkt = NULL;
  567. char *param = NULL;
  568. session->id = tbs->sub_session_seq++;
  569. session->conn = conn;
  570. session->tsx_id = tsx_id;
  571. hlist_add_head(&session->hentry, &conn->session_bucket[session->id % SESSION_BUCKET_SIZE]);
  572. conn->session_cnt++;
  573. sock_connection_inc_ref(conn);
  574. cb.user_data = session;
  575. cb.on_close = &__uac_on_close;
  576. cb.on_destroy = &__uac_on_destroy;
  577. cb.on_connect = &__uac_on_connect;
  578. rc = sp_ses_uac_create(tbs->ses_mgr, ent->mod->cfg->idx, ent->cfg->idx, &cb, &session->uac);
  579. if (rc != 0) {
  580. goto on_error;
  581. }
  582. pkt = iobuffer_create(-1, -1);
  583. if (function_name == NULL || strlen(function_name) == 0) {
  584. param = class_name;
  585. } else {
  586. param = strdup_printf("%s::%s", class_name, function_name);
  587. }
  588. iobuffer_write(pkt, IOBUF_T_STR, param, -1);
  589. if (param != class_name) {
  590. FREE(param);
  591. }
  592. rc = sp_ses_uac_async_connect(session->uac, 10000, &pkt);
  593. if (pkt)
  594. iobuffer_dec_ref(pkt);
  595. if (rc != 0) {
  596. goto on_error;
  597. }
  598. on_error:
  599. if (rc != 0) {
  600. conn->session_cnt--;
  601. hlist_del(&session->hentry);
  602. sock_connection_dec_ref(conn);
  603. free(session);
  604. } else {
  605. *p_session = session;
  606. }
  607. return rc;
  608. } else {
  609. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("cannot find entity from mod mgr by name: %s", entity_name);
  610. return Error_NotExist;
  611. }
  612. }
  613. static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session)
  614. {
  615. conn->session_cnt--;
  616. hlist_del(&session->hentry);
  617. if (session->uac) {
  618. sp_ses_uac_t *uac = session->uac;
  619. session->uac = NULL;
  620. sp_ses_uac_destroy(uac);
  621. }
  622. sock_connection_dec_ref(conn);
  623. free(session);
  624. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("destroy sub session(id:%d) %s %d", session->id, _GetFileName(__FILE__), __LINE__);
  625. }
  626. static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t*err_msg)
  627. {
  628. if (conn->fd != INVALID_SOCKET) {
  629. int len;
  630. iobuffer_t *pkt = iobuffer_create(-1, -1);
  631. int type = PKT_TYPE_SESSIONACK;
  632. #if defined(_MSC_VER)
  633. iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
  634. #else
  635. iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err_msg);
  636. #endif //_MSC_VER
  637. len = iobuffer_get_length(pkt)-4;
  638. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  639. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  640. on_write(tbs, conn);
  641. if (err != 0) {
  642. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("send PKT_TYPE_SESSIONACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  643. }
  644. }
  645. }
  646. static void post_req_ack_ex(sp_tbs_t* tbs, sock_connection* conn, int conn_id, int tsx_id, int end, int err, int uc, const wchar_t* err_msg, iobuffer_t** ack_pkt)
  647. {
  648. if (conn->fd != INVALID_SOCKET) {
  649. iobuffer_t* pkt = NULL;
  650. int len;
  651. char bend = !!end;
  652. int type = PKT_TYPE_REQACK;
  653. if (ack_pkt) {
  654. pkt = *ack_pkt;
  655. *ack_pkt = NULL;
  656. }
  657. if (!pkt) {
  658. pkt = iobuffer_create(-1, -1);
  659. }
  660. #if defined(_MSC_VER)
  661. iobuffer_format_write_head(pkt, "w441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  662. #else
  663. iobuffer_format_write_head(pkt, "m441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  664. #endif //_MSC_VER
  665. len = iobuffer_get_length(pkt) - 4;
  666. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  667. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  668. on_write(tbs, conn);
  669. if (err != 0) {
  670. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_REQACK at EX tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  671. }
  672. }
  673. }
  674. static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt)
  675. {
  676. if (conn->fd != INVALID_SOCKET) {
  677. iobuffer_t *pkt = NULL;
  678. int len;
  679. char bend = !!end;
  680. int type = PKT_TYPE_REQACK;
  681. int uc = 0;
  682. if (ack_pkt) {
  683. pkt = *ack_pkt;
  684. *ack_pkt = NULL;
  685. }
  686. if (!pkt) {
  687. pkt = iobuffer_create(-1, -1);
  688. }
  689. #if defined(_MSC_VER)
  690. iobuffer_format_write_head(pkt, "w441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  691. #else
  692. iobuffer_format_write_head(pkt, "m441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  693. #endif //_MSC_VER
  694. len = iobuffer_get_length(pkt)-4;
  695. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  696. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  697. on_write(tbs, conn);
  698. if (err != 0) {
  699. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_REQACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  700. }
  701. }
  702. }
  703. static void post_set_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg)
  704. {
  705. if (conn->fd != INVALID_SOCKET) {
  706. int len;
  707. int type = PKT_TYPE_SET_VAR_ACK;
  708. iobuffer_t *pkt = iobuffer_create(-1, -1);
  709. int conn_id = CONN_INVALID_ID;
  710. #if defined(_MSC_VER)
  711. iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
  712. #else
  713. iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err_msg);
  714. #endif //_MSC_VER
  715. len = iobuffer_get_length(pkt) - 4;
  716. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  717. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  718. on_write(tbs, conn);
  719. if (err != 0) {
  720. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_SET_VAR_ACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  721. }
  722. }
  723. }
  724. static void post_get_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg, const wchar_t*value)
  725. {
  726. if (conn->fd != INVALID_SOCKET) {
  727. int len;
  728. int type = PKT_TYPE_GET_VAR_ACK;
  729. iobuffer_t *pkt = iobuffer_create(-1, -1);
  730. int conn_id = CONN_INVALID_ID;
  731. #if defined(_MSC_VER)
  732. iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
  733. #else
  734. iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
  735. #endif //_MSC_VER
  736. len = iobuffer_get_length(pkt) - 4;
  737. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  738. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  739. on_write(tbs, conn);
  740. if (err != 0) {
  741. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_GET_VAR_ACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  742. }
  743. }
  744. }
  745. static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
  746. {
  747. if (conn->fd != INVALID_SOCKET) {
  748. int len;
  749. iobuffer_t *pkt = iobuffer_create(-1, -1);
  750. int type = PKT_TYPE_SESSIONEND;
  751. iobuffer_format_write(pkt, "44", &type, &conn_id);
  752. len = iobuffer_get_length(pkt)-4;
  753. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  754. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  755. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("send PKT_TYPE_SESSIONEND");
  756. on_write(tbs, conn);
  757. }
  758. }
  759. static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name)
  760. {
  761. sub_session *session = NULL;
  762. int rc;
  763. rc = create_sub_session(tbs, conn, tsx_id, entity_name, function_name, class_name, &session);
  764. if (rc == 0) {
  765. if(getReduceSpbaseLog())
  766. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("start create sub session!");
  767. //post_session_ack(tbs, conn, session->id, tsx_id, rc, NULL);
  768. } else {
  769. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create_sub_session failed! entity: %s, err = %d %s %d",
  770. entity_name, rc, _GetFileName(__FILE__), __LINE__);
  771. post_session_ack(tbs, conn, CONN_INVALID_ID, tsx_id, rc, get_err_msg(rc));
  772. }
  773. }
  774. static reg_entry *find_reg_entry(sock_connection *conn, int register_id)
  775. {
  776. reg_entry *tpos;
  777. struct hlist_node *pos;
  778. int slot = ((unsigned int)register_id) % REGISTER_BUCKET_SIZE; // bugfix, assure slot positive
  779. hlist_for_each_entry(tpos, pos, &conn->register_bucket[slot], reg_entry, hentry) {
  780. if (tpos->register_id == register_id) {
  781. return tpos;
  782. }
  783. }
  784. return NULL;
  785. }
  786. static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt)
  787. {
  788. if (entry->conn->fd != INVALID_SOCKET) {
  789. int type;
  790. int len;
  791. iobuffer_t *pkt = *evt_pkt;
  792. *evt_pkt = NULL;
  793. type = PKT_TYPE_EVENT_DATA;
  794. iobuffer_write_head(pkt, IOBUF_T_I4, &tsx_id, 0);
  795. iobuffer_write_head(pkt, IOBUF_T_I4, &conn_id, 0);
  796. iobuffer_write_head(pkt, IOBUF_T_I4, &type, 0);
  797. len = iobuffer_get_length(pkt)-4;
  798. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  799. iobuffer_queue_enqueue(entry->conn->tx_queue, pkt);
  800. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("send PKT_TYPE_EVENT_DATA tsx_id=%d", tsx_id);
  801. on_write(tbs, entry->conn);
  802. }
  803. sp_bcm_listener_dec_ref(entry->listener); //@
  804. }
  805. static void __bcm_on_message_raw(sp_bcm_listener_t *listener, int from_client_id, iobuffer_t **msg_pkt, void *user_data)
  806. {
  807. reg_entry *entry = (reg_entry*)user_data;
  808. iobuffer_t *pkt = *msg_pkt;
  809. *msg_pkt = NULL;
  810. sp_bcm_listener_inc_ref(listener); //@
  811. post_msg(entry->conn->tbs, MSG_ON_EVENT, (param_size_t)entry, (param_size_t)pkt);
  812. if (getReduceSpbaseLog())
  813. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("post event from_client_id:%d", from_client_id);
  814. }
  815. static void __bcm_on_destroy(sp_bcm_listener_t *listener, void *user_data)
  816. {
  817. reg_entry *entry = (reg_entry*)user_data;
  818. entry->conn->register_cnt--;
  819. hlist_del(&entry->hentry);
  820. sock_connection_dec_ref(entry->conn);
  821. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("unregister event ok! register_id = %d", entry->register_id);
  822. free(entry);
  823. }
  824. static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name)
  825. {
  826. sp_env_t *env = sp_get_env();
  827. sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);
  828. if (ent) {
  829. int rc;
  830. sp_svc_t *svc = sp_ses_mgr_get_svc(tbs->ses_mgr);
  831. reg_entry *entry;
  832. sp_bcm_listener_cb cb;
  833. if (find_reg_entry(conn, register_id)) {
  834. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("register entry already exist, register id %d", register_id);
  835. return;
  836. }
  837. entry = ZALLOC_T(reg_entry);
  838. entry->register_id = register_id;
  839. entry->entity_id = ent->cfg->idx;
  840. hlist_add_head(&entry->hentry, &conn->register_bucket[((unsigned int)register_id) % REGISTER_BUCKET_SIZE]); // bugfix, assure index positive
  841. conn->register_cnt++;
  842. sock_connection_inc_ref(conn);
  843. entry->conn = conn;
  844. cb.on_message_raw = &__bcm_on_message_raw;
  845. cb.on_message = NULL;
  846. cb.on_destroy = &__bcm_on_destroy;
  847. cb.user_data = entry;
  848. rc = sp_bcm_listener_create(svc, entry->entity_id, function_name, &cb, &entry->listener);
  849. if (rc != 0) {
  850. goto on_error;
  851. }
  852. rc = sp_bcm_listener_subscribe(entry->listener, &entry->uid);
  853. if (rc != 0) {
  854. goto on_error;
  855. }
  856. on_error:
  857. if (rc != 0) {
  858. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create bcm listener failed! entity = %s, register_id = %d", ent->cfg->name, register_id);
  859. conn->register_cnt--;
  860. hlist_del_init(&entry->hentry);
  861. if (entry->listener) {
  862. sp_bcm_listener_destroy(entry->listener);
  863. }
  864. free(entry);
  865. sock_connection_dec_ref(conn);
  866. } else {
  867. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("register event for %s ok! register_id = %d", ent->cfg->name, register_id);
  868. }
  869. } else {
  870. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("cannot find entity %s!", entity_name);
  871. }
  872. }
  873. static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id)
  874. {
  875. reg_entry *entry = find_reg_entry(conn, register_id);
  876. if (entry) {
  877. if (sp_bcm_listener_unsubscribe(entry->listener) == 0) {
  878. sp_bcm_listener_destroy(entry->listener);
  879. } else {
  880. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("entry already unregistered! register_id = %d", register_id);
  881. }
  882. } else {
  883. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("%s, entry not found, registered_id = %d", __FUNCTION__, register_id);
  884. }
  885. }
  886. static void on_log_event(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext &t_context)
  887. {
  888. sp_log_client_logWithLink(tbs->log_client, Log_Event, severity_level, 0, user_code, 0, 0, NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg)
  889. , t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
  890. }
  891. static void on_log_warn(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext& t_context)
  892. {
  893. sp_log_client_logWithLink(tbs->log_client, Log_Warning, severity_level, 0, user_code, 0, 0, NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg)
  894. , t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
  895. }
  896. static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
  897. {
  898. sub_session *session = find_sub_session(conn, conn_id);
  899. if (session) {
  900. if (session->uac) {
  901. int rc;
  902. sp_tsx_uac_t *tsx;
  903. sp_tsx_uac_callback cb;
  904. cb.user_data = session;
  905. cb.on_ans = &__uac_on_ans;
  906. cb.on_destroy = NULL;
  907. rc = sp_tsx_uac_create(session->uac, tsx_id, method_id, method_sig, &cb, &tsx);
  908. if (rc == 0) {
  909. rc = sp_tsx_uac_async_req(tsx, timeout, req_pkt);
  910. if (rc == 0) {
  911. WLog_DBG(TAG, "send req: tsx_id=%d, method_id=%d, method_sig=%d, pkt_size=%d",
  912. tsx_id, method_id, method_sig, *req_pkt == NULL ? 0 : iobuffer_get_length(*req_pkt));
  913. } else {
  914. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("async req tsx failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
  915. post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL);
  916. sp_tsx_uac_destroy(tsx);
  917. }
  918. } else {
  919. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create req tsx failed! %s %d", _GetFileName(__FILE__), __LINE__);
  920. post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL);
  921. }
  922. } else {
  923. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("session already closed! %s %d", _GetFileName(__FILE__), __LINE__);
  924. post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_NetBroken, get_err_msg(Error_NetBroken), NULL);
  925. }
  926. } else {
  927. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
  928. post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_Break, get_err_msg(Error_Break), NULL);
  929. }
  930. }
  931. static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt)
  932. {
  933. sub_session *session = find_sub_session(conn, conn_id);
  934. if (session) {
  935. if (session->uac) {
  936. int rc = sp_ses_uac_send_info(session->uac, method_id, method_sig, info_pkt);
  937. if (rc != 0) {
  938. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("uac send info failed! error = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
  939. }
  940. else {
  941. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("send info: method_id=%d, method_sig=%d, pkt_size=%d", method_id, method_sig,
  942. // *info_pkt == NULL ? 0 : iobuffer_get_length(*info_pkt));
  943. }
  944. } else {
  945. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("session already closed! info ignored ! %s %d", _GetFileName(__FILE__), __LINE__);
  946. }
  947. } else {
  948. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
  949. }
  950. }
  951. static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
  952. {
  953. sub_session *session = find_sub_session(conn, conn_id);
  954. if (session) {
  955. if (session->uac) {
  956. sp_ses_uac_t *uac = session->uac;
  957. session->uac = NULL;
  958. if (sp_ses_uac_close(uac) == 0) {
  959. sp_ses_uac_destroy(uac);
  960. } else {
  961. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("duplicated uac close!");
  962. }
  963. }
  964. } else {
  965. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
  966. }
  967. }
  968. static void on_stop(sp_tbs_t *tbs)
  969. {
  970. int i;
  971. WSAEventSelect(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], 0);
  972. closesocket(tbs->server_fd);
  973. tbs->server_fd = INVALID_SOCKET;
  974. for (i = tbs->conn_cnt-1; i > SRV_EVT_IDX; --i) {
  975. sock_connection *conn = tbs->conn[i];
  976. TOOLKIT_ASSERT(conn);
  977. on_close(tbs, conn);
  978. }
  979. }
  980. static void on_close(sp_tbs_t *tbs, sock_connection *conn)
  981. {
  982. int i;
  983. WLog_WARN(TAG, "socket on close is invoked!");
  984. if (conn->fd != INVALID_SOCKET) {
  985. WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
  986. closesocket(conn->fd);
  987. conn->fd = INVALID_SOCKET;
  988. }
  989. for (i = 0; i < SESSION_BUCKET_SIZE; ++i) {
  990. sub_session *tpos;
  991. struct hlist_node *pos, *n;
  992. hlist_for_each_entry_safe(tpos, pos, n, &conn->session_bucket[i], sub_session, hentry) {
  993. if (tpos->uac) {
  994. sp_ses_uac_t *uac = tpos->uac;
  995. tpos->uac = NULL;
  996. if (sp_ses_uac_close(uac) == 0) {
  997. sp_ses_uac_destroy(uac);
  998. } else {
  999. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("duplicated uac close! %s %d", _GetFileName(__FILE__), __LINE__);
  1000. }
  1001. }
  1002. }
  1003. }
  1004. for (i = 0; i < REGISTER_BUCKET_SIZE; ++i) {
  1005. reg_entry *tpos;
  1006. struct hlist_node *pos, *n;
  1007. hlist_for_each_entry_safe(tpos, pos, n, &conn->register_bucket[i], reg_entry, hentry) {
  1008. if (tpos->listener) {
  1009. if (sp_bcm_listener_unsubscribe(tpos->listener) == 0) {
  1010. sp_bcm_listener_destroy(tpos->listener);
  1011. }
  1012. }
  1013. }
  1014. }
  1015. sock_connection_dec_ref(conn);
  1016. }
  1017. static void on_set_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name, char *value)
  1018. {
  1019. int rc;
  1020. if (name == NULL)
  1021. {
  1022. rc = Error_Param;
  1023. post_set_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null");
  1024. return;
  1025. }
  1026. rc = sp_var_client_set((sp_var_client_t*)tbs->var_client, name, value, 0);
  1027. post_set_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc));
  1028. }
  1029. static void on_get_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name)
  1030. {
  1031. int len = 260;
  1032. int rc;
  1033. char szTmp[260] = { 0 };
  1034. wchar_t wszValue[260] = { 0 };
  1035. if (name == NULL)
  1036. {
  1037. rc = Error_Param;
  1038. post_get_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null", NULL);
  1039. return;
  1040. }
  1041. rc = sp_var_client_lock(tbs->var_client);
  1042. if (rc == 0) {
  1043. rc = sp_var_client_get((sp_var_client_t*)tbs->var_client, name, szTmp, &len);
  1044. }
  1045. sp_var_client_unlock(tbs->var_client);
  1046. if (rc == 0)
  1047. {
  1048. len = 260;
  1049. toolkit_mbs2wcs(szTmp, &wszValue[0], len);
  1050. post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), wszValue);
  1051. }
  1052. else
  1053. post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), NULL);
  1054. }
  1055. static void process_tcp_pkt(sp_tbs_t *tbs, sock_connection *conn, iobuffer_t **p_pkt)
  1056. {
  1057. iobuffer_t *pkt = *p_pkt;
  1058. int type;
  1059. iobuffer_read(pkt, IOBUF_T_I4, &type, NULL);
  1060. link_context cur;
  1061. char linkStr[512] = "";
  1062. bool withLinkContex = !!(type & PKT_TYPE_CONTROL_LINKCONTEXT);
  1063. int linkId = 0;
  1064. type = type & 0x0000FFFF; //clear control part
  1065. switch (type) {
  1066. case PKT_TYPE_INFO:
  1067. {
  1068. int conn_id;
  1069. int method_id;
  1070. int method_sig;
  1071. if (withLinkContex)
  1072. {
  1073. iobuffer_format_read(pkt, "444mmmm", &conn_id, &method_id, &method_sig, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1074. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1075. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1076. iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1077. } else {
  1078. iobuffer_format_read(pkt, "444", &conn_id, &method_id, &method_sig);
  1079. }
  1080. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_INFO conn_id=%d method_id=%d method_sig=%d, %s", conn_id, method_id, method_sig, linkStr);
  1081. on_info(tbs, conn, conn_id, method_id, method_sig, &pkt);
  1082. }
  1083. break;
  1084. case PKT_TYPE_REQ:
  1085. {
  1086. int conn_id;
  1087. int tsx_id;
  1088. int method_id;
  1089. int method_sig;
  1090. int timeout;
  1091. if (withLinkContex)
  1092. {
  1093. iobuffer_format_read(pkt, "44444mmmm", &conn_id, &tsx_id, &method_id, &method_sig, &timeout, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1094. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1095. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1096. iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1097. }
  1098. else {
  1099. iobuffer_format_read(pkt, "44444", &conn_id, &tsx_id, &method_id, &method_sig, &timeout);
  1100. }
  1101. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REQ conn_id=%d tsx_id=%d method_id=%d method_sig=%d timeout=%d, %s", conn_id, tsx_id, method_id, method_sig, timeout, linkStr);
  1102. on_req(tbs, conn, conn_id, tsx_id, method_id, method_sig, timeout, &pkt);
  1103. }
  1104. break;
  1105. case PKT_TYPE_SESSION:
  1106. {
  1107. int tsx_id;
  1108. wchar_t *wentity_name;
  1109. wchar_t*wfunction_name;
  1110. wchar_t*wclass_name;
  1111. char *entity_name = NULL;
  1112. char *function_name = NULL;
  1113. char *class_name = NULL;
  1114. int n;
  1115. if (withLinkContex)
  1116. {
  1117. iobuffer_format_read(pkt, "4mmmmmmm", &tsx_id
  1118. , &wentity_name, &wfunction_name, &wclass_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1119. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1120. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1121. }
  1122. else {
  1123. iobuffer_format_read(pkt, "4mmm", &tsx_id, &wentity_name, &wfunction_name, &wclass_name);
  1124. }
  1125. n = toolkit_wcs2mbs(wentity_name, NULL, 0);
  1126. WLog_DBG(TAG, "wtm len for wentity_name: %d", n);
  1127. if (wentity_name && n > 0) {
  1128. entity_name = (char*)malloc(n);
  1129. toolkit_wcs2mbs(wentity_name, entity_name, n);
  1130. }
  1131. FREE(wentity_name);
  1132. n = toolkit_wcs2mbs(wfunction_name, NULL, 0);
  1133. WLog_DBG(TAG, "wtm len for wfunction_name: %d", n);
  1134. if (wfunction_name && n > 0) {
  1135. function_name = (char*)malloc(n);
  1136. toolkit_wcs2mbs(wfunction_name, function_name, n);
  1137. }
  1138. FREE(wfunction_name);
  1139. n = toolkit_wcs2mbs(wclass_name, NULL, 0);
  1140. WLog_DBG(TAG, "wtm len for wclass_name: %d", n);
  1141. if (wclass_name && n > 0) {
  1142. class_name = (char*)malloc(n);
  1143. toolkit_wcs2mbs(wclass_name, class_name, n);
  1144. }
  1145. FREE(wclass_name);
  1146. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSION tsx_id=%d entity_name=%s function_name=%s class_name=%s",
  1147. tsx_id, entity_name ? entity_name : "<null>", function_name ? function_name : "<null>"
  1148. , class_name ? class_name : "<null>");
  1149. on_session(tbs, conn, tsx_id, entity_name, function_name, class_name);
  1150. if (entity_name)
  1151. free(entity_name);
  1152. if (function_name)
  1153. free(function_name);
  1154. if (class_name)
  1155. free(class_name);
  1156. }
  1157. break;
  1158. case PKT_TYPE_SESSIONEND:
  1159. {
  1160. int conn_id;
  1161. if (withLinkContex)
  1162. {
  1163. iobuffer_format_read(pkt, "4mmmm", &conn_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1164. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1165. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1166. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1167. }
  1168. else {
  1169. iobuffer_read(pkt, IOBUF_T_I4, &conn_id, NULL);
  1170. }
  1171. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSIONEND conn_id=%d", conn_id);
  1172. on_session_end(tbs, conn, conn_id);
  1173. }
  1174. break;
  1175. case PKT_TYPE_REGISTER_EVENT:
  1176. {
  1177. int register_id;
  1178. wchar_t* wentity_name;
  1179. char* entity_name = NULL;
  1180. wchar_t* wfunction_name;
  1181. char *function_name = NULL;
  1182. int n;
  1183. if (withLinkContex)
  1184. {
  1185. iobuffer_format_read(pkt, "4mmmmmm", &register_id,
  1186. &wentity_name, &wfunction_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1187. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1188. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1189. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1190. }
  1191. else {
  1192. iobuffer_format_read(pkt, "4mm", &register_id, &wentity_name, &wfunction_name);
  1193. }
  1194. n = toolkit_wcs2mbs(wentity_name, NULL, 0);
  1195. if (wentity_name && n > 0) {
  1196. entity_name = (char*)malloc(n);
  1197. toolkit_wcs2mbs(wentity_name, entity_name, n);
  1198. }
  1199. FREE(wentity_name);
  1200. n = toolkit_wcs2mbs(wfunction_name, NULL, 0);
  1201. if (wfunction_name && n > 0) {
  1202. function_name = (char*)malloc(n);
  1203. toolkit_wcs2mbs(wfunction_name, function_name, n);
  1204. }
  1205. FREE(wfunction_name);
  1206. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REGISTER_EVENT register_id=%d, entity_name=%s, function_name=%s",
  1207. register_id, entity_name ? entity_name : "<null>", function_name ? function_name : "<null>");
  1208. on_register_event(tbs, conn, register_id, entity_name, function_name);
  1209. if (entity_name)
  1210. free(entity_name);
  1211. if (function_name)
  1212. free(function_name);
  1213. }
  1214. break;
  1215. case PKT_TYPE_UNREGISTER_EVENT:
  1216. {
  1217. int register_id;
  1218. if (withLinkContex)
  1219. {
  1220. iobuffer_format_read(pkt, "4mmmm", &register_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1221. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1222. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1223. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1224. }
  1225. else {
  1226. iobuffer_format_read(pkt, "4", &register_id);
  1227. }
  1228. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_UNREGISTER_EVENT register_id=%d", register_id);
  1229. on_unregister_event(tbs, conn, register_id);
  1230. }
  1231. break;
  1232. case PKT_TYPE_LOG_EVENT:
  1233. {
  1234. int severity_level;
  1235. int user_code;
  1236. wchar_t* wmsg = NULL;
  1237. char *msg = NULL;
  1238. if (withLinkContex)
  1239. {
  1240. iobuffer_format_read(pkt, "44mmmmm", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1241. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1242. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1243. } else {
  1244. iobuffer_format_read(pkt, "44m", &severity_level, &user_code, &wmsg);
  1245. }
  1246. if (wmsg) {
  1247. int n = toolkit_wcs2mbs(wmsg, NULL, 0);
  1248. const int rn = n > 0 ? n : 1;
  1249. msg = (char*)malloc(rn);
  1250. memset(msg, '\0', sizeof(char) * rn);
  1251. toolkit_wcs2mbs(wmsg, msg, n);
  1252. FREE(wmsg);
  1253. }
  1254. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_LOG_EVENT severity_level=%d, user_code=0x%08x, msg=%s", severity_level, user_code, msg ? msg : "<null>");
  1255. linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1256. on_log_event(tbs, conn, severity_level, user_code, msg, t_Context);
  1257. if (msg) {
  1258. free(msg);
  1259. }
  1260. }
  1261. break;
  1262. case PKT_TYPE_LOG_WARN:
  1263. {
  1264. int severity_level;
  1265. int user_code;
  1266. wchar_t* wmsg = NULL;
  1267. char *msg = NULL;
  1268. if (withLinkContex)
  1269. {
  1270. iobuffer_format_read(pkt, "44mmmmm", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1271. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1272. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1273. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1274. } else {
  1275. iobuffer_format_read(pkt, "44m", &severity_level, &user_code, &wmsg);
  1276. }
  1277. if (wmsg) {
  1278. int n = toolkit_wcs2mbs(wmsg, NULL, 0);
  1279. const int rn = n > 0 ? n : 1;
  1280. msg = (char*)malloc(rn);
  1281. memset(msg, '\0', sizeof(char) * rn);
  1282. toolkit_wcs2mbs(wmsg, msg, n);
  1283. FREE(wmsg);
  1284. }
  1285. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_LOG_WARN severity_level=%d, user_code=0x%08x, msg=%s", severity_level, user_code, msg ? msg : "<null>");
  1286. linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1287. on_log_warn(tbs, conn, severity_level, user_code, msg, t_Context);
  1288. if (msg)
  1289. free(msg);
  1290. }
  1291. break;
  1292. case PKT_TYPE_SET_VAR_REQ:
  1293. {
  1294. int tsx_id = 0;
  1295. wchar_t* wname = NULL;
  1296. wchar_t* wvalue = NULL;
  1297. char *name = NULL;
  1298. char *value = NULL;
  1299. if (withLinkContex)
  1300. {
  1301. iobuffer_format_read(pkt, "4mmmmmm", &tsx_id, &wname, &wvalue, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1302. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1303. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1304. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1305. } else {
  1306. iobuffer_format_read(pkt, "4mm", &tsx_id, &wname, &wvalue);
  1307. }
  1308. if (wname) {
  1309. int n = toolkit_wcs2mbs(wname, NULL, 0);
  1310. const int rn = n > 0 ? n : 1;
  1311. name = (char*)malloc(rn);
  1312. memset(name, '\0', sizeof(char)* rn);
  1313. toolkit_wcs2mbs(wname, name, n);
  1314. FREE(wname);
  1315. }
  1316. if (wvalue) {
  1317. int n = toolkit_wcs2mbs(wvalue, NULL, 0);
  1318. const int rn = n > 0 ? n : 1;
  1319. value = (char*)malloc(rn);
  1320. memset(value, '\0', sizeof(char)* rn);
  1321. toolkit_wcs2mbs(wvalue, value, n);
  1322. FREE(wvalue);
  1323. }
  1324. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SET_VAR_REQ tsx_id=%d, name=%s, value=%s, %s"
  1325. , tsx_id, name ? name : "<null>", value ? value : "<null>", linkStr);
  1326. on_set_var_req(tbs, conn, tsx_id, name, value);
  1327. if (name)
  1328. free(name);
  1329. if (value)
  1330. free(value);
  1331. }
  1332. break;
  1333. case PKT_TYPE_GET_VAR_REQ:
  1334. {
  1335. int tsx_id = 0;
  1336. wchar_t*wname = NULL;
  1337. char *name = NULL;
  1338. if (withLinkContex)
  1339. {
  1340. iobuffer_format_read(pkt, "4mmmmm", &tsx_id, &wname, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1341. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1342. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1343. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1344. } else {
  1345. iobuffer_format_read(pkt, "4m", &tsx_id, &wname);
  1346. }
  1347. if (wname) {
  1348. int n = toolkit_wcs2mbs(wname, NULL, 0);
  1349. const int rn = n > 0 ? n : 1;
  1350. name = (char*)malloc(rn);
  1351. memset(name, '\0', sizeof(char)* rn);
  1352. toolkit_wcs2mbs(wname, name, n);
  1353. FREE(wname);
  1354. }
  1355. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_GET_VAR_REQ tsx_id=%d, name=%s, %s", tsx_id, name ? name : "<null>", linkStr);
  1356. on_get_var_req(tbs, conn, tsx_id, name);
  1357. if (name)
  1358. free(name);
  1359. }
  1360. break;
  1361. default:
  1362. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("recv unknown pkt type: %d", type);
  1363. //TOOLKIT_ASSERT(0);
  1364. break;
  1365. }
  1366. *p_pkt = NULL;
  1367. if (pkt)
  1368. iobuffer_dec_ref(pkt);
  1369. }
  1370. static void on_read(sp_tbs_t *tbs, sock_connection *conn)
  1371. {
  1372. WLog_DBG(TAG, "Enter %s", __FUNCTION__);
  1373. int t = -404;
  1374. if (conn->rx_state == RECV_STATE_HEADER) {
  1375. do {
  1376. t = recv(conn->fd, &conn->rx_hdr_buf[conn->rx_hdr_buf_len], RX_BUF_SIZE - conn->rx_hdr_buf_len, 0);
  1377. }
  1378. while (t == -1 && errno == EINTR);
  1379. if (t > 0) {
  1380. int offset = 0;
  1381. conn->rx_hdr_buf_len += t;
  1382. while (conn->rx_hdr_buf_len-offset >= HDR_LEN) {
  1383. iobuffer_t *pkt;
  1384. memcpy(&conn->rx_header_len, conn->rx_hdr_buf+offset, HDR_LEN);
  1385. conn->rx_header_len += 4; // fixed hdr not include type!!!
  1386. WLog_DBG(TAG, "rx header len(with 4 sizeof(type)): %d", conn->rx_header_len);
  1387. offset += HDR_LEN;
  1388. pkt = iobuffer_create(-1, conn->rx_header_len);
  1389. if (conn->rx_hdr_buf_len-offset >= conn->rx_header_len) {
  1390. memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_header_len);
  1391. offset += conn->rx_header_len;
  1392. iobuffer_push_count(pkt, conn->rx_header_len);
  1393. process_tcp_pkt(tbs, conn, &pkt);
  1394. if (pkt)
  1395. iobuffer_dec_ref(pkt);
  1396. } else {
  1397. memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_hdr_buf_len-offset);
  1398. iobuffer_push_count(pkt, conn->rx_hdr_buf_len-offset);
  1399. offset = conn->rx_hdr_buf_len;
  1400. TOOLKIT_ASSERT(conn->rx_pkt == NULL);
  1401. conn->rx_pkt = pkt;
  1402. conn->rx_state = RECV_STATE_BODY;
  1403. }
  1404. }
  1405. if (offset == conn->rx_hdr_buf_len) {
  1406. conn->rx_hdr_buf_len = 0;
  1407. } else {
  1408. if (offset > 0) {
  1409. TOOLKIT_ASSERT(conn->rx_state == RECV_STATE_HEADER);
  1410. memmove(&conn->rx_hdr_buf[0], &conn->rx_hdr_buf[offset], conn->rx_hdr_buf_len-offset);
  1411. conn->rx_hdr_buf_len -= offset;
  1412. }
  1413. }
  1414. }
  1415. } else if (conn->rx_state == RECV_STATE_BODY) {
  1416. int offset;
  1417. TOOLKIT_ASSERT(conn->rx_pkt);
  1418. offset = iobuffer_get_length(conn->rx_pkt);
  1419. do {
  1420. t = recv(conn->fd, iobuffer_data(conn->rx_pkt, -1), conn->rx_header_len - offset, 0);
  1421. } while (t == -1 && errno == EINTR);
  1422. if (t > 0) {
  1423. iobuffer_push_count(conn->rx_pkt, t);
  1424. if (offset+t == conn->rx_header_len) {
  1425. process_tcp_pkt(tbs, conn, &conn->rx_pkt);
  1426. if (conn->rx_pkt) {
  1427. iobuffer_dec_ref(conn->rx_pkt);
  1428. conn->rx_pkt = NULL;
  1429. }
  1430. conn->rx_hdr_buf_len = 0;
  1431. conn->rx_state = RECV_STATE_HEADER;
  1432. }
  1433. }
  1434. } else {
  1435. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot go here, bug! file: %s, line: %d", _GetFileName(__FILE__), __LINE__);
  1436. TOOLKIT_ASSERT(0);
  1437. }
  1438. if (t == 0) {
  1439. on_close(tbs, conn);
  1440. } else if (t == -1) {
  1441. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("connection read failed: %s", strerror(errno));
  1442. on_close(tbs, conn);
  1443. }
  1444. WLog_DBG(TAG, "Leave %s", __FUNCTION__);
  1445. }
  1446. static void on_write(sp_tbs_t *tbs, sock_connection *conn)
  1447. {
  1448. if (conn->fd == INVALID_SOCKET)
  1449. return;
  1450. WLog_DBG(TAG, "Enter %s", __FUNCTION__);
  1451. int t = -404;
  1452. while (iobuffer_queue_count(conn->tx_queue) > 0) {
  1453. iobuffer_t *pkt = iobuffer_queue_head(conn->tx_queue);
  1454. int sended = 0;
  1455. int total = iobuffer_get_length(pkt);
  1456. while (sended < total) {
  1457. do {
  1458. t = send(conn->fd, iobuffer_data(pkt, 0), total - sended, 0);
  1459. } while (t < 0 && errno == EINTR);
  1460. if (t > 0) {
  1461. sended += t;
  1462. iobuffer_pop_count(pkt, t);
  1463. } else {
  1464. break;
  1465. }
  1466. }
  1467. if (sended < total) {
  1468. break;
  1469. } else {
  1470. iobuffer_queue_deque(conn->tx_queue);
  1471. iobuffer_dec_ref(pkt);
  1472. }
  1473. }
  1474. WLog_DBG(TAG, "Leave %s", __FUNCTION__);
  1475. }
  1476. static void on_accept(sp_tbs_t *tbs)
  1477. {
  1478. int rc;
  1479. rc = create_connection(tbs);
  1480. if (rc != 0) {
  1481. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("create connection failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
  1482. } else {
  1483. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("create connection ok! %s %d", _GetFileName(__FILE__), __LINE__);
  1484. }
  1485. }
  1486. static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2)
  1487. {
  1488. uint64_t wdata = 0;
  1489. int rc;
  1490. cmd_entry *e = MALLOC_T(cmd_entry);
  1491. e->msg = msg;
  1492. e->param1 = param1;
  1493. e->param2 = param2;
  1494. if (0 != evtpoll_subscribe(tbs->ep, EV_READ, tbs->msg_fd, (void*)(long)MSG_EVT_IDX, NULL)) {
  1495. WLog_ERR(TAG, "tbs subscribe msg eventfd failed.");
  1496. free(e);
  1497. return;
  1498. }
  1499. spinlock_enter(&tbs->cmd_list_lock, -1);
  1500. list_add_tail(&e->entry, &tbs->cmd_list);
  1501. spinlock_leave(&tbs->cmd_list_lock);
  1502. wdata = 1;
  1503. do {
  1504. rc = write(tbs->msg_fd, &wdata, sizeof wdata);
  1505. } while (rc < 0 && rc == EINTR);
  1506. if (rc == -1) {
  1507. WLog_ERR(TAG, "write to eventfd failed: %d", errno);
  1508. return;
  1509. }
  1510. }
  1511. static unsigned int __stdcall server_proc(void *arg)
  1512. {
  1513. sp_tbs_t *tbs = (sp_tbs_t*)arg;
  1514. struct sockaddr_in addr = {0};
  1515. int istop = 0;
  1516. int fixed_conn;
  1517. WLog_DBG(TAG, "enter server listern and deal business thread");
  1518. TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET);
  1519. tbs->server_fd = _socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  1520. if (tbs->server_fd == INVALID_SOCKET) {
  1521. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create list fd socket failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError());
  1522. tbs->ready_evt_result = Error_Resource;
  1523. goto on_error;
  1524. }
  1525. if (make_fd_cloexec(tbs->server_fd, 1) != 0)
  1526. goto on_error;
  1527. {
  1528. long val = 1;
  1529. _setsockopt(tbs->server_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&val, sizeof(val));
  1530. }
  1531. addr.sin_family = AF_INET;
  1532. addr.sin_addr.s_addr = tbs->server_ip;
  1533. addr.sin_port = tbs->server_port;
  1534. if (_bind(tbs->server_fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
  1535. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("bind address failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError());
  1536. tbs->ready_evt_result = Error_AlreadyExist;
  1537. goto on_error;
  1538. }
  1539. WLog_DBG(TAG, "to attach server fd: %d", tbs->server_fd);
  1540. if (evtpoll_attach(tbs->ep, tbs->server_fd)) {
  1541. WLog_ERR(TAG, "%s: server attach evtpoll for %d failed!", __FUNCTION__, tbs->server_fd);
  1542. goto on_error;
  1543. }
  1544. if (0 != evtpoll_subscribe(tbs->ep, EV_ACCEPT, tbs->server_fd, (void*)SRV_EVT_IDX, NULL)) {
  1545. WLog_ERR(TAG, "%s: server attach evtpoll for %d failed!", __FUNCTION__, tbs->server_fd);
  1546. goto on_error;
  1547. }
  1548. if (_listen(tbs->server_fd, 5) != 0) {
  1549. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("winsock::listen failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError());
  1550. tbs->ready_evt_result = Error_AlreadyExist;
  1551. goto on_error;
  1552. }
  1553. tbs->conn_cnt++;
  1554. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("start tbs server ok!");
  1555. tbs->ready_evt_result = 0;
  1556. SetEvent(tbs->ready_evt);
  1557. fixed_conn = tbs->conn_cnt;
  1558. while (!istop || tbs->conn_cnt > fixed_conn) {
  1559. int i;
  1560. const int evt_cnt = tbs->conn_cnt;
  1561. struct epoll_event events[MAX_EPOLL_EVENT];
  1562. const int nfds = evtpoll_wait(tbs->ep, events, MAX_EPOLL_EVENT, MAX_TIMEOUT);
  1563. if (nfds == TOOLKIT_ETIMEDOUT) {
  1564. continue;
  1565. }
  1566. if (nfds < 0) {
  1567. break;
  1568. }
  1569. for (i = 0; i < nfds; ++i) {
  1570. struct epoll_event* pe = events + i;
  1571. int n;
  1572. const int wf = pe->events & EPOLLOUT ? 1 : 0;
  1573. const int rf = pe->events & EPOLLIN ? 1 : 0;
  1574. void* pt = NULL;
  1575. WLog_INFO(TAG, "poll events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd, wf, rf);
  1576. n = evtpoll_deal(tbs->ep, pe, &pt, 0);
  1577. WLog_DBG(TAG, "evtpoll deal returned: %d, %d", n, (long)(pt));
  1578. if (0 == n) {
  1579. const int idx = (int)(long)(pt);
  1580. if ((idx == SRV_EVT_IDX) && rf) {
  1581. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("FD_ACCEPT trigger!");
  1582. on_accept(tbs);
  1583. }
  1584. if (idx == MSG_EVT_IDX) { // cmd
  1585. uint64_t rdata;
  1586. cmd_entry* e;
  1587. do {
  1588. n = read(tbs->msg_fd, &rdata, sizeof rdata);
  1589. } while (n < 0 && errno == EINTR);
  1590. spinlock_enter(&tbs->cmd_list_lock, -1);
  1591. e = list_first_entry(&tbs->cmd_list, cmd_entry, entry);
  1592. list_del(&e->entry);
  1593. spinlock_leave(&tbs->cmd_list_lock);
  1594. WLog_DBG(TAG, "server receive cmd msg: %d", e->msg);
  1595. switch (e->msg) {
  1596. case MSG_STOP:
  1597. {
  1598. on_stop(tbs); // ...stop
  1599. istop++;
  1600. }
  1601. break;
  1602. case MSG_UAC_ON_CONNECT:
  1603. {
  1604. sub_session* session = (sub_session*)e->param1;
  1605. const int error = e->param2;
  1606. uac_on_connect(tbs, session->conn, session, error);
  1607. }
  1608. break;
  1609. case MSG_UAC_ON_DESTROY:
  1610. {
  1611. sub_session* session = (sub_session*)e->param1;
  1612. uac_on_destroy(tbs, session->conn, session);
  1613. }
  1614. break;
  1615. case MSG_UAC_ON_CLOSE:
  1616. {
  1617. sub_session* session = (sub_session*)e->param1;
  1618. int error = e->param2;
  1619. uac_on_close(tbs, session->conn, session, error);
  1620. }
  1621. break;
  1622. case MSG_UAC_ON_ANS:
  1623. {
  1624. sub_session* session = (sub_session*)e->param1;
  1625. iobuffer_t* ans_pkt = (iobuffer_t*)e->param2;
  1626. int tsx_id;
  1627. int error;
  1628. int end;
  1629. int user_error;
  1630. char* msg_str = NULL;
  1631. iobuffer_format_read(ans_pkt, "4444s", &tsx_id, &error, &user_error, &end, &msg_str);
  1632. uac_on_ans(tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &ans_pkt);
  1633. if (ans_pkt)
  1634. iobuffer_dec_ref(ans_pkt);
  1635. if (msg_str)
  1636. free(msg_str);
  1637. }
  1638. break;
  1639. case MSG_ON_EVENT:
  1640. {
  1641. reg_entry* entry = (reg_entry*)e->param1;
  1642. iobuffer_t* evt_pkt = (iobuffer_t*)e->param2;
  1643. post_event(tbs, entry, 0, entry->register_id, &evt_pkt);
  1644. if (evt_pkt)
  1645. iobuffer_dec_ref(evt_pkt);
  1646. }
  1647. break;
  1648. default:
  1649. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("unknown msg type! %s %d", _GetFileName(__FILE__), __LINE__);
  1650. TOOLKIT_ASSERT(0);
  1651. break;
  1652. }
  1653. free(e);
  1654. }
  1655. if (idx != SRV_EVT_IDX && idx < evt_cnt) {
  1656. sock_connection* conn = tbs->conn[idx];
  1657. TOOLKIT_ASSERT(conn);
  1658. if (rf) {
  1659. on_read(tbs, conn);
  1660. } else if (wf) {
  1661. on_write(tbs, conn);
  1662. }
  1663. }
  1664. }
  1665. }
  1666. }
  1667. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("tbs server thread exit!");
  1668. on_error:
  1669. if (tbs->server_fd != INVALID_SOCKET) {
  1670. evtpoll_unsubscribe(tbs->ep, EV_ACCEPT, tbs->server_fd, 0, NULL, NULL);
  1671. evtpoll_detach(tbs->ep, tbs->server_fd);
  1672. closesocket(tbs->server_fd);
  1673. tbs->server_fd = INVALID_SOCKET;
  1674. }
  1675. if (tbs->conn_evt[SRV_EVT_IDX]) {
  1676. WSACloseEvent(tbs->conn_evt[SRV_EVT_IDX]);
  1677. tbs->conn_evt[SRV_EVT_IDX] = NULL;
  1678. tbs->conn_cnt--;
  1679. }
  1680. if (tbs->ready_evt_result) {
  1681. SetEvent(tbs->ready_evt);
  1682. }
  1683. return 0;
  1684. }
  1685. int sp_tbs_create(sp_ses_mgr_t *ses_mgr, const char *ipaddr, unsigned short port, sp_tbs_t **p_tbs)
  1686. {
  1687. sp_tbs_t *tbs;
  1688. sp_svc_t *svc = sp_ses_mgr_get_svc(ses_mgr);
  1689. WLog_DBG(TAG, "Enter %s", __FUNCTION__);
  1690. TOOLKIT_ASSERT(ipaddr);
  1691. TOOLKIT_ASSERT(ses_mgr);
  1692. tbs = ZALLOC_T(sp_tbs_t);
  1693. tbs->ep = evtpoll_create();
  1694. if (!tbs->ep) {
  1695. goto on_error_0;
  1696. }
  1697. tbs->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
  1698. if (tbs->msg_fd == -1) {
  1699. WLog_ERR(TAG, "create event fd failed: %d", errno);
  1700. goto on_error_1;
  1701. }
  1702. tbs->conn_evt[0] = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
  1703. tbs->ready_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  1704. sp_log_client_create(svc, sp_svc_get_iom(svc), &tbs->log_client);
  1705. tbs->conn_cnt = 1;
  1706. tbs->server_fd = INVALID_SOCKET;
  1707. tbs->server_ip = inet_addr(ipaddr);
  1708. tbs->server_port = htons(port);
  1709. tbs->sub_session_seq = (unsigned int)GetTickCount();
  1710. INIT_LIST_HEAD(&tbs->cmd_list);
  1711. spinlock_init(&tbs->cmd_list_lock);
  1712. tbs->ses_mgr = ses_mgr;
  1713. sp_var_client_create(svc, &tbs->var_client);
  1714. WLog_DBG(TAG, "to attach msg fd: %d", tbs->msg_fd);
  1715. if (evtpoll_attach(tbs->ep, tbs->msg_fd) != 0) {
  1716. WLog_ERR(TAG, "%s: attach evtpoll for %d failed!", __FUNCTION__, tbs->msg_fd);
  1717. goto on_error_2;
  1718. }
  1719. *p_tbs = tbs;
  1720. WLog_DBG(TAG, "Leave %s succ!", __FUNCTION__);
  1721. return 0;
  1722. on_error_2:
  1723. close(tbs->msg_fd);
  1724. on_error_1:
  1725. evtpoll_destroy(tbs->ep);
  1726. on_error_0:
  1727. free(tbs);
  1728. WLog_DBG(TAG, "Leave %s failed!", __FUNCTION__);
  1729. return -1;
  1730. }
  1731. int sp_tbs_start(sp_tbs_t *tbs)
  1732. {
  1733. if (tbs->work_thread) {
  1734. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("may be already started! %s %d", _GetFileName(__FILE__), __LINE__);
  1735. return Error_Duplication;
  1736. }
  1737. tbs->ready_evt_result = 0;
  1738. ResetEvent(tbs->ready_evt);
  1739. tbs->work_thread = (HANDLE)_beginthreadex(NULL, 0, &server_proc, tbs, 0, (unsigned*)&tbs->work_thread_id);
  1740. if (!tbs->work_thread) {
  1741. return Error_Resource;
  1742. }
  1743. WaitForSingleObject(tbs->ready_evt, INFINITE);
  1744. if (tbs->ready_evt_result) {
  1745. CloseHandle(tbs->work_thread);
  1746. tbs->work_thread = NULL;
  1747. }
  1748. WLog_DBG(TAG, "start tbs finished! %d", tbs->ready_evt_result);
  1749. return tbs->ready_evt_result;
  1750. }
  1751. void sp_tbs_stop(sp_tbs_t *tbs)
  1752. {
  1753. TOOLKIT_ASSERT(tbs->work_thread);
  1754. post_msg(tbs, MSG_STOP, 0, 0);
  1755. WaitForSingleObject(tbs->work_thread, INFINITE);
  1756. CloseHandle(tbs->work_thread);
  1757. tbs->work_thread = NULL;
  1758. }
  1759. void sp_tbs_destroy(sp_tbs_t *tbs)
  1760. {
  1761. TOOLKIT_ASSERT(tbs->conn_cnt == 1);
  1762. TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET);
  1763. CloseHandle(tbs->conn_evt[0]);
  1764. CloseHandle(tbs->ready_evt);
  1765. sp_log_client_destroy(tbs->log_client);
  1766. sp_var_client_destroy(tbs->var_client);
  1767. evtpoll_detach(tbs->ep, tbs->msg_fd);
  1768. close(tbs->msg_fd);
  1769. evtpoll_destroy(tbs->ep);
  1770. free(tbs);
  1771. }