diff --git a/blockstore.h b/blockstore.h index 36489169..39fe3389 100644 --- a/blockstore.h +++ b/blockstore.h @@ -29,19 +29,20 @@ // States are not stored on disk. Instead, they're deduced from the journal -#define ST_IN_FLIGHT 1 - +#define ST_J_IN_FLIGHT 1 #define ST_J_SUBMITTED 2 #define ST_J_WRITTEN 3 #define ST_J_SYNCED 4 #define ST_J_STABLE 5 +#define ST_D_IN_FLIGHT 15 #define ST_D_SUBMITTED 16 #define ST_D_WRITTEN 17 #define ST_D_META_WRITTEN 19 #define ST_D_META_SYNCED 20 #define ST_D_STABLE 21 +#define ST_DEL_IN_FLIGHT 31 #define ST_DEL_SUBMITTED 32 #define ST_DEL_WRITTEN 33 #define ST_DEL_SYNCED 34 @@ -49,9 +50,9 @@ #define ST_CURRENT 48 -#define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) +#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) #define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) -#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED) +#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED) #define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE) #define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE) #define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE) @@ -314,6 +315,7 @@ class blockstore // Write void enqueue_write(blockstore_operation *op); int dequeue_write(blockstore_operation *op); + int dequeue_del(blockstore_operation *op); void handle_write_event(ring_data_t *data, blockstore_operation *op); // Sync diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 6d039e94..8fe9eb44 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -19,6 +19,14 @@ // 128K (data) + sync + 512b (journal) + sync + 512b (journal) + sync + 512b (metadata) + sync. // WA = 1.012. Very good :) +// Stabilize delete: +// 1) Remove metadata entry and sync it +// 2) Remove dirty_db entry and clear previous journal entries +// Note that it will lead to problems in a degraded cluster, because deleting 2 of 3 replicas +// and restarting the last replica will then result in extra "missing" objects. To solve that +// we need to store the "tombstones" of deleted objects. We can't do that with current simple +// metadata storage so we'll skip TRIM implementation for now. + // AND We must do it in batches, for the sake of reduced fsync call count // AND We must know what we stabilize. Basic workflow is like: // 1) primary OSD receives sync request @@ -154,6 +162,10 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op { dirty_it->second.state = ST_D_STABLE; } + else if (dirty_it->second.state == ST_DEL_SYNCED) + { + dirty_it->second.state = ST_DEL_STABLE; + } else if (IS_STABLE(dirty_it->second.state)) { break; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index ea76e7de..6dd52f68 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -218,7 +218,7 @@ void blockstore::ack_one_sync(blockstore_operation *op) #endif auto & unstab = unstable_writes[it->oid]; unstab = unstab < it->version ? it->version : unstab; - dirty_db[*it].state = ST_J_SYNCED; + dirty_db[*it].state = dirty_db[*it].state == ST_DEL_WRITTEN ? ST_DEL_SYNCED : ST_J_SYNCED; } in_progress_syncs.erase(op->in_progress_ptr); op->retval = 0; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index a6e07846..7c5e09eb 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -3,7 +3,7 @@ void blockstore::enqueue_write(blockstore_operation *op) { // Assign version number - bool found = false; + bool found = false, deleted = false, is_del = (op->flags & OP_TYPE_MASK) == OP_DELETE; if (dirty_db.size() > 0) { auto dirty_it = dirty_db.upper_bound((obj_ver_id){ @@ -15,6 +15,7 @@ void blockstore::enqueue_write(blockstore_operation *op) { found = true; op->version = dirty_it->first.version + 1; + deleted = IS_DELETE(dirty_it->second.state); } } if (!found) @@ -26,22 +27,34 @@ void blockstore::enqueue_write(blockstore_operation *op) } else { + deleted = true; op->version = 1; } } + if (deleted && is_del) + { + // Already deleted + op->retval = 0; + op->callback(op); + return; + } // Immediately add the operation into dirty_db, so subsequent reads could see it #ifdef BLOCKSTORE_DEBUG - printf("Write %lu:%lu v%lu\n", op->oid.inode, op->oid.stripe, op->version); + printf("%s %lu:%lu v%lu\n", is_del ? "Delete" : "Write", op->oid.inode, op->oid.stripe, op->version); #endif dirty_db.emplace((obj_ver_id){ .oid = op->oid, .version = op->version, }, (dirty_entry){ - .state = ST_IN_FLIGHT, + .state = (uint32_t)( + is_del + ? ST_DEL_IN_FLIGHT + : (op->len == block_size || deleted ? ST_D_IN_FLIGHT : ST_J_IN_FLIGHT) + ), .flags = 0, .location = 0, - .offset = op->offset, - .len = op->len, + .offset = is_del ? 0 : op->offset, + .len = is_del ? 0 : op->len, .journal_sector = 0, }); } @@ -53,7 +66,7 @@ int blockstore::dequeue_write(blockstore_operation *op) .oid = op->oid, .version = op->version, }); - if (op->len == block_size || op->version == 1) + if (dirty_it->second.state == ST_D_IN_FLIGHT) { // Big (redirect) write uint64_t loc = data_alloc->find_free(); @@ -222,3 +235,40 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op) op->callback(op); } } + +int blockstore::dequeue_del(blockstore_operation *op) +{ + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + blockstore_journal_check_t space_check(this); + if (!space_check.check_available(op, 1, sizeof(journal_entry_del), 0)) + { + return 0; + } + BS_SUBMIT_GET_ONLY_SQE(sqe); + // Prepare journal sector write + journal_entry_del *je = (journal_entry_del*) + prefill_single_journal_entry(journal, JE_DELETE, sizeof(struct journal_entry_del)); + dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; + journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; +#ifdef BLOCKSTORE_DEBUG + printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version); +#endif + je->oid = op->oid; + je->version = op->version; + je->crc32 = je_crc32((journal_entry*)je); + journal.crc32_last = je->crc32; + auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; + prepare_journal_sector_write(journal, sqe, cb); + op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; + op->pending_ops = 1; + dirty_it->second.state = ST_DEL_SUBMITTED; + // Remember small write as unsynced + unsynced_small_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + return 1; +}