Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

317 lines
8.3 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. // FIO engine to test Blockstore
  4. //
  5. // Initialize storage for tests:
  6. //
  7. // dd if=/dev/zero of=test_data.bin bs=1024 count=1048576
  8. // dd if=/dev/zero of=test_meta.bin bs=1024 count=256
  9. // dd if=/dev/zero of=test_journal.bin bs=1024 count=4096
  10. //
  11. // Random write:
  12. //
  13. // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
  14. // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
  15. //
  16. // Linear write:
  17. //
  18. // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
  19. // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
  20. //
  21. // Random read (run with -iodepth=32 or -iodepth=1):
  22. //
  23. // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
  24. // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
#include <new>

#include "blockstore.h"
#include "fio_headers.h"
#include "json11/json11.hpp"
  28. struct bs_data
  29. {
  30. blockstore_t *bs;
  31. ring_loop_t *ringloop;
  32. /* The list of completed io_u structs. */
  33. std::vector<io_u*> completed;
  34. int op_n = 0, inflight = 0;
  35. bool last_sync = false;
  36. };
  37. struct bs_options
  38. {
  39. int __pad;
  40. char *json_config = NULL;
  41. };
  42. static struct fio_option options[] = {
  43. {
  44. .name = "bs_config",
  45. .lname = "JSON config for Blockstore",
  46. .type = FIO_OPT_STR_STORE,
  47. .off1 = offsetof(struct bs_options, json_config),
  48. .help = "JSON config for Blockstore",
  49. .category = FIO_OPT_C_ENGINE,
  50. .group = FIO_OPT_G_FILENAME,
  51. },
  52. {
  53. .name = NULL,
  54. },
  55. };
  56. static int bs_setup(struct thread_data *td)
  57. {
  58. bs_data *bsd;
  59. //fio_file *f;
  60. //int r;
  61. //int64_t size;
  62. bsd = new bs_data;
  63. if (!bsd)
  64. {
  65. td_verror(td, errno, "calloc");
  66. return 1;
  67. }
  68. td->io_ops_data = bsd;
  69. if (!td->files_index)
  70. {
  71. add_file(td, "blockstore", 0, 0);
  72. td->o.nr_files = td->o.nr_files ? : 1;
  73. td->o.open_files++;
  74. }
  75. //f = td->files[0];
  76. //f->real_file_size = size;
  77. return 0;
  78. }
  79. static void bs_cleanup(struct thread_data *td)
  80. {
  81. bs_data *bsd = (bs_data*)td->io_ops_data;
  82. if (bsd)
  83. {
  84. while (1)
  85. {
  86. do
  87. {
  88. bsd->ringloop->loop();
  89. if (bsd->bs->is_safe_to_stop())
  90. goto safe;
  91. } while (bsd->ringloop->has_work());
  92. bsd->ringloop->wait();
  93. }
  94. safe:
  95. delete bsd->bs;
  96. delete bsd->ringloop;
  97. delete bsd;
  98. }
  99. }
  100. /* Connect to the server from each thread. */
  101. static int bs_init(struct thread_data *td)
  102. {
  103. bs_options *o = (bs_options*)td->eo;
  104. bs_data *bsd = (bs_data*)td->io_ops_data;
  105. blockstore_config_t config;
  106. if (o->json_config)
  107. {
  108. std::string json_err;
  109. auto json_cfg = json11::Json::parse(o->json_config, json_err);
  110. for (auto p: json_cfg.object_items())
  111. {
  112. if (p.second.is_string())
  113. config[p.first] = p.second.string_value();
  114. else
  115. config[p.first] = p.second.dump();
  116. }
  117. }
  118. bsd->ringloop = new ring_loop_t(512);
  119. bsd->bs = new blockstore_t(config, bsd->ringloop);
  120. while (1)
  121. {
  122. bsd->ringloop->loop();
  123. if (bsd->bs->is_started())
  124. break;
  125. bsd->ringloop->wait();
  126. }
  127. log_info("fio: blockstore initialized\n");
  128. return 0;
  129. }
  130. /* Begin read or write request. */
  131. static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
  132. {
  133. bs_data *bsd = (bs_data*)td->io_ops_data;
  134. int n = bsd->op_n;
  135. if (io->ddir == DDIR_SYNC && bsd->last_sync)
  136. {
  137. return FIO_Q_COMPLETED;
  138. }
  139. fio_ro_check(td, io);
  140. io->engine_data = bsd;
  141. if (io->ddir == DDIR_WRITE || io->ddir == DDIR_READ)
  142. assert(io->xfer_buflen <= bsd->bs->get_block_size());
  143. blockstore_op_t *op = new blockstore_op_t;
  144. op->callback = NULL;
  145. switch (io->ddir)
  146. {
  147. case DDIR_READ:
  148. op->opcode = BS_OP_READ;
  149. op->buf = io->xfer_buf;
  150. op->oid = {
  151. .inode = 1,
  152. .stripe = io->offset / bsd->bs->get_block_size(),
  153. };
  154. op->version = UINT64_MAX; // last unstable
  155. op->offset = io->offset % bsd->bs->get_block_size();
  156. op->len = io->xfer_buflen;
  157. op->callback = [io, n](blockstore_op_t *op)
  158. {
  159. io->error = op->retval < 0 ? -op->retval : 0;
  160. bs_data *bsd = (bs_data*)io->engine_data;
  161. bsd->inflight--;
  162. bsd->completed.push_back(io);
  163. #ifdef BLOCKSTORE_DEBUG
  164. printf("--- OP_READ %llx n=%d retval=%d\n", io, n, op->retval);
  165. #endif
  166. delete op;
  167. };
  168. break;
  169. case DDIR_WRITE:
  170. op->opcode = BS_OP_WRITE;
  171. op->buf = io->xfer_buf;
  172. op->oid = {
  173. .inode = 1,
  174. .stripe = io->offset / bsd->bs->get_block_size(),
  175. };
  176. op->version = 0; // assign automatically
  177. op->offset = io->offset % bsd->bs->get_block_size();
  178. op->len = io->xfer_buflen;
  179. op->callback = [io, n](blockstore_op_t *op)
  180. {
  181. io->error = op->retval < 0 ? -op->retval : 0;
  182. bs_data *bsd = (bs_data*)io->engine_data;
  183. bsd->inflight--;
  184. bsd->completed.push_back(io);
  185. #ifdef BLOCKSTORE_DEBUG
  186. printf("--- OP_WRITE %llx n=%d retval=%d\n", io, n, op->retval);
  187. #endif
  188. delete op;
  189. };
  190. bsd->last_sync = false;
  191. break;
  192. case DDIR_SYNC:
  193. op->opcode = BS_OP_SYNC_STAB_ALL;
  194. op->callback = [io, n](blockstore_op_t *op)
  195. {
  196. bs_data *bsd = (bs_data*)io->engine_data;
  197. io->error = op->retval < 0 ? -op->retval : 0;
  198. bsd->completed.push_back(io);
  199. bsd->inflight--;
  200. #ifdef BLOCKSTORE_DEBUG
  201. printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
  202. #endif
  203. delete op;
  204. };
  205. bsd->last_sync = true;
  206. break;
  207. default:
  208. io->error = EINVAL;
  209. return FIO_Q_COMPLETED;
  210. }
  211. #ifdef BLOCKSTORE_DEBUG
  212. printf("+++ %s %llx n=%d\n", op->opcode == OP_READ ? "OP_READ" : (op->opcode == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n);
  213. #endif
  214. io->error = 0;
  215. bsd->inflight++;
  216. bsd->bs->enqueue_op(op);
  217. bsd->op_n++;
  218. if (io->error != 0)
  219. return FIO_Q_COMPLETED;
  220. return FIO_Q_QUEUED;
  221. }
  222. static int bs_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
  223. {
  224. bs_data *bsd = (bs_data*)td->io_ops_data;
  225. // FIXME timeout
  226. while (true)
  227. {
  228. bsd->ringloop->loop();
  229. if (bsd->completed.size() >= min)
  230. break;
  231. bsd->ringloop->wait();
  232. }
  233. return bsd->completed.size();
  234. }
  235. static struct io_u *bs_event(struct thread_data *td, int event)
  236. {
  237. bs_data *bsd = (bs_data*)td->io_ops_data;
  238. if (bsd->completed.size() == 0)
  239. return NULL;
  240. /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */
  241. struct io_u *ev = bsd->completed.back();
  242. bsd->completed.pop_back();
  243. return ev;
  244. }
  245. static int bs_io_u_init(struct thread_data *td, struct io_u *io)
  246. {
  247. io->engine_data = NULL;
  248. return 0;
  249. }
  250. static void bs_io_u_free(struct thread_data *td, struct io_u *io)
  251. {
  252. }
  253. static int bs_open_file(struct thread_data *td, struct fio_file *f)
  254. {
  255. return 0;
  256. }
  257. static int bs_invalidate(struct thread_data *td, struct fio_file *f)
  258. {
  259. return 0;
  260. }
  261. struct ioengine_ops ioengine = {
  262. .name = "vitastor_blockstore",
  263. .version = FIO_IOOPS_VERSION,
  264. .flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND,
  265. .setup = bs_setup,
  266. .init = bs_init,
  267. .queue = bs_queue,
  268. .getevents = bs_getevents,
  269. .event = bs_event,
  270. .cleanup = bs_cleanup,
  271. .open_file = bs_open_file,
  272. .invalidate = bs_invalidate,
  273. .io_u_init = bs_io_u_init,
  274. .io_u_free = bs_io_u_free,
  275. .option_struct_size = sizeof(struct bs_options),
  276. .options = options,
  277. };
  278. static void fio_init fio_bs_register(void)
  279. {
  280. register_ioengine(&ioengine);
  281. }
  282. static void fio_exit fio_bs_unregister(void)
  283. {
  284. unregister_ioengine(&ioengine);
  285. }