diff --git a/blockstore.cpp b/blockstore.cpp index 6377cd09..adfe82f1 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -229,14 +229,16 @@ void blockstore::check_wait(blockstore_operation *op) } } -int blockstore::enqueue_op(blockstore_operation *op) +void blockstore::enqueue_op(blockstore_operation *op) { - if (op->offset >= block_size || op->len >= block_size-op->offset || - (op->len % DISK_ALIGNMENT) || - (op->flags & OP_TYPE_MASK) < OP_READ || (op->flags & OP_TYPE_MASK) > OP_DELETE) + int type = op->flags & OP_TYPE_MASK; + if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) && + (op->offset >= block_size || op->len >= block_size-op->offset || (op->len % DISK_ALIGNMENT))) { // Basic verification not passed - return -EINVAL; + op->retval = -EINVAL; + op->callback(op); + return; } op->wait_for = 0; op->sync_state = 0; @@ -247,5 +249,4 @@ int blockstore::enqueue_op(blockstore_operation *op) enqueue_write(op); } ringloop->wakeup(ring_consumer); - return 0; } diff --git a/blockstore.h b/blockstore.h index ef130671..5d821615 100644 --- a/blockstore.h +++ b/blockstore.h @@ -333,5 +333,5 @@ public: bool stop(); // Submission - int enqueue_op(blockstore_operation *op); + void enqueue_op(blockstore_operation *op); }; diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 94b66b4b..bd7e7e4f 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -151,7 +151,7 @@ resume_0: wait_count = 0; clean_loc = UINT64_MAX; skip_copy = false; - do + while (1) { if (dirty_it->second.state == ST_J_STABLE && !skip_copy) { @@ -166,9 +166,9 @@ resume_0: break; if (it == v.end() || it->offset > offset) { - submit_len = it->offset >= offset+len ? len : it->offset-offset; + submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset; await_sqe(1); - v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); + it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; data->callback = simple_callback; my_uring_prep_readv( @@ -185,6 +185,7 @@ resume_0: else if (dirty_it->second.state == ST_D_STABLE) { // There is an unflushed big write. Copy small writes in its position + printf("found "); if (!skip_copy) { clean_loc = dirty_it->second.location; @@ -195,8 +196,16 @@ resume_0: { throw std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state)); } + if (dirty_it == bs->dirty_db.begin()) + { + break; + } dirty_it--; - } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid); + if (dirty_it->first.oid != cur.oid) + { + break; + } + } if (wait_count == 0 && clean_loc == UINT64_MAX) { // Nothing to flush @@ -219,11 +228,13 @@ resume_0: if (clean_it == bs->clean_db.end()) { // Object not present at all. This is a bug. - throw std::runtime_error("BUG: Object we are trying to flush not allocated on the data device"); + throw std::runtime_error("BUG: Object we are trying to flush is not allocated on the data device"); } else clean_loc = clean_it->second.location; } + else + clean_it = bs->clean_db.end(); // Also we need to submit the metadata read. We do a read-modify-write for every operation. // But we must check if the same sector is already in memory. // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime... diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 07b82c90..8c2c8e17 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -334,6 +334,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .oid = je->small_write.oid, .version = je->small_write.version, }; + printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len); bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_J_SYNCED, .flags = 0, @@ -351,6 +352,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) .oid = je->big_write.oid, .version = je->big_write.version, }; + printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); bs->dirty_db.emplace(ov, (dirty_entry){ .state = ST_D_META_SYNCED, .flags = 0, @@ -377,9 +379,18 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) } else { - it->second.state = (it->second.state == ST_D_META_SYNCED - ? ST_D_STABLE - : (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE)); + printf("je_stable oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); + while (1) + { + it->second.state = (it->second.state == ST_D_META_SYNCED + ? ST_D_STABLE + : (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE)); + if (it == bs->dirty_db.begin()) + break; + it--; + if (it->first.oid != ov.oid || IS_STABLE(it->second.state)) + break; + } bs->flusher->queue_flush(ov); } } diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index e6eed976..a5d299af 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -41,7 +41,7 @@ int blockstore::dequeue_stable(blockstore_operation *op) if (clean_it == clean_db.end() || clean_it->second.version < v->version) { // No such object version - op->retval = EINVAL; + op->retval = -EINVAL; op->callback(op); return 1; } diff --git a/test_blockstore.cpp b/test_blockstore.cpp index 3c6dd301..aae3d760 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -85,6 +85,7 @@ int main(int narg, char *args[]) blockstore_operation op; int main_state = 0; + uint64_t version = 0; ring_consumer_t main_cons; op.callback = [&](blockstore_operation *op) { @@ -93,6 +94,8 @@ int main(int narg, char *args[]) main_state = 2; else if (main_state == 3) main_state = 4; + else if (main_state == 5) + main_state = 6; }; main_cons.loop = [&]() { @@ -114,11 +117,24 @@ int main(int narg, char *args[]) } else if (main_state == 2) { - printf("syncing\n"); + printf("version %u written, syncing\n", op.version); + version = op.version; op.flags = OP_SYNC; bs->enqueue_op(&op); main_state = 3; } + else if (main_state == 4) + { + printf("stabilizing version %u\n", version); + op.flags = OP_STABLE; + op.len = 1; + *((obj_ver_id*)op.buf) = { + .oid = { .inode = 1, .stripe = 0 }, + .version = version, + }; + bs->enqueue_op(&op); + main_state = 5; + } }; ringloop->register_consumer(main_cons);