CSocketClient.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  #include "stdafx.h"
  #define __STDC_LIMIT_MACROS 1
  #include "CSocketClient.h"
  #include <boost/thread.hpp>
  #include <boost/bind.hpp>
  #include <boost/random.hpp>
  #include <boost/asio/write.hpp>
  #include "CModTools.h"
  #include <vector>
  #include <string>
  #include "publicFunExport.h"
  #if defined(RVC_OS_LINUX)
  #include <RestfulFunc.h>
  #endif
  14. using boost::asio::ip::tcp;
  15. #define DEFAULT_RCEV_LENGTH 1024*512
  16. bool checkHttpThreadFun(const std::string url) {
  17. #if defined(RVC_OS_WIN)
  18. return checkHttpActive(url.c_str());
  19. #else
  20. string msg;
  21. int curFlag = HttpProbe(url.c_str(), msg, 5);
  22. return curFlag > 0 && curFlag < 400;
  23. #endif // #if defined(RVC_OS_WIN)
  24. }
  25. std::pair<bool, std::string> DetectActiveHttp(std::vector<std::string> urlArr)
  26. {
  27. try {
  28. std::vector<boost::shared_future<bool>> threadArr;
  29. #if (defined _WIN32 || defined _WIN64)
  30. for each (const auto it in urlArr)
  31. #else
  32. for (const auto it : urlArr)
  33. #endif
  34. threadArr.emplace_back(boost::async(boost::bind(checkHttpThreadFun, it)));
  35. boost::this_thread::sleep_for(boost::chrono::seconds(1));
  36. //boost::wait_for_any(threadArr.begin(), threadArr.end());
  37. std::vector<std::string> vaildArr;
  38. for (size_t i = 0; i < threadArr.size(); i++)
  39. {
  40. if (threadArr[i].is_ready())
  41. {
  42. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Detect url %s %s", urlArr[i].c_str(), threadArr[i].get() ? "success" : "failed");
  43. if (threadArr[i].get())
  44. vaildArr.emplace_back(urlArr[i]);
  45. }
  46. else
  47. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Detect url %s timeout", urlArr[i].c_str());
  48. }
  49. if (vaildArr.size() > 0)
  50. {
  51. boost::mt19937 rng(time(0));
  52. boost::uniform_int<> rdgen(0, vaildArr.size() - 1);
  53. auto pos = rdgen(rng);
  54. return std::make_pair(true, vaildArr[pos]);//随机数
  55. }
  56. else
  57. return std::make_pair(false, "");
  58. }
  59. catch (std::exception& e) {
  60. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("DetectActiveHttp exception, %s", e.what());
  61. return std::make_pair(false, "");
  62. }
  63. }
  64. namespace Chromium {
  65. CSocketClient::CSocketClient(CEntityBase* pEntity, unsigned int id)
  66. :m_vbuf(128), m_pNetThread(nullptr), mID(id)
  67. {
  68. m_pEntity = pEntity;
  69. }
  70. CSocketClient::CSocketClient(boost::asio::io_service& ios, const char* ipAddr,
  71. const char* port, CEntityBase* pEntity, unsigned int id) :
  72. m_ep(boost::asio::ip::address_v4::from_string(ipAddr), atoi(port)),
  73. m_buf(100, 0),
  74. m_pNetThread(nullptr),
  75. m_vbuf(128),
  76. mID(id),
  77. m_pEntity(pEntity)
  78. {
  79. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient constructor ip=%s, port=%s", ipAddr, port);
  80. m_psocket = sock_ptr(new socket_type(ios));
  81. }
  82. CSocketClient::~CSocketClient() {
  83. // free(m_psocket);
  84. }
  85. ErrorCodeEnum CSocketClient::Connect() {
  86. boost::system::error_code ec;
  87. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
  88. m_psocket->connect(m_ep, ec);
  89. if (ec)
  90. {
  91. // connect failed!
  92. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> Connect : connect failed! Retry after 1s !");
  93. // retry to connect after 3s
  94. Sleep(1000);
  95. m_psocket->connect(m_ep, ec);
  96. if (ec)return Error_IO;
  97. }
  98. DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> connect succeed!");
  99. StartSocketService();
  100. return Error_Succeed;
  101. }
  102. ErrorCodeEnum CSocketClient::Close() {
  103. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> close");
  104. StopSocketService();
  105. boost::system::error_code ec;
  106. m_psocket->close(ec);
  107. handle_close(ec);
  108. return Error_Succeed;
  109. }
  110. ErrorCodeEnum CSocketClient::Write(CMessage* pMsg) {
  111. //DbgEx("CSocketClient method -> write, %s", hexdumpToString(pMsg->getPayload(), pMsg->getLength()).c_str());
  112. if (m_psocket && m_psocket->is_open())
  113. {
  114. //::DbgEx("m_psocket->async_send");
  115. m_psocket->async_send(boost::asio::buffer(pMsg->getPayload(), pMsg->getLength()), boost::bind(&this_type::handle_write, this, boost::asio::placeholders::error));
  116. }
  117. else {
  118. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> write error : socket is closed");
  119. return Error_IO;
  120. }
  121. //::DbgEx("Leave CSocketClient method -> write");
  122. return Error_Succeed;
  123. }
  124. ErrorCodeEnum CSocketClient::StartSocketService() {
  125. if (NULL == m_pNetThread)
  126. m_pNetThread = new boost::thread(boost::bind(&CSocketClient::thread_recv, this));
  127. return Error_Succeed;
  128. }
  129. ErrorCodeEnum CSocketClient::StopSocketService() {
  130. if (NULL != m_pNetThread)
  131. {
  132. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient -> StopSocketService doing interrupt");
  133. m_pNetThread->interrupt();
  134. m_pNetThread->join();
  135. m_pNetThread = NULL;
  136. }
  137. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Socket Service stopped");
  138. return Error_Succeed;
  139. }
  140. void CSocketClient::handle_connect(const boost::system::error_code& err) {
  141. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
  142. if (err)
  143. {
  144. // connect failed!
  145. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_connect : connect failed!");
  146. // retry to connect after 3s
  147. Sleep(3000);
  148. m_psocket->async_connect(m_ep, boost::bind(&this_type::handle_connect, this, boost::asio::placeholders::error));
  149. return;
  150. }
  151. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_connect : connect succeed! socket p = %ld", m_psocket);
  152. }
  153. void CSocketClient::handle_close(const boost::system::error_code& err) {
  154. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient method -> handle_close : %s", err.message().c_str());
  155. return;
  156. }
  157. ErrorCodeEnum CSocketClient::Reconnect()
  158. {
  159. return Error_Succeed;
  160. }
  161. void CSocketClient::handle_write(const boost::system::error_code& err) {
  162. }
  163. ErrorCodeEnum CSocketClient::SetMessageHandler(ISocketCallback* obj) {
  164. this->mMessageHandler = boost::bind(&ISocketCallback::message_from_socket, obj, _1, _2);
  165. return Error_Succeed;
  166. }
  167. void CSocketClient::handle_read(const boost::system::error_code& err,
  168. const size_t bytes_transferred, CMessage& msg)
  169. {
  170. if (err)
  171. {
  172. // OnRecvError
  173. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("CSocketClient handle_read something wrong!");
  174. return;
  175. }
  176. // 用来处理发送给websocket
  177. if (this->mMessageHandler != NULL)
  178. {
  179. auto f = boost::bind(mMessageHandler, msg, mID);
  180. boost::thread msgHandler(f);
  181. msgHandler.join();
  182. }
  183. //DbgEx("CSocketClient handle_read well!");
  184. }
  185. void CSocketClient::thread_recv() {
  186. //DbgWithLink(LOG_LEVEL_DEBUG, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("create thread:%s", __FUNCTION__);
  187. try
  188. {
  189. // time out for select
  190. timeval tvsec = { 1,0 };
  191. tvsec.tv_sec = 1; /* 1 second timeout */
  192. tvsec.tv_usec = 0; /* no microseconds. */
  193. fd_set read_flags;
  194. fd_set write_flags;
  195. CMessage msg(MAX_TRANSFER_LEN); //以能传输1MB数据为目标
  196. bool isFindMsg = true;
  197. while (!boost::this_thread::interruption_requested())
  198. {
  199. boost::this_thread::sleep(boost::posix_time::milliseconds(isFindMsg ? 1 : 5));
  200. boost::system::error_code ec;
  201. FD_ZERO(&read_flags);
  202. FD_ZERO(&write_flags);
  203. FD_SET(m_psocket->native_handle(), &read_flags);
  204. int sel = 0;
  205. try
  206. {
  207. sel = select(m_psocket->native_handle() + 1, &read_flags, &write_flags, NULL, &tvsec);
  208. }
  209. catch (const std::exception&)
  210. {
  211. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("select exception");
  212. }
  213. if (sel == 0) {
  214. // select timeout
  215. // DbgEx("select timeout");
  216. isFindMsg = false;
  217. }
  218. else if (sel < 0 && errno != EINTR) {
  219. // select error
  220. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("select error sel = %d", sel);
  221. ErrorCodeEnum ec = (boost::this_thread::interruption_requested() ? ErrorCodeEnum::Error_Succeed :this->Reconnect());
  222. if (Error_Succeed != ec)
  223. {
  224. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Socket select error and reconnect failed. Restart this entity please!");
  225. break;
  226. }
  227. boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  228. }
  229. else {
  230. msg.clear();//提升性能,避免析构
  231. isFindMsg = true;
  232. // select something
  233. size_t len = 0;
  234. try
  235. {
  236. len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData(), 4), 0, ec);
  237. }
  238. catch (...)
  239. {
  240. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("first read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
  241. msg.clear();
  242. return;
  243. }
  244. //DbgEx("select something from %d, buffer len = %d", sel, msg.getBufferLength());
  245. //DbgEx("first read len = %d", len);
  246. if (0 >= len || len > MAX_TRANSFER_LEN)
  247. {
  248. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("socket has been closed! cur Len: %d, may make the read buffer err");
  249. if (!boost::this_thread::interruption_requested())
  250. break;
  251. this->Reconnect();
  252. boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  253. }
  254. try
  255. {
  256. len = 0;
  257. const size_t dreamLen = *(int*)(msg.getWriteableData()) + 4;
  258. //DbgEx("dreamLen = %d", dreamLen);
  259. do {
  260. const size_t recv_len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData() + 4 + len, dreamLen - len), 0, ec);
  261. if (!ec && recv_len >= 0)
  262. len += recv_len;
  263. else
  264. break;
  265. } while (dreamLen > len);
  266. }
  267. catch (...)
  268. {
  269. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("second read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
  270. msg.clear();
  271. return;
  272. }
  273. // 用来处理发送给websocket
  274. // todo fix select len = 0
  275. if (this->mMessageHandler != NULL && 0 != msg.getBufferLength())
  276. this->mMessageHandler(msg, mID);
  277. }
  278. }
  279. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("warning ! : thread_recv end!");
  280. }
  281. catch (boost::thread_interrupted&)
  282. {
  283. DbgWithLink(LOG_LEVEL_INFO, LOG_TYPE_SYSTEM).setAPI(__FUNCTION__)("Socket thread interrupted!");
  284. }
  285. }
  286. }