Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

408 lines
11 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. // FIO engine to test Blockstore through Secondary OSD interface
  4. //
  5. // Prepare storage like in fio_engine.cpp, then start OSD with ./osd, then test it
  6. //
  7. // Random write:
  8. //
  9. // fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
  10. // -host=127.0.0.1 -port=11203 [-block_size_order=17] [-single_primary=1] -size=1000M
  11. //
  12. // Linear write:
  13. //
  14. // fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
  15. // -host=127.0.0.1 -port=11203 -size=1000M
  16. //
  17. // Random read (run with -iodepth=32 or -iodepth=1):
  18. //
  19. // fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
  20. // -host=127.0.0.1 -port=11203 -size=1000M
  21. #include <sys/types.h>
  22. #include <sys/socket.h>
  23. #include <netinet/in.h>
  24. #include <netinet/tcp.h>
  25. #include <vector>
  26. #include <unordered_map>
  27. #include "rw_blocking.h"
  28. #include "osd_ops.h"
  29. #include "fio_headers.h"
  30. struct sec_data
  31. {
  32. int connect_fd;
  33. /* block_size = 1 << block_order (128KB by default) */
  34. uint64_t block_order = 17, block_size = 1 << 17;
  35. std::unordered_map<uint64_t, io_u*> queue;
  36. bool last_sync = false;
  37. /* The list of completed io_u structs. */
  38. std::vector<io_u*> completed;
  39. uint64_t op_n = 0, inflight = 0;
  40. };
  41. struct sec_options
  42. {
  43. int __pad;
  44. char *host = NULL;
  45. int port = 0;
  46. int single_primary = 0;
  47. int trace = 0;
  48. int block_order = 17;
  49. };
  50. static struct fio_option options[] = {
  51. {
  52. .name = "host",
  53. .lname = "Test Secondary OSD host",
  54. .type = FIO_OPT_STR_STORE,
  55. .off1 = offsetof(struct sec_options, host),
  56. .help = "Test Secondary OSD host",
  57. .category = FIO_OPT_C_ENGINE,
  58. .group = FIO_OPT_G_FILENAME,
  59. },
  60. {
  61. .name = "port",
  62. .lname = "Test Secondary OSD port",
  63. .type = FIO_OPT_INT,
  64. .off1 = offsetof(struct sec_options, port),
  65. .help = "Test Secondary OSD port",
  66. .category = FIO_OPT_C_ENGINE,
  67. .group = FIO_OPT_G_FILENAME,
  68. },
  69. {
  70. .name = "block_size_order",
  71. .lname = "Blockstore block size order",
  72. .type = FIO_OPT_INT,
  73. .off1 = offsetof(struct sec_options, block_order),
  74. .help = "Blockstore block size order (size = 2^order)",
  75. .category = FIO_OPT_C_ENGINE,
  76. .group = FIO_OPT_G_FILENAME,
  77. },
  78. {
  79. .name = "single_primary",
  80. .lname = "Single Primary",
  81. .type = FIO_OPT_BOOL,
  82. .off1 = offsetof(struct sec_options, single_primary),
  83. .help = "Test single Primary OSD (one PG) instead of Secondary",
  84. .def = "0",
  85. .category = FIO_OPT_C_ENGINE,
  86. .group = FIO_OPT_G_FILENAME,
  87. },
  88. {
  89. .name = "osd_trace",
  90. .lname = "OSD trace",
  91. .type = FIO_OPT_BOOL,
  92. .off1 = offsetof(struct sec_options, trace),
  93. .help = "Trace OSD operations",
  94. .def = "0",
  95. .category = FIO_OPT_C_ENGINE,
  96. .group = FIO_OPT_G_FILENAME,
  97. },
  98. {
  99. .name = NULL,
  100. },
  101. };
  102. static int sec_setup(struct thread_data *td)
  103. {
  104. sec_data *bsd;
  105. //fio_file *f;
  106. //int r;
  107. //int64_t size;
  108. bsd = new sec_data;
  109. if (!bsd)
  110. {
  111. td_verror(td, errno, "calloc");
  112. return 1;
  113. }
  114. td->io_ops_data = bsd;
  115. if (!td->files_index)
  116. {
  117. add_file(td, "bs_sec_osd", 0, 0);
  118. td->o.nr_files = td->o.nr_files ? : 1;
  119. td->o.open_files++;
  120. }
  121. //f = td->files[0];
  122. //f->real_file_size = size;
  123. return 0;
  124. }
  125. static void sec_cleanup(struct thread_data *td)
  126. {
  127. sec_data *bsd = (sec_data*)td->io_ops_data;
  128. if (bsd)
  129. {
  130. close(bsd->connect_fd);
  131. }
  132. }
  133. /* Connect to the server from each thread. */
  134. static int sec_init(struct thread_data *td)
  135. {
  136. sec_options *o = (sec_options*)td->eo;
  137. sec_data *bsd = (sec_data*)td->io_ops_data;
  138. bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
  139. bsd->block_size = 1 << o->block_order;
  140. struct sockaddr_in addr;
  141. int r;
  142. if ((r = inet_pton(AF_INET, o->host ? o->host : "127.0.0.1", &addr.sin_addr)) != 1)
  143. {
  144. fprintf(stderr, "server address: %s%s\n", o->host ? o->host : "127.0.0.1", r == 0 ? " is not valid" : ": no ipv4 support");
  145. return 1;
  146. }
  147. addr.sin_family = AF_INET;
  148. addr.sin_port = htons(o->port ? o->port : 11203);
  149. bsd->connect_fd = socket(AF_INET, SOCK_STREAM, 0);
  150. if (bsd->connect_fd < 0)
  151. {
  152. perror("socket");
  153. return 1;
  154. }
  155. if (connect(bsd->connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
  156. {
  157. perror("connect");
  158. return 1;
  159. }
  160. int one = 1;
  161. setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  162. // FIXME: read config (block size) from OSD
  163. return 0;
  164. }
  165. /* Begin read or write request. */
  166. static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
  167. {
  168. sec_options *opt = (sec_options*)td->eo;
  169. sec_data *bsd = (sec_data*)td->io_ops_data;
  170. int n = bsd->op_n;
  171. fio_ro_check(td, io);
  172. if (io->ddir == DDIR_SYNC && bsd->last_sync)
  173. {
  174. return FIO_Q_COMPLETED;
  175. }
  176. io->engine_data = bsd;
  177. osd_any_op_t op = { 0 };
  178. op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
  179. op.hdr.id = n;
  180. switch (io->ddir)
  181. {
  182. case DDIR_READ:
  183. if (!opt->single_primary)
  184. {
  185. op.hdr.opcode = OSD_OP_SEC_READ;
  186. op.sec_rw.oid = {
  187. .inode = 1,
  188. .stripe = io->offset >> bsd->block_order,
  189. };
  190. op.sec_rw.version = UINT64_MAX; // last unstable
  191. op.sec_rw.offset = io->offset % bsd->block_size;
  192. op.sec_rw.len = io->xfer_buflen;
  193. }
  194. else
  195. {
  196. op.hdr.opcode = OSD_OP_READ;
  197. op.rw.inode = 1;
  198. op.rw.offset = io->offset;
  199. op.rw.len = io->xfer_buflen;
  200. }
  201. bsd->last_sync = false;
  202. break;
  203. case DDIR_WRITE:
  204. if (!opt->single_primary)
  205. {
  206. op.hdr.opcode = OSD_OP_SEC_WRITE;
  207. op.sec_rw.oid = {
  208. .inode = 1,
  209. .stripe = io->offset >> bsd->block_order,
  210. };
  211. op.sec_rw.version = 0; // assign automatically
  212. op.sec_rw.offset = io->offset % bsd->block_size;
  213. op.sec_rw.len = io->xfer_buflen;
  214. }
  215. else
  216. {
  217. op.hdr.opcode = OSD_OP_WRITE;
  218. op.rw.inode = 1;
  219. op.rw.offset = io->offset;
  220. op.rw.len = io->xfer_buflen;
  221. }
  222. bsd->last_sync = false;
  223. break;
  224. case DDIR_SYNC:
  225. if (!opt->single_primary)
  226. {
  227. // Allowed only for testing: sync & stabilize all unstable object versions
  228. op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
  229. }
  230. else
  231. {
  232. op.hdr.opcode = OSD_OP_SYNC;
  233. }
  234. // fio sends 32 syncs with -fsync=32. we omit 31 of them even though
  235. // generally it may not be 100% correct (FIXME: fix fio itself)
  236. bsd->last_sync = true;
  237. break;
  238. default:
  239. io->error = EINVAL;
  240. return FIO_Q_COMPLETED;
  241. }
  242. if (opt->trace)
  243. {
  244. printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
  245. (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n);
  246. }
  247. io->error = 0;
  248. bsd->inflight++;
  249. bsd->op_n++;
  250. bsd->queue[n] = io;
  251. iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } };
  252. int iovcnt = 1, wtotal = OSD_PACKET_SIZE;
  253. if (io->ddir == DDIR_WRITE)
  254. {
  255. iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
  256. wtotal += io->xfer_buflen;
  257. iovcnt++;
  258. }
  259. if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal)
  260. {
  261. perror("writev");
  262. exit(1);
  263. }
  264. if (io->error != 0)
  265. return FIO_Q_COMPLETED;
  266. return FIO_Q_QUEUED;
  267. }
  268. static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
  269. {
  270. sec_options *opt = (sec_options*)td->eo;
  271. sec_data *bsd = (sec_data*)td->io_ops_data;
  272. // FIXME timeout, at least poll. Now it's the stupidest implementation possible
  273. osd_any_reply_t reply;
  274. while (bsd->completed.size() < min)
  275. {
  276. read_blocking(bsd->connect_fd, reply.buf, OSD_PACKET_SIZE);
  277. if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC)
  278. {
  279. fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC);
  280. exit(1);
  281. }
  282. auto it = bsd->queue.find(reply.hdr.id);
  283. if (it == bsd->queue.end())
  284. {
  285. fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id);
  286. exit(1);
  287. }
  288. io_u* io = it->second;
  289. if (io->ddir == DDIR_READ)
  290. {
  291. if (reply.hdr.retval != io->xfer_buflen)
  292. {
  293. fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
  294. exit(1);
  295. }
  296. read_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen);
  297. }
  298. else if (io->ddir == DDIR_WRITE)
  299. {
  300. if (reply.hdr.retval != io->xfer_buflen)
  301. {
  302. fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
  303. exit(1);
  304. }
  305. }
  306. else if (io->ddir == DDIR_SYNC)
  307. {
  308. if (reply.hdr.retval != 0)
  309. {
  310. fprintf(stderr, "Sync failed: retval = %ld\n", reply.hdr.retval);
  311. exit(1);
  312. }
  313. }
  314. if (opt->trace)
  315. {
  316. printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" :
  317. (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id);
  318. }
  319. bsd->completed.push_back(io);
  320. }
  321. return bsd->completed.size();
  322. }
  323. static struct io_u *sec_event(struct thread_data *td, int event)
  324. {
  325. sec_data *bsd = (sec_data*)td->io_ops_data;
  326. if (bsd->completed.size() == 0)
  327. return NULL;
  328. /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */
  329. struct io_u *ev = bsd->completed.back();
  330. bsd->completed.pop_back();
  331. return ev;
  332. }
  333. static int sec_io_u_init(struct thread_data *td, struct io_u *io)
  334. {
  335. io->engine_data = NULL;
  336. return 0;
  337. }
  338. static void sec_io_u_free(struct thread_data *td, struct io_u *io)
  339. {
  340. }
  341. static int sec_open_file(struct thread_data *td, struct fio_file *f)
  342. {
  343. return 0;
  344. }
  345. static int sec_invalidate(struct thread_data *td, struct fio_file *f)
  346. {
  347. return 0;
  348. }
/* Engine descriptor wiring the sec_* callbacks into fio.
 * NOTE(review): presumably must stay non-static and named `ioengine`, because this
 * engine is loaded as an external .so (-ioengine=./libfio_sec_osd.so) and fio looks
 * the descriptor up by symbol name -- confirm against fio's ioengines.c. */
struct ioengine_ops ioengine = {
    .name = "vitastor_secondary_osd",
    .version = FIO_IOOPS_VERSION,
    .flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND,
    .setup = sec_setup,
    .init = sec_init,
    .queue = sec_queue,
    .getevents = sec_getevents,
    .event = sec_event,
    .cleanup = sec_cleanup,
    .open_file = sec_open_file,
    .invalidate = sec_invalidate,
    .io_u_init = sec_io_u_init,
    .io_u_free = sec_io_u_free,
    // Size of the engine option struct that fio allocates and fills from `options`
    .option_struct_size = sizeof(struct sec_options),
    .options = options,
};
  366. static void fio_init fio_sec_register(void)
  367. {
  368. register_ioengine(&ioengine);
  369. }
  370. static void fio_exit fio_sec_unregister(void)
  371. {
  372. unregister_ioengine(&ioengine);
  373. }