From ae56fe106715d3f0c2434bd22e175dcdd8d6246b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 10 Nov 2019 12:46:58 +0300 Subject: [PATCH] Finish sync algorithm in theory --- blockstore.cpp | 6 ++- blockstore.h | 14 +++--- blockstore_sync.cpp | 114 +++++++++++++++++++++++++++---------------- blockstore_write.cpp | 38 ++++++++++----- 4 files changed, 109 insertions(+), 63 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index e943988a..6c301f0c 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -123,6 +123,11 @@ void blockstore::loop() else { // try to submit ops + auto cur_sync = in_progress_syncs.begin(); + while (cur_sync != in_progress_syncs.end()) + { + continue_sync(*cur_sync++); + } auto cur = submit_queue.begin(); bool has_writes = false; while (cur != submit_queue.end()) @@ -176,7 +181,6 @@ void blockstore::loop() throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); } submit_queue.erase(op_ptr); - in_progress_ops.insert(op); } else { diff --git a/blockstore.h b/blockstore.h index 6f4a0155..54e7b4e6 100644 --- a/blockstore.h +++ b/blockstore.h @@ -170,11 +170,6 @@ public: // Otherwise, the submit order is free, that is all operations may be submitted immediately // In fact, adding a write operation must immediately result in dirty_db being populated -// write -> immediately add to dirty ops, immediately submit. postpone if ring full -// read -> check dirty ops, read or wait, remember max used journal offset, then unremember it -// sync -> take all current writes (inflight + pending), wait for them to finish, sync, move their state -// the question is: how to remember current writes. - #define OP_READ 1 #define OP_READ_DIRTY 2 #define OP_WRITE 3 @@ -218,7 +213,7 @@ private: uint64_t min_used_journal_sector, max_used_journal_sector; // Sync - std::deque sync_writes; + std::deque sync_big_writes; std::list::iterator in_progress_ptr; int big_write_count, sync_state, prev_sync_count; }; @@ -235,9 +230,10 @@ class blockstore spp::sparse_hash_map object_db; std::map dirty_db; std::list submit_queue; - std::deque unsynced_writes; + std::deque unsynced_big_writes; + int unsynced_small_writes = 0; std::list in_progress_syncs; - std::set in_progress_ops; + std::set in_progress_ops; // FIXME purpose of tracking this is unclear uint32_t block_order, block_size; uint64_t block_count; allocator *data_alloc; @@ -288,6 +284,8 @@ class blockstore // Sync int dequeue_sync(blockstore_operation *op); void handle_sync_event(ring_data_t *data, blockstore_operation *op); + int continue_sync(blockstore_operation *op); + int ack_sync(blockstore_operation *op); public: diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index d5d3ee85..a001dab9 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -3,33 +3,42 @@ #define SYNC_NO_BIG 1 #define SYNC_HAS_BIG 2 #define SYNC_DATA_SYNC_SENT 3 -#define SYNC_JOURNAL_SYNC_SENT 4 -#define SYNC_DONE 5 +#define SYNC_DATA_SYNC_DONE 4 +#define SYNC_JOURNAL_SYNC_SENT 5 +#define SYNC_DONE 6 int blockstore::dequeue_sync(blockstore_operation *op) { if (op->sync_state == 0) { op->big_write_count = 0; - op->sync_state = SYNC_NO_BIG; - op->sync_writes.swap(unsynced_writes); - unsynced_writes.clear(); - if (op->sync_writes.size() == 0) - { + op->sync_big_writes.swap(unsynced_big_writes); + op->big_write_count = op->sync_big_writes.size(); + if (op->big_write_count > 0) + op->sync_state = SYNC_HAS_BIG; + else if (unsynced_small_writes == 0) op->sync_state = SYNC_DONE; - } - auto it = op->sync_writes.begin(); - while (it != op->sync_writes.end()) + else + op->sync_state = SYNC_NO_BIG; + unsynced_big_writes.clear(); + unsynced_small_writes = 0; + } + int r = continue_sync(op); + if (r) + { + int done = ack_sync(op); + if (!done) { - uint32_t state = dirty_db[*it].state; - if (IS_BIG_WRITE(state)) - { - op->big_write_count++; - op->sync_state = SYNC_HAS_BIG; - } - it++; + in_progress_ops.insert(op); + op->prev_sync_count = in_progress_syncs.size(); + op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); } } + return r; +} + +int blockstore::continue_sync(blockstore_operation *op) +{ if (op->sync_state == SYNC_NO_BIG) { // No big writes, just fsync the journal @@ -48,7 +57,7 @@ int blockstore::dequeue_sync(blockstore_operation *op) op->pending_ops = 1; op->sync_state = SYNC_DATA_SYNC_SENT; } - else if (op->sync_state == SYNC_DATA_SYNC_SENT) + else if (op->sync_state == SYNC_DATA_SYNC_DONE) { // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers @@ -87,8 +96,7 @@ int blockstore::dequeue_sync(blockstore_operation *op) op->min_used_journal_sector = 1 + journal.cur_sector; sectors_required = 0; required = op->big_write_count; - // FIXME: advance it - auto it = op->sync_writes.begin(); + auto it = op->sync_big_writes.begin(); while (1) { int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write); @@ -111,6 +119,7 @@ int blockstore::dequeue_sync(blockstore_operation *op) journal.crc32_last = je->crc32; journal.in_sector_pos += sizeof(journal_entry_big_write); required--; + it++; } if (required <= 0) break; @@ -136,9 +145,10 @@ int blockstore::dequeue_sync(blockstore_operation *op) op->max_used_journal_sector = 1 + journal.cur_sector; op->sync_state = SYNC_JOURNAL_SYNC_SENT; } - // FIXME: resubmit op from in_progress - op->prev_sync_count = in_progress_syncs.size(); - op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); + else + { + return 0; + } return 1; } @@ -151,40 +161,62 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); } op->pending_ops--; - if (op->min_used_journal_sector > 0) - { - for (uint64_t s = op->min_used_journal_sector; s != op->max_used_journal_sector; s = (s + 1) % journal.sector_count) - { - journal.sector_info[s-1].usage_count--; - } - op->min_used_journal_sector = op->max_used_journal_sector = 0; - } if (op->pending_ops == 0) { - // Acknowledge sync + // Release used journal sectors + if (op->min_used_journal_sector > 0) + { + for (uint64_t s = op->min_used_journal_sector; s != op->max_used_journal_sector; s = (s + 1) % journal.sector_count) + { + journal.sector_info[s-1].usage_count--; + } + op->min_used_journal_sector = op->max_used_journal_sector = 0; + } + // Handle state + if (op->sync_state == SYNC_DATA_SYNC_SENT) + { + op->sync_state = SYNC_DATA_SYNC_DONE; + } + else if (op->sync_state == SYNC_JOURNAL_SYNC_SENT) + { + op->sync_state = SYNC_DONE; + } + else + { + throw new std::runtime_error("BUG: unexpected sync op state"); + } + ack_sync(op); + } +} + +int blockstore::ack_sync(blockstore_operation *op) +{ + if (op->sync_state == SYNC_DONE && op->prev_sync_count == 0) + { + // Remove dependency of subsequent syncs auto it = op->in_progress_ptr; int done_syncs = 1; ++it; while (it != in_progress_syncs.end()) { - auto & next_sync = *it; + auto & next_sync = *it++; next_sync->prev_sync_count -= done_syncs; - if (next_sync->prev_sync_count == 0/* && next_sync->DONE*/) + if (next_sync->prev_sync_count == 0 && next_sync->sync_state == SYNC_DONE) { done_syncs++; - auto next_it = it; - it++; - in_progress_syncs.erase(next_it); + // Acknowledge next_sync + in_progress_syncs.erase(next_sync->in_progress_ptr); + in_progress_ops.erase(next_sync); next_sync->retval = 0; next_sync->callback(next_sync); - in_progress_ops.erase(next_sync); } - else - it++; } + // Acknowledge sync in_progress_syncs.erase(op->in_progress_ptr); + in_progress_ops.erase(op); op->retval = 0; op->callback(op); - in_progress_ops.erase(op); + return 1; } + return 0; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 9425327b..a767c506 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -29,6 +29,7 @@ int blockstore::dequeue_write(blockstore_operation *op) ); op->pending_ops = 1; op->min_used_journal_sector = op->max_used_journal_sector = 0; + in_progress_ops.insert(op); } else { @@ -110,6 +111,7 @@ int blockstore::dequeue_write(blockstore_operation *op) journal.sector_info[journal.cur_sector].usage_count++; op->pending_ops = 2; op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; + in_progress_ops.insert(op); } return 1; } @@ -123,29 +125,39 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op) throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); } op->pending_ops--; - if (op->min_used_journal_sector > 0) - { - for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++) - { - journal.sector_info[s-1].usage_count--; - } - op->min_used_journal_sector = op->max_used_journal_sector = 0; - } if (op->pending_ops == 0) { - // Acknowledge write without sync + // Release used journal sectors + if (op->min_used_journal_sector > 0) + { + for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++) + { + journal.sector_info[s-1].usage_count--; + } + op->min_used_journal_sector = op->max_used_journal_sector = 0; + } + // Switch object state auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, }); dirty_it->second.state = (dirty_it->second.state == ST_J_SUBMITTED ? ST_J_WRITTEN : (dirty_it->second.state == ST_DEL_SUBMITTED ? ST_DEL_WRITTEN : ST_D_WRITTEN)); + // Acknowledge write without sync op->retval = op->len; op->callback(op); in_progress_ops.erase(op); - unsynced_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); + // Remember write as unsynced + if (IS_BIG_WRITE(dirty_it->second.state)) + { + unsynced_big_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + } + else + { + unsynced_small_writes++; + } } }