bus-win.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151
  1. #include "precompile.h"
  2. #include "bus.h"
  3. #include "sockutil.h"
  4. #include "url.h"
  5. #include "memutil.h"
  6. #include "spinlock.h"
  7. #include "list.h"
  8. #include "bus_internal.h"
  9. #include <winpr/file.h>
  10. #include <winpr/pipe.h>
  11. #include <winpr/synch.h>
  12. #include <winpr/string.h>
  /*
   * Result codes produced while polling an endpoint; each one selects the
   * callback that bus_endpt_poll() dispatches to.
   */
  #define BUS_RESULT_DATA    1  /* == BUS_TYPE_PACKET, callback: callback.on_pkt, no use */
  #define BUS_RESULT_INFO    2  /* == BUS_TYPE_INFO, callback: callback.on_inf */
  #define BUS_RESULT_EVENT   3  /* == BUS_TYPE_EVENT, callback: callback.on_evt, no use */
  #define BUS_RESULT_SYSTEM  4  /* == BUS_TYPE_SYSTEM, callback: callback.on_sys */
  #define BUS_RESULT_MSG     5  /* send package msg, callback: callback.on_msg */
  #define BUS_RESULT_UNKNOWN 6  /* packet type that maps to no known result */
  19. typedef struct msg_t {
  20. struct list_head entry;
  21. int type;
  22. int nparam;
  23. int* params;
  24. HANDLE evt;
  25. int evt_result;
  26. }msg_t;
  27. struct bus_endpt_t {
  28. int type;
  29. int epid;
  30. union {
  31. HANDLE pipe_handle;
  32. SOCKET sock_handle;
  33. };
  34. char* url;
  35. bus_endpt_callback callback;
  36. struct list_head msg_list;
  37. spinlock_t msg_lock;
  38. HANDLE msg_sem;
  39. HANDLE tx_evt;
  40. HANDLE rx_evt;
  41. OVERLAPPED rx_overlapped;
  42. int rx_pending;
  43. int rx_pending_pkt_len;
  44. iobuffer_queue_t* rx_buf_queue;
  45. volatile int quit_flag;
  46. };
  47. static void free_msg(msg_t* msg)
  48. {
  49. free(msg->params);
  50. free(msg);
  51. }
  52. static int to_result(int pkt_type)
  53. {
  54. switch (pkt_type) {
  55. case BUS_TYPE_EVENT:
  56. return BUS_RESULT_EVENT;
  57. case BUS_TYPE_SYSTEM:
  58. return BUS_RESULT_SYSTEM;
  59. case BUS_TYPE_PACKET:
  60. return BUS_RESULT_DATA;
  61. case BUS_TYPE_INFO:
  62. return BUS_RESULT_INFO;
  63. default:
  64. break;
  65. }
  66. return BUS_RESULT_UNKNOWN;
  67. }
  68. static HANDLE create_pipe_handle(const char* name)
  69. {
  70. char tmp[MAX_PATH];
  71. HANDLE pipe = INVALID_HANDLE_VALUE;
  72. sprintf(tmp, "\\\\.\\pipe\\%s", name);
  73. for (;;) {
  74. pipe = CreateFileA(tmp,
  75. GENERIC_READ | GENERIC_WRITE,
  76. 0,
  77. NULL,
  78. OPEN_EXISTING,
  79. FILE_FLAG_OVERLAPPED,
  80. NULL);
  81. if (pipe == INVALID_HANDLE_VALUE) {
  82. if (GetLastError() == ERROR_PIPE_BUSY) {
  83. if (WaitNamedPipeA(name, 20000))
  84. continue;
  85. }
  86. }
  87. break;
  88. }
  89. return pipe;
  90. }
  91. static SOCKET create_socket_handle(const char* ip, int port)
  92. {
  93. SOCKET fd;
  94. struct sockaddr_in addr;
  95. fd = WSASocketA(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  96. if (fd == INVALID_SOCKET)
  97. return fd;
  98. {
  99. BOOL f = TRUE;
  100. u_long l = TRUE;
  101. setsockopt(fd, SOL_SOCKET, SO_DONTLINGER, (char*)&f, sizeof(f));
  102. //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&f, sizeof(f));
  103. _ioctlsocket(fd, FIONBIO, &l);
  104. }
  105. memset(&addr, 0, sizeof(addr));
  106. addr.sin_family = AF_INET;
  107. addr.sin_port = htons(port);
  108. addr.sin_addr.s_addr = inet_addr(ip);
  109. if (connect(fd, (struct sockaddr*) & addr, sizeof(addr)) != 0) {
  110. if (WSAGetLastError() == WSAEWOULDBLOCK) {
  111. fd_set wr_set, ex_set;
  112. FD_ZERO(&wr_set);
  113. FD_ZERO(&ex_set);
  114. FD_SET(fd, &wr_set);
  115. FD_SET(fd, &ex_set);
  116. if (select(fd, NULL, &wr_set, &ex_set, NULL) > 0 && FD_ISSET(fd, &wr_set))
  117. return fd;
  118. }
  119. }
  120. if (fd != INVALID_SOCKET)
  121. closesocket(fd);
  122. //return INVALID_SOCKET;//{bug}
  123. return fd;
  124. }
  125. static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
  126. {
  127. DWORD left = n;
  128. DWORD offset = 0;
  129. while (left > 0) {
  130. BOOL ret;
  131. WSABUF wsabuf;
  132. DWORD dwBytesTransfer;
  133. OVERLAPPED overlapped;
  134. memset(&overlapped, 0, sizeof(overlapped));
  135. overlapped.hEvent = endpt->tx_evt;
  136. ResetEvent(endpt->tx_evt);
  137. wsabuf.buf = (char*)buf + offset;
  138. wsabuf.len = left;
  139. ret = WSASend(endpt->sock_handle, &wsabuf, 1, &dwBytesTransfer, 0, &overlapped, NULL) == 0;
  140. if (!ret && WSAGetLastError() == WSA_IO_PENDING) {
  141. DWORD dwFlags = 0;
  142. ret = WSAGetOverlappedResult(endpt->sock_handle, &overlapped, &dwBytesTransfer, TRUE, &dwFlags);
  143. }
  144. if (ret && dwBytesTransfer) {
  145. offset += dwBytesTransfer;
  146. left -= dwBytesTransfer;
  147. }
  148. else {
  149. return -1;
  150. }
  151. }
  152. return 0;
  153. }
  154. static int pipe_send_buf(bus_endpt_t* endpt, const char* buf, int n)
  155. {
  156. DWORD left = n;
  157. DWORD offset = 0;
  158. while (left > 0) {
  159. BOOL ret;
  160. DWORD dwBytesTransfer;
  161. OVERLAPPED overlapped;
  162. memset(&overlapped, 0, sizeof(overlapped));
  163. overlapped.hEvent = endpt->tx_evt;
  164. ResetEvent(endpt->tx_evt);
  165. ret = WriteFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped);
  166. if (!ret && GetLastError() == ERROR_IO_PENDING) {
  167. ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
  168. }
  169. if (ret && dwBytesTransfer) {
  170. offset += dwBytesTransfer;
  171. left -= dwBytesTransfer;
  172. }
  173. else {
  174. return -1;
  175. }
  176. }
  177. return 0;
  178. }
  179. static int send_buf(bus_endpt_t* endpt, const char* buf, int n)
  180. {
  181. if (endpt->type == TYPE_PIPE) {
  182. return pipe_send_buf(endpt, buf, n);
  183. }
  184. else if (endpt->type == TYPE_TCP) {
  185. return tcp_send_buf(endpt, buf, n);
  186. }
  187. else {
  188. return -1;
  189. }
  190. }
  191. static int send_pkt_raw(bus_endpt_t* endpt, iobuffer_t* pkt)
  192. {
  193. int pkt_len = iobuffer_get_length(pkt);
  194. int rc;
  195. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_len, 0);
  196. rc = send_buf(endpt, iobuffer_data(pkt, 0), iobuffer_get_length(pkt));
  197. return rc;
  198. }
  199. static int pipe_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
  200. {
  201. DWORD left = n;
  202. DWORD offset = 0;
  203. while (left > 0) {
  204. BOOL ret;
  205. DWORD dwBytesTransfer;
  206. OVERLAPPED overlapped;
  207. memset(&overlapped, 0, sizeof(overlapped));
  208. overlapped.hEvent = endpt->rx_evt;
  209. ResetEvent(overlapped.hEvent);
  210. ret = ReadFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped);
  211. if (!ret && GetLastError() == ERROR_IO_PENDING) {
  212. ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
  213. }
  214. if (ret && dwBytesTransfer) {
  215. offset += dwBytesTransfer;
  216. left -= dwBytesTransfer;
  217. }
  218. else {
  219. return -1;
  220. }
  221. }
  222. return 0;
  223. }
  224. static int tcp_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
  225. {
  226. DWORD left = n;
  227. DWORD offset = 0;
  228. while (left > 0) {
  229. BOOL ret;
  230. DWORD dwFlags = 0;
  231. WSABUF wsabuf;
  232. DWORD dwBytesTransfer;
  233. OVERLAPPED overlapped;
  234. memset(&overlapped, 0, sizeof(overlapped));
  235. overlapped.hEvent = endpt->rx_evt;
  236. wsabuf.buf = buf + offset;
  237. wsabuf.len = left;
  238. ResetEvent(overlapped.hEvent);
  239. ret = WSARecv(endpt->sock_handle, &wsabuf, 1, &dwBytesTransfer, &dwFlags, &overlapped, NULL) == 0;
  240. if (!ret && WSAGetLastError() == WSA_IO_PENDING) {
  241. ret = WSAGetOverlappedResult(endpt->sock_handle, &overlapped, &dwBytesTransfer, TRUE, &dwFlags);
  242. }
  243. if (ret && dwBytesTransfer) {
  244. offset += dwBytesTransfer;
  245. left -= dwBytesTransfer;
  246. }
  247. else {
  248. return -1;
  249. }
  250. }
  251. return 0;
  252. }
  253. static int recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
  254. {
  255. if (endpt->type == TYPE_PIPE) {
  256. return pipe_recv_buf(endpt, buf, n);
  257. }
  258. else if (endpt->type == TYPE_TCP) {
  259. return tcp_recv_buf(endpt, buf, n);
  260. }
  261. else {
  262. return -1;
  263. }
  264. }
  265. static int recv_pkt_raw(bus_endpt_t* endpt, iobuffer_t** pkt)
  266. {
  267. int pkt_len;
  268. int rc = -1;
  269. rc = recv_buf(endpt, (char*)&pkt_len, 4);
  270. if (rc != 0)
  271. return rc;
  272. *pkt = iobuffer_create(-1, pkt_len);
  273. iobuffer_push_count(*pkt, pkt_len);
  274. if (pkt_len > 0) {
  275. rc = recv_buf(endpt, iobuffer_data(*pkt, 0), pkt_len);
  276. }
  277. if (rc < 0) {
  278. iobuffer_destroy(*pkt);
  279. *pkt = NULL;
  280. }
  281. return rc;
  282. }
  283. static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
  284. {
  285. DWORD dwBytesTransferred;
  286. BOOL ret;
  287. int rc = 0;
  288. iobuffer_t* pkt = NULL;
  289. *p_pkt = NULL;
  290. ResetEvent(endpt->rx_evt);
  291. memset(&endpt->rx_overlapped, 0, sizeof(OVERLAPPED));
  292. endpt->rx_overlapped.hEvent = endpt->rx_evt;
  293. if (endpt->type == TYPE_PIPE) {
  294. ret = ReadFile(endpt->pipe_handle,
  295. &endpt->rx_pending_pkt_len,
  296. 4,
  297. &dwBytesTransferred,
  298. &endpt->rx_overlapped);
  299. }
  300. else if (endpt->type == TYPE_TCP) {
  301. DWORD dwFlags = 0;
  302. WSABUF wsabuf;
  303. wsabuf.buf = (char*)&endpt->rx_pending_pkt_len;
  304. wsabuf.len = 4;
  305. ret = WSARecv(endpt->sock_handle,
  306. &wsabuf,
  307. 1,
  308. &dwBytesTransferred,
  309. &dwFlags,
  310. &endpt->rx_overlapped,
  311. NULL) == 0;
  312. }
  313. else {
  314. return -1;
  315. }
  316. if (ret) {
  317. if (dwBytesTransferred == 0)
  318. return -1;
  319. if (dwBytesTransferred < 4) {
  320. rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + dwBytesTransferred, 4 - dwBytesTransferred);
  321. if (rc < 0)
  322. return rc;
  323. }
  324. pkt = iobuffer_create(0, endpt->rx_pending_pkt_len);
  325. if (endpt->rx_pending_pkt_len > 0) {
  326. rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
  327. if (rc < 0) {
  328. iobuffer_destroy(pkt);
  329. return rc;
  330. }
  331. iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
  332. }
  333. *p_pkt = pkt;
  334. }
  335. else {
  336. if (WSAGetLastError() == WSA_IO_PENDING) {
  337. endpt->rx_pending = 1;
  338. }
  339. else {
  340. return -1;
  341. }
  342. }
  343. return 0;
  344. }
  345. static int read_left_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
  346. {
  347. BOOL ret;
  348. int rc;
  349. DWORD dwBytesTransferred;
  350. iobuffer_t* pkt = NULL;
  351. if (endpt->type == TYPE_PIPE) {
  352. ret = GetOverlappedResult(endpt->pipe_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE);
  353. }
  354. else if (endpt->type == TYPE_TCP) {
  355. DWORD dwFlags = 0;
  356. ret = WSAGetOverlappedResult(endpt->sock_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE, &dwFlags);
  357. }
  358. if (!ret || dwBytesTransferred == 0) {
  359. DWORD dwError = GetLastError();
  360. return -1;
  361. }
  362. if (dwBytesTransferred < 4) {
  363. rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + dwBytesTransferred, 4 - dwBytesTransferred);
  364. if (rc < 0)
  365. return rc;
  366. }
  367. pkt = iobuffer_create(-1, endpt->rx_pending_pkt_len);
  368. rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
  369. if (rc < 0) {
  370. iobuffer_destroy(pkt);
  371. return rc;
  372. }
  373. iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
  374. *p_pkt = pkt;
  375. endpt->rx_pending = 0;
  376. return 0;
  377. }
  378. static int append_rx_pkt(bus_endpt_t* endpt, iobuffer_t* pkt)
  379. {
  380. int type;
  381. int read_state;
  382. read_state = iobuffer_get_read_state(pkt);
  383. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  384. iobuffer_restore_read_state(pkt, read_state);
  385. if (type == BUS_TYPE_PACKET || type == BUS_TYPE_INFO || type == BUS_TYPE_EVENT || type == BUS_TYPE_SYSTEM) {
  386. iobuffer_queue_enqueue(endpt->rx_buf_queue, pkt);
  387. return 1;
  388. }
  389. else {
  390. return -1;
  391. }
  392. }
  393. TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_callback* callback, bus_endpt_t** p_endpt)
  394. {
  395. bus_endpt_t* endpt = NULL;
  396. char* tmp_url;
  397. url_fields uf;
  398. int rc;
  399. int v;
  400. iobuffer_t* buf = NULL;
  401. iobuffer_t* ans_buf = NULL;
  402. if (!url)
  403. return -1;
  404. tmp_url = _strdup(url);
  405. if (url_parse(tmp_url, &uf) < 0) {
  406. free(tmp_url);
  407. return -1;
  408. }
  409. endpt = ZALLOC_T(bus_endpt_t);
  410. endpt->sock_handle = -1;
  411. endpt->url = tmp_url;
  412. if (_stricmp(uf.scheme, "tcp") == 0) {
  413. endpt->type = TYPE_TCP;
  414. endpt->sock_handle = create_socket_handle(uf.host, uf.port);
  415. if (endpt->sock_handle == INVALID_SOCKET)
  416. goto on_error;
  417. }
  418. else if (_stricmp(uf.scheme, "pipe") == 0) {
  419. endpt->type = TYPE_PIPE;
  420. endpt->pipe_handle = create_pipe_handle(uf.host);
  421. if (endpt->pipe_handle == INVALID_HANDLE_VALUE)
  422. goto on_error;
  423. }
  424. else {
  425. goto on_error;
  426. }
  427. endpt->epid = epid;
  428. endpt->tx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  429. endpt->rx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  430. endpt->rx_buf_queue = iobuffer_queue_create();
  431. endpt->msg_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
  432. INIT_LIST_HEAD(&endpt->msg_list);
  433. spinlock_init(&endpt->msg_lock);
  434. memcpy(&endpt->callback, callback, sizeof(bus_endpt_callback));
  435. buf = iobuffer_create(-1, -1);
  436. v = BUS_TYPE_ENDPT_REGISTER;
  437. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  438. v = endpt->epid;
  439. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  440. rc = send_pkt_raw(endpt, buf);
  441. if (rc != 0)
  442. goto on_error;
  443. rc = recv_pkt_raw(endpt, &ans_buf);
  444. if (rc != 0) {
  445. DWORD dwError = GetLastError();
  446. goto on_error;
  447. }
  448. iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
  449. iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
  450. if (rc != 0)
  451. goto on_error;
  452. url_free_fields(&uf);
  453. if (buf)
  454. iobuffer_destroy(buf);
  455. if (ans_buf)
  456. iobuffer_destroy(ans_buf);
  457. *p_endpt = endpt;
  458. return 0;
  459. on_error:
  460. if (endpt->type == TYPE_TCP) {
  461. closesocket(endpt->sock_handle);
  462. }
  463. else if (endpt->type == TYPE_PIPE) {
  464. CloseHandle(endpt->pipe_handle);
  465. }
  466. if (endpt->msg_sem)
  467. CloseHandle(endpt->msg_sem);
  468. if (endpt->tx_evt)
  469. CloseHandle(endpt->tx_evt);
  470. if (endpt->rx_evt)
  471. CloseHandle(endpt->rx_evt);
  472. if (endpt->rx_buf_queue)
  473. iobuffer_queue_destroy(endpt->rx_buf_queue);
  474. if (endpt->url)
  475. free(endpt->url);
  476. free(endpt);
  477. url_free_fields(&uf);
  478. if (buf)
  479. iobuffer_destroy(buf);
  480. if (ans_buf)
  481. iobuffer_destroy(ans_buf);
  482. return -1;
  483. }
  484. TOOLKIT_API void bus_endpt_destroy(bus_endpt_t* endpt)
  485. {
  486. int rc = -1;
  487. iobuffer_t* buf = NULL;
  488. iobuffer_t* ans_buf = NULL;
  489. int v;
  490. assert(endpt);
  491. buf = iobuffer_create(-1, -1);
  492. v = BUS_TYPE_ENDPT_UNREGISTER;
  493. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  494. v = endpt->epid;
  495. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  496. rc = send_pkt_raw(endpt, buf);
  497. if (rc != 0)
  498. goto on_error;
  499. rc = recv_pkt_raw(endpt, &ans_buf);
  500. if (rc != 0)
  501. goto on_error;
  502. iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
  503. iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
  504. {
  505. msg_t* msg, * t;
  506. list_for_each_entry_safe(msg, t, &endpt->msg_list, msg_t, entry) {
  507. list_del(&msg->entry);
  508. if (msg->evt)
  509. CloseHandle(msg->evt);
  510. free(msg);
  511. }
  512. }
  513. on_error:
  514. if (buf)
  515. iobuffer_destroy(buf);
  516. if (ans_buf)
  517. iobuffer_destroy(ans_buf);
  518. if (endpt->type == TYPE_TCP) {
  519. closesocket(endpt->sock_handle);
  520. }
  521. else if (endpt->type == TYPE_PIPE) {
  522. CloseHandle(endpt->pipe_handle);
  523. }
  524. if (endpt->msg_sem)
  525. CloseHandle(endpt->msg_sem);
  526. if (endpt->tx_evt)
  527. CloseHandle(endpt->tx_evt);
  528. if (endpt->rx_evt)
  529. CloseHandle(endpt->rx_evt);
  530. if (endpt->rx_buf_queue)
  531. iobuffer_queue_destroy(endpt->rx_buf_queue);
  532. if (endpt->url)
  533. free(endpt->url);
  534. free(endpt);
  535. }
  536. // 1 : recv ok
  537. // 0 : time out
  538. // <0 : error
  539. static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
  540. {
  541. iobuffer_t* pkt = NULL;
  542. int rc;
  543. BOOL ret;
  544. assert(endpt);
  545. // peek first packge type
  546. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  547. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  548. int pkt_type;
  549. int read_state = iobuffer_get_read_state(pkt);
  550. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL);
  551. iobuffer_restore_read_state(pkt, read_state);
  552. *result = to_result(pkt_type);
  553. if (*result == BUS_RESULT_UNKNOWN) {
  554. OutputDebugStringA("bug: unknown pkt type!\n");
  555. return -1;
  556. }
  557. return 1;
  558. }
  559. // no received package, try to receive one
  560. if (!endpt->rx_pending) {
  561. rc = start_read_pkt(endpt, &pkt);
  562. if (rc < 0)
  563. return rc;
  564. if (pkt) {
  565. OutputDebugStringA("pkt has read\n");
  566. rc = append_rx_pkt(endpt, pkt);
  567. if (rc < 0) {
  568. iobuffer_destroy(pkt);
  569. return -1;
  570. }
  571. }
  572. else {
  573. OutputDebugStringA("pending\n");
  574. }
  575. }
  576. // if receive is pending, wait for send or receive complete event
  577. if (!pkt) {
  578. HANDLE hs[] = { endpt->msg_sem, endpt->rx_evt };
  579. ret = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, (DWORD)timeout);
  580. if (ret == WAIT_TIMEOUT) {
  581. return 0;
  582. }
  583. else if (ret == WAIT_OBJECT_0) {
  584. *result = BUS_RESULT_MSG; // indicate send package event
  585. return 1;
  586. }
  587. else if (ret == WAIT_OBJECT_0 + 1) {
  588. rc = read_left_pkt(endpt, &pkt);
  589. if (rc < 0)
  590. return rc;
  591. if (pkt) {
  592. rc = append_rx_pkt(endpt, pkt);
  593. if (rc < 0) {
  594. iobuffer_destroy(pkt);
  595. return -1;
  596. }
  597. }
  598. }
  599. }
  600. else {
  601. OutputDebugStringA("pkt has readed\n");
  602. }
  603. if (pkt) {
  604. int type;
  605. int read_state = iobuffer_get_read_state(pkt);
  606. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  607. iobuffer_restore_read_state(pkt, read_state);
  608. *result = to_result(type);
  609. if (*result == BUS_RESULT_UNKNOWN) {
  610. OutputDebugStringA("bug: unknown pkt type!\n");
  611. return -1;
  612. }
  613. }
  614. else {
  615. return -1;
  616. }
  617. return 1;
  618. }
  619. static int recv_until(bus_endpt_t* endpt, int type, iobuffer_t** p_ansbuf)
  620. {
  621. int rc;
  622. iobuffer_t* ans_pkt = NULL;
  623. int ans_type;
  624. for (;;) {
  625. if (!endpt->rx_pending) {
  626. rc = start_read_pkt(endpt, &ans_pkt);
  627. if (rc < 0) {
  628. DWORD dwError = WSAGetLastError();
  629. break;
  630. }
  631. }
  632. if (!ans_pkt) {
  633. DWORD ret = WaitForSingleObject(endpt->rx_evt, INFINITE);
  634. if (ret != WAIT_OBJECT_0)
  635. return -1;
  636. rc = read_left_pkt(endpt, &ans_pkt);
  637. if (rc < 0)
  638. break;
  639. }
  640. if (ans_pkt) {
  641. int read_state = iobuffer_get_read_state(ans_pkt);
  642. iobuffer_read(ans_pkt, IOBUF_T_I4, &ans_type, 0);
  643. iobuffer_restore_read_state(ans_pkt, read_state);
  644. if (ans_type == type) {
  645. *p_ansbuf = ans_pkt;
  646. break;
  647. }
  648. else {
  649. rc = append_rx_pkt(endpt, ans_pkt);
  650. if (rc < 0) {
  651. iobuffer_destroy(ans_pkt);
  652. break;
  653. }
  654. else {
  655. ans_pkt = NULL;
  656. }
  657. }
  658. }
  659. }
  660. return rc;
  661. }
  662. static int recv_until_result(bus_endpt_t* endpt, int* p_result)
  663. {
  664. int rc;
  665. iobuffer_t* ans_pkt = NULL;
  666. int type, error;
  667. rc = recv_until(endpt, BUS_TYPE_ERROR, &ans_pkt);
  668. if (rc < 0)
  669. return rc;
  670. iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
  671. iobuffer_read(ans_pkt, IOBUF_T_I4, &error, 0);
  672. iobuffer_destroy(ans_pkt);
  673. *p_result = error;
  674. return rc;
  675. }
  676. static int recv_until_state(bus_endpt_t* endpt, int* p_state)
  677. {
  678. int rc;
  679. iobuffer_t* ans_pkt = NULL;
  680. int type, epid, state;
  681. rc = recv_until(endpt, BUS_TYPE_ENDPT_GET_STATE, &ans_pkt);
  682. if (rc < 0)
  683. return rc;
  684. iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
  685. iobuffer_read(ans_pkt, IOBUF_T_I4, &epid, 0);
  686. iobuffer_read(ans_pkt, IOBUF_T_I4, &state, 0);
  687. iobuffer_destroy(ans_pkt);
  688. *p_state = state;
  689. return rc;
  690. }
  691. TOOLKIT_API int bus_endpt_send_pkt(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt)
  692. {
  693. int t;
  694. int rc;
  695. int read_state;
  696. int write_state;
  697. int error;
  698. assert(endpt);
  699. read_state = iobuffer_get_read_state(pkt);
  700. write_state = iobuffer_get_write_state(pkt);
  701. t = epid; // remote epid
  702. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  703. t = endpt->epid; // local epid
  704. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  705. t = type; // user type
  706. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  707. t = BUS_TYPE_PACKET; // type
  708. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  709. rc = send_pkt_raw(endpt, pkt);
  710. iobuffer_restore_read_state(pkt, read_state);
  711. iobuffer_restore_write_state(pkt, write_state);
  712. if (rc < 0)
  713. return rc;
  714. rc = recv_until_result(endpt, &error);
  715. if (rc == 0 && error != 0)
  716. rc = error;
  717. return rc;
  718. }
  719. TOOLKIT_API int bus_endpt_send_info(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt)
  720. {
  721. int t;
  722. int rc;
  723. int read_state;
  724. int write_state;
  725. assert(endpt);
  726. read_state = iobuffer_get_read_state(pkt);
  727. write_state = iobuffer_get_write_state(pkt);
  728. t = epid; // remote epid
  729. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  730. t = endpt->epid; // local epid
  731. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  732. t = type; // user type
  733. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  734. t = BUS_TYPE_INFO; // type
  735. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  736. rc = send_pkt_raw(endpt, pkt);
  737. iobuffer_restore_read_state(pkt, read_state);
  738. iobuffer_restore_write_state(pkt, write_state);
  739. return rc;
  740. }
  741. TOOLKIT_API int bus_endpt_bcast_evt(bus_endpt_t* endpt, int type, iobuffer_t* pkt)
  742. {
  743. int t;
  744. int rc;
  745. int read_state;
  746. int write_state;
  747. assert(endpt);
  748. read_state = iobuffer_get_read_state(pkt);
  749. write_state = iobuffer_get_write_state(pkt);
  750. t = endpt->epid;
  751. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  752. t = type;
  753. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  754. t = BUS_TYPE_EVENT;
  755. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  756. rc = send_pkt_raw(endpt, pkt);
  757. iobuffer_restore_read_state(pkt, read_state);
  758. iobuffer_restore_write_state(pkt, write_state);
  759. return rc;
  760. }
  761. static int bus_endpt_recv_pkt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt)
  762. {
  763. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  764. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  765. int read_state = iobuffer_get_read_state(pkt);
  766. int pkt_type, usr_type, from_epid, to_epid;
  767. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  768. if (pkt_type == BUS_TYPE_PACKET || pkt_type == BUS_TYPE_INFO) {
  769. iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
  770. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  771. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  772. if (p_epid)
  773. *p_epid = from_epid;
  774. if (p_type)
  775. *p_type = usr_type;
  776. iobuffer_queue_deque(endpt->rx_buf_queue);
  777. if (p_pkt) {
  778. *p_pkt = pkt;
  779. }
  780. else {
  781. iobuffer_destroy(pkt);
  782. }
  783. return 0;
  784. }
  785. else {
  786. iobuffer_restore_read_state(pkt, read_state);
  787. }
  788. }
  789. return -1;
  790. }
  791. static int bus_endpt_recv_evt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt)
  792. {
  793. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  794. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  795. int read_state = iobuffer_get_read_state(pkt);
  796. int pkt_type, usr_type, from_epid;
  797. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  798. if (pkt_type == BUS_TYPE_EVENT) {
  799. iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
  800. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  801. if (p_epid)
  802. *p_epid = from_epid;
  803. if (p_type)
  804. *p_type = usr_type;
  805. iobuffer_queue_deque(endpt->rx_buf_queue);
  806. if (p_pkt) {
  807. *p_pkt = pkt;
  808. }
  809. else {
  810. iobuffer_destroy(pkt);
  811. }
  812. return 0;
  813. }
  814. else {
  815. iobuffer_restore_read_state(pkt, read_state);
  816. }
  817. }
  818. return -1;
  819. }
  820. static int bus_endpt_recv_sys(bus_endpt_t* endpt, int* p_epid, int* p_state)
  821. {
  822. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  823. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  824. int read_state = iobuffer_get_read_state(pkt);
  825. int pkt_type, epid, state;
  826. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  827. if (pkt_type == BUS_TYPE_SYSTEM) {
  828. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  829. iobuffer_read(pkt, IOBUF_T_I4, &state, 0);
  830. if (p_epid)
  831. *p_epid = epid;
  832. if (p_state)
  833. *p_state = state;
  834. iobuffer_queue_deque(endpt->rx_buf_queue);
  835. iobuffer_destroy(pkt);
  836. return 0;
  837. }
  838. else {
  839. iobuffer_restore_read_state(pkt, read_state);
  840. }
  841. }
  842. return -1;
  843. }
  844. static int bus_endpt_recv_msg(bus_endpt_t* endpt, msg_t** p_msg)
  845. {
  846. int rc = -1;
  847. assert(endpt);
  848. assert(p_msg);
  849. spinlock_enter(&endpt->msg_lock, -1);
  850. if (!list_empty(&endpt->msg_list)) {
  851. msg_t* e = list_first_entry(&endpt->msg_list, msg_t, entry);
  852. list_del(&e->entry);
  853. rc = 0;
  854. *p_msg = e;
  855. }
  856. spinlock_leave(&endpt->msg_lock);
  857. return rc;
  858. }
  859. TOOLKIT_API int bus_endpt_get_state(bus_endpt_t* endpt, int epid, int* p_state)
  860. {
  861. iobuffer_t* buf = NULL;
  862. int v;
  863. int rc = -1;
  864. assert(endpt);
  865. buf = iobuffer_create(-1, -1);
  866. v = BUS_TYPE_ENDPT_GET_STATE;
  867. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  868. v = epid;
  869. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  870. rc = send_pkt_raw(endpt, buf);
  871. if (rc < 0)
  872. goto on_error;
  873. rc = recv_until_state(endpt, p_state);
  874. on_error:
  875. if (buf)
  876. iobuffer_destroy(buf);
  877. return rc;
  878. }
  879. TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, int params[])
  880. {
  881. msg_t* e;
  882. assert(endpt);
  883. e = MALLOC_T(msg_t);
  884. e->type = msg;
  885. e->nparam = nparam;
  886. if (nparam) {
  887. e->params = (int*)malloc(sizeof(int) * nparam);
  888. memcpy(e->params, params, sizeof(int) * nparam);
  889. }
  890. else {
  891. e->params = NULL;
  892. }
  893. e->evt = NULL;
  894. spinlock_enter(&endpt->msg_lock, -1);
  895. list_add_tail(&e->entry, &endpt->msg_list);
  896. spinlock_leave(&endpt->msg_lock);
  897. ReleaseSemaphore(endpt->msg_sem, 1, NULL);
  898. return 0;
  899. }
  900. TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, int params[])
  901. {
  902. msg_t e;
  903. assert(endpt);
  904. e.type = msg;
  905. e.nparam = nparam;
  906. if (nparam) {
  907. e.params = (int*)malloc(sizeof(int) * nparam);
  908. memcpy(e.params, params, sizeof(int) * nparam);
  909. }
  910. else {
  911. e.params = NULL;
  912. }
  913. e.evt_result = 0;
  914. e.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  915. spinlock_enter(&endpt->msg_lock, -1);
  916. list_add_tail(&e.entry, &endpt->msg_list);
  917. spinlock_leave(&endpt->msg_lock);
  918. ReleaseSemaphore(endpt->msg_sem, 1, NULL);
  919. WaitForSingleObject(e.evt, INFINITE);
  920. CloseHandle(e.evt);
  921. if (nparam) {
  922. free(e.params);
  923. }
  924. return e.evt_result;
  925. }
  926. TOOLKIT_API int bus_endpt_get_epid(bus_endpt_t* endpt)
  927. {
  928. return endpt->epid;
  929. }
  930. TOOLKIT_API const char* bus_endpt_get_url(bus_endpt_t* endpt)
  931. {
  932. return endpt->url;
  933. }
  934. TOOLKIT_API int bus_endpt_poll(bus_endpt_t* endpt, int timeout)
  935. {
  936. int result;
  937. int rc;
  938. int epid, type, state;
  939. iobuffer_t* pkt = NULL;
  940. rc = bus_endpt_poll_internal(endpt, &result, timeout);
  941. if (rc > 0) {
  942. if (result == BUS_RESULT_DATA) {
  943. bus_endpt_recv_pkt(endpt, &epid, &type, &pkt);
  944. if (endpt->callback.on_pkt)
  945. endpt->callback.on_pkt(endpt, epid, type, &pkt, endpt->callback.user_data);
  946. if (pkt)
  947. iobuffer_dec_ref(pkt);
  948. }
  949. else if (result == BUS_RESULT_INFO) {
  950. bus_endpt_recv_pkt(endpt, &epid, &type, &pkt);
  951. if (endpt->callback.on_inf)
  952. endpt->callback.on_inf(endpt, epid, type, &pkt, endpt->callback.user_data);
  953. if (pkt)
  954. iobuffer_dec_ref(pkt);
  955. }
  956. else if (result == BUS_RESULT_EVENT) {
  957. bus_endpt_recv_evt(endpt, &epid, &type, &pkt);
  958. if (endpt->callback.on_evt)
  959. endpt->callback.on_evt(endpt, epid, type, &pkt, endpt->callback.user_data);
  960. if (pkt)
  961. iobuffer_dec_ref(pkt);
  962. }
  963. else if (result == BUS_RESULT_SYSTEM) {
  964. bus_endpt_recv_sys(endpt, &epid, &state);
  965. if (endpt->callback.on_sys)
  966. endpt->callback.on_sys(endpt, epid, state, endpt->callback.user_data);
  967. }
  968. else if (result == BUS_RESULT_MSG) {
  969. msg_t* msg = NULL;
  970. bus_endpt_recv_msg(endpt, &msg);
  971. if (endpt->callback.on_msg) {
  972. endpt->callback.on_msg(endpt, msg->type, msg->nparam, msg->params, msg->evt ? &msg->evt_result : NULL, endpt->callback.user_data);
  973. if (msg->evt)
  974. SetEvent(msg->evt);
  975. else
  976. free_msg(msg);
  977. }
  978. else {
  979. if (msg->evt) {
  980. msg->evt_result = -1;
  981. SetEvent(msg->evt);
  982. }
  983. else {
  984. free_msg(msg);
  985. }
  986. }
  987. }
  988. else {
  989. assert(0);
  990. rc = -1;
  991. }
  992. }
  993. return rc;
  994. }
  995. TOOLKIT_API int bus_endpt_set_quit_flag(bus_endpt_t* endpt)
  996. {
  997. endpt->quit_flag = 1;
  998. return 0;
  999. }
  1000. TOOLKIT_API int bus_endpt_get_quit_flag(bus_endpt_t* endpt)
  1001. {
  1002. return endpt->quit_flag;
  1003. }