Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

341 lines
11 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #include "messenger.h"
  4. void osd_messenger_t::read_requests()
  5. {
  6. for (int i = 0; i < read_ready_clients.size(); i++)
  7. {
  8. int peer_fd = read_ready_clients[i];
  9. osd_client_t *cl = clients[peer_fd];
  10. if (cl->read_remaining < receive_buffer_size)
  11. {
  12. cl->read_iov.iov_base = cl->in_buf;
  13. cl->read_iov.iov_len = receive_buffer_size;
  14. cl->read_msg.msg_iov = &cl->read_iov;
  15. cl->read_msg.msg_iovlen = 1;
  16. }
  17. else
  18. {
  19. cl->read_iov.iov_base = 0;
  20. cl->read_iov.iov_len = cl->read_remaining;
  21. cl->read_msg.msg_iov = cl->recv_list.get_iovec();
  22. cl->read_msg.msg_iovlen = cl->recv_list.get_size();
  23. }
  24. cl->refs++;
  25. if (ringloop && !use_sync_send_recv)
  26. {
  27. io_uring_sqe* sqe = ringloop->get_sqe();
  28. if (!sqe)
  29. {
  30. read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
  31. return;
  32. }
  33. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  34. data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
  35. my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
  36. }
  37. else
  38. {
  39. int result = recvmsg(peer_fd, &cl->read_msg, 0);
  40. if (result < 0)
  41. {
  42. result = -errno;
  43. }
  44. handle_read(result, cl);
  45. }
  46. }
  47. read_ready_clients.clear();
  48. }
  49. bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
  50. {
  51. bool ret = false;
  52. cl->refs--;
  53. if (cl->peer_state == PEER_STOPPED)
  54. {
  55. if (cl->refs <= 0)
  56. {
  57. delete cl;
  58. }
  59. return false;
  60. }
  61. if (result <= 0 && result != -EAGAIN)
  62. {
  63. // this is a client socket, so don't panic on error. just disconnect it
  64. if (result != 0)
  65. {
  66. printf("Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
  67. }
  68. stop_client(cl->peer_fd);
  69. return false;
  70. }
  71. if (result == -EAGAIN || result < cl->read_iov.iov_len)
  72. {
  73. cl->read_ready--;
  74. if (cl->read_ready > 0)
  75. read_ready_clients.push_back(cl->peer_fd);
  76. }
  77. else
  78. {
  79. read_ready_clients.push_back(cl->peer_fd);
  80. }
  81. if (result > 0)
  82. {
  83. if (cl->read_iov.iov_base == cl->in_buf)
  84. {
  85. // Compose operation(s) from the buffer
  86. int remain = result;
  87. void *curbuf = cl->in_buf;
  88. while (remain > 0)
  89. {
  90. if (!cl->read_op)
  91. {
  92. cl->read_op = new osd_op_t;
  93. cl->read_op->peer_fd = cl->peer_fd;
  94. cl->read_op->op_type = OSD_OP_IN;
  95. cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
  96. cl->read_remaining = OSD_PACKET_SIZE;
  97. cl->read_state = CL_READ_HDR;
  98. }
  99. while (cl->recv_list.done < cl->recv_list.count && remain > 0)
  100. {
  101. iovec* cur = cl->recv_list.get_iovec();
  102. if (cur->iov_len > remain)
  103. {
  104. memcpy(cur->iov_base, curbuf, remain);
  105. cl->read_remaining -= remain;
  106. cur->iov_len -= remain;
  107. cur->iov_base += remain;
  108. remain = 0;
  109. }
  110. else
  111. {
  112. memcpy(cur->iov_base, curbuf, cur->iov_len);
  113. curbuf += cur->iov_len;
  114. cl->read_remaining -= cur->iov_len;
  115. remain -= cur->iov_len;
  116. cur->iov_len = 0;
  117. cl->recv_list.done++;
  118. }
  119. }
  120. if (cl->recv_list.done >= cl->recv_list.count)
  121. {
  122. if (!handle_finished_read(cl))
  123. {
  124. goto fin;
  125. }
  126. }
  127. }
  128. }
  129. else
  130. {
  131. // Long data
  132. cl->read_remaining -= result;
  133. cl->recv_list.eat(result);
  134. if (cl->recv_list.done >= cl->recv_list.count)
  135. {
  136. handle_finished_read(cl);
  137. }
  138. }
  139. if (result >= cl->read_iov.iov_len)
  140. {
  141. ret = true;
  142. }
  143. }
  144. fin:
  145. for (auto cb: set_immediate)
  146. {
  147. cb();
  148. }
  149. set_immediate.clear();
  150. return ret;
  151. }
  152. bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
  153. {
  154. cl->recv_list.reset();
  155. if (cl->read_state == CL_READ_HDR)
  156. {
  157. if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
  158. return handle_reply_hdr(cl);
  159. else
  160. handle_op_hdr(cl);
  161. }
  162. else if (cl->read_state == CL_READ_DATA)
  163. {
  164. // Operation is ready
  165. cl->received_ops.push_back(cl->read_op);
  166. set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
  167. cl->read_op = NULL;
  168. cl->read_state = 0;
  169. }
  170. else if (cl->read_state == CL_READ_REPLY_DATA)
  171. {
  172. // Reply is ready
  173. handle_reply_ready(cl->read_op);
  174. cl->read_op = NULL;
  175. cl->read_state = 0;
  176. }
  177. else
  178. {
  179. assert(0);
  180. }
  181. return true;
  182. }
  183. void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
  184. {
  185. osd_op_t *cur_op = cl->read_op;
  186. if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
  187. {
  188. cl->read_remaining = 0;
  189. }
  190. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  191. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
  192. {
  193. if (cur_op->req.sec_rw.attr_len > 0)
  194. {
  195. if (cur_op->req.sec_rw.attr_len > sizeof(unsigned))
  196. cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(cur_op->req.sec_rw.attr_len);
  197. else
  198. cur_op->bitmap = &cur_op->bmp_data;
  199. cl->recv_list.push_back(cur_op->bitmap, cur_op->req.sec_rw.attr_len);
  200. }
  201. if (cur_op->req.sec_rw.len > 0)
  202. {
  203. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
  204. cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_rw.len);
  205. }
  206. cl->read_remaining = cur_op->req.sec_rw.len + cur_op->req.sec_rw.attr_len;
  207. }
  208. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
  209. cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
  210. {
  211. if (cur_op->req.sec_stab.len > 0)
  212. {
  213. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_stab.len);
  214. cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_stab.len);
  215. }
  216. cl->read_remaining = cur_op->req.sec_stab.len;
  217. }
  218. else if (cur_op->req.hdr.opcode == OSD_OP_READ)
  219. {
  220. cl->read_remaining = 0;
  221. }
  222. else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
  223. {
  224. if (cur_op->req.rw.len > 0)
  225. {
  226. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.rw.len);
  227. cl->recv_list.push_back(cur_op->buf, cur_op->req.rw.len);
  228. }
  229. cl->read_remaining = cur_op->req.rw.len;
  230. }
  231. if (cl->read_remaining > 0)
  232. {
  233. // Read data
  234. cl->read_state = CL_READ_DATA;
  235. }
  236. else
  237. {
  238. // Operation is ready
  239. cl->received_ops.push_back(cur_op);
  240. set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
  241. cl->read_op = NULL;
  242. cl->read_state = 0;
  243. }
  244. }
  245. bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
  246. {
  247. auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
  248. if (req_it == cl->sent_ops.end())
  249. {
  250. // Command out of sync. Drop connection
  251. printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cl->read_op->req.hdr.id);
  252. stop_client(cl->peer_fd);
  253. return false;
  254. }
  255. osd_op_t *op = req_it->second;
  256. memcpy(op->reply.buf, cl->read_op->req.buf, OSD_PACKET_SIZE);
  257. cl->sent_ops.erase(req_it);
  258. if ((op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ) &&
  259. op->reply.hdr.retval > 0)
  260. {
  261. // Read data. In this case we assume that the buffer is preallocated by the caller (!)
  262. assert(op->iov.count > 0);
  263. unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.attr_len : op->reply.rw.bitmap_len);
  264. if (op->reply.hdr.retval != (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len) ||
  265. bmp_len > op->bitmap_len)
  266. {
  267. // Check reply length to not overflow the buffer
  268. printf("Client %d read reply of different length\n", cl->peer_fd);
  269. cl->sent_ops[op->req.hdr.id] = op;
  270. stop_client(cl->peer_fd);
  271. return false;
  272. }
  273. if (bmp_len > 0)
  274. {
  275. cl->recv_list.push_back(op->bitmap, bmp_len);
  276. }
  277. cl->recv_list.append(op->iov);
  278. delete cl->read_op;
  279. cl->read_op = op;
  280. cl->read_state = CL_READ_REPLY_DATA;
  281. cl->read_remaining = op->reply.hdr.retval + (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.attr_len : op->reply.rw.bitmap_len);
  282. }
  283. else if (op->reply.hdr.opcode == OSD_OP_SEC_LIST && op->reply.hdr.retval > 0)
  284. {
  285. assert(!op->iov.count);
  286. delete cl->read_op;
  287. cl->read_op = op;
  288. cl->read_state = CL_READ_REPLY_DATA;
  289. cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
  290. op->buf = memalign_or_die(MEM_ALIGNMENT, cl->read_remaining);
  291. cl->recv_list.push_back(op->buf, cl->read_remaining);
  292. }
  293. else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
  294. {
  295. assert(!op->iov.count);
  296. delete cl->read_op;
  297. cl->read_op = op;
  298. cl->read_state = CL_READ_REPLY_DATA;
  299. cl->read_remaining = op->reply.hdr.retval;
  300. op->buf = malloc_or_die(op->reply.hdr.retval);
  301. cl->recv_list.push_back(op->buf, op->reply.hdr.retval);
  302. }
  303. else
  304. {
  305. // It's fine to reuse cl->read_op for the next reply
  306. handle_reply_ready(op);
  307. cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
  308. cl->read_remaining = OSD_PACKET_SIZE;
  309. cl->read_state = CL_READ_HDR;
  310. }
  311. return true;
  312. }
  313. void osd_messenger_t::handle_reply_ready(osd_op_t *op)
  314. {
  315. // Measure subop latency
  316. timespec tv_end;
  317. clock_gettime(CLOCK_REALTIME, &tv_end);
  318. stats.subop_stat_count[op->req.hdr.opcode]++;
  319. if (!stats.subop_stat_count[op->req.hdr.opcode])
  320. {
  321. stats.subop_stat_count[op->req.hdr.opcode]++;
  322. stats.subop_stat_sum[op->req.hdr.opcode] = 0;
  323. }
  324. stats.subop_stat_sum[op->req.hdr.opcode] += (
  325. (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
  326. (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
  327. );
  328. set_immediate.push_back([this, op]()
  329. {
  330. // Copy lambda to be unaffected by `delete op`
  331. std::function<void(osd_op_t*)>(op->callback)(op);
  332. });
  333. }