sp_log.cpp 37 KB


  1. #include "precompile.h"
  2. #include <stdarg.h>
  3. #include "sp_log.h"
  4. #include "sp_def.h"
  5. #include "sp_svc.h"
  6. #include "sp_env.h"
  7. #include "sp_uid.h"
  8. #include "sp_rsn.h"
  9. #include "SpBase.h"
  10. #include "memutil.h"
  11. #include "list.h"
  12. #include "array.h"
  13. #include "hashset.h"
  14. #include "jhash.h"
  15. #include "refcnt.h"
  16. #include "y2k_time.h"
  17. #include "dbgutil.h"
  18. #include <winpr/synch.h>
  19. #include <winpr/sysinfo.h>
  20. #include <winpr/thread.h>
  /* Log sub-commands; OR-ed with SP_PKT_LOG to form the packet type. */
  #define LOG_CMD_RECORD          0x01 /* log record posted by a client */
  #define LOG_CMD_FLUSH           0x02 /* flush request posted by a client */
  #define LOG_CMD_SUBSCRIBE       0x03 /* register a listener filter with the daemon */
  #define LOG_CMD_UNSUBSCRIBE     0x04 /* remove a listener filter from the daemon */
  #define LOG_CMD_LISTEN_RECORD   0x05 /* record handled by the listener manager */

  /* NOTE(review): the unit of this interval is not visible in this file - confirm. */
  #define DAEMON_LOG_TIMEOUT_INTERVAL 5

  /* Bit positions of a filter mask; a set bit marks that field as a wildcard. */
  #define LOG_FILTER_BIT_LOGTYPE  0
  #define LOG_FILTER_BIT_ENTITY   1
  #define LOG_FILTER_BIT_SEVERITY 2
  #define LOG_FILTER_BIT_SYSCODE  3
  #define BIT_MASK(bit) (1 << (bit))

  /* Bucket count of sp_log_daemon_t::arr_filter_index. */
  #define FILTER_HASHTABLE_SIZE 1023
  33. struct sp_log_client_t
  34. {
  35. int local_svc_id;
  36. sp_svc_t *svc;
  37. sp_iom_t *iom;
  38. };
  39. // svc == null, iom cannot be null, anonymous
  40. // svc != null, iom can be null
  41. int sp_log_client_create(sp_svc_t *svc, sp_iom_t *iom, sp_log_client_t **p_client)
  42. {
  43. sp_log_client_t *client = MALLOC_T(sp_log_client_t);
  44. if (client) {
  45. if (svc) {
  46. if (!iom) {
  47. iom = sp_svc_get_iom(svc);
  48. }
  49. }
  50. client->svc = svc;
  51. client->iom = iom;
  52. if (svc) {
  53. client->local_svc_id = sp_svc_get_id(svc);
  54. } else {
  55. client->local_svc_id = SP_INVALID_SVC_ID;
  56. }
  57. *p_client = client;
  58. return 0;
  59. } else {
  60. return -1;
  61. };
  62. }
  63. int sp_log_client_destroy(sp_log_client_t *client)
  64. {
  65. free(client);
  66. return 0;
  67. }
  68. // TODO: param size type [4/2/2020 12:50 Gifur]
  69. /*
  70. int sp_log_client_log(sp_log_client_t *client, int type, int severity, int sys_error, int usr_error, int param_cnt, int *params, const char *format, ...)
  71. {
  72. va_list arg;
  73. va_start(arg, format);
  74. return sp_log_client_logv(client, type, severity, sys_error, usr_error, param_cnt, params, format, arg);
  75. }
  76. */
  // Compared with sp_log_client_logv, the variadic arguments were removed; this function now fully replaces sp_log_client_log and sp_log_client_logv.
  78. int sp_log_client_logEx(sp_log_client_t* client, int type, int severity, int sys_error, int usr_error, int param_cnt, int* params, const char* msg, int len)
  79. {
  80. int i, n;
  81. int rc;
  82. iobuffer_t* info_pkt;
  83. sp_env_t* env = sp_get_env();
  84. sp_iom_t* iom = client->iom;
  85. int local_epid = sp_iom_get_epid(iom);
  86. u__int64_t log_id = 0;
  87. y2k_time_t log_time;
  88. sp_rsn_context_t* rsn_ctx;
  89. sp_entity_t* ent;
  90. n = len;
  91. if (n < 0 || n > SP_LOG_MAX_LEN) {
  92. DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log len is not right, %d, only accept max len: %d", n, SP_LOG_MAX_LEN);
  93. return Error_Param;
  94. }
  95. ent = sp_mod_mgr_find_entity_by_idx(env->mod_mgr, client->local_svc_id);
  96. if (!ent) {
  97. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log sp_mod_mgr_find_entity_by_idx failed");
  98. return Error_Param;
  99. }
  100. log_time = y2k_time_now();
  101. rsn_ctx = client->svc ? sp_svc_get_runserial_context(client->svc) : NULL;
  102. info_pkt = iobuffer_create(-1, n + 112);
  103. iobuffer_write(info_pkt, IOBUF_T_I4, &ent->instance_id, 0);
  104. iobuffer_write(info_pkt, IOBUF_T_I8, &log_id, 0);
  105. if (rsn_ctx) {
  106. iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
  107. iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
  108. iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
  109. iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
  110. } else {
  111. u__int64_t invalid_rsn = 0;
  112. int t = 0;
  113. iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
  114. iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
  115. iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
  116. iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
  117. }
  118. iobuffer_write(info_pkt, IOBUF_T_I4, &log_time, 0);
  119. iobuffer_write(info_pkt, IOBUF_T_I4, &type, 0);
  120. iobuffer_write(info_pkt, IOBUF_T_I4, &client->local_svc_id, 0);
  121. iobuffer_write(info_pkt, IOBUF_T_I4, &local_epid, 0);
  122. iobuffer_write(info_pkt, IOBUF_T_I4, &severity, 0);
  123. iobuffer_write(info_pkt, IOBUF_T_I4, &sys_error, 0);
  124. iobuffer_write(info_pkt, IOBUF_T_I4, &usr_error, 0);
  125. iobuffer_write(info_pkt, IOBUF_T_I4, &param_cnt, 0);
  126. for (i = 0; i < param_cnt; ++i) {
  127. iobuffer_write(info_pkt, IOBUF_T_I4, &params[i], 0);
  128. }
  129. iobuffer_write(info_pkt, IOBUF_T_7BIT, &n, 0);
  130. if (n > 0) {
  131. memcpy(iobuffer_data(info_pkt, -1), msg, n);
  132. iobuffer_push_count(info_pkt, n);
  133. }
  134. rc = sp_iom_post(client->iom, client->local_svc_id,
  135. SP_SHELL_MOD_ID, SP_SHELL_SVC_ID,
  136. SP_PKT_LOG | LOG_CMD_RECORD, 0, &info_pkt);
  137. if (info_pkt)
  138. iobuffer_dec_ref(info_pkt);
  139. return rc;
  140. }
  141. // TODO: param size type [4/2/2020 12:50 Gifur]
  142. int sp_log_client_logWithLink(sp_log_client_t* client, int type, int severity, int sys_error, int usr_error, int param_cnt, int* params, const char* msg, int len
  143. , const char* bussId, const char* traceId, const char* spanId, const char* parentSpanId)
  144. {
  145. int i, n;
  146. int rc;
  147. iobuffer_t *info_pkt;
  148. sp_env_t *env = sp_get_env();
  149. sp_iom_t *iom = client->iom;
  150. int local_epid = sp_iom_get_epid(iom);
  151. u__int64_t log_id = 0;
  152. y2k_time_t log_time;
  153. sp_rsn_context_t *rsn_ctx;
  154. sp_entity_t *ent;
  155. n = len;
  156. if (n < 0 || n > SP_LOG_MAX_LEN)
  157. {
  158. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log len is not right, %d, only accept max len: %d", n, SP_LOG_MAX_LEN);
  159. return Error_Param;
  160. }
  161. ent = sp_mod_mgr_find_entity_by_idx(env->mod_mgr, client->local_svc_id);
  162. if (!ent) {
  163. DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log sp_mod_mgr_find_entity_by_idx failed");
  164. return Error_Param;
  165. }
  166. log_time = y2k_time_now();
  167. rsn_ctx = client->svc ? sp_svc_get_runserial_context(client->svc) : NULL;
  168. info_pkt = iobuffer_create(-1, n + 112);
  169. if (NULL == bussId || NULL == traceId || NULL == spanId || NULL == parentSpanId || 0 == strlen(bussId) || 0 == strlen(traceId))
  170. {
  171. linkContext tmp;
  172. tmp.AutoGenerate();
  173. iobuffer_set_linkInfo(info_pkt, tmp.bussinessId.GetData(), tmp.traceId.GetData(), tmp.spanId.GetData(), tmp.parentSpanId.GetData());
  174. }
  175. else
  176. iobuffer_set_linkInfo(info_pkt, bussId, traceId, spanId, parentSpanId);
  177. iobuffer_write(info_pkt, IOBUF_T_I4, &ent->instance_id, 0);
  178. iobuffer_write(info_pkt, IOBUF_T_I8, &log_id, 0);
  179. if (rsn_ctx) {
  180. iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
  181. iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
  182. iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
  183. iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
  184. } else {
  185. u__int64_t invalid_rsn = 0;
  186. int t = 0;
  187. iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
  188. iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
  189. iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
  190. iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
  191. }
  192. iobuffer_write(info_pkt, IOBUF_T_I4, &log_time, 0);
  193. iobuffer_write(info_pkt, IOBUF_T_I4, &type, 0);
  194. iobuffer_write(info_pkt, IOBUF_T_I4, &client->local_svc_id, 0);
  195. iobuffer_write(info_pkt, IOBUF_T_I4, &local_epid, 0);
  196. iobuffer_write(info_pkt, IOBUF_T_I4, &severity, 0);
  197. iobuffer_write(info_pkt, IOBUF_T_I4, &sys_error, 0);
  198. iobuffer_write(info_pkt, IOBUF_T_I4, &usr_error, 0);
  199. iobuffer_write(info_pkt, IOBUF_T_I4, &param_cnt, 0);
  200. for (i = 0; i < param_cnt; ++i) {
  201. iobuffer_write(info_pkt, IOBUF_T_I4, &params[i], 0);
  202. }
  203. iobuffer_write(info_pkt, IOBUF_T_7BIT, &n, 0);
  204. if (n > 0) {
  205. memcpy(iobuffer_data(info_pkt, -1), msg, n);
  206. iobuffer_push_count(info_pkt, n);
  207. }
  208. rc = sp_iom_post(client->iom, client->local_svc_id,
  209. SP_SHELL_MOD_ID, SP_SHELL_SVC_ID,
  210. SP_PKT_LOG|LOG_CMD_RECORD, 0, &info_pkt);
  211. if (info_pkt)
  212. iobuffer_dec_ref(info_pkt);
  213. return rc;
  214. }
  215. int sp_log_client_flush(sp_log_client_t *client)
  216. {
  217. int rc;
  218. rc = sp_iom_post(client->iom, client->local_svc_id, SP_SHELL_MOD_ID,
  219. SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_FLUSH, 0, NULL);
  220. return rc;
  221. }
  /*
   * listener
   */
  /* Bucket count of sp_log_listener_mgr_t::index (slot = listener id % size). */
  #define INDEX_HASHTABLE_SIZE 1023
  226. typedef struct sp_log_listener_cb // 对应一个SpEntity
  227. {
  228. struct list_head __entry; // < sp_log_listener_mgr_t . cb_list
  229. struct list_head __working_entry;
  230. struct list_head __list; // > listener_t . entry
  231. struct list_head __working_list;
  232. void *key;
  233. int enable;
  234. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  235. }sp_log_listener_cb;
  236. static sp_log_listener_cb* sp_log_listener_cb_create(void *key)
  237. {
  238. sp_log_listener_cb *cb = MALLOC_T(sp_log_listener_cb);
  239. cb->ref_cnt = 1;
  240. cb->enable = 1;
  241. cb->key = key;
  242. INIT_LIST_HEAD(&cb->__working_list);
  243. INIT_LIST_HEAD(&cb->__list);
  244. return cb;
  245. }
  246. static __inline void __sp_log_listener_cb_destroy(sp_log_listener_cb *cb){free(cb);}
  247. IMPLEMENT_REF_COUNT_MT_STATIC(sp_log_listener_cb, sp_log_listener_cb, ref_cnt, __sp_log_listener_cb_destroy)
  248. typedef struct listener_t // 对应每次Subscribe
  249. {
  250. struct hlist_node hentry; // < sp_log_listener_mgr_t . index[slot]
  251. struct list_head entry; // < sp_log_listener_cb . __list
  252. struct list_head working_entry;
  253. unsigned int id;
  254. int ignore_msg_body;
  255. sp_log_listener_cb *cb;
  256. }listener_t;
  257. struct sp_log_listener_mgr_t
  258. {
  259. struct list_head cb_list; // > sp_log_listener_cb . __entry
  260. struct hlist_head index[INDEX_HASHTABLE_SIZE]; // > listener_t . hentry
  261. sp_svc_t *svc;
  262. sp_log_on_log on_log;
  263. strand_t *strand;
  264. int enabled;
  265. DECLARE_REF_COUNT_MEMBER(ref_cnt); ///
  266. };
  267. static listener_t *listener_find(sp_log_listener_mgr_t *mgr, unsigned int listen_id)
  268. {
  269. unsigned slot = listen_id % INDEX_HASHTABLE_SIZE;
  270. listener_t *tpos;
  271. struct hlist_node *pos;
  272. hlist_for_each_entry(tpos, pos, &mgr->index[slot], listener_t, hentry) {
  273. if (listen_id == tpos->id) {
  274. return tpos;
  275. }
  276. }
  277. return NULL;
  278. }
  279. static __inline void listener_add_index(sp_log_listener_mgr_t *mgr, listener_t *listener)
  280. {
  281. int slot = listener->id % INDEX_HASHTABLE_SIZE;
  282. hlist_add_head(&listener->hentry, &mgr->index[slot]);
  283. }
  284. static __inline void listener_del_index(sp_log_listener_mgr_t *mgr, listener_t *listener)
  285. {
  286. int slot = listener->id % INDEX_HASHTABLE_SIZE;
  287. listener_t *tpos;
  288. struct hlist_node *pos;
  289. hlist_for_each_entry(tpos, pos, &mgr->index[slot], listener_t, hentry) {
  290. if (tpos->id == listener->id) {
  291. hlist_del(pos);
  292. return;
  293. }
  294. }
  295. }
  296. static sp_log_listener_cb *listener_find_cb(sp_log_listener_mgr_t *mgr, void *key)
  297. {
  298. sp_log_listener_cb *pos;
  299. TOOLKIT_ASSERT(key);
  300. list_for_each_entry(pos, &mgr->cb_list, sp_log_listener_cb, __entry) {
  301. if (pos->key == key)
  302. return pos;
  303. }
  304. return NULL;
  305. }
  306. static __inline void listener_invoke_cb(sp_log_listener_mgr_t *mgr, sp_log_listener_cb *cb,
  307. int nsub,
  308. u__int64_t *sub,
  309. int client_id,
  310. int log_epid,
  311. int client_instance_id,
  312. u__int64_t log_id,
  313. u__int64_t prev_rsn,
  314. u__int64_t curr_rsn,
  315. int original_rsn_type,
  316. int rsn_depth,
  317. unsigned int log_time,
  318. int log_type,
  319. int log_severity,
  320. int log_sys_error,
  321. int log_usr_error,
  322. int param_cnt,
  323. int *params,
  324. const char *msg,
  325. const char* bussId,
  326. const char* traceId,
  327. const char* spanId,
  328. const char* parentSpanId)
  329. {
  330. mgr->on_log(mgr,
  331. nsub,
  332. sub,
  333. client_id,
  334. log_epid,
  335. client_instance_id,
  336. log_id,
  337. prev_rsn,
  338. curr_rsn,
  339. original_rsn_type,
  340. rsn_depth,
  341. log_time,
  342. log_type,
  343. log_severity,
  344. log_sys_error,
  345. log_usr_error,
  346. param_cnt,
  347. params,
  348. msg,
  349. cb->key, bussId, traceId, spanId, parentSpanId);
  350. }
  351. static void listener_on_pkt_threadpool(threadpool_t *threadpool, void *arg)
  352. {
  353. iobuffer_t *pkt = (iobuffer_t*)arg;
  354. sp_log_listener_mgr_t *mgr = NULL;
  355. int epid, pkt_type, pkt_id, svc_id;
  356. sp_uid_t rsn;
  357. sp_rsn_context_t rsn_ctx;
  358. sp_svc_t *svc = NULL;
  359. int nsub = 0;
  360. int *sub;
  361. int instance_id;
  362. u__int64_t log_id;
  363. u__int64_t prev_rsn;
  364. u__int64_t curr_rsn;
  365. int original_rsn_type;
  366. int rsn_depth;
  367. unsigned int log_time;
  368. int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
  369. char *msg = NULL;
  370. int param_cnt;
  371. int *params = NULL;
  372. int i;
  373. //TOOLKIT_ASSERT(nsub);
  374. #ifdef _WIN32
  375. iobuffer_format_read(pkt, "44444", &mgr, &epid, &svc_id, &pkt_type, &pkt_id);
  376. #else
  377. iobuffer_format_read(pkt, "84444", &mgr, &epid, &svc_id, &pkt_type, &pkt_id);
  378. #endif //_WIN32
  379. svc = mgr->svc;
  380. rsn = sp_svc_new_runserial(svc);
  381. sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_CALLBACK, &rsn_ctx);
  382. sp_svc_push_runserial_context(svc, &rsn_ctx);
  383. if (mgr->enabled) {
  384. svc = mgr->svc;
  385. nsub = pkt_id;
  386. sub = (int*)iobuffer_data(pkt, 0);
  387. iobuffer_pop_count(pkt, 4*nsub);
  388. iobuffer_format_read(pkt, "4888444444444", &instance_id, &log_id,
  389. &prev_rsn, &curr_rsn, &original_rsn_type, &rsn_depth,
  390. &log_time, &log_type, &log_client_id, &log_epid, &log_severity, &log_sys_error, &log_usr_error);
  391. iobuffer_read(pkt, IOBUF_T_I4, &param_cnt, NULL);
  392. if (param_cnt) {
  393. params = (int*)malloc(sizeof(int)*param_cnt);
  394. for (i = 0; i < param_cnt; ++i) {
  395. iobuffer_read(pkt, IOBUF_T_I4, &params[i], NULL);
  396. }
  397. }
  398. iobuffer_format_read(pkt, "s", &msg);
  399. {
  400. struct list_head cb_list;
  401. sp_log_listener_cb *pos;
  402. u__int64_t *tmp_sub;
  403. char bussinessId[LINKINFO_BUSSID_LEN], traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN];
  404. INIT_LIST_HEAD(&cb_list);
  405. memset(bussinessId, 0, LINKINFO_BUSSID_LEN);
  406. memset(traceId, 0, LINKINFO_TRACEID_LEN);
  407. memset(spanId, 0, LINKINFO_SPANID_LEN);
  408. memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN);
  409. for (i = 0; i < nsub; ++i) {
  410. unsigned int id = (unsigned int)sub[i];
  411. listener_t *listener = listener_find(mgr, id);
  412. if (listener) {
  413. sp_log_listener_cb *cb = listener->cb;
  414. if (list_empty(&cb->__working_list)) {
  415. list_add_tail(&cb->__working_entry, &cb_list);
  416. }
  417. list_add_tail(&listener->working_entry, &cb->__working_list);
  418. }
  419. }
  420. tmp_sub = (u__int64_t*)_alloca(nsub*8);
  421. list_for_each_entry(pos, &cb_list, sp_log_listener_cb, __working_entry) {
  422. listener_t *k_pos;
  423. int cnt = 0;
  424. list_for_each_entry(k_pos, &pos->__working_list, listener_t, working_entry) {
  425. tmp_sub[cnt++] = k_pos->id;
  426. }
  427. iobuffer_get_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);
  428. listener_invoke_cb(mgr, pos, cnt, tmp_sub, log_client_id, log_epid,
  429. instance_id, log_id, prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
  430. log_time, log_type, log_severity, log_sys_error, log_usr_error, param_cnt, params, msg,bussinessId, traceId, spanId, parentSpanId);
  431. INIT_LIST_HEAD(&pos->__working_list);
  432. }
  433. }
  434. FREE(msg);
  435. free(params);
  436. }
  437. if (pkt)
  438. iobuffer_dec_ref(pkt);
  439. sp_log_listener_mgr_dec_ref(mgr);//@
  440. sp_svc_pop_runserial_context(svc);
  441. }
  442. static int listener_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
  443. {
  444. sp_log_listener_mgr_t *mgr = (sp_log_listener_mgr_t*)user_data;
  445. int log_cmd = SP_GET_TYPE(pkt_type);
  446. if (log_cmd == LOG_CMD_LISTEN_RECORD) {
  447. iobuffer_t *pkt = *p_pkt;
  448. *p_pkt = NULL;
  449. #ifdef _WIN32
  450. iobuffer_format_write_head(pkt, "44444", &pkt_id, &pkt_type, &svc_id, &epid, (void*)&mgr);
  451. #else
  452. iobuffer_format_write_head(pkt, "44448", &pkt_id, &pkt_type, &svc_id, &epid, (void*)&mgr);
  453. #endif //_WIN32
  454. sp_log_listener_mgr_inc_ref(mgr);//@
  455. if (threadpool_queue_workitem(sp_svc_get_threadpool(svc), mgr->strand, &listener_on_pkt_threadpool, pkt) != 0) {
  456. sp_log_listener_mgr_dec_ref(mgr);//@
  457. iobuffer_dec_ref(pkt);
  458. }
  459. return FALSE;
  460. }
  461. return TRUE; // continue
  462. }
  463. int sp_log_listener_mgr_create(sp_svc_t *svc, sp_log_on_log on_log, sp_log_listener_mgr_t **p_mgr)
  464. {
  465. sp_log_listener_mgr_t *mgr = MALLOC_T(sp_log_listener_mgr_t);
  466. if (mgr) {
  467. int i;
  468. INIT_LIST_HEAD(&mgr->cb_list);
  469. mgr->enabled = 0;
  470. mgr->strand = strand_create();
  471. mgr->on_log = on_log;
  472. mgr->ref_cnt = 1;
  473. mgr->svc = svc;
  474. for (i = 0; i < INDEX_HASHTABLE_SIZE; ++i) {
  475. INIT_HLIST_HEAD(&mgr->index[i]);
  476. }
  477. *p_mgr = mgr;
  478. return 0;
  479. }
  480. return Error_Resource;
  481. }
  482. int sp_log_listener_mgr_start(sp_log_listener_mgr_t *mgr)
  483. {
  484. mgr->enabled = 1;
  485. sp_svc_add_pkt_handler(mgr->svc, (long)mgr, SP_PKT_LOG, &listener_on_pkt, mgr);
  486. return 0;
  487. }
  488. int sp_log_listener_mgr_stop(sp_log_listener_mgr_t *mgr)
  489. {
  490. mgr->enabled = 0;
  491. sp_svc_remove_pkt_handler(mgr->svc, (long)mgr, SP_PKT_LOG);
  492. return 0;
  493. }
  494. int sp_log_listener_mgr_subscribe(sp_log_listener_mgr_t *mgr, unsigned int *listen_id, void *key, int ignore_msg_body, int log_type, int ent_id, int severity_filter, int sys_code, int user_code)
  495. {
  496. sp_env_t *env = sp_get_env();
  497. int new_id = sp_env_new_id(env);
  498. sp_log_listener_cb *cb = listener_find_cb(mgr, key);
  499. listener_t *listener = NULL;
  500. iobuffer_t *pkt = NULL;
  501. if (!cb) {
  502. cb = sp_log_listener_cb_create(key);
  503. if (!cb)
  504. return Error_Resource;
  505. list_add_tail(&cb->__entry, &mgr->cb_list);
  506. }
  507. listener = MALLOC_T(listener_t);
  508. listener->cb = cb;
  509. listener->id = new_id;
  510. listener->ignore_msg_body = ignore_msg_body;
  511. listener_add_index(mgr, listener);
  512. list_add_tail(&listener->entry, &cb->__list);
  513. pkt = iobuffer_create(-1, -1);
  514. iobuffer_format_write(pkt, "444444", &ignore_msg_body, &log_type, &ent_id, &severity_filter, &sys_code, &user_code);
  515. sp_svc_post(mgr->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_SUBSCRIBE, listener->id, &pkt);
  516. if (pkt)
  517. iobuffer_dec_ref(pkt);
  518. *listen_id = new_id;
  519. return 0;
  520. }
  521. int sp_log_listener_mgr_unsubscribe(sp_log_listener_mgr_t *mgr, unsigned int listen_id)
  522. {
  523. listener_t *listener = listener_find(mgr, listen_id);
  524. if (listener) {
  525. sp_log_listener_cb *cb = listener->cb;
  526. iobuffer_t *pkt = NULL;
  527. listener_del_index(mgr, listener);
  528. list_del(&listener->entry);
  529. if (list_empty(&cb->__list)) {
  530. // list_del(&mgr->cb_list); {bug}
  531. list_del(&cb->__entry);
  532. free(cb);
  533. }
  534. sp_svc_post(mgr->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_UNSUBSCRIBE, listener->id, NULL);
  535. free(listener);
  536. return 0;
  537. }
  538. return Error_NotExist;
  539. }
  540. static void __sp_log_listener_mgr_destroy(sp_log_listener_mgr_t *mgr)
  541. {
  542. strand_destroy(mgr->strand);
  543. free(mgr);
  544. }
  545. IMPLEMENT_REF_COUNT_MT(sp_log_listener_mgr, sp_log_listener_mgr_t, ref_cnt, __sp_log_listener_mgr_destroy)
  546. //
  547. // daemon
  548. //
  /* Forward declarations for the daemon-side bookkeeping types. */
  typedef struct log_filter_key_t log_filter_key_t;
  typedef struct log_listener_t log_listener_t;
  typedef struct log_listener_entity_t log_listener_entity_t;

  /* Filter of one daemon-side listener, plus its cached hash. */
  struct log_filter_key_t {
      int log_type;                 /* Log_Ignore acts as a wildcard */
      int ent_id;                   /* -1 acts as a wildcard */
      int severity_filter;          /* Severity_None acts as a wildcard */
      int sys_code;                 /* Error_IgnoreAll acts as a wildcard */
      int user_code;                /* -1 and -2 have special meanings: -2 means accept all, -1 means reject any */
      unsigned int index_hash_code; /* hash_filter() of the five fields above */
  };
  560. struct log_listener_t {
  561. struct list_head entry; // < log_listener_entity_t . listener_list
  562. int id;
  563. int ignore_msg_body;
  564. log_filter_key_t index_key;
  565. struct hlist_node index_hentry; // < sp_log_daemon_t . arr_filter_index
  566. struct hlist_node working_hentry;
  567. log_listener_entity_t *owner; // = log_listener_entity_t
  568. };
  569. struct log_listener_entity_t {
  570. struct list_head listener_list; // > log_listener_t . entry
  571. int epid;
  572. int svc_id;
  573. };
  574. struct sp_log_daemon_t
  575. {
  576. sp_log_on_log on_log;
  577. sp_log_on_flush on_flush;
  578. sp_log_on_timeout_interval on_timeout_interval;
  579. void *user_data;
  580. sp_svc_t *svc;
  581. CRITICAL_SECTION lock;
  582. iobuffer_queue_t *pkt_queue;
  583. HANDLE pkt_sem;
  584. HANDLE stop_evt;
  585. HANDLE worker_thread;
  586. sp_uid_t last_log_id;
  587. struct hlist_head arr_filter_index[FILTER_HASHTABLE_SIZE]; // log_listener_t . index_hentry
  588. int masks[16];
  589. int masks_cnt;
  590. array_header_t *arr_entity; // > log_listener_entity_t
  591. };
  592. static __inline void daemon_lock(sp_log_daemon_t *daemon)
  593. {
  594. EnterCriticalSection(&daemon->lock);
  595. }
  596. static __inline void daemon_unlock(sp_log_daemon_t *daemon)
  597. {
  598. LeaveCriticalSection(&daemon->lock);
  599. }
  600. static __inline unsigned int hash_filter(int log_type, int ent_id, int severity_filter, int sys_code, int user_code)
  601. {
  602. unsigned int t = (log_type << 16) | (ent_id << 8) | severity_filter;
  603. return jhash_3words(t, sys_code, user_code, 0);
  604. }
  605. static log_listener_entity_t* daemon_find_entity(sp_log_daemon_t *daemon, int ent_id)
  606. {
  607. int i;
  608. for (i = 0; i < daemon->arr_entity->nelts; ++i) {
  609. log_listener_entity_t *listen_ent = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
  610. if (listen_ent->svc_id == ent_id)
  611. return listen_ent;
  612. }
  613. return NULL;
  614. }
  615. static log_listener_t *daemon_find_listener(sp_log_daemon_t *daemon, int listen_id, int *idx)
  616. {
  617. int i;
  618. for (i = 0; i < daemon->arr_entity->nelts; ++i) {
  619. log_listener_entity_t *e = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
  620. log_listener_t *pos;
  621. list_for_each_entry(pos, &e->listener_list, log_listener_t, entry) {
  622. if (pos->id == listen_id) {
  623. if (idx)
  624. *idx = i;
  625. return pos;
  626. }
  627. }
  628. }
  629. return NULL;
  630. }
  631. static __inline int calc_filter_mask(int log_type, int ent_id, int severity_filter, int sys_code)
  632. {
  633. int filter_mask = 0;
  634. if (log_type == Log_Ignore)
  635. filter_mask |= 1 << LOG_FILTER_BIT_LOGTYPE;
  636. if (ent_id == -1)
  637. filter_mask |= 1 << LOG_FILTER_BIT_ENTITY;
  638. if (severity_filter == Severity_None)
  639. filter_mask |= 1 << LOG_FILTER_BIT_SEVERITY;
  640. if (sys_code == Error_IgnoreAll)
  641. filter_mask |= 1 << LOG_FILTER_BIT_SYSCODE;
  642. return filter_mask;
  643. }
  644. #define OP_ADD 1
  645. #define OP_DEL 0
  646. static void update_filter_mask_table(sp_log_daemon_t *daemon, int filter_mask, int op_add)
  647. {
  648. int i;
  649. if (op_add) {
  650. for (i = 0; i < daemon->masks_cnt; ++i)
  651. if ((daemon->masks[i]&0xffff) == filter_mask) {
  652. int cnt = (daemon->masks[i]&0xffff0000) >> 16;
  653. cnt++;
  654. daemon->masks[i] = filter_mask | (cnt << 16);
  655. break;
  656. }
  657. if (i == daemon->masks_cnt)
  658. daemon->masks[daemon->masks_cnt++] = filter_mask | (1<<16);
  659. } else {
  660. for (i = 0; i < daemon->masks_cnt; ++i) {
  661. if ((daemon->masks[i]&0xffff) == filter_mask) {
  662. int cnt = (daemon->masks[i] & 0xffff0000) >> 16;
  663. int filter_mask = daemon->masks[i] & 0xffff;
  664. cnt--;
  665. if (cnt == 0) {
  666. if (i != daemon->masks_cnt-1) {
  667. daemon->masks[i] = daemon->masks[daemon->masks_cnt-1];
  668. }
  669. daemon->masks_cnt--;
  670. } else {
  671. daemon->masks[i] = ((cnt-1) << 16) | filter_mask;
  672. }
  673. break;
  674. }
  675. }
  676. }
  677. }
  678. static __inline void daemon_add_index(sp_log_daemon_t *daemon, log_listener_t *e)
  679. {
  680. int filter_mask = 0;
  681. int slot = e->index_key.index_hash_code % FILTER_HASHTABLE_SIZE;
  682. hlist_add_head(&e->index_hentry, &daemon->arr_filter_index[slot]);
  683. filter_mask = calc_filter_mask(e->index_key.log_type, e->index_key.ent_id, e->index_key.severity_filter, e->index_key.sys_code);
  684. update_filter_mask_table(daemon, filter_mask, OP_ADD);
  685. }
  686. static __inline void daemon_del_index(sp_log_daemon_t *daemon, log_listener_t *e)
  687. {
  688. int filter_mask = 0;
  689. hlist_del(&e->index_hentry);
  690. filter_mask = calc_filter_mask(e->index_key.log_type, e->index_key.ent_id, e->index_key.severity_filter, e->index_key.sys_code);
  691. update_filter_mask_table(daemon, filter_mask, OP_DEL);
  692. }
  693. static int daemon_listener_need_log(sp_log_daemon_t *daemon,
  694. log_listener_t *e, int log_type,
  695. int log_severity,
  696. int log_sys_error,
  697. int log_usr_error,
  698. int log_client_id)
  699. {
  700. log_filter_key_t *key = &e->index_key;
  701. if (key->ent_id == -1 || key->ent_id == log_client_id) {
  702. if (key->log_type == Log_Ignore || key->log_type == log_type) {
  703. if (key->severity_filter == Severity_None || key->severity_filter == log_severity) {
  704. if (key->sys_code == Error_IgnoreAll || key->sys_code == log_sys_error) {
  705. if (key->user_code == -2 || (key->user_code == -1 && log_usr_error) || key->user_code == log_usr_error)
  706. return TRUE;
  707. }
  708. }
  709. }
  710. }
  711. return FALSE;
  712. }
  713. static void daemon_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
  714. {
  715. sp_log_daemon_t *daemon = (sp_log_daemon_t*)user_data;
  716. if (state == BUS_STATE_OFF) {
  717. int i = 0;
  718. daemon_lock(daemon);
  719. for (i = 0; i < daemon->arr_entity->nelts; ++i) {
  720. log_listener_entity_t *listen_ent = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
  721. if (listen_ent->epid == epid) {
  722. log_listener_t *pos, *n;
  723. list_for_each_entry_safe(pos, n, &listen_ent->listener_list, log_listener_t, entry) {
  724. list_del(&pos->entry);
  725. daemon_del_index(daemon, pos);
  726. free(pos);
  727. }
  728. ARRAY_DEL(daemon->arr_entity, i, log_listener_entity_t*);
  729. free(listen_ent);
  730. }
  731. }
  732. daemon_unlock(daemon);
  733. }
  734. }
  735. static int daemon_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
  736. {
  737. sp_log_daemon_t *daemon = (sp_log_daemon_t*)user_data;
  738. int log_cmd = SP_GET_TYPE(pkt_type);
  739. int log_client_id = pkt_id;
  740. if (log_cmd == LOG_CMD_RECORD || log_cmd == LOG_CMD_FLUSH) {
  741. if (log_cmd == LOG_CMD_RECORD)
  742. {
  743. char bussinessId[LINKINFO_BUSSID_LEN];
  744. char traceId[LINKINFO_TRACEID_LEN];
  745. char spanId[LINKINFO_SPANID_LEN];
  746. char parentSpanId[LINKINFO_PARENTSPANID_LEN];
  747. iobuffer_get_linkInfo(*p_pkt, bussinessId, traceId, spanId, parentSpanId);
  748. iobuffer_write_head(*p_pkt, IOBUF_T_BUF, parentSpanId, sizeof(parentSpanId));
  749. iobuffer_write_head(*p_pkt, IOBUF_T_BUF, spanId, sizeof(spanId));
  750. iobuffer_write_head(*p_pkt, IOBUF_T_BUF, traceId, sizeof(traceId));
  751. iobuffer_write_head(*p_pkt, IOBUF_T_BUF, bussinessId, sizeof(bussinessId));
  752. }
  753. iobuffer_write_head(*p_pkt, IOBUF_T_I4, &pkt_id, 0);
  754. iobuffer_write_head(*p_pkt, IOBUF_T_I4, &pkt_type, 0);
  755. iobuffer_write_head(*p_pkt, IOBUF_T_I4, &svc_id, 0);
  756. iobuffer_write_head(*p_pkt, IOBUF_T_I4, &epid, 0);
  757. daemon_lock(daemon);
  758. iobuffer_queue_enqueue(daemon->pkt_queue, *p_pkt);
  759. daemon_unlock(daemon);
  760. ReleaseSemaphore(daemon->pkt_sem, 1, NULL);
  761. *p_pkt = NULL;
  762. } else if (log_cmd == LOG_CMD_SUBSCRIBE) {
  763. int id = pkt_id;
  764. int ignore_msg_body;
  765. int log_type;
  766. int ent_id;
  767. int severity_filter;
  768. int sys_code;
  769. int user_code;
  770. log_listener_entity_t *listen_ent;
  771. iobuffer_format_read(*p_pkt, "444444", &ignore_msg_body, &log_type, &ent_id, &severity_filter, &sys_code, &user_code);
  772. daemon_lock(daemon);
  773. listen_ent = daemon_find_entity(daemon, svc_id);
  774. if (!listen_ent) {
  775. listen_ent = MALLOC_T(log_listener_entity_t);
  776. listen_ent->epid = epid;
  777. listen_ent->svc_id = svc_id;
  778. INIT_LIST_HEAD(&listen_ent->listener_list);
  779. ARRAY_PUSH(daemon->arr_entity, log_listener_entity_t*) = listen_ent;
  780. }
  781. if (daemon_find_listener(daemon, id, NULL) == NULL) {
  782. log_listener_t *listener = MALLOC_T(log_listener_t);
  783. log_filter_key_t *key = &listener->index_key;
  784. listener->owner = listen_ent;
  785. listener->ignore_msg_body = ignore_msg_body;
  786. listener->id = id;
  787. INIT_HLIST_NODE(&listener->working_hentry);
  788. key->ent_id = ent_id;
  789. key->log_type = log_type;
  790. key->severity_filter = severity_filter;
  791. key->sys_code = sys_code;
  792. key->user_code = user_code;
  793. key->index_hash_code = hash_filter(log_type, ent_id, severity_filter, sys_code, user_code);
  794. daemon_add_index(daemon, listener);
  795. list_add_tail(&listener->entry, &listen_ent->listener_list);
  796. }
  797. daemon_unlock(daemon);
  798. } else if (log_cmd == LOG_CMD_UNSUBSCRIBE) {
  799. int i;
  800. int id = pkt_id;
  801. log_listener_t* tmp;
  802. daemon_lock(daemon);
  803. tmp = daemon_find_listener(daemon, id, &i);
  804. if (tmp) {
  805. list_del(&tmp->entry);
  806. if (list_empty(&tmp->owner->listener_list)) {
  807. ARRAY_DEL(daemon->arr_entity, i, log_listener_entity_t*);
  808. free(tmp->owner);
  809. }
  810. daemon_del_index(daemon, tmp);
  811. free(tmp);
  812. }
  813. daemon_unlock(daemon);
  814. } else {
  815. TOOLKIT_ASSERT(0);
  816. }
  817. return TRUE;
  818. }
  819. static int get_log_iobuffer_header_length(iobuffer_t *pkt)
  820. {
  821. //"4888444444444"
  822. int nparam;
  823. int len = 64;
  824. int rs = iobuffer_get_read_state(pkt);
  825. iobuffer_pop_count(pkt, len);
  826. iobuffer_read(pkt, IOBUF_T_I4, &nparam, 0);
  827. len += 4;
  828. len += 4 * nparam;
  829. iobuffer_restore_read_state(pkt, rs);
  830. return len;
  831. }
  832. static void daemon_bcast(sp_log_daemon_t *daemon, int from_client_id, iobuffer_t *pkt)
  833. {
  834. int i, j;
  835. int instance_id;
  836. u__int64_t prev_rsn;
  837. u__int64_t curr_rsn;
  838. int original_rsn_type;
  839. int rsn_depth;
  840. u__int64_t log_id;
  841. unsigned int log_time;
  842. int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
  843. int rs = iobuffer_get_read_state(pkt);
  844. struct {
  845. struct hlist_head ent;
  846. int ignore_msg_body;
  847. }results[SP_MAX_ENTITY];
  848. for (i = 0; i < SP_MAX_ENTITY; ++i) {
  849. INIT_HLIST_HEAD(&results[i].ent);
  850. results[i].ignore_msg_body = 1;
  851. }
  852. iobuffer_format_read(pkt, "4888444444444", &instance_id, &log_id,
  853. &prev_rsn, &curr_rsn, &original_rsn_type, &rsn_depth,
  854. &log_time, &log_type, &log_client_id, &log_epid,
  855. &log_severity, &log_sys_error, &log_usr_error);
  856. iobuffer_restore_read_state(pkt, rs);
  857. daemon_lock(daemon);
  858. for (j = 0; j < 3; ++j)
  859. {
  860. int t_user_code;
  861. if (j == 0)
  862. {
  863. t_user_code = log_usr_error; // strict match
  864. }
  865. else if (j == 1)
  866. {
  867. t_user_code = -2; // accept any
  868. }
  869. else
  870. {
  871. if (log_usr_error) {
  872. t_user_code = -1; // reject ones that has no user code
  873. } else {
  874. continue;
  875. }
  876. }
  877. for (i = 0; i < daemon->masks_cnt; ++i) {
  878. int t = daemon->masks[i] & 0xffff;
  879. int t_log_type = (t & BIT_MASK(LOG_FILTER_BIT_LOGTYPE)) ? Log_Ignore : log_type;
  880. int t_ent_id = (t & BIT_MASK(LOG_FILTER_BIT_ENTITY)) ? -1 : log_client_id;
  881. int t_severity_filter = (t & BIT_MASK(LOG_FILTER_BIT_SEVERITY)) ? Severity_None : log_severity;
  882. int t_sys_code = (t & BIT_MASK(LOG_FILTER_BIT_SYSCODE)) ? Error_IgnoreAll : log_sys_error;
  883. unsigned int index_hash_code = hash_filter(t_log_type, t_ent_id, t_severity_filter, t_sys_code, t_user_code);
  884. log_listener_t *tpos;
  885. struct hlist_node *pos;
  886. hlist_for_each_entry(tpos, pos, &daemon->arr_filter_index[index_hash_code%FILTER_HASHTABLE_SIZE], log_listener_t, index_hentry) {
  887. if (index_hash_code == tpos->index_key.index_hash_code) {
  888. if (daemon_listener_need_log(daemon, tpos, log_type, log_severity, log_sys_error, log_usr_error, from_client_id))
  889. {
  890. if (tpos->working_hentry.pprev == NULL)
  891. {
  892. hlist_add_head(&tpos->working_hentry, &results[tpos->owner->svc_id].ent);
  893. results[tpos->owner->svc_id].ignore_msg_body &= tpos->ignore_msg_body;
  894. }
  895. }
  896. }
  897. }
  898. }
  899. }
  900. for (i = 0; i < SP_MAX_ENTITY; ++i) {
  901. if (!hlist_empty(&results[i].ent)) {
  902. int ignore_msg_body = results[i].ignore_msg_body;
  903. iobuffer_t *copy_pkt = iobuffer_create(-1, -1);
  904. struct hlist_node *pos, *n;
  905. log_listener_t *tpos;
  906. int cnt = 0;
  907. int epid = -1;
  908. int svc_id = -1;
  909. hlist_for_each_entry(tpos, pos, &results[i].ent, log_listener_t, working_hentry) {
  910. cnt++;
  911. if (epid == -1) {
  912. epid = tpos->owner->epid;
  913. svc_id = tpos->owner->svc_id;
  914. }
  915. }
  916. //iobuffer_write(copy_pkt, IOBUF_T_I4, &cnt, 0);
  917. hlist_for_each_entry_safe(tpos, pos, n, &results[i].ent, log_listener_t, working_hentry) {
  918. iobuffer_write(copy_pkt, IOBUF_T_I4, &tpos->id, 0);
  919. hlist_del(pos);
  920. }
  921. // 复制原包内容
  922. if (results[i].ignore_msg_body) {
  923. int msg_body_len = 0;
  924. int dat_len = get_log_iobuffer_header_length(pkt);
  925. void *dat = iobuffer_data(pkt, 0);
  926. iobuffer_write(copy_pkt, IOBUF_T_BUF, dat, dat_len);
  927. iobuffer_write(copy_pkt, IOBUF_T_7BIT, &msg_body_len, 0);
  928. } else {
  929. void *dat = iobuffer_data(pkt, 0);
  930. int dat_len = iobuffer_get_length(pkt);
  931. iobuffer_write(copy_pkt, IOBUF_T_BUF, dat, dat_len);
  932. }
  933. iobuffer_copy_linkInfo(copy_pkt, pkt);
  934. sp_svc_post(daemon->svc, epid, svc_id, SP_PKT_LOG|LOG_CMD_LISTEN_RECORD, cnt, &copy_pkt);
  935. if (copy_pkt)
  936. iobuffer_dec_ref(copy_pkt);
  937. }
  938. }
  939. daemon_unlock(daemon);
  940. }
  941. static unsigned int __stdcall daemon_work_proc(void *param)
  942. {
  943. sp_log_daemon_t *daemon = (sp_log_daemon_t*)param;
  944. HANDLE hs[] = {daemon->pkt_sem, daemon->stop_evt};
  945. sp_env_t *env = sp_get_env();
  946. for (;;) {
  947. DWORD dwRet = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, DAEMON_LOG_TIMEOUT_INTERVAL);
  948. if (dwRet == WAIT_OBJECT_0)
  949. {
  950. int pkt_id, svc_id, pkt_type, epid;
  951. iobuffer_t *pkt;
  952. char bussinessId[LINKINFO_BUSSID_LEN], traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN];
  953. daemon_lock(daemon);
  954. pkt = iobuffer_queue_deque(daemon->pkt_queue);
  955. daemon_unlock(daemon);
  956. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  957. iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
  958. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  959. iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);
  960. if (SP_GET_TYPE(pkt_type) == LOG_CMD_RECORD)
  961. {
  962. int readLen = 0;
  963. memset(bussinessId, 0, LINKINFO_BUSSID_LEN);
  964. memset(traceId, 0, LINKINFO_TRACEID_LEN);
  965. memset(spanId, 0, LINKINFO_SPANID_LEN);
  966. memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN);
  967. readLen = LINKINFO_BUSSID_LEN;
  968. iobuffer_read(pkt, IOBUF_T_BUF, bussinessId, &readLen);
  969. readLen = LINKINFO_TRACEID_LEN;
  970. iobuffer_read(pkt, IOBUF_T_BUF, traceId, &readLen);
  971. readLen = LINKINFO_SPANID_LEN;
  972. iobuffer_read(pkt, IOBUF_T_BUF, spanId, &readLen);
  973. readLen = LINKINFO_PARENTSPANID_LEN;
  974. iobuffer_read(pkt, IOBUF_T_BUF, parentSpanId, &readLen);
  975. iobuffer_set_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);
  976. int instance_id;
  977. u__int64_t log_id;
  978. u__int64_t prev_rsn;
  979. u__int64_t curr_rsn;
  980. int original_rsn_type;
  981. int rsn_depth;
  982. unsigned int log_time;
  983. int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
  984. char *msg = NULL;
  985. int param_cnt;
  986. int *params = NULL;
  987. int i;
  988. iobuffer_read(pkt, IOBUF_T_I4, &instance_id, NULL);
  989. iobuffer_read(pkt, IOBUF_T_I8, &log_id, NULL);
  990. daemon->last_log_id = log_id = sp_uid_update(daemon->last_log_id);
  991. log_id = sp_uid_change_app_id(log_id, 0);
  992. iobuffer_write_head(pkt, IOBUF_T_I8, &log_id, 0);
  993. iobuffer_write_head(pkt, IOBUF_T_I4, &instance_id, 0);
  994. if (svc_id != SP_INVALID_SVC_ID)
  995. daemon_bcast(daemon, svc_id, pkt);
  996. iobuffer_read(pkt, IOBUF_T_I4, &instance_id, NULL);
  997. iobuffer_read(pkt, IOBUF_T_I8, &log_id, NULL);
  998. iobuffer_read(pkt, IOBUF_T_I8, &prev_rsn, NULL);
  999. iobuffer_read(pkt, IOBUF_T_I8, &curr_rsn, NULL);
  1000. iobuffer_read(pkt, IOBUF_T_I4, &original_rsn_type, NULL);
  1001. iobuffer_read(pkt, IOBUF_T_I4, &rsn_depth, NULL);
  1002. iobuffer_read(pkt, IOBUF_T_I4, &log_time, NULL);
  1003. iobuffer_read(pkt, IOBUF_T_I4, &log_type, NULL);
  1004. iobuffer_read(pkt, IOBUF_T_I4, &log_client_id, NULL);
  1005. iobuffer_read(pkt, IOBUF_T_I4, &log_epid, NULL);
  1006. iobuffer_read(pkt, IOBUF_T_I4, &log_severity, NULL);
  1007. iobuffer_read(pkt, IOBUF_T_I4, &log_sys_error, NULL);
  1008. iobuffer_read(pkt, IOBUF_T_I4, &log_usr_error, NULL);
  1009. /** TODO: 移出与实体关联的特定事件!!*/
  1010. // GPIO移动事件不写日志
  1011. if (log_usr_error != 0x2090000A && log_usr_error != 0x20900009)
  1012. {
  1013. iobuffer_read(pkt, IOBUF_T_I4, &param_cnt, NULL);
  1014. if (param_cnt) {
  1015. params = (int*)malloc(sizeof(int)*param_cnt);
  1016. for (i = 0; i < param_cnt; ++i) {
  1017. iobuffer_read(pkt, IOBUF_T_I4, &params[i], NULL);
  1018. }
  1019. }
  1020. iobuffer_format_read(pkt, "s", &msg);
  1021. daemon->on_log(daemon, 0, 0, log_client_id, log_epid, instance_id,
  1022. log_id, prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
  1023. log_time, log_type, log_severity, log_sys_error,
  1024. log_usr_error, param_cnt, params, msg, daemon->user_data, bussinessId, traceId, spanId, parentSpanId);
  1025. FREE(msg);
  1026. free(params);
  1027. }
  1028. }
  1029. else if (SP_GET_TYPE(pkt_type) == LOG_CMD_FLUSH)
  1030. {
  1031. SYSTEMTIME st;
  1032. GetLocalTime(&st);
  1033. daemon->on_flush(daemon, pkt_id, &st, daemon->user_data);
  1034. }
  1035. else
  1036. {
  1037. TOOLKIT_ASSERT(0);
  1038. }
  1039. iobuffer_dec_ref(pkt);
  1040. }
  1041. else if (dwRet == WAIT_OBJECT_0+1) {
  1042. break; // stop
  1043. }
  1044. else if (dwRet == WAIT_TIMEOUT) {
  1045. SYSTEMTIME st;
  1046. GetLocalTime(&st);
  1047. daemon->on_timeout_interval(daemon, &st, daemon->user_data);
  1048. }
  1049. }
  1050. return 0;
  1051. }
  1052. int sp_log_daemon_create(sp_log_on_log on_log,
  1053. sp_log_on_flush on_flush,
  1054. sp_log_on_timeout_interval on_timeout_interval,
  1055. void *user_data,
  1056. sp_svc_t *svc,
  1057. sp_log_daemon_t **p_daemon)
  1058. {
  1059. sp_log_daemon_t *daemon = ZALLOC_T(sp_log_daemon_t);
  1060. sp_env_t *env = sp_get_env();
  1061. int i;
  1062. daemon->on_log = on_log;
  1063. daemon->on_flush = on_flush;
  1064. daemon->on_timeout_interval = on_timeout_interval;
  1065. daemon->user_data = user_data;
  1066. daemon->svc = svc;
  1067. daemon->pkt_queue = iobuffer_queue_create();
  1068. daemon->arr_entity = array_make(0, sizeof(log_listener_entity_t*));
  1069. InitializeCriticalSection(&daemon->lock);
  1070. daemon->stop_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  1071. daemon->pkt_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
  1072. daemon->last_log_id = sp_uid_make(0);
  1073. daemon->masks_cnt = 0;
  1074. for (i = 0; i < FILTER_HASHTABLE_SIZE; ++i) {
  1075. INIT_HLIST_HEAD(&daemon->arr_filter_index[i]);
  1076. }
  1077. daemon->worker_thread = (HANDLE)_beginthreadex(NULL, 0, &daemon_work_proc, daemon, 0, NULL);
  1078. if (!daemon->worker_thread) {
  1079. sp_log_daemon_destroy(daemon);
  1080. return Error_Resource;
  1081. }
  1082. sp_svc_add_pkt_handler(svc, (long)daemon, SP_PKT_LOG, &daemon_on_pkt, daemon);
  1083. sp_svc_add_sys_handler(svc, (long)daemon, &daemon_on_sys, daemon);
  1084. *p_daemon = daemon;
  1085. return 0;
  1086. }
  1087. int sp_log_daemon_destroy(sp_log_daemon_t *daemon)
  1088. {
  1089. int i;
  1090. if (daemon->worker_thread) {
  1091. ///**TODO(Gifur@5/6/2022): *’ to ‘int’ loses precision [-fpermissive] */
  1092. sp_svc_remove_pkt_handler(daemon->svc, (long)daemon, SP_PKT_LOG);
  1093. sp_svc_remove_sys_handler(daemon->svc, (long)daemon);
  1094. SetEvent(daemon->stop_evt);
  1095. WaitForSingleObject(daemon->worker_thread, INFINITE);
  1096. CloseHandle(daemon->worker_thread);
  1097. }
  1098. CloseHandle(daemon->stop_evt);
  1099. for (i = 0; i < daemon->arr_entity->nelts; ++i) {
  1100. log_listener_entity_t *tmp = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
  1101. log_listener_t *pos, *n;
  1102. list_for_each_entry_safe(pos, n, &tmp->listener_list, log_listener_t, entry) {
  1103. list_del(&pos->entry);
  1104. free(pos);
  1105. }
  1106. free(tmp);
  1107. }
  1108. array_free(daemon->arr_entity);
  1109. if (daemon->pkt_sem) {
  1110. CloseHandle(daemon->pkt_sem);
  1111. }
  1112. iobuffer_queue_destroy(daemon->pkt_queue);
  1113. DeleteCriticalSection(&daemon->lock);
  1114. free(daemon);
  1115. return 0;
  1116. }