From 4afa95b0e3a7bdd9f486eccfea8c499632139668 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 12 Nov 2019 18:16:03 +0300 Subject: [PATCH] FSM is a dreadful unreadable thing, reimplement using gotos --- blockstore.cpp | 5 + blockstore.h | 11 +- blockstore_stable.cpp | 258 ++++++++++++++++++++++-------------------- blockstore_write.cpp | 2 +- 4 files changed, 150 insertions(+), 126 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index e2734352..b288294d 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -83,6 +83,11 @@ void blockstore::handle_event(ring_data_t *data) { handle_stable_event(data, op); } + else if ((op->flags & OP_TYPE_MASK) == OP_INTERNAL_FLUSH) + { + // Operation is not a blockstore_operation at all + + } } } diff --git a/blockstore.h b/blockstore.h index 283b77c2..ae7c646a 100644 --- a/blockstore.h +++ b/blockstore.h @@ -33,8 +33,9 @@ #define ST_J_WRITTEN 3 #define ST_J_SYNCED 4 #define ST_J_STABLE 5 -#define ST_J_MOVED 6 -#define ST_J_MOVE_SYNCED 7 +#define ST_J_MOVE_READ_SUBMITTED 6 +#define ST_J_MOVE_WRITE_SUBMITTED 7 +#define ST_J_MOVE_SYNCED 8 #define ST_D_SUBMITTED 16 #define ST_D_WRITTEN 17 @@ -183,6 +184,7 @@ public: #define OP_SYNC 3 #define OP_STABLE 4 #define OP_DELETE 5 +#define OP_INTERNAL_FLUSH 6 #define OP_TYPE_MASK 0x7 // Suspend operation until there are more free SQEs @@ -196,9 +198,10 @@ public: struct blockstore_operation { - std::function callback; // flags contain operation type and possibly other flags - uint32_t flags; + uint64_t flags; + // finish callback + std::function callback; // For reads, writes & deletes: oid is the requested object object_id oid; // For reads: version=0 -> last stable, version=UINT64_MAX -> last unstable, version=X -> specific version diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 7ce8ae05..efe152ee 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -156,154 +156,170 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op } } -struct offset_len +struct copy_buffer_t { uint64_t offset, len; + void *buf; }; class journal_flusher_t { blockstore *bs; - int state; + int wait_state, wait_count; + struct io_uring_sqe *sqe; + struct ring_data_t *data; + bool skip_copy; obj_ver_id cur; std::map::iterator dirty_it; - std::vector v; - std::vector::iterator it; - uint64_t offset, len; + std::vector v; + std::vector::iterator it; + uint64_t offset, len, submit_len, clean_loc; + bool allocated; public: - journal_flusher_t(); + journal_flusher_t(int flush_count); std::deque flush_queue; - void stabilize_object_loop(); + void loop(); }; -#define F_NEXT_OBJ 0 -#define F_NEXT_VER 1 -#define F_FIND_POS 2 -#define F_SUBMIT_FULL 3 -#define F_SUBMIT_PART 4 -#define F_CUT_OFFSET 5 -#define F_FINISH_VER 6 - -journal_flusher_t::journal_flusher_t() +journal_flusher_t::journal_flusher_t(int flusher_count) { - state = F_NEXT_OBJ; } -// It would be prettier as a coroutine (maybe https://github.com/hnes/libaco ?) -// Now it's a state machine -void journal_flusher_t::stabilize_object_loop() +void journal_flusher_t::loop() { -begin: - if (state == F_NEXT_OBJ) + // This is much better than implementing the whole function as an FSM + // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ... + if (wait_state == 1) + goto resume_1; + else if (wait_state == 3) + goto resume_3; + else if (wait_state == 4) + goto resume_4; + else if (wait_state == 5) + goto resume_5; + if (!flush_queue.size()) + return; + cur = flush_queue.front(); + flush_queue.pop_front(); + dirty_it = bs->dirty_db.find(cur); + if (dirty_it != bs->dirty_db.end()) { - // Pick next object - if (!flush_queue.size()) - return; - while (1) + v.clear(); + wait_count = 0; + clean_loc = UINT64_MAX; + allocated = false; + skip_copy = false; + do { - cur = flush_queue.front(); - flush_queue.pop_front(); - dirty_it = bs->dirty_db.find(cur); - if (dirty_it != bs->dirty_db.end()) + if (dirty_it->second.state == ST_J_STABLE) + { + // First we submit all reads + offset = dirty_it->second.offset; + len = dirty_it->second.size; + it = v.begin(); + while (1) + { + for (; it != v.end(); it++) + if (it->offset >= offset) + break; + if (it == v.end() || it->offset > offset) + { + submit_len = it->offset >= offset+len ? len : it->offset-offset; + resume_1: + sqe = bs->get_sqe(); + if (!sqe) + { + // Can't submit read, ring is full + wait_state = 1; + return; + } + v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); + data = ((ring_data_t*)sqe->user_data); + data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len }; + data->op = this; + io_uring_prep_readv( + sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset + ); + wait_count++; + } + if (it == v.end() || it->offset+it->len >= offset+len) + { + break; + } + } + // So subsequent stabilizers don't flush the entry again + dirty_it->second.state = ST_J_READ_SUBMITTED; + } + else if (dirty_it->second.state == ST_D_STABLE) + { + // Copy last STABLE entry metadata + if (!skip_copy) + { + clean_loc = dirty_it->second.location; + } + skip_copy = true; + } + else if (IS_STABLE(dirty_it->second.state)) { - state = F_NEXT_VER; - v.clear(); break; } - else if (flush_queue.size() == 0) - return; - } - } - if (state == F_NEXT_VER) - { - if (dirty_it->second.state == ST_J_STABLE) + dirty_it--; + } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid); + if (clean_loc == UINT64_MAX) { - offset = dirty_it->second.offset; - len = dirty_it->second.size; - it = v.begin(); - state = F_FIND_POS; - } - else if (dirty_it->second.state == ST_D_STABLE) - { - - state = F_NEXT_OBJ; - } - else if (IS_STABLE(dirty_it->second.state)) - { - state = F_NEXT_OBJ; - } - else - state = F_FINISH_VER; - } - if (state == F_FIND_POS) - { - for (; it != v.end(); it++) - if (it->offset >= offset) - break; - if (it == v.end() || it->offset >= offset+len) - { - state = F_SUBMIT_FULL; - } - else - { - if (it->offset > offset) - state = F_SUBMIT_PART; + // Find it in clean_db + auto clean_it = bs->clean_db.find(cur.oid); + if (clean_it == bs->clean_db.end()) + { + // Object not present at all. We must allocate and zero it. + clean_loc = allocator_find_free(bs->data_alloc); + if (clean_loc == UINT64_MAX) + { + throw new std::runtime_error("No space on the data device while trying to flush journal"); + } + // This is an interesting part. Flushing journal results in an allocation we don't know where to put O_o. + allocator_set(bs->data_alloc, clean_loc, true); + allocated = true; + } else - state = F_CUT_OFFSET; + clean_loc = clean_it->second.location; } - } - if (state == F_SUBMIT_FULL) - { - struct io_uring_sqe *sqe = get_sqe(); - if (!sqe) - return; - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - data->iov = (struct iovec){ malloc(len), len }; - data->op = op; // FIXME OOPS - io_uring_prep_readv( - sqe, journal_fd, &data->iov, 1, journal_offset + dirty_it->second.location + offset - ); - op->pending_ops = 1; - v.insert(it, (offset_len){ .offset = offset, .len = len }); - state = F_SUBMIT_FULL_WRITE; - return; - } - if (state == F_SUBMIT_FULL_WRITE) - { - struct io_uring_sqe *sqe = get_sqe(); - if (!sqe) - return; - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - - } - if (state == F_SUBMIT_PART) - { - if (!can_submit) + wait_state = 3; + resume_3: + // After reads complete we submit writes + if (wait_count == 0) { - return; + for (it = v.begin(); it != v.end(); it++) + { + resume_4: + sqe = bs->get_sqe(); + if (!sqe) + { + // Can't submit a write, ring is full + wait_state = 4; + return; + } + data = ((ring_data_t*)sqe->user_data); + data->iov = (struct iovec){ it->buf, (size_t)it->len }; + data->op = this; + io_uring_prep_writev( + sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset + ); + wait_count++; + } + wait_state = 5; + resume_5: + // Done, free all buffers + if (wait_count == 0) + { + for (it = v.begin(); it != v.end(); it++) + { + free(it->buf); + } + v.clear(); + wait_state = 0; + } } - v.insert(it, (offset_len){ .offset = offset, .len = it->offset-offset }); - state = F_CUT_OFFSET; } - if (state == F_CUT_OFFSET) - { - if (offset+len > it->offset+it->len) - { - len = offset+len - (it->offset+it->len); - offset = it->offset+it->len; - state = F_FIND_POS; - } - else - state = F_FINISH_VER; - } - if (state == F_FINISH_VER) - { - dirty_it--; - if (dirty_it == bs->dirty_db.begin() || dirty_it->first.oid != cur.oid) - state = F_NEXT_OBJ; - else - state = F_NEXT_VER; - } - goto begin; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 2d3cf120..a8f310cf 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -66,7 +66,7 @@ int blockstore::dequeue_write(blockstore_operation *op) { // Big (redirect) write uint64_t loc = allocator_find_free(data_alloc); - if (loc == (uint64_t)-1) + if (loc == UINT64_MAX) { // no space op->retval = -ENOSPC;