connection.h 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #ifndef REST_RPC_CONNECTION_H_
  2. #define REST_RPC_CONNECTION_H_
  3. #include <iostream>
  4. #include <memory>
  5. #include <array>
  6. #include <deque>
  7. #include "use_asio.hpp"
  8. #include "const_vars.h"
  9. #include "router.h"
  10. using boost::asio::ip::tcp;
  11. namespace rest_rpc {
  12. namespace rpc_service {
  13. class connection : public std::enable_shared_from_this<connection>, private asio::noncopyable {
  14. public:
  15. connection(boost::asio::io_service& io_service, std::size_t timeout_seconds)
  16. : socket_(io_service),
  17. body_(INIT_BUF_SIZE),
  18. timer_(io_service),
  19. timeout_seconds_(timeout_seconds),
  20. has_closed_(false) {
  21. }
  22. ~connection() { close(); }
  23. void start() { read_head(); }
  24. tcp::socket& socket() { return socket_; }
  25. bool has_closed() const { return has_closed_; }
  26. uint64_t request_id() const {
  27. return req_id_;
  28. }
  29. void response(uint64_t req_id, std::string data, request_type req_type = request_type::req_res) {
  30. auto len = data.size();
  31. assert(len < MAX_BUF_LEN);
  32. std::unique_lock<std::mutex> lock(write_mtx_);
  33. write_queue_.emplace_back(message_type{ req_id, req_type, std::make_shared<std::string>(std::move(data)) });
  34. if (write_queue_.size() > 1) {
  35. return;
  36. }
  37. write();
  38. }
  39. template<typename T>
  40. void pack_and_response(uint64_t req_id, T data) {
  41. auto result = msgpack_codec::pack_args_str(result_code::OK, std::move(data));
  42. response(req_id, std::move(result));
  43. }
  44. void close() {
  45. has_closed_ = true;
  46. if (socket_.is_open()) {
  47. boost::system::error_code ignored_ec;
  48. socket_.shutdown(tcp::socket::shutdown_both, ignored_ec);
  49. socket_.close(ignored_ec);
  50. }
  51. }
  52. void set_conn_id(int64_t id) { conn_id_ = id; }
  53. int64_t conn_id() const { return conn_id_; }
  54. const std::vector<char>& body() const {
  55. return body_;
  56. }
  57. std::string remote_address() const {
  58. if (has_closed_) {
  59. return "";
  60. }
  61. return socket_.remote_endpoint().address().to_string();
  62. }
  63. void publish(const std::string& key, const std::string& data) {
  64. auto result = msgpack_codec::pack_args_str(result_code::OK, key, data);
  65. response(0, std::move(result), request_type::sub_pub);
  66. }
  67. void set_callback(std::function<void(std::string, std::string, std::weak_ptr<connection>)> callback) {
  68. callback_ = std::move(callback);
  69. }
  70. private:
  71. void read_head() {
  72. reset_timer();
  73. auto self(this->shared_from_this());
  74. boost::asio::async_read(
  75. socket_, boost::asio::buffer(head_),
  76. [this, self](boost::system::error_code ec, std::size_t length) {
  77. if (!socket_.is_open()) {
  78. //LOG(INFO) << "socket already closed";
  79. return;
  80. }
  81. if (!ec) {
  82. //const uint32_t body_len = *((int*)(head_));
  83. //req_id_ = *((std::uint64_t*)(head_ + sizeof(int32_t)));
  84. rpc_header* header = (rpc_header*)(head_);
  85. req_id_ = header->req_id;
  86. const uint32_t body_len = header->body_len;
  87. req_type_ = header->req_type;
  88. if (body_len > 0 && body_len < MAX_BUF_LEN) {
  89. if (body_.size() < body_len) { body_.resize(body_len); }
  90. read_body(body_len);
  91. return;
  92. }
  93. if (body_len == 0) { // nobody, just head, maybe as heartbeat.
  94. cancel_timer();
  95. read_head();
  96. }
  97. else {
  98. //LOG(INFO) << "invalid body len";
  99. close();
  100. }
  101. }
  102. else {
  103. //LOG(INFO) << ec.message();
  104. close();
  105. }
  106. });
  107. }
  108. void read_body(std::size_t size) {
  109. auto self(this->shared_from_this());
  110. boost::asio::async_read(
  111. socket_, boost::asio::buffer(body_.data(), size),
  112. [this, self](boost::system::error_code ec, std::size_t length) {
  113. cancel_timer();
  114. if (!socket_.is_open()) {
  115. //LOG(INFO) << "socket already closed";
  116. return;
  117. }
  118. if (!ec) {
  119. read_head();
  120. if (req_type_ == request_type::req_res) {
  121. router& _router = router::get();
  122. _router.route<connection>(body_.data(), length, this->shared_from_this());
  123. }
  124. else if (req_type_ == request_type::sub_pub) {
  125. try {
  126. msgpack_codec codec;
  127. auto p = codec.unpack<std::tuple<std::string, std::string>>(body_.data(), length);
  128. callback_(std::move(std::get<0>(p)), std::move(std::get<1>(p)), this->shared_from_this());
  129. }
  130. catch (const std::exception& ex) {
  131. std::cout << ex.what() << "\n";
  132. }
  133. }
  134. }
  135. else {
  136. //LOG(INFO) << ec.message();
  137. }
  138. });
  139. }
  140. void write() {
  141. auto& msg = write_queue_.front();
  142. write_size_ = (uint32_t)msg.content->size();
  143. std::array<boost::asio::const_buffer, 4> write_buffers;
  144. write_buffers[0] = boost::asio::buffer(&write_size_, sizeof(uint32_t));
  145. write_buffers[1] = boost::asio::buffer(&msg.req_id, sizeof(uint64_t));
  146. write_buffers[2] = boost::asio::buffer(&msg.req_type, sizeof(request_type));
  147. write_buffers[3] = boost::asio::buffer(msg.content->data(), write_size_);
  148. auto self = this->shared_from_this();
  149. boost::asio::async_write(
  150. socket_, write_buffers,
  151. [this, self](boost::system::error_code ec, std::size_t length) {
  152. on_write(ec, length);
  153. });
  154. }
  155. void on_write(boost::system::error_code ec, std::size_t length) {
  156. if (ec) {
  157. std::cout << ec.value() << " " << ec.message() << std::endl;
  158. close();
  159. return;
  160. }
  161. if (has_closed()) { return; }
  162. std::unique_lock<std::mutex> lock(write_mtx_);
  163. write_queue_.pop_front();
  164. if (!write_queue_.empty()) {
  165. write();
  166. }
  167. }
  168. void reset_timer() {
  169. if (timeout_seconds_ == 0) { return; }
  170. auto self(this->shared_from_this());
  171. timer_.expires_from_now(std::chrono::seconds(timeout_seconds_));
  172. timer_.async_wait([this, self](const boost::system::error_code& ec) {
  173. if (has_closed()) { return; }
  174. if (ec) { return; }
  175. //LOG(INFO) << "rpc connection timeout";
  176. close();
  177. });
  178. }
  179. void cancel_timer() {
  180. if (timeout_seconds_ == 0) { return; }
  181. timer_.cancel();
  182. }
  183. tcp::socket socket_;
  184. char head_[HEAD_LEN];
  185. std::vector<char> body_;
  186. std::uint64_t req_id_;
  187. request_type req_type_;
  188. uint32_t write_size_ = 0;
  189. std::mutex write_mtx_;
  190. asio::steady_timer timer_;
  191. std::size_t timeout_seconds_;
  192. int64_t conn_id_ = 0;
  193. bool has_closed_;
  194. std::deque<message_type> write_queue_;
  195. std::function<void(std::string, std::string, std::weak_ptr<connection>)> callback_;
  196. };
  197. } // namespace rpc_service
  198. } // namespace rest_rpc
  199. #endif // REST_RPC_CONNECTION_H_