|
|
@ -3,33 +3,42 @@ |
|
|
|
#define SYNC_NO_BIG 1 |
|
|
|
#define SYNC_HAS_BIG 2 |
|
|
|
#define SYNC_DATA_SYNC_SENT 3 |
|
|
|
#define SYNC_JOURNAL_SYNC_SENT 4 |
|
|
|
#define SYNC_DONE 5 |
|
|
|
#define SYNC_DATA_SYNC_DONE 4 |
|
|
|
#define SYNC_JOURNAL_SYNC_SENT 5 |
|
|
|
#define SYNC_DONE 6 |
|
|
|
|
|
|
|
int blockstore::dequeue_sync(blockstore_operation *op) |
|
|
|
{ |
|
|
|
if (op->sync_state == 0) |
|
|
|
{ |
|
|
|
op->big_write_count = 0; |
|
|
|
op->sync_state = SYNC_NO_BIG; |
|
|
|
op->sync_writes.swap(unsynced_writes); |
|
|
|
unsynced_writes.clear(); |
|
|
|
if (op->sync_writes.size() == 0) |
|
|
|
{ |
|
|
|
op->sync_big_writes.swap(unsynced_big_writes); |
|
|
|
op->big_write_count = op->sync_big_writes.size(); |
|
|
|
if (op->big_write_count > 0) |
|
|
|
op->sync_state = SYNC_HAS_BIG; |
|
|
|
else if (unsynced_small_writes == 0) |
|
|
|
op->sync_state = SYNC_DONE; |
|
|
|
} |
|
|
|
auto it = op->sync_writes.begin(); |
|
|
|
while (it != op->sync_writes.end()) |
|
|
|
else |
|
|
|
op->sync_state = SYNC_NO_BIG; |
|
|
|
unsynced_big_writes.clear(); |
|
|
|
unsynced_small_writes = 0; |
|
|
|
} |
|
|
|
int r = continue_sync(op); |
|
|
|
if (r) |
|
|
|
{ |
|
|
|
int done = ack_sync(op); |
|
|
|
if (!done) |
|
|
|
{ |
|
|
|
uint32_t state = dirty_db[*it].state; |
|
|
|
if (IS_BIG_WRITE(state)) |
|
|
|
{ |
|
|
|
op->big_write_count++; |
|
|
|
op->sync_state = SYNC_HAS_BIG; |
|
|
|
} |
|
|
|
it++; |
|
|
|
in_progress_ops.insert(op); |
|
|
|
op->prev_sync_count = in_progress_syncs.size(); |
|
|
|
op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); |
|
|
|
} |
|
|
|
} |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
int blockstore::continue_sync(blockstore_operation *op) |
|
|
|
{ |
|
|
|
if (op->sync_state == SYNC_NO_BIG) |
|
|
|
{ |
|
|
|
// No big writes, just fsync the journal
|
|
|
@ -48,7 +57,7 @@ int blockstore::dequeue_sync(blockstore_operation *op) |
|
|
|
op->pending_ops = 1; |
|
|
|
op->sync_state = SYNC_DATA_SYNC_SENT; |
|
|
|
} |
|
|
|
else if (op->sync_state == SYNC_DATA_SYNC_SENT) |
|
|
|
else if (op->sync_state == SYNC_DATA_SYNC_DONE) |
|
|
|
{ |
|
|
|
// 2nd step: Data device is synced, prepare & write journal entries
|
|
|
|
// Check space in the journal and journal memory buffers
|
|
|
@ -87,8 +96,7 @@ int blockstore::dequeue_sync(blockstore_operation *op) |
|
|
|
op->min_used_journal_sector = 1 + journal.cur_sector; |
|
|
|
sectors_required = 0; |
|
|
|
required = op->big_write_count; |
|
|
|
// FIXME: advance it
|
|
|
|
auto it = op->sync_writes.begin(); |
|
|
|
auto it = op->sync_big_writes.begin(); |
|
|
|
while (1) |
|
|
|
{ |
|
|
|
int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write); |
|
|
@ -111,6 +119,7 @@ int blockstore::dequeue_sync(blockstore_operation *op) |
|
|
|
journal.crc32_last = je->crc32; |
|
|
|
journal.in_sector_pos += sizeof(journal_entry_big_write); |
|
|
|
required--; |
|
|
|
it++; |
|
|
|
} |
|
|
|
if (required <= 0) |
|
|
|
break; |
|
|
@ -136,9 +145,10 @@ int blockstore::dequeue_sync(blockstore_operation *op) |
|
|
|
op->max_used_journal_sector = 1 + journal.cur_sector; |
|
|
|
op->sync_state = SYNC_JOURNAL_SYNC_SENT; |
|
|
|
} |
|
|
|
// FIXME: resubmit op from in_progress
|
|
|
|
op->prev_sync_count = in_progress_syncs.size(); |
|
|
|
op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); |
|
|
|
else |
|
|
|
{ |
|
|
|
return 0; |
|
|
|
} |
|
|
|
return 1; |
|
|
|
} |
|
|
|
|
|
|
@ -151,40 +161,62 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) |
|
|
|
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); |
|
|
|
} |
|
|
|
op->pending_ops--; |
|
|
|
if (op->min_used_journal_sector > 0) |
|
|
|
if (op->pending_ops == 0) |
|
|
|
{ |
|
|
|
for (uint64_t s = op->min_used_journal_sector; s != op->max_used_journal_sector; s = (s + 1) % journal.sector_count) |
|
|
|
// Release used journal sectors
|
|
|
|
if (op->min_used_journal_sector > 0) |
|
|
|
{ |
|
|
|
for (uint64_t s = op->min_used_journal_sector; s != op->max_used_journal_sector; s = (s + 1) % journal.sector_count) |
|
|
|
{ |
|
|
|
journal.sector_info[s-1].usage_count--; |
|
|
|
} |
|
|
|
op->min_used_journal_sector = op->max_used_journal_sector = 0; |
|
|
|
} |
|
|
|
// Handle state
|
|
|
|
if (op->sync_state == SYNC_DATA_SYNC_SENT) |
|
|
|
{ |
|
|
|
journal.sector_info[s-1].usage_count--; |
|
|
|
op->sync_state = SYNC_DATA_SYNC_DONE; |
|
|
|
} |
|
|
|
op->min_used_journal_sector = op->max_used_journal_sector = 0; |
|
|
|
else if (op->sync_state == SYNC_JOURNAL_SYNC_SENT) |
|
|
|
{ |
|
|
|
op->sync_state = SYNC_DONE; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
throw new std::runtime_error("BUG: unexpected sync op state"); |
|
|
|
} |
|
|
|
ack_sync(op); |
|
|
|
} |
|
|
|
if (op->pending_ops == 0) |
|
|
|
} |
|
|
|
|
|
|
|
int blockstore::ack_sync(blockstore_operation *op) |
|
|
|
{ |
|
|
|
if (op->sync_state == SYNC_DONE && op->prev_sync_count == 0) |
|
|
|
{ |
|
|
|
// Acknowledge sync
|
|
|
|
// Remove dependency of subsequent syncs
|
|
|
|
auto it = op->in_progress_ptr; |
|
|
|
int done_syncs = 1; |
|
|
|
++it; |
|
|
|
while (it != in_progress_syncs.end()) |
|
|
|
{ |
|
|
|
auto & next_sync = *it; |
|
|
|
auto & next_sync = *it++; |
|
|
|
next_sync->prev_sync_count -= done_syncs; |
|
|
|
if (next_sync->prev_sync_count == 0/* && next_sync->DONE*/) |
|
|
|
if (next_sync->prev_sync_count == 0 && next_sync->sync_state == SYNC_DONE) |
|
|
|
{ |
|
|
|
done_syncs++; |
|
|
|
auto next_it = it; |
|
|
|
it++; |
|
|
|
in_progress_syncs.erase(next_it); |
|
|
|
// Acknowledge next_sync
|
|
|
|
in_progress_syncs.erase(next_sync->in_progress_ptr); |
|
|
|
in_progress_ops.erase(next_sync); |
|
|
|
next_sync->retval = 0; |
|
|
|
next_sync->callback(next_sync); |
|
|
|
in_progress_ops.erase(next_sync); |
|
|
|
} |
|
|
|
else |
|
|
|
it++; |
|
|
|
} |
|
|
|
// Acknowledge sync
|
|
|
|
in_progress_syncs.erase(op->in_progress_ptr); |
|
|
|
in_progress_ops.erase(op); |
|
|
|
op->retval = 0; |
|
|
|
op->callback(op); |
|
|
|
in_progress_ops.erase(op); |
|
|
|
return 1; |
|
|
|
} |
|
|
|
return 0; |
|
|
|
} |
|
|
|