Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

202 lines
5.0 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. #pragma once
  4. #include <sys/types.h>
  5. #include <stdint.h>
  6. #include <arpa/inet.h>
  7. #include <set>
  8. #include <map>
  9. #include <deque>
  10. #include <vector>
  11. #include "malloc_or_die.h"
  12. #include "json11/json11.hpp"
  13. #include "msgr_op.h"
  14. #include "timerfd_manager.h"
  15. #include <ringloop.h>
  16. #ifdef WITH_RDMA
  17. #include "msgr_rdma.h"
  18. #endif
  // Numeric constants used by the messenger declarations in this header.
  // The grouping below follows the name prefixes; which field each group is
  // stored in is not visible here and is marked as an assumption.

  // CL_READ_*: presumably values of osd_client_t::read_state -- TODO confirm
  #define CL_READ_HDR                 1
  #define CL_READ_DATA                2
  #define CL_READ_REPLY_DATA          3

  // CL_WRITE_*: presumably values of osd_client_t::write_state -- TODO confirm
  #define CL_WRITE_READY              1

  // PEER_*: presumably values of osd_client_t::peer_state -- TODO confirm
  #define PEER_CONNECTING             1
  #define PEER_CONNECTED              2
  #define PEER_RDMA_CONNECTING        3
  #define PEER_RDMA                   4
  #define PEER_STOPPED                5

  // Defaults
  #define DEFAULT_BITMAP_GRANULARITY  4096
  #define VITASTOR_CONFIG_PATH        "/etc/vitastor/vitastor.conf"

  // MSGR_SENDP_*: presumably bit flags for msgr_sendp_t::flags -- TODO confirm
  #define MSGR_SENDP_HDR              1
  #define MSGR_SENDP_FREE             2
  32. struct msgr_sendp_t
  33. {
  34. osd_op_t *op;
  35. int flags;
  36. };
  37. struct osd_client_t
  38. {
  39. int refs = 0;
  40. sockaddr_in peer_addr;
  41. int peer_port;
  42. int peer_fd;
  43. int peer_state;
  44. int connect_timeout_id = -1;
  45. int ping_time_remaining = 0;
  46. int idle_time_remaining = 0;
  47. osd_num_t osd_num = 0;
  48. void *in_buf = NULL;
  49. #ifdef WITH_RDMA
  50. msgr_rdma_connection_t *rdma_conn = NULL;
  51. #endif
  52. // Read state
  53. int read_ready = 0;
  54. osd_op_t *read_op = NULL;
  55. iovec read_iov = { 0 };
  56. msghdr read_msg = { 0 };
  57. int read_remaining = 0;
  58. int read_state = 0;
  59. osd_op_buf_list_t recv_list;
  60. // Incoming operations
  61. std::vector<osd_op_t*> received_ops;
  62. // Outbound operations
  63. std::map<uint64_t, osd_op_t*> sent_ops;
  64. // PGs dirtied by this client's primary-writes
  65. std::set<pool_pg_num_t> dirty_pgs;
  66. // Write state
  67. msghdr write_msg = { 0 };
  68. int write_state = 0;
  69. std::vector<iovec> send_list, next_send_list;
  70. std::vector<msgr_sendp_t> outbox, next_outbox;
  71. ~osd_client_t()
  72. {
  73. free(in_buf);
  74. in_buf = NULL;
  75. }
  76. };
  77. struct osd_wanted_peer_t
  78. {
  79. json11::Json address_list;
  80. int port;
  81. time_t last_connect_attempt;
  82. bool connecting, address_changed;
  83. int address_index;
  84. std::string cur_addr;
  85. int cur_port;
  86. };
  87. struct osd_op_stats_t
  88. {
  89. uint64_t op_stat_sum[OSD_OP_MAX+1] = { 0 };
  90. uint64_t op_stat_count[OSD_OP_MAX+1] = { 0 };
  91. uint64_t op_stat_bytes[OSD_OP_MAX+1] = { 0 };
  92. uint64_t subop_stat_sum[OSD_OP_MAX+1] = { 0 };
  93. uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
  94. };
  95. struct osd_messenger_t
  96. {
  97. protected:
  98. int keepalive_timer_id = -1;
  99. uint32_t receive_buffer_size = 0;
  100. int peer_connect_interval = 0;
  101. int peer_connect_timeout = 0;
  102. int osd_idle_timeout = 0;
  103. int osd_ping_timeout = 0;
  104. int log_level = 0;
  105. bool use_sync_send_recv = false;
  106. #ifdef WITH_RDMA
  107. bool use_rdma = true;
  108. std::string rdma_device;
  109. uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
  110. msgr_rdma_context_t *rdma_context = NULL;
  111. uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 8;
  112. uint64_t rdma_max_msg = 0;
  113. #endif
  114. std::vector<int> read_ready_clients;
  115. std::vector<int> write_ready_clients;
  116. std::vector<std::function<void()>> set_immediate;
  117. public:
  118. timerfd_manager_t *tfd;
  119. ring_loop_t *ringloop;
  120. // osd_num_t is only for logging and asserts
  121. osd_num_t osd_num;
  122. uint64_t next_subop_id = 1;
  123. std::map<int, osd_client_t*> clients;
  124. std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
  125. std::map<uint64_t, int> osd_peer_fds;
  126. // op statistics
  127. osd_op_stats_t stats;
  128. void init();
  129. void parse_config(const json11::Json & config);
  130. void connect_peer(uint64_t osd_num, json11::Json peer_state);
  131. void stop_client(int peer_fd, bool force = false);
  132. void outbox_push(osd_op_t *cur_op);
  133. std::function<void(osd_op_t*)> exec_op;
  134. std::function<void(osd_num_t)> repeer_pgs;
  135. void read_requests();
  136. void send_replies();
  137. void accept_connections(int listen_fd);
  138. ~osd_messenger_t();
  139. static json11::Json read_config(const json11::Json & config);
  140. #ifdef WITH_RDMA
  141. bool is_rdma_enabled();
  142. bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
  143. #endif
  144. protected:
  145. void try_connect_peer(uint64_t osd_num);
  146. void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
  147. void handle_peer_epoll(int peer_fd, int epoll_events);
  148. void handle_connect_epoll(int peer_fd);
  149. void on_connect_peer(osd_num_t peer_osd, int peer_fd);
  150. void check_peer_config(osd_client_t *cl);
  151. void cancel_osd_ops(osd_client_t *cl);
  152. void cancel_op(osd_op_t *op);
  153. bool try_send(osd_client_t *cl);
  154. void measure_exec(osd_op_t *cur_op);
  155. void handle_send(int result, osd_client_t *cl);
  156. bool handle_read(int result, osd_client_t *cl);
  157. bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
  158. bool handle_finished_read(osd_client_t *cl);
  159. void handle_op_hdr(osd_client_t *cl);
  160. bool handle_reply_hdr(osd_client_t *cl);
  161. void handle_reply_ready(osd_op_t *op);
  162. #ifdef WITH_RDMA
  163. bool try_send_rdma(osd_client_t *cl);
  164. bool try_recv_rdma(osd_client_t *cl);
  165. void handle_rdma_events();
  166. #endif
  167. };