Simplified distributed block storage with strong consistency, like in Ceph
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

132 lines
3.6 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. /**
  4. * Stub "OSD" implemented on top of osd_messenger to test & compare
  5. * network performance with sync read/write and io_uring
  6. */
  7. #include <sys/types.h>
  8. #include <sys/socket.h>
  9. #include <netinet/in.h>
  10. #include <netinet/tcp.h>
  11. #include <arpa/inet.h>
  12. #include <string.h>
  13. #include <stdio.h>
  14. #include <unistd.h>
  15. #include <fcntl.h>
  16. #include <errno.h>
  17. #include <stdlib.h>
  18. #include <stdexcept>
  19. #include "ringloop.h"
  20. #include "epoll_manager.h"
  21. #include "messenger.h"
  22. int bind_stub(const char *bind_address, int bind_port);
  23. void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
  24. int main(int narg, char *args[])
  25. {
  26. ring_consumer_t looper;
  27. ring_loop_t *ringloop = new ring_loop_t(512);
  28. epoll_manager_t *epmgr = new epoll_manager_t(ringloop);
  29. osd_messenger_t *msgr = new osd_messenger_t();
  30. msgr->osd_num = 1351;
  31. msgr->tfd = epmgr->tfd;
  32. msgr->ringloop = ringloop;
  33. msgr->repeer_pgs = [](osd_num_t) {};
  34. msgr->exec_op = [msgr](osd_op_t *op) { stub_exec_op(msgr, op); };
  35. // Accept new connections
  36. int listen_fd = bind_stub("0.0.0.0", 11203);
  37. epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
  38. {
  39. msgr->accept_connections(listen_fd);
  40. });
  41. looper.loop = [msgr, ringloop]()
  42. {
  43. msgr->read_requests();
  44. msgr->send_replies();
  45. ringloop->submit();
  46. };
  47. ringloop->register_consumer(&looper);
  48. printf("stub_uring_osd: waiting for clients\n");
  49. while (true)
  50. {
  51. ringloop->loop();
  52. ringloop->wait();
  53. }
  54. delete msgr;
  55. delete epmgr;
  56. delete ringloop;
  57. return 0;
  58. }
  59. int bind_stub(const char *bind_address, int bind_port)
  60. {
  61. int listen_backlog = 128;
  62. int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  63. if (listen_fd < 0)
  64. {
  65. throw std::runtime_error(std::string("socket: ") + strerror(errno));
  66. }
  67. int enable = 1;
  68. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
  69. sockaddr_in addr;
  70. int r;
  71. if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
  72. {
  73. close(listen_fd);
  74. throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
  75. }
  76. addr.sin_family = AF_INET;
  77. addr.sin_port = htons(bind_port);
  78. if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
  79. {
  80. close(listen_fd);
  81. throw std::runtime_error(std::string("bind: ") + strerror(errno));
  82. }
  83. if (listen(listen_fd, listen_backlog) < 0)
  84. {
  85. close(listen_fd);
  86. throw std::runtime_error(std::string("listen: ") + strerror(errno));
  87. }
  88. fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
  89. return listen_fd;
  90. }
  91. void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op)
  92. {
  93. op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
  94. op->reply.hdr.id = op->req.hdr.id;
  95. op->reply.hdr.opcode = op->req.hdr.opcode;
  96. if (op->req.hdr.opcode == OSD_OP_SEC_READ)
  97. {
  98. op->reply.hdr.retval = op->req.sec_rw.len;
  99. op->buf = malloc(op->req.sec_rw.len);
  100. op->iov.push_back(op->buf, op->req.sec_rw.len);
  101. }
  102. else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE || op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
  103. {
  104. op->reply.hdr.retval = op->req.sec_rw.len;
  105. }
  106. else if (op->req.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
  107. {
  108. op->reply.hdr.retval = 0;
  109. }
  110. else
  111. {
  112. printf("client %d: unsupported stub opcode: %lu\n", op->peer_fd, op->req.hdr.opcode);
  113. op->reply.hdr.retval = -EINVAL;
  114. }
  115. msgr->outbox_push(op);
  116. }