diff --git a/blockstore_impl.h b/blockstore_impl.h index 7214c19a..27b38d49 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -25,11 +25,12 @@ // States are not stored on disk. Instead, they're deduced from the journal // FIXME: Rename to BS_ST_* -#define ST_J_IN_FLIGHT 1 -#define ST_J_SUBMITTED 2 -#define ST_J_WRITTEN 3 -#define ST_J_SYNCED 4 -#define ST_J_STABLE 5 +#define ST_J_WAIT_BIG 1 +#define ST_J_IN_FLIGHT 2 +#define ST_J_SUBMITTED 3 +#define ST_J_WRITTEN 4 +#define ST_J_SYNCED 5 +#define ST_J_STABLE 6 #define ST_D_IN_FLIGHT 15 #define ST_D_SUBMITTED 16 @@ -49,13 +50,13 @@ #define IMMEDIATE_SMALL 1 #define IMMEDIATE_ALL 2 -#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) +#define IS_IN_FLIGHT(st) (st == ST_J_WAIT_BIG || st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) #define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) #define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_SYNCED || st == ST_DEL_SYNCED) -#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE) -#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE) -#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE) -#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_WRITTEN|| st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN) +#define IS_JOURNAL(st) (st >= ST_J_WAIT_BIG && st <= ST_J_STABLE) +#define IS_BIG_WRITE(st) (st >= ST_D_IN_FLIGHT && st <= ST_D_STABLE) +#define IS_DELETE(st) (st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_STABLE) +#define IS_UNSYNCED(st) (st >= ST_J_WAIT_BIG && st <= ST_J_WRITTEN || st >= ST_D_IN_FLIGHT && st <= ST_D_WRITTEN|| st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_WRITTEN) #define BS_SUBMIT_GET_SQE(sqe, data) \ BS_SUBMIT_GET_ONLY_SQE(sqe); \ diff --git a/blockstore_init.cpp b/blockstore_init.cpp index ede217c4..0098465e 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -521,7 +521,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u } auto clean_it = bs->clean_db.find(je->small_write.oid); if (clean_it == bs->clean_db.end() || - clean_it->second.version < je->big_write.version) + clean_it->second.version < je->small_write.version) { obj_ver_id ov = { .oid = je->small_write.oid, diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index a9ff527a..905c6304 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -252,7 +252,17 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) #endif auto & unstab = unstable_writes[it->oid]; unstab = unstab < it->version ? it->version : unstab; - dirty_db[*it].state = ST_D_SYNCED; + auto dirty_it = dirty_db.find(*it); + dirty_it->second.state = ST_D_SYNCED; + dirty_it++; + while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid) + { + if (dirty_it->second.state == ST_J_WAIT_BIG) + { + dirty_it->second.state = ST_J_IN_FLIGHT; + } + dirty_it++; + } } for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++) { diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 678dd0f5..b4bcb7c2 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -4,6 +4,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) { // Check or assign version number bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE); + bool is_inflight_big = false; uint64_t version = 1; if (dirty_db.size() > 0) { @@ -17,6 +18,9 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) found = true; version = dirty_it->first.version + 1; deleted = IS_DELETE(dirty_it->second.state); + is_inflight_big = dirty_it->second.state >= ST_D_IN_FLIGHT && + dirty_it->second.state < ST_D_SYNCED || + dirty_it->second.state == ST_J_WAIT_BIG; } } if (!found) @@ -47,6 +51,18 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) op->retval = 0; return false; } + if (is_inflight_big && !is_del && !deleted && op->len < block_size && + immediate_commit != IMMEDIATE_ALL) + { + // Issue an additional sync so that the previous big write can reach the journal + blockstore_op_t *sync_op = new blockstore_op_t; + sync_op->opcode = BS_OP_SYNC; + sync_op->callback = [this, op](blockstore_op_t *sync_op) + { + delete sync_op; + }; + enqueue_op(sync_op); + } // Immediately add the operation into dirty_db, so subsequent reads could see it #ifdef BLOCKSTORE_DEBUG if (is_del) @@ -61,7 +77,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) .state = (uint32_t)( is_del ? ST_DEL_IN_FLIGHT - : (op->len == block_size || deleted ? ST_D_IN_FLIGHT : ST_J_IN_FLIGHT) + : (op->len == block_size || deleted ? ST_D_IN_FLIGHT : (is_inflight_big ? ST_J_WAIT_BIG : ST_J_IN_FLIGHT)) ), .flags = 0, .location = 0, @@ -83,7 +99,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) .oid = op->oid, .version = op->version, }); - if (dirty_it->second.state == ST_D_IN_FLIGHT) + if (dirty_it->second.state == ST_J_WAIT_BIG) + { + return 0; + } + else if (dirty_it->second.state == ST_D_IN_FLIGHT) { blockstore_journal_check_t space_check(this); if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION)) @@ -262,10 +282,10 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op) { io_uring_sqe *sqe = NULL; journal_entry_big_write *je; - auto & dirty_entry = dirty_db[(obj_ver_id){ + auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, - }]; + }); if (PRIV(op)->op_state == 2) goto resume_2; else if (PRIV(op)->op_state == 4) @@ -280,7 +300,7 @@ resume_2: return 0; } je = (journal_entry_big_write*)prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write)); - dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset; + dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; journal.sector_info[journal.cur_sector].dirty = false; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; #ifdef BLOCKSTORE_DEBUG @@ -290,7 +310,7 @@ resume_2: je->version = op->version; je->offset = op->offset; je->len = op->len; - je->location = dirty_entry.location; + je->location = dirty_it->second.location; je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; prepare_journal_sector_write(journal, journal.cur_sector, sqe, @@ -302,9 +322,9 @@ resume_2: resume_4: // Switch object state #ifdef BLOCKSTORE_DEBUG - printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state); + printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); #endif - bool imm = dirty_entry.state == ST_D_SUBMITTED + bool imm = dirty_it->second.state == ST_D_SUBMITTED ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE); if (imm) @@ -312,17 +332,29 @@ resume_4: auto & unstab = unstable_writes[op->oid]; unstab = unstab < op->version ? op->version : unstab; } - if (dirty_entry.state == ST_J_SUBMITTED) + if (dirty_it->second.state == ST_J_SUBMITTED) { - dirty_entry.state = imm ? ST_J_SYNCED : ST_J_WRITTEN; + dirty_it->second.state = imm ? ST_J_SYNCED : ST_J_WRITTEN; } - else if (dirty_entry.state == ST_D_SUBMITTED) + else if (dirty_it->second.state == ST_D_SUBMITTED) { - dirty_entry.state = imm ? ST_D_SYNCED : ST_D_WRITTEN; + dirty_it->second.state = imm ? ST_D_SYNCED : ST_D_WRITTEN; } - else if (dirty_entry.state == ST_DEL_SUBMITTED) + else if (dirty_it->second.state == ST_DEL_SUBMITTED) { - dirty_entry.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN; + dirty_it->second.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN; + } + if (immediate_commit == IMMEDIATE_ALL) + { + dirty_it++; + while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid) + { + if (dirty_it->second.state == ST_J_WAIT_BIG) + { + dirty_it->second.state = ST_J_IN_FLIGHT; + } + dirty_it++; + } } // Acknowledge write op->retval = op->len;