CSocketClient.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. #include "stdafx.h"
  2. #define __STDC_LIMIT_MACROS 1
  3. #include "CSocketClient.h"
  4. #include <boost/thread.hpp>
  5. #include <boost/bind.hpp>
  6. #include <boost/random.hpp>
  7. #include "CModTools.h"
  8. #include <vector>
  9. #include <string>
  10. #include "portCheck/portCheck.h"
  11. using boost::asio::ip::tcp;
  12. #define DEFAULT_RCEV_LENGTH 1024*512
  13. bool checkHttpThreadFun(const std::string url) {
  14. return checkHttpActive(url.c_str());
  15. }
  16. std::pair<bool, std::string> DetectActiveHttp(std::vector<std::string> urlArr)
  17. {
  18. try {
  19. std::vector<boost::shared_future<bool>> threadArr;
  20. #if (defined _WIN32 || defined _WIN64)
  21. for each (const auto it in urlArr)
  22. #else
  23. for (const auto it : urlArr)
  24. #endif
  25. threadArr.push_back(boost::async(boost::bind(checkHttpThreadFun, it)));
  26. boost::this_thread::sleep_for(boost::chrono::seconds(1));
  27. //boost::wait_for_any(threadArr.begin(), threadArr.end());
  28. std::vector<std::string> vaildArr;
  29. for (size_t i = 0; i < threadArr.size(); i++)
  30. {
  31. if (threadArr[i].is_ready())
  32. {
  33. DbgEx("Detect url %s %s", urlArr[i].c_str(), threadArr[i].get() ? "sucess" : "failed");
  34. if (threadArr[i].get())
  35. vaildArr.push_back(urlArr[i]);
  36. }
  37. else
  38. DbgEx("Detect url %s timeout", urlArr[i].c_str());
  39. }
  40. if (vaildArr.size() > 0)
  41. {
  42. boost::mt19937 rng(time(0));
  43. boost::uniform_int<> rdgen(0, vaildArr.size() - 1);
  44. auto pos = rdgen(rng);
  45. return std::make_pair(true, vaildArr[pos]);//随机数
  46. }
  47. else
  48. return std::make_pair(false, "");
  49. }
  50. catch (std::exception& e) {
  51. DbgEx("DetectActiveHttp exception, %s", e.what());
  52. return std::make_pair(false, "");
  53. }
  54. }
  55. namespace Chromium {
  56. CSocketClient::CSocketClient(CEntityBase* pEntity, unsigned int id)
  57. :m_vbuf(128), m_pNetThread(nullptr), mID(id)
  58. {
  59. m_pEntity = pEntity;
  60. }
  61. CSocketClient::CSocketClient(boost::asio::io_service& ios, const char* ipAddr,
  62. const char* port, CEntityBase* pEntity, unsigned int id) :
  63. m_ep(boost::asio::ip::address_v4::from_string(ipAddr), atoi(port)),
  64. m_buf(100, 0),
  65. m_pNetThread(nullptr),
  66. m_vbuf(128),
  67. mID(id),
  68. m_pEntity(pEntity)
  69. {
  70. ::DbgEx("CSocketClient constructor ip=%s, port=%s", ipAddr, port);
  71. m_psocket = sock_ptr(new socket_type(ios));
  72. }
  73. CSocketClient::~CSocketClient() {
  74. // free(m_psocket);
  75. }
  76. ErrorCodeEnum CSocketClient::Connect() {
  77. boost::system::error_code ec;
  78. DbgEx("CSocketClient method -> connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
  79. m_psocket->connect(m_ep, ec);
  80. if (ec)
  81. {
  82. // connect failed!
  83. DbgEx("CSocketClient method -> Connect : connect failed! Retry after 1s !");
  84. // retry to connect after 3s
  85. Sleep(1000);
  86. m_psocket->connect(m_ep, ec);
  87. if (ec)return Error_IO;
  88. }
  89. DbgEx("CSocketClient method -> connect succeed!");
  90. StartSocketService();
  91. return Error_Succeed;
  92. }
  93. ErrorCodeEnum CSocketClient::Close() {
  94. DbgEx("CSocketClient method -> close");
  95. StopSocketService();
  96. boost::system::error_code ec;
  97. m_psocket->close(ec);
  98. handle_close(ec);
  99. return Error_Succeed;
  100. }
  101. ErrorCodeEnum CSocketClient::Write(CMessage* pMsg) {
  102. DbgEx("CSocketClient method -> write, %s", hexdumpToString(pMsg->getPayload(), pMsg->getLength()).c_str());
  103. if (m_psocket && m_psocket->is_open())
  104. {
  105. ::DbgEx("m_psocket->async_send");
  106. m_psocket->async_send(boost::asio::buffer(pMsg->getPayload(), pMsg->getLength()), boost::bind(&this_type::handle_write, this, boost::asio::placeholders::error));
  107. }
  108. else {
  109. DbgEx("CSocketClient method -> write error : socket is closed");
  110. return Error_IO;
  111. }
  112. ::DbgEx("Leave CSocketClient method -> write");
  113. return Error_Succeed;
  114. }
  115. ErrorCodeEnum CSocketClient::StartSocketService() {
  116. if (NULL == m_pNetThread)
  117. {
  118. m_pNetThread = new boost::thread(boost::bind(&CSocketClient::thread_recv, this));
  119. }
  120. return Error_Succeed;
  121. }
  122. ErrorCodeEnum CSocketClient::StopSocketService() {
  123. if (NULL != m_pNetThread)
  124. {
  125. DbgEx("CSocketClient -> StopSocketService doing interrupt");
  126. m_pNetThread->interrupt();
  127. m_pNetThread->join();
  128. m_pNetThread = NULL;
  129. }
  130. DbgEx("Socket Service stopped");
  131. return Error_Succeed;
  132. }
  133. void CSocketClient::handle_connect(const boost::system::error_code& err) {
  134. DbgEx("CSocketClient method -> handle_connect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
  135. if (err)
  136. {
  137. // connect failed!
  138. DbgEx("CSocketClient method -> handle_connect : connect failed!");
  139. // retry to connect after 3s
  140. Sleep(3000);
  141. m_psocket->async_connect(m_ep, boost::bind(&this_type::handle_connect, this, boost::asio::placeholders::error));
  142. return;
  143. }
  144. DbgEx("CSocketClient method -> handle_connect : connect succeed! socket p = %ld", m_psocket);
  145. DbgEx("CSocketClient method -> handle_connect end");
  146. }
  147. void CSocketClient::handle_close(const boost::system::error_code& err) {
  148. DbgEx("CSocketClient method -> handle_close : %s", err.message().c_str());
  149. return;
  150. }
  151. ErrorCodeEnum CSocketClient::Reconnect()
  152. {
  153. boost::system::error_code ec;
  154. DbgEx("CSocketClient method -> reconnect thread_id = %ld, object=%ld", boost::this_thread::get_id(), this);
  155. CModTools::get_mutable_instance().RestartProxyServer();
  156. m_psocket->connect(m_ep, ec);
  157. if (ec)
  158. {
  159. // connect failed!
  160. DbgEx("CSocketClient method -> Reconnect : connect failed! Retry after 1s ! error message = %s", ec.message());
  161. // retry to connect after 3s
  162. Sleep(1000);
  163. m_psocket->connect(m_ep, ec);
  164. if (ec)return Error_IO;
  165. }
  166. DbgEx("CSocketClient method -> reconnect succeed!");
  167. return Error_Succeed;
  168. }
  169. void CSocketClient::handle_write(const boost::system::error_code& err) {
  170. DbgEx("CSocketClient method -> handle_write %s!", err ? "error" : "succeed");
  171. }
  172. ErrorCodeEnum CSocketClient::SetMessageHandler(ISocketCallback* obj) {
  173. this->mMessageHandler = boost::bind(&ISocketCallback::message_from_socket, obj, _1, _2);
  174. return Error_Succeed;
  175. }
  176. void CSocketClient::handle_read(const boost::system::error_code& err,
  177. const size_t bytes_transferred, CMessage& msg)
  178. {
  179. DbgEx("CSocketClient handle_read bytes_transferred=%d", bytes_transferred);
  180. if (err)
  181. {
  182. // OnRecvError
  183. DbgEx("CSocketClient handle_read something wrong!");
  184. return;
  185. }
  186. // 用来处理发送给websocket
  187. if (this->mMessageHandler != NULL)
  188. {
  189. auto f = boost::bind(mMessageHandler, msg, mID);
  190. boost::thread msgHandler(f);
  191. msgHandler.join();
  192. //this->mMessageHandler(msg, mID);
  193. }
  194. DbgEx("CSocketClient handle_read well!");
  195. }
  196. std::string CSocketClient::hexdumpToString(const char* buf, const int num)
  197. {
  198. char str[8192 * 2] = { 0 };
  199. int i = 0;
  200. char c[5] = { 0 };
  201. if (num > 1200)
  202. {
  203. for (i = 0; i < 50; i++)
  204. {
  205. sprintf(c, "%02X ", (unsigned char)buf[i]);
  206. strcat(str, c);
  207. }
  208. return CSimpleStringA::Format("buffer too long to show!show pre 50 hex! CSocketClient hex buf len = %d : %s", num, str).GetData();
  209. }
  210. for (i = 0; i < num; i++)
  211. {
  212. sprintf(c, "%02X ", (unsigned char)buf[i]);
  213. strcat(str, c);
  214. }
  215. return CSimpleStringA::Format("CSocketClient hex buf len = %d : %s", num, str).GetData();
  216. }
  217. void CSocketClient::hexdump(const char* buf, const int num) {
  218. auto hexStr = hexdumpToString(buf, num);
  219. DbgEx(hexStr.c_str());
  220. }
  221. void CSocketClient::thread_recv() {
  222. try
  223. {
  224. // time out for select
  225. timeval tvsec = { 1,0 };
  226. tvsec.tv_sec = 1; /* 1 second timeout */
  227. tvsec.tv_usec = 0; /* no microseconds. */
  228. fd_set read_flags;
  229. fd_set write_flags;
  230. CMessage msg(MAX_TRANSFER_LEN); //以能传输1MB数据为目标
  231. bool isFindMsg = true;
  232. while (true)
  233. {
  234. boost::this_thread::sleep(boost::posix_time::milliseconds(isFindMsg ? 1 : 5));
  235. boost::system::error_code ec;
  236. FD_ZERO(&read_flags);
  237. FD_ZERO(&write_flags);
  238. FD_SET(m_psocket->native_handle(), &read_flags);
  239. int sel = 0;
  240. try
  241. {
  242. sel = select(m_psocket->native_handle() + 1, &read_flags, &write_flags, NULL, &tvsec);
  243. }
  244. catch (const std::exception&)
  245. {
  246. DbgEx("select exception");
  247. }
  248. if (sel == 0) {
  249. // select timeout
  250. // DbgEx("select timeout");
  251. isFindMsg = false;
  252. }
  253. else if (sel < 0 && errno != EINTR) {
  254. // select error
  255. DbgEx("select error sel = %d", sel);
  256. ErrorCodeEnum ec = this->Reconnect();
  257. if (Error_Succeed != ec)
  258. {
  259. DbgEx("Socket select error and reconnect failed. Restart this entity please!");
  260. break;
  261. }
  262. boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  263. }
  264. else {
  265. msg.clear();//提升性能,避免析构
  266. isFindMsg = true;
  267. // select something
  268. //DbgEx("select something, buffer len = %d", msg.getBufferLength());
  269. size_t len = 0;
  270. try
  271. {
  272. len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData(), 4), 0, ec);
  273. }
  274. catch (...)
  275. {
  276. DbgEx("first read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
  277. msg.clear();
  278. return;
  279. }
  280. //DbgEx("first read len = %d", len);
  281. if (0 >= len)
  282. {
  283. DbgEx("socket has been closed!");
  284. this->Close();
  285. boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  286. }
  287. try
  288. {
  289. len = 0;
  290. const size_t dreamLen = *(int*)(msg.getWriteableData()) + 4;
  291. do {
  292. #if (defined _WIN32 || defined _WIN64)
  293. len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData() + 4, *(int*)(msg.getWriteableData()) + 4), 0, ec);
  294. #else
  295. const size_t recv_len = m_psocket->receive(boost::asio::buffer(msg.getWriteableData() + 4 + len, dreamLen - len), 0, ec);
  296. if (!ec && recv_len >= 0)
  297. len += recv_len;
  298. else
  299. break;
  300. #endif
  301. } while (dreamLen > len);
  302. }
  303. catch (...)
  304. {
  305. DbgEx("second read error, %s", hexdumpToString(msg.getWriteableData(), msg.getBufferLength()).c_str());
  306. msg.clear();
  307. return;
  308. }
  309. // 用来处理发送给websocket
  310. // todo fix select len = 0
  311. if (this->mMessageHandler != NULL && 0 != msg.getBufferLength())
  312. {
  313. //DbgEx("pre mMessageHandler");
  314. this->mMessageHandler(msg, mID);
  315. //DbgEx("after mMessageHandler");
  316. }
  317. }
  318. }
  319. DbgEx("warning ! : thread_recv end!");
  320. }
  321. catch (boost::thread_interrupted&)
  322. {
  323. DbgEx("Socket thread interrupted!");
  324. }
  325. }
  326. }