From db3b8105882ff199c3d01dee11730db0a581027c Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 1 Nov 2019 02:47:57 +0300 Subject: [PATCH] journal structures + read fulfill --- blockstore.cpp | 245 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 203 insertions(+), 42 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index eebd04cd..1636c4c4 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -3,7 +3,13 @@ #include #include #include +#include #include +#include + +#include +#include + #include "allocator.h" #include "sparsepp/sparsepp/spp.h" @@ -23,65 +29,147 @@ #define ST_D_META_MOVED 21 #define ST_D_META_COMMITTED 22 #define ST_CURRENT 32 +#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32) +#define IS_JOURNAL(st) (st >= 2 && st <= 6) +// Default object size is 128 KB #define DEFAULT_ORDER 17 #define MAX_BLOCK_SIZE 128*1024*1024 #define DISK_ALIGNMENT 4096 #define MIN_JOURNAL_SIZE 4*1024*1024 +#define JOURNAL_MAGIC 0x4A33 #define STRIPE_NUM(oid) ((oid) >> 4) #define STRIPE_REPLICA(oid) ((oid) & 0xf) -struct __attribute__((__packed__)) oid +// 16 bytes per object/stripe id +// stripe includes replica number in 4 least significant bits +struct __attribute__((__packed__)) object_id { uint64_t inode; uint64_t stripe; }; -struct __attribute__((__packed__)) meta_entry +bool operator == (const object_id & a, const object_id & b) +{ + return b.inode == a.inode && b.stripe == a.stripe; +} + +// 32 bytes per "clean" entry on disk with fixed metadata tables +struct __attribute__((__packed__)) clean_disk_entry { uint64_t inode; uint64_t stripe; - uint32_t epoch; - uint32_t version; - uint64_t location_flags; + uint64_t version; + uint8_t flags; + uint8_t reserved[7]; }; -struct __attribute__((__packed__)) object_version +// 28 bytes per "clean" entry in memory +struct __attribute__((__packed__)) clean_entry { - uint32_t epoch; - uint32_t version; - uint64_t location; - uint32_t size; + uint64_t version; uint32_t state; - - bool in_journal() - { - return (location & (1 << 63)); - } - - uint64_t offset() - { - return (location & ~(1 << 63)); - } + uint64_t location; }; -struct __attribute__((__packed__)) object_ver_list +// 48 bytes per dirty entry in memory +struct __attribute__((__packed__)) dirty_entry { - uint64_t count; - object_version versions[]; + uint64_t version; + uint32_t state; + uint32_t flags; + uint64_t location; // location in either journal or data + uint32_t offset; // offset within stripe + uint32_t size; // entry size }; -struct __attribute__((__packed__)) object_info +// Journal entries +// Journal entries are linked to each other by their crc32 value +// The journal is almost a blockchain, because object versions constantly increase +#define JE_START 0x01 +#define JE_SMALL_WRITE 0x02 +#define JE_BIG_WRITE 0x03 +#define JE_STABLE 0x04 +#define JE_DELETE 0x05 + +struct __attribute__((__packed__)) journal_entry_start { - object_version first; - object_ver_list *other; + uint32_t type; + uint32_t size; + uint32_t crc32; + uint32_t reserved1; + uint64_t offset; }; +struct __attribute__((__packed__)) journal_entry_small_write +{ + uint32_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; + uint32_t offset; + uint32_t len; +}; + +struct __attribute__((__packed__)) journal_entry_big_write +{ + uint32_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; + uint64_t block; +}; + +struct __attribute__((__packed__)) journal_entry_stable +{ + uint32_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; +}; + +struct __attribute__((__packed__)) journal_entry_del +{ + uint32_t type; + uint32_t size; + uint32_t crc32; + uint32_t crc32_prev; + object_id oid; + uint64_t version; +}; + +struct __attribute__((__packed__)) journal_entry +{ + union + { + struct __attribute__((__packed__)) + { + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32; + }; + journal_entry_start start; + journal_entry_small_write small_write; + journal_entry_big_write big_write; + journal_entry_stable stable; + journal_entry_del del; + }; +}; + +typedef std::vector dirty_list; + class oid_hash { public: - size_t operator()(const oid &s) const + size_t operator()(const object_id &s) const { size_t seed = 0; spp::hash_combine(seed, s.inode); @@ -93,18 +181,23 @@ public: class blockstore { public: - spp::sparse_hash_map object_db; + spp::sparse_hash_map object_db; + spp::sparse_hash_map dirty_queue; int block_order, block_size; uint64_t block_count; allocator *data_alloc; + int journal_fd; int meta_fd; int data_fd; + uint64_t journal_offset, journal_size, journal_len; uint64_t meta_offset, meta_size, meta_len; uint64_t data_offset, data_size, data_len; - blockstore(std::unordered_map & config) + uint64_t journal_start, journal_end; + + blockstore(spp::sparse_hash_map & config) { block_order = stoll(config["block_size_order"]); block_size = 1 << block_order; @@ -145,7 +238,7 @@ public: close(journal_fd); } - void calc_lengths(std::unordered_map & config) + void calc_lengths(spp::sparse_hash_map & config) { // data data_len = data_size - data_offset; @@ -182,7 +275,7 @@ public: } // required metadata size block_count = data_len / block_size; - uint64_t meta_required = block_count * sizeof(meta_entry); + uint64_t meta_required = block_count * sizeof(clean_disk_entry); if (meta_len < meta_required) { throw new std::runtime_error("Metadata area is too small"); @@ -203,7 +296,7 @@ public: } } - void open_data(std::unordered_map & config) + void open_data(spp::sparse_hash_map & config) { int sectsize; data_offset = stoll(config["data_offset"]); @@ -211,7 +304,7 @@ public: { throw new std::runtime_error("data_offset not aligned"); } - data_fd = open(config["data_device"], O_DIRECT|O_RDWR); + data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR); if (data_fd == -1) { throw new std::runtime_error("Failed to open data device"); @@ -228,7 +321,7 @@ public: } } - void open_meta(std::unordered_map & config) + void open_meta(spp::sparse_hash_map & config) { int sectsize; meta_offset = stoll(config["meta_offset"]); @@ -239,7 +332,7 @@ public: if (config["meta_device"] != "") { meta_offset = 0; - meta_fd = open(config["meta_device"], O_DIRECT|O_RDWR); + meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR); if (meta_fd == -1) { throw new std::runtime_error("Failed to open metadata device"); @@ -266,7 +359,7 @@ public: } } - void open_journal(std::unordered_map & config) + void open_journal(spp::sparse_hash_map & config) { int sectsize; journal_offset = stoll(config["journal_offset"]); @@ -276,7 +369,7 @@ public: } if (config["journal_device"] != "") { - journal_fd = open(config["journal_device"], O_DIRECT|O_RDWR); + journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR); if (journal_fd == -1) { throw new std::runtime_error("Failed to open journal device"); @@ -299,15 +392,83 @@ public: } } - int read(oid stripe, uint32_t offset, uint32_t len, void *buf, void (*callback)(int arg), int arg) + struct read_fulfill { - auto o = object_db.find(stripe); - if (o == object_db.end()) + uint64_t flags; + uint64_t offset; + uint64_t len; + void *buf; + }; + + void fulfill_read(std::map & fulfill, uint8_t* buf, uint32_t offset, uint32_t len, + uint32_t item_start, uint32_t dirty_end, uint32_t item_state, uint64_t item_location) + { + uint32_t dirty_start = item_start; + if (dirty_start < offset+len && dirty_end > offset) + { + dirty_start = dirty_start < offset ? offset : dirty_start; + dirty_end = dirty_end > offset+len ? offset+len : dirty_end; + auto fulfill_near = fulfill.lower_bound(dirty_start); + if (fulfill_near != fulfill.begin()) + { + fulfill_near--; + if (fulfill_near->second.offset + fulfill_near->second.len <= dirty_start) + fulfill_near++; + } + while (fulfill_near != fulfill.end() && fulfill_near->second.offset < dirty_end) + { + if (fulfill_near->second.offset > dirty_start) + { + fulfill[dirty_start] = (read_fulfill){ + item_state, + item_location + dirty_start - item_start, + fulfill_near->second.offset - dirty_start, + buf + dirty_start - offset, + }; + } + dirty_start = fulfill_near->second.offset + fulfill_near->second.len; + } + if (dirty_start < dirty_end) + { + fulfill[dirty_start] = (read_fulfill){ + item_state, + item_location + dirty_start - item_start, + dirty_end - dirty_start, + buf + dirty_start - offset + }; + } + } + } + + // flags: READ_DIRTY +#define READ_DIRTY 1 + int read(object_id oid, uint32_t offset, uint32_t len, uint32_t flags, uint8_t *buf, void (*callback)(int arg), int arg) + { + auto clean_it = object_db.find(oid); + auto dirty_it = dirty_queue.find(oid); + if (clean_it == object_db.end() && dirty_it == object_db.end()) { memset(buf, 0, len); callback(arg); - return; + return 0; + } + uint64_t fulfilled = 0; + std::map fulfill; + //std::vector fulfill; + if (dirty_it != object_db.end()) + { + dirty_list dirty = dirty_it->second; + for (int i = dirty.size()-1; i >= 0; i--) + { + if ((flags & READ_DIRTY) || IS_STABLE(dirty[i].state)) + { + fulfill_read(fulfill, buf, offset, len, dirty[i].offset, dirty[i].offset + dirty[i].size, IS_JOURNAL(dirty[i].state), dirty[i].location); + } + } + } + if (clean_it != object_db.end()) + { + fulfill_read(fulfill, buf, offset, len, 0, block_size, 0, clean_it->second.location); } - auto info = o->second; } };