Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

249 lines
7.8 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #define _XOPEN_SOURCE
  4. #include <limits.h>
  5. #include "messenger.h"
  6. void osd_messenger_t::outbox_push(osd_op_t *cur_op)
  7. {
  8. assert(cur_op->peer_fd);
  9. osd_client_t *cl = clients.at(cur_op->peer_fd);
  10. if (cur_op->op_type == OSD_OP_OUT)
  11. {
  12. clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);
  13. }
  14. else
  15. {
  16. // Check that operation actually belongs to this client
  17. // FIXME: Review if this is still needed
  18. bool found = false;
  19. for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
  20. {
  21. if (*it == cur_op)
  22. {
  23. found = true;
  24. cl->received_ops.erase(it, it+1);
  25. break;
  26. }
  27. }
  28. if (!found)
  29. {
  30. delete cur_op;
  31. return;
  32. }
  33. }
  34. auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
  35. auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
  36. if (cur_op->op_type == OSD_OP_IN)
  37. {
  38. measure_exec(cur_op);
  39. to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
  40. }
  41. else
  42. {
  43. to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
  44. cl->sent_ops[cur_op->req.hdr.id] = cur_op;
  45. }
  46. // Pre-defined send_lists
  47. if ((cur_op->op_type == OSD_OP_IN
  48. ? (cur_op->req.hdr.opcode == OSD_OP_READ ||
  49. cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
  50. cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
  51. cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
  52. : (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
  53. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  54. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
  55. cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
  56. cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0)
  57. {
  58. to_outbox.push_back(NULL);
  59. // Bitmap
  60. if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ && cur_op->reply.sec_rw.attr_len > 0 ||
  61. (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
  62. cur_op->req.sec_rw.attr_len > 0)
  63. {
  64. to_send_list.push_back((iovec){
  65. .iov_base = cur_op->bitmap,
  66. .iov_len = cur_op->reply.sec_rw.attr_len,
  67. });
  68. to_outbox.push_back(NULL);
  69. }
  70. for (int i = 0; i < cur_op->iov.count; i++)
  71. {
  72. assert(cur_op->iov.buf[i].iov_base);
  73. to_send_list.push_back(cur_op->iov.buf[i]);
  74. to_outbox.push_back(i == cur_op->iov.count-1 ? cur_op : NULL);
  75. }
  76. }
  77. else
  78. {
  79. to_outbox.push_back(cur_op);
  80. }
  81. if (!ringloop)
  82. {
  83. // FIXME: It's worse because it doesn't allow batching
  84. while (cl->outbox.size())
  85. {
  86. try_send(cl);
  87. }
  88. }
  89. else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl))
  90. {
  91. if (cl->write_state == 0)
  92. {
  93. cl->write_state = CL_WRITE_READY;
  94. write_ready_clients.push_back(cur_op->peer_fd);
  95. }
  96. ringloop->wakeup();
  97. }
  98. }
  99. void osd_messenger_t::measure_exec(osd_op_t *cur_op)
  100. {
  101. // Measure execution latency
  102. if (!cur_op->tv_end.tv_sec)
  103. {
  104. clock_gettime(CLOCK_REALTIME, &cur_op->tv_end);
  105. }
  106. stats.op_stat_count[cur_op->req.hdr.opcode]++;
  107. if (!stats.op_stat_count[cur_op->req.hdr.opcode])
  108. {
  109. stats.op_stat_count[cur_op->req.hdr.opcode]++;
  110. stats.op_stat_sum[cur_op->req.hdr.opcode] = 0;
  111. stats.op_stat_bytes[cur_op->req.hdr.opcode] = 0;
  112. }
  113. stats.op_stat_sum[cur_op->req.hdr.opcode] += (
  114. (cur_op->tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 +
  115. (cur_op->tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000
  116. );
  117. if (cur_op->req.hdr.opcode == OSD_OP_READ ||
  118. cur_op->req.hdr.opcode == OSD_OP_WRITE)
  119. {
  120. stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.rw.len;
  121. }
  122. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
  123. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  124. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
  125. {
  126. stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.sec_rw.len;
  127. }
  128. }
  129. bool osd_messenger_t::try_send(osd_client_t *cl)
  130. {
  131. int peer_fd = cl->peer_fd;
  132. if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
  133. {
  134. return true;
  135. }
  136. if (ringloop && !use_sync_send_recv)
  137. {
  138. io_uring_sqe* sqe = ringloop->get_sqe();
  139. if (!sqe)
  140. {
  141. return false;
  142. }
  143. cl->write_msg.msg_iov = cl->send_list.data();
  144. cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
  145. cl->refs++;
  146. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  147. data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
  148. my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
  149. }
  150. else
  151. {
  152. cl->write_msg.msg_iov = cl->send_list.data();
  153. cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
  154. cl->refs++;
  155. int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL);
  156. if (result < 0)
  157. {
  158. result = -errno;
  159. }
  160. handle_send(result, cl);
  161. }
  162. return true;
  163. }
  164. void osd_messenger_t::send_replies()
  165. {
  166. for (int i = 0; i < write_ready_clients.size(); i++)
  167. {
  168. int peer_fd = write_ready_clients[i];
  169. auto cl_it = clients.find(peer_fd);
  170. if (cl_it != clients.end() && !try_send(cl_it->second))
  171. {
  172. write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
  173. return;
  174. }
  175. }
  176. write_ready_clients.clear();
  177. }
  178. void osd_messenger_t::handle_send(int result, osd_client_t *cl)
  179. {
  180. cl->write_msg.msg_iovlen = 0;
  181. cl->refs--;
  182. if (cl->peer_state == PEER_STOPPED)
  183. {
  184. if (!cl->refs)
  185. {
  186. delete cl;
  187. }
  188. return;
  189. }
  190. if (result < 0 && result != -EAGAIN)
  191. {
  192. // this is a client socket, so don't panic. just disconnect it
  193. printf("Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
  194. stop_client(cl->peer_fd);
  195. return;
  196. }
  197. if (result >= 0)
  198. {
  199. int done = 0;
  200. while (result > 0 && done < cl->send_list.size())
  201. {
  202. iovec & iov = cl->send_list[done];
  203. if (iov.iov_len <= result)
  204. {
  205. if (cl->outbox[done])
  206. {
  207. // Operation fully sent
  208. if (cl->outbox[done]->op_type == OSD_OP_IN)
  209. {
  210. delete cl->outbox[done];
  211. }
  212. }
  213. result -= iov.iov_len;
  214. done++;
  215. }
  216. else
  217. {
  218. iov.iov_len -= result;
  219. iov.iov_base += result;
  220. break;
  221. }
  222. }
  223. if (done > 0)
  224. {
  225. cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);
  226. cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+done);
  227. }
  228. if (cl->next_send_list.size())
  229. {
  230. cl->send_list.insert(cl->send_list.end(), cl->next_send_list.begin(), cl->next_send_list.end());
  231. cl->outbox.insert(cl->outbox.end(), cl->next_outbox.begin(), cl->next_outbox.end());
  232. cl->next_send_list.clear();
  233. cl->next_outbox.clear();
  234. }
  235. cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
  236. }
  237. if (cl->write_state != 0)
  238. {
  239. write_ready_clients.push_back(cl->peer_fd);
  240. }
  241. }