123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- #include "stdafx.h"
- #define __STDC_LIMIT_MACROS 1
- #include "CSocketClient.h"
- #include <boost/thread.hpp>
- #include <boost/bind.hpp>
- #include <boost/random.hpp>
- #include "CModTools.h"
- #include <vector>
- #include <string>
- #include "portCheck/portCheck.h"
- using boost::asio::ip::tcp;
- #define DEFAULT_RCEV_LENGTH 1024*512
- bool checkHttpThreadFun(const std::string url) {
- return checkHttpActive(url.c_str());
- }
- std::pair<bool, std::string> DetectActiveHttp(std::vector<std::string> urlArr)
- {
- try {
- std::vector<boost::shared_future<bool>> threadArr;
- #if (defined _WIN32 || defined _WIN64)
- for each (const auto it in urlArr)
- #else
- for (const auto it : urlArr)
- #endif
- threadArr.push_back(boost::async(boost::bind(checkHttpThreadFun, it)));
- boost::this_thread::sleep_for(boost::chrono::seconds(1));
- //boost::wait_for_any(threadArr.begin(), threadArr.end());
- std::vector<std::string> vaildArr;
- for (size_t i = 0; i < threadArr.size(); i++)
- {
- if (threadArr[i].is_ready())
- {
- DbgEx("Detect url %s %s", urlArr[i].c_str(), threadArr[i].get() ? "sucess" : "failed");
- if (threadArr[i].get())
- vaildArr.push_back(urlArr[i]);
- }
- else
- DbgEx("Detect url %s timeout", urlArr[i].c_str());
- }
- if (vaildArr.size() > 0)
- {
- boost::mt19937 rng(time(0));
- boost::uniform_int<> rdgen(0, vaildArr.size() - 1);
- auto pos = rdgen(rng);
- return std::make_pair(true, vaildArr[pos]);//随机数
- }
- else
- return std::make_pair(false, "");
- }
- catch (std::exception& e) {
- DbgEx("DetectActiveHttp exception, %s", e.what());
- return std::make_pair(false, "");
- }
- }
- namespace Chromium {
- CSocketClient::CSocketClient(CEntityBase* pEntity, unsigned int id)
- :m_vbuf(128), m_pNetThread(nullptr), mID(id)
- {
- m_pEntity = pEntity;
- }
- CSocketClient::CSocketClient(boost::asio::io_service& ios, const char* ipAddr,
- const char* port, CEntityBase* pEntity, unsigned int id) :
- m_ep(boost::asio::ip::address_v4::from_string(ipAddr), atoi(port)),
- m_buf(100, 0),
- m_pNetThread(nullptr),
- m_vbuf(128),
- mID(id),
- m_pEntity(pEntity)
- {
- ::DbgEx("CSocketClient constructor ip=%s, port=%s", ipAddr, port);
- m_psocket = sock_ptr(new socket_type(ios));
- }
- CSocketClient::~CSocketClient() {
- // free(m_psocket);
- }
- ErrorCodeEnum CSocketClient::Connect() {
- boost::system::error_code ec;
- DbgEx("CSocketClient method -> connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
- m_psocket->connect(m_ep, ec);
- if (ec)
- {
- // connect failed!
- DbgEx("CSocketClient method -> Connect : connect failed! Retry after 1s !");
- // retry to connect after 3s
- Sleep(1000);
- m_psocket->connect(m_ep, ec);
- if (ec)return Error_IO;
- }
- DbgEx("CSocketClient method -> connect succeed!");
- StartSocketService();
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::Close() {
- DbgEx("CSocketClient method -> close");
- StopSocketService();
- boost::system::error_code ec;
- m_psocket->close(ec);
- handle_close(ec);
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::Write(CMessage* pMsg) {
- DbgEx("CSocketClient method -> write, %s", hexdumpToString(pMsg->getPayload(), pMsg->getLength()).c_str());
- if (m_psocket && m_psocket->is_open())
- {
- ::DbgEx("m_psocket->async_send");
- m_psocket->async_send(boost::asio::buffer(pMsg->getPayload(), pMsg->getLength()), boost::bind(&this_type::handle_write, this, boost::asio::placeholders::error));
- }
- else {
- DbgEx("CSocketClient method -> write error : socket is closed");
- return Error_IO;
- }
- ::DbgEx("Leave CSocketClient method -> write");
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::StartSocketService() {
- if (NULL == m_pNetThread)
- {
- m_pNetThread = new boost::thread(boost::bind(&CSocketClient::thread_recv, this));
- }
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::StopSocketService() {
- if (NULL != m_pNetThread)
- {
- DbgEx("CSocketClient -> StopSocketService doing interrupt");
- m_pNetThread->interrupt();
- m_pNetThread->join();
- m_pNetThread = NULL;
- }
- DbgEx("Socket Service stopped");
- return Error_Succeed;
- }
- void CSocketClient::handle_connect(const boost::system::error_code& err) {
- DbgEx("CSocketClient method -> handle_connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
- if (err)
- {
- // connect failed!
- DbgEx("CSocketClient method -> handle_connect : connect failed!");
- // retry to connect after 3s
- Sleep(3000);
- m_psocket->async_connect(m_ep, boost::bind(&this_type::handle_connect, this, boost::asio::placeholders::error));
- return;
- }
- DbgEx("CSocketClient method -> handle_connect : connect succeed! socket p = %ld", m_psocket);
- DbgEx("CSocketClient method -> handle_connect end");
- }
- void CSocketClient::handle_close(const boost::system::error_code& err) {
- DbgEx("CSocketClient method -> handle_close : %s", err.message().c_str());
- return;
- }
- ErrorCodeEnum CSocketClient::Reconnect()
- {
- boost::system::error_code ec;
- DbgEx("CSocketClient method -> reconnect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
- CModTools::get_mutable_instance().RestartProxyServer();
- m_psocket->connect(m_ep, ec);
- if (ec)
- {
- // connect failed!
- DbgEx("CSocketClient method -> Reconnect : connect failed! Retry after 1s ! error message = %s", ec.message());
- // retry to connect after 3s
- Sleep(1000);
- m_psocket->connect(m_ep, ec);
- if (ec)return Error_IO;
- }
- DbgEx("CSocketClient method -> reconnect succeed!");
- return Error_Succeed;
- }
- void CSocketClient::handle_write(const boost::system::error_code& err) {
- DbgEx("CSocketClient method -> handle_write %s!", err ? "error" : "succeed");
- }
- ErrorCodeEnum CSocketClient::SetMessageHandler(ISocketCallback* obj) {
- this->mMessageHandler = boost::bind(&ISocketCallback::message_from_socket, obj, _1, _2);
- return Error_Succeed;
- }
- void CSocketClient::handle_read(const boost::system::error_code& err,
- const size_t bytes_transferred, CMessage& msg)
- {
- DbgEx("CSocketClient handle_read bytes_transferred=%d", bytes_transferred);
- if (err)
- {
- // OnRecvError
- DbgEx("CSocketClient handle_read something wrong!");
- return;
- }
- // 用来处理发送给websocket
- if (this->mMessageHandler != NULL)
- {
- auto f = boost::bind(mMessageHandler, msg, mID);
- boost::thread msgHandler(f);
- msgHandler.join();
- //this->mMessageHandler(msg, mID);
- }
- DbgEx("CSocketClient handle_read well!");
- }
- std::string CSocketClient::hexdumpToString(const char* buf, const int num)
- {
- char str[8192 * 2] = { 0 };
- int i = 0;
- char c[5] = { 0 };
- if (num > 1200)
- {
- for (i = 0; i < 50; i++)
- {
- sprintf(c, "%02X ", (unsigned char)buf[i]);
- strcat(str, c);
- }
- return CSimpleStringA::Format("buffer too long to show!show pre 50 hex! CSocketClient hex buf len = %d : %s", num, str).GetData();
- }
- for (i = 0; i < num; i++)
- {
- sprintf(c, "%02X ", (unsigned char)buf[i]);
- strcat(str, c);
- }
- return CSimpleStringA::Format("CSocketClient hex buf len = %d : %s", num, str).GetData();
- }
- void CSocketClient::hexdump(const char* buf, const int num) {
- auto hexStr = hexdumpToString(buf, num);
- DbgEx(hexStr.c_str());
- }
- void CSocketClient::thread_recv() {
- try
- {
- // time out for select
- timeval tvsec = { 1,0 };
- tvsec.tv_sec = 1; /* 1 second timeout */
- tvsec.tv_usec = 0; /* no microseconds. */
- fd_set read_flags;
- fd_set write_flags;
- CMessage msg(MAX_TRANSFER_LEN); //以能传输1MB数据为目标
- bool isFindMsg = true;
- while (true)
- {
- boost::this_thread::sleep(boost::posix_time::milliseconds(isFindMsg ? 1 : 5));
- boost::system::error_code ec;
- FD_ZERO(&read_flags);
- FD_ZERO(&write_flags);
- FD_SET(m_psocket->native_handle(), &read_flags);
- int sel = 0;
- try
- {
- sel = select(m_psocket->native_handle() + 1, &read_flags, &write_flags, NULL, &tvsec);
- }
- catch (const std::exception&)
- {
- DbgEx("select exception");
- }
- if (sel == 0) {
- // select timeout
- // DbgEx("select timeout");
- isFindMsg = false;
- }
- else if (sel < 0 && errno != EINTR) {
- // select error
- DbgEx("select error sel = %d", sel);
- ErrorCodeEnum ec = this->Reconnect();
- if (Error_Succeed != ec)
- {
- DbgEx("Socket select error and reconnect failed. Restart this entity please!");
- break;
- }
- boost::this_thread::sleep(boost::posix_time::milliseconds(100));
- }
- else {
- msg.clear();//提升性能,避免析构
- isFindMsg = true;
- // select something
- //DbgEx("select something, buffer len = %d", msg.getBufferLength());
- size_t len = 0;
- try
- {
- len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData(), 4), 0, ec);
- }
- catch (...)
- {
- DbgEx("first read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
- msg.clear();
- return;
- }
- //DbgEx("first read len = %d", len);
- if (0 >= len)
- {
- DbgEx("socket has been closed!");
- this->Close();
- boost::this_thread::sleep(boost::posix_time::milliseconds(100));
- }
- try
- {
- len = 0;
- const size_t dreamLen = *(int*)(msg.getWriteableData()) + 4;
- do {
- #if (defined _WIN32 || defined _WIN64)
- len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData() + 4, *(int*)(msg.getWriteableData()) + 4), 0, ec);
- #else
- const size_t recv_len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData() + 4 + len, dreamLen - len), 0, ec);
- if (!ec && recv_len >= 0)
- len += recv_len;
- else
- break;
- #endif
- } while (dreamLen > len);
- }
- catch (...)
- {
- DbgEx("second read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
- msg.clear();
- return;
- }
- // 用来处理发送给websocket
- // todo fix select len = 0
- if (this->mMessageHandler != NULL && 0 != msg.getBufferLength())
- {
- //DbgEx("pre mMessageHandler");
- this->mMessageHandler(msg, mID);
- //DbgEx("after mMessageHandler");
- }
- }
- }
- DbgEx("warning ! : thread_recv end!");
- }
- catch (boost::thread_interrupted&)
- {
- DbgEx("Socket thread interrupted!");
- }
- }
- }
|