From 214da03735f3228781e4e494d4d30b891e563b7f Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Wed, 13 Nov 2019 17:45:37 +0300
Subject: [PATCH] Move flusher into a separate file

---
 Makefile | 2 +-
 blockstore.cpp | 5 +
 blockstore.h | 3 +
 blockstore_flush.cpp | 240 ++++++++++++++++++++++++++++++++++
 blockstore_flush.h | 51 ++++++++
 blockstore_stable.cpp | 293 +-----------------------------------------
 6 files changed, 301 insertions(+), 293 deletions(-)
 create mode 100644 blockstore_flush.cpp
 create mode 100644 blockstore_flush.h

diff --git a/Makefile b/Makefile
index b9dac358..242f1f88 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
-	blockstore_write.o blockstore_sync.o blockstore_stable.o crc32c.o ringloop.o test
+	blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o test
 clean:
 	rm -f *.o
 crc32c.o: crc32c.c
diff --git a/blockstore.cpp b/blockstore.cpp
index de2e87c2..c354956b 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -35,10 +35,15 @@ blockstore::blockstore(spp::sparse_hash_map & config,
         close(journal.fd);
         throw e;
     }
+    int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
+    if (!flusher_count)
+        flusher_count = 32;
+    flusher = new journal_flusher_t(flusher_count, this);
 }
 
 blockstore::~blockstore()
 {
+    delete flusher;
     free(zero_object);
     ringloop->unregister_consumer(ring_consumer.number);
     if (data_fd >= 0)
diff --git a/blockstore.h b/blockstore.h
index 7673334a..c7e2f149 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -245,6 +245,8 @@ private:
 
 #include "blockstore_init.h"
 
+#include "blockstore_flush.h"
+
 class blockstore
 {
     struct ring_consumer_t ring_consumer;
@@ -267,6 +269,7 @@
     uint64_t data_offset, data_size, data_len;
 
     struct journal_t journal;
+    journal_flusher_t *flusher;
 
     ring_loop_t *ringloop;
 
diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp
new file mode 100644
index 
00000000..dea46c7c
--- /dev/null
+++ b/blockstore_flush.cpp
@@ -0,0 +1,240 @@
+#include "blockstore.h"
+
+journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
+{
+    this->bs = bs;
+    this->flusher_count = flusher_count;
+    this->active_flushers = 0;
+    co = new journal_flusher_co[flusher_count];
+    for (int i = 0; i < flusher_count; i++)
+    {
+        co[i].bs = bs;
+        co[i].wait_state = 0;
+        co[i].flusher = this;
+    }
+}
+
+journal_flusher_t::~journal_flusher_t()
+{
+    delete[] co;
+}
+
+void journal_flusher_t::loop()
+{
+    if (!active_flushers && !flush_queue.size())
+    {
+        return;
+    }
+    for (int i = 0; i < flusher_count; i++)
+    {
+        co[i].loop();
+    }
+}
+
+void journal_flusher_co::loop()
+{
+    // This is much better than implementing the whole function as an FSM
+    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
+    if (wait_state == 1)
+        goto resume_1;
+    else if (wait_state == 2)
+        goto resume_2;
+    else if (wait_state == 3)
+        goto resume_3;
+    else if (wait_state == 4)
+        goto resume_4;
+    else if (wait_state == 5)
+        goto resume_5;
+    else if (wait_state == 6)
+        goto resume_6;
+    else if (wait_state == 7)
+        goto resume_7;
+    if (!flusher->flush_queue.size())
+        return;
+    cur = flusher->flush_queue.front();
+    flusher->flush_queue.pop_front();
+    dirty_it = bs->dirty_db.find(cur);
+    if (dirty_it != bs->dirty_db.end())
+    {
+        flusher->active_flushers++;
+        v.clear();
+        wait_count = 0;
+        clean_loc = UINT64_MAX;
+        skip_copy = false;
+        do
+        {
+            if (dirty_it->second.state == ST_J_STABLE)
+            {
+                // First we submit all reads
+                offset = dirty_it->second.offset;
+                len = dirty_it->second.size;
+                it = v.begin();
+                while (1)
+                {
+                    for (; it != v.end(); it++)
+                        if (it->offset >= offset)
+                            break;
+                    if (it == v.end() || it->offset > offset)
+                    {
+                        submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset;
+    resume_1:
+                        sqe = bs->get_sqe();
+                        if (!sqe)
+                        {
+                            // Can't submit read, ring is full
+                            wait_state = 1;
+                            return;
+                        }
+                        it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
+                        data = ((ring_data_t*)sqe->user_data);
+                        data->iov = (struct iovec){ it->buf, (size_t)submit_len };
+                        data->op = this;
+                        io_uring_prep_readv(
+                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
+                        );
+                        wait_count++;
+                    }
+                    if (it == v.end() || it->offset+it->len >= offset+len)
+                    {
+                        break;
+                    }
+                }
+                // So subsequent stabilizers don't flush the entry again
+                dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
+            }
+            else if (dirty_it->second.state == ST_D_STABLE)
+            {
+                // Copy last STABLE entry metadata
+                if (!skip_copy)
+                {
+                    clean_loc = dirty_it->second.location;
+                }
+                skip_copy = true;
+            }
+            else if (IS_STABLE(dirty_it->second.state))
+            {
+                break;
+            }
+            dirty_it--;
+        } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
+        if (clean_loc == UINT64_MAX)
+        {
+            // Find it in clean_db
+            auto clean_it = bs->clean_db.find(cur.oid);
+            if (clean_it == bs->clean_db.end())
+            {
+                // Object not present at all. This is a bug.
+                throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
+            }
+            else
+                clean_loc = clean_it->second.location;
+        }
+        // Also we need to submit the metadata read. We do a read-modify-write for every operation.
+        // But we must check if the same sector is already in memory.
+        // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
+        // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
+        // so I'll avoid it as long as I can.
+        meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
+        meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
+        meta_it = flusher->meta_sectors.find(meta_sector);
+        if (meta_it == flusher->meta_sectors.end())
+        {
+            // Not in memory yet, read it
+            meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){
+                .offset = meta_sector,
+                .len = 512,
+                .state = 0, // 0 = not read yet
+                .buf = memalign(512, 512),
+                .usage_count = 1,
+            }).first;
+    resume_2:
+            sqe = bs->get_sqe();
+            if (!sqe)
+            {
+                wait_state = 2;
+                return;
+            }
+            data = ((ring_data_t*)sqe->user_data);
+            data->iov = (struct iovec){ meta_it->second.buf, 512 };
+            data->op = this;
+            io_uring_prep_readv(
+                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
+            );
+            wait_count++;
+        }
+        else
+            meta_it->second.usage_count++;
+        wait_state = 3;
+    resume_3:
+        // After reads complete we submit writes
+        if (wait_count == 0)
+        {
+            for (it = v.begin(); it != v.end(); it++)
+            {
+    resume_4:
+                sqe = bs->get_sqe();
+                if (!sqe)
+                {
+                    // Can't submit a write, ring is full
+                    wait_state = 4;
+                    return;
+                }
+                data = ((ring_data_t*)sqe->user_data);
+                data->iov = (struct iovec){ it->buf, (size_t)it->len };
+                data->op = this;
+                io_uring_prep_writev(
+                    sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
+                );
+                wait_count++;
+            }
+            // And a metadata write
+    resume_5:
+            if (meta_it->second.state == 0)
+            {
+                // metadata sector is still being read, wait for it
+                wait_state = 5;
+                return;
+            }
+            *((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
+                .oid = cur.oid,
+                .version = cur.version,
+                .flags = DISK_ENTRY_STABLE,
+            };
+    resume_6:
+            sqe = bs->get_sqe();
+            if (!sqe)
+            {
+                // Can't submit a write, ring is full
+                wait_state = 6;
+                return;
+            }
+            data = ((ring_data_t*)sqe->user_data);
+            data->iov = (struct iovec){ meta_it->second.buf, 512 };
+            data->op = this;
+            io_uring_prep_writev(
+                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
+            );
+            wait_count++;
wait_state = 7; + resume_7: + // Done, free all buffers + if (wait_count == 0) + { + meta_it->second.usage_count--; + if (meta_it->second.usage_count == 0) + { + free(meta_it->second.buf); + flusher->meta_sectors.erase(meta_it); + } + for (it = v.begin(); it != v.end(); it++) + { + free(it->buf); + } + v.clear(); + wait_state = 0; + flusher->active_flushers--; + } + // FIXME Now sync everything + } + } +} diff --git a/blockstore_flush.h b/blockstore_flush.h new file mode 100644 index 00000000..05f74448 --- /dev/null +++ b/blockstore_flush.h @@ -0,0 +1,51 @@ +struct copy_buffer_t +{ + uint64_t offset, len; + void *buf; +}; + +struct meta_sector_t +{ + uint64_t offset, len; + int state; + void *buf; + int usage_count; +}; + +class journal_flusher_t; + +// Journal flusher coroutine +class journal_flusher_co +{ + blockstore *bs; + journal_flusher_t *flusher; + int wait_state, wait_count; + struct io_uring_sqe *sqe; + struct ring_data_t *data; + bool skip_copy; + obj_ver_id cur; + std::map::iterator dirty_it; + std::vector v; + std::vector::iterator it; + uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos; + std::map::iterator meta_it; + friend class journal_flusher_t; +public: + void loop(); +}; + +// Journal flusher itself +class journal_flusher_t +{ + int flusher_count; + int active_flushers; + journal_flusher_co *co; + blockstore *bs; + friend class journal_flusher_co; +public: + std::map meta_sectors; + std::deque flush_queue; + journal_flusher_t(int flusher_count, blockstore *bs); + ~journal_flusher_t(); + void loop(); +}; diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 3904fd36..2e926a58 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -148,7 +148,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op } dirty_it--; } while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid); - flusher.flush_queue.push_back(*v); + flusher->flush_queue.push_back(*v); } } // Acknowledge op 
@@ -156,294 +156,3 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op op->callback(op); } } - -struct copy_buffer_t -{ - uint64_t offset, len; - void *buf; -}; - -struct meta_sector_t -{ - uint64_t offset, len; - int state; - void *buf; - int usage_count; -}; - -class journal_flusher_t; - -// Journal flusher coroutine -class journal_flusher_co -{ - blockstore *bs; - journal_flusher_t *flusher; - int wait_state, wait_count; - struct io_uring_sqe *sqe; - struct ring_data_t *data; - bool skip_copy; - obj_ver_id cur; - std::map::iterator dirty_it; - std::vector v; - std::vector::iterator it; - uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos; - std::map::iterator meta_it; - friend class journal_flusher_t; -public: - void loop(); -}; - -// Journal flusher itself -class journal_flusher_t -{ - int flusher_count; - int active_flushers; - journal_flusher_co *co; - blockstore *bs; - friend class journal_flusher_co; -public: - std::map meta_sectors; - std::deque flush_queue; - journal_flusher_t(int flusher_count, blockstore *bs); - ~journal_flusher_t(); - void loop(); -}; - -journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs) -{ - this->bs = bs; - this->flusher_count = flusher_count; - this->active_flushers = 0; - co = new journal_flusher_co[flusher_count]; - for (int i = 0; i < flusher_count; i++) - { - co[i].bs = bs; - co[i].wait_state = 0; - co[i].flusher = this; - } -} - -journal_flusher_t::~journal_flusher_t() -{ - delete[] co; -} - -void journal_flusher_t::loop() -{ - if (!active_flushers && !flush_queue.size()) - { - return; - } - for (int i = 0; i < flusher_count; i++) - { - co[i].loop(); - } -} - -void journal_flusher_co::loop() -{ - // This is much better than implementing the whole function as an FSM - // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ... 
- if (wait_state == 1) - goto resume_1; - else if (wait_state == 2) - goto resume_2; - else if (wait_state == 3) - goto resume_3; - else if (wait_state == 4) - goto resume_4; - else if (wait_state == 5) - goto resume_5; - else if (wait_state == 6) - goto resume_6; - else if (wait_state == 7) - goto resume_7; - if (!flusher->flush_queue.size()) - return; - cur = flusher->flush_queue.front(); - flusher->flush_queue.pop_front(); - dirty_it = bs->dirty_db.find(cur); - if (dirty_it != bs->dirty_db.end()) - { - flusher->active_flushers++; - v.clear(); - wait_count = 0; - clean_loc = UINT64_MAX; - skip_copy = false; - do - { - if (dirty_it->second.state == ST_J_STABLE) - { - // First we submit all reads - offset = dirty_it->second.offset; - len = dirty_it->second.size; - it = v.begin(); - while (1) - { - for (; it != v.end(); it++) - if (it->offset >= offset) - break; - if (it == v.end() || it->offset > offset) - { - submit_len = it->offset >= offset+len ? len : it->offset-offset; - resume_1: - sqe = bs->get_sqe(); - if (!sqe) - { - // Can't submit read, ring is full - wait_state = 1; - return; - } - v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len }; - data->op = this; - io_uring_prep_readv( - sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset - ); - wait_count++; - } - if (it == v.end() || it->offset+it->len >= offset+len) - { - break; - } - } - // So subsequent stabilizers don't flush the entry again - dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED; - } - else if (dirty_it->second.state == ST_D_STABLE) - { - // Copy last STABLE entry metadata - if (!skip_copy) - { - clean_loc = dirty_it->second.location; - } - skip_copy = true; - } - else if (IS_STABLE(dirty_it->second.state)) - { - break; - } - dirty_it--; - } while (dirty_it != bs->dirty_db.begin() && 
dirty_it->first.oid == cur.oid); - if (clean_loc == UINT64_MAX) - { - // Find it in clean_db - auto clean_it = bs->clean_db.find(cur.oid); - if (clean_it == bs->clean_db.end()) - { - // Object not present at all. This is a bug. - throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device"); - } - else - clean_loc = clean_it->second.location; - } - // Also we need to submit the metadata read. We do a read-modify-write for every operation. - // But we must check if the same sector is already in memory. - // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime... - // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot, - // so I'll avoid it as long as I can. - meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512; - meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry))); - meta_it = flusher->meta_sectors.find(meta_sector); - if (meta_it == flusher->meta_sectors.end()) - { - // Not in memory yet, read it - meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){ - .offset = meta_sector, - .len = 512, - .state = 0, // 0 = not read yet - .buf = memalign(512, 512), - .usage_count = 1, - }).first; - resume_2: - sqe = bs->get_sqe(); - if (!sqe) - { - wait_state = 2; - return; - } - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->op = this; - io_uring_prep_writev( - sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector - ); - wait_count++; - } - else - meta_it->second.usage_count++; - wait_state = 3; - resume_3: - // After reads complete we submit writes - if (wait_count == 0) - { - for (it = v.begin(); it != v.end(); it++) - { - resume_4: - sqe = bs->get_sqe(); - if (!sqe) - { - // Can't submit a write, ring is full - wait_state = 4; - return; - } - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ it->buf, (size_t)it->len }; - data->op = 
this; - io_uring_prep_writev( - sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset - ); - wait_count++; - } - // And a metadata write - resume_5: - if (meta_it->second.state == 0) - { - // metadata sector is still being read, wait for it - wait_state = 5; - return; - } - *((clean_disk_entry*)meta_it->second.buf + meta_pos) = { - .oid = cur.oid, - .version = cur.version, - .flags = DISK_ENTRY_STABLE, - }; - resume_6: - sqe = bs->get_sqe(); - if (!sqe) - { - // Can't submit a write, ring is full - wait_state = 6; - return; - } - data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->op = this; - io_uring_prep_writev( - sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector - ); - wait_count++; - wait_state = 7; - resume_7: - // Done, free all buffers - if (wait_count == 0) - { - meta_it->second.usage_count--; - if (meta_it->second.usage_count == 0) - { - free(meta_it->second.buf); - flusher->meta_sectors.erase(meta_it); - } - for (it = v.begin(); it != v.end(); it++) - { - free(it->buf); - } - v.clear(); - wait_state = 0; - flusher->active_flushers--; - } - // FIXME Now sync everything - } - } -}