bus_daemon.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019
  1. #include "precompile.h"
  2. #include "ioqueue.h"
  3. #include "bus.h"
  4. #include "bus_internal.h"
  5. #include "url.h"
  6. #include "refcnt.h"
  7. #include "list.h"
  8. #include "iobuffer.h"
  9. #include "array.h"
  10. #include "dbgutil.h"
  11. #include <time.h>
  12. #include <winpr/synch.h>
  13. #include <winpr/interlocked.h>
  14. #include <winpr/string.h>
  15. #include <winpr/sysinfo.h>
  16. #include <winpr/thread.h>
  17. #include <winpr/winsock.h>
  18. #define TAG TOOLKIT_TAG("bus_deamon")
  19. #define MAX_THREADS 32
  20. #define DEFAULT_ACCEPT_OP_COUNT 5
  21. #define MSG_REMOVE_REGISTAR 1
  22. typedef struct endpt_session_t endpt_session_t;
  23. typedef struct daemon_accetpor_t daemon_accetpor_t;
  24. struct endpt_session_t
  25. {
  26. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  27. int epid;
  28. int registered;
  29. union {
  30. ioqueue_tcpsock_t tcp;
  31. ioqueue_file_t pipe;
  32. };
  33. CRITICAL_SECTION lock;
  34. time_t start_time;
  35. int err;
  36. int rx_pending_pkt_len;
  37. int type;
  38. int tx_pending;
  39. struct list_head entry;
  40. iobuffer_queue_t *tx_iobuf_queue;
  41. ioqueue_overlapped_t rx_overlapped;
  42. ioqueue_overlapped_t tx_overlapped;
  43. iobuffer_t *rx_pending_buf;
  44. iobuffer_t *tx_pending_buf;
  45. bus_daemon_t *daemon;
  46. daemon_accetpor_t *acceptor;
  47. };
  48. struct daemon_accetpor_t
  49. {
  50. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  51. union {
  52. ioqueue_acceptor_t tcp_acceptor;
  53. ioqueue_pipe_acceptor_t pipe_acceptor;
  54. };
  55. int type;
  56. array_header_t *arr_ov;
  57. bus_daemon_t *daemon;
  58. };
  59. struct bus_daemon_t
  60. {
  61. ioqueue_t *ioq;
  62. volatile LONG lstop;
  63. CRITICAL_SECTION lock;
  64. int nthread;
  65. array_header_t *arr_uri;
  66. array_header_t *arr_acceptor;
  67. array_header_t *arr_thread;
  68. struct list_head registered_session_list;
  69. struct list_head unregistered_session_list;
  70. };
  71. DECLARE_REF_COUNT_STATIC(endpt_session, endpt_session_t)
  72. DECLARE_REF_COUNT_STATIC(daemon_accetpor, daemon_accetpor_t)
  73. static unsigned int __stdcall thread_proc(void *param)
  74. {
  75. bus_daemon_t *daemon = (bus_daemon_t*)param;
  76. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  77. while (!daemon->lstop) {
  78. ioqueue_poll(daemon->ioq, 10);
  79. }
  80. return 0;
  81. }
  82. static int start_accept(daemon_accetpor_t *acceptor, int idx);
  83. static int queue_ans_pkt(endpt_session_t *session, int rc);
  84. static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt);
  85. static int session_start_recv_hdr(endpt_session_t *session);
  86. static int queue_sys_pkt(endpt_session_t *session, int epid, int state);
  87. static void daemon_lock(bus_daemon_t *daemon)
  88. {
  89. EnterCriticalSection(&daemon->lock);
  90. }
  91. static void daemon_unlock(bus_daemon_t *daemon)
  92. {
  93. LeaveCriticalSection(&daemon->lock);
  94. }
  95. static void session_lock(endpt_session_t *session)
  96. {
  97. EnterCriticalSection(&session->lock);
  98. }
  99. static void session_unlock(endpt_session_t *session)
  100. {
  101. LeaveCriticalSection(&session->lock);
  102. }
  103. static void add_unregistered_list(endpt_session_t *session)
  104. {
  105. bus_daemon_t *daemon = session->daemon;
  106. daemon_lock(daemon);
  107. list_add_tail(&session->entry, &daemon->unregistered_session_list);
  108. daemon_unlock(daemon);
  109. }
  110. static void remove_session_list(endpt_session_t *session)
  111. {
  112. bus_daemon_t *daemon = session->daemon;
  113. daemon_lock(daemon);
  114. list_del(&session->entry);
  115. if (session->epid != BUS_INVALID_EPID) {
  116. endpt_session_t *pos, *n;
  117. list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
  118. queue_sys_pkt(pos, session->epid, BUS_STATE_OFF);
  119. }
  120. }
  121. daemon_unlock(daemon);
  122. }
  123. static void move_to_registered_session(endpt_session_t *session)
  124. {
  125. bus_daemon_t *daemon = session->daemon;
  126. daemon_lock(daemon);
  127. {
  128. endpt_session_t *pos, *n;
  129. list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
  130. queue_sys_pkt(pos, session->epid, BUS_STATE_ON);
  131. }
  132. }
  133. list_move_tail(&session->entry, &daemon->registered_session_list);
  134. daemon_unlock(daemon);
  135. }
  136. static void move_to_unregistered_session(endpt_session_t *session)
  137. {
  138. bus_daemon_t *daemon = session->daemon;
  139. daemon_lock(daemon);
  140. list_move_tail(&session->entry, &daemon->unregistered_session_list);
  141. daemon_unlock(daemon);
  142. }
  143. static endpt_session_t* create_session(daemon_accetpor_t *acceptor, int type, int fd)
  144. {
  145. int rc;
  146. endpt_session_t *session;
  147. session = ZALLOC_T(endpt_session_t);
  148. if (type == TYPE_PIPE) {
  149. rc = ioqueue_pipe_acceptor_create_client(&acceptor->pipe_acceptor, (HANDLE)fd, &session->pipe);
  150. if (rc < 0)
  151. goto on_error;
  152. } else if (type == TYPE_TCP) {
  153. rc = ioqueue_acceptor_create_client(&acceptor->tcp_acceptor, fd, &session->tcp);
  154. if (rc < 0)
  155. goto on_error;
  156. }
  157. session->epid = BUS_INVALID_EPID;
  158. session->start_time = time(NULL);
  159. session->daemon = acceptor->daemon;
  160. session->acceptor = acceptor;
  161. session->type = type;
  162. REF_COUNT_INIT(&session->ref_cnt);
  163. InitializeCriticalSection(&session->lock);
  164. session->tx_iobuf_queue = iobuffer_queue_create();
  165. return session;
  166. on_error:
  167. free(session);
  168. return NULL;
  169. }
  170. static void destroy_session(endpt_session_t *session)
  171. {
  172. while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
  173. iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
  174. endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(iobuf);
  175. if (ts) {
  176. queue_ans_pkt(ts, BUS_E_NETBROKEN);
  177. endpt_session_dec_ref(ts);
  178. }
  179. iobuffer_destroy(iobuf);
  180. }
  181. iobuffer_queue_destroy(session->tx_iobuf_queue);
  182. DeleteCriticalSection(&session->lock);
  183. if (session->type == TYPE_PIPE) {
  184. ioqueue_file_destroy(&session->pipe);
  185. } else if (session->type == TYPE_TCP) {
  186. ioqueue_tcpsock_destroy(&session->tcp);
  187. } else {
  188. TOOLKIT_ASSERT(0);
  189. }
  190. free(session);
  191. }
  192. IMPLEMENT_REF_COUNT_MT_STATIC(endpt_session, endpt_session_t, ref_cnt, destroy_session)
  193. static endpt_session_t *find_session(bus_daemon_t *daemon, int epid)
  194. {
  195. endpt_session_t *session = NULL, *pos;
  196. daemon_lock(daemon);
  197. list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) {
  198. if (pos->epid == epid) {
  199. endpt_session_inc_ref(pos);
  200. session = pos;
  201. break;
  202. }
  203. }
  204. daemon_unlock(daemon);
  205. return session;
  206. }
  207. static void on_send_pkt(endpt_session_t *session, int err)
  208. {
  209. iobuffer_t *pkt;
  210. session_lock(session);
  211. pkt = session->tx_pending_buf;
  212. session->tx_pending_buf = NULL;
  213. session->tx_pending = 0;
  214. session_unlock(session);
  215. if (pkt) {
  216. if (iobuffer_get_user_data(pkt)) {
  217. endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(pkt);
  218. queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK);
  219. endpt_session_dec_ref(ts);
  220. }
  221. iobuffer_destroy(pkt);
  222. pkt = NULL;
  223. }
  224. if (!err) {
  225. int rc = 0;
  226. session_lock(session);
  227. if (!session->tx_pending) {
  228. if (iobuffer_queue_count(session->tx_iobuf_queue)) {
  229. iobuffer_t *tpkt = iobuffer_queue_deque(session->tx_iobuf_queue);
  230. session->tx_pending = 1;
  231. rc = start_send_pkt(session, &tpkt);
  232. if (rc < 0) {
  233. session->tx_pending = 0;
  234. }
  235. if (tpkt) {
  236. pkt = tpkt;
  237. }
  238. }
  239. }
  240. session_unlock(session);
  241. if (rc < 0) {
  242. if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
  243. remove_session_list(session);
  244. endpt_session_dec_ref(session);
  245. }
  246. }
  247. if (pkt) {
  248. if (iobuffer_get_user_data(pkt)) {
  249. endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(pkt);
  250. queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK);
  251. endpt_session_dec_ref(ts);
  252. }
  253. iobuffer_destroy(pkt);
  254. pkt = NULL;
  255. }
  256. } else {
  257. session_lock(session);
  258. while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
  259. iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
  260. endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(iobuf);
  261. if (ts) {
  262. session_unlock(session);
  263. queue_ans_pkt(ts, BUS_E_NETBROKEN);
  264. session_lock(session);
  265. endpt_session_dec_ref(ts);
  266. }
  267. iobuffer_destroy(iobuf);
  268. }
  269. session_unlock(session);
  270. }
  271. endpt_session_dec_ref(session);
  272. }
  273. static void on_pipe_send_pkt(ioqueue_file_t* file,
  274. ioqueue_overlapped_t *overlapped,
  275. void *buf,
  276. unsigned int transfer_bytes,
  277. void *user_data,
  278. int err)
  279. {
  280. on_send_pkt((endpt_session_t*)user_data, err);
  281. }
  282. static void on_tcp_send_pkt(ioqueue_tcpsock_t* tcpsock,
  283. ioqueue_overlapped_t *overlapped,
  284. void *buf,
  285. unsigned int transfer_bytes,
  286. void *user_data,
  287. int err)
  288. {
  289. on_send_pkt((endpt_session_t*)user_data, err);
  290. }
  291. static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt)
  292. {
  293. int rc = -1;
  294. int v;
  295. if (session->err < 0)
  296. return rc;
  297. v = iobuffer_get_length(*pkt);
  298. iobuffer_write_head(*pkt, IOBUF_T_I4, &v, 0);
  299. endpt_session_inc_ref(session);
  300. session->tx_pending_buf = *pkt;
  301. *pkt = NULL;
  302. if (session->type == TYPE_PIPE) {
  303. rc = ioqueue_file_async_writen(&session->pipe,
  304. &session->tx_overlapped,
  305. iobuffer_data(session->tx_pending_buf, 0),
  306. iobuffer_get_length(session->tx_pending_buf),
  307. &on_pipe_send_pkt, session);
  308. } else if (session->type == TYPE_TCP) {
  309. rc = ioqueue_tcpsock_async_sendn(&session->tcp,
  310. &session->tx_overlapped,
  311. iobuffer_data(session->tx_pending_buf, 0),
  312. iobuffer_get_length(session->tx_pending_buf),
  313. &on_tcp_send_pkt, session);
  314. } else {
  315. TOOLKIT_ASSERT(0);
  316. }
  317. if (rc < 0) {
  318. *pkt = session->tx_pending_buf;
  319. endpt_session_dec_ref(session);
  320. }
  321. return rc;
  322. }
  323. static int queue_tx_pkt(endpt_session_t *session, iobuffer_t **pkt)
  324. {
  325. int rc = -1;
  326. session_lock(session);
  327. if (session->tx_pending) {
  328. iobuffer_queue_enqueue(session->tx_iobuf_queue, *pkt);
  329. *pkt = NULL;
  330. rc = 0;
  331. } else {
  332. session->tx_pending = 1;
  333. rc = start_send_pkt(session, pkt);
  334. }
  335. session_unlock(session);
  336. if (rc < 0) {
  337. if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
  338. ioqueue_post_message(session->daemon->ioq, MSG_REMOVE_REGISTAR, (int)session, 0);
  339. }
  340. }
  341. return rc;
  342. }
  343. static iobuffer_t *make_ans_pkt(int rc)
  344. {
  345. int v;
  346. iobuffer_t *pkt = iobuffer_create(-1, -1);
  347. v = BUS_TYPE_ERROR;
  348. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  349. v = rc;
  350. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  351. return pkt;
  352. }
  353. static int queue_ans_pkt(endpt_session_t *session, int rc)
  354. {
  355. iobuffer_t *pkt = make_ans_pkt(rc);
  356. int err = queue_tx_pkt(session, &pkt);
  357. if (pkt) {
  358. iobuffer_destroy(pkt);
  359. }
  360. return err;
  361. }
  362. static iobuffer_t *make_ans_state_pkt(bus_daemon_t *daemon, int epid)
  363. {
  364. int v;
  365. endpt_session_t *ts;
  366. iobuffer_t *pkt;
  367. int state;
  368. ts = find_session(daemon, epid);
  369. if (!ts) {
  370. state = BUS_STATE_OFF;
  371. } else {
  372. state = BUS_STATE_ON;
  373. endpt_session_dec_ref(ts);
  374. ts = NULL;
  375. }
  376. pkt = iobuffer_create(-1, -1);
  377. v = BUS_TYPE_ENDPT_GET_STATE;
  378. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  379. v = epid;
  380. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  381. v = state;
  382. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  383. return pkt;
  384. }
  385. static iobuffer_t *make_sys_pkt(int epid, int state)
  386. {
  387. int v;
  388. iobuffer_t *pkt = iobuffer_create(-1, -1);
  389. v = BUS_TYPE_SYSTEM;
  390. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  391. v = epid;
  392. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  393. v = state;
  394. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  395. return pkt;
  396. }
  397. static int queue_sys_pkt(endpt_session_t *session, int epid, int state)
  398. {
  399. iobuffer_t *pkt = make_sys_pkt(epid, state);
  400. int err = queue_tx_pkt(session, &pkt);
  401. if (pkt) {
  402. iobuffer_destroy(pkt);
  403. }
  404. return err;
  405. }
  406. static int on_process_pkt(endpt_session_t *session, iobuffer_t **ppkt)
  407. {
  408. bus_daemon_t *daemon = session->daemon;
  409. iobuffer_t *pkt = *ppkt;
  410. int type;
  411. int read_state;
  412. int rc = -1;
  413. read_state = iobuffer_get_read_state(pkt);
  414. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  415. if (session->registered) {
  416. if (type == BUS_TYPE_ENDPT_UNREGISTER) {
  417. move_to_unregistered_session(session);
  418. session->registered = 0;
  419. queue_ans_pkt(session, BUS_E_OK);
  420. rc = -1;
  421. } else if (type == BUS_TYPE_ENDPT_GET_STATE) {
  422. int epid;
  423. iobuffer_t *ans_pkt;
  424. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  425. ans_pkt = make_ans_state_pkt(session->daemon, epid);
  426. rc = queue_tx_pkt(session, &ans_pkt);
  427. if (ans_pkt)
  428. iobuffer_dec_ref(ans_pkt);
  429. } else if (type == BUS_TYPE_PACKET) {
  430. int from_epid;
  431. int to_epid;
  432. int user_type;
  433. iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
  434. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  435. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  436. iobuffer_restore_read_state(pkt, read_state);
  437. if (to_epid == session->epid) {
  438. endpt_session_inc_ref(session);
  439. iobuffer_set_user_data(pkt, session);
  440. rc = queue_tx_pkt(session, ppkt);
  441. if (rc < 0) {
  442. endpt_session_dec_ref(session);
  443. iobuffer_set_user_data(pkt, NULL);
  444. }
  445. } else {
  446. endpt_session_t *ts = find_session(session->daemon, to_epid);
  447. if (!ts) {
  448. rc = queue_ans_pkt(session, BUS_E_NOTFOUND);
  449. } else {
  450. if (ts->err) {
  451. rc = queue_ans_pkt(session, BUS_E_NETBROKEN);
  452. } else {
  453. endpt_session_inc_ref(session);
  454. iobuffer_set_user_data(pkt, session);
  455. rc = queue_tx_pkt(ts, ppkt);
  456. if (rc < 0) {
  457. endpt_session_dec_ref(session);
  458. iobuffer_set_user_data(pkt, NULL);
  459. rc = queue_ans_pkt(session, BUS_E_NETBROKEN);
  460. }
  461. }
  462. endpt_session_dec_ref(ts); // find session
  463. }
  464. }
  465. } else if (type == BUS_TYPE_EVENT) {
  466. int epid;
  467. int user_type;
  468. iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
  469. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  470. iobuffer_restore_read_state(pkt, read_state);
  471. daemon_lock(session->daemon);
  472. {
  473. endpt_session_t *pos, *n;
  474. list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
  475. if (pos != session) {
  476. iobuffer_t *tbuf = iobuffer_clone(pkt);
  477. queue_tx_pkt(pos, &tbuf);
  478. if (tbuf)
  479. iobuffer_destroy(tbuf);
  480. }
  481. }
  482. }
  483. daemon_unlock(session->daemon);
  484. rc = 0;
  485. } else if (type == BUS_TYPE_INFO) {
  486. int from_epid;
  487. int to_epid;
  488. int user_type;
  489. iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
  490. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  491. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  492. iobuffer_restore_read_state(pkt, read_state);
  493. if (to_epid == session->epid) {
  494. queue_tx_pkt(session, ppkt);
  495. } else {
  496. endpt_session_t *ts = find_session(session->daemon, to_epid);
  497. if (ts) {
  498. if (!ts->err) {
  499. queue_tx_pkt(ts, ppkt);
  500. }
  501. endpt_session_dec_ref(ts); // find session
  502. }
  503. }
  504. rc = 0;
  505. }
  506. } else {
  507. if (type == BUS_TYPE_ENDPT_REGISTER) {
  508. int epid;
  509. endpt_session_t *ts;
  510. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  511. ts = find_session(session->daemon, epid);
  512. if (!ts) {
  513. session->registered = 1;
  514. session->epid = epid;
  515. rc = queue_ans_pkt(session, BUS_E_OK);
  516. if (rc == 0) {
  517. move_to_registered_session(session);
  518. }
  519. } else {
  520. endpt_session_dec_ref(ts);
  521. }
  522. }
  523. }
  524. if (*ppkt)
  525. iobuffer_restore_read_state(pkt, read_state);
  526. return rc;
  527. }
  528. static void on_recv_body(endpt_session_t *session, unsigned int transfer_bytes, int err)
  529. {
  530. int rc = -1;
  531. if (!err) {
  532. iobuffer_t *pkt = session->rx_pending_buf;
  533. session->rx_pending_buf = NULL;
  534. iobuffer_push_count(pkt, transfer_bytes);
  535. rc = on_process_pkt(session, &pkt);
  536. if (rc == 0) {
  537. rc = session_start_recv_hdr(session);
  538. }
  539. if (pkt)
  540. iobuffer_dec_ref(pkt);
  541. }
  542. if (rc < 0) {
  543. if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
  544. remove_session_list(session);
  545. endpt_session_dec_ref(session);
  546. }
  547. }
  548. endpt_session_dec_ref(session);
  549. }
  550. static void on_pipe_recv_body(ioqueue_file_t* file,
  551. ioqueue_overlapped_t *overlapped,
  552. void *buf,
  553. unsigned int transfer_bytes,
  554. void *user_data,
  555. int err)
  556. {
  557. on_recv_body((endpt_session_t*)user_data, transfer_bytes, err);
  558. }
  559. static void on_tcp_recv_body(ioqueue_tcpsock_t* tcpsock,
  560. ioqueue_overlapped_t *overlapped,
  561. void *buf,
  562. unsigned int transfer_bytes,
  563. void *user_data,
  564. int err)
  565. {
  566. on_recv_body((endpt_session_t*)user_data, transfer_bytes, err);
  567. }
  568. static int session_start_recv_body(endpt_session_t *session)
  569. {
  570. int rc = -1;
  571. if (session->err < 0)
  572. return rc;
  573. endpt_session_inc_ref(session);
  574. session->rx_pending_buf = iobuffer_create(-1, session->rx_pending_pkt_len);
  575. if (session->type == TYPE_PIPE) {
  576. rc = ioqueue_file_async_readn(&session->pipe,
  577. &session->rx_overlapped,
  578. iobuffer_data(session->rx_pending_buf, 0),
  579. session->rx_pending_pkt_len,
  580. &on_pipe_recv_body,
  581. session);
  582. } else if (session->type == TYPE_TCP) {
  583. rc = ioqueue_tcpsock_async_recvn(&session->tcp,
  584. &session->rx_overlapped,
  585. iobuffer_data(session->rx_pending_buf, 0),
  586. session->rx_pending_pkt_len,
  587. &on_tcp_recv_body,
  588. session);
  589. }
  590. if (rc < 0) {
  591. iobuffer_destroy(session->rx_pending_buf);
  592. session->rx_pending_buf = NULL;
  593. endpt_session_dec_ref(session);
  594. }
  595. return rc;
  596. }
  597. static void on_recv_hdr(endpt_session_t *session, int err)
  598. {
  599. int rc = -1;
  600. if (!err) {
  601. rc = session_start_recv_body(session);
  602. }
  603. if (rc < 0) {
  604. if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
  605. remove_session_list(session);
  606. endpt_session_dec_ref(session);
  607. }
  608. }
  609. endpt_session_dec_ref(session);
  610. }
  611. static void on_pipe_recv_hdr(ioqueue_file_t *file,
  612. ioqueue_overlapped_t *overlapped,
  613. void *buf,
  614. unsigned int transfer_bytes,
  615. void *user_data,
  616. int err)
  617. {
  618. on_recv_hdr((endpt_session_t*)user_data, err);
  619. }
  620. static void on_tcp_recv_hdr(ioqueue_tcpsock_t *tcpsock,
  621. ioqueue_overlapped_t *overlapped,
  622. void *buf,
  623. unsigned int transfer_bytes,
  624. void *user_data,
  625. int err)
  626. {
  627. on_recv_hdr((endpt_session_t*)user_data, err);
  628. }
  629. static int session_start_recv_hdr(endpt_session_t *session)
  630. {
  631. int rc = -1;
  632. if (session->err < 0)
  633. return rc;
  634. endpt_session_inc_ref(session);
  635. if (session->type == TYPE_PIPE) {
  636. rc = ioqueue_file_async_readn(&session->pipe,
  637. &session->rx_overlapped,
  638. &session->rx_pending_pkt_len,
  639. 4,
  640. &on_pipe_recv_hdr,
  641. session);
  642. } else if (session->type == TYPE_TCP) {
  643. rc = ioqueue_tcpsock_async_recvn(&session->tcp,
  644. &session->rx_overlapped,
  645. &session->rx_pending_pkt_len,
  646. 4,
  647. &on_tcp_recv_hdr,
  648. session);
  649. }
  650. if (rc < 0)
  651. endpt_session_dec_ref(session);
  652. return rc;
  653. }
  654. static int on_msg(unsigned short msg_id, int param1, int param2)
  655. {
  656. if (msg_id == MSG_REMOVE_REGISTAR) {
  657. endpt_session_t *session = (endpt_session_t*)param1;
  658. remove_session_list(session);
  659. endpt_session_dec_ref(session);
  660. }
  661. return TRUE;
  662. }
  663. static int on_accept(daemon_accetpor_t *dacceptor, void *user_data, int fd, int err)
  664. {
  665. int idx = (int)user_data;
  666. if (!err) {
  667. endpt_session_t *session = create_session(dacceptor, dacceptor->type, fd);
  668. if (session) {
  669. int rc;
  670. add_unregistered_list(session);
  671. rc = session_start_recv_hdr(session);
  672. if (rc < 0) {
  673. remove_session_list(session);
  674. endpt_session_dec_ref(session);
  675. }
  676. }
  677. start_accept(dacceptor, idx);
  678. }
  679. daemon_accetpor_dec_ref(dacceptor);
  680. return 1;
  681. }
  682. static int on_pipe_accept_callback(ioqueue_pipe_acceptor_t *acceptor,
  683. ioqueue_overlapped_t *overlapped,
  684. HANDLE pipe,
  685. void *user_data,
  686. int err)
  687. {
  688. daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, pipe_acceptor);
  689. return on_accept(dacceptor, user_data, (int)pipe, err);
  690. }
  691. static int on_tcp_accept_callback(ioqueue_acceptor_t *acceptor,
  692. ioqueue_overlapped_t *overlapped,
  693. SOCKET in_sock,
  694. void *user_data,
  695. int err)
  696. {
  697. daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, tcp_acceptor);
  698. return on_accept(dacceptor, user_data, in_sock, err);
  699. }
  700. static int start_accept(daemon_accetpor_t *acceptor, int idx)
  701. {
  702. int rc = -1;
  703. daemon_accetpor_inc_ref(acceptor);
  704. if (acceptor->type == TYPE_PIPE) {
  705. ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*);
  706. rc = ioqueue_pipe_acceptor_async_accept(&acceptor->pipe_acceptor, ov, &on_pipe_accept_callback, (void*)idx);
  707. } else if (acceptor->type == TYPE_TCP) {
  708. ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*);
  709. rc = ioqueue_acceptor_async_accept(&acceptor->tcp_acceptor, ov, &on_tcp_accept_callback, (void*)idx);
  710. } else {
  711. TOOLKIT_ASSERT(0);
  712. }
  713. if (rc < 0)
  714. daemon_accetpor_dec_ref(acceptor);
  715. return rc;
  716. }
  717. static daemon_accetpor_t* create_daemon_acceptor(bus_daemon_t *daemon, char *url)
  718. {
  719. url_fields uf;
  720. int kk;
  721. int rc;
  722. daemon_accetpor_t *dacceptor = NULL;
  723. dacceptor = MALLOC_T(daemon_accetpor_t);
  724. if (url_parse(url, &uf) == 0) {
  725. if (_stricmp(uf.scheme, "tcp") == 0) {
  726. rc = ioqueue_acceptor_create(daemon->ioq, uf.host, uf.port, &dacceptor->tcp_acceptor);
  727. if (rc < 0) {
  728. free(dacceptor);
  729. url_free_fields(&uf);
  730. return NULL;
  731. }
  732. ioqueue_acceptor_listen(&dacceptor->tcp_acceptor, 10);
  733. dacceptor->type = TYPE_TCP;
  734. } else if (_stricmp(uf.scheme, "pipe") == 0) {
  735. rc = ioqueue_pipe_acceptor_create(daemon->ioq, uf.host, &dacceptor->tcp_acceptor);
  736. if (rc < 0) {
  737. free(dacceptor);
  738. url_free_fields(&uf);
  739. return NULL;
  740. }
  741. dacceptor->type = TYPE_PIPE;
  742. }
  743. url_free_fields(&uf);
  744. }
  745. dacceptor->daemon = daemon;
  746. REF_COUNT_INIT(&dacceptor->ref_cnt);
  747. dacceptor->arr_ov = array_make(DEFAULT_ACCEPT_OP_COUNT, sizeof(ioqueue_overlapped_t*));
  748. for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk)
  749. ARRAY_PUSH(dacceptor->arr_ov, ioqueue_overlapped_t*) = MALLOC_T(ioqueue_overlapped_t);
  750. return dacceptor;
  751. }
  752. static void destroy_daemon_acceptor(daemon_accetpor_t *dacceptor)
  753. {
  754. int i;
  755. for (i = 0; i < dacceptor->arr_ov->nelts; ++i)
  756. free(ARRAY_IDX(dacceptor->arr_ov, i, ioqueue_overlapped_t*));
  757. array_free(dacceptor->arr_ov);
  758. if (dacceptor->type == TYPE_PIPE) {
  759. ioqueue_pipe_acceptor_destroy(&dacceptor->pipe_acceptor);
  760. } else if (dacceptor->type == TYPE_TCP) {
  761. ioqueue_acceptor_destroy(&dacceptor->tcp_acceptor);
  762. }
  763. free(dacceptor);
  764. }
  765. IMPLEMENT_REF_COUNT_MT_STATIC(daemon_accetpor, daemon_accetpor_t, ref_cnt, destroy_daemon_acceptor)
  766. TOOLKIT_API int bus_daemon_create(int n_url, const char *urls[], int nthread, bus_daemon_t **p_daemon)
  767. {
  768. bus_daemon_t *daemon;
  769. int i;
  770. if (n_url == 0)
  771. return -1;
  772. if (nthread <= 0) {
  773. SYSTEM_INFO si;
  774. GetSystemInfo(&si);
  775. nthread = min(MAX_THREADS, si.dwNumberOfProcessors << 1);
  776. }
  777. WLog_DBG(TAG, "thread num: %d", nthread);
  778. daemon = MALLOC_T(bus_daemon_t);
  779. if (daemon == NULL)
  780. return -1;
  781. memset(daemon, 0, sizeof(bus_daemon_t));
  782. daemon->nthread = nthread;
  783. daemon->arr_acceptor = array_make(n_url, sizeof(daemon_accetpor_t*));
  784. daemon->arr_thread = array_make(nthread, sizeof(HANDLE));
  785. daemon->arr_uri = array_make(n_url, sizeof(char*));
  786. for (i = 0; i < n_url; ++i) {
  787. WLog_DBG(TAG, "urls[%d]: %s", i, urls[i]);
  788. ARRAY_PUSH(daemon->arr_uri, char*) = _strdup(urls[i]);
  789. }
  790. InitializeCriticalSection(&daemon->lock);
  791. INIT_LIST_HEAD(&daemon->registered_session_list);
  792. INIT_LIST_HEAD(&daemon->unregistered_session_list);
  793. *p_daemon = daemon;
  794. return 0;
  795. }
  796. TOOLKIT_API int bus_daemon_destroy(bus_daemon_t *daemon)
  797. {
  798. int i;
  799. DeleteCriticalSection(&daemon->lock);
  800. for (i = 0; i < daemon->arr_uri->nelts; ++i)
  801. free(ARRAY_IDX(daemon->arr_uri, i, char*));
  802. array_free(daemon->arr_uri);
  803. array_free(daemon->arr_acceptor);
  804. array_free(daemon->arr_thread);
  805. free(daemon);
  806. return 0;
  807. }
  808. TOOLKIT_API int bus_daemon_start(bus_daemon_t *daemon)
  809. {
  810. int i;
  811. daemon->ioq = ioqueue_create();
  812. if (!daemon->ioq) {
  813. WLog_ERR(TAG, "ioqueuq create failed!");
  814. return -1;
  815. }
  816. ioqueue_msg_add_handler(daemon->ioq, MSG_REMOVE_REGISTAR, 0, &on_msg);
  817. for (i = 0; i < daemon->arr_uri->nelts; ++i) {
  818. char *url = ARRAY_IDX(daemon->arr_uri, i, char*);
  819. daemon_accetpor_t *dacceptor = create_daemon_acceptor(daemon, url);
  820. if (dacceptor) {
  821. int kk;
  822. for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk) //数量5?
  823. start_accept(dacceptor, kk);
  824. ARRAY_PUSH(daemon->arr_acceptor, daemon_accetpor_t*) = dacceptor;
  825. } else {
  826. WLog_ERR(TAG, "create daemon acceptor failed!");
  827. return -1;
  828. }
  829. }
  830. daemon->lstop = 0;
  831. for (i = 0; i < daemon->nthread; ++i) {
  832. HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, &thread_proc, daemon, 0, NULL);
  833. if (thread) {
  834. ARRAY_PUSH(daemon->arr_thread, HANDLE) = thread;
  835. } else {
  836. return -1;
  837. }
  838. }
  839. return 0;
  840. }
  841. TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon)
  842. {
  843. int i;
  844. // exit all worker thread
  845. InterlockedExchange(&daemon->lstop, 1);
  846. for (i = 0; i < daemon->arr_thread->nelts; ++i) {
  847. HANDLE t = ARRAY_IDX(daemon->arr_thread, i, HANDLE);
  848. WaitForSingleObject(t, INFINITE);
  849. CloseHandle(t);
  850. }
  851. array_clear(daemon->arr_thread);
  852. // close all pending handles
  853. {
  854. endpt_session_t *pos;
  855. for (i = 0; i < daemon->arr_acceptor->nelts; ++i) {
  856. daemon_accetpor_t *dacceptor = ARRAY_IDX(daemon->arr_acceptor, i, daemon_accetpor_t*);
  857. if (dacceptor->type == TYPE_PIPE) {
  858. ioqueue_pipe_acceptor_close_pending_handle(&dacceptor->pipe_acceptor);
  859. } else if (dacceptor->type == TYPE_TCP) {
  860. ioqueue_acceptor_close(&dacceptor->tcp_acceptor);
  861. }
  862. daemon_accetpor_dec_ref(dacceptor);
  863. }
  864. array_clear(daemon->arr_acceptor);
  865. list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) {
  866. if (pos->type == TYPE_PIPE) {
  867. ioqueue_file_close(&pos->pipe);
  868. } else if (pos->type == TYPE_TCP) {
  869. ioqueue_tcpsock_close(&pos->tcp);
  870. }
  871. }
  872. list_for_each_entry(pos, &daemon->unregistered_session_list, endpt_session_t, entry) {
  873. if (pos->type == TYPE_PIPE) {
  874. ioqueue_file_close(&pos->pipe);
  875. } else if (pos->type == TYPE_TCP) {
  876. ioqueue_tcpsock_close(&pos->tcp);
  877. }
  878. }
  879. }
  880. // poll until all pending io are aborted
  881. while (!ioqueue_can_exit(daemon->ioq)) {
  882. ioqueue_poll(daemon->ioq, 10);
  883. }
  884. ioqueue_destroy(daemon->ioq);
  885. return 0;
  886. }