#include "precompile.h"
#include "sp_def.h"
#include "sp_svc.h"
#include "sp_rpc.h"
#include "sp_dbg_export.h"
#include "list.h"
#include "memutil.h"
#include "spinlock.h"
#include "refcnt.h"
#include "sp_logwithlinkforc.h"
/* NOTE(review): the header name of this include was missing in the reviewed
 * text (a bare `#include`). WLog_DBG()/TAG below suggest the WinPR logging
 * header -- confirm against the build before merging. */
#include <winpr/wlog.h>

#define TAG SPBASE_TAG("sp_rpc")

/* Number of hash buckets used by sp_rpc_client_mgr_t to index clients by rpc_id. */
#define BUCKET_SIZE 127

/*
 * Client call life cycle:
 *
 *   Create  +--------+  SENT  +--------+  ANS  +--------+  Destroy  +--------+
 *   ------> |  INIT  | -----> |  SENT  | ----> | CALLED | --------> |  TERM  |
 *           +--------+        +--------+       +--------+           +--------+
 */
#define STATE_INIT   0
#define STATE_SENT   1
#define STATE_CALLED 2
#define STATE_TERM   3
#define STATE_ERROR  4

/* Sub-command carried in the packet type of an SP_PKT_RPC packet. */
#define RPC_CMD_INFO 0
#define RPC_CMD_REQ  1
#define RPC_CMD_ANS  2

/* ------------------------------------------------------------------------- */
/* RPC server                                                                */
/* ------------------------------------------------------------------------- */

struct sp_rpc_server_t
{
	int stop;                   /* set to 1 by sp_rpc_server_stop(), 0 by create/start */
	sp_rpc_server_callback cb;  /* copy of the caller supplied callback table */
	sp_svc_t *svc;              /* service the packet handler is registered on */
	DECLARE_REF_COUNT_MEMBER(ref_cnt);
};

DECLARE_REF_COUNT_STATIC(sp_rpc_server, sp_rpc_server_t)

/*
 * Thread-pool work item queued by server_on_pkt().
 *
 * arg    : the sp_rpc_server_t (a reference was taken when the item was queued)
 * param1 : the iobuffer_t packet; epid, svc_id, pkt_type and pkt_id were
 *          pushed in front of the payload by server_on_pkt() and are read
 *          back here in that order.
 *
 * Dispatches INFO packets to cb.on_info and REQ packets to cb.on_req. The
 * packet is passed as `&pkt`; if it is still non-NULL after the callback it
 * is released here.
 */
static void __threadpool_server_on_pkt(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
{
	sp_rpc_server_t *server = (sp_rpc_server_t *)arg;
	iobuffer_t *pkt = (iobuffer_t *)param1;
	int epid = 0;
	int svc_id = 0;
	int pkt_type = 0;
	int pkt_id = 0;
	int cmd_type;

	iobuffer_read(pkt, IOBUF_T_I4, &epid, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &svc_id, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, NULL);

	cmd_type = SP_GET_TYPE(pkt_type);

	/* Callbacks are checked for NULL, matching __threadpool_mgr_on_req();
	 * a missing handler is reported through the same warning path. */
	if (cmd_type == RPC_CMD_INFO && server->cb.on_info) {
		server->cb.on_info(server, epid, svc_id, pkt_id, &pkt, server->cb.user_data);
	} else if (cmd_type == RPC_CMD_REQ && server->cb.on_req) {
		int call_type = 0;
		iobuffer_read(pkt, IOBUF_T_I4, &call_type, NULL);
		server->cb.on_req(server, epid, svc_id, pkt_id, call_type, &pkt, server->cb.user_data);
	} else {
		DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "RPC CMD unknown types!");
	}

	/* Drop the reference taken in server_on_pkt(). */
	sp_rpc_server_dec_ref(server);
	if (pkt)
		iobuffer_dec_ref(pkt);
}

/*
 * Packet handler registered by sp_rpc_server_start().
 *
 * Takes the packet out of *p_pkt (leaving NULL there), prepends
 * pkt_id, pkt_type, svc_id and epid to it and queues
 * __threadpool_server_on_pkt() on the service's thread pool.
 * Always returns FALSE.
 */
static int server_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
{
	sp_rpc_server_t *server = (sp_rpc_server_t *)user_data;
	iobuffer_t *pkt = *p_pkt;
	int rc;

	*p_pkt = NULL;

	/* Written head-first, so the worker reads epid, svc_id, pkt_type, pkt_id. */
	iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
	iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
	iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
	iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);

	/* Keep the server referenced while the work item is pending. */
	sp_rpc_server_inc_ref(server);
	rc = threadpool_queue_workitem2(sp_svc_get_threadpool(svc), NULL, &__threadpool_server_on_pkt, server, (param_size_t)pkt, 0);
	if (rc != 0) {
		/* Queueing failed: the worker will never run, undo both acquisitions. */
		sp_rpc_server_dec_ref(server);
		iobuffer_dec_ref(pkt);
	}

	return FALSE;
}

/*
 * Allocates a server bound to `svc`, copies `*cb` into it and stores the new
 * object in *p_server. Always returns 0.
 */
int sp_rpc_server_create(sp_svc_t *svc, sp_rpc_server_callback *cb, sp_rpc_server_t **p_server)
{
	sp_rpc_server_t *server = MALLOC_T(sp_rpc_server_t);

	server->stop = 0;
	memcpy(&server->cb, cb, sizeof(sp_rpc_server_callback));
	server->svc = svc;
	REF_COUNT_INIT(&server->ref_cnt);

	*p_server = server;
	return 0;
}

/* Releases the caller's reference on the server. */
void sp_rpc_server_destroy(sp_rpc_server_t *server)
{
	sp_rpc_server_dec_ref(server);
}

/*
 * Clears the stop flag and registers server_on_pkt() for SP_PKT_RPC on the
 * service, keyed by the server pointer. Returns sp_svc_add_pkt_handler()'s result.
 */
int sp_rpc_server_start(sp_rpc_server_t *server)
{
	server->stop = 0;
	return sp_svc_add_pkt_handler(server->svc, (int)server, SP_PKT_RPC, &server_on_pkt, server);
}

/*
 * Unregisters the packet handler. Returns Error_Bug when the server is
 * already stopped, otherwise sp_svc_remove_pkt_handler()'s result.
 */
int sp_rpc_server_stop(sp_rpc_server_t *server)
{
	// BugFix [4/5/2020 11:55 Gifur]: the commented-out '!' is the previous (inverted) test.
	if (/*!*/server->stop)
		return Error_Bug;

	server->stop = 1;
	return sp_svc_remove_pkt_handler(server->svc, (int)server, SP_PKT_RPC);
}

/* Returns the service this server was created on. */
sp_svc_t *sp_rpc_server_get_svc(sp_rpc_server_t *server)
{
	return server->svc;
}

/* Posts `*ans_pkt` to (epid, svc_id) as the RPC answer for `rpc_id`. Returns sp_svc_post()'s result. */
int sp_rpc_server_send_answer(sp_rpc_server_t *server, int epid, int svc_id, int rpc_id, iobuffer_t **ans_pkt)
{
	return sp_svc_post(server->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_ANS, rpc_id, ans_pkt);
}

/* Ref-count destructor: notifies cb.on_destroy (if set) and frees the server. */
static void __sp_rpc_destroy(sp_rpc_server_t *server)
{
	if (server->cb.on_destroy) {
		(*server->cb.on_destroy)(server, server->cb.user_data);
	}
	free(server);
}

IMPLEMENT_REF_COUNT_MT(sp_rpc_server, sp_rpc_server_t, ref_cnt, __sp_rpc_destroy)

/* ------------------------------------------------------------------------- */
/* RPC client / client manager                                               */
/* ------------------------------------------------------------------------- */

struct sp_rpc_client_t
{
	struct hlist_node hentry;   // element of sp_rpc_client_mgr_t->rpc_buckets[index]
	int state;                  /* one of STATE_* */
	int remote_epid;
	int remote_svc_id;
	unsigned int rpc_id;        /* taken from mgr->local_seq at creation */
	int call_type;
	spinlock_t lock;            /* guards `state` transitions */
	sp_rpc_client_callback cb;
	sp_rpc_client_mgr_t *mgr;   /* owning manager; referenced for the client's lifetime */
	DECLARE_REF_COUNT_MEMBER(ref_cnt);
};

DECLARE_REF_COUNT_STATIC(sp_rpc_client, sp_rpc_client_t)

struct sp_rpc_client_mgr_t
{
	struct hlist_head rpc_buckets[BUCKET_SIZE]; // list of sp_rpc_client_t
	sp_svc_t *svc;
	int rpc_cnt;                /* number of clients currently linked in rpc_buckets */
	int stop;
	int local_seq;              /* source of rpc ids, bumped with InterlockedIncrement */
	sp_rpc_client_mgr_callback cb;
	CRITICAL_SECTION lock;      /* guards rpc_buckets and rpc_cnt */
	DECLARE_REF_COUNT_MEMBER(ref_cnt);
};

DECLARE_REF_COUNT_STATIC(sp_rpc_client_mgr, sp_rpc_client_mgr_t)

static __inline void mgr_lock(sp_rpc_client_mgr_t *mgr)
{
	EnterCriticalSection(&mgr->lock);
}

static __inline void mgr_unlock(sp_rpc_client_mgr_t *mgr)
{
	LeaveCriticalSection(&mgr->lock);
}

static __inline void client_lock(sp_rpc_client_t *client)
{
	spinlock_enter(&client->lock, -1);
}

static __inline void client_unlock(sp_rpc_client_t *client)
{
	spinlock_leave(&client->lock);
}

static void client_set_error(sp_rpc_client_t *client, int error);
static void client_process_ans(sp_rpc_client_t *client, iobuffer_t **ans_pkt);

/*
 * Thread-pool work item queued by mgr_on_pkt() for REQ packets.
 *
 * arg    : the sp_rpc_client_mgr_t (referenced when the item was queued)
 * param1 : the packet, prefixed with epid, svc_id, pkt_type, pkt_id.
 *
 * Forwards the request to cb.on_req when that callback is set; otherwise
 * (or for any other command) logs a warning. Releases the manager reference
 * and, if the callback left it non-NULL, the packet.
 */
static void __threadpool_mgr_on_req(threadpool_t *threadpool, void *arg, param_size_t param1, param_size_t param2)
{
	sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t *)arg;
	iobuffer_t *pkt = (iobuffer_t *)param1;
	int epid = 0;
	int svc_id = 0;
	int pkt_type = 0;
	int pkt_id = 0;
	int cmd_type;

	iobuffer_read(pkt, IOBUF_T_I4, &epid, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &svc_id, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL);
	iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, NULL);

	cmd_type = SP_GET_TYPE(pkt_type);

	if (cmd_type == RPC_CMD_REQ && mgr->cb.on_req) {
		int call_type = 0;
		iobuffer_read(pkt, IOBUF_T_I4, &call_type, NULL);
		mgr->cb.on_req(mgr, epid, svc_id, pkt_id, call_type, &pkt, mgr->cb.user_data);
	} else {
		DbgWithLinkForC(LOG_LEVEL_WARN, LOG_TYPE_SYSTEM, "RPC CMD unknown types!");
	}

	/* Drop the reference taken in mgr_on_pkt(). */
	sp_rpc_client_mgr_dec_ref(mgr);
	if (pkt)
		iobuffer_dec_ref(pkt);
}

/*
 * Packet handler registered by sp_rpc_client_mgr_start().
 *
 * ANS : looks up the client whose rpc_id equals pkt_id in the matching bucket
 *       (under the manager lock) and hands it the answer; returns FALSE.
 * REQ : takes the packet out of *p_pkt, prepends the routing fields and
 *       queues __threadpool_mgr_on_req(); returns TRUE.
 * Any other command returns TRUE without touching the packet.
 */
static int mgr_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
{
	sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t *)user_data;
	int cmd_type = SP_GET_TYPE(pkt_type);

	WLog_DBG(TAG, "sp_rpc::mgr_on_pkt: epid:%d, svc_id: %d, pkt_type:0x%08X, pkt_id: %d, rpc:%d",
		epid, svc_id, pkt_type, pkt_id, SP_GET_TYPE(pkt_type));

	if (cmd_type == RPC_CMD_ANS) {
		int rpc_id = pkt_id;
		int slot = ((unsigned int)rpc_id) % BUCKET_SIZE;
		sp_rpc_client_t *tpos;
		struct hlist_node *pos, *n;

		mgr_lock(mgr);
		hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[slot], sp_rpc_client_t, hentry) {
			if (tpos->rpc_id == (unsigned int)rpc_id) {
				client_process_ans(tpos, p_pkt);
				break;
			}
		}
		mgr_unlock(mgr);
		return FALSE;
	}

	if (cmd_type == RPC_CMD_REQ) {
		int rc;
		iobuffer_t *pkt = *p_pkt;

		*p_pkt = NULL;

		/* Written head-first, so the worker reads epid, svc_id, pkt_type, pkt_id. */
		iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
		iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
		iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
		iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);

		/* Keep the manager referenced while the work item is pending. */
		sp_rpc_client_mgr_inc_ref(mgr);
		rc = threadpool_queue_workitem2(sp_svc_get_threadpool(svc), NULL, &__threadpool_mgr_on_req, mgr, (param_size_t)pkt, 0);
		if (rc != 0) {
			/* Queueing failed: the worker will never run, undo both acquisitions. */
			sp_rpc_client_mgr_dec_ref(mgr);
			iobuffer_dec_ref(pkt);
		}
	}

	return TRUE;
}

/*
 * System-event handler registered by sp_rpc_client_mgr_start().
 * On BUS_STATE_OFF every client whose remote_epid equals `epid` is put into
 * error with Error_NetBroken. Other states are ignored.
 */
static void mgr_on_sys(sp_svc_t *svc, int epid, int state, void *user_data)
{
	sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t *)user_data;
	int i;

	if (state != BUS_STATE_OFF)
		return;

	mgr_lock(mgr);
	for (i = 0; i < BUCKET_SIZE; ++i) {
		sp_rpc_client_t *tpos;
		struct hlist_node *pos, *n;

		hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[i], sp_rpc_client_t, hentry) {
			if (tpos->remote_epid == epid) {
				client_set_error(tpos, Error_NetBroken);
			}
		}
	}
	mgr_unlock(mgr);
}

/*
 * Allocates a client manager bound to `svc`, copies `*cb`, initialises the
 * bucket heads and the critical section, and stores the object in *p_mgr.
 * Always returns 0.
 */
int sp_rpc_client_mgr_create(sp_svc_t *svc, sp_rpc_client_mgr_callback *cb, sp_rpc_client_mgr_t **p_mgr)
{
	int i;
	sp_rpc_client_mgr_t *mgr = MALLOC_T(sp_rpc_client_mgr_t);

	mgr->local_seq = 0;
	mgr->rpc_cnt = 0;
	mgr->stop = 0;
	mgr->svc = svc;
	memcpy(&mgr->cb, cb, sizeof(sp_rpc_client_mgr_callback));
	for (i = 0; i < BUCKET_SIZE; ++i) {
		INIT_HLIST_HEAD(&mgr->rpc_buckets[i]);
	}
	InitializeCriticalSection(&mgr->lock);
	REF_COUNT_INIT(&mgr->ref_cnt);

	*p_mgr = mgr;
	return 0;
}

/*
 * Releases the caller's reference on the manager.
 * {bug} (original author's note) clients still linked in rpc_buckets are not
 * removed here, nor in __sp_rpc_client_mgr_destroy().
 */
void sp_rpc_client_mgr_destroy(sp_rpc_client_mgr_t *mgr)
{
	sp_rpc_client_mgr_dec_ref(mgr);
}

/*
 * Clears the stop flag and registers the packet and system handlers on the
 * service, keyed by the manager pointer. Always returns 0; the results of
 * the two registration calls are not inspected.
 */
int sp_rpc_client_mgr_start(sp_rpc_client_mgr_t *mgr)
{
	mgr->stop = 0;
	sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_RPC, &mgr_on_pkt, mgr);
	sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr);
	return 0;
}

/* Unregisters the packet and system handlers. Always returns 0. */
int sp_rpc_client_mgr_stop(sp_rpc_client_mgr_t *mgr)
{
	sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_RPC);
	sp_svc_remove_sys_handler(mgr->svc, (int)mgr);
	return 0;
}

/* Returns the service this manager was created on. */
sp_svc_t *sp_rpc_client_mgr_get_svc(sp_rpc_client_mgr_t *mgr)
{
	return mgr->svc;
}

/*
 * Puts every client of the manager into error with Error_Cancel.
 * Always returns 0.
 */
int sp_rpc_client_mgr_cancel_all(sp_rpc_client_mgr_t *mgr)
{
	int i;

	mgr_lock(mgr);
	for (i = 0; i < BUCKET_SIZE; ++i) {
		sp_rpc_client_t *tpos;
		struct hlist_node *pos, *n;

		/* client_set_error() releases a client reference and may therefore
		 * free the current entry; use the _safe iterator (as mgr_on_sys()
		 * does) so the next pointer is fetched before the call. */
		hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[i], sp_rpc_client_t, hentry) {
			client_set_error(tpos, Error_Cancel);
		}
	}
	mgr_unlock(mgr);

	return 0;
}

/* Returns the current client count (read without taking the manager lock). */
int sp_rpc_client_mgr_get_client_cnt(sp_rpc_client_mgr_t *mgr)
{
	return mgr->rpc_cnt;
}

/*
 * Posts `*info_pkt` to (epid, svc_id) as an RPC INFO packet; `call_type` is
 * sent in the packet-id slot. Returns sp_svc_post()'s result.
 */
int sp_rpc_client_mgr_one_way_call(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int call_type, iobuffer_t **info_pkt)
{
	return sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_INFO, call_type, info_pkt);
}

/* Posts `*ans_pkt` to (epid, svc_id) as the RPC answer for `rpc_id`. Returns sp_svc_post()'s result. */
int sp_rpc_client_mgr_send_answer(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int rpc_id, iobuffer_t **ans_pkt)
{
	return sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_ANS, rpc_id, ans_pkt);
}

/* Ref-count destructor: notifies cb.on_destroy (if set), deletes the lock and frees the manager. */
static void __sp_rpc_client_mgr_destroy(sp_rpc_client_mgr_t *mgr)
{
	if (mgr->cb.on_destroy)
		mgr->cb.on_destroy(mgr, mgr->cb.user_data);
	DeleteCriticalSection(&mgr->lock);
	free(mgr);
}

IMPLEMENT_REF_COUNT_MT_STATIC(sp_rpc_client_mgr, sp_rpc_client_mgr_t, ref_cnt, __sp_rpc_client_mgr_destroy)

/*
 * Creates a client for one call to (epid, svc_id) with the given call_type.
 *
 * The client gets a fresh rpc_id from mgr->local_seq, takes a reference on
 * the manager (released in __client_destroy()), takes one extra reference on
 * itself on top of REF_COUNT_INIT (sp_rpc_client_destroy() releases two), and
 * is linked into the manager's bucket table. Stores the client in *p_client
 * and always returns 0.
 */
int sp_rpc_client_create(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int call_type, sp_rpc_client_callback *cb, sp_rpc_client_t **p_client)
{
	sp_rpc_client_t *client = MALLOC_T(sp_rpc_client_t);

	client->mgr = mgr;
	client->remote_epid = epid;
	client->remote_svc_id = svc_id;
	client->call_type = call_type;
	memcpy(&client->cb, cb, sizeof(sp_rpc_client_callback));
	client->rpc_id = (int)InterlockedIncrement((LONG*)&mgr->local_seq);
	spinlock_init(&client->lock);
	client->state = STATE_INIT;
	REF_COUNT_INIT(&client->ref_cnt);

	sp_rpc_client_mgr_inc_ref(mgr);
	sp_rpc_client_inc_ref(client);

	mgr_lock(mgr);
	hlist_add_head(&client->hentry, &mgr->rpc_buckets[client->rpc_id % BUCKET_SIZE]);
	client->mgr->rpc_cnt++;
	mgr_unlock(mgr);

	*p_client = client;
	return 0;
}

/*
 * Moves the client to STATE_ERROR.
 * Returns 0 on success, Error_Duplication if it is already in STATE_TERM or
 * STATE_ERROR.
 */
int sp_rpc_client_close(sp_rpc_client_t *client)
{
	int rc;

	client_lock(client);
	if (client->state == STATE_TERM || client->state == STATE_ERROR) {
		rc = Error_Duplication;
	} else {
		client->state = STATE_ERROR;
		rc = 0;
	}
	client_unlock(client);

	return rc;
}

/*
 * Unlinks the client from its manager, marks it STATE_TERM and releases the
 * two references held since sp_rpc_client_create().
 */
void sp_rpc_client_destroy(sp_rpc_client_t *client)
{
	mgr_lock(client->mgr);
	client->mgr->rpc_cnt--;
	hlist_del(&client->hentry);
	mgr_unlock(client->mgr);

	/* Mark the client terminated BEFORE dropping any reference: the object
	 * must not be locked or written after a dec_ref that could be the last. */
	client_lock(client);
	client->state = STATE_TERM;
	client_unlock(client);

	sp_rpc_client_dec_ref(client);
	sp_rpc_client_dec_ref(client);
}

/*
 * Sends the request for this client.
 *
 * Only valid in STATE_INIT: returns Error_Bug if the unlocked check fails and
 * Error_NetBroken if the state changed before the lock was taken. On the
 * INIT -> SENT transition a client reference is taken for the in-flight
 * call, call_type is prepended to *req_pkt and the packet is posted with the
 * client's rpc_id. If posting fails the reference is released, the client
 * goes to STATE_ERROR and sp_svc_post()'s result is returned.
 */
int sp_rpc_client_async_call(sp_rpc_client_t *client, iobuffer_t **req_pkt)
{
	sp_rpc_client_mgr_t *mgr = client->mgr;
	int rc = 0;

	if (client->state != STATE_INIT)
		return Error_Bug;

	client_lock(client);
	if (client->state != STATE_INIT) {
		rc = Error_NetBroken;
	} else {
		client->state = STATE_SENT;
		sp_rpc_client_inc_ref(client); /* held while the call is in flight */
		iobuffer_write_head(*req_pkt, IOBUF_T_I4, &client->call_type, 0);
		rc = sp_svc_post(mgr->svc, client->remote_epid, client->remote_svc_id, SP_PKT_RPC | RPC_CMD_REQ, client->rpc_id, req_pkt);
		if (rc != 0) {
			sp_rpc_client_dec_ref(client); /* nothing in flight after all */
			client->state = STATE_ERROR;
		}
	}
	client_unlock(client);

	return rc;
}

/* Returns the rpc id assigned at creation. */
int sp_rpc_client_get_rpc_id(sp_rpc_client_t *client)
{
	return client->rpc_id;
}

/* Returns the remote endpoint id given at creation. */
int sp_rpc_client_get_remote_epid(sp_rpc_client_t *client)
{
	return client->remote_epid;
}

/* Returns the remote service id given at creation. */
int sp_rpc_client_get_remote_svc_id(sp_rpc_client_t *client)
{
	return client->remote_svc_id;
}
static void client_set_error(sp_rpc_client_t *client, int error) { if (client->state != STATE_ERROR && client->state != STATE_TERM) { client_lock(client); if (client->state != STATE_ERROR && client->state != STATE_TERM) { if (client->state == STATE_SENT) { if (client->cb.on_ans) { client->cb.on_ans(client, error, NULL, client->cb.user_data); } } else { client->state = STATE_ERROR; } } client_unlock(client); } sp_rpc_client_dec_ref(client); // @ } static void client_process_ans(sp_rpc_client_t *client, iobuffer_t **ans_pkt) { if (client->state == STATE_SENT) { client_lock(client); if (client->state == STATE_SENT) { client->state = STATE_CALLED; if (client->cb.on_ans) { client->cb.on_ans(client, 0, ans_pkt, client->cb.user_data); } } client_unlock(client); } sp_rpc_client_dec_ref(client); // @ } static void __client_destroy(sp_rpc_client_t *client) { if (client->cb.on_destroy) client->cb.on_destroy(client, client->cb.user_data); sp_rpc_client_mgr_dec_ref(client->mgr); free(client); } IMPLEMENT_REF_COUNT_MT_STATIC(sp_rpc_client, sp_rpc_client_t, ref_cnt, __client_destroy)