From f1e236c6e85b096c23f898355782ce6a3a886eb8 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 14 Nov 2019 02:29:34 +0300 Subject: [PATCH] Sync metadata & data after copying from journal --- blockstore_flush.cpp | 230 +++++++++++++++++++++++++----------------- blockstore_flush.h | 17 +++- blockstore_stable.cpp | 5 - blockstore_sync.cpp | 2 - blockstore_write.cpp | 1 - 5 files changed, 152 insertions(+), 103 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 865dbd8c..40604920 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -5,15 +5,30 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) this->bs = bs; this->flusher_count = flusher_count; this->active_flushers = 0; + this->active_until_sync = 0; + this->sync_required = true; + this->sync_threshold = flusher_count == 1 ? 1 : flusher_count/2; co = new journal_flusher_co[flusher_count]; for (int i = 0; i < flusher_count; i++) { co[i].bs = bs; - co[i].wait_state = 0; co[i].flusher = this; } } +journal_flusher_co::journal_flusher_co() +{ + wait_state = 0; + simple_callback = [this](ring_data_t* data) + { + if (data->res < 0) + { + throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); + } + wait_count--; + }; +} + journal_flusher_t::~journal_flusher_t() { delete[] co; @@ -31,6 +46,16 @@ void journal_flusher_t::loop() } } +#define await_sqe(label) \ + resume_##label:\ + sqe = bs->get_sqe();\ + if (!sqe)\ + {\ + wait_state = label;\ + return;\ + }\ + data = ((ring_data_t*)sqe->user_data); + void journal_flusher_co::loop() { // This is much better than implementing the whole function as an FSM @@ -49,6 +74,15 @@ void journal_flusher_co::loop() goto resume_6; else if (wait_state == 7) goto resume_7; + else if (wait_state == 8) + goto resume_8; + else if (wait_state == 9) + goto resume_9; + else if (wait_state == 10) + goto resume_10; + else if (wait_state == 11) + goto resume_11; +resume_0: if (!flusher->flush_queue.size()) return; cur = flusher->flush_queue.front(); @@ -57,6 +91,7 @@ void journal_flusher_co::loop() if (dirty_it != bs->dirty_db.end()) { flusher->active_flushers++; + flusher->active_until_sync++; v.clear(); wait_count = 0; clean_loc = UINT64_MAX; @@ -77,21 +112,10 @@ void journal_flusher_co::loop() if (it == v.end() || it->offset > offset) { submit_len = it->offset >= offset+len ? len : it->offset-offset; - resume_1: - sqe = bs->get_sqe(); - if (!sqe) - { - // Can't submit read, ring is full - wait_state = 1; - return; - } + await_sqe(1); v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len }; - data->callback = [this](ring_data_t* data) - { - wait_count--; - }; + data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; + data->callback = simple_callback; io_uring_prep_readv( sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset ); @@ -150,17 +174,14 @@ void journal_flusher_co::loop() .buf = memalign(512, 512), .usage_count = 1, }).first; - resume_2: - sqe = bs->get_sqe(); - if (!sqe) - { - wait_state = 2; - return; - } - data = ((ring_data_t*)sqe->user_data); + await_sqe(2); data->iov = (struct iovec){ meta_it->second.buf, 512 }; data->callback = [this](ring_data_t* data) { + if (data->res < 0) + { + throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); + } meta_it->second.state = 1; wait_count--; }; @@ -173,80 +194,103 @@ void journal_flusher_co::loop() meta_it->second.usage_count++; wait_state = 3; resume_3: - // After reads complete we submit writes - if (wait_count == 0) + if (wait_count > 0) + return; + // Reads completed, submit writes + for (it = v.begin(); it != v.end(); it++) { - for (it = v.begin(); it != v.end(); it++) - { - resume_4: - sqe = bs->get_sqe(); - if (!sqe) - { - // Can't submit a write, ring is full - wait_state = 4; - return; - } - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ it->buf, (size_t)it->len }; - data->callback = [this](ring_data_t* data) - { - wait_count--; - }; - io_uring_prep_writev( - sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset - ); - wait_count++; - } - // And a metadata write - resume_5: - if (meta_it->second.state == 0) - { - // metadata sector is still being read, wait for it - wait_state = 5; - return; - } - *((clean_disk_entry*)meta_it->second.buf + meta_pos) = { - .oid = cur.oid, - .version = cur.version, - }; - resume_6: - sqe = bs->get_sqe(); - if (!sqe) - { - // Can't submit a write, ring is full - wait_state = 6; - return; - } - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->callback = [this](ring_data_t* data) - { - wait_count--; - }; + await_sqe(4); data->iov = (struct iovec){ it->buf, (size_t)it->len }; data->callback = simple_callback; io_uring_prep_writev( - sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector + sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset ); wait_count++; - wait_state = 7; - resume_7: - // Done, free all buffers - if (wait_count == 0) - { - meta_it->second.usage_count--; - if (meta_it->second.usage_count == 0) - { - free(meta_it->second.buf); - flusher->meta_sectors.erase(meta_it); - } - for (it = v.begin(); it != v.end(); it++) - { - free(it->buf); - } - v.clear(); 
- wait_state = 0; - flusher->active_flushers--; - } - // FIXME Now sync everything } + // And a metadata write + resume_5: + if (meta_it->second.state == 0) + { + // metadata sector is still being read, wait for it + wait_state = 5; + return; + } + *((clean_disk_entry*)meta_it->second.buf + meta_pos) = { + .oid = cur.oid, + .version = cur.version, + }; + // I consider unordered writes to data & metadata safe here, because + // "dirty" entries always override "clean" entries in our case + await_sqe(6); + data->iov = (struct iovec){ meta_it->second.buf, 512 }; + data->callback = simple_callback; + io_uring_prep_writev( + sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector + ); + wait_count++; + wait_state = 7; + resume_7: + if (wait_count > 0) + return; + // Done, free all buffers + meta_it->second.usage_count--; + if (meta_it->second.usage_count == 0) + { + free(meta_it->second.buf); + flusher->meta_sectors.erase(meta_it); + } + for (it = v.begin(); it != v.end(); it++) + { + free(it->buf); + } + v.clear(); + flusher->active_until_sync--; + if (flusher->sync_required) + { + // And sync everything (in batches - not per each operation!) + cur_sync = flusher->syncs.end(); + if (cur_sync == flusher->syncs.begin()) + cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 }); + else + cur_sync--; + cur_sync->ready_count++; + if (cur_sync->ready_count >= flusher->sync_threshold || + !flusher->active_until_sync && !flusher->flush_queue.size()) + { + // Sync batch is ready. Do it. + await_sqe(9); + data->callback = simple_callback; + io_uring_prep_fsync(sqe, bs->data_fd, 0); + wait_count++; + if (bs->meta_fd != bs->data_fd) + { + await_sqe(10); + data->callback = simple_callback; + io_uring_prep_fsync(sqe, bs->meta_fd, 0); + wait_count++; + } + wait_state = 11; + resume_11: + if (wait_count > 0) + return; + // Sync completed. All previous coroutines waiting for it must be resumed + cur_sync->state = 1; + } + // Wait until someone else sends and completes a sync. + resume_8: + if (!cur_sync->state) + { + wait_state = 8; + return; + } + cur_sync->ready_count--; + if (cur_sync->ready_count == 0) + { + flusher->syncs.erase(cur_sync); + } + } + wait_state = 0; + flusher->active_flushers--; + goto resume_0; } } diff --git a/blockstore_flush.h b/blockstore_flush.h index 31d37801..9e1851e5 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -12,10 +12,16 @@ struct meta_sector_t int usage_count; }; +struct flusher_sync_t +{ + int ready_count; + int state; +}; + class journal_flusher_t; // Journal flusher coroutine -struct journal_flusher_co +class journal_flusher_co { blockstore *bs; journal_flusher_t *flusher; @@ -29,8 +35,11 @@ struct journal_flusher_co std::vector::iterator it; uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos; std::map::iterator meta_it; + std::function simple_callback; + std::list::iterator cur_sync; friend class journal_flusher_t; public: + journal_flusher_co(); void loop(); }; @@ -38,10 +47,14 @@ public: class journal_flusher_t { int flusher_count; - int active_flushers; + int sync_threshold; + bool sync_required; journal_flusher_co *co; blockstore *bs; friend class journal_flusher_co; + + int active_flushers, active_until_sync; + std::list syncs; public: std::map meta_sectors; std::deque flush_queue; diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 7bf54c45..53b5eaed 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -2,9 +2,6 @@ // Stabilize small write: // 1) Copy data from the journal to the data device -// Sync it before writing metadata if we want to keep metadata consistent -// Overall it's optional because it can be replayed from the journal until -// it's cleared, and reads are also fulfilled from the journal // 2) Increase version on the metadata device and sync it // 3) Advance clean_db entry's version, clear previous journal entries // @@ -112,8 +109,6 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op) { if (data->res < 0) { - // sync error - // FIXME: our state becomes corrupted after a write error. maybe do something better than just die throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); } op->pending_ops--; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 63e991bb..bf3b45c3 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -111,8 +111,6 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) { if (data->res < 0) { - // sync error - // FIXME: our state becomes corrupted after a write error. maybe do something better than just die throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); } op->pending_ops--; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index f3c59d85..c21178cc 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -149,7 +149,6 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op) { if (data->res < 0) { - // write error // FIXME: our state becomes corrupted after a write error. maybe do something better than just die throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); }