Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
// FIO engine to test cluster I/O
//
// Random write:
//
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
//     -etcd=127.0.0.1:2379 [-etcd_prefix=/vitastor] -pool=1 -inode=1 -size=1000M
//
// Linear write:
//
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
//     -etcd=127.0.0.1:2379 [-etcd_prefix=/vitastor] -pool=1 -inode=1 -size=1000M
//
// Random read (run with -iodepth=32 or -iodepth=1):
//
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
//     -etcd=127.0.0.1:2379 [-etcd_prefix=/vitastor] -pool=1 -inode=1 -size=1000M
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include <vector>
#include <unordered_map>

#include "epoll_manager.h"
#include "cluster_client.h"
#include "fio_headers.h"

struct sec_data
{
    ring_loop_t *ringloop = NULL;
    epoll_manager_t *epmgr = NULL;
    cluster_client_t *cli = NULL;
    bool last_sync = false;
    /* The list of completed io_u structs. */
    std::vector<io_u*> completed;
    uint64_t op_n = 0, inflight = 0;
    bool trace = false;
};

struct sec_options
{
    int __pad;
    char *etcd_host = NULL;
    char *etcd_prefix = NULL;
    uint64_t pool = 0;
    uint64_t inode = 0;
    int cluster_log = 0;
    int trace = 0;
};

static struct fio_option options[] = {
    {
        .name = "etcd",
        .lname = "etcd address",
        .type = FIO_OPT_STR_STORE,
        .off1 = offsetof(struct sec_options, etcd_host),
        .help = "etcd address in the form HOST:PORT[/PATH]",
        .category = FIO_OPT_C_ENGINE,
        .group = FIO_OPT_G_FILENAME,
    },
    {
        .name = "etcd_prefix",
        .lname = "etcd key prefix",
        .type = FIO_OPT_STR_STORE,
        .off1 = offsetof(struct sec_options, etcd_prefix),
        .help = "etcd key prefix, by default /vitastor",
        .category = FIO_OPT_C_ENGINE,
        .group = FIO_OPT_G_FILENAME,
    },
    {
        .name = "pool",
        .lname = "pool number for the inode",
        .type = FIO_OPT_INT,
        .off1 = offsetof(struct sec_options, pool),
        .help = "pool number for the inode to run tests on",
        .category = FIO_OPT_C_ENGINE,
        .group = FIO_OPT_G_FILENAME,
    },
    {
        .name = "inode",
        .lname = "inode to run tests on",
        .type = FIO_OPT_INT,
        .off1 = offsetof(struct sec_options, inode),
        .help = "inode to run tests on (1 by default)",
        .category = FIO_OPT_C_ENGINE,
        .group = FIO_OPT_G_FILENAME,
    },
    {
        .name = "cluster_log_level",
        .lname = "cluster log level",
        .type = FIO_OPT_INT,
        .off1 = offsetof(struct sec_options, cluster_log),
        .help = "Set log level for the Vitastor client",
        .def = "0",
        .category = FIO_OPT_C_ENGINE,
        .group = FIO_OPT_G_FILENAME,
    },
    {
        .name = "osd_trace",
        .lname = "OSD trace",
        .type = FIO_OPT_BOOL,
        .off1 = offsetof(struct sec_options, trace),
        .help = "Trace OSD operations",
        .def = "0",
        .category = FIO_OPT_C_ENGINE,
        .group = FIO_OPT_G_FILENAME,
    },
    {
        .name = NULL,
    },
};
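/* Allocate per-thread engine data; register the "osd_cluster" pseudo-file if fio was not given a filename. */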
static int sec_setup(struct thread_data *td)
{
    sec_data *bsd;

    bsd = new sec_data;
    if (!bsd)
    {
        td_verror(td, errno, "calloc");
        return 1;
    }
    td->io_ops_data = bsd;

    if (!td->files_index)
    {
        add_file(td, "osd_cluster", 0, 0);
        td->o.nr_files = td->o.nr_files ? : 1;
        td->o.open_files++;
    }

    return 0;
}

static void sec_cleanup(struct thread_data *td)
{
    sec_data *bsd = (sec_data*)td->io_ops_data;
    if (bsd)
    {
        delete bsd->cli;
        delete bsd->epmgr;
        delete bsd->ringloop;
        bsd->cli = NULL;
        bsd->epmgr = NULL;
        bsd->ringloop = NULL;
    }
}
/* Connect to the server from each thread. */
static int sec_init(struct thread_data *td)
{
    sec_options *o = (sec_options*)td->eo;
    sec_data *bsd = (sec_data*)td->io_ops_data;

    json11::Json cfg = json11::Json::object {
        { "etcd_address", std::string(o->etcd_host) },
        { "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") },
        { "log_level", o->cluster_log },
    };
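    // The pool id occupies the upper POOL_ID_BITS bits of the 64-bit inode number,
    // so a non-zero -pool option is merged into the inode id before any I/O is issued.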
    if (o->pool)
        o->inode = (o->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (o->pool << (64-POOL_ID_BITS));
    if (!(o->inode >> (64-POOL_ID_BITS)))
    {
        td_verror(td, EINVAL, "pool is missing");
        return 1;
    }

    bsd->ringloop = new ring_loop_t(512);
    bsd->epmgr = new epoll_manager_t(bsd->ringloop);
    bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg);
    bsd->trace = o->trace ? true : false;

    return 0;
}

/* Begin read or write request. */
static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
{
    sec_options *opt = (sec_options*)td->eo;
    sec_data *bsd = (sec_data*)td->io_ops_data;
    int n = bsd->op_n;

    fio_ro_check(td, io);
    if (io->ddir == DDIR_SYNC && bsd->last_sync)
    {
        return FIO_Q_COMPLETED;
    }

    io->engine_data = bsd;
    cluster_op_t *op = new cluster_op_t;

    switch (io->ddir)
    {
    case DDIR_READ:
        op->opcode = OSD_OP_READ;
        op->inode = opt->inode;
        op->offset = io->offset;
        op->len = io->xfer_buflen;
        op->iov.push_back(io->xfer_buf, io->xfer_buflen);
        bsd->last_sync = false;
        break;
    case DDIR_WRITE:
        op->opcode = OSD_OP_WRITE;
        op->inode = opt->inode;
        op->offset = io->offset;
        op->len = io->xfer_buflen;
        op->iov.push_back(io->xfer_buf, io->xfer_buflen);
        bsd->last_sync = false;
        break;
    case DDIR_SYNC:
        op->opcode = OSD_OP_SYNC;
        bsd->last_sync = true;
        break;
    default:
        io->error = EINVAL;
        return FIO_Q_COMPLETED;
    }
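    // Completion callback: invoked by cluster_client_t when the operation finishes.
    // It converts a negative retval into a positive errno for fio and queues the io_u
    // so that sec_getevents()/sec_event() can report it.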
    op->callback = [io, n](cluster_op_t *op)
    {
        io->error = op->retval < 0 ? -op->retval : 0;
        sec_data *bsd = (sec_data*)io->engine_data;
        bsd->inflight--;
        bsd->completed.push_back(io);
        if (bsd->trace)
        {
            printf("--- %s n=%d retval=%d\n", io->ddir == DDIR_READ ? "READ" :
                (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n, op->retval);
        }
        delete op;
    };

    if (opt->trace)
    {
        if (io->ddir == DDIR_SYNC)
        {
            printf("+++ SYNC # %d\n", n);
        }
        else
        {
            printf("+++ %s # %d 0x%llx+%llx\n",
                io->ddir == DDIR_READ ? "READ" : "WRITE",
                n, io->offset, io->xfer_buflen);
        }
    }

    io->error = 0;
    bsd->inflight++;
    bsd->op_n++;
    bsd->cli->execute(op);

    if (io->error != 0)
        return FIO_Q_COMPLETED;
    return FIO_Q_QUEUED;
}
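/* Poll the ring loop until at least 'min' operations have completed. */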
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
{
    sec_data *bsd = (sec_data*)td->io_ops_data;
    while (true)
    {
        bsd->ringloop->loop();
        if (bsd->completed.size() >= min)
            break;
        bsd->ringloop->wait();
    }
    return bsd->completed.size();
}

static struct io_u *sec_event(struct thread_data *td, int event)
{
    sec_data *bsd = (sec_data*)td->io_ops_data;
    if (bsd->completed.size() == 0)
        return NULL;
    /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */
    struct io_u *ev = bsd->completed.back();
    bsd->completed.pop_back();
    return ev;
}

static int sec_io_u_init(struct thread_data *td, struct io_u *io)
{
    io->engine_data = NULL;
    return 0;
}

static void sec_io_u_free(struct thread_data *td, struct io_u *io)
{
}

static int sec_open_file(struct thread_data *td, struct fio_file *f)
{
    return 0;
}

static int sec_invalidate(struct thread_data *td, struct fio_file *f)
{
    return 0;
}
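// Engine descriptor. FIO_DISKLESSIO tells fio that no local file I/O is performed,
// so the "osd_cluster" pseudo-file never has to exist on disk.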
struct ioengine_ops ioengine = {
    .name = "vitastor_cluster",
    .version = FIO_IOOPS_VERSION,
    .flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND,
    .setup = sec_setup,
    .init = sec_init,
    .queue = sec_queue,
    .getevents = sec_getevents,
    .event = sec_event,
    .cleanup = sec_cleanup,
    .open_file = sec_open_file,
    .invalidate = sec_invalidate,
    .io_u_init = sec_io_u_init,
    .io_u_free = sec_io_u_free,
    .option_struct_size = sizeof(struct sec_options),
    .options = options,
};

static void fio_init fio_sec_register(void)
{
    register_ioengine(&ioengine);
}

static void fio_exit fio_sec_unregister(void)
{
    unregister_ioengine(&ioengine);
}