123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- #include "precompile.h"
- #include "sp_def.h"
- #include "sp_svc.h"
- #include "sp_rpc.h"
- #include "sp_dbg_export.h"
- #include "list.h"
- #include "memutil.h"
- #include "spinlock.h"
- #include "refcnt.h"
- #include "sp_logwithlinkforc.h"
- #include <winpr/synch.h>
/* Log tag passed to WLog_DBG() by this module. */
#define TAG SPBASE_TAG("sp_rpc")

/* Number of hash buckets a client manager uses to index clients by rpc_id. */
#define BUCKET_SIZE 127
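/*
 * Life cycle of an RPC client (STATE_* values below):
 *
 *  Create  +--------+  SENT  +--------+  ANS   +----------+  Destroy  +--------+
 *  ------> |  INIT  | -----> |  SENT  | -----> |  CALLED  | --------> |  TERM  |
 *          +--------+        +--------+        +----------+           +--------+
 *
 * STATE_ERROR is not shown in the diagram; it is entered when a client is
 * closed, fails to send, or is failed by the manager.
 */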
/* Client states (sp_rpc_client_t::state). */
#define STATE_INIT   0   /* set by sp_rpc_client_create() */
#define STATE_SENT   1   /* set by sp_rpc_client_async_call() before posting the request */
#define STATE_CALLED 2   /* set by client_process_ans() when the answer arrives */
#define STATE_TERM   3   /* set by sp_rpc_client_destroy() */
#define STATE_ERROR  4   /* set on close, on a failed post, or by client_set_error() */

/* RPC sub-command, combined with SP_PKT_RPC to form the packet type. */
#define RPC_CMD_INFO 0   /* one-way call, no answer expected */
#define RPC_CMD_REQ  1   /* request */
#define RPC_CMD_ANS  2   /* answer to a request */
- struct sp_rpc_server_t
- {
- int stop;
- sp_rpc_server_callback cb;
- sp_svc_t *svc;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_rpc_server, sp_rpc_server_t)
- static void __threadpool_server_on_pkt(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_rpc_server_t *server = (sp_rpc_server_t *)arg;
- iobuffer_t *pkt = (iobuffer_t*)param1;
- int epid;
- int svc_id;
- int pkt_type;
- int pkt_id;
- int cmd_type;
- iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);
- cmd_type = SP_GET_TYPE(pkt_type);
- if (cmd_type == RPC_CMD_INFO) {
- server->cb.on_info(server, epid, svc_id, pkt_id, &pkt, server->cb.user_data);
- } else if (cmd_type == RPC_CMD_REQ) {
- int call_type;
- iobuffer_read(pkt, IOBUF_T_I4, &call_type, NULL);
- server->cb.on_req(server, epid, svc_id, pkt_id, call_type, &pkt, server->cb.user_data);
- } else {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "RPC CMD unknown types!");
- }
- sp_rpc_server_dec_ref(server); // @
- if (pkt)
- iobuffer_dec_ref(pkt);
- }
- static int server_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_rpc_server_t *server = (sp_rpc_server_t*)user_data;
- int rc;
- iobuffer_t *pkt;
- pkt = *p_pkt;
- *p_pkt = NULL;
- iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);
- sp_rpc_server_inc_ref(server); // @
- rc = threadpool_queue_workitem2(sp_svc_get_threadpool(svc), NULL, &__threadpool_server_on_pkt, server, (param_size_t)pkt, 0);
- if (rc != 0) {
- sp_rpc_server_dec_ref(server); // @
- iobuffer_dec_ref(pkt);
- }
- return FALSE;
- }
- int sp_rpc_server_create(sp_svc_t *svc, sp_rpc_server_callback *cb, sp_rpc_server_t **p_server)
- {
- sp_rpc_server_t *server = MALLOC_T(sp_rpc_server_t);
- server->stop = 0;
- memcpy(&server->cb, cb, sizeof(sp_rpc_server_callback));
- server->svc = svc;
- REF_COUNT_INIT(&server->ref_cnt);
- *p_server = server;
- return 0;
- }
- void sp_rpc_server_destroy(sp_rpc_server_t *server)
- {
- sp_rpc_server_dec_ref(server);
- }
- int sp_rpc_server_start(sp_rpc_server_t *server)
- {
- server->stop = 0;
- return sp_svc_add_pkt_handler(server->svc, (int)server, SP_PKT_RPC, &server_on_pkt, server);
- }
- int sp_rpc_server_stop(sp_rpc_server_t *server)
- {
- // BugFix [4/5/2020 11:55 Gifur]
- if (/*!*/server->stop)
- return Error_Bug;
- server->stop = 1;
- return sp_svc_remove_pkt_handler(server->svc, (int)server, SP_PKT_RPC);
- }
- sp_svc_t *sp_rpc_server_get_svc(sp_rpc_server_t *server)
- {
- return server->svc;
- }
- int sp_rpc_server_send_answer(sp_rpc_server_t *server, int epid, int svc_id, int rpc_id, iobuffer_t **ans_pkt)
- {
- return sp_svc_post(server->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_ANS, rpc_id, ans_pkt);
- }
- static void __sp_rpc_destroy(sp_rpc_server_t *server)
- {
- if (server->cb.on_destroy) {
- (*server->cb.on_destroy)(server, server->cb.user_data);
- }
- free(server);
- }
- IMPLEMENT_REF_COUNT_MT(sp_rpc_server, sp_rpc_server_t, ref_cnt, __sp_rpc_destroy)
- struct sp_rpc_client_t
- {
- struct hlist_node hentry; // element of sp_rpc_client_mgr_t->rpc_buckets[index]
- int state;
- int remote_epid;
- int remote_svc_id;
- unsigned int rpc_id;
- int call_type;
- spinlock_t lock;
- sp_rpc_client_callback cb;
- sp_rpc_client_mgr_t *mgr;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_rpc_client, sp_rpc_client_t)
- struct sp_rpc_client_mgr_t
- {
- struct hlist_head rpc_buckets[BUCKET_SIZE]; // list of sp_rpc_client_t
- sp_svc_t *svc;
- int rpc_cnt;
- int stop;
- int local_seq;
- sp_rpc_client_mgr_callback cb;
- CRITICAL_SECTION lock;
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- };
- DECLARE_REF_COUNT_STATIC(sp_rpc_client_mgr, sp_rpc_client_mgr_t)
- static __inline void mgr_lock(sp_rpc_client_mgr_t *mgr)
- {
- EnterCriticalSection(&mgr->lock);
- }
- static __inline void mgr_unlock(sp_rpc_client_mgr_t *mgr)
- {
- LeaveCriticalSection(&mgr->lock);
- }
- static __inline void client_lock(sp_rpc_client_t *client)
- {
- spinlock_enter(&client->lock, -1);
- }
- static __inline void client_unlock(sp_rpc_client_t *client)
- {
- spinlock_leave(&client->lock);
- }
- static void client_set_error(sp_rpc_client_t *client, int error);
- static void client_process_ans(sp_rpc_client_t *client, iobuffer_t **ans_pkt);
- static void __threadpool_mgr_on_req(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
- {
- sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t *)arg;
- iobuffer_t *pkt = (iobuffer_t*)param1;
- int epid;
- int svc_id;
- int pkt_type;
- int pkt_id;
- int cmd_type;
- iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);
- cmd_type = SP_GET_TYPE(pkt_type);
- if (cmd_type == RPC_CMD_REQ && mgr->cb.on_req)
- {
- int call_type;
- iobuffer_read(pkt, IOBUF_T_I4, &call_type, NULL);
- mgr->cb.on_req(mgr, epid, svc_id, pkt_id, call_type, &pkt, mgr->cb.user_data);
- }
- else
- {
- DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "RPC CMD unknown types!");
- }
- sp_rpc_client_mgr_dec_ref(mgr); // @
-
- if (pkt)
- iobuffer_dec_ref(pkt);
- }
- static int mgr_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
- {
- sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t*)user_data;
- WLog_DBG(TAG, "sp_rpc::mgr_on_pkt: epid:%d, svc_id: %d, pkt_type:0x%08X, pkt_id: %d, rpc:%d", epid, svc_id, pkt_type, pkt_id, SP_GET_TYPE(pkt_type));
- if (SP_GET_TYPE(pkt_type) == RPC_CMD_ANS) {
- int rpc_id = pkt_id;
- int slot = ((unsigned int)rpc_id) % BUCKET_SIZE;
- sp_rpc_client_t *tpos;
- struct hlist_node *pos, *n;
- mgr_lock(mgr);
- hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[slot], sp_rpc_client_t, hentry) {
- if (tpos->rpc_id == rpc_id) {
- client_process_ans(tpos, p_pkt);
- break;
- }
- }
- mgr_unlock(mgr);
- return FALSE;
- }
- else if (SP_GET_TYPE(pkt_type) == RPC_CMD_REQ)
- {
- int rc;
- iobuffer_t *pkt = *p_pkt;
- *p_pkt = NULL;
- iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
- iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);
-
- sp_rpc_client_mgr_inc_ref(mgr);
- rc = threadpool_queue_workitem2(sp_svc_get_threadpool(svc), NULL, &__threadpool_mgr_on_req, mgr, (param_size_t)pkt, 0);
- if (rc != 0) {
- sp_rpc_client_mgr_dec_ref(mgr); // @
- iobuffer_dec_ref(pkt);
- }
- }
- return TRUE;
- }
- static void mgr_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
- {
- sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t*)user_data;
- if (state == BUS_STATE_OFF) {
- int i;
- sp_rpc_client_t *tpos;
- struct hlist_node *pos, *n;
- mgr_lock(mgr);
- for (i = 0; i < BUCKET_SIZE; ++i) {
- hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[i], sp_rpc_client_t, hentry) {
- if (tpos->remote_epid == epid) {
- client_set_error(tpos, Error_NetBroken);
- }
- }
- }
- mgr_unlock(mgr);
- }
- }
- int sp_rpc_client_mgr_create(sp_svc_t *svc, sp_rpc_client_mgr_callback *cb, sp_rpc_client_mgr_t **p_mgr)
- {
- int i;
- sp_rpc_client_mgr_t *mgr = MALLOC_T(sp_rpc_client_mgr_t);
- mgr->local_seq = 0;
- mgr->rpc_cnt = 0;
- mgr->stop = 0;
- mgr->svc = svc;
- memcpy(&mgr->cb, cb, sizeof(sp_rpc_client_mgr_callback));
- for (i = 0;i < BUCKET_SIZE; ++i) {
- INIT_HLIST_HEAD(&mgr->rpc_buckets[i]);
- }
- InitializeCriticalSection(&mgr->lock);
- REF_COUNT_INIT(&mgr->ref_cnt);
-
- *p_mgr = mgr;
- return 0;
- }
- // {bug} not delete rpc_buckets arrary
- void sp_rpc_client_mgr_destroy(sp_rpc_client_mgr_t *mgr)
- {
- sp_rpc_client_mgr_dec_ref(mgr);
- }
- int sp_rpc_client_mgr_start(sp_rpc_client_mgr_t *mgr)
- {
- mgr->stop = 0;
- sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_RPC, &mgr_on_pkt, mgr);
- sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr);
- return 0;
- }
- int sp_rpc_client_mgr_stop(sp_rpc_client_mgr_t *mgr)
- {
- sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_RPC);
- sp_svc_remove_sys_handler(mgr->svc, (int)mgr);
- return 0;
- }
- sp_svc_t *sp_rpc_client_mgr_get_svc(sp_rpc_client_mgr_t *mgr)
- {
- return mgr->svc;
- }
- int sp_rpc_client_mgr_cancel_all(sp_rpc_client_mgr_t *mgr)
- {
- int i;
- mgr_lock(mgr);
- for (i = 0; i < BUCKET_SIZE; ++i) {
- sp_rpc_client_t *tpos;
- struct hlist_node *pos;
- hlist_for_each_entry(tpos, pos, &mgr->rpc_buckets[i], sp_rpc_client_t, hentry) {
- client_set_error(tpos, Error_Cancel);
- }
- }
- mgr_unlock(mgr);
- return 0;
- }
- int sp_rpc_client_mgr_get_client_cnt(sp_rpc_client_mgr_t *mgr)
- {
- return mgr->rpc_cnt;
- }
- int sp_rpc_client_mgr_one_way_call(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int call_type, iobuffer_t **info_pkt)
- {
- return sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_RPC| RPC_CMD_INFO, call_type, info_pkt);
- }
- int sp_rpc_client_mgr_send_answer(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int rpc_id, iobuffer_t **ans_pkt)
- {
- return sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_ANS, rpc_id, ans_pkt);
- }
- static void __sp_rpc_client_mgr_destroy(sp_rpc_client_mgr_t *mgr)
- {
- if (mgr->cb.on_destroy)
- mgr->cb.on_destroy(mgr, mgr->cb.user_data);
- DeleteCriticalSection(&mgr->lock);
- free(mgr);
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_rpc_client_mgr, sp_rpc_client_mgr_t, ref_cnt, __sp_rpc_client_mgr_destroy)
- int sp_rpc_client_create(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int call_type, sp_rpc_client_callback *cb, sp_rpc_client_t **p_client)
- {
- sp_rpc_client_t *client = MALLOC_T(sp_rpc_client_t);
- client->mgr = mgr;
- client->remote_epid = epid;
- client->remote_svc_id = svc_id;
- client->call_type = call_type;
- memcpy(&client->cb, cb, sizeof(sp_rpc_client_callback));
- client->rpc_id = (int)InterlockedIncrement((LONG*)&mgr->local_seq);
- spinlock_init(&client->lock);
- client->state = STATE_INIT;
- REF_COUNT_INIT(&client->ref_cnt);
- sp_rpc_client_mgr_inc_ref(mgr);
- sp_rpc_client_inc_ref(client);
- mgr_lock(mgr);
- hlist_add_head(&client->hentry, &mgr->rpc_buckets[client->rpc_id % BUCKET_SIZE]);
- client->mgr->rpc_cnt++;
- mgr_unlock(mgr);
- *p_client = client;
- return 0;
- }
- int sp_rpc_client_close(sp_rpc_client_t *client)
- {
- int rc;
- client_lock(client);
- if (client->state != STATE_TERM && client->state != STATE_ERROR) {
- client->state = STATE_ERROR;
- rc = 0;
- } else {
- rc = Error_Duplication;
- }
- client_unlock(client);
- return rc;
- }
- void sp_rpc_client_destroy(sp_rpc_client_t *client)
- {
- mgr_lock(client->mgr);
- client->mgr->rpc_cnt --;
- hlist_del(&client->hentry);
- mgr_unlock(client->mgr);
- sp_rpc_client_dec_ref(client);
- client_lock(client);
- client->state = STATE_TERM;
- client_unlock(client);
- sp_rpc_client_dec_ref(client);
- }
- int sp_rpc_client_async_call(sp_rpc_client_t *client, iobuffer_t **req_pkt)
- {
- sp_rpc_client_mgr_t *mgr = client->mgr;
- int rc = 0;
- if (client->state != STATE_INIT)
- return Error_Bug;
- client_lock(client);
- if (client->state == STATE_INIT) {
- client->state = STATE_SENT;
- sp_rpc_client_inc_ref(client); // @
- iobuffer_write_head(*req_pkt, IOBUF_T_I4, &client->call_type, 0);
- rc = sp_svc_post(mgr->svc, client->remote_epid, client->remote_svc_id, SP_PKT_RPC|RPC_CMD_REQ, client->rpc_id, req_pkt);
- if (rc != 0) {
- sp_rpc_client_dec_ref(client); // @
- client->state = STATE_ERROR;
- }
- } else {
- rc = Error_NetBroken;
- }
- client_unlock(client);
- return rc;
- }
- int sp_rpc_client_get_rpc_id(sp_rpc_client_t *client)
- {
- return client->rpc_id;
- }
- int sp_rpc_client_get_remote_epid(sp_rpc_client_t *client)
- {
- return client->remote_epid;
- }
- int sp_rpc_client_get_remote_svc_id(sp_rpc_client_t *client)
- {
- return client->remote_svc_id;
- }
- static void client_set_error(sp_rpc_client_t *client, int error)
- {
- if (client->state != STATE_ERROR && client->state != STATE_TERM) {
- client_lock(client);
- if (client->state != STATE_ERROR && client->state != STATE_TERM) {
- if (client->state == STATE_SENT) {
- if (client->cb.on_ans) {
- client->cb.on_ans(client, error, NULL, client->cb.user_data);
- }
- } else {
- client->state = STATE_ERROR;
- }
- }
- client_unlock(client);
- }
- sp_rpc_client_dec_ref(client); // @
- }
- static void client_process_ans(sp_rpc_client_t *client, iobuffer_t **ans_pkt)
- {
- if (client->state == STATE_SENT) {
- client_lock(client);
- if (client->state == STATE_SENT) {
- client->state = STATE_CALLED;
- if (client->cb.on_ans) {
- client->cb.on_ans(client, 0, ans_pkt, client->cb.user_data);
- }
- }
- client_unlock(client);
- }
- sp_rpc_client_dec_ref(client); // @
- }
- static void __client_destroy(sp_rpc_client_t *client)
- {
- if (client->cb.on_destroy)
- client->cb.on_destroy(client, client->cb.user_data);
- sp_rpc_client_mgr_dec_ref(client->mgr);
- free(client);
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(sp_rpc_client, sp_rpc_client_t, ref_cnt, __client_destroy)
|