diff --git a/blockstore.h b/blockstore.h
index ccadff0d..56d5b21b 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -33,7 +33,6 @@
 #define ST_J_WRITTEN 3
 #define ST_J_SYNCED 4
 #define ST_J_STABLE 5
-#define ST_J_MOVE_READ_SUBMITTED 6
 #define ST_J_MOVE_WRITE_SUBMITTED 7
 #define ST_J_MOVE_SYNCED 8
 
@@ -109,6 +108,11 @@ inline bool operator == (const object_id & a, const object_id & b)
     return a.inode == b.inode && a.stripe == b.stripe;
 }
 
+inline bool operator != (const object_id & a, const object_id & b)
+{
+    return a.inode != b.inode || a.stripe != b.stripe;
+}
+
 inline bool operator < (const object_id & a, const object_id & b)
 {
     return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe;
diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp
index fb5e3ce4..ba1f193b 100644
--- a/blockstore_flush.cpp
+++ b/blockstore_flush.cpp
@@ -46,6 +46,38 @@ void journal_flusher_t::loop()
     }
 }
 
+// Schedule object ov.oid to be flushed up to ov.version, at the back of the queue.
+// If the object is already queued, only its target version is replaced; its position is kept.
+void journal_flusher_t::queue_flush(obj_ver_id ov)
+{
+    auto it = flush_versions.find(ov.oid);
+    if (it != flush_versions.end())
+    {
+        it->second = ov.version;
+    }
+    else
+    {
+        flush_versions[ov.oid] = ov.version;
+        flush_queue.push_back(ov.oid);
+    }
+}
+
+// Same as queue_flush(), but an object that is not queued yet is put at the front of the queue.
+// The flusher coroutine uses it to re-queue an object whose newer version was requested mid-flush.
+void journal_flusher_t::unshift_flush(obj_ver_id ov)
+{
+    auto it = flush_versions.find(ov.oid);
+    if (it != flush_versions.end())
+    {
+        it->second = ov.version;
+    }
+    else
+    {
+        flush_versions[ov.oid] = ov.version;
+        flush_queue.push_front(ov.oid);
+    }
+}
+
 #define await_sqe(label) \
     resume_##label:\
     sqe = bs->get_sqe();\
@@ -85,11 +117,29 @@ void journal_flusher_co::loop()
 resume_0:
     if (!flusher->flush_queue.size())
         return;
-    cur = flusher->flush_queue.front();
+    cur.oid = flusher->flush_queue.front();
+    cur.version = flusher->flush_versions[cur.oid];
     flusher->flush_queue.pop_front();
+    flusher->flush_versions.erase(cur.oid);
     dirty_end = bs->dirty_db.find(cur);
     if (dirty_end != bs->dirty_db.end())
     {
+        repeat_it = flusher->sync_to_repeat.find(cur.oid);
+        if (repeat_it != flusher->sync_to_repeat.end())
+        {
+            // We don't flush different parts of history of the same object in parallel
+            // So we check if someone is already flushing this object
+            // In that case we store the requested version in sync_to_repeat and pick another object
+            // The coroutine flushing it will see the non-zero version and re-queue the object after it finishes
+            repeat_it->second = cur.version;
+            wait_state = 0;
+            goto resume_0;
+        }
+        else
+        {
+            // Nobody is flushing this object: register it (0 = no re-queue requested so far)
+            flusher->sync_to_repeat[cur.oid] = 0;
+        }
         dirty_it = dirty_end;
         flusher->active_flushers++;
         flusher->active_until_sync++;
@@ -99,7 +149,7 @@ resume_0:
         skip_copy = false;
         do
         {
-            if (dirty_it->second.state == ST_J_STABLE)
+            if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
             {
                 // First we submit all reads
                 offset = dirty_it->second.offset;
@@ -127,8 +177,6 @@ resume_0:
                         break;
                     }
                 }
-                // So subsequent stabilizers don't flush the entry again
-                dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
             }
             else if (dirty_it->second.state == ST_D_STABLE)
             {
@@ -139,16 +187,10 @@ resume_0:
                 }
                 skip_copy = true;
             }
-            else if (IS_STABLE(dirty_it->second.state))
-            {
-                // Other coroutine is already flushing it, stop
-                break;
-            }
-            else
+            else if (!IS_STABLE(dirty_it->second.state))
             {
                 throw new std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state));
             }
-            dirty_start = dirty_it;
             dirty_it--;
         } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
         if (wait_count == 0 && clean_loc == UINT64_MAX)
@@ -156,6 +198,13 @@ resume_0:
             // Nothing to flush
             flusher->active_flushers--;
             flusher->active_until_sync--;
+            repeat_it = flusher->sync_to_repeat.find(cur.oid);
+            if (repeat_it->second != 0)
+            {
+                // Requeue version
+                flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
+            }
+            flusher->sync_to_repeat.erase(repeat_it);
             wait_state = 0;
             goto resume_0;
         }
@@ -313,24 +362,34 @@ resume_0:
             .version = cur.version,
             .location = clean_loc,
         };
-        for (dirty_it = dirty_start; dirty_it != dirty_end; dirty_it++)
+        dirty_it = dirty_end;
+        do
         {
             if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
             {
                 allocator_set(bs->data_alloc, dirty_it->second.location >> bs->block_order, false);
             }
             int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
-            if (used == 1)
+            if (used == 0)
             {
                 bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
             }
-        }
-        // Then, basically, remove the whole version range from dirty_db...
-        // FIXME not until dirty_start, until other object. And wait for previous flushes.
-        bs->dirty_db.erase(dirty_start, std::next(dirty_end));
+            dirty_it--;
+        } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
+        // Then, basically, remove everything up to the current version from dirty_db...
+        if (dirty_it->first.oid != cur.oid)
+            dirty_it++;
+        bs->dirty_db.erase(dirty_it, std::next(dirty_end));
         // FIXME: ...and clear unused part of the journal (with some interval, not for every flushed op)
         wait_state = 0;
         flusher->active_flushers--;
+        repeat_it = flusher->sync_to_repeat.find(cur.oid);
+        if (repeat_it->second != 0)
+        {
+            // Requeue version
+            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
+        }
+        flusher->sync_to_repeat.erase(repeat_it);
         goto resume_0;
     }
 }
diff --git a/blockstore_flush.h b/blockstore_flush.h
index 10dfcf0e..18142a98 100644
--- a/blockstore_flush.h
+++ b/blockstore_flush.h
@@ -36,6 +36,7 @@ class journal_flusher_co
     std::vector::iterator it;
     uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
     std::map::iterator meta_it;
+    std::map::iterator repeat_it;
     std::function simple_callback;
     std::list::iterator cur_sync;
     friend class journal_flusher_t;
@@ -56,10 +57,15 @@ class journal_flusher_t
 
     int active_flushers, active_until_sync;
     std::list syncs;
-public:
+    std::map sync_to_repeat;
+
     std::map meta_sectors;
-    std::deque flush_queue;
+    std::deque flush_queue;
+    std::map flush_versions;
+public:
     journal_flusher_t(int flusher_count, blockstore *bs);
     ~journal_flusher_t();
     void loop();
+    void queue_flush(obj_ver_id ov);
+    void unshift_flush(obj_ver_id ov);
 };
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index 53b5eaed..c8bfefce 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -139,7 +139,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
                     }
                     dirty_it--;
                 } while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid);
-                flusher->flush_queue.push_back(*v);
+                flusher->queue_flush(*v);
             }
         }
         // Acknowledge op