diff --git a/messenger.cpp b/messenger.cpp index 4a5d7f6f..20aea23b 100644 --- a/messenger.cpp +++ b/messenger.cpp @@ -288,16 +288,7 @@ void osd_messenger_t::cancel_osd_ops(osd_client_t *cl) cancel_op(p.second); } cl->sent_ops.clear(); - for (auto op: cl->outbox) - { - cancel_op(op); - } cl->outbox.clear(); - if (cl->write_op) - { - cancel_op(cl->write_op); - cl->write_op = NULL; - } } void osd_messenger_t::cancel_op(osd_op_t *op) diff --git a/messenger.h b/messenger.h index c718fc91..a59652e6 100644 --- a/messenger.h +++ b/messenger.h @@ -205,8 +205,8 @@ struct osd_client_t // Read state int read_ready = 0; osd_op_t *read_op = NULL; - iovec read_iov; - msghdr read_msg; + iovec read_iov = { 0 }; + msghdr read_msg = { 0 }; int read_remaining = 0; int read_state = 0; osd_op_buf_list_t recv_list; @@ -215,17 +215,16 @@ struct osd_client_t std::vector received_ops; // Outbound operations - std::deque outbox; - std::map sent_ops; + std::map sent_ops; // PGs dirtied by this client's primary-writes std::set dirty_pgs; // Write state - osd_op_t *write_op = NULL; - msghdr write_msg; + msghdr write_msg = { 0 }; int write_state = 0; - osd_op_buf_list_t send_list; + std::vector send_list, next_send_list; + std::vector outbox, next_outbox; }; struct osd_wanted_peer_t @@ -296,6 +295,7 @@ protected: void cancel_op(osd_op_t *op); bool try_send(osd_client_t *cl); + void measure_exec(osd_op_t *cur_op); void handle_send(int result, osd_client_t *cl); bool handle_read(int result, osd_client_t *cl); diff --git a/msgr_receive.cpp b/msgr_receive.cpp index 2f959f45..e359e2d1 100644 --- a/msgr_receive.cpp +++ b/msgr_receive.cpp @@ -252,6 +252,14 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) { // Read data. In this case we assume that the buffer is preallocated by the caller (!) assert(op->iov.count > 0); + if (op->reply.hdr.retval != (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len)) + { + // Check reply length to not overflow the buffer + printf("Client %d read reply of different length\n", cl->peer_fd); + cl->sent_ops[op->req.hdr.id] = op; + stop_client(cl->peer_fd); + return false; + } cl->recv_list.append(op->iov); delete cl->read_op; cl->read_op = op; diff --git a/msgr_send.cpp b/msgr_send.cpp index 4ed0d001..50c1647a 100644 --- a/msgr_send.cpp +++ b/msgr_send.cpp @@ -14,6 +14,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) else { // Check that operation actually belongs to this client + // FIXME: Review if this is still needed bool found = false; for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++) { @@ -30,15 +31,50 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) return; } } - cl->outbox.push_back(cur_op); + auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list; + auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox; + if (cur_op->op_type == OSD_OP_IN) + { + measure_exec(cur_op); + to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE }); + } + else + { + to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE }); + cl->sent_ops[cur_op->req.hdr.id] = cur_op; + } + // Pre-defined send_lists + if ((cur_op->op_type == OSD_OP_IN + ? (cur_op->req.hdr.opcode == OSD_OP_READ || + cur_op->req.hdr.opcode == OSD_OP_SEC_READ || + cur_op->req.hdr.opcode == OSD_OP_SEC_LIST || + cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG) + : (cur_op->req.hdr.opcode == OSD_OP_WRITE || + cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || + cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || + cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || + cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0) + { + to_outbox.push_back(NULL); + for (int i = 0; i < cur_op->iov.count; i++) + { + to_send_list.push_back(cur_op->iov.buf[i]); + to_outbox.push_back(i == cur_op->iov.count-1 ? cur_op : NULL); + } + } + else + { + to_outbox.push_back(cur_op); + } if (!ringloop) { - while (cl->write_op || cl->outbox.size()) + // FIXME: It's worse because it doesn't allow batching + while (cl->outbox.size()) { try_send(cl); } } - else if (cl->write_op || cl->outbox.size() > 1 || !try_send(cl)) + else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl)) { if (cl->write_state == 0) { @@ -49,66 +85,44 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) } } +void osd_messenger_t::measure_exec(osd_op_t *cur_op) +{ + // Measure execution latency + timespec tv_end; + clock_gettime(CLOCK_REALTIME, &tv_end); + stats.op_stat_count[cur_op->req.hdr.opcode]++; + if (!stats.op_stat_count[cur_op->req.hdr.opcode]) + { + stats.op_stat_count[cur_op->req.hdr.opcode]++; + stats.op_stat_sum[cur_op->req.hdr.opcode] = 0; + stats.op_stat_bytes[cur_op->req.hdr.opcode] = 0; + } + stats.op_stat_sum[cur_op->req.hdr.opcode] += ( + (tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 + + (tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000 + ); + if (cur_op->req.hdr.opcode == OSD_OP_READ || + cur_op->req.hdr.opcode == OSD_OP_WRITE) + { + stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.rw.len; + } + else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || + cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || + cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) + { + stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.sec_rw.len; + } +} + bool osd_messenger_t::try_send(osd_client_t *cl) { int peer_fd = cl->peer_fd; - if (!cl->write_op) + if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0) { - // pick next command - cl->write_op = cl->outbox.front(); - cl->outbox.pop_front(); - cl->write_state = CL_WRITE_REPLY; - if (cl->write_op->op_type == OSD_OP_IN) - { - // Measure execution latency - timespec tv_end; - clock_gettime(CLOCK_REALTIME, &tv_end); - stats.op_stat_count[cl->write_op->req.hdr.opcode]++; - if (!stats.op_stat_count[cl->write_op->req.hdr.opcode]) - { - stats.op_stat_count[cl->write_op->req.hdr.opcode]++; - stats.op_stat_sum[cl->write_op->req.hdr.opcode] = 0; - stats.op_stat_bytes[cl->write_op->req.hdr.opcode] = 0; - } - stats.op_stat_sum[cl->write_op->req.hdr.opcode] += ( - (tv_end.tv_sec - cl->write_op->tv_begin.tv_sec)*1000000 + - (tv_end.tv_nsec - cl->write_op->tv_begin.tv_nsec)/1000 - ); - if (cl->write_op->req.hdr.opcode == OSD_OP_READ || - cl->write_op->req.hdr.opcode == OSD_OP_WRITE) - { - stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.rw.len; - } - else if (cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) - { - stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.sec_rw.len; - } - cl->send_list.push_back(cl->write_op->reply.buf, OSD_PACKET_SIZE); - if (cl->write_op->req.hdr.opcode == OSD_OP_READ || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_LIST || - cl->write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG) - { - cl->send_list.append(cl->write_op->iov); - } - } - else - { - cl->send_list.push_back(cl->write_op->req.buf, OSD_PACKET_SIZE); - if (cl->write_op->req.hdr.opcode == OSD_OP_WRITE || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || - cl->write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK) - { - cl->send_list.append(cl->write_op->iov); - } - } + return true; } - cl->write_msg.msg_iov = cl->send_list.get_iovec(); - cl->write_msg.msg_iovlen = cl->send_list.get_size(); + cl->write_msg.msg_iov = cl->send_list.data(); + cl->write_msg.msg_iovlen = cl->send_list.size(); cl->refs++; if (ringloop && !use_sync_send_recv) { @@ -149,6 +163,7 @@ void osd_messenger_t::send_replies() void osd_messenger_t::handle_send(int result, osd_client_t *cl) { + cl->write_msg.msg_iovlen = 0; cl->refs--; if (cl->peer_state == PEER_STOPPED) { @@ -167,22 +182,43 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) } if (result >= 0) { - cl->send_list.eat(result); - if (cl->send_list.done >= cl->send_list.count) + int done = 0; + while (result > 0 && done < cl->send_list.size()) { - // Done - cl->send_list.reset(); - if (cl->write_op->op_type == OSD_OP_IN) + iovec & iov = cl->send_list[done]; + if (iov.iov_len <= result) { - delete cl->write_op; + if (cl->outbox[done]) + { + // Operation fully sent + if (cl->outbox[done]->op_type == OSD_OP_IN) + { + delete cl->outbox[done]; + } + } + result -= iov.iov_len; + done++; } else { - cl->sent_ops[cl->write_op->req.hdr.id] = cl->write_op; + iov.iov_len -= result; + iov.iov_base += result; + break; } - cl->write_op = NULL; - cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0; } + if (done > 0) + { + cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done); + cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+done); + } + if (cl->next_send_list.size()) + { + cl->send_list.insert(cl->send_list.end(), cl->next_send_list.begin(), cl->next_send_list.end()); + cl->outbox.insert(cl->outbox.end(), cl->next_outbox.begin(), cl->next_outbox.end()); + cl->next_send_list.clear(); + cl->next_outbox.clear(); + } + cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0; } if (cl->write_state != 0) {