Fix bugs in cluster_client

Vitaliy Filippov 2020-06-15 13:22:20 +03:00
parent 64afec03ec
commit 27ee14a4e6
3 changed files with 99 additions and 65 deletions

View File

@@ -5,6 +5,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
     this->ringloop = ringloop;
     this->tfd = tfd;
+    msgr.osd_num = 0;
     msgr.tfd = tfd;
     msgr.ringloop = ringloop;
     msgr.repeer_pgs = [this](osd_num_t peer_osd)
@@ -48,6 +49,14 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
     log_level = config["log_level"].int64_value();
     st_cli.parse_config(config);
     st_cli.load_global_config();
+
+    consumer.loop = [this]()
+    {
+        msgr.read_requests();
+        msgr.send_replies();
+        this->ringloop->submit();
+    };
+    ringloop->register_consumer(&consumer);
 }
 
 void cluster_client_t::continue_ops()
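
The hunk above hooks the client into the event loop: a ring_consumer_t whose loop callback pumps the messenger's reads and replies and resubmits the ring on every iteration. Below is a minimal self-contained sketch of that registration pattern; ring_consumer_t and ring_loop_t here are simplified stand-ins for illustration, not vitastor's real io_uring wrapper.

#include <cstdio>
#include <functional>
#include <vector>

// Simplified stand-ins, only to show the registration pattern
struct ring_consumer_t
{
    std::function<void()> loop;
};

struct ring_loop_t
{
    std::vector<ring_consumer_t*> consumers;
    void register_consumer(ring_consumer_t *consumer) { consumers.push_back(consumer); }
    // One event loop iteration: run every registered consumer
    void loop()
    {
        for (auto consumer: consumers)
            if (consumer->loop)
                consumer->loop();
    }
};

struct example_client_t
{
    ring_loop_t *ringloop;
    ring_consumer_t consumer;
    example_client_t(ring_loop_t *rl): ringloop(rl)
    {
        consumer.loop = [this]()
        {
            // The real constructor pumps the messenger here:
            // msgr.read_requests(); msgr.send_replies(); ringloop->submit();
            printf("client consumer ran\n");
        };
        ringloop->register_consumer(&consumer);
    }
};

int main()
{
    ring_loop_t ringloop;
    example_client_t cli(&ringloop);
    ringloop.loop(); // drives the registered consumer once per iteration
}
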
@@ -108,10 +117,10 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     if (config.find("pg_stripe_size") != config.end())
     {
         pg_stripe_size = config["pg_stripe_size"].uint64_value();
-        if (!pg_stripe_size)
-        {
-            pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
-        }
+    }
+    if (!pg_stripe_size)
+    {
+        pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
     }
     if (config["immediate_commit"] == "all")
     {
@@ -121,10 +130,10 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     else if (config.find("client_dirty_limit") != config.end())
     {
         client_dirty_limit = config["client_dirty_limit"].uint64_value();
-        if (!client_dirty_limit)
-        {
-            client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
-        }
+    }
+    if (!client_dirty_limit)
+    {
+        client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
     }
     msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
     if (!msgr.peer_connect_interval)
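
Both hunks above hoist the default-value check out of the config.find() branch: with the old placement the default was only applied when the key was present, so an absent key could leave the field at zero. A small standalone illustration of the hoisted pattern follows; DEFAULT_PG_STRIPE_SIZE and the map-based config are placeholders for this sketch, not the real constants or json11 parsing.

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

static const uint64_t DEFAULT_PG_STRIPE_SIZE = 4096; // placeholder default

static uint64_t parse_pg_stripe_size(const std::map<std::string, uint64_t> & config)
{
    uint64_t pg_stripe_size = 0;
    auto it = config.find("pg_stripe_size");
    if (it != config.end())
    {
        pg_stripe_size = it->second;
    }
    // Hoisted check: the default applies both when the key is absent
    // and when it is present but zero
    if (!pg_stripe_size)
    {
        pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
    }
    return pg_stripe_size;
}

int main()
{
    printf("%llu\n", (unsigned long long)parse_pg_stripe_size({}));                               // 4096 (default)
    printf("%llu\n", (unsigned long long)parse_pg_stripe_size({ { "pg_stripe_size", 0 } }));      // 4096 (default)
    printf("%llu\n", (unsigned long long)parse_pg_stripe_size({ { "pg_stripe_size", 131072 } })); // 131072
}
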
@@ -136,6 +145,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     {
         msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
     }
+    st_cli.load_pgs();
 }
 
 void cluster_client_t::on_load_pgs_hook(bool success)
@@ -143,11 +153,14 @@ void cluster_client_t::on_load_pgs_hook(bool success)
     if (success)
     {
         pg_count = st_cli.pg_config.size();
-        for (auto op: offline_ops)
-        {
-            execute(op);
-        }
-        offline_ops.clear();
+        if (pg_count)
+        {
+            for (auto op: offline_ops)
+            {
+                execute(op);
+            }
+            offline_ops.clear();
+        }
     }
 }
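
Together with the execute() change further down, this is a defer-until-ready pattern: while the PG configuration is not loaded yet (pg_count == 0), incoming operations are parked in offline_ops and replayed once the config arrives. A self-contained sketch of the same pattern, with a trivial op type standing in for cluster_op_t:

#include <cstdio>
#include <vector>

struct op_t { int id; };

struct deferring_client_t
{
    unsigned pg_count = 0;           // 0 means "config not loaded yet"
    std::vector<op_t*> offline_ops;  // ops parked until the config arrives

    void execute(op_t *op)
    {
        if (!pg_count)
        {
            offline_ops.push_back(op); // defer until PGs are known
            return;
        }
        printf("executing op %d\n", op->id);
    }

    // Analogue of on_load_pgs_hook(): replay parked ops once pg_count is known
    void on_pgs_loaded(unsigned new_pg_count)
    {
        pg_count = new_pg_count;
        if (pg_count)
        {
            for (auto op: offline_ops)
                execute(op);
            offline_ops.clear();
        }
    }
};

int main()
{
    deferring_client_t cli;
    op_t a{ 1 }, b{ 2 };
    cli.execute(&a);        // deferred: config not loaded yet
    cli.on_pgs_loaded(16);  // replays op 1
    cli.execute(&b);        // runs immediately now
}
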
@@ -189,24 +202,27 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
 /**
  * How writes are synced when immediate_commit is false
  *
- * WRITE:
- * 1) copy op to the queue
- * 2) slice op
- * 3) connect & send all parts
- * 4) if any of them fail due to disconnected peers, repeat after reconnecting
- * 5) if any of them fail due to other errors, fail the operation
+ * 1) accept up to <client_dirty_limit> write operations for execution,
+ *    queue all subsequent writes into <next_writes>
+ * 2) accept exactly one SYNC, queue all subsequent SYNCs into <next_writes>, too
+ * 3) "continue" all accepted writes
+ *
+ * "Continue" WRITE:
+ * 1) if the operation is not a copy yet - copy it (required for replay)
+ * 2) if the operation is not sliced yet - slice it
+ * 3) if the operation doesn't require reslice - try to connect & send all remaining parts
+ * 4) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout
+ * 5) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch"
  * 6) if PG count changes before all parts are done, wait for all in-progress parts to finish,
  *    throw all results away, reslice and resubmit op
- * 7) when all parts are done, try to continue SYNCs
+ * 7) when all parts are done, try to "continue" the current SYNC
+ * 8) if the operation succeeds, but then some OSDs drop their connections, repeat
+ *    parts from the current "unsynced batch" previously sent to those OSDs in any order
  *
- * SYNC:
- * 1) add SYNC to the queue after writes
- * 2) try to continue SYNCs
- *
- * Continue SYNCs:
- * 1) check the queue for a SYNC ready for submission. if there is one:
- * 2) slice it
- * 3) connect & send all SYNC parts
+ * "Continue" current SYNC:
+ * 1) take all unsynced operations from the current batch
+ * 2) check if all affected OSDs are still alive
+ * 3) if yes, send all SYNCs. otherwise, leave current SYNC as is.
  * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
  * 5) if any of them fail due to other errors, fail the SYNC operation
  */
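
The rewritten comment describes a batching scheme rather than a simple queue: writes accumulate into an "unsynced batch", at most one SYNC is in flight, and the SYNC only goes out once every OSD touched by the batch is still connected. A rough, self-contained sketch of that "continue current SYNC" decision; the sets and helper below are simplified stand-ins, not the real cluster_client state machine.

#include <cstdint>
#include <cstdio>
#include <set>

typedef uint64_t osd_num_t;

// Decide whether the pending SYNC can be sent right now: it may go out only
// if every OSD that received writes from the current unsynced batch is still
// alive; otherwise leave it queued and retry after the writes are repeated.
static bool can_send_sync(const std::set<osd_num_t> & dirty_osds,
    const std::set<osd_num_t> & alive_osds)
{
    for (osd_num_t osd: dirty_osds)
    {
        if (!alive_osds.count(osd))
            return false;
    }
    return true;
}

int main()
{
    std::set<osd_num_t> dirty = { 1, 2, 3 };   // OSDs touched by the unsynced batch
    std::set<osd_num_t> alive = { 1, 2, 3, 4 };
    printf("send sync: %d\n", can_send_sync(dirty, alive)); // 1
    alive.erase(2);                                          // OSD 2 dropped its connection
    printf("send sync: %d\n", can_send_sync(dirty, alive)); // 0 - repeat writes first
}
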
@@ -214,6 +230,12 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
 void cluster_client_t::execute(cluster_op_t *op)
 {
     op->retval = 0;
+    if (!pg_count)
+    {
+        // Config is not loaded yet, retry after connecting to etcd
+        offline_ops.push_back(op);
+        return;
+    }
     if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE ||
         (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len ||
         op->offset % bs_disk_alignment || op->len % bs_disk_alignment))
@@ -222,12 +244,6 @@ void cluster_client_t::execute(cluster_op_t *op)
         std::function<void(cluster_op_t*)>(op->callback)(op);
         return;
     }
-    if (!pg_count)
-    {
-        // Config is not loaded yet, retry after connecting to etcd
-        offline_ops.push_back(op);
-        return;
-    }
     if (op->opcode == OSD_OP_SYNC)
     {
         execute_sync(op);
@@ -319,6 +335,7 @@ void cluster_client_t::continue_rw(cluster_op_t *op)
             op->retval = op->len;
             std::function<void(cluster_op_t*)>(op->callback)(op);
             continue_sync();
+            return;
         }
         else if (op->retval != 0 && op->retval != -EPIPE)
         {
@@ -339,18 +356,25 @@ void cluster_client_t::continue_rw(cluster_op_t *op)
             std::function<void(cluster_op_t*)>(op->callback)(op);
             if (del)
             {
+                if (op->buf)
+                    free(op->buf);
                 delete op;
             }
             continue_sync();
+            return;
         }
-        else if (op->needs_reslice)
+        else
         {
-            op->parts.clear();
-            op->done_count = 0;
-            op->needs_reslice = false;
-            continue_rw(op);
+            // -EPIPE or no error - clear the error
+            op->retval = 0;
+            if (op->needs_reslice)
+            {
+                op->parts.clear();
+                op->done_count = 0;
+                op->needs_reslice = false;
+                continue_rw(op);
+            }
         }
-        // else -EPIPE
     }
 }
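
The added free(op->buf) calls here and in finish_sync() below plug a leak: the ops deleted on these paths apparently carry their own copy of the data in op->buf, so the buffer has to be released together with the op. A tiny sketch of that ownership rule with a stand-in op type; copy_op and destroy_op are illustrative helpers, not the client's API.

#include <cstdlib>
#include <cstring>

// Stand-in for cluster_op_t: internal copies own a malloc'ed data buffer
struct op_t
{
    bool is_internal = false;
    void *buf = nullptr;
};

// Copy an op for replay; the copy gets its own buffer and owns it
static op_t *copy_op(const void *data, size_t len)
{
    op_t *copy = new op_t;
    copy->is_internal = true;
    copy->buf = malloc(len);
    memcpy(copy->buf, data, len);
    return copy;
}

// Mirrors the added free() before delete: release the owned buffer, too
static void destroy_op(op_t *op)
{
    if (op->is_internal)
    {
        if (op->buf)
            free(op->buf);
        delete op;
    }
}

int main()
{
    char data[4096] = { 0 };
    op_t *op = copy_op(data, sizeof(data));
    destroy_op(op); // no leak: buffer released together with the op
}
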
@@ -361,34 +385,25 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
     uint64_t pg_block_size = bs_block_size * pg_part_count;
     uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
     uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
-    int part_count = 0;
-    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
-    {
-        if (op->offset < (stripe+pg_block_size) && (op->offset+op->len) > stripe)
-        {
-            part_count++;
-        }
-    }
     op->retval = 0;
-    op->parts.resize(part_count);
+    op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
     int i = 0;
     for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
     {
-        uint64_t stripe_end = stripe + pg_block_size;
-        if (op->offset < stripe_end && (op->offset+op->len) > stripe)
-        {
-            pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1;
-            op->parts[i] = {
-                .parent = op,
-                .offset = op->offset < stripe ? stripe : op->offset,
-                .len = (uint32_t)((op->offset+op->len) > stripe_end ? pg_block_size : op->offset+op->len-stripe),
-                .pg_num = pg_num,
-                .buf = op->buf + (op->offset < stripe ? stripe-op->offset : 0),
-                .sent = false,
-                .done = false,
-            };
-            i++;
-        }
+        pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1;
+        uint64_t begin = (op->offset < stripe ? stripe : op->offset);
+        uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
+            ? (stripe + pg_block_size) : (op->offset + op->len);
+        op->parts[i] = {
+            .parent = op,
+            .offset = begin,
+            .len = (uint32_t)(end - begin),
+            .pg_num = pg_num,
+            .buf = op->buf + begin - op->offset,
+            .sent = false,
+            .done = false,
+        };
+        i++;
    }
 }
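
The rewritten slice_rw() drops the pre-counting pass: since first_stripe and last_stripe are inclusive bounds of the PG blocks the request touches, the part count is simply (last_stripe - first_stripe) / pg_block_size + 1, and each part covers the intersection of [offset, offset+len) with its PG block. A standalone worked example of the same arithmetic; the sizes below are arbitrary illustration values, not vitastor defaults.

#include <cstdint>
#include <cstdio>

int main()
{
    // Illustration values only, not vitastor defaults
    const uint64_t bs_block_size = 128*1024;     // object stripe size
    const uint64_t pg_part_count = 2;            // data chunks per PG block
    const uint64_t pg_stripe_size = 4*1024*1024;
    const uint64_t pg_count = 16;
    const uint64_t inode = 1;
    const uint64_t pg_block_size = bs_block_size * pg_part_count; // 256 KiB

    // A 400 KiB request starting at 200 KiB touches three PG blocks
    const uint64_t offset = 200*1024, len = 400*1024;

    uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    uint64_t part_count = (last_stripe - first_stripe) / pg_block_size + 1;
    printf("parts: %llu\n", (unsigned long long)part_count); // 3

    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        uint64_t pg_num = (inode + stripe/pg_stripe_size) % pg_count + 1;
        // Intersection of the request with this PG block
        uint64_t begin = offset < stripe ? stripe : offset;
        uint64_t end = (offset + len) > (stripe + pg_block_size)
            ? (stripe + pg_block_size) : (offset + len);
        printf("PG %llu: offset 0x%llx, len 0x%llx\n",
            (unsigned long long)pg_num, (unsigned long long)begin, (unsigned long long)(end - begin));
    }
}
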
@@ -539,6 +554,8 @@ void cluster_client_t::finish_sync()
     {
         if (op->is_internal)
         {
+            if (op->buf)
+                free(op->buf);
             delete op;
         }
     }
@@ -548,7 +565,15 @@ void cluster_client_t::finish_sync()
     int i;
     for (i = 0; i < next_writes.size() && !cur_sync; i++)
     {
-        execute(next_writes[i]);
+        if (next_writes[i]->opcode == OSD_OP_SYNC)
+        {
+            execute_sync(next_writes[i]);
+        }
+        else
+        {
+            cur_ops.insert(next_writes[i]);
+            continue_rw(next_writes[i]);
+        }
     }
     next_writes.erase(next_writes.begin(), next_writes.begin()+i);
 }
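
After a SYNC completes, the queued operations in next_writes are drained; the change dispatches them by opcode instead of re-entering execute(): a queued SYNC restarts the sync path and stops the drain, while queued writes rejoin cur_ops directly. A compact sketch of that drain-and-dispatch loop with stand-in types (client_t, op_t and drain_next_writes are illustrative, not the real class):

#include <cstdio>
#include <set>
#include <vector>

enum opcode_t { OP_WRITE, OP_SYNC };
struct op_t { opcode_t opcode; int id; };

struct client_t
{
    std::vector<op_t*> next_writes;  // ops queued while a SYNC was in flight
    std::set<op_t*> cur_ops;         // writes currently being executed
    op_t *cur_sync = nullptr;        // at most one SYNC in flight

    void execute_sync(op_t *op) { cur_sync = op; printf("sync %d started\n", op->id); }
    void continue_rw(op_t *op) { printf("write %d resumed\n", op->id); }

    // Analogue of the loop added to finish_sync(): drain queued ops until
    // the next SYNC is accepted, dispatching each by opcode
    void drain_next_writes()
    {
        size_t i;
        for (i = 0; i < next_writes.size() && !cur_sync; i++)
        {
            if (next_writes[i]->opcode == OP_SYNC)
            {
                execute_sync(next_writes[i]);
            }
            else
            {
                cur_ops.insert(next_writes[i]);
                continue_rw(next_writes[i]);
            }
        }
        next_writes.erase(next_writes.begin(), next_writes.begin() + i);
    }
};

int main()
{
    client_t cli;
    op_t w1{ OP_WRITE, 1 }, s{ OP_SYNC, 2 }, w2{ OP_WRITE, 3 };
    cli.next_writes = { &w1, &s, &w2 };
    cli.drain_next_writes(); // resumes write 1, starts sync 2, keeps write 3 queued
    printf("still queued: %zu\n", cli.next_writes.size()); // 1
}
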

View File

@@ -64,6 +64,7 @@ class cluster_client_t
     uint64_t op_id = 1;
     etcd_state_client_t st_cli;
     osd_messenger_t msgr;
+    ring_consumer_t consumer;
     // operations currently in progress
     std::set<cluster_op_t*> cur_ops;
     int retry_timeout_id = 0;

View File

@@ -211,8 +211,16 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
     if (opt->trace)
     {
-        printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
-            (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n);
+        if (io->ddir == DDIR_SYNC)
+        {
+            printf("+++ SYNC # %d\n", n);
+        }
+        else
+        {
+            printf("+++ %s # %d 0x%llx+%llx\n",
+                io->ddir == DDIR_READ ? "READ" : "WRITE",
+                n, io->offset, io->xfer_buflen);
+        }
     }
     io->error = 0;
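
With the new format strings, the fio engine's trace shows the byte range of each request, which makes it easier to correlate with the client-side slicing above. Example output (request numbers and offsets are made up; the offset is printed with a 0x prefix and the length as bare hex, exactly as the "0x%llx+%llx" specifier produces):

+++ READ # 12 0x140000+20000
+++ WRITE # 13 0x200000+1000
+++ SYNC # 14
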