Simplified distributed block storage with strong consistency, like in Ceph
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

175 lines
4.9 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. /**
  4. * Stub "OSD" to test & compare network performance with sync read/write and io_uring
  5. *
  6. * Core i7-6700HQ laptop
  7. *
  8. * stub_osd:
  9. * randwrite Q1 S1: 36900 iops
  10. * randwrite Q32 S32: 71000 iops
  11. * randwrite Q32 S32 (multi-fsync fix): 113000 iops
  12. * randread Q1: 67300 iops
  13. * randread Q32: 144000 iops
  14. *
  15. * io_uring osd with #define OSD_STUB:
  16. * randwrite Q1 S1: 30000 iops
  17. * randwrite Q32 S32: 78600 iops
  18. * randwrite Q32 S32 (multi-fsync fix): 125000 iops
  19. * randread Q1: 50700 iops
  20. * randread Q32: 86100 iops
  21. *
  22. * It seems io_uring is fine :)
  23. */
  24. #include <sys/types.h>
  25. #include <sys/socket.h>
  26. #include <netinet/in.h>
  27. #include <netinet/tcp.h>
  28. #include <arpa/inet.h>
  29. #include <string.h>
  30. #include <stdio.h>
  31. #include <unistd.h>
  32. #include <fcntl.h>
  33. #include <errno.h>
  34. #include <stdlib.h>
  35. #include <stdexcept>
  36. #include "rw_blocking.h"
  37. #include "osd_ops.h"
  38. int bind_stub(const char *bind_address, int bind_port);
  39. void run_stub(int peer_fd);
  40. int main(int narg, char *args[])
  41. {
  42. int listen_fd = bind_stub("0.0.0.0", 11203);
  43. // Accept new connections
  44. sockaddr_in addr;
  45. socklen_t peer_addr_size = sizeof(addr);
  46. int peer_fd;
  47. while (1)
  48. {
  49. printf("stub_osd: waiting for 1 client\n");
  50. peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
  51. if (peer_fd == -1)
  52. {
  53. if (errno == EAGAIN)
  54. continue;
  55. else
  56. throw std::runtime_error(std::string("accept: ") + strerror(errno));
  57. }
  58. char peer_str[256];
  59. printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd,
  60. inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
  61. int one = 1;
  62. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  63. run_stub(peer_fd);
  64. close(peer_fd);
  65. printf("stub_osd: client %d disconnected\n", peer_fd);
  66. // Try to accept next connection
  67. peer_addr_size = sizeof(addr);
  68. }
  69. return 0;
  70. }
  // Create a TCP/IPv4 socket, bind it to bind_address:bind_port and start listening.
  //
  // bind_address - IPv4 address in dotted-decimal text form (as accepted by inet_pton)
  // bind_port    - TCP port in host byte order, 0..65535
  //
  // Returns the listening socket descriptor; the caller owns it.
  // Throws std::runtime_error if an argument is invalid or if socket(), bind() or
  // listen() fails. No descriptor is leaked on any error path.
  int bind_stub(const char *bind_address, int bind_port)
  {
      const int listen_backlog = 128;
      // Validate the arguments first so that no socket exists yet when they are bad
      if (!bind_address)
      {
          throw std::runtime_error("bind address is not specified");
      }
      if (bind_port < 0 || bind_port > 65535)
      {
          // htons() would silently truncate the value to 16 bits
          char msg[64];
          snprintf(msg, sizeof(msg), "bind port %d is out of range", bind_port);
          throw std::runtime_error(msg);
      }
      sockaddr_in addr;
      // Zero the whole structure: sin_zero padding must not contain stack garbage
      memset(&addr, 0, sizeof(addr));
      int r = inet_pton(AF_INET, bind_address, &addr.sin_addr);
      if (r != 1)
      {
          throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
      }
      addr.sin_family = AF_INET;
      addr.sin_port = htons(bind_port);
      int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
      if (listen_fd < 0)
      {
          throw std::runtime_error(std::string("socket: ") + strerror(errno));
      }
      // Best effort: lets the stub be restarted while old connections are in TIME_WAIT
      int enable = 1;
      setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
      if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
      {
          // Build the message before close() because close() may change errno
          std::string err = std::string("bind: ") + strerror(errno);
          close(listen_fd);
          throw std::runtime_error(err);
      }
      if (listen(listen_fd, listen_backlog) < 0)
      {
          std::string err = std::string("listen: ") + strerror(errno);
          close(listen_fd);
          throw std::runtime_error(err);
      }
      return listen_fd;
  }
  102. void run_stub(int peer_fd)
  103. {
  104. osd_any_op_t op;
  105. osd_any_reply_t reply;
  106. void *buf = NULL;
  107. while (1)
  108. {
  109. int r = read_blocking(peer_fd, op.buf, OSD_PACKET_SIZE);
  110. if (r < OSD_PACKET_SIZE)
  111. {
  112. break;
  113. }
  114. if (op.hdr.magic != SECONDARY_OSD_OP_MAGIC)
  115. {
  116. printf("client %d: bad magic number in operation header\n", peer_fd);
  117. break;
  118. }
  119. reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
  120. reply.hdr.id = op.hdr.id;
  121. reply.hdr.opcode = op.hdr.opcode;
  122. if (op.hdr.opcode == OSD_OP_SEC_READ)
  123. {
  124. reply.hdr.retval = op.sec_rw.len;
  125. buf = malloc(op.sec_rw.len);
  126. r = write_blocking(peer_fd, reply.buf, OSD_PACKET_SIZE);
  127. if (r == OSD_PACKET_SIZE)
  128. r = write_blocking(peer_fd, &buf, op.sec_rw.len);
  129. free(buf);
  130. if (r < op.sec_rw.len)
  131. break;
  132. }
  133. else if (op.hdr.opcode == OSD_OP_SEC_WRITE || op.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
  134. {
  135. buf = malloc(op.sec_rw.len);
  136. r = read_blocking(peer_fd, buf, op.sec_rw.len);
  137. free(buf);
  138. reply.hdr.retval = op.sec_rw.len;
  139. if (r == op.sec_rw.len)
  140. r = write_blocking(peer_fd, reply.buf, OSD_PACKET_SIZE);
  141. else
  142. r = 0;
  143. if (r < OSD_PACKET_SIZE)
  144. break;
  145. }
  146. else if (op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
  147. {
  148. reply.hdr.retval = 0;
  149. r = write_blocking(peer_fd, reply.buf, OSD_PACKET_SIZE);
  150. if (r < OSD_PACKET_SIZE)
  151. break;
  152. }
  153. else
  154. {
  155. printf("client %d: unsupported stub opcode: %lu\n", peer_fd, op.hdr.opcode);
  156. break;
  157. }
  158. }
  159. free(buf);
  160. }