diff --git a/blockstore_impl.h b/blockstore_impl.h index 6775bd92..7a0528f1 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -286,12 +286,14 @@ class blockstore_impl_t // Stabilize int dequeue_stable(blockstore_op_t *op); int continue_stable(blockstore_op_t *op); + void mark_stable(const obj_ver_id & ov); void handle_stable_event(ring_data_t *data, blockstore_op_t *op); void stabilize_object(object_id oid, uint64_t max_ver); // Rollback int dequeue_rollback(blockstore_op_t *op); int continue_rollback(blockstore_op_t *op); + void mark_rolled_back(const obj_ver_id & ov); void handle_rollback_event(ring_data_t *data, blockstore_op_t *op); void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc); diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 7d6d6f87..93b9ae0b 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -587,33 +587,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .oid = je->stable.oid, .version = je->stable.version, }; - auto it = bs->dirty_db.find(ov); - if (it == bs->dirty_db.end()) - { - // journal contains a legitimate STABLE entry for a non-existing dirty write - // this probably means that journal was trimmed between WRITE and STABLE entries - // skip it - } - else - { - while (1) - { - it->second.state = (it->second.state == ST_D_SYNCED - ? ST_D_STABLE - : (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE)); - if (it == bs->dirty_db.begin()) - break; - it--; - if (it->first.oid != ov.oid || IS_STABLE(it->second.state)) - break; - } - bs->flusher->enqueue_flush(ov); - } - auto unstab_it = bs->unstable_writes.find(ov.oid); - if (unstab_it != bs->unstable_writes.end() && unstab_it->second <= ov.version) - { - bs->unstable_writes.erase(unstab_it); - } + bs->mark_stable(ov); } else if (je->type == JE_ROLLBACK) { @@ -621,50 +595,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u printf("je_rollback oid=%lu:%lu ver=%lu\n", je->rollback.oid.inode, je->rollback.oid.stripe, je->rollback.version); #endif // rollback dirty writes of up to - auto it = bs->dirty_db.lower_bound((obj_ver_id){ + obj_ver_id ov = { .oid = je->rollback.oid, - .version = UINT64_MAX, - }); - if (it != bs->dirty_db.begin()) - { - uint64_t max_unstable = 0; - auto rm_start = it; - auto rm_end = it; - it--; - while (it->first.oid == je->rollback.oid && - it->first.version > je->rollback.version && - !IS_IN_FLIGHT(it->second.state) && - !IS_STABLE(it->second.state)) - { - if (it->first.oid != je->rollback.oid) - break; - else if (it->first.version <= je->rollback.version) - { - if (!IS_STABLE(it->second.state)) - max_unstable = it->first.version; - break; - } - else if (IS_STABLE(it->second.state)) - break; - // Remove entry - rm_start = it; - if (it == bs->dirty_db.begin()) - break; - it--; - } - if (rm_start != rm_end) - { - bs->erase_dirty(rm_start, rm_end, UINT64_MAX); - } - auto unstab_it = bs->unstable_writes.find(je->rollback.oid); - if (unstab_it != bs->unstable_writes.end()) - { - if (max_unstable == 0) - bs->unstable_writes.erase(unstab_it); - else - unstab_it->second = max_unstable; - } - } + .version = je->rollback.version, + }; + bs->mark_rolled_back(ov); } else if (je->type == JE_DELETE) { diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp index 3da1fa61..c3040d60 100644 --- a/blockstore_rollback.cpp +++ b/blockstore_rollback.cpp @@ -77,33 +77,6 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) } for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { - // FIXME This is here only for the purpose of tracking unstable_writes. Remove if not required - // FIXME ...aaaand this is similar to blockstore_init.cpp - maybe dedup it? - auto dirty_it = dirty_db.lower_bound((obj_ver_id){ - .oid = v->oid, - .version = UINT64_MAX, - }); - uint64_t max_unstable = 0; - while (dirty_it != dirty_db.begin()) - { - dirty_it--; - if (dirty_it->first.oid != v->oid) - break; - else if (dirty_it->first.version <= v->version) - { - if (!IS_STABLE(dirty_it->second.state)) - max_unstable = dirty_it->first.version; - break; - } - } - auto unstab_it = unstable_writes.find(v->oid); - if (unstab_it != unstable_writes.end()) - { - if (max_unstable == 0) - unstable_writes.erase(unstab_it); - else - unstab_it->second = max_unstable; - } journal_entry_rollback *je = (journal_entry_rollback*) prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback)); journal.sector_info[journal.cur_sector].dirty = false; @@ -161,26 +134,7 @@ resume_5: int i; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { - // Erase dirty_db entries - auto rm_end = dirty_db.lower_bound((obj_ver_id){ - .oid = v->oid, - .version = UINT64_MAX, - }); - auto rm_start = rm_end; - assert(rm_start != dirty_db.begin()); - rm_start--; - while (1) - { - if (rm_start->first.oid != v->oid || rm_start->first.version <= v->version) - { - rm_start++; - break; - } - if (rm_start == dirty_db.begin()) - break; - rm_start--; - } - erase_dirty(rm_start, rm_end, UINT64_MAX); + mark_rolled_back(*v); } journal.trim(); inflight_writes--; @@ -190,6 +144,54 @@ resume_5: return 1; } +void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov) +{ + auto it = dirty_db.lower_bound((obj_ver_id){ + .oid = ov.oid, + .version = UINT64_MAX, + }); + if (it != dirty_db.begin()) + { + uint64_t max_unstable = 0; + auto rm_start = it; + auto rm_end = it; + it--; + while (it->first.oid == ov.oid && + it->first.version > ov.version && + !IS_IN_FLIGHT(it->second.state) && + !IS_STABLE(it->second.state)) + { + if (it->first.oid != ov.oid) + break; + else if (it->first.version <= ov.version) + { + if (!IS_STABLE(it->second.state)) + max_unstable = it->first.version; + break; + } + else if (IS_STABLE(it->second.state)) + break; + // Remove entry + rm_start = it; + if (it == dirty_db.begin()) + break; + it--; + } + if (rm_start != rm_end) + { + erase_dirty(rm_start, rm_end, UINT64_MAX); + } + auto unstab_it = unstable_writes.find(ov.oid); + if (unstab_it != unstable_writes.end()) + { + if (max_unstable == 0) + unstable_writes.erase(unstab_it); + else + unstab_it->second = max_unstable; + } + } +} + void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op) { live = true; diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 295cf087..a8dda0e9 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -109,12 +109,6 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { // FIXME: Only stabilize versions that aren't stable yet - auto unstab_it = unstable_writes.find(v->oid); - if (unstab_it != unstable_writes.end() && - unstab_it->second <= v->version) - { - unstable_writes.erase(unstab_it); - } journal_entry_stable *je = (journal_entry_stable*) prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable)); journal.sector_info[journal.cur_sector].dirty = false; @@ -174,42 +168,7 @@ resume_5: for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { // Mark all dirty_db entries up to op->version as stable - auto dirty_it = dirty_db.find(*v); - if (dirty_it != dirty_db.end()) - { - while (1) - { - if (dirty_it->second.state == ST_J_SYNCED) - { - dirty_it->second.state = ST_J_STABLE; - } - else if (dirty_it->second.state == ST_D_SYNCED) - { - dirty_it->second.state = ST_D_STABLE; - } - else if (dirty_it->second.state == ST_DEL_SYNCED) - { - dirty_it->second.state = ST_DEL_STABLE; - } - else if (IS_STABLE(dirty_it->second.state)) - { - break; - } - if (dirty_it == dirty_db.begin()) - { - break; - } - dirty_it--; - if (dirty_it->first.oid != v->oid) - { - break; - } - } -#ifdef BLOCKSTORE_DEBUG - printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version); -#endif - flusher->enqueue_flush(*v); - } + mark_stable(*v); } inflight_writes--; // Acknowledge op @@ -218,6 +177,52 @@ resume_5: return 1; } +void blockstore_impl_t::mark_stable(const obj_ver_id & v) +{ + auto dirty_it = dirty_db.find(v); + if (dirty_it != dirty_db.end()) + { + while (1) + { + if (dirty_it->second.state == ST_J_SYNCED) + { + dirty_it->second.state = ST_J_STABLE; + } + else if (dirty_it->second.state == ST_D_SYNCED) + { + dirty_it->second.state = ST_D_STABLE; + } + else if (dirty_it->second.state == ST_DEL_SYNCED) + { + dirty_it->second.state = ST_DEL_STABLE; + } + else if (IS_STABLE(dirty_it->second.state)) + { + break; + } + if (dirty_it == dirty_db.begin()) + { + break; + } + dirty_it--; + if (dirty_it->first.oid != v.oid) + { + break; + } + } +#ifdef BLOCKSTORE_DEBUG + printf("enqueue_flush %lu:%lu v%lu\n", v.oid.inode, v.oid.stripe, v.version); +#endif + flusher->enqueue_flush(v); + } + auto unstab_it = unstable_writes.find(v.oid); + if (unstab_it != unstable_writes.end() && + unstab_it->second <= v.version) + { + unstable_writes.erase(unstab_it); + } +} + void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op) { live = true;