#include "precompile.h"
#include "sp_iom.h"
#include "sp_def.h"
#include "spinlock.h"
#include "timerqueue.h"
#include "array.h"
#include "memutil.h"
#include "sp_logwithlinkforc.h"
/* NOTE(review): the header name of this include was lost in the copy of the
 * file that was reviewed; <winpr/wlog.h> is assumed because the WLog_DBG /
 * WLog_ERR macros are used below -- confirm against the build. */
#include <winpr/wlog.h>
#include "dbgutil.h"

#define TAG SPBASE_TAG("sp_iom")

#ifndef _WIN32
/* NOTE(review): the two header names here were also lost; <unistd.h> and
 * <sys/syscall.h> are assumed because getppid(), syscall() and SYS_getppid
 * are used below -- confirm against the build. */
#include <unistd.h>
#include <sys/syscall.h>

/* Return the parent process id, obtained with a raw SYS_getppid syscall. */
static int GetParentProcessID()
{
#if 0
    return getppid();
#else
    return (int)syscall(SYS_getppid);
#endif
}
#endif //NOT _WIN32

/* Upper bound used by sp_iom_run() for the poll timeout passed to sp_iom_poll(). */
#define POLL_INTERVAL 10

/* Message ids exchanged with on_msg() through bus_endpt_send_msg/post_msg. */
#define IOM_T_EXIT          0
#define IOM_T_GET_STATE     1
#define IOM_T_SEND_INFO     2

/* Map a BUS_E_* code to the corresponding Error_* code; codes without a
 * dedicated mapping become Error_Unexpect. */
static int translate_error(int bus_error)
{
    int error;

    switch (bus_error) {
    case BUS_E_OK:
        error = Error_Succeed;
        break;
    case BUS_E_FAIL:
        error = Error_Unexpect;
        break;
    case BUS_E_NETBROKEN:
        error = Error_NetBroken;
        break;
    case BUS_E_NOTFOUND:
        error = Error_NotExist;
        break;
    default:
        error = Error_Unexpect;
        break;
    }
    return error;
}

/* One registered packet handler, identified by key. */
typedef struct pkt_handler_t {
    int key;
    sp_iom_on_pkt on_pkt;
    void *user_data;
}pkt_handler_t;

/* One registered system (endpoint state) handler, identified by key. */
typedef struct sys_handler_t {
    int key;
    sp_iom_on_sys on_sys;
    void *user_data;
}sys_handler_t;

struct sp_iom_t
{
    bus_endpt_t *endpt;
    timer_queue_t *tm_queue;
    spinlock_t tm_lock;              /* taken around every tm_queue call in this file */
    spinlock_t pkt_handler_lock;     /* taken around arr_pkt_handler access */
    spinlock_t sys_handler_lock;     /* taken around arr_sys_handler access */
    array_header_t *arr_pkt_handler; /* elements are pkt_handler_t* */
    array_header_t *arr_sys_handler; /* elements are sys_handler_t* */
    int stop;                        /* set to 1 by sp_iom_stop(), tested by sp_iom_run() */
    int poll_thread_id;              /* id of the first thread that ran sp_iom_poll(); 0 until then */
};

static __inline void iom_pkt_handler_lock(sp_iom_t *iom)
{
    spinlock_enter(&iom->pkt_handler_lock, -1);
}

static __inline void iom_pkt_handler_unlock(sp_iom_t *iom)
{
    spinlock_leave(&iom->pkt_handler_lock);
}

static __inline void iom_sys_handler_lock(sp_iom_t *iom)
{
    spinlock_enter(&iom->sys_handler_lock, -1);
}

static __inline void iom_sys_handler_unlock(sp_iom_t *iom)
{
    spinlock_leave(&iom->sys_handler_lock);
}

static __inline void iom_tm_lock(sp_iom_t *iom)
{
    spinlock_enter(&iom->tm_lock, -1);
}

static __inline void iom_tm_unlock(sp_iom_t *iom)
{
    spinlock_leave(&iom->tm_lock);
}

/* Linear search for the packet handler registered under key.
 * Returns the handler and, when idx is non-NULL, stores its array index in
 * *idx; returns NULL (leaving *idx untouched) when the key is not registered.
 * Does not lock; the callers in this file hold pkt_handler_lock. */
static pkt_handler_t* find_pkt_handler(sp_iom_t *iom, int key, int *idx)
{
    int i;

    for (i = 0; i < iom->arr_pkt_handler->nelts; ++i) {
        pkt_handler_t *t = ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*);
        if (t->key == key) {
            if (idx)
                *idx = i;
            return t;
        }
    }
    return NULL;
}

/* Linear search for the system handler registered under key; same contract
 * as find_pkt_handler(). The callers in this file hold sys_handler_lock. */
static sys_handler_t* find_sys_handler(sp_iom_t *iom, int key, int *idx)
{
    int i;

    for (i = 0; i < iom->arr_sys_handler->nelts; ++i) {
        sys_handler_t *t = ARRAY_IDX(iom->arr_sys_handler, i, sys_handler_t*);
        if (t->key == key) {
            if (idx)
                *idx = i;
            return t;
        }
    }
    return NULL;
}

/* Endpoint callback registered for both on_pkt and on_inf.
 * Offers the packet to the registered packet handlers in array order while
 * holding pkt_handler_lock. Before each attempt the three leading 4-byte
 * fields (from_svc_id, svc_id, pkt_id) are read from the packet. */
static void on_pkt(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, void *user_data)
{
    sp_iom_t *iom = (sp_iom_t *)user_data;
    int i;

    iom_pkt_handler_lock(iom);
    for (i = 0; i < iom->arr_pkt_handler->nelts; ++i) {
        pkt_handler_t *pkt_handler = ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*);
        int read_state;
        int write_state;
        int svc_id;
        int from_svc_id;
        int pkt_id;
        int count;

        if (!pkt_handler->on_pkt)
            continue;

        /* Remember the cursors so the packet can be rewound for the next handler. */
        read_state = iobuffer_get_read_state(*p_pkt);
        write_state = iobuffer_get_write_state(*p_pkt);
        iobuffer_format_read(*p_pkt, "444", &from_svc_id, &svc_id, &pkt_id);

        /* mod and svc register handlers here; a non-zero return means the
         * current handler could not process the packet, so it has to be
         * passed on to the next handler. */
        count = pkt_handler->on_pkt(iom, svc_id, epid, from_svc_id, type, pkt_id, p_pkt, pkt_handler->user_data);
        if (!(count && p_pkt && *p_pkt))
            break; /* handled, or the packet was taken away by the handler */

        iobuffer_restore_read_state(*p_pkt, read_state);
        iobuffer_restore_write_state(*p_pkt, write_state);
    }
    iom_pkt_handler_unlock(iom);
}

/* Endpoint callback for the IOM_T_* messages issued by sp_iom_send/post,
 * sp_iom_get_state and sp_iom_post_quit.
 * result is NULL or points at a status slot; presumably it is non-NULL for
 * bus_endpt_send_msg and NULL for bus_endpt_post_msg -- confirm in the bus
 * implementation. */
static void on_msg(bus_endpt_t *endpt, int msg, int nparam, param_size_t params[], int *result, void *user_data)
{
    sp_iom_t *iom = (sp_iom_t *)user_data;

    WLog_DBG(TAG, "==> on msg %d, %d, result? %d", msg, nparam, result != NULL ? 1: 0);
    if (msg == IOM_T_SEND_INFO) {
        /* Parameter layout matches the params[] built in sp_iom_send/post. */
        int pkt_type = params[0];
        int this_svc_id = params[1];
        int epid = params[2];
        int svc_id = params[3];
        int pkt_id = params[4];
        iobuffer_t *pkt = (iobuffer_t*)params[5];
        int read_state, write_state;
        int ret;

        read_state = iobuffer_get_read_state(pkt);
        write_state = iobuffer_get_write_state(pkt);
        /* Prepend the header; after the three head writes the packet starts
         * with this_svc_id, svc_id, pkt_id, the order on_pkt() reads back. */
        iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
        iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
        iobuffer_write_head(pkt, IOBUF_T_I4, &this_svc_id, 0);
        if (result) {
            int state = 0;

            /* Only send when the target endpoint reports BUS_STATE_ON. */
            ret = bus_endpt_get_state(endpt, epid, &state);
            if (ret == 0) {
                ret = (state == BUS_STATE_ON) ? 0 : -1;
#ifndef _WIN32
                if (ret != 0)
                    sp_dbg_warn("bus endpt state is invalid, would not send any IOM_T_SEND_INFO type msg: %d", state);
#endif //NOT _WIN32
            }
            if (ret == 0)
                ret = bus_endpt_send_info(endpt, epid, pkt_type, pkt);
            *result = ret;
            if (ret != 0) {
                /* Failure: rewind the cursors and leave the reference with the sender. */
                iobuffer_restore_read_state(pkt, read_state);
                iobuffer_restore_write_state(pkt, write_state);
            } else {
                iobuffer_dec_ref(pkt);
            }
        } else {
            /* No status slot: send without a state check and always drop the reference. */
            bus_endpt_send_info(endpt, epid, pkt_type, pkt);
            iobuffer_dec_ref(pkt);
        }
        WLog_DBG(TAG, "<== on_msg send_info end, pkt type: 0x%08X, this_svc_id:%d, epid:%d, svc_id:%d, pkt_id:%d",
            pkt_type, this_svc_id, epid, svc_id, pkt_id);
    } else if (msg == IOM_T_GET_STATE) {
        /* Parameter layout matches the params[] built in sp_iom_get_state(). */
        int epid = params[0];
        int *state = (int*)params[1];
        int *rc = (int*)params[2];

        *rc = bus_endpt_get_state(endpt, epid, state);
    } else if (msg == IOM_T_EXIT) {
        sp_iom_stop(iom);
    } else {
        TOOLKIT_ASSERT(0);
    }
}

/* Endpoint callback: forward an endpoint state change to every registered
 * system handler, in array order, while holding sys_handler_lock. */
static void on_sys(bus_endpt_t *endpt, int epid, int state, void *user_data)
{
    sp_iom_t *iom = (sp_iom_t *)user_data;
    int i;

    iom_sys_handler_lock(iom);
    for (i = 0; i < iom->arr_sys_handler->nelts; ++i) {
        sys_handler_t *sys_handler = ARRAY_IDX(iom->arr_sys_handler, i, sys_handler_t*);
        if (sys_handler->on_sys) {
            sys_handler->on_sys(iom, epid, state, sys_handler->user_data);
        }
    }
    iom_sys_handler_unlock(iom);
}

/* Create an I/O manager bound to a new bus endpoint.
 *   url, epid : passed through to bus_endpt_create().
 *   p_iom     : receives the new object on success; untouched on failure.
 * Returns 0 on success, Error_Unexpect when the allocation or the endpoint
 * creation fails. Release the object with sp_iom_destroy(). */
int sp_iom_create(const char *url, int epid, sp_iom_t **p_iom)
{
    sp_iom_t *iom;
    bus_endpt_callback callback;
    int rc;

    iom = ZALLOC_T(sp_iom_t);
    if (!iom) {
        /* Previously the NULL result was dereferenced below. */
        WLog_ERR(TAG, "iom allocate failed!");
        return Error_Unexpect;
    }

    callback.on_evt = NULL;
    callback.on_sys = &on_sys;
    callback.on_pkt = &on_pkt;
    callback.on_inf = &on_pkt;
    callback.on_msg = &on_msg;
    callback.user_data = iom;
    rc = bus_endpt_create(url, epid, &callback, &iom->endpt);
    if (rc != 0) {
        WLog_ERR(TAG, "bus endpoint create failed!");
        goto on_error;
    }

    timer_heap_create(&iom->tm_queue);
    spinlock_init(&iom->pkt_handler_lock);
    spinlock_init(&iom->sys_handler_lock);
    spinlock_init(&iom->tm_lock);
    iom->arr_pkt_handler = array_make(3, sizeof(pkt_handler_t*));
    iom->arr_sys_handler = array_make(3, sizeof(sys_handler_t*));
    iom->poll_thread_id = 0;
    *p_iom = iom;
    return 0;

on_error:
    free(iom);
    return Error_Unexpect;
}

/* Destroy an I/O manager created by sp_iom_create(). NULL is ignored. */
void sp_iom_destroy(sp_iom_t *iom)
{
    if (!iom)
        return;

    timer_queue_destroy(iom->tm_queue);
    bus_endpt_destroy(iom->endpt);
    /* array_free2 replaced array_free here [5/23/2020 Gifur]; presumably it
     * also releases the handler records the arrays point to -- confirm. */
    array_free2(iom->arr_pkt_handler);
    array_free2(iom->arr_sys_handler);
    free(iom);
}

/* Register a packet handler under key.
 * Returns 0 on success, Error_Duplication when the key is already
 * registered, Error_Unexpect when the handler record cannot be allocated. */
int sp_iom_add_pkt_handler(sp_iom_t *iom, int key, sp_iom_on_pkt on_pkt, void *user_data)
{
    int rc = 0;
    pkt_handler_t *pkt_handler;

    iom_pkt_handler_lock(iom);
    if (find_pkt_handler(iom, key, NULL)) {
        rc = Error_Duplication;
    } else {
        pkt_handler = MALLOC_T(pkt_handler_t);
        if (pkt_handler) {
            pkt_handler->key = key;
            pkt_handler->on_pkt = on_pkt;
            pkt_handler->user_data = user_data;
            ARRAY_PUSH(iom->arr_pkt_handler, pkt_handler_t*) = pkt_handler;
        } else {
            rc = Error_Unexpect;
        }
    }
    iom_pkt_handler_unlock(iom);

    return rc;
}

/* Unregister the packet handler stored under key.
 * Returns 0 on success, Error_NotExist when the key is not registered. */
int sp_iom_remove_pkt_handler(sp_iom_t *iom, int key)
{
    int rc = 0;
    pkt_handler_t *pkt_handler;
    int i;

    iom_pkt_handler_lock(iom);
    pkt_handler = find_pkt_handler(iom, key, &i);
    if (pkt_handler) {
        /* Same removal primitive as sp_iom_remove_sys_handler(). The earlier
         * swap-with-last removal reordered the remaining handlers, which
         * matters because on_pkt() offers packets to handlers in array order. */
        ARRAY_DEL(iom->arr_pkt_handler, i, pkt_handler_t*);
        free(pkt_handler);
    } else {
        rc = Error_NotExist;
    }
    iom_pkt_handler_unlock(iom);

    return rc;
}

/* Register a system handler under key.
 * Returns 0 on success, Error_Duplication when the key is already
 * registered, Error_Unexpect when the handler record cannot be allocated. */
int sp_iom_add_sys_handler(sp_iom_t *iom, int key, sp_iom_on_sys on_sys, void *user_data)
{
    int rc = 0;
    sys_handler_t *sys_handler;

    iom_sys_handler_lock(iom);
    if (find_sys_handler(iom, key, NULL)) {
        rc = Error_Duplication;
    } else {
        sys_handler = MALLOC_T(sys_handler_t);
        if (sys_handler) {
            sys_handler->key = key;
            sys_handler->on_sys = on_sys;
            sys_handler->user_data = user_data;
            ARRAY_PUSH(iom->arr_sys_handler, sys_handler_t*) = sys_handler;
        } else {
            rc = Error_Unexpect;
        }
    }
    iom_sys_handler_unlock(iom);

    return rc;
}

/* Unregister the system handler stored under key.
 * Returns 0 on success, Error_NotExist when the key is not registered. */
int sp_iom_remove_sys_handler(sp_iom_t *iom, int key)
{
    int rc = 0;
    sys_handler_t *sys_handler;
    int i;

    iom_sys_handler_lock(iom);
    sys_handler = find_sys_handler(iom, key, &i);
    if (sys_handler) {
        ARRAY_DEL(iom->arr_sys_handler, i, sys_handler_t*);
        free(sys_handler);
    } else {
        rc = Error_NotExist;
    }
    iom_sys_handler_unlock(iom);

    return rc;
}

/* One poll round: poll the bus endpoint with *timeout, then call
 * timer_queue_poll_one() until it reports that nothing was processed.
 * *timeout is updated by timer_queue_poll_one() with the interval until the
 * next timer expires. Returns the bus_endpt_poll() result. */
static int sp_iom_poll(sp_iom_t *iom, int *timeout)
{
    int rc;
    int cnt;

    /* The first thread that polls is recorded as the poll thread. */
    if (iom->poll_thread_id == 0) {
        iom->poll_thread_id = (int)GetCurrentThreadId();
    }

    rc = bus_endpt_poll(iom->endpt, *timeout);

    /* The clock() based timing that used to wrap this call only fed
     * commented-out logging, so it was removed from the locked section. */
    do {
        iom_tm_lock(iom);
        cnt = timer_queue_poll_one(iom->tm_queue, NULL, timeout);
        iom_tm_unlock(iom);
    } while (cnt);

    return rc;
}

/* Run the poll loop until the stop flag is set and the timer queue is empty.
 * Returns 0 on a normal exit or the negative sp_iom_poll() result on failure
 * (on RVC_OS_WIN the process is terminated with ExitProcess instead). */
int sp_iom_run(sp_iom_t *iom)
{
    int timeout = POLL_INTERVAL;

#ifdef RVC_OS_WIN
    while (InterlockedExchangeAdd((LONG*)&iom->stop, 0) == 0 || timer_queue_get_count(iom->tm_queue) > 0) {
        int rc = sp_iom_poll(iom, &timeout);
        if (rc >= 0) {
            /* Clamp the timer-derived timeout into [0, POLL_INTERVAL]. */
            if (timeout > POLL_INTERVAL || timeout < 0)
                timeout = POLL_INTERVAL;
        } else {
            DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "iom poll failed!");
            ExitProcess(-1);
            return rc;
        }
    }
#else
    int parent_id = GetParentProcessID();

    /* The 'InterlockedExchangeAdd' adapter implemented under winpr went
     * wrong here (maybe a 64-bit issue), so the flag is read directly. */
    while (iom->stop == 0 || timer_queue_get_count(iom->tm_queue) > 0) {
        int rc = sp_iom_poll(iom, &timeout);
        if (rc >= 0) {
            /* Clamp the timer-derived timeout into [0, POLL_INTERVAL]. */
            if (timeout > POLL_INTERVAL || timeout < 0)
                timeout = POLL_INTERVAL;
        } else {
            DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "iom poll failed! %d", rc);
            return rc;
        }
        if (rc == 0) {
            /* Detect a changed parent pid; a new parent of 1 (init) is logged. */
            const int cur_ppid = GetParentProcessID();
            if (cur_ppid != parent_id) {
                if (cur_ppid == 1 /*init's pid*/) {
                    DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM,
                        "for moudle, spshell gone while for spshell, guardian gone. last parent pid: %d", parent_id);
                }
                parent_id = cur_ppid; /* remember it, otherwise this would log on every round */
            }
        }
    }
#endif //RVC_OS_WIN

    DbgWithLinkForC(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM, "iom run exit ok!");
    return 0;
}

/* Set the stop flag tested by sp_iom_run(). Always returns 0. */
int sp_iom_stop(sp_iom_t *iom)
{
    DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "set iom stop flag!");
    InterlockedExchange((LONG*)&iom->stop, 1);
    return 0;
}

/* Hand a packet to the endpoint with bus_endpt_send_msg(IOM_T_SEND_INFO).
 *   p_pkt : optional packet. When p_pkt or *p_pkt is NULL an empty packet is
 *           created for the call and released here if the send fails. When
 *           the caller supplied a packet and the send succeeds, *p_pkt is set
 *           to NULL; on failure the caller keeps the packet.
 * Returns 0 on success, Error_IO on any failure. */
int sp_iom_send(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
{
    iobuffer_t *pkt = p_pkt ? *p_pkt : NULL;
    int rc;

    WLog_DBG(TAG, "== > sp_iom_send: pkt type: 0x%08X, this_svc id: %d, epid:%d, svc id:%d, pkt_id: %d",
        pkt_type, this_svc_id, epid, svc_id, pkt_id);
    if (!pkt) {
        pkt = iobuffer_create(-1, -1);
    }
    {
        /* Layout is decoded by the IOM_T_SEND_INFO branch of on_msg(). */
        param_size_t params[] = {
            pkt_type,
            this_svc_id,
            epid,
            svc_id,
            pkt_id,
            (param_size_t)pkt,
        };
        rc = bus_endpt_send_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
    }
    if (!p_pkt || !*p_pkt) {
        /* Locally created packet: on_msg() drops the reference on success only. */
        if (rc != 0) {
            iobuffer_dec_ref(pkt);
        }
    } else {
        if (rc == 0)
            *p_pkt = NULL;
    }
    if (rc != 0)
        rc = Error_IO;
    WLog_DBG(TAG, "<== sp_iom_send: pkt type: 0x%08X, this_svc id: %d, epid:%d, svc id:%d, pkt_id: %d, rc=%d!",
        pkt_type, this_svc_id, epid, svc_id, pkt_id, rc);
    return rc;
}

/* Same as sp_iom_send() but uses bus_endpt_post_msg(). Packet ownership
 * follows the same rules as sp_iom_send().
 * Returns 0 on success, Error_Unexpect when the post fails. */
int sp_iom_post(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
{
    iobuffer_t *pkt = p_pkt ? *p_pkt : NULL;
    int rc;

    if (!pkt) {
        pkt = iobuffer_create(-1, -1);
    }
    {
        /* Layout is decoded by the IOM_T_SEND_INFO branch of on_msg(). */
        param_size_t params[] = {
            pkt_type,
            this_svc_id,
            epid,
            svc_id,
            pkt_id,
            (param_size_t)pkt,
        };
        rc = bus_endpt_post_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
    }
    if (!p_pkt || !*p_pkt) {
        if (rc != 0)
            iobuffer_dec_ref(pkt);
    } else {
        if (rc == 0)
            *p_pkt = NULL;
    }
    if (rc != 0) {
        DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "iom post unexpected, epid:%d, rc:%d", epid, rc);
        rc = Error_Unexpect;
    }
    return rc;
}

/* Query the state of endpoint epid through an IOM_T_GET_STATE message.
 *   state : written by bus_endpt_get_state() inside on_msg().
 * Returns 0 on success, Error_Unexpect when either the message delivery
 * (rc1) or the state query itself (rc) fails. */
int sp_iom_get_state(sp_iom_t *iom, int epid, int *state)
{
    int rc = 0; /* filled in by on_msg() through params[2] */
    int rc1;
    /* Original note: "use -1 for get state" (meaning not evident from this file). */
    param_size_t params[] = {epid, (param_size_t)state, (param_size_t)&rc};

    rc1 = bus_endpt_send_msg(iom->endpt, IOM_T_GET_STATE, array_size(params), params);
    if (rc1 != 0) {
        /* Delivery failures used to be returned without any log entry. */
        DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "get epid(%d) state: send msg failed, rc1:%d", epid, rc1);
        return Error_Unexpect;
    }
    if (rc != 0) {
        DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM, "get epid(%d) state: %d, rc1:%d, rc:%d", epid, *state, rc1, rc);
        return Error_Unexpect;
    }
    return 0;
}

/* Schedule entry on the timer queue with the given delay, under tm_lock.
 * Returns 0 on success, Error_Unexpect otherwise. */
int sp_iom_schedule_timer(sp_iom_t *iom, timer_entry *entry, unsigned int delay)
{
    int rc;

    iom_tm_lock(iom);
    rc = timer_queue_schedule(iom->tm_queue, entry, delay);
    iom_tm_unlock(iom);

    return rc ? Error_Unexpect : 0;
}

/* Cancel entry on the timer queue, under tm_lock; cancel is passed through
 * to timer_queue_cancel(). Returns 0 on success, Error_Unexpect otherwise. */
int sp_iom_cancel_timer(sp_iom_t *iom, timer_entry *entry, int cancel)
{
    int rc;

    iom_tm_lock(iom);
    rc = timer_queue_cancel(iom->tm_queue, entry, cancel);
    iom_tm_unlock(iom);

    return rc ? Error_Unexpect : 0;
}

/* Return the endpoint id reported by the underlying bus endpoint. */
int sp_iom_get_epid(sp_iom_t *iom)
{
    return bus_endpt_get_epid(iom->endpt);
}

/* Return the url reported by the underlying bus endpoint. */
const char *sp_iom_get_comm_url(sp_iom_t *iom)
{
    return bus_endpt_get_url(iom->endpt);
}

/* Post an IOM_T_EXIT message; on_msg() reacts by calling sp_iom_stop().
 * Returns 0 on success, Error_Unexpect when the post fails. */
int sp_iom_post_quit(sp_iom_t *iom)
{
    int rc = bus_endpt_post_msg(iom->endpt, IOM_T_EXIT, 0, NULL);

    if (rc != 0)
        rc = Error_Unexpect;
    return rc;
}

/* Return the id of the thread that first ran the poll loop, or 0 if the
 * loop has not been polled yet. */
int sp_iom_get_poll_thread_id(sp_iom_t *iom)
{
    return iom->poll_thread_id;
}