journal structures + read fulfill

blocking-uring-test
Vitaliy Filippov 2019-11-01 02:47:57 +03:00
parent f1b2d3d3b4
commit db3b810588
1 changed files with 203 additions and 42 deletions

View File

@ -3,7 +3,13 @@
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#include <unistd.h>
#include <stdint.h> #include <stdint.h>
#include <linux/fs.h>
#include <vector>
#include <map>
#include "allocator.h" #include "allocator.h"
#include "sparsepp/sparsepp/spp.h" #include "sparsepp/sparsepp/spp.h"
@ -23,65 +29,147 @@
#define ST_D_META_MOVED 21 #define ST_D_META_MOVED 21
#define ST_D_META_COMMITTED 22 #define ST_D_META_COMMITTED 22
#define ST_CURRENT 32 #define ST_CURRENT 32
// True for the state values that blockstore::read() accepts without READ_DIRTY.
// 21, 22 and 32 match ST_D_META_MOVED, ST_D_META_COMMITTED and ST_CURRENT above;
// 4, 5, 6 and 20 presumably match ST_* constants defined earlier in the file - confirm.
// NOTE: the argument is evaluated several times, do not pass expressions with side effects.
#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32)
// True when the state value lies in the inclusive range 2..6.
// The argument is parenthesized so that compound expressions such as `a | b`
// are compared as a whole instead of being split by operator precedence.
#define IS_JOURNAL(st) ((st) >= 2 && (st) <= 6)
// Default object size is 128 KB
#define DEFAULT_ORDER 17 #define DEFAULT_ORDER 17
#define MAX_BLOCK_SIZE 128*1024*1024 #define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 4096 #define DISK_ALIGNMENT 4096
#define MIN_JOURNAL_SIZE 4*1024*1024 #define MIN_JOURNAL_SIZE 4*1024*1024
#define JOURNAL_MAGIC 0x4A33
#define STRIPE_NUM(oid) ((oid) >> 4) #define STRIPE_NUM(oid) ((oid) >> 4)
#define STRIPE_REPLICA(oid) ((oid) & 0xf) #define STRIPE_REPLICA(oid) ((oid) & 0xf)
// 16 bytes per object/stripe id
// stripe includes replica number in 4 least significant bits
// (STRIPE_NUM() / STRIPE_REPLICA() split the two parts)
struct __attribute__((__packed__)) object_id
{
    uint64_t inode;
    uint64_t stripe;
};

// object_id is embedded into the packed journal entry structures,
// so its size must not change silently.
static_assert(sizeof(object_id) == 16, "object_id must be exactly 16 bytes");

// Two ids are equal when both the inode and the stripe match.
// Declared inline so that the definition may safely appear in every
// translation unit that includes this file.
inline bool operator == (const object_id & a, const object_id & b)
{
    return a.inode == b.inode && a.stripe == b.stripe;
}
// 32 bytes per "clean" entry on disk with fixed metadata tables
struct __attribute__((__packed__)) clean_disk_entry
{
    uint64_t inode;
    uint64_t stripe;
    uint64_t version;
    uint8_t flags;
    // pads the record up to 32 bytes
    uint8_t reserved[7];
};

// The required metadata area size is computed as block_count * sizeof(clean_disk_entry),
// so an accidental change of the record size must fail at compile time.
static_assert(sizeof(clean_disk_entry) == 32, "clean_disk_entry must be exactly 32 bytes");
// 28 bytes per "clean" entry in memory
// NOTE(review): the three packed fields below add up to 20 bytes - confirm what the 28 counts.
struct __attribute__((__packed__)) clean_entry
{
    uint64_t version;
    uint32_t state;
    // Bit 63 is the "in journal" flag, the remaining 63 bits are the offset
    uint64_t location;

    // True when bit 63 of location is set.
    // The mask is built from a 64-bit one: shifting a plain int by 63 is undefined behaviour.
    bool in_journal() const
    {
        return (location & (uint64_t(1) << 63)) != 0;
    }

    // location with the "in journal" flag bit cleared
    uint64_t offset() const
    {
        return location & ~(uint64_t(1) << 63);
    }
};
// NOTE(review): this span appears to be a side-by-side diff rendering: every line shows the
// previous revision (struct object_ver_list) on the left and the new revision
// (struct dirty_entry) on the right. Only the right-hand side is described below.
struct __attribute__((__packed__)) object_ver_list // 48 bytes per dirty entry in memory
// dirty_entry is the element type of dirty_list (std::vector<dirty_entry>);
// blockstore keeps one dirty_list per object_id in dirty_queue.
// NOTE(review): the six packed fields add up to 32 bytes - confirm what the 48 above counts.
struct __attribute__((__packed__)) dirty_entry
{ {
uint64_t count; uint64_t version;
// state is tested with IS_STABLE() and IS_JOURNAL() by blockstore::read()
object_version versions[]; uint32_t state;
uint32_t flags;
uint64_t location; // location in either journal or data
uint32_t offset; // offset within stripe
uint32_t size; // entry size; read() treats [offset, offset + size) as the range covered by the entry
}; };
// NOTE(review): this span appears to be a side-by-side diff rendering: the left-hand side is the
// previous revision (struct object_info), the right-hand side is the new revision described below.
struct __attribute__((__packed__)) object_info // Journal entries
// Journal entries are linked to each other by their crc32 value
// The journal is almost a blockchain, because object versions constantly increase
// Journal entry type codes - presumably the values stored in the `type` field
// of the journal_entry_* structures below (confirm against the journal writer).
#define JE_START 0x01
#define JE_SMALL_WRITE 0x02
#define JE_BIG_WRITE 0x03
#define JE_STABLE 0x04
#define JE_DELETE 0x05
// Start record (presumably type JE_START). The right-hand fields are packed into 24 bytes.
// Unlike the other entry types it has reserved1 in the slot where they have crc32_prev.
struct __attribute__((__packed__)) journal_entry_start
{ {
object_version first; uint32_t type;
object_ver_list *other; uint32_t size;
uint32_t crc32;
uint32_t reserved1;
// NOTE(review): meaning of offset is not visible in this file - presumably a journal position, confirm
uint64_t offset;
}; };
// Journal record of a small write (presumably type JE_SMALL_WRITE). Packed, 48 bytes.
// Shares the leading type/size/crc32 fields with the other journal_entry_* structures.
struct __attribute__((__packed__)) journal_entry_small_write
{
uint32_t type;
uint32_t size;
uint32_t crc32;
// presumably the crc32 of the preceding journal entry (entries are linked by crc32) - confirm
uint32_t crc32_prev;
// object and version the write belongs to
object_id oid;
uint64_t version;
// presumably the byte range of the write within the object - confirm
uint32_t offset;
uint32_t len;
};
// Journal record of a big write (presumably type JE_BIG_WRITE). Packed, 48 bytes.
// Shares the leading type/size/crc32 fields with the other journal_entry_* structures.
struct __attribute__((__packed__)) journal_entry_big_write
{
uint32_t type;
uint32_t size;
uint32_t crc32;
// presumably the crc32 of the preceding journal entry (entries are linked by crc32) - confirm
uint32_t crc32_prev;
// object and version the write belongs to
object_id oid;
uint64_t version;
// NOTE(review): presumably identifies the data block holding the written object - confirm units
uint64_t block;
};
// Journal record referring to one object version (presumably type JE_STABLE). Packed, 40 bytes.
// Field-for-field identical to journal_entry_del.
struct __attribute__((__packed__)) journal_entry_stable
{
uint32_t type;
uint32_t size;
uint32_t crc32;
// presumably the crc32 of the preceding journal entry (entries are linked by crc32) - confirm
uint32_t crc32_prev;
// object and version this record refers to
object_id oid;
uint64_t version;
};
// Journal record referring to one object version (presumably type JE_DELETE). Packed, 40 bytes.
// Field-for-field identical to journal_entry_stable.
struct __attribute__((__packed__)) journal_entry_del
{
uint32_t type;
uint32_t size;
uint32_t crc32;
// presumably the crc32 of the preceding journal entry (entries are linked by crc32) - confirm
uint32_t crc32_prev;
// object and version this record refers to
object_id oid;
uint64_t version;
};
// Any journal entry: a union of the common header and all entry-specific layouts.
// Its size is that of the largest member (48 bytes: small_write / big_write).
struct __attribute__((__packed__)) journal_entry
{
union
{
// Common header view of the first 12 bytes of every entry.
// NOTE(review): here bytes 0..3 are split into a 16-bit magic and a 16-bit type, while every
// journal_entry_* structure below declares a single 32-bit type at offset 0. The two views
// only agree on size (offset 4) and crc32 (offset 8); writing start.type etc. overwrites
// magic as well. Confirm which layout is intended.
struct __attribute__((__packed__))
{
// presumably holds JOURNAL_MAGIC - confirm
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
};
// entry-specific views of the same bytes
journal_entry_start start;
journal_entry_small_write small_write;
journal_entry_big_write big_write;
journal_entry_stable stable;
journal_entry_del del;
};
};
typedef std::vector<dirty_entry> dirty_list;
class oid_hash class oid_hash
{ {
public: public:
size_t operator()(const oid &s) const size_t operator()(const object_id &s) const
{ {
size_t seed = 0; size_t seed = 0;
spp::hash_combine(seed, s.inode); spp::hash_combine(seed, s.inode);
@ -93,18 +181,23 @@ public:
class blockstore class blockstore
{ {
public: public:
spp::sparse_hash_map<oid, object_info, oid_hash> object_db; spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
int block_order, block_size; int block_order, block_size;
uint64_t block_count; uint64_t block_count;
allocator *data_alloc; allocator *data_alloc;
int journal_fd; int journal_fd;
int meta_fd; int meta_fd;
int data_fd; int data_fd;
uint64_t journal_offset, journal_size, journal_len; uint64_t journal_offset, journal_size, journal_len;
uint64_t meta_offset, meta_size, meta_len; uint64_t meta_offset, meta_size, meta_len;
uint64_t data_offset, data_size, data_len; uint64_t data_offset, data_size, data_len;
blockstore(std::unordered_map<std::string, std::string> & config) uint64_t journal_start, journal_end;
blockstore(spp::sparse_hash_map<std::string, std::string> & config)
{ {
block_order = stoll(config["block_size_order"]); block_order = stoll(config["block_size_order"]);
block_size = 1 << block_order; block_size = 1 << block_order;
@ -145,7 +238,7 @@ public:
close(journal_fd); close(journal_fd);
} }
void calc_lengths(std::unordered_map<std::string, std::string> & config) void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config)
{ {
// data // data
data_len = data_size - data_offset; data_len = data_size - data_offset;
@ -182,7 +275,7 @@ public:
} }
// required metadata size // required metadata size
block_count = data_len / block_size; block_count = data_len / block_size;
uint64_t meta_required = block_count * sizeof(meta_entry); uint64_t meta_required = block_count * sizeof(clean_disk_entry);
if (meta_len < meta_required) if (meta_len < meta_required)
{ {
throw new std::runtime_error("Metadata area is too small"); throw new std::runtime_error("Metadata area is too small");
@ -203,7 +296,7 @@ public:
} }
} }
void open_data(std::unordered_map<std::string, std::string> & config) void open_data(spp::sparse_hash_map<std::string, std::string> & config)
{ {
int sectsize; int sectsize;
data_offset = stoll(config["data_offset"]); data_offset = stoll(config["data_offset"]);
@ -211,7 +304,7 @@ public:
{ {
throw new std::runtime_error("data_offset not aligned"); throw new std::runtime_error("data_offset not aligned");
} }
data_fd = open(config["data_device"], O_DIRECT|O_RDWR); data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR);
if (data_fd == -1) if (data_fd == -1)
{ {
throw new std::runtime_error("Failed to open data device"); throw new std::runtime_error("Failed to open data device");
@ -228,7 +321,7 @@ public:
} }
} }
void open_meta(std::unordered_map<std::string, std::string> & config) void open_meta(spp::sparse_hash_map<std::string, std::string> & config)
{ {
int sectsize; int sectsize;
meta_offset = stoll(config["meta_offset"]); meta_offset = stoll(config["meta_offset"]);
@ -239,7 +332,7 @@ public:
if (config["meta_device"] != "") if (config["meta_device"] != "")
{ {
meta_offset = 0; meta_offset = 0;
meta_fd = open(config["meta_device"], O_DIRECT|O_RDWR); meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR);
if (meta_fd == -1) if (meta_fd == -1)
{ {
throw new std::runtime_error("Failed to open metadata device"); throw new std::runtime_error("Failed to open metadata device");
@ -266,7 +359,7 @@ public:
} }
} }
void open_journal(std::unordered_map<std::string, std::string> & config) void open_journal(spp::sparse_hash_map<std::string, std::string> & config)
{ {
int sectsize; int sectsize;
journal_offset = stoll(config["journal_offset"]); journal_offset = stoll(config["journal_offset"]);
@ -276,7 +369,7 @@ public:
} }
if (config["journal_device"] != "") if (config["journal_device"] != "")
{ {
journal_fd = open(config["journal_device"], O_DIRECT|O_RDWR); journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
if (journal_fd == -1) if (journal_fd == -1)
{ {
throw new std::runtime_error("Failed to open journal device"); throw new std::runtime_error("Failed to open journal device");
@ -299,15 +392,83 @@ public:
} }
} }
// One contiguous piece of a read request together with the place to take it from.
// Pieces are collected in a std::map keyed by the piece start position
// (same coordinate space as the `offset` argument of fulfill_read()).
struct read_fulfill
{
    // item_state of the item that provides this piece
    uint64_t flags;
    // source position: item_location advanced to the start of this piece
    uint64_t offset;
    // piece length in bytes
    uint64_t len;
    // destination of this piece inside the caller's buffer
    void *buf;
};

// Assign the still-uncovered parts of the request [offset, offset+len) that intersect
// the item range [item_start, dirty_end) to that item.
//
// fulfill       - pieces assigned so far, keyed by piece start; never overwritten, only gaps are filled
// buf           - destination buffer; buf[0] corresponds to position `offset`
// offset, len   - requested range
// item_start    - start of the range covered by the item
// dirty_end     - end (exclusive) of the range covered by the item
// item_state    - stored as-is into read_fulfill::flags
// item_location - source position that corresponds to item_start
//
// Items must be passed from the newest to the oldest so that newer data wins.
void fulfill_read(std::map<uint64_t, read_fulfill> & fulfill, uint8_t* buf, uint32_t offset, uint32_t len,
    uint32_t item_start, uint32_t dirty_end, uint32_t item_state, uint64_t item_location)
{
    // 64-bit arithmetic so that offset+len cannot wrap around
    const uint64_t req_end = (uint64_t)offset + len;
    if (item_start >= req_end || dirty_end <= offset)
    {
        // the item does not intersect the request at all
        return;
    }
    // clamp the item range to the request
    uint64_t cur = item_start < offset ? offset : item_start;
    const uint64_t end = dirty_end > req_end ? req_end : dirty_end;
    auto add_piece = [&](uint64_t from, uint64_t to)
    {
        fulfill[from] = read_fulfill{
            item_state,
            item_location + from - item_start,
            to - from,
            buf + (from - offset),
        };
    };
    // Find the first already assigned piece that may overlap [cur, end):
    // either the piece that covers cur or the first one that starts after it.
    // The map key (not read_fulfill::offset, which is a source location) is the piece position.
    auto it = fulfill.lower_bound(cur);
    if (it != fulfill.begin())
    {
        --it;
        if (it->first + it->second.len <= cur)
        {
            ++it;
        }
    }
    while (it != fulfill.end() && it->first < end)
    {
        if (it->first > cur)
        {
            // gap before the already assigned piece; std::map insertion keeps `it` valid
            add_piece(cur, it->first);
        }
        cur = it->first + it->second.len;
        // advance, otherwise the loop never terminates
        ++it;
    }
    if (cur < end)
    {
        // tail after the last already assigned piece
        add_piece(cur, end);
    }
}
// flags: READ_DIRTY
#define READ_DIRTY 1

// Plan a read of [offset, offset+len) of object `oid` into `buf`.
//
// oid      - object to read
// offset   - start of the requested range within the object
// len      - length of the requested range
// flags    - READ_DIRTY: also use dirty entries for which IS_STABLE() is false
// buf      - destination buffer of at least `len` bytes
// callback - invoked with `arg` once the buffer is filled
//
// When the object is known neither in object_db nor in dirty_queue the buffer is
// zero-filled, the callback is called immediately and 0 is returned.
// Otherwise dirty entries are visited from the newest to the oldest and then the clean
// entry, each one receiving the parts of the range not covered by a newer entry.
int read(object_id oid, uint32_t offset, uint32_t len, uint32_t flags, uint8_t *buf, void (*callback)(int arg), int arg)
{
    auto clean_it = object_db.find(oid);
    auto dirty_it = dirty_queue.find(oid);
    // each iterator must be compared with the end() of its own container
    if (clean_it == object_db.end() && dirty_it == dirty_queue.end())
    {
        memset(buf, 0, len);
        callback(arg);
        return 0;
    }
    std::map<uint64_t, read_fulfill> fulfill;
    if (dirty_it != dirty_queue.end())
    {
        // walk the list in place (no copy), newest entry first
        const dirty_list & dirty = dirty_it->second;
        for (auto entry = dirty.rbegin(); entry != dirty.rend(); ++entry)
        {
            if ((flags & READ_DIRTY) || IS_STABLE(entry->state))
            {
                fulfill_read(fulfill, buf, offset, len, entry->offset, entry->offset + entry->size, IS_JOURNAL(entry->state), entry->location);
            }
        }
    }
    if (clean_it != object_db.end())
    {
        fulfill_read(fulfill, buf, offset, len, 0, block_size, 0, clean_it->second.location);
    }
    // TODO: the collected pieces in `fulfill` are not submitted for reading yet and
    // `callback` is not invoked on this path.
    // NOTE(review): 0 mirrors the early-return path so the function no longer falls off
    // its end (undefined behaviour for a non-void function) - confirm the intended value.
    return 0;
}
}; };