diff --git a/osd.cpp b/osd.cpp index d8a0b5f43..784912516 100644 --- a/osd.cpp +++ b/osd.cpp @@ -284,7 +284,6 @@ void osd_t::cancel_osd_ops(osd_client_t & cl) { cancel_op(cl.write_op); cl.write_op = NULL; - cl.write_buf = NULL; } } @@ -357,6 +356,7 @@ void osd_t::exec_op(osd_op_t *cur_op) delete cur_op; return; } + cur_op->send_list.push_back(cur_op->reply.buf, OSD_PACKET_SIZE); if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC || cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX || (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) && diff --git a/osd.h b/osd.h index 858b1d1e0..37a5283da 100644 --- a/osd.h +++ b/osd.h @@ -27,12 +27,10 @@ #define CL_READ_OP 1 #define CL_READ_DATA 2 #define CL_READ_REPLY_DATA 3 -#define SQE_SENT 0x100l #define CL_WRITE_READY 1 #define CL_WRITE_REPLY 2 -#define CL_WRITE_DATA 3 #define MAX_EPOLL_EVENTS 64 -#define OSD_OP_INLINE_BUF_COUNT 4 +#define OSD_OP_INLINE_BUF_COUNT 16 #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 @@ -41,17 +39,11 @@ //#define OSD_STUB -struct osd_op_buf_t -{ - void *buf; - int len; -}; - struct osd_op_buf_list_t { int count = 0, alloc = 0, sent = 0; - osd_op_buf_t *buf = NULL; - osd_op_buf_t inline_buf[OSD_OP_INLINE_BUF_COUNT]; + iovec *buf = NULL; + iovec inline_buf[OSD_OP_INLINE_BUF_COUNT]; ~osd_op_buf_list_t() { @@ -61,7 +53,17 @@ struct osd_op_buf_list_t } } - inline void push_back(void *nbuf, int len) + inline iovec* get_iovec() + { + return (buf ? buf : inline_buf) + sent; + } + + inline int get_size() + { + return count - sent; + } + + inline void push_back(void *nbuf, size_t len) { if (count >= alloc) { @@ -74,21 +76,16 @@ struct osd_op_buf_list_t { int old = alloc; alloc = ((alloc/16)*16 + 1); - buf = (osd_op_buf_t*)malloc(sizeof(osd_op_buf_t) * alloc); - memcpy(buf, inline_buf, sizeof(osd_op_buf_t)*old); + buf = (iovec*)malloc(sizeof(iovec) * alloc); + memcpy(buf, inline_buf, sizeof(iovec)*old); } else { alloc = ((alloc/16)*16 + 1); - buf = (osd_op_buf_t*)realloc(buf, sizeof(osd_op_buf_t) * alloc); + buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); } } - buf[count++] = { .buf = nbuf, .len = len }; - } - - inline osd_op_buf_t & operator [] (int i) - { - return buf[i]; + buf[count++] = { .iov_base = nbuf, .iov_len = len }; } }; @@ -151,10 +148,7 @@ struct osd_client_t // Write state osd_op_t *write_op = NULL; - iovec write_iov; msghdr write_msg; - void *write_buf = NULL; - int write_remaining = 0; int write_state = 0; }; diff --git a/osd_peering.cpp b/osd_peering.cpp index 57b5aee7f..8db35ad55 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -353,6 +353,7 @@ void osd_t::start_pg_peering(int pg_idx) auto & cl = clients[osd_peer_fds[role_osd]]; osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; + op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); op->peer_fd = cl.peer_fd; op->req = { .sec_list = { diff --git a/osd_primary.cpp b/osd_primary.cpp index 2d1c306ce..f973ce7f4 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -241,6 +241,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* else { subops[subop].op_type = OSD_OP_OUT; + subops[subop].send_list.push_back(subops[subop].req.buf, OSD_PACKET_SIZE); subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num); subops[subop].req.sec_rw = { .header = { @@ -536,6 +537,7 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) else { subops[i].op_type = OSD_OP_OUT; + subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); subops[i].peer_fd = osd_peer_fds.at(sync_osd); subops[i].req.sec_sync = { .header = { @@ -580,6 +582,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) else { subops[i].op_type = OSD_OP_OUT; + subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); subops[i].peer_fd = osd_peer_fds.at(stab_osd.osd_num); subops[i].req.sec_stab = { .header = { diff --git a/osd_send.cpp b/osd_send.cpp index e4964633b..4b2e580fb 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -29,17 +29,15 @@ void osd_t::send_replies() return; } ring_data_t* data = ((ring_data_t*)sqe->user_data); - if (!cl.write_buf) + if (!cl.write_op) { // pick next command cl.write_op = cl.outbox.front(); cl.outbox.pop_front(); + cl.write_state = CL_WRITE_REPLY; if (cl.write_op->op_type == OSD_OP_OUT) { gettimeofday(&cl.write_op->tv_send, NULL); - cl.write_buf = &cl.write_op->req.buf; - cl.write_remaining = OSD_PACKET_SIZE; - cl.write_state = CL_WRITE_REPLY; } else { @@ -51,18 +49,12 @@ void osd_t::send_replies() (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + tv_end.tv_usec - cl.write_op->tv_begin.tv_usec ); - cl.write_buf = &cl.write_op->reply.buf; - cl.write_remaining = OSD_PACKET_SIZE; - cl.write_state = CL_WRITE_REPLY; } } - cl.write_iov.iov_base = cl.write_buf; - cl.write_iov.iov_len = cl.write_remaining; - cl.write_msg.msg_iov = &cl.write_iov; - cl.write_msg.msg_iovlen = 1; + cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); + cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); }; my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); - cl.write_state = cl.write_state | SQE_SENT; } write_ready_clients.clear(); } @@ -80,51 +72,50 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) stop_client(peer_fd); return; } - cl.write_state = cl.write_state & ~SQE_SENT; - if (data->res > 0) + if (data->res >= 0) { - cl.write_remaining -= data->res; - cl.write_buf += data->res; - if (cl.write_remaining <= 0) + osd_op_t *cur_op = cl.write_op; + while (data->res > 0 && cur_op->send_list.sent < cur_op->send_list.count) { - cl.write_buf = NULL; - osd_op_t *cur_op = cl.write_op; - if (cur_op->send_list.sent < cur_op->send_list.count) + iovec & iov = cur_op->send_list.buf[cur_op->send_list.sent]; + if (iov.iov_len <= data->res) { - // Send data - cl.write_buf = cur_op->send_list[cur_op->send_list.sent].buf; - assert(cl.write_buf); - cl.write_remaining = cur_op->send_list[cur_op->send_list.sent].len; + data->res -= iov.iov_len; cur_op->send_list.sent++; - cl.write_state = CL_WRITE_DATA; } else { - // Done - if (cur_op->op_type == OSD_OP_IN) - { - delete cur_op; - } - else - { - // Measure subops with data - if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || - cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) - { - timeval tv_end; - gettimeofday(&tv_end, NULL); - send_stat_count++; - send_stat_sum += ( - (tv_end.tv_sec - cl.write_op->tv_send.tv_sec)*1000000 + - tv_end.tv_usec - cl.write_op->tv_send.tv_usec - ); - } - cl.sent_ops[cl.write_op->req.hdr.id] = cl.write_op; - } - cl.write_op = NULL; - cl.write_state = cl.outbox.size() > 0 ? CL_WRITE_READY : 0; + iov.iov_len -= data->res; + iov.iov_base += data->res; + break; } } + if (cur_op->send_list.sent >= cur_op->send_list.count) + { + // Done + if (cur_op->op_type == OSD_OP_IN) + { + delete cur_op; + } + else + { + // Measure subops with data + if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || + cur_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) + { + timeval tv_end; + gettimeofday(&tv_end, NULL); + send_stat_count++; + send_stat_sum += ( + (tv_end.tv_sec - cl.write_op->tv_send.tv_sec)*1000000 + + tv_end.tv_usec - cl.write_op->tv_send.tv_usec + ); + } + cl.sent_ops[cl.write_op->req.hdr.id] = cl.write_op; + } + cl.write_op = NULL; + cl.write_state = cl.outbox.size() > 0 ? CL_WRITE_READY : 0; + } } if (cl.write_state != 0) {