Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

321 lines
8.4 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. // FIO engine to test Blockstore
  4. //
  5. // Initialize storage for tests:
  6. //
  7. // dd if=/dev/zero of=test_data.bin bs=1024 count=1048576
  8. // dd if=/dev/zero of=test_meta.bin bs=1024 count=256
  9. // dd if=/dev/zero of=test_journal.bin bs=1024 count=4096
  10. //
  11. // Random write:
  12. //
  13. // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
  14. // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
  15. //
  16. // Linear write:
  17. //
  18. // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
  19. // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
  20. //
  21. // Random read (run with -iodepth=32 or -iodepth=1):
  22. //
  23. // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
  24. // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
  #include <new>

  #include "blockstore.h"
  #include "epoll_manager.h"
  #include "fio_headers.h"
  #include "json11/json11.hpp"
  29. struct bs_data
  30. {
  31. blockstore_t *bs;
  32. epoll_manager_t *epmgr;
  33. ring_loop_t *ringloop;
  34. /* The list of completed io_u structs. */
  35. std::vector<io_u*> completed;
  36. int op_n = 0, inflight = 0;
  37. bool last_sync = false;
  38. };
  // Engine-specific option storage; fields are addressed by offset from the option table.
  struct bs_options
  {
      // Leading filler member: keeps json_config at a non-zero offset
      // (presumably because fio treats offset 0 specially - TODO confirm).
      int __pad;
      // Raw text of the bs_config option, or a null pointer when it was not given
      char *json_config = nullptr;
  };
  // Option table referenced from ioengine.options.
  // The last entry has a NULL name (presumably the list terminator expected by fio).
  static struct fio_option options[] = {
      {
          // bs_config: a string option stored into bs_options::json_config
          .name = "bs_config",
          .lname = "JSON config for Blockstore",
          .type = FIO_OPT_STR_STORE,
          .off1 = offsetof(struct bs_options, json_config),
          .help = "JSON config for Blockstore",
          .category = FIO_OPT_C_ENGINE,
          .group = FIO_OPT_G_FILENAME,
      },
      {
          .name = NULL,
      },
  };
  58. static int bs_setup(struct thread_data *td)
  59. {
  60. bs_data *bsd;
  61. //fio_file *f;
  62. //int r;
  63. //int64_t size;
  64. bsd = new bs_data;
  65. if (!bsd)
  66. {
  67. td_verror(td, errno, "calloc");
  68. return 1;
  69. }
  70. td->io_ops_data = bsd;
  71. if (!td->files_index)
  72. {
  73. add_file(td, "blockstore", 0, 0);
  74. td->o.nr_files = td->o.nr_files ? : 1;
  75. td->o.open_files++;
  76. }
  77. //f = td->files[0];
  78. //f->real_file_size = size;
  79. return 0;
  80. }
  81. static void bs_cleanup(struct thread_data *td)
  82. {
  83. bs_data *bsd = (bs_data*)td->io_ops_data;
  84. if (bsd)
  85. {
  86. while (1)
  87. {
  88. do
  89. {
  90. bsd->ringloop->loop();
  91. if (bsd->bs->is_safe_to_stop())
  92. goto safe;
  93. } while (bsd->ringloop->has_work());
  94. bsd->ringloop->wait();
  95. }
  96. safe:
  97. delete bsd->bs;
  98. delete bsd->epmgr;
  99. delete bsd->ringloop;
  100. delete bsd;
  101. }
  102. }
  103. /* Connect to the server from each thread. */
  104. static int bs_init(struct thread_data *td)
  105. {
  106. bs_options *o = (bs_options*)td->eo;
  107. bs_data *bsd = (bs_data*)td->io_ops_data;
  108. blockstore_config_t config;
  109. if (o->json_config)
  110. {
  111. std::string json_err;
  112. auto json_cfg = json11::Json::parse(o->json_config, json_err);
  113. for (auto p: json_cfg.object_items())
  114. {
  115. if (p.second.is_string())
  116. config[p.first] = p.second.string_value();
  117. else
  118. config[p.first] = p.second.dump();
  119. }
  120. }
  121. bsd->ringloop = new ring_loop_t(512);
  122. bsd->epmgr = new epoll_manager_t(bsd->ringloop);
  123. bsd->bs = new blockstore_t(config, bsd->ringloop, bsd->epmgr->tfd);
  124. while (1)
  125. {
  126. bsd->ringloop->loop();
  127. if (bsd->bs->is_started())
  128. break;
  129. bsd->ringloop->wait();
  130. }
  131. log_info("fio: blockstore initialized\n");
  132. return 0;
  133. }
  134. /* Begin read or write request. */
  135. static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
  136. {
  137. bs_data *bsd = (bs_data*)td->io_ops_data;
  138. int n = bsd->op_n;
  139. if (io->ddir == DDIR_SYNC && bsd->last_sync)
  140. {
  141. return FIO_Q_COMPLETED;
  142. }
  143. fio_ro_check(td, io);
  144. io->engine_data = bsd;
  145. if (io->ddir == DDIR_WRITE || io->ddir == DDIR_READ)
  146. assert(io->xfer_buflen <= bsd->bs->get_block_size());
  147. blockstore_op_t *op = new blockstore_op_t;
  148. op->callback = NULL;
  149. switch (io->ddir)
  150. {
  151. case DDIR_READ:
  152. op->opcode = BS_OP_READ;
  153. op->buf = io->xfer_buf;
  154. op->oid = {
  155. .inode = 1,
  156. .stripe = io->offset / bsd->bs->get_block_size(),
  157. };
  158. op->version = UINT64_MAX; // last unstable
  159. op->offset = io->offset % bsd->bs->get_block_size();
  160. op->len = io->xfer_buflen;
  161. op->callback = [io, n](blockstore_op_t *op)
  162. {
  163. io->error = op->retval < 0 ? -op->retval : 0;
  164. bs_data *bsd = (bs_data*)io->engine_data;
  165. bsd->inflight--;
  166. bsd->completed.push_back(io);
  167. #ifdef BLOCKSTORE_DEBUG
  168. printf("--- OP_READ %llx n=%d retval=%d\n", io, n, op->retval);
  169. #endif
  170. delete op;
  171. };
  172. break;
  173. case DDIR_WRITE:
  174. op->opcode = BS_OP_WRITE;
  175. op->buf = io->xfer_buf;
  176. op->oid = {
  177. .inode = 1,
  178. .stripe = io->offset / bsd->bs->get_block_size(),
  179. };
  180. op->version = 0; // assign automatically
  181. op->offset = io->offset % bsd->bs->get_block_size();
  182. op->len = io->xfer_buflen;
  183. op->callback = [io, n](blockstore_op_t *op)
  184. {
  185. io->error = op->retval < 0 ? -op->retval : 0;
  186. bs_data *bsd = (bs_data*)io->engine_data;
  187. bsd->inflight--;
  188. bsd->completed.push_back(io);
  189. #ifdef BLOCKSTORE_DEBUG
  190. printf("--- OP_WRITE %llx n=%d retval=%d\n", io, n, op->retval);
  191. #endif
  192. delete op;
  193. };
  194. bsd->last_sync = false;
  195. break;
  196. case DDIR_SYNC:
  197. op->opcode = BS_OP_SYNC_STAB_ALL;
  198. op->callback = [io, n](blockstore_op_t *op)
  199. {
  200. bs_data *bsd = (bs_data*)io->engine_data;
  201. io->error = op->retval < 0 ? -op->retval : 0;
  202. bsd->completed.push_back(io);
  203. bsd->inflight--;
  204. #ifdef BLOCKSTORE_DEBUG
  205. printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
  206. #endif
  207. delete op;
  208. };
  209. bsd->last_sync = true;
  210. break;
  211. default:
  212. io->error = EINVAL;
  213. return FIO_Q_COMPLETED;
  214. }
  215. #ifdef BLOCKSTORE_DEBUG
  216. printf("+++ %s %llx n=%d\n", op->opcode == OP_READ ? "OP_READ" : (op->opcode == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n);
  217. #endif
  218. io->error = 0;
  219. bsd->inflight++;
  220. bsd->bs->enqueue_op(op);
  221. bsd->op_n++;
  222. if (io->error != 0)
  223. return FIO_Q_COMPLETED;
  224. return FIO_Q_QUEUED;
  225. }
  226. static int bs_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
  227. {
  228. bs_data *bsd = (bs_data*)td->io_ops_data;
  229. // FIXME timeout
  230. while (true)
  231. {
  232. bsd->ringloop->loop();
  233. if (bsd->completed.size() >= min)
  234. break;
  235. bsd->ringloop->wait();
  236. }
  237. return bsd->completed.size();
  238. }
  239. static struct io_u *bs_event(struct thread_data *td, int event)
  240. {
  241. bs_data *bsd = (bs_data*)td->io_ops_data;
  242. if (bsd->completed.size() == 0)
  243. return NULL;
  244. /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */
  245. struct io_u *ev = bsd->completed.back();
  246. bsd->completed.pop_back();
  247. return ev;
  248. }
  249. static int bs_io_u_init(struct thread_data *td, struct io_u *io)
  250. {
  251. io->engine_data = NULL;
  252. return 0;
  253. }
  /* Counterpart of bs_io_u_init(). Intentionally empty: bs_io_u_init() allocates nothing. */
  static void bs_io_u_free(struct thread_data *td, struct io_u *io)
  {
  }
  /* File open hook. Does no work and always returns 0. */
  static int bs_open_file(struct thread_data *td, struct fio_file *f)
  {
      return 0;
  }
  /* Invalidate hook. Does no work and always returns 0. */
  static int bs_invalidate(struct thread_data *td, struct fio_file *f)
  {
      return 0;
  }
  // Engine descriptor: binds the engine name, flags, option table and the bs_* hooks
  // defined in this file. It is passed to register_ioengine() / unregister_ioengine() below.
  struct ioengine_ops ioengine = {
      .name = "vitastor_blockstore",
      .version = FIO_IOOPS_VERSION,
      .flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND,
      .setup = bs_setup,
      .init = bs_init,
      .queue = bs_queue,
      .getevents = bs_getevents,
      .event = bs_event,
      .cleanup = bs_cleanup,
      .open_file = bs_open_file,
      .invalidate = bs_invalidate,
      .io_u_init = bs_io_u_init,
      .io_u_free = bs_io_u_free,
      // Size and layout of the per-job option storage (struct bs_options)
      .option_struct_size = sizeof(struct bs_options),
      .options = options,
  };
  // Register the engine descriptor with fio.
  // NOTE(review): fio_init presumably marks this function to run automatically at load time - confirm.
  static void fio_init fio_bs_register(void)
  {
      register_ioengine(&ioengine);
  }
  // Remove the engine descriptor from fio.
  // NOTE(review): fio_exit presumably marks this function to run automatically at unload/exit - confirm.
  static void fio_exit fio_bs_unregister(void)
  {
      unregister_ioengine(&ioengine);
  }