diff --git a/blockstore.cpp b/blockstore.cpp index 641b4333..4c5b4d84 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -108,18 +108,30 @@ void blockstore::loop() continue_sync(*cur_sync++); } auto cur = submit_queue.begin(); - bool has_writes = false; + int has_writes = 0; while (cur != submit_queue.end()) { auto op_ptr = cur; auto op = *(cur++); + // FIXME: This needs some simplification + // Writes should not block reads if the ring is not null and if reads don't depend on them + // In all other cases we should stop submission if (op->wait_for) { check_wait(op); if (op->wait_for == WAIT_SQE) + { break; + } else if (op->wait_for) + { + if ((op->flags & OP_TYPE_MASK) == OP_WRITE || + (op->flags & OP_TYPE_MASK) == OP_DELETE) + { + has_writes = 2; + } continue; + } } unsigned ring_space = io_uring_sq_space_left(&ringloop->ring); unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail; @@ -131,8 +143,13 @@ void blockstore::loop() else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || (op->flags & OP_TYPE_MASK) == OP_DELETE) { - has_writes = true; + if (has_writes == 2) + { + // Some writes could not be submitted + break; + } dequeue_op = dequeue_write(op); + has_writes = dequeue_op ? 1 : 2; } else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) { @@ -229,7 +246,7 @@ void blockstore::check_wait(blockstore_operation *op) } else if (op->wait_for == WAIT_JOURNAL) { - if (journal.used_start < op->wait_detail) + if (journal.used_start == op->wait_detail) { // do not submit return; diff --git a/blockstore.h b/blockstore.h index 7240f51d..9cf707c0 100644 --- a/blockstore.h +++ b/blockstore.h @@ -312,6 +312,7 @@ class blockstore int dequeue_sync(blockstore_operation *op); void handle_sync_event(ring_data_t *data, blockstore_operation *op); int continue_sync(blockstore_operation *op); + void ack_one_sync(blockstore_operation *op); int ack_sync(blockstore_operation *op); // Stabilize diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 0e46c63c..318bcd01 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -63,7 +63,8 @@ void journal_flusher_t::queue_flush(obj_ver_id ov) auto it = flush_versions.find(ov.oid); if (it != flush_versions.end()) { - it->second = ov.version; + if (it->second < ov.version) + it->second = ov.version; } else { @@ -77,7 +78,8 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov) auto it = flush_versions.find(ov.oid); if (it != flush_versions.end()) { - it->second = ov.version; + if (it->second < ov.version) + it->second = ov.version; } else { @@ -199,7 +201,12 @@ resume_0: } else if (!IS_STABLE(dirty_it->second.state)) { - throw std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state)); + char err[1024]; + snprintf( + err, 1024, "BUG: Unexpected dirty_entry %lu:%lu v%lu state during flush: %d", + dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state + ); + throw std::runtime_error(err); } if (dirty_it == bs->dirty_db.begin()) { @@ -217,7 +224,7 @@ resume_0: flusher->active_flushers--; flusher->active_until_sync--; repeat_it = flusher->sync_to_repeat.find(cur.oid); - if (repeat_it->second != 0) + if (repeat_it->second > cur.version) { // Requeue version flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }); @@ -475,12 +482,12 @@ resume_0: wait_state = 0; flusher->active_flushers--; repeat_it = flusher->sync_to_repeat.find(cur.oid); - if (repeat_it->second != 0) + if (repeat_it->second > cur.version) { // Requeue version flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }); - flusher->sync_to_repeat.erase(repeat_it); } + flusher->sync_to_repeat.erase(repeat_it); goto resume_0; } } diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index 047dccce..0c5451e7 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -55,9 +55,9 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re } if (!right_dir && next_pos >= bs->journal.used_start-512) { - // No space in the journal. Wait for it. + // No space in the journal. Wait until used_start changes. op->wait_for = WAIT_JOURNAL; - op->wait_detail = next_pos; + op->wait_detail = bs->journal.used_start; return 0; } return 1; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 133d1ac4..44edff5f 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -140,6 +140,7 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) if (op->sync_state == SYNC_DATA_SYNC_SENT) { op->sync_state = SYNC_DATA_SYNC_DONE; + // FIXME: This is not needed, in fact for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++) { dirty_db[*it].state = ST_D_SYNCED; @@ -148,18 +149,6 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) else if (op->sync_state == SYNC_JOURNAL_SYNC_SENT) { op->sync_state = SYNC_DONE; - for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++) - { - auto & unstab = unstable_writes[it->oid]; - unstab = !unstab || unstab > it->version ? it->version : unstab; - dirty_db[*it].state = ST_D_META_SYNCED; - } - for (auto it = op->sync_small_writes.begin(); it != op->sync_small_writes.end(); it++) - { - auto & unstab = unstable_writes[it->oid]; - unstab = !unstab || unstab > it->version ? it->version : unstab; - dirty_db[*it].state = ST_J_SYNCED; - } ack_sync(op); } else @@ -177,6 +166,8 @@ int blockstore::ack_sync(blockstore_operation *op) auto it = op->in_progress_ptr; int done_syncs = 1; ++it; + // Acknowledge sync + ack_one_sync(op); while (it != in_progress_syncs.end()) { auto & next_sync = *it++; @@ -185,16 +176,30 @@ int blockstore::ack_sync(blockstore_operation *op) { done_syncs++; // Acknowledge next_sync - in_progress_syncs.erase(next_sync->in_progress_ptr); - next_sync->retval = 0; - next_sync->callback(next_sync); + ack_one_sync(next_sync); } } - // Acknowledge sync - in_progress_syncs.erase(op->in_progress_ptr); - op->retval = 0; - op->callback(op); return 1; } return 0; } + +void blockstore::ack_one_sync(blockstore_operation *op) +{ + // Handle states + for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++) + { + auto & unstab = unstable_writes[it->oid]; + unstab = unstab < it->version ? it->version : unstab; + dirty_db[*it].state = ST_D_META_SYNCED; + } + for (auto it = op->sync_small_writes.begin(); it != op->sync_small_writes.end(); it++) + { + auto & unstab = unstable_writes[it->oid]; + unstab = unstab < it->version ? it->version : unstab; + dirty_db[*it].state = ST_J_SYNCED; + } + in_progress_syncs.erase(op->in_progress_ptr); + op->retval = 0; + op->callback(op); +} diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 4a1ee476..2e6ef284 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -41,24 +41,6 @@ void blockstore::enqueue_write(blockstore_operation *op) .len = op->len, .journal_sector = 0, }); - // Remember write as unsynced here, so external consumers could get - // the list of dirty objects to sync just before issuing a SYNC request - if (op->len == block_size || op->version == 1) - { - // Remember big write as unsynced - unsynced_big_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); - } - else - { - // Remember small write as unsynced - unsynced_small_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); - } } // First step of the write algorithm: dequeue operation and submit initial write(s) @@ -109,6 +91,11 @@ int blockstore::dequeue_write(blockstore_operation *op) ); op->pending_ops = 1; op->min_used_journal_sector = op->max_used_journal_sector = 0; + // Remember big write as unsynced + unsynced_big_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); } else { @@ -154,6 +141,11 @@ int blockstore::dequeue_write(blockstore_operation *op) dirty_it->second.state = ST_J_SUBMITTED; journal.next_free += op->len; op->pending_ops = 2; + // Remember small write as unsynced + unsynced_small_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); } return 1; } diff --git a/fio_engine.cpp b/fio_engine.cpp index 4d92eb08..f7cd7095 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -178,7 +178,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) .version = it->second, }; } - bsd->bs->enqueue_op(op); + bsd->bs->unstable_writes.clear(); op->callback = [io, n](blockstore_operation *op) { io->error = op->retval < 0 ? -op->retval : 0; @@ -191,6 +191,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval); delete op; }; + bsd->bs->enqueue_op(op); } else {