Measure & report op bandwidth, include local blockstore ops in stats
parent
2b854948f9
commit
f71d0c117b
8
osd.cpp
8
osd.cpp
|
@ -525,9 +525,15 @@ void osd_t::print_stats()
|
||||||
if (op_stat_count[0][i] != op_stat_count[1][i])
|
if (op_stat_count[0][i] != op_stat_count[1][i])
|
||||||
{
|
{
|
||||||
uint64_t avg = (op_stat_sum[0][i] - op_stat_sum[1][i])/(op_stat_count[0][i] - op_stat_count[1][i]);
|
uint64_t avg = (op_stat_sum[0][i] - op_stat_sum[1][i])/(op_stat_count[0][i] - op_stat_count[1][i]);
|
||||||
printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], avg);
|
uint64_t bw = (op_stat_bytes[0][i] - op_stat_bytes[1][i]) / print_stats_interval;
|
||||||
|
printf(
|
||||||
|
"avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", i, osd_op_names[i], avg,
|
||||||
|
(bw > 1024*1024*1024 ? bw/1024.0/1024/1024 : (bw > 1024*1024 ? bw/1024.0/1024 : bw/1024.0)),
|
||||||
|
(bw > 1024*1024*1024 ? "GB/s" : (bw > 1024*1024 ? "MB/s" : "KB/s"))
|
||||||
|
);
|
||||||
op_stat_count[1][i] = op_stat_count[0][i];
|
op_stat_count[1][i] = op_stat_count[0][i];
|
||||||
op_stat_sum[1][i] = op_stat_sum[0][i];
|
op_stat_sum[1][i] = op_stat_sum[0][i];
|
||||||
|
op_stat_bytes[1][i] = op_stat_bytes[0][i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||||
|
|
3
osd.h
3
osd.h
|
@ -285,6 +285,7 @@ class osd_t
|
||||||
// op statistics
|
// op statistics
|
||||||
uint64_t op_stat_sum[2][OSD_OP_MAX+1] = { 0 };
|
uint64_t op_stat_sum[2][OSD_OP_MAX+1] = { 0 };
|
||||||
uint64_t op_stat_count[2][OSD_OP_MAX+1] = { 0 };
|
uint64_t op_stat_count[2][OSD_OP_MAX+1] = { 0 };
|
||||||
|
uint64_t op_stat_bytes[2][OSD_OP_MAX+1] = { 0 };
|
||||||
uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 };
|
uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 };
|
||||||
uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 };
|
uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 };
|
||||||
|
|
||||||
|
@ -380,6 +381,8 @@ class osd_t
|
||||||
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
|
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
|
||||||
bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
|
bool finalize_primary_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
|
||||||
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
|
||||||
|
void handle_primary_bs_subop(osd_op_t *subop);
|
||||||
|
void add_bs_subop_stats(osd_op_t *subop);
|
||||||
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
|
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
|
||||||
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||||
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set);
|
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set);
|
||||||
|
|
|
@ -152,12 +152,15 @@ json11::Json osd_t::get_statistics()
|
||||||
st["size"] = bs->get_block_count() * bs->get_block_size();
|
st["size"] = bs->get_block_count() * bs->get_block_size();
|
||||||
st["free"] = bs->get_free_block_count() * bs->get_block_size();
|
st["free"] = bs->get_free_block_count() * bs->get_block_size();
|
||||||
}
|
}
|
||||||
|
// FIXME: report recovery ops and bandwidth
|
||||||
|
// FIXME: handle integer overflow
|
||||||
json11::Json::object op_stats, subop_stats;
|
json11::Json::object op_stats, subop_stats;
|
||||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||||
{
|
{
|
||||||
op_stats[osd_op_names[i]] = json11::Json::object {
|
op_stats[osd_op_names[i]] = json11::Json::object {
|
||||||
{ "count", op_stat_count[0][i] },
|
{ "count", op_stat_count[0][i] },
|
||||||
{ "sum", op_stat_sum[0][i] },
|
{ "sum", op_stat_sum[0][i] },
|
||||||
|
{ "bytes", op_stat_bytes[0][i] },
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
for (int i = 0; i <= OSD_OP_MAX; i++)
|
for (int i = 0; i <= OSD_OP_MAX; i++)
|
||||||
|
@ -297,6 +300,7 @@ void osd_t::start_etcd_watcher()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
// FIXME apply config changes in runtime
|
||||||
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
|
||||||
{ "create_request", json11::Json::object {
|
{ "create_request", json11::Json::object {
|
||||||
{ "key", base64_encode(etcd_prefix+"/config/") },
|
{ "key", base64_encode(etcd_prefix+"/config/") },
|
||||||
|
@ -334,7 +338,7 @@ void osd_t::start_etcd_watcher()
|
||||||
void osd_t::load_global_config()
|
void osd_t::load_global_config()
|
||||||
{
|
{
|
||||||
etcd_call("/kv/range", json11::Json::object {
|
etcd_call("/kv/range", json11::Json::object {
|
||||||
{ "key", base64_encode(etcd_prefix+"/config/osd/all") }
|
{ "key", base64_encode(etcd_prefix+"/config/global") }
|
||||||
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
}, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
|
||||||
{
|
{
|
||||||
if (err != "")
|
if (err != "")
|
||||||
|
|
|
@ -152,10 +152,12 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback
|
||||||
if (peer_osd == this->osd_num)
|
if (peer_osd == this->osd_num)
|
||||||
{
|
{
|
||||||
// local
|
// local
|
||||||
|
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
|
||||||
op->bs_op = new blockstore_op_t({
|
op->bs_op = new blockstore_op_t({
|
||||||
.opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
|
.opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
|
||||||
.callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
|
.callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
|
||||||
{
|
{
|
||||||
|
add_bs_subop_stats(op);
|
||||||
handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval);
|
handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval);
|
||||||
delete op;
|
delete op;
|
||||||
},
|
},
|
||||||
|
|
|
@ -412,6 +412,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = 0;
|
op->op_type = 0;
|
||||||
op->peer_fd = 0;
|
op->peer_fd = 0;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
|
||||||
op->bs_op = new blockstore_op_t();
|
op->bs_op = new blockstore_op_t();
|
||||||
op->bs_op->opcode = BS_OP_SYNC;
|
op->bs_op->opcode = BS_OP_SYNC;
|
||||||
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
|
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
|
||||||
|
@ -422,6 +423,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
|
||||||
force_stop(1);
|
force_stop(1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
add_bs_subop_stats(op);
|
||||||
delete op;
|
delete op;
|
||||||
ps->list_ops.erase(role_osd);
|
ps->list_ops.erase(role_osd);
|
||||||
submit_list_subop(role_osd, ps);
|
submit_list_subop(role_osd, ps);
|
||||||
|
@ -474,17 +476,19 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = 0;
|
op->op_type = 0;
|
||||||
op->peer_fd = 0;
|
op->peer_fd = 0;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
|
||||||
op->bs_op = new blockstore_op_t();
|
op->bs_op = new blockstore_op_t();
|
||||||
op->bs_op->opcode = BS_OP_LIST;
|
op->bs_op->opcode = BS_OP_LIST;
|
||||||
op->bs_op->oid.stripe = pg_stripe_size;
|
op->bs_op->oid.stripe = pg_stripe_size;
|
||||||
op->bs_op->len = pg_count;
|
op->bs_op->len = pg_count;
|
||||||
op->bs_op->offset = ps->pg_num-1;
|
op->bs_op->offset = ps->pg_num-1;
|
||||||
op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
|
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
|
||||||
{
|
{
|
||||||
if (op->bs_op->retval < 0)
|
if (op->bs_op->retval < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error("local OP_LIST failed");
|
throw std::runtime_error("local OP_LIST failed");
|
||||||
}
|
}
|
||||||
|
add_bs_subop_stats(op);
|
||||||
printf(
|
printf(
|
||||||
"[PG %u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
|
"[PG %u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
|
||||||
ps->pg_num, role_osd, bs_op->retval, bs_op->version
|
ps->pg_num, role_osd, bs_op->retval, bs_op->version
|
||||||
|
|
|
@ -97,7 +97,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
op_data->done = op_data->errors = 0;
|
op_data->done = op_data->errors = 0;
|
||||||
op_data->n_subops = n_subops;
|
op_data->n_subops = n_subops;
|
||||||
op_data->subops = subops;
|
op_data->subops = subops;
|
||||||
int subop = 0;
|
int i = 0;
|
||||||
for (int role = 0; role < pg_size; role++)
|
for (int role = 0; role < pg_size; role++)
|
||||||
{
|
{
|
||||||
// We always submit zero-length writes to all replicas, even if the stripe is not modified
|
// We always submit zero-length writes to all replicas, even if the stripe is not modified
|
||||||
|
@ -110,19 +110,13 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
{
|
{
|
||||||
if (role_osd_num == this->osd_num)
|
if (role_osd_num == this->osd_num)
|
||||||
{
|
{
|
||||||
subops[subop].bs_op = new blockstore_op_t({
|
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
||||||
|
subops[i].op_type = (long)cur_op;
|
||||||
|
subops[i].bs_op = new blockstore_op_t({
|
||||||
.opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ),
|
.opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ),
|
||||||
.callback = [cur_op, this](blockstore_op_t *subop)
|
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
|
||||||
{
|
{
|
||||||
if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len)
|
handle_primary_bs_subop(subop);
|
||||||
{
|
|
||||||
// die
|
|
||||||
throw std::runtime_error("local write operation failed (retval = "+std::to_string(subop->retval)+")");
|
|
||||||
}
|
|
||||||
handle_primary_subop(
|
|
||||||
subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ,
|
|
||||||
cur_op, subop->retval, subop->len, subop->version
|
|
||||||
);
|
|
||||||
},
|
},
|
||||||
.oid = {
|
.oid = {
|
||||||
.inode = op_data->oid.inode,
|
.inode = op_data->oid.inode,
|
||||||
|
@ -133,14 +127,14 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
||||||
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
|
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
|
||||||
});
|
});
|
||||||
bs->enqueue_op(subops[subop].bs_op);
|
bs->enqueue_op(subops[i].bs_op);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
subops[subop].op_type = OSD_OP_OUT;
|
subops[i].op_type = OSD_OP_OUT;
|
||||||
subops[subop].send_list.push_back(subops[subop].req.buf, OSD_PACKET_SIZE);
|
subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE);
|
||||||
subops[subop].peer_fd = this->osd_peer_fds.at(role_osd_num);
|
subops[i].peer_fd = this->osd_peer_fds.at(role_osd_num);
|
||||||
subops[subop].req.sec_rw = {
|
subops[i].req.sec_rw = {
|
||||||
.header = {
|
.header = {
|
||||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||||
.id = this->next_subop_id++,
|
.id = this->next_subop_id++,
|
||||||
|
@ -154,12 +148,12 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
.offset = w ? stripes[role].write_start : stripes[role].read_start,
|
.offset = w ? stripes[role].write_start : stripes[role].read_start,
|
||||||
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
|
||||||
};
|
};
|
||||||
subops[subop].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
|
subops[i].buf = w ? stripes[role].write_buf : stripes[role].read_buf;
|
||||||
if (w && stripes[role].write_end > 0)
|
if (w && stripes[role].write_end > 0)
|
||||||
{
|
{
|
||||||
subops[subop].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start);
|
subops[i].send_list.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start);
|
||||||
}
|
}
|
||||||
subops[subop].callback = [cur_op, this](osd_op_t *subop)
|
subops[i].callback = [cur_op, this](osd_op_t *subop)
|
||||||
{
|
{
|
||||||
int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE &&
|
int fail_fd = subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE &&
|
||||||
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
|
||||||
|
@ -175,13 +169,59 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
|
||||||
stop_client(fail_fd);
|
stop_client(fail_fd);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
|
outbox_push(clients[subops[i].peer_fd], &subops[i]);
|
||||||
}
|
}
|
||||||
subop++;
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Translation table from a local blockstore opcode (BS_OP_*) to the secondary
// OSD opcode (OSD_OP_*) under which the operation is reported.
// The table is indexed directly by the blockstore opcode. Slot 0 holds 0
// because no BS_OP_* value is listed for that position.
// The code that uses this table only reads from it, so it is declared const
// to prevent the mapping from being overwritten by accident.
static const uint64_t bs_op_to_osd_op[] = {
    0,
    OSD_OP_SECONDARY_READ,      // BS_OP_READ
    OSD_OP_SECONDARY_WRITE,     // BS_OP_WRITE
    OSD_OP_SECONDARY_SYNC,      // BS_OP_SYNC
    OSD_OP_SECONDARY_STABILIZE, // BS_OP_STABLE
    OSD_OP_SECONDARY_DELETE,    // BS_OP_DELETE
    OSD_OP_SECONDARY_LIST,      // BS_OP_LIST
    OSD_OP_SECONDARY_ROLLBACK,  // BS_OP_ROLLBACK
    OSD_OP_TEST_SYNC_STAB_ALL,  // BS_OP_SYNC_STAB_ALL
};
|
||||||
|
|
||||||
|
// Completion handler for a primary subop that ran on the local blockstore.
//
// subop - the subop wrapper; its bs_op holds the finished blockstore operation
//         and its op_type field carries the pointer to the parent operation.
//
// Throws std::runtime_error when any operation other than a read finishes with
// a return value different from the expected one. Otherwise the subop is added
// to the statistics and the result is passed on to handle_primary_subop().
void osd_t::handle_primary_bs_subop(osd_op_t *subop)
{
    blockstore_op_t *bs_op = subop->bs_op;
    // The parent operation pointer travels inside op_type
    osd_op_t *cur_op = (osd_op_t*)(long)subop->op_type;
    // Reads and writes are expected to return their length, everything else 0
    int expected = 0;
    if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE)
    {
        expected = bs_op->len;
    }
    // A read with an unexpected result is not fatal here; it is passed on below
    if (bs_op->opcode != BS_OP_READ && bs_op->retval != expected)
    {
        // die
        throw std::runtime_error(
            "local blockstore modification failed (opcode = "+std::to_string(bs_op->opcode)+
            " retval = "+std::to_string(bs_op->retval)+")"
        );
    }
    add_bs_subop_stats(subop);
    handle_primary_subop(
        bs_op_to_osd_op[bs_op->opcode], cur_op, bs_op->retval, expected, bs_op->version
    );
}
|
||||||
|
|
||||||
|
// Account a finished local blockstore subop in the per-opcode statistics.
//
// subop - the subop wrapper; tv_begin must have been filled in when the
//         operation was started, and bs_op must point to the blockstore op.
//
// The blockstore opcode is translated through bs_op_to_osd_op, then the
// operation counter is incremented and the elapsed time in microseconds is
// added to the latency sum. For secondary reads and writes the operation
// length is also added to the byte counter.
void osd_t::add_bs_subop_stats(osd_op_t *subop)
{
    // Include local blockstore ops in statistics
    const uint64_t stat_idx = bs_op_to_osd_op[subop->bs_op->opcode];
    timespec now;
    clock_gettime(CLOCK_REALTIME, &now);
    // Elapsed time since tv_begin, in microseconds
    const auto elapsed_us =
        (now.tv_sec - subop->tv_begin.tv_sec)*1000000 +
        (now.tv_nsec - subop->tv_begin.tv_nsec)/1000;
    op_stat_count[0][stat_idx]++;
    op_stat_sum[0][stat_idx] += elapsed_us;
    // Only reads and writes contribute to the byte counters
    const bool moves_data = (stat_idx == OSD_OP_SECONDARY_READ || stat_idx == OSD_OP_SECONDARY_WRITE);
    if (moves_data)
    {
        op_stat_bytes[0][stat_idx] += subop->bs_op->len;
    }
}
|
||||||
|
|
||||||
void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version)
|
void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version)
|
||||||
{
|
{
|
||||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
|
@ -260,16 +300,13 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
|
||||||
{
|
{
|
||||||
if (chunk.osd_num == this->osd_num)
|
if (chunk.osd_num == this->osd_num)
|
||||||
{
|
{
|
||||||
|
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
||||||
|
subops[i].op_type = (long)cur_op;
|
||||||
subops[i].bs_op = new blockstore_op_t({
|
subops[i].bs_op = new blockstore_op_t({
|
||||||
.opcode = BS_OP_DELETE,
|
.opcode = BS_OP_DELETE,
|
||||||
.callback = [cur_op, this](blockstore_op_t *subop)
|
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
|
||||||
{
|
{
|
||||||
if (subop->retval != 0)
|
handle_primary_bs_subop(subop);
|
||||||
{
|
|
||||||
// die
|
|
||||||
throw std::runtime_error("local delete operation failed");
|
|
||||||
}
|
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_DELETE, cur_op, subop->retval, 0, 0);
|
|
||||||
},
|
},
|
||||||
.oid = {
|
.oid = {
|
||||||
.inode = op_data->oid.inode,
|
.inode = op_data->oid.inode,
|
||||||
|
@ -328,16 +365,13 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
|
||||||
osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num;
|
osd_num_t sync_osd = (*(op_data->unstable_write_osds))[i].osd_num;
|
||||||
if (sync_osd == this->osd_num)
|
if (sync_osd == this->osd_num)
|
||||||
{
|
{
|
||||||
|
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
||||||
|
subops[i].op_type = (long)cur_op;
|
||||||
subops[i].bs_op = new blockstore_op_t({
|
subops[i].bs_op = new blockstore_op_t({
|
||||||
.opcode = BS_OP_SYNC,
|
.opcode = BS_OP_SYNC,
|
||||||
.callback = [cur_op, this](blockstore_op_t *subop)
|
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
|
||||||
{
|
{
|
||||||
if (subop->retval != 0)
|
handle_primary_bs_subop(subop);
|
||||||
{
|
|
||||||
// die
|
|
||||||
throw std::runtime_error("local sync operation failed");
|
|
||||||
}
|
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0);
|
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
bs->enqueue_op(subops[i].bs_op);
|
bs->enqueue_op(subops[i].bs_op);
|
||||||
|
@ -382,16 +416,13 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
|
||||||
auto & stab_osd = (*(op_data->unstable_write_osds))[i];
|
auto & stab_osd = (*(op_data->unstable_write_osds))[i];
|
||||||
if (stab_osd.osd_num == this->osd_num)
|
if (stab_osd.osd_num == this->osd_num)
|
||||||
{
|
{
|
||||||
|
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
|
||||||
|
subops[i].op_type = (long)cur_op;
|
||||||
subops[i].bs_op = new blockstore_op_t({
|
subops[i].bs_op = new blockstore_op_t({
|
||||||
.opcode = BS_OP_STABLE,
|
.opcode = BS_OP_STABLE,
|
||||||
.callback = [cur_op, this](blockstore_op_t *subop)
|
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
|
||||||
{
|
{
|
||||||
if (subop->retval != 0)
|
handle_primary_bs_subop(subop);
|
||||||
{
|
|
||||||
// die
|
|
||||||
throw std::runtime_error("local stabilize operation failed");
|
|
||||||
}
|
|
||||||
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0);
|
|
||||||
},
|
},
|
||||||
.len = (uint32_t)stab_osd.len,
|
.len = (uint32_t)stab_osd.len,
|
||||||
.buf = (void*)(op_data->unstable_writes + stab_osd.start),
|
.buf = (void*)(op_data->unstable_writes + stab_osd.start),
|
||||||
|
|
10
osd_send.cpp
10
osd_send.cpp
|
@ -44,6 +44,16 @@ bool osd_t::try_send(osd_client_t & cl)
|
||||||
(tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 +
|
(tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 +
|
||||||
(tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000
|
(tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000
|
||||||
);
|
);
|
||||||
|
if (cl.write_op->req.hdr.opcode == OSD_OP_READ ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_WRITE)
|
||||||
|
{
|
||||||
|
op_stat_bytes[0][cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len;
|
||||||
|
}
|
||||||
|
else if (cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
||||||
|
cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
||||||
|
{
|
||||||
|
op_stat_bytes[0][cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
|
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
|
||||||
|
|
Loading…
Reference in New Issue