123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- #include "precompile.h"
- #include "sp_bcm.h"
- #include "sp_svc.h"
- #include "sp_def.h"
- #include "sp_logwithlinkforc.h"
- #include "sp_mod.h"
- #include "sp_env.h"
- #include "SpBase.h"
- #include "iobuffer.h"
- #include "memutil.h"
- #include "refcnt.h"
- #include "array.h"
- #include "list.h"
- #include "dbgutil.h"
- #include <winpr/synch.h>
- #include <winpr/string.h>
- #include "StartUpBase.h"
- #define BCM_CMD_MSG 0x01
- #define BCM_CMD_SUBSCRIBE 0x02
- #define BCM_CMD_UNSUBSCRIBE 0x03
- struct sp_bcm_client_t
- {
- sp_svc_t *svc;
- };
- int sp_bcm_client_create(sp_svc_t *svc, sp_bcm_client_t **p_client)
- {
- sp_bcm_client_t *client = MALLOC_T(sp_bcm_client_t);
- client->svc = svc;
- *p_client = client;
- return 0;
- }
- void sp_bcm_client_destroy(sp_bcm_client_t *client)
- {
- free(client);
- }
- int sp_bcm_client_bcast(sp_bcm_client_t *client, int message_id, int message_sig, iobuffer_t **p_pkt)
- {
- int rs = iobuffer_get_read_state(*p_pkt);
- int ws = iobuffer_get_write_state(*p_pkt);
- sp_uid_t client_id = sp_svc_get_id(client->svc);
- int rc;
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &message_sig, 0);
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &message_id, 0);
- //TODO: convert sizeof(int64) to IOBUF_T_I4
- iobuffer_write_head(*p_pkt, IOBUF_T_I4, &client_id, 0);
- rc = sp_svc_post(client->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_BCM|BCM_CMD_MSG, sp_svc_get_id(client->svc), p_pkt);
- if (*p_pkt) {
- iobuffer_restore_read_state(*p_pkt, rs);
- iobuffer_restore_write_state(*p_pkt, ws);
- }
- return rc;
- }
- struct sp_bcm_listener_t
- {
- int subcribed;
- int target_ent_id;
- sp_bcm_listener_cb cb;
- char *param;
- void *tag;
- sp_uid_t uid;
- sp_svc_t *svc;
- strand_t *strand;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- static void listener_on_pkt_threadpool(threadpool_t *threadpool, void *arg)
- {
- iobuffer_t *pkt = (iobuffer_t*)arg;
- sp_bcm_listener_t *listener;
- int epid;
- int pkt_type;
- sp_uid_t svc_id;
- sp_uid_t pkt_id;
- sp_uid_t rsn;
- sp_rsn_context_t rsn_ctx;
- sp_svc_t *svc;
- iobuffer_read(pkt, IOBUF_T_PTR, &listener, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &epid, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &svc_id, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, NULL);
- svc = listener->svc;
- rsn = sp_svc_new_runserial(svc);
- sp_rsn_context_init_original(rsn, SP_ORIGINAL_T_CALLBACK, &rsn_ctx);
- sp_svc_push_runserial_context(svc, &rsn_ctx);
- if (listener->subcribed) {
- if (listener->cb.on_message_raw) {
- int from_client_id;
- iobuffer_read(pkt, IOBUF_T_I4, &from_client_id, NULL);
- listener->cb.on_message_raw(listener, from_client_id, &pkt, listener->cb.user_data);
- } else if (listener->cb.on_message) {
- int from_client_id;
- int message_id;
- int message_sig;
- iobuffer_format_read(pkt, "444", &from_client_id, &message_id, &message_sig);
- listener->cb.on_message(listener, from_client_id, message_id, message_sig, iobuffer_data(pkt, 0), iobuffer_get_length(pkt), listener->cb.user_data);
- }
- }
- if (pkt)
- iobuffer_dec_ref(pkt);
- sp_bcm_listener_dec_ref(listener); //@
- sp_svc_pop_runserial_context(svc);
- }
- static int listener_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_bcm_listener_t *listener = (sp_bcm_listener_t *)user_data;
- int cmd = SP_GET_TYPE(pkt_type);
- int from_ent_id = pkt_id;
- sp_uid_t uid;
- int read_state = iobuffer_get_read_state(*p_pkt);
- iobuffer_read(*p_pkt, IOBUF_T_I8, &uid, NULL);
- if (cmd == BCM_CMD_MSG && listener->subcribed && uid == listener->uid) {
- iobuffer_t *pkt = iobuffer_clone(*p_pkt);
- iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);
- iobuffer_write_head(pkt, IOBUF_T_PTR, &listener, 0);
- sp_bcm_listener_inc_ref(listener);//@
-
- if (threadpool_queue_workitem(sp_svc_get_threadpool(svc), listener->strand, &listener_on_pkt_threadpool, pkt) < 0) {
- sp_bcm_listener_dec_ref(listener);//@
- iobuffer_dec_ref(pkt);
- }
- return FALSE;
- } else {
- iobuffer_restore_read_state(*p_pkt, read_state);
- }
- return TRUE; // continue
- }
- int sp_bcm_listener_create(sp_svc_t *svc, int target_ent_id, const char *param, const sp_bcm_listener_cb *cb, sp_bcm_listener_t **p_listener)
- {
- sp_bcm_listener_t *listener;
- if (!cb->on_message && !cb->on_message_raw)
- return Error_Param;
- listener = MALLOC_T(sp_bcm_listener_t);
- listener->strand = strand_create();
- listener->target_ent_id = target_ent_id;
- listener->svc = svc;
- listener->param = _strdup(param);
- listener->uid = 0;
- memcpy(&listener->cb, cb, sizeof(sp_bcm_listener_cb));
- listener->subcribed = 0;
- REF_COUNT_INIT(&listener->ref_cnt);
- *p_listener = listener;
- return 0;
- }
- void sp_bcm_listener_destroy(sp_bcm_listener_t *listener)
- {
- strand_destroy(listener->strand);
- sp_bcm_listener_dec_ref(listener);
- }
- int sp_bcm_listener_subscribe(sp_bcm_listener_t *listener, sp_uid_t *uid)
- {
- int rc;
- iobuffer_t *pkt = NULL;
- if (listener->subcribed)
- return Error_Duplication;
- sp_svc_add_pkt_handler(listener->svc, (int)listener, SP_PKT_BCM, &listener_on_pkt, listener);
- listener->uid = sp_svc_new_runserial(listener->svc);
- pkt = iobuffer_create(-1, -1);
- iobuffer_write(pkt, IOBUF_T_I8, &listener->uid, 0);
- iobuffer_write(pkt, IOBUF_T_STR, listener->param, -1);
- rc = sp_svc_post(listener->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_BCM|BCM_CMD_SUBSCRIBE, listener->target_ent_id, &pkt);
- if (rc == 0) {
- listener->subcribed = 1;
- if (uid)
- *uid = listener->uid;
- }
- if (pkt)
- iobuffer_dec_ref(pkt);
- return rc;
- }
- int sp_bcm_listener_unsubscribe(sp_bcm_listener_t *listener)
- {
- int rc;
- if (listener->subcribed) {
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- iobuffer_write(pkt, IOBUF_T_I8, &listener->uid, 0);
- rc = sp_svc_post(listener->svc, SP_SHELL_MOD_ID, SP_SHELL_SVC_ID, SP_PKT_BCM|BCM_CMD_UNSUBSCRIBE, listener->target_ent_id, &pkt);
- if (rc == 0) {
- listener->subcribed = 0;
- sp_svc_remove_pkt_handler(listener->svc, (int)listener, SP_PKT_BCM);
- }
- if (pkt)
- iobuffer_dec_ref(pkt);
- } else {
- rc = Error_NotInit;
- }
- return rc;
- }
- void sp_bcm_listener_set_tag(sp_bcm_listener_t *listener, void *tag)
- {
- listener->tag = tag;
- }
- void* sp_bcm_listener_get_tag(sp_bcm_listener_t *listener)
- {
- return listener->tag;
- }
- void sp_bcm_listener_get_uid(sp_bcm_listener_t *listener, sp_uid_t *uid)
- {
- *uid = listener->uid;
- }
- int sp_bcm_listener_get_target_entity_id(sp_bcm_listener_t *listener)
- {
- return listener->target_ent_id;
- }
- static void __sp_bcm_listener_destroy(sp_bcm_listener_t *listener)
- {
- if (listener->cb.on_destroy) {
- listener->cb.on_destroy(listener, listener->cb.user_data);
- }
- free(listener);
- }
- IMPLEMENT_REF_COUNT_MT(sp_bcm_listener, sp_bcm_listener_t, ref_cnt, __sp_bcm_listener_destroy)
- typedef struct bcm_listener_entry {
- int epid;
- int svc_id;
- int target_ent_id;
- sp_uid_t uid;
- struct list_head src_entry;
- struct list_head dst_entry;
- char *param;
- int redirection_queried; // has been sent redirection query
- }bcm_listener_entry;
- struct sp_bcm_daemon_t
- {
- CRITICAL_SECTION lock;
- array_header_t *arr_src_list;
- array_header_t *arr_dst_list;
- sp_svc_t *svc;
- };
- static void daemon_lock(sp_bcm_daemon_t *daemon)
- {
- EnterCriticalSection(&daemon->lock);
- }
- static void daemon_unlock(sp_bcm_daemon_t *daemon)
- {
- LeaveCriticalSection(&daemon->lock);
- }
- static const char *_GetFileName(const char *pszFilePath)
- {
- int i=strlen(pszFilePath);
- for( ; i>0 && pszFilePath[i-1]!='\\'; i--)NULL;
- return pszFilePath+i;
- }
- static int daemon_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_bcm_daemon_t *daemon = (sp_bcm_daemon_t *)user_data;
- int cmd = SP_GET_TYPE(pkt_type);
- if (cmd == BCM_CMD_MSG) {
- int from_ent_id = pkt_id;
- bcm_listener_entry *pos;
- struct list_head *lst = &ARRAY_IDX(daemon->arr_dst_list, from_ent_id, struct list_head);
- daemon_lock(daemon);
- list_for_each_entry(pos, lst, bcm_listener_entry, dst_entry) {
- iobuffer_t *copy_pkt = iobuffer_clone(*p_pkt);
- iobuffer_write_head(copy_pkt, IOBUF_T_I8, &pos->uid, 0);
- sp_svc_post(svc, pos->epid, pos->svc_id, pkt_type, pkt_id, ©_pkt);
- if (copy_pkt)
- iobuffer_dec_ref(copy_pkt);
- }
- daemon_unlock(daemon);
- } else if (cmd == BCM_CMD_SUBSCRIBE) {
- int target_ent_id = pkt_id;
- sp_entity_t *target_ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, target_ent_id);
- if (target_ent) {
- int found = 0;
- sp_uid_t uid;
- char *param = NULL;
- bcm_listener_entry *pos;
- struct list_head *src_lst = &ARRAY_IDX(daemon->arr_src_list, svc_id, struct list_head);
- struct list_head *dst_lst = &ARRAY_IDX(daemon->arr_dst_list, target_ent_id, struct list_head);
- iobuffer_format_read(*p_pkt, "8s", &uid, ¶m);
- daemon_lock(daemon);
- list_for_each_entry(pos, src_lst, bcm_listener_entry, src_entry) {
- if (uid == pos->uid) {
- found ++;
- break;
- }
- }
- if (found == 0) {
- bcm_listener_entry *e = MALLOC_T(bcm_listener_entry);
- e->redirection_queried = 0;
- e->epid = epid;
- e->svc_id = svc_id;
- e->target_ent_id = target_ent_id;
- e->uid = uid;
- e->param = _strdup(param);
- list_add_tail(&e->src_entry, src_lst);
- list_add_tail(&e->dst_entry, dst_lst);
- } else {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "duplicate BCM_CMD_SUBSCRIBE msg!");
- }
- daemon_unlock(daemon);
- if (found == 0 && (target_ent->state == EntityState_Idle || target_ent->state == EntityState_Busy)) {
- addStartupStep(target_ent->cfg->idx, "redirect_subscribe", "startup_end", 0, 0, 0);
- sp_mod_mgr_notify_redirect_subscribe(sp_get_env()->mod_mgr, target_ent, &uid, svc_id, param);
- }
- FREE(param);
- }
- } else if (cmd == BCM_CMD_UNSUBSCRIBE) {
- bcm_listener_entry *pos;
- int target_ent_id = pkt_id;
- sp_uid_t uid;
- struct list_head *lst = &ARRAY_IDX(daemon->arr_src_list, svc_id, struct list_head);
- iobuffer_read(*p_pkt, IOBUF_T_I8, &uid, NULL);
- daemon_lock(daemon);
- list_for_each_entry(pos, lst, bcm_listener_entry, src_entry) {
- if (pos->uid == uid) {
- list_del(&pos->src_entry);
- list_del(&pos->dst_entry);
- free(pos->param);
- free(pos);
- break;
- }
- }
- daemon_unlock(daemon);
- } else {
- DbgWithLinkForC(LOG_LEVEL_ERROR, LOG_TYPE_SYSTEM,"unknown pkt! %s:%d", _GetFileName(__FILE__), __LINE__);
- TOOLKIT_ASSERT(0);
- }
- return TRUE;
- }
- static void daemon_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
- {
- sp_bcm_daemon_t *daemon = (sp_bcm_daemon_t *)user_data;
- if (state == BUS_STATE_OFF) {
- sp_mod_t *mod = sp_mod_mgr_find_module_by_idx(sp_get_env()->mod_mgr, epid);
- sp_entity_t *pos;
- daemon_lock(daemon);
- list_for_each_entry(pos, &mod->entity_list, sp_entity_t, entry) {
- bcm_listener_entry *t, *n;
- struct list_head *lst = &ARRAY_IDX(daemon->arr_src_list, pos->cfg->idx, struct list_head);
- list_for_each_entry_safe(t, n, lst, bcm_listener_entry, src_entry) {
- list_del(&t->src_entry);
- list_del(&t->dst_entry);
- free(t->param);
- free(t);
- }
- }
- daemon_unlock(daemon);
- }
- }
- int sp_bcm_deamon_refresh(sp_bcm_daemon_t* daemon)
- {
- int nelts = sp_mod_mgr_get_entity_array_nelts(sp_get_env()->mod_mgr);
- int i;
- DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "entity array nelts:%d", nelts);
- /*
- daemon_lock(daemon);
- int oldNelts = daemon->arr_dst_list->nelts;
- //array_reserve(daemon->arr_dst_list, nelts);
- //array_reserve(daemon->arr_src_list, nelts);
- //array_header_t* old_dst = daemon->arr_dst_list;
- //array_header_t* old_src = daemon->arr_src_list;
- //daemon->arr_dst_list = array_make(nelts, sizeof(struct list_head));
- //daemon->arr_src_list = array_make(nelts, sizeof(struct list_head));
- /*
- for (i = 0; i < oldNelts; ++i)
- {
- ARRAY_IDX(daemon->arr_dst_list, i, struct list_head*) = ARRAY_IDX(old_dst, i, struct list_head*);
- ARRAY_IDX(daemon->arr_src_list, i, struct list_head*) = ARRAY_IDX(old_src, i, struct list_head*);
- }
-
- for (i = 0; i < nelts - oldNelts; ++i) {
- INIT_LIST_HEAD((struct list_head*)array_push(daemon->arr_src_list));
- INIT_LIST_HEAD((struct list_head*)array_push(daemon->arr_dst_list));
- }
-
- daemon_unlock(daemon);
- */
- return 0;
- }
- int sp_bcm_daemon_create(sp_svc_t *svc, sp_bcm_daemon_t **p_daemon)
- {
- sp_bcm_daemon_t *daemon = ZALLOC_T(sp_bcm_daemon_t);
- int i;
- int nelts = 100;// sp_mod_mgr_get_entity_array_nelts(sp_get_env()->mod_mgr);
- DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "entity array nelts:%d", nelts);
- daemon->svc = svc;
- daemon->arr_dst_list = array_make(nelts, sizeof(struct list_head));
- daemon->arr_src_list = array_make(nelts, sizeof(struct list_head));
- for (i = 0; i < nelts; ++i) {
- INIT_LIST_HEAD((struct list_head*)array_push(daemon->arr_src_list));
- INIT_LIST_HEAD((struct list_head*)array_push(daemon->arr_dst_list));
- }
- InitializeCriticalSection(&daemon->lock);
- DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "init daemon lock ok!");
- sp_svc_add_pkt_handler(svc, (int)daemon, SP_PKT_BCM, &daemon_on_pkt, daemon);
- sp_svc_add_sys_handler(svc, (int)daemon, &daemon_on_sys, daemon);
- *p_daemon = daemon;
- return 0;
- }
- void sp_bcm_daemon_destroy(sp_bcm_daemon_t *daemon)
- {
- int i;
- for (i = 0; i < daemon->arr_src_list->nelts; ++i) {
- bcm_listener_entry *pos, *n;
- struct list_head *lst = &ARRAY_IDX(daemon->arr_src_list, i, struct list_head);
- list_for_each_entry_safe(pos, n, lst, bcm_listener_entry, src_entry) {
- list_del(&pos->src_entry);
- list_del(&pos->dst_entry);
- free(pos->param);
- free(pos);
- }
- }
- array_free(daemon->arr_src_list);
- array_free(daemon->arr_dst_list);
- DeleteCriticalSection(&daemon->lock);
- sp_svc_remove_pkt_handler(daemon->svc, (int)daemon, SP_PKT_BCM);
- sp_svc_remove_sys_handler(daemon->svc, (int)daemon);
- free(daemon);
- }
- array_header_t *sp_bcm_daemon_get_receiver(sp_bcm_daemon_t *daemon, int entity_id)
- {
- struct list_head *lst = &ARRAY_IDX(daemon->arr_dst_list, entity_id, struct list_head);
- array_header_t *arr = array_make(0, sizeof(sp_bcm_receiver_t*));
- bcm_listener_entry *pos;
- daemon_lock(daemon);
- list_for_each_entry(pos, lst, bcm_listener_entry, dst_entry) {
- sp_bcm_receiver_t *rcv = MALLOC_T(sp_bcm_receiver_t);
- rcv->id = pos->uid;
- rcv->receiver_id = pos->svc_id;
- rcv->param = _strdup(pos->param);
- ARRAY_PUSH(arr, sp_bcm_receiver_t*) = rcv;
- }
- daemon_unlock(daemon);
- return arr;
- }
- void sp_bcm_daemon_free_receiver_array(array_header_t *arr_receiver)
- {
- int i;
- for (i = 0; i < arr_receiver->nelts; ++i) {
- sp_bcm_receiver_t *rcv = ARRAY_IDX(arr_receiver, i, sp_bcm_receiver_t*);
- free(rcv->param);
- free(rcv);
- }
- array_free(arr_receiver);
- }
- int sp_bcm_daemon_process_redirect_subscribe(sp_bcm_daemon_t *daemon, int target_id, const sp_uid_t *uid, int new_target_id)
- {
- sp_entity_t *target_ent = NULL;
- bcm_listener_entry *pos = NULL;
- struct list_head *list1 = &ARRAY_IDX(daemon->arr_dst_list, target_id, struct list_head);
- struct list_head *list2 = &ARRAY_IDX(daemon->arr_dst_list, new_target_id, struct list_head);
- daemon_lock(daemon);
- list_for_each_entry(pos, list1, bcm_listener_entry, dst_entry)
- {
- if (pos->uid == *uid)
- {
- //if (pos->redirection_queried == 0) // xkm@20160406: 支持多次重定向
- {
- pos->redirection_queried = 1;
- if (target_id != new_target_id)
- {
- pos->target_ent_id = new_target_id;
- list_move_tail(&pos->dst_entry, list2);
- }
- }
- break;
- }
- }
- daemon_unlock(daemon);
- // xkm@20160406: 每次重定向后都触发OnBroadcastSubscribe事件
- target_ent = sp_mod_mgr_find_entity_by_idx(sp_get_env()->mod_mgr, new_target_id);
- if (pos != NULL && (target_ent->state == EntityState_Idle || target_ent->state == EntityState_Busy))
- {
- DbgWithLinkForC(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM, "notify_redirect_subscribe! target:%s", target_ent->cfg->name);
- sp_mod_mgr_notify_redirect_subscribe(sp_get_env()->mod_mgr, target_ent, (sp_uid_t *) uid, pos->svc_id, pos->param);
- }
- return 0;
- }
|