bus-unix.c 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340
  1. #include "precompile.h"
  2. #include "bus.h"
  3. #include "sockutil.h"
  4. #include "url.h"
  5. #include "spinlock.h"
  6. #include "list.h"
  7. #include "bus_internal.h"
  8. #include "evtpoll.h"
  9. #include <winpr/file.h>
  10. #include <winpr/pipe.h>
  11. #include <winpr/synch.h>
  12. #include <winpr/string.h>
  13. #include <sys/eventfd.h>
  14. #define TAG TOOLKIT_TAG("bus_unix")
  15. #define BUS_RESULT_DATA 1 // ==BUS_TYPE_PACKET, callback: callback.on_pkt, no use
  16. #define BUS_RESULT_INFO 2 // ==BUS_TYPE_INFO, callback: callback.on_inf
  17. #define BUS_RESULT_EVENT 3 // ==BUS_TYPE_EVENT, callback: callback.on_evt , no use
  18. #define BUS_RESULT_SYSTEM 4 // ==BUS_TYPE_SYSTEM, callback: callback.on_sys
  19. #define BUS_RESULT_MSG 5 // send package msg, callback: callback.on_msg
  20. #define BUS_RESULT_UNKNOWN 6
  21. typedef struct msg_t {
  22. struct list_head entry;
  23. int type;
  24. int nparam;
  25. param_size_t* params;
  26. HANDLE evt;
  27. int evt_result;
  28. }msg_t;
  29. struct bus_endpt_t {
  30. int type;
  31. int epid;
  32. union {
  33. HANDLE pipe_handle;
  34. SOCKET sock_handle;
  35. };
  36. char* url;
  37. /*define here or iom_t area, this is definitely a problem for now.*/
  38. evtpoll_t* ep;
  39. int msg_fd;
  40. event_epoll_data_t* msg_sem;
  41. bus_endpt_callback callback;
  42. struct list_head msg_list;
  43. spinlock_t msg_lock;
  44. HANDLE tx_evt; //manually
  45. HANDLE rx_evt; //manually
  46. OVERLAPPED rx_overlapped;
  47. int rx_pending;
  48. int rx_pending_pkt_len;
  49. int rx_pending_pkt_uc_len;
  50. iobuffer_queue_t* rx_buf_queue;
  51. volatile int quit_flag;
  52. };
  53. static void free_msg(msg_t* msg)
  54. {
  55. free(msg->params);
  56. free(msg);
  57. }
  58. static __inline int bus_endpoint__data_is_handle(const bus_endpt_t* endpt, void* data)
  59. {
  60. if ((endpt->type == TYPE_TCP && data == &endpt->sock_handle) ||
  61. (endpt->type == TYPE_PIPE && data == &endpt->pipe_handle)) {
  62. return 1;
  63. }
  64. return 0;
  65. }
  66. static int to_result(int pkt_type)
  67. {
  68. switch (pkt_type) {
  69. case BUS_TYPE_EVENT:
  70. return BUS_RESULT_EVENT;
  71. case BUS_TYPE_SYSTEM:
  72. return BUS_RESULT_SYSTEM;
  73. case BUS_TYPE_PACKET:
  74. return BUS_RESULT_DATA;
  75. case BUS_TYPE_INFO:
  76. return BUS_RESULT_INFO;
  77. default:
  78. break;
  79. }
  80. return BUS_RESULT_UNKNOWN;
  81. }
  82. static HANDLE create_pipe_handle(const char* name)
  83. {
  84. char tmp[MAX_PATH];
  85. HANDLE pipe = INVALID_HANDLE_VALUE;
  86. sprintf(tmp, "\\\\.\\pipe\\%s", name);
  87. for (;;) {
  88. pipe = CreateFileA(tmp,
  89. GENERIC_READ | GENERIC_WRITE,
  90. 0,
  91. NULL,
  92. OPEN_EXISTING,
  93. FILE_FLAG_OVERLAPPED,
  94. NULL);
  95. if (pipe == INVALID_HANDLE_VALUE) {
  96. if (GetLastError() == ERROR_PIPE_BUSY) {
  97. if (WaitNamedPipeA(name, 20000))
  98. continue;
  99. }
  100. }
  101. break;
  102. }
  103. return pipe;
  104. }
  105. static SOCKET create_socket_handle(const char* ip, int port)
  106. {
  107. SOCKET fd;
  108. struct sockaddr_in addr;
  109. /*Warning: only front three param is effective*/
  110. fd = WSASocketA(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  111. if (fd == INVALID_SOCKET) {
  112. return fd;
  113. }
  114. {
  115. BOOL f = TRUE;
  116. u_long l = TRUE;
  117. setsockopt(fd, SOL_SOCKET, SO_DONTLINGER, (char*)&f, sizeof(f));
  118. /*non block sock*/
  119. _ioctlsocket(fd, FIONBIO, &l);
  120. }
  121. memset(&addr, 0, sizeof(addr));
  122. addr.sin_family = AF_INET;
  123. addr.sin_port = htons(port);
  124. addr.sin_addr.s_addr = inet_addr(ip);
  125. WLog_INFO(TAG, "fd(%d) start to connect...", fd);
  126. if (_connect(fd, (struct sockaddr*) & addr, sizeof(addr)) != 0) {
  127. if (errno == EINPROGRESS) {
  128. WLog_WARN(TAG, "in connect progress...");
  129. fd_set wr_set, ex_set;
  130. FD_ZERO(&wr_set);
  131. FD_ZERO(&ex_set);
  132. FD_SET(fd, &wr_set);
  133. FD_SET(fd, &ex_set);
  134. if (_select(fd + 1, NULL, &wr_set, &ex_set, NULL) > 0 && FD_ISSET(fd, &wr_set)) {
  135. return fd;
  136. }
  137. } else {
  138. WLog_ERR(TAG, "_connect failed : %d", errno);
  139. }
  140. closesocket(fd);
  141. return INVALID_SOCKET;
  142. }
  143. WLog_WARN(TAG, "this should be appear hardly!!!");
  144. return fd;
  145. }
  146. static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
  147. {
  148. DWORD left = n;
  149. DWORD offset = 0;
  150. WLog_DBG(TAG, "==> fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
  151. while (left > 0) {
  152. BOOL ret;
  153. WSABUF wsabuf;
  154. DWORD dwBytesTransfer;
  155. OVERLAPPED overlapped;
  156. memset(&overlapped, 0, sizeof(overlapped));
  157. overlapped.hEvent = endpt->tx_evt;
  158. ResetEvent(endpt->tx_evt);
  159. wsabuf.buf = (char*)buf + offset;
  160. wsabuf.len = left;
  161. ret = FALSE;
  162. n = _send(endpt->sock_handle, wsabuf.buf, wsabuf.len, 0);
  163. if (n == -1) {
  164. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  165. fd_set wfds;
  166. struct timeval tv;
  167. int retval;
  168. FD_ZERO(&wfds);
  169. FD_SET(endpt->sock_handle, &wfds);
  170. tv.tv_sec = 5;
  171. tv.tv_usec = 0;
  172. retval = _select(endpt->sock_handle + 1, NULL, &wfds, NULL, &tv);
  173. if (retval == -1) {
  174. WLog_ERR(TAG, "select error, errno: %s", strerror(errno));
  175. }
  176. else if (retval && FD_ISSET(endpt->sock_handle, &wfds) > 0) {
  177. WLog_INFO(TAG, "can write");
  178. n = _send(endpt->sock_handle, wsabuf.buf, wsabuf.len, 0);
  179. if (n >= 0) {
  180. ret = TRUE;
  181. dwBytesTransfer = n;
  182. }
  183. } else {
  184. WLog_WARN(TAG, "write timeout");
  185. ret = TRUE;
  186. dwBytesTransfer = 0;
  187. }
  188. }
  189. else {
  190. WLog_ERR(TAG, "_send failed: %s", strerror(errno));
  191. }
  192. }
  193. else {
  194. ret = TRUE;
  195. dwBytesTransfer = n;
  196. }
  197. if (ret && dwBytesTransfer) {
  198. offset += dwBytesTransfer;
  199. left -= dwBytesTransfer;
  200. }
  201. else {
  202. WLog_DBG(TAG, "<== error fd(%d): tcp send buf left: %d", endpt->sock_handle, left);
  203. return -1;
  204. }
  205. }
  206. WLog_DBG(TAG, "<== fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
  207. return 0;
  208. }
  209. static int pipe_send_buf(bus_endpt_t* endpt, const char* buf, int n)
  210. {
  211. DWORD left = n;
  212. DWORD offset = 0;
  213. while (left > 0) {
  214. BOOL ret;
  215. DWORD dwBytesTransfer;
  216. OVERLAPPED overlapped;
  217. memset(&overlapped, 0, sizeof(overlapped));
  218. overlapped.hEvent = endpt->tx_evt;
  219. ResetEvent(endpt->tx_evt);
  220. ret = WriteFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped);
  221. if (!ret && GetLastError() == ERROR_IO_PENDING) {
  222. ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
  223. }
  224. if (ret && dwBytesTransfer) {
  225. offset += dwBytesTransfer;
  226. left -= dwBytesTransfer;
  227. }
  228. else {
  229. return -1;
  230. }
  231. }
  232. return 0;
  233. }
  234. static int send_buf(bus_endpt_t* endpt, const char* buf, int n)
  235. {
  236. if (endpt->type == TYPE_PIPE) {
  237. return pipe_send_buf(endpt, buf, n);
  238. }
  239. else if (endpt->type == TYPE_TCP) {
  240. return tcp_send_buf(endpt, buf, n);
  241. }
  242. else {
  243. return -1;
  244. }
  245. }
  246. static int send_pkt_raw(bus_endpt_t* endpt, iobuffer_t* pkt)
  247. {
  248. int pkt_len = iobuffer_get_length(pkt);
  249. int rc;
  250. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_len, 0);
  251. rc = send_buf(endpt, iobuffer_data(pkt, 0), iobuffer_get_length(pkt));
  252. return rc;
  253. }
  254. static int pipe_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
  255. {
  256. DWORD left = n;
  257. DWORD offset = 0;
  258. while (left > 0) {
  259. BOOL ret;
  260. DWORD dwBytesTransfer;
  261. OVERLAPPED overlapped;
  262. memset(&overlapped, 0, sizeof(overlapped));
  263. overlapped.hEvent = endpt->rx_evt;
  264. ResetEvent(overlapped.hEvent);
  265. ret = ReadFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped);
  266. if (!ret && GetLastError() == ERROR_IO_PENDING) {
  267. ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
  268. }
  269. if (ret && dwBytesTransfer) {
  270. offset += dwBytesTransfer;
  271. left -= dwBytesTransfer;
  272. }
  273. else {
  274. return -1;
  275. }
  276. }
  277. return 0;
  278. }
  279. static int tcp_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
  280. {
  281. DWORD left = n;
  282. DWORD offset = 0;
  283. while (left > 0) {
  284. BOOL ret;
  285. DWORD dwBytesTransfer;
  286. ret = FALSE;
  287. n = _recv(endpt->sock_handle, buf + offset, left, 0);
  288. if (n == -1) {
  289. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  290. fd_set rfds;
  291. struct timeval tv;
  292. int retval;
  293. FD_ZERO(&rfds);
  294. FD_SET(endpt->sock_handle, &rfds);
  295. tv.tv_sec = 5;
  296. tv.tv_usec = 0;
  297. retval = _select(endpt->sock_handle + 1, &rfds, NULL, NULL, &tv);
  298. if (retval == -1) {
  299. WLog_ERR(TAG, "select failed: %s", strerror(errno));
  300. }
  301. else if (retval && FD_ISSET(endpt->sock_handle, &rfds) > 0) {
  302. WLog_INFO(TAG, "can read");
  303. n = _recv(endpt->sock_handle, buf + offset, left, 0);
  304. if (n >= 0) {
  305. ret = TRUE;
  306. dwBytesTransfer = n;
  307. }
  308. }
  309. else {
  310. WLog_WARN(TAG, "read timeout: %d, left: %d", retval, left);
  311. ret = TRUE;
  312. dwBytesTransfer = 0;
  313. }
  314. }
  315. }
  316. else {
  317. ret = TRUE;
  318. dwBytesTransfer = n;
  319. }
  320. if (ret && dwBytesTransfer) {
  321. offset += dwBytesTransfer;
  322. left -= dwBytesTransfer;
  323. }
  324. else if (!ret) {
  325. return -1;
  326. }
  327. }
  328. WLog_DBG(TAG, "fd(%d): tcp recv buf len:%d", endpt->sock_handle, n);
  329. return 0;
  330. }
  331. /*read data block, n is the dream length of data to read which will store in buf*/
  332. static int recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
  333. {
  334. if (endpt->type == TYPE_PIPE) {
  335. return pipe_recv_buf(endpt, buf, n);
  336. }
  337. else if (endpt->type == TYPE_TCP) {
  338. return tcp_recv_buf(endpt, buf, n);
  339. }
  340. else {
  341. return -1;
  342. }
  343. }
  344. static int recv_pkt_raw(bus_endpt_t* endpt, iobuffer_t** pkt)
  345. {
  346. int pkt_len;
  347. int rc = -1;
  348. rc = recv_buf(endpt, (char*)&pkt_len, 4);
  349. if (rc != 0)
  350. return rc;
  351. *pkt = iobuffer_create(-1, pkt_len);
  352. iobuffer_push_count(*pkt, pkt_len);
  353. if (pkt_len > 0) {
  354. rc = recv_buf(endpt, iobuffer_data(*pkt, 0), pkt_len);
  355. }
  356. if (rc < 0) {
  357. iobuffer_destroy(*pkt);
  358. *pkt = NULL;
  359. }
  360. return rc;
  361. }
  362. static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
  363. {
  364. DWORD dwBytesTransferred;
  365. BOOL ret;
  366. int rc = 0;
  367. iobuffer_t* pkt = NULL;
  368. *p_pkt = NULL;
  369. WLog_DBG(TAG, "==>endpt(%d): start_read_pkt", endpt->epid);
  370. ResetEvent(endpt->rx_evt);
  371. endpt->rx_pending_pkt_uc_len = 0;
  372. if (endpt->type == TYPE_PIPE) {
  373. ret = ReadFile(endpt->pipe_handle,
  374. &endpt->rx_pending_pkt_len,
  375. 4,
  376. &dwBytesTransferred,
  377. &endpt->rx_overlapped);
  378. }
  379. else if (endpt->type == TYPE_TCP) {
  380. ret = _recv(endpt->sock_handle, (char*)&endpt->rx_pending_pkt_len, 4, 0);
  381. }
  382. else {
  383. return -1;
  384. }
  385. if (ret >= 0) {
  386. dwBytesTransferred = ret;
  387. endpt->rx_pending_pkt_uc_len = ret;
  388. if (dwBytesTransferred == 0)
  389. return -1;
  390. if (dwBytesTransferred < 4) {
  391. rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + dwBytesTransferred, 4 - dwBytesTransferred);
  392. if (rc < 0) {
  393. return rc;
  394. }
  395. }
  396. pkt = iobuffer_create(0, endpt->rx_pending_pkt_len);
  397. endpt->rx_pending_pkt_uc_len = 0;
  398. if (endpt->rx_pending_pkt_len > 0) {
  399. rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
  400. if (rc < 0) {
  401. iobuffer_destroy(pkt);
  402. return rc;
  403. }
  404. iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
  405. }
  406. *p_pkt = pkt;
  407. return 0;
  408. }
  409. else if(errno == EAGAIN || errno == EWOULDBLOCK) {
  410. WLog_DBG(TAG, "endpt(%d): set rx pending flag.", endpt->epid);
  411. endpt->rx_pending = 1;
  412. return 0;
  413. }
  414. return -1;
  415. }
  416. static int read_left_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
  417. {
  418. BOOL ret;
  419. int rc;
  420. DWORD dwBytesTransferred;
  421. iobuffer_t* pkt = NULL;
  422. WLog_DBG(TAG, "==> fd(%d): read left pkt", endpt->sock_handle);
  423. if (endpt->type == TYPE_PIPE) {
  424. ret = GetOverlappedResult(endpt->pipe_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE);
  425. }
  426. else if (endpt->type == TYPE_TCP) {
  427. do
  428. {
  429. ret = _recv(endpt->sock_handle,
  430. (char*)&endpt->rx_pending_pkt_len + endpt->rx_pending_pkt_uc_len,
  431. 4 - endpt->rx_pending_pkt_uc_len, 0);
  432. } while (ret == -1 && errno == EINTR);
  433. }
  434. if (ret < 0) {
  435. WLog_ERR(TAG, "<== fd(%d): read left pkt failed: ret %d, err: %s", endpt->sock_handle, ret, strerror(errno));
  436. return -1;
  437. }
  438. else if(ret == 0) {
  439. WLog_WARN(TAG, "<== fd(%d): peer socket close.", endpt->sock_handle);
  440. }
  441. if (dwBytesTransferred < 4) {
  442. rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + dwBytesTransferred, 4 - dwBytesTransferred);
  443. if (rc < 0)
  444. return rc;
  445. }
  446. /*the first 4 bytes indicates the length of content and then read the buffer content.*/
  447. pkt = iobuffer_create(-1, endpt->rx_pending_pkt_len);
  448. WLog_DBG(TAG, "after read pkt len, start to read pkt's content: %d", endpt->rx_pending_pkt_len);
  449. rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
  450. if (rc < 0) {
  451. iobuffer_destroy(pkt);
  452. return rc;
  453. }
  454. iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
  455. *p_pkt = pkt;
  456. WLog_DBG(TAG, "endpt(%d): reset rx_pending", endpt->epid);
  457. endpt->rx_pending = 0;
  458. return 0;
  459. }
  460. static int append_rx_pkt(bus_endpt_t* endpt, iobuffer_t* pkt)
  461. {
  462. int type;
  463. int read_state;
  464. read_state = iobuffer_get_read_state(pkt);
  465. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  466. iobuffer_restore_read_state(pkt, read_state);
  467. if (type == BUS_TYPE_PACKET || type == BUS_TYPE_INFO || type == BUS_TYPE_EVENT || type == BUS_TYPE_SYSTEM) {
  468. iobuffer_queue_enqueue(endpt->rx_buf_queue, pkt);
  469. return 1;
  470. }
  471. else {
  472. return -1;
  473. }
  474. }
  475. TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_callback* callback, bus_endpt_t** p_endpt)
  476. {
  477. bus_endpt_t* endpt = NULL;
  478. char* tmp_url;
  479. url_fields uf;
  480. int rc;
  481. int v;
  482. iobuffer_t* buf = NULL;
  483. iobuffer_t* ans_buf = NULL;
  484. if (!url)
  485. return -1;
  486. tmp_url = _strdup(url);
  487. if (url_parse(tmp_url, &uf) < 0) {
  488. free(tmp_url);
  489. return -1;
  490. }
  491. endpt = ZALLOC_T(bus_endpt_t);
  492. endpt->sock_handle = -1;
  493. endpt->msg_fd = -1;
  494. endpt->ep = NULL;
  495. endpt->url = tmp_url;
  496. if (_stricmp(uf.scheme, "tcp") == 0) {
  497. endpt->type = TYPE_TCP;
  498. endpt->sock_handle = create_socket_handle(uf.host, uf.port);
  499. if (endpt->sock_handle == INVALID_SOCKET)
  500. goto on_error;
  501. WLog_INFO(TAG, "bus endpt socket fd: %d", endpt->sock_handle);
  502. }
  503. else if (_stricmp(uf.scheme, "pipe") == 0) {
  504. endpt->type = TYPE_PIPE;
  505. endpt->pipe_handle = create_pipe_handle(uf.host);
  506. if (endpt->pipe_handle == INVALID_HANDLE_VALUE)
  507. goto on_error;
  508. WLog_INFO(TAG, "bus endpt pipe fd: %d", endpt->sock_handle);
  509. }
  510. else {
  511. goto on_error;
  512. }
  513. endpt->ep = evtpoll_create();
  514. if (!endpt->ep) {
  515. WLog_ERR(TAG, "evtpoll create failed!");
  516. goto on_error;
  517. }
  518. endpt->epid = epid;
  519. endpt->tx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  520. endpt->rx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  521. endpt->rx_buf_queue = iobuffer_queue_create();
  522. endpt->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
  523. if (endpt->msg_fd == -1) {
  524. WLog_ERR(TAG, "create event fd failed: %s(%d)", strerror(errno), errno);
  525. goto on_error;
  526. }
  527. INIT_LIST_HEAD(&endpt->msg_list);
  528. spinlock_init(&endpt->msg_lock);
  529. memcpy(&endpt->callback, callback, sizeof(bus_endpt_callback));
  530. buf = iobuffer_create(-1, -1);
  531. v = BUS_TYPE_ENDPT_REGISTER;
  532. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  533. v = endpt->epid;
  534. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  535. WLog_DBG(TAG, "start to send_pkt_raw");
  536. rc = send_pkt_raw(endpt, buf);
  537. WLog_INFO(TAG, "send_pkt_raw return %d", rc);
  538. if (rc != 0)
  539. goto on_error;
  540. rc = recv_pkt_raw(endpt, &ans_buf);
  541. WLog_INFO(TAG, "recv_pkt_raw return %d", rc);
  542. if (rc != 0) {
  543. goto on_error;
  544. }
  545. iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
  546. iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
  547. if (rc != 0)
  548. goto on_error;
  549. url_free_fields(&uf);
  550. if (buf)
  551. iobuffer_destroy(buf);
  552. if (ans_buf)
  553. iobuffer_destroy(ans_buf);
  554. if (-1 == evtpoll_attach(endpt->ep, endpt->msg_fd)) {
  555. goto on_error;
  556. }
  557. if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->msg_fd, &endpt->msg_fd, NULL)) {
  558. WLog_ERR(TAG, "epoll subscribe bus endpt eventfd failed.");
  559. goto on_error_msg;
  560. }
  561. /* subscribe read event [3/27/2020 Gifur] */
  562. if (evtpoll_attach(endpt->ep, endpt->sock_handle)) {
  563. WLog_ERR(TAG, "epoll attch bus endpt failed.");
  564. goto on_error_msg;
  565. }
  566. if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->sock_handle, &endpt->sock_handle, NULL)) {
  567. WLog_ERR(TAG, "epoll subscribe bus endpt failed.");
  568. goto on_error_handle;
  569. }
  570. *p_endpt = endpt;
  571. return 0;
  572. on_error_handle:
  573. evtpoll_detach(endpt->ep, endpt->sock_handle);
  574. on_error_msg:
  575. evtpoll_detach(endpt->ep, endpt->msg_fd);
  576. on_error:
  577. if (endpt->type == TYPE_TCP) {
  578. closesocket(endpt->sock_handle);
  579. }
  580. else if (endpt->type == TYPE_PIPE) {
  581. CloseHandle(endpt->pipe_handle);
  582. }
  583. if (endpt->msg_fd > 0)
  584. close(endpt->msg_fd);
  585. if (endpt->tx_evt)
  586. CloseHandle(endpt->tx_evt);
  587. if (endpt->rx_evt)
  588. CloseHandle(endpt->rx_evt);
  589. if (endpt->rx_buf_queue)
  590. iobuffer_queue_destroy(endpt->rx_buf_queue);
  591. if (endpt->url)
  592. free(endpt->url);
  593. free(endpt);
  594. url_free_fields(&uf);
  595. if (buf)
  596. iobuffer_destroy(buf);
  597. if (ans_buf)
  598. iobuffer_destroy(ans_buf);
  599. if (endpt->ep != NULL) {
  600. evtpoll_destroy(endpt->ep);
  601. }
  602. return -1;
  603. }
  604. TOOLKIT_API void bus_endpt_destroy(bus_endpt_t* endpt)
  605. {
  606. int rc = -1;
  607. iobuffer_t* buf = NULL;
  608. iobuffer_t* ans_buf = NULL;
  609. int v;
  610. assert(endpt);
  611. buf = iobuffer_create(-1, -1);
  612. v = BUS_TYPE_ENDPT_UNREGISTER;
  613. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  614. v = endpt->epid;
  615. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  616. rc = send_pkt_raw(endpt, buf);
  617. if (rc != 0)
  618. goto on_error;
  619. rc = recv_pkt_raw(endpt, &ans_buf);
  620. if (rc != 0)
  621. goto on_error;
  622. iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
  623. iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
  624. {
  625. msg_t* msg, * t;
  626. list_for_each_entry_safe(msg, t, &endpt->msg_list, msg_t, entry) {
  627. list_del(&msg->entry);
  628. if (msg->evt)
  629. CloseHandle(msg->evt);
  630. free(msg);
  631. }
  632. }
  633. on_error:
  634. if (buf)
  635. iobuffer_destroy(buf);
  636. if (ans_buf)
  637. iobuffer_destroy(ans_buf);
  638. if (endpt->type == TYPE_TCP) {
  639. closesocket(endpt->sock_handle);
  640. }
  641. else if (endpt->type == TYPE_PIPE) {
  642. CloseHandle(endpt->pipe_handle);
  643. }
  644. if (endpt->msg_fd)
  645. close(endpt->msg_fd);
  646. if (endpt->tx_evt)
  647. CloseHandle(endpt->tx_evt);
  648. if (endpt->rx_evt)
  649. CloseHandle(endpt->rx_evt);
  650. if (endpt->rx_buf_queue)
  651. iobuffer_queue_destroy(endpt->rx_buf_queue);
  652. assert(endpt->ep);
  653. evtpoll_destroy(endpt->ep);
  654. if (endpt->url)
  655. free(endpt->url);
  656. free(endpt);
  657. }
  658. // 1 : recv ok
  659. // 0 : time out
  660. // <0 : error
  661. static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
  662. {
  663. iobuffer_t* pkt = NULL;
  664. int rc;
  665. BOOL ret;
  666. assert(endpt);
  667. // peek first packge type
  668. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  669. pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  670. }
  671. else {
  672. // no received package, try to receive one
  673. if (!endpt->rx_pending) {
  674. rc = start_read_pkt(endpt, &pkt);
  675. if (rc < 0) {
  676. WLog_ERR(TAG, "start read pkt failed.");
  677. return rc;
  678. }
  679. if (pkt) {
  680. WLog_INFO(TAG, "pkt has read");
  681. rc = append_rx_pkt(endpt, pkt); // append pkt to rx_buf_queue
  682. if (rc < 0) {
  683. iobuffer_destroy(pkt);
  684. return -1;
  685. }
  686. }
  687. }
  688. else {
  689. //WLog_ERR(TAG, "is pending now.");
  690. }
  691. // if receive is pending, wait for send or receive complete event
  692. if (!pkt) {
  693. //WLog_DBG(TAG, "wait msg sem or received event. tiemout: %d", timeout);
  694. {
  695. int nfds;
  696. int ret;
  697. int i;
  698. struct epoll_event events[MAX_EPOLL_EVENT];
  699. struct epoll_event* pe;
  700. nfds = evtpoll_wait(endpt->ep, events, MAX_EPOLL_EVENT, timeout);
  701. if (nfds == 0) {
  702. //WLog_DBG(TAG, "epoll wait timeout.");
  703. return 0; //timeout
  704. }
  705. if (nfds == -1) {
  706. return -1;
  707. }
  708. WLog_DBG(TAG, "epoll wait return nfd: %d", nfds);
  709. for (i = 0; i < nfds; ++i) {
  710. void* pdata = NULL;
  711. pe = events + i;
  712. WLog_INFO(TAG, "loop events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd,
  713. pe->events & EPOLLOUT ? 1 : 0, pe->events & EPOLLIN ? 1 : 0);
  714. assert(pe->events & EPOLLIN);
  715. ret = evtpoll_deal(endpt->ep, pe, &pdata, 0);
  716. if (!ret) {
  717. assert(pdata);
  718. if(bus_endpoint__data_is_handle(endpt, pdata))
  719. {
  720. rc = read_left_pkt(endpt, &pkt);
  721. if (rc < 0)
  722. return rc;
  723. if (pkt) {
  724. rc = append_rx_pkt(endpt, pkt);
  725. if (rc < 0) {
  726. iobuffer_destroy(pkt);
  727. return -1;
  728. }
  729. }
  730. }
  731. else if (pdata == &endpt->msg_fd) {
  732. uint64_t rdata;
  733. WLog_DBG(TAG, "message arrive.");
  734. do
  735. {
  736. ret = read(endpt->msg_fd, &rdata, sizeof rdata);
  737. } while (ret < 0 && errno == EINTR);
  738. if (ret < 0) {
  739. WLog_ERR(TAG, "read msg fd failed: %s", strerror(errno));
  740. abort();
  741. }
  742. *result = BUS_RESULT_MSG;
  743. return 1;
  744. }
  745. }
  746. }
  747. }
  748. }
  749. else {
  750. WLog_ERR(TAG, "pkt has readed");
  751. }
  752. }
  753. if (pkt) {
  754. int type;
  755. int read_state = iobuffer_get_read_state(pkt);
  756. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  757. iobuffer_restore_read_state(pkt, read_state);
  758. *result = to_result(type);
  759. if (*result == BUS_RESULT_UNKNOWN) {
  760. WLog_ERR(TAG, "bug: unknown pkt type!");
  761. return -1;
  762. }
  763. return 1;
  764. }
  765. return -1;
  766. }
  767. static int recv_until(bus_endpt_t* endpt, int type, iobuffer_t** p_ansbuf)
  768. {
  769. int rc;
  770. iobuffer_t* ans_pkt = NULL;
  771. int ans_type;
  772. WLog_DBG(TAG, "==>endpt(%d): recv until type: 0x%08X", endpt->epid, type);
  773. for (;;) {
  774. if (!endpt->rx_pending) {
  775. rc = start_read_pkt(endpt, &ans_pkt);
  776. if (rc < 0) {
  777. break;
  778. }
  779. } else {
  780. WLog_DBG(TAG, "endpt(%d) is pending", endpt->epid);
  781. }
  782. if (!ans_pkt) {
  783. int nfds;
  784. int ret;
  785. int i, flag = 0;
  786. struct epoll_event events[MAX_EPOLL_EVENT];
  787. struct epoll_event* pe;
  788. nfds = evtpoll_wait(endpt->ep, events, MAX_EPOLL_EVENT, -1);
  789. if (nfds == 0) {
  790. continue;
  791. }
  792. if (nfds == -1) {
  793. return -1;
  794. }
  795. WLog_DBG(TAG, "epoll wait return nfd: %d", nfds);
  796. for (i = 0; i < nfds; ++i) {
  797. void* pdata = NULL;
  798. pe = events + i;
  799. ret = evtpoll_deal(endpt->ep, pe, &pdata, 0);
  800. if (!ret && bus_endpoint__data_is_handle(endpt, pdata)) {
  801. flag = 1;
  802. break;
  803. }
  804. }
  805. if (flag) {
  806. rc = read_left_pkt(endpt, &ans_pkt);
  807. if (rc < 0) {
  808. break;
  809. }
  810. }
  811. }
  812. if (ans_pkt) {
  813. int read_state = iobuffer_get_read_state(ans_pkt);
  814. iobuffer_read(ans_pkt, IOBUF_T_I4, &ans_type, 0);
  815. iobuffer_restore_read_state(ans_pkt, read_state);
  816. if (ans_type == type) {
  817. *p_ansbuf = ans_pkt;
  818. break;
  819. }
  820. else {
  821. rc = append_rx_pkt(endpt, ans_pkt);
  822. if (rc < 0) {
  823. iobuffer_destroy(ans_pkt);
  824. break;
  825. }
  826. else {
  827. ans_pkt = NULL;
  828. }
  829. }
  830. }
  831. }
  832. return rc;
  833. }
  834. static int recv_until_result(bus_endpt_t* endpt, int* p_result)
  835. {
  836. int rc;
  837. iobuffer_t* ans_pkt = NULL;
  838. int type, error;
  839. rc = recv_until(endpt, BUS_TYPE_ERROR, &ans_pkt);
  840. if (rc < 0)
  841. return rc;
  842. iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
  843. iobuffer_read(ans_pkt, IOBUF_T_I4, &error, 0);
  844. iobuffer_destroy(ans_pkt);
  845. *p_result = error;
  846. return rc;
  847. }
  848. static int recv_until_state(bus_endpt_t* endpt, int* p_state)
  849. {
  850. int rc;
  851. iobuffer_t* ans_pkt = NULL;
  852. int type, epid, state;
  853. rc = recv_until(endpt, BUS_TYPE_ENDPT_GET_STATE, &ans_pkt);
  854. if (rc < 0)
  855. return rc;
  856. iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
  857. iobuffer_read(ans_pkt, IOBUF_T_I4, &epid, 0);
  858. iobuffer_read(ans_pkt, IOBUF_T_I4, &state, 0);
  859. iobuffer_destroy(ans_pkt);
  860. WLog_DBG(TAG, "state address: 0x%08X", p_state);
  861. *p_state = state;
  862. return rc;
  863. }
  864. TOOLKIT_API int bus_endpt_send_pkt(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt)
  865. {
  866. int t;
  867. int rc;
  868. int read_state;
  869. int write_state;
  870. int error;
  871. assert(endpt);
  872. read_state = iobuffer_get_read_state(pkt);
  873. write_state = iobuffer_get_write_state(pkt);
  874. t = epid; // remote epid
  875. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  876. t = endpt->epid; // local epid
  877. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  878. t = type; // user type
  879. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  880. t = BUS_TYPE_PACKET; // type
  881. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  882. rc = send_pkt_raw(endpt, pkt);
  883. iobuffer_restore_read_state(pkt, read_state);
  884. iobuffer_restore_write_state(pkt, write_state);
  885. if (rc < 0)
  886. return rc;
  887. rc = recv_until_result(endpt, &error);
  888. if (rc == 0 && error != 0)
  889. rc = error;
  890. return rc;
  891. }
  892. TOOLKIT_API int bus_endpt_send_info(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt)
  893. {
  894. int t;
  895. int rc;
  896. int read_state;
  897. int write_state;
  898. assert(endpt);
  899. WLog_DBG(TAG, "endpt(%d) send info: %d, %d.", endpt->epid, epid, type);
  900. read_state = iobuffer_get_read_state(pkt);
  901. write_state = iobuffer_get_write_state(pkt);
  902. t = epid; // remote epid
  903. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  904. t = endpt->epid; // local epid
  905. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  906. t = type; // user type
  907. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  908. t = BUS_TYPE_INFO; // type
  909. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  910. rc = send_pkt_raw(endpt, pkt);
  911. iobuffer_restore_read_state(pkt, read_state);
  912. iobuffer_restore_write_state(pkt, write_state);
  913. return rc;
  914. }
  915. TOOLKIT_API int bus_endpt_bcast_evt(bus_endpt_t* endpt, int type, iobuffer_t* pkt)
  916. {
  917. int t;
  918. int rc;
  919. int read_state;
  920. int write_state;
  921. assert(endpt);
  922. read_state = iobuffer_get_read_state(pkt);
  923. write_state = iobuffer_get_write_state(pkt);
  924. t = endpt->epid;
  925. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  926. t = type;
  927. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  928. t = BUS_TYPE_EVENT;
  929. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  930. rc = send_pkt_raw(endpt, pkt);
  931. iobuffer_restore_read_state(pkt, read_state);
  932. iobuffer_restore_write_state(pkt, write_state);
  933. return rc;
  934. }
  935. static int bus_endpt_recv_pkt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt)
  936. {
  937. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  938. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  939. int read_state = iobuffer_get_read_state(pkt);
  940. int pkt_type, usr_type, from_epid, to_epid;
  941. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  942. if (pkt_type == BUS_TYPE_PACKET || pkt_type == BUS_TYPE_INFO) {
  943. iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
  944. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  945. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  946. if (p_epid)
  947. *p_epid = from_epid;
  948. if (p_type)
  949. *p_type = usr_type;
  950. iobuffer_queue_deque(endpt->rx_buf_queue);
  951. if (p_pkt) {
  952. *p_pkt = pkt;
  953. }
  954. else {
  955. iobuffer_destroy(pkt);
  956. }
  957. return 0;
  958. }
  959. else {
  960. iobuffer_restore_read_state(pkt, read_state);
  961. }
  962. }
  963. return -1;
  964. }
  965. static int bus_endpt_recv_evt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt)
  966. {
  967. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  968. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  969. int read_state = iobuffer_get_read_state(pkt);
  970. int pkt_type, usr_type, from_epid;
  971. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  972. if (pkt_type == BUS_TYPE_EVENT) {
  973. iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
  974. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  975. if (p_epid)
  976. *p_epid = from_epid;
  977. if (p_type)
  978. *p_type = usr_type;
  979. iobuffer_queue_deque(endpt->rx_buf_queue);
  980. if (p_pkt) {
  981. *p_pkt = pkt;
  982. }
  983. else {
  984. iobuffer_destroy(pkt);
  985. }
  986. return 0;
  987. }
  988. else {
  989. iobuffer_restore_read_state(pkt, read_state);
  990. }
  991. }
  992. return -1;
  993. }
  994. static int bus_endpt_recv_sys(bus_endpt_t* endpt, int* p_epid, int* p_state)
  995. {
  996. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  997. iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  998. int read_state = iobuffer_get_read_state(pkt);
  999. int pkt_type, epid, state;
  1000. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  1001. if (pkt_type == BUS_TYPE_SYSTEM) {
  1002. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  1003. iobuffer_read(pkt, IOBUF_T_I4, &state, 0);
  1004. if (p_epid)
  1005. *p_epid = epid;
  1006. if (p_state)
  1007. *p_state = state;
  1008. iobuffer_queue_deque(endpt->rx_buf_queue);
  1009. iobuffer_destroy(pkt);
  1010. return 0;
  1011. }
  1012. else {
  1013. iobuffer_restore_read_state(pkt, read_state);
  1014. }
  1015. }
  1016. return -1;
  1017. }
  1018. static int bus_endpt_recv_msg(bus_endpt_t* endpt, msg_t** p_msg)
  1019. {
  1020. int rc = -1;
  1021. assert(endpt);
  1022. assert(p_msg);
  1023. spinlock_enter(&endpt->msg_lock, -1);
  1024. if (!list_empty(&endpt->msg_list)) {
  1025. msg_t* e = list_first_entry(&endpt->msg_list, msg_t, entry);
  1026. list_del(&e->entry);
  1027. rc = 0;
  1028. *p_msg = e;
  1029. }
  1030. spinlock_leave(&endpt->msg_lock);
  1031. return rc;
  1032. }
  1033. TOOLKIT_API int bus_endpt_get_state(bus_endpt_t* endpt, int epid, int* p_state)
  1034. {
  1035. iobuffer_t* buf = NULL;
  1036. int v;
  1037. int rc = -1;
  1038. assert(endpt);
  1039. buf = iobuffer_create(-1, -1);
  1040. v = BUS_TYPE_ENDPT_GET_STATE;
  1041. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  1042. v = epid;
  1043. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  1044. rc = send_pkt_raw(endpt, buf);
  1045. if (rc < 0) {
  1046. WLog_ERR(TAG, "send pkt raw failed.");
  1047. goto on_error;
  1048. }
  1049. rc = recv_until_state(endpt, p_state);
  1050. on_error:
  1051. if (buf)
  1052. iobuffer_destroy(buf);
  1053. return rc;
  1054. }
  1055. TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, param_size_t params[])
  1056. {
  1057. msg_t* e;
  1058. int rc;
  1059. uint64_t wdata = 0;
  1060. assert(endpt);
  1061. WLog_DBG(TAG, "==> endpt(%d) post msg: %d", endpt->epid, msg);
  1062. e = MALLOC_T(msg_t);
  1063. e->type = msg;
  1064. e->nparam = nparam;
  1065. if (nparam) {
  1066. e->params = (param_size_t*)malloc(sizeof(param_size_t) * nparam);
  1067. memcpy(e->params, params, sizeof(param_size_t) * nparam);
  1068. }
  1069. else {
  1070. e->params = NULL;
  1071. }
  1072. e->evt = NULL;
  1073. spinlock_enter(&endpt->msg_lock, -1);
  1074. list_add_tail(&e->entry, &endpt->msg_list);
  1075. spinlock_leave(&endpt->msg_lock);
  1076. wdata = 1;
  1077. rc = write(endpt->msg_fd, &wdata, sizeof wdata);
  1078. if (rc == -1) {
  1079. WLog_ERR(TAG, "write to eventfd failed: %s(%d)", strerror(errno), errno);
  1080. return -1;
  1081. }
  1082. return 0;
  1083. }
  1084. TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, param_size_t params[])
  1085. {
  1086. msg_t e;
  1087. int rc;
  1088. uint64_t wdata = 0;
  1089. assert(endpt);
  1090. WLog_DBG(TAG, "==> endpt(%d) send msg: epid %d, param counts %d", endpt->epid, msg, nparam);
  1091. e.type = msg;
  1092. e.nparam = nparam;
  1093. if (nparam) {
  1094. e.params = (param_size_t*)malloc(sizeof(param_size_t) * nparam);
  1095. memcpy(e.params, params, sizeof(param_size_t) * nparam);
  1096. }
  1097. else {
  1098. e.params = NULL;
  1099. }
  1100. e.evt_result = 0;
  1101. e.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  1102. assert(e.evt != NULL);
  1103. spinlock_enter(&endpt->msg_lock, -1);
  1104. list_add_tail(&e.entry, &endpt->msg_list);
  1105. spinlock_leave(&endpt->msg_lock);
  1106. wdata = 1;
  1107. rc = write(endpt->msg_fd, &wdata, sizeof wdata);
  1108. if (rc == -1) {
  1109. WLog_ERR(TAG, "write to eventfd failed: %s(%d)", strerror(errno), errno);
  1110. CloseHandle(e.evt);
  1111. if (nparam) {
  1112. free(e.params);
  1113. }
  1114. WLog_DBG(TAG, "<== error endpt(%d) send msg: %d", endpt->epid, msg);
  1115. return -1;
  1116. }
  1117. WaitForSingleObject(e.evt, INFINITE);
  1118. CloseHandle(e.evt);
  1119. if (nparam) {
  1120. free(e.params);
  1121. }
  1122. WLog_DBG(TAG, "<== endpt(%d) send msg: epid %d, evt_res: %d", endpt->epid, msg, e.evt_result);
  1123. return e.evt_result;
  1124. }
  1125. TOOLKIT_API int bus_endpt_get_epid(bus_endpt_t* endpt)
  1126. {
  1127. return endpt->epid;
  1128. }
  1129. TOOLKIT_API const char* bus_endpt_get_url(bus_endpt_t* endpt)
  1130. {
  1131. return endpt->url;
  1132. }
  1133. TOOLKIT_API int bus_endpt_poll(bus_endpt_t* endpt, int timeout)
  1134. {
  1135. int result;
  1136. int rc;
  1137. int epid, type, state;
  1138. iobuffer_t* pkt = NULL;
  1139. rc = bus_endpt_poll_internal(endpt, &result, timeout);
  1140. //WLog_DBG(TAG, "bus endpt poll internal: %d, %d", rc, result);
  1141. if (rc > 0) {
  1142. if (result == BUS_RESULT_DATA) {
  1143. bus_endpt_recv_pkt(endpt, &epid, &type, &pkt);
  1144. if (endpt->callback.on_pkt)
  1145. endpt->callback.on_pkt(endpt, epid, type, &pkt, endpt->callback.user_data);
  1146. if (pkt)
  1147. iobuffer_dec_ref(pkt);
  1148. }
  1149. else if (result == BUS_RESULT_INFO) {
  1150. bus_endpt_recv_pkt(endpt, &epid, &type, &pkt);
  1151. if (endpt->callback.on_inf)
  1152. endpt->callback.on_inf(endpt, epid, type, &pkt, endpt->callback.user_data);
  1153. if (pkt)
  1154. iobuffer_dec_ref(pkt);
  1155. }
  1156. else if (result == BUS_RESULT_EVENT) {
  1157. bus_endpt_recv_evt(endpt, &epid, &type, &pkt);
  1158. if (endpt->callback.on_evt)
  1159. endpt->callback.on_evt(endpt, epid, type, &pkt, endpt->callback.user_data);
  1160. if (pkt)
  1161. iobuffer_dec_ref(pkt);
  1162. }
  1163. else if (result == BUS_RESULT_SYSTEM) {
  1164. bus_endpt_recv_sys(endpt, &epid, &state);
  1165. if (endpt->callback.on_sys)
  1166. endpt->callback.on_sys(endpt, epid, state, endpt->callback.user_data);
  1167. }
  1168. else if (result == BUS_RESULT_MSG) {
  1169. msg_t* msg = NULL;
  1170. bus_endpt_recv_msg(endpt, &msg);
  1171. if (endpt->callback.on_msg) {
  1172. endpt->callback.on_msg(endpt,
  1173. msg->type,
  1174. msg->nparam,
  1175. msg->params,
  1176. msg->evt ? &msg->evt_result : NULL,
  1177. endpt->callback.user_data);
  1178. if (msg->evt) {
  1179. WLog_DBG(TAG, "after recv msg, send finished evt.");
  1180. SetEvent(msg->evt);
  1181. }
  1182. else {
  1183. WLog_DBG(TAG, "free msg");
  1184. free_msg(msg);
  1185. }
  1186. }
  1187. else {
  1188. if (msg->evt) {
  1189. msg->evt_result = -1;
  1190. WLog_DBG(TAG, "after on msg failed, send finished evt.");
  1191. SetEvent(msg->evt);
  1192. }
  1193. else {
  1194. WLog_DBG(TAG, "free msg");
  1195. free_msg(msg);
  1196. }
  1197. }
  1198. }
  1199. else {
  1200. assert(0);
  1201. rc = -1;
  1202. }
  1203. }
  1204. return rc;
  1205. }
  1206. TOOLKIT_API int bus_endpt_set_quit_flag(bus_endpt_t* endpt)
  1207. {
  1208. endpt->quit_flag = 1;
  1209. return 0;
  1210. }
  1211. TOOLKIT_API int bus_endpt_get_quit_flag(bus_endpt_t* endpt)
  1212. {
  1213. return endpt->quit_flag;
  1214. }