Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

277 lines
9.2 KiB

1 year ago
  1. #include "cluster_client.h"
  2. void cluster_client_t::read_requests()
  3. {
  4. for (int i = 0; i < read_ready_clients.size(); i++)
  5. {
  6. int peer_fd = read_ready_clients[i];
  7. auto & cl = clients[peer_fd];
  8. {
  9. timespec now;
  10. clock_gettime(CLOCK_REALTIME, &now);
  11. printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
  12. }
  13. io_uring_sqe* sqe = ringloop->get_sqe();
  14. if (!sqe)
  15. {
  16. read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
  17. return;
  18. }
  19. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  20. if (!cl.read_op || cl.read_remaining < receive_buffer_size)
  21. {
  22. cl.read_iov.iov_base = cl.in_buf;
  23. cl.read_iov.iov_len = receive_buffer_size;
  24. }
  25. else
  26. {
  27. cl.read_iov.iov_base = cl.read_buf;
  28. cl.read_iov.iov_len = cl.read_remaining;
  29. }
  30. cl.read_msg.msg_iov = &cl.read_iov;
  31. cl.read_msg.msg_iovlen = 1;
  32. data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
  33. my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
  34. }
  35. read_ready_clients.clear();
  36. }
  37. void cluster_client_t::handle_read(ring_data_t *data, int peer_fd)
  38. {
  39. auto cl_it = clients.find(peer_fd);
  40. if (cl_it != clients.end())
  41. {
  42. auto & cl = cl_it->second;
  43. if (data->res < 0 && data->res != -EAGAIN)
  44. {
  45. // this is a client socket, so don't panic. just disconnect it
  46. printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
  47. stop_client(peer_fd);
  48. return;
  49. }
  50. if (data->res == -EAGAIN || cl.read_iov.iov_base == cl.in_buf && data->res < receive_buffer_size)
  51. {
  52. cl.read_ready--;
  53. if (cl.read_ready > 0)
  54. read_ready_clients.push_back(peer_fd);
  55. }
  56. else
  57. {
  58. read_ready_clients.push_back(peer_fd);
  59. }
  60. if (data->res == -EAGAIN)
  61. {
  62. return;
  63. }
  64. if (data->res > 0)
  65. {
  66. if (cl.read_iov.iov_base == cl.in_buf)
  67. {
  68. // Compose operation(s) from the buffer
  69. int remain = data->res;
  70. void *curbuf = cl.in_buf;
  71. while (remain > 0)
  72. {
  73. if (!cl.read_op)
  74. {
  75. cl.read_op = new osd_op_t;
  76. cl.read_op->peer_fd = peer_fd;
  77. cl.read_op->op_type = OSD_OP_IN;
  78. cl.read_buf = cl.read_op->req.buf;
  79. cl.read_remaining = OSD_PACKET_SIZE;
  80. cl.read_state = CL_READ_HDR;
  81. }
  82. if (cl.read_remaining > remain)
  83. {
  84. memcpy(cl.read_buf, curbuf, remain);
  85. cl.read_remaining -= remain;
  86. cl.read_buf += remain;
  87. remain = 0;
  88. if (cl.read_remaining <= 0)
  89. handle_finished_read(cl);
  90. }
  91. else
  92. {
  93. memcpy(cl.read_buf, curbuf, cl.read_remaining);
  94. curbuf += cl.read_remaining;
  95. remain -= cl.read_remaining;
  96. cl.read_remaining = 0;
  97. cl.read_buf = NULL;
  98. handle_finished_read(cl);
  99. }
  100. }
  101. }
  102. else
  103. {
  104. // Long data
  105. cl.read_remaining -= data->res;
  106. cl.read_buf += data->res;
  107. if (cl.read_remaining <= 0)
  108. {
  109. handle_finished_read(cl);
  110. }
  111. }
  112. }
  113. }
  114. }
  115. void cluster_client_t::handle_finished_read(osd_client_t & cl)
  116. {
  117. if (cl.read_state == CL_READ_HDR)
  118. {
  119. if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
  120. handle_reply_hdr(&cl);
  121. else
  122. handle_op_hdr(&cl);
  123. }
  124. else if (cl.read_state == CL_READ_DATA)
  125. {
  126. // Operation is ready
  127. exec_op(cl.read_op);
  128. cl.read_op = NULL;
  129. cl.read_state = 0;
  130. }
  131. else if (cl.read_state == CL_READ_REPLY_DATA)
  132. {
  133. // Reply is ready
  134. auto req_it = cl.sent_ops.find(cl.read_reply_id);
  135. osd_op_t *request = req_it->second;
  136. cl.sent_ops.erase(req_it);
  137. cl.read_reply_id = 0;
  138. delete cl.read_op;
  139. cl.read_op = NULL;
  140. cl.read_state = 0;
  141. // Measure subop latency
  142. timespec tv_end;
  143. clock_gettime(CLOCK_REALTIME, &tv_end);
  144. stats.subop_stat_count[request->req.hdr.opcode]++;
  145. if (!stats.subop_stat_count[request->req.hdr.opcode])
  146. {
  147. stats.subop_stat_count[request->req.hdr.opcode]++;
  148. stats.subop_stat_sum[request->req.hdr.opcode] = 0;
  149. }
  150. stats.subop_stat_sum[request->req.hdr.opcode] += (
  151. (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 +
  152. (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000
  153. );
  154. request->callback(request);
  155. }
  156. else
  157. {
  158. assert(0);
  159. }
  160. }
  161. void cluster_client_t::handle_op_hdr(osd_client_t *cl)
  162. {
  163. osd_op_t *cur_op = cl->read_op;
  164. if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ)
  165. {
  166. if (cur_op->req.sec_rw.len > 0)
  167. cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
  168. cl->read_remaining = 0;
  169. }
  170. else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
  171. {
  172. if (cur_op->req.sec_rw.len > 0)
  173. cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
  174. cl->read_remaining = cur_op->req.sec_rw.len;
  175. }
  176. else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ||
  177. cur_op->req.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK)
  178. {
  179. if (cur_op->req.sec_stab.len > 0)
  180. cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.sec_stab.len);
  181. cl->read_remaining = cur_op->req.sec_stab.len;
  182. }
  183. else if (cur_op->req.hdr.opcode == OSD_OP_READ)
  184. {
  185. if (cur_op->req.rw.len > 0)
  186. cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.rw.len);
  187. cl->read_remaining = 0;
  188. }
  189. else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
  190. {
  191. if (cur_op->req.rw.len > 0)
  192. cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.rw.len);
  193. cl->read_remaining = cur_op->req.rw.len;
  194. }
  195. if (cl->read_remaining > 0)
  196. {
  197. // Read data
  198. cl->read_buf = cur_op->buf;
  199. cl->read_state = CL_READ_DATA;
  200. }
  201. else
  202. {
  203. // Operation is ready
  204. cl->read_op = NULL;
  205. cl->read_state = 0;
  206. exec_op(cur_op);
  207. }
  208. }
// A header with SECONDARY_OSD_REPLY_MAGIC was received: match it against a
// previously sent request by id, copy the header into that request's reply
// buffer, and either prepare to read the reply payload or complete the
// request immediately (recording latency stats).
void cluster_client_t::handle_reply_hdr(osd_client_t *cl)
{
    osd_op_t *cur_op = cl->read_op;
    auto req_it = cl->sent_ops.find(cur_op->req.hdr.id);
    if (req_it == cl->sent_ops.end())
    {
        // Command out of sync. Drop connection
        printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cur_op->req.hdr.id);
        stop_client(cl->peer_fd);
        return;
    }
    osd_op_t *op = req_it->second;
    // Copy the raw header into the original request's reply buffer
    memcpy(op->reply.buf, cur_op->req.buf, OSD_PACKET_SIZE);
    if (op->reply.hdr.opcode == OSD_OP_SECONDARY_READ &&
        op->reply.hdr.retval > 0)
    {
        // Read data. In this case we assume that the buffer is preallocated by the caller (!)
        assert(op->buf);
        cl->read_state = CL_READ_REPLY_DATA;
        cl->read_reply_id = op->req.hdr.id;
        cl->read_buf = op->buf;
        cl->read_remaining = op->reply.hdr.retval;
    }
    else if (op->reply.hdr.opcode == OSD_OP_SECONDARY_LIST &&
        op->reply.hdr.retval > 0)
    {
        // LIST replies carry retval obj_ver_id entries; allocate here
        op->buf = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id) * op->reply.hdr.retval);
        cl->read_state = CL_READ_REPLY_DATA;
        cl->read_reply_id = op->req.hdr.id;
        cl->read_buf = op->buf;
        cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
    }
    else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG &&
        op->reply.hdr.retval > 0)
    {
        // SHOW_CONFIG reply payload is retval bytes of (opaque) config data
        op->buf = malloc(op->reply.hdr.retval);
        cl->read_state = CL_READ_REPLY_DATA;
        cl->read_reply_id = op->req.hdr.id;
        cl->read_buf = op->buf;
        cl->read_remaining = op->reply.hdr.retval;
    }
    else
    {
        // No payload follows: the request is complete right now.
        // NOTE: cl->read_op (the temporary header op) is freed here; the
        // payload branches above keep it alive until handle_finished_read().
        delete cl->read_op;
        cl->read_state = 0;
        cl->read_op = NULL;
        cl->sent_ops.erase(req_it);
        // Measure subop latency
        timespec tv_end;
        clock_gettime(CLOCK_REALTIME, &tv_end);
        stats.subop_stat_count[op->req.hdr.opcode]++;
        if (!stats.subop_stat_count[op->req.hdr.opcode])
        {
            // Counter wrapped to 0 — reset both count and sum together
            stats.subop_stat_count[op->req.hdr.opcode]++;
            stats.subop_stat_sum[op->req.hdr.opcode] = 0;
        }
        stats.subop_stat_sum[op->req.hdr.opcode] += (
            (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
            (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
        );
        // Copy lambda to be unaffected by `delete op`
        std::function<void(osd_op_t*)>(op->callback)(op);
    }
}