From 50cf3667fab6d3a925e28ad177c646056f4ed5e8 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 25 Nov 2019 01:16:34 +0300 Subject: [PATCH] Track unstable writes --- blockstore.h | 2 ++ blockstore_init.cpp | 9 +++++++++ blockstore_read.cpp | 2 +- blockstore_stable.cpp | 9 ++++----- blockstore_sync.cpp | 4 ++++ 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/blockstore.h b/blockstore.h index 5d821615..60a1792c 100644 --- a/blockstore.h +++ b/blockstore.h @@ -50,6 +50,7 @@ #define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) #define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) +#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED) #define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE) #define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE) #define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE) @@ -252,6 +253,7 @@ class blockstore std::map dirty_db; std::list submit_queue; // FIXME: funny thing is that vector is better here std::vector unsynced_big_writes, unsynced_small_writes; + std::map unstable_writes; std::list in_progress_syncs; // ...and probably here, too uint32_t block_order, block_size; uint64_t block_count; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 0cc61d3f..0220e475 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -355,6 +355,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .journal_sector = proc_pos, }); bs->journal.used_sectors[proc_pos]++; + auto & unstab = bs->unstable_writes[ov.oid]; + unstab = !unstab || unstab > ov.version ? 
ov.version : unstab; } else if (je->type == JE_BIG_WRITE) { @@ -373,6 +375,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .journal_sector = proc_pos, }); bs->journal.used_sectors[proc_pos]++; + auto & unstab = bs->unstable_writes[ov.oid]; + unstab = !unstab || unstab > ov.version ? ov.version : unstab; } else if (je->type == JE_STABLE) { @@ -404,6 +408,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) } bs->flusher->queue_flush(ov); } + auto unstab_it = bs->unstable_writes.find(ov.oid); + if (unstab_it != bs->unstable_writes.end() && unstab_it->second <= ov.version) + { + bs->unstable_writes.erase(unstab_it); + } } else if (je->type == JE_DELETE) { diff --git a/blockstore_read.cpp b/blockstore_read.cpp index ee0316d9..9a4bc1cc 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -96,7 +96,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op) { dirty_entry& dirty = dirty_it->second; bool version_ok = read_op->version >= dirty_it->first.version; - if (IS_STABLE(dirty.state)) + if (IS_SYNCED(dirty.state)) { if (!version_ok && read_op->version != 0) read_op->version = dirty_it->first.version; diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 3bcc7bd5..971778e3 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -22,11 +22,10 @@ // AND We must do it in batches, for the sake of reduced fsync call count // AND We must know what we stabilize. 
Basic workflow is like: // 1) primary OSD receives sync request -// 2) it determines his own unsynced writes from blockstore's information -// just before submitting fsync -// 3) it submits syncs to blockstore and peers -// 4) after everyone acks sync it takes the object list and sends stabilize requests to everyone -// 5) after everyone acks stabilize requests it acks the client's sync operation +// 2) it submits syncs to blockstore and peers +// 3) after everyone acks sync it acks sync to the client +// 4) after a while it takes its synced object list and sends stabilize requests +// to peers and to its own blockstore, thus freeing the old version int blockstore::dequeue_stable(blockstore_operation *op) { diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index ceb4cf54..25093243 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -148,10 +148,14 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) op->sync_state = SYNC_DONE; for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++) { + auto & unstab = unstable_writes[it->oid]; + unstab = !unstab || unstab > it->version ? it->version : unstab; dirty_db[*it].state = ST_D_META_SYNCED; } for (auto it = op->sync_small_writes.begin(); it != op->sync_small_writes.end(); it++) { + auto & unstab = unstable_writes[it->oid]; + unstab = !unstab || unstab > it->version ? it->version : unstab; dirty_db[*it].state = ST_J_SYNCED; } ack_sync(op);