From a8deb7d88c881cb53e40c410643bad52139babd0 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 11 Dec 2019 14:18:19 +0300 Subject: [PATCH] Begin OSD --- osd.cpp | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ osd_ops.h | 25 ++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 osd.cpp diff --git a/osd.cpp b/osd.cpp new file mode 100644 index 00000000..53e11cb7 --- /dev/null +++ b/osd.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include + +#include "osd_ops.h" +#include "ringloop.h" + +class osd_t +{ + int wait_state = 0; + int epoll_fd = 0; + int listen_fd = 0; + ring_consumer_t consumer; + + std::string bind_address; + int bind_port, listen_backlog; + ring_loop_t *ringloop; + + void handle_epoll_events(); +public: + osd_t(ring_loop_t *ringloop); + ~osd_t(); + void loop(); +}; + +class osd_client_t +{ + int sock_fd; +}; + +osd_t::osd_t(ring_loop_t *ringloop) +{ + this->ringloop = ringloop; + + listen_fd = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd < 0) + { + throw std::runtime_error(std::string("socket: ") + strerror(errno)); + } + + sockaddr_in addr; + if ((int r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1) + { + close(listen_fd); + throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support")); + } + addr.sin_family = AF_INET; + addr.sin_port = bind_port; + + if (bind(listen_fd, &addr, sizeof(addr)) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("bind: ") + strerror(errno)); + } + + if (listen(listen_fd, listen_backlog) < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("listen: ") + strerror(errno)); + } + + epoll_fd = epoll_create(1); + if (epoll_fd < 0) + { + close(listen_fd); + throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); + } + + struct epoll_event ev; + ev.data.fd = listen_fd; + ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + + consumer.loop = [this]() { loop(); }; + ringloop->register_consumer(consumer); +} + +osd_t::~osd_t() +{ + ringloop->unregister_consumer(consumer); + close(epoll_fd); + close(listen_fd); +} + +void osd_t::loop() +{ + if (wait_state == 1) + { + return; + } + struct io_uring_sqe *sqe = ringloop->get_sqe(); + if (!sqe) + { + wait_state = 0; + return; + } + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); + data->callback = [&](ring_data_t *data) + { + if (data->res < 0) + { + throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); + } + handle_epoll_events(); + wait_state = 0; + }; + wait_state = 1; + ringloop->submit(); +} + +void osd_t::handle_epoll_events() +{ + +} diff --git a/osd_ops.h b/osd_ops.h index 2d28cc52..76acd1fa 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -1,9 +1,12 @@ #pragma once +#include "blockstore.h" #include #define SECONDARY_OSD_OP_MAGIC 0xf3f003b966ace9ab2bd7b10325434553 #define SECONDARY_OSD_REPLY_MAGIC 0xd17a57243b580b99baa699b87b434553 +#define OSD_OP_PACKET_SIZE 0x80 +#define OSD_REPLY_PACKET_SIZE 0x80 #define OSD_OP_SECONDARY_READ 0x01 #define OSD_OP_SECONDARY_WRITE 0x02 #define OSD_OP_SECONDARY_SYNC 0x03 @@ -144,3 +147,25 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t // oid array object_id *oids; }; + +union osd_any_op_t +{ + osd_op_secondary_rw_t secondary_rw; + osd_op_secondary_del_t secondary_del; + osd_op_secondary_sync_t op_sync; + osd_op_secondary_stabilize_t op_stabilize; + osd_op_secondary_list_t op_list; +}; + +union osd_any_reply_t +{ + osd_reply_secondary_rw_t secondary_rw; + osd_reply_secondary_del_t secondary_del; + osd_reply_secondary_sync_t op_sync; + osd_reply_secondary_stabilize_t op_stabilize; + osd_reply_secondary_list_t op_list; +}; + +static int size_ok = sizeof(osd_any_op_t) < OSD_OP_PACKET_SIZE && + sizeof(osd_any_reply_t) < OSD_REPLY_PACKET_SIZE + ? (perror("BUG: too small packet size"), 0) : 1;