Use immediate_commit to benefit the primary OSD

trace-sqes
Vitaliy Filippov 2020-03-10 02:05:32 +03:00
parent 3f522c66e6
commit 31f9445030
3 changed files with 75 additions and 18 deletions

View File

@ -68,6 +68,10 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); osd_num = strtoull(config["osd_num"].c_str(), NULL, 10);
if (!osd_num) if (!osd_num)
throw std::runtime_error("osd_num is required in the configuration"); throw std::runtime_error("osd_num is required in the configuration");
if (config["immediate_commit"] == "all")
immediate_commit = IMMEDIATE_ALL;
else if (config["immediate_commit"] == "small")
immediate_commit = IMMEDIATE_SMALL;
run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes"; run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes";
if (run_primary) if (run_primary)
init_primary(); init_primary();

5
osd.h
View File

@ -35,6 +35,10 @@
#define OSD_PEERING_PEERS 1 #define OSD_PEERING_PEERS 1
#define OSD_PEERING_PGS 2 #define OSD_PEERING_PGS 2
#define IMMEDIATE_NONE 0
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2
//#define OSD_STUB //#define OSD_STUB
struct osd_op_buf_list_t struct osd_op_buf_list_t
@ -173,6 +177,7 @@ class osd_t
int client_queue_depth = 128; int client_queue_depth = 128;
bool allow_test_ops = true; bool allow_test_ops = true;
int receive_buffer_size = 9000; int receive_buffer_size = 9000;
int immediate_commit = IMMEDIATE_NONE;
// peer OSDs // peer OSDs

View File

@ -326,6 +326,8 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
else if (op_data->st == 3) goto resume_3; else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4; else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5; else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
assert(op_data->st == 0); assert(op_data->st == 0);
// Check if actions are pending for this object // Check if actions are pending for this object
{ {
@ -373,24 +375,67 @@ resume_4:
op_data->st = 4; op_data->st = 4;
return; return;
resume_5: resume_5:
// Remember version as unstable // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
osd_num_t *osd_set = pg.cur_set.data(); if (immediate_commit == IMMEDIATE_ALL)
for (int role = 0; role < pg.pg_size; role++)
{ {
if (osd_set[role] != 0) op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[pg.pg_cursize];
{ {
this->unstable_writes[(osd_object_id_t){ int last_start = 0;
.osd_num = osd_set[role], osd_num_t *osd_set = pg.cur_set.data();
.oid = { for (int role = 0; role < pg.pg_size; role++)
.inode = op_data->oid.inode, {
.stripe = op_data->oid.stripe | role, if (osd_set[role] != 0)
}, {
}] = op_data->fact_ver; op_data->unstable_writes[last_start] = (obj_ver_id){
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
},
.version = op_data->fact_ver,
};
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = osd_set[role],
.start = last_start,
.len = 1,
});
last_start++;
}
}
} }
// Stabilize version sets
submit_primary_stab_subops(cur_op);
resume_6:
op_data->st = 6;
return;
resume_7:
// FIXME: Free them correctly (via a destructor or so)
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
}
else
{
// Remember version as unstable
osd_num_t *osd_set = pg.cur_set.data();
for (int role = 0; role < pg.pg_size; role++)
{
if (osd_set[role] != 0)
{
this->unstable_writes[(osd_object_id_t){
.osd_num = osd_set[role],
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
},
}] = op_data->fact_ver;
}
}
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
} }
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
// Remove version override // Remove version override
pg.ver_override.erase(op_data->oid); pg.ver_override.erase(op_data->oid);
finish_primary_op(cur_op, cur_op->req.rw.len); finish_primary_op(cur_op, cur_op->req.rw.len);
@ -483,11 +528,14 @@ resume_2:
} }
} }
unstable_writes.clear(); unstable_writes.clear();
// SYNC if (immediate_commit != IMMEDIATE_ALL)
submit_primary_sync_subops(cur_op); {
// SYNC
submit_primary_sync_subops(cur_op);
resume_3: resume_3:
cur_op->op_data->st = 3; cur_op->op_data->st = 3;
return; return;
}
resume_4: resume_4:
// Stabilize version sets // Stabilize version sets
submit_primary_stab_subops(cur_op); submit_primary_stab_subops(cur_op);