Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

399 lines
13 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. #include "messenger.h"
  4. void osd_messenger_t::read_requests()
  5. {
  6. for (int i = 0; i < read_ready_clients.size(); i++)
  7. {
  8. int peer_fd = read_ready_clients[i];
  9. osd_client_t *cl = clients[peer_fd];
  10. if (cl->read_msg.msg_iovlen)
  11. {
  12. continue;
  13. }
  14. if (cl->read_remaining < receive_buffer_size)
  15. {
  16. cl->read_iov.iov_base = cl->in_buf;
  17. cl->read_iov.iov_len = receive_buffer_size;
  18. cl->read_msg.msg_iov = &cl->read_iov;
  19. cl->read_msg.msg_iovlen = 1;
  20. }
  21. else
  22. {
  23. cl->read_iov.iov_base = 0;
  24. cl->read_iov.iov_len = cl->read_remaining;
  25. cl->read_msg.msg_iov = cl->recv_list.get_iovec();
  26. cl->read_msg.msg_iovlen = cl->recv_list.get_size();
  27. }
  28. cl->refs++;
  29. if (ringloop && !use_sync_send_recv)
  30. {
  31. io_uring_sqe* sqe = ringloop->get_sqe();
  32. if (!sqe)
  33. {
  34. cl->read_msg.msg_iovlen = 0;
  35. read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
  36. return;
  37. }
  38. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  39. data->callback = [this, cl](ring_data_t *data) { handle_read(data->res, cl); };
  40. my_uring_prep_recvmsg(sqe, peer_fd, &cl->read_msg, 0);
  41. }
  42. else
  43. {
  44. int result = recvmsg(peer_fd, &cl->read_msg, 0);
  45. if (result < 0)
  46. {
  47. result = -errno;
  48. }
  49. handle_read(result, cl);
  50. }
  51. }
  52. read_ready_clients.clear();
  53. }
  54. bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
  55. {
  56. bool ret = false;
  57. cl->read_msg.msg_iovlen = 0;
  58. cl->refs--;
  59. if (cl->peer_state == PEER_STOPPED)
  60. {
  61. if (cl->refs <= 0)
  62. {
  63. delete cl;
  64. }
  65. return false;
  66. }
  67. if (result <= 0 && result != -EAGAIN)
  68. {
  69. // this is a client socket, so don't panic on error. just disconnect it
  70. if (result != 0)
  71. {
  72. fprintf(stderr, "Client %d socket read error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
  73. }
  74. stop_client(cl->peer_fd);
  75. return false;
  76. }
  77. if (result == -EAGAIN || result < cl->read_iov.iov_len)
  78. {
  79. cl->read_ready--;
  80. if (cl->read_ready > 0)
  81. read_ready_clients.push_back(cl->peer_fd);
  82. }
  83. else
  84. {
  85. read_ready_clients.push_back(cl->peer_fd);
  86. }
  87. if (result > 0)
  88. {
  89. if (cl->read_iov.iov_base == cl->in_buf)
  90. {
  91. if (!handle_read_buffer(cl, cl->in_buf, result))
  92. {
  93. goto fin;
  94. }
  95. }
  96. else
  97. {
  98. // Long data
  99. cl->read_remaining -= result;
  100. cl->recv_list.eat(result);
  101. if (cl->recv_list.done >= cl->recv_list.count)
  102. {
  103. handle_finished_read(cl);
  104. }
  105. }
  106. if (result >= cl->read_iov.iov_len)
  107. {
  108. ret = true;
  109. }
  110. }
  111. fin:
  112. for (auto cb: set_immediate)
  113. {
  114. cb();
  115. }
  116. set_immediate.clear();
  117. return ret;
  118. }
  119. bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
  120. {
  121. // Compose operation(s) from the buffer
  122. while (remain > 0)
  123. {
  124. if (!cl->read_op)
  125. {
  126. cl->read_op = new osd_op_t;
  127. cl->read_op->peer_fd = cl->peer_fd;
  128. cl->read_op->op_type = OSD_OP_IN;
  129. cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
  130. cl->read_remaining = OSD_PACKET_SIZE;
  131. cl->read_state = CL_READ_HDR;
  132. }
  133. while (cl->recv_list.done < cl->recv_list.count && remain > 0)
  134. {
  135. iovec* cur = cl->recv_list.get_iovec();
  136. if (cur->iov_len > remain)
  137. {
  138. memcpy(cur->iov_base, curbuf, remain);
  139. cl->read_remaining -= remain;
  140. cur->iov_len -= remain;
  141. cur->iov_base += remain;
  142. remain = 0;
  143. }
  144. else
  145. {
  146. memcpy(cur->iov_base, curbuf, cur->iov_len);
  147. curbuf += cur->iov_len;
  148. cl->read_remaining -= cur->iov_len;
  149. remain -= cur->iov_len;
  150. cur->iov_len = 0;
  151. cl->recv_list.done++;
  152. }
  153. }
  154. if (cl->recv_list.done >= cl->recv_list.count)
  155. {
  156. if (!handle_finished_read(cl))
  157. {
  158. return false;
  159. }
  160. }
  161. }
  162. return true;
  163. }
  164. bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
  165. {
  166. cl->recv_list.reset();
  167. if (cl->read_state == CL_READ_HDR)
  168. {
  169. if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
  170. return handle_reply_hdr(cl);
  171. else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
  172. handle_op_hdr(cl);
  173. else
  174. {
  175. fprintf(stderr, "Received garbage: magic=%lx id=%lu opcode=%lx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd);
  176. stop_client(cl->peer_fd);
  177. return false;
  178. }
  179. }
  180. else if (cl->read_state == CL_READ_DATA)
  181. {
  182. // Operation is ready
  183. cl->received_ops.push_back(cl->read_op);
  184. set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
  185. cl->read_op = NULL;
  186. cl->read_state = 0;
  187. }
  188. else if (cl->read_state == CL_READ_REPLY_DATA)
  189. {
  190. // Reply is ready
  191. handle_reply_ready(cl->read_op);
  192. cl->read_op = NULL;
  193. cl->read_state = 0;
  194. }
  195. else
  196. {
  197. assert(0);
  198. }
  199. return true;
  200. }
  201. void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
  202. {
  203. osd_op_t *cur_op = cl->read_op;
  204. if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
  205. {
  206. cl->read_remaining = 0;
  207. }
  208. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
  209. cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
  210. {
  211. if (cur_op->req.sec_rw.attr_len > 0)
  212. {
  213. if (cur_op->req.sec_rw.attr_len > sizeof(unsigned))
  214. cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(cur_op->req.sec_rw.attr_len);
  215. else
  216. cur_op->bitmap = &cur_op->bmp_data;
  217. cl->recv_list.push_back(cur_op->bitmap, cur_op->req.sec_rw.attr_len);
  218. }
  219. if (cur_op->req.sec_rw.len > 0)
  220. {
  221. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
  222. cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_rw.len);
  223. }
  224. cl->read_remaining = cur_op->req.sec_rw.len + cur_op->req.sec_rw.attr_len;
  225. }
  226. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
  227. cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
  228. {
  229. if (cur_op->req.sec_stab.len > 0)
  230. {
  231. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_stab.len);
  232. cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_stab.len);
  233. }
  234. cl->read_remaining = cur_op->req.sec_stab.len;
  235. }
  236. else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
  237. {
  238. if (cur_op->req.sec_read_bmp.len > 0)
  239. {
  240. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_read_bmp.len);
  241. cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_read_bmp.len);
  242. }
  243. cl->read_remaining = cur_op->req.sec_read_bmp.len;
  244. }
  245. else if (cur_op->req.hdr.opcode == OSD_OP_READ)
  246. {
  247. cl->read_remaining = 0;
  248. }
  249. else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
  250. {
  251. if (cur_op->req.rw.len > 0)
  252. {
  253. cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.rw.len);
  254. cl->recv_list.push_back(cur_op->buf, cur_op->req.rw.len);
  255. }
  256. cl->read_remaining = cur_op->req.rw.len;
  257. }
  258. else if (cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
  259. {
  260. if (cur_op->req.show_conf.json_len > 0)
  261. {
  262. cur_op->buf = malloc_or_die(cur_op->req.show_conf.json_len+1);
  263. ((uint8_t*)cur_op->buf)[cur_op->req.show_conf.json_len] = 0;
  264. cl->recv_list.push_back(cur_op->buf, cur_op->req.show_conf.json_len);
  265. }
  266. cl->read_remaining = cur_op->req.show_conf.json_len;
  267. }
  268. if (cl->read_remaining > 0)
  269. {
  270. // Read data
  271. cl->read_state = CL_READ_DATA;
  272. }
  273. else
  274. {
  275. // Operation is ready
  276. cl->received_ops.push_back(cur_op);
  277. set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
  278. cl->read_op = NULL;
  279. cl->read_state = 0;
  280. }
  281. }
  282. bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
  283. {
  284. auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
  285. if (req_it == cl->sent_ops.end())
  286. {
  287. // Command out of sync. Drop connection
  288. fprintf(stderr, "Client %d command out of sync: id %lu\n", cl->peer_fd, cl->read_op->req.hdr.id);
  289. stop_client(cl->peer_fd);
  290. return false;
  291. }
  292. osd_op_t *op = req_it->second;
  293. memcpy(op->reply.buf, cl->read_op->req.buf, OSD_PACKET_SIZE);
  294. cl->sent_ops.erase(req_it);
  295. if (op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ)
  296. {
  297. // Read data. In this case we assume that the buffer is preallocated by the caller (!)
  298. unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.attr_len : op->reply.rw.bitmap_len);
  299. unsigned expected_size = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len);
  300. if (op->reply.hdr.retval >= 0 && (op->reply.hdr.retval != expected_size || bmp_len > op->bitmap_len))
  301. {
  302. // Check reply length to not overflow the buffer
  303. fprintf(stderr, "Client %d read reply of different length: expected %u+%u, got %ld+%u\n",
  304. cl->peer_fd, expected_size, op->bitmap_len, op->reply.hdr.retval, bmp_len);
  305. cl->sent_ops[op->req.hdr.id] = op;
  306. stop_client(cl->peer_fd);
  307. return false;
  308. }
  309. if (op->reply.hdr.retval >= 0 && bmp_len > 0)
  310. {
  311. assert(op->bitmap);
  312. cl->recv_list.push_back(op->bitmap, bmp_len);
  313. }
  314. if (op->reply.hdr.retval > 0)
  315. {
  316. assert(op->iov.count > 0);
  317. cl->recv_list.append(op->iov);
  318. }
  319. cl->read_remaining = op->reply.hdr.retval + bmp_len;
  320. if (cl->read_remaining == 0)
  321. {
  322. goto reuse;
  323. }
  324. delete cl->read_op;
  325. cl->read_op = op;
  326. cl->read_state = CL_READ_REPLY_DATA;
  327. }
  328. else if (op->reply.hdr.opcode == OSD_OP_SEC_LIST && op->reply.hdr.retval > 0)
  329. {
  330. assert(!op->iov.count);
  331. delete cl->read_op;
  332. cl->read_op = op;
  333. cl->read_state = CL_READ_REPLY_DATA;
  334. cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
  335. op->buf = memalign_or_die(MEM_ALIGNMENT, cl->read_remaining);
  336. cl->recv_list.push_back(op->buf, cl->read_remaining);
  337. }
  338. else if (op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP && op->reply.hdr.retval > 0)
  339. {
  340. assert(!op->iov.count);
  341. delete cl->read_op;
  342. cl->read_op = op;
  343. cl->read_state = CL_READ_REPLY_DATA;
  344. cl->read_remaining = op->reply.hdr.retval;
  345. free(op->buf);
  346. op->buf = memalign_or_die(MEM_ALIGNMENT, cl->read_remaining);
  347. cl->recv_list.push_back(op->buf, cl->read_remaining);
  348. }
  349. else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
  350. {
  351. delete cl->read_op;
  352. cl->read_op = op;
  353. cl->read_state = CL_READ_REPLY_DATA;
  354. cl->read_remaining = op->reply.hdr.retval;
  355. free(op->buf);
  356. op->buf = malloc_or_die(op->reply.hdr.retval);
  357. cl->recv_list.push_back(op->buf, op->reply.hdr.retval);
  358. }
  359. else
  360. {
  361. reuse:
  362. // It's fine to reuse cl->read_op for the next reply
  363. handle_reply_ready(op);
  364. cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
  365. cl->read_remaining = OSD_PACKET_SIZE;
  366. cl->read_state = CL_READ_HDR;
  367. }
  368. return true;
  369. }
  370. void osd_messenger_t::handle_reply_ready(osd_op_t *op)
  371. {
  372. // Measure subop latency
  373. timespec tv_end;
  374. clock_gettime(CLOCK_REALTIME, &tv_end);
  375. stats.subop_stat_count[op->req.hdr.opcode]++;
  376. if (!stats.subop_stat_count[op->req.hdr.opcode])
  377. {
  378. stats.subop_stat_count[op->req.hdr.opcode]++;
  379. stats.subop_stat_sum[op->req.hdr.opcode] = 0;
  380. }
  381. stats.subop_stat_sum[op->req.hdr.opcode] += (
  382. (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
  383. (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
  384. );
  385. set_immediate.push_back([this, op]()
  386. {
  387. // Copy lambda to be unaffected by `delete op`
  388. std::function<void(osd_op_t*)>(op->callback)(op);
  389. });
  390. }