rpc_client.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <deque>
  5. #include <future>
  6. #include "use_asio.hpp"
  7. #include "client_util.hpp"
  8. #include "const_vars.h"
  9. #include "meta_util.hpp"
  10. using namespace rest_rpc::rpc_service;
  11. namespace rest_rpc {
  12. class req_result {
  13. public:
  14. req_result() = default;
  15. req_result(string_view data) : data_(data.data(), data.length()) {}
  16. bool success() const {
  17. return !has_error(data_);
  18. }
  19. template<typename T>
  20. T as() {
  21. if (has_error(data_)) {
  22. throw std::logic_error(get_error_msg(data_));
  23. }
  24. return get_result<T>(data_);
  25. }
  26. void as() {
  27. if (has_error(data_)) {
  28. throw std::logic_error(get_error_msg(data_));
  29. }
  30. }
  31. private:
  32. std::string data_;
  33. };
  34. enum class CallModel {
  35. future,
  36. callback
  37. };
  38. const constexpr auto FUTURE = CallModel::future;
  39. const constexpr size_t DEFAULT_TIMEOUT = 5000; //milliseconds
  40. class rpc_client : private asio::noncopyable {
  41. public:
  42. rpc_client() : socket_(ios_), work_(ios_),
  43. deadline_(ios_), body_(INIT_BUF_SIZE) {
  44. thd_ = std::make_shared<std::thread>([this] {
  45. ios_.run();
  46. });
  47. }
  48. rpc_client(const std::string& host, unsigned short port) : socket_(ios_), work_(ios_),
  49. deadline_(ios_), host_(host), port_(port), body_(INIT_BUF_SIZE) {
  50. thd_ = std::make_shared<std::thread>([this] {
  51. ios_.run();
  52. });
  53. }
  54. ~rpc_client() {
  55. close();
  56. stop();
  57. }
  58. void run(){
  59. thd_->join();
  60. }
  61. void set_connect_timeout(size_t milliseconds) {
  62. connect_timeout_ = milliseconds;
  63. }
  64. void set_reconnect_count(int reconnect_count) {
  65. reconnect_cnt_ = reconnect_count;
  66. }
  67. bool connect(size_t timeout = 1) {
  68. if (has_connected_)
  69. return true;
  70. assert(port_ != 0);
  71. async_connect();
  72. return wait_conn(timeout);
  73. }
  74. bool connect(const std::string& host, unsigned short port, size_t timeout = 1) {
  75. if (port_==0) {
  76. host_ = host;
  77. port_ = port;
  78. }
  79. return connect(timeout);
  80. }
  81. void async_connect(const std::string& host, unsigned short port) {
  82. if (port_ == 0) {
  83. host_ = host;
  84. port_ = port;
  85. }
  86. async_connect();
  87. }
  88. bool wait_conn(size_t timeout) {
  89. if (has_connected_) {
  90. return true;
  91. }
  92. has_wait_ = true;
  93. std::unique_lock<std::mutex> lock(conn_mtx_);
  94. bool result = conn_cond_.wait_for(lock, std::chrono::seconds(timeout),
  95. [this] {return has_connected_.load(); });
  96. has_wait_ = false;
  97. return has_connected_;
  98. }
  99. void enable_auto_reconnect(bool enable = true) {
  100. enable_reconnect_ = enable;
  101. }
  102. void enable_auto_heartbeat(bool enable = true) {
  103. if (enable) {
  104. reset_deadline_timer(5);
  105. }
  106. else {
  107. deadline_.cancel();
  108. }
  109. }
  110. void update_addr(const std::string& host, unsigned short port) {
  111. host_ = host;
  112. port_ = port;
  113. }
  114. void close() {
  115. has_connected_ = false;
  116. if (socket_.is_open()) {
  117. boost::system::error_code ignored_ec;
  118. socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);
  119. socket_.close(ignored_ec);
  120. }
  121. clear_cache();
  122. }
  123. void set_error_callback(std::function<void(boost::system::error_code)> f) {
  124. err_cb_ = std::move(f);
  125. }
  126. uint64_t reqest_id() {
  127. return temp_req_id_;
  128. }
  129. bool has_connected() const {
  130. return has_connected_;
  131. }
  132. //sync call
  133. #if __cplusplus > 201402L
  134. template<size_t TIMEOUT, typename T = void, typename... Args>
  135. auto call(const std::string& rpc_name, Args&& ... args) {
  136. std::future<req_result> future = async_call<FUTURE>(rpc_name, std::forward<Args>(args)...);
  137. auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT));
  138. if (status == std::future_status::timeout || status == std::future_status::deferred) {
  139. throw std::out_of_range("timeout or deferred");
  140. }
  141. if constexpr (std::is_void_v<T>) {
  142. future.get().as();
  143. }
  144. else {
  145. return future.get().as<T>();
  146. }
  147. }
  148. template<typename T = void, typename... Args>
  149. auto call(const std::string& rpc_name, Args&& ... args) {
  150. return call<DEFAULT_TIMEOUT, T>(rpc_name, std::forward<Args>(args)...);
  151. }
  152. #else
  153. template<size_t TIMEOUT, typename T=void, typename... Args>
  154. typename std::enable_if<std::is_void<T>::value>::type call(const std::string& rpc_name, Args&& ... args) {
  155. std::future<req_result> future = async_call<FUTURE>(rpc_name, std::forward<Args>(args)...);
  156. auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT));
  157. if (status == std::future_status::timeout || status == std::future_status::deferred) {
  158. throw std::out_of_range("timeout or deferred");
  159. }
  160. future.get().as();
  161. }
  162. template<typename T = void, typename... Args>
  163. typename std::enable_if<std::is_void<T>::value>::type call(const std::string& rpc_name, Args&& ... args) {
  164. call<DEFAULT_TIMEOUT, T>(rpc_name, std::forward<Args>(args)...);
  165. }
  166. template<size_t TIMEOUT, typename T, typename... Args>
  167. typename std::enable_if<!std::is_void<T>::value, T>::type call(const std::string& rpc_name, Args&& ... args) {
  168. std::future<req_result> future = async_call<FUTURE>(rpc_name, std::forward<Args>(args)...);
  169. auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT));
  170. if (status == std::future_status::timeout || status == std::future_status::deferred) {
  171. throw std::out_of_range("timeout or deferred");
  172. }
  173. return future.get().as<T>();
  174. }
  175. template<typename T, typename... Args>
  176. typename std::enable_if<!std::is_void<T>::value, T>::type call(const std::string& rpc_name, Args&& ... args) {
  177. return call<DEFAULT_TIMEOUT, T>(rpc_name, std::forward<Args>(args)...);
  178. }
  179. #endif
  180. template<CallModel model, typename... Args>
  181. std::future<req_result> async_call(const std::string& rpc_name, Args&&... args) {
  182. auto p = std::make_shared<std::promise<req_result>>();
  183. std::future<req_result> future = p->get_future();
  184. uint64_t fu_id = 0;
  185. {
  186. std::unique_lock<std::mutex> lock(cb_mtx_);
  187. fu_id_++;
  188. fu_id = fu_id_;
  189. future_map_.emplace(fu_id, std::move(p));
  190. }
  191. msgpack_codec codec;
  192. auto ret = codec.pack_args(rpc_name, std::forward<Args>(args)...);
  193. write(fu_id, request_type::req_res, std::move(ret));
  194. return future;
  195. }
  196. template<size_t TIMEOUT = DEFAULT_TIMEOUT, typename... Args>
  197. void async_call(const std::string& rpc_name, std::function<void(boost::system::error_code, string_view)> cb, Args&& ... args) {
  198. if (!has_connected_) {
  199. error_callback(boost::asio::error::make_error_code(boost::asio::error::not_connected));
  200. return;
  201. }
  202. uint64_t cb_id = 0;
  203. {
  204. std::unique_lock<std::mutex> lock(cb_mtx_);
  205. callback_id_++;
  206. callback_id_ |= (uint64_t(1) << 63);
  207. cb_id = callback_id_;
  208. auto call = std::make_shared<call_t>(ios_, std::move(cb), TIMEOUT);
  209. call->start_timer();
  210. callback_map_.emplace(cb_id, call);
  211. }
  212. msgpack_codec codec;
  213. auto ret = codec.pack_args(rpc_name, std::forward<Args>(args)...);
  214. write(cb_id, request_type::req_res, std::move(ret));
  215. }
  216. void stop() {
  217. if (thd_ != nullptr) {
  218. ios_.stop();
  219. thd_->join();
  220. thd_ = nullptr;
  221. }
  222. }
  223. template<typename Func>
  224. void subscribe(std::string key, Func f) {
  225. auto it = sub_map_.find(key);
  226. if (it != sub_map_.end()) {
  227. assert("duplicated subscribe");
  228. return;
  229. }
  230. sub_map_.emplace(key, std::move(f));
  231. send_subscribe(key, "");
  232. key_token_set_.emplace(std::move(key), "");
  233. }
  234. template<typename Func>
  235. void subscribe(std::string key, std::string token, Func f) {
  236. auto composite_key = key + token;
  237. auto it = sub_map_.find(composite_key);
  238. if (it != sub_map_.end()) {
  239. assert("duplicated subscribe");
  240. return;
  241. }
  242. sub_map_.emplace(std::move(composite_key), std::move(f));
  243. send_subscribe(key, token);
  244. key_token_set_.emplace(std::move(key), std::move(token));
  245. }
  246. template<typename T, size_t TIMEOUT = 3>
  247. void publish(std::string key, T&& t) {
  248. msgpack_codec codec;
  249. auto buf = codec.pack(std::move(t));
  250. call<TIMEOUT>("publish", std::move(key), "", std::string(buf.data(), buf.size()));
  251. }
  252. template<typename T, size_t TIMEOUT=3>
  253. void publish_by_token(std::string key, std::string token, T&& t) {
  254. msgpack_codec codec;
  255. auto buf = codec.pack(std::move(t));
  256. call<TIMEOUT>("publish_by_token", std::move(key), std::move(token), std::string(buf.data(), buf.size()));
  257. }
  258. private:
  259. void async_connect() {
  260. assert(port_ != 0);
  261. auto addr = boost::asio::ip::address::from_string(host_);
  262. socket_.async_connect({ addr, port_ }, [this](const boost::system::error_code& ec) {
  263. if (has_connected_) {
  264. return;
  265. }
  266. if (ec) {
  267. //std::cout << ec.message() << std::endl;
  268. has_connected_ = false;
  269. if (reconnect_cnt_ == 0) {
  270. return;
  271. }
  272. if (reconnect_cnt_ > 0) {
  273. reconnect_cnt_--;
  274. }
  275. async_reconnect();
  276. }
  277. else {
  278. //std::cout<<"connected ok"<<std::endl;
  279. has_connected_ = true;
  280. do_read();
  281. resend_subscribe();
  282. if (has_wait_)
  283. conn_cond_.notify_one();
  284. }
  285. });
  286. }
  287. void async_reconnect() {
  288. reset_socket();
  289. async_connect();
  290. std::this_thread::sleep_for(std::chrono::milliseconds(connect_timeout_));
  291. }
  292. void reset_deadline_timer(size_t timeout) {
  293. deadline_.expires_from_now(std::chrono::seconds(timeout));
  294. deadline_.async_wait([this, timeout](const boost::system::error_code& ec) {
  295. if (!ec) {
  296. if (has_connected_) {
  297. write(0, request_type::req_res, buffer_type(0));
  298. }
  299. }
  300. reset_deadline_timer(timeout);
  301. });
  302. }
  303. void write(std::uint64_t req_id, request_type type, buffer_type&& message) {
  304. size_t size = message.size();
  305. assert(size < MAX_BUF_LEN);
  306. client_message_type msg{ req_id, type, {message.release(), size} };
  307. std::unique_lock<std::mutex> lock(write_mtx_);
  308. outbox_.emplace_back(std::move(msg));
  309. if (outbox_.size() > 1) {
  310. // outstanding async_write
  311. return;
  312. }
  313. write();
  314. }
  315. void write() {
  316. auto& msg = outbox_[0];
  317. write_size_ = (uint32_t)msg.content.length();
  318. std::array<boost::asio::const_buffer, 4> write_buffers;
  319. write_buffers[0] = boost::asio::buffer(&write_size_, sizeof(int32_t));
  320. write_buffers[1] = boost::asio::buffer(&msg.req_id, sizeof(uint64_t));
  321. write_buffers[2] = boost::asio::buffer(&msg.req_type, sizeof(request_type));
  322. write_buffers[3] = boost::asio::buffer((char*)msg.content.data(), write_size_);
  323. boost::asio::async_write(socket_, write_buffers,
  324. [this](const boost::system::error_code& ec, const size_t length) {
  325. if (ec) {
  326. has_connected_ = false;
  327. close();
  328. error_callback(ec);
  329. return;
  330. }
  331. std::unique_lock<std::mutex> lock(write_mtx_);
  332. if (outbox_.empty()) {
  333. return;
  334. }
  335. ::free((char*)outbox_.front().content.data());
  336. outbox_.pop_front();
  337. if (!outbox_.empty()) {
  338. // more messages to send
  339. this->write();
  340. }
  341. });
  342. }
  343. void do_read() {
  344. boost::asio::async_read(socket_, boost::asio::buffer(head_),
  345. [this](const boost::system::error_code& ec, const size_t length) {
  346. if (!socket_.is_open()) {
  347. //LOG(INFO) << "socket already closed";
  348. has_connected_ = false;
  349. return;
  350. }
  351. if (!ec) {
  352. //const uint32_t body_len = *((uint32_t*)(head_));
  353. //auto req_id = *((std::uint64_t*)(head_ + sizeof(int32_t)));
  354. //auto req_type = *(request_type*)(head_ + sizeof(int32_t) + sizeof(int64_t));
  355. rpc_header* header = (rpc_header*)(head_);
  356. const uint32_t body_len = header->body_len;
  357. if (body_len > 0 && body_len < MAX_BUF_LEN) {
  358. if (body_.size() < body_len) { body_.resize(body_len); }
  359. read_body(header->req_id, header->req_type, body_len);
  360. return;
  361. }
  362. if (body_len == 0 || body_len > MAX_BUF_LEN) {
  363. //LOG(INFO) << "invalid body len";
  364. close();
  365. error_callback(asio::error::make_error_code(asio::error::message_size));
  366. return;
  367. }
  368. }
  369. else {
  370. //LOG(INFO) << ec.message();
  371. has_connected_ = false;
  372. close();
  373. error_callback(ec);
  374. }
  375. });
  376. }
  377. void read_body(std::uint64_t req_id, request_type req_type, size_t body_len) {
  378. boost::asio::async_read(
  379. socket_, boost::asio::buffer(body_.data(), body_len),
  380. [this, req_id, req_type, body_len](boost::system::error_code ec, std::size_t length) {
  381. //cancel_timer();
  382. if (!socket_.is_open()) {
  383. //LOG(INFO) << "socket already closed";
  384. call_back(req_id, asio::error::make_error_code(asio::error::connection_aborted), {});
  385. return;
  386. }
  387. if (!ec) {
  388. //entier body
  389. if (req_type == request_type::req_res) {
  390. call_back(req_id, ec, { body_.data(), body_len });
  391. }
  392. else if (req_type == request_type::sub_pub) {
  393. callback_sub(ec, { body_.data(), body_len });
  394. }
  395. else {
  396. close();
  397. error_callback(asio::error::make_error_code(asio::error::invalid_argument));
  398. return;
  399. }
  400. do_read();
  401. }
  402. else {
  403. //LOG(INFO) << ec.message();
  404. has_connected_ = false;
  405. close();
  406. error_callback(ec);
  407. }
  408. });
  409. }
  410. void send_subscribe(const std::string& key, const std::string& token) {
  411. msgpack_codec codec;
  412. auto ret = codec.pack_args(key, token);
  413. write(0, request_type::sub_pub, std::move(ret));
  414. }
  415. void resend_subscribe() {
  416. if (key_token_set_.empty())
  417. return;
  418. for (auto& pair : key_token_set_) {
  419. send_subscribe(pair.first, pair.second);
  420. }
  421. }
  422. void call_back(uint64_t req_id, const boost::system::error_code& ec, string_view data) {
  423. temp_req_id_ = req_id;
  424. auto cb_flag = req_id >> 63;
  425. if (cb_flag) {
  426. std::shared_ptr<call_t> cl = nullptr;
  427. {
  428. std::unique_lock<std::mutex> lock(cb_mtx_);
  429. cl = std::move(callback_map_[req_id]);
  430. }
  431. assert(cl);
  432. if (!cl->has_timeout()) {
  433. cl->cancel();
  434. cl->callback(ec, data);
  435. }
  436. else {
  437. cl->callback(asio::error::make_error_code(asio::error::timed_out), {});
  438. }
  439. std::unique_lock<std::mutex> lock(cb_mtx_);
  440. callback_map_.erase(req_id);
  441. }
  442. else {
  443. std::unique_lock<std::mutex> lock(cb_mtx_);
  444. auto& f = future_map_[req_id];
  445. if (ec) {
  446. //LOG<<ec.message();
  447. if (!f) {
  448. //std::cout << "invalid req_id" << std::endl;
  449. return;
  450. }
  451. }
  452. assert(f);
  453. f->set_value(req_result{ data });
  454. future_map_.erase(req_id);
  455. }
  456. }
  457. void callback_sub(const boost::system::error_code& ec, string_view result) {
  458. rpc_service::msgpack_codec codec;
  459. try {
  460. auto tp = codec.unpack<std::tuple<int, std::string, std::string>>(result.data(), result.size());
  461. auto code = std::get<0>(tp);
  462. auto& key = std::get<1>(tp);
  463. auto& data = std::get<2>(tp);
  464. auto it = sub_map_.find(key);
  465. if (it == sub_map_.end()) {
  466. return;
  467. }
  468. it->second(data);
  469. }
  470. catch (const std::exception& ex) {
  471. error_callback(asio::error::make_error_code(asio::error::invalid_argument));
  472. std::cout << ex.what() << "\n";
  473. }
  474. }
  475. void clear_cache() {
  476. {
  477. std::unique_lock<std::mutex> lock(write_mtx_);
  478. while (!outbox_.empty()) {
  479. ::free((char*)outbox_.front().content.data());
  480. outbox_.pop_front();
  481. }
  482. }
  483. {
  484. std::unique_lock<std::mutex> lock(cb_mtx_);
  485. callback_map_.clear();
  486. future_map_.clear();
  487. }
  488. }
  489. void reset_socket(){
  490. boost::system::error_code igored_ec;
  491. socket_.close(igored_ec);
  492. socket_ = decltype(socket_)(ios_);
  493. if(!socket_.is_open()){
  494. socket_.open(boost::asio::ip::tcp::v4());
  495. }
  496. }
  497. class call_t : asio::noncopyable, public std::enable_shared_from_this<call_t> {
  498. public:
  499. call_t(asio::io_service& ios, std::function<void(boost::system::error_code, string_view)> cb, size_t timeout) : timer_(ios),
  500. cb_(std::move(cb)), timeout_(timeout){
  501. }
  502. void start_timer() {
  503. if (timeout_ == 0) {
  504. return;
  505. }
  506. timer_.expires_from_now(std::chrono::milliseconds(timeout_));
  507. auto self = this->shared_from_this();
  508. timer_.async_wait([this, self](boost::system::error_code ec) {
  509. if (ec) {
  510. return;
  511. }
  512. has_timeout_ = true;
  513. });
  514. }
  515. void callback(boost::system::error_code ec, string_view data) {
  516. cb_(ec, data);
  517. }
  518. bool has_timeout() const {
  519. return has_timeout_;
  520. }
  521. void cancel() {
  522. if (timeout_ == 0) {
  523. return;
  524. }
  525. boost::system::error_code ec;
  526. timer_.cancel(ec);
  527. }
  528. private:
  529. boost::asio::steady_timer timer_;
  530. std::function<void(boost::system::error_code, string_view)> cb_;
  531. size_t timeout_;
  532. bool has_timeout_ = false;
  533. };
  534. void error_callback(const boost::system::error_code& ec) {
  535. if (err_cb_) {
  536. err_cb_(ec);
  537. }
  538. if (enable_reconnect_) {
  539. async_connect();
  540. }
  541. }
  542. void set_default_error_cb() {
  543. err_cb_ = [this](boost::system::error_code){
  544. async_connect();
  545. };
  546. }
  547. boost::asio::io_service ios_;
  548. asio::ip::tcp::socket socket_;
  549. boost::asio::io_service::work work_;
  550. std::shared_ptr<std::thread> thd_ = nullptr;
  551. std::string host_;
  552. unsigned short port_ = 0;
  553. size_t connect_timeout_ = 1000;//s
  554. int reconnect_cnt_ = -1;
  555. std::atomic_bool has_connected_ = { false };
  556. std::mutex conn_mtx_;
  557. std::condition_variable conn_cond_;
  558. bool has_wait_ = false;
  559. asio::steady_timer deadline_;
  560. struct client_message_type {
  561. std::uint64_t req_id;
  562. request_type req_type;
  563. string_view content;
  564. };
  565. std::deque<client_message_type> outbox_;
  566. uint32_t write_size_ = 0;
  567. std::mutex write_mtx_;
  568. uint64_t fu_id_ = 0;
  569. std::function<void(boost::system::error_code)> err_cb_;
  570. bool enable_reconnect_ = false;
  571. std::unordered_map<std::uint64_t, std::shared_ptr<std::promise<req_result>>> future_map_;
  572. std::unordered_map<std::uint64_t, std::shared_ptr<call_t>> callback_map_;
  573. std::mutex cb_mtx_;
  574. uint64_t callback_id_ = 0;
  575. uint64_t temp_req_id_ = 0;
  576. char head_[HEAD_LEN] = {};
  577. std::vector<char> body_;
  578. std::unordered_map<std::string, std::function<void(string_view)>> sub_map_;
  579. std::set<std::pair<std::string, std::string>> key_token_set_;
  580. };
  581. }