Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
// Similar to qemu-nbd, but sets timeout and uses io_uring
#include <linux/nbd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
// Standard headers used directly below (they may also be pulled in
// transitively by the project headers)
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "epoll_manager.h"
#include "cluster_client.h"
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0
#endif
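// How this proxy works: it exports a Vitastor image (or a raw pool/inode) as
// a /dev/nbdX block device. A UNIX socketpair connects the kernel NBD driver
// to this process: a forked child blocks in ioctl(NBD_DO_IT) to keep the
// device attached, while the parent reads nbd_request structures from its end
// of the socketpair via io_uring, translates them into cluster_client_t
// operations (OSD_OP_READ / OSD_OP_WRITE / OSD_OP_SYNC) and writes nbd_reply
// structures back over the same socket.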
const char *exe_name = NULL;
class nbd_proxy
{
protected:
    std::string image_name;
    uint64_t inode = 0;
    uint64_t device_size = 0;
    inode_watch_t *watch = NULL;
    ring_loop_t *ringloop = NULL;
    epoll_manager_t *epmgr = NULL;
    cluster_client_t *cli = NULL;
    ring_consumer_t consumer;
    std::vector<iovec> send_list, next_send_list;
    std::vector<void*> to_free;
    int nbd_fd = -1;
    void *recv_buf = NULL;
    int receive_buffer_size = 9000;
    nbd_request cur_req;
    cluster_op_t *cur_op = NULL;
    void *cur_buf = NULL;
    int cur_left = 0;
    int read_state = 0;
    int read_ready = 0;
    msghdr read_msg = { 0 }, send_msg = { 0 };
    iovec read_iov = { 0 };
public:
    static json11::Json::object parse_args(int narg, const char *args[])
    {
        json11::Json::object cfg;
        int pos = 0;
        for (int i = 1; i < narg; i++)
        {
            if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
            {
                help();
            }
            else if (args[i][0] == '-' && args[i][1] == '-')
            {
                const char *opt = args[i]+2;
                cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
            }
            else if (pos == 0)
            {
                cfg["command"] = args[i];
                pos++;
            }
            else if (pos == 1 && (cfg["command"] == "map" || cfg["command"] == "unmap"))
            {
                int n = 0;
                if (sscanf(args[i], "/dev/nbd%d", &n) > 0)
                    cfg["dev_num"] = n;
                else
                    cfg["dev_num"] = args[i];
                pos++;
            }
        }
        return cfg;
    }
    void exec(json11::Json cfg)
    {
        if (cfg["command"] == "map")
        {
            start(cfg);
        }
        else if (cfg["command"] == "unmap")
        {
            if (cfg["dev_num"].is_null())
            {
                fprintf(stderr, "device name or number is missing\n");
                exit(1);
            }
            unmap(cfg["dev_num"].uint64_value());
        }
        else if (cfg["command"] == "list" || cfg["command"] == "list-mapped")
        {
            auto mapped = list_mapped();
            print_mapped(mapped, !cfg["json"].is_null());
        }
        else
        {
            help();
        }
    }
    static void help()
    {
        printf(
            "Vitastor NBD proxy\n"
            "(c) Vitaliy Filippov, 2020-2021 (VNPL-1.1)\n\n"
            "USAGE:\n"
            " %s map [--etcd_address <etcd_address>] (--image <image> | --pool <pool> --inode <inode> --size <size in bytes>)\n"
            " %s unmap /dev/nbd0\n"
            " %s list [--json]\n",
            exe_name, exe_name, exe_name
        );
        exit(0);
    }
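    // Example invocations matching the usage text above (binary name, etcd
    // address and image name are illustrative placeholders, not taken from
    // this file):
    //   vitastor-nbd map --etcd_address 10.0.0.1:2379 --image testimg
    //   vitastor-nbd map --etcd_address 10.0.0.1:2379 --pool 2 --inode 1 --size 1073741824
    //   vitastor-nbd list --json
    //   vitastor-nbd unmap /dev/nbd0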
    void unmap(int dev_num)
    {
        char path[64] = { 0 };
        sprintf(path, "/dev/nbd%d", dev_num);
        int r, nbd = open(path, O_RDWR);
        if (nbd < 0)
        {
            perror("open");
            exit(1);
        }
        r = ioctl(nbd, NBD_DISCONNECT);
        if (r < 0)
        {
            perror("NBD_DISCONNECT");
            exit(1);
        }
        close(nbd);
    }
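    // 'map' flow: resolve the image (by name via an etcd inode watch, or by
    // explicit pool/inode/size), create the io_uring loop, epoll manager and
    // cluster client, attach a free (or explicitly requested) /dev/nbdX via
    // run_nbd(), then optionally daemonize and serve NBD requests forever.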
    void start(json11::Json cfg)
    {
        // Check options
        if (cfg["image"].string_value() != "")
        {
            // Use image name
            image_name = cfg["image"].string_value();
            inode = 0;
        }
        else
        {
            // Use pool, inode number and size
            if (!cfg["size"].uint64_value())
            {
                fprintf(stderr, "device size is missing\n");
                exit(1);
            }
            device_size = cfg["size"].uint64_value();
            inode = cfg["inode"].uint64_value();
            uint64_t pool = cfg["pool"].uint64_value();
            if (pool)
            {
                inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (pool << (64-POOL_ID_BITS));
            }
            if (!(inode >> (64-POOL_ID_BITS)))
            {
                fprintf(stderr, "pool is missing\n");
                exit(1);
            }
        }
        // Create client
        ringloop = new ring_loop_t(512);
        epmgr = new epoll_manager_t(ringloop);
        cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
        if (!inode)
        {
            // Load image metadata
            while (!cli->is_ready())
            {
                ringloop->loop();
                if (cli->is_ready())
                    break;
                ringloop->wait();
            }
            watch = cli->st_cli.watch_inode(image_name);
            device_size = watch->cfg.size;
        }
        // Initialize NBD
        int sockfd[2];
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockfd) < 0)
        {
            perror("socketpair");
            exit(1);
        }
        fcntl(sockfd[0], F_SETFL, fcntl(sockfd[0], F_GETFL, 0) | O_NONBLOCK);
        nbd_fd = sockfd[0];
        load_module();
        if (!cfg["dev_num"].is_null())
        {
            if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, NBD_FLAG_SEND_FLUSH, 30) < 0)
            {
                perror("run_nbd");
                exit(1);
            }
        }
        else
        {
            // Find an unused device
            int i = 0;
            while (true)
            {
                int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30);
                if (r == 0)
                {
                    printf("/dev/nbd%d\n", i);
                    break;
                }
                else if (r == -1 && errno == ENOENT)
                {
                    fprintf(stderr, "No free NBD devices found\n");
                    exit(1);
                }
                else if (r == -2 && errno == EBUSY)
                {
                    i++;
                }
                else
                {
                    printf("%d %d\n", r, errno);
                    perror("run_nbd");
                    exit(1);
                }
            }
        }
        if (cfg["foreground"].is_null())
        {
            daemonize();
        }
        // Initialize read state
        read_state = CL_READ_HDR;
        recv_buf = malloc_or_die(receive_buffer_size);
        cur_buf = &cur_req;
        cur_left = sizeof(nbd_request);
        consumer.loop = [this]()
        {
            submit_read();
            submit_send();
            ringloop->submit();
        };
        ringloop->register_consumer(&consumer);
        // Add FD to epoll
        epmgr->tfd->set_fd_handler(sockfd[0], false, [this](int peer_fd, int epoll_events)
        {
            read_ready++;
            submit_read();
        });
        while (1)
        {
            ringloop->loop();
            ringloop->wait();
        }
        // FIXME: Cleanup when exiting
    }
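    // Load the nbd kernel module unless /sys/module/nbd already exists.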
    void load_module()
    {
        if (access("/sys/module/nbd", F_OK) == 0)
        {
            return;
        }
        int r;
        if ((r = system("modprobe nbd")) != 0)
        {
            if (r < 0)
                perror("Failed to load NBD kernel module");
            else
                fprintf(stderr, "Failed to load NBD kernel module\n");
            exit(1);
        }
    }
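    // Classic double-fork + setsid daemonization; stdin/stdout/stderr are
    // reattached to /dev/null.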
    void daemonize()
    {
        if (fork())
            exit(0);
        setsid();
        if (fork())
            exit(0);
        chdir("/");
        close(0);
        close(1);
        close(2);
        open("/dev/null", O_RDONLY);
        open("/dev/null", O_WRONLY);
        open("/dev/null", O_WRONLY);
    }
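    // Enumerate mapped devices by scanning /sys/block/nbd*, reading the owning
    // pid and re-parsing /proc/<pid>/cmdline of processes that look like this
    // binary invoked with the 'map' command.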
    json11::Json::object list_mapped()
    {
        const char *self_filename = exe_name;
        for (int i = 0; exe_name[i] != 0; i++)
        {
            if (exe_name[i] == '/')
                self_filename = exe_name+i+1;
        }
        char path[64] = { 0 };
        json11::Json::object mapped;
        int dev_num = -1;
        int pid;
        while (true)
        {
            dev_num++;
            sprintf(path, "/sys/block/nbd%d", dev_num);
            if (access(path, F_OK) != 0)
                break;
            sprintf(path, "/sys/block/nbd%d/pid", dev_num);
            std::string pid_str = read_file(path);
            if (pid_str == "")
                continue;
            if (sscanf(pid_str.c_str(), "%d", &pid) < 1)
            {
                printf("Failed to read pid from /sys/block/nbd%d/pid\n", dev_num);
                continue;
            }
            sprintf(path, "/proc/%d/cmdline", pid);
            std::string cmdline = read_file(path);
            std::vector<const char*> argv;
            int last = 0;
            for (int i = 0; i < cmdline.size(); i++)
            {
                if (cmdline[i] == 0)
                {
                    argv.push_back(cmdline.c_str()+last);
                    last = i+1;
                }
            }
            if (argv.size() > 0)
            {
                const char *pid_filename = argv[0];
                for (int i = 0; argv[0][i] != 0; i++)
                {
                    if (argv[0][i] == '/')
                        pid_filename = argv[0]+i+1;
                }
                if (!strcmp(pid_filename, self_filename))
                {
                    json11::Json::object cfg = nbd_proxy::parse_args(argv.size(), argv.data());
                    if (cfg["command"] == "map")
                    {
                        cfg.erase("command");
                        cfg["pid"] = pid;
                        mapped["/dev/nbd"+std::to_string(dev_num)] = cfg;
                    }
                }
            }
        }
        return mapped;
    }
    void print_mapped(json11::Json mapped, bool json)
    {
        if (json)
        {
            printf("%s\n", mapped.dump().c_str());
        }
        else
        {
            for (auto & dev: mapped.object_items())
            {
                printf("%s\n", dev.first.c_str());
                for (auto & k: dev.second.object_items())
                {
                    printf("%s: %s\n", k.first.c_str(), k.second.string_value().c_str());
                }
                printf("\n");
            }
        }
    }
    std::string read_file(char *path)
    {
        int fd = open(path, O_RDONLY);
        if (fd < 0)
        {
            if (errno == ENOENT)
                return "";
            auto err = "open "+std::string(path);
            perror(err.c_str());
            exit(1);
        }
        std::string r;
        while (true)
        {
            int l = r.size();
            r.resize(l + 1024);
            int rd = read(fd, (void*)(r.c_str() + l), 1024);
            if (rd <= 0)
            {
                r.resize(l);
                break;
            }
            r.resize(l + rd);
        }
        close(fd);
        return r;
    }
protected:
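    // Attach the socketpair to /dev/nbd<dev_num> and start serving it.
    // Returns 0 on success, -1 if the device node could not be opened,
    // -2 if NBD_SET_SOCK failed (e.g. EBUSY when the device is already taken),
    // -3 if a later ioctl or sysfs open failed; errno is preserved for the
    // caller. A forked child stays blocked in NBD_DO_IT for the lifetime of
    // the mapping.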
    int run_nbd(int sockfd[2], int dev_num, uint64_t size, uint64_t flags, unsigned timeout)
    {
        // Check handle size
        assert(sizeof(cur_req.handle) == 8);
        char path[64] = { 0 };
        sprintf(path, "/dev/nbd%d", dev_num);
        int r, nbd = open(path, O_RDWR), qd_fd;
        if (nbd < 0)
        {
            return -1;
        }
        r = ioctl(nbd, NBD_SET_SOCK, sockfd[1]);
        if (r < 0)
        {
            goto end_close;
        }
        r = ioctl(nbd, NBD_SET_BLKSIZE, 4096);
        if (r < 0)
        {
            goto end_unmap;
        }
        r = ioctl(nbd, NBD_SET_SIZE, size);
        if (r < 0)
        {
            goto end_unmap;
        }
        ioctl(nbd, NBD_SET_FLAGS, flags);
        if (timeout >= 0)
        {
            r = ioctl(nbd, NBD_SET_TIMEOUT, (unsigned long)timeout);
            if (r < 0)
            {
                goto end_unmap;
            }
        }
        // Configure request size
        sprintf(path, "/sys/block/nbd%d/queue/max_sectors_kb", dev_num);
        qd_fd = open(path, O_WRONLY);
        if (qd_fd < 0)
        {
            goto end_unmap;
        }
        write(qd_fd, "32768", 5);
        close(qd_fd);
        if (!fork())
        {
            // Run in child
            close(sockfd[0]);
            r = ioctl(nbd, NBD_DO_IT);
            if (r < 0)
            {
                fprintf(stderr, "NBD device terminated with error: %s\n", strerror(errno));
                kill(getppid(), SIGTERM);
            }
            close(sockfd[1]);
            ioctl(nbd, NBD_CLEAR_QUE);
            ioctl(nbd, NBD_CLEAR_SOCK);
            exit(0);
        }
        close(sockfd[1]);
        close(nbd);
        return 0;
    end_close:
        r = errno;
        close(nbd);
        errno = r;
        return -2;
    end_unmap:
        r = errno;
        ioctl(nbd, NBD_CLEAR_SOCK);
        close(nbd);
        errno = r;
        return -3;
    }
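    // Reply path: completed operations append iovecs to send_list (or to
    // next_send_list while a sendmsg is still in flight). submit_send() posts
    // one io_uring sendmsg over the whole list; handle_send() frees fully
    // transmitted buffers, advances a partially sent iovec and wakes the ring
    // loop again if anything is left to send.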
    void submit_send()
    {
        if (!send_list.size() || send_msg.msg_iovlen > 0)
        {
            return;
        }
        io_uring_sqe* sqe = ringloop->get_sqe();
        if (!sqe)
        {
            return;
        }
        ring_data_t* data = ((ring_data_t*)sqe->user_data);
        data->callback = [this](ring_data_t *data) { handle_send(data->res); };
        send_msg.msg_iov = send_list.data();
        send_msg.msg_iovlen = send_list.size();
        my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
    }
    void handle_send(int result)
    {
        send_msg.msg_iovlen = 0;
        if (result < 0 && result != -EAGAIN)
        {
            fprintf(stderr, "Socket disconnected: %s\n", strerror(-result));
            exit(1);
        }
        int to_eat = 0;
        while (result > 0 && to_eat < send_list.size())
        {
            if (result >= send_list[to_eat].iov_len)
            {
                free(to_free[to_eat]);
                result -= send_list[to_eat].iov_len;
                to_eat++;
            }
            else
            {
                send_list[to_eat].iov_base += result;
                send_list[to_eat].iov_len -= result;
                break;
            }
        }
        if (to_eat > 0)
        {
            send_list.erase(send_list.begin(), send_list.begin() + to_eat);
            to_free.erase(to_free.begin(), to_free.begin() + to_eat);
        }
        for (int i = 0; i < next_send_list.size(); i++)
        {
            send_list.push_back(next_send_list[i]);
        }
        next_send_list.clear();
        if (send_list.size() > 0)
        {
            ringloop->wakeup();
        }
    }
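    // Request path: the epoll handler bumps read_ready when the socket becomes
    // readable; submit_read() then posts a recvmsg either into the small
    // bounce buffer (recv_buf) or, when a large payload is still expected,
    // directly into the destination buffer, and handle_read() accounts the
    // received bytes against cur_buf/cur_left until a full header or payload
    // has been assembled.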
    void submit_read()
    {
        if (!read_ready || read_msg.msg_iovlen > 0)
        {
            return;
        }
        io_uring_sqe* sqe = ringloop->get_sqe();
        if (!sqe)
        {
            return;
        }
        ring_data_t* data = ((ring_data_t*)sqe->user_data);
        data->callback = [this](ring_data_t *data) { handle_read(data->res); };
        if (cur_left < receive_buffer_size)
        {
            read_iov.iov_base = recv_buf;
            read_iov.iov_len = receive_buffer_size;
        }
        else
        {
            read_iov.iov_base = cur_buf;
            read_iov.iov_len = cur_left;
        }
        read_msg.msg_iov = &read_iov;
        read_msg.msg_iovlen = 1;
        my_uring_prep_recvmsg(sqe, nbd_fd, &read_msg, 0);
    }
    void handle_read(int result)
    {
        read_msg.msg_iovlen = 0;
        if (result < 0 && result != -EAGAIN)
        {
            fprintf(stderr, "Socket disconnected: %s\n", strerror(-result));
            exit(1);
        }
        if (result == -EAGAIN || result < read_iov.iov_len)
        {
            read_ready--;
        }
        if (read_ready > 0)
        {
            ringloop->wakeup();
        }
        void *b = recv_buf;
        while (result > 0)
        {
            if (read_iov.iov_base == recv_buf)
            {
                int inc = result >= cur_left ? cur_left : result;
                memcpy(cur_buf, b, inc);
                cur_left -= inc;
                result -= inc;
                cur_buf += inc;
                b += inc;
            }
            else
            {
                assert(result <= cur_left);
                cur_left -= result;
                result = 0;
            }
            if (cur_left <= 0)
            {
                handle_finished_read();
            }
        }
    }
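    // Completion of a read: in CL_READ_HDR a full nbd_request header has
    // arrived and is turned into a cluster_op_t (READ/WRITE/FLUSH); a WRITE
    // first switches to CL_READ_DATA to collect its payload, everything else
    // is submitted immediately and the state returns to reading the next
    // header.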
    void handle_finished_read()
    {
        if (read_state == CL_READ_HDR)
        {
            int req_type = be32toh(cur_req.type);
            if (be32toh(cur_req.magic) != NBD_REQUEST_MAGIC ||
                (req_type != NBD_CMD_READ && req_type != NBD_CMD_WRITE && req_type != NBD_CMD_FLUSH))
            {
                printf("Unexpected request: magic=%x type=%x, terminating\n", cur_req.magic, req_type);
                exit(1);
            }
            uint64_t handle = *((uint64_t*)cur_req.handle);
#ifdef DEBUG
            printf("request %lx +%x %lx\n", be64toh(cur_req.from), be32toh(cur_req.len), handle);
#endif
            void *buf = NULL;
            cluster_op_t *op = new cluster_op_t;
            if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE)
            {
                op->opcode = req_type == NBD_CMD_READ ? OSD_OP_READ : OSD_OP_WRITE;
                op->inode = inode ? inode : watch->cfg.num;
                op->offset = be64toh(cur_req.from);
                op->len = be32toh(cur_req.len);
                buf = malloc_or_die(sizeof(nbd_reply) + op->len);
                op->iov.push_back(buf + sizeof(nbd_reply), op->len);
            }
            else if (req_type == NBD_CMD_FLUSH)
            {
                op->opcode = OSD_OP_SYNC;
                buf = malloc_or_die(sizeof(nbd_reply));
            }
            op->callback = [this, buf, handle](cluster_op_t *op)
            {
#ifdef DEBUG
                printf("reply %lx e=%d\n", handle, op->retval);
#endif
                nbd_reply *reply = (nbd_reply*)buf;
                reply->magic = htobe32(NBD_REPLY_MAGIC);
                memcpy(reply->handle, &handle, 8);
                reply->error = htobe32(op->retval < 0 ? -op->retval : 0);
                auto & to_list = send_msg.msg_iovlen > 0 ? next_send_list : send_list;
                if (op->retval < 0 || op->opcode != OSD_OP_READ)
                    to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) });
                else
                    to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) + op->len });
                to_free.push_back(buf);
                delete op;
                ringloop->wakeup();
            };
            if (req_type == NBD_CMD_WRITE)
            {
                cur_op = op;
                cur_buf = buf + sizeof(nbd_reply);
                cur_left = op->len;
                read_state = CL_READ_DATA;
            }
            else
            {
                cur_op = NULL;
                cur_buf = &cur_req;
                cur_left = sizeof(nbd_request);
                read_state = CL_READ_HDR;
                cli->execute(op);
            }
        }
        else
        {
            // watch is only set when mapping by image name, so guard it before
            // checking the readonly flag (pool/inode mappings have no watch)
            if (cur_op->opcode == OSD_OP_WRITE && watch && watch->cfg.readonly)
            {
                cur_op->retval = -EROFS;
                std::function<void(cluster_op_t*)>(cur_op->callback)(cur_op);
            }
            else
            {
                cli->execute(cur_op);
            }
            cur_op = NULL;
            cur_buf = &cur_req;
            cur_left = sizeof(nbd_request);
            read_state = CL_READ_HDR;
        }
    }
};

int main(int narg, const char *args[])
{
    setvbuf(stdout, NULL, _IONBF, 0);
    setvbuf(stderr, NULL, _IONBF, 0);
    exe_name = args[0];
    nbd_proxy *p = new nbd_proxy();
    p->exec(nbd_proxy::parse_args(narg, args));
    return 0;
}