sp_iom.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. #include "precompile.h"
  2. #include "sp_iom.h"
  3. #include "sp_def.h"
  4. #include "sp_dbg_export.h"
  5. #include "spinlock.h"
  6. #include "timerqueue.h"
  7. #include "array.h"
  8. #include "memutil.h"
  9. #include <time.h>
  10. #ifndef _WIN32
  11. #include <unistd.h>
  12. #include <sys/syscall.h>
  13. static int GetParentProcessID()
  14. {
  15. #if 0
  16. return getppid();
  17. #else
  18. return (int)syscall(SYS_getppid);
  19. #endif
  20. }
  21. #endif //NOT _WIN32
  22. #define POLL_INTERVAL 10
  23. #define IOM_T_EXIT 0
  24. #define IOM_T_GET_STATE 1
  25. #define IOM_T_SEND_INFO 2
  26. static int translate_error(int bus_error)
  27. {
  28. int error;
  29. switch (bus_error) {
  30. case BUS_E_OK:
  31. error = Error_Succeed;
  32. break;
  33. case BUS_E_FAIL:
  34. error = Error_Unexpect;
  35. break;
  36. case BUS_E_NETBROKEN:
  37. error = Error_NetBroken;
  38. break;
  39. case BUS_E_NOTFOUND:
  40. error = Error_NotExist;
  41. break;
  42. default:
  43. error = Error_Unexpect;
  44. break;
  45. }
  46. return error;
  47. }
  48. typedef struct pkt_handler_t {
  49. int key;
  50. sp_iom_on_pkt on_pkt;
  51. void *user_data;
  52. }pkt_handler_t;
  53. typedef struct sys_handler_t {
  54. int key;
  55. sp_iom_on_sys on_sys;
  56. void *user_data;
  57. }sys_handler_t;
  58. struct sp_iom_t
  59. {
  60. bus_endpt_t *endpt;
  61. timer_queue_t *tm_queue;
  62. spinlock_t tm_lock;
  63. spinlock_t pkt_handler_lock;
  64. spinlock_t sys_handler_lock;
  65. array_header_t *arr_pkt_handler;
  66. array_header_t *arr_sys_handler;
  67. int stop;
  68. int poll_thread_id;
  69. };
  70. static __inline void iom_pkt_handler_lock(sp_iom_t *iom)
  71. {
  72. spinlock_enter(&iom->pkt_handler_lock, -1);
  73. }
  74. static __inline void iom_pkt_handler_unlock(sp_iom_t *iom)
  75. {
  76. spinlock_leave(&iom->pkt_handler_lock);
  77. }
  78. static __inline void iom_sys_handler_lock(sp_iom_t *iom)
  79. {
  80. spinlock_enter(&iom->sys_handler_lock, -1);
  81. }
  82. static __inline void iom_sys_handler_unlock(sp_iom_t *iom)
  83. {
  84. spinlock_leave(&iom->sys_handler_lock);
  85. }
  86. static __inline void iom_tm_lock(sp_iom_t *iom)
  87. {
  88. spinlock_enter(&iom->tm_lock, -1);
  89. }
  90. static __inline void iom_tm_unlock(sp_iom_t *iom)
  91. {
  92. spinlock_leave(&iom->tm_lock);
  93. }
  94. static pkt_handler_t* find_pkt_handler(sp_iom_t *iom, int key, int *idx)
  95. {
  96. int i;
  97. for (i = 0; i < iom->arr_pkt_handler->nelts; ++i) {
  98. pkt_handler_t *t = ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*);
  99. if (t->key == key) {
  100. if (idx)
  101. *idx = i;
  102. return t;
  103. }
  104. }
  105. return NULL;
  106. }
  107. static sys_handler_t* find_sys_handler(sp_iom_t *iom, int key, int *idx)
  108. {
  109. int i;
  110. for (i = 0; i < iom->arr_sys_handler->nelts; ++i) {
  111. sys_handler_t *t = ARRAY_IDX(iom->arr_sys_handler, i, sys_handler_t*);
  112. if (t->key == key) {
  113. if (idx)
  114. *idx = i;
  115. return t;
  116. }
  117. }
  118. return NULL;
  119. }
  120. static void on_pkt(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, void *user_data)
  121. {
  122. sp_iom_t *iom = (sp_iom_t *)user_data;
  123. int i;
  124. iom_pkt_handler_lock(iom);
  125. for (i = 0; i < iom->arr_pkt_handler->nelts; ++i) {
  126. pkt_handler_t *pkt_handler = ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*);
  127. if (pkt_handler->on_pkt) {
  128. int read_state = iobuffer_get_read_state(*p_pkt);
  129. int write_state = iobuffer_get_write_state(*p_pkt);
  130. int svc_id;
  131. int from_svc_id;
  132. int pkt_id;
  133. int count;
  134. iobuffer_format_read(*p_pkt, "444", &from_svc_id, &svc_id, &pkt_id);
  135. count = pkt_handler->on_pkt(iom, svc_id, epid, from_svc_id, type, pkt_id, p_pkt, pkt_handler->user_data);
  136. if (count && p_pkt && *p_pkt) {
  137. iobuffer_restore_read_state(*p_pkt, read_state);
  138. iobuffer_restore_write_state(*p_pkt, write_state);
  139. } else {
  140. break;
  141. }
  142. }
  143. }
  144. iom_pkt_handler_unlock(iom);
  145. }
  146. static void on_msg(bus_endpt_t *endpt, int msg, int nparam, param_size_t params[], int *result, void *user_data)
  147. {
  148. sp_iom_t *iom = (sp_iom_t *)user_data;
  149. sp_dbg_debug("==> on msg %d, %d, result? %d", msg, nparam, result != NULL ? 1: 0);
  150. if (msg == IOM_T_SEND_INFO) {
  151. int pkt_type = params[0];
  152. int this_svc_id = params[1];
  153. int epid = params[2];
  154. int svc_id = params[3];
  155. int pkt_id = params[4];
  156. iobuffer_t *pkt = (iobuffer_t*)params[5];
  157. int read_state, write_state;
  158. int ret;
  159. read_state = iobuffer_get_read_state(pkt);
  160. write_state = iobuffer_get_write_state(pkt);
  161. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
  162. iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
  163. iobuffer_write_head(pkt, IOBUF_T_I4, &this_svc_id, 0);
  164. if (result) {
  165. int state = 0;
  166. sp_dbg_debug("get state first");
  167. ret = bus_endpt_get_state(endpt, epid, &state);
  168. if (ret == 0)
  169. ret = (state == BUS_STATE_ON) ? 0 : -1;
  170. if (ret == 0)
  171. ret = bus_endpt_send_info(endpt, epid, pkt_type, pkt);
  172. *result = ret;
  173. if (ret != 0) {
  174. iobuffer_restore_read_state(pkt, read_state);
  175. iobuffer_restore_write_state(pkt, write_state);
  176. } else {
  177. iobuffer_dec_ref(pkt);
  178. }
  179. } else {
  180. bus_endpt_send_info(endpt, epid, pkt_type, pkt);
  181. iobuffer_dec_ref(pkt);
  182. }
  183. sp_dbg_debug("on_msg send_info end, pkt type: 0x%08X, %d, %d, %d, %d", pkt_type, this_svc_id, epid, svc_id, pkt_id);
  184. } else if (msg == IOM_T_GET_STATE) {
  185. int epid = params[0];
  186. int *state = (int*)params[1];
  187. int *rc = (int*)params[2];
  188. *rc = bus_endpt_get_state(endpt, epid, state);
  189. } else if (msg == IOM_T_EXIT) {
  190. sp_iom_stop(iom);
  191. } else {
  192. assert(0);
  193. }
  194. }
  195. static void on_sys(bus_endpt_t *endpt, int epid, int state, void *user_data)
  196. {
  197. sp_iom_t *iom = (sp_iom_t *)user_data;
  198. int i;
  199. iom_sys_handler_lock(iom);
  200. for (i = 0; i < iom->arr_sys_handler->nelts; ++i) {
  201. sys_handler_t *sys_handler = ARRAY_IDX(iom->arr_sys_handler, i, sys_handler_t*);
  202. if (sys_handler->on_sys) {
  203. sys_handler->on_sys(iom, epid, state, sys_handler->user_data);
  204. }
  205. }
  206. iom_sys_handler_unlock(iom);
  207. }
  208. int sp_iom_create(const char *url, int epid, sp_iom_t **p_iom)
  209. {
  210. sp_iom_t *iom;
  211. bus_endpt_callback callback;
  212. int rc;
  213. iom = ZALLOC_T(sp_iom_t);
  214. callback.on_evt = NULL;
  215. callback.on_sys = &on_sys;
  216. callback.on_pkt = &on_pkt;
  217. callback.on_inf = &on_pkt;
  218. callback.on_msg = &on_msg;
  219. callback.user_data = iom;
  220. rc = bus_endpt_create(url, epid, &callback, &iom->endpt);
  221. if (rc != 0)
  222. goto on_error;
  223. timer_heap_create(&iom->tm_queue);
  224. spinlock_init(&iom->pkt_handler_lock);
  225. spinlock_init(&iom->sys_handler_lock);
  226. spinlock_init(&iom->tm_lock);
  227. iom->arr_pkt_handler = array_make(3, sizeof(pkt_handler_t*));
  228. iom->arr_sys_handler = array_make(3, sizeof(sys_handler_t*));
  229. iom->poll_thread_id = 0;
  230. *p_iom = iom;
  231. return 0;
  232. on_error:
  233. free(iom);
  234. return Error_Unexpect;
  235. }
  236. void sp_iom_destroy(sp_iom_t *iom)
  237. {
  238. timer_queue_destroy(iom->tm_queue);
  239. bus_endpt_destroy(iom->endpt);
  240. array_free(iom->arr_pkt_handler);
  241. array_free(iom->arr_sys_handler);
  242. free(iom);
  243. }
  244. int sp_iom_add_pkt_handler(sp_iom_t *iom, int key, sp_iom_on_pkt on_pkt, void *user_data)
  245. {
  246. int rc = 0;
  247. pkt_handler_t *pkt_handler;
  248. iom_pkt_handler_lock(iom);
  249. pkt_handler = find_pkt_handler(iom, key, NULL);
  250. if (!pkt_handler) {
  251. pkt_handler = MALLOC_T(pkt_handler_t);
  252. pkt_handler->key = key;
  253. pkt_handler->on_pkt = on_pkt;
  254. pkt_handler->user_data = user_data;
  255. ARRAY_PUSH(iom->arr_pkt_handler, pkt_handler_t*) = pkt_handler;
  256. } else {
  257. rc = Error_Duplication;
  258. }
  259. iom_pkt_handler_unlock(iom);
  260. return rc;
  261. }
  262. int sp_iom_remove_pkt_handler(sp_iom_t *iom, int key)
  263. {
  264. int rc = 0;
  265. pkt_handler_t *pkt_handler;
  266. int i;
  267. iom_pkt_handler_lock(iom);
  268. pkt_handler = find_pkt_handler(iom, key, &i);
  269. if (pkt_handler) {
  270. if (i < iom->arr_pkt_handler->nelts-1)
  271. ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*) = ARRAY_IDX(iom->arr_pkt_handler, iom->arr_pkt_handler->nelts-1, pkt_handler_t*);
  272. array_pop(iom->arr_pkt_handler);
  273. } else {
  274. rc = Error_NotExist;
  275. }
  276. iom_pkt_handler_unlock(iom);
  277. return rc;
  278. }
  279. int sp_iom_add_sys_handler(sp_iom_t *iom, int key, sp_iom_on_sys on_sys, void *user_data)
  280. {
  281. int rc = 0;
  282. sys_handler_t *sys_handler;
  283. iom_sys_handler_lock(iom);
  284. sys_handler = find_sys_handler(iom, key, NULL);
  285. if (!sys_handler) {
  286. sys_handler = MALLOC_T(sys_handler_t);
  287. sys_handler->key = key;
  288. sys_handler->on_sys = on_sys;
  289. sys_handler->user_data = user_data;
  290. ARRAY_PUSH(iom->arr_sys_handler, sys_handler_t*) = sys_handler;
  291. } else {
  292. rc = Error_Duplication;
  293. }
  294. iom_sys_handler_unlock(iom);
  295. return rc;
  296. }
  297. int sp_iom_remove_sys_handler(sp_iom_t *iom, int key)
  298. {
  299. int rc = 0;
  300. sys_handler_t *sys_handler;
  301. int i;
  302. iom_sys_handler_lock(iom);
  303. sys_handler = find_sys_handler(iom, key, &i);
  304. if (sys_handler) {
  305. ARRAY_DEL(iom->arr_sys_handler, i, sys_handler_t*);
  306. } else {
  307. rc = Error_NotExist;
  308. }
  309. iom_sys_handler_unlock(iom);
  310. return rc;
  311. }
  312. static int sp_iom_poll(sp_iom_t *iom, int *timeout)
  313. {
  314. int rc;
  315. if (iom->poll_thread_id == 0) {
  316. iom->poll_thread_id = (int)GetCurrentThreadId();
  317. }
  318. rc = bus_endpt_poll(iom->endpt, *timeout);
  319. for (;;) {
  320. int cnt;
  321. long timerBeginTime, timerPlayTime;
  322. iom_tm_lock(iom);
  323. timerBeginTime = clock();
  324. cnt = timer_queue_poll_one(iom->tm_queue, NULL, timeout); // timeout返回下一个定时器超时间隔
  325. timerPlayTime = clock() - timerBeginTime;
  326. if (timerPlayTime > 500) //timer play time over than 500ms, it may make the main thread run slow
  327. sp_dbg_warn("cur Timer %d has run %d ms", cnt, timerPlayTime);
  328. iom_tm_unlock(iom);
  329. if (!cnt) {
  330. //sp_dbg_debug("no timer execute current times.");
  331. break;
  332. }
  333. }
  334. return rc;
  335. }
  336. int sp_iom_run(sp_iom_t *iom)
  337. {
  338. int timeout = POLL_INTERVAL;
  339. #ifdef _WIN32
  340. while (InterlockedExchangeAdd((LONG*)&iom->stop, 0) == 0 || timer_queue_get_count(iom->tm_queue) > 0) {
  341. int rc = sp_iom_poll(iom, &timeout);
  342. if (rc >= 0) {
  343. if (timeout > POLL_INTERVAL || timeout < 0)
  344. timeout = POLL_INTERVAL;
  345. }
  346. else {
  347. sp_dbg_debug("iom poll failed!");
  348. ExitProcess(-1);
  349. return rc;
  350. }
  351. }
  352. #else
  353. int parent_id = GetParentProcessID();
  354. /*the adapte func 'InterlockedExchangeAdd' implemented under winpr went wrong, maybe 64bit has responsibility*/
  355. while (iom->stop == 0 || timer_queue_get_count(iom->tm_queue) > 0) {
  356. int rc = sp_iom_poll(iom, &timeout);
  357. if (rc >= 0) {
  358. if (timeout > POLL_INTERVAL || timeout < 0)
  359. timeout = POLL_INTERVAL;
  360. }
  361. else {
  362. sp_dbg_debug("iom poll failed!");
  363. ExitProcess(-1);
  364. return rc;
  365. }
  366. if (rc == 0) {
  367. const int cur_ppid = GetParentProcessID();
  368. if (cur_ppid != parent_id && cur_ppid == 1 /*init's pid*/) {
  369. sp_dbg_warn("module process ? spshell has gone ?");
  370. parent_id == cur_ppid; /*just test, or would print lots of log here.*/
  371. }
  372. }
  373. }
  374. #endif //_WIN32
  375. sp_dbg_debug("iom run exit ok!");
  376. return 0;
  377. }
  378. int sp_iom_stop(sp_iom_t *iom)
  379. {
  380. sp_dbg_debug("set iom stop flag!");
  381. InterlockedExchange((LONG*)&iom->stop, 1);
  382. return 0;
  383. }
  384. int sp_iom_send(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
  385. {
  386. iobuffer_t *pkt = p_pkt ? *p_pkt : NULL;
  387. int rc;
  388. sp_dbg_debug("== > sp_iom_send: pkt type: 0x%08X, this_svc id: %d, epid:%d, svc id:%d, pkt_id: %d",
  389. pkt_type, this_svc_id, epid, svc_id, pkt_id);
  390. if (!pkt) {
  391. pkt = iobuffer_create(-1, -1);
  392. }
  393. {
  394. param_size_t params[] = {
  395. pkt_type,
  396. this_svc_id,
  397. epid,
  398. svc_id,
  399. pkt_id,
  400. (param_size_t)pkt,
  401. };
  402. rc = bus_endpt_send_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
  403. }
  404. if (!p_pkt || !*p_pkt) {
  405. if (rc != 0) {
  406. iobuffer_dec_ref(pkt);
  407. }
  408. } else {
  409. if (rc == 0)
  410. *p_pkt = NULL;
  411. }
  412. if (rc != 0)
  413. rc = Error_IO;
  414. sp_dbg_debug("<== sp_iom_send: pkt type: 0x%08X, this_svc id: %d, epid:%d, svc id:%d, pkt_id: %d, rc=%d!",
  415. pkt_type, this_svc_id, epid, svc_id, pkt_id, rc);
  416. return rc;
  417. }
  418. int sp_iom_post(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
  419. {
  420. iobuffer_t *pkt = p_pkt ? *p_pkt : NULL;
  421. int rc;
  422. if (!pkt) {
  423. pkt = iobuffer_create(-1, -1);
  424. }
  425. {
  426. param_size_t params[] = {
  427. pkt_type,
  428. this_svc_id,
  429. epid,
  430. svc_id,
  431. pkt_id,
  432. (param_size_t)pkt,
  433. };
  434. rc = bus_endpt_post_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
  435. }
  436. if (!p_pkt || !*p_pkt) {
  437. if (rc != 0)
  438. iobuffer_dec_ref(pkt);
  439. } else {
  440. if (rc == 0)
  441. *p_pkt = NULL;
  442. }
  443. if (rc != 0)
  444. rc = Error_Unexpect;
  445. return rc;
  446. }
  447. int sp_iom_get_state(sp_iom_t *iom, int epid, int *state)
  448. {
  449. int rc = 0;
  450. int rc1;
  451. param_size_t params[] = {epid, (param_size_t)state, (param_size_t)&rc};
  452. // use -1 for get state
  453. rc1 = bus_endpt_send_msg(iom->endpt, IOM_T_GET_STATE, array_size(params), params);
  454. sp_dbg_debug("get epid(%d) state: %d, rc1:%d, rc:%d", epid, *state, rc1, rc);
  455. if (rc != 0)
  456. rc = Error_Unexpect;
  457. return rc1 == 0 ? rc : Error_Unexpect;
  458. }
  459. int sp_iom_schedule_timer(sp_iom_t *iom, timer_entry *entry, unsigned int delay)
  460. {
  461. int rc;
  462. iom_tm_lock(iom);
  463. rc = timer_queue_schedule(iom->tm_queue, entry, delay);
  464. iom_tm_unlock(iom);
  465. return rc ? Error_Unexpect : 0;
  466. }
  467. int sp_iom_cancel_timer(sp_iom_t *iom, timer_entry *entry, int cancel)
  468. {
  469. int rc;
  470. iom_tm_lock(iom);
  471. rc = timer_queue_cancel(iom->tm_queue, entry, cancel);
  472. iom_tm_unlock(iom);
  473. return rc ? Error_Unexpect : 0;
  474. }
  475. int sp_iom_get_epid(sp_iom_t *iom)
  476. {
  477. return bus_endpt_get_epid(iom->endpt);
  478. }
  479. const char *sp_iom_get_comm_url(sp_iom_t *iom)
  480. {
  481. return bus_endpt_get_url(iom->endpt);
  482. }
  483. int sp_iom_post_quit(sp_iom_t *iom)
  484. {
  485. int rc = bus_endpt_post_msg(iom->endpt, IOM_T_EXIT, 0, 0);
  486. if (rc != 0)
  487. rc = Error_Unexpect;
  488. return rc;
  489. }
  490. int sp_iom_get_poll_thread_id(sp_iom_t *iom)
  491. {
  492. return iom->poll_thread_id;
  493. }