Try to implement journal flusher as a FSM

blocking-uring-test
Vitaliy Filippov 2019-11-12 13:52:27 +03:00
parent ab20aef941
commit 34451b6e44
3 changed files with 154 additions and 64 deletions

View File

@ -1,6 +1,8 @@
#pragma once #pragma once
#ifndef _LARGEFILE64_SOURCE
#define _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE
#endif
#include <sys/types.h> #include <sys/types.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -240,9 +242,9 @@ class blockstore
// Another option is https://github.com/algorithm-ninja/cpp-btree // Another option is https://github.com/algorithm-ninja/cpp-btree
spp::sparse_hash_map<object_id, clean_entry, oid_hash> clean_db; spp::sparse_hash_map<object_id, clean_entry, oid_hash> clean_db;
std::map<obj_ver_id, dirty_entry> dirty_db; std::map<obj_ver_id, dirty_entry> dirty_db;
std::list<blockstore_operation*> submit_queue; std::list<blockstore_operation*> submit_queue; // FIXME: funny thing is that vector is better here
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes; std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
std::list<blockstore_operation*> in_progress_syncs; std::list<blockstore_operation*> in_progress_syncs; // ...and probably here, too
uint32_t block_order, block_size; uint32_t block_order, block_size;
uint64_t block_count; uint64_t block_count;
allocator *data_alloc; allocator *data_alloc;
@ -265,6 +267,7 @@ class blockstore
friend class blockstore_init_meta; friend class blockstore_init_meta;
friend class blockstore_init_journal; friend class blockstore_init_journal;
friend class blockstore_journal_check_t; friend class blockstore_journal_check_t;
friend class journal_flusher_t;
void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config); void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config);
void open_data(spp::sparse_hash_map<std::string, std::string> & config); void open_data(spp::sparse_hash_map<std::string, std::string> & config);

View File

@ -123,17 +123,12 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
if (op->pending_ops == 0) if (op->pending_ops == 0)
{ {
// First step: mark dirty_db entries as stable, acknowledge op completion // First step: mark dirty_db entries as stable, acknowledge op completion
// FIXME: oops... we seem to have to copy object id/version pairs...
// No, no, no, copying is bad. We don't want copying.
obj_ver_id* v; obj_ver_id* v;
int i; int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{ {
// Mark all dirty_db entries up to op->version as stable // Mark all dirty_db entries up to op->version as stable
auto dirty_it = dirty_db.find((obj_ver_id){ auto dirty_it = dirty_db.find(*v);
.oid = v->oid,
.version = v->version,
});
if (dirty_it != dirty_db.end()) if (dirty_it != dirty_db.end())
{ {
do do
@ -146,8 +141,13 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
{ {
dirty_it->second.state = ST_D_STABLE; dirty_it->second.state = ST_D_STABLE;
} }
else if (IS_STABLE(dirty_it->second.state))
{
break;
}
dirty_it--; dirty_it--;
} while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid); } while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid);
flusher.flush_queue.push_back(*v);
} }
} }
// Acknowledge op // Acknowledge op
@ -161,62 +161,149 @@ struct offset_len
uint64_t offset, len; uint64_t offset, len;
}; };
struct journal_flusher_t class journal_flusher_t
{ {
std::deque<obj_ver_id> flush_queue; blockstore *bs;
int state;
obj_ver_id cur; obj_ver_id cur;
std::map<obj_ver_id, dirty_entry>::iterator dirty_it; std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
std::vector<offset_len> v; std::vector<offset_len> v;
std::vector<offset_len>::iterator it;
uint64_t offset, len;
public:
journal_flusher_t();
std::deque<obj_ver_id> flush_queue;
void stabilize_object_loop();
}; };
void blockstore::stabilize_object(object_id oid, uint64_t max_ver) #define F_NEXT_OBJ 0
#define F_NEXT_VER 1
#define F_FIND_POS 2
#define F_SUBMIT_FULL 3
#define F_SUBMIT_PART 4
#define F_CUT_OFFSET 5
#define F_FINISH_VER 6
journal_flusher_t::journal_flusher_t()
{ {
auto dirty_it = dirty_db.find((obj_ver_id){ state = F_NEXT_OBJ;
.oid = oid, }
.version = max_ver,
}); // It would be prettier as a coroutine (maybe https://github.com/hnes/libaco ?)
if (dirty_it != dirty_db.end()) // Now it's a state machine
{ void journal_flusher_t::stabilize_object_loop()
std::vector<offset_len> v; {
do begin:
{ if (state == F_NEXT_OBJ)
if (dirty_it->second.state == ST_J_STABLE) {
{ // Pick next object
uint64_t offset = dirty_it->second.offset, len = dirty_it->second.size; if (!flush_queue.size())
auto it = v.begin(); return;
while (1) while (1)
{ {
for (; it != v.end(); it++) cur = flush_queue.front();
if (it->offset >= offset) flush_queue.pop_front();
break; dirty_it = bs->dirty_db.find(cur);
if (it == v.end() || it->offset >= offset+len) if (dirty_it != bs->dirty_db.end())
{ {
v.insert(it, (offset_len){ .offset = offset, .len = len }); state = F_NEXT_VER;
break; v.clear();
} break;
else }
{ else if (flush_queue.size() == 0)
if (it->offset > offset) return;
v.insert(it, (offset_len){ .offset = offset, .len = it->offset-offset }); }
if (offset+len > it->offset+it->len) }
{ if (state == F_NEXT_VER)
len = offset+len - (it->offset+it->len); {
offset = it->offset+it->len; if (dirty_it->second.state == ST_J_STABLE)
} {
else offset = dirty_it->second.offset;
break; len = dirty_it->second.size;
} it = v.begin();
} state = F_FIND_POS;
} }
else if (dirty_it->second.state == ST_D_STABLE) else if (dirty_it->second.state == ST_D_STABLE)
{ {
break; state = F_NEXT_OBJ;
} }
else if (IS_STABLE(dirty_it->second.state)) else if (IS_STABLE(dirty_it->second.state))
{ {
break; state = F_NEXT_OBJ;
} }
} while (dirty_it != dirty_db.begin()); else
} state = F_FINISH_VER;
}
if (state == F_FIND_POS)
{
for (; it != v.end(); it++)
if (it->offset >= offset)
break;
if (it == v.end() || it->offset >= offset+len)
{
state = F_SUBMIT_FULL;
}
else
{
if (it->offset > offset)
state = F_SUBMIT_PART;
else
state = F_CUT_OFFSET;
}
}
if (state == F_SUBMIT_FULL)
{
struct io_uring_sqe *sqe = get_sqe();
if (!sqe)
return;
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = (struct iovec){ malloc(len), len };
data->op = op; // FIXME OOPS
io_uring_prep_readv(
sqe, journal_fd, &data->iov, 1, journal_offset + dirty_it->second.location + offset
);
op->pending_ops = 1;
v.insert(it, (offset_len){ .offset = offset, .len = len });
state = F_SUBMIT_FULL_WRITE;
return;
}
if (state == F_SUBMIT_FULL_WRITE)
{
struct io_uring_sqe *sqe = get_sqe();
if (!sqe)
return;
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
}
if (state == F_SUBMIT_PART)
{
if (!can_submit)
{
return;
}
v.insert(it, (offset_len){ .offset = offset, .len = it->offset-offset });
state = F_CUT_OFFSET;
}
if (state == F_CUT_OFFSET)
{
if (offset+len > it->offset+it->len)
{
len = offset+len - (it->offset+it->len);
offset = it->offset+it->len;
state = F_FIND_POS;
}
else
state = F_FINISH_VER;
}
if (state == F_FINISH_VER)
{
dirty_it--;
if (dirty_it == bs->dirty_db.begin() || dirty_it->first.oid != cur.oid)
state = F_NEXT_OBJ;
else
state = F_NEXT_VER;
}
goto begin;
} }

View File

@ -134,7 +134,7 @@ int main_vec(int argc, char *argv[])
for (; it != v.end(); it++) for (; it != v.end(); it++)
if (it->iov_len >= r) if (it->iov_len >= r)
break; break;
v.insert(it, (iovec){ .iov_base = 0, .iov_len = r }); v.insert(it, (iovec){ .iov_base = 0, .iov_len = (size_t)r });
} }
} }
return 0; return 0;
@ -150,7 +150,7 @@ int main_map(int argc, char *argv[])
for (int i = 0; i < 2048; i++) for (int i = 0; i < 2048; i++)
{ {
int r = rand(); int r = rand();
v[r] = (iovec){ .iov_base = 0, .iov_len = r }; v[r] = (iovec){ .iov_base = 0, .iov_len = (size_t)r };
} }
} }
return 0; return 0;
@ -169,14 +169,14 @@ int main0(int argc, char *argv[])
{ {
dirty_db[(obj_ver_id){ dirty_db[(obj_ver_id){
.oid = (object_id){ .oid = (object_id){
.inode = rand(), .inode = (uint64_t)rand(),
.stripe = i, .stripe = (uint64_t)i,
}, },
.version = 1, .version = 1,
}] = (dirty_entry){ }] = (dirty_entry){
.state = ST_D_META_SYNCED, .state = ST_D_META_SYNCED,
.flags = 0, .flags = 0,
.location = i << 17, .location = (uint64_t)i << 17,
.offset = 0, .offset = 0,
.size = 1 << 17, .size = 1 << 17,
}; };