#include "precompile.h" #include "ioqueue.h" #include "bus.h" #include "bus_internal.h" #include "url.h" #include "refcnt.h" #include "list.h" #include "iobuffer.h" #include "array.h" #include "dbgutil.h" #include #include #include #include #include #include #include #define TAG TOOLKIT_TAG("bus_deamon") #define MAX_THREADS 32 #define DEFAULT_ACCEPT_OP_COUNT 5 #define MSG_REMOVE_REGISTAR 1 typedef struct endpt_session_t endpt_session_t; typedef struct daemon_accetpor_t daemon_accetpor_t; struct endpt_session_t { DECLARE_REF_COUNT_MEMBER(ref_cnt); int epid; int registered; union { ioqueue_tcpsock_t tcp; ioqueue_file_t pipe; }; CRITICAL_SECTION lock; time_t start_time; int err; int rx_pending_pkt_len; int type; int tx_pending; struct list_head entry; iobuffer_queue_t *tx_iobuf_queue; ioqueue_overlapped_t rx_overlapped; ioqueue_overlapped_t tx_overlapped; iobuffer_t *rx_pending_buf; iobuffer_t *tx_pending_buf; bus_daemon_t *daemon; daemon_accetpor_t *acceptor; }; struct daemon_accetpor_t { DECLARE_REF_COUNT_MEMBER(ref_cnt); union { ioqueue_acceptor_t tcp_acceptor; ioqueue_pipe_acceptor_t pipe_acceptor; }; int type; array_header_t *arr_ov; bus_daemon_t *daemon; }; struct bus_daemon_t { ioqueue_t *ioq; volatile LONG lstop; CRITICAL_SECTION lock; int nthread; array_header_t *arr_uri; array_header_t *arr_acceptor; array_header_t *arr_thread; struct list_head registered_session_list; struct list_head unregistered_session_list; }; DECLARE_REF_COUNT_STATIC(endpt_session, endpt_session_t) DECLARE_REF_COUNT_STATIC(daemon_accetpor, daemon_accetpor_t) static unsigned int __stdcall thread_proc(void *param) { bus_daemon_t *daemon = (bus_daemon_t*)param; SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); while (!daemon->lstop) { ioqueue_poll(daemon->ioq, 10); } return 0; } static int start_accept(daemon_accetpor_t *acceptor, int idx); static int queue_ans_pkt(endpt_session_t *session, int rc); static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt); static int session_start_recv_hdr(endpt_session_t *session); static int queue_sys_pkt(endpt_session_t *session, int epid, int state); static void daemon_lock(bus_daemon_t *daemon) { EnterCriticalSection(&daemon->lock); } static void daemon_unlock(bus_daemon_t *daemon) { LeaveCriticalSection(&daemon->lock); } static void session_lock(endpt_session_t *session) { EnterCriticalSection(&session->lock); } static void session_unlock(endpt_session_t *session) { LeaveCriticalSection(&session->lock); } static void add_unregistered_list(endpt_session_t *session) { bus_daemon_t *daemon = session->daemon; daemon_lock(daemon); list_add_tail(&session->entry, &daemon->unregistered_session_list); daemon_unlock(daemon); } static void remove_session_list(endpt_session_t *session) { bus_daemon_t *daemon = session->daemon; daemon_lock(daemon); list_del(&session->entry); if (session->epid != BUS_INVALID_EPID) { endpt_session_t *pos, *n; list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) { queue_sys_pkt(pos, session->epid, BUS_STATE_OFF); } } daemon_unlock(daemon); } static void move_to_registered_session(endpt_session_t *session) { bus_daemon_t *daemon = session->daemon; daemon_lock(daemon); { endpt_session_t *pos, *n; list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) { queue_sys_pkt(pos, session->epid, BUS_STATE_ON); } } list_move_tail(&session->entry, &daemon->registered_session_list); daemon_unlock(daemon); } static void move_to_unregistered_session(endpt_session_t *session) { bus_daemon_t *daemon = session->daemon; daemon_lock(daemon); list_move_tail(&session->entry, &daemon->unregistered_session_list); daemon_unlock(daemon); } static endpt_session_t* create_session(daemon_accetpor_t *acceptor, int type, int fd) { int rc; endpt_session_t *session; session = ZALLOC_T(endpt_session_t); if (type == TYPE_PIPE) { rc = ioqueue_pipe_acceptor_create_client(&acceptor->pipe_acceptor, (HANDLE)fd, &session->pipe); if (rc < 0) goto on_error; } else if (type == TYPE_TCP) { rc = ioqueue_acceptor_create_client(&acceptor->tcp_acceptor, fd, &session->tcp); if (rc < 0) goto on_error; } session->epid = BUS_INVALID_EPID; session->start_time = time(NULL); session->daemon = acceptor->daemon; session->acceptor = acceptor; session->type = type; REF_COUNT_INIT(&session->ref_cnt); InitializeCriticalSection(&session->lock); session->tx_iobuf_queue = iobuffer_queue_create(); return session; on_error: free(session); return NULL; } static void destroy_session(endpt_session_t *session) { while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) { iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue); endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(iobuf); if (ts) { queue_ans_pkt(ts, BUS_E_NETBROKEN); endpt_session_dec_ref(ts); } iobuffer_destroy(iobuf); } iobuffer_queue_destroy(session->tx_iobuf_queue); DeleteCriticalSection(&session->lock); if (session->type == TYPE_PIPE) { ioqueue_file_destroy(&session->pipe); } else if (session->type == TYPE_TCP) { ioqueue_tcpsock_destroy(&session->tcp); } else { TOOLKIT_ASSERT(0); } free(session); } IMPLEMENT_REF_COUNT_MT_STATIC(endpt_session, endpt_session_t, ref_cnt, destroy_session) static endpt_session_t *find_session(bus_daemon_t *daemon, int epid) { endpt_session_t *session = NULL, *pos; daemon_lock(daemon); list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) { if (pos->epid == epid) { endpt_session_inc_ref(pos); session = pos; break; } } daemon_unlock(daemon); return session; } static void on_send_pkt(endpt_session_t *session, int err) { iobuffer_t *pkt; session_lock(session); pkt = session->tx_pending_buf; session->tx_pending_buf = NULL; session->tx_pending = 0; session_unlock(session); if (pkt) { if (iobuffer_get_user_data(pkt)) { endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(pkt); queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK); endpt_session_dec_ref(ts); } iobuffer_destroy(pkt); pkt = NULL; } if (!err) { int rc = 0; session_lock(session); if (!session->tx_pending) { if (iobuffer_queue_count(session->tx_iobuf_queue)) { iobuffer_t *tpkt = iobuffer_queue_deque(session->tx_iobuf_queue); session->tx_pending = 1; rc = start_send_pkt(session, &tpkt); if (rc < 0) { session->tx_pending = 0; } if (tpkt) { pkt = tpkt; } } } session_unlock(session); if (rc < 0) { if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) { remove_session_list(session); endpt_session_dec_ref(session); } } if (pkt) { if (iobuffer_get_user_data(pkt)) { endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(pkt); queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK); endpt_session_dec_ref(ts); } iobuffer_destroy(pkt); pkt = NULL; } } else { session_lock(session); while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) { iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue); endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(iobuf); if (ts) { session_unlock(session); queue_ans_pkt(ts, BUS_E_NETBROKEN); session_lock(session); endpt_session_dec_ref(ts); } iobuffer_destroy(iobuf); } session_unlock(session); } endpt_session_dec_ref(session); } static void on_pipe_send_pkt(ioqueue_file_t* file, ioqueue_overlapped_t *overlapped, void *buf, unsigned int transfer_bytes, void *user_data, int err) { on_send_pkt((endpt_session_t*)user_data, err); } static void on_tcp_send_pkt(ioqueue_tcpsock_t* tcpsock, ioqueue_overlapped_t *overlapped, void *buf, unsigned int transfer_bytes, void *user_data, int err) { on_send_pkt((endpt_session_t*)user_data, err); } static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt) { int rc = -1; int v; if (session->err < 0) return rc; v = iobuffer_get_length(*pkt); iobuffer_write_head(*pkt, IOBUF_T_I4, &v, 0); endpt_session_inc_ref(session); session->tx_pending_buf = *pkt; *pkt = NULL; if (session->type == TYPE_PIPE) { rc = ioqueue_file_async_writen(&session->pipe, &session->tx_overlapped, iobuffer_data(session->tx_pending_buf, 0), iobuffer_get_length(session->tx_pending_buf), &on_pipe_send_pkt, session); } else if (session->type == TYPE_TCP) { rc = ioqueue_tcpsock_async_sendn(&session->tcp, &session->tx_overlapped, iobuffer_data(session->tx_pending_buf, 0), iobuffer_get_length(session->tx_pending_buf), &on_tcp_send_pkt, session); } else { TOOLKIT_ASSERT(0); } if (rc < 0) { *pkt = session->tx_pending_buf; endpt_session_dec_ref(session); } return rc; } static int queue_tx_pkt(endpt_session_t *session, iobuffer_t **pkt) { int rc = -1; session_lock(session); if (session->tx_pending) { iobuffer_queue_enqueue(session->tx_iobuf_queue, *pkt); *pkt = NULL; rc = 0; } else { session->tx_pending = 1; rc = start_send_pkt(session, pkt); } session_unlock(session); if (rc < 0) { if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) { ioqueue_post_message(session->daemon->ioq, MSG_REMOVE_REGISTAR, (int)session, 0); } } return rc; } static iobuffer_t *make_ans_pkt(int rc) { int v; iobuffer_t *pkt = iobuffer_create(-1, -1); v = BUS_TYPE_ERROR; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); v = rc; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); return pkt; } static int queue_ans_pkt(endpt_session_t *session, int rc) { iobuffer_t *pkt = make_ans_pkt(rc); int err = queue_tx_pkt(session, &pkt); if (pkt) { iobuffer_destroy(pkt); } return err; } static iobuffer_t *make_ans_state_pkt(bus_daemon_t *daemon, int epid) { int v; endpt_session_t *ts; iobuffer_t *pkt; int state; ts = find_session(daemon, epid); if (!ts) { state = BUS_STATE_OFF; } else { state = BUS_STATE_ON; endpt_session_dec_ref(ts); ts = NULL; } pkt = iobuffer_create(-1, -1); v = BUS_TYPE_ENDPT_GET_STATE; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); v = epid; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); v = state; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); return pkt; } static iobuffer_t *make_sys_pkt(int epid, int state) { int v; iobuffer_t *pkt = iobuffer_create(-1, -1); v = BUS_TYPE_SYSTEM; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); v = epid; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); v = state; iobuffer_write(pkt, IOBUF_T_I4, &v, 0); return pkt; } static int queue_sys_pkt(endpt_session_t *session, int epid, int state) { iobuffer_t *pkt = make_sys_pkt(epid, state); int err = queue_tx_pkt(session, &pkt); if (pkt) { iobuffer_destroy(pkt); } return err; } static int on_process_pkt(endpt_session_t *session, iobuffer_t **ppkt) { bus_daemon_t *daemon = session->daemon; iobuffer_t *pkt = *ppkt; int type; int read_state; int rc = -1; read_state = iobuffer_get_read_state(pkt); iobuffer_read(pkt, IOBUF_T_I4, &type, 0); if (session->registered) { if (type == BUS_TYPE_ENDPT_UNREGISTER) { move_to_unregistered_session(session); session->registered = 0; queue_ans_pkt(session, BUS_E_OK); rc = -1; } else if (type == BUS_TYPE_ENDPT_GET_STATE) { int epid; iobuffer_t *ans_pkt; iobuffer_read(pkt, IOBUF_T_I4, &epid, 0); ans_pkt = make_ans_state_pkt(session->daemon, epid); rc = queue_tx_pkt(session, &ans_pkt); if (ans_pkt) iobuffer_dec_ref(ans_pkt); } else if (type == BUS_TYPE_PACKET) { int from_epid; int to_epid; int user_type; iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0); iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0); iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0); iobuffer_restore_read_state(pkt, read_state); if (to_epid == session->epid) { endpt_session_inc_ref(session); iobuffer_set_user_data(pkt, session); rc = queue_tx_pkt(session, ppkt); if (rc < 0) { endpt_session_dec_ref(session); iobuffer_set_user_data(pkt, NULL); } } else { endpt_session_t *ts = find_session(session->daemon, to_epid); if (!ts) { rc = queue_ans_pkt(session, BUS_E_NOTFOUND); } else { if (ts->err) { rc = queue_ans_pkt(session, BUS_E_NETBROKEN); } else { endpt_session_inc_ref(session); iobuffer_set_user_data(pkt, session); rc = queue_tx_pkt(ts, ppkt); if (rc < 0) { endpt_session_dec_ref(session); iobuffer_set_user_data(pkt, NULL); rc = queue_ans_pkt(session, BUS_E_NETBROKEN); } } endpt_session_dec_ref(ts); // find session } } } else if (type == BUS_TYPE_EVENT) { int epid; int user_type; iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0); iobuffer_read(pkt, IOBUF_T_I4, &epid, 0); iobuffer_restore_read_state(pkt, read_state); daemon_lock(session->daemon); { endpt_session_t *pos, *n; list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) { if (pos != session) { iobuffer_t *tbuf = iobuffer_clone(pkt); queue_tx_pkt(pos, &tbuf); if (tbuf) iobuffer_destroy(tbuf); } } } daemon_unlock(session->daemon); rc = 0; } else if (type == BUS_TYPE_INFO) { int from_epid; int to_epid; int user_type; iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0); iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0); iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0); iobuffer_restore_read_state(pkt, read_state); if (to_epid == session->epid) { queue_tx_pkt(session, ppkt); } else { endpt_session_t *ts = find_session(session->daemon, to_epid); if (ts) { if (!ts->err) { queue_tx_pkt(ts, ppkt); } endpt_session_dec_ref(ts); // find session } } rc = 0; } } else { if (type == BUS_TYPE_ENDPT_REGISTER) { int epid; endpt_session_t *ts; iobuffer_read(pkt, IOBUF_T_I4, &epid, 0); ts = find_session(session->daemon, epid); if (!ts) { session->registered = 1; session->epid = epid; rc = queue_ans_pkt(session, BUS_E_OK); if (rc == 0) { move_to_registered_session(session); } } else { endpt_session_dec_ref(ts); } } } if (*ppkt) iobuffer_restore_read_state(pkt, read_state); return rc; } static void on_recv_body(endpt_session_t *session, unsigned int transfer_bytes, int err) { int rc = -1; if (!err) { iobuffer_t *pkt = session->rx_pending_buf; session->rx_pending_buf = NULL; iobuffer_push_count(pkt, transfer_bytes); rc = on_process_pkt(session, &pkt); if (rc == 0) { rc = session_start_recv_hdr(session); } if (pkt) iobuffer_dec_ref(pkt); } if (rc < 0) { if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) { remove_session_list(session); endpt_session_dec_ref(session); } } endpt_session_dec_ref(session); } static void on_pipe_recv_body(ioqueue_file_t* file, ioqueue_overlapped_t *overlapped, void *buf, unsigned int transfer_bytes, void *user_data, int err) { on_recv_body((endpt_session_t*)user_data, transfer_bytes, err); } static void on_tcp_recv_body(ioqueue_tcpsock_t* tcpsock, ioqueue_overlapped_t *overlapped, void *buf, unsigned int transfer_bytes, void *user_data, int err) { on_recv_body((endpt_session_t*)user_data, transfer_bytes, err); } static int session_start_recv_body(endpt_session_t *session) { int rc = -1; if (session->err < 0) return rc; endpt_session_inc_ref(session); session->rx_pending_buf = iobuffer_create(-1, session->rx_pending_pkt_len); if (session->type == TYPE_PIPE) { rc = ioqueue_file_async_readn(&session->pipe, &session->rx_overlapped, iobuffer_data(session->rx_pending_buf, 0), session->rx_pending_pkt_len, &on_pipe_recv_body, session); } else if (session->type == TYPE_TCP) { rc = ioqueue_tcpsock_async_recvn(&session->tcp, &session->rx_overlapped, iobuffer_data(session->rx_pending_buf, 0), session->rx_pending_pkt_len, &on_tcp_recv_body, session); } if (rc < 0) { iobuffer_destroy(session->rx_pending_buf); session->rx_pending_buf = NULL; endpt_session_dec_ref(session); } return rc; } static void on_recv_hdr(endpt_session_t *session, int err) { int rc = -1; if (!err) { rc = session_start_recv_body(session); } if (rc < 0) { if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) { remove_session_list(session); endpt_session_dec_ref(session); } } endpt_session_dec_ref(session); } static void on_pipe_recv_hdr(ioqueue_file_t *file, ioqueue_overlapped_t *overlapped, void *buf, unsigned int transfer_bytes, void *user_data, int err) { on_recv_hdr((endpt_session_t*)user_data, err); } static void on_tcp_recv_hdr(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *overlapped, void *buf, unsigned int transfer_bytes, void *user_data, int err) { on_recv_hdr((endpt_session_t*)user_data, err); } static int session_start_recv_hdr(endpt_session_t *session) { int rc = -1; if (session->err < 0) return rc; endpt_session_inc_ref(session); if (session->type == TYPE_PIPE) { rc = ioqueue_file_async_readn(&session->pipe, &session->rx_overlapped, &session->rx_pending_pkt_len, 4, &on_pipe_recv_hdr, session); } else if (session->type == TYPE_TCP) { rc = ioqueue_tcpsock_async_recvn(&session->tcp, &session->rx_overlapped, &session->rx_pending_pkt_len, 4, &on_tcp_recv_hdr, session); } if (rc < 0) endpt_session_dec_ref(session); return rc; } static int on_msg(unsigned short msg_id, int param1, int param2) { if (msg_id == MSG_REMOVE_REGISTAR) { endpt_session_t *session = (endpt_session_t*)param1; remove_session_list(session); endpt_session_dec_ref(session); } return TRUE; } static int on_accept(daemon_accetpor_t *dacceptor, void *user_data, int fd, int err) { int idx = (int)user_data; if (!err) { endpt_session_t *session = create_session(dacceptor, dacceptor->type, fd); if (session) { int rc; add_unregistered_list(session); rc = session_start_recv_hdr(session); if (rc < 0) { remove_session_list(session); endpt_session_dec_ref(session); } } start_accept(dacceptor, idx); } daemon_accetpor_dec_ref(dacceptor); return 1; } static int on_pipe_accept_callback(ioqueue_pipe_acceptor_t *acceptor, ioqueue_overlapped_t *overlapped, HANDLE pipe, void *user_data, int err) { daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, pipe_acceptor); return on_accept(dacceptor, user_data, (int)pipe, err); } static int on_tcp_accept_callback(ioqueue_acceptor_t *acceptor, ioqueue_overlapped_t *overlapped, SOCKET in_sock, void *user_data, int err) { daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, tcp_acceptor); return on_accept(dacceptor, user_data, in_sock, err); } static int start_accept(daemon_accetpor_t *acceptor, int idx) { int rc = -1; daemon_accetpor_inc_ref(acceptor); if (acceptor->type == TYPE_PIPE) { ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*); rc = ioqueue_pipe_acceptor_async_accept(&acceptor->pipe_acceptor, ov, &on_pipe_accept_callback, (void*)idx); } else if (acceptor->type == TYPE_TCP) { ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*); rc = ioqueue_acceptor_async_accept(&acceptor->tcp_acceptor, ov, &on_tcp_accept_callback, (void*)idx); } else { TOOLKIT_ASSERT(0); } if (rc < 0) daemon_accetpor_dec_ref(acceptor); return rc; } static daemon_accetpor_t* create_daemon_acceptor(bus_daemon_t *daemon, char *url) { url_fields uf; int kk; int rc; daemon_accetpor_t *dacceptor = NULL; dacceptor = MALLOC_T(daemon_accetpor_t); if (url_parse(url, &uf) == 0) { if (_stricmp(uf.scheme, "tcp") == 0) { rc = ioqueue_acceptor_create(daemon->ioq, uf.host, uf.port, &dacceptor->tcp_acceptor); if (rc < 0) { free(dacceptor); url_free_fields(&uf); return NULL; } ioqueue_acceptor_listen(&dacceptor->tcp_acceptor, 10); dacceptor->type = TYPE_TCP; } else if (_stricmp(uf.scheme, "pipe") == 0) { rc = ioqueue_pipe_acceptor_create(daemon->ioq, uf.host, &dacceptor->tcp_acceptor); if (rc < 0) { free(dacceptor); url_free_fields(&uf); return NULL; } dacceptor->type = TYPE_PIPE; } url_free_fields(&uf); } dacceptor->daemon = daemon; REF_COUNT_INIT(&dacceptor->ref_cnt); dacceptor->arr_ov = array_make(DEFAULT_ACCEPT_OP_COUNT, sizeof(ioqueue_overlapped_t*)); for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk) ARRAY_PUSH(dacceptor->arr_ov, ioqueue_overlapped_t*) = MALLOC_T(ioqueue_overlapped_t); return dacceptor; } static void destroy_daemon_acceptor(daemon_accetpor_t *dacceptor) { int i; for (i = 0; i < dacceptor->arr_ov->nelts; ++i) free(ARRAY_IDX(dacceptor->arr_ov, i, ioqueue_overlapped_t*)); array_free(dacceptor->arr_ov); if (dacceptor->type == TYPE_PIPE) { ioqueue_pipe_acceptor_destroy(&dacceptor->pipe_acceptor); } else if (dacceptor->type == TYPE_TCP) { ioqueue_acceptor_destroy(&dacceptor->tcp_acceptor); } free(dacceptor); } IMPLEMENT_REF_COUNT_MT_STATIC(daemon_accetpor, daemon_accetpor_t, ref_cnt, destroy_daemon_acceptor) TOOLKIT_API int bus_daemon_create(int n_url, const char *urls[], int nthread, bus_daemon_t **p_daemon) { bus_daemon_t *daemon; int i; if (n_url == 0) return -1; if (nthread <= 0) { SYSTEM_INFO si; GetSystemInfo(&si); nthread = min(MAX_THREADS, si.dwNumberOfProcessors << 1); } WLog_DBG(TAG, "thread num: %d", nthread); daemon = MALLOC_T(bus_daemon_t); if (daemon == NULL) return -1; memset(daemon, 0, sizeof(bus_daemon_t)); daemon->nthread = nthread; daemon->arr_acceptor = array_make(n_url, sizeof(daemon_accetpor_t*)); daemon->arr_thread = array_make(nthread, sizeof(HANDLE)); daemon->arr_uri = array_make(n_url, sizeof(char*)); for (i = 0; i < n_url; ++i) { WLog_DBG(TAG, "urls[%d]: %s", i, urls[i]); ARRAY_PUSH(daemon->arr_uri, char*) = _strdup(urls[i]); } InitializeCriticalSection(&daemon->lock); INIT_LIST_HEAD(&daemon->registered_session_list); INIT_LIST_HEAD(&daemon->unregistered_session_list); *p_daemon = daemon; return 0; } TOOLKIT_API int bus_daemon_destroy(bus_daemon_t *daemon) { int i; DeleteCriticalSection(&daemon->lock); for (i = 0; i < daemon->arr_uri->nelts; ++i) free(ARRAY_IDX(daemon->arr_uri, i, char*)); array_free(daemon->arr_uri); array_free(daemon->arr_acceptor); array_free(daemon->arr_thread); free(daemon); return 0; } TOOLKIT_API int bus_daemon_start(bus_daemon_t *daemon) { int i; daemon->ioq = ioqueue_create(); if (!daemon->ioq) { WLog_ERR(TAG, "ioqueuq create failed!"); return -1; } ioqueue_msg_add_handler(daemon->ioq, MSG_REMOVE_REGISTAR, 0, &on_msg); for (i = 0; i < daemon->arr_uri->nelts; ++i) { char *url = ARRAY_IDX(daemon->arr_uri, i, char*); daemon_accetpor_t *dacceptor = create_daemon_acceptor(daemon, url); if (dacceptor) { int kk; for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk) //数量5? start_accept(dacceptor, kk); ARRAY_PUSH(daemon->arr_acceptor, daemon_accetpor_t*) = dacceptor; } else { WLog_ERR(TAG, "create daemon acceptor failed!"); return -1; } } daemon->lstop = 0; for (i = 0; i < daemon->nthread; ++i) { HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, &thread_proc, daemon, 0, NULL); if (thread) { ARRAY_PUSH(daemon->arr_thread, HANDLE) = thread; } else { return -1; } } return 0; } TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon) { int i; // exit all worker thread InterlockedExchange(&daemon->lstop, 1); for (i = 0; i < daemon->arr_thread->nelts; ++i) { HANDLE t = ARRAY_IDX(daemon->arr_thread, i, HANDLE); WaitForSingleObject(t, INFINITE); CloseHandle(t); } array_clear(daemon->arr_thread); // close all pending handles { endpt_session_t *pos; for (i = 0; i < daemon->arr_acceptor->nelts; ++i) { daemon_accetpor_t *dacceptor = ARRAY_IDX(daemon->arr_acceptor, i, daemon_accetpor_t*); if (dacceptor->type == TYPE_PIPE) { ioqueue_pipe_acceptor_close_pending_handle(&dacceptor->pipe_acceptor); } else if (dacceptor->type == TYPE_TCP) { ioqueue_acceptor_close(&dacceptor->tcp_acceptor); } daemon_accetpor_dec_ref(dacceptor); } array_clear(daemon->arr_acceptor); list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) { if (pos->type == TYPE_PIPE) { ioqueue_file_close(&pos->pipe); } else if (pos->type == TYPE_TCP) { ioqueue_tcpsock_close(&pos->tcp); } } list_for_each_entry(pos, &daemon->unregistered_session_list, endpt_session_t, entry) { if (pos->type == TYPE_PIPE) { ioqueue_file_close(&pos->pipe); } else if (pos->type == TYPE_TCP) { ioqueue_tcpsock_close(&pos->tcp); } } } // poll until all pending io are aborted while (!ioqueue_can_exit(daemon->ioq)) { ioqueue_poll(daemon->ioq, 10); } ioqueue_destroy(daemon->ioq); return 0; }