#include "precompile.h"
/* NOTE(review): one angle-bracket #include stood here; its header name was
 * stripped from this copy of the file. Restore it from version control. */
#include "sp_log.h"
#include "sp_def.h"
#include "sp_svc.h"
#include "sp_env.h"
#include "sp_uid.h"
#include "sp_rsn.h"
#include "SpBase.h"
#include "memutil.h"
#include "list.h"
#include "array.h"
#include "hashset.h"
#include "jhash.h"
#include "refcnt.h"
#include "y2k_time.h"
#include "dbgutil.h"
/* NOTE(review): three angle-bracket #include directives stood here; their
 * header names were stripped from this copy of the file. Restore them from
 * version control. */

/* Sub-commands carried in the low bits of an SP_PKT_LOG packet type. */
#define LOG_CMD_RECORD          0x01
#define LOG_CMD_FLUSH           0x02
#define LOG_CMD_SUBSCRIBE       0x03
#define LOG_CMD_UNSUBSCRIBE     0x04
#define LOG_CMD_LISTEN_RECORD   0x05

/* Timeout passed to WaitForMultipleObjects in the daemon worker loop. */
#define DAEMON_LOG_TIMEOUT_INTERVAL 5

/* Bit positions of the "wildcard" flags inside a filter mask. */
#define LOG_FILTER_BIT_LOGTYPE  0
#define LOG_FILTER_BIT_ENTITY   1
#define LOG_FILTER_BIT_SEVERITY 2
#define LOG_FILTER_BIT_SYSCODE  3

#define BIT_MASK(bit) (1 << (bit))

#define FILTER_HASHTABLE_SIZE   1023

struct sp_log_client_t
{
    int local_svc_id;
    sp_svc_t *svc;
    sp_iom_t *iom;
};

// svc == null, iom cannot be null, anonymous
// svc != null, iom can be null
/*
 * Create a log client.
 *
 * svc      : owning service, or NULL for an anonymous client.
 * iom      : I/O manager; when NULL and svc is given, the service's iom is used.
 * p_client : receives the new client on success.
 *
 * Returns 0 on success, -1 when the allocation fails.
 */
int sp_log_client_create(sp_svc_t *svc, sp_iom_t *iom, sp_log_client_t **p_client)
{
    sp_log_client_t *client = MALLOC_T(sp_log_client_t);
    if (!client)
        return -1;

    if (svc && !iom)
        iom = sp_svc_get_iom(svc);

    client->svc = svc;
    client->iom = iom;
    client->local_svc_id = svc ? sp_svc_get_id(svc) : SP_INVALID_SVC_ID;

    *p_client = client;
    return 0;
}

/* Release a client created by sp_log_client_create. Always returns 0. */
int sp_log_client_destroy(sp_log_client_t *client)
{
    free(client);
    return 0;
}

// TODO: param size type [4/2/2020 12:50 Gifur]
/*
int sp_log_client_log(sp_log_client_t *client, int type, int severity, int sys_error, int usr_error, int param_cnt, int *params, const char *format, ...)
{
    va_list arg;
    va_start(arg, format);
    return sp_log_client_logv(client, type, severity, sys_error, usr_error, param_cnt, params, format, arg);
}
*/

/*
 * Initial capacity for a LOG_CMD_RECORD packet: the message, the original
 * fixed header allowance of 112 bytes, plus 4 bytes per parameter. The
 * message body is copied with a raw memcpy at the write cursor, so the
 * parameters must be accounted for up front.
 */
static int log_client_record_capacity(int n, int param_cnt)
{
    int cap = n + 112;
    if (param_cnt > 0)
        cap += 4 * param_cnt;
    return cap;
}

/*
 * Serialise one log record into info_pkt: instance id, a zero log id,
 * the run-serial context (zeros when the client has no service), time,
 * type, ids, severity, error codes, parameters, and finally the 7-bit
 * encoded message length followed by the raw message bytes.
 */
static void log_client_write_record(iobuffer_t *info_pkt, sp_log_client_t *client, sp_entity_t *ent,
    int local_epid, y2k_time_t log_time, int type, int severity, int sys_error, int usr_error,
    int param_cnt, int *params, const char *msg, int n)
{
    int i;
    u__int64_t log_id = 0;
    sp_rsn_context_t *rsn_ctx = client->svc ? sp_svc_get_runserial_context(client->svc) : NULL;

    iobuffer_write(info_pkt, IOBUF_T_I4, &ent->instance_id, 0);
    iobuffer_write(info_pkt, IOBUF_T_I8, &log_id, 0);
    if (rsn_ctx) {
        iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->previous_rsn, 0);
        iobuffer_write(info_pkt, IOBUF_T_I8, &rsn_ctx->current_rsn, 0);
        iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->original_type, 0);
        iobuffer_write(info_pkt, IOBUF_T_I4, &rsn_ctx->depth, 0);
    } else {
        u__int64_t invalid_rsn = 0;
        int t = 0;
        iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
        iobuffer_write(info_pkt, IOBUF_T_I8, &invalid_rsn, 0);
        iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
        iobuffer_write(info_pkt, IOBUF_T_I4, &t, 0);
    }
    iobuffer_write(info_pkt, IOBUF_T_I4, &log_time, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &type, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &client->local_svc_id, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &local_epid, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &severity, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &sys_error, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &usr_error, 0);
    iobuffer_write(info_pkt, IOBUF_T_I4, &param_cnt, 0);
    for (i = 0; i < param_cnt; ++i) {
        iobuffer_write(info_pkt, IOBUF_T_I4, &params[i], 0);
    }
    iobuffer_write(info_pkt, IOBUF_T_7BIT, &n, 0);
    if (n > 0) {
        memcpy(iobuffer_data(info_pkt, -1), msg, n);
        iobuffer_push_count(info_pkt, n);
    }
}

// Compared with sp_log_client_logv, the variadic arguments were removed; this
// function has now fully replaced sp_log_client_log and sp_log_client_logv.
/*
 * Post one log record (without link/trace information) to the shell service.
 *
 * msg/len   : message bytes; len must be within [0, SP_LOG_MAX_LEN].
 * params    : param_cnt integers attached to the record.
 *
 * Returns the result of sp_iom_post, or Error_Param when the length is out
 * of range, a required pointer is NULL, or the client's entity is unknown.
 */
int sp_log_client_logEx(sp_log_client_t* client, int type, int severity, int sys_error, int usr_error, int param_cnt, int* params, const char* msg, int len)
{
    int rc;
    int n = len;
    iobuffer_t *info_pkt;
    sp_env_t *env = sp_get_env();
    sp_iom_t *iom = client->iom;
    int local_epid = sp_iom_get_epid(iom);
    y2k_time_t log_time;
    sp_entity_t *ent;

    if (n < 0 || n > SP_LOG_MAX_LEN) {
        DbgWithLink(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log len is not right, %d, only accept max len: %d", n, SP_LOG_MAX_LEN);
        return Error_Param;
    }
    // A positive count with a NULL array would be dereferenced while serialising.
    if ((n > 0 && !msg) || (param_cnt > 0 && !params)) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log invalid argument: msg or params is NULL");
        return Error_Param;
    }

    ent = sp_mod_mgr_find_entity_by_idx(env->mod_mgr, client->local_svc_id);
    if (!ent) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log sp_mod_mgr_find_entity_by_idx failed");
        return Error_Param;
    }

    log_time = y2k_time_now();
    info_pkt = iobuffer_create(-1, log_client_record_capacity(n, param_cnt));

    log_client_write_record(info_pkt, client, ent, local_epid, log_time, type, severity,
        sys_error, usr_error, param_cnt, params, msg, n);

    rc = sp_iom_post(client->iom, client->local_svc_id, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG | LOG_CMD_RECORD, 0, &info_pkt);
    // Drop our reference if the post did not take the packet.
    if (info_pkt)
        iobuffer_dec_ref(info_pkt);

    return rc;
}

// TODO: param size type [4/2/2020 12:50 Gifur]
/*
 * Same as sp_log_client_logEx, but also attaches link (trace) information to
 * the packet. When any of the four ids is NULL, or bussId/traceId is empty,
 * a freshly generated linkContext is used instead of the supplied ids.
 */
int sp_log_client_logWithLink(sp_log_client_t* client, int type, int severity, int sys_error, int usr_error, int param_cnt, int* params, const char* msg, int len
    , const char* bussId, const char* traceId, const char* spanId, const char* parentSpanId)
{
    int rc;
    int n = len;
    iobuffer_t *info_pkt;
    sp_env_t *env = sp_get_env();
    sp_iom_t *iom = client->iom;
    int local_epid = sp_iom_get_epid(iom);
    y2k_time_t log_time;
    sp_entity_t *ent;

    if (n < 0 || n > SP_LOG_MAX_LEN) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log len is not right, %d, only accept max len: %d", n, SP_LOG_MAX_LEN);
        return Error_Param;
    }
    // A positive count with a NULL array would be dereferenced while serialising.
    if ((n > 0 && !msg) || (param_cnt > 0 && !params)) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log invalid argument: msg or params is NULL");
        return Error_Param;
    }

    ent = sp_mod_mgr_find_entity_by_idx(env->mod_mgr, client->local_svc_id);
    if (!ent) {
        DbgWithLink(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("client log sp_mod_mgr_find_entity_by_idx failed");
        return Error_Param;
    }

    log_time = y2k_time_now();
    info_pkt = iobuffer_create(-1, log_client_record_capacity(n, param_cnt));

    if (NULL == bussId || NULL == traceId || NULL == spanId || NULL == parentSpanId
        || 0 == strlen(bussId) || 0 == strlen(traceId)) {
        linkContext tmp;
        tmp.AutoGenerate();
        iobuffer_set_linkInfo(info_pkt, tmp.bussinessId.GetData(), tmp.traceId.GetData(), tmp.spanId.GetData(), tmp.parentSpanId.GetData());
    } else {
        iobuffer_set_linkInfo(info_pkt, bussId, traceId, spanId, parentSpanId);
    }

    log_client_write_record(info_pkt, client, ent, local_epid, log_time, type, severity,
        sys_error, usr_error, param_cnt, params, msg, n);

    rc = sp_iom_post(client->iom, client->local_svc_id, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_RECORD, 0, &info_pkt);
    // Drop our reference if the post did not take the packet.
    if (info_pkt)
        iobuffer_dec_ref(info_pkt);

    return rc;
}

/* Post a LOG_CMD_FLUSH request (no payload) to the shell service. */
int sp_log_client_flush(sp_log_client_t *client)
{
    return sp_iom_post(client->iom, client->local_svc_id, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_FLUSH, 0, NULL);
}

//
// listener
//

#define INDEX_HASHTABLE_SIZE    1023

typedef struct sp_log_listener_cb // corresponds to one SpEntity
{
    struct list_head __entry;           // < sp_log_listener_mgr_t . cb_list
    struct list_head __working_entry;
    struct list_head __list;            // > listener_t . entry
    struct list_head __working_list;
    void *key;
    int enable;
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
}sp_log_listener_cb;

/*
 * Allocate a callback group for `key` with ref_cnt 1, enabled, and empty
 * listener lists. Returns NULL when the allocation fails.
 */
static sp_log_listener_cb* sp_log_listener_cb_create(void *key)
{
    sp_log_listener_cb *cb = MALLOC_T(sp_log_listener_cb);
    if (!cb)
        return NULL;    // the caller already tests for NULL
    cb->ref_cnt = 1;
    cb->enable = 1;
    cb->key = key;
    INIT_LIST_HEAD(&cb->__working_list);
    INIT_LIST_HEAD(&cb->__list);
    return cb;
}

static __inline void __sp_log_listener_cb_destroy(sp_log_listener_cb *cb){free(cb);}

IMPLEMENT_REF_COUNT_MT_STATIC(sp_log_listener_cb, sp_log_listener_cb, ref_cnt, __sp_log_listener_cb_destroy)

typedef struct listener_t // one per Subscribe call
{
    struct hlist_node hentry;       // < sp_log_listener_mgr_t . index[slot]
    struct list_head entry;         // < sp_log_listener_cb . __list
    struct list_head working_entry;
    unsigned int id;
    int ignore_msg_body;
    sp_log_listener_cb *cb;
}listener_t;

struct sp_log_listener_mgr_t
{
    struct list_head cb_list;                       // > sp_log_listener_cb . __entry
    struct hlist_head index[INDEX_HASHTABLE_SIZE];  // > listener_t . hentry
    sp_svc_t *svc;
    sp_log_on_log on_log;
    strand_t *strand;
    int enabled;
    DECLARE_REF_COUNT_MEMBER(ref_cnt);
    ///
};

/* Look up a listener by subscription id in the manager's hash index. */
static listener_t *listener_find(sp_log_listener_mgr_t *mgr, unsigned int listen_id)
{
    unsigned slot = listen_id % INDEX_HASHTABLE_SIZE;
    listener_t *tpos;
    struct hlist_node *pos;

    hlist_for_each_entry(tpos, pos, &mgr->index[slot], listener_t, hentry) {
        if (listen_id == tpos->id) {
            return tpos;
        }
    }
    return NULL;
}

/* Insert a listener into the hash index, keyed by its id. */
static __inline void listener_add_index(sp_log_listener_mgr_t *mgr, listener_t *listener)
{
    int slot = listener->id % INDEX_HASHTABLE_SIZE;
    hlist_add_head(&listener->hentry, &mgr->index[slot]);
}

/* Remove the index node whose id matches the given listener's id. */
static __inline void listener_del_index(sp_log_listener_mgr_t *mgr, listener_t *listener)
{
    int slot = listener->id % INDEX_HASHTABLE_SIZE;
    listener_t *tpos;
    struct hlist_node *pos;

    hlist_for_each_entry(tpos, pos, &mgr->index[slot], listener_t, hentry) {
        if (tpos->id == listener->id) {
            hlist_del(pos);
            return;
        }
    }
}

/* Find the callback group registered for `key`, or NULL. key must be non-NULL. */
static sp_log_listener_cb *listener_find_cb(sp_log_listener_mgr_t *mgr, void *key)
{
    sp_log_listener_cb *pos;

    TOOLKIT_ASSERT(key);
    list_for_each_entry(pos, &mgr->cb_list, sp_log_listener_cb, __entry) {
        if (pos->key == key)
            return pos;
    }
    return NULL;
}

/* Forward one record to the manager's on_log callback, passing cb->key as user data. */
static __inline void listener_invoke_cb(sp_log_listener_mgr_t *mgr,
    sp_log_listener_cb *cb,
    int nsub, u__int64_t *sub,
    int client_id, int log_epid, int client_instance_id,
    u__int64_t log_id,
    u__int64_t prev_rsn, u__int64_t curr_rsn,
    int original_rsn_type, int rsn_depth,
    unsigned int log_time,
    int log_type, int log_severity, int log_sys_error, int log_usr_error,
    int param_cnt, int *params,
    const char *msg,
    const char* bussId, const char* traceId, const char* spanId, const char* parentSpanId)
{
    mgr->on_log(mgr, nsub, sub, client_id, log_epid, client_instance_id, log_id,
        prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
        log_time, log_type, log_severity, log_sys_error, log_usr_error,
        param_cnt, params, msg, cb->key,
        bussId, traceId, spanId, parentSpanId);
}

/*
 * Thread-pool work item for a LOG_CMD_LISTEN_RECORD packet.
 *
 * The packet starts with the head written by listener_on_pkt (mgr pointer,
 * epid, svc_id, pkt_type, pkt_id), followed by pkt_id subscription ids
 * (4 bytes each) and then the log record itself. Matching listeners are
 * grouped per callback group, and on_log is invoked once per group with the
 * ids that matched. Releases the packet and the manager reference taken by
 * listener_on_pkt.
 */
static void listener_on_pkt_threadpool(threadpool_t *threadpool, void *arg)
{
    iobuffer_t *pkt = (iobuffer_t*)arg;
    sp_log_listener_mgr_t *mgr = NULL;
    int epid, pkt_type, pkt_id, svc_id;
    sp_uid_t rsn;
    sp_rsn_context_t rsn_ctx;
    sp_svc_t *svc = NULL;
    int nsub = 0;
    int *sub;
    int instance_id;
    u__int64_t log_id;
    u__int64_t prev_rsn;
    u__int64_t curr_rsn;
    int original_rsn_type;
    int rsn_depth;
    unsigned int log_time;
    int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
    char *msg = NULL;
    int param_cnt;
    int *params = NULL;
    int i;

    //TOOLKIT_ASSERT(nsub);

    // NOTE(review): the _WIN32 branch stores the manager pointer in 4 bytes —
    // confirm this is never built for 64-bit Windows.
#ifdef _WIN32
    iobuffer_format_read(pkt, "44444", &mgr, &epid, &svc_id, &pkt_type, &pkt_id);
#else
    iobuffer_format_read(pkt, "84444", &mgr, &epid, &svc_id, &pkt_type, &pkt_id);
#endif //_WIN32

    svc = mgr->svc;
    rsn = sp_svc_new_runserial(svc);
    sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_CALLBACK, &rsn_ctx);
    sp_svc_push_runserial_context(svc, &rsn_ctx);

    if (mgr->enabled) {
        svc = mgr->svc;
        nsub = pkt_id;
        sub = (int*)iobuffer_data(pkt, 0);
        iobuffer_pop_count(pkt, 4*nsub);
        iobuffer_format_read(pkt, "4888444444444",
            &instance_id, &log_id, &prev_rsn, &curr_rsn, &original_rsn_type, &rsn_depth,
            &log_time, &log_type, &log_client_id, &log_epid,
            &log_severity, &log_sys_error, &log_usr_error);
        iobuffer_read(pkt, IOBUF_T_I4, &param_cnt, NULL);
        if (param_cnt) {
            params = (int*)malloc(sizeof(int)*param_cnt);
            for (i = 0; i < param_cnt; ++i) {
                iobuffer_read(pkt, IOBUF_T_I4, &params[i], NULL);
            }
        }
        iobuffer_format_read(pkt, "s", &msg);
        {
            struct list_head cb_list;
            sp_log_listener_cb *pos;
            u__int64_t *tmp_sub;
            char bussinessId[LINKINFO_BUSSID_LEN], traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN];

            INIT_LIST_HEAD(&cb_list);
            memset(bussinessId, 0, LINKINFO_BUSSID_LEN);
            memset(traceId, 0, LINKINFO_TRACEID_LEN);
            memset(spanId, 0, LINKINFO_SPANID_LEN);
            memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN);

            // Group the matched listeners by callback group; a group is queued
            // on cb_list the first time one of its listeners is seen.
            for (i = 0; i < nsub; ++i) {
                unsigned int id = (unsigned int)sub[i];
                listener_t *listener = listener_find(mgr, id);
                if (listener) {
                    sp_log_listener_cb *cb = listener->cb;
                    if (list_empty(&cb->__working_list)) {
                        list_add_tail(&cb->__working_entry, &cb_list);
                    }
                    list_add_tail(&listener->working_entry, &cb->__working_list);
                }
            }

            tmp_sub = (u__int64_t*)_alloca(nsub*8);
            list_for_each_entry(pos, &cb_list, sp_log_listener_cb, __working_entry) {
                listener_t *k_pos;
                int cnt = 0;
                list_for_each_entry(k_pos, &pos->__working_list, listener_t, working_entry) {
                    tmp_sub[cnt++] = k_pos->id;
                }
                iobuffer_get_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);
                listener_invoke_cb(mgr, pos, cnt, tmp_sub, log_client_id, log_epid, instance_id, log_id,
                    prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
                    log_time, log_type, log_severity, log_sys_error, log_usr_error,
                    param_cnt, params, msg,
                    bussinessId, traceId, spanId, parentSpanId);
                // Reset the per-group working list for the next packet.
                INIT_LIST_HEAD(&pos->__working_list);
            }
        }
        FREE(msg);
        free(params);
    }

    if (pkt)
        iobuffer_dec_ref(pkt);

    sp_log_listener_mgr_dec_ref(mgr);//@

    sp_svc_pop_runserial_context(svc);
}

/*
 * Packet handler for SP_PKT_LOG on the listener side. LOG_CMD_LISTEN_RECORD
 * packets are taken over (*p_pkt is cleared), prefixed with routing data and
 * the manager pointer, and queued on the manager's strand; FALSE is returned.
 * Any other command returns TRUE.
 */
static int listener_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
{
    sp_log_listener_mgr_t *mgr = (sp_log_listener_mgr_t*)user_data;
    int log_cmd = SP_GET_TYPE(pkt_type);

    if (log_cmd == LOG_CMD_LISTEN_RECORD) {
        iobuffer_t *pkt = *p_pkt;
        *p_pkt = NULL;
#ifdef _WIN32
        iobuffer_format_write_head(pkt, "44444", &pkt_id, &pkt_type, &svc_id, &epid, (void*)&mgr);
#else
        iobuffer_format_write_head(pkt, "44448", &pkt_id, &pkt_type, &svc_id, &epid, (void*)&mgr);
#endif //_WIN32
        sp_log_listener_mgr_inc_ref(mgr);//@
        if (threadpool_queue_workitem(sp_svc_get_threadpool(svc), mgr->strand, &listener_on_pkt_threadpool, pkt) != 0) {
            // Queuing failed: undo the reference and release the packet here.
            sp_log_listener_mgr_dec_ref(mgr);//@
            iobuffer_dec_ref(pkt);
        }
        return FALSE;
    }

    return TRUE; // continue
}

/*
 * Create a listener manager (disabled, ref_cnt 1) bound to `svc`.
 * Returns 0 on success, Error_Resource when the allocation fails.
 */
int sp_log_listener_mgr_create(sp_svc_t *svc, sp_log_on_log on_log, sp_log_listener_mgr_t **p_mgr)
{
    int i;
    sp_log_listener_mgr_t *mgr = MALLOC_T(sp_log_listener_mgr_t);

    if (!mgr)
        return Error_Resource;

    INIT_LIST_HEAD(&mgr->cb_list);
    mgr->enabled = 0;
    mgr->strand = strand_create();
    mgr->on_log = on_log;
    mgr->ref_cnt = 1;
    mgr->svc = svc;
    for (i = 0; i < INDEX_HASHTABLE_SIZE; ++i) {
        INIT_HLIST_HEAD(&mgr->index[i]);
    }
    *p_mgr = mgr;
    return 0;
}

/* Enable the manager and register its SP_PKT_LOG packet handler. */
int sp_log_listener_mgr_start(sp_log_listener_mgr_t *mgr)
{
    mgr->enabled = 1;
    sp_svc_add_pkt_handler(mgr->svc, (long)mgr, SP_PKT_LOG, &listener_on_pkt, mgr);
    return 0;
}

/* Disable the manager and unregister its SP_PKT_LOG packet handler. */
int sp_log_listener_mgr_stop(sp_log_listener_mgr_t *mgr)
{
    mgr->enabled = 0;
    sp_svc_remove_pkt_handler(mgr->svc, (long)mgr, SP_PKT_LOG);
    return 0;
}

/*
 * Register a subscription and announce it to the shell service.
 *
 * listen_id : receives the new subscription id.
 * key       : identifies the callback group; passed back to on_log.
 * The remaining arguments are the filter sent with LOG_CMD_SUBSCRIBE.
 *
 * Returns 0 on success, Error_Resource when an allocation fails.
 */
int sp_log_listener_mgr_subscribe(sp_log_listener_mgr_t *mgr,
    unsigned int *listen_id,
    void *key,
    int ignore_msg_body,
    int log_type,
    int ent_id,
    int severity_filter,
    int sys_code,
    int user_code)
{
    sp_env_t *env = sp_get_env();
    int new_id = sp_env_new_id(env);
    sp_log_listener_cb *cb = listener_find_cb(mgr, key);
    listener_t *listener = NULL;
    iobuffer_t *pkt = NULL;

    if (!cb) {
        cb = sp_log_listener_cb_create(key);
        if (!cb)
            return Error_Resource;
        list_add_tail(&cb->__entry, &mgr->cb_list);
    }

    listener = MALLOC_T(listener_t);
    if (!listener) {
        // A group with no listeners was created just above; do not leave it behind.
        if (list_empty(&cb->__list)) {
            list_del(&cb->__entry);
            free(cb);
        }
        return Error_Resource;
    }
    listener->cb = cb;
    listener->id = new_id;
    listener->ignore_msg_body = ignore_msg_body;
    listener_add_index(mgr, listener);
    list_add_tail(&listener->entry, &cb->__list);

    pkt = iobuffer_create(-1, -1);
    iobuffer_format_write(pkt, "444444", &ignore_msg_body, &log_type, &ent_id, &severity_filter, &sys_code, &user_code);
    sp_svc_post(mgr->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_SUBSCRIBE, listener->id, &pkt);
    if (pkt)
        iobuffer_dec_ref(pkt);

    *listen_id = new_id;
    return 0;
}

/*
 * Remove a subscription and notify the shell service. The callback group is
 * released when its last listener goes away.
 * Returns 0 on success, Error_NotExist when the id is unknown.
 */
int sp_log_listener_mgr_unsubscribe(sp_log_listener_mgr_t *mgr, unsigned int listen_id)
{
    listener_t *listener = listener_find(mgr, listen_id);

    if (!listener)
        return Error_NotExist;

    sp_log_listener_cb *cb = listener->cb;
    listener_del_index(mgr, listener);
    list_del(&listener->entry);
    if (list_empty(&cb->__list)) {
        // list_del(&mgr->cb_list); {bug}
        list_del(&cb->__entry);
        free(cb);
    }
    sp_svc_post(mgr->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_LOG|LOG_CMD_UNSUBSCRIBE, listener->id, NULL);
    free(listener);
    return 0;
}

static void __sp_log_listener_mgr_destroy(sp_log_listener_mgr_t *mgr)
{
    strand_destroy(mgr->strand);
    free(mgr);
}

IMPLEMENT_REF_COUNT_MT(sp_log_listener_mgr, sp_log_listener_mgr_t, ref_cnt, __sp_log_listener_mgr_destroy)

//
// daemon
//

typedef struct log_filter_key_t log_filter_key_t;
typedef struct log_listener_t log_listener_t;
typedef struct log_listener_entity_t log_listener_entity_t;

struct log_filter_key_t
{
    int log_type;
    int ent_id;
    int severity_filter;
    int sys_code;
    int user_code; // -2 accepts every record; -1 accepts only records with a non-zero user code
    unsigned int index_hash_code;
};

struct log_listener_t
{
    struct list_head entry;             // < log_listener_entity_t . listener_list
    int id;
    int ignore_msg_body;
    log_filter_key_t index_key;
    struct hlist_node index_hentry;     // < sp_log_daemon_t . arr_filter_index
    struct hlist_node working_hentry;
    log_listener_entity_t *owner;       // = log_listener_entity_t
};

struct log_listener_entity_t
{
    struct list_head listener_list;     // > log_listener_t . entry
    int epid;
    int svc_id;
};

struct sp_log_daemon_t
{
    sp_log_on_log on_log;
    sp_log_on_flush on_flush;
    sp_log_on_timeout_interval on_timeout_interval;
    void *user_data;
    sp_svc_t *svc;
    CRITICAL_SECTION lock;
    iobuffer_queue_t *pkt_queue;
    HANDLE pkt_sem;
    HANDLE stop_evt;
    HANDLE worker_thread;
    sp_uid_t last_log_id;
    struct hlist_head arr_filter_index[FILTER_HASHTABLE_SIZE];  // log_listener_t . index_hentry
    int masks[16];      // low 16 bits: filter mask, high 16 bits: number of listeners using it
    int masks_cnt;
    array_header_t *arr_entity;     // > log_listener_entity_t
};

static __inline void daemon_lock(sp_log_daemon_t *daemon)
{
    EnterCriticalSection(&daemon->lock);
}

static __inline void daemon_unlock(sp_log_daemon_t *daemon)
{
    LeaveCriticalSection(&daemon->lock);
}

/*
 * Hash a filter tuple. The shifts are done on unsigned values because the
 * wildcard ids can be negative (e.g. ent_id == -1) and left-shifting a
 * negative signed value is undefined; the resulting bit pattern is unchanged.
 */
static __inline unsigned int hash_filter(int log_type, int ent_id, int severity_filter, int sys_code, int user_code)
{
    unsigned int t = ((unsigned int)log_type << 16) | ((unsigned int)ent_id << 8) | (unsigned int)severity_filter;
    return jhash_3words(t, sys_code, user_code, 0);
}

/* Find the subscriber entity whose svc_id equals ent_id, or NULL. */
static log_listener_entity_t* daemon_find_entity(sp_log_daemon_t *daemon, int ent_id)
{
    int i;

    for (i = 0; i < daemon->arr_entity->nelts; ++i) {
        log_listener_entity_t *listen_ent = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
        if (listen_ent->svc_id == ent_id)
            return listen_ent;
    }
    return NULL;
}

/*
 * Find a listener by id across all entities. When found and idx is non-NULL,
 * *idx receives the index of the owning entity in arr_entity.
 */
static log_listener_t *daemon_find_listener(sp_log_daemon_t *daemon, int listen_id, int *idx)
{
    int i;

    for (i = 0; i < daemon->arr_entity->nelts; ++i) {
        log_listener_entity_t *e = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
        log_listener_t *pos;
        list_for_each_entry(pos, &e->listener_list, log_listener_t, entry) {
            if (pos->id == listen_id) {
                if (idx)
                    *idx = i;
                return pos;
            }
        }
    }
    return NULL;
}

/* Build the wildcard mask of a filter: one bit per field set to its "match any" value. */
static __inline int calc_filter_mask(int log_type, int ent_id, int severity_filter, int sys_code)
{
    int filter_mask = 0;

    if (log_type == Log_Ignore)
        filter_mask |= 1 << LOG_FILTER_BIT_LOGTYPE;
    if (ent_id == -1)
        filter_mask |= 1 << LOG_FILTER_BIT_ENTITY;
    if (severity_filter == Severity_None)
        filter_mask |= 1 << LOG_FILTER_BIT_SEVERITY;
    if (sys_code == Error_IgnoreAll)
        filter_mask |= 1 << LOG_FILTER_BIT_SYSCODE;
    return filter_mask;
}

#define OP_ADD 1
#define OP_DEL 0

/*
 * Maintain the reference-counted table of wildcard masks in use.
 * OP_ADD increments the count of filter_mask (adding the entry if new);
 * OP_DEL decrements it by exactly one and removes the entry at zero.
 */
static void update_filter_mask_table(sp_log_daemon_t *daemon, int filter_mask, int op_add)
{
    int i;

    if (op_add) {
        for (i = 0; i < daemon->masks_cnt; ++i) {
            if ((daemon->masks[i]&0xffff) == filter_mask) {
                int cnt = (daemon->masks[i]&0xffff0000) >> 16;
                cnt++;
                daemon->masks[i] = filter_mask | (cnt << 16);
                break;
            }
        }
        if (i == daemon->masks_cnt)
            daemon->masks[daemon->masks_cnt++] = filter_mask | (1<<16);
        return;
    }

    for (i = 0; i < daemon->masks_cnt; ++i) {
        if ((daemon->masks[i]&0xffff) != filter_mask)
            continue;

        int cnt = (daemon->masks[i] & 0xffff0000) >> 16;
        cnt--;
        if (cnt == 0) {
            // Fill the hole with the last entry to keep the table dense.
            if (i != daemon->masks_cnt-1) {
                daemon->masks[i] = daemon->masks[daemon->masks_cnt-1];
            }
            daemon->masks_cnt--;
        } else {
            // cnt is already decremented; store it as-is.
            daemon->masks[i] = (cnt << 16) | filter_mask;
        }
        break;
    }
}

/* Insert a listener into the filter hash index and count its wildcard mask. */
static __inline void daemon_add_index(sp_log_daemon_t *daemon, log_listener_t *e)
{
    int filter_mask;
    int slot = e->index_key.index_hash_code % FILTER_HASHTABLE_SIZE;

    hlist_add_head(&e->index_hentry, &daemon->arr_filter_index[slot]);
    filter_mask = calc_filter_mask(e->index_key.log_type, e->index_key.ent_id, e->index_key.severity_filter, e->index_key.sys_code);
    update_filter_mask_table(daemon, filter_mask, OP_ADD);
}

/* Remove a listener from the filter hash index and release its wildcard mask. */
static __inline void daemon_del_index(sp_log_daemon_t *daemon, log_listener_t *e)
{
    int filter_mask;

    hlist_del(&e->index_hentry);
    filter_mask = calc_filter_mask(e->index_key.log_type, e->index_key.ent_id, e->index_key.severity_filter, e->index_key.sys_code);
    update_filter_mask_table(daemon, filter_mask, OP_DEL);
}

/*
 * Exact filter check for one listener. Each key field matches when it holds
 * its wildcard value or equals the record's value. user_code -2 matches any
 * record; -1 matches records with a non-zero user code.
 */
static int daemon_listener_need_log(sp_log_daemon_t *daemon, log_listener_t *e, int log_type, int log_severity, int log_sys_error, int log_usr_error, int log_client_id)
{
    log_filter_key_t *key = &e->index_key;

    if (key->ent_id != -1 && key->ent_id != log_client_id)
        return FALSE;
    if (key->log_type != Log_Ignore && key->log_type != log_type)
        return FALSE;
    if (key->severity_filter != Severity_None && key->severity_filter != log_severity)
        return FALSE;
    if (key->sys_code != Error_IgnoreAll && key->sys_code != log_sys_error)
        return FALSE;
    if (key->user_code == -2 || (key->user_code == -1 && log_usr_error) || key->user_code == log_usr_error)
        return TRUE;
    return FALSE;
}

/*
 * System-state handler. When an endpoint goes BUS_STATE_OFF, drop every
 * subscriber entity that lived on it together with all of its listeners.
 */
static void daemon_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
{
    sp_log_daemon_t *daemon = (sp_log_daemon_t*)user_data;
    int i;

    if (state != BUS_STATE_OFF)
        return;

    daemon_lock(daemon);
    // Walk backwards: deleting slot i must not make the loop skip the
    // element that takes its place.
    for (i = daemon->arr_entity->nelts - 1; i >= 0; --i) {
        log_listener_entity_t *listen_ent = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
        if (listen_ent->epid == epid) {
            log_listener_t *pos, *n;
            list_for_each_entry_safe(pos, n, &listen_ent->listener_list, log_listener_t, entry) {
                list_del(&pos->entry);
                daemon_del_index(daemon, pos);
                free(pos);
            }
            ARRAY_DEL(daemon->arr_entity, i, log_listener_entity_t*);
            free(listen_ent);
        }
    }
    daemon_unlock(daemon);
}

/*
 * Packet handler for SP_PKT_LOG on the daemon side.
 *  - RECORD / FLUSH : prefix the packet with routing data (and, for RECORD,
 *                     the four link-info buffers), queue it for the worker
 *                     thread and take ownership (*p_pkt = NULL).
 *  - SUBSCRIBE      : register a listener (id = pkt_id) under the sender's entity.
 *  - UNSUBSCRIBE    : remove the listener with id = pkt_id.
 * Always returns TRUE.
 */
static int daemon_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
{
    sp_log_daemon_t *daemon = (sp_log_daemon_t*)user_data;
    int log_cmd = SP_GET_TYPE(pkt_type);
    int log_client_id = pkt_id;

    if (log_cmd == LOG_CMD_RECORD || log_cmd == LOG_CMD_FLUSH) {
        if (log_cmd == LOG_CMD_RECORD) {
            char bussinessId[LINKINFO_BUSSID_LEN];
            char traceId[LINKINFO_TRACEID_LEN];
            char spanId[LINKINFO_SPANID_LEN];
            char parentSpanId[LINKINFO_PARENTSPANID_LEN];
            iobuffer_get_linkInfo(*p_pkt, bussinessId, traceId, spanId, parentSpanId);
            // Written to the head in reverse so the worker reads them in order.
            iobuffer_write_head(*p_pkt, IOBUF_T_BUF, parentSpanId, sizeof(parentSpanId));
            iobuffer_write_head(*p_pkt, IOBUF_T_BUF, spanId, sizeof(spanId));
            iobuffer_write_head(*p_pkt, IOBUF_T_BUF, traceId, sizeof(traceId));
            iobuffer_write_head(*p_pkt, IOBUF_T_BUF, bussinessId, sizeof(bussinessId));
        }
        iobuffer_write_head(*p_pkt, IOBUF_T_I4, &pkt_id, 0);
        iobuffer_write_head(*p_pkt, IOBUF_T_I4, &pkt_type, 0);
        iobuffer_write_head(*p_pkt, IOBUF_T_I4, &svc_id, 0);
        iobuffer_write_head(*p_pkt, IOBUF_T_I4, &epid, 0);
        daemon_lock(daemon);
        iobuffer_queue_enqueue(daemon->pkt_queue, *p_pkt);
        daemon_unlock(daemon);
        ReleaseSemaphore(daemon->pkt_sem, 1, NULL);
        *p_pkt = NULL;
    } else if (log_cmd == LOG_CMD_SUBSCRIBE) {
        int id = pkt_id;
        int ignore_msg_body;
        int log_type;
        int ent_id;
        int severity_filter;
        int sys_code;
        int user_code;
        log_listener_entity_t *listen_ent;

        iobuffer_format_read(*p_pkt, "444444", &ignore_msg_body, &log_type, &ent_id, &severity_filter, &sys_code, &user_code);
        daemon_lock(daemon);
        listen_ent = daemon_find_entity(daemon, svc_id);
        if (!listen_ent) {
            listen_ent = MALLOC_T(log_listener_entity_t);
            listen_ent->epid = epid;
            listen_ent->svc_id = svc_id;
            INIT_LIST_HEAD(&listen_ent->listener_list);
            ARRAY_PUSH(daemon->arr_entity, log_listener_entity_t*) = listen_ent;
        }
        if (daemon_find_listener(daemon, id, NULL) == NULL) {
            log_listener_t *listener = MALLOC_T(log_listener_t);
            log_filter_key_t *key = &listener->index_key;
            listener->owner = listen_ent;
            listener->ignore_msg_body = ignore_msg_body;
            listener->id = id;
            INIT_HLIST_NODE(&listener->working_hentry);
            key->ent_id = ent_id;
            key->log_type = log_type;
            key->severity_filter = severity_filter;
            key->sys_code = sys_code;
            key->user_code = user_code;
            key->index_hash_code = hash_filter(log_type, ent_id, severity_filter, sys_code, user_code);
            daemon_add_index(daemon, listener);
            list_add_tail(&listener->entry, &listen_ent->listener_list);
        }
        daemon_unlock(daemon);
    } else if (log_cmd == LOG_CMD_UNSUBSCRIBE) {
        int i;
        int id = pkt_id;
        log_listener_t* tmp;

        daemon_lock(daemon);
        tmp = daemon_find_listener(daemon, id, &i);
        if (tmp) {
            list_del(&tmp->entry);
            if (list_empty(&tmp->owner->listener_list)) {
                ARRAY_DEL(daemon->arr_entity, i, log_listener_entity_t*);
                free(tmp->owner);
            }
            daemon_del_index(daemon, tmp);
            free(tmp);
        }
        daemon_unlock(daemon);
    } else {
        TOOLKIT_ASSERT(0);
    }

    return TRUE;
}

/*
 * Length of a record's header up to and including the parameter array:
 * the 64 fixed bytes of "4888444444444", the 4-byte parameter count and
 * 4 bytes per parameter. The packet's read position is left unchanged.
 */
static int get_log_iobuffer_header_length(iobuffer_t *pkt)
{
    //"4888444444444"
    int nparam;
    int len = 64;
    int rs = iobuffer_get_read_state(pkt);

    iobuffer_pop_count(pkt, len);
    iobuffer_read(pkt, IOBUF_T_I4, &nparam, 0);
    len += 4;
    len += 4 * nparam;
    iobuffer_restore_read_state(pkt, rs);
    return len;
}

/*
 * Deliver one record to every subscriber entity with a matching listener.
 *
 * For each wildcard mask in use, the record's fields are substituted with the
 * wildcard values selected by the mask and hashed, so that only listeners
 * with that exact filter tuple are examined. Matching listeners are grouped
 * per owning entity; each entity gets one LOG_CMD_LISTEN_RECORD packet holding
 * the matching listener ids followed by the record (message body omitted when
 * every matching listener asked to ignore it). The read position of pkt is
 * left unchanged.
 */
static void daemon_bcast(sp_log_daemon_t *daemon, int from_client_id, iobuffer_t *pkt)
{
    int i, j;
    int instance_id;
    u__int64_t prev_rsn;
    u__int64_t curr_rsn;
    int original_rsn_type;
    int rsn_depth;
    u__int64_t log_id;
    unsigned int log_time;
    int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
    int rs = iobuffer_get_read_state(pkt);
    struct {
        struct hlist_head ent;
        int ignore_msg_body;
    }results[SP_MAX_ENTITY];

    for (i = 0; i < SP_MAX_ENTITY; ++i) {
        INIT_HLIST_HEAD(&results[i].ent);
        results[i].ignore_msg_body = 1;
    }

    iobuffer_format_read(pkt, "4888444444444",
        &instance_id, &log_id, &prev_rsn, &curr_rsn, &original_rsn_type, &rsn_depth,
        &log_time, &log_type, &log_client_id, &log_epid,
        &log_severity, &log_sys_error, &log_usr_error);
    iobuffer_restore_read_state(pkt, rs);

    daemon_lock(daemon);

    for (j = 0; j < 3; ++j) {
        int t_user_code;
        if (j == 0) {
            t_user_code = log_usr_error;    // strict match
        } else if (j == 1) {
            t_user_code = -2;               // accept any
        } else {
            if (log_usr_error) {
                t_user_code = -1;           // reject ones that has no user code
            } else {
                continue;
            }
        }
        for (i = 0; i < daemon->masks_cnt; ++i) {
            int t = daemon->masks[i] & 0xffff;
            int t_log_type = (t & BIT_MASK(LOG_FILTER_BIT_LOGTYPE)) ? Log_Ignore : log_type;
            int t_ent_id = (t & BIT_MASK(LOG_FILTER_BIT_ENTITY)) ? -1 : log_client_id;
            int t_severity_filter = (t & BIT_MASK(LOG_FILTER_BIT_SEVERITY)) ? Severity_None : log_severity;
            int t_sys_code = (t & BIT_MASK(LOG_FILTER_BIT_SYSCODE)) ? Error_IgnoreAll : log_sys_error;
            unsigned int index_hash_code = hash_filter(t_log_type, t_ent_id, t_severity_filter, t_sys_code, t_user_code);
            log_listener_t *tpos;
            struct hlist_node *pos;

            hlist_for_each_entry(tpos, pos, &daemon->arr_filter_index[index_hash_code%FILTER_HASHTABLE_SIZE], log_listener_t, index_hentry) {
                int owner_id;
                if (index_hash_code != tpos->index_key.index_hash_code)
                    continue;
                if (!daemon_listener_need_log(daemon, tpos, log_type, log_severity, log_sys_error, log_usr_error, from_client_id))
                    continue;
                owner_id = tpos->owner->svc_id;
                // results[] is indexed by service id; never write outside it.
                if (owner_id < 0 || owner_id >= SP_MAX_ENTITY)
                    continue;
                // pprev == NULL means the listener is not queued yet for this record.
                if (tpos->working_hentry.pprev == NULL) {
                    hlist_add_head(&tpos->working_hentry, &results[owner_id].ent);
                    results[owner_id].ignore_msg_body &= tpos->ignore_msg_body;
                }
            }
        }
    }

    for (i = 0; i < SP_MAX_ENTITY; ++i) {
        if (!hlist_empty(&results[i].ent)) {
            iobuffer_t *copy_pkt = iobuffer_create(-1, -1);
            struct hlist_node *pos, *n;
            log_listener_t *tpos;
            int cnt = 0;
            int epid = -1;
            int svc_id = -1;

            hlist_for_each_entry(tpos, pos, &results[i].ent, log_listener_t, working_hentry) {
                cnt++;
                if (epid == -1) {
                    epid = tpos->owner->epid;
                    svc_id = tpos->owner->svc_id;
                }
            }
            //iobuffer_write(copy_pkt, IOBUF_T_I4, &cnt, 0);
            hlist_for_each_entry_safe(tpos, pos, n, &results[i].ent, log_listener_t, working_hentry) {
                iobuffer_write(copy_pkt, IOBUF_T_I4, &tpos->id, 0);
                hlist_del(pos);
            }
            // copy the original packet content
            if (results[i].ignore_msg_body) {
                int msg_body_len = 0;
                int dat_len = get_log_iobuffer_header_length(pkt);
                void *dat = iobuffer_data(pkt, 0);
                iobuffer_write(copy_pkt, IOBUF_T_BUF, dat, dat_len);
                iobuffer_write(copy_pkt, IOBUF_T_7BIT, &msg_body_len, 0);
            } else {
                void *dat = iobuffer_data(pkt, 0);
                int dat_len = iobuffer_get_length(pkt);
                iobuffer_write(copy_pkt, IOBUF_T_BUF, dat, dat_len);
            }
            iobuffer_copy_linkInfo(copy_pkt, pkt);
            sp_svc_post(daemon->svc, epid, svc_id, SP_PKT_LOG|LOG_CMD_LISTEN_RECORD, cnt, &copy_pkt);
            if (copy_pkt)
                iobuffer_dec_ref(copy_pkt);
        }
    }

    daemon_unlock(daemon);
}

/*
 * Daemon worker thread. Waits on the packet semaphore and the stop event:
 *  - packet  : dequeue it, then for RECORD stamp a fresh log id, broadcast to
 *              subscribers and hand the record to on_log; for FLUSH call on_flush.
 *  - stop    : leave the loop.
 *  - timeout : call on_timeout_interval with the current local time.
 */
static unsigned int __stdcall daemon_work_proc(void *param)
{
    sp_log_daemon_t *daemon = (sp_log_daemon_t*)param;
    HANDLE hs[] = {daemon->pkt_sem, daemon->stop_evt};

    for (;;) {
        DWORD dwRet = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, DAEMON_LOG_TIMEOUT_INTERVAL);
        if (dwRet == WAIT_OBJECT_0) {
            int pkt_id, svc_id, pkt_type, epid;
            iobuffer_t *pkt;
            char bussinessId[LINKINFO_BUSSID_LEN], traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN];

            daemon_lock(daemon);
            pkt = iobuffer_queue_deque(daemon->pkt_queue);
            daemon_unlock(daemon);
            if (!pkt)
                continue;   // nothing to process

            iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);

            if (SP_GET_TYPE(pkt_type) == LOG_CMD_RECORD) {
                int readLen = 0;
                int instance_id;
                u__int64_t log_id;
                u__int64_t prev_rsn;
                u__int64_t curr_rsn;
                int original_rsn_type;
                int rsn_depth;
                unsigned int log_time;
                int log_type, log_severity, log_sys_error, log_usr_error, log_client_id, log_epid;
                char *msg = NULL;
                int param_cnt;
                int *params = NULL;
                int i;

                // Recover the link info that daemon_on_pkt placed in the head
                // and attach it to the packet again.
                memset(bussinessId, 0, LINKINFO_BUSSID_LEN);
                memset(traceId, 0, LINKINFO_TRACEID_LEN);
                memset(spanId, 0, LINKINFO_SPANID_LEN);
                memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN);
                readLen = LINKINFO_BUSSID_LEN;
                iobuffer_read(pkt, IOBUF_T_BUF, bussinessId, &readLen);
                readLen = LINKINFO_TRACEID_LEN;
                iobuffer_read(pkt, IOBUF_T_BUF, traceId, &readLen);
                readLen = LINKINFO_SPANID_LEN;
                iobuffer_read(pkt, IOBUF_T_BUF, spanId, &readLen);
                readLen = LINKINFO_PARENTSPANID_LEN;
                iobuffer_read(pkt, IOBUF_T_BUF, parentSpanId, &readLen);
                iobuffer_set_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);

                // Replace the client's placeholder log id with a daemon-issued one,
                // then push both fields back so the record is complete again.
                iobuffer_read(pkt, IOBUF_T_I4, &instance_id, NULL);
                iobuffer_read(pkt, IOBUF_T_I8, &log_id, NULL);
                daemon->last_log_id = log_id = sp_uid_update(daemon->last_log_id);
                log_id = sp_uid_change_app_id(log_id, 0);
                iobuffer_write_head(pkt, IOBUF_T_I8, &log_id, 0);
                iobuffer_write_head(pkt, IOBUF_T_I4, &instance_id, 0);

                if (svc_id != SP_INVALID_SVC_ID)
                    daemon_bcast(daemon, svc_id, pkt);

                iobuffer_read(pkt, IOBUF_T_I4, &instance_id, NULL);
                iobuffer_read(pkt, IOBUF_T_I8, &log_id, NULL);
                iobuffer_read(pkt, IOBUF_T_I8, &prev_rsn, NULL);
                iobuffer_read(pkt, IOBUF_T_I8, &curr_rsn, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &original_rsn_type, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &rsn_depth, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_time, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_type, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_client_id, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_epid, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_severity, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_sys_error, NULL);
                iobuffer_read(pkt, IOBUF_T_I4, &log_usr_error, NULL);

                /** TODO: move entity-specific event handling out of here!! */
                // GPIO movement events are not written to the log
                if (log_usr_error != 0x2090000A && log_usr_error != 0x20900009) {
                    iobuffer_read(pkt, IOBUF_T_I4, &param_cnt, NULL);
                    if (param_cnt) {
                        params = (int*)malloc(sizeof(int)*param_cnt);
                        for (i = 0; i < param_cnt; ++i) {
                            iobuffer_read(pkt, IOBUF_T_I4, &params[i], NULL);
                        }
                    }
                    iobuffer_format_read(pkt, "s", &msg);
                    daemon->on_log(daemon, 0, 0, log_client_id, log_epid, instance_id, log_id,
                        prev_rsn, curr_rsn, original_rsn_type, rsn_depth,
                        log_time, log_type, log_severity, log_sys_error, log_usr_error,
                        param_cnt, params, msg, daemon->user_data,
                        bussinessId, traceId, spanId, parentSpanId);
                    FREE(msg);
                    free(params);
                }
            } else if (SP_GET_TYPE(pkt_type) == LOG_CMD_FLUSH) {
                SYSTEMTIME st;
                GetLocalTime(&st);
                daemon->on_flush(daemon, pkt_id, &st, daemon->user_data);
            } else {
                TOOLKIT_ASSERT(0);
            }
            iobuffer_dec_ref(pkt);
        } else if (dwRet == WAIT_OBJECT_0+1) {
            break; // stop
        } else if (dwRet == WAIT_TIMEOUT) {
            SYSTEMTIME st;
            GetLocalTime(&st);
            daemon->on_timeout_interval(daemon, &st, daemon->user_data);
        }
    }

    return 0;
}

/*
 * Create the log daemon: allocate its state, start the worker thread and
 * register the packet and system handlers on `svc`.
 * Returns 0 on success, Error_Resource when the allocation or the thread
 * creation fails.
 */
int sp_log_daemon_create(sp_log_on_log on_log,
    sp_log_on_flush on_flush,
    sp_log_on_timeout_interval on_timeout_interval,
    void *user_data,
    sp_svc_t *svc,
    sp_log_daemon_t **p_daemon)
{
    sp_log_daemon_t *daemon = ZALLOC_T(sp_log_daemon_t);
    int i;

    if (!daemon)
        return Error_Resource;

    daemon->on_log = on_log;
    daemon->on_flush = on_flush;
    daemon->on_timeout_interval = on_timeout_interval;
    daemon->user_data = user_data;
    daemon->svc = svc;
    daemon->pkt_queue = iobuffer_queue_create();
    daemon->arr_entity = array_make(0, sizeof(log_listener_entity_t*));
    InitializeCriticalSection(&daemon->lock);
    daemon->stop_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
    daemon->pkt_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
    daemon->last_log_id = sp_uid_make(0);
    daemon->masks_cnt = 0;
    for (i = 0; i < FILTER_HASHTABLE_SIZE; ++i) {
        INIT_HLIST_HEAD(&daemon->arr_filter_index[i]);
    }

    daemon->worker_thread = (HANDLE)_beginthreadex(NULL, 0, &daemon_work_proc, daemon, 0, NULL);
    if (!daemon->worker_thread) {
        sp_log_daemon_destroy(daemon);
        return Error_Resource;
    }

    sp_svc_add_pkt_handler(svc, (long)daemon, SP_PKT_LOG, &daemon_on_pkt, daemon);
    sp_svc_add_sys_handler(svc, (long)daemon, &daemon_on_sys, daemon);

    *p_daemon = daemon;
    return 0;
}

/*
 * Stop the worker thread (when it was started), unregister the handlers and
 * release every listener, entity and OS object owned by the daemon.
 * Always returns 0.
 */
int sp_log_daemon_destroy(sp_log_daemon_t *daemon)
{
    int i;

    if (daemon->worker_thread) {
        ///**TODO(Gifur@5/6/2022): *’ to ‘int’ loses precision [-fpermissive] */
        sp_svc_remove_pkt_handler(daemon->svc, (long)daemon, SP_PKT_LOG);
        sp_svc_remove_sys_handler(daemon->svc, (long)daemon);
        SetEvent(daemon->stop_evt);
        WaitForSingleObject(daemon->worker_thread, INFINITE);
        CloseHandle(daemon->worker_thread);
    }

    CloseHandle(daemon->stop_evt);

    for (i = 0; i < daemon->arr_entity->nelts; ++i) {
        log_listener_entity_t *tmp = ARRAY_IDX(daemon->arr_entity, i, log_listener_entity_t*);
        log_listener_t *pos, *n;
        list_for_each_entry_safe(pos, n, &tmp->listener_list, log_listener_t, entry) {
            list_del(&pos->entry);
            free(pos);
        }
        free(tmp);
    }
    array_free(daemon->arr_entity);

    if (daemon->pkt_sem) {
        CloseHandle(daemon->pkt_sem);
    }
    iobuffer_queue_destroy(daemon->pkt_queue);
    DeleteCriticalSection(&daemon->lock);
    free(daemon);

    return 0;
}