Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

291 lines
9.5 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. #define _XOPEN_SOURCE
  4. #include <limits.h>
  5. #include "messenger.h"
  6. void osd_messenger_t::outbox_push(osd_op_t *cur_op)
  7. {
  8. assert(cur_op->peer_fd);
  9. osd_client_t *cl = clients.at(cur_op->peer_fd);
  10. if (cur_op->op_type == OSD_OP_OUT)
  11. {
  12. clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
  13. }
  14. else
  15. {
  16. // Check that operation actually belongs to this client
  17. // FIXME: Review if this is still needed
  18. bool found = false;
  19. for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
  20. {
  21. if (*it == cur_op)
  22. {
  23. found = true;
  24. cl->received_ops.erase(it, it+1);
  25. break;
  26. }
  27. }
  28. if (!found)
  29. {
  30. delete cur_op;
  31. return;
  32. }
  33. }
  34. auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
  35. auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
  36. if (cur_op->op_type == OSD_OP_IN)
  37. {
  38. measure_exec(cur_op);
  39. to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
  40. }
  41. else
  42. {
  43. to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
  44. cl->sent_ops[cur_op->req.hdr.id] = cur_op;
  45. }
  46. to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_HDR });
  47. // Bitmap
  48. if (cur_op->op_type == OSD_OP_IN &&
  49. cur_op->req.hdr.opcode == OSD_OP_SEC_READ &&
  50. cur_op->reply.sec_rw.attr_len > 0)
  51. {
  52. to_send_list.push_back((iovec){
  53. .iov_base = cur_op->bitmap,
  54. .iov_len = cur_op->reply.sec_rw.attr_len,
  55. });
  56. to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
  57. }
  58. else if (cur_op->op_type == OSD_OP_OUT &&
  59. (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
  60. cur_op->req.sec_rw.attr_len > 0)
  61. {
  62. to_send_list.push_back((iovec){
  63. .iov_base = cur_op->bitmap,
  64. .iov_len = cur_op->req.sec_rw.attr_len,
  65. });
  66. to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
  67. }
  68. // Operation data
  69. if ((cur_op->op_type == OSD_OP_IN
  70. ? (cur_op->req.hdr.opcode == OSD_OP_READ ||
  71. cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
  72. cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
  73. cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
  74. : (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
  75. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  76. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
  77. cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
  78. cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK ||
  79. cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)) && cur_op->iov.count > 0)
  80. {
  81. for (int i = 0; i < cur_op->iov.count; i++)
  82. {
  83. assert(cur_op->iov.buf[i].iov_base);
  84. to_send_list.push_back(cur_op->iov.buf[i]);
  85. to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
  86. }
  87. }
  88. if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
  89. {
  90. if (cur_op->op_type == OSD_OP_IN && cur_op->reply.hdr.retval > 0)
  91. to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval });
  92. else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0)
  93. to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
  94. to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
  95. }
  96. if (cur_op->op_type == OSD_OP_IN)
  97. {
  98. to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE;
  99. }
  100. #ifdef WITH_RDMA
  101. if (cl->peer_state == PEER_RDMA)
  102. {
  103. try_send_rdma(cl);
  104. return;
  105. }
  106. #endif
  107. if (!ringloop)
  108. {
  109. // FIXME: It's worse because it doesn't allow batching
  110. while (cl->outbox.size())
  111. {
  112. try_send(cl);
  113. }
  114. }
  115. else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl))
  116. {
  117. if (cl->write_state == 0)
  118. {
  119. cl->write_state = CL_WRITE_READY;
  120. write_ready_clients.push_back(cur_op->peer_fd);
  121. }
  122. ringloop->wakeup();
  123. }
  124. }
  125. void osd_messenger_t::measure_exec(osd_op_t *cur_op)
  126. {
  127. // Measure execution latency
  128. if (cur_op->req.hdr.opcode > OSD_OP_MAX)
  129. {
  130. return;
  131. }
  132. if (!cur_op->tv_end.tv_sec)
  133. {
  134. clock_gettime(CLOCK_REALTIME, &cur_op->tv_end);
  135. }
  136. stats.op_stat_count[cur_op->req.hdr.opcode]++;
  137. if (!stats.op_stat_count[cur_op->req.hdr.opcode])
  138. {
  139. stats.op_stat_count[cur_op->req.hdr.opcode]++;
  140. stats.op_stat_sum[cur_op->req.hdr.opcode] = 0;
  141. stats.op_stat_bytes[cur_op->req.hdr.opcode] = 0;
  142. }
  143. stats.op_stat_sum[cur_op->req.hdr.opcode] += (
  144. (cur_op->tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 +
  145. (cur_op->tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000
  146. );
  147. if (cur_op->req.hdr.opcode == OSD_OP_READ ||
  148. cur_op->req.hdr.opcode == OSD_OP_WRITE)
  149. {
  150. stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.rw.len;
  151. }
  152. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
  153. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  154. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
  155. {
  156. stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.sec_rw.len;
  157. }
  158. }
  159. bool osd_messenger_t::try_send(osd_client_t *cl)
  160. {
  161. int peer_fd = cl->peer_fd;
  162. if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
  163. {
  164. return true;
  165. }
  166. if (ringloop && !use_sync_send_recv)
  167. {
  168. io_uring_sqe* sqe = ringloop->get_sqe();
  169. if (!sqe)
  170. {
  171. return false;
  172. }
  173. cl->write_msg.msg_iov = cl->send_list.data();
  174. cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
  175. cl->refs++;
  176. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  177. data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
  178. my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
  179. }
  180. else
  181. {
  182. cl->write_msg.msg_iov = cl->send_list.data();
  183. cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
  184. cl->refs++;
  185. int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL);
  186. if (result < 0)
  187. {
  188. result = -errno;
  189. }
  190. handle_send(result, cl);
  191. }
  192. return true;
  193. }
  194. void osd_messenger_t::send_replies()
  195. {
  196. for (int i = 0; i < write_ready_clients.size(); i++)
  197. {
  198. int peer_fd = write_ready_clients[i];
  199. auto cl_it = clients.find(peer_fd);
  200. if (cl_it != clients.end() && !try_send(cl_it->second))
  201. {
  202. write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
  203. return;
  204. }
  205. }
  206. write_ready_clients.clear();
  207. }
  208. void osd_messenger_t::handle_send(int result, osd_client_t *cl)
  209. {
  210. cl->write_msg.msg_iovlen = 0;
  211. cl->refs--;
  212. if (cl->peer_state == PEER_STOPPED)
  213. {
  214. if (cl->refs <= 0)
  215. {
  216. delete cl;
  217. }
  218. return;
  219. }
  220. if (result < 0 && result != -EAGAIN)
  221. {
  222. // this is a client socket, so don't panic. just disconnect it
  223. fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
  224. stop_client(cl->peer_fd);
  225. return;
  226. }
  227. if (result >= 0)
  228. {
  229. int done = 0;
  230. while (result > 0 && done < cl->send_list.size())
  231. {
  232. iovec & iov = cl->send_list[done];
  233. if (iov.iov_len <= result)
  234. {
  235. if (cl->outbox[done].flags & MSGR_SENDP_FREE)
  236. {
  237. // Reply fully sent
  238. delete cl->outbox[done].op;
  239. }
  240. result -= iov.iov_len;
  241. done++;
  242. }
  243. else
  244. {
  245. iov.iov_len -= result;
  246. iov.iov_base += result;
  247. break;
  248. }
  249. }
  250. if (done > 0)
  251. {
  252. cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);
  253. cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+done);
  254. }
  255. if (cl->next_send_list.size())
  256. {
  257. cl->send_list.insert(cl->send_list.end(), cl->next_send_list.begin(), cl->next_send_list.end());
  258. cl->outbox.insert(cl->outbox.end(), cl->next_outbox.begin(), cl->next_outbox.end());
  259. cl->next_send_list.clear();
  260. cl->next_outbox.clear();
  261. }
  262. cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
  263. #ifdef WITH_RDMA
  264. if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
  265. {
  266. // FIXME: Do something better than just forgetting the FD
  267. // FIXME: Ignore pings during RDMA state transition
  268. if (log_level > 0)
  269. {
  270. fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
  271. }
  272. cl->peer_state = PEER_RDMA;
  273. tfd->set_fd_handler(cl->peer_fd, false, NULL);
  274. // Add the initial receive request
  275. try_recv_rdma(cl);
  276. }
  277. #endif
  278. }
  279. if (cl->write_state != 0)
  280. {
  281. write_ready_clients.push_back(cl->peer_fd);
  282. }
  283. }