Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

432 lines
12 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #include <unistd.h>
  4. #include <fcntl.h>
  5. #include <sys/socket.h>
  6. #include <sys/epoll.h>
  7. #include <netinet/tcp.h>
  8. #include <stdexcept>
  9. #include "messenger.h"
  10. osd_op_t::~osd_op_t()
  11. {
  12. assert(!bs_op);
  13. assert(!op_data);
  14. if (rmw_buf)
  15. {
  16. free(rmw_buf);
  17. }
  18. if (buf)
  19. {
  20. // Note: reusing osd_op_t WILL currently lead to memory leaks
  21. // So we don't reuse it, but free it every time
  22. free(buf);
  23. }
  24. }
  25. osd_messenger_t::~osd_messenger_t()
  26. {
  27. while (clients.size() > 0)
  28. {
  29. stop_client(clients.begin()->first);
  30. }
  31. }
  32. void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
  33. {
  34. if (wanted_peers.find(peer_osd) == wanted_peers.end())
  35. {
  36. wanted_peers[peer_osd] = (osd_wanted_peer_t){
  37. .address_list = peer_state["addresses"],
  38. .port = (int)peer_state["port"].int64_value(),
  39. };
  40. }
  41. else
  42. {
  43. wanted_peers[peer_osd].address_list = peer_state["addresses"];
  44. wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
  45. }
  46. wanted_peers[peer_osd].address_changed = true;
  47. if (!wanted_peers[peer_osd].connecting &&
  48. (time(NULL) - wanted_peers[peer_osd].last_connect_attempt) >= peer_connect_interval)
  49. {
  50. try_connect_peer(peer_osd);
  51. }
  52. }
  53. void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
  54. {
  55. auto wp_it = wanted_peers.find(peer_osd);
  56. if (wp_it == wanted_peers.end())
  57. {
  58. return;
  59. }
  60. if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end())
  61. {
  62. wanted_peers.erase(peer_osd);
  63. return;
  64. }
  65. auto & wp = wp_it->second;
  66. if (wp.address_index >= wp.address_list.array_items().size())
  67. {
  68. return;
  69. }
  70. wp.cur_addr = wp.address_list[wp.address_index].string_value();
  71. wp.cur_port = wp.port;
  72. wp.connecting = true;
  73. try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
  74. }
  75. void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
  76. {
  77. assert(peer_osd != this->osd_num);
  78. struct sockaddr_in addr;
  79. int r;
  80. if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
  81. {
  82. on_connect_peer(peer_osd, -EINVAL);
  83. return;
  84. }
  85. addr.sin_family = AF_INET;
  86. addr.sin_port = htons(peer_port ? peer_port : 11203);
  87. int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
  88. if (peer_fd < 0)
  89. {
  90. on_connect_peer(peer_osd, -errno);
  91. return;
  92. }
  93. fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
  94. r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
  95. if (r < 0 && errno != EINPROGRESS)
  96. {
  97. close(peer_fd);
  98. on_connect_peer(peer_osd, -errno);
  99. return;
  100. }
  101. int timeout_id = -1;
  102. if (peer_connect_timeout > 0)
  103. {
  104. timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
  105. {
  106. osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
  107. stop_client(peer_fd);
  108. on_connect_peer(peer_osd, -EIO);
  109. return;
  110. });
  111. }
  112. clients[peer_fd] = new osd_client_t((osd_client_t){
  113. .peer_addr = addr,
  114. .peer_port = peer_port,
  115. .peer_fd = peer_fd,
  116. .peer_state = PEER_CONNECTING,
  117. .connect_timeout_id = timeout_id,
  118. .osd_num = peer_osd,
  119. .in_buf = malloc_or_die(receive_buffer_size),
  120. });
  121. tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
  122. {
  123. // Either OUT (connected) or HUP
  124. handle_connect_epoll(peer_fd);
  125. });
  126. }
  127. void osd_messenger_t::handle_connect_epoll(int peer_fd)
  128. {
  129. auto cl = clients[peer_fd];
  130. if (cl->connect_timeout_id >= 0)
  131. {
  132. tfd->clear_timer(cl->connect_timeout_id);
  133. cl->connect_timeout_id = -1;
  134. }
  135. osd_num_t peer_osd = cl->osd_num;
  136. int result = 0;
  137. socklen_t result_len = sizeof(result);
  138. if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
  139. {
  140. result = errno;
  141. }
  142. if (result != 0)
  143. {
  144. stop_client(peer_fd);
  145. on_connect_peer(peer_osd, -result);
  146. return;
  147. }
  148. int one = 1;
  149. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  150. cl->peer_state = PEER_CONNECTED;
  151. tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
  152. {
  153. handle_peer_epoll(peer_fd, epoll_events);
  154. });
  155. // Check OSD number
  156. check_peer_config(cl);
  157. }
  158. void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
  159. {
  160. // Mark client as ready (i.e. some data is available)
  161. if (epoll_events & EPOLLRDHUP)
  162. {
  163. // Stop client
  164. printf("[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd);
  165. stop_client(peer_fd);
  166. }
  167. else if (epoll_events & EPOLLIN)
  168. {
  169. // Mark client as ready (i.e. some data is available)
  170. auto cl = clients[peer_fd];
  171. cl->read_ready++;
  172. if (cl->read_ready == 1)
  173. {
  174. read_ready_clients.push_back(cl->peer_fd);
  175. if (ringloop)
  176. ringloop->wakeup();
  177. else
  178. read_requests();
  179. }
  180. }
  181. }
  182. void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
  183. {
  184. auto & wp = wanted_peers.at(peer_osd);
  185. wp.connecting = false;
  186. if (peer_fd < 0)
  187. {
  188. printf("Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd));
  189. if (wp.address_changed)
  190. {
  191. wp.address_changed = false;
  192. wp.address_index = 0;
  193. try_connect_peer(peer_osd);
  194. }
  195. else if (wp.address_index < wp.address_list.array_items().size()-1)
  196. {
  197. // Try other addresses
  198. wp.address_index++;
  199. try_connect_peer(peer_osd);
  200. }
  201. else
  202. {
  203. // Retry again in <peer_connect_interval> seconds
  204. wp.last_connect_attempt = time(NULL);
  205. wp.address_index = 0;
  206. tfd->set_timer(1000*peer_connect_interval, false, [this, peer_osd](int)
  207. {
  208. try_connect_peer(peer_osd);
  209. });
  210. }
  211. return;
  212. }
  213. if (log_level > 0)
  214. {
  215. printf("[OSD %lu] Connected with peer OSD %lu (client %d)\n", osd_num, peer_osd, peer_fd);
  216. }
  217. wanted_peers.erase(peer_osd);
  218. repeer_pgs(peer_osd);
  219. }
  220. void osd_messenger_t::check_peer_config(osd_client_t *cl)
  221. {
  222. osd_op_t *op = new osd_op_t();
  223. op->op_type = OSD_OP_OUT;
  224. op->peer_fd = cl->peer_fd;
  225. op->req = (osd_any_op_t){
  226. .show_conf = {
  227. .header = {
  228. .magic = SECONDARY_OSD_OP_MAGIC,
  229. .id = this->next_subop_id++,
  230. .opcode = OSD_OP_SHOW_CONFIG,
  231. },
  232. },
  233. };
  234. op->callback = [this, cl](osd_op_t *op)
  235. {
  236. std::string json_err;
  237. json11::Json config;
  238. bool err = false;
  239. if (op->reply.hdr.retval < 0)
  240. {
  241. err = true;
  242. printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval);
  243. }
  244. else
  245. {
  246. config = json11::Json::parse(std::string((char*)op->buf), json_err);
  247. if (json_err != "")
  248. {
  249. err = true;
  250. printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str());
  251. }
  252. else if (config["osd_num"].uint64_value() != cl->osd_num)
  253. {
  254. err = true;
  255. printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num);
  256. }
  257. }
  258. if (err)
  259. {
  260. osd_num_t osd_num = cl->osd_num;
  261. stop_client(op->peer_fd);
  262. on_connect_peer(osd_num, -1);
  263. delete op;
  264. return;
  265. }
  266. osd_peer_fds[cl->osd_num] = cl->peer_fd;
  267. on_connect_peer(cl->osd_num, cl->peer_fd);
  268. delete op;
  269. };
  270. outbox_push(op);
  271. }
  272. void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
  273. {
  274. for (auto p: cl->sent_ops)
  275. {
  276. cancel_op(p.second);
  277. }
  278. cl->sent_ops.clear();
  279. cl->outbox.clear();
  280. }
  281. void osd_messenger_t::cancel_op(osd_op_t *op)
  282. {
  283. if (op->op_type == OSD_OP_OUT)
  284. {
  285. op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
  286. op->reply.hdr.id = op->req.hdr.id;
  287. op->reply.hdr.opcode = op->req.hdr.opcode;
  288. op->reply.hdr.retval = -EPIPE;
  289. // Copy lambda to be unaffected by `delete op`
  290. std::function<void(osd_op_t*)>(op->callback)(op);
  291. }
  292. else
  293. {
  294. // This function is only called in stop_client(), so it's fine to destroy the operation
  295. delete op;
  296. }
  297. }
  298. void osd_messenger_t::stop_client(int peer_fd)
  299. {
  300. assert(peer_fd != 0);
  301. auto it = clients.find(peer_fd);
  302. if (it == clients.end())
  303. {
  304. return;
  305. }
  306. uint64_t repeer_osd = 0;
  307. osd_client_t *cl = it->second;
  308. if (cl->peer_state == PEER_CONNECTED)
  309. {
  310. if (cl->osd_num)
  311. {
  312. // Reload configuration from etcd when the connection is dropped
  313. if (log_level > 0)
  314. printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num);
  315. repeer_osd = cl->osd_num;
  316. }
  317. else
  318. {
  319. if (log_level > 0)
  320. printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd);
  321. }
  322. }
  323. cl->peer_state = PEER_STOPPED;
  324. clients.erase(it);
  325. tfd->set_fd_handler(peer_fd, false, NULL);
  326. if (cl->connect_timeout_id >= 0)
  327. {
  328. tfd->clear_timer(cl->connect_timeout_id);
  329. cl->connect_timeout_id = -1;
  330. }
  331. if (cl->osd_num)
  332. {
  333. osd_peer_fds.erase(cl->osd_num);
  334. }
  335. if (cl->read_op)
  336. {
  337. if (cl->read_op->callback)
  338. {
  339. cancel_op(cl->read_op);
  340. }
  341. else
  342. {
  343. delete cl->read_op;
  344. }
  345. cl->read_op = NULL;
  346. }
  347. for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
  348. {
  349. if (*rit == peer_fd)
  350. {
  351. read_ready_clients.erase(rit);
  352. break;
  353. }
  354. }
  355. for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++)
  356. {
  357. if (*wit == peer_fd)
  358. {
  359. write_ready_clients.erase(wit);
  360. break;
  361. }
  362. }
  363. free(cl->in_buf);
  364. cl->in_buf = NULL;
  365. close(peer_fd);
  366. if (repeer_osd)
  367. {
  368. // First repeer PGs as canceling OSD ops may push new operations
  369. // and we need correct PG states when we do that
  370. repeer_pgs(repeer_osd);
  371. }
  372. if (cl->osd_num)
  373. {
  374. // Cancel outbound operations
  375. cancel_osd_ops(cl);
  376. }
  377. if (cl->refs <= 0)
  378. {
  379. delete cl;
  380. }
  381. }
  382. void osd_messenger_t::accept_connections(int listen_fd)
  383. {
  384. // Accept new connections
  385. sockaddr_in addr;
  386. socklen_t peer_addr_size = sizeof(addr);
  387. int peer_fd;
  388. while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
  389. {
  390. assert(peer_fd != 0);
  391. char peer_str[256];
  392. printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
  393. inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
  394. fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
  395. int one = 1;
  396. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  397. clients[peer_fd] = new osd_client_t((osd_client_t){
  398. .peer_addr = addr,
  399. .peer_port = ntohs(addr.sin_port),
  400. .peer_fd = peer_fd,
  401. .peer_state = PEER_CONNECTED,
  402. .in_buf = malloc_or_die(receive_buffer_size),
  403. });
  404. // Add FD to epoll
  405. tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
  406. {
  407. handle_peer_epoll(peer_fd, epoll_events);
  408. });
  409. // Try to accept next connection
  410. peer_addr_size = sizeof(addr);
  411. }
  412. if (peer_fd == -1 && errno != EAGAIN)
  413. {
  414. throw std::runtime_error(std::string("accept: ") + strerror(errno));
  415. }
  416. }