From 84c62840bdb611602b95e1b244c9ed0741ec270c Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Thu, 7 Nov 2019 02:24:12 +0300
Subject: [PATCH] Begin write algorithm

---
 Makefile             |   2 +-
 blockstore.cpp       |  27 ++++++++++++++++++++++++++-
 blockstore.h         |  12 +++++++++++-
 blockstore_read.cpp  |   2 +-
 blockstore_write.cpp | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 157 insertions(+), 5 deletions(-)
 create mode 100644 blockstore_write.cpp

diff --git a/Makefile b/Makefile
index f22f92a4..a4f86e61 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o crc32c.o ringloop.o test
+all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o blockstore_write.o crc32c.o ringloop.o test
 clean:
 	rm -f *.o
 crc32c.o: crc32c.c
diff --git a/blockstore.cpp b/blockstore.cpp
index 6b3f4f07..44c3b605 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -138,14 +138,39 @@ void blockstore::loop()
                     break;
                 }
             }
+            else if (((*cur)->flags & OP_TYPE_MASK) == OP_WRITE ||
+                ((*cur)->flags & OP_TYPE_MASK) == OP_DELETE)
+            {
+                int dequeue_op = dequeue_write(*cur);
+                if (dequeue_op)
+                {
+                    submit_queue.erase(cur);
+                }
+                else if ((*cur)->wait_for == WAIT_SQE)
+                {
+                    // ring is full, stop submission
+                    break;
+                }
+            }
+            else if (((*cur)->flags & OP_TYPE_MASK) == OP_SYNC)
+            {
+                // FIXME: sync dequeue is not implemented yet
+            }
+            else if (((*cur)->flags & OP_TYPE_MASK) == OP_STABLE)
+            {
+                // FIXME: stabilize dequeue is not implemented yet
+            }
         }
     }
 }
 
 int blockstore::enqueue_op(blockstore_operation *op)
 {
-    if (op->offset >= block_size || op->len >= block_size-op->offset)
+    if (op->offset >= block_size || op->len > block_size-op->offset ||
+        (op->offset % DISK_ALIGNMENT) || (op->len % DISK_ALIGNMENT) ||
+        (op->flags & OP_TYPE_MASK) < OP_READ || (op->flags & OP_TYPE_MASK) > OP_DELETE)
     {
+        // Basic verification not passed
         return -EINVAL;
     }
     submit_queue.push_back(op);
diff --git a/blockstore.h b/blockstore.h
index 7e31514f..e2609d4f 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -69,6 +69,7 @@ inline bool operator == (const object_id & a, const object_id & b)
 }
 
 // 32 bytes per "clean" entry on disk with fixed metadata tables
+// FIXME: maybe add crc32's to metadata
 struct __attribute__((__packed__)) clean_disk_entry
 {
     object_id oid;
@@ -156,7 +157,7 @@ struct blockstore_operation
     std::map<uint64_t, struct iovec> read_vec;
     int pending_ops;
     int wait_for;
-    uint64_t wait_version;
+    uint64_t wait_detail;
 };
 
 class blockstore;
@@ -211,11 +212,18 @@ public:
 
     void handle_event(ring_data_t* data);
     void loop();
 
-    // Read
+    // Submission
     int enqueue_op(blockstore_operation *op);
+
+    // Read
     int dequeue_read(blockstore_operation *read_op);
     int fulfill_read(blockstore_operation *read_op, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location);
     int fulfill_read_push(blockstore_operation *read_op, uint32_t item_start, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end);
+
+    // Write
+    int dequeue_write(blockstore_operation *op);
+
+    // Sync
 };
 
diff --git a/blockstore_read.cpp b/blockstore_read.cpp
index b2d1305b..5a384014 100644
--- a/blockstore_read.cpp
+++ b/blockstore_read.cpp
@@ -9,7 +9,7 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s
     {
         // Pause until it's written somewhere
         read_op->wait_for = WAIT_IN_FLIGHT;
-        read_op->wait_version = item_version;
+        read_op->wait_detail = item_version;
         return -1;
     }
     else if (item_state == ST_DEL_WRITTEN || item_state == ST_DEL_SYNCED || item_state == ST_DEL_MOVED)
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
new file mode 100644
index 00000000..57f10d42
--- /dev/null
+++ b/blockstore_write.cpp
@@ -0,0 +1,119 @@
+#include "blockstore.h"
+
+// First step of the write algorithm: dequeue operation and submit initial write(s)
+int blockstore::dequeue_write(blockstore_operation *op)
+{
+    auto dirty_it = dirty_queue[op->oid].find(op->version); // FIXME OOPS: entry may be missing, check before use
+    if (op->len == block_size)
+    {
+        // Big (redirect) write
+        uint64_t loc = allocator_find_free(data_alloc);
+        if (loc == (uint64_t)-1)
+        {
+            // no space
+            op->retval = -ENOSPC;
+            op->callback(op);
+            return 1;
+        }
+        struct io_uring_sqe *sqe = get_sqe();
+        if (!sqe)
+        {
+            // Pause until there are more requests available
+            op->wait_for = WAIT_SQE;
+            return 0;
+        }
+        struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
+        (*dirty_it).location = loc << block_order;
+        //(*dirty_it).state = ST_D_SUBMITTED;
+        allocator_set(data_alloc, loc, true);
+        data->iov = (struct iovec){ op->buf, op->len };
+        data->op = op;
+        io_uring_prep_writev(
+            sqe, data_fd, &data->iov, 1, data_offset + (loc << block_order)
+        );
+        op->pending_ops = 1;
+    }
+    else
+    {
+        // Small (journaled) write
+        // First check if the journal has sufficient space
+        uint64_t next_pos = journal_data_pos;
+        if (512 - journal_sector_pos < sizeof(struct journal_entry_small_write))
+        {
+            next_pos = next_pos + 512;
+            if (next_pos >= journal_len)
+                next_pos = 512;
+        }
+        next_pos = (journal_len - next_pos < op->len ? 512 : next_pos) + op->len;
+        if (next_pos >= journal_start)
+        {
+            // No space in the journal. Wait until it's available
+            op->wait_for = WAIT_JOURNAL;
+            op->wait_detail = next_pos - journal_start;
+            return 0;
+        }
+        // There is sufficient space. Get SQE(s)
+        // The journal sector write and the journal data write are always submitted separately (pending_ops = 2)
+        struct io_uring_sqe *sqe1 = get_sqe(), *sqe2 = get_sqe();
+        if (!sqe1 || !sqe2)
+        {
+            // Pause until there are more requests available
+            op->wait_for = WAIT_SQE;
+            return 0;
+        }
+        struct ring_data_t *data1 = ((ring_data_t*)sqe1->user_data);
+        struct ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
+        // Got SQEs. Prepare journal sector write
+        if (512 - journal_sector_pos < sizeof(struct journal_entry_small_write))
+        {
+            // Move to the next journal sector
+            next_pos = journal_data_pos + 512;
+            if (next_pos >= journal_len)
+                next_pos = 512;
+            journal_sector = journal_data_pos;
+            journal_sector_pos = 0;
+            journal_data_pos = next_pos;
+            memset(journal_sector_buf, 0, 512);
+        }
+        journal_entry_small_write *je = (struct journal_entry_small_write*)(journal_sector_buf + journal_sector_pos);
+        *je = {
+            .crc32 = 0,
+            .magic = JOURNAL_MAGIC,
+            .type = JE_SMALL_WRITE,
+            .size = sizeof(struct journal_entry_small_write),
+            .crc32_prev = journal_crc32_last,
+            .oid = op->oid,
+            .version = op->version,
+            .offset = op->offset,
+            .len = op->len,
+        };
+        je->crc32 = je_crc32((journal_entry*)je);
+        data1->iov = (struct iovec){ journal_sector_buf, 512 };
+        data1->op = op;
+        io_uring_prep_writev(
+            sqe1, journal_fd, &data1->iov, 1, journal_offset + journal_sector
+        );
+        // Prepare journal data write
+        if (journal_len - journal_data_pos < op->len)
+            journal_data_pos = 512;
+        data2->iov = (struct iovec){ op->buf, op->len };
+        data2->op = op;
+        io_uring_prep_writev(
+            sqe2, journal_fd, &data2->iov, 1, journal_offset + journal_data_pos
+        );
+        (*dirty_it).location = journal_data_pos;
+        //(*dirty_it).state = ST_J_SUBMITTED;
+        // Move journal_data_pos
+        journal_data_pos += op->len;
+        if (journal_data_pos >= journal_len)
+            journal_data_pos = 512;
+        op->pending_ops = 2;
+    }
+    in_process_ops.insert(op);
+    int ret = ringloop->submit();
+    if (ret < 0)
+    {
+        throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
+    }
+    return 1;
+}