From d3c6314d010801910dff48990f97f304c6895f32 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 14 Dec 2019 20:51:41 +0300 Subject: [PATCH] Finish reply code, make it compile --- Makefile | 15 +-- fio_engine.cpp | 6 +- osd.cpp | 292 +++++++++++++++++++++++++++++++------------- osd_ops.h | 5 +- test_blockstore.cpp | 4 +- 5 files changed, 222 insertions(+), 100 deletions(-) diff --git a/Makefile b/Makefile index 9504f9aa..093736a6 100644 --- a/Makefile +++ b/Makefile @@ -1,17 +1,18 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ - blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o + blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o osd.o +CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so clean: rm -f *.o crc32c.o: crc32c.c - g++ -g -O3 -fPIC -c -o $@ $< + g++ $(CXXFLAGS) -c -o $@ $< %.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h - g++ -g -O3 -Wall -Wno-sign-compare -Wno-parentheses -Wno-pointer-arith -fPIC -c -o $@ $< + g++ $(CXXFLAGS) -c -o $@ $< test: test.cpp - g++ -g -O3 -o test -luring test.cpp + g++ $(CXXFLAGS) -o test -luring test.cpp test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp - g++ -g -o test_blockstore -ltcmalloc_minimal -luring test_blockstore.cpp $(BLOCKSTORE_OBJS) + g++ $(CXXFLAGS) -o test_blockstore -ltcmalloc_minimal -luring test_blockstore.cpp $(BLOCKSTORE_OBJS) test_allocator: test_allocator.cpp allocator.o - g++ -g -o test_allocator test_allocator.cpp allocator.o + g++ $(CXXFLAGS) -o test_allocator test_allocator.cpp allocator.o libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS) - g++ -g -O3 -ltcmalloc_minimal -Wno-pointer-arith -fPIC -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS) + g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS) diff --git a/fio_engine.cpp b/fio_engine.cpp index 16ec5ca1..68eea94a 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -91,8 +91,8 @@ static struct fio_option options[] = { static int bs_setup(struct thread_data *td) { bs_data *bsd; - fio_file *f; - int r; + //fio_file *f; + //int r; //int64_t size; bsd = new bs_data; @@ -109,8 +109,8 @@ static int bs_setup(struct thread_data *td) td->o.nr_files = td->o.nr_files ? : 1; td->o.open_files++; } - f = td->files[0]; + //f = td->files[0]; //f->real_file_size = size; return 0; } diff --git a/osd.cpp b/osd.cpp index d5c12e2e..8dd717b4 100644 --- a/osd.cpp +++ b/osd.cpp @@ -1,13 +1,20 @@ #include #include +#include #include #include +#include + #include "osd_ops.h" #include "ringloop.h" #define CL_READ_OP 1 #define CL_READ_DATA 2 +#define SQE_SENT 0x100l +#define CL_WRITE_READY 1 +#define CL_WRITE_REPLY 2 +#define CL_WRITE_DATA 3 struct osd_op_t { @@ -24,6 +31,12 @@ struct osd_op_t }; blockstore_operation bs_op; void *buf = NULL; + + ~osd_op_t() + { + if (buf) + free(buf); + } }; struct osd_client_t @@ -48,7 +61,6 @@ struct osd_client_t // Write state osd_op_t *write_op = NULL; - int write_state = 0; iovec write_iov; msghdr write_msg; void *write_buf = NULL; @@ -76,14 +88,21 @@ class osd_t int bind_port, listen_backlog; std::unordered_map clients; - std::deque read_ready_clients; - std::list write_ready_clients; + std::vector read_ready_clients; + std::vector write_ready_clients; - void handle_epoll_events(); + void loop(); + int handle_epoll_events(); + void stop_client(int peer_fd); + void read_requests(); + void handle_read(ring_data_t *data, int peer_fd); + void enqueue_op(osd_op_t *cur_op); + void send_replies(); + void make_reply(osd_op_t *op); + void handle_send(ring_data_t *data, int peer_fd); public: osd_t(blockstore *bs, ring_loop_t *ringloop); ~osd_t(); - void loop(); }; osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop) @@ -100,7 +119,8 @@ osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop) setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); sockaddr_in addr; - if ((int r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1) + int r; + if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1) { close(listen_fd); throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support")); @@ -108,7 +128,7 @@ osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop) addr.sin_family = AF_INET; addr.sin_port = htons(bind_port); - if (bind(listen_fd, &addr, sizeof(addr)) < 0) + if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) { close(listen_fd); throw std::runtime_error(std::string("bind: ") + strerror(errno)); @@ -150,28 +170,29 @@ osd_t::~osd_t() void osd_t::loop() { - if (wait_state == 1) + if (wait_state == 0) { - return; - } - io_uring_sqe *sqe = ringloop->get_sqe(); - if (!sqe) - { - wait_state = 0; - return; - } - ring_data_t *data = ((ring_data_t*)sqe->user_data); - my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); - data->callback = [&](ring_data_t *data) - { - if (data->res < 0) + io_uring_sqe *sqe = ringloop->get_sqe(); + if (!sqe) { - throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); + wait_state = 0; + return; } - handle_epoll_events(); - wait_state = 0; - }; - wait_state = 1; + ring_data_t *data = ((ring_data_t*)sqe->user_data); + my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); + data->callback = [&](ring_data_t *data) + { + if (data->res < 0) + { + throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); + } + handle_epoll_events(); + wait_state = 0; + }; + wait_state = 1; + } + send_replies(); + read_requests(); ringloop->submit(); } @@ -194,7 +215,7 @@ int osd_t::handle_epoll_events() sockaddr_in addr; socklen_t peer_addr_size = sizeof(addr); int peer_fd; - while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) + while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) { fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); clients[peer_fd] = { @@ -250,7 +271,7 @@ void osd_t::stop_client(int peer_fd) throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } auto it = clients.find(peer_fd); - if (it->read_ready) + if (it->second.read_ready) { for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) { @@ -265,7 +286,7 @@ void osd_t::stop_client(int peer_fd) close(peer_fd); } -void osd_t::read_commands() +void osd_t::read_requests() { for (int i = 0; i < read_ready_clients.size(); i++) { @@ -274,8 +295,7 @@ void osd_t::read_commands() io_uring_sqe* sqe = ringloop->get_sqe(); if (!sqe) { - read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients().begin() + i); - ringloop->submit(); + read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); return; } ring_data_t* data = ((ring_data_t*)sqe->user_data); @@ -297,14 +317,14 @@ void osd_t::read_commands() cl.read_ready = false; } read_ready_clients.clear(); - ringloop->submit(); } void osd_t::handle_read(ring_data_t *data, int peer_fd) { - auto cl = clients.find(peer_fd); - if (cl != clients.end()) + auto cl_it = clients.find(peer_fd); + if (cl_it != clients.end()) { + auto & cl = cl_it->second; if (data->res < 0 && data->res != -EAGAIN) { // this is a client socket, so don't panic. just disconnect it @@ -312,21 +332,21 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) stop_client(peer_fd); return; } - cl->reading = false; - if (cl->read_ready) + cl.reading = false; + if (cl.read_ready) { read_ready_clients.push_back(peer_fd); } if (data->res > 0) { - cl->read_remaining -= data->res; - cl->read_buf += data->res; - if (cl->read_remaining <= 0) + cl.read_remaining -= data->res; + cl.read_buf += data->res; + if (cl.read_remaining <= 0) { - cl->read_buf = NULL; - if (cl->read_state == CL_READ_OP) + osd_op_t *cur_op = cl.read_op; + cl.read_buf = NULL; + if (cl.read_state == CL_READ_OP) { - osd_op_t *cur_op = cl->read_op; if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) @@ -338,26 +358,26 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) { // Read data - cl->read_buf = cur_op->buf; - cl->read_remaining = cur_op->op.sec_rw.len; - cl->read_state = CL_READ_DATA; + cl.read_buf = cur_op->buf; + cl.read_remaining = cur_op->op.sec_rw.len; + cl.read_state = CL_READ_DATA; } else { // Command is ready cur_op->peer_fd = peer_fd; enqueue_op(cur_op); - cl->read_op = NULL; - cl->read_state = 0; + cl.read_op = NULL; + cl.read_state = 0; } } - else if (cl->read_state == CL_READ_DATA) + else if (cl.read_state == CL_READ_DATA) { // Command is ready cur_op->peer_fd = peer_fd; enqueue_op(cur_op); - cl->read_op = NULL; - cl->read_state = 0; + cl.read_op = NULL; + cl.read_state = 0; } } } @@ -366,59 +386,159 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) void osd_t::enqueue_op(osd_op_t *cur_op) { - cur_op->bs_op->callback = [this, cur_op](blockstore_operation* bs_op) + cur_op->bs_op.callback = [this, cur_op](blockstore_operation* bs_op) { - auto cl = clients.find(cur_op->peer_fd); - if (cl != clients.end()) + auto cl_it = clients.find(cur_op->peer_fd); + if (cl_it != clients.end()) { - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->op.hdr.id; - cur_op->reply.hdr.retval = bs_op->retval; - cl->completions.push(cur_op); + auto & cl = cl_it->second; + if (cl.write_state == 0) + { + cl.write_state = CL_WRITE_READY; + write_ready_clients.push_back(cur_op->peer_fd); + } + cl.completions.push_back(cur_op); ringloop->wakeup(); } else { - if (cur_op->buf) - free(cur_op->buf); delete cur_op; } }; - if (cur_op->op->hdr.magic != SECONDARY_OSD_OP_MAGIC || - cur_op->op->hdr.opcode < OSD_OP_MIN || cur_op->op->hdr.opcode > OSD_OP_MAX || - (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE) && - (cur_op->op->sec_rw.len > OSD_RW_MAX || cur_op->op->sec_rw.len % OSD_RW_ALIGN || cur_op->op->sec_rw.offset % OSD_RW_ALIGN)) + if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC || + cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX || + (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) && + (cur_op->op.sec_rw.len > OSD_RW_MAX || cur_op->op.sec_rw.len % OSD_RW_ALIGN || cur_op->op.sec_rw.offset % OSD_RW_ALIGN)) { // Bad command - cur_op->bs_op->retval = -EINVAL; - cur_op->bs_op->callback(); + cur_op->bs_op.retval = -EINVAL; + cur_op->bs_op.callback(&cur_op->bs_op); return; } - // FIXME: list op is not a blockstore op yet - cur_op->bs_op->flags = (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ - : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE - : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC - : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE - : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE - : -1)))))); - if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ || - cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE) + // FIXME: LIST is not a blockstore op yet + cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE + : -1))))); + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || + cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) { - cur_op->bs_op->oid = cur_op->op->sec_rw.oid; - cur_op->bs_op->version = cur_op->op->sec_rw.version; - cur_op->bs_op->offset = cur_op->op->sec_rw.offset; - cur_op->bs_op->len = cur_op->op->sec_rw.len; - cur_op->bs_op->buf = cur_op->buf; + cur_op->bs_op.oid = cur_op->op.sec_rw.oid; + cur_op->bs_op.version = cur_op->op.sec_rw.version; + cur_op->bs_op.offset = cur_op->op.sec_rw.offset; + cur_op->bs_op.len = cur_op->op.sec_rw.len; + cur_op->bs_op.buf = cur_op->buf; } - else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_DELETE) + else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE) { - cur_op->bs_op->oid = cur_op->op->sec_del.oid; - cur_op->bs_op->version = cur_op->op->sec_del.version; + cur_op->bs_op.oid = cur_op->op.sec_del.oid; + cur_op->bs_op.version = cur_op->op.sec_del.version; } - else if (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_STABILIZE) + else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) { - cur_op->bs_op->len = cur_op->op->len/sizeof(obj_ver_id); - cur_op->bs_op->buf = cur_op->buf; + cur_op->bs_op.len = cur_op->op.sec_stabilize.len/sizeof(obj_ver_id); + cur_op->bs_op.buf = cur_op->buf; + } + bs->enqueue_op(&cur_op->bs_op); +} + +void osd_t::send_replies() +{ + for (int i = 0; i < write_ready_clients.size(); i++) + { + int peer_fd = write_ready_clients[i]; + auto & cl = clients[peer_fd]; + io_uring_sqe* sqe = ringloop->get_sqe(); + if (!sqe) + { + write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i); + return; + } + ring_data_t* data = ((ring_data_t*)sqe->user_data); + if (!cl.write_buf) + { + // pick next command + cl.write_op = cl.completions.front(); + cl.completions.pop_front(); + make_reply(cl.write_op); + cl.write_buf = &cl.write_op->reply_buf; + cl.write_remaining = OSD_REPLY_PACKET_SIZE; + cl.write_state = CL_WRITE_REPLY; + } + cl.write_iov.iov_base = cl.write_buf; + cl.write_iov.iov_len = cl.write_remaining; + cl.write_msg.msg_iov = &cl.write_iov; + cl.write_msg.msg_iovlen = 1; + data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; + my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); + cl.write_state = cl.write_state | SQE_SENT; + } + write_ready_clients.clear(); +} + +void osd_t::make_reply(osd_op_t *op) +{ + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->op.hdr.id; + op->reply.hdr.retval = op->bs_op.retval; +} + +void osd_t::handle_send(ring_data_t *data, int peer_fd) +{ + auto cl_it = clients.find(peer_fd); + if (cl_it != clients.end()) + { + auto & cl = cl_it->second; + if (data->res < 0 && data->res != -EAGAIN) + { + // this is a client socket, so don't panic. just disconnect it + printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res)); + stop_client(peer_fd); + return; + } + cl.write_state = cl.write_state & ~SQE_SENT; + if (data->res > 0) + { + cl.write_remaining -= data->res; + cl.write_buf += data->res; + if (cl.write_remaining <= 0) + { + cl.write_buf = NULL; + osd_op_t *cur_op = cl.write_op; + if (cl.write_state == CL_WRITE_REPLY) + { + if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ && + cur_op->reply.hdr.retval > 0) + { + // Send data + cl.write_buf = cur_op->buf; + cl.write_remaining = cur_op->reply.hdr.retval; + cl.write_state = CL_WRITE_DATA; + } + else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) + { + // FIXME + } + else + { + goto op_done; + } + } + else if (cl.write_state == CL_WRITE_DATA) + { + op_done: + // Done + delete cur_op; + cl.write_op = NULL; + cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0; + } + } + } + if (cl.write_state != 0) + { + write_ready_clients.push_back(peer_fd); + } } - bs->enqueue_op(cur_op->bs_op); } diff --git a/osd_ops.h b/osd_ops.h index 8b6f1789..f2027284 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -4,8 +4,9 @@ #include // Magic numbers -#define SECONDARY_OSD_OP_MAGIC 0xf3f003b966ace9ab2bd7b10325434553 -#define SECONDARY_OSD_REPLY_MAGIC 0xd17a57243b580b99baa699b87b434553 + +#define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l +#define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l // Operation request headers and operation reply headers have fixed size after which comes data #define OSD_OP_PACKET_SIZE 0x80 #define OSD_REPLY_PACKET_SIZE 0x40 diff --git a/test_blockstore.cpp b/test_blockstore.cpp index fdd5ea58..cb2687d3 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -49,7 +49,7 @@ int main(int narg, char *args[]) } else if (main_state == 2) { - printf("version %u written, syncing\n", op.version); + printf("version %lu written, syncing\n", op.version); version = op.version; op.flags = OP_SYNC; bs->enqueue_op(&op); @@ -57,7 +57,7 @@ int main(int narg, char *args[]) } else if (main_state == 4) { - printf("stabilizing version %u\n", version); + printf("stabilizing version %lu\n", version); op.flags = OP_STABLE; op.len = 1; *((obj_ver_id*)op.buf) = {