Rename dirty_queue to dirty_db and make it a single std::map

blocking-uring-test
Vitaliy Filippov 2019-11-08 00:19:17 +03:00
parent 5330461029
commit 40890aeec5
8 changed files with 89 additions and 58 deletions
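
The gist of the change: the per-object dirty lists (spp::sparse_hash_map<object_id, dirty_list>) are replaced by one ordered std::map keyed by (oid, version), so the newest dirty version of an object is found with a single upper_bound() on the key {oid, UINT64_MAX} followed by one iterator decrement, instead of taking .back() of a per-object vector. Below is a minimal self-contained sketch of that lookup pattern; ver_key is a hypothetical stand-in that only mirrors obj_ver_id from blockstore.h, and the begin() check is an extra safety guard added in this sketch.

    #include <cstdint>
    #include <cstdio>
    #include <map>

    // Hypothetical stand-in for obj_ver_id: (inode, stripe) identifies the object, plus a version
    struct ver_key
    {
        uint64_t inode, stripe, version;
    };

    inline bool operator < (const ver_key & a, const ver_key & b)
    {
        // lexicographic order: all versions of one object sort next to each other
        return a.inode < b.inode || (a.inode == b.inode && (a.stripe < b.stripe ||
            (a.stripe == b.stripe && a.version < b.version)));
    }

    int main()
    {
        std::map<ver_key, int> dirty_db;
        dirty_db[ver_key{ 1, 0, 3 }] = 0;
        dirty_db[ver_key{ 1, 0, 7 }] = 0;
        dirty_db[ver_key{ 2, 0, 1 }] = 0;
        // newest dirty version of object (1, 0): upper_bound of {oid, UINT64_MAX}, then step back once
        auto it = dirty_db.upper_bound(ver_key{ 1, 0, UINT64_MAX });
        if (it != dirty_db.begin())
        {
            it--;
            if (it->first.inode == 1 && it->first.stripe == 0)
                printf("latest dirty version = %llu\n", (unsigned long long)it->first.version); // prints 7
        }
        return 0;
    }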

View File

@@ -2,8 +2,8 @@ all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_rea
clean:
rm -f *.o
crc32c.o: crc32c.c
gcc -c -o $@ $<
%.o: %.cpp
gcc -c -o $@ $<
g++ -c -o $@ $<
%.o: %.cpp blockstore.h
g++ -c -o $@ $<
test: test.cpp
gcc -o test -luring test.cpp
g++ -o test -luring test.cpp

View File

@@ -3,8 +3,6 @@
#include <stdlib.h>
#include <malloc.h>
#define MAX64 ((uint64_t)-1)
allocator *allocator_create(uint64_t blocks)
{
if (blocks >= 0x80000000 || blocks <= 1)
@@ -22,8 +20,8 @@ allocator *allocator_create(uint64_t blocks)
allocator *buf = (allocator*)memalign(sizeof(uint64_t), (2 + total)*sizeof(uint64_t));
buf->size = blocks;
buf->last_one_mask = (blocks % 64) == 0
? MAX64
: ~(MAX64 << (64 - blocks % 64));
? UINT64_MAX
: ~(UINT64_MAX << (64 - blocks % 64));
for (uint64_t i = 0; i < blocks; i++)
{
buf->mask[i] = 0;
@@ -61,7 +59,7 @@ void allocator_set(allocator *alloc, uint64_t addr, bool value)
{
alloc->mask[last] = alloc->mask[last] | (1 << bit);
if (alloc->mask[last] != (!is_last || cur_addr/64 < alloc->size/64
? MAX64 : alloc->last_one_mask))
? UINT64_MAX : alloc->last_one_mask))
{
break;
}
@@ -109,13 +107,13 @@ uint64_t allocator_find_free(allocator *alloc)
if (i == 64)
{
// No space
return MAX64;
return UINT64_MAX;
}
addr = (addr * 64) | i;
if (addr >= alloc->size)
{
// No space
return MAX64;
return UINT64_MAX;
}
offset += p2;
p2 = p2 * 64;
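
A side note on the UINT64_MAX substitution above: last_one_mask is built with the ~(UINT64_MAX << n) idiom, which yields a word whose low n bits are set (here n = 64 - blocks % 64, and the blocks % 64 == 0 branch avoids the undefined shift by 64). A tiny sketch with arbitrary n values, just to show what the expression evaluates to:

    #include <cstdint>
    #include <cstdio>

    int main()
    {
        // ~(UINT64_MAX << n) keeps only the low n bits set, valid for 0 < n < 64
        for (unsigned n : { 1u, 4u, 63u })
            printf("n=%2u mask=%016llx\n", n, (unsigned long long)~(UINT64_MAX << n));
        return 0;
    }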

View File

@@ -205,10 +205,14 @@ int blockstore::enqueue_op(blockstore_operation *op)
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
{
// Assign version number
auto dirty_it = dirty_queue.find(op->oid);
if (dirty_it != dirty_queue.end())
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
.oid = op->oid,
.version = UINT64_MAX,
});
dirty_it--;
if (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
{
op->version = dirty_it->second.back().version + 1;
op->version = dirty_it->first.version + 1;
}
else
{
@@ -221,11 +225,12 @@ int blockstore::enqueue_op(blockstore_operation *op)
{
op->version = 1;
}
dirty_it = dirty_queue.emplace(op->oid, dirty_list()).first;
}
// Immediately add the operation into the dirty queue, so subsequent reads could see it
dirty_it->second.push_back((dirty_entry){
// Immediately add the operation into dirty_db, so subsequent reads could see it
dirty_db.emplace((obj_ver_id){
.oid = op->oid,
.version = op->version,
}, (dirty_entry){
.state = ST_IN_FLIGHT,
.flags = 0,
.location = 0,

View File

@@ -65,7 +65,12 @@ struct __attribute__((__packed__)) object_id
inline bool operator == (const object_id & a, const object_id & b)
{
return b.inode == a.inode && b.stripe == a.stripe;
return a.inode == b.inode && a.stripe == b.stripe;
}
inline bool operator < (const object_id & a, const object_id & b)
{
return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe;
}
// 32 bytes per "clean" entry on disk with fixed metadata tables
@@ -87,9 +92,19 @@ struct __attribute__((__packed__)) clean_entry
};
// 48 bytes per dirty entry in memory
struct __attribute__((__packed__)) obj_ver_id
{
object_id oid;
uint64_t version;
};
inline bool operator < (const obj_ver_id & a, const obj_ver_id & b)
{
return a.oid < b.oid || a.oid == b.oid && a.version < b.version;
}
struct __attribute__((__packed__)) dirty_entry
{
uint64_t version;
uint32_t state;
uint32_t flags;
uint64_t location; // location in either journal or data
@@ -97,8 +112,6 @@ struct __attribute__((__packed__)) dirty_entry
uint32_t size; // entry size
};
typedef std::vector<dirty_entry> dirty_list;
class oid_hash
{
public:
@@ -124,7 +137,7 @@ public:
// we should stop submission of other operations. Otherwise some "scatter" reads
// may end up blocked for a long time.
// Otherwise, the submit order is free, that is all operations may be submitted immediately
// In fact, adding a write operation must immediately result in dirty_queue being populated
// In fact, adding a write operation must immediately result in dirty_db being populated
// write -> immediately add to dirty ops, immediately submit. postpone if ring full
// read -> check dirty ops, read or wait, remember max used journal offset, then unremember it
@@ -176,7 +189,7 @@ class blockstore
struct ring_consumer_t ring_consumer;
public:
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
std::map<obj_ver_id, dirty_entry> dirty_db;
std::list<blockstore_operation*> submit_queue;
std::set<blockstore_operation*> in_process_ops;
uint32_t block_order, block_size;

View File

@@ -290,8 +290,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
location = done_pos + total_pos;
total_pos += je->small_write.len;
}
bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
bs->dirty_db.emplace((obj_ver_id){
.oid = je->small_write.oid,
.version = je->small_write.version,
}, (dirty_entry){
.state = ST_J_SYNCED,
.flags = 0,
.location = location,
@@ -302,8 +304,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
else if (je->type == JE_BIG_WRITE)
{
// oid, version, block
bs->dirty_queue[je->big_write.oid].push_back((dirty_entry){
bs->dirty_db.emplace((obj_ver_id){
.oid = je->big_write.oid,
.version = je->big_write.version,
}, (dirty_entry){
.state = ST_D_META_SYNCED,
.flags = 0,
.location = je->big_write.block,
@@ -314,8 +318,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
else if (je->type == JE_STABLE)
{
// oid, version
auto it = bs->dirty_queue.find(je->stable.oid);
if (it == bs->dirty_queue.end())
auto it = bs->dirty_db.find((obj_ver_id){
.oid = je->stable.oid,
.version = je->stable.version,
});
if (it == bs->dirty_db.end())
{
// journal contains a legitimate STABLE entry for a non-existing dirty write
// this probably means that journal was trimmed between WRITTEN and STABLE entries
@@ -323,29 +330,18 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
}
else
{
auto & lst = it->second;
int i;
for (i = 0; i < lst.size(); i++)
{
if (lst[i].version == je->stable.version)
{
lst[i].state = (lst[i].state == ST_D_META_SYNCED
? ST_D_STABLE
: (lst[i].state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
break;
}
}
if (i >= lst.size())
{
// same. STABLE entry for a missing object version
}
it->second.state = (it->second.state == ST_D_META_SYNCED
? ST_D_STABLE
: (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
}
}
else if (je->type == JE_DELETE)
{
// oid, version
bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
.version = je->small_write.version,
bs->dirty_db.emplace((obj_ver_id){
.oid = je->del.oid,
.version = je->del.version,
}, (dirty_entry){
.state = ST_DEL_SYNCED,
.flags = 0,
.location = 0,

View File

@@ -79,8 +79,14 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start,
int blockstore::dequeue_read(blockstore_operation *read_op)
{
auto clean_it = object_db.find(read_op->oid);
auto dirty_it = dirty_queue.find(read_op->oid);
if (clean_it == object_db.end() && dirty_it == dirty_queue.end())
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
.oid = read_op->oid,
.version = UINT64_MAX,
});
dirty_it--;
bool clean_found = clean_it != object_db.end();
bool dirty_found = (dirty_it != dirty_db.end() && dirty_it->first.oid == read_op->oid);
if (!clean_found && !dirty_found)
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
@@ -90,14 +96,15 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
}
unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
uint64_t fulfilled = 0;
if (dirty_it != dirty_queue.end())
if (dirty_found)
{
dirty_list dirty = dirty_it->second;
for (int i = dirty.size()-1; i >= 0; i--)
while (dirty_it->first.oid == read_op->oid)
{
if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty[i].state))
dirty_entry& dirty = dirty_it->second;
if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty.state))
{
if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0)
if (fulfill_read(read_op, dirty.offset, dirty.offset + dirty.size,
dirty.state, dirty_it->first.version, dirty.location) < 0)
{
// need to wait. undo added requests, don't dequeue op
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
@@ -105,6 +112,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
return 0;
}
}
dirty_it--;
}
}
if (clean_it != object_db.end())
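
Since all versions of one object are adjacent in the ordered map, the read path can walk them from newest to oldest by stepping the iterator backwards until the oid changes. A standalone sketch of that walk, with the key simplified to a plain (object id, version) pair; names and values are illustrative only, and the stop-at-begin() check is an extra guard added in this sketch.

    #include <cstdint>
    #include <cstdio>
    #include <map>
    #include <utility>

    int main()
    {
        // key simplified to (object id, version); the real code keys dirty_db on obj_ver_id
        std::map<std::pair<uint64_t, uint64_t>, const char*> dirty_db;
        dirty_db[{ 10, 1 }] = "v1";
        dirty_db[{ 10, 2 }] = "v2";
        dirty_db[{ 11, 1 }] = "other object";
        uint64_t oid = 10;
        // start one past the last possible version of this object, then walk backwards
        auto it = dirty_db.upper_bound({ oid, UINT64_MAX });
        while (it != dirty_db.begin())
        {
            it--;
            if (it->first.first != oid)
                break; // reached entries of another object
            printf("version %llu -> %s\n", (unsigned long long)it->first.second, it->second);
        }
        return 0;
    }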

View File

@@ -3,7 +3,10 @@
// First step of the write algorithm: dequeue operation and submit initial write(s)
int blockstore::dequeue_write(blockstore_operation *op)
{
auto dirty_it = dirty_queue[op->oid].find(op->version); // FIXME OOPS
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
if (op->len == block_size)
{
// Big (redirect) write
@@ -23,8 +26,8 @@ int blockstore::dequeue_write(blockstore_operation *op)
return 0;
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
(*dirty_it).location = loc << block_order;
//(*dirty_it).state = ST_D_SUBMITTED;
dirty_it->second.location = loc << block_order;
//dirty_it->second.state = ST_D_SUBMITTED;
allocator_set(data_alloc, loc, true);
data->iov = (struct iovec){ op->buf, op->len };
data->op = op;
@@ -38,7 +41,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
{
// Small (journaled) write
// First check if the journal has sufficient space
// FIXME Always two SQEs for now. Although it's possible to send 1
// FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
bool two_sqes = true;
uint64_t next_pos = journal.next_free;
if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write))
@@ -116,8 +119,8 @@ int blockstore::dequeue_write(blockstore_operation *op)
io_uring_prep_writev(
sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
);
(*dirty_it).location = journal.next_free;
//(*dirty_it).state = ST_J_SUBMITTED;
dirty_it->second.location = journal.next_free;
//dirty_it->second.state = ST_J_SUBMITTED;
// Move journal.next_free and save last write for current sector
journal.next_free += op->len;
if (journal.next_free >= journal.len)

View File

@@ -13,6 +13,8 @@
#include <stdio.h>
#include <liburing.h>
#include <map>
static int setup_context(unsigned entries, struct io_uring *ring)
{
int ret = io_uring_queue_init(entries, ring, 0);
@@ -47,6 +49,12 @@ static void test_write(struct io_uring *ring, int fd)
int main(int argc, char *argv[])
{
std::map<int, std::string> strs;
strs.emplace(12, "str");
auto it = strs.upper_bound(11);
printf("s = %d %s %d\n", it->first, it->second.c_str(), it == strs.begin());
it--;
printf("s = %d %s\n", it->first, it->second.c_str());
struct io_uring ring;
int fd = open("testfile", O_RDWR | O_DIRECT, 0644);
if (fd < 0)