sp_tbs_unix.cpp 63 KB


  1. #include "precompile.h"
  2. #include <sys/eventfd.h>
  3. #include <sys/stat.h>
  4. #include "sp_svc.h"
  5. #include "sp_def.h"
  6. #include "sp_ses.h"
  7. #include "sp_tbs.h"
  8. #include "sp_env.h"
  9. #include "sp_bcm.h"
  10. #include "sp_log.h"
  11. #include "sp_uid.h"
  12. #include "sp_var.h"
  13. #include "SpBase.h"
  14. #include "toolkit.h"
  15. #include "list.h"
  16. #include "hash.h"
  17. #include "array.h"
  18. #include "refcnt.h"
  19. #include "spinlock.h"
  20. #include "strutil.h"
  21. #include "fileutil.h"
  22. #include "memutil.h"
  23. #include "toolkit.h"
  24. #include "uuid4.h"
  25. #include "dbgutil.h"
  26. #include <winpr/winsock.h>
  27. #include <winpr/string.h>
  28. #include <winpr/sysinfo.h>
  29. #include "unix/evtpoll.h"
  30. #include "unix/core.h"
  31. #define TAG SPBASE_TAG("tbs")
  32. #define SESSION_BUCKET_SIZE 511
  33. #define REGISTER_BUCKET_SIZE 127
  34. #define RX_BUF_SIZE 1024
  35. #define MSG_EVT_IDX (MAXIMUM_WAIT_OBJECTS + 1)
  36. #define SRV_EVT_IDX 1
  37. #define MAX_TIMEOUT 30000
  38. #define HDR_LEN 4
  39. #define CONN_INVALID_ID -1
  40. #define MSG_STOP 0
  41. #define MSG_UAC_ON_CONNECT 1
  42. #define MSG_UAC_ON_CLOSE 2
  43. #define MSG_UAC_ON_DESTROY 3
  44. #define MSG_UAC_ON_ANS 4
  45. #define MSG_ON_EVENT 5
  46. #define RECV_STATE_HEADER 0 /* already recv header length */
  47. #define RECV_STATE_BODY 1 /* recv body */
  48. #define PKT_TYPE_INFO 0
  49. #define PKT_TYPE_SESSION 1
  50. #define PKT_TYPE_SESSIONEND 2
  51. #define PKT_TYPE_REQ 3
  52. #define PKT_TYPE_REQACK 4
  53. #define PKT_TYPE_SESSIONACK 5
  54. #define PKT_TYPE_REGISTER_EVENT 6
  55. #define PKT_TYPE_UNREGISTER_EVENT 7
  56. #define PKT_TYPE_EVENT_DATA 8
  57. #define PKT_TYPE_LOG_EVENT 9
  58. #define PKT_TYPE_LOG_WARN 10
  59. // add sys var get & set type by xkm@20150526
  60. #define PKT_TYPE_SET_VAR_REQ 11 // len + type + tsx_id + varname + varvalue
  61. #define PKT_TYPE_SET_VAR_ACK 12 // len + type + conn_id + tsx_id + errcode + errmsg
  62. #define PKT_TYPE_GET_VAR_REQ 13 // len + type + tsx_id + varname
  63. #define PKT_TYPE_GET_VAR_ACK 14 // len + type + conn_id + tsx_id + errcode + varvalue/errmsg
  64. //控制信息 0xFF FF 00 00
  65. #define PKT_TYPE_CONTROL_LINKCONTEXT (1 << 31)
  66. class link_context {
  67. public:
  68. char* businessId; //32
  69. char* traceId; //32
  70. char* spanId; //16
  71. char* parentSpanId; //16
  72. wchar_t* wbusinessId; //32
  73. wchar_t* wtraceId; //32
  74. wchar_t* wspanId; //16
  75. wchar_t* wparentSpanId; //16
  76. //重复代码,需优化
  77. public:
  78. link_context() {
  79. businessId = traceId = spanId = parentSpanId = nullptr;
  80. wbusinessId = wtraceId = wspanId = wparentSpanId = nullptr;
  81. }
  82. ~link_context() {
  83. if (businessId)
  84. free(businessId);
  85. if (traceId)
  86. free(traceId);
  87. if (spanId)
  88. free(spanId);
  89. if (parentSpanId)
  90. free(parentSpanId);
  91. if (wbusinessId)
  92. free(wbusinessId);
  93. if (wtraceId)
  94. free(wtraceId);
  95. if (wspanId)
  96. free(wspanId);
  97. if (wparentSpanId)
  98. free(wparentSpanId);
  99. }
  100. void InitbusinessId(const std::string t_businessId)
  101. {
  102. if (nullptr == businessId) {
  103. businessId = (char*)malloc(t_businessId.length() + 1);
  104. ZeroMemory(businessId, t_businessId.length() + 1);
  105. strcpy_s(businessId, t_businessId.length() + 1, t_businessId.c_str());
  106. }
  107. }
  108. void InittraceId(const std::string t_traceId)
  109. {
  110. if (nullptr == traceId) {
  111. traceId = (char*)malloc(t_traceId.length() + 1);
  112. ZeroMemory(traceId, t_traceId.length() + 1);
  113. strcpy_s(traceId, t_traceId.length() + 1, t_traceId.c_str());
  114. }
  115. }
  116. void InitspanId(const std::string t_spanId)
  117. {
  118. if (nullptr == spanId) {
  119. spanId = (char*)malloc(t_spanId.length() + 1);
  120. ZeroMemory(spanId, t_spanId.length() + 1);
  121. strcpy_s(spanId, t_spanId.length() + 1, t_spanId.c_str());
  122. }
  123. }
  124. void InitparentSpanId(const std::string t_parentSpanId)
  125. {
  126. if (nullptr == parentSpanId) {
  127. parentSpanId = (char*)malloc(t_parentSpanId.length() + 1);
  128. ZeroMemory(parentSpanId, t_parentSpanId.length() + 1);
  129. strcpy_s(parentSpanId, t_parentSpanId.length() + 1, t_parentSpanId.c_str());
  130. }
  131. }
  132. const char* businessIdWtoA()
  133. {
  134. if (businessId) {
  135. return businessId;
  136. } else if (wbusinessId) {
  137. int n = toolkit_wcs2mbs(wbusinessId, NULL, 0);
  138. if (n > 0) {
  139. businessId = (char*)malloc(n);
  140. n = toolkit_wcs2mbs(wbusinessId, businessId, n);
  141. if (n <= 0) {
  142. free(businessId);
  143. businessId = NULL;
  144. } else {
  145. return businessId;
  146. }
  147. }
  148. }
  149. return "None";
  150. }
  151. const char* traceIdWtoA()
  152. {
  153. if (traceId) {
  154. return traceId;
  155. }
  156. else if (wtraceId)
  157. {
  158. int n = toolkit_wcs2mbs(wtraceId, NULL, 0);
  159. if (n > 0) {
  160. traceId = (char*)malloc(n);
  161. n = toolkit_wcs2mbs(wtraceId, traceId, n);
  162. if (n <= 0) {
  163. free(traceId);
  164. traceId = NULL;
  165. } else {
  166. return traceId;
  167. }
  168. }
  169. }
  170. return "None";
  171. }
  172. const char* spanIdWtoA()
  173. {
  174. if (spanId) {
  175. return spanId;
  176. } else if (wspanId)
  177. {
  178. int n = toolkit_wcs2mbs(wspanId, NULL, 0);
  179. if (n > 0) {
  180. spanId = (char*)malloc(n);
  181. n = toolkit_wcs2mbs(wspanId, spanId, n);
  182. if (n <= 0) {
  183. free(spanId);
  184. spanId = NULL;
  185. } else {
  186. return spanId;
  187. }
  188. }
  189. }
  190. return "None";
  191. }
  192. const char* parentSpanIdWtoA()
  193. {
  194. if (parentSpanId) {
  195. return parentSpanId;
  196. }
  197. else if (wparentSpanId)
  198. {
  199. int n = toolkit_wcs2mbs(wparentSpanId, NULL, 0);
  200. if (n > 0) {
  201. parentSpanId = (char*)malloc(n);
  202. n = toolkit_wcs2mbs(wparentSpanId, parentSpanId, n);
  203. if (n <= 0) {
  204. free(parentSpanId);
  205. parentSpanId = NULL;
  206. } else {
  207. return parentSpanId;
  208. }
  209. }
  210. }
  211. return "None";
  212. }
  213. };
  214. typedef struct cmd_entry cmd_entry;
  215. typedef struct sub_session sub_session;
  216. typedef struct reg_entry reg_entry;
  217. typedef struct sock_connection sock_connection;
  218. struct cmd_entry {
  219. struct list_head entry;
  220. int msg;
  221. param_size_t param1;
  222. param_size_t param2;
  223. };
  224. struct sub_session {
  225. struct hlist_node hentry;
  226. unsigned int id;
  227. unsigned int tsx_id;
  228. sp_ses_uac_t *uac;
  229. sock_connection *conn;
  230. };
  231. struct reg_entry {
  232. struct hlist_node hentry;
  233. unsigned int register_id;
  234. unsigned int entity_id;
  235. sp_uid_t uid;
  236. sp_bcm_listener_t *listener;
  237. sock_connection *conn;
  238. };
  239. struct sock_connection {
  240. SOCKET fd;
  241. int idx;
  242. int session_cnt;
  243. struct hlist_head session_bucket[SESSION_BUCKET_SIZE];
  244. int register_cnt;
  245. struct hlist_head register_bucket[REGISTER_BUCKET_SIZE];
  246. iobuffer_queue_t *tx_queue;
  247. iobuffer_t *rx_pkt;
  248. char rx_hdr_buf[RX_BUF_SIZE];
  249. int rx_hdr_buf_len;
  250. int rx_state;
  251. int rx_header_len;
  252. sp_tbs_t *tbs;
  253. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  254. };
  255. DECLARE_REF_COUNT_STATIC(sock_connection, sock_connection)
  256. struct sp_tbs_t
  257. {
  258. SOCKET server_fd;
  259. unsigned long server_ip;
  260. unsigned short server_port;
  261. HANDLE conn_evt[MAXIMUM_WAIT_OBJECTS];
  262. HANDLE ready_evt;
  263. int ready_evt_result;
  264. sock_connection *conn[MAXIMUM_WAIT_OBJECTS];
  265. int conn_cnt;
  266. unsigned int sub_session_seq;
  267. struct list_head cmd_list;
  268. spinlock_t cmd_list_lock;
  269. sp_log_client_t *log_client;
  270. HANDLE work_thread;
  271. DWORD work_thread_id;
  272. sp_ses_mgr_t *ses_mgr;
  273. sp_var_client_t *var_client;
  274. evtpoll_t* ep;
  275. int msg_fd;
  276. };
  277. static void on_write(sp_tbs_t *tbs, sock_connection *conn);
  278. static void on_accept(sp_tbs_t *tbs);
  279. static void on_read(sp_tbs_t *tbs, sock_connection *conn);
  280. static void on_close(sp_tbs_t *tbs, sock_connection *conn);
  281. static void on_stop(sp_tbs_t *tbs);
  282. static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
  283. static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt);
  284. static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt);
  285. static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name);
  286. static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name);
  287. static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id);
  288. static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id);
  289. static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt);
  290. static void post_req_ack_ex(sp_tbs_t* tbs, sock_connection* conn, int conn_id, int tsx_id, int end, int err, int uc, const wchar_t* err_msg, iobuffer_t** ack_pkt);
  291. static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t*err_msg);
  292. static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt);
  293. static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
  294. static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error);
  295. static void uac_on_ans(sp_tbs_t* tbs, sock_connection* conn, sub_session* session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t** ans_pkt);
  296. static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session);
  297. static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2);
  298. static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session);
  299. postLink g_postlink = NULL;
  300. void sp_tbs_setpostlink(postLink cur)
  301. {
  302. g_postlink = cur;
  303. }
  304. static const wchar_t *get_err_msg(int rc)
  305. {
  306. static wchar_t wmsg[512];
  307. char msg[512] = { '\0' };
  308. memset(msg, 0, sizeof(msg));
  309. sprintf_s(msg, 512, sp_strerror(rc));
  310. toolkit_mbs2wcs(msg, wmsg, 512);
  311. return wmsg;
  312. }
  313. static const wchar_t *get_user_err_msg(int rc)
  314. {
  315. static wchar_t wmsg[512];
  316. char msg[512] = { '\0' };
  317. if (rc == 0)
  318. return NULL;
  319. memset(msg, 0, sizeof(msg));
  320. sprintf_s(msg, 512, "UserError=%d", rc);
  321. toolkit_mbs2wcs(msg, wmsg, 512);
  322. return wmsg;
  323. }
  324. /*
  325. static const char *_GetFileName(const char *pszFilePath)
  326. {
  327. int i=strlen(pszFilePath);
  328. for( ; i>0 && pszFilePath[i-1]!='\\'; i--)NULL;
  329. return pszFilePath+i;
  330. }
  331. */
  332. static int create_connection(sp_tbs_t *tbs)
  333. {
  334. sock_connection *conn;
  335. SOCKET new_client;
  336. int i;
  337. BOOL opt;
  338. struct sockaddr_in from_addr;
  339. int from_len = sizeof(from_addr);
  340. new_client = _accept(tbs->server_fd, (struct sockaddr*)&from_addr, &from_len);
  341. if (new_client == INVALID_SOCKET) {
  342. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("accept connection failed! %s %d", _GetFileName(__FILE__), __LINE__);
  343. return Error_NetBroken;
  344. }
  345. if (make_fd_cloexec(new_client, 1) != 0) {
  346. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("make client cloexec failed");
  347. closesocket(new_client);
  348. return Error_Unexpect;
  349. }
  350. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("accept from %s:%d", inet_ntoa(from_addr.sin_addr), ntohs(from_addr.sin_port));
  351. if (tbs->conn_cnt >= MAXIMUM_WAIT_OBJECTS) {
  352. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("exceed maximum wait objects, connection discard! %s %d", _GetFileName(__FILE__), __LINE__);
  353. closesocket(new_client);
  354. return Error_Resource;
  355. }
  356. conn = MALLOC_T(sock_connection);
  357. conn->idx = tbs->conn_cnt++;
  358. conn->fd = new_client;
  359. conn->rx_hdr_buf_len = 0;
  360. conn->tx_queue = iobuffer_queue_create();
  361. conn->rx_pkt = NULL;
  362. conn->session_cnt = 0;
  363. conn->register_cnt = 0;
  364. conn->tbs = tbs;
  365. conn->rx_state = RECV_STATE_HEADER;
  366. conn->rx_header_len = 0;
  367. for (i = 0; i < array_size(conn->session_bucket); ++i) {
  368. INIT_HLIST_HEAD(&conn->session_bucket[i]);
  369. }
  370. for (i = 0; i < array_size(conn->register_bucket); ++i) {
  371. INIT_HLIST_HEAD(&conn->register_bucket[i]);
  372. }
  373. REF_COUNT_INIT(&conn->ref_cnt);
  374. tbs->conn[conn->idx] = conn;
  375. tbs->conn_evt[conn->idx] = WSACreateEvent();
  376. opt = TRUE;
  377. setsockopt(new_client, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
  378. opt = TRUE;
  379. setsockopt(new_client, SOL_SOCKET, SO_DONTLINGER, (char*)&opt, sizeof(opt));
  380. //WSAEventSelect(new_client, tbs->conn_evt[conn->idx], FD_WRITE | FD_READ | FD_CLOSE);
  381. if (evtpoll_attach(tbs->ep, new_client)) {
  382. WLog_ERR(TAG, "%s: attach evtpoll for %d failed!", __FUNCTION__, new_client);
  383. goto on_error;
  384. }
  385. if (0 != evtpoll_subscribe(tbs->ep, EV_READ, new_client, (void*)(conn->idx), NULL)) {
  386. goto on_error_1;
  387. }
  388. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("sock connection created!");
  389. return 0;
  390. on_error_1:
  391. evtpoll_detach(tbs->ep, new_client);
  392. on_error:
  393. if (new_client != INVALID_SOCKET)
  394. closesocket(new_client);
  395. FREE(conn);
  396. return Error_Unexpect;
  397. }
  398. static void __destroy_connection(sock_connection *conn)
  399. {
  400. sp_tbs_t *tbs = conn->tbs;
  401. if (conn->fd != INVALID_SOCKET) {
  402. WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
  403. evtpoll_detach(tbs->ep, conn->fd);
  404. closesocket(conn->fd);
  405. conn->fd = INVALID_SOCKET;
  406. }
  407. WSACloseEvent(tbs->conn_evt[conn->idx]);
  408. tbs->conn_evt[conn->idx] = NULL;
  409. tbs->conn[conn->idx] = NULL;
  410. if (conn->idx != tbs->conn_cnt-1) {
  411. tbs->conn_evt[conn->idx] = tbs->conn_evt[tbs->conn_cnt-1];
  412. tbs->conn[conn->idx] = tbs->conn[tbs->conn_cnt-1];
  413. tbs->conn[conn->idx]->idx = conn->idx;
  414. }
  415. tbs->conn_cnt--;
  416. iobuffer_queue_destroy(conn->tx_queue);
  417. conn->tx_queue = NULL;
  418. if (conn->rx_pkt) {
  419. iobuffer_dec_ref(conn->rx_pkt);
  420. conn->rx_pkt = NULL;
  421. }
  422. free(conn);
  423. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("sock connection destroy!");
  424. }
  425. IMPLEMENT_REF_COUNT_STATIC(sock_connection, sock_connection, ref_cnt, __destroy_connection)
  426. static __inline int is_worker_thread(sp_tbs_t *tbs)
  427. {
  428. return GetCurrentThreadId() == tbs->work_thread_id;
  429. }
  430. static sub_session *find_sub_session(sock_connection *conn, int conn_id)
  431. {
  432. sub_session *tpos;
  433. struct hlist_node *pos;
  434. int slot = ((unsigned int )conn_id) % SESSION_BUCKET_SIZE; // bugfix, assure slot positive
  435. hlist_for_each_entry(tpos, pos, &conn->session_bucket[slot], sub_session, hentry) {
  436. if (tpos->id == conn_id)
  437. return tpos;
  438. }
  439. return NULL;
  440. }
  441. static void __uac_on_connect(sp_ses_uac_t *uac, int error, void *user_data)
  442. {
  443. sub_session *session = (sub_session*)user_data;
  444. if (is_worker_thread(session->conn->tbs)) {
  445. uac_on_connect(session->conn->tbs, session->conn, session, error);
  446. } else {
  447. post_msg(session->conn->tbs, MSG_UAC_ON_CONNECT, (param_size_t)session, error);
  448. }
  449. }
  450. static void __uac_on_close(sp_ses_uac_t *uac, int error, void *user_data)
  451. {
  452. sub_session *session = (sub_session*)user_data;
  453. if (is_worker_thread(session->conn->tbs)) {
  454. uac_on_close(session->conn->tbs, session->conn, session, error);
  455. } else {
  456. post_msg(session->conn->tbs, MSG_UAC_ON_CLOSE, (param_size_t)session, error);
  457. }
  458. }
  459. static void __uac_on_destroy(sp_ses_uac_t *uac, void *user_data)
  460. {
  461. sub_session *session = (sub_session*)user_data;
  462. TOOLKIT_ASSERT(session->uac == NULL);
  463. if (is_worker_thread(session->conn->tbs)) {
  464. uac_on_destroy(session->conn->tbs, session->conn, session);
  465. } else {
  466. post_msg(session->conn->tbs, MSG_UAC_ON_DESTROY, (param_size_t)session, 0);
  467. }
  468. }
  469. static void __uac_on_ans(sp_tsx_uac_t *tsx, int error, int end, iobuffer_t **ans_pkt, void *user_data)
  470. {
  471. sub_session *session = (sub_session*)user_data;
  472. iobuffer_t *pkt = NULL;
  473. int tsx_id;
  474. int user_error = 0;
  475. int slen = 0;
  476. char* msg_str = NULL;
  477. if (error)
  478. {
  479. if (ans_pkt)
  480. {
  481. pkt = *ans_pkt;
  482. iobuffer_reset(pkt);
  483. *ans_pkt = NULL;
  484. }
  485. else
  486. {
  487. pkt = iobuffer_create(-1, -1);
  488. }
  489. }
  490. else
  491. {
  492. TOOLKIT_ASSERT(ans_pkt);
  493. pkt = *ans_pkt;
  494. *ans_pkt = NULL;
  495. iobuffer_read(pkt, IOBUF_T_I4, &error, NULL);
  496. iobuffer_read(pkt, IOBUF_T_I4, &user_error, NULL);
  497. if (error)
  498. {
  499. iobuffer_read(pkt, IOBUF_T_STR, NULL, &slen);
  500. if (slen) {
  501. msg_str = (char*)malloc(slen + 1);
  502. iobuffer_read(pkt, IOBUF_T_WSTR, msg_str, NULL);
  503. }
  504. iobuffer_reset(pkt);
  505. }
  506. }
  507. tsx_id = sp_tsx_uac_get_id(tsx);
  508. if (is_worker_thread(session->conn->tbs))
  509. {
  510. uac_on_ans(session->conn->tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &pkt);
  511. if (pkt)
  512. iobuffer_dec_ref(pkt);
  513. }
  514. else
  515. {
  516. iobuffer_format_write_head(pkt, "s4444", msg_str, &end, &user_error, &error, &tsx_id);
  517. post_msg(session->conn->tbs, MSG_UAC_ON_ANS, (param_size_t)session, (param_size_t)pkt);
  518. }
  519. if (error || end)
  520. {
  521. sp_tsx_uac_close(tsx);
  522. sp_tsx_uac_destroy(tsx);
  523. }
  524. }
  525. static void uac_on_connect(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error)
  526. {
  527. WLog_DBG(TAG, "Enter %s with error: %d", __FUNCTION__, error);
  528. if (!error) {
  529. post_session_ack(tbs, conn, session->id, session->tsx_id, error, NULL);
  530. } else {
  531. sp_ses_uac_t *uac = session->uac;
  532. post_session_ack(tbs, conn, CONN_INVALID_ID, session->tsx_id, error, get_err_msg(error));
  533. session->uac = NULL;
  534. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("sub session created failed %d! ses_id = %d, tsx_id = %d", error, session->id, session->tsx_id);
  535. if (uac) {
  536. sp_ses_uac_close(uac);
  537. sp_ses_uac_destroy(uac);
  538. }
  539. }
  540. WLog_DBG(TAG, "Leave %s", __FUNCTION__);
  541. }
  542. static void uac_on_close(sp_tbs_t *tbs, sock_connection *conn, sub_session *session, int error)
  543. {
  544. if (session->uac) {
  545. sp_ses_uac_t *uac = session->uac;
  546. post_session_end(tbs, conn, session->id);
  547. session->uac = NULL;
  548. if (error != Error_Closed)
  549. sp_ses_uac_close(uac);
  550. sp_ses_uac_destroy(uac);
  551. }
  552. }
  553. static void uac_on_ans(sp_tbs_t* tbs, sock_connection* conn, sub_session* session, int tsx_id, int error, int user_error, const char* msg, int end, iobuffer_t** ans_pkt)
  554. {
  555. if (msg == NULL || strlen(msg) == 0)
  556. post_req_ack_ex(tbs, conn, session->id, tsx_id, end, error, user_error, get_user_err_msg(user_error), ans_pkt);
  557. else
  558. post_req_ack_ex(tbs, conn, session->id, tsx_id, end, error, user_error, CSimpleStringA2W(msg).GetData(), ans_pkt);
  559. }
  560. static void uac_on_destroy(sp_tbs_t *tbs, sock_connection *conn, sub_session *session)
  561. {
  562. destroy_sub_session(tbs, conn, session);
  563. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("destroy sub session(id:%d) uac %s %d", session->id, _GetFileName(__FILE__), __LINE__);
  564. }
  565. static int create_sub_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name, sub_session **p_session)
  566. {
  567. sp_env_t *env = sp_get_env();
  568. sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);
  569. if (ent) {
  570. int rc;
  571. sub_session *session = MALLOC_T(sub_session);
  572. sp_ses_uac_callback cb;
  573. iobuffer_t *pkt = NULL;
  574. char *param = NULL;
  575. session->id = tbs->sub_session_seq++;
  576. session->conn = conn;
  577. session->tsx_id = tsx_id;
  578. hlist_add_head(&session->hentry, &conn->session_bucket[session->id % SESSION_BUCKET_SIZE]);
  579. conn->session_cnt++;
  580. sock_connection_inc_ref(conn);
  581. cb.user_data = session;
  582. cb.on_close = &__uac_on_close;
  583. cb.on_destroy = &__uac_on_destroy;
  584. cb.on_connect = &__uac_on_connect;
  585. rc = sp_ses_uac_create(tbs->ses_mgr, ent->mod->cfg->idx, ent->cfg->idx, &cb, &session->uac);
  586. if (rc != 0) {
  587. goto on_error;
  588. }
  589. pkt = iobuffer_create(-1, -1);
  590. if (function_name == NULL || strlen(function_name) == 0) {
  591. param = class_name;
  592. } else {
  593. param = strdup_printf("%s::%s", class_name, function_name);
  594. }
  595. iobuffer_write(pkt, IOBUF_T_STR, param, -1);
  596. if (param != class_name) {
  597. FREE(param);
  598. }
  599. rc = sp_ses_uac_async_connect(session->uac, 10000, &pkt);
  600. if (pkt)
  601. iobuffer_dec_ref(pkt);
  602. if (rc != 0) {
  603. goto on_error;
  604. }
  605. on_error:
  606. if (rc != 0) {
  607. conn->session_cnt--;
  608. hlist_del(&session->hentry);
  609. sock_connection_dec_ref(conn);
  610. free(session);
  611. } else {
  612. *p_session = session;
  613. }
  614. return rc;
  615. } else {
  616. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("cannot find entity from mod mgr by name: %s", entity_name);
  617. return Error_NotExist;
  618. }
  619. }
  620. static void destroy_sub_session(sp_tbs_t *tbs, sock_connection *conn, sub_session *session)
  621. {
  622. conn->session_cnt--;
  623. hlist_del(&session->hentry);
  624. if (session->uac) {
  625. sp_ses_uac_t *uac = session->uac;
  626. session->uac = NULL;
  627. sp_ses_uac_destroy(uac);
  628. }
  629. sock_connection_dec_ref(conn);
  630. free(session);
  631. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("destroy sub session(id:%d) %s %d", session->id, _GetFileName(__FILE__), __LINE__);
  632. }
  633. static void post_session_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int err, const wchar_t*err_msg)
  634. {
  635. if (conn->fd != INVALID_SOCKET) {
  636. int len;
  637. iobuffer_t *pkt = iobuffer_create(-1, -1);
  638. int type = PKT_TYPE_SESSIONACK;
  639. #if defined(_MSC_VER)
  640. iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
  641. #else
  642. iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err_msg);
  643. #endif //_MSC_VER
  644. len = iobuffer_get_length(pkt)-4;
  645. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  646. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  647. on_write(tbs, conn);
  648. if (err != 0) {
  649. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("send PKT_TYPE_SESSIONACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  650. }
  651. }
  652. }
  653. static void post_req_ack_ex(sp_tbs_t* tbs, sock_connection* conn, int conn_id, int tsx_id, int end, int err, int uc, const wchar_t* err_msg, iobuffer_t** ack_pkt)
  654. {
  655. if (conn->fd != INVALID_SOCKET) {
  656. iobuffer_t* pkt = NULL;
  657. int len;
  658. char bend = !!end;
  659. int type = PKT_TYPE_REQACK;
  660. if (ack_pkt) {
  661. pkt = *ack_pkt;
  662. *ack_pkt = NULL;
  663. }
  664. if (!pkt) {
  665. pkt = iobuffer_create(-1, -1);
  666. }
  667. #if defined(_MSC_VER)
  668. iobuffer_format_write_head(pkt, "w441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  669. #else
  670. iobuffer_format_write_head(pkt, "m441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  671. #endif //_MSC_VER
  672. len = iobuffer_get_length(pkt) - 4;
  673. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  674. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  675. on_write(tbs, conn);
  676. if (err != 0) {
  677. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_REQACK at EX tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  678. }
  679. }
  680. }
  681. static void post_req_ack(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int end, int err, const wchar_t *err_msg, iobuffer_t **ack_pkt)
  682. {
  683. if (conn->fd != INVALID_SOCKET) {
  684. iobuffer_t *pkt = NULL;
  685. int len;
  686. char bend = !!end;
  687. int type = PKT_TYPE_REQACK;
  688. int uc = 0;
  689. if (ack_pkt) {
  690. pkt = *ack_pkt;
  691. *ack_pkt = NULL;
  692. }
  693. if (!pkt) {
  694. pkt = iobuffer_create(-1, -1);
  695. }
  696. #if defined(_MSC_VER)
  697. iobuffer_format_write_head(pkt, "w441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  698. #else
  699. iobuffer_format_write_head(pkt, "m441444", err_msg, &err, &uc, &bend, &tsx_id, &conn_id, &type);
  700. #endif //_MSC_VER
  701. len = iobuffer_get_length(pkt)-4;
  702. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  703. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  704. on_write(tbs, conn);
  705. if (err != 0) {
  706. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_REQACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  707. }
  708. }
  709. }
  710. static void post_set_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg)
  711. {
  712. if (conn->fd != INVALID_SOCKET) {
  713. int len;
  714. int type = PKT_TYPE_SET_VAR_ACK;
  715. iobuffer_t *pkt = iobuffer_create(-1, -1);
  716. int conn_id = CONN_INVALID_ID;
  717. #if defined(_MSC_VER)
  718. iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err_msg);
  719. #else
  720. iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err_msg);
  721. #endif //_MSC_VER
  722. len = iobuffer_get_length(pkt) - 4;
  723. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  724. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  725. on_write(tbs, conn);
  726. if (err != 0) {
  727. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_SET_VAR_ACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  728. }
  729. }
  730. }
  731. static void post_get_var_ack(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, int err, const wchar_t *err_msg, const wchar_t*value)
  732. {
  733. if (conn->fd != INVALID_SOCKET) {
  734. int len;
  735. int type = PKT_TYPE_GET_VAR_ACK;
  736. iobuffer_t *pkt = iobuffer_create(-1, -1);
  737. int conn_id = CONN_INVALID_ID;
  738. #if defined(_MSC_VER)
  739. iobuffer_format_write(pkt, "4444w", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
  740. #else
  741. iobuffer_format_write(pkt, "4444m", &type, &conn_id, &tsx_id, &err, err == 0 ? value : err_msg);
  742. #endif //_MSC_VER
  743. len = iobuffer_get_length(pkt) - 4;
  744. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  745. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  746. on_write(tbs, conn);
  747. if (err != 0) {
  748. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("send PKT_TYPE_GET_VAR_ACK tsx_id=%d, err=%s", tsx_id, sp_strerror(err));
  749. }
  750. }
  751. }
  752. static void post_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
  753. {
  754. if (conn->fd != INVALID_SOCKET) {
  755. int len;
  756. iobuffer_t *pkt = iobuffer_create(-1, -1);
  757. int type = PKT_TYPE_SESSIONEND;
  758. iobuffer_format_write(pkt, "44", &type, &conn_id);
  759. len = iobuffer_get_length(pkt)-4;
  760. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  761. iobuffer_queue_enqueue(conn->tx_queue, pkt);
  762. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("send PKT_TYPE_SESSIONEND");
  763. on_write(tbs, conn);
  764. }
  765. }
  766. static void on_session(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *entity_name, char *function_name, char *class_name)
  767. {
  768. sub_session *session = NULL;
  769. int rc;
  770. rc = create_sub_session(tbs, conn, tsx_id, entity_name, function_name, class_name, &session);
  771. if (rc == 0) {
  772. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("start create sub session!");
  773. //post_session_ack(tbs, conn, session->id, tsx_id, rc, NULL);
  774. } else {
  775. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create_sub_session failed! entity: %s, err = %d %s %d",
  776. entity_name, rc, _GetFileName(__FILE__), __LINE__);
  777. post_session_ack(tbs, conn, CONN_INVALID_ID, tsx_id, rc, get_err_msg(rc));
  778. }
  779. }
  780. static reg_entry *find_reg_entry(sock_connection *conn, int register_id)
  781. {
  782. reg_entry *tpos;
  783. struct hlist_node *pos;
  784. int slot = ((unsigned int)register_id) % REGISTER_BUCKET_SIZE; // bugfix, assure slot positive
  785. hlist_for_each_entry(tpos, pos, &conn->register_bucket[slot], reg_entry, hentry) {
  786. if (tpos->register_id == register_id) {
  787. return tpos;
  788. }
  789. }
  790. return NULL;
  791. }
  792. static void post_event(sp_tbs_t *tbs, reg_entry *entry, int conn_id, int tsx_id, iobuffer_t **evt_pkt)
  793. {
  794. if (entry->conn->fd != INVALID_SOCKET) {
  795. int type;
  796. int len;
  797. iobuffer_t *pkt = *evt_pkt;
  798. *evt_pkt = NULL;
  799. type = PKT_TYPE_EVENT_DATA;
  800. iobuffer_write_head(pkt, IOBUF_T_I4, &tsx_id, 0);
  801. iobuffer_write_head(pkt, IOBUF_T_I4, &conn_id, 0);
  802. iobuffer_write_head(pkt, IOBUF_T_I4, &type, 0);
  803. len = iobuffer_get_length(pkt)-4;
  804. iobuffer_write_head(pkt, IOBUF_T_I4, &len, 0);
  805. iobuffer_queue_enqueue(entry->conn->tx_queue, pkt);
  806. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("send PKT_TYPE_EVENT_DATA tsx_id=%d", tsx_id);
  807. on_write(tbs, entry->conn);
  808. }
  809. sp_bcm_listener_dec_ref(entry->listener); //@
  810. }
  811. static void __bcm_on_message_raw(sp_bcm_listener_t *listener, int from_client_id, iobuffer_t **msg_pkt, void *user_data)
  812. {
  813. reg_entry *entry = (reg_entry*)user_data;
  814. iobuffer_t *pkt = *msg_pkt;
  815. *msg_pkt = NULL;
  816. sp_bcm_listener_inc_ref(listener); //@
  817. post_msg(entry->conn->tbs, MSG_ON_EVENT, (param_size_t)entry, (param_size_t)pkt);
  818. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("post event from_client_id:%d", from_client_id);
  819. }
  820. static void __bcm_on_destroy(sp_bcm_listener_t *listener, void *user_data)
  821. {
  822. reg_entry *entry = (reg_entry*)user_data;
  823. entry->conn->register_cnt--;
  824. hlist_del(&entry->hentry);
  825. sock_connection_dec_ref(entry->conn);
  826. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("unregister event ok! register_id = %d", entry->register_id);
  827. free(entry);
  828. }
  829. static void on_register_event(sp_tbs_t *tbs, sock_connection *conn, int register_id, char *entity_name, char *function_name)
  830. {
  831. sp_env_t *env = sp_get_env();
  832. sp_entity_t *ent = sp_mod_mgr_find_entity_by_name(env->mod_mgr, entity_name);
  833. if (ent) {
  834. int rc;
  835. sp_svc_t *svc = sp_ses_mgr_get_svc(tbs->ses_mgr);
  836. reg_entry *entry;
  837. sp_bcm_listener_cb cb;
  838. if (find_reg_entry(conn, register_id)) {
  839. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("register entry already exist, register id %d", register_id);
  840. return;
  841. }
  842. entry = ZALLOC_T(reg_entry);
  843. entry->register_id = register_id;
  844. entry->entity_id = ent->cfg->idx;
  845. hlist_add_head(&entry->hentry, &conn->register_bucket[((unsigned int)register_id) % REGISTER_BUCKET_SIZE]); // bugfix, assure index positive
  846. conn->register_cnt++;
  847. sock_connection_inc_ref(conn);
  848. entry->conn = conn;
  849. cb.on_message_raw = &__bcm_on_message_raw;
  850. cb.on_message = NULL;
  851. cb.on_destroy = &__bcm_on_destroy;
  852. cb.user_data = entry;
  853. rc = sp_bcm_listener_create(svc, entry->entity_id, function_name, &cb, &entry->listener);
  854. if (rc != 0) {
  855. goto on_error;
  856. }
  857. rc = sp_bcm_listener_subscribe(entry->listener, &entry->uid);
  858. if (rc != 0) {
  859. goto on_error;
  860. }
  861. on_error:
  862. if (rc != 0) {
  863. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create bcm listener failed! entity = %s, register_id = %d", ent->cfg->name, register_id);
  864. conn->register_cnt--;
  865. hlist_del_init(&entry->hentry);
  866. if (entry->listener) {
  867. sp_bcm_listener_destroy(entry->listener);
  868. }
  869. free(entry);
  870. sock_connection_dec_ref(conn);
  871. } else {
  872. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("register event for %s ok! register_id = %d", ent->cfg->name, register_id);
  873. }
  874. } else {
  875. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("cannot find entity %s!", entity_name);
  876. }
  877. }
  878. static void on_unregister_event(sp_tbs_t *tbs, sock_connection *conn, int register_id)
  879. {
  880. reg_entry *entry = find_reg_entry(conn, register_id);
  881. if (entry) {
  882. if (sp_bcm_listener_unsubscribe(entry->listener) == 0) {
  883. sp_bcm_listener_destroy(entry->listener);
  884. } else {
  885. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("entry already unregistered! register_id = %d", register_id);
  886. }
  887. } else {
  888. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("%s, entry not found, registered_id = %d", __FUNCTION__, register_id);
  889. }
  890. }
  891. static void on_log_event(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext &t_context)
  892. {
  893. sp_log_client_logWithLink(tbs->log_client, Log_Event, severity_level, 0, user_code, 0, 0, NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg)
  894. , t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
  895. }
  896. static void on_log_warn(sp_tbs_t *tbs, sock_connection *conn, int severity_level, int user_code, const char *msg, const linkContext& t_context)
  897. {
  898. sp_log_client_logWithLink(tbs->log_client, Log_Warning, severity_level, 0, user_code, 0, 0, NULL == msg ? "" : msg, NULL == msg ? 0 : strlen(msg)
  899. , t_context.bussinessId.GetData(), t_context.traceId.GetData(), t_context.spanId.GetData(), t_context.parentSpanId.GetData());
  900. }
  901. static void on_req(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int tsx_id, int method_id, int method_sig, int timeout, iobuffer_t **req_pkt)
  902. {
  903. sub_session *session = find_sub_session(conn, conn_id);
  904. if (session) {
  905. if (session->uac) {
  906. int rc;
  907. sp_tsx_uac_t *tsx;
  908. sp_tsx_uac_callback cb;
  909. cb.user_data = session;
  910. cb.on_ans = &__uac_on_ans;
  911. cb.on_destroy = NULL;
  912. rc = sp_tsx_uac_create(session->uac, tsx_id, method_id, method_sig, &cb, &tsx);
  913. if (rc == 0) {
  914. rc = sp_tsx_uac_async_req(tsx, timeout, req_pkt);
  915. if (rc == 0) {
  916. WLog_DBG(TAG, "send req: tsx_id=%d, method_id=%d, method_sig=%d, pkt_size=%d",
  917. tsx_id, method_id, method_sig, *req_pkt == NULL ? 0 : iobuffer_get_length(*req_pkt));
  918. } else {
  919. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("async req tsx failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
  920. post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL);
  921. sp_tsx_uac_destroy(tsx);
  922. }
  923. } else {
  924. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create req tsx failed! %s %d", _GetFileName(__FILE__), __LINE__);
  925. post_req_ack(tbs, conn, conn_id, tsx_id, 1, rc, get_err_msg(rc), NULL);
  926. }
  927. } else {
  928. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("session already closed! %s %d", _GetFileName(__FILE__), __LINE__);
  929. post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_NetBroken, get_err_msg(Error_NetBroken), NULL);
  930. }
  931. } else {
  932. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
  933. post_req_ack(tbs, conn, conn_id, tsx_id, 1, Error_Break, get_err_msg(Error_Break), NULL);
  934. }
  935. }
  936. static void on_info(sp_tbs_t *tbs, sock_connection *conn, int conn_id, int method_id, int method_sig, iobuffer_t **info_pkt)
  937. {
  938. sub_session *session = find_sub_session(conn, conn_id);
  939. if (session) {
  940. if (session->uac) {
  941. int rc = sp_ses_uac_send_info(session->uac, method_id, method_sig, info_pkt);
  942. if (rc != 0) {
  943. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("uac send info failed! error = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
  944. }
  945. else {
  946. //DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM)("send info: method_id=%d, method_sig=%d, pkt_size=%d", method_id, method_sig,
  947. // *info_pkt == NULL ? 0 : iobuffer_get_length(*info_pkt));
  948. }
  949. } else {
  950. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("session already closed! info ignored ! %s %d", _GetFileName(__FILE__), __LINE__);
  951. }
  952. } else {
  953. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
  954. }
  955. }
  956. static void on_session_end(sp_tbs_t *tbs, sock_connection *conn, int conn_id)
  957. {
  958. sub_session *session = find_sub_session(conn, conn_id);
  959. if (session) {
  960. if (session->uac) {
  961. sp_ses_uac_t *uac = session->uac;
  962. session->uac = NULL;
  963. if (sp_ses_uac_close(uac) == 0) {
  964. sp_ses_uac_destroy(uac);
  965. } else {
  966. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("duplicated uac close!");
  967. }
  968. }
  969. } else {
  970. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot find session id in htable! ignored! %s %d", _GetFileName(__FILE__), __LINE__);
  971. }
  972. }
  973. static void on_stop(sp_tbs_t *tbs)
  974. {
  975. int i;
  976. WSAEventSelect(tbs->server_fd, tbs->conn_evt[SRV_EVT_IDX], 0);
  977. closesocket(tbs->server_fd);
  978. tbs->server_fd = INVALID_SOCKET;
  979. for (i = tbs->conn_cnt-1; i > SRV_EVT_IDX; --i) {
  980. sock_connection *conn = tbs->conn[i];
  981. TOOLKIT_ASSERT(conn);
  982. on_close(tbs, conn);
  983. }
  984. }
  985. static void on_close(sp_tbs_t *tbs, sock_connection *conn)
  986. {
  987. int i;
  988. WLog_WARN(TAG, "socket on close is invoked!");
  989. if (conn->fd != INVALID_SOCKET) {
  990. WSAEventSelect(conn->fd, tbs->conn_evt[conn->idx], 0);
  991. closesocket(conn->fd);
  992. conn->fd = INVALID_SOCKET;
  993. }
  994. for (i = 0; i < SESSION_BUCKET_SIZE; ++i) {
  995. sub_session *tpos;
  996. struct hlist_node *pos, *n;
  997. hlist_for_each_entry_safe(tpos, pos, n, &conn->session_bucket[i], sub_session, hentry) {
  998. if (tpos->uac) {
  999. sp_ses_uac_t *uac = tpos->uac;
  1000. tpos->uac = NULL;
  1001. if (sp_ses_uac_close(uac) == 0) {
  1002. sp_ses_uac_destroy(uac);
  1003. } else {
  1004. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("duplicated uac close! %s %d", _GetFileName(__FILE__), __LINE__);
  1005. }
  1006. }
  1007. }
  1008. }
  1009. for (i = 0; i < REGISTER_BUCKET_SIZE; ++i) {
  1010. reg_entry *tpos;
  1011. struct hlist_node *pos, *n;
  1012. hlist_for_each_entry_safe(tpos, pos, n, &conn->register_bucket[i], reg_entry, hentry) {
  1013. if (tpos->listener) {
  1014. if (sp_bcm_listener_unsubscribe(tpos->listener) == 0) {
  1015. sp_bcm_listener_destroy(tpos->listener);
  1016. }
  1017. }
  1018. }
  1019. }
  1020. sock_connection_dec_ref(conn);
  1021. }
  1022. static void on_set_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name, char *value)
  1023. {
  1024. int rc;
  1025. if (name == NULL)
  1026. {
  1027. rc = Error_Param;
  1028. post_set_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null");
  1029. return;
  1030. }
  1031. rc = sp_var_client_set((sp_var_client_t*)tbs->var_client, name, value, 0);
  1032. post_set_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc));
  1033. }
  1034. static void on_get_var_req(sp_tbs_t *tbs, sock_connection *conn, int tsx_id, char *name)
  1035. {
  1036. int len = 260;
  1037. int rc;
  1038. char szTmp[260] = { 0 };
  1039. wchar_t wszValue[260] = { 0 };
  1040. if (name == NULL)
  1041. {
  1042. rc = Error_Param;
  1043. post_get_var_ack(tbs, conn, tsx_id, rc, L"param [name] is null", NULL);
  1044. return;
  1045. }
  1046. rc = sp_var_client_lock(tbs->var_client);
  1047. if (rc == 0) {
  1048. rc = sp_var_client_get((sp_var_client_t*)tbs->var_client, name, szTmp, &len);
  1049. }
  1050. sp_var_client_unlock(tbs->var_client);
  1051. if (rc == 0)
  1052. {
  1053. len = 260;
  1054. toolkit_mbs2wcs(szTmp, &wszValue[0], len);
  1055. post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), wszValue);
  1056. }
  1057. else
  1058. post_get_var_ack(tbs, conn, tsx_id, rc, get_err_msg(rc), NULL);
  1059. }
  1060. static void process_tcp_pkt(sp_tbs_t *tbs, sock_connection *conn, iobuffer_t **p_pkt)
  1061. {
  1062. iobuffer_t *pkt = *p_pkt;
  1063. int type;
  1064. iobuffer_read(pkt, IOBUF_T_I4, &type, NULL);
  1065. link_context cur;
  1066. char linkStr[512] = "";
  1067. bool withLinkContex = !!(type & PKT_TYPE_CONTROL_LINKCONTEXT);
  1068. int linkId = 0;
  1069. type = type & 0x0000FFFF; //clear control part
  1070. switch (type) {
  1071. case PKT_TYPE_INFO:
  1072. {
  1073. int conn_id;
  1074. int method_id;
  1075. int method_sig;
  1076. if (withLinkContex)
  1077. {
  1078. iobuffer_format_read(pkt, "444mmmm", &conn_id, &method_id, &method_sig, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1079. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1080. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1081. iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1082. } else {
  1083. iobuffer_format_read(pkt, "444", &conn_id, &method_id, &method_sig);
  1084. }
  1085. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_INFO conn_id=%d method_id=%d method_sig=%d, %s", conn_id, method_id, method_sig, linkStr);
  1086. on_info(tbs, conn, conn_id, method_id, method_sig, &pkt);
  1087. }
  1088. break;
  1089. case PKT_TYPE_REQ:
  1090. {
  1091. int conn_id;
  1092. int tsx_id;
  1093. int method_id;
  1094. int method_sig;
  1095. int timeout;
  1096. if (withLinkContex)
  1097. {
  1098. iobuffer_format_read(pkt, "44444mmmm", &conn_id, &tsx_id, &method_id, &method_sig, &timeout, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1099. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1100. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1101. iobuffer_set_linkInfo(pkt, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1102. }
  1103. else {
  1104. iobuffer_format_read(pkt, "44444", &conn_id, &tsx_id, &method_id, &method_sig, &timeout);
  1105. }
  1106. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REQ conn_id=%d tsx_id=%d method_id=%d method_sig=%d timeout=%d, %s", conn_id, tsx_id, method_id, method_sig, timeout, linkStr);
  1107. on_req(tbs, conn, conn_id, tsx_id, method_id, method_sig, timeout, &pkt);
  1108. }
  1109. break;
  1110. case PKT_TYPE_SESSION:
  1111. {
  1112. int tsx_id;
  1113. wchar_t *wentity_name;
  1114. wchar_t*wfunction_name;
  1115. wchar_t*wclass_name;
  1116. char *entity_name = NULL;
  1117. char *function_name = NULL;
  1118. char *class_name = NULL;
  1119. int n;
  1120. if (withLinkContex)
  1121. {
  1122. iobuffer_format_read(pkt, "4mmmmmmm", &tsx_id
  1123. , &wentity_name, &wfunction_name, &wclass_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1124. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1125. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1126. }
  1127. else {
  1128. iobuffer_format_read(pkt, "4mmm", &tsx_id, &wentity_name, &wfunction_name, &wclass_name);
  1129. }
  1130. n = toolkit_wcs2mbs(wentity_name, NULL, 0);
  1131. WLog_DBG(TAG, "wtm len for wentity_name: %d", n);
  1132. if (wentity_name && n > 0) {
  1133. entity_name = (char*)malloc(n);
  1134. toolkit_wcs2mbs(wentity_name, entity_name, n);
  1135. }
  1136. FREE(wentity_name);
  1137. n = toolkit_wcs2mbs(wfunction_name, NULL, 0);
  1138. WLog_DBG(TAG, "wtm len for wfunction_name: %d", n);
  1139. if (wfunction_name && n > 0) {
  1140. function_name = (char*)malloc(n);
  1141. toolkit_wcs2mbs(wfunction_name, function_name, n);
  1142. }
  1143. FREE(wfunction_name);
  1144. n = toolkit_wcs2mbs(wclass_name, NULL, 0);
  1145. WLog_DBG(TAG, "wtm len for wclass_name: %d", n);
  1146. if (wclass_name && n > 0) {
  1147. class_name = (char*)malloc(n);
  1148. toolkit_wcs2mbs(wclass_name, class_name, n);
  1149. }
  1150. FREE(wclass_name);
  1151. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSION tsx_id=%d entity_name=%s function_name=%s class_name=%s",
  1152. tsx_id, entity_name ? entity_name : "<null>", function_name ? function_name : "<null>"
  1153. , class_name ? class_name : "<null>");
  1154. on_session(tbs, conn, tsx_id, entity_name, function_name, class_name);
  1155. if (entity_name)
  1156. free(entity_name);
  1157. if (function_name)
  1158. free(function_name);
  1159. if (class_name)
  1160. free(class_name);
  1161. }
  1162. break;
  1163. case PKT_TYPE_SESSIONEND:
  1164. {
  1165. int conn_id;
  1166. if (withLinkContex)
  1167. {
  1168. iobuffer_format_read(pkt, "4mmmm", &conn_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1169. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1170. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1171. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1172. }
  1173. else {
  1174. iobuffer_read(pkt, IOBUF_T_I4, &conn_id, NULL);
  1175. }
  1176. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SESSIONEND conn_id=%d", conn_id);
  1177. on_session_end(tbs, conn, conn_id);
  1178. }
  1179. break;
  1180. case PKT_TYPE_REGISTER_EVENT:
  1181. {
  1182. int register_id;
  1183. wchar_t* wentity_name;
  1184. char* entity_name = NULL;
  1185. wchar_t* wfunction_name;
  1186. char *function_name = NULL;
  1187. int n;
  1188. if (withLinkContex)
  1189. {
  1190. iobuffer_format_read(pkt, "4mmmmmm", &register_id,
  1191. &wentity_name, &wfunction_name, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1192. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1193. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1194. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1195. }
  1196. else {
  1197. iobuffer_format_read(pkt, "4mm", &register_id, &wentity_name, &wfunction_name);
  1198. }
  1199. n = toolkit_wcs2mbs(wentity_name, NULL, 0);
  1200. if (wentity_name && n > 0) {
  1201. entity_name = (char*)malloc(n);
  1202. toolkit_wcs2mbs(wentity_name, entity_name, n);
  1203. }
  1204. FREE(wentity_name);
  1205. n = toolkit_wcs2mbs(wfunction_name, NULL, 0);
  1206. if (wfunction_name && n > 0) {
  1207. function_name = (char*)malloc(n);
  1208. toolkit_wcs2mbs(wfunction_name, function_name, n);
  1209. }
  1210. FREE(wfunction_name);
  1211. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_REGISTER_EVENT register_id=%d, entity_name=%s, function_name=%s",
  1212. register_id, entity_name ? entity_name : "<null>", function_name ? function_name : "<null>");
  1213. on_register_event(tbs, conn, register_id, entity_name, function_name);
  1214. if (entity_name)
  1215. free(entity_name);
  1216. if (function_name)
  1217. free(function_name);
  1218. }
  1219. break;
  1220. case PKT_TYPE_UNREGISTER_EVENT:
  1221. {
  1222. int register_id;
  1223. if (withLinkContex)
  1224. {
  1225. iobuffer_format_read(pkt, "4mmmm", &register_id, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1226. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1227. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1228. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1229. }
  1230. else {
  1231. iobuffer_format_read(pkt, "4", &register_id);
  1232. }
  1233. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_UNREGISTER_EVENT register_id=%d", register_id);
  1234. on_unregister_event(tbs, conn, register_id);
  1235. }
  1236. break;
  1237. case PKT_TYPE_LOG_EVENT:
  1238. {
  1239. int severity_level;
  1240. int user_code;
  1241. wchar_t* wmsg = NULL;
  1242. char *msg = NULL;
  1243. if (withLinkContex)
  1244. {
  1245. iobuffer_format_read(pkt, "44mmmmm", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1246. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1247. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1248. } else {
  1249. iobuffer_format_read(pkt, "44m", &severity_level, &user_code, &wmsg);
  1250. }
  1251. if (wmsg) {
  1252. int n = toolkit_wcs2mbs(wmsg, NULL, 0);
  1253. const int rn = n > 0 ? n : 1;
  1254. msg = (char*)malloc(rn);
  1255. memset(msg, '\0', sizeof(char) * rn);
  1256. toolkit_wcs2mbs(wmsg, msg, n);
  1257. FREE(wmsg);
  1258. }
  1259. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_LOG_EVENT severity_level=%d, user_code=0x%08x, msg=%s", severity_level, user_code, msg ? msg : "<null>");
  1260. linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1261. on_log_event(tbs, conn, severity_level, user_code, msg, t_Context);
  1262. if (msg) {
  1263. free(msg);
  1264. }
  1265. }
  1266. break;
  1267. case PKT_TYPE_LOG_WARN:
  1268. {
  1269. int severity_level;
  1270. int user_code;
  1271. wchar_t* wmsg = NULL;
  1272. char *msg = NULL;
  1273. if (withLinkContex)
  1274. {
  1275. iobuffer_format_read(pkt, "44mmmmm", &severity_level, &user_code, &wmsg, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1276. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1277. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1278. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1279. } else {
  1280. iobuffer_format_read(pkt, "44m", &severity_level, &user_code, &wmsg);
  1281. }
  1282. if (wmsg) {
  1283. int n = toolkit_wcs2mbs(wmsg, NULL, 0);
  1284. const int rn = n > 0 ? n : 1;
  1285. msg = (char*)malloc(rn);
  1286. memset(msg, '\0', sizeof(char) * rn);
  1287. toolkit_wcs2mbs(wmsg, msg, n);
  1288. FREE(wmsg);
  1289. }
  1290. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_LOG_WARN severity_level=%d, user_code=0x%08x, msg=%s", severity_level, user_code, msg ? msg : "<null>");
  1291. linkContext t_Context(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1292. on_log_warn(tbs, conn, severity_level, user_code, msg, t_Context);
  1293. if (msg)
  1294. free(msg);
  1295. }
  1296. break;
  1297. case PKT_TYPE_SET_VAR_REQ:
  1298. {
  1299. int tsx_id = 0;
  1300. wchar_t* wname = NULL;
  1301. wchar_t* wvalue = NULL;
  1302. char *name = NULL;
  1303. char *value = NULL;
  1304. if (withLinkContex)
  1305. {
  1306. iobuffer_format_read(pkt, "4mmmmmm", &tsx_id, &wname, &wvalue, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1307. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1308. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1309. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1310. } else {
  1311. iobuffer_format_read(pkt, "4mm", &tsx_id, &wname, &wvalue);
  1312. }
  1313. if (wname) {
  1314. int n = toolkit_wcs2mbs(wname, NULL, 0);
  1315. const int rn = n > 0 ? n : 1;
  1316. name = (char*)malloc(rn);
  1317. memset(name, '\0', sizeof(char)* rn);
  1318. toolkit_wcs2mbs(wname, name, n);
  1319. FREE(wname);
  1320. }
  1321. if (wvalue) {
  1322. int n = toolkit_wcs2mbs(wvalue, NULL, 0);
  1323. const int rn = n > 0 ? n : 1;
  1324. value = (char*)malloc(rn);
  1325. memset(value, '\0', sizeof(char)* rn);
  1326. toolkit_wcs2mbs(wvalue, value, n);
  1327. FREE(wvalue);
  1328. }
  1329. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_SET_VAR_REQ tsx_id=%d, name=%s, value=%s, %s"
  1330. , tsx_id, name ? name : "<null>", value ? value : "<null>", linkStr);
  1331. on_set_var_req(tbs, conn, tsx_id, name, value);
  1332. if (name)
  1333. free(name);
  1334. if (value)
  1335. free(value);
  1336. }
  1337. break;
  1338. case PKT_TYPE_GET_VAR_REQ:
  1339. {
  1340. int tsx_id = 0;
  1341. wchar_t*wname = NULL;
  1342. char *name = NULL;
  1343. if (withLinkContex)
  1344. {
  1345. iobuffer_format_read(pkt, "4mmmmm", &tsx_id, &wname, &cur.wbusinessId, &cur.wtraceId, &cur.wspanId, &cur.wparentSpanId);
  1346. if(g_postlink) linkId = g_postlink(cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA());
  1347. sprintf_s(linkStr, sizeof(linkStr), "withLinkContex: %d linkcontext:%s-%s-%s-%s-%d",
  1348. withLinkContex, cur.businessIdWtoA(), cur.traceIdWtoA(), cur.spanIdWtoA(), cur.parentSpanIdWtoA(), linkId);
  1349. } else {
  1350. iobuffer_format_read(pkt, "4m", &tsx_id, &wname);
  1351. }
  1352. if (wname) {
  1353. int n = toolkit_wcs2mbs(wname, NULL, 0);
  1354. const int rn = n > 0 ? n : 1;
  1355. name = (char*)malloc(rn);
  1356. memset(name, '\0', sizeof(char)* rn);
  1357. toolkit_wcs2mbs(wname, name, n);
  1358. FREE(wname);
  1359. }
  1360. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("recv PKT_TYPE_GET_VAR_REQ tsx_id=%d, name=%s, %s", tsx_id, name ? name : "<null>", linkStr);
  1361. on_get_var_req(tbs, conn, tsx_id, name);
  1362. if (name)
  1363. free(name);
  1364. }
  1365. break;
  1366. default:
  1367. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("recv unknown pkt type: %d", type);
  1368. //TOOLKIT_ASSERT(0);
  1369. break;
  1370. }
  1371. *p_pkt = NULL;
  1372. if (pkt)
  1373. iobuffer_dec_ref(pkt);
  1374. }
  1375. static void on_read(sp_tbs_t *tbs, sock_connection *conn)
  1376. {
  1377. WLog_DBG(TAG, "Enter %s", __FUNCTION__);
  1378. int t = -404;
  1379. if (conn->rx_state == RECV_STATE_HEADER) {
  1380. do {
  1381. t = recv(conn->fd, &conn->rx_hdr_buf[conn->rx_hdr_buf_len], RX_BUF_SIZE - conn->rx_hdr_buf_len, 0);
  1382. }
  1383. while (t == -1 && errno == EINTR);
  1384. if (t > 0) {
  1385. int offset = 0;
  1386. conn->rx_hdr_buf_len += t;
  1387. while (conn->rx_hdr_buf_len-offset >= HDR_LEN) {
  1388. iobuffer_t *pkt;
  1389. memcpy(&conn->rx_header_len, conn->rx_hdr_buf+offset, HDR_LEN);
  1390. conn->rx_header_len += 4; // fixed hdr not include type!!!
  1391. WLog_DBG(TAG, "rx header len(with 4 sizeof(type)): %d", conn->rx_header_len);
  1392. offset += HDR_LEN;
  1393. pkt = iobuffer_create(-1, conn->rx_header_len);
  1394. if (conn->rx_hdr_buf_len-offset >= conn->rx_header_len) {
  1395. memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_header_len);
  1396. offset += conn->rx_header_len;
  1397. iobuffer_push_count(pkt, conn->rx_header_len);
  1398. process_tcp_pkt(tbs, conn, &pkt);
  1399. if (pkt)
  1400. iobuffer_dec_ref(pkt);
  1401. } else {
  1402. memcpy(iobuffer_data(pkt, -1), conn->rx_hdr_buf+offset, conn->rx_hdr_buf_len-offset);
  1403. iobuffer_push_count(pkt, conn->rx_hdr_buf_len-offset);
  1404. offset = conn->rx_hdr_buf_len;
  1405. TOOLKIT_ASSERT(conn->rx_pkt == NULL);
  1406. conn->rx_pkt = pkt;
  1407. conn->rx_state = RECV_STATE_BODY;
  1408. }
  1409. }
  1410. if (offset == conn->rx_hdr_buf_len) {
  1411. conn->rx_hdr_buf_len = 0;
  1412. } else {
  1413. if (offset > 0) {
  1414. TOOLKIT_ASSERT(conn->rx_state == RECV_STATE_HEADER);
  1415. memmove(&conn->rx_hdr_buf[0], &conn->rx_hdr_buf[offset], conn->rx_hdr_buf_len-offset);
  1416. conn->rx_hdr_buf_len -= offset;
  1417. }
  1418. }
  1419. }
  1420. } else if (conn->rx_state == RECV_STATE_BODY) {
  1421. int offset;
  1422. TOOLKIT_ASSERT(conn->rx_pkt);
  1423. offset = iobuffer_get_length(conn->rx_pkt);
  1424. do {
  1425. t = recv(conn->fd, iobuffer_data(conn->rx_pkt, -1), conn->rx_header_len - offset, 0);
  1426. } while (t == -1 && errno == EINTR);
  1427. if (t > 0) {
  1428. iobuffer_push_count(conn->rx_pkt, t);
  1429. if (offset+t == conn->rx_header_len) {
  1430. process_tcp_pkt(tbs, conn, &conn->rx_pkt);
  1431. if (conn->rx_pkt) {
  1432. iobuffer_dec_ref(conn->rx_pkt);
  1433. conn->rx_pkt = NULL;
  1434. }
  1435. conn->rx_hdr_buf_len = 0;
  1436. conn->rx_state = RECV_STATE_HEADER;
  1437. }
  1438. }
  1439. } else {
  1440. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("cannot go here, bug! file: %s, line: %d", _GetFileName(__FILE__), __LINE__);
  1441. TOOLKIT_ASSERT(0);
  1442. }
  1443. if (t == 0) {
  1444. on_close(tbs, conn);
  1445. } else if (t == -1) {
  1446. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("connection read failed: %s", strerror(errno));
  1447. on_close(tbs, conn);
  1448. }
  1449. WLog_DBG(TAG, "Leave %s", __FUNCTION__);
  1450. }
  1451. static void on_write(sp_tbs_t *tbs, sock_connection *conn)
  1452. {
  1453. if (conn->fd == INVALID_SOCKET)
  1454. return;
  1455. WLog_DBG(TAG, "Enter %s", __FUNCTION__);
  1456. int t = -404;
  1457. while (iobuffer_queue_count(conn->tx_queue) > 0) {
  1458. iobuffer_t *pkt = iobuffer_queue_head(conn->tx_queue);
  1459. int sended = 0;
  1460. int total = iobuffer_get_length(pkt);
  1461. while (sended < total) {
  1462. do {
  1463. t = send(conn->fd, iobuffer_data(pkt, 0), total - sended, 0);
  1464. } while (t < 0 && errno == EINTR);
  1465. if (t > 0) {
  1466. sended += t;
  1467. iobuffer_pop_count(pkt, t);
  1468. } else {
  1469. break;
  1470. }
  1471. }
  1472. if (sended < total) {
  1473. break;
  1474. } else {
  1475. iobuffer_queue_deque(conn->tx_queue);
  1476. iobuffer_dec_ref(pkt);
  1477. }
  1478. }
  1479. WLog_DBG(TAG, "Leave %s", __FUNCTION__);
  1480. }
  1481. static void on_accept(sp_tbs_t *tbs)
  1482. {
  1483. int rc;
  1484. rc = create_connection(tbs);
  1485. if (rc != 0) {
  1486. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM)("create connection failed! err = %d %s %d", rc, _GetFileName(__FILE__), __LINE__);
  1487. } else {
  1488. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("create connection ok! %s %d", _GetFileName(__FILE__), __LINE__);
  1489. }
  1490. }
  1491. static void post_msg(sp_tbs_t *tbs, int msg, param_size_t param1, param_size_t param2)
  1492. {
  1493. uint64_t wdata = 0;
  1494. int rc;
  1495. cmd_entry *e = MALLOC_T(cmd_entry);
  1496. e->msg = msg;
  1497. e->param1 = param1;
  1498. e->param2 = param2;
  1499. if (0 != evtpoll_subscribe(tbs->ep, EV_READ, tbs->msg_fd, (void*)(long)MSG_EVT_IDX, NULL)) {
  1500. WLog_ERR(TAG, "tbs subscribe msg eventfd failed.");
  1501. free(e);
  1502. return;
  1503. }
  1504. spinlock_enter(&tbs->cmd_list_lock, -1);
  1505. list_add_tail(&e->entry, &tbs->cmd_list);
  1506. spinlock_leave(&tbs->cmd_list_lock);
  1507. wdata = 1;
  1508. do {
  1509. rc = write(tbs->msg_fd, &wdata, sizeof wdata);
  1510. } while (rc < 0 && rc == EINTR);
  1511. if (rc == -1) {
  1512. WLog_ERR(TAG, "write to eventfd failed: %d", errno);
  1513. return;
  1514. }
  1515. }
  1516. static unsigned int __stdcall server_proc(void *arg)
  1517. {
  1518. sp_tbs_t *tbs = (sp_tbs_t*)arg;
  1519. struct sockaddr_in addr = {0};
  1520. int istop = 0;
  1521. int fixed_conn;
  1522. WLog_DBG(TAG, "enter server listern and deal business thread");
  1523. TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET);
  1524. tbs->server_fd = _socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  1525. if (tbs->server_fd == INVALID_SOCKET) {
  1526. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("create list fd socket failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError());
  1527. tbs->ready_evt_result = Error_Resource;
  1528. goto on_error;
  1529. }
  1530. if (make_fd_cloexec(tbs->server_fd, 1) != 0)
  1531. goto on_error;
  1532. {
  1533. long val = 1;
  1534. _setsockopt(tbs->server_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&val, sizeof(val));
  1535. }
  1536. addr.sin_family = AF_INET;
  1537. addr.sin_addr.s_addr = tbs->server_ip;
  1538. addr.sin_port = tbs->server_port;
  1539. if (_bind(tbs->server_fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
  1540. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("bind address failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError());
  1541. tbs->ready_evt_result = Error_AlreadyExist;
  1542. goto on_error;
  1543. }
  1544. WLog_DBG(TAG, "to attach server fd: %d", tbs->server_fd);
  1545. if (evtpoll_attach(tbs->ep, tbs->server_fd)) {
  1546. WLog_ERR(TAG, "%s: server attach evtpoll for %d failed!", __FUNCTION__, tbs->server_fd);
  1547. goto on_error;
  1548. }
  1549. if (0 != evtpoll_subscribe(tbs->ep, EV_ACCEPT, tbs->server_fd, (void*)SRV_EVT_IDX, NULL)) {
  1550. WLog_ERR(TAG, "%s: server attach evtpoll for %d failed!", __FUNCTION__, tbs->server_fd);
  1551. goto on_error;
  1552. }
  1553. if (_listen(tbs->server_fd, 5) != 0) {
  1554. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("winsock::listen failed! %s %d, LastError=%d", _GetFileName(__FILE__), __LINE__, WSAGetLastError());
  1555. tbs->ready_evt_result = Error_AlreadyExist;
  1556. goto on_error;
  1557. }
  1558. tbs->conn_cnt++;
  1559. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("start tbs server ok!");
  1560. tbs->ready_evt_result = 0;
  1561. SetEvent(tbs->ready_evt);
  1562. fixed_conn = tbs->conn_cnt;
  1563. while (!istop || tbs->conn_cnt > fixed_conn) {
  1564. int i;
  1565. const int evt_cnt = tbs->conn_cnt;
  1566. struct epoll_event events[MAX_EPOLL_EVENT];
  1567. const int nfds = evtpoll_wait(tbs->ep, events, MAX_EPOLL_EVENT, MAX_TIMEOUT);
  1568. if (nfds == TOOLKIT_ETIMEDOUT) {
  1569. continue;
  1570. }
  1571. if (nfds < 0) {
  1572. break;
  1573. }
  1574. for (i = 0; i < nfds; ++i) {
  1575. struct epoll_event* pe = events + i;
  1576. int n;
  1577. const int wf = pe->events & EPOLLOUT ? 1 : 0;
  1578. const int rf = pe->events & EPOLLIN ? 1 : 0;
  1579. void* pt = NULL;
  1580. WLog_INFO(TAG, "poll events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd, wf, rf);
  1581. n = evtpoll_deal(tbs->ep, pe, &pt, 0);
  1582. WLog_DBG(TAG, "evtpoll deal returned: %d, %d", n, (long)(pt));
  1583. if (0 == n) {
  1584. const int idx = (int)(long)(pt);
  1585. if ((idx == SRV_EVT_IDX) && rf) {
  1586. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("FD_ACCEPT trigger!");
  1587. on_accept(tbs);
  1588. }
  1589. if (idx == MSG_EVT_IDX) { // cmd
  1590. uint64_t rdata;
  1591. cmd_entry* e;
  1592. do {
  1593. n = read(tbs->msg_fd, &rdata, sizeof rdata);
  1594. } while (n < 0 && errno == EINTR);
  1595. spinlock_enter(&tbs->cmd_list_lock, -1);
  1596. e = list_first_entry(&tbs->cmd_list, cmd_entry, entry);
  1597. list_del(&e->entry);
  1598. spinlock_leave(&tbs->cmd_list_lock);
  1599. WLog_DBG(TAG, "server receive cmd msg: %d", e->msg);
  1600. switch (e->msg) {
  1601. case MSG_STOP:
  1602. {
  1603. on_stop(tbs); // ...stop
  1604. istop++;
  1605. }
  1606. break;
  1607. case MSG_UAC_ON_CONNECT:
  1608. {
  1609. sub_session* session = (sub_session*)e->param1;
  1610. const int error = e->param2;
  1611. uac_on_connect(tbs, session->conn, session, error);
  1612. }
  1613. break;
  1614. case MSG_UAC_ON_DESTROY:
  1615. {
  1616. sub_session* session = (sub_session*)e->param1;
  1617. uac_on_destroy(tbs, session->conn, session);
  1618. }
  1619. break;
  1620. case MSG_UAC_ON_CLOSE:
  1621. {
  1622. sub_session* session = (sub_session*)e->param1;
  1623. int error = e->param2;
  1624. uac_on_close(tbs, session->conn, session, error);
  1625. }
  1626. break;
  1627. case MSG_UAC_ON_ANS:
  1628. {
  1629. sub_session* session = (sub_session*)e->param1;
  1630. iobuffer_t* ans_pkt = (iobuffer_t*)e->param2;
  1631. int tsx_id;
  1632. int error;
  1633. int end;
  1634. int user_error;
  1635. char* msg_str = NULL;
  1636. iobuffer_format_read(ans_pkt, "4444s", &tsx_id, &error, &user_error, &end, &msg_str);
  1637. uac_on_ans(tbs, session->conn, session, tsx_id, error, user_error, msg_str, end, &ans_pkt);
  1638. if (ans_pkt)
  1639. iobuffer_dec_ref(ans_pkt);
  1640. if (msg_str)
  1641. free(msg_str);
  1642. }
  1643. break;
  1644. case MSG_ON_EVENT:
  1645. {
  1646. reg_entry* entry = (reg_entry*)e->param1;
  1647. iobuffer_t* evt_pkt = (iobuffer_t*)e->param2;
  1648. post_event(tbs, entry, 0, entry->register_id, &evt_pkt);
  1649. if (evt_pkt)
  1650. iobuffer_dec_ref(evt_pkt);
  1651. }
  1652. break;
  1653. default:
  1654. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("unknown msg type! %s %d", _GetFileName(__FILE__), __LINE__);
  1655. TOOLKIT_ASSERT(0);
  1656. break;
  1657. }
  1658. free(e);
  1659. }
  1660. if (idx != SRV_EVT_IDX && idx < evt_cnt) {
  1661. sock_connection* conn = tbs->conn[idx];
  1662. TOOLKIT_ASSERT(conn);
  1663. if (rf) {
  1664. on_read(tbs, conn);
  1665. } else if (wf) {
  1666. on_write(tbs, conn);
  1667. }
  1668. }
  1669. }
  1670. }
  1671. }
  1672. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM)("tbs server thread exit!");
  1673. on_error:
  1674. if (tbs->server_fd != INVALID_SOCKET) {
  1675. evtpoll_unsubscribe(tbs->ep, EV_ACCEPT, tbs->server_fd, 0, NULL, NULL);
  1676. evtpoll_detach(tbs->ep, tbs->server_fd);
  1677. closesocket(tbs->server_fd);
  1678. tbs->server_fd = INVALID_SOCKET;
  1679. }
  1680. if (tbs->conn_evt[SRV_EVT_IDX]) {
  1681. WSACloseEvent(tbs->conn_evt[SRV_EVT_IDX]);
  1682. tbs->conn_evt[SRV_EVT_IDX] = NULL;
  1683. tbs->conn_cnt--;
  1684. }
  1685. if (tbs->ready_evt_result) {
  1686. SetEvent(tbs->ready_evt);
  1687. }
  1688. return 0;
  1689. }
  1690. int sp_tbs_create(sp_ses_mgr_t *ses_mgr, const char *ipaddr, unsigned short port, sp_tbs_t **p_tbs)
  1691. {
  1692. sp_tbs_t *tbs;
  1693. sp_svc_t *svc = sp_ses_mgr_get_svc(ses_mgr);
  1694. WLog_DBG(TAG, "Enter %s", __FUNCTION__);
  1695. TOOLKIT_ASSERT(ipaddr);
  1696. TOOLKIT_ASSERT(ses_mgr);
  1697. tbs = ZALLOC_T(sp_tbs_t);
  1698. tbs->ep = evtpoll_create();
  1699. if (!tbs->ep) {
  1700. goto on_error_0;
  1701. }
  1702. tbs->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
  1703. if (tbs->msg_fd == -1) {
  1704. WLog_ERR(TAG, "create event fd failed: %d", errno);
  1705. goto on_error_1;
  1706. }
  1707. tbs->conn_evt[0] = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
  1708. tbs->ready_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  1709. sp_log_client_create(svc, sp_svc_get_iom(svc), &tbs->log_client);
  1710. tbs->conn_cnt = 1;
  1711. tbs->server_fd = INVALID_SOCKET;
  1712. tbs->server_ip = inet_addr(ipaddr);
  1713. tbs->server_port = htons(port);
  1714. tbs->sub_session_seq = (unsigned int)GetTickCount();
  1715. INIT_LIST_HEAD(&tbs->cmd_list);
  1716. spinlock_init(&tbs->cmd_list_lock);
  1717. tbs->ses_mgr = ses_mgr;
  1718. sp_var_client_create(svc, &tbs->var_client);
  1719. WLog_DBG(TAG, "to attach msg fd: %d", tbs->msg_fd);
  1720. if (evtpoll_attach(tbs->ep, tbs->msg_fd) != 0) {
  1721. WLog_ERR(TAG, "%s: attach evtpoll for %d failed!", __FUNCTION__, tbs->msg_fd);
  1722. goto on_error_2;
  1723. }
  1724. *p_tbs = tbs;
  1725. WLog_DBG(TAG, "Leave %s succ!", __FUNCTION__);
  1726. return 0;
  1727. on_error_2:
  1728. close(tbs->msg_fd);
  1729. on_error_1:
  1730. evtpoll_destroy(tbs->ep);
  1731. on_error_0:
  1732. free(tbs);
  1733. WLog_DBG(TAG, "Leave %s failed!", __FUNCTION__);
  1734. return -1;
  1735. }
  1736. int sp_tbs_start(sp_tbs_t *tbs)
  1737. {
  1738. if (tbs->work_thread) {
  1739. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM)("may be already started! %s %d", _GetFileName(__FILE__), __LINE__);
  1740. return Error_Duplication;
  1741. }
  1742. tbs->ready_evt_result = 0;
  1743. ResetEvent(tbs->ready_evt);
  1744. tbs->work_thread = (HANDLE)_beginthreadex(NULL, 0, &server_proc, tbs, 0, (unsigned*)&tbs->work_thread_id);
  1745. if (!tbs->work_thread) {
  1746. return Error_Resource;
  1747. }
  1748. WaitForSingleObject(tbs->ready_evt, INFINITE);
  1749. if (tbs->ready_evt_result) {
  1750. CloseHandle(tbs->work_thread);
  1751. tbs->work_thread = NULL;
  1752. }
  1753. WLog_DBG(TAG, "start tbs finished! %d", tbs->ready_evt_result);
  1754. return tbs->ready_evt_result;
  1755. }
  1756. void sp_tbs_stop(sp_tbs_t *tbs)
  1757. {
  1758. TOOLKIT_ASSERT(tbs->work_thread);
  1759. post_msg(tbs, MSG_STOP, 0, 0);
  1760. WaitForSingleObject(tbs->work_thread, INFINITE);
  1761. CloseHandle(tbs->work_thread);
  1762. tbs->work_thread = NULL;
  1763. }
  1764. void sp_tbs_destroy(sp_tbs_t *tbs)
  1765. {
  1766. TOOLKIT_ASSERT(tbs->conn_cnt == 1);
  1767. TOOLKIT_ASSERT(tbs->server_fd == INVALID_SOCKET);
  1768. CloseHandle(tbs->conn_evt[0]);
  1769. CloseHandle(tbs->ready_evt);
  1770. sp_log_client_destroy(tbs->log_client);
  1771. sp_var_client_destroy(tbs->var_client);
  1772. evtpoll_detach(tbs->ep, tbs->msg_fd);
  1773. close(tbs->msg_fd);
  1774. evtpoll_destroy(tbs->ep);
  1775. free(tbs);
  1776. }