Extract osd_primary write and sync code into separate files

rel-0.5
Vitaliy Filippov 2021-03-23 00:34:46 +03:00
parent 04a1f18fa5
commit 0949f08407
4 changed files with 630 additions and 620 deletions

View File

@ -66,7 +66,8 @@ target_link_libraries(fio_vitastor_blk
# vitastor-osd
add_executable(vitastor-osd
osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp
osd_primary.cpp osd_primary_subops.cpp etcd_state_client.cpp messenger.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp
etcd_state_client.cpp messenger.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp
)
target_link_libraries(vitastor-osd

View File

@ -177,625 +177,6 @@ resume_2:
finish_op(cur_op, cur_op->req.rw.len);
}
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
// Check if actions are pending for this object
auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
.oid = op_data->oid,
.osd_num = 0,
});
if (act_it != pg.flush_actions.end() &&
act_it->first.oid.inode == op_data->oid.inode &&
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
{
pg.write_queue.emplace(op_data->oid, cur_op);
return false;
}
// Check if there are other write requests to the same object
auto vo_it = pg.write_queue.find(op_data->oid);
if (vo_it != pg.write_queue.end())
{
op_data->st = 1;
pg.write_queue.emplace(op_data->oid, cur_op);
return false;
}
pg.write_queue.emplace(op_data->oid, cur_op);
return true;
}
void osd_t::continue_primary_write(osd_op_t *cur_op)
{
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
{
return;
}
osd_primary_op_data_t *op_data = cur_op->op_data;
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8;
else if (op_data->st == 9) goto resume_9;
else if (op_data->st == 10) goto resume_10;
assert(op_data->st == 0);
if (!check_write_queue(cur_op, pg))
{
return;
}
resume_1:
// Determine blocks to read and write
// Missing chunks are allowed to be overwritten even in incomplete objects
// FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Simplified algorithm
op_data->stripes[0].write_start = op_data->stripes[0].req_start;
op_data->stripes[0].write_end = op_data->stripes[0].req_end;
op_data->stripes[0].write_buf = cur_op->buf;
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
// Object is degraded/misplaced and will be moved to <write_osd_set>
op_data->stripes[0].read_start = 0;
op_data->stripes[0].read_end = bs_block_size;
cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign_or_die(MEM_ALIGNMENT, bs_block_size);
}
}
else
{
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
if (!cur_op->rmw_buf)
{
// Refuse partial overwrite of an incomplete object
cur_op->reply.hdr.retval = -EINVAL;
goto continue_others;
}
}
// Read required blocks
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
resume_2:
op_data->st = 2;
return;
resume_3:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Only (possibly) copy new data from the request into the recovery buffer
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
memcpy(
op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
op_data->stripes[0].write_buf,
op_data->stripes[0].req_end - op_data->stripes[0].req_start
);
op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
op_data->stripes[0].write_start = 0;
op_data->stripes[0].write_end = bs_block_size;
}
}
else
{
// For EC/XOR pools, save version override to make it impossible
// for parallel reads to read different versions of data and parity
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Recover missing stripes, calculate parity
if (pg.scheme == POOL_SCHEME_XOR)
{
calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
}
else if (pg.scheme == POOL_SCHEME_JERASURE)
{
calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
}
}
// Send writes
if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
{
op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
}
else
{
if ((op_data->fact_ver & (1ul<<(64-PG_EPOCH_BITS) - 1)) == (1ul<<(64-PG_EPOCH_BITS) - 1))
{
assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
pg.epoch++;
}
op_data->target_ver = op_data->fact_ver + 1;
}
if (pg.epoch > pg.reported_epoch)
{
// Report newer epoch before writing
// FIXME: We may report only one PG state here...
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
pg.history_changed = true;
report_pg_states();
resume_10:
if (pg.epoch > pg.reported_epoch)
{
op_data->st = 10;
return;
}
}
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
resume_4:
op_data->st = 4;
return;
resume_5:
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
// Remove version override just after the write, but before stabilizing
pg.ver_override.erase(op_data->oid);
}
if (op_data->object_state)
{
// We must forget the unclean state of the object before deleting it
// so the next reads don't accidentally read a deleted version
// And it should be done at the same time as the removal of the version override
remove_object_from_state(op_data->oid, op_data->object_state, pg);
pg.clean_count++;
}
if (op_data->errors > 0)
{
free_object_state(pg, &op_data->object_state);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
resume_6:
resume_7:
if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
{
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
free_object_state(pg, &op_data->object_state);
return;
}
if (op_data->fact_ver == 1)
{
// Object is created
pg.clean_count++;
pg.total_count++;
}
if (op_data->object_state)
{
{
int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
recovery_stat_count[0][recovery_type]++;
if (!recovery_stat_count[0][recovery_type])
{
recovery_stat_count[0][recovery_type]++;
recovery_stat_bytes[0][recovery_type] = 0;
}
for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
{
recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
}
}
// Any kind of a non-clean object can have extra chunks, because we don't record objects
// as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
if (immediate_commit != IMMEDIATE_ALL)
{
// We can't remove extra chunks yet if fsyncs are explicit, because
// new copies may not be committed to stable storage yet
// We can only remove extra chunks after a successful SYNC for this PG
for (auto & chunk: op_data->object_state->osd_set)
{
// Check is the same as in submit_primary_del_subops()
if (op_data->scheme == POOL_SCHEME_REPLICATED
? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
: (chunk.osd_num != pg.cur_set[chunk.role]))
{
pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
.osd_num = chunk.osd_num,
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
},
.version = op_data->fact_ver,
});
copies_to_delete_after_sync_count++;
}
}
free_object_state(pg, &op_data->object_state);
}
else
{
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
free_object_state(pg, &op_data->object_state);
if (op_data->n_subops > 0)
{
resume_8:
op_data->st = 8;
return;
resume_9:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
}
}
}
cur_op->reply.hdr.retval = cur_op->req.rw.len;
continue_others:
object_id oid = op_data->oid;
// Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue
auto next_it = pg.write_queue.find(oid);
if (next_it != pg.write_queue.end() && next_it->second == cur_op)
{
pg.write_queue.erase(next_it++);
}
// finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
finish_op(cur_op, cur_op->reply.hdr.retval);
// Continue other write operations to the same object
if (next_it != pg.write_queue.end() && next_it->first == oid)
{
osd_op_t *next_op = next_it->second;
continue_primary_write(next_op);
}
}
bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == base_state)
{
goto resume_6;
}
else if (op_data->st == base_state+1)
{
goto resume_7;
}
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
if (immediate_commit == IMMEDIATE_ALL)
{
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
// Send STABILIZE ops immediately
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[loc_set.size()];
{
int last_start = 0;
for (auto & chunk: loc_set)
{
op_data->unstable_writes[last_start] = (obj_ver_id){
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
},
.version = op_data->fact_ver,
};
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = chunk.osd_num,
.start = last_start,
.len = 1,
});
last_start++;
}
}
submit_primary_stab_subops(cur_op);
resume_6:
op_data->st = 6;
return false;
resume_7:
// FIXME: Free those in the destructor?
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return false;
}
}
}
else
{
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
// Remember version as unstable for EC/XOR
for (auto & chunk: loc_set)
{
this->dirty_osds.insert(chunk.osd_num);
this->unstable_writes[(osd_object_id_t){
.osd_num = chunk.osd_num,
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
},
}] = op_data->fact_ver;
}
}
else
{
// Only remember to sync OSDs for replicated pools
for (auto & chunk: loc_set)
{
this->dirty_osds.insert(chunk.osd_num);
}
}
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
auto cl_it = c_cli.clients.find(cur_op->peer_fd);
if (cl_it != c_cli.clients.end())
{
cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
return true;
}
// Save and clear unstable_writes -> SYNC all -> STABLE all
void osd_t::continue_primary_sync(osd_op_t *cur_op)
{
if (!cur_op->op_data)
{
cur_op->op_data = (osd_primary_op_data_t*)calloc_or_die(1, sizeof(osd_primary_op_data_t));
}
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8;
assert(op_data->st == 0);
if (syncs_in_progress.size() > 0)
{
// Wait for previous syncs, if any
// FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
syncs_in_progress.push_back(cur_op);
op_data->st = 1;
resume_1:
return;
}
else
{
syncs_in_progress.push_back(cur_op);
}
resume_2:
if (dirty_osds.size() == 0)
{
// Nothing to sync
goto finish;
}
// Save and clear unstable_writes
// In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication
// It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
if (unstable_writes.size() > 0)
{
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
osd_num_t last_osd = 0;
int last_start = 0, last_end = 0;
for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
{
if (last_osd != it->first.osd_num)
{
if (last_osd != 0)
{
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd,
.start = last_start,
.len = last_end - last_start,
});
}
last_osd = it->first.osd_num;
last_start = last_end;
}
op_data->unstable_writes[last_end] = (obj_ver_id){
.oid = it->first.oid,
.version = it->second,
};
last_end++;
}
if (last_osd != 0)
{
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd,
.start = last_start,
.len = last_end - last_start,
});
}
this->unstable_writes.clear();
}
{
void *dirty_buf = malloc_or_die(
sizeof(pool_pg_num_t)*dirty_pgs.size() +
sizeof(osd_num_t)*dirty_osds.size() +
sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
);
op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
op_data->dirty_pg_count = dirty_pgs.size();
op_data->dirty_osd_count = dirty_osds.size();
if (this->copies_to_delete_after_sync_count)
{
op_data->copies_to_delete_count = 0;
op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
for (auto dirty_pg_num: dirty_pgs)
{
auto & pg = pgs.at(dirty_pg_num);
assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
memcpy(
op_data->copies_to_delete + op_data->copies_to_delete_count,
pg.copies_to_delete_after_sync.data(),
sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
);
op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
pg.copies_to_delete_after_sync.clear();
}
assert(this->copies_to_delete_after_sync_count == 0);
}
int dpg = 0;
for (auto dirty_pg_num: dirty_pgs)
{
pgs.at(dirty_pg_num).inflight++;
op_data->dirty_pgs[dpg++] = dirty_pg_num;
}
dirty_pgs.clear();
dpg = 0;
for (auto osd_num: dirty_osds)
{
op_data->dirty_osds[dpg++] = osd_num;
}
dirty_osds.clear();
}
if (immediate_commit != IMMEDIATE_ALL)
{
// SYNC
if (!submit_primary_sync_subops(cur_op))
{
goto resume_4;
}
resume_3:
op_data->st = 3;
return;
resume_4:
if (op_data->errors > 0)
{
goto resume_6;
}
}
if (op_data->unstable_writes)
{
// Stabilize version sets, if any
submit_primary_stab_subops(cur_op);
resume_5:
op_data->st = 5;
return;
}
resume_6:
if (op_data->errors > 0)
{
// Return PGs and OSDs back into their dirty sets
for (int i = 0; i < op_data->dirty_pg_count; i++)
{
dirty_pgs.insert(op_data->dirty_pgs[i]);
}
for (int i = 0; i < op_data->dirty_osd_count; i++)
{
dirty_osds.insert(op_data->dirty_osds[i]);
}
if (op_data->unstable_writes)
{
// Return objects back into the unstable write set
for (auto unstable_osd: *(op_data->unstable_write_osds))
{
for (int i = 0; i < unstable_osd.len; i++)
{
// Except those from peered PGs
auto & w = op_data->unstable_writes[i];
pool_pg_num_t wpg = {
.pool_id = INODE_POOL(w.oid.inode),
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
};
if (pgs.at(wpg).state & PG_ACTIVE)
{
uint64_t & dest = this->unstable_writes[(osd_object_id_t){
.osd_num = unstable_osd.osd_num,
.oid = w.oid,
}];
dest = dest < w.version ? w.version : dest;
dirty_pgs.insert(wpg);
}
}
}
}
if (op_data->copies_to_delete)
{
// Return 'copies to delete' back into respective PGs
for (int i = 0; i < op_data->copies_to_delete_count; i++)
{
auto & w = op_data->copies_to_delete[i];
auto & pg = pgs.at((pool_pg_num_t){
.pool_id = INODE_POOL(w.oid.inode),
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
});
if (pg.state & PG_ACTIVE)
{
pg.copies_to_delete_after_sync.push_back(w);
copies_to_delete_after_sync_count++;
}
}
}
}
else if (op_data->copies_to_delete)
{
// Actually delete copies which we wanted to delete
submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
resume_7:
op_data->st = 7;
return;
resume_8:
if (op_data->errors > 0)
{
goto resume_6;
}
}
for (int i = 0; i < op_data->dirty_pg_count; i++)
{
auto & pg = pgs.at(op_data->dirty_pgs[i]);
pg.inflight--;
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch &&
// We must either forget all PG's unstable writes or wait for it to become clean
dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
{
finish_stop_pg(pg);
}
}
// FIXME: Free those in the destructor?
free(op_data->dirty_pgs);
op_data->dirty_pgs = NULL;
op_data->dirty_osds = NULL;
if (op_data->unstable_writes)
{
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
}
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
}
else
{
finish:
if (cur_op->peer_fd)
{
auto it = c_cli.clients.find(cur_op->peer_fd);
if (it != c_cli.clients.end())
it->second->dirty_pgs.clear();
}
finish_op(cur_op, 0);
}
assert(syncs_in_progress.front() == cur_op);
syncs_in_progress.pop_front();
if (syncs_in_progress.size() > 0)
{
cur_op = syncs_in_progress.front();
op_data = cur_op->op_data;
op_data->st++;
goto resume_2;
}
}
// Decrement pg_osd_set_state_t's object_count and change PG state accordingly
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
{

263
src/osd_primary_sync.cpp Normal file
View File

@ -0,0 +1,263 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include "osd_primary.h"
// Save and clear unstable_writes -> SYNC all -> STABLE all
void osd_t::continue_primary_sync(osd_op_t *cur_op)
{
if (!cur_op->op_data)
{
cur_op->op_data = (osd_primary_op_data_t*)calloc_or_die(1, sizeof(osd_primary_op_data_t));
}
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8;
assert(op_data->st == 0);
if (syncs_in_progress.size() > 0)
{
// Wait for previous syncs, if any
// FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
syncs_in_progress.push_back(cur_op);
op_data->st = 1;
resume_1:
return;
}
else
{
syncs_in_progress.push_back(cur_op);
}
resume_2:
if (dirty_osds.size() == 0)
{
// Nothing to sync
goto finish;
}
// Save and clear unstable_writes
// In theory it is possible to do in on a per-client basis, but this seems to be an unnecessary complication
// It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
if (unstable_writes.size() > 0)
{
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
osd_num_t last_osd = 0;
int last_start = 0, last_end = 0;
for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
{
if (last_osd != it->first.osd_num)
{
if (last_osd != 0)
{
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd,
.start = last_start,
.len = last_end - last_start,
});
}
last_osd = it->first.osd_num;
last_start = last_end;
}
op_data->unstable_writes[last_end] = (obj_ver_id){
.oid = it->first.oid,
.version = it->second,
};
last_end++;
}
if (last_osd != 0)
{
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd,
.start = last_start,
.len = last_end - last_start,
});
}
this->unstable_writes.clear();
}
{
void *dirty_buf = malloc_or_die(
sizeof(pool_pg_num_t)*dirty_pgs.size() +
sizeof(osd_num_t)*dirty_osds.size() +
sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
);
op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
op_data->dirty_pg_count = dirty_pgs.size();
op_data->dirty_osd_count = dirty_osds.size();
if (this->copies_to_delete_after_sync_count)
{
op_data->copies_to_delete_count = 0;
op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
for (auto dirty_pg_num: dirty_pgs)
{
auto & pg = pgs.at(dirty_pg_num);
assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
memcpy(
op_data->copies_to_delete + op_data->copies_to_delete_count,
pg.copies_to_delete_after_sync.data(),
sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
);
op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
pg.copies_to_delete_after_sync.clear();
}
assert(this->copies_to_delete_after_sync_count == 0);
}
int dpg = 0;
for (auto dirty_pg_num: dirty_pgs)
{
pgs.at(dirty_pg_num).inflight++;
op_data->dirty_pgs[dpg++] = dirty_pg_num;
}
dirty_pgs.clear();
dpg = 0;
for (auto osd_num: dirty_osds)
{
op_data->dirty_osds[dpg++] = osd_num;
}
dirty_osds.clear();
}
if (immediate_commit != IMMEDIATE_ALL)
{
// SYNC
if (!submit_primary_sync_subops(cur_op))
{
goto resume_4;
}
resume_3:
op_data->st = 3;
return;
resume_4:
if (op_data->errors > 0)
{
goto resume_6;
}
}
if (op_data->unstable_writes)
{
// Stabilize version sets, if any
submit_primary_stab_subops(cur_op);
resume_5:
op_data->st = 5;
return;
}
resume_6:
if (op_data->errors > 0)
{
// Return PGs and OSDs back into their dirty sets
for (int i = 0; i < op_data->dirty_pg_count; i++)
{
dirty_pgs.insert(op_data->dirty_pgs[i]);
}
for (int i = 0; i < op_data->dirty_osd_count; i++)
{
dirty_osds.insert(op_data->dirty_osds[i]);
}
if (op_data->unstable_writes)
{
// Return objects back into the unstable write set
for (auto unstable_osd: *(op_data->unstable_write_osds))
{
for (int i = 0; i < unstable_osd.len; i++)
{
// Except those from peered PGs
auto & w = op_data->unstable_writes[i];
pool_pg_num_t wpg = {
.pool_id = INODE_POOL(w.oid.inode),
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
};
if (pgs.at(wpg).state & PG_ACTIVE)
{
uint64_t & dest = this->unstable_writes[(osd_object_id_t){
.osd_num = unstable_osd.osd_num,
.oid = w.oid,
}];
dest = dest < w.version ? w.version : dest;
dirty_pgs.insert(wpg);
}
}
}
}
if (op_data->copies_to_delete)
{
// Return 'copies to delete' back into respective PGs
for (int i = 0; i < op_data->copies_to_delete_count; i++)
{
auto & w = op_data->copies_to_delete[i];
auto & pg = pgs.at((pool_pg_num_t){
.pool_id = INODE_POOL(w.oid.inode),
.pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
});
if (pg.state & PG_ACTIVE)
{
pg.copies_to_delete_after_sync.push_back(w);
copies_to_delete_after_sync_count++;
}
}
}
}
else if (op_data->copies_to_delete)
{
// Actually delete copies which we wanted to delete
submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
resume_7:
op_data->st = 7;
return;
resume_8:
if (op_data->errors > 0)
{
goto resume_6;
}
}
for (int i = 0; i < op_data->dirty_pg_count; i++)
{
auto & pg = pgs.at(op_data->dirty_pgs[i]);
pg.inflight--;
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch &&
// We must either forget all PG's unstable writes or wait for it to become clean
dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
{
finish_stop_pg(pg);
}
}
// FIXME: Free those in the destructor?
free(op_data->dirty_pgs);
op_data->dirty_pgs = NULL;
op_data->dirty_osds = NULL;
if (op_data->unstable_writes)
{
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
}
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
}
else
{
finish:
if (cur_op->peer_fd)
{
auto it = c_cli.clients.find(cur_op->peer_fd);
if (it != c_cli.clients.end())
it->second->dirty_pgs.clear();
}
finish_op(cur_op, 0);
}
assert(syncs_in_progress.front() == cur_op);
syncs_in_progress.pop_front();
if (syncs_in_progress.size() > 0)
{
cur_op = syncs_in_progress.front();
op_data = cur_op->op_data;
op_data->st++;
goto resume_2;
}
}

365
src/osd_primary_write.cpp Normal file
View File

@ -0,0 +1,365 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include "osd_primary.h"
#include "allocator.h"
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
// Check if actions are pending for this object
auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
.oid = op_data->oid,
.osd_num = 0,
});
if (act_it != pg.flush_actions.end() &&
act_it->first.oid.inode == op_data->oid.inode &&
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
{
pg.write_queue.emplace(op_data->oid, cur_op);
return false;
}
// Check if there are other write requests to the same object
auto vo_it = pg.write_queue.find(op_data->oid);
if (vo_it != pg.write_queue.end())
{
op_data->st = 1;
pg.write_queue.emplace(op_data->oid, cur_op);
return false;
}
pg.write_queue.emplace(op_data->oid, cur_op);
return true;
}
void osd_t::continue_primary_write(osd_op_t *cur_op)
{
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
{
return;
}
osd_primary_op_data_t *op_data = cur_op->op_data;
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6;
else if (op_data->st == 7) goto resume_7;
else if (op_data->st == 8) goto resume_8;
else if (op_data->st == 9) goto resume_9;
else if (op_data->st == 10) goto resume_10;
assert(op_data->st == 0);
if (!check_write_queue(cur_op, pg))
{
return;
}
resume_1:
// Determine blocks to read and write
// Missing chunks are allowed to be overwritten even in incomplete objects
// FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Simplified algorithm
op_data->stripes[0].write_start = op_data->stripes[0].req_start;
op_data->stripes[0].write_end = op_data->stripes[0].req_end;
op_data->stripes[0].write_buf = cur_op->buf;
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
// Object is degraded/misplaced and will be moved to <write_osd_set>
op_data->stripes[0].read_start = 0;
op_data->stripes[0].read_end = bs_block_size;
cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign_or_die(MEM_ALIGNMENT, bs_block_size);
}
}
else
{
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
if (!cur_op->rmw_buf)
{
// Refuse partial overwrite of an incomplete object
cur_op->reply.hdr.retval = -EINVAL;
goto continue_others;
}
}
// Read required blocks
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
resume_2:
op_data->st = 2;
return;
resume_3:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Only (possibly) copy new data from the request into the recovery buffer
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
memcpy(
op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
op_data->stripes[0].write_buf,
op_data->stripes[0].req_end - op_data->stripes[0].req_start
);
op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
op_data->stripes[0].write_start = 0;
op_data->stripes[0].write_end = bs_block_size;
}
}
else
{
// For EC/XOR pools, save version override to make it impossible
// for parallel reads to read different versions of data and parity
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Recover missing stripes, calculate parity
if (pg.scheme == POOL_SCHEME_XOR)
{
calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
}
else if (pg.scheme == POOL_SCHEME_JERASURE)
{
calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
}
}
// Send writes
if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
{
op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
}
else
{
if ((op_data->fact_ver & (1ul<<(64-PG_EPOCH_BITS) - 1)) == (1ul<<(64-PG_EPOCH_BITS) - 1))
{
assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
pg.epoch++;
}
op_data->target_ver = op_data->fact_ver + 1;
}
if (pg.epoch > pg.reported_epoch)
{
// Report newer epoch before writing
// FIXME: We may report only one PG state here...
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
pg.history_changed = true;
report_pg_states();
resume_10:
if (pg.epoch > pg.reported_epoch)
{
op_data->st = 10;
return;
}
}
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
resume_4:
op_data->st = 4;
return;
resume_5:
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
// Remove version override just after the write, but before stabilizing
pg.ver_override.erase(op_data->oid);
}
if (op_data->object_state)
{
// We must forget the unclean state of the object before deleting it
// so the next reads don't accidentally read a deleted version
// And it should be done at the same time as the removal of the version override
remove_object_from_state(op_data->oid, op_data->object_state, pg);
pg.clean_count++;
}
if (op_data->errors > 0)
{
free_object_state(pg, &op_data->object_state);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
resume_6:
resume_7:
if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
{
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
free_object_state(pg, &op_data->object_state);
return;
}
if (op_data->fact_ver == 1)
{
// Object is created
pg.clean_count++;
pg.total_count++;
}
if (op_data->object_state)
{
{
int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
recovery_stat_count[0][recovery_type]++;
if (!recovery_stat_count[0][recovery_type])
{
recovery_stat_count[0][recovery_type]++;
recovery_stat_bytes[0][recovery_type] = 0;
}
for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
{
recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
}
}
// Any kind of a non-clean object can have extra chunks, because we don't record objects
// as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
if (immediate_commit != IMMEDIATE_ALL)
{
// We can't remove extra chunks yet if fsyncs are explicit, because
// new copies may not be committed to stable storage yet
// We can only remove extra chunks after a successful SYNC for this PG
for (auto & chunk: op_data->object_state->osd_set)
{
// Check is the same as in submit_primary_del_subops()
if (op_data->scheme == POOL_SCHEME_REPLICATED
? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
: (chunk.osd_num != pg.cur_set[chunk.role]))
{
pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
.osd_num = chunk.osd_num,
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
},
.version = op_data->fact_ver,
});
copies_to_delete_after_sync_count++;
}
}
free_object_state(pg, &op_data->object_state);
}
else
{
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
free_object_state(pg, &op_data->object_state);
if (op_data->n_subops > 0)
{
resume_8:
op_data->st = 8;
return;
resume_9:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
}
}
}
cur_op->reply.hdr.retval = cur_op->req.rw.len;
continue_others:
object_id oid = op_data->oid;
// Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue
auto next_it = pg.write_queue.find(oid);
if (next_it != pg.write_queue.end() && next_it->second == cur_op)
{
pg.write_queue.erase(next_it++);
}
// finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
finish_op(cur_op, cur_op->reply.hdr.retval);
// Continue other write operations to the same object
if (next_it != pg.write_queue.end() && next_it->first == oid)
{
osd_op_t *next_op = next_it->second;
continue_primary_write(next_op);
}
}
bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == base_state)
{
goto resume_6;
}
else if (op_data->st == base_state+1)
{
goto resume_7;
}
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
if (immediate_commit == IMMEDIATE_ALL)
{
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
// Send STABILIZE ops immediately
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[loc_set.size()];
{
int last_start = 0;
for (auto & chunk: loc_set)
{
op_data->unstable_writes[last_start] = (obj_ver_id){
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
},
.version = op_data->fact_ver,
};
op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = chunk.osd_num,
.start = last_start,
.len = 1,
});
last_start++;
}
}
submit_primary_stab_subops(cur_op);
resume_6:
op_data->st = 6;
return false;
resume_7:
// FIXME: Free those in the destructor?
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return false;
}
}
}
else
{
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
// Remember version as unstable for EC/XOR
for (auto & chunk: loc_set)
{
this->dirty_osds.insert(chunk.osd_num);
this->unstable_writes[(osd_object_id_t){
.osd_num = chunk.osd_num,
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
},
}] = op_data->fact_ver;
}
}
else
{
// Only remember to sync OSDs for replicated pools
for (auto & chunk: loc_set)
{
this->dirty_osds.insert(chunk.osd_num);
}
}
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
auto cl_it = c_cli.clients.find(cur_op->peer_fd);
if (cl_it != c_cli.clients.end())
{
cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
return true;
}