// (Line-number gutter left over from the web extraction removed; the source follows.)
- #include "IOCPSocketServer.h"
- #include <list>
- #include <process.h>
- #include <assert.h>
// Kind of overlapped operation an OperationContext was issued for.
// PollCompletionPort() dispatches on this value when a completion is dequeued.
enum OperationType
{
    OT_ACCEPT  = 0,   // AcceptEx posted on the listening socket
    OT_SEND    = 1,   // WSASend posted on a connection socket
    OT_RECEIVE = 2    // WSARecv posted on a connection socket
};
- struct CIOCPSocketServer::OperationContext : OVERLAPPED
- {
- int nConnectionID;
- SOCKET hSocket;
- OperationType eOperationType;
- WSABUF WsaBuf;
- char *pBuf;
- int nTotalBytes;
- int nLeftBytes;
-
- OperationContext(OperationType t, int id, SOCKET s):
- nConnectionID(id),
- hSocket(s),
- eOperationType(t),
- pBuf(NULL),
- nTotalBytes(0),
- nLeftBytes(0)
- {
- }
- ~OperationContext()
- {
- if (pBuf != NULL)
- {
- delete pBuf;
- pBuf = NULL;
- }
- }
- void PrepareBuffer(int nBufLen = 8192)
- {
- memset(this, 0, sizeof(OVERLAPPED));
- pBuf =new char[nBufLen];
- memset(pBuf, 0, nBufLen);
- nTotalBytes = nBufLen;
- nLeftBytes = nBufLen;
- WsaBuf.buf = pBuf;
- WsaBuf.len = nBufLen;
- }
- bool AddSendLen(int nTransferLen)
- {
- memset(this, 0, sizeof(OVERLAPPED));
-
- if (nLeftBytes >0)
- {
- nLeftBytes -= nTransferLen;
- WsaBuf.buf = pBuf + nTotalBytes - nLeftBytes;
- WsaBuf.len = nLeftBytes;
- }
-
- return nLeftBytes > 0;
- }
- void SetSendData(char *pData, int nLen)
- {
- memset(this, 0, sizeof(OVERLAPPED));
- if (NULL != pBuf)
- {
- delete[] pBuf;
- pBuf = NULL;
- }
- pBuf = new char[nLen];
- memcpy(pBuf, pData, nLen);
- nTotalBytes = nLeftBytes = nLen;
- WsaBuf.buf = pBuf;
- WsaBuf.len = nLen;
- }
- };
- struct CIOCPSocketServer::ConnectionContext
- {
- int nConnectionID;
- SOCKET hSocket;
- ConnectionContext(int nConnectionID, SOCKET hSocket):
- nConnectionID(nConnectionID),
- hSocket(hSocket)
- {
- }
- ~ConnectionContext()
- {
- if (hSocket != INVALID_SOCKET)
- {
- closesocket(hSocket);
- hSocket = INVALID_SOCKET;
- }
- }
- };
- CIOCPSocketServer::CIOCPSocketServer(int nMaxWorkThreadNum):
- m_dwSerialNo(0),
- m_hIOCP(INVALID_HANDLE_VALUE),
- m_hListenSocket(INVALID_SOCKET),
- m_pAcceptContext(NULL),
- m_nThreadCount(0),
- m_pWorkThreads(NULL),
- m_bExit(false),
- m_bStarted(false),
- m_fnAcceptEx(NULL),
- m_fnGetAcceptExSocketAddrs(NULL),
- m_nMaxWorkThreadNum(nMaxWorkThreadNum)
- {
- }
- CIOCPSocketServer::~CIOCPSocketServer(void)
- {
- CAutoLock lock(&m_LockObject);
- for(auto it = m_ConnectionContext.begin(); it!=m_ConnectionContext.end(); it++)
- {
- delete (*it).second;
- }
- if (m_pWorkThreads != NULL && m_nThreadCount >0)
- {
- for(int i=0; i<m_nThreadCount; i++)
- CloseHandle(m_pWorkThreads[i]);
- delete m_pWorkThreads;
- m_pWorkThreads = NULL;
- m_nThreadCount = 0;
- }
- if (m_hIOCP != INVALID_HANDLE_VALUE)
- {
- CloseHandle(m_hIOCP);
- m_hIOCP = INVALID_HANDLE_VALUE;
- }
- if (m_hListenSocket != INVALID_SOCKET)
- {
- closesocket(m_hListenSocket);
- m_hListenSocket = INVALID_SOCKET;
- }
- if (m_pAcceptContext != NULL)
- {
- delete m_pAcceptContext;
- m_pAcceptContext = NULL;
- }
- }
- bool CIOCPSocketServer::WinSockStartup()
- {
- WSADATA wsaData = {};
- return WSAStartup(0x202, &wsaData) == 0;
- }
- bool CIOCPSocketServer::WinSockCleanup()
- {
- return WSACleanup() == 0;
- }
- const char* CIOCPSocketServer::GetLastErrorMsg(int nErrorCode)
- {
- if (nErrorCode == 0)
- nErrorCode = GetLastError();
- char pMsg[1024];
- memset(pMsg, 0, sizeof(pMsg));
- if(::FormatMessage(
- FORMAT_MESSAGE_IGNORE_INSERTS | FORMAT_MESSAGE_FROM_SYSTEM,
- NULL,
- nErrorCode,
- MAKELANGID(LANG_NEUTRAL,SUBLANG_DEFAULT),
- pMsg,
- sizeof(pMsg)/sizeof(char),
- NULL))
- {
- pMsg[sizeof(pMsg)/sizeof(char)-1]=0;
- }
- m_strLastErrMsg = pMsg;
- return m_strLastErrMsg.c_str();
- }
- int CIOCPSocketServer::GetProcessorNum()
- {
- static int nProcessors = 0;
- if (0 == nProcessors)
- {
- SYSTEM_INFO si;
- GetSystemInfo(&si);
- nProcessors = si.dwNumberOfProcessors;
- }
- return nProcessors;
- }
- bool CIOCPSocketServer::StartListen(int nListenPort)
- {
- if (m_bStarted)
- return true;
- m_bExit = false;
- m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
- if (m_hIOCP == NULL)
- return false;
-
- UINT nThreadID(0);
- //Overlapped I/O follows the model established in Windows and can be performed only on
- //sockets created through the WSASocket function
- m_hListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- if (m_hListenSocket == INVALID_SOCKET)
- goto error;
- sockaddr_in addr;
- ZeroMemory((char *)&addr, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
- addr.sin_port = htons(nListenPort);
- if (SOCKET_ERROR == bind(m_hListenSocket, (struct sockaddr *) &addr, sizeof(addr)))
- goto error;
- // bind to iocp
- if (CreateIoCompletionPort((HANDLE)m_hListenSocket, m_hIOCP, 0, 0) == NULL)
- goto error;
- if (SOCKET_ERROR == listen(m_hListenSocket,SOMAXCONN))
- goto error;
- // begin accept
- if (!AsyncAccept())
- goto error;
- // create work thread
- m_nThreadCount = m_nMaxWorkThreadNum >0 ? m_nMaxWorkThreadNum : 2 * GetProcessorNum();
- m_pWorkThreads = new HANDLE[m_nThreadCount];
- int i;
- for(i=0; i<m_nThreadCount; i++)
- {
- m_pWorkThreads[i] = (HANDLE)_beginthreadex(NULL, 0, &WorkThreadFunc, this, 0, &nThreadID);
- }
-
- m_bStarted = true;
- return true;
- error:
- if (m_hIOCP != INVALID_HANDLE_VALUE)
- {
- CloseHandle(m_hIOCP);
- m_hIOCP = INVALID_HANDLE_VALUE;
- }
- if (m_hListenSocket != INVALID_SOCKET)
- {
- closesocket(m_hListenSocket);
- m_hListenSocket = INVALID_SOCKET;
- }
- return false;
- }
- UINT CIOCPSocketServer::WorkThreadFunc(void *pArg)
- {
- CIOCPSocketServer *pThis = (CIOCPSocketServer*) pArg;
- while(pThis->PollCompletionPort());
- // Í˳öÏß³Ì
- _endthreadex(0);
- return 0;
- }
- bool CIOCPSocketServer::PollCompletionPort()
- {
- OperationContext *pOperationContext = NULL;
- DWORD dwTemp = 0;
- DWORD dwBytesTransfered = 0;
- BOOL bRet = GetQueuedCompletionStatus(
- m_hIOCP,
- &dwBytesTransfered,
- (LPDWORD)&dwTemp,
- (LPOVERLAPPED *)&pOperationContext,
- INFINITE);
- // should exit now
- if (pOperationContext == NULL)
- return false;
- if ((!bRet || (bRet && 0 == dwBytesTransfered)) && pOperationContext->eOperationType != OT_ACCEPT)
- {
- int nConnectionID = pOperationContext->nConnectionID;
- CloseConnection(nConnectionID);
- OnClose(nConnectionID);
-
- delete pOperationContext;
- return true;
- }
- switch (pOperationContext->eOperationType)
- {
- case OT_ACCEPT:
- ProcessAcceptCompleted(pOperationContext);
- break;
- case OT_SEND:
- ProcessSendCompleted(pOperationContext, dwBytesTransfered);
- break;
- case OT_RECEIVE:
- ProcessReceiveCompleted(pOperationContext, dwBytesTransfered);
- break;
- }
- return true;
- }
- bool CIOCPSocketServer::StopListen()
- {
- if (!m_bStarted)
- return false;
- m_bExit = true;
- m_bStarted = false;
-
- for (int i = 0; i < m_nThreadCount; i++)
- {
- //Help threads get out of blocking - GetQueuedCompletionStatus()
- PostQueuedCompletionStatus(m_hIOCP, 0, (DWORD) NULL, NULL);
- }
- WaitForMultipleObjects(m_nThreadCount, m_pWorkThreads, TRUE, INFINITE);
- for (int i = 0; i < m_nThreadCount; i++)
- {
- CloseHandle(m_pWorkThreads[i]);
- }
- delete[] m_pWorkThreads;
- m_pWorkThreads = NULL;
- if (m_pAcceptContext != NULL)
- {
- if (m_pAcceptContext->hSocket != INVALID_SOCKET)
- closesocket(m_pAcceptContext->hSocket);
- delete m_pAcceptContext;
- m_pAcceptContext = NULL;
- }
- SetNoLinger(m_hListenSocket);
- closesocket(m_hListenSocket);
- m_hListenSocket = INVALID_SOCKET;
- for(auto it = m_ConnectionContext.begin(); it != m_ConnectionContext.end(); it++)
- {
- delete (*it).second;
- }
- m_ConnectionContext.clear();
- CloseHandle(m_hIOCP);
- m_hIOCP = NULL;
- return true;
- }
- bool CIOCPSocketServer::AsyncAccept()
- {
- if (m_fnAcceptEx == NULL)
- {
- // Load the AcceptEx extension function from the provider for this socket
- GUID acceptex_guid = WSAID_ACCEPTEX;
-
- if (!LoadExtensionFunction(m_hListenSocket, acceptex_guid, (void**)&m_fnAcceptEx))
- {
- GetLastErrorMsg(WSAGetLastError());
- return false;
- }
- }
- if (m_pAcceptContext == NULL)
- {
- int nReceiveBufLen = 8192;
- m_pAcceptContext = new OperationContext(OT_ACCEPT, 0, INVALID_SOCKET);
- m_pAcceptContext->pBuf =new char[nReceiveBufLen];
- memset(m_pAcceptContext->pBuf, 0, nReceiveBufLen);
- m_pAcceptContext->nTotalBytes = m_pAcceptContext->nLeftBytes = nReceiveBufLen;
- m_pAcceptContext->WsaBuf.buf = m_pAcceptContext->pBuf;
- m_pAcceptContext->WsaBuf.len = nReceiveBufLen;
- }
- assert(m_pAcceptContext->hSocket == INVALID_SOCKET);
- m_pAcceptContext->hSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- memset(m_pAcceptContext, 0, sizeof(OVERLAPPED));
- DWORD dwRecvNumBytes(0);
- BOOL bRet = m_fnAcceptEx(m_hListenSocket,
- m_pAcceptContext->hSocket,
- (LPVOID)(m_pAcceptContext->pBuf),
- 0,
- GetAddressSize() + 16,
- GetAddressSize() + 16,
- &dwRecvNumBytes,
- m_pAcceptContext);
- if (bRet)
- {
- // Accept completed synchronously. We need to marshal the data recieved over to the
- // worker thread ourselves...
- //PostQueuedCompletionStatus(m_hIOCP, dwRecvNumBytes, 0, m_pAcceptContext);
- }
- else
- if (!bRet && ERROR_IO_PENDING != WSAGetLastError())
- {
- GetLastErrorMsg();
- closesocket(m_pAcceptContext->hSocket);
- m_pAcceptContext->hSocket = INVALID_SOCKET;
- return false;
- }
- return true;
- }
- bool CIOCPSocketServer::AsyncSend(int nConnectionID, char *pData, int nSendLen)
- {
- if (!m_bStarted)
- {
- m_strLastErrMsg = "service not started";
- return false;
- }
- if (m_bExit)
- {
- m_strLastErrMsg = "service has stopped";
- return false;
- }
- auto pConnectionContext = GetConnectionContext(nConnectionID);
- if (pConnectionContext == NULL)
- {
- m_strLastErrMsg = "connection not exist";
- return false;
- }
- auto pSendContext = new OperationContext(OT_SEND, nConnectionID, pConnectionContext->hSocket);
- pSendContext->SetSendData(pData, nSendLen);
- DWORD dwBytesSend(0);
- int nRet = WSASend(pSendContext->hSocket, &pSendContext->WsaBuf, 1, &dwBytesSend, 0, pSendContext, NULL);
- if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) )
- {
- delete pSendContext;
- GetLastErrorMsg();
- CloseConnection(nConnectionID);
- return false;
- }
- return true;
- }
- CIOCPSocketServer::ConnectionContext* CIOCPSocketServer::GetConnectionContext(int nConnectionID)
- {
- CAutoLock lock(&m_LockObject);
- auto it = m_ConnectionContext.find(nConnectionID);
- if (it == m_ConnectionContext.end())
- return NULL;
- return (*it).second;
- }
- bool CIOCPSocketServer::CloseConnection(int nConnectionID)
- {
- CAutoLock lock(&m_LockObject);
- auto it = m_ConnectionContext.find(nConnectionID);
- if (it == m_ConnectionContext.end())
- return false;
- auto pSocketConnection = (*it).second;
- m_ConnectionContext.erase(nConnectionID);
- delete pSocketConnection;
- return true;
- }
- void CIOCPSocketServer::ProcessAcceptCompleted(OperationContext *pOperationContext)
- {
- assert(NULL != pOperationContext);
- SOCKET hAcceptedSocket = pOperationContext->hSocket;
- if (!IsConnected(hAcceptedSocket))
- goto error;
- // When the AcceptEx function returns, the socket sAcceptSocket is
- // in the default state for a connected socket. The socket sAcceptSocket
- // does not inherit the properties of the socket associated with
- // sListenSocket parameter until SO_UPDATE_ACCEPT_CONTEXT is set on
- // the socket. Use the setsockopt function to set the SO_UPDATE_ACCEPT_CONTEXT
- // option, specifying sAcceptSocket as the socket handle and sListenSocket
- // as the option value.
- int nRet = setsockopt(
- hAcceptedSocket,
- SOL_SOCKET,
- SO_UPDATE_ACCEPT_CONTEXT,
- (char *)&m_hListenSocket,
- sizeof(m_hListenSocket)
- );
- if( nRet == SOCKET_ERROR )
- goto error;
- if (m_fnGetAcceptExSocketAddrs == NULL)
- {
- GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
- if (!LoadExtensionFunction(m_hListenSocket, guidGetAcceptExSockaddrs, (void**)&m_fnGetAcceptExSocketAddrs))
- goto error;
- }
- INT sizeOfLocalAddress = 0;
- INT sizeOfRemoteAddress = 0;
- sockaddr_in *pLocalAddress = 0;
- sockaddr_in *pRemoteAddress = 0;
-
- const size_t sizeOfAddress = GetAddressSize() + 16;
- m_fnGetAcceptExSocketAddrs(pOperationContext->pBuf, 0,
- sizeOfAddress,
- sizeOfAddress,
- (sockaddr**)&pLocalAddress,
- &sizeOfLocalAddress,
- (sockaddr**)&pRemoteAddress,
- &sizeOfRemoteAddress);
- // bind to iocp
- if (CreateIoCompletionPort((HANDLE)hAcceptedSocket, m_hIOCP, 0, 0) == NULL)
- goto error;
- // add into connection context map
- int nConnectionID = InterlockedIncrement(&m_dwSerialNo);
- ConnectionContext *pConnectionContext = new ConnectionContext(nConnectionID, hAcceptedSocket);
- {
- CAutoLock lock(&m_LockObject);
- m_ConnectionContext[nConnectionID] = pConnectionContext;
- pOperationContext->hSocket = INVALID_SOCKET;
- }
- // issue next accept
- AsyncAccept();
- // issue first receive
- auto pRecvContext = new OperationContext(OT_RECEIVE, nConnectionID, hAcceptedSocket);
- pRecvContext->PrepareBuffer();
- DWORD dwFlags(0), dwBytesRecvd(0);
- nRet = WSARecv(hAcceptedSocket, &pRecvContext->WsaBuf, 1, &dwBytesRecvd, &dwFlags, pRecvContext, NULL);
- //if (0 == nRet)
- //{
- // complete immediately
- //PostQueuedCompletionStatus(m_hIOCP, dwBytesRecvd, 0, pRecvContext);
- //}
- //else
- if (SOCKET_ERROR == nRet && WSAGetLastError() != ERROR_IO_PENDING)
- {
- delete pRecvContext;
- GetLastErrorMsg();
- CloseConnection(nConnectionID);
- goto error;
- }
- // OnAccepte
- OnAccepte(nConnectionID, inet_ntoa(pRemoteAddress->sin_addr), ntohs(pRemoteAddress->sin_port));
- return;
- error:
- GetLastErrorMsg();
- {
- CAutoLock lock(&m_LockObject);
- closesocket(pOperationContext->hSocket);
- pOperationContext->hSocket = INVALID_SOCKET;
- }
- }
- void CIOCPSocketServer::ProcessSendCompleted(OperationContext *pOperationContext, int nByteLen)
- {
- assert(nByteLen >0 && pOperationContext!=NULL);
-
- int nConnectionID = pOperationContext->nConnectionID;
-
- // OnSend event
- OnSend(nConnectionID, nByteLen);
- if (!pOperationContext->AddSendLen(nByteLen) || !IsConnected(pOperationContext->hSocket))
- {
- delete pOperationContext;
- return;
- }
- DWORD dwBytesSend(0);
- int nRet = WSASend(pOperationContext->hSocket,
- &pOperationContext->WsaBuf, 1,
- &dwBytesSend, 0,
- pOperationContext, NULL);
- //if (0 == nRet)
- //{
- // // complete immediately
- // PostQueuedCompletionStatus(m_hIOCP, dwBytesSend, 0, pNextSendContext);
- //}
- //else
- if (SOCKET_ERROR == nRet && WSAGetLastError() != ERROR_IO_PENDING)
- {
- delete pOperationContext;
- GetLastErrorMsg();
- CloseConnection(nConnectionID);
- }
- }
- void CIOCPSocketServer::ProcessReceiveCompleted(OperationContext *pOperationContext, int nByteLen)
- {
- assert(nByteLen >0);
- int nConnectionID = pOperationContext->nConnectionID;
- OnReceive(nConnectionID, pOperationContext->pBuf, nByteLen);
-
- if (! IsConnected(pOperationContext->hSocket))
- {
- delete pOperationContext;
- return;
- }
- DWORD dwFlags(0), dwBytesRecvd(0);
- memset(pOperationContext, 0, sizeof(OVERLAPPED));
- int nRet = WSARecv(pOperationContext->hSocket, &pOperationContext->WsaBuf, 1, &dwBytesRecvd, &dwFlags, pOperationContext, NULL);
- //if (0 == nRet)
- //{
- // // complete immediately
- // PostQueuedCompletionStatus(m_hIOCP, dwBytesRecvd, 0, pNextRecvContext);
- //}
- //else
- if (SOCKET_ERROR == nRet && WSAGetLastError() != ERROR_IO_PENDING)
- {
- delete pOperationContext;
- GetLastErrorMsg();
- CloseConnection(nConnectionID);
- }
- }
- DWORD CIOCPSocketServer::GetConnectTime(SOCKET hSocket)
- {
- if (INVALID_SOCKET != hSocket)
- {
- INT seconds;
- INT bytes = sizeof(seconds);
-
- if (NO_ERROR == ::getsockopt(
- hSocket,
- SOL_SOCKET, SO_CONNECT_TIME,
- (char *)&seconds,
- (PINT)&bytes))
- {
- return seconds;
- }
- else
- {
- GetLastErrorMsg(WSAGetLastError());
- }
- }
- //lint -e{569} Loss of information (return) (32 bits to 31 bits)
- return 0xFFFFFFFF;
- }
- bool CIOCPSocketServer::IsConnected(SOCKET hSocket)
- {
- return GetConnectTime(hSocket) != 0xFFFFFFFF;
- }
- size_t CIOCPSocketServer::GetAddressSize() const
- {
- return sizeof(SOCKADDR_IN);
- }
- bool CIOCPSocketServer::SetNoLinger(SOCKET hSocket)
- {
- // Force an abortive close.
- LINGER lingerStruct;
- lingerStruct.l_onoff = 1;
- lingerStruct.l_linger = 0;
- return SOCKET_ERROR != ::setsockopt(hSocket, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct));
- }
- bool CIOCPSocketServer::LoadExtensionFunction(SOCKET s, GUID functionID, void **ppFunc)
- {
- DWORD dwBytes = 0;
- bool ok = true;
- if (0 != WSAIoctl(
- s,
- SIO_GET_EXTENSION_FUNCTION_POINTER,
- &functionID,
- sizeof(GUID),
- ppFunc,
- sizeof(void *),
- &dwBytes,
- 0,
- 0))
- {
- ok = false;
- }
- return ok;
- }
|