Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

400 lines
11 KiB

  1. #include <sys/socket.h>
  2. #include <sys/epoll.h>
  3. #include <sys/poll.h>
  4. #include <netinet/in.h>
  5. #include <netinet/tcp.h>
  6. #include <arpa/inet.h>
  7. #include "osd.h"
  // Human-readable operation names used when printing latency statistics.
  // The table is indexed by the numeric operation code; slot 0 is an empty
  // placeholder. Both the strings and the pointers are constant so the table
  // cannot be modified at runtime.
  // NOTE(review): the statistics code indexes this table up to OSD_OP_MAX -
  // keep the number of entries in sync with the opcode definitions.
  static const char* const osd_op_names[] = {
      "",
      "read",
      "write",
      "sync",
      "stabilize",
      "rollback",
      "delete",
      "sync_stab_all",
      "list",
      "show_config",
      "primary_read",
      "primary_write",
      "primary_sync",
  };
  23. osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
  24. {
  25. this->config = config;
  26. this->bs = bs;
  27. this->ringloop = ringloop;
  28. this->tick_tfd = new timerfd_interval(ringloop, 3, [this]()
  29. {
  30. for (int i = 0; i <= OSD_OP_MAX; i++)
  31. {
  32. if (op_stat_count[i] != 0)
  33. {
  34. printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], op_stat_sum[i]/op_stat_count[i]);
  35. op_stat_count[i] = 0;
  36. op_stat_sum[i] = 0;
  37. }
  38. }
  39. for (int i = 0; i <= OSD_OP_MAX; i++)
  40. {
  41. if (subop_stat_count[i] != 0)
  42. {
  43. printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], subop_stat_sum[i]/subop_stat_count[i]);
  44. subop_stat_count[i] = 0;
  45. subop_stat_sum[i] = 0;
  46. }
  47. }
  48. if (send_stat_count != 0)
  49. {
  50. printf("avg latency to send subops with data: %ld us\n", send_stat_sum/send_stat_count);
  51. send_stat_count = 0;
  52. send_stat_sum = 0;
  53. }
  54. });
  55. this->bs_block_size = bs->get_block_size();
  56. // FIXME: use bitmap granularity instead
  57. this->bs_disk_alignment = bs->get_disk_alignment();
  58. bind_address = config["bind_address"];
  59. if (bind_address == "")
  60. bind_address = "0.0.0.0";
  61. bind_port = strtoull(config["bind_port"].c_str(), NULL, 10);
  62. if (!bind_port || bind_port > 65535)
  63. bind_port = 11203;
  64. osd_num = strtoull(config["osd_num"].c_str(), NULL, 10);
  65. if (!osd_num)
  66. throw std::runtime_error("osd_num is required in the configuration");
  67. run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes";
  68. if (run_primary)
  69. init_primary();
  70. listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  71. if (listen_fd < 0)
  72. {
  73. throw std::runtime_error(std::string("socket: ") + strerror(errno));
  74. }
  75. int enable = 1;
  76. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
  77. sockaddr_in addr;
  78. int r;
  79. if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
  80. {
  81. close(listen_fd);
  82. throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
  83. }
  84. addr.sin_family = AF_INET;
  85. addr.sin_port = htons(bind_port);
  86. if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
  87. {
  88. close(listen_fd);
  89. throw std::runtime_error(std::string("bind: ") + strerror(errno));
  90. }
  91. if (listen(listen_fd, listen_backlog) < 0)
  92. {
  93. close(listen_fd);
  94. throw std::runtime_error(std::string("listen: ") + strerror(errno));
  95. }
  96. fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
  97. epoll_fd = epoll_create(1);
  98. if (epoll_fd < 0)
  99. {
  100. close(listen_fd);
  101. throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
  102. }
  103. epoll_event ev;
  104. ev.data.fd = listen_fd;
  105. ev.events = EPOLLIN | EPOLLET;
  106. if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0)
  107. {
  108. close(listen_fd);
  109. close(epoll_fd);
  110. throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
  111. }
  112. consumer.loop = [this]() { loop(); };
  113. ringloop->register_consumer(consumer);
  114. }
  115. osd_t::~osd_t()
  116. {
  117. delete tick_tfd;
  118. ringloop->unregister_consumer(consumer);
  119. close(epoll_fd);
  120. close(listen_fd);
  121. }
  122. osd_op_t::~osd_op_t()
  123. {
  124. if (bs_op)
  125. {
  126. delete bs_op;
  127. }
  128. if (op_data)
  129. {
  130. free(op_data);
  131. }
  132. if (rmw_buf)
  133. {
  134. free(rmw_buf);
  135. }
  136. if (buf)
  137. {
  138. // Note: reusing osd_op_t WILL currently lead to memory leaks
  139. // So we don't reuse it, but free it every time
  140. free(buf);
  141. }
  142. }
  143. bool osd_t::shutdown()
  144. {
  145. stopping = true;
  146. if (inflight_ops > 0)
  147. {
  148. return false;
  149. }
  150. return bs->is_safe_to_stop();
  151. }
  152. void osd_t::loop()
  153. {
  154. if (!wait_state)
  155. {
  156. handle_epoll_events();
  157. wait_state = 1;
  158. }
  159. handle_peers();
  160. read_requests();
  161. send_replies();
  162. ringloop->submit();
  163. }
  164. void osd_t::handle_epoll_events()
  165. {
  166. int nfds;
  167. epoll_event events[MAX_EPOLL_EVENTS];
  168. restart:
  169. nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
  170. for (int i = 0; i < nfds; i++)
  171. {
  172. if (events[i].data.fd == listen_fd)
  173. {
  174. // Accept new connections
  175. sockaddr_in addr;
  176. socklen_t peer_addr_size = sizeof(addr);
  177. int peer_fd;
  178. while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
  179. {
  180. char peer_str[256];
  181. printf("osd: new client %d: connection from %s port %d\n", peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
  182. int one = 1;
  183. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  184. clients[peer_fd] = {
  185. .peer_addr = addr,
  186. .peer_port = ntohs(addr.sin_port),
  187. .peer_fd = peer_fd,
  188. .peer_state = PEER_CONNECTED,
  189. };
  190. // Add FD to epoll
  191. epoll_event ev;
  192. ev.data.fd = peer_fd;
  193. ev.events = EPOLLET | EPOLLRDHUP;
  194. if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
  195. {
  196. throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
  197. }
  198. // Try to accept next connection
  199. peer_addr_size = sizeof(addr);
  200. }
  201. if (peer_fd == -1 && errno != EAGAIN)
  202. {
  203. throw std::runtime_error(std::string("accept: ") + strerror(errno));
  204. }
  205. }
  206. else
  207. {
  208. auto & cl = clients[events[i].data.fd];
  209. if (cl.peer_state == PEER_CONNECTING)
  210. {
  211. // Either OUT (connected) or HUP
  212. handle_connect_result(cl.peer_fd);
  213. }
  214. else if (events[i].events & EPOLLRDHUP)
  215. {
  216. // Stop client
  217. printf("osd: client %d disconnected\n", cl.peer_fd);
  218. stop_client(cl.peer_fd);
  219. }
  220. else
  221. {
  222. // Mark client as ready (i.e. some data is available)
  223. cl.read_ready++;
  224. if (cl.read_ready == 1)
  225. {
  226. read_ready_clients.push_back(cl.peer_fd);
  227. ringloop->wakeup();
  228. }
  229. }
  230. }
  231. }
  232. if (nfds == MAX_EPOLL_EVENTS)
  233. {
  234. goto restart;
  235. }
  236. io_uring_sqe *sqe = ringloop->get_sqe();
  237. if (!sqe)
  238. {
  239. throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
  240. }
  241. ring_data_t *data = ((ring_data_t*)sqe->user_data);
  242. my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
  243. data->callback = [this](ring_data_t *data)
  244. {
  245. if (data->res < 0)
  246. {
  247. throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
  248. }
  249. handle_epoll_events();
  250. };
  251. }
  252. void osd_t::cancel_osd_ops(osd_client_t & cl)
  253. {
  254. for (auto p: cl.sent_ops)
  255. {
  256. cancel_op(p.second);
  257. }
  258. cl.sent_ops.clear();
  259. for (auto op: cl.outbox)
  260. {
  261. cancel_op(op);
  262. }
  263. cl.outbox.clear();
  264. if (cl.write_op)
  265. {
  266. cancel_op(cl.write_op);
  267. cl.write_op = NULL;
  268. }
  269. }
  270. void osd_t::cancel_op(osd_op_t *op)
  271. {
  272. if (op->op_type == OSD_OP_OUT)
  273. {
  274. op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
  275. op->reply.hdr.id = op->req.hdr.id;
  276. op->reply.hdr.opcode = op->req.hdr.opcode;
  277. op->reply.hdr.retval = -EPIPE;
  278. op->callback(op);
  279. }
  280. else
  281. {
  282. delete op;
  283. }
  284. }
  285. void osd_t::stop_client(int peer_fd)
  286. {
  287. auto it = clients.find(peer_fd);
  288. if (it == clients.end())
  289. {
  290. return;
  291. }
  292. auto & cl = it->second;
  293. if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL) < 0)
  294. {
  295. throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
  296. }
  297. if (cl.osd_num)
  298. {
  299. // Cancel outbound operations
  300. cancel_osd_ops(cl);
  301. osd_peer_fds.erase(cl.osd_num);
  302. repeer_pgs(cl.osd_num, false);
  303. peering_state |= OSD_PEERING_PEERS;
  304. }
  305. if (cl.read_op)
  306. {
  307. delete cl.read_op;
  308. }
  309. for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
  310. {
  311. if (*rit == peer_fd)
  312. {
  313. read_ready_clients.erase(rit);
  314. break;
  315. }
  316. }
  317. for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++)
  318. {
  319. if (*wit == peer_fd)
  320. {
  321. write_ready_clients.erase(wit);
  322. break;
  323. }
  324. }
  325. clients.erase(it);
  326. close(peer_fd);
  327. }
  328. void osd_t::exec_op(osd_op_t *cur_op)
  329. {
  330. gettimeofday(&cur_op->tv_begin, NULL);
  331. if (stopping)
  332. {
  333. // Throw operation away
  334. delete cur_op;
  335. return;
  336. }
  337. cur_op->send_list.push_back(cur_op->reply.buf, OSD_PACKET_SIZE);
  338. if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
  339. cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
  340. (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
  341. (cur_op->req.sec_rw.len > OSD_RW_MAX || cur_op->req.sec_rw.len % OSD_RW_ALIGN || cur_op->req.sec_rw.offset % OSD_RW_ALIGN) ||
  342. (cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_WRITE) &&
  343. (cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % OSD_RW_ALIGN || cur_op->req.rw.offset % OSD_RW_ALIGN))
  344. {
  345. // Bad command
  346. cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
  347. cur_op->reply.hdr.id = cur_op->req.hdr.id;
  348. cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
  349. cur_op->reply.hdr.retval = -EINVAL;
  350. outbox_push(this->clients[cur_op->peer_fd], cur_op);
  351. return;
  352. }
  353. inflight_ops++;
  354. if (cur_op->req.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
  355. {
  356. exec_sync_stab_all(cur_op);
  357. }
  358. else if (cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
  359. {
  360. exec_show_config(cur_op);
  361. }
  362. else if (cur_op->req.hdr.opcode == OSD_OP_READ)
  363. {
  364. continue_primary_read(cur_op);
  365. }
  366. else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
  367. {
  368. continue_primary_write(cur_op);
  369. }
  370. else if (cur_op->req.hdr.opcode == OSD_OP_SYNC)
  371. {
  372. continue_primary_sync(cur_op);
  373. }
  374. else
  375. {
  376. exec_secondary(cur_op);
  377. }
  378. }