Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

678 lines
19 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. // Similar to qemu-nbd, but sets timeout and uses io_uring
  4. #include <linux/nbd.h>
  5. #include <sys/ioctl.h>
  6. #include <sys/socket.h>
  7. #include <netinet/in.h>
  8. #include <netinet/tcp.h>
  9. #include <arpa/inet.h>
  10. #include <sys/un.h>
  11. #include <unistd.h>
  12. #include <fcntl.h>
  13. #include <signal.h>
  14. #include "epoll_manager.h"
  15. #include "cluster_client.h"
  16. #ifndef MSG_ZEROCOPY
  17. #define MSG_ZEROCOPY 0
  18. #endif
  // argv[0] of this process: shown in the usage text and compared (by basename) with
  // argv[0] of other processes when listing mapped devices
  const char *exe_name = nullptr;
  20. class nbd_proxy
  21. {
  22. protected:
  23. uint64_t inode = 0;
  24. ring_loop_t *ringloop = NULL;
  25. epoll_manager_t *epmgr = NULL;
  26. cluster_client_t *cli = NULL;
  27. ring_consumer_t consumer;
  28. std::vector<iovec> send_list, next_send_list;
  29. std::vector<void*> to_free;
  30. int nbd_fd = -1;
  31. void *recv_buf = NULL;
  32. int receive_buffer_size = 9000;
  33. nbd_request cur_req;
  34. cluster_op_t *cur_op = NULL;
  35. void *cur_buf = NULL;
  36. int cur_left = 0;
  37. int read_state = 0;
  38. int read_ready = 0;
  39. msghdr read_msg = { 0 }, send_msg = { 0 };
  40. iovec read_iov = { 0 };
  41. public:
  42. static json11::Json::object parse_args(int narg, const char *args[])
  43. {
  44. json11::Json::object cfg;
  45. int pos = 0;
  46. for (int i = 1; i < narg; i++)
  47. {
  48. if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
  49. {
  50. help();
  51. }
  52. else if (args[i][0] == '-' && args[i][1] == '-')
  53. {
  54. const char *opt = args[i]+2;
  55. cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
  56. }
  57. else if (pos == 0)
  58. {
  59. cfg["command"] = args[i];
  60. pos++;
  61. }
  62. else if (pos == 1 && (cfg["command"] == "map" || cfg["command"] == "unmap"))
  63. {
  64. int n = 0;
  65. if (sscanf(args[i], "/dev/nbd%d", &n) > 0)
  66. cfg["dev_num"] = n;
  67. else
  68. cfg["dev_num"] = args[i];
  69. pos++;
  70. }
  71. }
  72. return cfg;
  73. }
  74. void exec(json11::Json cfg)
  75. {
  76. if (cfg["command"] == "map")
  77. {
  78. start(cfg);
  79. }
  80. else if (cfg["command"] == "unmap")
  81. {
  82. if (cfg["dev_num"].is_null())
  83. {
  84. fprintf(stderr, "device name or number is missing\n");
  85. exit(1);
  86. }
  87. unmap(cfg["dev_num"].uint64_value());
  88. }
  89. else if (cfg["command"] == "list" || cfg["command"] == "list-mapped")
  90. {
  91. auto mapped = list_mapped();
  92. print_mapped(mapped, !cfg["json"].is_null());
  93. }
  94. else
  95. {
  96. help();
  97. }
  98. }
  99. static void help()
  100. {
  101. printf(
  102. "Vitastor NBD proxy\n"
  103. "(c) Vitaliy Filippov, 2020 (VNPL-1.0)\n\n"
  104. "USAGE:\n"
  105. " %s map --etcd_address <etcd_address> --pool <pool> --inode <inode> --size <size in bytes>\n"
  106. " %s unmap /dev/nbd0\n"
  107. " %s list [--json]\n",
  108. exe_name, exe_name, exe_name
  109. );
  110. exit(0);
  111. }
  112. void unmap(int dev_num)
  113. {
  114. char path[64] = { 0 };
  115. sprintf(path, "/dev/nbd%d", dev_num);
  116. int r, nbd = open(path, O_RDWR);
  117. if (nbd < 0)
  118. {
  119. perror("open");
  120. exit(1);
  121. }
  122. r = ioctl(nbd, NBD_DISCONNECT);
  123. if (r < 0)
  124. {
  125. perror("NBD_DISCONNECT");
  126. exit(1);
  127. }
  128. close(nbd);
  129. }
  130. void start(json11::Json cfg)
  131. {
  132. // Check options
  133. if (cfg["etcd_address"].string_value() == "")
  134. {
  135. fprintf(stderr, "etcd_address is missing\n");
  136. exit(1);
  137. }
  138. if (!cfg["size"].uint64_value())
  139. {
  140. fprintf(stderr, "device size is missing\n");
  141. exit(1);
  142. }
  143. inode = cfg["inode"].uint64_value();
  144. uint64_t pool = cfg["pool"].uint64_value();
  145. if (pool)
  146. {
  147. inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (pool << (64-POOL_ID_BITS));
  148. }
  149. if (!(inode >> (64-POOL_ID_BITS)))
  150. {
  151. fprintf(stderr, "pool is missing\n");
  152. exit(1);
  153. }
  154. // Initialize NBD
  155. int sockfd[2];
  156. if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockfd) < 0)
  157. {
  158. perror("socketpair");
  159. exit(1);
  160. }
  161. fcntl(sockfd[0], F_SETFL, fcntl(sockfd[0], F_GETFL, 0) | O_NONBLOCK);
  162. nbd_fd = sockfd[0];
  163. load_module();
  164. if (!cfg["dev_num"].is_null())
  165. {
  166. if (run_nbd(sockfd, cfg["dev_num"].int64_value(), cfg["size"].uint64_value(), NBD_FLAG_SEND_FLUSH, 30) < 0)
  167. {
  168. perror("run_nbd");
  169. exit(1);
  170. }
  171. }
  172. else
  173. {
  174. // Find an unused device
  175. int i = 0;
  176. while (true)
  177. {
  178. int r = run_nbd(sockfd, i, cfg["size"].uint64_value(), NBD_FLAG_SEND_FLUSH, 30);
  179. if (r == 0)
  180. {
  181. printf("/dev/nbd%d\n", i);
  182. break;
  183. }
  184. else if (r == -1 && errno == ENOENT)
  185. {
  186. fprintf(stderr, "No free NBD devices found\n");
  187. exit(1);
  188. }
  189. else if (r == -2 && errno == EBUSY)
  190. {
  191. i++;
  192. }
  193. else
  194. {
  195. printf("%d %d\n", r, errno);
  196. perror("run_nbd");
  197. exit(1);
  198. }
  199. }
  200. }
  201. if (cfg["foreground"].is_null())
  202. {
  203. daemonize();
  204. }
  205. // Create client
  206. ringloop = new ring_loop_t(512);
  207. epmgr = new epoll_manager_t(ringloop);
  208. cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
  209. // Initialize read state
  210. read_state = CL_READ_HDR;
  211. recv_buf = malloc_or_die(receive_buffer_size);
  212. cur_buf = &cur_req;
  213. cur_left = sizeof(nbd_request);
  214. consumer.loop = [this]()
  215. {
  216. submit_read();
  217. submit_send();
  218. ringloop->submit();
  219. };
  220. ringloop->register_consumer(&consumer);
  221. // Add FD to epoll
  222. epmgr->tfd->set_fd_handler(sockfd[0], false, [this](int peer_fd, int epoll_events)
  223. {
  224. read_ready++;
  225. submit_read();
  226. });
  227. while (1)
  228. {
  229. ringloop->loop();
  230. ringloop->wait();
  231. }
  232. }
  233. void load_module()
  234. {
  235. if (access("/sys/module/nbd", F_OK))
  236. {
  237. return;
  238. }
  239. int r;
  240. if ((r = system("modprobe nbd")) != 0)
  241. {
  242. if (r < 0)
  243. perror("Failed to load NBD kernel module");
  244. else
  245. fprintf(stderr, "Failed to load NBD kernel module\n");
  246. exit(1);
  247. }
  248. }
  249. void daemonize()
  250. {
  251. if (fork())
  252. exit(0);
  253. setsid();
  254. if (fork())
  255. exit(0);
  256. chdir("/");
  257. close(0);
  258. close(1);
  259. close(2);
  260. open("/dev/null", O_RDONLY);
  261. open("/dev/null", O_WRONLY);
  262. open("/dev/null", O_WRONLY);
  263. }
  264. json11::Json::object list_mapped()
  265. {
  266. const char *self_filename = exe_name;
  267. for (int i = 0; exe_name[i] != 0; i++)
  268. {
  269. if (exe_name[i] == '/')
  270. self_filename = exe_name+i+1;
  271. }
  272. char path[64] = { 0 };
  273. json11::Json::object mapped;
  274. int dev_num = -1;
  275. int pid;
  276. while (true)
  277. {
  278. dev_num++;
  279. sprintf(path, "/sys/block/nbd%d", dev_num);
  280. if (access(path, F_OK) != 0)
  281. break;
  282. sprintf(path, "/sys/block/nbd%d/pid", dev_num);
  283. std::string pid_str = read_file(path);
  284. if (pid_str == "")
  285. continue;
  286. if (sscanf(pid_str.c_str(), "%d", &pid) < 1)
  287. {
  288. printf("Failed to read pid from /sys/block/nbd%d/pid\n", dev_num);
  289. continue;
  290. }
  291. sprintf(path, "/proc/%d/cmdline", pid);
  292. std::string cmdline = read_file(path);
  293. std::vector<const char*> argv;
  294. int last = 0;
  295. for (int i = 0; i < cmdline.size(); i++)
  296. {
  297. if (cmdline[i] == 0)
  298. {
  299. argv.push_back(cmdline.c_str()+last);
  300. last = i+1;
  301. }
  302. }
  303. if (argv.size() > 0)
  304. {
  305. const char *pid_filename = argv[0];
  306. for (int i = 0; argv[0][i] != 0; i++)
  307. {
  308. if (argv[0][i] == '/')
  309. pid_filename = argv[0]+i+1;
  310. }
  311. if (!strcmp(pid_filename, self_filename))
  312. {
  313. json11::Json::object cfg = nbd_proxy::parse_args(argv.size(), argv.data());
  314. if (cfg["command"] == "map")
  315. {
  316. cfg.erase("command");
  317. cfg["pid"] = pid;
  318. mapped["/dev/nbd"+std::to_string(dev_num)] = cfg;
  319. }
  320. }
  321. }
  322. }
  323. return mapped;
  324. }
  325. void print_mapped(json11::Json mapped, bool json)
  326. {
  327. if (json)
  328. {
  329. printf("%s\n", mapped.dump().c_str());
  330. }
  331. else
  332. {
  333. for (auto & dev: mapped.object_items())
  334. {
  335. printf("%s\n", dev.first.c_str());
  336. for (auto & k: dev.second.object_items())
  337. {
  338. printf("%s: %s\n", k.first.c_str(), k.second.string_value().c_str());
  339. }
  340. printf("\n");
  341. }
  342. }
  343. }
  344. std::string read_file(char *path)
  345. {
  346. int fd = open(path, O_RDONLY);
  347. if (fd < 0)
  348. {
  349. if (errno == ENOENT)
  350. return "";
  351. auto err = "open "+std::string(path);
  352. perror(err.c_str());
  353. exit(1);
  354. }
  355. std::string r;
  356. while (true)
  357. {
  358. int l = r.size();
  359. r.resize(l + 1024);
  360. int rd = read(fd, (void*)(r.c_str() + l), 1024);
  361. if (rd <= 0)
  362. {
  363. r.resize(l);
  364. break;
  365. }
  366. r.resize(l + rd);
  367. }
  368. close(fd);
  369. return r;
  370. }
  371. protected:
  372. int run_nbd(int sockfd[2], int dev_num, uint64_t size, uint64_t flags, unsigned timeout)
  373. {
  374. // Check handle size
  375. assert(sizeof(cur_req.handle) == 8);
  376. char path[64] = { 0 };
  377. sprintf(path, "/dev/nbd%d", dev_num);
  378. int r, nbd = open(path, O_RDWR), qd_fd;
  379. if (nbd < 0)
  380. {
  381. return -1;
  382. }
  383. r = ioctl(nbd, NBD_SET_SOCK, sockfd[1]);
  384. if (r < 0)
  385. {
  386. goto end_close;
  387. }
  388. r = ioctl(nbd, NBD_SET_BLKSIZE, 4096);
  389. if (r < 0)
  390. {
  391. goto end_unmap;
  392. }
  393. r = ioctl(nbd, NBD_SET_SIZE, size);
  394. if (r < 0)
  395. {
  396. goto end_unmap;
  397. }
  398. ioctl(nbd, NBD_SET_FLAGS, flags);
  399. if (timeout >= 0)
  400. {
  401. r = ioctl(nbd, NBD_SET_TIMEOUT, (unsigned long)timeout);
  402. if (r < 0)
  403. {
  404. goto end_unmap;
  405. }
  406. }
  407. // Configure request size
  408. sprintf(path, "/sys/block/nbd%d/queue/max_sectors_kb", dev_num);
  409. qd_fd = open(path, O_WRONLY);
  410. if (qd_fd < 0)
  411. {
  412. goto end_unmap;
  413. }
  414. write(qd_fd, "32768", 5);
  415. close(qd_fd);
  416. if (!fork())
  417. {
  418. // Run in child
  419. close(sockfd[0]);
  420. r = ioctl(nbd, NBD_DO_IT);
  421. if (r < 0)
  422. {
  423. fprintf(stderr, "NBD device terminated with error: %s\n", strerror(errno));
  424. kill(getppid(), SIGTERM);
  425. }
  426. close(sockfd[1]);
  427. ioctl(nbd, NBD_CLEAR_QUE);
  428. ioctl(nbd, NBD_CLEAR_SOCK);
  429. exit(0);
  430. }
  431. close(sockfd[1]);
  432. close(nbd);
  433. return 0;
  434. end_close:
  435. r = errno;
  436. close(nbd);
  437. errno = r;
  438. return -2;
  439. end_unmap:
  440. r = errno;
  441. ioctl(nbd, NBD_CLEAR_SOCK);
  442. close(nbd);
  443. errno = r;
  444. return -3;
  445. }
  446. void submit_send()
  447. {
  448. if (!send_list.size() || send_msg.msg_iovlen > 0)
  449. {
  450. return;
  451. }
  452. io_uring_sqe* sqe = ringloop->get_sqe();
  453. if (!sqe)
  454. {
  455. return;
  456. }
  457. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  458. data->callback = [this](ring_data_t *data) { handle_send(data->res); };
  459. send_msg.msg_iov = send_list.data();
  460. send_msg.msg_iovlen = send_list.size();
  461. my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
  462. }
  463. void handle_send(int result)
  464. {
  465. send_msg.msg_iovlen = 0;
  466. if (result < 0 && result != -EAGAIN)
  467. {
  468. fprintf(stderr, "Socket disconnected: %s\n", strerror(-result));
  469. exit(1);
  470. }
  471. int to_eat = 0;
  472. while (result > 0 && to_eat < send_list.size())
  473. {
  474. if (result >= send_list[to_eat].iov_len)
  475. {
  476. free(to_free[to_eat]);
  477. result -= send_list[to_eat].iov_len;
  478. to_eat++;
  479. }
  480. else
  481. {
  482. send_list[to_eat].iov_base += result;
  483. send_list[to_eat].iov_len -= result;
  484. break;
  485. }
  486. }
  487. if (to_eat > 0)
  488. {
  489. send_list.erase(send_list.begin(), send_list.begin() + to_eat);
  490. to_free.erase(to_free.begin(), to_free.begin() + to_eat);
  491. }
  492. for (int i = 0; i < next_send_list.size(); i++)
  493. {
  494. send_list.push_back(next_send_list[i]);
  495. }
  496. next_send_list.clear();
  497. if (send_list.size() > 0)
  498. {
  499. ringloop->wakeup();
  500. }
  501. }
  502. void submit_read()
  503. {
  504. if (!read_ready || read_msg.msg_iovlen > 0)
  505. {
  506. return;
  507. }
  508. io_uring_sqe* sqe = ringloop->get_sqe();
  509. if (!sqe)
  510. {
  511. return;
  512. }
  513. ring_data_t* data = ((ring_data_t*)sqe->user_data);
  514. data->callback = [this](ring_data_t *data) { handle_read(data->res); };
  515. if (cur_left < receive_buffer_size)
  516. {
  517. read_iov.iov_base = recv_buf;
  518. read_iov.iov_len = receive_buffer_size;
  519. }
  520. else
  521. {
  522. read_iov.iov_base = cur_buf;
  523. read_iov.iov_len = cur_left;
  524. }
  525. read_msg.msg_iov = &read_iov;
  526. read_msg.msg_iovlen = 1;
  527. my_uring_prep_recvmsg(sqe, nbd_fd, &read_msg, 0);
  528. }
  529. void handle_read(int result)
  530. {
  531. read_msg.msg_iovlen = 0;
  532. if (result < 0 && result != -EAGAIN)
  533. {
  534. fprintf(stderr, "Socket disconnected: %s\n", strerror(-result));
  535. exit(1);
  536. }
  537. if (result == -EAGAIN || result < read_iov.iov_len)
  538. {
  539. read_ready--;
  540. }
  541. if (read_ready > 0)
  542. {
  543. ringloop->wakeup();
  544. }
  545. void *b = recv_buf;
  546. while (result > 0)
  547. {
  548. if (read_iov.iov_base == recv_buf)
  549. {
  550. int inc = result >= cur_left ? cur_left : result;
  551. memcpy(cur_buf, b, inc);
  552. cur_left -= inc;
  553. result -= inc;
  554. cur_buf += inc;
  555. b += inc;
  556. }
  557. else
  558. {
  559. assert(result <= cur_left);
  560. cur_left -= result;
  561. result = 0;
  562. }
  563. if (cur_left <= 0)
  564. {
  565. handle_finished_read();
  566. }
  567. }
  568. }
  569. void handle_finished_read()
  570. {
  571. if (read_state == CL_READ_HDR)
  572. {
  573. int req_type = be32toh(cur_req.type);
  574. if (be32toh(cur_req.magic) != NBD_REQUEST_MAGIC ||
  575. req_type != NBD_CMD_READ && req_type != NBD_CMD_WRITE && req_type != NBD_CMD_FLUSH)
  576. {
  577. printf("Unexpected request: magic=%x type=%x, terminating\n", cur_req.magic, req_type);
  578. exit(1);
  579. }
  580. uint64_t handle = *((uint64_t*)cur_req.handle);
  581. #ifdef DEBUG
  582. printf("request %lx +%x %lx\n", be64toh(cur_req.from), be32toh(cur_req.len), handle);
  583. #endif
  584. void *buf = NULL;
  585. cluster_op_t *op = new cluster_op_t;
  586. if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE)
  587. {
  588. op->opcode = req_type == NBD_CMD_READ ? OSD_OP_READ : OSD_OP_WRITE;
  589. op->inode = inode;
  590. op->offset = be64toh(cur_req.from);
  591. op->len = be32toh(cur_req.len);
  592. buf = malloc_or_die(sizeof(nbd_reply) + op->len);
  593. op->iov.push_back(buf + sizeof(nbd_reply), op->len);
  594. }
  595. else if (req_type == NBD_CMD_FLUSH)
  596. {
  597. op->opcode = OSD_OP_SYNC;
  598. buf = malloc_or_die(sizeof(nbd_reply));
  599. }
  600. op->callback = [this, buf, handle](cluster_op_t *op)
  601. {
  602. #ifdef DEBUG
  603. printf("reply %lx e=%d\n", handle, op->retval);
  604. #endif
  605. nbd_reply *reply = (nbd_reply*)buf;
  606. reply->magic = htobe32(NBD_REPLY_MAGIC);
  607. memcpy(reply->handle, &handle, 8);
  608. reply->error = htobe32(op->retval < 0 ? -op->retval : 0);
  609. auto & to_list = send_msg.msg_iovlen > 0 ? next_send_list : send_list;
  610. if (op->retval < 0 || op->opcode != OSD_OP_READ)
  611. to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) });
  612. else
  613. to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) + op->len });
  614. to_free.push_back(buf);
  615. delete op;
  616. ringloop->wakeup();
  617. };
  618. if (req_type == NBD_CMD_WRITE)
  619. {
  620. cur_op = op;
  621. cur_buf = buf + sizeof(nbd_reply);
  622. cur_left = op->len;
  623. read_state = CL_READ_DATA;
  624. }
  625. else
  626. {
  627. cur_op = NULL;
  628. cur_buf = &cur_req;
  629. cur_left = sizeof(nbd_request);
  630. read_state = CL_READ_HDR;
  631. cli->execute(op);
  632. }
  633. }
  634. else
  635. {
  636. cli->execute(cur_op);
  637. cur_op = NULL;
  638. cur_buf = &cur_req;
  639. cur_left = sizeof(nbd_request);
  640. read_state = CL_READ_HDR;
  641. }
  642. }
  643. };
  644. int main(int narg, const char *args[])
  645. {
  646. setvbuf(stdout, NULL, _IONBF, 0);
  647. setvbuf(stderr, NULL, _IONBF, 0);
  648. exe_name = args[0];
  649. nbd_proxy *p = new nbd_proxy();
  650. p->exec(nbd_proxy::parse_args(narg, args));
  651. return 0;
  652. }