Fix memory leaks with subops, fix recovery crashes
parent
1b30120918
commit
43fe1d88e7
|
@ -40,6 +40,12 @@ struct osd_primary_op_data_t
|
||||||
|
|
||||||
void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||||
{
|
{
|
||||||
|
if (!cur_op->peer_fd)
|
||||||
|
{
|
||||||
|
cur_op->callback(cur_op);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
// FIXME add separate magic number
|
// FIXME add separate magic number
|
||||||
auto cl_it = clients.find(cur_op->peer_fd);
|
auto cl_it = clients.find(cur_op->peer_fd);
|
||||||
if (cl_it != clients.end())
|
if (cl_it != clients.end())
|
||||||
|
@ -48,20 +54,13 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||||
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
||||||
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
||||||
cur_op->reply.hdr.retval = retval;
|
cur_op->reply.hdr.retval = retval;
|
||||||
if (!cur_op->peer_fd)
|
|
||||||
{
|
|
||||||
cur_op->callback(cur_op);
|
|
||||||
delete cur_op;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
outbox_push(cl_it->second, cur_op);
|
outbox_push(cl_it->second, cur_op);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
delete cur_op;
|
delete cur_op;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
||||||
|
@ -159,7 +158,6 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
|
||||||
uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data());
|
uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data());
|
||||||
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
|
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0)
|
||||||
{
|
{
|
||||||
free(op_data);
|
|
||||||
finish_op(cur_op, -EIO);
|
finish_op(cur_op, -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -177,8 +175,6 @@ resume_1:
|
||||||
resume_2:
|
resume_2:
|
||||||
if (op_data->errors > 0)
|
if (op_data->errors > 0)
|
||||||
{
|
{
|
||||||
free(op_data);
|
|
||||||
cur_op->op_data = NULL;
|
|
||||||
finish_op(cur_op, -EIO);
|
finish_op(cur_op, -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -203,8 +199,6 @@ resume_2:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
free(op_data);
|
|
||||||
cur_op->op_data = NULL;
|
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,6 +409,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
|
||||||
op_data->stripes[i].read_start = 0;
|
op_data->stripes[i].read_start = 0;
|
||||||
op_data->stripes[i].read_end = bs_block_size;
|
op_data->stripes[i].read_end = bs_block_size;
|
||||||
op_data->stripes[i].missing = op_data->prev_set[i] == 0;
|
op_data->stripes[i].missing = op_data->prev_set[i] == 0;
|
||||||
|
op_data->stripes[i].write_end = 0;
|
||||||
}
|
}
|
||||||
op_data->degraded = 1;
|
op_data->degraded = 1;
|
||||||
submit_primary_subops(SUBMIT_READ, pg.pg_size, op_data->prev_set, cur_op);
|
submit_primary_subops(SUBMIT_READ, pg.pg_size, op_data->prev_set, cur_op);
|
||||||
|
@ -429,6 +424,12 @@ resume_9:
|
||||||
op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe,
|
op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe,
|
||||||
cur_op->buf, cur_op->req.rw.len
|
cur_op->buf, cur_op->req.rw.len
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
free(cur_op->buf);
|
||||||
|
cur_op->buf = op_data->recovery_buf;
|
||||||
|
op_data->recovery_buf = NULL;
|
||||||
|
if (cur_op->req.rw.len > 0)
|
||||||
|
{
|
||||||
// Write modified parts
|
// Write modified parts
|
||||||
uint32_t start = 0, end = 0;
|
uint32_t start = 0, end = 0;
|
||||||
for (int role = 0; role < pg.pg_minsize; role++)
|
for (int role = 0; role < pg.pg_minsize; role++)
|
||||||
|
@ -439,22 +440,16 @@ resume_9:
|
||||||
end = std::max(op_data->stripes[role].req_end, end);
|
end = std::max(op_data->stripes[role].req_end, end);
|
||||||
op_data->stripes[role].write_start = op_data->stripes[role].req_start;
|
op_data->stripes[role].write_start = op_data->stripes[role].req_start;
|
||||||
op_data->stripes[role].write_end = op_data->stripes[role].req_end;
|
op_data->stripes[role].write_end = op_data->stripes[role].req_end;
|
||||||
op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size + op_data->stripes[role].write_start;
|
op_data->stripes[role].write_buf = cur_op->buf + role*bs_block_size + op_data->stripes[role].write_start;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (int role = pg.pg_minsize; role < pg.pg_size; role++)
|
for (int role = pg.pg_minsize; role < pg.pg_size; role++)
|
||||||
{
|
{
|
||||||
op_data->stripes[role].write_start = start;
|
op_data->stripes[role].write_start = start;
|
||||||
op_data->stripes[role].write_end = end;
|
op_data->stripes[role].write_end = end;
|
||||||
op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size + op_data->stripes[role].write_start;
|
op_data->stripes[role].write_buf = cur_op->buf + role*bs_block_size + op_data->stripes[role].write_start;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (cur_op->buf)
|
|
||||||
{
|
|
||||||
free(cur_op->buf);
|
|
||||||
}
|
|
||||||
cur_op->buf = op_data->recovery_buf;
|
|
||||||
op_data->recovery_buf = NULL;
|
|
||||||
// Also write recovered parts
|
// Also write recovered parts
|
||||||
uint64_t *cur_set = pg.cur_set.data();
|
uint64_t *cur_set = pg.cur_set.data();
|
||||||
for (int role = 0; role < pg.pg_size; role++)
|
for (int role = 0; role < pg.pg_size; role++)
|
||||||
|
@ -463,7 +458,7 @@ resume_9:
|
||||||
{
|
{
|
||||||
op_data->stripes[role].write_start = 0;
|
op_data->stripes[role].write_start = 0;
|
||||||
op_data->stripes[role].write_end = bs_block_size;
|
op_data->stripes[role].write_end = bs_block_size;
|
||||||
op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size;
|
op_data->stripes[role].write_buf = cur_op->buf + role*bs_block_size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pg.ver_override[op_data->oid] = op_data->fact_ver;
|
pg.ver_override[op_data->oid] = op_data->fact_ver;
|
||||||
|
@ -554,16 +549,17 @@ resume_7:
|
||||||
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
|
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
|
||||||
}
|
}
|
||||||
// Remove version override
|
// Remove version override
|
||||||
pg.ver_override.erase(op_data->oid);
|
object_id oid = op_data->oid;
|
||||||
|
pg.ver_override.erase(oid);
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
// Continue other write operations to the same object
|
// Continue other write operations to the same object
|
||||||
{
|
{
|
||||||
auto next_it = pg.write_queue.find(op_data->oid);
|
auto next_it = pg.write_queue.find(oid);
|
||||||
auto this_it = next_it;
|
auto this_it = next_it;
|
||||||
next_it++;
|
next_it++;
|
||||||
pg.write_queue.erase(this_it);
|
pg.write_queue.erase(this_it);
|
||||||
if (next_it != pg.write_queue.end() &&
|
if (next_it != pg.write_queue.end() &&
|
||||||
next_it->first == op_data->oid)
|
next_it->first == oid)
|
||||||
{
|
{
|
||||||
osd_op_t *next_op = next_it->second;
|
osd_op_t *next_op = next_it->second;
|
||||||
continue_primary_write(next_op);
|
continue_primary_write(next_op);
|
||||||
|
|
|
@ -133,6 +133,7 @@ void osd_t::handle_finished_read(osd_client_t & cl)
|
||||||
osd_op_t *request = req_it->second;
|
osd_op_t *request = req_it->second;
|
||||||
cl.sent_ops.erase(req_it);
|
cl.sent_ops.erase(req_it);
|
||||||
cl.read_reply_id = 0;
|
cl.read_reply_id = 0;
|
||||||
|
delete cl.read_op;
|
||||||
cl.read_op = NULL;
|
cl.read_op = NULL;
|
||||||
cl.read_state = 0;
|
cl.read_state = 0;
|
||||||
// Measure subop latency
|
// Measure subop latency
|
||||||
|
@ -145,6 +146,10 @@ void osd_t::handle_finished_read(osd_client_t & cl)
|
||||||
);
|
);
|
||||||
request->callback(request);
|
request->callback(request);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::handle_op_hdr(osd_client_t *cl)
|
void osd_t::handle_op_hdr(osd_client_t *cl)
|
||||||
|
@ -230,6 +235,7 @@ void osd_t::handle_reply_hdr(osd_client_t *cl)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
delete cl->read_op;
|
||||||
cl->read_state = 0;
|
cl->read_state = 0;
|
||||||
cl->read_op = NULL;
|
cl->read_op = NULL;
|
||||||
cl->sent_ops.erase(req_it);
|
cl->sent_ops.erase(req_it);
|
||||||
|
|
Loading…
Reference in New Issue