From 02a0eb49c2ef130bc9035214ea59c3ae284cd018 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 13 Dec 2019 22:53:59 +0300 Subject: [PATCH] Begin reply code --- blockstore.cpp | 2 +- blockstore_flush.cpp | 8 +-- osd.cpp | 154 +++++++++++++++++++++++++------------------ osd_ops.h | 3 +- ringloop.cpp | 2 +- ringloop.h | 2 +- 6 files changed, 100 insertions(+), 71 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index 907d7fca..0e760f0e 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -296,5 +296,5 @@ void blockstore::enqueue_op(blockstore_operation *op) { enqueue_write(op); } - ringloop->wakeup(ring_consumer); + ringloop->wakeup(); } diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 412516ec..e0087e8a 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -101,7 +101,7 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov) void journal_flusher_t::force_start() { start_forced = true; - bs->ringloop->wakeup(bs->ring_consumer); + bs->ringloop->wakeup(); } #define await_sqe(label) \ @@ -330,12 +330,12 @@ bool journal_flusher_co::loop() if (meta_new.submitted) { meta_new.it->second.state = 1; - bs->ringloop->wakeup(bs->ring_consumer); + bs->ringloop->wakeup(); } if (meta_old.submitted) { meta_old.it->second.state = 1; - bs->ringloop->wakeup(bs->ring_consumer); + bs->ringloop->wakeup(); } // Reads completed, submit writes for (it = v.begin(); it != v.end(); it++) @@ -624,7 +624,7 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) } // Sync completed. All previous coroutines waiting for it must be resumed cur_sync->state = 2; - bs->ringloop->wakeup(bs->ring_consumer); + bs->ringloop->wakeup(); } // Wait until someone else sends and completes a sync. resume_2: diff --git a/osd.cpp b/osd.cpp index 67857181..d5c12e2e 100644 --- a/osd.cpp +++ b/osd.cpp @@ -6,15 +6,23 @@ #include "osd_ops.h" #include "ringloop.h" +#define CL_READ_OP 1 +#define CL_READ_DATA 2 + struct osd_op_t { + int peer_fd; union { osd_any_op_t op; - uint8_t op_buf[OSD_OP_PACKET_SIZE]; + uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 }; + }; + union + { + osd_any_reply_t reply; + uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 }; }; blockstore_operation bs_op; - int client_fd; void *buf = NULL; }; @@ -23,15 +31,29 @@ struct osd_client_t sockaddr_in peer_addr; socklen_t peer_addr_size; int peer_fd; - bool ready = false; - bool reading = false; - int in_flight_ops = 0; + //int in_flight_ops = 0; - struct osd_op_t *cur_op = NULL; - iovec iov; - msghdr msg; - void *cur_buf = NULL; - int cur_done = 0, cur_remaining = 0; + // Read state + bool read_ready = false; + bool reading = false; + osd_op_t *read_op = NULL; + iovec read_iov; + msghdr read_msg; + void *read_buf = NULL; + int read_remaining = 0; + int read_state = 0; + + // Completed operations to send replies back to the client + std::deque completions; + + // Write state + osd_op_t *write_op = NULL; + int write_state = 0; + iovec write_iov; + msghdr write_msg; + void *write_buf = NULL; + int write_remaining = 0; + int write_state = 0; }; class osd_t @@ -54,7 +76,8 @@ class osd_t int bind_port, listen_backlog; std::unordered_map clients; - std::deque ready_clients; + std::deque read_ready_clients; + std::list write_ready_clients; void handle_epoll_events(); public: @@ -106,7 +129,7 @@ osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop) throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); } - struct epoll_event ev; + epoll_event ev; ev.data.fd = listen_fd; ev.events = EPOLLIN; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) @@ -131,13 +154,13 @@ void osd_t::loop() { return; } - struct io_uring_sqe *sqe = ringloop->get_sqe(); + io_uring_sqe *sqe = ringloop->get_sqe(); if (!sqe) { wait_state = 0; return; } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + ring_data_t *data = ((ring_data_t*)sqe->user_data); my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); data->callback = [&](ring_data_t *data) { @@ -168,7 +191,7 @@ int osd_t::handle_epoll_events() if (events[i].data.fd == listen_fd) { // Accept new connections - struct sockaddr_in addr; + sockaddr_in addr; socklen_t peer_addr_size = sizeof(addr); int peer_fd; while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) @@ -178,10 +201,9 @@ int osd_t::handle_epoll_events() .peer_addr = addr, .peer_addr_size = peer_addr_size, .peer_fd = peer_fd, - .ready = false, }; // Add FD to epoll - struct epoll_event ev; + epoll_event ev; ev.data.fd = peer_fd; ev.events = EPOLLIN | EPOLLHUP; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) @@ -204,12 +226,12 @@ int osd_t::handle_epoll_events() // Stop client stop_client(cl.peer_fd); } - else if (!cl.ready) + else if (!cl.read_ready) { // Mark client as ready (i.e. some data is available) - cl.ready = true; + cl.read_ready = true; if (!cl.reading) - ready_clients.push_back(cl.peer_fd); + read_ready_clients.push_back(cl.peer_fd); } } count++; @@ -220,7 +242,7 @@ int osd_t::handle_epoll_events() void osd_t::stop_client(int peer_fd) { - struct epoll_event ev; + epoll_event ev; ev.data.fd = peer_fd; ev.events = EPOLLIN | EPOLLHUP; if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, &ev) < 0) @@ -228,13 +250,13 @@ void osd_t::stop_client(int peer_fd) throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } auto it = clients.find(peer_fd); - if (it->ready) + if (it->read_ready) { - for (auto rit = ready_clients.begin(); rit != ready_clients.end(); rit++) + for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) { if (*rit == peer_fd) { - ready_clients.erase(rit); + read_ready_clients.erase(rit); break; } } @@ -245,36 +267,37 @@ void osd_t::stop_client(int peer_fd) void osd_t::read_commands() { - for (int i = 0; i < ready_clients.size(); i++) + for (int i = 0; i < read_ready_clients.size(); i++) { - int peer_fd = ready_clients[i]; + int peer_fd = read_ready_clients[i]; auto & cl = clients[peer_fd]; - if (!cl.cur_buf) - { - // no reads in progress, so this is probably a new command - cl.cur_op = new osd_op_t; - cl.cur_buf = &cl.cur_op->op_buf; - cl.cur_done = 0; - cl.cur_remaining = OSD_OP_PACKET_SIZE; - } - struct io_uring_sqe* sqe = ringloop->get_sqe(); + io_uring_sqe* sqe = ringloop->get_sqe(); if (!sqe) { - ready_clients.erase(ready_clients.begin(), ready_clients().begin() + i); + read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients().begin() + i); + ringloop->submit(); return; } - struct ring_data_t* data = ((ring_data_t*)sqe->user_data); - cl.iov.iov_base = cl.cur_buf; - cl.iov.iov_len = cl.cur_remaining; - cl.msg.msg_iov = &cl.iov; - cl.msg.msg_iovlen = 1; + ring_data_t* data = ((ring_data_t*)sqe->user_data); + if (!cl.read_buf) + { + // no reads in progress, so this is probably a new command + cl.read_op = new osd_op_t; + cl.read_buf = &cl.read_op->op_buf; + cl.read_remaining = OSD_OP_PACKET_SIZE; + cl.read_state = CL_READ_OP; + } + cl.read_iov.iov_base = cl.read_buf; + cl.read_iov.iov_len = cl.read_remaining; + cl.read_msg.msg_iov = &cl.read_iov; + cl.read_msg.msg_iovlen = 1; data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; - my_uring_prep_recvmsg(sqe, peer_fd, &cl.msg, 0); - ringloop->submit(); + my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); cl.reading = true; - cl.ready = false; + cl.read_ready = false; } - ready_clients.clear(); + read_ready_clients.clear(); + ringloop->submit(); } void osd_t::handle_read(ring_data_t *data, int peer_fd) @@ -290,21 +313,20 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) return; } cl->reading = false; - if (cl->ready) + if (cl->read_ready) { - ready_clients.push_back(peer_fd); + read_ready_clients.push_back(peer_fd); } if (data->res > 0) { - cl->cur_done += data->res; - cl->cur_remaining -= data->res; - cl->cur_buf += data->res; - if (cl->cur_remaining <= 0) + cl->read_remaining -= data->res; + cl->read_buf += data->res; + if (cl->read_remaining <= 0) { - cl->cur_buf = NULL; - if (cl->read_state == CL_READ_COMMAND) + cl->read_buf = NULL; + if (cl->read_state == CL_READ_OP) { - osd_op_t *cur_op = cl->cur_op; + osd_op_t *cur_op = cl->read_op; if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) @@ -316,24 +338,25 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) { // Read data - cl->cur_buf = cur_op->buf; - cl->cur_done = 0; - cl->cur_remaining = cur_op->op.sec_rw.len; + cl->read_buf = cur_op->buf; + cl->read_remaining = cur_op->op.sec_rw.len; cl->read_state = CL_READ_DATA; } else { // Command is ready + cur_op->peer_fd = peer_fd; enqueue_op(cur_op); - cl->cur_op = NULL; + cl->read_op = NULL; cl->read_state = 0; } } else if (cl->read_state == CL_READ_DATA) { // Command is ready + cur_op->peer_fd = peer_fd; enqueue_op(cur_op); - cl->cur_op = NULL; + cl->read_op = NULL; cl->read_state = 0; } } @@ -341,14 +364,18 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) } } -void osd_t::enqueue_op(int peer_fd, osd_op_t *cur_op) +void osd_t::enqueue_op(osd_op_t *cur_op) { - cur_op->bs_op->callback = [this, peer_fd, cur_op](blockstore_operation* bs_op) + cur_op->bs_op->callback = [this, cur_op](blockstore_operation* bs_op) { - auto cl = clients.find(peer_fd); + auto cl = clients.find(cur_op->peer_fd); if (cl != clients.end()) { - cl->replies.push(cur_op); + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->op.hdr.id; + cur_op->reply.hdr.retval = bs_op->retval; + cl->completions.push(cur_op); + ringloop->wakeup(); } else { @@ -367,6 +394,7 @@ void osd_t::enqueue_op(int peer_fd, osd_op_t *cur_op) cur_op->bs_op->callback(); return; } + // FIXME: list op is not a blockstore op yet cur_op->bs_op->flags = (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE : (cur_op->op->hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC diff --git a/osd_ops.h b/osd_ops.h index 75d73ccb..8b6f1789 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -8,7 +8,7 @@ #define SECONDARY_OSD_REPLY_MAGIC 0xd17a57243b580b99baa699b87b434553 // Operation request headers and operation reply headers have fixed size after which comes data #define OSD_OP_PACKET_SIZE 0x80 -#define OSD_REPLY_PACKET_SIZE 0x80 +#define OSD_REPLY_PACKET_SIZE 0x40 // Opcodes #define OSD_OP_MIN 0x01 #define OSD_OP_SECONDARY_READ 0x01 @@ -130,6 +130,7 @@ union osd_any_op_t union osd_any_reply_t { + osd_reply_header_t hdr; osd_reply_secondary_rw_t sec_rw; osd_reply_secondary_del_t sec_del; osd_reply_secondary_sync_t sec_sync; diff --git a/ringloop.cpp b/ringloop.cpp index 509366c2..11358efc 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -27,7 +27,7 @@ int ring_loop_t::register_consumer(ring_consumer_t & consumer) return consumer.number; } -void ring_loop_t::wakeup(ring_consumer_t & consumer) +void ring_loop_t::wakeup() { loop_again = true; } diff --git a/ringloop.h b/ringloop.h index 17eff381..df232e28 100644 --- a/ringloop.h +++ b/ringloop.h @@ -136,7 +136,7 @@ public: return sqe; } int register_consumer(ring_consumer_t & consumer); - void wakeup(ring_consumer_t & consumer); + void wakeup(); void unregister_consumer(ring_consumer_t & consumer); void loop(); inline int submit()