#pragma once #include #include #include #include #include "use_asio.hpp" #include "client_util.hpp" #include "const_vars.h" #include "meta_util.hpp" using namespace rest_rpc::rpc_service; namespace rest_rpc { class req_result { public: req_result() = default; req_result(string_view data) : data_(data.data(), data.length()) {} bool success() const { return !has_error(data_); } template T as() { if (has_error(data_)) { throw std::logic_error(get_error_msg(data_)); } return get_result(data_); } void as() { if (has_error(data_)) { throw std::logic_error(get_error_msg(data_)); } } private: std::string data_; }; enum class CallModel { future, callback }; const constexpr auto FUTURE = CallModel::future; const constexpr size_t DEFAULT_TIMEOUT = 5000; //milliseconds class rpc_client : private asio::noncopyable { public: rpc_client() : socket_(ios_), work_(ios_), deadline_(ios_), body_(INIT_BUF_SIZE) { thd_ = std::make_shared([this] { ios_.run(); }); } rpc_client(const std::string& host, unsigned short port) : socket_(ios_), work_(ios_), deadline_(ios_), host_(host), port_(port), body_(INIT_BUF_SIZE) { thd_ = std::make_shared([this] { ios_.run(); }); } ~rpc_client() { close(); stop(); } void run(){ thd_->join(); } void set_connect_timeout(size_t milliseconds) { connect_timeout_ = milliseconds; } void set_reconnect_count(int reconnect_count) { reconnect_cnt_ = reconnect_count; } bool connect(size_t timeout = 1) { if (has_connected_) return true; assert(port_ != 0); async_connect(); return wait_conn(timeout); } bool connect(const std::string& host, unsigned short port, size_t timeout = 1) { if (port_==0) { host_ = host; port_ = port; } return connect(timeout); } void async_connect(const std::string& host, unsigned short port) { if (port_ == 0) { host_ = host; port_ = port; } async_connect(); } bool wait_conn(size_t timeout) { if (has_connected_) { return true; } has_wait_ = true; std::unique_lock lock(conn_mtx_); bool result = conn_cond_.wait_for(lock, std::chrono::seconds(timeout), [this] {return has_connected_.load(); }); has_wait_ = false; return has_connected_; } void enable_auto_reconnect(bool enable = true) { enable_reconnect_ = enable; } void enable_auto_heartbeat(bool enable = true) { if (enable) { reset_deadline_timer(5); } else { deadline_.cancel(); } } void update_addr(const std::string& host, unsigned short port) { host_ = host; port_ = port; } void close() { has_connected_ = false; if (socket_.is_open()) { boost::system::error_code ignored_ec; socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); socket_.close(ignored_ec); } clear_cache(); } void set_error_callback(std::function f) { err_cb_ = std::move(f); } uint64_t reqest_id() { return temp_req_id_; } bool has_connected() const { return has_connected_; } //sync call #if __cplusplus > 201402L template auto call(const std::string& rpc_name, Args&& ... args) { std::future future = async_call(rpc_name, std::forward(args)...); auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT)); if (status == std::future_status::timeout || status == std::future_status::deferred) { throw std::out_of_range("timeout or deferred"); } if constexpr (std::is_void_v) { future.get().as(); } else { return future.get().as(); } } template auto call(const std::string& rpc_name, Args&& ... args) { return call(rpc_name, std::forward(args)...); } #else template typename std::enable_if::value>::type call(const std::string& rpc_name, Args&& ... args) { std::future future = async_call(rpc_name, std::forward(args)...); auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT)); if (status == std::future_status::timeout || status == std::future_status::deferred) { throw std::out_of_range("timeout or deferred"); } future.get().as(); } template typename std::enable_if::value>::type call(const std::string& rpc_name, Args&& ... args) { call(rpc_name, std::forward(args)...); } template typename std::enable_if::value, T>::type call(const std::string& rpc_name, Args&& ... args) { std::future future = async_call(rpc_name, std::forward(args)...); auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT)); if (status == std::future_status::timeout || status == std::future_status::deferred) { throw std::out_of_range("timeout or deferred"); } return future.get().as(); } template typename std::enable_if::value, T>::type call(const std::string& rpc_name, Args&& ... args) { return call(rpc_name, std::forward(args)...); } #endif template std::future async_call(const std::string& rpc_name, Args&&... args) { auto p = std::make_shared>(); std::future future = p->get_future(); uint64_t fu_id = 0; { std::unique_lock lock(cb_mtx_); fu_id_++; fu_id = fu_id_; future_map_.emplace(fu_id, std::move(p)); } msgpack_codec codec; auto ret = codec.pack_args(rpc_name, std::forward(args)...); write(fu_id, request_type::req_res, std::move(ret)); return future; } template void async_call(const std::string& rpc_name, std::function cb, Args&& ... args) { if (!has_connected_) { error_callback(boost::asio::error::make_error_code(boost::asio::error::not_connected)); return; } uint64_t cb_id = 0; { std::unique_lock lock(cb_mtx_); callback_id_++; callback_id_ |= (uint64_t(1) << 63); cb_id = callback_id_; auto call = std::make_shared(ios_, std::move(cb), TIMEOUT); call->start_timer(); callback_map_.emplace(cb_id, call); } msgpack_codec codec; auto ret = codec.pack_args(rpc_name, std::forward(args)...); write(cb_id, request_type::req_res, std::move(ret)); } void stop() { if (thd_ != nullptr) { ios_.stop(); thd_->join(); thd_ = nullptr; } } template void subscribe(std::string key, Func f) { auto it = sub_map_.find(key); if (it != sub_map_.end()) { assert("duplicated subscribe"); return; } sub_map_.emplace(key, std::move(f)); send_subscribe(key, ""); key_token_set_.emplace(std::move(key), ""); } template void subscribe(std::string key, std::string token, Func f) { auto composite_key = key + token; auto it = sub_map_.find(composite_key); if (it != sub_map_.end()) { assert("duplicated subscribe"); return; } sub_map_.emplace(std::move(composite_key), std::move(f)); send_subscribe(key, token); key_token_set_.emplace(std::move(key), std::move(token)); } template void publish(std::string key, T&& t) { msgpack_codec codec; auto buf = codec.pack(std::move(t)); call("publish", std::move(key), "", std::string(buf.data(), buf.size())); } template void publish_by_token(std::string key, std::string token, T&& t) { msgpack_codec codec; auto buf = codec.pack(std::move(t)); call("publish_by_token", std::move(key), std::move(token), std::string(buf.data(), buf.size())); } private: void async_connect() { assert(port_ != 0); auto addr = boost::asio::ip::address::from_string(host_); socket_.async_connect({ addr, port_ }, [this](const boost::system::error_code& ec) { if (has_connected_) { return; } if (ec) { //std::cout << ec.message() << std::endl; has_connected_ = false; if (reconnect_cnt_ == 0) { return; } if (reconnect_cnt_ > 0) { reconnect_cnt_--; } async_reconnect(); } else { //std::cout<<"connected ok"< lock(write_mtx_); outbox_.emplace_back(std::move(msg)); if (outbox_.size() > 1) { // outstanding async_write return; } write(); } void write() { auto& msg = outbox_[0]; write_size_ = (uint32_t)msg.content.length(); std::array write_buffers; write_buffers[0] = boost::asio::buffer(&write_size_, sizeof(int32_t)); write_buffers[1] = boost::asio::buffer(&msg.req_id, sizeof(uint64_t)); write_buffers[2] = boost::asio::buffer(&msg.req_type, sizeof(request_type)); write_buffers[3] = boost::asio::buffer((char*)msg.content.data(), write_size_); boost::asio::async_write(socket_, write_buffers, [this](const boost::system::error_code& ec, const size_t length) { if (ec) { has_connected_ = false; close(); error_callback(ec); return; } std::unique_lock lock(write_mtx_); if (outbox_.empty()) { return; } ::free((char*)outbox_.front().content.data()); outbox_.pop_front(); if (!outbox_.empty()) { // more messages to send this->write(); } }); } void do_read() { boost::asio::async_read(socket_, boost::asio::buffer(head_), [this](const boost::system::error_code& ec, const size_t length) { if (!socket_.is_open()) { //LOG(INFO) << "socket already closed"; has_connected_ = false; return; } if (!ec) { //const uint32_t body_len = *((uint32_t*)(head_)); //auto req_id = *((std::uint64_t*)(head_ + sizeof(int32_t))); //auto req_type = *(request_type*)(head_ + sizeof(int32_t) + sizeof(int64_t)); rpc_header* header = (rpc_header*)(head_); const uint32_t body_len = header->body_len; if (body_len > 0 && body_len < MAX_BUF_LEN) { if (body_.size() < body_len) { body_.resize(body_len); } read_body(header->req_id, header->req_type, body_len); return; } if (body_len == 0 || body_len > MAX_BUF_LEN) { //LOG(INFO) << "invalid body len"; close(); error_callback(asio::error::make_error_code(asio::error::message_size)); return; } } else { //LOG(INFO) << ec.message(); has_connected_ = false; close(); error_callback(ec); } }); } void read_body(std::uint64_t req_id, request_type req_type, size_t body_len) { boost::asio::async_read( socket_, boost::asio::buffer(body_.data(), body_len), [this, req_id, req_type, body_len](boost::system::error_code ec, std::size_t length) { //cancel_timer(); if (!socket_.is_open()) { //LOG(INFO) << "socket already closed"; call_back(req_id, asio::error::make_error_code(asio::error::connection_aborted), {}); return; } if (!ec) { //entier body if (req_type == request_type::req_res) { call_back(req_id, ec, { body_.data(), body_len }); } else if (req_type == request_type::sub_pub) { callback_sub(ec, { body_.data(), body_len }); } else { close(); error_callback(asio::error::make_error_code(asio::error::invalid_argument)); return; } do_read(); } else { //LOG(INFO) << ec.message(); has_connected_ = false; close(); error_callback(ec); } }); } void send_subscribe(const std::string& key, const std::string& token) { msgpack_codec codec; auto ret = codec.pack_args(key, token); write(0, request_type::sub_pub, std::move(ret)); } void resend_subscribe() { if (key_token_set_.empty()) return; for (auto& pair : key_token_set_) { send_subscribe(pair.first, pair.second); } } void call_back(uint64_t req_id, const boost::system::error_code& ec, string_view data) { temp_req_id_ = req_id; auto cb_flag = req_id >> 63; if (cb_flag) { std::shared_ptr cl = nullptr; { std::unique_lock lock(cb_mtx_); cl = std::move(callback_map_[req_id]); } assert(cl); if (!cl->has_timeout()) { cl->cancel(); cl->callback(ec, data); } else { cl->callback(asio::error::make_error_code(asio::error::timed_out), {}); } std::unique_lock lock(cb_mtx_); callback_map_.erase(req_id); } else { std::unique_lock lock(cb_mtx_); auto& f = future_map_[req_id]; if (ec) { //LOG<set_value(req_result{ data }); future_map_.erase(req_id); } } void callback_sub(const boost::system::error_code& ec, string_view result) { rpc_service::msgpack_codec codec; try { auto tp = codec.unpack>(result.data(), result.size()); auto code = std::get<0>(tp); auto& key = std::get<1>(tp); auto& data = std::get<2>(tp); auto it = sub_map_.find(key); if (it == sub_map_.end()) { return; } it->second(data); } catch (const std::exception& ex) { error_callback(asio::error::make_error_code(asio::error::invalid_argument)); std::cout << ex.what() << "\n"; } } void clear_cache() { { std::unique_lock lock(write_mtx_); while (!outbox_.empty()) { ::free((char*)outbox_.front().content.data()); outbox_.pop_front(); } } { std::unique_lock lock(cb_mtx_); callback_map_.clear(); future_map_.clear(); } } void reset_socket(){ boost::system::error_code igored_ec; socket_.close(igored_ec); socket_ = decltype(socket_)(ios_); if(!socket_.is_open()){ socket_.open(boost::asio::ip::tcp::v4()); } } class call_t : asio::noncopyable, public std::enable_shared_from_this { public: call_t(asio::io_service& ios, std::function cb, size_t timeout) : timer_(ios), cb_(std::move(cb)), timeout_(timeout){ } void start_timer() { if (timeout_ == 0) { return; } timer_.expires_from_now(std::chrono::milliseconds(timeout_)); auto self = this->shared_from_this(); timer_.async_wait([this, self](boost::system::error_code ec) { if (ec) { return; } has_timeout_ = true; }); } void callback(boost::system::error_code ec, string_view data) { cb_(ec, data); } bool has_timeout() const { return has_timeout_; } void cancel() { if (timeout_ == 0) { return; } boost::system::error_code ec; timer_.cancel(ec); } private: boost::asio::steady_timer timer_; std::function cb_; size_t timeout_; bool has_timeout_ = false; }; void error_callback(const boost::system::error_code& ec) { if (err_cb_) { err_cb_(ec); } if (enable_reconnect_) { async_connect(); } } void set_default_error_cb() { err_cb_ = [this](boost::system::error_code){ async_connect(); }; } boost::asio::io_service ios_; asio::ip::tcp::socket socket_; boost::asio::io_service::work work_; std::shared_ptr thd_ = nullptr; std::string host_; unsigned short port_ = 0; size_t connect_timeout_ = 1000;//s int reconnect_cnt_ = -1; std::atomic_bool has_connected_ = { false }; std::mutex conn_mtx_; std::condition_variable conn_cond_; bool has_wait_ = false; asio::steady_timer deadline_; struct client_message_type { std::uint64_t req_id; request_type req_type; string_view content; }; std::deque outbox_; uint32_t write_size_ = 0; std::mutex write_mtx_; uint64_t fu_id_ = 0; std::function err_cb_; bool enable_reconnect_ = false; std::unordered_map>> future_map_; std::unordered_map> callback_map_; std::mutex cb_mtx_; uint64_t callback_id_ = 0; uint64_t temp_req_id_ = 0; char head_[HEAD_LEN] = {}; std::vector body_; std::unordered_map> sub_map_; std::set> key_token_set_; }; }