Incoming data pre-buffering

trace-sqes
Vitaliy Filippov 2020-03-02 02:58:00 +03:00
parent b27ad550cf
commit 8315407558
4 changed files with 89 additions and 46 deletions

View File

@ -220,6 +220,7 @@ restart:
.peer_port = ntohs(addr.sin_port),
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTED,
.in_buf = malloc(receive_buffer_size),
};
// Add FD to epoll
epoll_event ev;
@ -344,6 +345,7 @@ void osd_t::stop_client(int peer_fd)
break;
}
}
free(cl.in_buf);
clients.erase(it);
close(peer_fd);
}

6
osd.h
View File

@ -22,7 +22,7 @@
#define OSD_OP_IN 0
#define OSD_OP_OUT 1
#define CL_READ_OP 1
#define CL_READ_HDR 1
#define CL_READ_DATA 2
#define CL_READ_REPLY_DATA 3
#define CL_WRITE_READY 1
@ -125,6 +125,8 @@ struct osd_client_t
std::function<void(osd_num_t, int)> connect_callback;
osd_num_t osd_num = 0;
void *in_buf = NULL;
// Read state
int read_ready = 0;
osd_op_t *read_op = NULL;
@ -170,6 +172,7 @@ class osd_t
int bind_port, listen_backlog;
int client_queue_depth = 128;
bool allow_test_ops = true;
int receive_buffer_size = 9000;
// peer OSDs
@ -215,6 +218,7 @@ class osd_t
void handle_epoll_events();
void read_requests();
void handle_read(ring_data_t *data, int peer_fd);
void handle_finished_read(osd_client_t & cl);
void handle_op_hdr(osd_client_t *cl);
void handle_reply_hdr(osd_client_t *cl);
bool try_send(osd_client_t & cl);

View File

@ -84,6 +84,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
.peer_state = PEER_CONNECTING,
.connect_callback = callback,
.osd_num = osd_num,
.in_buf = malloc(receive_buffer_size),
};
osd_peer_fds[osd_num] = peer_fd;
// Add FD to epoll (EPOLLOUT for tracking connect() result)

View File

@ -13,22 +13,16 @@ void osd_t::read_requests()
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_buf)
if (!cl.read_op || cl.read_remaining < receive_buffer_size)
{
// no reads in progress
// so this is either a new command or a reply to a previously sent command
if (!cl.read_op)
{
cl.read_op = new osd_op_t;
cl.read_op->peer_fd = peer_fd;
}
cl.read_op->op_type = OSD_OP_IN;
cl.read_buf = &cl.read_op->req.buf;
cl.read_remaining = OSD_PACKET_SIZE;
cl.read_state = CL_READ_OP;
cl.read_iov.iov_base = cl.in_buf;
cl.read_iov.iov_len = receive_buffer_size;
}
else
{
cl.read_iov.iov_base = cl.read_buf;
cl.read_iov.iov_len = cl.read_remaining;
}
cl.read_iov.iov_base = cl.read_buf;
cl.read_iov.iov_len = cl.read_remaining;
cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
@ -60,52 +54,93 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
read_ready_clients.push_back(peer_fd);
if (data->res > 0)
{
cl.read_remaining -= data->res;
cl.read_buf += data->res;
if (cl.read_remaining <= 0)
if (cl.read_iov.iov_base == cl.in_buf)
{
cl.read_buf = NULL;
if (cl.read_state == CL_READ_OP)
// Compose operation(s) from the buffer
int remain = data->res;
void *curbuf = cl.in_buf;
while (remain > 0)
{
if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
if (!cl.read_op)
{
handle_reply_hdr(&cl);
cl.read_op = new osd_op_t;
cl.read_op->peer_fd = peer_fd;
cl.read_op->op_type = OSD_OP_IN;
cl.read_buf = cl.read_op->req.buf;
cl.read_remaining = OSD_PACKET_SIZE;
cl.read_state = CL_READ_HDR;
}
if (cl.read_remaining > remain)
{
memcpy(cl.read_buf, curbuf, remain);
cl.read_remaining -= remain;
cl.read_buf += remain;
remain = 0;
if (cl.read_remaining <= 0)
handle_finished_read(cl);
}
else
{
handle_op_hdr(&cl);
memcpy(cl.read_buf, curbuf, cl.read_remaining);
curbuf += cl.read_remaining;
remain -= cl.read_remaining;
cl.read_remaining = 0;
cl.read_buf = NULL;
handle_finished_read(cl);
}
}
else if (cl.read_state == CL_READ_DATA)
}
else
{
// Long data
cl.read_remaining -= data->res;
cl.read_buf += data->res;
if (cl.read_remaining <= 0)
{
// Operation is ready
exec_op(cl.read_op);
cl.read_op = NULL;
cl.read_state = 0;
}
else if (cl.read_state == CL_READ_REPLY_DATA)
{
// Reply is ready
auto req_it = cl.sent_ops.find(cl.read_reply_id);
osd_op_t *request = req_it->second;
cl.sent_ops.erase(req_it);
cl.read_reply_id = 0;
cl.read_state = 0;
// Measure subop latency
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
subop_stat_count[request->req.hdr.opcode]++;
subop_stat_sum[request->req.hdr.opcode] += (
(tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000
);
request->callback(request);
handle_finished_read(cl);
}
}
}
}
}
void osd_t::handle_finished_read(osd_client_t & cl)
{
if (cl.read_state == CL_READ_HDR)
{
if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
handle_reply_hdr(&cl);
else
handle_op_hdr(&cl);
}
else if (cl.read_state == CL_READ_DATA)
{
// Operation is ready
exec_op(cl.read_op);
cl.read_op = NULL;
cl.read_state = 0;
}
else if (cl.read_state == CL_READ_REPLY_DATA)
{
// Reply is ready
auto req_it = cl.sent_ops.find(cl.read_reply_id);
osd_op_t *request = req_it->second;
cl.sent_ops.erase(req_it);
cl.read_reply_id = 0;
cl.read_op = NULL;
cl.read_state = 0;
// Measure subop latency
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
subop_stat_count[request->req.hdr.opcode]++;
subop_stat_sum[request->req.hdr.opcode] += (
(tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000
);
request->callback(request);
}
}
void osd_t::handle_op_hdr(osd_client_t *cl)
{
osd_op_t *cur_op = cl->read_op;
@ -190,6 +225,7 @@ void osd_t::handle_reply_hdr(osd_client_t *cl)
else
{
cl->read_state = 0;
cl->read_op = NULL;
cl->sent_ops.erase(req_it);
// Measure subop latency
timespec tv_end;