Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

131 lines
3.3 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. // C-C++ proxy for the QEMU driver
  4. // (QEMU headers don't compile with g++)
  5. #include <sys/epoll.h>
  6. #include "cluster_client.h"
  7. typedef void* AioContext;
  8. #include "qemu_proxy.h"
  9. extern "C"
  10. {
  11. // QEMU
  12. typedef void IOHandler(void *opaque);
  13. void aio_set_fd_handler(AioContext *ctx, int fd, int is_external, IOHandler *fd_read, IOHandler *fd_write, void *poll_fn, void *opaque);
  14. }
  15. struct QemuProxyData
  16. {
  17. int fd;
  18. std::function<void(int, int)> callback;
  19. };
  20. class QemuProxy
  21. {
  22. std::map<int, QemuProxyData> handlers;
  23. public:
  24. timerfd_manager_t *tfd;
  25. cluster_client_t *cli;
  26. AioContext *ctx;
  27. QemuProxy(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
  28. {
  29. this->ctx = ctx;
  30. json11::Json cfg = json11::Json::object {
  31. { "etcd_address", std::string(etcd_host) },
  32. { "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/vitastor") },
  33. };
  34. tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> callback) { set_fd_handler(fd, wr, callback); });
  35. cli = new cluster_client_t(NULL, tfd, cfg);
  36. }
  37. ~QemuProxy()
  38. {
  39. cli->stop();
  40. delete cli;
  41. delete tfd;
  42. }
  43. void set_fd_handler(int fd, bool wr, std::function<void(int, int)> callback)
  44. {
  45. if (callback != NULL)
  46. {
  47. handlers[fd] = { .fd = fd, .callback = callback };
  48. aio_set_fd_handler(ctx, fd, false, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, NULL, &handlers[fd]);
  49. }
  50. else
  51. {
  52. handlers.erase(fd);
  53. aio_set_fd_handler(ctx, fd, false, NULL, NULL, NULL, NULL);
  54. }
  55. }
  56. static void read_handler(void *opaque)
  57. {
  58. QemuProxyData *data = (QemuProxyData *)opaque;
  59. data->callback(data->fd, EPOLLIN);
  60. }
  61. static void write_handler(void *opaque)
  62. {
  63. QemuProxyData *data = (QemuProxyData *)opaque;
  64. data->callback(data->fd, EPOLLOUT);
  65. }
  66. };
  67. extern "C" {
  68. void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
  69. {
  70. QemuProxy *p = new QemuProxy(ctx, etcd_host, etcd_prefix);
  71. return p;
  72. }
  73. void vitastor_proxy_destroy(void *client)
  74. {
  75. QemuProxy *p = (QemuProxy*)client;
  76. delete p;
  77. }
  78. void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
  79. iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque)
  80. {
  81. QemuProxy *p = (QemuProxy*)client;
  82. cluster_op_t *op = new cluster_op_t;
  83. op->opcode = write ? OSD_OP_WRITE : OSD_OP_READ;
  84. op->inode = inode;
  85. op->offset = offset;
  86. op->len = len;
  87. for (int i = 0; i < iovcnt; i++)
  88. {
  89. op->iov.push_back(iov[i].iov_base, iov[i].iov_len);
  90. }
  91. op->callback = [cb, opaque](cluster_op_t *op)
  92. {
  93. cb(op->retval, opaque);
  94. delete op;
  95. };
  96. p->cli->execute(op);
  97. }
  98. void vitastor_proxy_sync(void *client, VitastorIOHandler cb, void *opaque)
  99. {
  100. QemuProxy *p = (QemuProxy*)client;
  101. cluster_op_t *op = new cluster_op_t;
  102. op->opcode = OSD_OP_SYNC;
  103. op->callback = [cb, opaque](cluster_op_t *op)
  104. {
  105. cb(op->retval, opaque);
  106. delete op;
  107. };
  108. p->cli->execute(op);
  109. }
  110. }