12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019 |
- #include "precompile.h"
- #include "ioqueue.h"
- #include "bus.h"
- #include "bus_internal.h"
- #include "url.h"
- #include "refcnt.h"
- #include "list.h"
- #include "iobuffer.h"
- #include "array.h"
- #include "dbgutil.h"
- #include <time.h>
- #include <winpr/synch.h>
- #include <winpr/interlocked.h>
- #include <winpr/string.h>
- #include <winpr/sysinfo.h>
- #include <winpr/thread.h>
- #include <winpr/winsock.h>
- #define TAG TOOLKIT_TAG("bus_deamon")
- #define MAX_THREADS 32
- #define DEFAULT_ACCEPT_OP_COUNT 5
- #define MSG_REMOVE_REGISTAR 1
- typedef struct endpt_session_t endpt_session_t;
- typedef struct daemon_accetpor_t daemon_accetpor_t;
- struct endpt_session_t
- {
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- int epid;
- int registered;
- union {
- ioqueue_tcpsock_t tcp;
- ioqueue_file_t pipe;
- };
- CRITICAL_SECTION lock;
- time_t start_time;
- int err;
- int rx_pending_pkt_len;
- int type;
- int tx_pending;
- struct list_head entry;
- iobuffer_queue_t *tx_iobuf_queue;
- ioqueue_overlapped_t rx_overlapped;
- ioqueue_overlapped_t tx_overlapped;
- iobuffer_t *rx_pending_buf;
- iobuffer_t *tx_pending_buf;
- bus_daemon_t *daemon;
- daemon_accetpor_t *acceptor;
- };
- struct daemon_accetpor_t
- {
- DECLARE_REF_COUNT_MEMBER(ref_cnt);
- union {
- ioqueue_acceptor_t tcp_acceptor;
- ioqueue_pipe_acceptor_t pipe_acceptor;
- };
- int type;
- array_header_t *arr_ov;
- bus_daemon_t *daemon;
- };
- struct bus_daemon_t
- {
- ioqueue_t *ioq;
- volatile LONG lstop;
- CRITICAL_SECTION lock;
- int nthread;
- array_header_t *arr_uri;
- array_header_t *arr_acceptor;
- array_header_t *arr_thread;
- struct list_head registered_session_list;
- struct list_head unregistered_session_list;
- };
- DECLARE_REF_COUNT_STATIC(endpt_session, endpt_session_t)
- DECLARE_REF_COUNT_STATIC(daemon_accetpor, daemon_accetpor_t)
- static unsigned int __stdcall thread_proc(void *param)
- {
- bus_daemon_t *daemon = (bus_daemon_t*)param;
- SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
- while (!daemon->lstop) {
- ioqueue_poll(daemon->ioq, 10);
- }
- return 0;
- }
- static int start_accept(daemon_accetpor_t *acceptor, int idx);
- static int queue_ans_pkt(endpt_session_t *session, int rc);
- static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt);
- static int session_start_recv_hdr(endpt_session_t *session);
- static int queue_sys_pkt(endpt_session_t *session, int epid, int state);
- static void daemon_lock(bus_daemon_t *daemon)
- {
- EnterCriticalSection(&daemon->lock);
- }
- static void daemon_unlock(bus_daemon_t *daemon)
- {
- LeaveCriticalSection(&daemon->lock);
- }
- static void session_lock(endpt_session_t *session)
- {
- EnterCriticalSection(&session->lock);
- }
- static void session_unlock(endpt_session_t *session)
- {
- LeaveCriticalSection(&session->lock);
- }
- static void add_unregistered_list(endpt_session_t *session)
- {
- bus_daemon_t *daemon = session->daemon;
- daemon_lock(daemon);
- list_add_tail(&session->entry, &daemon->unregistered_session_list);
- daemon_unlock(daemon);
- }
- static void remove_session_list(endpt_session_t *session)
- {
- bus_daemon_t *daemon = session->daemon;
- daemon_lock(daemon);
- list_del(&session->entry);
- if (session->epid != BUS_INVALID_EPID) {
- endpt_session_t *pos, *n;
- list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
- queue_sys_pkt(pos, session->epid, BUS_STATE_OFF);
- }
- }
- daemon_unlock(daemon);
- }
- static void move_to_registered_session(endpt_session_t *session)
- {
- bus_daemon_t *daemon = session->daemon;
- daemon_lock(daemon);
- {
- endpt_session_t *pos, *n;
- list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
- queue_sys_pkt(pos, session->epid, BUS_STATE_ON);
- }
- }
- list_move_tail(&session->entry, &daemon->registered_session_list);
- daemon_unlock(daemon);
- }
- static void move_to_unregistered_session(endpt_session_t *session)
- {
- bus_daemon_t *daemon = session->daemon;
- daemon_lock(daemon);
- list_move_tail(&session->entry, &daemon->unregistered_session_list);
- daemon_unlock(daemon);
- }
- static endpt_session_t* create_session(daemon_accetpor_t *acceptor, int type, int fd)
- {
- int rc;
- endpt_session_t *session;
- session = ZALLOC_T(endpt_session_t);
- if (type == TYPE_PIPE) {
- rc = ioqueue_pipe_acceptor_create_client(&acceptor->pipe_acceptor, (HANDLE)fd, &session->pipe);
- if (rc < 0)
- goto on_error;
- } else if (type == TYPE_TCP) {
- rc = ioqueue_acceptor_create_client(&acceptor->tcp_acceptor, fd, &session->tcp);
- if (rc < 0)
- goto on_error;
- }
- session->epid = BUS_INVALID_EPID;
- session->start_time = time(NULL);
- session->daemon = acceptor->daemon;
- session->acceptor = acceptor;
- session->type = type;
- REF_COUNT_INIT(&session->ref_cnt);
- InitializeCriticalSection(&session->lock);
- session->tx_iobuf_queue = iobuffer_queue_create();
- return session;
- on_error:
- free(session);
- return NULL;
- }
- static void destroy_session(endpt_session_t *session)
- {
- while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
- iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
- endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(iobuf);
- if (ts) {
- queue_ans_pkt(ts, BUS_E_NETBROKEN);
- endpt_session_dec_ref(ts);
- }
- iobuffer_destroy(iobuf);
- }
- iobuffer_queue_destroy(session->tx_iobuf_queue);
- DeleteCriticalSection(&session->lock);
- if (session->type == TYPE_PIPE) {
- ioqueue_file_destroy(&session->pipe);
- } else if (session->type == TYPE_TCP) {
- ioqueue_tcpsock_destroy(&session->tcp);
- } else {
- TOOLKIT_ASSERT(0);
- }
- free(session);
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(endpt_session, endpt_session_t, ref_cnt, destroy_session)
- static endpt_session_t *find_session(bus_daemon_t *daemon, int epid)
- {
- endpt_session_t *session = NULL, *pos;
- daemon_lock(daemon);
- list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) {
- if (pos->epid == epid) {
- endpt_session_inc_ref(pos);
- session = pos;
- break;
- }
- }
- daemon_unlock(daemon);
- return session;
- }
- static void on_send_pkt(endpt_session_t *session, int err)
- {
- iobuffer_t *pkt;
- session_lock(session);
- pkt = session->tx_pending_buf;
- session->tx_pending_buf = NULL;
- session->tx_pending = 0;
- session_unlock(session);
- if (pkt) {
- if (iobuffer_get_user_data(pkt)) {
- endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(pkt);
- queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK);
- endpt_session_dec_ref(ts);
- }
- iobuffer_destroy(pkt);
- pkt = NULL;
- }
- if (!err) {
- int rc = 0;
- session_lock(session);
- if (!session->tx_pending) {
- if (iobuffer_queue_count(session->tx_iobuf_queue)) {
- iobuffer_t *tpkt = iobuffer_queue_deque(session->tx_iobuf_queue);
- session->tx_pending = 1;
- rc = start_send_pkt(session, &tpkt);
- if (rc < 0) {
- session->tx_pending = 0;
- }
- if (tpkt) {
- pkt = tpkt;
- }
- }
- }
- session_unlock(session);
- if (rc < 0) {
- if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
- remove_session_list(session);
- endpt_session_dec_ref(session);
- }
- }
- if (pkt) {
- if (iobuffer_get_user_data(pkt)) {
- endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(pkt);
- queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK);
- endpt_session_dec_ref(ts);
- }
- iobuffer_destroy(pkt);
- pkt = NULL;
- }
- } else {
- session_lock(session);
- while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
- iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
- endpt_session_t *ts = (endpt_session_t*)iobuffer_get_user_data(iobuf);
- if (ts) {
- session_unlock(session);
- queue_ans_pkt(ts, BUS_E_NETBROKEN);
- session_lock(session);
- endpt_session_dec_ref(ts);
- }
- iobuffer_destroy(iobuf);
- }
- session_unlock(session);
- }
- endpt_session_dec_ref(session);
- }
- static void on_pipe_send_pkt(ioqueue_file_t* file,
- ioqueue_overlapped_t *overlapped,
- void *buf,
- unsigned int transfer_bytes,
- void *user_data,
- int err)
- {
- on_send_pkt((endpt_session_t*)user_data, err);
- }
- static void on_tcp_send_pkt(ioqueue_tcpsock_t* tcpsock,
- ioqueue_overlapped_t *overlapped,
- void *buf,
- unsigned int transfer_bytes,
- void *user_data,
- int err)
- {
- on_send_pkt((endpt_session_t*)user_data, err);
- }
- static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt)
- {
- int rc = -1;
- int v;
- if (session->err < 0)
- return rc;
- v = iobuffer_get_length(*pkt);
- iobuffer_write_head(*pkt, IOBUF_T_I4, &v, 0);
- endpt_session_inc_ref(session);
- session->tx_pending_buf = *pkt;
- *pkt = NULL;
- if (session->type == TYPE_PIPE) {
- rc = ioqueue_file_async_writen(&session->pipe,
- &session->tx_overlapped,
- iobuffer_data(session->tx_pending_buf, 0),
- iobuffer_get_length(session->tx_pending_buf),
- &on_pipe_send_pkt, session);
- } else if (session->type == TYPE_TCP) {
- rc = ioqueue_tcpsock_async_sendn(&session->tcp,
- &session->tx_overlapped,
- iobuffer_data(session->tx_pending_buf, 0),
- iobuffer_get_length(session->tx_pending_buf),
- &on_tcp_send_pkt, session);
- } else {
- TOOLKIT_ASSERT(0);
- }
- if (rc < 0) {
- *pkt = session->tx_pending_buf;
- endpt_session_dec_ref(session);
- }
- return rc;
- }
- static int queue_tx_pkt(endpt_session_t *session, iobuffer_t **pkt)
- {
- int rc = -1;
- session_lock(session);
- if (session->tx_pending) {
- iobuffer_queue_enqueue(session->tx_iobuf_queue, *pkt);
- *pkt = NULL;
- rc = 0;
- } else {
- session->tx_pending = 1;
- rc = start_send_pkt(session, pkt);
- }
- session_unlock(session);
- if (rc < 0) {
- if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
- ioqueue_post_message(session->daemon->ioq, MSG_REMOVE_REGISTAR, (int)session, 0);
- }
- }
- return rc;
- }
- static iobuffer_t *make_ans_pkt(int rc)
- {
- int v;
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- v = BUS_TYPE_ERROR;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- v = rc;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- return pkt;
- }
- static int queue_ans_pkt(endpt_session_t *session, int rc)
- {
- iobuffer_t *pkt = make_ans_pkt(rc);
- int err = queue_tx_pkt(session, &pkt);
- if (pkt) {
- iobuffer_destroy(pkt);
- }
- return err;
- }
- static iobuffer_t *make_ans_state_pkt(bus_daemon_t *daemon, int epid)
- {
- int v;
- endpt_session_t *ts;
- iobuffer_t *pkt;
- int state;
- ts = find_session(daemon, epid);
- if (!ts) {
- state = BUS_STATE_OFF;
- } else {
- state = BUS_STATE_ON;
- endpt_session_dec_ref(ts);
- ts = NULL;
- }
-
- pkt = iobuffer_create(-1, -1);
- v = BUS_TYPE_ENDPT_GET_STATE;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- v = epid;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- v = state;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
-
- return pkt;
- }
- static iobuffer_t *make_sys_pkt(int epid, int state)
- {
- int v;
- iobuffer_t *pkt = iobuffer_create(-1, -1);
- v = BUS_TYPE_SYSTEM;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- v = epid;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- v = state;
- iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
- return pkt;
- }
- static int queue_sys_pkt(endpt_session_t *session, int epid, int state)
- {
- iobuffer_t *pkt = make_sys_pkt(epid, state);
- int err = queue_tx_pkt(session, &pkt);
- if (pkt) {
- iobuffer_destroy(pkt);
- }
- return err;
- }
- static int on_process_pkt(endpt_session_t *session, iobuffer_t **ppkt)
- {
- bus_daemon_t *daemon = session->daemon;
- iobuffer_t *pkt = *ppkt;
- int type;
- int read_state;
- int rc = -1;
- read_state = iobuffer_get_read_state(pkt);
- iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
- if (session->registered) {
- if (type == BUS_TYPE_ENDPT_UNREGISTER) {
- move_to_unregistered_session(session);
- session->registered = 0;
- queue_ans_pkt(session, BUS_E_OK);
- rc = -1;
- } else if (type == BUS_TYPE_ENDPT_GET_STATE) {
- int epid;
- iobuffer_t *ans_pkt;
- iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
- ans_pkt = make_ans_state_pkt(session->daemon, epid);
- rc = queue_tx_pkt(session, &ans_pkt);
- if (ans_pkt)
- iobuffer_dec_ref(ans_pkt);
- } else if (type == BUS_TYPE_PACKET) {
- int from_epid;
- int to_epid;
- int user_type;
- iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
- iobuffer_restore_read_state(pkt, read_state);
- if (to_epid == session->epid) {
- endpt_session_inc_ref(session);
- iobuffer_set_user_data(pkt, session);
- rc = queue_tx_pkt(session, ppkt);
- if (rc < 0) {
- endpt_session_dec_ref(session);
- iobuffer_set_user_data(pkt, NULL);
- }
- } else {
- endpt_session_t *ts = find_session(session->daemon, to_epid);
- if (!ts) {
- rc = queue_ans_pkt(session, BUS_E_NOTFOUND);
- } else {
- if (ts->err) {
- rc = queue_ans_pkt(session, BUS_E_NETBROKEN);
- } else {
- endpt_session_inc_ref(session);
- iobuffer_set_user_data(pkt, session);
- rc = queue_tx_pkt(ts, ppkt);
- if (rc < 0) {
- endpt_session_dec_ref(session);
- iobuffer_set_user_data(pkt, NULL);
- rc = queue_ans_pkt(session, BUS_E_NETBROKEN);
- }
- }
- endpt_session_dec_ref(ts); // find session
- }
- }
- } else if (type == BUS_TYPE_EVENT) {
- int epid;
- int user_type;
- iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
- iobuffer_restore_read_state(pkt, read_state);
- daemon_lock(session->daemon);
- {
- endpt_session_t *pos, *n;
- list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
- if (pos != session) {
- iobuffer_t *tbuf = iobuffer_clone(pkt);
- queue_tx_pkt(pos, &tbuf);
- if (tbuf)
- iobuffer_destroy(tbuf);
- }
- }
- }
- daemon_unlock(session->daemon);
- rc = 0;
- } else if (type == BUS_TYPE_INFO) {
- int from_epid;
- int to_epid;
- int user_type;
- iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
- iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
- iobuffer_restore_read_state(pkt, read_state);
- if (to_epid == session->epid) {
- queue_tx_pkt(session, ppkt);
- } else {
- endpt_session_t *ts = find_session(session->daemon, to_epid);
- if (ts) {
- if (!ts->err) {
- queue_tx_pkt(ts, ppkt);
- }
- endpt_session_dec_ref(ts); // find session
- }
- }
- rc = 0;
- }
- } else {
- if (type == BUS_TYPE_ENDPT_REGISTER) {
- int epid;
- endpt_session_t *ts;
- iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
- ts = find_session(session->daemon, epid);
- if (!ts) {
- session->registered = 1;
- session->epid = epid;
- rc = queue_ans_pkt(session, BUS_E_OK);
- if (rc == 0) {
- move_to_registered_session(session);
- }
- } else {
- endpt_session_dec_ref(ts);
- }
- }
- }
- if (*ppkt)
- iobuffer_restore_read_state(pkt, read_state);
- return rc;
- }
- static void on_recv_body(endpt_session_t *session, unsigned int transfer_bytes, int err)
- {
- int rc = -1;
- if (!err) {
- iobuffer_t *pkt = session->rx_pending_buf;
- session->rx_pending_buf = NULL;
- iobuffer_push_count(pkt, transfer_bytes);
- rc = on_process_pkt(session, &pkt);
- if (rc == 0) {
- rc = session_start_recv_hdr(session);
- }
- if (pkt)
- iobuffer_dec_ref(pkt);
- }
- if (rc < 0) {
- if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
- remove_session_list(session);
- endpt_session_dec_ref(session);
- }
- }
- endpt_session_dec_ref(session);
- }
- static void on_pipe_recv_body(ioqueue_file_t* file,
- ioqueue_overlapped_t *overlapped,
- void *buf,
- unsigned int transfer_bytes,
- void *user_data,
- int err)
- {
- on_recv_body((endpt_session_t*)user_data, transfer_bytes, err);
- }
- static void on_tcp_recv_body(ioqueue_tcpsock_t* tcpsock,
- ioqueue_overlapped_t *overlapped,
- void *buf,
- unsigned int transfer_bytes,
- void *user_data,
- int err)
- {
- on_recv_body((endpt_session_t*)user_data, transfer_bytes, err);
- }
- static int session_start_recv_body(endpt_session_t *session)
- {
- int rc = -1;
- if (session->err < 0)
- return rc;
- endpt_session_inc_ref(session);
- session->rx_pending_buf = iobuffer_create(-1, session->rx_pending_pkt_len);
- if (session->type == TYPE_PIPE) {
- rc = ioqueue_file_async_readn(&session->pipe,
- &session->rx_overlapped,
- iobuffer_data(session->rx_pending_buf, 0),
- session->rx_pending_pkt_len,
- &on_pipe_recv_body,
- session);
- } else if (session->type == TYPE_TCP) {
- rc = ioqueue_tcpsock_async_recvn(&session->tcp,
- &session->rx_overlapped,
- iobuffer_data(session->rx_pending_buf, 0),
- session->rx_pending_pkt_len,
- &on_tcp_recv_body,
- session);
- }
- if (rc < 0) {
- iobuffer_destroy(session->rx_pending_buf);
- session->rx_pending_buf = NULL;
- endpt_session_dec_ref(session);
- }
- return rc;
- }
- static void on_recv_hdr(endpt_session_t *session, int err)
- {
- int rc = -1;
- if (!err) {
- rc = session_start_recv_body(session);
- }
- if (rc < 0) {
- if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
- remove_session_list(session);
- endpt_session_dec_ref(session);
- }
- }
- endpt_session_dec_ref(session);
- }
- static void on_pipe_recv_hdr(ioqueue_file_t *file,
- ioqueue_overlapped_t *overlapped,
- void *buf,
- unsigned int transfer_bytes,
- void *user_data,
- int err)
- {
- on_recv_hdr((endpt_session_t*)user_data, err);
- }
- static void on_tcp_recv_hdr(ioqueue_tcpsock_t *tcpsock,
- ioqueue_overlapped_t *overlapped,
- void *buf,
- unsigned int transfer_bytes,
- void *user_data,
- int err)
- {
- on_recv_hdr((endpt_session_t*)user_data, err);
- }
- static int session_start_recv_hdr(endpt_session_t *session)
- {
- int rc = -1;
- if (session->err < 0)
- return rc;
- endpt_session_inc_ref(session);
- if (session->type == TYPE_PIPE) {
- rc = ioqueue_file_async_readn(&session->pipe,
- &session->rx_overlapped,
- &session->rx_pending_pkt_len,
- 4,
- &on_pipe_recv_hdr,
- session);
- } else if (session->type == TYPE_TCP) {
- rc = ioqueue_tcpsock_async_recvn(&session->tcp,
- &session->rx_overlapped,
- &session->rx_pending_pkt_len,
- 4,
- &on_tcp_recv_hdr,
- session);
- }
- if (rc < 0)
- endpt_session_dec_ref(session);
- return rc;
- }
- static int on_msg(unsigned short msg_id, int param1, int param2)
- {
- if (msg_id == MSG_REMOVE_REGISTAR) {
- endpt_session_t *session = (endpt_session_t*)param1;
- remove_session_list(session);
- endpt_session_dec_ref(session);
- }
- return TRUE;
- }
- static int on_accept(daemon_accetpor_t *dacceptor, void *user_data, int fd, int err)
- {
- int idx = (int)user_data;
- if (!err) {
- endpt_session_t *session = create_session(dacceptor, dacceptor->type, fd);
- if (session) {
- int rc;
- add_unregistered_list(session);
- rc = session_start_recv_hdr(session);
- if (rc < 0) {
- remove_session_list(session);
- endpt_session_dec_ref(session);
- }
- }
- start_accept(dacceptor, idx);
- }
- daemon_accetpor_dec_ref(dacceptor);
- return 1;
- }
- static int on_pipe_accept_callback(ioqueue_pipe_acceptor_t *acceptor,
- ioqueue_overlapped_t *overlapped,
- HANDLE pipe,
- void *user_data,
- int err)
- {
- daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, pipe_acceptor);
- return on_accept(dacceptor, user_data, (int)pipe, err);
- }
- static int on_tcp_accept_callback(ioqueue_acceptor_t *acceptor,
- ioqueue_overlapped_t *overlapped,
- SOCKET in_sock,
- void *user_data,
- int err)
- {
- daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, tcp_acceptor);
- return on_accept(dacceptor, user_data, in_sock, err);
- }
- static int start_accept(daemon_accetpor_t *acceptor, int idx)
- {
- int rc = -1;
- daemon_accetpor_inc_ref(acceptor);
- if (acceptor->type == TYPE_PIPE) {
- ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*);
- rc = ioqueue_pipe_acceptor_async_accept(&acceptor->pipe_acceptor, ov, &on_pipe_accept_callback, (void*)idx);
- } else if (acceptor->type == TYPE_TCP) {
- ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*);
- rc = ioqueue_acceptor_async_accept(&acceptor->tcp_acceptor, ov, &on_tcp_accept_callback, (void*)idx);
- } else {
- TOOLKIT_ASSERT(0);
- }
- if (rc < 0)
- daemon_accetpor_dec_ref(acceptor);
- return rc;
- }
- static daemon_accetpor_t* create_daemon_acceptor(bus_daemon_t *daemon, char *url)
- {
- url_fields uf;
- int kk;
- int rc;
- daemon_accetpor_t *dacceptor = NULL;
- dacceptor = MALLOC_T(daemon_accetpor_t);
- if (url_parse(url, &uf) == 0) {
- if (_stricmp(uf.scheme, "tcp") == 0) {
- rc = ioqueue_acceptor_create(daemon->ioq, uf.host, uf.port, &dacceptor->tcp_acceptor);
- if (rc < 0) {
- free(dacceptor);
- url_free_fields(&uf);
- return NULL;
- }
- ioqueue_acceptor_listen(&dacceptor->tcp_acceptor, 10);
- dacceptor->type = TYPE_TCP;
- } else if (_stricmp(uf.scheme, "pipe") == 0) {
- rc = ioqueue_pipe_acceptor_create(daemon->ioq, uf.host, &dacceptor->tcp_acceptor);
- if (rc < 0) {
- free(dacceptor);
- url_free_fields(&uf);
- return NULL;
- }
- dacceptor->type = TYPE_PIPE;
- }
- url_free_fields(&uf);
- }
- dacceptor->daemon = daemon;
- REF_COUNT_INIT(&dacceptor->ref_cnt);
- dacceptor->arr_ov = array_make(DEFAULT_ACCEPT_OP_COUNT, sizeof(ioqueue_overlapped_t*));
- for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk)
- ARRAY_PUSH(dacceptor->arr_ov, ioqueue_overlapped_t*) = MALLOC_T(ioqueue_overlapped_t);
- return dacceptor;
- }
- static void destroy_daemon_acceptor(daemon_accetpor_t *dacceptor)
- {
- int i;
- for (i = 0; i < dacceptor->arr_ov->nelts; ++i)
- free(ARRAY_IDX(dacceptor->arr_ov, i, ioqueue_overlapped_t*));
- array_free(dacceptor->arr_ov);
- if (dacceptor->type == TYPE_PIPE) {
- ioqueue_pipe_acceptor_destroy(&dacceptor->pipe_acceptor);
- } else if (dacceptor->type == TYPE_TCP) {
- ioqueue_acceptor_destroy(&dacceptor->tcp_acceptor);
- }
- free(dacceptor);
- }
- IMPLEMENT_REF_COUNT_MT_STATIC(daemon_accetpor, daemon_accetpor_t, ref_cnt, destroy_daemon_acceptor)
- TOOLKIT_API int bus_daemon_create(int n_url, const char *urls[], int nthread, bus_daemon_t **p_daemon)
- {
- bus_daemon_t *daemon;
- int i;
- if (n_url == 0)
- return -1;
- if (nthread <= 0) {
- SYSTEM_INFO si;
- GetSystemInfo(&si);
- nthread = min(MAX_THREADS, si.dwNumberOfProcessors << 1);
- }
- WLog_DBG(TAG, "thread num: %d", nthread);
- daemon = MALLOC_T(bus_daemon_t);
- if (daemon == NULL)
- return -1;
- memset(daemon, 0, sizeof(bus_daemon_t));
- daemon->nthread = nthread;
- daemon->arr_acceptor = array_make(n_url, sizeof(daemon_accetpor_t*));
- daemon->arr_thread = array_make(nthread, sizeof(HANDLE));
- daemon->arr_uri = array_make(n_url, sizeof(char*));
- for (i = 0; i < n_url; ++i) {
- WLog_DBG(TAG, "urls[%d]: %s", i, urls[i]);
- ARRAY_PUSH(daemon->arr_uri, char*) = _strdup(urls[i]);
- }
- InitializeCriticalSection(&daemon->lock);
- INIT_LIST_HEAD(&daemon->registered_session_list);
- INIT_LIST_HEAD(&daemon->unregistered_session_list);
- *p_daemon = daemon;
- return 0;
- }
- TOOLKIT_API int bus_daemon_destroy(bus_daemon_t *daemon)
- {
- int i;
- DeleteCriticalSection(&daemon->lock);
- for (i = 0; i < daemon->arr_uri->nelts; ++i)
- free(ARRAY_IDX(daemon->arr_uri, i, char*));
- array_free(daemon->arr_uri);
- array_free(daemon->arr_acceptor);
- array_free(daemon->arr_thread);
- free(daemon);
- return 0;
- }
- TOOLKIT_API int bus_daemon_start(bus_daemon_t *daemon)
- {
- int i;
- daemon->ioq = ioqueue_create();
- if (!daemon->ioq) {
- WLog_ERR(TAG, "ioqueuq create failed!");
- return -1;
- }
- ioqueue_msg_add_handler(daemon->ioq, MSG_REMOVE_REGISTAR, 0, &on_msg);
- for (i = 0; i < daemon->arr_uri->nelts; ++i) {
- char *url = ARRAY_IDX(daemon->arr_uri, i, char*);
- daemon_accetpor_t *dacceptor = create_daemon_acceptor(daemon, url);
- if (dacceptor) {
- int kk;
- for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk) //数量5?
- start_accept(dacceptor, kk);
- ARRAY_PUSH(daemon->arr_acceptor, daemon_accetpor_t*) = dacceptor;
- } else {
- WLog_ERR(TAG, "create daemon acceptor failed!");
- return -1;
- }
- }
- daemon->lstop = 0;
- for (i = 0; i < daemon->nthread; ++i) {
- HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, &thread_proc, daemon, 0, NULL);
- if (thread) {
- ARRAY_PUSH(daemon->arr_thread, HANDLE) = thread;
- } else {
- return -1;
- }
- }
- return 0;
- }
- TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon)
- {
- int i;
- // exit all worker thread
- InterlockedExchange(&daemon->lstop, 1);
- for (i = 0; i < daemon->arr_thread->nelts; ++i) {
- HANDLE t = ARRAY_IDX(daemon->arr_thread, i, HANDLE);
- WaitForSingleObject(t, INFINITE);
- CloseHandle(t);
- }
- array_clear(daemon->arr_thread);
- // close all pending handles
- {
- endpt_session_t *pos;
- for (i = 0; i < daemon->arr_acceptor->nelts; ++i) {
- daemon_accetpor_t *dacceptor = ARRAY_IDX(daemon->arr_acceptor, i, daemon_accetpor_t*);
- if (dacceptor->type == TYPE_PIPE) {
- ioqueue_pipe_acceptor_close_pending_handle(&dacceptor->pipe_acceptor);
- } else if (dacceptor->type == TYPE_TCP) {
- ioqueue_acceptor_close(&dacceptor->tcp_acceptor);
- }
- daemon_accetpor_dec_ref(dacceptor);
- }
- array_clear(daemon->arr_acceptor);
- list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) {
- if (pos->type == TYPE_PIPE) {
- ioqueue_file_close(&pos->pipe);
- } else if (pos->type == TYPE_TCP) {
- ioqueue_tcpsock_close(&pos->tcp);
- }
- }
- list_for_each_entry(pos, &daemon->unregistered_session_list, endpt_session_t, entry) {
- if (pos->type == TYPE_PIPE) {
- ioqueue_file_close(&pos->pipe);
- } else if (pos->type == TYPE_TCP) {
- ioqueue_tcpsock_close(&pos->tcp);
- }
- }
- }
- // poll until all pending io are aborted
- while (!ioqueue_can_exit(daemon->ioq)) {
- ioqueue_poll(daemon->ioq, 10);
- }
- ioqueue_destroy(daemon->ioq);
- return 0;
- }
|