123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689 |
- #pragma once
- #include <iostream>
- #include <string>
- #include <deque>
- #include <future>
- #include "use_asio.hpp"
- #include "client_util.hpp"
- #include "const_vars.h"
- #include "meta_util.hpp"
- using namespace rest_rpc::rpc_service;
- namespace rest_rpc {
- class req_result {
- public:
- req_result() = default;
- req_result(string_view data) : data_(data.data(), data.length()) {}
- bool success() const {
- return !has_error(data_);
- }
- template<typename T>
- T as() {
- if (has_error(data_)) {
- throw std::logic_error(get_error_msg(data_));
- }
- return get_result<T>(data_);
- }
- void as() {
- if (has_error(data_)) {
- throw std::logic_error(get_error_msg(data_));
- }
- }
- private:
- std::string data_;
- };
- enum class CallModel {
- future,
- callback
- };
- const constexpr auto FUTURE = CallModel::future;
- const constexpr size_t DEFAULT_TIMEOUT = 5000; //milliseconds
- class rpc_client : private asio::noncopyable {
- public:
- rpc_client() : socket_(ios_), work_(ios_),
- deadline_(ios_), body_(INIT_BUF_SIZE) {
- thd_ = std::make_shared<std::thread>([this] {
- ios_.run();
- });
- }
- rpc_client(const std::string& host, unsigned short port) : socket_(ios_), work_(ios_),
- deadline_(ios_), host_(host), port_(port), body_(INIT_BUF_SIZE) {
- thd_ = std::make_shared<std::thread>([this] {
- ios_.run();
- });
- }
- ~rpc_client() {
- close();
- stop();
- }
- void run(){
- thd_->join();
- }
- void set_connect_timeout(size_t milliseconds) {
- connect_timeout_ = milliseconds;
- }
- void set_reconnect_count(int reconnect_count) {
- reconnect_cnt_ = reconnect_count;
- }
- bool connect(size_t timeout = 1) {
- if (has_connected_)
- return true;
- assert(port_ != 0);
- async_connect();
- return wait_conn(timeout);
- }
- bool connect(const std::string& host, unsigned short port, size_t timeout = 1) {
- if (port_==0) {
- host_ = host;
- port_ = port;
- }
- return connect(timeout);
- }
- void async_connect(const std::string& host, unsigned short port) {
- if (port_ == 0) {
- host_ = host;
- port_ = port;
- }
- async_connect();
- }
- bool wait_conn(size_t timeout) {
- if (has_connected_) {
- return true;
- }
- has_wait_ = true;
- std::unique_lock<std::mutex> lock(conn_mtx_);
- bool result = conn_cond_.wait_for(lock, std::chrono::seconds(timeout),
- [this] {return has_connected_.load(); });
- has_wait_ = false;
- return has_connected_;
- }
- void enable_auto_reconnect(bool enable = true) {
- enable_reconnect_ = enable;
- }
- void enable_auto_heartbeat(bool enable = true) {
- if (enable) {
- reset_deadline_timer(5);
- }
- else {
- deadline_.cancel();
- }
- }
- void update_addr(const std::string& host, unsigned short port) {
- host_ = host;
- port_ = port;
- }
- void close() {
- has_connected_ = false;
- if (socket_.is_open()) {
- boost::system::error_code ignored_ec;
- socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);
- socket_.close(ignored_ec);
- }
- clear_cache();
- }
- void set_error_callback(std::function<void(boost::system::error_code)> f) {
- err_cb_ = std::move(f);
- }
- uint64_t reqest_id() {
- return temp_req_id_;
- }
- bool has_connected() const {
- return has_connected_;
- }
- //sync call
- #if __cplusplus > 201402L
- template<size_t TIMEOUT, typename T = void, typename... Args>
- auto call(const std::string& rpc_name, Args&& ... args) {
- std::future<req_result> future = async_call<FUTURE>(rpc_name, std::forward<Args>(args)...);
- auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT));
- if (status == std::future_status::timeout || status == std::future_status::deferred) {
- throw std::out_of_range("timeout or deferred");
- }
- if constexpr (std::is_void_v<T>) {
- future.get().as();
- }
- else {
- return future.get().as<T>();
- }
- }
- template<typename T = void, typename... Args>
- auto call(const std::string& rpc_name, Args&& ... args) {
- return call<DEFAULT_TIMEOUT, T>(rpc_name, std::forward<Args>(args)...);
- }
- #else
- template<size_t TIMEOUT, typename T=void, typename... Args>
- typename std::enable_if<std::is_void<T>::value>::type call(const std::string& rpc_name, Args&& ... args) {
- std::future<req_result> future = async_call<FUTURE>(rpc_name, std::forward<Args>(args)...);
- auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT));
- if (status == std::future_status::timeout || status == std::future_status::deferred) {
- throw std::out_of_range("timeout or deferred");
- }
- future.get().as();
- }
- template<typename T = void, typename... Args>
- typename std::enable_if<std::is_void<T>::value>::type call(const std::string& rpc_name, Args&& ... args) {
- call<DEFAULT_TIMEOUT, T>(rpc_name, std::forward<Args>(args)...);
- }
- template<size_t TIMEOUT, typename T, typename... Args>
- typename std::enable_if<!std::is_void<T>::value, T>::type call(const std::string& rpc_name, Args&& ... args) {
- std::future<req_result> future = async_call<FUTURE>(rpc_name, std::forward<Args>(args)...);
- auto status = future.wait_for(std::chrono::milliseconds(TIMEOUT));
- if (status == std::future_status::timeout || status == std::future_status::deferred) {
- throw std::out_of_range("timeout or deferred");
- }
- return future.get().as<T>();
- }
- template<typename T, typename... Args>
- typename std::enable_if<!std::is_void<T>::value, T>::type call(const std::string& rpc_name, Args&& ... args) {
- return call<DEFAULT_TIMEOUT, T>(rpc_name, std::forward<Args>(args)...);
- }
- #endif
- template<CallModel model, typename... Args>
- std::future<req_result> async_call(const std::string& rpc_name, Args&&... args) {
- auto p = std::make_shared<std::promise<req_result>>();
- std::future<req_result> future = p->get_future();
- uint64_t fu_id = 0;
- {
- std::unique_lock<std::mutex> lock(cb_mtx_);
- fu_id_++;
- fu_id = fu_id_;
- future_map_.emplace(fu_id, std::move(p));
- }
- msgpack_codec codec;
- auto ret = codec.pack_args(rpc_name, std::forward<Args>(args)...);
- write(fu_id, request_type::req_res, std::move(ret));
- return future;
- }
- template<size_t TIMEOUT = DEFAULT_TIMEOUT, typename... Args>
- void async_call(const std::string& rpc_name, std::function<void(boost::system::error_code, string_view)> cb, Args&& ... args) {
- if (!has_connected_) {
- error_callback(boost::asio::error::make_error_code(boost::asio::error::not_connected));
- return;
- }
- uint64_t cb_id = 0;
- {
- std::unique_lock<std::mutex> lock(cb_mtx_);
- callback_id_++;
- callback_id_ |= (uint64_t(1) << 63);
- cb_id = callback_id_;
- auto call = std::make_shared<call_t>(ios_, std::move(cb), TIMEOUT);
- call->start_timer();
- callback_map_.emplace(cb_id, call);
- }
- msgpack_codec codec;
- auto ret = codec.pack_args(rpc_name, std::forward<Args>(args)...);
- write(cb_id, request_type::req_res, std::move(ret));
- }
- void stop() {
- if (thd_ != nullptr) {
- ios_.stop();
- thd_->join();
- thd_ = nullptr;
- }
- }
- template<typename Func>
- void subscribe(std::string key, Func f) {
- auto it = sub_map_.find(key);
- if (it != sub_map_.end()) {
- assert("duplicated subscribe");
- return;
- }
- sub_map_.emplace(key, std::move(f));
- send_subscribe(key, "");
- key_token_set_.emplace(std::move(key), "");
- }
- template<typename Func>
- void subscribe(std::string key, std::string token, Func f) {
- auto composite_key = key + token;
- auto it = sub_map_.find(composite_key);
- if (it != sub_map_.end()) {
- assert("duplicated subscribe");
- return;
- }
- sub_map_.emplace(std::move(composite_key), std::move(f));
- send_subscribe(key, token);
- key_token_set_.emplace(std::move(key), std::move(token));
- }
- template<typename T, size_t TIMEOUT = 3>
- void publish(std::string key, T&& t) {
- msgpack_codec codec;
- auto buf = codec.pack(std::move(t));
- call<TIMEOUT>("publish", std::move(key), "", std::string(buf.data(), buf.size()));
- }
- template<typename T, size_t TIMEOUT=3>
- void publish_by_token(std::string key, std::string token, T&& t) {
- msgpack_codec codec;
- auto buf = codec.pack(std::move(t));
- call<TIMEOUT>("publish_by_token", std::move(key), std::move(token), std::string(buf.data(), buf.size()));
- }
- private:
- void async_connect() {
- assert(port_ != 0);
- auto addr = boost::asio::ip::address::from_string(host_);
- socket_.async_connect({ addr, port_ }, [this](const boost::system::error_code& ec) {
- if (has_connected_) {
- return;
- }
- if (ec) {
- //std::cout << ec.message() << std::endl;
- has_connected_ = false;
- if (reconnect_cnt_ == 0) {
- return;
- }
- if (reconnect_cnt_ > 0) {
- reconnect_cnt_--;
- }
- async_reconnect();
- }
- else {
- //std::cout<<"connected ok"<<std::endl;
- has_connected_ = true;
- do_read();
- resend_subscribe();
- if (has_wait_)
- conn_cond_.notify_one();
- }
- });
- }
- void async_reconnect() {
- reset_socket();
- async_connect();
- std::this_thread::sleep_for(std::chrono::milliseconds(connect_timeout_));
- }
- void reset_deadline_timer(size_t timeout) {
- deadline_.expires_from_now(std::chrono::seconds(timeout));
- deadline_.async_wait([this, timeout](const boost::system::error_code& ec) {
- if (!ec) {
- if (has_connected_) {
- write(0, request_type::req_res, buffer_type(0));
- }
- }
- reset_deadline_timer(timeout);
- });
- }
- void write(std::uint64_t req_id, request_type type, buffer_type&& message) {
- size_t size = message.size();
- assert(size < MAX_BUF_LEN);
- client_message_type msg{ req_id, type, {message.release(), size} };
- std::unique_lock<std::mutex> lock(write_mtx_);
- outbox_.emplace_back(std::move(msg));
- if (outbox_.size() > 1) {
- // outstanding async_write
- return;
- }
- write();
- }
- void write() {
- auto& msg = outbox_[0];
- write_size_ = (uint32_t)msg.content.length();
- std::array<boost::asio::const_buffer, 4> write_buffers;
- write_buffers[0] = boost::asio::buffer(&write_size_, sizeof(int32_t));
- write_buffers[1] = boost::asio::buffer(&msg.req_id, sizeof(uint64_t));
- write_buffers[2] = boost::asio::buffer(&msg.req_type, sizeof(request_type));
- write_buffers[3] = boost::asio::buffer((char*)msg.content.data(), write_size_);
- boost::asio::async_write(socket_, write_buffers,
- [this](const boost::system::error_code& ec, const size_t length) {
- if (ec) {
- has_connected_ = false;
- close();
- error_callback(ec);
- return;
- }
- std::unique_lock<std::mutex> lock(write_mtx_);
- if (outbox_.empty()) {
- return;
- }
- ::free((char*)outbox_.front().content.data());
- outbox_.pop_front();
- if (!outbox_.empty()) {
- // more messages to send
- this->write();
- }
- });
- }
- void do_read() {
- boost::asio::async_read(socket_, boost::asio::buffer(head_),
- [this](const boost::system::error_code& ec, const size_t length) {
- if (!socket_.is_open()) {
- //LOG(INFO) << "socket already closed";
- has_connected_ = false;
- return;
- }
- if (!ec) {
- //const uint32_t body_len = *((uint32_t*)(head_));
- //auto req_id = *((std::uint64_t*)(head_ + sizeof(int32_t)));
- //auto req_type = *(request_type*)(head_ + sizeof(int32_t) + sizeof(int64_t));
- rpc_header* header = (rpc_header*)(head_);
- const uint32_t body_len = header->body_len;
- if (body_len > 0 && body_len < MAX_BUF_LEN) {
- if (body_.size() < body_len) { body_.resize(body_len); }
- read_body(header->req_id, header->req_type, body_len);
- return;
- }
- if (body_len == 0 || body_len > MAX_BUF_LEN) {
- //LOG(INFO) << "invalid body len";
- close();
- error_callback(asio::error::make_error_code(asio::error::message_size));
- return;
- }
- }
- else {
- //LOG(INFO) << ec.message();
- has_connected_ = false;
- close();
- error_callback(ec);
- }
- });
- }
- void read_body(std::uint64_t req_id, request_type req_type, size_t body_len) {
- boost::asio::async_read(
- socket_, boost::asio::buffer(body_.data(), body_len),
- [this, req_id, req_type, body_len](boost::system::error_code ec, std::size_t length) {
- //cancel_timer();
- if (!socket_.is_open()) {
- //LOG(INFO) << "socket already closed";
- call_back(req_id, asio::error::make_error_code(asio::error::connection_aborted), {});
- return;
- }
- if (!ec) {
- //entier body
- if (req_type == request_type::req_res) {
- call_back(req_id, ec, { body_.data(), body_len });
- }
- else if (req_type == request_type::sub_pub) {
- callback_sub(ec, { body_.data(), body_len });
- }
- else {
- close();
- error_callback(asio::error::make_error_code(asio::error::invalid_argument));
- return;
- }
- do_read();
- }
- else {
- //LOG(INFO) << ec.message();
- has_connected_ = false;
- close();
- error_callback(ec);
- }
- });
- }
- void send_subscribe(const std::string& key, const std::string& token) {
- msgpack_codec codec;
- auto ret = codec.pack_args(key, token);
- write(0, request_type::sub_pub, std::move(ret));
- }
- void resend_subscribe() {
- if (key_token_set_.empty())
- return;
- for (auto& pair : key_token_set_) {
- send_subscribe(pair.first, pair.second);
- }
- }
- void call_back(uint64_t req_id, const boost::system::error_code& ec, string_view data) {
- temp_req_id_ = req_id;
- auto cb_flag = req_id >> 63;
- if (cb_flag) {
- std::shared_ptr<call_t> cl = nullptr;
- {
- std::unique_lock<std::mutex> lock(cb_mtx_);
- cl = std::move(callback_map_[req_id]);
- }
- assert(cl);
- if (!cl->has_timeout()) {
- cl->cancel();
- cl->callback(ec, data);
- }
- else {
- cl->callback(asio::error::make_error_code(asio::error::timed_out), {});
- }
- std::unique_lock<std::mutex> lock(cb_mtx_);
- callback_map_.erase(req_id);
- }
- else {
- std::unique_lock<std::mutex> lock(cb_mtx_);
- auto& f = future_map_[req_id];
- if (ec) {
- //LOG<<ec.message();
- if (!f) {
- //std::cout << "invalid req_id" << std::endl;
- return;
- }
- }
- assert(f);
- f->set_value(req_result{ data });
- future_map_.erase(req_id);
- }
- }
- void callback_sub(const boost::system::error_code& ec, string_view result) {
- rpc_service::msgpack_codec codec;
- try {
- auto tp = codec.unpack<std::tuple<int, std::string, std::string>>(result.data(), result.size());
- auto code = std::get<0>(tp);
- auto& key = std::get<1>(tp);
- auto& data = std::get<2>(tp);
- auto it = sub_map_.find(key);
- if (it == sub_map_.end()) {
- return;
- }
- it->second(data);
- }
- catch (const std::exception& ex) {
- error_callback(asio::error::make_error_code(asio::error::invalid_argument));
- std::cout << ex.what() << "\n";
- }
- }
- void clear_cache() {
- {
- std::unique_lock<std::mutex> lock(write_mtx_);
- while (!outbox_.empty()) {
- ::free((char*)outbox_.front().content.data());
- outbox_.pop_front();
- }
- }
- {
- std::unique_lock<std::mutex> lock(cb_mtx_);
- callback_map_.clear();
- future_map_.clear();
- }
- }
- void reset_socket(){
- boost::system::error_code igored_ec;
- socket_.close(igored_ec);
- socket_ = decltype(socket_)(ios_);
- if(!socket_.is_open()){
- socket_.open(boost::asio::ip::tcp::v4());
- }
- }
- class call_t : asio::noncopyable, public std::enable_shared_from_this<call_t> {
- public:
- call_t(asio::io_service& ios, std::function<void(boost::system::error_code, string_view)> cb, size_t timeout) : timer_(ios),
- cb_(std::move(cb)), timeout_(timeout){
- }
- void start_timer() {
- if (timeout_ == 0) {
- return;
- }
- timer_.expires_from_now(std::chrono::milliseconds(timeout_));
- auto self = this->shared_from_this();
- timer_.async_wait([this, self](boost::system::error_code ec) {
- if (ec) {
- return;
- }
- has_timeout_ = true;
- });
- }
- void callback(boost::system::error_code ec, string_view data) {
- cb_(ec, data);
- }
- bool has_timeout() const {
- return has_timeout_;
- }
- void cancel() {
- if (timeout_ == 0) {
- return;
- }
- boost::system::error_code ec;
- timer_.cancel(ec);
- }
- private:
- boost::asio::steady_timer timer_;
- std::function<void(boost::system::error_code, string_view)> cb_;
- size_t timeout_;
- bool has_timeout_ = false;
- };
- void error_callback(const boost::system::error_code& ec) {
- if (err_cb_) {
- err_cb_(ec);
- }
- if (enable_reconnect_) {
- async_connect();
- }
- }
- void set_default_error_cb() {
- err_cb_ = [this](boost::system::error_code){
- async_connect();
- };
- }
- boost::asio::io_service ios_;
- asio::ip::tcp::socket socket_;
- boost::asio::io_service::work work_;
- std::shared_ptr<std::thread> thd_ = nullptr;
- std::string host_;
- unsigned short port_ = 0;
- size_t connect_timeout_ = 1000;//s
- int reconnect_cnt_ = -1;
- std::atomic_bool has_connected_ = { false };
- std::mutex conn_mtx_;
- std::condition_variable conn_cond_;
- bool has_wait_ = false;
- asio::steady_timer deadline_;
- struct client_message_type {
- std::uint64_t req_id;
- request_type req_type;
- string_view content;
- };
- std::deque<client_message_type> outbox_;
- uint32_t write_size_ = 0;
- std::mutex write_mtx_;
- uint64_t fu_id_ = 0;
- std::function<void(boost::system::error_code)> err_cb_;
- bool enable_reconnect_ = false;
- std::unordered_map<std::uint64_t, std::shared_ptr<std::promise<req_result>>> future_map_;
- std::unordered_map<std::uint64_t, std::shared_ptr<call_t>> callback_map_;
- std::mutex cb_mtx_;
- uint64_t callback_id_ = 0;
- uint64_t temp_req_id_ = 0;
- char head_[HEAD_LEN] = {};
- std::vector<char> body_;
- std::unordered_map<std::string, std::function<void(string_view)>> sub_map_;
- std::set<std::pair<std::string, std::string>> key_token_set_;
- };
- }
|