#include "precompile.h" #include "ioqueue.h" #include "timerqueue.h" #include "memutil.h" #include "refcnt.h" #include "strutil.h" #include "sockutil.h" #include "dbgutil.h" #include #include #include #include #include #include #ifndef SO_UPDATE_CONNECT_CONTEXT #define SO_UPDATE_CONNECT_CONTEXT 0x7010 #endif #ifndef WSAID_CONNECTEX #define WSAID_CONNECTEX \ {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}} #endif #define MT_TTL 10*60*1000 /* maintenance 10 minutes */ #define MT_INTERVAL 30*1000 /* maintenance 30 seconds */ /* The address specified in AcceptEx() must be 16 more than the size of * SOCKADDR (source: MSDN). */ #define ACCEPT_ADDR_LEN (16+sizeof(SOCKADDR)) struct ioqueue_t { HANDLE iocp; void *user_data; /* timer */ spinlock_t tm_queue_lock; timer_queue_t *tm_queue; /* msg handler */ ioqueue_on_msg_callback msg_handlers[MAX_MSG][MAX_MSG_PRIORITY]; LONG msg_cnt; /* connect */ spinlock_t connect_list_lock; struct list_head connect_list; spinlock_t handler_list_lock; struct list_head handler_list; LONG stop; }; typedef struct ioqueue_msg { int msg_type; int param1; int param2; HANDLE evt; /* for send message */ }ioqueue_msg; #define HANDLE_TYPE_ACCEPTOR 0x01 #define HANDLE_TYPE_TCPSOCK 0x02 #define HANDLE_TYPE_UDPSOCK 0x03 #define HANDLE_TYPE_FILE 0x04 #define HANDLE_TYPE_PIPEACCEPTOR 0x05 #define OV_ACCEPT 0x01 #define OV_CONNECT 0x02 #define OV_SENDSOME 0x03 #define OV_SENDN 0x04 #define OV_RECVSOME 0x05 #define OV_RECVN 0x06 #define OV_SENDTO 0x07 #define OV_RECVFROM 0x08 #define OV_READFILESOME 0x09 #define OV_WRITEFILESOME 0x0a #define OV_READFILEN 0x0b #define OV_WRITEFILEN 0x0c #define OV_RECVUNTIL 0x0d #define OV_CONNECTPIPE 0x0e typedef struct ioqueue_base_overlapped_t { OVERLAPPED ov; int type; void *user_data; struct list_head pending_entry; ioqueue_handle_context *handle_ctx; }ioqueue_base_overlapped_t; typedef struct ioqueue_accept_overlapped_t { ioqueue_base_overlapped_t base; SOCKET client; ioqueue_on_accept_callback on_accept_callback; char accept_buf[2*ACCEPT_ADDR_LEN]; }ioqueue_accept_overlapped_t; typedef struct ioqueue_connect_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_connect_callback on_connect_callback; struct list_head node; HANDLE hevt; }ioqueue_connect_overlapped_t; typedef struct ioqueue_sendsome_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_send_callback on_send_callback; WSABUF wsabuf; }ioqueue_sendsome_overlapped_t; typedef struct ioqueue_sendn_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_send_callback on_send_callback; WSABUF wsabuf; char *original_buf; unsigned int sended_bytes; unsigned int total_bytes; }ioqueue_sendn_overlapped_t; typedef struct ioqueue_recvsome_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_recv_callback on_recv_callback; WSABUF wsabuf; DWORD dwFlags; }ioqueue_recvsome_overlapped_t; typedef struct ioqueue_recvn_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_recv_callback on_recv_callback; WSABUF wsabuf; char *original_buf; unsigned int recved_bytes; unsigned int total_bytes; DWORD dwFlags; }ioqueue_recvn_overlapped_t; typedef struct ioqueue_recvuntil_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_recvuntil_callback on_recvuntil_callback; WSABUF wsabuf; char *original_buf; char *delimer; unsigned int recved_bytes; unsigned int total_bytes; DWORD dwFlags; }ioqueue_recvuntil_overlapped_t; typedef struct ioqueue_sendto_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_sendto_callback on_sendto_callback; WSABUF wsabuf; }ioqueue_sendto_overlapped_t; typedef struct ioqueue_recvfrom_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_recvfrom_callback on_recvfrom_callback; WSABUF wsabuf; struct sockaddr_in peer; int addrlen; DWORD dwFlags; }ioqueue_recvfrom_overlapped_t; typedef struct ioqueue_readfilesome_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_read_callback on_read_callback; char *buf; HANDLE hevt; }ioqueue_readfilesome_overlapped_t; typedef struct ioqueue_readfilen_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_read_callback on_read_callback; char *buf; HANDLE hevt; unsigned int recved_bytes; unsigned int total_bytes; }ioqueue_readfilen_overlapped_t; typedef struct ioqueue_writefilesome_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_write_callback on_write_callback; HANDLE hevt; char *buf; }ioqueue_writefilesome_overlapped_t; typedef struct ioqueue_writefilen_overlapped_t { ioqueue_base_overlapped_t base; ioqueue_on_write_callback on_write_callback; char *buf; HANDLE hevt; unsigned int sended_bytes; unsigned int total_bytes; }ioqueue_writefilen_overlapped_t; typedef struct ioqueue_connectpipe_overlapped_t { ioqueue_base_overlapped_t base; HANDLE client; HANDLE hevt; ioqueue_on_pipe_accept_callback on_accept_callback; }ioqueue_connectpipe_overlapped_t; //mv 2 sockutil.h //static int reuse_addr(SOCKET sock) //{ // BOOL reuseaddr = 1; // return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&reuseaddr, sizeof(reuseaddr)); //} //mv 2 sockutil.h //static int nonblock_sock(SOCKET sock) //{ // unsigned long v = 1; // return ioctlsocket(sock, FIONBIO, &v); //} static int is_os_gte_xp() /* is os version greater and equal than xp */ { static int yes = -1; #ifdef _WIN32 if (yes == -1) { OSVERSIONINFO ver; ver.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); GetVersionEx(&ver); if (ver.dwMajorVersion > 5 || (ver.dwMajorVersion == 5 && ver.dwMinorVersion > 0)) { yes = 1; } } #endif//_WIN32 return yes; } static __inline LONG inc_msg_cnt(ioqueue_t *ioq) { return InterlockedIncrement(&ioq->msg_cnt); } static __inline LONG dec_msg_cnt(ioqueue_t *ioq) { return InterlockedDecrement(&ioq->msg_cnt); } static __inline void add_handler_list(ioqueue_handle_context *handle_ctx, ioqueue_t *ioq) { spinlock_enter(&ioq->handler_list_lock, -1); list_add(&handle_ctx->node, &ioq->handler_list); spinlock_leave(&ioq->handler_list_lock); } static __inline void del_handler_list(ioqueue_handle_context *handle_ctx, ioqueue_t *ioq) { if (handle_ctx->node.next) { spinlock_enter(&ioq->handler_list_lock, -1); list_del(&handle_ctx->node); handle_ctx->node.next = handle_ctx->node.prev = NULL; spinlock_leave(&ioq->handler_list_lock); } } static void ioqueue_handle_context_free(ioqueue_handle_context *handle_ctx) { if (handle_ctx->type == HANDLE_TYPE_UDPSOCK || handle_ctx->type == HANDLE_TYPE_TCPSOCK || handle_ctx->type == HANDLE_TYPE_ACCEPTOR) { if (handle_ctx->u.sock != INVALID_SOCKET) { closesocket(handle_ctx->u.sock); handle_ctx->u.sock = INVALID_SOCKET; } } else if (handle_ctx->type == HANDLE_TYPE_FILE) { if (handle_ctx->u.file != INVALID_HANDLE_VALUE) { CloseHandle(handle_ctx->u.file); handle_ctx->u.file = INVALID_HANDLE_VALUE; } } else if (handle_ctx->type == HANDLE_TYPE_PIPEACCEPTOR) { if (handle_ctx->u.pipe_name) { free(handle_ctx->u.pipe_name); handle_ctx->u.pipe_name = NULL; } } else { TOOLKIT_ASSERT(0); return; } del_handler_list(handle_ctx, handle_ctx->owner); } IMPLEMENT_REF_COUNT_MT(ioqueue_handle_context, ioqueue_handle_context, pending_ios, ioqueue_handle_context_free) static __inline LONG inc_pending_io(ioqueue_handle_context *handle_ctx) { return inc_ref(ioqueue_handle_context, handle_ctx); } static __inline LONG dec_pending_io(ioqueue_handle_context *handle_ctx) { return dec_ref(ioqueue_handle_context, handle_ctx); } static SOCKET new_socket() { SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (sock != INVALID_SOCKET) { reuse_addr(sock); } return sock; } static void delete_socket(SOCKET sock) { LINGER l; l.l_onoff = 1; l.l_linger = 0; setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof(l)); closesocket(sock); } TOOLKIT_API ioqueue_t *ioqueue_create() { ioqueue_t *ioq = ZALLOC_T(ioqueue_t); if (!ioq) return NULL; ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (!ioq->iocp) goto on_error_0; if (timer_heap_create(&ioq->tm_queue) != 0) goto on_error_3; spinlock_init(&ioq->tm_queue_lock); spinlock_init(&ioq->connect_list_lock); INIT_LIST_HEAD(&ioq->connect_list); spinlock_init(&ioq->handler_list_lock); INIT_LIST_HEAD(&ioq->handler_list); ///ioq->stop = 0; //ioq->msg_cnt = 0; return ioq; on_error_3: CloseHandle(ioq->iocp); on_error_0: free(ioq); return NULL; } TOOLKIT_API void ioqueue_destroy(ioqueue_t *ioq) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(ioqueue_handler_empty(ioq)); TOOLKIT_ASSERT(ioqueue_msg_empty(ioq)); timer_queue_destroy(ioq->tm_queue); CloseHandle(ioq->iocp); free(ioq); } TOOLKIT_API int ioqueue_handler_empty(ioqueue_t *ioq) { int ret; TOOLKIT_ASSERT(ioq); spinlock_enter(&ioq->handler_list_lock, -1); ret = list_empty(&ioq->handler_list); spinlock_leave(&ioq->handler_list_lock); return ret; } TOOLKIT_API int ioqueue_msg_empty(ioqueue_t *ioq) { TOOLKIT_ASSERT(ioq); return ioq->msg_cnt == 0; } TOOLKIT_API int ioqueue_msg_add_handler(ioqueue_t *ioq, int msg_type, int priority, ioqueue_on_msg_callback cb) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(cb); TOOLKIT_ASSERT(msg_type >= 0 && msg_type <= MAX_MSG); TOOLKIT_ASSERT(priority >= 0 && priority <= MAX_MSG_PRIORITY); ioq->msg_handlers[msg_type][priority] = cb; return 0; } TOOLKIT_API int ioqueue_msg_remove_handler(ioqueue_t *ioq, int msg_type, int priority) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(msg_type >= 0 && msg_type <= MAX_MSG); TOOLKIT_ASSERT(priority >= 0 && priority <= MAX_MSG_PRIORITY); ioq->msg_handlers[msg_type][priority] = NULL; return 0; } static void dispatch_acceptor(int err, DWORD dwBytesTransfer, ioqueue_acceptor_t *acceptor, ioqueue_accept_overlapped_t *overlapped) { ioqueue_t *ioq = acceptor->owner; if (err == 0) { /* only valid for winxp or later, ignore return value */ setsockopt(overlapped->client, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&acceptor->u.sock, sizeof(SOCKET)); } else { delete_socket(overlapped->client); overlapped->client = INVALID_SOCKET; } { SOCKET s = overlapped->client; int accepted = overlapped->on_accept_callback(acceptor, (ioqueue_overlapped_t*)overlapped, s, overlapped->base.user_data, err); if (!err && !accepted && s != INVALID_SOCKET) delete_socket(s); } } static void dispatch_pipe_acceptor(int err, DWORD dwBytesTransfer, ioqueue_pipe_acceptor_t *acceptor, ioqueue_connectpipe_overlapped_t *overlapped) { ioqueue_t *ioq = acceptor->owner; int accepted; CloseHandle(overlapped->hevt); overlapped->hevt = NULL; if (!err && overlapped->client == INVALID_HANDLE_VALUE) err = -1; if (err) { if (overlapped->client != INVALID_HANDLE_VALUE) { CloseHandle(overlapped->client); overlapped->client = INVALID_HANDLE_VALUE; } } accepted = overlapped->on_accept_callback(acceptor, (ioqueue_overlapped_t*)overlapped, overlapped->client, overlapped->base.user_data, err); if (!err && !accepted && overlapped->client != INVALID_HANDLE_VALUE) { CloseHandle(overlapped->client); } } static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped_t *io_ctx) { int err = ret ? 0 : -1; ioqueue_base_overlapped_t *base_ov = (ioqueue_base_overlapped_t*)io_ctx; ioqueue_handle_context *handle_ctx = base_ov->handle_ctx; fastlock_enter(handle_ctx->ov_pending_list_lock); list_del(&base_ov->pending_entry); fastlock_leave(handle_ctx->ov_pending_list_lock); dec_pending_io(handle_ctx); switch (handle_ctx->type) { case HANDLE_TYPE_ACCEPTOR: dispatch_acceptor(err, dwBytesTransfer, handle_ctx, (ioqueue_accept_overlapped_t*)io_ctx); break; case HANDLE_TYPE_PIPEACCEPTOR: dispatch_pipe_acceptor(err, dwBytesTransfer, handle_ctx, (ioqueue_connectpipe_overlapped_t*)io_ctx); break; case HANDLE_TYPE_TCPSOCK: case HANDLE_TYPE_UDPSOCK: case HANDLE_TYPE_FILE: switch (base_ov->type) { case OV_CONNECT: { ioqueue_connect_overlapped_t *overlapped = (ioqueue_connect_overlapped_t*)io_ctx; if (err == 0) { setsockopt(handle_ctx->u.sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); } overlapped->on_connect_callback(handle_ctx, io_ctx, base_ov->user_data, err); } break; case OV_SENDSOME: { ioqueue_sendsome_overlapped_t *overlapped = (ioqueue_sendsome_overlapped_t*)io_ctx; overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->wsabuf.buf, dwBytesTransfer, base_ov->user_data, err); } break; case OV_SENDN: { ioqueue_sendn_overlapped_t *overlapped = (ioqueue_sendn_overlapped_t*)io_ctx; overlapped->sended_bytes += dwBytesTransfer; if (err == 0 && overlapped->sended_bytes < overlapped->total_bytes) { int rc; DWORD bytesWritten; overlapped->wsabuf.buf += dwBytesTransfer; overlapped->wsabuf.len -= dwBytesTransfer; inc_pending_io(handle_ctx); overlapped->base.ov.Internal = 0; overlapped->base.ov.InternalHigh = 0; overlapped->base.ov.Offset = 0; overlapped->base.ov.OffsetHigh = 0; rc = WSASend(handle_ctx->u.sock, &overlapped->wsabuf, 1, &bytesWritten, 0, &overlapped->base.ov, NULL); if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) { dec_pending_io(handle_ctx); overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->sended_bytes, base_ov->user_data, -1); } } else { overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->sended_bytes, base_ov->user_data, err); } } break; case OV_RECVSOME: { ioqueue_recvsome_overlapped_t *overlapped = (ioqueue_recvsome_overlapped_t*)io_ctx; overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->wsabuf.buf, dwBytesTransfer, base_ov->user_data, err); } break; case OV_RECVUNTIL: { ioqueue_recvuntil_overlapped_t *overlapped = (ioqueue_recvuntil_overlapped_t*)io_ctx; if (err == 0 && dwBytesTransfer) { const char *pos; overlapped->recved_bytes += dwBytesTransfer; pos = memstr(overlapped->original_buf, overlapped->recved_bytes, overlapped->delimer); if (pos) { free(overlapped->delimer); overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->recved_bytes, (int)strlen(overlapped->delimer)+(int)(pos-overlapped->original_buf), base_ov->user_data, err); } else if (overlapped->recved_bytes < overlapped->total_bytes) { DWORD bytesRead; int rc; overlapped->wsabuf.buf += dwBytesTransfer; overlapped->wsabuf.len -= dwBytesTransfer; inc_pending_io(handle_ctx); overlapped->base.ov.Internal = 0; overlapped->base.ov.InternalHigh = 0; overlapped->base.ov.Offset = 0; overlapped->base.ov.OffsetHigh = 0; overlapped->dwFlags = 0; rc = WSARecv(handle_ctx->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags, &overlapped->base.ov, NULL); if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) { dec_pending_io(handle_ctx); free(overlapped->delimer); overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->recved_bytes, 0, base_ov->user_data, -1); } } else { free(overlapped->delimer); overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->recved_bytes, 0, base_ov->user_data, -1); } } else { free(overlapped->delimer); overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->recved_bytes, 0, base_ov->user_data, err); } } break; case OV_RECVN: { ioqueue_recvn_overlapped_t *overlapped = (ioqueue_recvn_overlapped_t*)io_ctx; overlapped->recved_bytes += dwBytesTransfer; if (err == 0 && overlapped->recved_bytes < overlapped->total_bytes) { int rc; DWORD bytesRead; overlapped->wsabuf.buf += dwBytesTransfer; overlapped->wsabuf.len -= dwBytesTransfer; inc_pending_io(handle_ctx); overlapped->base.ov.Internal = 0; overlapped->base.ov.InternalHigh = 0; overlapped->base.ov.Offset = 0; overlapped->base.ov.OffsetHigh = 0; overlapped->dwFlags = 0; rc = WSARecv(handle_ctx->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags, &overlapped->base.ov, NULL); if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) { dec_pending_io(handle_ctx); overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->recved_bytes, base_ov->user_data, -1); } } else { overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->original_buf, overlapped->recved_bytes, base_ov->user_data, err); } } break; case OV_SENDTO: { ioqueue_sendto_overlapped_t *overlapped = (ioqueue_sendto_overlapped_t*)io_ctx; overlapped->on_sendto_callback(handle_ctx, io_ctx, overlapped->wsabuf.buf, dwBytesTransfer, base_ov->user_data, err); } break; case OV_RECVFROM: { ioqueue_recvfrom_overlapped_t *overlapped = (ioqueue_recvfrom_overlapped_t*)io_ctx; overlapped->on_recvfrom_callback(handle_ctx, io_ctx, (struct sockaddr*)&overlapped->peer, overlapped->addrlen, overlapped->wsabuf.buf, dwBytesTransfer, base_ov->user_data, err); } break; case OV_READFILESOME: { ioqueue_readfilesome_overlapped_t *overlapped = (ioqueue_readfilesome_overlapped_t*)io_ctx; CloseHandle(overlapped->hevt); overlapped->hevt = NULL; overlapped->on_read_callback(handle_ctx, io_ctx, overlapped->buf, dwBytesTransfer, base_ov->user_data, err); } break; case OV_READFILEN: { ioqueue_readfilen_overlapped_t *overlapped = (ioqueue_readfilen_overlapped_t*)io_ctx; CloseHandle(overlapped->hevt); overlapped->hevt = NULL; overlapped->recved_bytes += dwBytesTransfer; if (err == 0 && overlapped->recved_bytes < overlapped->total_bytes) { BOOL ret; DWORD left = overlapped->total_bytes - overlapped->recved_bytes; inc_pending_io(handle_ctx); overlapped->base.ov.Internal = 0; overlapped->base.ov.InternalHigh = 0; overlapped->base.ov.Offset += dwBytesTransfer; if (overlapped->base.ov.Offset < dwBytesTransfer) overlapped->base.ov.OffsetHigh += 1; ret = ReadFile(handle_ctx->u.file, overlapped->buf+overlapped->recved_bytes, left, NULL, &overlapped->base.ov); if (!ret && GetLastError() != ERROR_IO_PENDING) { dec_pending_io(handle_ctx); overlapped->on_read_callback(handle_ctx, io_ctx, overlapped->buf, overlapped->recved_bytes, base_ov->user_data, -1); } } else { overlapped->on_read_callback(handle_ctx, io_ctx, overlapped->buf, overlapped->recved_bytes, base_ov->user_data, err); } } break; case OV_WRITEFILESOME: { ioqueue_writefilesome_overlapped_t *overlapped = (ioqueue_writefilesome_overlapped_t*)io_ctx; CloseHandle(overlapped->hevt); overlapped->hevt = NULL; overlapped->on_write_callback(handle_ctx, io_ctx, overlapped->buf, dwBytesTransfer, base_ov->user_data, err); } break; case OV_WRITEFILEN: { ioqueue_writefilen_overlapped_t *overlapped = (ioqueue_writefilen_overlapped_t*)io_ctx; CloseHandle(overlapped->hevt); overlapped->hevt = NULL; overlapped->sended_bytes += dwBytesTransfer; if (err == 0 && overlapped->sended_bytes < overlapped->total_bytes) { BOOL ret; DWORD left = overlapped->total_bytes - overlapped->sended_bytes; inc_pending_io(handle_ctx); overlapped->base.ov.Internal = 0; overlapped->base.ov.InternalHigh = 0; overlapped->base.ov.Offset += dwBytesTransfer; if (overlapped->base.ov.Offset < dwBytesTransfer) overlapped->base.ov.OffsetHigh += 1; ret = WriteFile(handle_ctx->u.file, overlapped->buf+overlapped->sended_bytes, left, NULL, &overlapped->base.ov); if (!ret && GetLastError() != ERROR_IO_PENDING) { dec_pending_io(handle_ctx); overlapped->on_write_callback(handle_ctx, io_ctx, overlapped->buf, overlapped->sended_bytes, base_ov->user_data, -1); } } else { overlapped->on_write_callback(handle_ctx, io_ctx, overlapped->buf, overlapped->sended_bytes, base_ov->user_data, err); } } break; default: TOOLKIT_ASSERT(0); break; } break; default: TOOLKIT_ASSERT(0); break; } } static void dispatch_msg(ioqueue_t *ioq, int msg_type, int param1, int param2, HANDLE evt) { int chain = 1, i; for (i = 0; chain && i < MAX_MSG_PRIORITY; ++i) { ioqueue_on_msg_callback cb = ioq->msg_handlers[msg_type][i]; if (cb) chain = cb(msg_type, param1, param2); } if (evt) SetEvent(evt); } /*MSG_REMOVE_REGISTAR*/ TOOLKIT_API int ioqueue_post_message(ioqueue_t *ioq, int msg_type, int param1, int param2) { ioqueue_msg *msg; TOOLKIT_ASSERT(ioq); msg = MALLOC_T(ioqueue_msg); if (msg == NULL) { return -1; } msg->msg_type = msg_type; msg->param1 = param1; msg->param2 = param2; msg->evt = NULL; inc_msg_cnt(ioq); if (!PostQueuedCompletionStatus(ioq->iocp, 0, (ULONG_PTR)msg, NULL)) { dec_msg_cnt(ioq); free(msg); return -1; } return 0; } /*It seems no use anywhere*/ TOOLKIT_API int ioqueue_send_message(ioqueue_t *ioq, int msg_type, int param1, int param2) { ioqueue_msg msg = {msg_type, param1, param2}; TOOLKIT_ASSERT(ioq); msg.evt = CreateEventA(NULL, TRUE, FALSE, NULL); inc_msg_cnt(ioq); if (!PostQueuedCompletionStatus(ioq->iocp, 0, (ULONG_PTR)&msg, NULL)) { CloseHandle(msg.evt); dec_msg_cnt(ioq); return -1; } WaitForSingleObject(msg.evt, INFINITE); CloseHandle(msg.evt); dec_msg_cnt(ioq); return 0; } static int poll_all_events(ioqueue_t *ioq, HANDLE *hevts, ioqueue_connect_overlapped_t **ovs, int i) { int count = 0; int left = i; int pos = 0; while (left > 0) { DWORD idx = WaitForMultipleObjects(left, &hevts[pos], FALSE, 0) - WAIT_OBJECT_0; if (idx <= (DWORD)left && idx >= 0) { WSANETWORKEVENTS net_events; ioqueue_connect_overlapped_t *triggered = ovs[idx+pos]; list_del(&triggered->node); WSAEnumNetworkEvents(triggered->base.handle_ctx->u.sock, hevts[idx+pos], &net_events); WSAEventSelect(triggered->base.handle_ctx->u.sock, hevts[idx+pos], 0); CloseHandle(hevts[idx+pos]); triggered->on_connect_callback(triggered->base.handle_ctx, (ioqueue_overlapped_t*)triggered, triggered->base.user_data, net_events.iErrorCode[FD_CONNECT_BIT] == 0 ? 0 : -1); left -= (int)idx + 1; pos += idx+1; count ++; } else { break; } } return count; } static int poll_connect_list(ioqueue_t *ioq) { int count = 0, i = 0; HANDLE hevts[MAXIMUM_WAIT_OBJECTS]; ioqueue_connect_overlapped_t *ovs[MAXIMUM_WAIT_OBJECTS]; ioqueue_connect_overlapped_t *pos, *n; list_for_each_entry_safe(pos, n, &ioq->connect_list, ioqueue_connect_overlapped_t, node) { hevts[i] = pos->hevt; ovs[i] = pos; i++; if (i == MAXIMUM_WAIT_OBJECTS) { count += poll_all_events(ioq, hevts, ovs, i); i = 0; } } if (i > 0) { count += poll_all_events(ioq, hevts, ovs, i); } return count; } TOOLKIT_API void* ioqueue_set_user_data(ioqueue_t* ioq, void* user_data) { void* old; TOOLKIT_ASSERT(ioq); old = ioq->user_data; ioq->user_data = user_data; return old; } TOOLKIT_API void* ioqueue_get_user_data(ioqueue_t* ioq) { TOOLKIT_ASSERT(ioq); return ioq->user_data; } TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout) { ioqueue_t *ioq = (ioqueue_t*)q; int count = 0, t = 0; /* network and msg, dispatch until no events */ do { BOOL ret; ULONG_PTR iocp_key = 0; LPOVERLAPPED iocp_pov = 0; DWORD dwBytesTransfer = 0; ret = GetQueuedCompletionStatus(ioq->iocp, &dwBytesTransfer, &iocp_key, &iocp_pov, t ? 0 : (DWORD)timeout); if (iocp_pov) { /* network io */ ioqueue_overlapped_t *io_ctx = (ioqueue_overlapped_t*)iocp_pov; dispatch_network(ret, dwBytesTransfer, io_ctx); t++; count ++; } else if (ret && iocp_key && !iocp_pov) { /* msg */ ioqueue_msg *msg = (ioqueue_msg *)iocp_key; int msg_type = msg->msg_type; int param1 = msg->param1; int param2 = msg->param2; HANDLE evt = msg->evt; if (!evt) free(msg); dispatch_msg(ioq, msg_type, param1, param2, evt); dec_msg_cnt(ioq); t++; count ++; } else { t = 0; } } while (t > 0); /* win2k connect event */ if (!is_os_gte_xp()) { spinlock_enter(&ioq->connect_list_lock, -1); poll_connect_list(ioq); spinlock_leave(&ioq->connect_list_lock); } /* timer heap */ spinlock_enter(&ioq->tm_queue_lock, -1); count += timer_queue_poll(ioq->tm_queue, NULL); /* dispatch timer heap */ spinlock_leave(&ioq->tm_queue_lock); if (ioq->stop == -1) { if (InterlockedCompareExchange(&ioq->stop, -2, -1) == -1) { /* close all handler */ ioqueue_handle_context *pos, *n; spinlock_enter(&ioq->handler_list_lock, -1); list_for_each_entry_safe(pos, n, &ioq->handler_list, ioqueue_handle_context, node) { if (pos->type != HANDLE_TYPE_FILE) { closesocket(pos->u.sock); pos->u.sock = INVALID_SOCKET; } else { CloseHandle(pos->u.file); pos->u.file = INVALID_HANDLE_VALUE; } } spinlock_leave(&ioq->handler_list_lock); } } return count; } TOOLKIT_API void ioqueue_stop(ioqueue_t *ioq) { TOOLKIT_ASSERT(ioq); ioq->stop = -1; } /* timer */ TOOLKIT_API int ioqueue_timer_schedule(ioqueue_t *ioq, timer_entry *entry, unsigned int delay) { int err; TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(entry); if (ioq->stop) return -1; spinlock_enter(&ioq->tm_queue_lock, -1); err = timer_queue_schedule(ioq->tm_queue, entry, delay); spinlock_leave(&ioq->tm_queue_lock); return err; } TOOLKIT_API int ioqueue_timer_cancel(ioqueue_t *ioq, timer_entry *entry, int cancel) { int err; TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(entry); spinlock_enter(&ioq->tm_queue_lock, -1); err = timer_queue_cancel(ioq->tm_queue, entry, cancel); spinlock_leave(&ioq->tm_queue_lock); return err; } /* acceptor */ TOOLKIT_API int ioqueue_acceptor_create(ioqueue_t *ioq, const char *ip, unsigned short port, ioqueue_acceptor_t* acceptor) { struct sockaddr_in service = {0}; TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(port); if (ioq->stop) return -1; memset(acceptor, 0, sizeof(ioqueue_acceptor_t)); acceptor->u.sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (acceptor->u.sock == INVALID_SOCKET) goto on_error; nonblock_sock(acceptor->u.sock); service.sin_family = AF_INET; service.sin_port = htons(port); service.sin_addr.s_addr = ip ? inet_addr(ip) : htonl(INADDR_ANY); if (bind(acceptor->u.sock, (struct sockaddr*)&service, sizeof(struct sockaddr)) != 0) goto on_error; if (!CreateIoCompletionPort((HANDLE)acceptor->u.sock, ioq->iocp, 0, 0)) goto on_error; acceptor->type = HANDLE_TYPE_ACCEPTOR; acceptor->owner = ioq; fastlock_init(acceptor->ov_pending_list_lock); INIT_LIST_HEAD(&acceptor->ov_pending_list); add_handler_list(acceptor, ioq); inc_ref(ioqueue_handle_context, acceptor); return 0; on_error: if (acceptor->u.sock != INVALID_SOCKET) closesocket(acceptor->u.sock); return -1; } TOOLKIT_API int ioqueue_acceptor_listen(ioqueue_acceptor_t* acceptor, int backlog) { TOOLKIT_ASSERT(acceptor); return listen(acceptor->u.sock, backlog); } TOOLKIT_API void ioqueue_acceptor_destroy(ioqueue_acceptor_t* acceptor) { TOOLKIT_ASSERT(acceptor); dec_ref(ioqueue_handle_context, acceptor); } TOOLKIT_API void ioqueue_acceptor_close(ioqueue_acceptor_t* acceptor) { SOCKET s; TOOLKIT_ASSERT(acceptor); s = acceptor->u.sock; if (s != INVALID_SOCKET) { acceptor->u.sock = INVALID_SOCKET; closesocket(s); } } TOOLKIT_API int ioqueue_acceptor_async_accept(ioqueue_acceptor_t* acceptor, ioqueue_overlapped_t *ov, ioqueue_on_accept_callback on_accept_callback, void *user_data) { ioqueue_t *ioq; ioqueue_accept_overlapped_t *overlapped; DWORD bytesTransfer; BOOL ret; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_ACCEPTOR); TOOLKIT_ASSERT(on_accept_callback); ioq = acceptor->owner; if (ioq->stop) return -1; overlapped = (ioqueue_accept_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_accept_overlapped_t)); overlapped->client = new_socket(); if (overlapped->client == INVALID_SOCKET) return -1; fastlock_enter(acceptor->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &acceptor->ov_pending_list); fastlock_leave(acceptor->ov_pending_list_lock); overlapped->base.type = OV_ACCEPT; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = acceptor; inc_pending_io(acceptor); overlapped->on_accept_callback = on_accept_callback; ret = AcceptEx(acceptor->u.sock, overlapped->client, overlapped->accept_buf, 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, &bytesTransfer, &overlapped->base.ov); if (ret || WSAGetLastError() == WSA_IO_PENDING) return 0; #if 0 { DWORD dwError = WSAGetLastError(); printf("dwError = %d\n", dwError); } #endif fastlock_enter(acceptor->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(acceptor->ov_pending_list_lock); dec_pending_io(acceptor); delete_socket(overlapped->client); return -1; } TOOLKIT_API int ioqueue_acceptor_accept(ioqueue_acceptor_t* acceptor, SOCKET *s, struct sockaddr *addr, int *addrlen, int timeout) { struct timeval tm; fd_set set; fd_set ex_set; int rc; FD_ZERO(&set); FD_ZERO(&ex_set); FD_SET(acceptor->u.sock, &set); FD_SET(acceptor->u.sock, &ex_set); tm.tv_sec = timeout / 1000; tm.tv_usec = 1000 * (timeout % 1000); rc = select(acceptor->u.sock+1, &set, NULL, &ex_set, &tm); if (rc > 0) { if (FD_ISSET(acceptor->u.sock, &ex_set)) return -1; if (FD_ISSET(acceptor->u.sock, &set)) { SOCKET fd = accept(acceptor->u.sock, addr, addrlen); if (fd != INVALID_SOCKET) { *s = fd; return 0; } } } return -1; } TOOLKIT_API int ioqueue_acceptor_create_client(ioqueue_acceptor_t* acceptor, SOCKET s, ioqueue_tcpsock_t *tcpsock) { ioqueue_t *ioq; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(s != INVALID_SOCKET); ioq = acceptor->owner; if (ioq->stop) return -1; memset(tcpsock, 0, sizeof(ioqueue_tcpsock_t)); tcpsock->type = HANDLE_TYPE_TCPSOCK; tcpsock->u.sock = s; tcpsock->owner = ioq; tcpsock->user_data = NULL; fastlock_init(tcpsock->ov_pending_list_lock); INIT_LIST_HEAD(&tcpsock->ov_pending_list); if (!CreateIoCompletionPort((HANDLE)s, ioq->iocp, 0, 0)) /* bind to iocp */ return -1; add_handler_list(tcpsock, ioq); inc_ref(ioqueue_handle_context, tcpsock); return 0; } TOOLKIT_API SOCKET ioqueue_acceptor_get_raw_socket(ioqueue_acceptor_t* acceptor) { TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_ACCEPTOR); TOOLKIT_ASSERT(acceptor->u.sock != INVALID_SOCKET); return acceptor->u.sock; } TOOLKIT_API ioqueue_t* ioqueue_acceptor_get_owned_ioqueue(ioqueue_acceptor_t* acceptor) { TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_ACCEPTOR); return acceptor->owner; } TOOLKIT_API void *ioqueue_acceptor_set_user_data(ioqueue_acceptor_t* acceptor, void *user_data) { void *old; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_ACCEPTOR); old = acceptor->user_data; acceptor->user_data = user_data; return old; } TOOLKIT_API void *ioqueue_acceptor_get_user_data(ioqueue_acceptor_t* acceptor) { TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_ACCEPTOR); return acceptor->user_data; } TOOLKIT_API int ioqueue_acceptor_cancel(ioqueue_acceptor_t* acceptor) { TOOLKIT_ASSERT(acceptor); return CancelIo(acceptor->u.file) ? 0 : -1; } /* tcpsock */ TOOLKIT_API int ioqueue_tcpsock_create(ioqueue_t *ioq, ioqueue_tcpsock_t *tcpsock) { SOCKET s; TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(tcpsock); if (ioq->stop) return -1; s = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (s == INVALID_SOCKET) return -1; if (ioqueue_tcpsock_create_from_handle(ioq, s, tcpsock) != 0) { closesocket(s); return -1; } return 0; } TOOLKIT_API int ioqueue_tcpsock_create_from_handle(ioqueue_t *ioq, SOCKET s, ioqueue_tcpsock_t *tcpsock) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(s != INVALID_SOCKET); TOOLKIT_ASSERT(tcpsock); if (ioq->stop) return -1; memset(tcpsock, 0, sizeof(ioqueue_tcpsock_t)); tcpsock->u.sock = s; reuse_addr(tcpsock->u.sock); nonblock_sock(tcpsock->u.sock); /* winxp or more we use ConnectEx, this funtion need bind at first */ if (is_os_gte_xp()) { struct sockaddr_in local = {0}; local.sin_family = AF_INET; local.sin_port = htons(0); local.sin_addr.s_addr = INADDR_ANY; if (bind(tcpsock->u.sock, (struct sockaddr*)&local, sizeof(struct sockaddr)) != 0) return -1; } else { /* for win2k we use connect, set socket to non-block mode */ //u_long ul_onoff = 1; //if (ioctlsocket(tcpsock->u.sock, FIONBIO, &ul_onoff) != 0) // goto on_error; } if (!CreateIoCompletionPort((HANDLE)tcpsock->u.sock, ioq->iocp, 0, 0)) return -1; fastlock_init(tcpsock->ov_pending_list_lock); INIT_LIST_HEAD(&tcpsock->ov_pending_list); tcpsock->type = HANDLE_TYPE_TCPSOCK; tcpsock->owner = ioq; add_handler_list(tcpsock, ioq); inc_ref(ioqueue_handle_context, tcpsock); return 0; } TOOLKIT_API int ioqueue_tcpsock_async_connect(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, const char *ip, unsigned short port, ioqueue_on_connect_callback on_connect_callback, void* user_data) { ioqueue_t *ioq; ioqueue_connect_overlapped_t *overlapped; struct sockaddr_in service; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(ip); TOOLKIT_ASSERT(port); TOOLKIT_ASSERT(on_connect_callback); ioq = tcpsock->owner; if (ioq->stop) return -1; ioq = tcpsock->owner; overlapped = (ioqueue_connect_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_connect_overlapped_t)); fastlock_enter(tcpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list); fastlock_leave(tcpsock->ov_pending_list_lock); overlapped->base.type = OV_CONNECT; overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock; overlapped->base.user_data = user_data; overlapped->on_connect_callback = on_connect_callback; inc_pending_io(tcpsock); if (is_os_gte_xp()) { /* use ConnectEx */ DWORD dwBytes; BOOL ret; BOOL (PASCAL FAR * lpfnConnectEx) (IN SOCKET s, IN const struct sockaddr FAR *name, IN int namelen, IN PVOID lpSendBuffer OPTIONAL, IN DWORD dwSendDataLength, OUT LPDWORD lpdwBytesSent, IN LPOVERLAPPED lpOverlapped ); // LPFN_CONNECTEX lpfnConnectEx; GUID GuidConnectEx = WSAID_CONNECTEX; if (WSAIoctl(tcpsock->u.sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof(GuidConnectEx), &lpfnConnectEx, sizeof(lpfnConnectEx), &dwBytes, NULL, NULL) != 0) { fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } memset(&service, 0, sizeof(service)); service.sin_family = AF_INET; service.sin_port = htons(port); service.sin_addr.s_addr = inet_addr(ip); { struct sockaddr_in local_addr = {0}; // bind to a INADDR_ANY and port 0 to let OS choose an local address local_addr.sin_family = AF_INET; local_addr.sin_addr.s_addr = htonl(INADDR_ANY); local_addr.sin_port = htons(0); ret = bind(tcpsock->u.sock, (SOCKADDR*)&local_addr, sizeof(local_addr)); // caution: ConnectEx need socket to be bounded at first } if (ret == 0) { ret = lpfnConnectEx(tcpsock->u.sock, (struct sockaddr*)&service, sizeof(service), NULL, 0, NULL, &overlapped->base.ov); if (ret || WSAGetLastError() == WSA_IO_PENDING) return 0; } } else { /* use non-blocking connect */ overlapped->hevt = WSACreateEvent(); if (WSAEventSelect(tcpsock->u.sock, overlapped->hevt, FD_CONNECT) == 0) { spinlock_enter(&ioq->connect_list_lock, -1); list_add_tail(&overlapped->node, &ioq->connect_list); spinlock_leave(&ioq->connect_list_lock); if (connect(tcpsock->u.sock, (struct sockaddr*)&service, sizeof(service)) == 0) { return 0; } else { spinlock_enter(&ioq->connect_list_lock, -1); list_del(&overlapped->node); spinlock_leave(&ioq->connect_list_lock); } } WSACloseEvent(overlapped->hevt); } fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } TOOLKIT_API int ioqueue_tcpsock_conect(ioqueue_tcpsock_t *tcpsock, const char *ip, unsigned short port, int timeout) { fd_set wr_set; fd_set ex_set; struct timeval tm; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ip); TOOLKIT_ASSERT(port > 0); FD_ZERO(&wr_set); FD_ZERO(&ex_set); FD_SET(tcpsock->u.sock, &wr_set); FD_SET(tcpsock->u.sock, &ex_set); tm.tv_sec = timeout / 1000; tm.tv_usec = 1000 * (timeout % 1000); if (select(tcpsock->u.sock+1, NULL, &wr_set, &ex_set, &tm) > 0) { if (FD_ISSET(tcpsock->u.sock, &ex_set)) return -1; if (FD_ISSET(tcpsock->u.sock, &wr_set)) return 0; } return -1; } TOOLKIT_API int ioqueue_tcpsock_async_sendsome(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, void *buf, unsigned int len, ioqueue_on_send_callback on_send_callback, void *user_data) { ioqueue_sendsome_overlapped_t *overlapped; DWORD bytesWritten; int rc; ioqueue_t *ioq; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_send_callback); ioq = ioqueue_tcpsock_get_owned_ioqueue(tcpsock); if (ioq->stop) return -1; overlapped = (ioqueue_sendsome_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_sendsome_overlapped_t)); fastlock_enter(tcpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list); fastlock_leave(tcpsock->ov_pending_list_lock); overlapped->base.type = OV_SENDSOME; overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock; overlapped->base.user_data = user_data; overlapped->on_send_callback = on_send_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; inc_pending_io(tcpsock); rc = WSASend(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesWritten, 0, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } TOOLKIT_API int ioqueue_tcpsock_async_sendn(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, void *buf, unsigned int len, ioqueue_on_send_callback on_send_callback, void* user_data) { ioqueue_sendn_overlapped_t *overlapped; DWORD bytesWritten; int rc; ioqueue_t *ioq; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_send_callback); ioq = ioqueue_tcpsock_get_owned_ioqueue(tcpsock); if (ioq->stop) return -1; overlapped = (ioqueue_sendn_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_sendn_overlapped_t)); fastlock_enter(tcpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list); fastlock_leave(tcpsock->ov_pending_list_lock); overlapped->base.type = OV_SENDN; overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock; overlapped->base.user_data = user_data; overlapped->on_send_callback = on_send_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; overlapped->original_buf = buf; overlapped->sended_bytes = 0; overlapped->total_bytes = len; inc_pending_io(tcpsock); rc = WSASend(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesWritten, 0, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; bytesWritten = WSAGetLastError(); fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } TOOLKIT_API int ioqueue_tcpsock_async_senduntil(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, void *buf, unsigned int len, const char *delimer, ioqueue_on_send_callback on_send_cb, void* user_data) { const char *p; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_send_cb); TOOLKIT_ASSERT(delimer); p = memstr(buf, len, delimer); if (!p) return -1; p += strlen(delimer); return ioqueue_tcpsock_async_sendn(tcpsock, ov, buf, p - (char*)buf, on_send_cb, user_data); } TOOLKIT_API int ioqueue_tcpsock_sendsome(ioqueue_tcpsock_t *tcpsock, void *buf, unsigned int len, int timeout) { TOOLKIT_ASSERT(tcpsock); return send(tcpsock->u.sock, buf, len, 0); } TOOLKIT_API int ioqueue_tcpsock_sendn(ioqueue_tcpsock_t *tcpsock, void *buf, unsigned int len, int timeout) { return tsend_n(tcpsock->u.sock, buf, len, timeout); } TOOLKIT_API int ioqueue_tcpsock_senduntil(ioqueue_tcpsock_t *tcpsock, void *buf, unsigned int len, const char *delimer, int timeout) { return tsend_until(tcpsock->u.sock, buf, len, delimer, timeout); } TOOLKIT_API int ioqueue_tcpsock_async_recvsome(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, void *buf, unsigned int len, ioqueue_on_recv_callback on_recv_callback, void *user_data) { ioqueue_recvsome_overlapped_t *overlapped; DWORD bytesRead; int rc; ioqueue_t *ioq; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_recv_callback); ioq = tcpsock->owner; if (ioq->stop) return -1; overlapped = (ioqueue_recvsome_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_recvsome_overlapped_t)); fastlock_enter(tcpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list); fastlock_leave(tcpsock->ov_pending_list_lock); overlapped->base.type = OV_RECVSOME; overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock; overlapped->base.user_data = user_data; overlapped->on_recv_callback = on_recv_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; overlapped->dwFlags = 0; inc_pending_io(tcpsock); rc = WSARecv(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } TOOLKIT_API int ioqueue_tcpsock_async_recvn(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, void *buf, unsigned int len, ioqueue_on_recv_callback on_recv_callback, void *user_data) { ioqueue_recvn_overlapped_t *overlapped; DWORD bytesRead; int rc; ioqueue_t *ioq; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_recv_callback); ioq = tcpsock->owner; if (ioq->stop) return -1; overlapped = (ioqueue_recvn_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_recvn_overlapped_t)); fastlock_enter(tcpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list); fastlock_leave(tcpsock->ov_pending_list_lock); overlapped->base.type = OV_RECVN; overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock; overlapped->base.user_data = user_data; overlapped->on_recv_callback = on_recv_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; overlapped->original_buf = buf; overlapped->recved_bytes = 0; overlapped->total_bytes = len; overlapped->dwFlags = 0; inc_pending_io(tcpsock); rc = WSARecv(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } TOOLKIT_API int ioqueue_tcpsock_async_recvuntil(ioqueue_tcpsock_t *tcpsock, ioqueue_overlapped_t *ov, void *buf, unsigned int len, const char *delimer, ioqueue_on_recvuntil_callback on_recvuntil_callback, void *user_data) { ioqueue_recvuntil_overlapped_t *overlapped; DWORD bytesRead; int rc; ioqueue_t *ioq; TOOLKIT_ASSERT(tcpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(delimer); TOOLKIT_ASSERT(on_recvuntil_callback); ioq = tcpsock->owner; if (ioq->stop) return -1; overlapped = (ioqueue_recvuntil_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_recvuntil_overlapped_t)); fastlock_enter(tcpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list); fastlock_leave(tcpsock->ov_pending_list_lock); overlapped->base.type = OV_RECVUNTIL; overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock; overlapped->base.user_data = user_data; overlapped->on_recvuntil_callback = on_recvuntil_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; overlapped->original_buf = buf; overlapped->recved_bytes = 0; overlapped->total_bytes = len; overlapped->delimer = _strdup(delimer); overlapped->dwFlags = 0; inc_pending_io(tcpsock); rc = WSARecv(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; fastlock_enter(tcpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(tcpsock->ov_pending_list_lock); dec_pending_io(tcpsock); return -1; } TOOLKIT_API int ioqueue_tcpsock_recvsome(ioqueue_tcpsock_t *tcpsock, void *buf, unsigned int len, int timeout) { return recv(tcpsock->u.sock, buf, len, 0); } TOOLKIT_API int ioqueue_tcpsock_recvn(ioqueue_tcpsock_t *tcpsock, void *buf, unsigned int len, int timeout) { return trecv_n(tcpsock->u.sock, buf, len, timeout); } TOOLKIT_API int ioqueue_tcpsock_recvuntil(ioqueue_tcpsock_t *tcpsock, void *buf, unsigned int len, const char *delimer, unsigned int *header_len, int timeout) { return trecv_until(tcpsock->u.sock, buf, len, delimer, header_len, timeout); } TOOLKIT_API void ioqueue_tcpsock_close(ioqueue_tcpsock_t *tcpsock) { SOCKET s; TOOLKIT_ASSERT(tcpsock); s = tcpsock->u.sock; if (s != INVALID_SOCKET) { tcpsock->u.sock = INVALID_SOCKET; closesocket(s); } } TOOLKIT_API void ioqueue_tcpsock_destroy(ioqueue_tcpsock_t *tcpsock) { TOOLKIT_ASSERT(tcpsock); dec_ref(ioqueue_handle_context, tcpsock); } TOOLKIT_API int ioqueue_tcpsock_shutdown(ioqueue_tcpsock_t *tcpsock, int how) { TOOLKIT_ASSERT(tcpsock); return shutdown(tcpsock->u.sock, how); } TOOLKIT_API SOCKET ioqueue_tcpsock_get_raw_socket(ioqueue_tcpsock_t *tcpsock) { TOOLKIT_ASSERT(tcpsock); return tcpsock->u.sock; } TOOLKIT_API ioqueue_t* ioqueue_tcpsock_get_owned_ioqueue(ioqueue_tcpsock_t *tcpsock) { TOOLKIT_ASSERT(tcpsock); return tcpsock->owner; } TOOLKIT_API void *ioqueue_tcpsock_set_user_data(ioqueue_tcpsock_t *tcpsock, void *user_data) { void *old; TOOLKIT_ASSERT(tcpsock); old = tcpsock->user_data; tcpsock->user_data = user_data; return old; } TOOLKIT_API void *ioqueue_tcpsock_get_user_data(ioqueue_tcpsock_t *tcpsock) { TOOLKIT_ASSERT(tcpsock); return tcpsock->user_data; } TOOLKIT_API int ioqueue_tcpsock_cancel(ioqueue_tcpsock_t* tcpsock) { TOOLKIT_ASSERT(tcpsock); return CancelIo(tcpsock->u.file) ? 0 : -1; } /* udpsock */ TOOLKIT_API int ioqueue_udpsock_create(ioqueue_t *ioq, ioqueue_udpsock_t *udpsock) { SOCKET s; TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(udpsock); if (ioq->stop) return -1; s = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED); if (s == INVALID_SOCKET) return -1; if (ioqueue_udpsock_create_from_handle(ioq, s, udpsock) != 0) { closesocket(s); return -1; } return 0; } TOOLKIT_API int ioqueue_udpsock_create_from_handle(ioqueue_t *ioq, SOCKET s, ioqueue_udpsock_t *udpsock) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(udpsock); TOOLKIT_ASSERT(s != INVALID_SOCKET); if (ioq->stop) return -1; memset(udpsock, 0, sizeof(ioqueue_udpsock_t)); udpsock->u.sock = s; nonblock_sock(udpsock->u.sock); if (!CreateIoCompletionPort((HANDLE)udpsock->u.sock, ioq->iocp, 0, 0)) return -1; fastlock_init(udpsock->ov_pending_list_lock); INIT_LIST_HEAD(&udpsock->ov_pending_list); udpsock->type = HANDLE_TYPE_UDPSOCK; udpsock->owner = ioq; add_handler_list(udpsock, ioq); inc_ref(ioqueue_handle_context, udpsock); return 0; } TOOLKIT_API void ioqueue_udpsock_close(ioqueue_udpsock_t *udpsock) { SOCKET s; TOOLKIT_ASSERT(udpsock); s = udpsock->u.sock; if (s != INVALID_SOCKET) { udpsock->u.sock = INVALID_SOCKET; closesocket(s); } } TOOLKIT_API void ioqueue_udpsock_destroy(ioqueue_udpsock_t *udpsock) { TOOLKIT_ASSERT(udpsock); dec_ref(ioqueue_handle_context, udpsock); } TOOLKIT_API int ioqueue_udpsock_async_sendto(ioqueue_udpsock_t* udpsock, ioqueue_overlapped_t *ov, void *buf, int len, const struct sockaddr* to, int tolen, ioqueue_on_sendto_callback on_sendto_callback, void *user_data) { ioqueue_sendto_overlapped_t *overlapped; int rc; DWORD bytesWritten; ioqueue_t *ioq; TOOLKIT_ASSERT(udpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(to); TOOLKIT_ASSERT(on_sendto_callback); TOOLKIT_ASSERT(ov); ioq = udpsock->owner; if (ioq->stop) return -1; overlapped = (ioqueue_sendto_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_sendto_overlapped_t)); fastlock_enter(udpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &udpsock->ov_pending_list); fastlock_leave(udpsock->ov_pending_list_lock); overlapped->base.type = OV_SENDTO; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = (ioqueue_handle_context*)udpsock; overlapped->on_sendto_callback = on_sendto_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; inc_pending_io(udpsock); rc = WSASendTo(udpsock->u.sock, &overlapped->wsabuf, 1, &bytesWritten, 0, to, tolen, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; fastlock_enter(udpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(udpsock->ov_pending_list_lock); dec_pending_io(udpsock); return -1; } TOOLKIT_API int ioqueue_udpsock_sendto(ioqueue_udpsock_t *udpsock, void *buf, int len, const struct sockaddr* to, int tolen, int timeout) { return tsendto(udpsock->u.sock, buf, len, to, tolen, timeout); } TOOLKIT_API int ioqueue_udpsock_async_recvfrom(ioqueue_udpsock_t* udpsock, ioqueue_overlapped_t *ov, void* buf, int len, ioqueue_on_recvfrom_callback on_recvfrom_callback, void *user_data) { ioqueue_recvfrom_overlapped_t *overlapped; int rc; DWORD bytesRead; ioqueue_t *ioq; TOOLKIT_ASSERT(udpsock); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_recvfrom_callback); ioq = udpsock->owner; if (ioq->stop) return -1; overlapped = (ioqueue_recvfrom_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_recvfrom_overlapped_t)); fastlock_enter(udpsock->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &udpsock->ov_pending_list); fastlock_leave(udpsock->ov_pending_list_lock); overlapped->base.type = OV_RECVFROM; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = (ioqueue_handle_context*)udpsock; overlapped->on_recvfrom_callback = on_recvfrom_callback; overlapped->wsabuf.len = len; overlapped->wsabuf.buf = buf; overlapped->dwFlags = 0; inc_pending_io(udpsock); rc = WSARecvFrom(udpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags, (struct sockaddr*)&overlapped->peer, &overlapped->addrlen, &overlapped->base.ov, NULL); if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING) return 0; fastlock_enter(udpsock->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(udpsock->ov_pending_list_lock); dec_pending_io(udpsock); return -1; } TOOLKIT_API int ioqueue_udpsock_recvfrom(ioqueue_udpsock_t* udpsock, ioqueue_overlapped_t *overlapped, void* buf, int len, struct sockaddr *fromaddr, int *addrlen, int timeout) { return trecvfrom(udpsock->u.sock, buf, len, fromaddr, addrlen, timeout); } TOOLKIT_API SOCKET ioqueue_udpsock_get_raw_socket(ioqueue_udpsock_t *udpsock) { TOOLKIT_ASSERT(udpsock); return udpsock->u.sock; } TOOLKIT_API ioqueue_t* ioqueue_udpsock_get_owned_ioqueue(ioqueue_udpsock_t *udpsock) { TOOLKIT_ASSERT(udpsock); return udpsock->owner; } TOOLKIT_API void *ioqueue_udpsock_set_user_data(ioqueue_udpsock_t *udpsock, void *user_data) { void *old; TOOLKIT_ASSERT(udpsock); old = udpsock->user_data; udpsock->user_data = user_data; return old; } TOOLKIT_API void *ioqueue_udpsock_get_user_data(ioqueue_udpsock_t *udpsock) { TOOLKIT_ASSERT(udpsock); return udpsock->user_data; } TOOLKIT_API int ioqueue_udpsock_cancel(ioqueue_udpsock_t *udpsock) { TOOLKIT_ASSERT(udpsock); return CancelIo(udpsock->u.file) ? 0 : -1; } /* file */ TOOLKIT_API int ioqueue_file_create(ioqueue_t *ioq, const char *path, DWORD dwDesiredAccess, DWORD dwShareMode, DWORD dwCreationDisposition, DWORD dwFlagsAndAttributes, ioqueue_file_t *file) { HANDLE hFile; TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(path); TOOLKIT_ASSERT(file); if (ioq->stop) return -1; hFile = CreateFileA(path, dwDesiredAccess, dwShareMode, NULL, dwCreationDisposition, dwFlagsAndAttributes|FILE_FLAG_OVERLAPPED, NULL); return ioqueue_file_create_from_handle(ioq, hFile, file); } TOOLKIT_API int ioqueue_file_create_from_handle(ioqueue_t *ioq, HANDLE h, ioqueue_file_t *file) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(file); if (ioq->stop) return -1; memset(file, 0, sizeof(ioqueue_file_t)); file->u.file = h; if (file->u.file == INVALID_HANDLE_VALUE) return -1; if (!CreateIoCompletionPort(file->u.file, ioq->iocp, 0, 0)) { CloseHandle(file->u.file); file->u.file = INVALID_HANDLE_VALUE; return -1; } fastlock_init(file->ov_pending_list_lock); INIT_LIST_HEAD(&file->ov_pending_list); file->type = HANDLE_TYPE_FILE; file->owner = ioq; add_handler_list(file, ioq); inc_ref(ioqueue_handle_context, file); return 0; } TOOLKIT_API void ioqueue_file_close(ioqueue_file_t* file) { HANDLE s; TOOLKIT_ASSERT(file); s = file->u.file; if (s != INVALID_HANDLE_VALUE) { file->u.file = INVALID_HANDLE_VALUE; CloseHandle(s); } } TOOLKIT_API void ioqueue_file_destroy(ioqueue_file_t* file) { TOOLKIT_ASSERT(file); dec_ref(ioqueue_handle_context, file); } TOOLKIT_API int ioqueue_file_async_readsome(ioqueue_file_t* file, ioqueue_overlapped_t *ov, void *buf, unsigned int len, ioqueue_on_read_callback on_read_callback, void *user_data) { return ioqueue_file_async_readsome_at(file, ov, buf, len, 0, 0, on_read_callback, user_data); } TOOLKIT_API int ioqueue_file_async_readn(ioqueue_file_t* file, ioqueue_overlapped_t *overlapped, void *buf, unsigned int len, ioqueue_on_read_callback on_read_cb, void *user_data) { return ioqueue_file_async_readn_at(file, overlapped, buf, len, 0, 0, on_read_cb, user_data); } TOOLKIT_API int ioqueue_file_readsome(ioqueue_file_t *file, void *buf, unsigned int len) { return ioqueue_file_readsome_at(file, buf, len, 0, 0); } TOOLKIT_API int ioqueue_file_readn(ioqueue_file_t *file, void *buf, unsigned int len) { return ioqueue_file_readn_at(file, buf, len, 0, 0); } TOOLKIT_API int ioqueue_file_async_readsome_at(ioqueue_file_t* file, ioqueue_overlapped_t *ov, void *buf, unsigned int len, DWORD posLow, DWORD posHigh, ioqueue_on_read_callback on_read_callback, void *user_data) { ioqueue_readfilesome_overlapped_t *overlapped; BOOL rc; ioqueue_t *ioq; TOOLKIT_ASSERT(file); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_read_callback); ioq = file->owner; if (ioq->stop) return -1; overlapped = (ioqueue_readfilesome_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_readfilesome_overlapped_t)); fastlock_enter(file->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list); fastlock_leave(file->ov_pending_list_lock); overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL); overlapped->base.type = OV_READFILESOME; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = (ioqueue_handle_context*)file; overlapped->base.ov.Offset = posLow; overlapped->base.ov.OffsetHigh = posHigh; overlapped->on_read_callback = on_read_callback; overlapped->buf = buf; inc_pending_io(file); rc = ReadFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov); if (rc || GetLastError() == ERROR_IO_PENDING) return 0; fastlock_enter(file->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(file->ov_pending_list_lock); dec_pending_io(file); CloseHandle(overlapped->hevt); return -1; } TOOLKIT_API int ioqueue_file_async_readn_at(ioqueue_file_t* file, ioqueue_overlapped_t *ov, void *buf, unsigned int len, DWORD posLow, DWORD posHigh, ioqueue_on_read_callback on_read_cb, void *user_data) { ioqueue_readfilen_overlapped_t *overlapped; BOOL rc; ioqueue_t *ioq; TOOLKIT_ASSERT(file); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_read_cb); ioq = file->owner; if (ioq->stop) return -1; overlapped = (ioqueue_readfilen_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_readfilen_overlapped_t)); fastlock_enter(file->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list); fastlock_leave(file->ov_pending_list_lock); overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL); overlapped->base.type = OV_READFILEN; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = (ioqueue_handle_context*)file; overlapped->base.ov.Offset = posLow; overlapped->base.ov.OffsetHigh = posHigh; overlapped->on_read_callback = on_read_cb; overlapped->buf = buf; overlapped->recved_bytes = 0; overlapped->total_bytes = len; inc_pending_io(file); rc = ReadFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov); if (rc || GetLastError() == ERROR_IO_PENDING) return 0; fastlock_enter(file->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(file->ov_pending_list_lock); dec_pending_io(file); return -1; } TOOLKIT_API int ioqueue_file_readsome_at(ioqueue_file_t *file, void *buf, unsigned int len, DWORD posLow, DWORD posHigh) { OVERLAPPED ov; BOOL ret; DWORD dwTransferBytes; int rc = -1; /* (MSDN) Even if you have passed the function a file handle associated with a completion port and a valid OVERLAPPED structure, an application can prevent completion port notification. This is done by specifying a valid event handle for the hEvent member of the OVERLAPPED structure, and setting its low-order bit. A valid event handle whose low-order bit is set keeps I/O completion from being queued to the completion port. */ memset(&ov, 0, sizeof(ov)); ov.Offset = posLow; ov.OffsetHigh = posHigh; ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL); ov.hEvent = (HANDLE)((DWORD)ov.hEvent & 0x1); ret = ReadFile(file->u.file, buf, len, &dwTransferBytes, &ov); if (!ret && GetLastError() == ERROR_IO_PENDING) { ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE); } CloseHandle((HANDLE)((DWORD)ov.hEvent & ~1)); if (ret && dwTransferBytes > 0) rc = dwTransferBytes; return rc; } TOOLKIT_API int ioqueue_file_readn_at(ioqueue_file_t *file, void *buf, unsigned int len, DWORD posLow, DWORD posHigh) { OVERLAPPED ov; int rc = 0; DWORD left = len; DWORD offset = 0; memset(&ov, 0, sizeof(ov)); ov.Offset = posLow; ov.OffsetHigh = posHigh; ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL); ov.hEvent = (HANDLE)((DWORD)ov.hEvent & 0x1); while (left > 0) { BOOL ret; DWORD dwTransferBytes; ret = ReadFile(file->u.file, (char*)buf+offset, len, &dwTransferBytes, &ov); if (!ret && GetLastError() == ERROR_IO_PENDING) { ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE); } if (rc && dwTransferBytes) { offset += dwTransferBytes; left -= dwTransferBytes; ov.Internal = 0; ov.InternalHigh = 0; ov.Offset += dwTransferBytes; if (ov.Offset < dwTransferBytes) ov.OffsetHigh++; } else { rc = -1; break; } } CloseHandle((HANDLE)((DWORD)ov.hEvent & ~1)); return rc; } TOOLKIT_API int ioqueue_file_async_writesome(ioqueue_file_t* file, ioqueue_overlapped_t *ov, void* buf, unsigned int len, ioqueue_on_write_callback on_write_callback, void *user_data) { return ioqueue_file_async_writesome_at(file, ov, buf, len, 0, 0, on_write_callback, user_data); } TOOLKIT_API int ioqueue_file_async_writen(ioqueue_file_t* file, ioqueue_overlapped_t *overlapped, void* buf, unsigned int len, ioqueue_on_write_callback on_write_cb, void *user_data) { return ioqueue_file_async_writen_at(file, overlapped, buf, len, 0, 0, on_write_cb, user_data); } TOOLKIT_API int ioqueue_file_writesome(ioqueue_file_t* file, const void *buf, unsigned int len) { return ioqueue_file_writesome_at(file, buf, len, 0, 0); } TOOLKIT_API int ioqueue_file_writen(ioqueue_file_t* file, const void *buf, unsigned int len) { return ioqueue_file_writen_at(file, buf, len, 0, 0); } TOOLKIT_API int ioqueue_file_async_writesome_at(ioqueue_file_t* file, ioqueue_overlapped_t *ov, void* buf, unsigned int len, DWORD posLow, DWORD posHigh, ioqueue_on_write_callback on_write_callback, void *user_data) { ioqueue_writefilesome_overlapped_t *overlapped; BOOL rc; ioqueue_t *ioq; TOOLKIT_ASSERT(file); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_write_callback); ioq = file->owner; if (ioq->stop) return -1; overlapped = (ioqueue_writefilesome_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_writefilesome_overlapped_t)); fastlock_enter(file->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list); fastlock_leave(file->ov_pending_list_lock); overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL); overlapped->base.type = OV_WRITEFILESOME; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = (ioqueue_handle_context*)file; overlapped->base.ov.Offset = posLow; overlapped->base.ov.OffsetHigh = posHigh; overlapped->on_write_callback = on_write_callback; overlapped->buf = buf; inc_pending_io(file); rc = WriteFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov); if (rc || GetLastError() == ERROR_IO_PENDING) return 0; fastlock_enter(file->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(file->ov_pending_list_lock); dec_pending_io(file); CloseHandle(overlapped->hevt); return -1; } TOOLKIT_API int ioqueue_file_async_writen_at(ioqueue_file_t* file, ioqueue_overlapped_t *ov, void* buf, unsigned int len, DWORD posLow, DWORD posHigh, ioqueue_on_write_callback on_write_cb, void *user_data) { ioqueue_writefilen_overlapped_t *overlapped; BOOL rc; ioqueue_t *ioq; TOOLKIT_ASSERT(file); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(buf); TOOLKIT_ASSERT(on_write_cb); ioq = file->owner; if (ioq->stop) return -1; overlapped = (ioqueue_writefilen_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_writefilen_overlapped_t)); fastlock_enter(file->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list); fastlock_leave(file->ov_pending_list_lock); overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL); overlapped->base.type = OV_WRITEFILEN; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = (ioqueue_handle_context*)file; overlapped->base.ov.Offset = posLow; overlapped->base.ov.OffsetHigh = posHigh; overlapped->on_write_callback = on_write_cb; overlapped->buf = buf; overlapped->sended_bytes = 0; overlapped->total_bytes = len; inc_pending_io(file); rc = WriteFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov); if (rc || GetLastError() == ERROR_IO_PENDING) return 0; fastlock_enter(file->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(file->ov_pending_list_lock); dec_pending_io(file); CloseHandle(overlapped->hevt); return -1; } TOOLKIT_API int ioqueue_file_writesome_at(ioqueue_file_t* file, const void *buf, unsigned int len, DWORD posLow, DWORD posHigh) { OVERLAPPED ov; BOOL ret; DWORD dwTransferBytes; int rc = -1; memset(&ov, 0, sizeof(ov)); ov.Offset = posLow; ov.OffsetHigh = posHigh; ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL); ov.hEvent = (HANDLE)((DWORD)ov.hEvent & 0x1); ret = WriteFile(file->u.file, buf, len, &dwTransferBytes, &ov); if (!ret && GetLastError() == ERROR_IO_PENDING) { ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE); } CloseHandle((HANDLE)((DWORD)ov.hEvent & ~1)); if (ret && dwTransferBytes > 0) rc = dwTransferBytes; return rc; } TOOLKIT_API int ioqueue_file_writen_at(ioqueue_file_t* file, const void *buf, unsigned int len, DWORD posLow, DWORD posHigh) { OVERLAPPED ov; int rc = 0; DWORD offset = 0; DWORD left = len; memset(&ov, 0, sizeof(ov)); ov.Offset = posLow; ov.OffsetHigh = posHigh; ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL); ov.hEvent = (HANDLE)((DWORD)ov.hEvent & 0x1); while (left > 0) { BOOL ret; DWORD dwTransferBytes; ret = WriteFile(file->u.file, (char*)buf+offset, left, &dwTransferBytes, &ov); if (!ret && GetLastError() == ERROR_IO_PENDING) { ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE); } if (ret && dwTransferBytes > 0) { offset += dwTransferBytes; left -= dwTransferBytes; ov.Internal = 0; ov.InternalHigh = 0; ov.Offset += dwTransferBytes; if (ov.Offset < dwTransferBytes) ov.OffsetHigh ++; } else { rc = -1; break; } } CloseHandle((HANDLE)((DWORD)ov.hEvent & ~1)); return rc; } TOOLKIT_API ioqueue_t* ioqueue_file_get_owned_ioqueue(ioqueue_file_t* file) { TOOLKIT_ASSERT(file); return file->owner; } TOOLKIT_API HANDLE ioqueue_file_get_raw_handle(ioqueue_file_t* file) { TOOLKIT_ASSERT(file); return file->u.file; } TOOLKIT_API void *ioqueue_file_set_user_data(ioqueue_file_t* file, void* user_data) { void *old; TOOLKIT_ASSERT(file); old = file->user_data; file->user_data = user_data; return old; } TOOLKIT_API void *ioqueue_file_get_user_data(ioqueue_file_t* file) { TOOLKIT_ASSERT(file); return file->user_data; } TOOLKIT_API int ioqueue_file_cancel(ioqueue_file_t* file) { TOOLKIT_ASSERT(file); return CancelIo(file->u.file) ? 0 : -1; } /* pipe acceptor */ TOOLKIT_API int ioqueue_pipe_acceptor_create(ioqueue_t *ioq, const char *name, ioqueue_pipe_acceptor_t *acceptor) { TOOLKIT_ASSERT(ioq); TOOLKIT_ASSERT(name); TOOLKIT_ASSERT(acceptor); memset(acceptor, 0, sizeof(ioqueue_pipe_acceptor_t)); acceptor->u.pipe_name = strdup_printf("\\\\.\\pipe\\%s", name); acceptor->type = HANDLE_TYPE_PIPEACCEPTOR; acceptor->owner = ioq; fastlock_init(acceptor->ov_pending_list_lock); INIT_LIST_HEAD(&acceptor->ov_pending_list); add_handler_list(acceptor, ioq); inc_ref(ioqueue_handle_context, acceptor); return 0; } TOOLKIT_API void ioqueue_pipe_acceptor_destroy(ioqueue_pipe_acceptor_t *acceptor) { TOOLKIT_ASSERT(acceptor); dec_ref(ioqueue_handle_context, acceptor); } TOOLKIT_API ioqueue_t* ioqueue_pipe_acceptor_get_owned_ioqueue(ioqueue_pipe_acceptor_t *acceptor) { TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR); return acceptor->owner; } TOOLKIT_API void *ioqueue_pipe_acceptor_set_user_data(ioqueue_pipe_acceptor_t *acceptor, void *user_data) { void *old; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR); old = acceptor->user_data; acceptor->user_data = user_data; return old; } TOOLKIT_API void *ioqueue_pipe_acceptor_get_user_data(ioqueue_pipe_acceptor_t *acceptor) { TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_ACCEPTOR); return acceptor->user_data; } TOOLKIT_API int ioqueue_pipe_acceptor_async_accept(ioqueue_pipe_acceptor_t *acceptor, ioqueue_overlapped_t *ov, ioqueue_on_pipe_accept_callback on_accept_callback, void *user_data) { ioqueue_t *ioq; ioqueue_connectpipe_overlapped_t *overlapped; BOOL ret; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(ov); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR); TOOLKIT_ASSERT(on_accept_callback); ioq = acceptor->owner; if (ioq->stop) return -1; overlapped = (ioqueue_connectpipe_overlapped_t*)ov; memset(overlapped, 0, sizeof(ioqueue_connectpipe_overlapped_t)); overlapped->client = CreateNamedPipeA(acceptor->u.pipe_name, PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE, PIPE_UNLIMITED_INSTANCES, 3072, 3072, NMPWAIT_WAIT_FOREVER, NULL); if (overlapped->client == INVALID_HANDLE_VALUE) return -1; if (!CreateIoCompletionPort(overlapped->client, ioq->iocp, 0, 0)) { CloseHandle(overlapped->client); return -1; } overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL); // must be use event, from MSDN overlapped->base.type = OV_CONNECTPIPE; overlapped->base.user_data = user_data; overlapped->base.handle_ctx = acceptor; overlapped->base.ov.hEvent = overlapped->hevt; fastlock_enter(acceptor->ov_pending_list_lock); list_add_tail(&overlapped->base.pending_entry, &acceptor->ov_pending_list); fastlock_leave(acceptor->ov_pending_list_lock); inc_pending_io(acceptor); overlapped->on_accept_callback = on_accept_callback; ret = ConnectNamedPipe(overlapped->client, &overlapped->base.ov); if (ret || GetLastError() == ERROR_IO_PENDING) return 0; fastlock_enter(acceptor->ov_pending_list_lock); list_del(&overlapped->base.pending_entry); fastlock_leave(acceptor->ov_pending_list_lock); dec_pending_io(acceptor); CloseHandle(overlapped->client); CloseHandle(overlapped->hevt); return -1; } TOOLKIT_API int ioqueue_pipe_acceptor_accept(ioqueue_pipe_acceptor_t *acceptor, HANDLE *p_pipe, int timeout) { ioqueue_t *ioq; HANDLE pipe; OVERLAPPED ov; BOOL ret; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(p_pipe); TOOLKIT_ASSERT(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR); ioq = acceptor->owner; if (ioq->stop) return -1; pipe = CreateNamedPipeA(acceptor->u.pipe_name, PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE, PIPE_UNLIMITED_INSTANCES, 3072, 3072, (DWORD)timeout, NULL); if (pipe == INVALID_HANDLE_VALUE) return -1; memset(&ov, 0, sizeof(ov)); ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL); ret = ConnectNamedPipe(pipe, &ov); CloseHandle(ov.hEvent); if (ret && CreateIoCompletionPort(pipe, ioq->iocp, 0, 0)) { *p_pipe = pipe; return 0; } else { CloseHandle(pipe); } return -1; } TOOLKIT_API int ioqueue_pipe_acceptor_create_client(ioqueue_pipe_acceptor_t *acceptor, HANDLE h, ioqueue_file_t *pipe) { ioqueue_t *ioq; TOOLKIT_ASSERT(acceptor); TOOLKIT_ASSERT(pipe); TOOLKIT_ASSERT(h != INVALID_HANDLE_VALUE); ioq = acceptor->owner; if (ioq->stop) return -1; memset(pipe, 0, sizeof(ioqueue_tcpsock_t)); pipe->type = HANDLE_TYPE_FILE; pipe->u.file = h; pipe->owner = ioq; INIT_LIST_HEAD(&pipe->ov_pending_list); add_handler_list(pipe, ioq); inc_ref(ioqueue_handle_context, pipe); return 0; } TOOLKIT_API int ioqueue_pipe_acceptor_cancel(ioqueue_pipe_acceptor_t *acceptor) { //..... TOOLKIT_ASSERT(0); return 0; } TOOLKIT_API int ioqueue_pipe_acceptor_close_pending_handle(ioqueue_pipe_acceptor_t *acceptor) { TOOLKIT_ASSERT(acceptor); fastlock_enter(acceptor->ov_pending_list_lock); { ioqueue_base_overlapped_t *pos; list_for_each_entry(pos, &acceptor->ov_pending_list, ioqueue_base_overlapped_t, pending_entry) { ioqueue_connectpipe_overlapped_t *overlapped = (ioqueue_connectpipe_overlapped_t *)pos; if (overlapped->client != INVALID_HANDLE_VALUE) { CloseHandle(overlapped->client); overlapped->client = INVALID_HANDLE_VALUE; } } } fastlock_leave(acceptor->ov_pending_list_lock); return 0; }