Browse Source

Implement write batching

Also fix possible race condition which could in theory lead to "command out of sync"
and a buffer overflow that could happen on incorrect server response.
Vitaliy Filippov 11 months ago
parent
commit
d1645551d4
  1. 9
      messenger.cpp
  2. 14
      messenger.h
  3. 8
      msgr_receive.cpp
  4. 170
      msgr_send.cpp

9
messenger.cpp

@ -288,16 +288,7 @@ void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
cancel_op(p.second);
}
cl->sent_ops.clear();
for (auto op: cl->outbox)
{
cancel_op(op);
}
cl->outbox.clear();
if (cl->write_op)
{
cancel_op(cl->write_op);
cl->write_op = NULL;
}
}
void osd_messenger_t::cancel_op(osd_op_t *op)

14
messenger.h

@ -205,8 +205,8 @@ struct osd_client_t
// Read state
int read_ready = 0;
osd_op_t *read_op = NULL;
iovec read_iov;
msghdr read_msg;
iovec read_iov = { 0 };
msghdr read_msg = { 0 };
int read_remaining = 0;
int read_state = 0;
osd_op_buf_list_t recv_list;
@ -215,17 +215,16 @@ struct osd_client_t
std::vector<osd_op_t*> received_ops;
// Outbound operations
std::deque<osd_op_t*> outbox;
std::map<int, osd_op_t*> sent_ops;
std::map<uint64_t, osd_op_t*> sent_ops;
// PGs dirtied by this client's primary-writes
std::set<pool_pg_num_t> dirty_pgs;
// Write state
osd_op_t *write_op = NULL;
msghdr write_msg;
msghdr write_msg = { 0 };
int write_state = 0;
osd_op_buf_list_t send_list;
std::vector<iovec> send_list, next_send_list;
std::vector<osd_op_t*> outbox, next_outbox;
};
struct osd_wanted_peer_t
@ -296,6 +295,7 @@ protected:
void cancel_op(osd_op_t *op);
bool try_send(osd_client_t *cl);
void measure_exec(osd_op_t *cur_op);
void handle_send(int result, osd_client_t *cl);
bool handle_read(int result, osd_client_t *cl);

8
msgr_receive.cpp

@ -252,6 +252,14 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
{
// Read data. In this case we assume that the buffer is preallocated by the caller (!)
assert(op->iov.count > 0);
if (op->reply.hdr.retval != (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len))
{
// Check reply length to not overflow the buffer
printf("Client %d read reply of different length\n", cl->peer_fd);
cl->sent_ops[op->req.hdr.id] = op;
stop_client(cl->peer_fd);
return false;
}
cl->recv_list.append(op->iov);
delete cl->read_op;
cl->read_op = op;

170
msgr_send.cpp

@ -14,6 +14,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
else
{
// Check that operation actually belongs to this client
// FIXME: Review if this is still needed
bool found = false;
for (auto it = cl->received_ops.begin(); it != cl->received_ops.end(); it++)
{
@ -30,15 +31,50 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
return;
}
}
cl->outbox.push_back(cur_op);
auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
if (cur_op->op_type == OSD_OP_IN)
{
measure_exec(cur_op);
to_send_list.push_back((iovec){ .iov_base = cur_op->reply.buf, .iov_len = OSD_PACKET_SIZE });
}
else
{
to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
cl->sent_ops[cur_op->req.hdr.id] = cur_op;
}
// Pre-defined send_lists
if ((cur_op->op_type == OSD_OP_IN
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
: (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0)
{
to_outbox.push_back(NULL);
for (int i = 0; i < cur_op->iov.count; i++)
{
to_send_list.push_back(cur_op->iov.buf[i]);
to_outbox.push_back(i == cur_op->iov.count-1 ? cur_op : NULL);
}
}
else
{
to_outbox.push_back(cur_op);
}
if (!ringloop)
{
while (cl->write_op || cl->outbox.size())
// FIXME: It's worse because it doesn't allow batching
while (cl->outbox.size())
{
try_send(cl);
}
}
else if (cl->write_op || cl->outbox.size() > 1 || !try_send(cl))
else if (cl->write_msg.msg_iovlen > 0 || !try_send(cl))
{
if (cl->write_state == 0)
{
@ -49,66 +85,44 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
}
}
void osd_messenger_t::measure_exec(osd_op_t *cur_op)
{
// Measure execution latency
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
stats.op_stat_count[cur_op->req.hdr.opcode]++;
if (!stats.op_stat_count[cur_op->req.hdr.opcode])
{
stats.op_stat_count[cur_op->req.hdr.opcode]++;
stats.op_stat_sum[cur_op->req.hdr.opcode] = 0;
stats.op_stat_bytes[cur_op->req.hdr.opcode] = 0;
}
stats.op_stat_sum[cur_op->req.hdr.opcode] += (
(tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000
);
if (cur_op->req.hdr.opcode == OSD_OP_READ ||
cur_op->req.hdr.opcode == OSD_OP_WRITE)
{
stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.rw.len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
{
stats.op_stat_bytes[cur_op->req.hdr.opcode] += cur_op->req.sec_rw.len;
}
}
bool osd_messenger_t::try_send(osd_client_t *cl)
{
int peer_fd = cl->peer_fd;
if (!cl->write_op)
if (!cl->send_list.size() || cl->write_msg.msg_iovlen > 0)
{
// pick next command
cl->write_op = cl->outbox.front();
cl->outbox.pop_front();
cl->write_state = CL_WRITE_REPLY;
if (cl->write_op->op_type == OSD_OP_IN)
{
// Measure execution latency
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
stats.op_stat_count[cl->write_op->req.hdr.opcode]++;
if (!stats.op_stat_count[cl->write_op->req.hdr.opcode])
{
stats.op_stat_count[cl->write_op->req.hdr.opcode]++;
stats.op_stat_sum[cl->write_op->req.hdr.opcode] = 0;
stats.op_stat_bytes[cl->write_op->req.hdr.opcode] = 0;
}
stats.op_stat_sum[cl->write_op->req.hdr.opcode] += (
(tv_end.tv_sec - cl->write_op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - cl->write_op->tv_begin.tv_nsec)/1000
);
if (cl->write_op->req.hdr.opcode == OSD_OP_READ ||
cl->write_op->req.hdr.opcode == OSD_OP_WRITE)
{
stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.rw.len;
}
else if (cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
{
stats.op_stat_bytes[cl->write_op->req.hdr.opcode] += cl->write_op->req.sec_rw.len;
}
cl->send_list.push_back(cl->write_op->reply.buf, OSD_PACKET_SIZE);
if (cl->write_op->req.hdr.opcode == OSD_OP_READ ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
cl->write_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
{
cl->send_list.append(cl->write_op->iov);
}
}
else
{
cl->send_list.push_back(cl->write_op->req.buf, OSD_PACKET_SIZE);
if (cl->write_op->req.hdr.opcode == OSD_OP_WRITE ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
cl->write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
{
cl->send_list.append(cl->write_op->iov);
}
}
return true;
}
cl->write_msg.msg_iov = cl->send_list.get_iovec();
cl->write_msg.msg_iovlen = cl->send_list.get_size();
cl->write_msg.msg_iov = cl->send_list.data();
cl->write_msg.msg_iovlen = cl->send_list.size();
cl->refs++;
if (ringloop && !use_sync_send_recv)
{
@ -149,6 +163,7 @@ void osd_messenger_t::send_replies()
void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
cl->write_msg.msg_iovlen = 0;
cl->refs--;
if (cl->peer_state == PEER_STOPPED)
{
@ -167,22 +182,43 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
}
if (result >= 0)
{
cl->send_list.eat(result);
if (cl->send_list.done >= cl->send_list.count)
int done = 0;
while (result > 0 && done < cl->send_list.size())
{
// Done
cl->send_list.reset();
if (cl->write_op->op_type == OSD_OP_IN)
iovec & iov = cl->send_list[done];
if (iov.iov_len <= result)
{
delete cl->write_op;
if (cl->outbox[done])
{
// Operation fully sent
if (cl->outbox[done]->op_type == OSD_OP_IN)
{
delete cl->outbox[done];
}
}
result -= iov.iov_len;
done++;
}
else
{
cl->sent_ops[cl->write_op->req.hdr.id] = cl->write_op;
iov.iov_len -= result;
iov.iov_base += result;
break;
}
cl->write_op = NULL;
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
}
if (done > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+done);
}
if (cl->next_send_list.size())
{
cl->send_list.insert(cl->send_list.end(), cl->next_send_list.begin(), cl->next_send_list.end());
cl->outbox.insert(cl->outbox.end(), cl->next_outbox.begin(), cl->next_outbox.end());
cl->next_send_list.clear();
cl->next_outbox.clear();
}
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
}
if (cl->write_state != 0)
{

Loading…
Cancel
Save