diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 1cc8fcf8..4b2b857e 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -151,8 +151,8 @@ void blockstore_impl_t::loop() { if (has_writes == 2) { - // Some writes could not be submitted - break; + // Some writes already could not be submitted + continue; } dequeue_op = dequeue_write(op); has_writes = dequeue_op ? 1 : 2; @@ -161,8 +161,8 @@ void blockstore_impl_t::loop() { if (has_writes == 2) { - // Some writes could not be submitted - break; + // Some writes already could not be submitted + continue; } dequeue_op = dequeue_del(op); has_writes = dequeue_op ? 1 : 2; @@ -182,33 +182,19 @@ void blockstore_impl_t::loop() } else if (op->opcode == BS_OP_STABLE) { - if (has_writes == 2) - { - // Don't submit additional flushes before completing previous LISTs - break; - } dequeue_op = dequeue_stable(op); } else if (op->opcode == BS_OP_ROLLBACK) { - if (has_writes == 2) - { - // Don't submit additional flushes before completing previous LISTs - break; - } dequeue_op = dequeue_rollback(op); } else if (op->opcode == BS_OP_LIST) { - // Block LIST operation by previous modifications, - // so it always returns a consistent state snapshot - if (has_writes == 2 || inflight_writes > 0) - has_writes = 2; - else - { - process_list(op); - dequeue_op = true; - } + // LIST doesn't need to be blocked by previous modifications, + // it only needs to include all in-progress writes as they're guaranteed + // to be readable and stabilizable/rollbackable by subsequent operations + process_list(op); + dequeue_op = true; } if (dequeue_op) { diff --git a/blockstore_impl.h b/blockstore_impl.h index 3a6a5d9b..3f8b5c50 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -226,7 +226,6 @@ class blockstore_impl_t bool live = false, queue_stall = false; ring_loop_t *ringloop; - int inflight_writes = 0; bool stop_sync_submitted; diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index 39174e9b..a9be5f03 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -100,10 +100,11 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries { // No space in the journal. Wait until used_start changes. printf( - "Ran out of journal space (free space: %lu bytes)\n", + "Ran out of journal space (free space: %lu bytes, sectors to write: %d)\n", (bs->journal.next_free >= bs->journal.used_start ? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start) - : bs->journal.used_start - bs->journal.next_free) + : bs->journal.used_start - bs->journal.next_free), + sectors_required ); PRIV(op)->wait_for = WAIT_JOURNAL; bs->flusher->request_trim(); diff --git a/blockstore_journal.h b/blockstore_journal.h index d0c06769..bda2cd77 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -10,6 +10,8 @@ #define JOURNAL_BUFFER_SIZE 4*1024*1024 // We reserve some extra space for future stabilize requests during writes +// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow +// writing more than can be stabilized afterwards #define JOURNAL_STABILIZE_RESERVATION 65536 // Journal entries diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp index e0074a66..35f2ddfa 100644 --- a/blockstore_rollback.cpp +++ b/blockstore_rollback.cpp @@ -40,7 +40,12 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) } while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version) { - if (!IS_SYNCED(dirty_it->second.state) || + if (IS_IN_FLIGHT(dirty_it->second.state)) + { + // Object write is still in progress. Wait until the write request completes + return 0; + } + else if (!IS_SYNCED(dirty_it->second.state) || IS_STABLE(dirty_it->second.state)) { op->retval = -EBUSY; @@ -103,7 +108,6 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; PRIV(op)->op_state = 1; - inflight_writes++; return 1; } @@ -145,7 +149,6 @@ resume_5: mark_rolled_back(*v); } journal.trim(); - inflight_writes--; // Acknowledge op op->retval = 0; FINISH_OP(op); @@ -205,7 +208,6 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t live = true; if (data->res != data->iov.iov_len) { - inflight_writes--; throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 2d272a65..02bf885a 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -67,6 +67,11 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // Already stable } } + else if (IS_IN_FLIGHT(dirty_it->second.state)) + { + // Object write is still in progress. Wait until the write request completes + return 0; + } else if (!IS_SYNCED(dirty_it->second.state)) { // Object not synced yet. Caller must sync it first @@ -135,7 +140,6 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; PRIV(op)->op_state = 1; - inflight_writes++; return 1; } @@ -178,7 +182,6 @@ resume_5: // Mark all dirty_db entries up to op->version as stable mark_stable(*v); } - inflight_writes--; // Acknowledge op op->retval = 0; FINISH_OP(op); @@ -228,7 +231,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * live = true; if (data->res != data->iov.iov_len) { - inflight_writes--; throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index e90ddbef..dff23503 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -107,7 +107,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers blockstore_journal_check_t space_check(this); - if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write), 0)) + if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION)) { return 0; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 33f47b37..3feff350 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -289,7 +289,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) PRIV(op)->op_state = 3; } } - inflight_writes++; return 1; } @@ -374,7 +373,6 @@ resume_4: dirty_it++; } } - inflight_writes--; // Acknowledge write op->retval = op->len; FINISH_OP(op); @@ -386,7 +384,6 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o live = true; if (data->res != data->iov.iov_len) { - inflight_writes--; // FIXME: our state becomes corrupted after a write error. maybe do something better than just die throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ @@ -445,7 +442,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) }); assert(dirty_it != dirty_db.end()); blockstore_journal_check_t space_check(this); - if (!space_check.check_available(op, 1, sizeof(journal_entry_del), 0)) + if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_STABILIZE_RESERVATION)) { return 0; } diff --git a/osd.cpp b/osd.cpp index 4cc0a040..465e36cd 100644 --- a/osd.cpp +++ b/osd.cpp @@ -99,7 +99,7 @@ void osd_t::parse_config(blockstore_config_t & config) print_stats_interval = 3; slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10); if (!slow_log_interval) - slow_log_interval = 3; + slow_log_interval = 10; c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); if (!c_cli.peer_connect_interval) c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; @@ -332,7 +332,7 @@ void osd_t::print_slow() { for (auto op: kv.second->received_ops) { - if (now.tv_sec - op->tv_begin.tv_sec >= slow_log_interval) + if ((now.tv_sec - op->tv_begin.tv_sec) >= slow_log_interval) { int l = sizeof(alloc), n; char *buf = alloc; @@ -366,7 +366,11 @@ void osd_t::print_slow() } else if (op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK) { - bufprintf(" %lu object versions", op->req.sec_stab.len / sizeof(obj_ver_id)); + for (uint64_t i = 0; i < op->req.sec_stab.len; i += sizeof(obj_ver_id)) + { + obj_ver_id *ov = (obj_ver_id*)(op->buf + i); + bufprintf(i == 0 ? " %lx:%lx v%lu" : ", %lx:%lx v%lu", ov->oid.inode, ov->oid.stripe, ov->version); + } } else if (op->req.hdr.opcode == OSD_OP_SEC_LIST) { diff --git a/osd.h b/osd.h index 2010603d..db630793 100644 --- a/osd.h +++ b/osd.h @@ -70,7 +70,7 @@ class osd_t int client_queue_depth = 128; bool allow_test_ops = true; int print_stats_interval = 3; - int slow_log_interval = 30; + int slow_log_interval = 10; int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;