diff --git a/msgr_receive.cpp b/msgr_receive.cpp index e359e2d1..54c62953 100644 --- a/msgr_receive.cpp +++ b/msgr_receive.cpp @@ -9,6 +9,10 @@ void osd_messenger_t::read_requests() { int peer_fd = read_ready_clients[i]; osd_client_t *cl = clients[peer_fd]; + if (cl->read_msg.msg_iovlen) + { + continue; + } if (cl->read_remaining < receive_buffer_size) { cl->read_iov.iov_base = cl->in_buf; @@ -29,6 +33,7 @@ void osd_messenger_t::read_requests() io_uring_sqe* sqe = ringloop->get_sqe(); if (!sqe) { + cl->read_msg.msg_iovlen = 0; read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); return; } @@ -52,6 +57,7 @@ void osd_messenger_t::read_requests() bool osd_messenger_t::handle_read(int result, osd_client_t *cl) { bool ret = false; + cl->read_msg.msg_iovlen = 0; cl->refs--; if (cl->peer_state == PEER_STOPPED) { @@ -160,8 +166,14 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl) { if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) return handle_reply_hdr(cl); - else + else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC) handle_op_hdr(cl); + else + { + printf("Received garbage: magic=%lx id=%lu opcode=%lx from %d\n", cl->read_op->req.hdr.magic, cl->read_op->req.hdr.id, cl->read_op->req.hdr.opcode, cl->peer_fd); + stop_client(cl->peer_fd); + return false; + } } else if (cl->read_state == CL_READ_DATA) { diff --git a/msgr_send.cpp b/msgr_send.cpp index 8542cb88..4fa44b99 100644 --- a/msgr_send.cpp +++ b/msgr_send.cpp @@ -46,7 +46,8 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE }); cl->sent_ops[cur_op->req.hdr.id] = cur_op; } - // Pre-defined send_lists + to_outbox.push_back(NULL); + // Operation data if ((cur_op->op_type == OSD_OP_IN ? (cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_SEC_READ || @@ -58,17 +59,17 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0) { - to_outbox.push_back(NULL); for (int i = 0; i < cur_op->iov.count; i++) { assert(cur_op->iov.buf[i].iov_base); to_send_list.push_back(cur_op->iov.buf[i]); - to_outbox.push_back(i == cur_op->iov.count-1 ? cur_op : NULL); + to_outbox.push_back(NULL); } } - else + if (cur_op->op_type == OSD_OP_IN) { - to_outbox.push_back(cur_op); + // To free it later + to_outbox[to_outbox.size()-1] = cur_op; } if (!ringloop) { @@ -92,6 +93,10 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) void osd_messenger_t::measure_exec(osd_op_t *cur_op) { // Measure execution latency + if (cur_op->req.hdr.opcode > OSD_OP_MAX) + { + return; + } timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); stats.op_stat_count[cur_op->req.hdr.opcode]++; @@ -198,11 +203,8 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) { if (cl->outbox[done]) { - // Operation fully sent - if (cl->outbox[done]->op_type == OSD_OP_IN) - { - delete cl->outbox[done]; - } + // Reply fully sent + delete cl->outbox[done]; } result -= iov.iov_len; done++; diff --git a/osd_primary.cpp b/osd_primary.cpp index bdfb1f99..dd257d2b 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -489,7 +489,11 @@ resume_7: } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") - c_cli.clients[cur_op->peer_fd]->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); + auto cl_it = c_cli.clients.find(cur_op->peer_fd); + if (cl_it != c_cli.clients.end()) + { + cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); + } dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); } return true;