From 24f5d715950f6d6ae8ca450143da6ea6de63f0a0 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 10 Nov 2019 01:40:48 +0300 Subject: [PATCH] Move handle_event code into _read.cpp, _write.cpp; move sync code into _sync.cpp --- Makefile | 2 +- blockstore.cpp | 155 +++++++++++++---------------------- blockstore.h | 45 ++++++---- blockstore_read.cpp | 19 ++++- blockstore_sync.cpp | 190 +++++++++++++++++++++++++++++++++++++++++++ blockstore_write.cpp | 136 ++++++------------------------- 6 files changed, 324 insertions(+), 223 deletions(-) create mode 100644 blockstore_sync.cpp diff --git a/Makefile b/Makefile index 86fa8811..11377aa1 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o blockstore_write.o crc32c.o ringloop.o test +all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o blockstore_write.o blockstore_sync.o crc32c.o ringloop.o test clean: rm -f *.o crc32c.o: crc32c.c diff --git a/blockstore.cpp b/blockstore.cpp index 3d2e1da5..e943988a 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -69,67 +69,16 @@ void blockstore::handle_event(ring_data_t *data) if ((op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || (op->flags & OP_TYPE_MASK) == OP_READ) { - op->pending_ops--; - if (data->res < 0) - { - // read error - op->retval = data->res; - } - if (op->pending_ops == 0) - { - if (op->retval == 0) - op->retval = op->len; - op->callback(op); - in_process_ops.erase(op); - } + handle_read_event(data, op); } else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || (op->flags & OP_TYPE_MASK) == OP_DELETE) { - op->pending_ops--; - if (data->res < 0) - { - // write error - // FIXME: our state becomes corrupted after a write error. maybe do something better than just die - throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); - } - if (op->min_used_journal_sector > 0) - { - for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++) - { - journal.sector_info[s-1].usage_count--; - } - op->min_used_journal_sector = op->max_used_journal_sector = 0; - } - if (op->pending_ops == 0) - { - // Acknowledge write without sync - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); - dirty_it->second.state = (dirty_it->second.state == ST_J_SUBMITTED - ? ST_J_WRITTEN : (dirty_it->second.state == ST_DEL_SUBMITTED ? ST_DEL_WRITTEN : ST_D_WRITTEN)); - op->retval = op->len; - op->callback(op); - in_process_ops.erase(op); - unsynced_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); - } + handle_write_event(data, op); } else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) { - if (op->min_used_journal_sector > 0) - { - for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++) - { - journal.sector_info[s-1].usage_count--; - } - op->min_used_journal_sector = op->max_used_journal_sector = 0; - } - + handle_sync_event(data, op); } else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) { @@ -182,50 +131,11 @@ void blockstore::loop() auto op = *(cur++); if (op->wait_for) { + check_wait(op); if (op->wait_for == WAIT_SQE) - { - if (io_uring_sq_space_left(ringloop->ring) < op->wait_detail) - { - // stop submission if there's still no free space - break; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_IN_FLIGHT) - { - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = op->oid, - .version = op->wait_detail, - }); - if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state)) - { - // do not submit - continue; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_JOURNAL) - { - if (journal.used_start < op->wait_detail) - { - // do not submit - continue; - } - op->wait_for = 0; - } - else if (op->wait_for == WAIT_JOURNAL_BUFFER) - { - if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0) - { - // do not submit - continue; - } - op->wait_for = 0; - } - else - { - throw new std::runtime_error("BUG: op->wait_for value is unexpected"); - } + break; + else if (op->wait_for) + continue; } unsigned ring_space = io_uring_sq_space_left(ringloop->ring); unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; @@ -266,7 +176,7 @@ void blockstore::loop() throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); } submit_queue.erase(op_ptr); - in_process_ops.insert(op); + in_progress_ops.insert(op); } else { @@ -282,6 +192,54 @@ void blockstore::loop() } } +void blockstore::check_wait(blockstore_operation *op) +{ + if (op->wait_for == WAIT_SQE) + { + if (io_uring_sq_space_left(ringloop->ring) < op->wait_detail) + { + // stop submission if there's still no free space + return; + } + op->wait_for = 0; + } + else if (op->wait_for == WAIT_IN_FLIGHT) + { + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->wait_detail, + }); + if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state)) + { + // do not submit + return; + } + op->wait_for = 0; + } + else if (op->wait_for == WAIT_JOURNAL) + { + if (journal.used_start < op->wait_detail) + { + // do not submit + return; + } + op->wait_for = 0; + } + else if (op->wait_for == WAIT_JOURNAL_BUFFER) + { + if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0) + { + // do not submit + return; + } + op->wait_for = 0; + } + else + { + throw new std::runtime_error("BUG: op->wait_for value is unexpected"); + } +} + int blockstore::enqueue_op(blockstore_operation *op) { if (op->offset >= block_size || op->len >= block_size-op->offset || @@ -292,6 +250,7 @@ int blockstore::enqueue_op(blockstore_operation *op) return -EINVAL; } op->wait_for = 0; + op->sync_state = 0; submit_queue.push_back(op); if ((op->flags & OP_TYPE_MASK) == OP_WRITE) { diff --git a/blockstore.h b/blockstore.h index b1eaf1b4..6f4a0155 100644 --- a/blockstore.h +++ b/blockstore.h @@ -195,7 +195,6 @@ public: struct blockstore_operation { std::function callback; - uint32_t flags; object_id oid; uint64_t version; @@ -204,17 +203,24 @@ struct blockstore_operation uint8_t *buf; int retval; + // FIXME: Move internal fields somewhere + friend class blockstore; +private: // Wait status int wait_for; uint64_t wait_detail; int pending_ops; - // FIXME make all of these pointers and put them into a union + // Read std::map read_vec; + + // Sync, write uint64_t min_used_journal_sector, max_used_journal_sector; + + // Sync std::deque sync_writes; - int big_write_count; - int big_write_state; + std::list::iterator in_progress_ptr; + int big_write_count, sync_state, prev_sync_count; }; class blockstore; @@ -224,13 +230,14 @@ class blockstore; class blockstore { struct ring_consumer_t ring_consumer; -public: + // Another option is https://github.com/algorithm-ninja/cpp-btree spp::sparse_hash_map object_db; std::map dirty_db; std::list submit_queue; std::deque unsynced_writes; - std::set in_process_ops; + std::list in_progress_syncs; + std::set in_progress_ops; uint32_t block_order, block_size; uint64_t block_count; allocator *data_alloc; @@ -250,8 +257,8 @@ public: return ringloop->get_sqe(ring_consumer.number); } - blockstore(spp::sparse_hash_map & config, ring_loop_t *ringloop); - ~blockstore(); + friend class blockstore_init_meta; + friend class blockstore_init_journal; void calc_lengths(spp::sparse_hash_map & config); void open_data(spp::sparse_hash_map & config); @@ -264,12 +271,7 @@ public: blockstore_init_meta* metadata_init_reader; blockstore_init_journal* journal_init_reader; - // Event loop - void handle_event(ring_data_t* data); - void loop(); - - // Submission - int enqueue_op(blockstore_operation *op); + void check_wait(blockstore_operation *op); // Read int dequeue_read(blockstore_operation *read_op); @@ -277,10 +279,25 @@ public: uint32_t item_state, uint64_t item_version, uint64_t item_location); int fulfill_read_push(blockstore_operation *read_op, uint32_t item_start, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end); + void handle_read_event(ring_data_t *data, blockstore_operation *op); // Write int dequeue_write(blockstore_operation *op); + void handle_write_event(ring_data_t *data, blockstore_operation *op); // Sync int dequeue_sync(blockstore_operation *op); + void handle_sync_event(ring_data_t *data, blockstore_operation *op); + +public: + + blockstore(spp::sparse_hash_map & config, ring_loop_t *ringloop); + ~blockstore(); + + // Event loop + void handle_event(ring_data_t* data); + void loop(); + + // Submission + int enqueue_op(blockstore_operation *op); }; diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 79a9e9d2..57504271 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -126,6 +126,23 @@ int blockstore::dequeue_read(blockstore_operation *read_op) } read_op->retval = 0; read_op->pending_ops = read_op->read_vec.size(); - in_process_ops.insert(read_op); + in_progress_ops.insert(read_op); return 1; } + +void blockstore::handle_read_event(ring_data_t *data, blockstore_operation *op) +{ + op->pending_ops--; + if (data->res < 0) + { + // read error + op->retval = data->res; + } + if (op->pending_ops == 0) + { + if (op->retval == 0) + op->retval = op->len; + op->callback(op); + in_progress_ops.erase(op); + } +} diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp new file mode 100644 index 00000000..d5d3ee85 --- /dev/null +++ b/blockstore_sync.cpp @@ -0,0 +1,190 @@ +#include "blockstore.h" + +#define SYNC_NO_BIG 1 +#define SYNC_HAS_BIG 2 +#define SYNC_DATA_SYNC_SENT 3 +#define SYNC_JOURNAL_SYNC_SENT 4 +#define SYNC_DONE 5 + +int blockstore::dequeue_sync(blockstore_operation *op) +{ + if (op->sync_state == 0) + { + op->big_write_count = 0; + op->sync_state = SYNC_NO_BIG; + op->sync_writes.swap(unsynced_writes); + unsynced_writes.clear(); + if (op->sync_writes.size() == 0) + { + op->sync_state = SYNC_DONE; + } + auto it = op->sync_writes.begin(); + while (it != op->sync_writes.end()) + { + uint32_t state = dirty_db[*it].state; + if (IS_BIG_WRITE(state)) + { + op->big_write_count++; + op->sync_state = SYNC_HAS_BIG; + } + it++; + } + } + if (op->sync_state == SYNC_NO_BIG) + { + // No big writes, just fsync the journal + BS_SUBMIT_GET_SQE(sqe, data); + io_uring_prep_fsync(sqe, journal.fd, 0); + data->op = op; + op->pending_ops = 1; + op->sync_state = SYNC_JOURNAL_SYNC_SENT; + } + else if (op->sync_state == SYNC_HAS_BIG) + { + // 1st step: fsync data + BS_SUBMIT_GET_SQE(sqe, data); + io_uring_prep_fsync(sqe, data_fd, 0); + data->op = op; + op->pending_ops = 1; + op->sync_state = SYNC_DATA_SYNC_SENT; + } + else if (op->sync_state == SYNC_DATA_SYNC_SENT) + { + // 2nd step: Data device is synced, prepare & write journal entries + // Check space in the journal and journal memory buffers + int required = op->big_write_count, sectors_required = 1; + uint64_t next_pos = journal.next_free, next_sector = journal.cur_sector; + while (1) + { + int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write); + required -= fits; + if (required <= 0) + break; + next_pos = (next_pos+512) < journal.len ? next_pos+512 : 512; + sectors_required++; + next_sector = ((next_sector + 1) % journal.sector_count); + if (journal.sector_info[next_sector].usage_count > 0) + { + // No memory buffer available. Wait for it. + op->wait_for = WAIT_JOURNAL_BUFFER; + return 0; + } + } + if (next_pos >= journal.used_start) + { + // No space in the journal. Wait for it. + op->wait_for = WAIT_JOURNAL; + op->wait_detail = next_pos; + return 0; + } + // Get SQEs. Don't bother about merging, submit each journal sector as a separate request + struct io_uring_sqe *sqe[sectors_required+1]; + for (int i = 0; i < sectors_required+1; i++) + { + BS_SUBMIT_GET_SQE_DECL(sqe[i]); + } + // Prepare and submit journal entries + op->min_used_journal_sector = 1 + journal.cur_sector; + sectors_required = 0; + required = op->big_write_count; + // FIXME: advance it + auto it = op->sync_writes.begin(); + while (1) + { + int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write); + while (fits > 0 && required > 0) + { + journal_entry_big_write *je = (journal_entry_big_write*)( + journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos + ); + *je = { + .crc32 = 0, + .magic = JOURNAL_MAGIC, + .type = JE_BIG_WRITE, + .size = sizeof(journal_entry_big_write), + .crc32_prev = journal.crc32_last, + .oid = it->oid, + .version = it->version, + .block = dirty_db[*it].location, + }; + je->crc32 = je_crc32((journal_entry*)je); + journal.crc32_last = je->crc32; + journal.in_sector_pos += sizeof(journal_entry_big_write); + required--; + } + if (required <= 0) + break; + journal.sector_info[journal.cur_sector].usage_count++; + struct ring_data_t *data = ((ring_data_t*)sqe[sectors_required]->user_data); + data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 }; + data->op = op; + io_uring_prep_writev( + sqe[sectors_required], journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset + ); + journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count); + journal.sector_info[journal.cur_sector].offset = journal.next_free; + journal.in_sector_pos = 0; + journal.next_free = (journal.next_free + 512) < journal.len ? journal.next_free + 512 : 512; + memset(journal.sector_buf + 512*journal.cur_sector, 0, 512); + sectors_required++; + } + // ... And a journal fsync + io_uring_prep_fsync(sqe[sectors_required], journal.fd, 0); + struct ring_data_t *data = ((ring_data_t*)sqe[sectors_required]->user_data); + data->op = op; + op->pending_ops = 1 + sectors_required; + op->max_used_journal_sector = 1 + journal.cur_sector; + op->sync_state = SYNC_JOURNAL_SYNC_SENT; + } + // FIXME: resubmit op from in_progress + op->prev_sync_count = in_progress_syncs.size(); + op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); + return 1; +} + +void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) +{ + if (data->res < 0) + { + // sync error + // FIXME: our state becomes corrupted after a write error. maybe do something better than just die + throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); + } + op->pending_ops--; + if (op->min_used_journal_sector > 0) + { + for (uint64_t s = op->min_used_journal_sector; s != op->max_used_journal_sector; s = (s + 1) % journal.sector_count) + { + journal.sector_info[s-1].usage_count--; + } + op->min_used_journal_sector = op->max_used_journal_sector = 0; + } + if (op->pending_ops == 0) + { + // Acknowledge sync + auto it = op->in_progress_ptr; + int done_syncs = 1; + ++it; + while (it != in_progress_syncs.end()) + { + auto & next_sync = *it; + next_sync->prev_sync_count -= done_syncs; + if (next_sync->prev_sync_count == 0/* && next_sync->DONE*/) + { + done_syncs++; + auto next_it = it; + it++; + in_progress_syncs.erase(next_it); + next_sync->retval = 0; + next_sync->callback(next_sync); + in_progress_ops.erase(next_sync); + } + else + it++; + } + in_progress_syncs.erase(op->in_progress_ptr); + op->retval = 0; + op->callback(op); + in_progress_ops.erase(op); + } +} diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 9d3ffcb2..9425327b 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -114,120 +114,38 @@ int blockstore::dequeue_write(blockstore_operation *op) return 1; } -int blockstore::dequeue_sync(blockstore_operation *op) +void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op) { - op->big_write_count = 0; - op->big_write_state = 0x10000; - op->sync_writes.swap(unsynced_writes); - unsynced_writes.clear(); - auto it = op->sync_writes.begin(); - while (it != op->sync_writes.end()) + if (data->res < 0) { - uint32_t state = dirty_db[*it].state; - if (IS_BIG_WRITE(state)) - { - op->big_write_count++; - op->big_write_state = op->big_write_state < state ? op->big_write_state : state; - } - it++; + // write error + // FIXME: our state becomes corrupted after a write error. maybe do something better than just die + throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); } - if (op->big_write_count == 0 || op->big_write_state == ST_D_META_WRITTEN) + op->pending_ops--; + if (op->min_used_journal_sector > 0) { - // Just fsync the journal - BS_SUBMIT_GET_SQE(sqe, data); - io_uring_prep_fsync(sqe, journal.fd, 0); - data->op = op; - op->pending_ops = 1; + for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++) + { + journal.sector_info[s-1].usage_count--; + } + op->min_used_journal_sector = op->max_used_journal_sector = 0; } - else if (op->big_write_state == ST_D_WRITTEN) + if (op->pending_ops == 0) { - // 1st step: fsync data - BS_SUBMIT_GET_SQE(sqe, data); - io_uring_prep_fsync(sqe, data_fd, 0); - data->op = op; - op->pending_ops = 1; + // Acknowledge write without sync + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + dirty_it->second.state = (dirty_it->second.state == ST_J_SUBMITTED + ? ST_J_WRITTEN : (dirty_it->second.state == ST_DEL_SUBMITTED ? ST_DEL_WRITTEN : ST_D_WRITTEN)); + op->retval = op->len; + op->callback(op); + in_progress_ops.erase(op); + unsynced_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); } - else if (op->big_write_state == ST_D_SYNCED) - { - // 2nd step: Data device is synced, prepare & write journal entries - // Check space in the journal and journal memory buffers - int required = op->big_write_count, sectors_required = 1; - uint64_t next_pos = journal.next_free, next_sector = journal.cur_sector; - while (1) - { - int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write); - required -= fits; - if (required <= 0) - break; - next_pos = (next_pos+512) < journal.len ? next_pos+512 : 512; - sectors_required++; - next_sector = ((next_sector + 1) % journal.sector_count); - if (journal.sector_info[next_sector].usage_count > 0) - { - // No memory buffer available. Wait for it. - op->wait_for = WAIT_JOURNAL_BUFFER; - return 0; - } - } - if (next_pos >= journal.used_start) - { - // No space in the journal. Wait for it. - op->wait_for = WAIT_JOURNAL; - op->wait_detail = next_pos; - return 0; - } - // Get SQEs. Don't bother about merging, submit each journal sector as a separate request - struct io_uring_sqe *sqe[sectors_required]; - for (int i = 0; i < sectors_required; i++) - { - BS_SUBMIT_GET_SQE_DECL(sqe[i]); - } - // Prepare and submit journal entries - op->min_used_journal_sector = 1 + journal.cur_sector; - sectors_required = 0; - required = op->big_write_count; - it = op->sync_writes.begin(); - while (1) - { - int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write); - while (fits > 0 && required > 0) - { - journal_entry_big_write *je = (journal_entry_big_write*)( - journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos - ); - *je = { - .crc32 = 0, - .magic = JOURNAL_MAGIC, - .type = JE_BIG_WRITE, - .size = sizeof(journal_entry_big_write), - .crc32_prev = journal.crc32_last, - .oid = it->oid, - .version = it->version, - .block = dirty_db[*it].location, - }; - je->crc32 = je_crc32((journal_entry*)je); - journal.crc32_last = je->crc32; - journal.in_sector_pos += sizeof(journal_entry_big_write); - required--; - } - if (required <= 0) - break; - journal.sector_info[journal.cur_sector].usage_count++; - struct ring_data_t *data = ((ring_data_t*)sqe[sectors_required]->user_data); - data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 }; - data->op = op; - io_uring_prep_writev( - sqe[sectors_required], journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset - ); - journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count); - journal.sector_info[journal.cur_sector].offset = journal.next_free; - journal.in_sector_pos = 0; - journal.next_free = (journal.next_free + 512) < journal.len ? journal.next_free + 512 : 512; - memset(journal.sector_buf + 512*journal.cur_sector, 0, 512); - sectors_required++; - } - op->pending_ops = sectors_required; - op->max_used_journal_sector = 1 + journal.cur_sector; - } - return 1; }