12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236 |
- #include "precompile.h"
- #include <stdarg.h>
- #include "sp_log.h"
- #include "sp_def.h"
- #include "sp_svc.h"
- #include "sp_env.h"
- #include "sp_uid.h"
- #include "sp_rsn.h"
- #include "SpBase.h"
- #include "memutil.h"
- #include "list.h"
- #include "array.h"
- #include "hashset.h"
- #include "jhash.h"
- #include "refcnt.h"
- #include "y2k_time.h"
- #include "dbgutil.h"
- #include <winpr/synch.h>
- #include <winpr/sysinfo.h>
- #include <winpr/thread.h>
/*
 * Log sub-commands. Senders OR one of these into SP_PKT_LOG when posting;
 * packet handlers recover it with SP_GET_TYPE().
 */
#define LOG_CMD_RECORD              0x01
#define LOG_CMD_FLUSH               0x02
#define LOG_CMD_SUBSCRIBE           0x03
#define LOG_CMD_UNSUBSCRIBE         0x04
#define LOG_CMD_LISTEN_RECORD       0x05

#define DAEMON_LOG_TIMEOUT_INTERVAL 5

/*
 * Bit positions inside a listener filter mask. A set bit means the
 * corresponding filter field is a wildcard (see calc_filter_mask()).
 */
#define LOG_FILTER_BIT_LOGTYPE      0
#define LOG_FILTER_BIT_ENTITY       1
#define LOG_FILTER_BIT_SEVERITY     2
#define LOG_FILTER_BIT_SYSCODE      3

/* Single-bit mask for bit position `bit`. */
#define BIT_MASK(bit) (1 << (bit))

/* Bucket count of the daemon's filter hash table. */
#define FILTER_HASHTABLE_SIZE       1023
- struct sp_log_client_t // Client-side handle used to post log records and flush requests to the shell service.
- {
- int local_svc_id; // sp_svc_get_id(svc), or SP_INVALID_SVC_ID when the client was created without a service
- sp_svc_t *svc; // service the client logs for; NULL for an anonymous client
- sp_iom_t *iom; // I/O manager that packets are posted through
- };
- // svc == null, iom cannot be null, anonymous
- // svc != null, iom can be null
- int sp_log_client_create(sp_svc_t *svc, sp_iom_t *iom, sp_log_client_t **p_client)
- {
- sp_log_client_t *client = MALLOC_T(sp_log_client_t);
- if (client) {
- if (svc) {
- if (!iom) {
- iom = sp_svc_get_iom(svc);
- }
- }
- client->svc = svc;
- client->iom = iom;
- if (svc) {
- client->local_svc_id = sp_svc_get_id(svc);
- } else {
- client->local_svc_id = SP_INVALID_SVC_ID;
- }
- *p_client = client;
- return 0;
- } else {
- return -1;
- };
- }
- int sp_log_client_destroy(sp_log_client_t *client)
- {
- free(client);
- return 0;
- }
- // TODO: param size type [4/2/2020 12:50 Gifur]
- /*
- int sp_log_client_log(sp_log_client_t *client, int type, int severity, int sys_error, int usr_error, int param_cnt, int *params, const char *format, ...)
- {
- va_list arg;
- va_start(arg, format);
- return sp_log_client_logv(client, type, severity, sys_error, usr_error, param_cnt, params, format, arg);
- }
- */
- //对比sp_log_client_logv, 去除了可变参数,当前已经完全替代了sp_log_client_log和sp_log_client_logv
- int sp_log_client_logEx(sp_log_client_t* client, int type, int severity, int sys_error, int usr_error, int param_cnt, int* params, const char* msg, int len)
- {
- int i, n;
- int rc;
- iobuffer_t* info_pkt;
- sp_env_t* env = sp_get_env();
- sp_iom_t* iom = client->iom;
- int local_epid = sp_iom_get_epid(iom);
- u__int64_t log_id = 0;
- y2k_time_t log_time;
- sp_rsn_context_t* rsn_ctx;
- sp_entity_t* ent;
- n = len;
- if (n < 0 || n > SP_LOG_MAX_LEN) {
- DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log len is not right, %d, only accept max len: %d", n, SP_LOG_MAX_LEN);
- return Error_Param;
- }
- ent = sp_mod_mgr_find_entity_by_idx(env->mod_mgr, client->local_svc_id);
- if (!ent) {
- DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log sp_mod_mgr_find_entity_by_idx failed");
- return Error_Param;
- }
- log_time = y2k_time_now();
- rsn_ctx = client->svc ? sp_svc_get_runserial_context(client->svc) : NULL;
- info_pkt = iobuffer_create(-1, n + 112);
- iobuffer_write(info_pkt, IOBUF_T_I4, &ent->instance_id, 0);
- iobuffer_write(info_pkt, IOBUF_T_I8, &log_id, 0);
- if (rsn_ctx) {
- iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
- } else {
- u__int64_t invalid_rsn = 0;
- int t = 0;
- iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
- }
- iobuffer_write(info_pkt, IOBUF_T_I4, &log_time, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &type, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &client->local_svc_id, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &local_epid, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &severity, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &sys_error, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &usr_error, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, ¶m_cnt, 0);
- for (i = 0; i < param_cnt; ++i) {
- iobuffer_write(info_pkt, IOBUF_T_I4, ¶ms[i], 0);
- }
- iobuffer_write(info_pkt, IOBUF_T_7BIT, &n, 0);
- if (n > 0) {
- memcpy(iobuffer_data(info_pkt, -1), msg, n);
- iobuffer_push_count(info_pkt, n);
- }
- rc = sp_iom_post(client->iom, client->local_svc_id,
- SP_SHELL_MOD_ID, SP_SHELL_SVC_ID,
- SP_PKT_LOG | LOG_CMD_RECORD, 0, &info_pkt);
- if (info_pkt)
- iobuffer_dec_ref(info_pkt);
- return rc;
- }
- // TODO: param size type [4/2/2020 12:50 Gifur]
- int sp_log_client_logWithLink(sp_log_client_t* client, int type, int severity, int sys_error, int usr_error, int param_cnt, int* params, const char* msg, int len
- , const char* bussId, const char* traceId, const char* spanId, const char* parentSpanId)
- {
- int i, n;
- int rc;
- iobuffer_t *info_pkt;
- sp_env_t *env = sp_get_env();
- sp_iom_t *iom = client->iom;
- int local_epid = sp_iom_get_epid(iom);
- u__int64_t log_id = 0;
- y2k_time_t log_time;
- sp_rsn_context_t *rsn_ctx;
- sp_entity_t *ent;
- n = len;
- if (n < 0 || n > SP_LOG_MAX_LEN)
- {
- DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log len is not right, %d, only accept max len: %d", n, SP_LOG_MAX_LEN);
- return Error_Param;
- }
- ent = sp_mod_mgr_find_entity_by_idx(env->mod_mgr, client->local_svc_id);
- if (!ent) {
- DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log sp_mod_mgr_find_entity_by_idx failed");
- return Error_Param;
- }
- log_time = y2k_time_now();
- rsn_ctx = client->svc ? sp_svc_get_runserial_context(client->svc) : NULL;
- info_pkt = iobuffer_create(-1, n + 112);
- if (NULL == bussId || NULL == traceId || NULL == spanId || NULL == parentSpanId || 0 == strlen(bussId) || 0 == strlen(traceId))
- {
- linkContext tmp;
- tmp.AutoGenerate();
- iobuffer_set_linkInfo(info_pkt, tmp.bussinessId.GetData(), tmp.traceId.GetData(), tmp.spanId.GetData(), tmp.parentSpanId.GetData());
- }
- else
- iobuffer_set_linkInfo(info_pkt, bussId, traceId, spanId, parentSpanId);
- iobuffer_write(info_pkt, IOBUF_T_I4, &ent->instance_id, 0);
- iobuffer_write(info_pkt, IOBUF_T_I8, &log_id, 0);
- if (rsn_ctx) {
- iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
- } else {
- u__int64_t invalid_rsn = 0;
- int t = 0;
- iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
- }
- iobuffer_write(info_pkt, IOBUF_T_I4, &log_time, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &type, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &client->local_svc_id, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &local_epid, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &severity, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &sys_error, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, &usr_error, 0);
- iobuffer_write(info_pkt, IOBUF_T_I4, ¶m_cnt, 0);
- for (i = 0; i < param_cnt; ++i) {
- iobuffer_write(info_pkt, IOBUF_T_I4, ¶ms[i], 0);
- }
- iobuffer_write(info_pkt, IOBUF_T_7BIT, &n, 0);
- if (n > 0) {
- memcpy(iobuffer_data(info_pkt, -1), msg, n);
- iobuffer_push_count(info_pkt, n);
- }
- rc = sp_iom_post(client->iom, client->local_svc_id,
- SP_SHELL_MOD_ID, SP_SHELL_SVC_ID,
- SP_PKT_LOG|LOG_CMD_RECORD, 0, &info_pkt);
- if (info_pkt)
- iobuffer_dec_ref(info_pkt);
- return rc;
- }
- int sp_log_client_flush(sp_log_client_t *client)
- {
- int rc;
- rc = sp_iom_post(client->iom, client->local_svc_id, SP_SHELL_MOD_ID,
- SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_FLUSH, 0, NULL);
- return rc;
- }
- //
- // listener
- //
- #define INDEX_HASHTABLE_SIZE 1023 // bucket count of sp_log_listener_mgr_t.index
- typedef struct sp_log_listener_cb // Groups all subscriptions that share one callback key; one per SpEntity.
- {
- struct list_head __entry; // node in sp_log_listener_mgr_t.cb_list
- struct list_head __working_entry; // node in the temporary cb list built while one packet is dispatched
- struct list_head __list; // head of this cb's listener_t.entry nodes
- struct list_head __working_list; // head of listener_t.working_entry nodes matched by the packet being dispatched
- void *key; // opaque caller key; compared by pointer and handed back to on_log
- int enable; // set to 1 on creation; not read in the code visible here
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // starts at 1
- }sp_log_listener_cb;
- static sp_log_listener_cb* sp_log_listener_cb_create(void *key)
- {
- sp_log_listener_cb *cb = MALLOC_T(sp_log_listener_cb);
- cb->ref_cnt = 1;
- cb->enable = 1;
- cb->key = key;
- INIT_LIST_HEAD(&cb->__working_list);
- INIT_LIST_HEAD(&cb->__list);
- return cb;
- }
- static __inline void __sp_log_listener_cb_destroy(sp_log_listener_cb *cb){free(cb);}
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_log_listener_cb, sp_log_listener_cb, ref_cnt, __sp_log_listener_cb_destroy)
- typedef struct listener_t // One subscription; created by each Subscribe call.
- {
- struct hlist_node hentry; // node in sp_log_listener_mgr_t.index[id % INDEX_HASHTABLE_SIZE]
- struct list_head entry; // node in sp_log_listener_cb.__list
- struct list_head working_entry; // node in sp_log_listener_cb.__working_list while a packet is dispatched
- unsigned int id; // subscription id, taken from sp_env_new_id()
- int ignore_msg_body; // copied from the subscribe request
- sp_log_listener_cb *cb; // callback group this subscription belongs to
- }listener_t;
- struct sp_log_listener_mgr_t // Per-service registry of log subscriptions and their callback groups.
- {
- struct list_head cb_list; // head of sp_log_listener_cb.__entry nodes
- struct hlist_head index[INDEX_HASHTABLE_SIZE]; // subscription id -> listener_t, chained through listener_t.hentry
- sp_svc_t *svc; // service used to post subscribe/unsubscribe packets and to register the packet handler
- sp_log_on_log on_log; // callback invoked once per callback group for every delivered record
- strand_t *strand; // strand passed to threadpool_queue_workitem() for dispatch work items
- int enabled; // 1 between start and stop; packets are dropped while 0
- DECLARE_REF_COUNT_MEMBER(ref_cnt); // starts at 1; raised while a dispatch work item is queued
- };
- static listener_t *listener_find(sp_log_listener_mgr_t *mgr, unsigned int listen_id)
- {
- unsigned slot = listen_id % INDEX_HASHTABLE_SIZE;
- listener_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->index[slot], listener_t, hentry) {
- if (listen_id == tpos->id) {
- return tpos;
- }
- }
- return NULL;
- }
- static __inline void listener_add_index(sp_log_listener_mgr_t *mgr, listener_t *listener)
- {
- int slot = listener->id % INDEX_HASHTABLE_SIZE;
- hlist_add_head(&listener->hentry, &mgr->index[slot]);
- }
- static __inline void listener_del_index(sp_log_listener_mgr_t *mgr, listener_t *listener)
- {
- int slot = listener->id % INDEX_HASHTABLE_SIZE;
- listener_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->index[slot], listener_t, hentry) {
- if (tpos->id == listener->id) {
- hlist_del(pos);
- return;
- }
- }
- }
- static sp_log_listener_cb *listener_find_cb(sp_log_listener_mgr_t *mgr, void *key)
- {
- sp_log_listener_cb *pos;
- TOOLKIT_ASSERT(key);
- list_for_each_entry(pos, &mgr->cb_list, sp_log_listener_cb, __entry) {
- if (pos->key == key)
- return pos;
- }
- return NULL;
- }
- static __inline void listener_invoke_cb(sp_log_listener_mgr_t *mgr, sp_log_listener_cb *cb,
- int nsub,
- u__int64_t *sub,
- int client_id,
- int log_epid,
- int client_instance_id,
- u__int64_t log_id,
- u__int64_t prev_rsn,
- u__int64_t curr_rsn,
- int original_rsn_type,
- int rsn_depth,
- unsigned int log_time,
- int log_type,
- int log_severity,
- int log_sys_error,
- int log_usr_error,
- int param_cnt,
- int *params,
- const char *msg,
- const char* bussId,
- const char* traceId,
- const char* spanId,
- const char* parentSpanId)
- {
- mgr->on_log(mgr,
- nsub,
- sub,
- client_id,
- log_epid,
- client_instance_id,
- log_id,
- prev_rsn,
- curr_rsn,
- original_rsn_type,
- rsn_depth,
- log_time,
- log_type,
- log_severity,
- log_sys_error,
- log_usr_error,
- param_cnt,
- params,
- msg,
- cb->key, bussId, traceId, spanId, parentSpanId);
- }
- static void listener_on_pkt_threadpool(threadpool_t *threadpool, void *arg)
- {
- iobuffer_t *pkt = (iobuffer_t*)arg;
- sp_log_listener_mgr_t *mgr = NULL;
- int epid, pkt_type, pkt_id, svc_id;
- sp_uid_t rsn;
- sp_rsn_context_t rsn_ctx;
- sp_svc_t *svc = NULL;
- int nsub = 0;
- int *sub;
- int instance_id;
- u__int64_t log_id;
- u__int64_t prev_rsn;
- u__int64_t curr_rsn;
- int original_rsn_type;
- int rsn_depth;
- unsigned int log_time;
- int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
- char *msg = NULL;
- int param_cnt;
- int *params = NULL;
- int i;
- //TOOLKIT_ASSERT(nsub);
- #ifdef _WIN32
- iobuffer_format_read(pkt, "44444", &mgr, &epid, &svc_id, &pkt_type, &pkt_id);
- #else
- iobuffer_format_read(pkt, "84444", &mgr, &epid, &svc_id, &pkt_type, &pkt_id);
- #endif //_WIN32
- svc = mgr->svc;
- rsn = sp_svc_new_runserial(svc);
- sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_CALLBACK, &rsn_ctx);
- sp_svc_push_runserial_context(svc, &rsn_ctx);
- if (mgr->enabled) {
- svc = mgr->svc;
- nsub = pkt_id;
- sub = (int*)iobuffer_data(pkt, 0);
- iobuffer_pop_count(pkt, 4*nsub);
- iobuffer_format_read(pkt, "4888444444444", &instance_id, &log_id,
- &prev_rsn, &curr_rsn, &original_rsn_type, &rsn_depth,
- &log_time, &log_type, &log_client_id, &log_epid, &log_severity, &log_sys_error, &log_usr_error);
- iobuffer_read(pkt, IOBUF_T_I4, ¶m_cnt, NULL);
- if (param_cnt) {
- params = (int*)malloc(sizeof(int)*param_cnt);
- for (i = 0; i < param_cnt; ++i) {
- iobuffer_read(pkt, IOBUF_T_I4, ¶ms[i], NULL);
- }
- }
- iobuffer_format_read(pkt, "s", &msg);
- {
- struct list_head cb_list;
- sp_log_listener_cb *pos;
- u__int64_t *tmp_sub;
- char bussinessId[LINKINFO_BUSSID_LEN], traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN];
- INIT_LIST_HEAD(&cb_list);
- memset(bussinessId, 0, LINKINFO_BUSSID_LEN);
- memset(traceId, 0, LINKINFO_TRACEID_LEN);
- memset(spanId, 0, LINKINFO_SPANID_LEN);
- memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN);
- for (i = 0; i < nsub; ++i) {
- unsigned int id = (unsigned int)sub[i];
- listener_t *listener = listener_find(mgr, id);
- if (listener) {
- sp_log_listener_cb *cb = listener->cb;
- if (list_empty(&cb->__working_list)) {
- list_add_tail(&cb->__working_entry, &cb_list);
- }
- list_add_tail(&listener->working_entry, &cb->__working_list);
- }
- }
- tmp_sub = (u__int64_t*)_alloca(nsub*8);
- list_for_each_entry(pos, &cb_list, sp_log_listener_cb, __working_entry) {
- listener_t *k_pos;
- int cnt = 0;
- list_for_each_entry(k_pos, &pos->__working_list, listener_t, working_entry) {
- tmp_sub[cnt++] = k_pos->id;
- }
- iobuffer_get_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);
- listener_invoke_cb(mgr, pos, cnt, tmp_sub, log_client_id, log_epid,
- instance_id, log_id, prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
- log_time, log_type, log_severity, log_sys_error, log_usr_error, param_cnt, params, msg,bussinessId, traceId, spanId, parentSpanId);
- INIT_LIST_HEAD(&pos->__working_list);
- }
- }
- FREE(msg);
- free(params);
- }
- if (pkt)
- iobuffer_dec_ref(pkt);
- sp_log_listener_mgr_dec_ref(mgr);//@
- sp_svc_pop_runserial_context(svc);
- }
- static int listener_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_log_listener_mgr_t *mgr = (sp_log_listener_mgr_t*)user_data;
- int log_cmd = SP_GET_TYPE(pkt_type);
- if (log_cmd == LOG_CMD_LISTEN_RECORD) {
- iobuffer_t *pkt = *p_pkt;
- *p_pkt = NULL;
- #ifdef _WIN32
- iobuffer_format_write_head(pkt, "44444", &pkt_id, &pkt_type, &svc_id, &epid, (void*)&mgr);
- #else
- iobuffer_format_write_head(pkt, "44448", &pkt_id, &pkt_type, &svc_id, &epid, (void*)&mgr);
- #endif //_WIN32
- sp_log_listener_mgr_inc_ref(mgr);//@
- if (threadpool_queue_workitem(sp_svc_get_threadpool(svc), mgr->strand, &listener_on_pkt_threadpool, pkt) != 0) {
- sp_log_listener_mgr_dec_ref(mgr);//@
- iobuffer_dec_ref(pkt);
- }
- return FALSE;
- }
- return TRUE; // continue
- }
- int sp_log_listener_mgr_create(sp_svc_t *svc, sp_log_on_log on_log, sp_log_listener_mgr_t **p_mgr)
- {
- sp_log_listener_mgr_t *mgr = MALLOC_T(sp_log_listener_mgr_t);
- if (mgr) {
- int i;
- INIT_LIST_HEAD(&mgr->cb_list);
- mgr->enabled = 0;
- mgr->strand = strand_create();
- mgr->on_log = on_log;
- mgr->ref_cnt = 1;
- mgr->svc = svc;
- for (i = 0; i < INDEX_HASHTABLE_SIZE; ++i) {
- INIT_HLIST_HEAD(&mgr->index[i]);
- }
- *p_mgr = mgr;
- return 0;
- }
- return Error_Resource;
- }
- int sp_log_listener_mgr_start(sp_log_listener_mgr_t *mgr)
- {
- mgr->enabled = 1;
- sp_svc_add_pkt_handler(mgr->svc, (long)mgr, SP_PKT_LOG, &listener_on_pkt, mgr);
- return 0;
- }
- int sp_log_listener_mgr_stop(sp_log_listener_mgr_t *mgr)
- {
- mgr->enabled = 0;
- sp_svc_remove_pkt_handler(mgr->svc, (long)mgr, SP_PKT_LOG);
- return 0;
- }
- int sp_log_listener_mgr_subscribe(sp_log_listener_mgr_t *mgr, unsigned int *listen_id, void *key, int ignore_msg_body, int log_type, int ent_id, int severity_filter, int sys_code, int user_code)
- {
- sp_env_t *env = sp_get_env();
- int new_id = sp_env_new_id(env);
- sp_log_listener_cb *cb = listener_find_cb(mgr, key);
- listener_t *listener = NULL;
- iobuffer_t *pkt = NULL;
- if (!cb) {
- cb = sp_log_listener_cb_create(key);
- if (!cb)
- return Error_Resource;
- list_add_tail(&cb->__entry, &mgr->cb_list);
- }
- listener = MALLOC_T(listener_t);
- listener->cb = cb;
- listener->id = new_id;
- listener->ignore_msg_body = ignore_msg_body;
- listener_add_index(mgr, listener);
- list_add_tail(&listener->entry, &cb->__list);
- pkt = iobuffer_create(-1, -1);
- iobuffer_format_write(pkt, "444444", &ignore_msg_body, &log_type, &ent_id, &severity_filter, &sys_code, &user_code);
- sp_svc_post(mgr->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_SUBSCRIBE, listener->id, &pkt);
- if (pkt)
- iobuffer_dec_ref(pkt);
- *listen_id = new_id;
- return 0;
- }
- int sp_log_listener_mgr_unsubscribe(sp_log_listener_mgr_t *mgr, unsigned int listen_id)
- {
- listener_t *listener = listener_find(mgr, listen_id);
- if (listener) {
- sp_log_listener_cb *cb = listener->cb;
- iobuffer_t *pkt = NULL;
- listener_del_index(mgr, listener);
- list_del(&listener->entry);
- if (list_empty(&cb->__list)) {
- // list_del(&mgr->cb_list); {bug}
- list_del(&cb->__entry);
- free(cb);
- }
- sp_svc_post(mgr->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_UNSUBSCRIBE, listener->id, NULL);
- free(listener);
- return 0;
- }
- return Error_NotExist;
- }
- static void __sp_log_listener_mgr_destroy(sp_log_listener_mgr_t *mgr)
- {
- strand_destroy(mgr->strand);
- free(mgr);
- }
- IMPLEMENT_REF_COUNT_MT(sp_log_listener_mgr, sp_log_listener_mgr_t, ref_cnt, __sp_log_listener_mgr_destroy)
- //
- // daemon
- //
- typedef struct log_filter_key_t log_filter_key_t;
- typedef struct log_listener_t log_listener_t;
- typedef struct log_listener_entity_t log_listener_entity_t;
- struct log_filter_key_t { // Filter of one daemon-side subscription, plus its cached hash.
- int log_type; // Log_Ignore matches any log type
- int ent_id; // -1 matches any entity
- int severity_filter; // Severity_None matches any severity
- int sys_code; // Error_IgnoreAll matches any system code
- int user_code; // -2 accepts every record; -1 accepts only records with a non-zero user code; other values must match exactly
- unsigned int index_hash_code; // hash_filter() of the five fields above
- };
- struct log_listener_t { // One subscription as tracked by the log daemon.
- struct list_head entry; // node in log_listener_entity_t.listener_list
- int id; // subscription id, taken from the subscribe packet's pkt_id
- int ignore_msg_body; // copied from the subscribe request
- log_filter_key_t index_key; // filter fields and their hash
- struct hlist_node index_hentry; // node in sp_log_daemon_t.arr_filter_index
- struct hlist_node working_hentry; // node in a per-broadcast result list; pprev == NULL means not currently queued
- log_listener_entity_t *owner; // entity that registered this subscription
- };
- struct log_listener_entity_t { // A subscribing service and all of its subscriptions.
- struct list_head listener_list; // head of log_listener_t.entry nodes
- int epid; // endpoint id the subscribe packet came from; used for cleanup on BUS_STATE_OFF
- int svc_id; // service id the subscribe packet came from; lookup key in daemon_find_entity()
- };
- struct sp_log_daemon_t // State of the log daemon: packet queue, subscriptions and filter index.
- {
- sp_log_on_log on_log; // NOTE(review): callbacks and user_data are not used in the code visible here — presumably invoked by the worker thread; confirm
- sp_log_on_flush on_flush;
- sp_log_on_timeout_interval on_timeout_interval;
- void *user_data;
- sp_svc_t *svc;
- CRITICAL_SECTION lock; // taken via daemon_lock()/daemon_unlock(); guards pkt_queue and the subscription tables
- iobuffer_queue_t *pkt_queue; // RECORD/FLUSH packets enqueued by daemon_on_pkt()
- HANDLE pkt_sem; // released once for every packet put on pkt_queue
- HANDLE stop_evt; // NOTE(review): presumably signals the worker to exit — confirm
- HANDLE worker_thread;
- sp_uid_t last_log_id;
- struct hlist_head arr_filter_index[FILTER_HASHTABLE_SIZE]; // buckets of log_listener_t.index_hentry, keyed by index_hash_code % FILTER_HASHTABLE_SIZE
- int masks[16]; // distinct filter masks in use: low 16 bits = mask, high 16 bits = number of listeners using it
- int masks_cnt; // number of valid entries in masks
- array_header_t *arr_entity; // array of log_listener_entity_t*
- };
- static __inline void daemon_lock(sp_log_daemon_t *daemon)
- {
- EnterCriticalSection(&daemon->lock);
- }
- static __inline void daemon_unlock(sp_log_daemon_t *daemon)
- {
- LeaveCriticalSection(&daemon->lock);
- }
- static __inline unsigned int hash_filter(int log_type, int ent_id, int severity_filter, int sys_code, int user_code)
- {
- unsigned int t = (log_type << 16) | (ent_id << 8) | severity_filter;
- return jhash_3words(t, sys_code, user_code, 0);
- }
- static log_listener_entity_t* daemon_find_entity(sp_log_daemon_t *daemon, int ent_id)
- {
- int i;
- for (i = 0; i < daemon->arr_entity->nelts; ++i) {
- log_listener_entity_t *listen_ent = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
- if (listen_ent->svc_id == ent_id)
- return listen_ent;
- }
- return NULL;
- }
- static log_listener_t *daemon_find_listener(sp_log_daemon_t *daemon, int listen_id, int *idx)
- {
- int i;
- for (i = 0; i < daemon->arr_entity->nelts; ++i) {
- log_listener_entity_t *e = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
- log_listener_t *pos;
- list_for_each_entry(pos, &e->listener_list, log_listener_t, entry) {
- if (pos->id == listen_id) {
- if (idx)
- *idx = i;
- return pos;
- }
- }
- }
- return NULL;
- }
- static __inline int calc_filter_mask(int log_type, int ent_id, int severity_filter, int sys_code)
- {
- int filter_mask = 0;
- if (log_type == Log_Ignore)
- filter_mask |= 1 << LOG_FILTER_BIT_LOGTYPE;
- if (ent_id == -1)
- filter_mask |= 1 << LOG_FILTER_BIT_ENTITY;
- if (severity_filter == Severity_None)
- filter_mask |= 1 << LOG_FILTER_BIT_SEVERITY;
- if (sys_code == Error_IgnoreAll)
- filter_mask |= 1 << LOG_FILTER_BIT_SYSCODE;
- return filter_mask;
- }
- #define OP_ADD 1
- #define OP_DEL 0
- static void update_filter_mask_table(sp_log_daemon_t *daemon, int filter_mask, int op_add)
- {
- int i;
- if (op_add) {
- for (i = 0; i < daemon->masks_cnt; ++i)
- if ((daemon->masks[i]&0xffff) == filter_mask) {
- int cnt = (daemon->masks[i]&0xffff0000) >> 16;
- cnt++;
- daemon->masks[i] = filter_mask | (cnt << 16);
- break;
- }
- if (i == daemon->masks_cnt)
- daemon->masks[daemon->masks_cnt++] = filter_mask | (1<<16);
- } else {
- for (i = 0; i < daemon->masks_cnt; ++i) {
- if ((daemon->masks[i]&0xffff) == filter_mask) {
- int cnt = (daemon->masks[i] & 0xffff0000) >> 16;
- int filter_mask = daemon->masks[i] & 0xffff;
- cnt--;
- if (cnt == 0) {
- if (i != daemon->masks_cnt-1) {
- daemon->masks[i] = daemon->masks[daemon->masks_cnt-1];
- }
- daemon->masks_cnt--;
- } else {
- daemon->masks[i] = ((cnt-1) << 16) | filter_mask;
- }
- break;
- }
- }
- }
- }
- static __inline void daemon_add_index(sp_log_daemon_t *daemon, log_listener_t *e)
- {
- int filter_mask = 0;
- int slot = e->index_key.index_hash_code % FILTER_HASHTABLE_SIZE;
- hlist_add_head(&e->index_hentry, &daemon->arr_filter_index[slot]);
- filter_mask = calc_filter_mask(e->index_key.log_type, e->index_key.ent_id, e->index_key.severity_filter, e->index_key.sys_code);
- update_filter_mask_table(daemon, filter_mask, OP_ADD);
- }
- static __inline void daemon_del_index(sp_log_daemon_t *daemon, log_listener_t *e)
- {
- int filter_mask = 0;
- hlist_del(&e->index_hentry);
- filter_mask = calc_filter_mask(e->index_key.log_type, e->index_key.ent_id, e->index_key.severity_filter, e->index_key.sys_code);
- update_filter_mask_table(daemon, filter_mask, OP_DEL);
- }
- static int daemon_listener_need_log(sp_log_daemon_t *daemon,
- log_listener_t *e, int log_type,
- int log_severity,
- int log_sys_error,
- int log_usr_error,
- int log_client_id)
- {
- log_filter_key_t *key = &e->index_key;
- if (key->ent_id == -1 || key->ent_id == log_client_id) {
- if (key->log_type == Log_Ignore || key->log_type == log_type) {
- if (key->severity_filter == Severity_None || key->severity_filter == log_severity) {
- if (key->sys_code == Error_IgnoreAll || key->sys_code == log_sys_error) {
- if (key->user_code == -2 || (key->user_code == -1 && log_usr_error) || key->user_code == log_usr_error)
- return TRUE;
- }
- }
- }
- }
- return FALSE;
- }
- static void daemon_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
- {
- sp_log_daemon_t *daemon = (sp_log_daemon_t*)user_data;
- if (state == BUS_STATE_OFF) {
- int i = 0;
- daemon_lock(daemon);
- for (i = 0; i < daemon->arr_entity->nelts; ++i) {
- log_listener_entity_t *listen_ent = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
- if (listen_ent->epid == epid) {
- log_listener_t *pos, *n;
- list_for_each_entry_safe(pos, n, &listen_ent->listener_list, log_listener_t, entry) {
- list_del(&pos->entry);
- daemon_del_index(daemon, pos);
- free(pos);
- }
- ARRAY_DEL(daemon->arr_entity, i, log_listener_entity_t*);
- free(listen_ent);
- }
- }
- daemon_unlock(daemon);
- }
- }
- static int daemon_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_log_daemon_t *daemon = (sp_log_daemon_t*)user_data;
- int log_cmd = SP_GET_TYPE(pkt_type);
- int log_client_id = pkt_id;
- if (log_cmd == LOG_CMD_RECORD || log_cmd == LOG_CMD_FLUSH) {
- if (log_cmd == LOG_CMD_RECORD)
- {
- char bussinessId[LINKINFO_BUSSID_LEN];
- char traceId[LINKINFO_TRACEID_LEN];
- char spanId[LINKINFO_SPANID_LEN];
- char parentSpanId[LINKINFO_PARENTSPANID_LEN];
- iobuffer_get_linkInfo(*p_pkt, bussinessId, traceId, spanId, parentSpanId);
- iobuffer_write_head(*p_pkt, IOBUF_T_BUF, parentSpanId, sizeof(parentSpanId));
- iobuffer_write_head(*p_pkt, IOBUF_T_BUF, spanId, sizeof(spanId));
- iobuffer_write_head(*p_pkt, IOBUF_T_BUF, traceId, sizeof(traceId));
- iobuffer_write_head(*p_pkt, IOBUF_T_BUF, bussinessId, sizeof(bussinessId));
- }
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &pkt_id, 0);
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &epid, 0);
- daemon_lock(daemon);
- iobuffer_queue_enqueue(daemon->pkt_queue, *p_pkt);
- daemon_unlock(daemon);
- ReleaseSemaphore(daemon->pkt_sem, 1, NULL);
- *p_pkt = NULL;
- } else if (log_cmd == LOG_CMD_SUBSCRIBE) {
- int id = pkt_id;
- int ignore_msg_body;
- int log_type;
- int ent_id;
- int severity_filter;
- int sys_code;
- int user_code;
- log_listener_entity_t *listen_ent;
- iobuffer_format_read(*p_pkt, "444444", &ignore_msg_body, &log_type, &ent_id, &severity_filter, &sys_code, &user_code);
- daemon_lock(daemon);
- listen_ent = daemon_find_entity(daemon, svc_id);
- if (!listen_ent) {
- listen_ent = MALLOC_T(log_listener_entity_t);
- listen_ent->epid = epid;
- listen_ent->svc_id = svc_id;
- INIT_LIST_HEAD(&listen_ent->listener_list);
- ARRAY_PUSH(daemon->arr_entity, log_listener_entity_t*) = listen_ent;
- }
- if (daemon_find_listener(daemon, id, NULL) == NULL) {
- log_listener_t *listener = MALLOC_T(log_listener_t);
- log_filter_key_t *key = &listener->index_key;
- listener->owner = listen_ent;
- listener->ignore_msg_body = ignore_msg_body;
- listener->id = id;
- INIT_HLIST_NODE(&listener->working_hentry);
- key->ent_id = ent_id;
- key->log_type = log_type;
- key->severity_filter = severity_filter;
- key->sys_code = sys_code;
- key->user_code = user_code;
- key->index_hash_code = hash_filter(log_type, ent_id, severity_filter, sys_code, user_code);
- daemon_add_index(daemon, listener);
- list_add_tail(&listener->entry, &listen_ent->listener_list);
- }
- daemon_unlock(daemon);
- } else if (log_cmd == LOG_CMD_UNSUBSCRIBE) {
- int i;
- int id = pkt_id;
- log_listener_t* tmp;
- daemon_lock(daemon);
- tmp = daemon_find_listener(daemon, id, &i);
- if (tmp) {
- list_del(&tmp->entry);
- if (list_empty(&tmp->owner->listener_list)) {
- ARRAY_DEL(daemon->arr_entity, i, log_listener_entity_t*);
- free(tmp->owner);
- }
- daemon_del_index(daemon, tmp);
- free(tmp);
- }
- daemon_unlock(daemon);
- } else {
- TOOLKIT_ASSERT(0);
- }
- return TRUE;
- }
- static int get_log_iobuffer_header_length(iobuffer_t *pkt)
- {
- //"4888444444444"
- int nparam;
- int len = 64;
- int rs = iobuffer_get_read_state(pkt);
- iobuffer_pop_count(pkt, len);
- iobuffer_read(pkt, IOBUF_T_I4, &nparam, 0);
- len += 4;
- len += 4 * nparam;
- iobuffer_restore_read_state(pkt, rs);
- return len;
- }
- static void daemon_bcast(sp_log_daemon_t *daemon, int from_client_id, iobuffer_t *pkt)
- {
- int i, j;
- int instance_id;
- u__int64_t prev_rsn;
- u__int64_t curr_rsn;
- int original_rsn_type;
- int rsn_depth;
- u__int64_t log_id;
- unsigned int log_time;
- int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
- int rs = iobuffer_get_read_state(pkt);
-
- struct {
- struct hlist_head ent;
- int ignore_msg_body;
- }results[SP_MAX_ENTITY];
-
- for (i = 0; i < SP_MAX_ENTITY; ++i) {
- INIT_HLIST_HEAD(&results[i].ent);
- results[i].ignore_msg_body = 1;
- }
- iobuffer_format_read(pkt, "4888444444444", &instance_id, &log_id,
- &prev_rsn, &curr_rsn, &original_rsn_type, &rsn_depth,
- &log_time, &log_type, &log_client_id, &log_epid,
- &log_severity, &log_sys_error, &log_usr_error);
- iobuffer_restore_read_state(pkt, rs);
- daemon_lock(daemon);
- for (j = 0; j < 3; ++j)
- {
- int t_user_code;
- if (j == 0)
- {
- t_user_code = log_usr_error; // strict match
- }
- else if (j == 1)
- {
- t_user_code = -2; // accept any
- }
- else
- {
- if (log_usr_error) {
- t_user_code = -1; // reject ones that has no user code
- } else {
- continue;
- }
- }
- for (i = 0; i < daemon->masks_cnt; ++i) {
- int t = daemon->masks[i] & 0xffff;
- int t_log_type = (t & BIT_MASK(LOG_FILTER_BIT_LOGTYPE)) ? Log_Ignore : log_type;
- int t_ent_id = (t & BIT_MASK(LOG_FILTER_BIT_ENTITY)) ? -1 : log_client_id;
- int t_severity_filter = (t & BIT_MASK(LOG_FILTER_BIT_SEVERITY)) ? Severity_None : log_severity;
- int t_sys_code = (t & BIT_MASK(LOG_FILTER_BIT_SYSCODE)) ? Error_IgnoreAll : log_sys_error;
- unsigned int index_hash_code = hash_filter(t_log_type, t_ent_id, t_severity_filter, t_sys_code, t_user_code);
- log_listener_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &daemon->arr_filter_index[index_hash_code%FILTER_HASHTABLE_SIZE], log_listener_t, index_hentry) {
- if (index_hash_code == tpos->index_key.index_hash_code) {
- if (daemon_listener_need_log(daemon, tpos, log_type, log_severity, log_sys_error, log_usr_error, from_client_id))
- {
- if (tpos->working_hentry.pprev == NULL)
- {
- hlist_add_head(&tpos->working_hentry, &results[tpos->owner->svc_id].ent);
- results[tpos->owner->svc_id].ignore_msg_body &= tpos->ignore_msg_body;
- }
- }
- }
- }
- }
- }
- for (i = 0; i < SP_MAX_ENTITY; ++i) {
- if (!hlist_empty(&results[i].ent)) {
- int ignore_msg_body = results[i].ignore_msg_body;
- iobuffer_t *copy_pkt = iobuffer_create(-1, -1);
- struct hlist_node *pos, *n;
- log_listener_t *tpos;
- int cnt = 0;
- int epid = -1;
- int svc_id = -1;
- hlist_for_each_entry(tpos, pos, &results[i].ent, log_listener_t, working_hentry) {
- cnt++;
- if (epid == -1) {
- epid = tpos->owner->epid;
- svc_id = tpos->owner->svc_id;
- }
- }
-
- //iobuffer_write(copy_pkt, IOBUF_T_I4, &cnt, 0);
- hlist_for_each_entry_safe(tpos, pos, n, &results[i].ent, log_listener_t, working_hentry) {
- iobuffer_write(copy_pkt, IOBUF_T_I4, &tpos->id, 0);
- hlist_del(pos);
- }
- // 复制原包内容
- if (results[i].ignore_msg_body) {
- int msg_body_len = 0;
- int dat_len = get_log_iobuffer_header_length(pkt);
- void *dat = iobuffer_data(pkt, 0);
- iobuffer_write(copy_pkt, IOBUF_T_BUF, dat, dat_len);
- iobuffer_write(copy_pkt, IOBUF_T_7BIT, &msg_body_len, 0);
- } else {
- void *dat = iobuffer_data(pkt, 0);
- int dat_len = iobuffer_get_length(pkt);
- iobuffer_write(copy_pkt, IOBUF_T_BUF, dat, dat_len);
- }
- iobuffer_copy_linkInfo(copy_pkt, pkt);
-
- sp_svc_post(daemon->svc, epid, svc_id, SP_PKT_LOG|LOG_CMD_LISTEN_RECORD, cnt, ©_pkt);
- if (copy_pkt)
- iobuffer_dec_ref(copy_pkt);
- }
- }
- daemon_unlock(daemon);
- }
- static unsigned int __stdcall daemon_work_proc(void *param)
- {
- sp_log_daemon_t *daemon = (sp_log_daemon_t*)param;
- HANDLE hs[] = {daemon->pkt_sem, daemon->stop_evt};
- sp_env_t *env = sp_get_env();
- for (;;) {
- DWORD dwRet = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, DAEMON_LOG_TIMEOUT_INTERVAL);
- if (dwRet == WAIT_OBJECT_0)
- {
- int pkt_id, svc_id, pkt_type, epid;
- iobuffer_t *pkt;
- char bussinessId[LINKINFO_BUSSID_LEN], traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN];
- daemon_lock(daemon);
- pkt = iobuffer_queue_deque(daemon->pkt_queue);
- daemon_unlock(daemon);
- iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);
- if (SP_GET_TYPE(pkt_type) == LOG_CMD_RECORD)
- {
- int readLen = 0;
- memset(bussinessId, 0, LINKINFO_BUSSID_LEN);
- memset(traceId, 0, LINKINFO_TRACEID_LEN);
- memset(spanId, 0, LINKINFO_SPANID_LEN);
- memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN);
- readLen = LINKINFO_BUSSID_LEN;
- iobuffer_read(pkt, IOBUF_T_BUF, bussinessId, &readLen);
- readLen = LINKINFO_TRACEID_LEN;
- iobuffer_read(pkt, IOBUF_T_BUF, traceId, &readLen);
- readLen = LINKINFO_SPANID_LEN;
- iobuffer_read(pkt, IOBUF_T_BUF, spanId, &readLen);
- readLen = LINKINFO_PARENTSPANID_LEN;
- iobuffer_read(pkt, IOBUF_T_BUF, parentSpanId, &readLen);
- iobuffer_set_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);
- int instance_id;
- u__int64_t log_id;
- u__int64_t prev_rsn;
- u__int64_t curr_rsn;
- int original_rsn_type;
- int rsn_depth;
- unsigned int log_time;
- int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
- char *msg = NULL;
- int param_cnt;
- int *params = NULL;
- int i;
- iobuffer_read(pkt, IOBUF_T_I4, &instance_id, NULL);
- iobuffer_read(pkt, IOBUF_T_I8, &log_id, NULL);
- daemon->last_log_id = log_id = sp_uid_update(daemon->last_log_id);
- log_id = sp_uid_change_app_id(log_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I8, &log_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &instance_id, 0);
- if (svc_id != SP_INVALID_SVC_ID)
- daemon_bcast(daemon, svc_id, pkt);
- iobuffer_read(pkt, IOBUF_T_I4, &instance_id, NULL);
- iobuffer_read(pkt, IOBUF_T_I8, &log_id, NULL);
- iobuffer_read(pkt, IOBUF_T_I8, &prev_rsn, NULL);
- iobuffer_read(pkt, IOBUF_T_I8, &curr_rsn, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &original_rsn_type, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &rsn_depth, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_time, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_type, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_client_id, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_epid, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_severity, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_sys_error, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &log_usr_error, NULL);
- /** TODO: 移出与实体关联的特定事件!!*/
- // GPIO移动事件不写日志
- if (log_usr_error != 0x2090000A && log_usr_error != 0x20900009)
- {
- iobuffer_read(pkt, IOBUF_T_I4, ¶m_cnt, NULL);
- if (param_cnt) {
- params = (int*)malloc(sizeof(int)*param_cnt);
- for (i = 0; i < param_cnt; ++i) {
- iobuffer_read(pkt, IOBUF_T_I4, ¶ms[i], NULL);
- }
- }
- iobuffer_format_read(pkt, "s", &msg);
- daemon->on_log(daemon, 0, 0, log_client_id, log_epid, instance_id,
- log_id, prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
- log_time, log_type, log_severity, log_sys_error,
- log_usr_error, param_cnt, params, msg, daemon->user_data, bussinessId, traceId, spanId, parentSpanId);
-
- FREE(msg);
- free(params);
- }
- }
- else if (SP_GET_TYPE(pkt_type) == LOG_CMD_FLUSH)
- {
- SYSTEMTIME st;
- GetLocalTime(&st);
- daemon->on_flush(daemon, pkt_id, &st, daemon->user_data);
- }
- else
- {
- TOOLKIT_ASSERT(0);
- }
- iobuffer_dec_ref(pkt);
- }
- else if (dwRet == WAIT_OBJECT_0+1) {
- break; // stop
- }
- else if (dwRet == WAIT_TIMEOUT) {
- SYSTEMTIME st;
- GetLocalTime(&st);
- daemon->on_timeout_interval(daemon, &st, daemon->user_data);
- }
- }
- return 0;
- }
- int sp_log_daemon_create(sp_log_on_log on_log,
- sp_log_on_flush on_flush,
- sp_log_on_timeout_interval on_timeout_interval,
- void *user_data,
- sp_svc_t *svc,
- sp_log_daemon_t **p_daemon)
- {
- sp_log_daemon_t *daemon = ZALLOC_T(sp_log_daemon_t);
- sp_env_t *env = sp_get_env();
- int i;
- daemon->on_log = on_log;
- daemon->on_flush = on_flush;
- daemon->on_timeout_interval = on_timeout_interval;
- daemon->user_data = user_data;
- daemon->svc = svc;
- daemon->pkt_queue = iobuffer_queue_create();
- daemon->arr_entity = array_make(0, sizeof(log_listener_entity_t*));
- InitializeCriticalSection(&daemon->lock);
- daemon->stop_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
- daemon->pkt_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
- daemon->last_log_id = sp_uid_make(0);
- daemon->masks_cnt = 0;
- for (i = 0; i < FILTER_HASHTABLE_SIZE; ++i) {
- INIT_HLIST_HEAD(&daemon->arr_filter_index[i]);
- }
- daemon->worker_thread = (HANDLE)_beginthreadex(NULL, 0, &daemon_work_proc, daemon, 0, NULL);
- if (!daemon->worker_thread) {
- sp_log_daemon_destroy(daemon);
- return Error_Resource;
- }
- sp_svc_add_pkt_handler(svc, (long)daemon, SP_PKT_LOG, &daemon_on_pkt, daemon);
- sp_svc_add_sys_handler(svc, (long)daemon, &daemon_on_sys, daemon);
- *p_daemon = daemon;
- return 0;
- }
- int sp_log_daemon_destroy(sp_log_daemon_t *daemon)
- {
- int i;
- if (daemon->worker_thread) {
- ///**TODO(Gifur@5/6/2022): *’ to ‘int’ loses precision [-fpermissive] */
- sp_svc_remove_pkt_handler(daemon->svc, (long)daemon, SP_PKT_LOG);
- sp_svc_remove_sys_handler(daemon->svc, (long)daemon);
- SetEvent(daemon->stop_evt);
- WaitForSingleObject(daemon->worker_thread, INFINITE);
- CloseHandle(daemon->worker_thread);
- }
- CloseHandle(daemon->stop_evt);
- for (i = 0; i < daemon->arr_entity->nelts; ++i) {
- log_listener_entity_t *tmp = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
- log_listener_t *pos, *n;
- list_for_each_entry_safe(pos, n, &tmp->listener_list, log_listener_t, entry) {
- list_del(&pos->entry);
- free(pos);
- }
- free(tmp);
- }
- array_free(daemon->arr_entity);
- if (daemon->pkt_sem) {
- CloseHandle(daemon->pkt_sem);
- }
- iobuffer_queue_destroy(daemon->pkt_queue);
- DeleteCriticalSection(&daemon->lock);
- free(daemon);
- return 0;
- }
|