Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

574 lines
19 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. #include <unistd.h>
  4. #include <fcntl.h>
  5. #include <sys/socket.h>
  6. #include <sys/epoll.h>
  7. #include <netinet/tcp.h>
  8. #include <stdexcept>
  9. #include "messenger.h"
  10. void osd_messenger_t::init()
  11. {
  12. #ifdef WITH_RDMA
  13. if (use_rdma)
  14. {
  15. rdma_context = msgr_rdma_context_t::create(
  16. rdma_device != "" ? rdma_device.c_str() : NULL,
  17. rdma_port_num, rdma_gid_index, rdma_mtu
  18. );
  19. if (!rdma_context)
  20. {
  21. fprintf(stderr, "[OSD %lu] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
  22. }
  23. else
  24. {
  25. rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
  26. ? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
  27. fprintf(stderr, "[OSD %lu] RDMA initialized successfully\n", osd_num);
  28. fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
  29. tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
  30. {
  31. handle_rdma_events();
  32. });
  33. handle_rdma_events();
  34. }
  35. }
  36. #endif
  37. keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
  38. {
  39. std::vector<int> to_stop;
  40. std::vector<osd_op_t*> to_ping;
  41. for (auto cl_it = clients.begin(); cl_it != clients.end(); cl_it++)
  42. {
  43. auto cl = cl_it->second;
  44. if (!cl->osd_num || cl->peer_state != PEER_CONNECTED && cl->peer_state != PEER_RDMA)
  45. {
  46. // Do not run keepalive on regular clients
  47. continue;
  48. }
  49. if (cl->ping_time_remaining > 0)
  50. {
  51. cl->ping_time_remaining--;
  52. if (!cl->ping_time_remaining)
  53. {
  54. // Ping timed out, stop the client
  55. fprintf(stderr, "Ping timed out for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd);
  56. to_stop.push_back(cl->peer_fd);
  57. }
  58. }
  59. else if (cl->idle_time_remaining > 0)
  60. {
  61. cl->idle_time_remaining--;
  62. if (!cl->idle_time_remaining)
  63. {
  64. // Connection is idle for <osd_idle_time>, send ping
  65. osd_op_t *op = new osd_op_t();
  66. op->op_type = OSD_OP_OUT;
  67. op->peer_fd = cl->peer_fd;
  68. op->req = (osd_any_op_t){
  69. .hdr = {
  70. .magic = SECONDARY_OSD_OP_MAGIC,
  71. .id = this->next_subop_id++,
  72. .opcode = OSD_OP_PING,
  73. },
  74. };
  75. op->callback = [this, cl](osd_op_t *op)
  76. {
  77. int fail_fd = (op->reply.hdr.retval != 0 ? op->peer_fd : -1);
  78. cl->ping_time_remaining = 0;
  79. delete op;
  80. if (fail_fd >= 0)
  81. {
  82. fprintf(stderr, "Ping failed for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd);
  83. stop_client(fail_fd, true);
  84. }
  85. };
  86. to_ping.push_back(op);
  87. cl->ping_time_remaining = osd_ping_timeout;
  88. cl->idle_time_remaining = osd_idle_timeout;
  89. }
  90. }
  91. else
  92. {
  93. cl->idle_time_remaining = osd_idle_timeout;
  94. }
  95. }
  96. // Don't stop clients while a 'clients' iterator is still active
  97. for (int peer_fd: to_stop)
  98. {
  99. stop_client(peer_fd, true);
  100. }
  101. for (auto op: to_ping)
  102. {
  103. outbox_push(op);
  104. }
  105. });
  106. }
  107. osd_messenger_t::~osd_messenger_t()
  108. {
  109. if (keepalive_timer_id >= 0)
  110. {
  111. tfd->clear_timer(keepalive_timer_id);
  112. keepalive_timer_id = -1;
  113. }
  114. while (clients.size() > 0)
  115. {
  116. stop_client(clients.begin()->first, true);
  117. }
  118. #ifdef WITH_RDMA
  119. if (rdma_context)
  120. {
  121. delete rdma_context;
  122. }
  123. #endif
  124. }
  125. void osd_messenger_t::parse_config(const json11::Json & config)
  126. {
  127. #ifdef WITH_RDMA
  128. if (!config["use_rdma"].is_null())
  129. {
  130. // RDMA is on by default in RDMA-enabled builds
  131. this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
  132. }
  133. this->rdma_device = config["rdma_device"].string_value();
  134. this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
  135. if (!this->rdma_port_num)
  136. this->rdma_port_num = 1;
  137. this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
  138. this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
  139. this->rdma_max_sge = config["rdma_max_sge"].uint64_value();
  140. if (!this->rdma_max_sge)
  141. this->rdma_max_sge = 128;
  142. this->rdma_max_send = config["rdma_max_send"].uint64_value();
  143. if (!this->rdma_max_send)
  144. this->rdma_max_send = 32;
  145. this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
  146. if (!this->rdma_max_recv)
  147. this->rdma_max_recv = 8;
  148. this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
  149. if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
  150. this->rdma_max_msg = 1024*1024;
  151. #endif
  152. this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
  153. if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
  154. this->receive_buffer_size = 65536;
  155. this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
  156. config["use_sync_send_recv"].uint64_value();
  157. this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
  158. if (!this->peer_connect_interval)
  159. this->peer_connect_interval = 5;
  160. this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
  161. if (!this->peer_connect_timeout)
  162. this->peer_connect_timeout = 5;
  163. this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value();
  164. if (!this->osd_idle_timeout)
  165. this->osd_idle_timeout = 5;
  166. this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value();
  167. if (!this->osd_ping_timeout)
  168. this->osd_ping_timeout = 5;
  169. this->log_level = config["log_level"].uint64_value();
  170. }
  171. void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
  172. {
  173. if (wanted_peers.find(peer_osd) == wanted_peers.end())
  174. {
  175. wanted_peers[peer_osd] = (osd_wanted_peer_t){
  176. .address_list = peer_state["addresses"],
  177. .port = (int)peer_state["port"].int64_value(),
  178. };
  179. }
  180. else
  181. {
  182. wanted_peers[peer_osd].address_list = peer_state["addresses"];
  183. wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
  184. }
  185. wanted_peers[peer_osd].address_changed = true;
  186. try_connect_peer(peer_osd);
  187. }
  188. void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
  189. {
  190. auto wp_it = wanted_peers.find(peer_osd);
  191. if (wp_it == wanted_peers.end() || wp_it->second.connecting ||
  192. (time(NULL) - wp_it->second.last_connect_attempt) < peer_connect_interval)
  193. {
  194. return;
  195. }
  196. if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end())
  197. {
  198. wanted_peers.erase(peer_osd);
  199. return;
  200. }
  201. auto & wp = wp_it->second;
  202. if (wp.address_index >= wp.address_list.array_items().size())
  203. {
  204. return;
  205. }
  206. wp.cur_addr = wp.address_list[wp.address_index].string_value();
  207. wp.cur_port = wp.port;
  208. wp.connecting = true;
  209. try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
  210. }
  211. void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
  212. {
  213. assert(peer_osd != this->osd_num);
  214. struct sockaddr_in addr;
  215. int r;
  216. if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
  217. {
  218. on_connect_peer(peer_osd, -EINVAL);
  219. return;
  220. }
  221. addr.sin_family = AF_INET;
  222. addr.sin_port = htons(peer_port ? peer_port : 11203);
  223. int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
  224. if (peer_fd < 0)
  225. {
  226. on_connect_peer(peer_osd, -errno);
  227. return;
  228. }
  229. fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
  230. r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
  231. if (r < 0 && errno != EINPROGRESS)
  232. {
  233. close(peer_fd);
  234. on_connect_peer(peer_osd, -errno);
  235. return;
  236. }
  237. clients[peer_fd] = new osd_client_t();
  238. clients[peer_fd]->peer_addr = addr;
  239. clients[peer_fd]->peer_port = peer_port;
  240. clients[peer_fd]->peer_fd = peer_fd;
  241. clients[peer_fd]->peer_state = PEER_CONNECTING;
  242. clients[peer_fd]->connect_timeout_id = -1;
  243. clients[peer_fd]->osd_num = peer_osd;
  244. clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
  245. tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
  246. {
  247. // Either OUT (connected) or HUP
  248. handle_connect_epoll(peer_fd);
  249. });
  250. if (peer_connect_timeout > 0)
  251. {
  252. clients[peer_fd]->connect_timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
  253. {
  254. osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
  255. stop_client(peer_fd, true);
  256. on_connect_peer(peer_osd, -EIO);
  257. return;
  258. });
  259. }
  260. }
  261. void osd_messenger_t::handle_connect_epoll(int peer_fd)
  262. {
  263. auto cl = clients[peer_fd];
  264. if (cl->connect_timeout_id >= 0)
  265. {
  266. tfd->clear_timer(cl->connect_timeout_id);
  267. cl->connect_timeout_id = -1;
  268. }
  269. osd_num_t peer_osd = cl->osd_num;
  270. int result = 0;
  271. socklen_t result_len = sizeof(result);
  272. if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
  273. {
  274. result = errno;
  275. }
  276. if (result != 0)
  277. {
  278. stop_client(peer_fd, true);
  279. on_connect_peer(peer_osd, -result);
  280. return;
  281. }
  282. int one = 1;
  283. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  284. cl->peer_state = PEER_CONNECTED;
  285. tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
  286. {
  287. handle_peer_epoll(peer_fd, epoll_events);
  288. });
  289. // Check OSD number
  290. check_peer_config(cl);
  291. }
  292. void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
  293. {
  294. // Mark client as ready (i.e. some data is available)
  295. if (epoll_events & EPOLLRDHUP)
  296. {
  297. // Stop client
  298. fprintf(stderr, "[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd);
  299. stop_client(peer_fd, true);
  300. }
  301. else if (epoll_events & EPOLLIN)
  302. {
  303. // Mark client as ready (i.e. some data is available)
  304. auto cl = clients[peer_fd];
  305. cl->read_ready++;
  306. if (cl->read_ready == 1)
  307. {
  308. read_ready_clients.push_back(cl->peer_fd);
  309. if (ringloop)
  310. ringloop->wakeup();
  311. else
  312. read_requests();
  313. }
  314. }
  315. }
  316. void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
  317. {
  318. auto & wp = wanted_peers.at(peer_osd);
  319. wp.connecting = false;
  320. if (peer_fd < 0)
  321. {
  322. fprintf(stderr, "Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd));
  323. if (wp.address_changed)
  324. {
  325. wp.address_changed = false;
  326. wp.address_index = 0;
  327. try_connect_peer(peer_osd);
  328. }
  329. else if (wp.address_index < wp.address_list.array_items().size()-1)
  330. {
  331. // Try other addresses
  332. wp.address_index++;
  333. try_connect_peer(peer_osd);
  334. }
  335. else
  336. {
  337. // Retry again in <peer_connect_interval> seconds
  338. wp.last_connect_attempt = time(NULL);
  339. wp.address_index = 0;
  340. tfd->set_timer(1000*peer_connect_interval, false, [this, peer_osd](int)
  341. {
  342. try_connect_peer(peer_osd);
  343. });
  344. }
  345. return;
  346. }
  347. if (log_level > 0)
  348. {
  349. fprintf(stderr, "[OSD %lu] Connected with peer OSD %lu (client %d)\n", osd_num, peer_osd, peer_fd);
  350. }
  351. wanted_peers.erase(peer_osd);
  352. repeer_pgs(peer_osd);
  353. }
  354. void osd_messenger_t::check_peer_config(osd_client_t *cl)
  355. {
  356. osd_op_t *op = new osd_op_t();
  357. op->op_type = OSD_OP_OUT;
  358. op->peer_fd = cl->peer_fd;
  359. op->req = (osd_any_op_t){
  360. .show_conf = {
  361. .header = {
  362. .magic = SECONDARY_OSD_OP_MAGIC,
  363. .id = this->next_subop_id++,
  364. .opcode = OSD_OP_SHOW_CONFIG,
  365. },
  366. },
  367. };
  368. #ifdef WITH_RDMA
  369. if (rdma_context)
  370. {
  371. cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
  372. if (cl->rdma_conn)
  373. {
  374. json11::Json payload = json11::Json::object {
  375. { "connect_rdma", cl->rdma_conn->addr.to_string() },
  376. { "rdma_max_msg", cl->rdma_conn->max_msg },
  377. };
  378. std::string payload_str = payload.dump();
  379. op->req.show_conf.json_len = payload_str.size();
  380. op->buf = malloc_or_die(payload_str.size());
  381. op->iov.push_back(op->buf, payload_str.size());
  382. memcpy(op->buf, payload_str.c_str(), payload_str.size());
  383. }
  384. }
  385. #endif
  386. op->callback = [this, cl](osd_op_t *op)
  387. {
  388. std::string json_err;
  389. json11::Json config;
  390. bool err = false;
  391. if (op->reply.hdr.retval < 0)
  392. {
  393. err = true;
  394. fprintf(stderr, "Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval);
  395. }
  396. else
  397. {
  398. config = json11::Json::parse(std::string((char*)op->buf), json_err);
  399. if (json_err != "")
  400. {
  401. err = true;
  402. fprintf(stderr, "Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str());
  403. }
  404. else if (config["osd_num"].uint64_value() != cl->osd_num)
  405. {
  406. err = true;
  407. fprintf(stderr, "Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num);
  408. }
  409. else if (config["protocol_version"].uint64_value() != OSD_PROTOCOL_VERSION)
  410. {
  411. err = true;
  412. fprintf(
  413. stderr, "OSD %lu protocol version is %lu, but only version %u is supported.\n"
  414. " If you need to upgrade from 0.5.x please request it via the issue tracker.\n",
  415. cl->osd_num, config["protocol_version"].uint64_value(), OSD_PROTOCOL_VERSION
  416. );
  417. }
  418. }
  419. if (err)
  420. {
  421. osd_num_t peer_osd = cl->osd_num;
  422. stop_client(op->peer_fd);
  423. on_connect_peer(peer_osd, -1);
  424. delete op;
  425. return;
  426. }
  427. #ifdef WITH_RDMA
  428. if (config["rdma_address"].is_string())
  429. {
  430. msgr_rdma_address_t addr;
  431. if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
  432. cl->rdma_conn->connect(&addr) != 0)
  433. {
  434. fprintf(
  435. stderr, "Failed to connect to OSD %lu (address %s) using RDMA\n",
  436. cl->osd_num, config["rdma_address"].string_value().c_str()
  437. );
  438. delete cl->rdma_conn;
  439. cl->rdma_conn = NULL;
  440. // FIXME: Keep TCP connection in this case
  441. osd_num_t peer_osd = cl->osd_num;
  442. stop_client(cl->peer_fd);
  443. on_connect_peer(peer_osd, -1);
  444. delete op;
  445. return;
  446. }
  447. else
  448. {
  449. uint64_t server_max_msg = config["rdma_max_msg"].uint64_value();
  450. if (cl->rdma_conn->max_msg > server_max_msg)
  451. {
  452. cl->rdma_conn->max_msg = server_max_msg;
  453. }
  454. if (log_level > 0)
  455. {
  456. fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num);
  457. }
  458. cl->peer_state = PEER_RDMA;
  459. tfd->set_fd_handler(cl->peer_fd, false, NULL);
  460. // Add the initial receive request
  461. try_recv_rdma(cl);
  462. }
  463. }
  464. #endif
  465. osd_peer_fds[cl->osd_num] = cl->peer_fd;
  466. on_connect_peer(cl->osd_num, cl->peer_fd);
  467. delete op;
  468. };
  469. outbox_push(op);
  470. }
  471. void osd_messenger_t::accept_connections(int listen_fd)
  472. {
  473. // Accept new connections
  474. sockaddr_in addr;
  475. socklen_t peer_addr_size = sizeof(addr);
  476. int peer_fd;
  477. while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
  478. {
  479. assert(peer_fd != 0);
  480. char peer_str[256];
  481. fprintf(stderr, "[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
  482. inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
  483. fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
  484. int one = 1;
  485. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  486. clients[peer_fd] = new osd_client_t();
  487. clients[peer_fd]->peer_addr = addr;
  488. clients[peer_fd]->peer_port = ntohs(addr.sin_port);
  489. clients[peer_fd]->peer_fd = peer_fd;
  490. clients[peer_fd]->peer_state = PEER_CONNECTED;
  491. clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
  492. // Add FD to epoll
  493. tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
  494. {
  495. handle_peer_epoll(peer_fd, epoll_events);
  496. });
  497. // Try to accept next connection
  498. peer_addr_size = sizeof(addr);
  499. }
  500. if (peer_fd == -1 && errno != EAGAIN)
  501. {
  502. throw std::runtime_error(std::string("accept: ") + strerror(errno));
  503. }
  504. }
  #ifdef WITH_RDMA
  // Reports whether an RDMA device context exists (set up by init() when
  // use_rdma is on and context creation succeeded).
  bool osd_messenger_t::is_rdma_enabled()
  {
      return rdma_context ? true : false;
  }
  #endif
  511. json11::Json osd_messenger_t::read_config(const json11::Json & config)
  512. {
  513. const char *config_path = config["config_path"].string_value() != ""
  514. ? config["config_path"].string_value().c_str() : VITASTOR_CONFIG_PATH;
  515. int fd = open(config_path, O_RDONLY);
  516. if (fd < 0)
  517. {
  518. if (errno != ENOENT)
  519. fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
  520. return config;
  521. }
  522. struct stat st;
  523. if (fstat(fd, &st) != 0)
  524. {
  525. fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
  526. close(fd);
  527. return config;
  528. }
  529. std::string buf;
  530. buf.resize(st.st_size);
  531. int done = 0;
  532. while (done < st.st_size)
  533. {
  534. int r = read(fd, (void*)buf.data()+done, st.st_size-done);
  535. if (r < 0)
  536. {
  537. fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
  538. close(fd);
  539. return config;
  540. }
  541. done += r;
  542. }
  543. close(fd);
  544. std::string json_err;
  545. json11::Json::object file_config = json11::Json::parse(buf, json_err).object_items();
  546. if (json_err != "")
  547. {
  548. fprintf(stderr, "Invalid JSON in %s: %s\n", config_path, json_err.c_str());
  549. return config;
  550. }
  551. file_config.erase("config_path");
  552. file_config.erase("osd_num");
  553. for (auto kv: config.object_items())
  554. {
  555. file_config[kv.first] = kv.second;
  556. }
  557. return file_config;
  558. }