Browse Source

Split into multiple files, begin init_loop, adjust read

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
f4705d81d7
  1. 4
      Makefile
  2. 571
      blockstore.cpp
  3. 203
      blockstore.h
  4. 90
      blockstore_journal.h
  5. 160
      blockstore_open.cpp
  6. 142
      blockstore_read.cpp
  7. 62
      test.cpp

4
Makefile

@ -1,3 +1,5 @@
all: allocator.o blockstore.o
all: allocator.o blockstore.o blockstore_open.o blockstore_read.o test
%.o: %.cpp
gcc -c -o $@ $<
test: test.cpp
gcc -o test -luring test.cpp

571
blockstore.cpp

@ -1,234 +1,27 @@
#define _LARGEFILE64_SOURCE
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdint.h>
#include <linux/fs.h>
#include "blockstore.h"
#include <vector>
#include <map>
#include "allocator.h"
#include "sparsepp/sparsepp/spp.h"
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_IN_FLIGHT 1
#define ST_J_WRITTEN 2
#define ST_J_SYNCED 3
#define ST_J_STABLE 4
#define ST_J_MOVED 5
#define ST_J_MOVE_SYNCED 6
#define ST_D_WRITTEN 16
#define ST_D_SYNCED 17
#define ST_D_META_WRITTEN 18
#define ST_D_META_SYNCED 19
#define ST_D_STABLE 20
#define ST_D_META_MOVED 21
#define ST_D_META_COMMITTED 22
#define ST_CURRENT 32
#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32)
#define IS_JOURNAL(st) (st >= 2 && st <= 6)
// Default object size is 128 KB
#define DEFAULT_ORDER 17
#define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 4096
#define MIN_JOURNAL_SIZE 4*1024*1024
#define JOURNAL_MAGIC 0x4A33
#define STRIPE_NUM(oid) ((oid) >> 4)
#define STRIPE_REPLICA(oid) ((oid) & 0xf)
// 16 bytes per object/stripe id
// stripe includes replica number in 4 least significant bits
struct __attribute__((__packed__)) object_id
{
uint64_t inode;
uint64_t stripe;
};
bool operator == (const object_id & a, const object_id & b)
{
return b.inode == a.inode && b.stripe == a.stripe;
}
// 32 bytes per "clean" entry on disk with fixed metadata tables
struct __attribute__((__packed__)) clean_disk_entry
{
uint64_t inode;
uint64_t stripe;
uint64_t version;
uint8_t flags;
uint8_t reserved[7];
};
// 28 bytes per "clean" entry in memory
struct __attribute__((__packed__)) clean_entry
{
uint64_t version;
uint32_t state;
uint64_t location;
};
// 48 bytes per dirty entry in memory
struct __attribute__((__packed__)) dirty_entry
{
uint64_t version;
uint32_t state;
uint32_t flags;
uint64_t location; // location in either journal or data
uint32_t offset; // offset within stripe
uint32_t size; // entry size
};
// Journal entries
// Journal entries are linked to each other by their crc32 value
// The journal is almost a blockchain, because object versions constantly increase
#define JE_START 0x01
#define JE_SMALL_WRITE 0x02
#define JE_BIG_WRITE 0x03
#define JE_STABLE 0x04
#define JE_DELETE 0x05
struct __attribute__((__packed__)) journal_entry_start
blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config, io_uring *ring)
{
uint32_t type;
uint32_t size;
uint32_t crc32;
uint32_t reserved1;
uint64_t offset;
};
struct __attribute__((__packed__)) journal_entry_small_write
{
uint32_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
uint32_t offset;
uint32_t len;
};
struct __attribute__((__packed__)) journal_entry_big_write
{
uint32_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
uint64_t block;
};
struct __attribute__((__packed__)) journal_entry_stable
{
uint32_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
};
struct __attribute__((__packed__)) journal_entry_del
{
uint32_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
};
struct __attribute__((__packed__)) journal_entry
{
union
this->ring = ring;
initialized = 0;
block_order = stoull(config["block_size_order"]);
block_size = 1 << block_order;
if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE)
{
struct __attribute__((__packed__))
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
};
journal_entry_start start;
journal_entry_small_write small_write;
journal_entry_big_write big_write;
journal_entry_stable stable;
journal_entry_del del;
};
};
typedef std::vector<dirty_entry> dirty_list;
class oid_hash
{
public:
size_t operator()(const object_id &s) const
{
size_t seed = 0;
spp::hash_combine(seed, s.inode);
spp::hash_combine(seed, s.stripe);
return seed;
throw new std::runtime_error("Bad block size");
}
};
class blockstore
{
public:
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
int block_order, block_size;
uint64_t block_count;
allocator *data_alloc;
int journal_fd;
int meta_fd;
int data_fd;
uint64_t journal_offset, journal_size, journal_len;
uint64_t meta_offset, meta_size, meta_len;
uint64_t data_offset, data_size, data_len;
uint64_t journal_start, journal_end;
blockstore(spp::sparse_hash_map<std::string, std::string> & config)
data_fd = meta_fd = journal_fd = -1;
try
{
block_order = stoll(config["block_size_order"]);
block_size = 1 << block_order;
if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE)
{
throw new std::runtime_error("Bad block size");
}
data_fd = meta_fd = journal_fd = -1;
try
{
open_data(config);
open_meta(config);
open_journal(config);
calc_lengths(config);
data_alloc = allocator_create(block_count);
if (!data_alloc)
throw new std::bad_alloc();
}
catch (std::exception & e)
{
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
close(meta_fd);
if (journal_fd >= 0 && journal_fd != meta_fd)
close(journal_fd);
throw e;
}
open_data(config);
open_meta(config);
open_journal(config);
calc_lengths(config);
data_alloc = allocator_create(block_count);
if (!data_alloc)
throw new std::bad_alloc();
}
~blockstore()
catch (std::exception & e)
{
if (data_fd >= 0)
close(data_fd);
@ -236,239 +29,145 @@ public:
close(meta_fd);
if (journal_fd >= 0 && journal_fd != meta_fd)
close(journal_fd);
throw e;
}
}
void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config)
blockstore::~blockstore()
{
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
close(meta_fd);
if (journal_fd >= 0 && journal_fd != meta_fd)
close(journal_fd);
}
// must be called in the event loop until it returns 0
int blockstore::init_loop()
{
// read metadata, then journal
if (initialized)
{
// data
data_len = data_size - data_offset;
if (data_fd == meta_fd && data_offset < meta_offset)
{
data_len = meta_offset - data_offset;
}
if (data_fd == journal_fd && data_offset < journal_offset)
{
data_len = data_len < journal_offset-data_offset
? data_len : journal_offset-data_offset;
}
// meta
meta_len = (meta_fd == data_fd ? data_size : meta_size) - meta_offset;
if (meta_fd == data_fd && meta_offset < data_offset)
{
meta_len = data_offset - meta_offset;
}
if (meta_fd == journal_fd && meta_offset < journal_offset)
{
meta_len = meta_len < journal_offset-meta_offset
? meta_len : journal_offset-meta_offset;
}
// journal
journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset;
if (journal_fd == data_fd && journal_offset < data_offset)
{
journal_len = data_offset - journal_offset;
}
if (journal_fd == meta_fd && journal_offset < meta_offset)
{
journal_len = journal_len < meta_offset-journal_offset
? journal_len : meta_offset-journal_offset;
}
// required metadata size
block_count = data_len / block_size;
uint64_t meta_required = block_count * sizeof(clean_disk_entry);
if (meta_len < meta_required)
{
throw new std::runtime_error("Metadata area is too small");
}
// requested journal size
uint64_t journal_wanted = stoll(config["journal_size"]);
if (journal_wanted > journal_len)
{
throw new std::runtime_error("Requested journal_size is too large");
}
else if (journal_wanted > 0)
{
journal_len = journal_wanted;
}
if (journal_len < MIN_JOURNAL_SIZE)
{
throw new std::runtime_error("Journal is too small");
}
return 0;
}
void open_data(spp::sparse_hash_map<std::string, std::string> & config)
if (!metadata_init_reader)
{
int sectsize;
data_offset = stoll(config["data_offset"]);
if (data_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("data_offset not aligned");
}
data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR);
if (data_fd == -1)
{
throw new std::runtime_error("Failed to open data device");
}
if (ioctl(data_fd, BLKSSZGET, &sectsize) < 0 ||
ioctl(data_fd, BLKGETSIZE64, &data_size) < 0 ||
sectsize != 512)
{
throw new std::runtime_error("Data device sector is not equal to 512 bytes");
}
if (data_offset >= data_size)
{
throw new std::runtime_error("data_offset exceeds device size");
}
metadata_init_reader = new blockstore_init_meta(this);
}
void open_meta(spp::sparse_hash_map<std::string, std::string> & config)
if (metadata_init_reader->read_loop())
{
int sectsize;
meta_offset = stoll(config["meta_offset"]);
if (meta_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("meta_offset not aligned");
}
if (config["meta_device"] != "")
{
meta_offset = 0;
meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR);
if (meta_fd == -1)
{
throw new std::runtime_error("Failed to open metadata device");
}
if (ioctl(meta_fd, BLKSSZGET, &sectsize) < 0 ||
ioctl(meta_fd, BLKGETSIZE64, &meta_size) < 0 ||
sectsize != 512)
{
throw new std::runtime_error("Metadata device sector is not equal to 512 bytes (or ioctl failed)");
}
if (meta_offset >= meta_size)
{
throw new std::runtime_error("meta_offset exceeds device size");
}
}
else
{
meta_fd = data_fd;
meta_size = 0;
if (meta_offset >= data_size)
{
throw new std::runtime_error("meta_offset exceeds device size");
}
}
return 1;
}
if (!journal_init_reader)
{
journal_init_reader = new blockstore_init_journal(this);
}
if (journal_init_reader->read_loop())
{
return 1;
}
initialized = true;
delete metadata_init_reader;
delete journal_init_reader;
metadata_init_reader = NULL;
journal_init_reader = NULL;
return 0;
}
blockstore_init_meta::blockstore_init_meta(blockstore *bs)
{
this->bs = bs;
}
void open_journal(spp::sparse_hash_map<std::string, std::string> & config)
int blockstore_init_meta::read_loop()
{
if (metadata_read >= bs->meta_len)
{
int sectsize;
journal_offset = stoll(config["journal_offset"]);
if (journal_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("journal_offset not aligned");
}
if (config["journal_device"] != "")
return 0;
}
if (!metadata_buffer)
{
metadata_buffer = new uint8_t[2*bs->metadata_buf_size];
}
if (submitted)
{
struct io_uring_cqe *cqe;
io_uring_peek_cqe(bs->ring, &cqe);
if (cqe)
{
journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
if (journal_fd == -1)
{
throw new std::runtime_error("Failed to open journal device");
}
if (ioctl(journal_fd, BLKSSZGET, &sectsize) < 0 ||
ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 ||
sectsize != 512)
if (cqe->res < 0)
{
throw new std::runtime_error("Journal device sector is not equal to 512 bytes");
throw new std::runtime_error(
std::string("read metadata failed at offset ") + std::to_string(metadata_read) +
std::string(": ") + strerror(-cqe->res)
);
}
prev_done = cqe->res > 0 ? submitted : 0;
done_len = cqe->res;
metadata_read += cqe->res;
submitted = 0;
}
else
}
if (!submitted)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
if (!sqe)
{
journal_fd = meta_fd;
journal_size = 0;
if (journal_offset >= data_size)
{
throw new std::runtime_error("journal_offset exceeds device size");
}
throw new std::runtime_error("io_uring is full while trying to read metadata");
}
submit_iov = {
metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0),
bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
};
io_uring_prep_readv(sqe, bs->meta_fd, &submit_iov, 1, bs->meta_offset + metadata_read);
io_uring_submit(bs->ring);
submitted = (prev == 1 ? 2 : 1);
prev = submitted;
}
struct read_fulfill
if (prev_done)
{
uint64_t flags;
uint64_t offset;
uint64_t len;
void *buf;
};
void fulfill_read(std::map<uint64_t, read_fulfill> & fulfill, uint8_t* buf, uint32_t offset, uint32_t len,
uint32_t item_start, uint32_t dirty_end, uint32_t item_state, uint64_t item_location)
assert(!(done_len % sizeof(clean_disk_entry)));
int count = done_len / sizeof(clean_disk_entry);
struct clean_disk_entry *entries = (struct clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0));
// handle <count> entries
handle_entries(entries, count);
done_cnt += count;
prev_done = 0;
done_len = 0;
}
if (metadata_read >= bs->meta_len)
{
uint32_t dirty_start = item_start;
if (dirty_start < offset+len && dirty_end > offset)
{
dirty_start = dirty_start < offset ? offset : dirty_start;
dirty_end = dirty_end > offset+len ? offset+len : dirty_end;
auto fulfill_near = fulfill.lower_bound(dirty_start);
if (fulfill_near != fulfill.begin())
{
fulfill_near--;
if (fulfill_near->second.offset + fulfill_near->second.len <= dirty_start)
fulfill_near++;
}
while (fulfill_near != fulfill.end() && fulfill_near->second.offset < dirty_end)
{
if (fulfill_near->second.offset > dirty_start)
{
fulfill[dirty_start] = (read_fulfill){
item_state,
item_location + dirty_start - item_start,
fulfill_near->second.offset - dirty_start,
buf + dirty_start - offset,
};
}
dirty_start = fulfill_near->second.offset + fulfill_near->second.len;
}
if (dirty_start < dirty_end)
{
fulfill[dirty_start] = (read_fulfill){
item_state,
item_location + dirty_start - item_start,
dirty_end - dirty_start,
buf + dirty_start - offset
};
}
}
// metadata read finished
delete[] metadata_buffer;
metadata_buffer = NULL;
return 0;
}
return 1;
}
// flags: READ_DIRTY
#define READ_DIRTY 1
int read(object_id oid, uint32_t offset, uint32_t len, uint32_t flags, uint8_t *buf, void (*callback)(int arg), int arg)
void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int count)
{
for (unsigned i = 0; i < count; i++)
{
auto clean_it = object_db.find(oid);
auto dirty_it = dirty_queue.find(oid);
if (clean_it == object_db.end() && dirty_it == object_db.end())
{
memset(buf, 0, len);
callback(arg);
return 0;
}
uint64_t fulfilled = 0;
std::map<uint64_t, read_fulfill> fulfill;
//std::vector<read_fulfill> fulfill;
if (dirty_it != object_db.end())
{
dirty_list dirty = dirty_it->second;
for (int i = dirty.size()-1; i >= 0; i--)
{
if ((flags & READ_DIRTY) || IS_STABLE(dirty[i].state))
{
fulfill_read(fulfill, buf, offset, len, dirty[i].offset, dirty[i].offset + dirty[i].size, IS_JOURNAL(dirty[i].state), dirty[i].location);
}
}
}
if (clean_it != object_db.end())
if (entries[i].oid.inode > 0)
{
fulfill_read(fulfill, buf, offset, len, 0, block_size, 0, clean_it->second.location);
allocator_set(bs->data_alloc, done_cnt+i, true);
bs->object_db[entries[i].oid] = (struct clean_entry){
entries[i].version,
(uint32_t)(entries[i].flags ? ST_CURRENT : ST_D_META_SYNCED),
done_cnt+i
};
}
}
};
}
blockstore_init_journal::blockstore_init_journal(blockstore *bs)
{
this->bs = bs;
}
int blockstore_init_journal::read_loop()
{
return 0;
}

203
blockstore.h

@ -0,0 +1,203 @@
#pragma once
#define _LARGEFILE64_SOURCE
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdint.h>
#include <linux/fs.h>
#include <liburing.h>
#include <vector>
#include <map>
#include <deque>
#include <set>
#include <functional>
#include "allocator.h"
#include "sparsepp/sparsepp/spp.h"
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_IN_FLIGHT 1
#define ST_J_WRITTEN 2
#define ST_J_SYNCED 3
#define ST_J_STABLE 4
#define ST_J_MOVED 5
#define ST_J_MOVE_SYNCED 6
#define ST_D_WRITTEN 16
#define ST_D_SYNCED 17
#define ST_D_META_WRITTEN 18
#define ST_D_META_SYNCED 19
#define ST_D_STABLE 20
#define ST_D_META_MOVED 21
#define ST_D_META_COMMITTED 22
#define ST_CURRENT 32
#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32)
#define IS_JOURNAL(st) (st >= 2 && st <= 6)
// Default object size is 128 KB
#define DEFAULT_ORDER 17
#define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 512
#define STRIPE_NUM(oid) ((oid) >> 4)
#define STRIPE_REPLICA(oid) ((oid) & 0xf)
// 16 bytes per object/stripe id
// stripe includes replica number in 4 least significant bits
struct __attribute__((__packed__)) object_id
{
uint64_t inode;
uint64_t stripe;
};
#include "blockstore_journal.h"
inline bool operator == (const object_id & a, const object_id & b)
{
return b.inode == a.inode && b.stripe == a.stripe;
}
// 32 bytes per "clean" entry on disk with fixed metadata tables
struct __attribute__((__packed__)) clean_disk_entry
{
object_id oid;
uint64_t version;
uint8_t flags;
uint8_t reserved[7];
};
// 28 bytes per "clean" entry in memory
struct __attribute__((__packed__)) clean_entry
{
uint64_t version;
uint32_t state;
uint64_t location;
};
// 48 bytes per dirty entry in memory
struct __attribute__((__packed__)) dirty_entry
{
uint64_t version;
uint32_t state;
uint32_t flags;
uint64_t location; // location in either journal or data
uint32_t offset; // offset within stripe
uint32_t size; // entry size
};
typedef std::vector<dirty_entry> dirty_list;
class oid_hash
{
public:
size_t operator()(const object_id &s) const
{
size_t seed = 0;
spp::hash_combine(seed, s.inode);
spp::hash_combine(seed, s.stripe);
return seed;
}
};
// SYNC must be submitted after previous WRITEs/DELETEs (not before!)
// READs to the same object must be submitted after previous WRITEs/DELETEs
// Otherwise, the submit order is free, that is all operations may be submitted immediately
// In fact, adding a write operation must immediately result in dirty_queue being populated
#define OP_READ 1
#define OP_READ_DIRTY 2
#define OP_WRITE 3
#define OP_SYNC 4
#define OP_STABLE 5
#define OP_DELETE 6
#define WAIT_SQE 1
#define WAIT_IN_FLIGHT 2
struct blockstore_operation
{
std::function<void (blockstore_operation*)> callback;
uint32_t flags;
object_id oid;
uint64_t version;
uint32_t offset;
uint32_t len;
uint8_t *buf;
std::map<uint64_t, struct iovec> read_vec;
int completed;
int wait_for;
uint64_t wait_version;
};
class blockstore;
class blockstore_init_meta
{
blockstore *bs;
uint8_t *metadata_buffer;
uint64_t metadata_read = 0;
struct iovec submit_iov;
int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
public:
blockstore_init_meta(blockstore* bs);
int read_loop();
void handle_entries(struct clean_disk_entry* entries, int count);
};
class blockstore_init_journal
{
blockstore *bs;
public:
blockstore_init_journal(blockstore* bs);
int read_loop();
};
class blockstore
{
public:
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
std::deque<blockstore_operation*> submit_queue;
std::set<blockstore_operation*> in_process_ops;
int block_order, block_size;
uint64_t block_count;
allocator *data_alloc;
int journal_fd;
int meta_fd;
int data_fd;
uint64_t journal_offset, journal_size, journal_len;
uint64_t meta_offset, meta_size, meta_area, meta_len;
uint64_t data_offset, data_size, data_len;
uint64_t journal_start, journal_end;
struct io_uring *ring;
blockstore(spp::sparse_hash_map<std::string, std::string> & config, struct io_uring *ring);
~blockstore();
void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config);
void open_data(spp::sparse_hash_map<std::string, std::string> & config);
void open_meta(spp::sparse_hash_map<std::string, std::string> & config);
void open_journal(spp::sparse_hash_map<std::string, std::string> & config);
// Asynchronous init
int initialized;
int metadata_buf_size;
blockstore_init_meta* metadata_init_reader;
blockstore_init_journal* journal_init_reader;
int init_loop();
// Read
int read(blockstore_operation *read_op);
int fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location);
};

90
blockstore_journal.h

@ -0,0 +1,90 @@
#pragma once
#define MIN_JOURNAL_SIZE 4*1024*1024
#define JOURNAL_MAGIC 0x4A33
// Journal entries
// Journal entries are linked to each other by their crc32 value
// The journal is almost a blockchain, because object versions constantly increase
#define JE_START 0x01
#define JE_SMALL_WRITE 0x02
#define JE_BIG_WRITE 0x03
#define JE_STABLE 0x04
#define JE_DELETE 0x05
struct __attribute__((__packed__)) journal_entry_start
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
uint32_t reserved1;
uint64_t offset;
};
struct __attribute__((__packed__)) journal_entry_small_write
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
uint32_t offset;
uint32_t len;
// small_write entries contain <len> bytes of data, but data is stored in the next journal sector
};
struct __attribute__((__packed__)) journal_entry_big_write
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
uint64_t block;
};
struct __attribute__((__packed__)) journal_entry_stable
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
};
struct __attribute__((__packed__)) journal_entry_del
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
};
struct __attribute__((__packed__)) journal_entry
{
union
{
struct __attribute__((__packed__))
{
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32;
};
journal_entry_start start;
journal_entry_small_write small_write;
journal_entry_big_write big_write;
journal_entry_stable stable;
journal_entry_del del;
};
};

160
blockstore_open.cpp

@ -0,0 +1,160 @@
#include "blockstore.h"
void blockstore::calc_lengths(spp::sparse_hash_map<std::string, std::string> & config)
{
// data
data_len = data_size - data_offset;
if (data_fd == meta_fd && data_offset < meta_offset)
{
data_len = meta_offset - data_offset;
}
if (data_fd == journal_fd && data_offset < journal_offset)
{
data_len = data_len < journal_offset-data_offset
? data_len : journal_offset-data_offset;
}
// meta
meta_area = (meta_fd == data_fd ? data_size : meta_size) - meta_offset;
if (meta_fd == data_fd && meta_offset < data_offset)
{
meta_area = data_offset - meta_offset;
}
if (meta_fd == journal_fd && meta_offset < journal_offset)
{
meta_area = meta_area < journal_offset-meta_offset
? meta_area : journal_offset-meta_offset;
}
// journal
journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset;
if (journal_fd == data_fd && journal_offset < data_offset)
{
journal_len = data_offset - journal_offset;
}
if (journal_fd == meta_fd && journal_offset < meta_offset)
{
journal_len = journal_len < meta_offset-journal_offset
? journal_len : meta_offset-journal_offset;
}
// required metadata size
block_count = data_len / block_size;
meta_len = block_count * sizeof(clean_disk_entry);
if (meta_area < meta_len)
{
throw new std::runtime_error("Metadata area is too small");
}
metadata_buf_size = stoull(config["meta_buf_size"]);
if (metadata_buf_size < 65536)
{
metadata_buf_size = 4*1024*1024;
}
// requested journal size
uint64_t journal_wanted = stoull(config["journal_size"]);
if (journal_wanted > journal_len)
{
throw new std::runtime_error("Requested journal_size is too large");
}
else if (journal_wanted > 0)
{
journal_len = journal_wanted;
}
if (journal_len < MIN_JOURNAL_SIZE)
{
throw new std::runtime_error("Journal is too small");
}
}
void blockstore::open_data(spp::sparse_hash_map<std::string, std::string> & config)
{
int sectsize;
data_offset = stoull(config["data_offset"]);
if (data_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("data_offset not aligned");
}
data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR);
if (data_fd == -1)
{
throw new std::runtime_error("Failed to open data device");
}
if (ioctl(data_fd, BLKSSZGET, &sectsize) < 0 ||
ioctl(data_fd, BLKGETSIZE64, &data_size) < 0 ||
sectsize != 512)
{
throw new std::runtime_error("Data device sector is not equal to 512 bytes");
}
if (data_offset >= data_size)
{
throw new std::runtime_error("data_offset exceeds device size");
}
}
void blockstore::open_meta(spp::sparse_hash_map<std::string, std::string> & config)
{
int sectsize;
meta_offset = stoull(config["meta_offset"]);
if (meta_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("meta_offset not aligned");
}
if (config["meta_device"] != "")
{
meta_offset = 0;
meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR);
if (meta_fd == -1)
{
throw new std::runtime_error("Failed to open metadata device");
}
if (ioctl(meta_fd, BLKSSZGET, &sectsize) < 0 ||
ioctl(meta_fd, BLKGETSIZE64, &meta_size) < 0 ||
sectsize != 512)
{
throw new std::runtime_error("Metadata device sector is not equal to 512 bytes (or ioctl failed)");
}
if (meta_offset >= meta_size)
{
throw new std::runtime_error("meta_offset exceeds device size");
}
}
else
{
meta_fd = data_fd;
meta_size = 0;
if (meta_offset >= data_size)
{
throw new std::runtime_error("meta_offset exceeds device size");
}
}
}
void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & config)
{
int sectsize;
journal_offset = stoull(config["journal_offset"]);
if (journal_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("journal_offset not aligned");
}
if (config["journal_device"] != "")
{
journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
if (journal_fd == -1)
{
throw new std::runtime_error("Failed to open journal device");
}
if (ioctl(journal_fd, BLKSSZGET, &sectsize) < 0 ||
ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 ||
sectsize != 512)
{
throw new std::runtime_error("Journal device sector is not equal to 512 bytes");
}
}
else
{
journal_fd = meta_fd;
journal_size = 0;
if (journal_offset >= data_size)
{
throw new std::runtime_error("journal_offset exceeds device size");
}
}
}

142
blockstore_read.cpp

@ -0,0 +1,142 @@
#include "blockstore.h"
int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location)
{
uint32_t cur_start = item_start;
if (cur_start < read_op.offset + read_op.len && item_end > read_op.offset)
{
cur_start = cur_start < read_op.offset ? read_op.offset : cur_start;
item_end = item_end > read_op.offset + read_op.len ? read_op.offset + read_op.len : item_end;
auto fulfill_near = read_op.read_vec.lower_bound(cur_start);
if (fulfill_near != read_op.read_vec.begin())
{
fulfill_near--;
if (fulfill_near->first + fulfill_near->second.iov_len <= cur_start)
{
fulfill_near++;
}
}
while (fulfill_near != read_op.read_vec.end() && fulfill_near->first < item_end)
{
if (fulfill_near->first > cur_start)
{
if (item_state == ST_IN_FLIGHT)
{
// Pause until it's written somewhere
read_op.wait_for = WAIT_IN_FLIGHT;
read_op.wait_version = item_version;
return -1;
}
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe)
{
// Pause until there are more requests available
read_op.wait_for = WAIT_SQE;
return -1;
}
read_op.read_vec[cur_start] = (struct iovec){
read_op.buf + cur_start - read_op.offset,
fulfill_near->first - cur_start
};
io_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal_fd : data_fd,
// FIXME: &read_op.read_vec is forbidden
&read_op.read_vec[cur_start], 1,
(IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
);
io_uring_sqe_set_data(sqe, 0/*read op link*/);
}
cur_start = fulfill_near->first + fulfill_near->second.iov_len;
fulfill_near++;
}
if (cur_start < item_end)
{
if (item_state == ST_IN_FLIGHT)
{
// Pause until it's written somewhere
read_op.wait_for = WAIT_IN_FLIGHT;
read_op.wait_version = item_version;
return -1;
}
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe)
{
// Pause until there are more requests available
read_op.wait_for = WAIT_SQE;
return -1;
}
read_op.read_vec[cur_start] = (struct iovec){
read_op.buf + cur_start - read_op.offset,
item_end - cur_start
};
io_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal_fd : data_fd,
&read_op.read_vec[cur_start], 1,
(IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
);
io_uring_sqe_set_data(sqe, 0/*read op link*/);
}
}
return 0;
}
int blockstore::read(blockstore_operation *read_op)
{
auto clean_it = object_db.find(read_op->oid);
auto dirty_it = dirty_queue.find(read_op->oid);
if (clean_it == object_db.end() && dirty_it == object_db.end())
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
read_op->callback(read_op);
return 0;
}
unsigned prev_sqe_pos = ring->sq.sqe_tail;
uint64_t fulfilled = 0;
if (dirty_it != object_db.end())
{
dirty_list dirty = dirty_it->second;
for (int i = dirty.size()-1; i >= 0; i--)
{
if (read_op->flags == OP_READ_DIRTY || IS_STABLE(dirty[i].state))
{
if (fulfill_read(*read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0)
{
// need to wait for something, undo added requests and requeue op
ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
submit_queue.push_front(read_op);
return 0;
}
}
}
}
if (clean_it != object_db.end())
{
if (fulfill_read(*read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0)
{
// need to wait for something, undo added requests and requeue op
ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
submit_queue.push_front(read_op);
return 0;
}
}
if (!read_op->read_vec.size())
{
// region is not allocated - return zeroes
free(read_op);
memset(read_op->buf, 0, read_op->len);
read_op->callback(read_op);
return 0;
}
int ret = io_uring_submit(ring);
if (ret < 0)
{
throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
return 0;
}

62
test.cpp

@ -0,0 +1,62 @@
#define _LARGEFILE64_SOURCE
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdint.h>
#include <malloc.h>
#include <linux/fs.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
#include <liburing.h>
static int setup_context(unsigned entries, struct io_uring *ring)
{
int ret = io_uring_queue_init(entries, ring, 0);
if (ret < 0)
{
fprintf(stderr, "queue_init: %s\n", strerror(-ret));
return -1;
}
return 0;
}
static void test_write(struct io_uring *ring, int fd)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
assert(sqe);
uint8_t *buf = (uint8_t*)memalign(512, 1024*1024*1024);
struct iovec iov = { buf, 1024*1024*1024 };
io_uring_prep_writev(sqe, fd, &iov, 1, 0);
io_uring_sqe_set_data(sqe, 0);
io_uring_submit_and_wait(ring, 1);
struct io_uring_cqe *cqe;
io_uring_peek_cqe(ring, &cqe);
int ret = cqe->res;
//int ret = writev(fd, &iov, 1);
if (ret < 0)
printf("cqe failed: %d %s\n", ret, strerror(-ret));
else
printf("result: %d\n", ret);
free(buf);
}
int main(int argc, char *argv[])
{
struct io_uring ring;
int fd = open("testfile", O_RDWR | O_DIRECT, 0644);
if (fd < 0)
{
perror("open infile");
return 1;
}
if (setup_context(32, &ring))
return 1;
test_write(&ring, fd);
close(fd);
io_uring_queue_exit(&ring);
return 0;
}
Loading…
Cancel
Save