/*
 * Broadcast message (BCM) support: a client that posts broadcast packets,
 * a listener that subscribes to an entity's broadcasts, and a daemon that
 * keeps the subscription tables and fans packets out to subscribers.
 */
#include "precompile.h"
#include "sp_bcm.h"
#include "sp_svc.h"
#include "sp_def.h"
#include "sp_logwithlinkforc.h"
#include "sp_mod.h"
#include "sp_env.h"
#include "SpBase.h"
#include "iobuffer.h"
#include "memutil.h"
#include "refcnt.h"
#include "array.h"
#include "list.h"
#include "dbgutil.h"
/* NOTE(review): the two system includes at this position had lost their
 * <header> names in the copy of the file this was restored from. The two
 * below cover free/strlen/memcpy/_strdup as used in this file -- confirm
 * against version control. */
#include <stdlib.h>
#include <string.h>
#include "StartUpBase.h"

#define BCM_CMD_MSG			0x01
#define BCM_CMD_SUBSCRIBE	0x02
#define BCM_CMD_UNSUBSCRIBE	0x03

/* Number of per-entity list heads allocated by sp_bcm_daemon_create(). */
#define BCM_DAEMON_LIST_SLOTS	100

/* ------------------------------------------------------------------ */
/* client                                                             */
/* ------------------------------------------------------------------ */

struct sp_bcm_client_t {
	sp_svc_t *svc;
};

/**
 * Allocate a broadcast client bound to svc.
 * @param svc       service used for posting
 * @param p_client  receives the new client
 * @return 0
 */
int sp_bcm_client_create(sp_svc_t *svc, sp_bcm_client_t **p_client)
{
	sp_bcm_client_t *client = MALLOC_T(sp_bcm_client_t);

	client->svc = svc;
	*p_client = client;
	return 0;
}

/** Release a client created by sp_bcm_client_create(). */
void sp_bcm_client_destroy(sp_bcm_client_t *client)
{
	free(client);
}

/**
 * Post a broadcast message to the shell service.
 *
 * Three 4-byte header fields are prepended to *p_pkt, so that the packet
 * starts with client id, message id, message signature. If *p_pkt is still
 * non-NULL after the post, its read/write state is restored to what it was
 * on entry.
 *
 * @return the result of sp_svc_post()
 */
int sp_bcm_client_bcast(sp_bcm_client_t *client, int message_id, int message_sig, iobuffer_t **p_pkt)
{
	int rs = iobuffer_get_read_state(*p_pkt);
	int ws = iobuffer_get_write_state(*p_pkt);
	/* The wire field is 4 bytes (listeners read it back into an int), so
	 * hold the id in an int rather than writing 4 bytes out of a wider
	 * sp_uid_t object. */
	int client_id = (int)sp_svc_get_id(client->svc);
	int rc;

	iobuffer_write_head(*p_pkt, IOBUF_T_I4, &message_sig, 0);
	iobuffer_write_head(*p_pkt, IOBUF_T_I4, &message_id, 0);
	iobuffer_write_head(*p_pkt, IOBUF_T_I4, &client_id, 0);
	rc = sp_svc_post(client->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_BCM|BCM_CMD_MSG, sp_svc_get_id(client->svc), p_pkt);
	if (*p_pkt) {
		iobuffer_restore_read_state(*p_pkt, rs);
		iobuffer_restore_write_state(*p_pkt, ws);
	}
	return rc;
}

/* ------------------------------------------------------------------ */
/* listener                                                           */
/* ------------------------------------------------------------------ */

struct sp_bcm_listener_t {
	int subcribed;			/* non-zero between a successful subscribe and unsubscribe */
	int target_ent_id;
	sp_bcm_listener_cb cb;
	char *param;			/* owned copy, freed when the last reference is dropped */
	void *tag;
	sp_uid_t uid;
	sp_svc_t *svc;
	strand_t *strand;
	DECLARE_REF_COUNT_MEMBER(ref_cnt);
};

/*
 * Thread-pool work item queued by listener_on_pkt().
 *
 * arg is a packet whose head carries, in order: the listener pointer, then
 * epid, svc_id, pkt_type and pkt_id as 4-byte fields, followed by the
 * broadcast payload. The work item owns one listener reference and the
 * packet reference; both are released here.
 */
static void listener_on_pkt_threadpool(threadpool_t *threadpool, void *arg)
{
	iobuffer_t *pkt = (iobuffer_t*)arg;
	sp_bcm_listener_t *listener;
	/* These four are only read to advance past the header; they are
	 * 4-byte fields, so they must be int-sized. */
	int epid;
	int svc_id;
	int pkt_type;
	int pkt_id;
	sp_uid_t rsn;
	sp_rsn_context_t rsn_ctx;
	sp_svc_t *svc;

	iobuffer_read(pkt, IOBUF_T_PTR, &listener, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &epid, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &svc_id, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, NULL);

	svc = listener->svc;
	rsn = sp_svc_new_runserial(svc);
	sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_CALLBACK, &rsn_ctx);
	sp_svc_push_runserial_context(svc, &rsn_ctx);

	if (listener->subcribed) {
		if (listener->cb.on_message_raw) {
			int from_client_id;

			iobuffer_read(pkt, IOBUF_T_I4, &from_client_id, NULL);
			/* The raw callback receives &pkt; pkt is re-checked for
			 * NULL below before it is released. */
			listener->cb.on_message_raw(listener, from_client_id, &pkt, listener->cb.user_data);
		} else if (listener->cb.on_message) {
			int from_client_id;
			int message_id;
			int message_sig;

			iobuffer_format_read(pkt, "444", &from_client_id, &message_id, &message_sig);
			listener->cb.on_message(listener,
				from_client_id,
				message_id,
				message_sig,
				iobuffer_data(pkt, 0),
				iobuffer_get_length(pkt),
				listener->cb.user_data);
		}
	}

	if (pkt)
		iobuffer_dec_ref(pkt);
	sp_bcm_listener_dec_ref(listener);	/* pairs with the inc_ref in listener_on_pkt() */
	sp_svc_pop_runserial_context(svc);
}

/*
 * Packet handler registered by sp_bcm_listener_subscribe().
 *
 * Reads the leading 8-byte uid. When the packet is a BCM_CMD_MSG addressed
 * to this listener's uid, a clone is queued on the listener's strand and
 * FALSE is returned; otherwise the read position is restored and TRUE is
 * returned so other handlers can see the packet.
 */
static int listener_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
{
	sp_bcm_listener_t *listener = (sp_bcm_listener_t *)user_data;
	int cmd = SP_GET_TYPE(pkt_type);
	sp_uid_t uid;
	int read_state = iobuffer_get_read_state(*p_pkt);

	iobuffer_read(*p_pkt, IOBUF_T_I8, &uid, NULL);

	if (cmd == BCM_CMD_MSG && listener->subcribed && uid == listener->uid) {
		iobuffer_t *pkt = iobuffer_clone(*p_pkt);

		/* Prepended in reverse so the work item reads:
		 * listener, epid, svc_id, pkt_type, pkt_id. */
		iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
		iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
		iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
		iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);
		iobuffer_write_head(pkt, IOBUF_T_PTR, &listener, 0);
		sp_bcm_listener_inc_ref(listener);	/* reference owned by the queued work item */
		if (threadpool_queue_workitem(sp_svc_get_threadpool(svc), listener->strand, &listener_on_pkt_threadpool, pkt) < 0) {
			/* Queueing failed: the work item will never run, undo both. */
			sp_bcm_listener_dec_ref(listener);
			iobuffer_dec_ref(pkt);
		}
		return FALSE;
	}

	iobuffer_restore_read_state(*p_pkt, read_state);
	return TRUE; // continue
}

/**
 * Create an unsubscribed listener for broadcasts of target_ent_id.
 *
 * @param svc            service the listener posts to and receives from
 * @param target_ent_id  entity whose broadcasts are wanted
 * @param param          subscription parameter string; copied (NULL is
 *                       stored as an empty string)
 * @param cb             callbacks; at least one of on_message /
 *                       on_message_raw must be set; copied
 * @param p_listener     receives the new listener
 * @return 0 on success, Error_Param on invalid arguments
 */
int sp_bcm_listener_create(sp_svc_t *svc, int target_ent_id, const char *param, const sp_bcm_listener_cb *cb, sp_bcm_listener_t **p_listener)
{
	sp_bcm_listener_t *listener;

	if (!cb || !p_listener)
		return Error_Param;
	if (!cb->on_message && !cb->on_message_raw)
		return Error_Param;

	listener = MALLOC_T(sp_bcm_listener_t);
	listener->strand = strand_create();
	listener->target_ent_id = target_ent_id;
	listener->svc = svc;
	listener->param = _strdup(param ? param : "");
	listener->tag = NULL;	/* defined value for sp_bcm_listener_get_tag() before any set_tag() */
	listener->uid = 0;
	memcpy(&listener->cb, cb, sizeof(sp_bcm_listener_cb));
	listener->subcribed = 0;
	REF_COUNT_INIT(&listener->ref_cnt);
	*p_listener = listener;
	return 0;
}

/**
 * Destroy the listener's strand and drop the creator's reference.
 * The listener memory itself is released when the last reference goes
 * (see __sp_bcm_listener_destroy).
 *
 * NOTE(review): this does not unsubscribe; if the listener is still
 * subscribed, its packet handler stays registered with the listener as
 * user_data. Also, queued work items may still hold references after the
 * strand is destroyed -- confirm callers always unsubscribe first.
 */
void sp_bcm_listener_destroy(sp_bcm_listener_t *listener)
{
	strand_destroy(listener->strand);
	sp_bcm_listener_dec_ref(listener);
}

/**
 * Register the packet handler and post a BCM_CMD_SUBSCRIBE (uid + param)
 * for the listener's target entity.
 *
 * @param uid  optional; receives the subscription uid on success
 * @return 0 on success, Error_Duplication if already subscribed, otherwise
 *         the failure code of sp_svc_post()
 */
int sp_bcm_listener_subscribe(sp_bcm_listener_t *listener, sp_uid_t *uid)
{
	int rc;
	iobuffer_t *pkt = NULL;

	if (listener->subcribed)
		return Error_Duplication;

	/* NOTE(review): the handler key is the listener pointer cast to int,
	 * which truncates on 64-bit targets -- kept because the key type is
	 * fixed by the sp_svc API. */
	sp_svc_add_pkt_handler(listener->svc, (int)listener, SP_PKT_BCM, &listener_on_pkt, listener);
	listener->uid = sp_svc_new_runserial(listener->svc);
	pkt = iobuffer_create(-1, -1);
	iobuffer_write(pkt, IOBUF_T_I8, &listener->uid, 0);
	iobuffer_write(pkt, IOBUF_T_STR, listener->param, -1);
	rc = sp_svc_post(listener->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_BCM|BCM_CMD_SUBSCRIBE, listener->target_ent_id, &pkt);
	if (rc == 0) {
		listener->subcribed = 1;
		if (uid)
			*uid = listener->uid;
	} else {
		/* Undo the registration so a retry does not add a second handler
		 * and a later free does not leave a dangling user_data. */
		sp_svc_remove_pkt_handler(listener->svc, (int)listener, SP_PKT_BCM);
	}
	if (pkt)
		iobuffer_dec_ref(pkt);
	return rc;
}

/**
 * Post a BCM_CMD_UNSUBSCRIBE for the listener's uid and, if the post
 * succeeds, unregister the packet handler.
 *
 * @return 0 on success, Error_NotInit if not subscribed, otherwise the
 *         failure code of sp_svc_post() (the listener stays subscribed)
 */
int sp_bcm_listener_unsubscribe(sp_bcm_listener_t *listener)
{
	int rc;
	iobuffer_t *pkt;

	if (!listener->subcribed)
		return Error_NotInit;

	pkt = iobuffer_create(-1, -1);
	iobuffer_write(pkt, IOBUF_T_I8, &listener->uid, 0);
	rc = sp_svc_post(listener->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_BCM|BCM_CMD_UNSUBSCRIBE, listener->target_ent_id, &pkt);
	if (rc == 0) {
		listener->subcribed = 0;
		sp_svc_remove_pkt_handler(listener->svc, (int)listener, SP_PKT_BCM);
	}
	if (pkt)
		iobuffer_dec_ref(pkt);
	return rc;
}

/** Attach an opaque user pointer to the listener. */
void sp_bcm_listener_set_tag(sp_bcm_listener_t *listener, void *tag)
{
	listener->tag = tag;
}

/** Return the pointer stored by sp_bcm_listener_set_tag() (NULL if never set). */
void* sp_bcm_listener_get_tag(sp_bcm_listener_t *listener)
{
	return listener->tag;
}

/** Copy the listener's subscription uid into *uid. */
void sp_bcm_listener_get_uid(sp_bcm_listener_t *listener, sp_uid_t *uid)
{
	*uid = listener->uid;
}

/** Return the entity id this listener was created for. */
int sp_bcm_listener_get_target_entity_id(sp_bcm_listener_t *listener)
{
	return listener->target_ent_id;
}

/*
 * Final destructor, invoked through the ref-count implementation below.
 * Runs the optional on_destroy callback, then releases the owned param
 * copy and the listener itself.
 */
static void __sp_bcm_listener_destroy(sp_bcm_listener_t *listener)
{
	if (listener->cb.on_destroy) {
		listener->cb.on_destroy(listener, listener->cb.user_data);
	}
	free(listener->param);	/* allocated with _strdup in sp_bcm_listener_create */
	free(listener);
}

IMPLEMENT_REF_COUNT_MT(sp_bcm_listener, sp_bcm_listener_t, ref_cnt, __sp_bcm_listener_destroy)

/* ------------------------------------------------------------------ */
/* daemon                                                             */
/* ------------------------------------------------------------------ */

/* One subscription. Linked into two lists at once: the subscriber's list
 * (arr_src_list[svc_id]) and the target's list (arr_dst_list[target]). */
typedef struct bcm_listener_entry {
	int epid;
	int svc_id;
	int target_ent_id;
	sp_uid_t uid;
	struct list_head src_entry;
	struct list_head dst_entry;
	char *param;				/* owned copy */
	int redirection_queried;	// has been sent redirection query
}bcm_listener_entry;

struct sp_bcm_daemon_t {
	CRITICAL_SECTION lock;			/* guards both list arrays */
	array_header_t *arr_src_list;	/* list heads indexed by subscriber svc id */
	array_header_t *arr_dst_list;	/* list heads indexed by target entity id */
	sp_svc_t *svc;
};

static void daemon_lock(sp_bcm_daemon_t *daemon)
{
	EnterCriticalSection(&daemon->lock);
}

static void daemon_unlock(sp_bcm_daemon_t *daemon)
{
	LeaveCriticalSection(&daemon->lock);
}

/* Return the list head stored at idx in arr, or NULL when idx is outside
 * [0, arr->nelts). */
static struct list_head *daemon_list_at(array_header_t *arr, int idx)
{
	if (idx < 0 || idx >= arr->nelts)
		return NULL;
	return &ARRAY_IDX(arr, idx, struct list_head);
}

/* Unlink an entry from both lists and release it. Caller holds the lock
 * (or otherwise has exclusive access). */
static void daemon_free_entry(bcm_listener_entry *e)
{
	list_del(&e->src_entry);
	list_del(&e->dst_entry);
	free(e->param);
	free(e);
}

/* Return the part of pszFilePath after the last path separator
 * ('\\' or '/'); the whole string when there is none. */
static const char *_GetFileName(const char *pszFilePath)
{
	size_t i = strlen(pszFilePath);

	while (i > 0 && pszFilePath[i-1] != '\\' && pszFilePath[i-1] != '/')
		i--;
	return pszFilePath + i;
}

/* BCM_CMD_MSG: send a clone of the packet, prefixed with the subscriber's
 * uid, to every subscriber of the sending entity (pkt_id). */
static void daemon_forward_msg(sp_bcm_daemon_t *daemon, sp_svc_t *svc, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
{
	bcm_listener_entry *pos;
	struct list_head *lst = daemon_list_at(daemon->arr_dst_list, pkt_id);

	if (!lst) {
		DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "BCM_CMD_MSG from out-of-range entity id:%d", pkt_id);
		return;
	}

	daemon_lock(daemon);
	list_for_each_entry(pos, lst, bcm_listener_entry, dst_entry) {
		iobuffer_t *copy_pkt = iobuffer_clone(*p_pkt);

		iobuffer_write_head(copy_pkt, IOBUF_T_I8, &pos->uid, 0);
		sp_svc_post(svc, pos->epid, pos->svc_id, pkt_type, pkt_id, &copy_pkt);
		if (copy_pkt)
			iobuffer_dec_ref(copy_pkt);
	}
	daemon_unlock(daemon);
}

/* BCM_CMD_SUBSCRIBE: record a new (uid, param) subscription from svc_id to
 * the entity pkt_id, unless that uid is already present for svc_id. When
 * the target entity is Idle or Busy it is notified right away. */
static void daemon_add_subscription(sp_bcm_daemon_t *daemon, int epid, int svc_id, int pkt_id, iobuffer_t **p_pkt)
{
	int target_ent_id = pkt_id;
	sp_entity_t *target_ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, target_ent_id);
	int found = 0;
	sp_uid_t uid = 0;
	char *param = NULL;
	bcm_listener_entry *pos;
	struct list_head *src_lst;
	struct list_head *dst_lst;

	if (!target_ent)
		return;

	src_lst = daemon_list_at(daemon->arr_src_list, svc_id);
	dst_lst = daemon_list_at(daemon->arr_dst_list, target_ent_id);
	if (!src_lst || !dst_lst) {
		DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "BCM_CMD_SUBSCRIBE with out-of-range id, svc:%d target:%d", svc_id, target_ent_id);
		return;
	}

	/* "8s": 8-byte uid followed by a string; param is released with
	 * FREE() at the end of this function. */
	iobuffer_format_read(*p_pkt, "8s", &uid, &param);

	daemon_lock(daemon);
	list_for_each_entry(pos, src_lst, bcm_listener_entry, src_entry) {
		if (uid == pos->uid) {
			found ++;
			break;
		}
	}
	if (found == 0) {
		bcm_listener_entry *e = MALLOC_T(bcm_listener_entry);

		e->redirection_queried = 0;
		e->epid = epid;
		e->svc_id = svc_id;
		e->target_ent_id = target_ent_id;
		e->uid = uid;
		e->param = _strdup(param ? param : "");
		list_add_tail(&e->src_entry, src_lst);
		list_add_tail(&e->dst_entry, dst_lst);
	} else {
		DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "duplicate BCM_CMD_SUBSCRIBE msg!");
	}
	daemon_unlock(daemon);

	if (found == 0 && (target_ent->state == EntityState_Idle || target_ent->state == EntityState_Busy)) {
		addStartupStep(target_ent->cfg->idx, "redirect_subscribe", "startup_end", 0, 0, 0);
		sp_mod_mgr_notify_redirect_subscribe(sp_get_env()->mod_mgr, target_ent, &uid, svc_id, param);
	}
	FREE(param);
}

/* BCM_CMD_UNSUBSCRIBE: drop the subscription of svc_id whose uid matches
 * the 8-byte uid at the head of the packet. */
static void daemon_remove_subscription(sp_bcm_daemon_t *daemon, int svc_id, iobuffer_t **p_pkt)
{
	bcm_listener_entry *pos;
	sp_uid_t uid = 0;
	struct list_head *lst = daemon_list_at(daemon->arr_src_list, svc_id);

	if (!lst) {
		DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "BCM_CMD_UNSUBSCRIBE from out-of-range svc id:%d", svc_id);
		return;
	}

	iobuffer_read(*p_pkt, IOBUF_T_I8, &uid, NULL);

	daemon_lock(daemon);
	list_for_each_entry(pos, lst, bcm_listener_entry, src_entry) {
		if (pos->uid == uid) {
			daemon_free_entry(pos);
			break;	/* pos is gone; must not advance the iterator */
		}
	}
	daemon_unlock(daemon);
}

/*
 * Packet handler for SP_PKT_BCM registered by sp_bcm_daemon_create().
 * Dispatches on the command encoded in pkt_type; always returns TRUE.
 */
static int daemon_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
{
	sp_bcm_daemon_t *daemon = (sp_bcm_daemon_t *)user_data;
	int cmd = SP_GET_TYPE(pkt_type);

	if (cmd == BCM_CMD_MSG) {
		daemon_forward_msg(daemon, svc, pkt_type, pkt_id, p_pkt);
	} else if (cmd == BCM_CMD_SUBSCRIBE) {
		daemon_add_subscription(daemon, epid, svc_id, pkt_id, p_pkt);
	} else if (cmd == BCM_CMD_UNSUBSCRIBE) {
		daemon_remove_subscription(daemon, svc_id, p_pkt);
	} else {
		DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM,"unknown pkt! %s:%d", _GetFileName(__FILE__), __LINE__);
		TOOLKIT_ASSERT(0);
	}

	return TRUE;
}

/*
 * System-state handler registered by sp_bcm_daemon_create().
 * On BUS_STATE_OFF for module epid, drops every subscription made by any
 * entity of that module.
 */
static void daemon_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
{
	sp_bcm_daemon_t *daemon = (sp_bcm_daemon_t *)user_data;
	sp_mod_t *mod;
	sp_entity_t *pos;

	if (state != BUS_STATE_OFF)
		return;

	mod = sp_mod_mgr_find_module_by_idx(sp_get_env()->mod_mgr, epid);
	if (!mod)
		return;

	daemon_lock(daemon);
	list_for_each_entry(pos, &mod->entity_list, sp_entity_t, entry) {
		bcm_listener_entry *t, *n;
		struct list_head *lst = daemon_list_at(daemon->arr_src_list, pos->cfg->idx);

		if (!lst)
			continue;
		list_for_each_entry_safe(t, n, lst, bcm_listener_entry, src_entry) {
			daemon_free_entry(t);
		}
	}
	daemon_unlock(daemon);
}

/**
 * Log the current entity-array size.
 *
 * The list arrays are NOT resized here; they keep the size chosen in
 * sp_bcm_daemon_create(). (An earlier resize attempt was disabled and has
 * been removed as dead code.)
 *
 * @return 0
 */
int sp_bcm_deamon_refresh(sp_bcm_daemon_t* daemon)
{
	int nelts = sp_mod_mgr_get_entity_array_nelts(sp_get_env()->mod_mgr);

	(void)daemon;
	DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "entity array nelts:%d", nelts);
	return 0;
}

/**
 * Create the broadcast daemon: allocate BCM_DAEMON_LIST_SLOTS empty list
 * heads for each direction and register the packet and system handlers on
 * svc.
 *
 * @param p_daemon  receives the new daemon
 * @return 0
 */
int sp_bcm_daemon_create(sp_svc_t *svc, sp_bcm_daemon_t **p_daemon)
{
	sp_bcm_daemon_t *daemon = ZALLOC_T(sp_bcm_daemon_t);
	int i;
	int nelts = BCM_DAEMON_LIST_SLOTS;// sp_mod_mgr_get_entity_array_nelts(sp_get_env()->mod_mgr);

	DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "entity array nelts:%d", nelts);
	daemon->svc = svc;
	daemon->arr_dst_list = array_make(nelts, sizeof(struct list_head));
	daemon->arr_src_list = array_make(nelts, sizeof(struct list_head));
	for (i = 0; i < nelts; ++i) {
		INIT_LIST_HEAD((struct list_head*)array_push(daemon->arr_src_list));
		INIT_LIST_HEAD((struct list_head*)array_push(daemon->arr_dst_list));
	}
	InitializeCriticalSection(&daemon->lock);
	DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "init daemon lock ok!");
	/* NOTE(review): handler key is the daemon pointer cast to int
	 * (truncates on 64-bit); dictated by the sp_svc API. */
	sp_svc_add_pkt_handler(svc, (int)daemon, SP_PKT_BCM, &daemon_on_pkt, daemon);
	sp_svc_add_sys_handler(svc, (int)daemon, &daemon_on_sys, daemon);
	*p_daemon = daemon;
	return 0;
}

/**
 * Unregister the daemon's handlers, free every subscription entry, and
 * release the daemon.
 */
void sp_bcm_daemon_destroy(sp_bcm_daemon_t *daemon)
{
	int i;

	/* Unregister first so no handler can run against the tables or the
	 * lock while they are being torn down. */
	sp_svc_remove_pkt_handler(daemon->svc, (int)daemon, SP_PKT_BCM);
	sp_svc_remove_sys_handler(daemon->svc, (int)daemon);

	for (i = 0; i < daemon->arr_src_list->nelts; ++i) {
		bcm_listener_entry *pos, *n;
		struct list_head *lst = &ARRAY_IDX(daemon->arr_src_list, i, struct list_head);

		/* Every entry is on exactly one src list, so walking the src
		 * lists releases all of them. */
		list_for_each_entry_safe(pos, n, lst, bcm_listener_entry, src_entry) {
			daemon_free_entry(pos);
		}
	}
	array_free(daemon->arr_src_list);
	array_free(daemon->arr_dst_list);
	DeleteCriticalSection(&daemon->lock);
	free(daemon);
}

/**
 * Snapshot the subscribers of entity_id.
 *
 * @return a new array of sp_bcm_receiver_t* (each with its own copy of
 *         param); empty when entity_id has no subscribers or is out of
 *         range. Release with sp_bcm_daemon_free_receiver_array().
 */
array_header_t *sp_bcm_daemon_get_receiver(sp_bcm_daemon_t *daemon, int entity_id)
{
	struct list_head *lst = daemon_list_at(daemon->arr_dst_list, entity_id);
	array_header_t *arr = array_make(0, sizeof(sp_bcm_receiver_t*));
	bcm_listener_entry *pos;

	if (!lst)
		return arr;

	daemon_lock(daemon);
	list_for_each_entry(pos, lst, bcm_listener_entry, dst_entry) {
		sp_bcm_receiver_t *rcv = MALLOC_T(sp_bcm_receiver_t);

		rcv->id = pos->uid;
		rcv->receiver_id = pos->svc_id;
		rcv->param = _strdup(pos->param);
		ARRAY_PUSH(arr, sp_bcm_receiver_t*) = rcv;
	}
	daemon_unlock(daemon);
	return arr;
}

/** Release an array returned by sp_bcm_daemon_get_receiver(). NULL is a no-op. */
void sp_bcm_daemon_free_receiver_array(array_header_t *arr_receiver)
{
	int i;

	if (!arr_receiver)
		return;
	for (i = 0; i < arr_receiver->nelts; ++i) {
		sp_bcm_receiver_t *rcv = ARRAY_IDX(arr_receiver, i, sp_bcm_receiver_t*);

		free(rcv->param);
		free(rcv);
	}
	array_free(arr_receiver);
}

/**
 * Move the subscription *uid from target_id's list to new_target_id's list
 * and, if the new target entity is Idle or Busy, notify it.
 *
 * Redirecting the same subscription more than once is supported, and the
 * notification is sent after every redirection (including one where
 * target_id == new_target_id).
 *
 * @return 0 (also when no subscription with *uid exists under target_id);
 *         Error_Param when either id is outside the daemon's tables
 */
int sp_bcm_daemon_process_redirect_subscribe(sp_bcm_daemon_t *daemon, int target_id, const sp_uid_t *uid, int new_target_id)
{
	sp_entity_t *target_ent = NULL;
	bcm_listener_entry *pos = NULL;
	struct list_head *list1 = daemon_list_at(daemon->arr_dst_list, target_id);
	struct list_head *list2 = daemon_list_at(daemon->arr_dst_list, new_target_id);
	int found = 0;
	int subscriber_svc_id = 0;
	char *param_copy = NULL;

	if (!list1 || !list2)
		return Error_Param;

	daemon_lock(daemon);
	list_for_each_entry(pos, list1, bcm_listener_entry, dst_entry) {
		if (pos->uid == *uid) {
			pos->redirection_queried = 1;
			if (target_id != new_target_id) {
				pos->target_ent_id = new_target_id;
				list_move_tail(&pos->dst_entry, list2);
			}
			/* Copy what the notification needs while the lock is held:
			 * once it is released the entry may be freed, and after an
			 * unmatched loop the cursor is not a valid entry at all. */
			found = 1;
			subscriber_svc_id = pos->svc_id;
			param_copy = pos->param ? _strdup(pos->param) : NULL;
			break;
		}
	}
	daemon_unlock(daemon);

	if (found) {
		target_ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, new_target_id);
		if (target_ent && (target_ent->state == EntityState_Idle || target_ent->state == EntityState_Busy)) {
			DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "notify_redirect_subscribe! target:%s", target_ent->cfg->name);
			sp_mod_mgr_notify_redirect_subscribe(sp_get_env()->mod_mgr, target_ent, (sp_uid_t *) uid, subscriber_svc_id, param_copy);
		}
	}
	free(param_copy);

	return 0;
}