#include "precompile.h" #include "bus.h" #include "sockutil.h" #include "url.h" #include "spinlock.h" #include "list.h" #include "bus_internal.h" #include "dbgutil.h" #include #include #include #include #include #define TAG TOOLKIT_TAG("bus") #include "memutil.h" #define BUS_RESULT_DATA 1 // ==BUS_TYPE_PACKET, callback: callback.on_pkt, no use #define BUS_RESULT_INFO 2 // ==BUS_TYPE_INFO, callback: callback.on_inf #define BUS_RESULT_EVENT 3 // ==BUS_TYPE_EVENT, callback: callback.on_evt , no use #define BUS_RESULT_SYSTEM 4 // ==BUS_TYPE_SYSTEM, callback: callback.on_sys #define BUS_RESULT_MSG 5 // send package msg, callback: callback.on_msg #define BUS_RESULT_UNKNOWN 6 typedef struct msg_t { struct list_head entry; int type; int nparam; int* params; HANDLE evt; int evt_result; }msg_t; struct bus_endpt_t { int type; int epid; union { HANDLE pipe_handle; SOCKET sock_handle; }; char* url; bus_endpt_callback callback; struct list_head msg_list; spinlock_t msg_lock; HANDLE msg_sem; HANDLE tx_evt; HANDLE rx_evt; OVERLAPPED rx_overlapped; int rx_pending; int rx_pending_pkt_len; iobuffer_queue_t* rx_buf_queue; volatile int quit_flag; }; static void free_msg(msg_t* msg) { free(msg->params); free(msg); } static int to_result(int pkt_type) { switch (pkt_type) { case BUS_TYPE_EVENT: return BUS_RESULT_EVENT; case BUS_TYPE_SYSTEM: return BUS_RESULT_SYSTEM; case BUS_TYPE_PACKET: return BUS_RESULT_DATA; case BUS_TYPE_INFO: return BUS_RESULT_INFO; default: break; } return BUS_RESULT_UNKNOWN; } static HANDLE create_pipe_handle(const char* name) { char tmp[MAX_PATH]; HANDLE pipe = INVALID_HANDLE_VALUE; sprintf(tmp, "\\\\.\\pipe\\%s", name); for (;;) { pipe = CreateFileA(tmp, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); if (pipe == INVALID_HANDLE_VALUE) { if (GetLastError() == ERROR_PIPE_BUSY) { if (WaitNamedPipeA(name, 20000)) continue; } } break; } return pipe; } static SOCKET create_socket_handle(const char* ip, int port) { SOCKET fd; struct sockaddr_in addr; fd = WSASocketA(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (fd == INVALID_SOCKET) return fd; { BOOL f = TRUE; u_long l = TRUE; setsockopt(fd, SOL_SOCKET, SO_DONTLINGER, (char*)&f, sizeof(f)); //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&f, sizeof(f)); _ioctlsocket(fd, FIONBIO, &l); } memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip); if (connect(fd, (struct sockaddr*) & addr, sizeof(addr)) != 0) { if (WSAGetLastError() == WSAEWOULDBLOCK) { fd_set wr_set, ex_set; FD_ZERO(&wr_set); FD_ZERO(&ex_set); FD_SET(fd, &wr_set); FD_SET(fd, &ex_set); if (select(fd, NULL, &wr_set, &ex_set, NULL) > 0 && FD_ISSET(fd, &wr_set)) return fd; } } if (fd != INVALID_SOCKET) closesocket(fd); //return INVALID_SOCKET;//{bug} return fd; } static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n) { DWORD left = n; DWORD offset = 0; while (left > 0) { BOOL ret; WSABUF wsabuf; DWORD dwBytesTransfer; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(overlapped)); overlapped.hEvent = endpt->tx_evt; ResetEvent(endpt->tx_evt); wsabuf.buf = (char*)buf + offset; wsabuf.len = left; ret = WSASend(endpt->sock_handle, &wsabuf, 1, &dwBytesTransfer, 0, &overlapped, NULL) == 0; if (!ret && WSAGetLastError() == WSA_IO_PENDING) { DWORD dwFlags = 0; ret = WSAGetOverlappedResult(endpt->sock_handle, &overlapped, &dwBytesTransfer, TRUE, &dwFlags); } if (ret && dwBytesTransfer) { offset += dwBytesTransfer; left -= dwBytesTransfer; } else { return -1; } } return 0; } static int pipe_send_buf(bus_endpt_t* endpt, const char* buf, int n) { DWORD left = n; DWORD offset = 0; while (left > 0) { BOOL ret; DWORD dwBytesTransfer; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(overlapped)); overlapped.hEvent = endpt->tx_evt; ResetEvent(endpt->tx_evt); ret = WriteFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped); if (!ret && GetLastError() == ERROR_IO_PENDING) { ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE); } if (ret && dwBytesTransfer) { offset += dwBytesTransfer; left -= dwBytesTransfer; } else { return -1; } } return 0; } static int send_buf(bus_endpt_t* endpt, const char* buf, int n) { if (endpt->type == TYPE_PIPE) { return pipe_send_buf(endpt, buf, n); } else if (endpt->type == TYPE_TCP) { return tcp_send_buf(endpt, buf, n); } else { return -1; } } static int send_pkt_raw(bus_endpt_t* endpt, iobuffer_t* pkt) { int pkt_len = iobuffer_get_length(pkt); int rc; iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_len, 0); rc = send_buf(endpt, iobuffer_data(pkt, 0), iobuffer_get_length(pkt)); return rc; } static int pipe_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n) { DWORD left = n; DWORD offset = 0; while (left > 0) { BOOL ret; DWORD dwBytesTransfer; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(overlapped)); overlapped.hEvent = endpt->rx_evt; ResetEvent(overlapped.hEvent); ret = ReadFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped); if (!ret && GetLastError() == ERROR_IO_PENDING) { ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE); } if (ret && dwBytesTransfer) { offset += dwBytesTransfer; left -= dwBytesTransfer; } else { return -1; } } return 0; } static int tcp_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n) { DWORD left = n; DWORD offset = 0; while (left > 0) { BOOL ret; DWORD dwFlags = 0; WSABUF wsabuf; DWORD dwBytesTransfer; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(overlapped)); overlapped.hEvent = endpt->rx_evt; wsabuf.buf = buf + offset; wsabuf.len = left; ResetEvent(overlapped.hEvent); ret = WSARecv(endpt->sock_handle, &wsabuf, 1, &dwBytesTransfer, &dwFlags, &overlapped, NULL) == 0; if (!ret && WSAGetLastError() == WSA_IO_PENDING) { ret = WSAGetOverlappedResult(endpt->sock_handle, &overlapped, &dwBytesTransfer, TRUE, &dwFlags); } if (ret && dwBytesTransfer) { offset += dwBytesTransfer; left -= dwBytesTransfer; } else { return -1; } } return 0; } static int recv_buf(bus_endpt_t* endpt, char* buf, DWORD n) { if (endpt->type == TYPE_PIPE) { return pipe_recv_buf(endpt, buf, n); } else if (endpt->type == TYPE_TCP) { return tcp_recv_buf(endpt, buf, n); } else { return -1; } } static int recv_pkt_raw(bus_endpt_t* endpt, iobuffer_t** pkt) { int pkt_len; int rc = -1; rc = recv_buf(endpt, (char*)&pkt_len, 4); if (rc != 0) return rc; *pkt = iobuffer_create(-1, pkt_len); iobuffer_push_count(*pkt, pkt_len); if (pkt_len > 0) { rc = recv_buf(endpt, iobuffer_data(*pkt, 0), pkt_len); } if (rc < 0) { iobuffer_destroy(*pkt); *pkt = NULL; } return rc; } static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt) { DWORD dwBytesTransferred; BOOL ret; int rc = 0; iobuffer_t* pkt = NULL; *p_pkt = NULL; ResetEvent(endpt->rx_evt); memset(&endpt->rx_overlapped, 0, sizeof(OVERLAPPED)); endpt->rx_overlapped.hEvent = endpt->rx_evt; if (endpt->type == TYPE_PIPE) { ret = ReadFile(endpt->pipe_handle, &endpt->rx_pending_pkt_len, 4, &dwBytesTransferred, &endpt->rx_overlapped); } else if (endpt->type == TYPE_TCP) { DWORD dwFlags = 0; WSABUF wsabuf; wsabuf.buf = (char*)&endpt->rx_pending_pkt_len; wsabuf.len = 4; ret = WSARecv(endpt->sock_handle, &wsabuf, 1, &dwBytesTransferred, &dwFlags, &endpt->rx_overlapped, NULL) == 0; } else { return -1; } if (ret) { if (dwBytesTransferred == 0) return -1; if (dwBytesTransferred < 4) { rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + dwBytesTransferred, 4 - dwBytesTransferred); if (rc < 0) return rc; } pkt = iobuffer_create(0, endpt->rx_pending_pkt_len); if (endpt->rx_pending_pkt_len > 0) { rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len); if (rc < 0) { iobuffer_destroy(pkt); return rc; } iobuffer_push_count(pkt, endpt->rx_pending_pkt_len); } *p_pkt = pkt; } else { if (WSAGetLastError() == WSA_IO_PENDING) { endpt->rx_pending = 1; } else { return -1; } } return 0; } static int read_left_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt) { BOOL ret = 0; int rc; DWORD dwBytesTransferred = 0; iobuffer_t* pkt = NULL; if (endpt->type == TYPE_PIPE) { ret = GetOverlappedResult(endpt->pipe_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE); } else if (endpt->type == TYPE_TCP) { DWORD dwFlags = 0; ret = WSAGetOverlappedResult(endpt->sock_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE, &dwFlags); } else { TOOLKIT_ASSERT(0); } if (!ret || dwBytesTransferred == 0) { WLog_ERR(TAG, "(WSA)GetOverlappedResult failed: %d", GetLastError()); return -1; } if (dwBytesTransferred < 4) { rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + dwBytesTransferred, 4 - dwBytesTransferred); if (rc < 0) { WLog_ERR(TAG, "recv buf failed."); return rc; } } pkt = iobuffer_create(-1, endpt->rx_pending_pkt_len); rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len); if (rc < 0) { WLog_ERR(TAG, "recv buf failed and destroy buffer."); iobuffer_destroy(pkt); return rc; } iobuffer_push_count(pkt, endpt->rx_pending_pkt_len); *p_pkt = pkt; endpt->rx_pending = 0; return 0; } static int append_rx_pkt(bus_endpt_t* endpt, iobuffer_t* pkt) { int type; int read_state; read_state = iobuffer_get_read_state(pkt); iobuffer_read(pkt, IOBUF_T_I4, &type, 0); iobuffer_restore_read_state(pkt, read_state); if (type == BUS_TYPE_PACKET || type == BUS_TYPE_INFO || type == BUS_TYPE_EVENT || type == BUS_TYPE_SYSTEM) { iobuffer_queue_enqueue(endpt->rx_buf_queue, pkt); return 1; } else { return -1; } } TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_callback* callback, bus_endpt_t** p_endpt) { bus_endpt_t* endpt = NULL; char* tmp_url; url_fields uf; int rc; int v; iobuffer_t* buf = NULL; iobuffer_t* ans_buf = NULL; if (!url) return -1; tmp_url = _strdup(url); if (url_parse(tmp_url, &uf) < 0) { free(tmp_url); return -1; } endpt = ZALLOC_T(bus_endpt_t); endpt->sock_handle = -1; endpt->url = tmp_url; if (_stricmp(uf.scheme, "tcp") == 0) { endpt->type = TYPE_TCP; endpt->sock_handle = create_socket_handle(uf.host, uf.port); if (endpt->sock_handle == INVALID_SOCKET) goto on_error; } else if (_stricmp(uf.scheme, "pipe") == 0) { endpt->type = TYPE_PIPE; endpt->pipe_handle = create_pipe_handle(uf.host); if (endpt->pipe_handle == INVALID_HANDLE_VALUE) goto on_error; } else { goto on_error; } endpt->epid = epid; endpt->tx_evt = CreateEventA(NULL, TRUE, FALSE, NULL); endpt->rx_evt = CreateEventA(NULL, TRUE, FALSE, NULL); endpt->rx_buf_queue = iobuffer_queue_create(); endpt->msg_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL); INIT_LIST_HEAD(&endpt->msg_list); spinlock_init(&endpt->msg_lock); memcpy(&endpt->callback, callback, sizeof(bus_endpt_callback)); buf = iobuffer_create(-1, -1); v = BUS_TYPE_ENDPT_REGISTER; iobuffer_write(buf, IOBUF_T_I4, &v, 0); v = endpt->epid; iobuffer_write(buf, IOBUF_T_I4, &v, 0); rc = send_pkt_raw(endpt, buf); if (rc != 0) goto on_error; rc = recv_pkt_raw(endpt, &ans_buf); if (rc != 0) { goto on_error; } iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0); iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0); if (rc != 0) goto on_error; url_free_fields(&uf); if (buf) iobuffer_destroy(buf); if (ans_buf) iobuffer_destroy(ans_buf); *p_endpt = endpt; return 0; on_error: if (endpt->type == TYPE_TCP) { closesocket(endpt->sock_handle); } else if (endpt->type == TYPE_PIPE) { CloseHandle(endpt->pipe_handle); } if (endpt->msg_sem) CloseHandle(endpt->msg_sem); if (endpt->tx_evt) CloseHandle(endpt->tx_evt); if (endpt->rx_evt) CloseHandle(endpt->rx_evt); if (endpt->rx_buf_queue) iobuffer_queue_destroy(endpt->rx_buf_queue); if (endpt->url) free(endpt->url); free(endpt); url_free_fields(&uf); if (buf) iobuffer_destroy(buf); if (ans_buf) iobuffer_destroy(ans_buf); return -1; } TOOLKIT_API void bus_endpt_destroy(bus_endpt_t* endpt) { int rc = -1; iobuffer_t* buf = NULL; iobuffer_t* ans_buf = NULL; int v; TOOLKIT_ASSERT(endpt); buf = iobuffer_create(-1, -1); v = BUS_TYPE_ENDPT_UNREGISTER; iobuffer_write(buf, IOBUF_T_I4, &v, 0); v = endpt->epid; iobuffer_write(buf, IOBUF_T_I4, &v, 0); rc = send_pkt_raw(endpt, buf); if (rc != 0) goto on_error; rc = recv_pkt_raw(endpt, &ans_buf); if (rc != 0) goto on_error; iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0); iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0); on_error: /** try to release memory as possible as i can [5/22/2020 Gifur] */ spinlock_enter(&endpt->msg_lock, -1); if (!list_empty(&endpt->msg_list)) { msg_t* msg, * t; list_for_each_entry_safe(msg, t, &endpt->msg_list, msg_t, entry) { list_del(&msg->entry); if (msg->evt) CloseHandle(msg->evt); if (msg->type == 2/*IOM_T_SEND_INFO*/) { iobuffer_t* pkt = (iobuffer_t*)msg->params[5]; if (pkt) iobuffer_dec_ref(pkt); } free_msg(msg); } } spinlock_leave(&endpt->msg_lock); if (buf) iobuffer_destroy(buf); if (ans_buf) iobuffer_destroy(ans_buf); if (endpt->type == TYPE_TCP) { closesocket(endpt->sock_handle); } else if (endpt->type == TYPE_PIPE) { CloseHandle(endpt->pipe_handle); } if (endpt->msg_sem) CloseHandle(endpt->msg_sem); if (endpt->tx_evt) CloseHandle(endpt->tx_evt); if (endpt->rx_evt) CloseHandle(endpt->rx_evt); if (endpt->rx_buf_queue) iobuffer_queue_destroy(endpt->rx_buf_queue); if (endpt->url) free(endpt->url); free(endpt); } // 1 : recv ok // 0 : time out // <0 : error static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout) { iobuffer_t* pkt = NULL; int rc; BOOL ret; TOOLKIT_ASSERT(endpt); // peek first packge type if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) { iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue); int pkt_type; int read_state = iobuffer_get_read_state(pkt); iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL); iobuffer_restore_read_state(pkt, read_state); *result = to_result(pkt_type); if (*result == BUS_RESULT_UNKNOWN) { WLog_ERR(TAG, "bug: unknown pkt type!"); return -1; } return 1; } // no received package, try to receive one if (!endpt->rx_pending) { rc = start_read_pkt(endpt, &pkt); if (rc < 0) return rc; if (pkt) { WLog_DBG(TAG, "pkt has read"); rc = append_rx_pkt(endpt, pkt); if (rc < 0) { iobuffer_destroy(pkt); return -1; } } else { WLog_DBG(TAG, "pending"); } } // if receive is pending, wait for send or receive complete event if (!pkt) { HANDLE hs[] = { endpt->msg_sem, endpt->rx_evt }; ret = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, (DWORD)timeout); if (ret == WAIT_TIMEOUT) { return 0; } else if (ret == WAIT_OBJECT_0) { *result = BUS_RESULT_MSG; // indicate send package event return 1; } else if (ret == WAIT_OBJECT_0 + 1) { rc = read_left_pkt(endpt, &pkt); if (rc < 0) { WLog_ERR(TAG, "read_left_pkt failed: %d", rc); return rc; } if (pkt) { rc = append_rx_pkt(endpt, pkt); if (rc < 0) { iobuffer_destroy(pkt); WLog_ERR(TAG, "append_rx_pkt failed: %d", rc); return -1; } } } } else { WLog_ERR(TAG, "pkt has readed"); } if (pkt) { int type; int read_state = iobuffer_get_read_state(pkt); iobuffer_read(pkt, IOBUF_T_I4, &type, 0); iobuffer_restore_read_state(pkt, read_state); *result = to_result(type); if (*result == BUS_RESULT_UNKNOWN) { WLog_ERR(TAG, "bug: unknown pkt type!"); return -1; } } else { WLog_ERR(TAG, "not pkt"); return -1; } return 1; } static int recv_until(bus_endpt_t* endpt, int type, iobuffer_t** p_ansbuf) { int rc; iobuffer_t* ans_pkt = NULL; int ans_type; for (;;) { if (!endpt->rx_pending) { rc = start_read_pkt(endpt, &ans_pkt); if (rc < 0) { break; } } if (!ans_pkt) { DWORD ret = WaitForSingleObject(endpt->rx_evt, INFINITE); if (ret != WAIT_OBJECT_0) return -1; rc = read_left_pkt(endpt, &ans_pkt); if (rc < 0) break; } if (ans_pkt) { int read_state = iobuffer_get_read_state(ans_pkt); iobuffer_read(ans_pkt, IOBUF_T_I4, &ans_type, 0); iobuffer_restore_read_state(ans_pkt, read_state); if (ans_type == type) { *p_ansbuf = ans_pkt; break; } else { rc = append_rx_pkt(endpt, ans_pkt); if (rc < 0) { iobuffer_destroy(ans_pkt); break; } else { ans_pkt = NULL; } } } } return rc; } static int recv_until_result(bus_endpt_t* endpt, int* p_result) { int rc; iobuffer_t* ans_pkt = NULL; int type, error; rc = recv_until(endpt, BUS_TYPE_ERROR, &ans_pkt); if (rc < 0) return rc; iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0); iobuffer_read(ans_pkt, IOBUF_T_I4, &error, 0); iobuffer_destroy(ans_pkt); *p_result = error; return rc; } static int recv_until_state(bus_endpt_t* endpt, int* p_state) { int rc; iobuffer_t* ans_pkt = NULL; int type, epid, state; rc = recv_until(endpt, BUS_TYPE_ENDPT_GET_STATE, &ans_pkt); if (rc < 0) return rc; iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0); iobuffer_read(ans_pkt, IOBUF_T_I4, &epid, 0); iobuffer_read(ans_pkt, IOBUF_T_I4, &state, 0); iobuffer_destroy(ans_pkt); *p_state = state; return rc; } TOOLKIT_API int bus_endpt_send_pkt(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt) { int t; int rc; int read_state; int write_state; int error; char bussinessId[LINKINFO_BUSSID_LEN]; char traceId[LINKINFO_TRACEID_LEN]; char spanId[LINKINFO_SPANID_LEN]; char parentSpanId[LINKINFO_PARENTSPANID_LEN]; TOOLKIT_ASSERT(endpt); read_state = iobuffer_get_read_state(pkt); write_state = iobuffer_get_write_state(pkt); /* t = iobuffer_get_linkId(pkt); iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); */ iobuffer_get_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId); iobuffer_write_head(pkt, IOBUF_T_BUF, parentSpanId, sizeof(parentSpanId)); iobuffer_write_head(pkt, IOBUF_T_BUF, spanId, sizeof(spanId)); iobuffer_write_head(pkt, IOBUF_T_BUF, traceId, sizeof(traceId)); iobuffer_write_head(pkt, IOBUF_T_BUF, bussinessId, sizeof(bussinessId)); t = epid; // remote epid iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = endpt->epid; // local epid iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = type; // user type iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = BUS_TYPE_PACKET; // type iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); rc = send_pkt_raw(endpt, pkt); iobuffer_restore_read_state(pkt, read_state); iobuffer_restore_write_state(pkt, write_state); if (rc < 0) return rc; rc = recv_until_result(endpt, &error); if (rc == 0 && error != 0) rc = error; return rc; } TOOLKIT_API int bus_endpt_send_info(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt) { int t; int rc; int read_state; int write_state; char bussinessId[LINKINFO_BUSSID_LEN]; char traceId[LINKINFO_TRACEID_LEN]; char spanId[LINKINFO_SPANID_LEN]; char parentSpanId[LINKINFO_PARENTSPANID_LEN]; TOOLKIT_ASSERT(endpt); read_state = iobuffer_get_read_state(pkt); write_state = iobuffer_get_write_state(pkt); /* t = iobuffer_get_linkId(pkt); iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); */ iobuffer_get_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId); iobuffer_write_head(pkt, IOBUF_T_BUF, parentSpanId, sizeof(parentSpanId)); iobuffer_write_head(pkt, IOBUF_T_BUF, spanId, sizeof(spanId)); iobuffer_write_head(pkt, IOBUF_T_BUF, traceId, sizeof(traceId)); iobuffer_write_head(pkt, IOBUF_T_BUF, bussinessId, sizeof(bussinessId)); t = epid; // remote epid iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = endpt->epid; // local epid iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = type; // user type iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = BUS_TYPE_INFO; // type iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); rc = send_pkt_raw(endpt, pkt); iobuffer_restore_read_state(pkt, read_state); iobuffer_restore_write_state(pkt, write_state); return rc; } TOOLKIT_API int bus_endpt_bcast_evt(bus_endpt_t* endpt, int type, iobuffer_t* pkt) { int t; int rc; int read_state; int write_state; TOOLKIT_ASSERT(endpt); read_state = iobuffer_get_read_state(pkt); write_state = iobuffer_get_write_state(pkt); t = endpt->epid; iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = type; iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); t = BUS_TYPE_EVENT; iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0); rc = send_pkt_raw(endpt, pkt); iobuffer_restore_read_state(pkt, read_state); iobuffer_restore_write_state(pkt, write_state); return rc; } static int bus_endpt_recv_pkt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt) { if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) { iobuffer_t *pkt = iobuffer_queue_head(endpt->rx_buf_queue); int read_state = iobuffer_get_read_state(pkt); int pkt_type, usr_type, from_epid, to_epid, link_id; char bussinessId[LINKINFO_BUSSID_LEN] ,traceId[LINKINFO_TRACEID_LEN], spanId[LINKINFO_SPANID_LEN], parentSpanId[LINKINFO_PARENTSPANID_LEN]; int readLen = 0; memset(bussinessId, 0, LINKINFO_BUSSID_LEN); memset(traceId, 0, LINKINFO_TRACEID_LEN); memset(spanId, 0, LINKINFO_SPANID_LEN); memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN); iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0); if (pkt_type == BUS_TYPE_PACKET || pkt_type == BUS_TYPE_INFO) { iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0); iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0); iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0); //iobuffer_read(pkt, IOBUF_T_I4, &link_id, 0); //MessageBox(NULL, NULL, NULL, 0); readLen = LINKINFO_BUSSID_LEN; iobuffer_read(pkt, IOBUF_T_BUF, bussinessId, &readLen); readLen = LINKINFO_TRACEID_LEN; iobuffer_read(pkt, IOBUF_T_BUF, traceId, &readLen); readLen = LINKINFO_SPANID_LEN; iobuffer_read(pkt, IOBUF_T_BUF, spanId, &readLen); readLen = LINKINFO_PARENTSPANID_LEN; iobuffer_read(pkt, IOBUF_T_BUF, parentSpanId, &readLen); iobuffer_set_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId); if (p_epid) *p_epid = from_epid; if (p_type) *p_type = usr_type; iobuffer_queue_deque(endpt->rx_buf_queue); if (p_pkt) { *p_pkt = pkt; } else { iobuffer_destroy(pkt); } return 0; } else { iobuffer_restore_read_state(pkt, read_state); } } return -1; } static int bus_endpt_recv_evt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt) { if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) { iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue); int read_state = iobuffer_get_read_state(pkt); int pkt_type, usr_type, from_epid, link_id; iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0); if (pkt_type == BUS_TYPE_EVENT) { //iobuffer_read(pkt, IOBUF_T_I4, &link_id, 0); iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0); iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0); if (p_epid) *p_epid = from_epid; if (p_type) *p_type = usr_type; iobuffer_queue_deque(endpt->rx_buf_queue); if (p_pkt) { *p_pkt = pkt; } else { iobuffer_destroy(pkt); } return 0; } else { iobuffer_restore_read_state(pkt, read_state); } } return -1; } static int bus_endpt_recv_sys(bus_endpt_t* endpt, int* p_epid, int* p_state) { if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) { iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue); int read_state = iobuffer_get_read_state(pkt); int pkt_type, epid, state; iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0); if (pkt_type == BUS_TYPE_SYSTEM) { iobuffer_read(pkt, IOBUF_T_I4, &epid, 0); iobuffer_read(pkt, IOBUF_T_I4, &state, 0); if (p_epid) *p_epid = epid; if (p_state) *p_state = state; iobuffer_queue_deque(endpt->rx_buf_queue); iobuffer_destroy(pkt); return 0; } else { iobuffer_restore_read_state(pkt, read_state); } } return -1; } static int bus_endpt_recv_msg(bus_endpt_t* endpt, msg_t** p_msg) { int rc = -1; TOOLKIT_ASSERT(endpt); TOOLKIT_ASSERT(p_msg); spinlock_enter(&endpt->msg_lock, -1); if (!list_empty(&endpt->msg_list)) { msg_t* e = list_first_entry(&endpt->msg_list, msg_t, entry); list_del(&e->entry); rc = 0; *p_msg = e; } spinlock_leave(&endpt->msg_lock); return rc; } TOOLKIT_API int bus_endpt_get_state(bus_endpt_t* endpt, int epid, int* p_state) { iobuffer_t* buf = NULL; int v; int rc = -1; TOOLKIT_ASSERT(endpt); buf = iobuffer_create(-1, -1); v = BUS_TYPE_ENDPT_GET_STATE; iobuffer_write(buf, IOBUF_T_I4, &v, 0); v = epid; iobuffer_write(buf, IOBUF_T_I4, &v, 0); rc = send_pkt_raw(endpt, buf); if (rc < 0) goto on_error; rc = recv_until_state(endpt, p_state); on_error: if (buf) iobuffer_destroy(buf); return rc; } TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, int params[]) { msg_t* e; TOOLKIT_ASSERT(endpt); WLog_DBG(TAG, "==> endpt(%d) post msg: %d", endpt->epid, msg); e = MALLOC_T(msg_t); if (e == NULL) { return -1; } e->type = msg; e->nparam = nparam; if (nparam) { e->params = (int*)malloc(sizeof(int) * nparam); if(e->params != NULL) memcpy(e->params, params, sizeof(int) * nparam); } else { e->params = NULL; } e->evt = NULL; spinlock_enter(&endpt->msg_lock, -1); list_add_tail(&e->entry, &endpt->msg_list); spinlock_leave(&endpt->msg_lock); ReleaseSemaphore(endpt->msg_sem, 1, NULL); WLog_DBG(TAG, "<== endpt(%d) post msg: %d", endpt->epid, msg); return 0; } TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, int params[]) { msg_t e; TOOLKIT_ASSERT(endpt); e.type = msg; e.nparam = nparam; if (nparam) { e.params = (int*)malloc(sizeof(int) * nparam); memcpy(e.params, params, sizeof(int) * nparam); } else { e.params = NULL; } e.evt_result = 0; e.evt = CreateEventA(NULL, TRUE, FALSE, NULL); spinlock_enter(&endpt->msg_lock, -1); list_add_tail(&e.entry, &endpt->msg_list); spinlock_leave(&endpt->msg_lock); ReleaseSemaphore(endpt->msg_sem, 1, NULL); WaitForSingleObject(e.evt, INFINITE); CloseHandle(e.evt); if (nparam) { free(e.params); } return e.evt_result; } TOOLKIT_API int bus_endpt_get_epid(bus_endpt_t* endpt) { return endpt->epid; } TOOLKIT_API const char* bus_endpt_get_url(bus_endpt_t* endpt) { return endpt->url; } TOOLKIT_API int bus_endpt_poll(bus_endpt_t* endpt, int timeout) { int result; int rc; int epid, type, state; iobuffer_t* pkt = NULL; rc = bus_endpt_poll_internal(endpt, &result, timeout); if (rc > 0) { if (result == BUS_RESULT_DATA) { bus_endpt_recv_pkt(endpt, &epid, &type, &pkt); WLog_INFO(TAG, "bus_endpt_recv_pkt BUS_RESULT_DATA, epid:%d, type:0x%08X", epid, type); if (endpt->callback.on_pkt) endpt->callback.on_pkt(endpt, epid, type, &pkt, endpt->callback.user_data); if (pkt) iobuffer_dec_ref(pkt); } else if (result == BUS_RESULT_INFO) { bus_endpt_recv_pkt(endpt, &epid, &type, &pkt); WLog_INFO(TAG, "bus_endpt_recv_pkt BUS_RESULT_INFO, epid:%d, type:0x%08X", epid, type); if (endpt->callback.on_inf) endpt->callback.on_inf(endpt, epid, type, &pkt, endpt->callback.user_data); if (pkt) iobuffer_dec_ref(pkt); } else if (result == BUS_RESULT_EVENT) { bus_endpt_recv_evt(endpt, &epid, &type, &pkt); WLog_INFO(TAG, "bus_endpt_recv_pkt BUS_RESULT_EVENT, epid:%d, type:0x%08X", epid, type); if (endpt->callback.on_evt) endpt->callback.on_evt(endpt, epid, type, &pkt, endpt->callback.user_data); if (pkt) iobuffer_dec_ref(pkt); } else if (result == BUS_RESULT_SYSTEM) { bus_endpt_recv_sys(endpt, &epid, &state); WLog_INFO(TAG, "bus_endpt_recv_pkt BUS_RESULT_SYSTEM, epid:%d, state:%d", epid, state); if (endpt->callback.on_sys) endpt->callback.on_sys(endpt, epid, state, endpt->callback.user_data); } else if (result == BUS_RESULT_MSG) { msg_t* msg = NULL; bus_endpt_recv_msg(endpt, &msg); WLog_INFO(TAG, "bus_endpt_recv_pkt BUS_RESULT_MSG, type:0x%08X", msg->type); if (endpt->callback.on_msg) { endpt->callback.on_msg(endpt, msg->type, msg->nparam, msg->params, msg->evt ? &msg->evt_result : NULL, endpt->callback.user_data); if (msg->evt) SetEvent(msg->evt); else free_msg(msg); } else { if (msg->evt) { msg->evt_result = -1; SetEvent(msg->evt); } else { free_msg(msg); } } } else { TOOLKIT_ASSERT(0); rc = -1; } } else if (rc < 0) { WLog_DBG(TAG, "bus_endpt_poll_internal failed, rc = %d", rc); } return rc; } TOOLKIT_API int bus_endpt_set_quit_flag(bus_endpt_t* endpt) { endpt->quit_flag = 1; return 0; } TOOLKIT_API int bus_endpt_get_quit_flag(bus_endpt_t* endpt) { return endpt->quit_flag; }