@@ -151,8 +151,8 @@ void blockstore_impl_t::loop()
         {
             if (has_writes == 2)
             {
-                // Some writes could not be submitted
-                break;
+                // Some writes already could not be submitted
+                continue;
             }
             dequeue_op = dequeue_write(op);
             has_writes = dequeue_op ? 1 : 2;
@@ -161,8 +161,8 @@ void blockstore_impl_t::loop()
         {
             if (has_writes == 2)
             {
-                // Some writes could not be submitted
-                break;
+                // Some writes already could not be submitted
+                continue;
             }
             dequeue_op = dequeue_del(op);
            has_writes = dequeue_op ? 1 : 2;
@@ -182,33 +182,19 @@ void blockstore_impl_t::loop()
         }
         else if (op->opcode == BS_OP_STABLE)
         {
-            if (has_writes == 2)
-            {
-                // Don't submit additional flushes before completing previous LISTs
-                break;
-            }
             dequeue_op = dequeue_stable(op);
         }
         else if (op->opcode == BS_OP_ROLLBACK)
         {
-            if (has_writes == 2)
-            {
-                // Don't submit additional flushes before completing previous LISTs
-                break;
-            }
             dequeue_op = dequeue_rollback(op);
         }
         else if (op->opcode == BS_OP_LIST)
         {
-            // Block LIST operation by previous modifications,
-            // so it always returns a consistent state snapshot
-            if (has_writes == 2 || inflight_writes > 0)
-                has_writes = 2;
-            else
-            {
-                process_list(op);
-                dequeue_op = true;
-            }
+            // LIST doesn't need to be blocked by previous modifications,
+            // it only needs to include all in-progress writes as they're guaranteed
+            // to be readable and stabilizable/rollbackable by subsequent operations
+            process_list(op);
+            dequeue_op = true;
         }
         if (dequeue_op)
         {
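
For illustration: the new invariant is that a LIST may surface in-progress writes, because any subsequent STABLE or ROLLBACK on those versions will wait for them (see the IS_IN_FLIGHT checks further down). Below is a minimal standalone sketch of the resulting dispatch behaviour in one loop() pass; the opcode enum, queue and journal_full flag are simplified stand-ins, not the real blockstore types:

    // Hedged simulation: a blocked write no longer stalls the whole pass
    // (continue instead of break), and LIST is served immediately instead
    // of waiting for in-flight writes to drain.
    #include <cstdio>
    #include <vector>

    enum opcode_t { OP_WRITE, OP_LIST, OP_STABLE }; // simplified stand-ins

    int main()
    {
        std::vector<opcode_t> queue = { OP_WRITE, OP_WRITE, OP_LIST, OP_STABLE };
        bool journal_full = true; // pretend the first write hits "no space"
        int has_writes = 0;       // 0 = none, 1 = submitted, 2 = blocked
        for (opcode_t op : queue)
        {
            if (op == OP_WRITE)
            {
                if (has_writes == 2)
                    continue; // skip further writes this pass, keep scanning
                has_writes = journal_full ? 2 : 1;
                printf("WRITE %s\n", has_writes == 1 ? "submitted" : "blocked");
            }
            else if (op == OP_LIST)
                printf("LIST served immediately (in-flight writes included)\n");
            else if (op == OP_STABLE)
                printf("STABLE dequeued (no longer gated on has_writes)\n");
        }
        return 0;
    }
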
@@ -226,7 +226,6 @@ class blockstore_impl_t
     bool live = false, queue_stall = false;
     ring_loop_t *ringloop;
-    int inflight_writes = 0;
     bool stop_sync_submitted;
@@ -100,10 +100,11 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
     {
         // No space in the journal. Wait until used_start changes.
         printf(
-            "Ran out of journal space (free space: %lu bytes)\n",
+            "Ran out of journal space (free space: %lu bytes, sectors to write: %d)\n",
             (bs->journal.next_free >= bs->journal.used_start
                 ? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start)
-                : bs->journal.used_start - bs->journal.next_free)
+                : bs->journal.used_start - bs->journal.next_free),
+            sectors_required
         );
         PRIV(op)->wait_for = WAIT_JOURNAL;
         bs->flusher->request_trim();
@@ -10,6 +10,8 @@
 #define JOURNAL_BUFFER_SIZE 4*1024*1024
 // We reserve some extra space for future stabilize requests during writes
+// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow
+// writing more than can be stabilized afterwards
 #define JOURNAL_STABILIZE_RESERVATION 65536
 // Journal entries
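
To make the reservation concrete, here is a worked, self-contained version of the ring-buffer free-space formula from the check_available() hunk above, combined with this reservation; the field names follow the diff, the sizes are made up for the example:

    // Worked example (hypothetical sizes) of the journal free-space check.
    // The journal is a ring: when next_free is ahead of used_start, the free
    // region wraps around, hence the len - block_size - (gap) branch.
    #include <cstdint>
    #include <cstdio>

    #define JOURNAL_STABILIZE_RESERVATION 65536

    int main()
    {
        uint64_t len = 16 << 20, block_size = 4096; // 16 MB journal, assumed
        uint64_t used_start = 12 << 20, next_free = 14 << 20;
        uint64_t free_bytes = next_free >= used_start
            ? len - block_size - (next_free - used_start)
            : used_start - next_free;
        // Refuse a write unless it leaves room for the stabilize entries that
        // must follow it; otherwise stabilization could deadlock waiting for
        // journal space that only stabilization itself can free.
        uint64_t need = 2 * block_size; // bytes this op would write, assumed
        printf("free=%lu ok=%d\n", free_bytes,
            free_bytes >= need + JOURNAL_STABILIZE_RESERVATION);
    }
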
@@ -40,7 +40,12 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
     }
     while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version)
     {
-        if (!IS_SYNCED(dirty_it->second.state) ||
+        if (IS_IN_FLIGHT(dirty_it->second.state))
+        {
+            // Object write is still in progress. Wait until the write request completes
+            return 0;
+        }
+        else if (!IS_SYNCED(dirty_it->second.state) ||
             IS_STABLE(dirty_it->second.state))
         {
             op->retval = -EBUSY;
@@ -103,7 +108,6 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
     PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->pending_ops = s;
     PRIV(op)->op_state = 1;
-    inflight_writes++;
     return 1;
 }
@@ -145,7 +149,6 @@ resume_5:
         mark_rolled_back(*v);
     }
     journal.trim();
-    inflight_writes--;
     // Acknowledge op
     op->retval = 0;
     FINISH_OP(op);
@@ -205,7 +208,6 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
     live = true;
     if (data->res != data->iov.iov_len)
     {
-        inflight_writes--;
         throw std::runtime_error(
             "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
             "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
@@ -67,6 +67,11 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
             // Already stable
         }
     }
+    else if (IS_IN_FLIGHT(dirty_it->second.state))
+    {
+        // Object write is still in progress. Wait until the write request completes
+        return 0;
+    }
     else if (!IS_SYNCED(dirty_it->second.state))
     {
         // Object not synced yet. Caller must sync it first
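
Both dequeue_rollback() and dequeue_stable() now distinguish "still in flight" (wait and retry) from "not synced" (caller error). Here is a compilable sketch of that three-way decision with stand-in state values rather than the real bit-flag macros; the interpretation that returning 0 requeues the op for a later loop() pass is an assumption read off the return 0 / return 1 pattern in these hunks:

    // Stand-in states; the real code tests flags via the IS_IN_FLIGHT(),
    // IS_SYNCED() and IS_STABLE() macros.
    enum obj_state_t { STATE_DIRTY, STATE_IN_FLIGHT, STATE_SYNCED, STATE_STABLE };

    // Returns 1 = proceed, 0 = retry later, negative = caller error.
    int stabilize_decision(obj_state_t state)
    {
        if (state == STATE_STABLE)
            return 1;   // already stable: nothing to do for this version
        if (state == STATE_IN_FLIGHT)
            return 0;   // write still running: wait for it, then retry
        if (state != STATE_SYNCED)
            return -16; // -EBUSY: caller must sync the object first
        return 1;       // safe to write the stabilize journal entry
    }
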
@@ -135,7 +140,6 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
     PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->pending_ops = s;
     PRIV(op)->op_state = 1;
-    inflight_writes++;
     return 1;
 }
@@ -178,7 +182,6 @@ resume_5:
         // Mark all dirty_db entries up to op->version as stable
         mark_stable(*v);
     }
-    inflight_writes--;
     // Acknowledge op
     op->retval = 0;
     FINISH_OP(op);
@@ -228,7 +231,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
     live = true;
     if (data->res != data->iov.iov_len)
     {
-        inflight_writes--;
         throw std::runtime_error(
             "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
             "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
@@ -107,7 +107,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
     // 2nd step: Data device is synced, prepare & write journal entries
     // Check space in the journal and journal memory buffers
     blockstore_journal_check_t space_check(this);
-    if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write), 0))
+    if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
     {
         return 0;
     }
@@ -289,7 +289,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
             PRIV(op)->op_state = 3;
         }
     }
-    inflight_writes++;
     return 1;
 }
@@ -374,7 +373,6 @@ resume_4:
             dirty_it++;
         }
     }
-    inflight_writes--;
     // Acknowledge write
     op->retval = op->len;
     FINISH_OP(op);
@@ -386,7 +384,6 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
     live = true;
     if (data->res != data->iov.iov_len)
     {
-        inflight_writes--;
         // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
         throw std::runtime_error(
             "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
@@ -445,7 +442,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
     });
     assert(dirty_it != dirty_db.end());
     blockstore_journal_check_t space_check(this);
-    if (!space_check.check_available(op, 1, sizeof(journal_entry_del), 0))
+    if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_STABILIZE_RESERVATION))
     {
         return 0;
     }
@@ -99,7 +99,7 @@ void osd_t::parse_config(blockstore_config_t & config)
         print_stats_interval = 3;
     slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
     if (!slow_log_interval)
-        slow_log_interval = 3;
+        slow_log_interval = 10;
     c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
     if (!c_cli.peer_connect_interval)
         c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
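
The fallback pattern is easy to check in isolation: a missing key in the string map yields an empty string, strtoull() returns 0, and the new 10-second default kicks in. A self-contained sketch, with std::map standing in for blockstore_config_t (an assumption based on the config["..."] string access above):

    #include <cstdio>
    #include <cstdlib>
    #include <map>
    #include <string>

    int main()
    {
        std::map<std::string, std::string> config; // stand-in for blockstore_config_t
        // config["slow_log_interval"] = "20";     // uncomment to override
        int slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
        if (!slow_log_interval)
            slow_log_interval = 10; // default: report ops stuck for >= 10 s
        printf("slow_log_interval = %d\n", slow_log_interval);
    }
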
@@ -332,7 +332,7 @@ void osd_t::print_slow()
     {
         for (auto op: kv.second->received_ops)
         {
-            if (now.tv_sec - op->tv_begin.tv_sec >= slow_log_interval)
+            if ((now.tv_sec - op->tv_begin.tv_sec) >= slow_log_interval)
             {
                 int l = sizeof(alloc), n;
                 char *buf = alloc;
@@ -366,7 +366,11 @@ void osd_t::print_slow()
             }
             else if (op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
             {
-                bufprintf(" %lu object versions", op->req.sec_stab.len / sizeof(obj_ver_id));
+                for (uint64_t i = 0; i < op->req.sec_stab.len; i += sizeof(obj_ver_id))
+                {
+                    obj_ver_id *ov = (obj_ver_id*)(op->buf + i);
+                    bufprintf(i == 0 ? " %lx:%lx v%lu" : ", %lx:%lx v%lu", ov->oid.inode, ov->oid.stripe, ov->version);
+                }
             }
             else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
             {
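
Instead of a bare count, the slow log now names each object version in a SEC_STABILIZE/SEC_ROLLBACK payload. A standalone demo of the same formatting loop; snprintf stands in for the bufprintf macro, and the struct layout (64-bit inode/stripe plus version) is an assumption matching how the diff indexes the buffer:

    #include <cstdint>
    #include <cstdio>

    struct object_id { uint64_t inode, stripe; };           // assumed layout
    struct obj_ver_id { object_id oid; uint64_t version; }; // assumed layout

    int main()
    {
        obj_ver_id vers[2] = { { { 0x1a, 0x2f000 }, 5 }, { { 0x1a, 0x30000 }, 6 } };
        char buf[128];
        int n = 0;
        // Same walk as the diff: one obj_ver_id per sizeof(obj_ver_id) bytes
        for (uint64_t i = 0; i < sizeof(vers); i += sizeof(obj_ver_id))
        {
            obj_ver_id *ov = (obj_ver_id*)((uint8_t*)vers + i);
            n += snprintf(buf + n, sizeof(buf) - n,
                i == 0 ? " %lx:%lx v%lu" : ", %lx:%lx v%lu",
                ov->oid.inode, ov->oid.stripe, ov->version);
        }
        printf("SEC_STABILIZE%s\n", buf); // -> " 1a:2f000 v5, 1a:30000 v6"
    }
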
@@ -70,7 +70,7 @@ class osd_t
     int client_queue_depth = 128;
     bool allow_test_ops = true;
     int print_stats_interval = 3;
-    int slow_log_interval = 30;
+    int slow_log_interval = 10;
     int immediate_commit = IMMEDIATE_NONE;
     int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
     int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;