123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- #include "stdafx.h"
- #define __STDC_LIMIT_MACROS 1
- #include "CSocketClient.h"
- #include <boost/thread.hpp>
- #include <boost/bind.hpp>
- #include <boost/random.hpp>
- #include "CModTools.h"
- #include <vector>
- #include <string>
- #include "publicFunExport.h"
- #if defined(RVC_OS_LINUX)
- #include <RestfulFunc.h>
- #endif
- using boost::asio::ip::tcp;
- #define DEFAULT_RCEV_LENGTH 1024*512
- bool checkHttpThreadFun(const std::string url) {
- #if defined(RVC_OS_WIN)
- return checkHttpActive(url.c_str());
- #else
- string msg;
- int curFlag = HttpProbe(url.c_str(), msg, 5);
- return curFlag > 0 && curFlag < 400;
- #endif // #if defined(RVC_OS_WIN)
- }
- std::pair<bool, std::string> DetectActiveHttp(std::vector<std::string> urlArr)
- {
- try {
- std::vector<boost::shared_future<bool>> threadArr;
- #if (defined _WIN32 || defined _WIN64)
- for each (const auto it in urlArr)
- #else
- for (const auto it : urlArr)
- #endif
- threadArr.emplace_back(boost::async(boost::bind(checkHttpThreadFun, it)));
- boost::this_thread::sleep_for(boost::chrono::seconds(1));
- //boost::wait_for_any(threadArr.begin(), threadArr.end());
- std::vector<std::string> vaildArr;
- for (size_t i = 0; i < threadArr.size(); i++)
- {
- if (threadArr[i].is_ready())
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Detect url %s %s", urlArr[i].c_str(), threadArr[i].get() ? "success" : "failed");
- if (threadArr[i].get())
- vaildArr.emplace_back(urlArr[i]);
- }
- else
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Detect url %s timeout", urlArr[i].c_str());
- }
- if (vaildArr.size() > 0)
- {
- boost::mt19937 rng(time(0));
- boost::uniform_int<> rdgen(0, vaildArr.size() - 1);
- auto pos = rdgen(rng);
- return std::make_pair(true, vaildArr[pos]);//随机数
- }
- else
- return std::make_pair(false, "");
- }
- catch (std::exception& e) {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("DetectActiveHttp exception, %s", e.what());
- return std::make_pair(false, "");
- }
- }
- namespace Chromium {
- CSocketClient::CSocketClient(CEntityBase* pEntity, unsigned int id)
- :m_vbuf(128), m_pNetThread(nullptr), mID(id)
- {
- m_pEntity = pEntity;
- }
- CSocketClient::CSocketClient(boost::asio::io_service& ios, const char* ipAddr,
- const char* port, CEntityBase* pEntity, unsigned int id) :
- m_ep(boost::asio::ip::address_v4::from_string(ipAddr), atoi(port)),
- m_buf(100, 0),
- m_pNetThread(nullptr),
- m_vbuf(128),
- mID(id),
- m_pEntity(pEntity)
- {
- DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient constructor ip=%s, port=%s", ipAddr, port);
- m_psocket = sock_ptr(new socket_type(ios));
- }
- CSocketClient::~CSocketClient() {
- // free(m_psocket);
- }
- ErrorCodeEnum CSocketClient::Connect() {
- boost::system::error_code ec;
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
- m_psocket->connect(m_ep, ec);
- if (ec)
- {
- // connect failed!
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> Connect : connect failed! Retry after 1s !");
- // retry to connect after 3s
- Sleep(1000);
- m_psocket->connect(m_ep, ec);
- if (ec)return Error_IO;
- }
- DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> connect succeed!");
- StartSocketService();
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::Close() {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> close");
- StopSocketService();
- boost::system::error_code ec;
- m_psocket->close(ec);
- handle_close(ec);
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::Write(CMessage* pMsg) {
- //DbgEx("CSocketClient method -> write, %s", hexdumpToString(pMsg->getPayload(), pMsg->getLength()).c_str());
- if (m_psocket && m_psocket->is_open())
- {
- //::DbgEx("m_psocket->async_send");
- m_psocket->async_send(boost::asio::buffer(pMsg->getPayload(), pMsg->getLength()), boost::bind(&this_type::handle_write, this, boost::asio::placeholders::error));
- }
- else {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> write error : socket is closed");
- return Error_IO;
- }
- //::DbgEx("Leave CSocketClient method -> write");
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::StartSocketService() {
- if (NULL == m_pNetThread)
- m_pNetThread = new boost::thread(boost::bind(&CSocketClient::thread_recv, this));
- return Error_Succeed;
- }
- ErrorCodeEnum CSocketClient::StopSocketService() {
- if (NULL != m_pNetThread)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient -> StopSocketService doing interrupt");
- m_pNetThread->interrupt();
- m_pNetThread->join();
- m_pNetThread = NULL;
- }
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Socket Service stopped");
- return Error_Succeed;
- }
- void CSocketClient::handle_connect(const boost::system::error_code& err) {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
- if (err)
- {
- // connect failed!
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_connect : connect failed!");
- // retry to connect after 3s
- Sleep(3000);
- m_psocket->async_connect(m_ep, boost::bind(&this_type::handle_connect, this, boost::asio::placeholders::error));
- return;
- }
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_connect : connect succeed! socket p = %ld", m_psocket);
- }
- void CSocketClient::handle_close(const boost::system::error_code& err) {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_close : %s", err.message().c_str());
- return;
- }
- ErrorCodeEnum CSocketClient::Reconnect()
- {
- return Error_Succeed;
- }
- void CSocketClient::handle_write(const boost::system::error_code& err) {
- }
- ErrorCodeEnum CSocketClient::SetMessageHandler(ISocketCallback* obj) {
- this->mMessageHandler = boost::bind(&ISocketCallback::message_from_socket, obj, _1, _2);
- return Error_Succeed;
- }
- void CSocketClient::handle_read(const boost::system::error_code& err,
- const size_t bytes_transferred, CMessage& msg)
- {
- if (err)
- {
- // OnRecvError
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient handle_read something wrong!");
- return;
- }
- // 用来处理发送给websocket
- if (this->mMessageHandler != NULL)
- {
- auto f = boost::bind(mMessageHandler, msg, mID);
- boost::thread msgHandler(f);
- msgHandler.join();
- }
- //DbgEx("CSocketClient handle_read well!");
- }
- void CSocketClient::thread_recv() {
- //DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create thread:%s", __FUNCTION__);
- try
- {
- // time out for select
- timeval tvsec = { 1,0 };
- tvsec.tv_sec = 1; /* 1 second timeout */
- tvsec.tv_usec = 0; /* no microseconds. */
- fd_set read_flags;
- fd_set write_flags;
- CMessage msg(MAX_TRANSFER_LEN); //以能传输1MB数据为目标
- bool isFindMsg = true;
- while (!boost::this_thread::interruption_requested())
- {
- boost::this_thread::sleep(boost::posix_time::milliseconds(isFindMsg ? 1 : 5));
- boost::system::error_code ec;
- FD_ZERO(&read_flags);
- FD_ZERO(&write_flags);
- FD_SET(m_psocket->native_handle(), &read_flags);
- int sel = 0;
- try
- {
- sel = select(m_psocket->native_handle() + 1, &read_flags, &write_flags, NULL, &tvsec);
- }
- catch (const std::exception&)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("select exception");
- }
- if (sel == 0) {
- // select timeout
- // DbgEx("select timeout");
- isFindMsg = false;
- }
- else if (sel < 0 && errno != EINTR) {
- // select error
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("select error sel = %d", sel);
- ErrorCodeEnum ec = (boost::this_thread::interruption_requested() ? ErrorCodeEnum::Error_Succeed :this->Reconnect());
- if (Error_Succeed != ec)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Socket select error and reconnect failed. Restart this entity please!");
- break;
- }
- boost::this_thread::sleep(boost::posix_time::milliseconds(100));
- }
- else {
- msg.clear();//提升性能,避免析构
- isFindMsg = true;
- // select something
- size_t len = 0;
- try
- {
- len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData(), 4), 0, ec);
- }
- catch (...)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("first read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
- msg.clear();
- return;
- }
- //DbgEx("select something from %d, buffer len = %d", sel, msg.getBufferLength());
- //DbgEx("first read len = %d", len);
- if (0 >= len || len > MAX_TRANSFER_LEN)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("socket has been closed! cur Len: %d, may make the read buffer err");
- if (!boost::this_thread::interruption_requested())
- break;
-
- this->Reconnect();
- boost::this_thread::sleep(boost::posix_time::milliseconds(100));
- }
- try
- {
- len = 0;
- const size_t dreamLen = *(int*)(msg.getWriteableData()) + 4;
- //DbgEx("dreamLen = %d", dreamLen);
- do {
- const size_t recv_len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData() + 4 + len, dreamLen - len), 0, ec);
- if (!ec && recv_len >= 0)
- len += recv_len;
- else
- break;
- } while (dreamLen > len);
- }
- catch (...)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("second read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
- msg.clear();
- return;
- }
- // 用来处理发送给websocket
- // todo fix select len = 0
- if (this->mMessageHandler != NULL && 0 != msg.getBufferLength())
- this->mMessageHandler(msg, mID);
- }
- }
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("warning ! : thread_recv end!");
- }
- catch (boost::thread_interrupted&)
- {
- DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Socket thread interrupted!");
- }
- }
- }
|