diff --git a/Makefile b/Makefile
index a4f86e61..86fa8811 100644
--- a/Makefile
+++ b/Makefile
@@ -2,8 +2,8 @@ all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_rea
 clean:
	rm -f *.o
 crc32c.o: crc32c.c
-	gcc -c -o $@ $<
-%.o: %.cpp
-	gcc -c -o $@ $<
+	g++ -c -o $@ $<
+%.o: %.cpp blockstore.h
+	g++ -c -o $@ $<
 test: test.cpp
-	gcc -o test -luring test.cpp
+	g++ -o test -luring test.cpp
diff --git a/allocator.cpp b/allocator.cpp
index bd91296b..da3335d5 100644
--- a/allocator.cpp
+++ b/allocator.cpp
@@ -3,8 +3,6 @@
 #include
 #include
 
-#define MAX64 ((uint64_t)-1)
-
 allocator *allocator_create(uint64_t blocks)
 {
     if (blocks >= 0x80000000 || blocks <= 1)
@@ -22,8 +20,8 @@ allocator *allocator_create(uint64_t blocks)
     allocator *buf = (allocator*)memalign(sizeof(uint64_t), (2 + total)*sizeof(uint64_t));
     buf->size = blocks;
     buf->last_one_mask = (blocks % 64) == 0
-        ? MAX64
-        : ~(MAX64 << (64 - blocks % 64));
+        ? UINT64_MAX
+        : ~(UINT64_MAX << (64 - blocks % 64));
     for (uint64_t i = 0; i < blocks; i++)
     {
         buf->mask[i] = 0;
@@ -61,7 +59,7 @@ void allocator_set(allocator *alloc, uint64_t addr, bool value)
         {
             alloc->mask[last] = alloc->mask[last] | (1 << bit);
             if (alloc->mask[last] != (!is_last || cur_addr/64 < alloc->size/64
-                ? MAX64 : alloc->last_one_mask))
+                ? UINT64_MAX : alloc->last_one_mask))
             {
                 break;
             }
@@ -109,13 +107,13 @@ uint64_t allocator_find_free(allocator *alloc)
         if (i == 64)
         {
             // No space
-            return MAX64;
+            return UINT64_MAX;
         }
         addr = (addr * 64) | i;
         if (addr >= alloc->size)
         {
             // No space
-            return MAX64;
+            return UINT64_MAX;
         }
         offset += p2;
         p2 = p2 * 64;
diff --git a/blockstore.cpp b/blockstore.cpp
index fe950730..7bef9d17 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -205,10 +205,19 @@ int blockstore::enqueue_op(blockstore_operation *op)
     if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
     {
         // Assign version number
-        auto dirty_it = dirty_queue.find(op->oid);
-        if (dirty_it != dirty_queue.end())
+        auto dirty_it = dirty_db.upper_bound((obj_ver_id){
+            .oid = op->oid,
+            .version = UINT64_MAX,
+        });
+        // upper_bound() points to the first entry after all versions of op->oid,
+        // so step back once - but only if there is an entry to step back to
+        if (dirty_it != dirty_db.begin())
+        {
+            dirty_it--;
+        }
+        if (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
         {
-            op->version = dirty_it->second.back().version + 1;
+            op->version = dirty_it->first.version + 1;
         }
         else
         {
@@ -221,11 +230,12 @@ int blockstore::enqueue_op(blockstore_operation *op)
         {
             op->version = 1;
         }
-        dirty_it = dirty_queue.emplace(op->oid, dirty_list()).first;
     }
-    // Immediately add the operation into the dirty queue, so subsequent reads could see it
-    dirty_it->second.push_back((dirty_entry){
-        .version = op->version,
+    // Immediately add the operation into dirty_db, so subsequent reads could see it
+    dirty_db.emplace((obj_ver_id){
+        .oid = op->oid,
+        .version = op->version,
+    }, (dirty_entry){
         .state = ST_IN_FLIGHT,
         .flags = 0,
         .location = 0,
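The version-assignment path above leans on std::map ordering: upper_bound() on (oid, UINT64_MAX) seeks just past every version of the object, and one guarded decrement lands on the newest dirty version. Roughly, the idiom looks like this in isolation (a minimal sketch; ver_key and the inserted values are hypothetical stand-ins for obj_ver_id, which blockstore.h defines below):

```cpp
#include <map>
#include <cstdint>
#include <cstdio>

struct ver_key { uint64_t oid, version; };
inline bool operator < (const ver_key & a, const ver_key & b)
{
    return a.oid < b.oid || (a.oid == b.oid && a.version < b.version);
}

int main()
{
    std::map<ver_key, int> db;
    db[{ 1, 3 }] = 0;
    db[{ 1, 7 }] = 0;
    db[{ 2, 1 }] = 0;
    // upper_bound({ oid, UINT64_MAX }) points just past the last version of oid
    auto it = db.upper_bound({ 1, UINT64_MAX });
    if (it != db.begin())
    {
        it--;
        if (it->first.oid == 1)
        {
            // prints "newest version of oid 1 = 7"
            printf("newest version of oid 1 = %lu\n", (unsigned long)it->first.version);
        }
    }
    return 0;
}
```

When upper_bound() returns begin(), the map holds nothing at or below the key, so the decrement must be skipped - that is the reason for the begin() guards in enqueue_op().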
diff --git a/blockstore.h b/blockstore.h
index def7b741..cb266349 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -65,7 +65,12 @@ struct __attribute__((__packed__)) object_id
 
 inline bool operator == (const object_id & a, const object_id & b)
 {
-    return b.inode == a.inode && b.stripe == a.stripe;
+    return a.inode == b.inode && a.stripe == b.stripe;
+}
+
+inline bool operator < (const object_id & a, const object_id & b)
+{
+    return a.inode < b.inode || (a.inode == b.inode && a.stripe < b.stripe);
 }
 
 // 32 bytes per "clean" entry on disk with fixed metadata tables
@@ -87,9 +92,19 @@ struct __attribute__((__packed__)) clean_entry
 };
 
 // 48 bytes per dirty entry in memory
+struct __attribute__((__packed__)) obj_ver_id
+{
+    object_id oid;
+    uint64_t version;
+};
+
+inline bool operator < (const obj_ver_id & a, const obj_ver_id & b)
+{
+    return a.oid < b.oid || (a.oid == b.oid && a.version < b.version);
+}
+
 struct __attribute__((__packed__)) dirty_entry
 {
-    uint64_t version;
     uint32_t state;
     uint32_t flags;
     uint64_t location; // location in either journal or data
@@ -97,8 +112,6 @@ struct __attribute__((__packed__)) dirty_entry
     uint32_t size; // entry size
 };
 
-typedef std::vector<dirty_entry> dirty_list;
-
 class oid_hash
 {
 public:
@@ -124,7 +137,7 @@ public:
 // we should stop submission of other operations. Otherwise some "scatter" reads
 // may end up blocked for a long time.
 // Otherwise, the submit order is free, that is all operations may be submitted immediately
-// In fact, adding a write operation must immediately result in dirty_queue being populated
+// In fact, adding a write operation must immediately result in dirty_db being populated
 
 // write -> immediately add to dirty ops, immediately submit. postpone if ring full
 // read -> check dirty ops, read or wait, remember max used journal offset, then unremember it
@@ -176,7 +189,7 @@ class blockstore
     struct ring_consumer_t ring_consumer;
 public:
     spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
-    spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
+    std::map<obj_ver_id, dirty_entry> dirty_db;
     std::list<blockstore_operation*> submit_queue;
     std::set<blockstore_operation*> in_process_ops;
     uint32_t block_order, block_size;
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index a2c5c677..c9f02602 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -290,8 +290,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
                 location = done_pos + total_pos;
                 total_pos += je->small_write.len;
             }
-            bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
-                .version = je->small_write.version,
+            bs->dirty_db.emplace((obj_ver_id){
+                .oid = je->small_write.oid,
+                .version = je->small_write.version,
+            }, (dirty_entry){
                 .state = ST_J_SYNCED,
                 .flags = 0,
                 .location = location,
@@ -302,8 +304,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
         else if (je->type == JE_BIG_WRITE)
         {
             // oid, version, block
-            bs->dirty_queue[je->big_write.oid].push_back((dirty_entry){
-                .version = je->big_write.version,
+            bs->dirty_db.emplace((obj_ver_id){
+                .oid = je->big_write.oid,
+                .version = je->big_write.version,
+            }, (dirty_entry){
                 .state = ST_D_META_SYNCED,
                 .flags = 0,
                 .location = je->big_write.block,
@@ -314,8 +318,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
         else if (je->type == JE_STABLE)
        {
             // oid, version
-            auto it = bs->dirty_queue.find(je->stable.oid);
-            if (it == bs->dirty_queue.end())
+            auto it = bs->dirty_db.find((obj_ver_id){
+                .oid = je->stable.oid,
+                .version = je->stable.version,
+            });
+            if (it == bs->dirty_db.end())
             {
                 // journal contains a legitimate STABLE entry for a non-existing dirty write
                 // this probably means that journal was trimmed between WRITTEN and STABLE entries
@@ -323,29 +330,18 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
             }
             else
             {
-                auto & lst = it->second;
-                int i;
-                for (i = 0; i < lst.size(); i++)
-                {
-                    if (lst[i].version == je->stable.version)
-                    {
-                        lst[i].state = (lst[i].state == ST_D_META_SYNCED
-                            ? ST_D_STABLE
-                            : (lst[i].state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
-                        break;
-                    }
-                }
-                if (i >= lst.size())
-                {
-                    // same. STABLE entry for a missing object version
-                }
+                it->second.state = (it->second.state == ST_D_META_SYNCED
+                    ? ST_D_STABLE
+                    : (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
             }
         }
         else if (je->type == JE_DELETE)
         {
             // oid, version
-            bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
-                .version = je->small_write.version,
+            bs->dirty_db.emplace((obj_ver_id){
+                .oid = je->del.oid,
+                .version = je->del.version,
+            }, (dirty_entry){
                 .state = ST_DEL_SYNCED,
                 .flags = 0,
                 .location = 0,
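A detail worth keeping in mind about the replay code above: std::map::emplace() never overwrites an existing key, so if a journal somehow contained two records for the same (oid, version), the first one replayed would win. A quick self-contained illustration of that semantics (hypothetical key and values):

```cpp
#include <map>
#include <cstdio>

int main()
{
    std::map<int, const char*> db;
    db.emplace(5, "first");
    auto r = db.emplace(5, "second"); // key 5 already present: no insertion
    // prints "inserted=0 value=first"
    printf("inserted=%d value=%s\n", (int)r.second, db.at(5));
    return 0;
}
```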
diff --git a/blockstore_read.cpp b/blockstore_read.cpp
index 23a547dc..c826bbd4 100644
--- a/blockstore_read.cpp
+++ b/blockstore_read.cpp
@@ -79,8 +79,19 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start,
 int blockstore::dequeue_read(blockstore_operation *read_op)
 {
     auto clean_it = object_db.find(read_op->oid);
-    auto dirty_it = dirty_queue.find(read_op->oid);
-    if (clean_it == object_db.end() && dirty_it == dirty_queue.end())
+    auto dirty_it = dirty_db.upper_bound((obj_ver_id){
+        .oid = read_op->oid,
+        .version = UINT64_MAX,
+    });
+    bool clean_found = clean_it != object_db.end();
+    bool dirty_found = false;
+    if (dirty_it != dirty_db.begin())
+    {
+        // step back to the newest dirty version, if one exists for this oid
+        dirty_it--;
+        dirty_found = dirty_it->first.oid == read_op->oid;
+    }
+    if (!clean_found && !dirty_found)
     {
         // region is not allocated - return zeroes
         memset(read_op->buf, 0, read_op->len);
@@ -90,14 +101,15 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
     unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
     uint64_t fulfilled = 0;
-    if (dirty_it != dirty_queue.end())
+    if (dirty_found)
     {
-        dirty_list dirty = dirty_it->second;
-        for (int i = dirty.size()-1; i >= 0; i--)
+        while (dirty_it->first.oid == read_op->oid)
         {
-            if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty[i].state))
+            dirty_entry& dirty = dirty_it->second;
+            if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty.state))
             {
-                if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0)
+                if (fulfill_read(read_op, dirty.offset, dirty.offset + dirty.size,
+                    dirty.state, dirty_it->first.version, dirty.location) < 0)
                 {
                     // need to wait. undo added requests, don't dequeue op
                     ringloop->ring->sq.sqe_tail = prev_sqe_pos;
@@ -105,6 +117,12 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
                     return 0;
                 }
             }
+            if (dirty_it == dirty_db.begin())
+            {
+                // all versions of the object are processed; don't step before begin()
+                break;
+            }
+            dirty_it--;
         }
     }
     if (clean_it != object_db.end())
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index 48724697..6997f60e 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -3,7 +3,10 @@
 // First step of the write algorithm: dequeue operation and submit initial write(s)
 int blockstore::dequeue_write(blockstore_operation *op)
 {
-    auto dirty_it = dirty_queue[op->oid].find(op->version); // FIXME OOPS
+    auto dirty_it = dirty_db.find((obj_ver_id){
+        .oid = op->oid,
+        .version = op->version,
+    });
     if (op->len == block_size)
     {
         // Big (redirect) write
@@ -23,8 +26,8 @@ int blockstore::dequeue_write(blockstore_operation *op)
             return 0;
         }
         struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
-        (*dirty_it).location = loc << block_order;
-        //(*dirty_it).state = ST_D_SUBMITTED;
+        dirty_it->second.location = loc << block_order;
+        //dirty_it->second.state = ST_D_SUBMITTED;
         allocator_set(data_alloc, loc, true);
         data->iov = (struct iovec){ op->buf, op->len };
         data->op = op;
@@ -38,7 +41,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
     {
         // Small (journaled) write
         // First check if the journal has sufficient space
-        // FIXME Always two SQEs for now. Although it's possible to send 1
+        // FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
         bool two_sqes = true;
         uint64_t next_pos = journal.next_free;
         if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write))
@@ -116,8 +119,8 @@ int blockstore::dequeue_write(blockstore_operation *op)
         io_uring_prep_writev(
             sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
         );
-        (*dirty_it).location = journal.next_free;
-        //(*dirty_it).state = ST_J_SUBMITTED;
+        dirty_it->second.location = journal.next_free;
+        //dirty_it->second.state = ST_J_SUBMITTED;
         // Move journal.next_free and save last write for current sector
         journal.next_free += op->len;
         if (journal.next_free >= journal.len)
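The rewritten read path walks an object's versions newest-to-oldest by decrementing a map iterator, which is only legal while the iterator is not begin(). A sketch of that guarded reverse scan, with std::pair standing in for obj_ver_id and hypothetical entries:

```cpp
#include <map>
#include <utility>
#include <cstdint>
#include <cstdio>

int main()
{
    // (oid, version) -> payload
    std::map<std::pair<uint64_t, uint64_t>, int> db;
    db[{ 1, 3 }] = 30;
    db[{ 1, 7 }] = 70;
    uint64_t oid = 1;
    auto it = db.upper_bound({ oid, UINT64_MAX });
    if (it != db.begin())
    {
        it--;
        while (it->first.first == oid)
        {
            // visits version 7, then version 3
            printf("version %lu -> %d\n", (unsigned long)it->first.second, it->second);
            if (it == db.begin())
                break; // decrementing begin() would be undefined behaviour
            it--;
        }
    }
    return 0;
}
```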
diff --git a/test.cpp b/test.cpp
index f006a3ce..66737567 100644
--- a/test.cpp
+++ b/test.cpp
@@ -13,6 +13,8 @@
 #include
 #include
 
+#include <map>
+
 static int setup_context(unsigned entries, struct io_uring *ring)
 {
     int ret = io_uring_queue_init(entries, ring, 0);
@@ -47,6 +49,12 @@ static void test_write(struct io_uring *ring, int fd)
 
 int main(int argc, char *argv[])
 {
+    std::map<int, std::string> strs;
+    strs.emplace(12, "str");
+    auto it = strs.upper_bound(11);
+    printf("s = %d %s %d\n", it->first, it->second.c_str(), it == strs.begin());
+    it--;
+    printf("s = %d %s\n", it->first, it->second.c_str());
     struct io_uring ring;
     int fd = open("testfile", O_RDWR | O_DIRECT, 0644);
     if (fd < 0)
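For reference, the scratch probe in main() above should print `s = 12 str 1` on its first line: upper_bound(11) returns the iterator to key 12, which is also begin(), so the following it-- steps before begin() and the second printf reads an invalid iterator. A begin()-safe variant of the same probe (a sketch, not part of the patch):

```cpp
#include <map>
#include <string>
#include <cstdio>

int main()
{
    std::map<int, std::string> strs;
    strs.emplace(12, "str");
    auto it = strs.upper_bound(11); // -> key 12, which is also begin()
    printf("s = %d %s %d\n", it->first, it->second.c_str(), (int)(it == strs.begin()));
    if (it != strs.begin())
    {
        it--; // only step back when a smaller key actually exists
        printf("s = %d %s\n", it->first, it->second.c_str());
    }
    return 0;
}
```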