diff --git a/blockstore.h b/blockstore.h index 6d17daf8..c01ce067 100644 --- a/blockstore.h +++ b/blockstore.h @@ -35,8 +35,12 @@ #define ST_D_STABLE 20 #define ST_D_META_MOVED 21 #define ST_D_META_COMMITTED 22 +#define ST_DEL_WRITTEN 23 +#define ST_DEL_SYNCED 24 +#define ST_DEL_STABLE 25 +#define ST_DEL_MOVED 26 #define ST_CURRENT 32 -#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32) +#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32 || (st) == 24 || (st) == 25) #define IS_JOURNAL(st) (st >= 2 && st <= 6) // Default object size is 128 KB @@ -183,4 +187,6 @@ public: int read(blockstore_operation *read_op); int fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location); + int fulfill_read_push(blockstore_operation & read_op, uint32_t item_start, + uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end); }; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 2992f91d..02555175 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -1,5 +1,4 @@ #include "blockstore.h" -#include "crc32c.h" blockstore_init_meta::blockstore_init_meta(blockstore *bs) { @@ -101,13 +100,6 @@ bool iszero(uint64_t *buf, int len) return true; } -inline uint32_t je_crc32(journal_entry *je) -{ - return crc32c_zero4(((uint8_t*)je)+4, je->size-4); -} - -#define JOURNAL_BUFFER_SIZE 4*1024*1024 - int blockstore_init_journal::read_loop() { if (step == 100) @@ -148,8 +140,8 @@ int blockstore_init_journal::read_loop() if (iszero((uint64_t*)journal_buffer, 3)) { // Journal is empty - bs->journal_start = 0; - bs->journal_end = 0; + bs->journal_start = 512; + bs->journal_end = 512; step = 99; } else @@ -194,36 +186,51 @@ int blockstore_init_journal::read_loop() { // Continue from the beginning journal_pos = 512; + wrapped = true; } submitted = 0; } } - if (!submitted && step != 3) + if (!submitted) { - struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring); - if (!sqe) + if (step != 3) { - throw new std::runtime_error("io_uring is full while trying to read journal"); + if (journal_pos == bs->journal_start && wrapped) + { + step = 3; + } + else + { + struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring); + if (!sqe) + { + throw new std::runtime_error("io_uring is full while trying to read journal"); + } + uint64_t end = bs->journal_len; + if (journal_pos < bs->journal_start) + { + end = bs->journal_start; + } + submit_iov = { + journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), + end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, + }; + io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset + journal_pos); + io_uring_submit(bs->ring); + submitted = done_buf == 1 ? 2 : 1; + } } - uint64_t end = bs->journal_len; - if (journal_pos < bs->journal_start) + else { - end = bs->journal_start; + step = 99; } - submit_iov = { - journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), - end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, - }; - io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset + journal_pos); - io_uring_submit(bs->ring); - submitted = done_buf == 1 ? 2 : 1; } if (done_buf && step != 3) { // handle journal entries - if (handle_journal(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0) + if (handle_journal_part(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0) { - // finish + // journal ended. wait for the next read to complete, then stop step = 3; } done_buf = 0; @@ -232,43 +239,69 @@ int blockstore_init_journal::read_loop() if (step == 99) { free(journal_buffer); + bs->journal_crc32_last = crc32_last; journal_buffer = NULL; step = 100; } return 1; } -int blockstore_init_journal::handle_journal(void *buf, int len) +int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) { - int total_pos = 0; + uint64_t total_pos = 0; + if (cur_skip >= 0) + { + total_pos = cur_skip; + cur_skip = 0; + } while (total_pos < len) { - int pos = 0, skip = 0; + total_pos += 512; + uint64_t pos = 0; while (pos < 512) { journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos); if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 || je->type < JE_SMALL_WRITE || je->type > JE_DELETE || je->crc32_prev != crc32_last) { - // Invalid entry - end of the journal - bs->journal_end = done_pos + total_pos + pos; - // FIXME: save - return 0; + if (pos == 0) + { + // invalid entry in the beginning, this is definitely the end of the journal + bs->journal_end = done_pos + total_pos + pos; + return 0; + } + else + { + // allow partially filled sectors + break; + } } pos += je->size; + crc32_last = je->crc32; if (je->type == JE_SMALL_WRITE) { // oid, version, offset, len + uint64_t location; + if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal_len) + { + // data continues from the beginning of the journal + location = 512 + cur_skip; + cur_skip += je->small_write.len; + } + else + { + // data is right next + location = done_pos + total_pos; + total_pos += je->small_write.len; + } bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){ .version = je->small_write.version, .state = ST_J_SYNCED, .flags = 0, - // FIXME: data in journal may never be non-contiguous - .location = done_pos + total_pos + 512 + skip, + .location = location, .offset = je->small_write.offset, .size = je->small_write.len, }); - skip += je->small_write.len; } else if (je->type == JE_BIG_WRITE) { @@ -288,28 +321,47 @@ int blockstore_init_journal::handle_journal(void *buf, int len) auto it = bs->dirty_queue.find(je->stable.oid); if (it == bs->dirty_queue.end()) { - // FIXME ignore entry, but warn + // journal contains a legitimate STABLE entry for a non-existing dirty write + // this probably means that journal was trimmed between WRITTEN and STABLE entries + // skip for now. but FIXME: maybe warn about it in the future } else { auto & lst = it->second; - for (int i = 0; i < lst.size(); i++) + int i; + for (i = 0; i < lst.size(); i++) { if (lst[i].version == je->stable.version) { - lst[i].state = (lst[i].state == ST_D_META_SYNCED ? ST_D_STABLE : ST_J_STABLE); + lst[i].state = (lst[i].state == ST_D_META_SYNCED + ? ST_D_STABLE + : (lst[i].state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE)); break; } } + if (i >= lst.size()) + { + // same. STABLE entry for a missing object version + } } } else if (je->type == JE_DELETE) { // oid, version - // FIXME + bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){ + .version = je->small_write.version, + .state = ST_DEL_SYNCED, + .flags = 0, + .location = 0, + .offset = 0, + .size = 0, + }); } } - total_pos += 512 + skip; + } + if (cur_skip == 0 && total_pos > len) + { + cur_skip = total_pos - len; } return 1; } diff --git a/blockstore_init.h b/blockstore_init.h index 51fde6dd..e240ee93 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -22,8 +22,9 @@ class blockstore_init_journal struct iovec submit_iov; uint64_t done_pos = 0, journal_pos = 0; uint64_t cur_skip = 0; + bool wrapped = false; int submitted = 0, done_buf = 0, done_len = 0; - int handle_journal(void *buf, int len); + int handle_journal_part(void *buf, uint64_t len); public: blockstore_init_journal(blockstore* bs); int read_loop(); diff --git a/blockstore_journal.h b/blockstore_journal.h index 1314a1a0..64a9aba0 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -1,7 +1,10 @@ #pragma once +#include "crc32c.h" + #define MIN_JOURNAL_SIZE 4*1024*1024 #define JOURNAL_MAGIC 0x4A33 +#define JOURNAL_BUFFER_SIZE 4*1024*1024 // Journal entries // Journal entries are linked to each other by their crc32 value @@ -90,3 +93,8 @@ struct __attribute__((__packed__)) journal_entry journal_entry_del del; }; }; + +inline uint32_t je_crc32(journal_entry *je) +{ + return crc32c_zero4(((uint8_t*)je)+4, je->size-4); +} diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 0cb686da..51744679 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -1,5 +1,46 @@ #include "blockstore.h" +int blockstore::fulfill_read_push(blockstore_operation & read_op, uint32_t item_start, + uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end) +{ + if (cur_end > cur_start) + { + if (item_state == ST_IN_FLIGHT) + { + // Pause until it's written somewhere + read_op.wait_for = WAIT_IN_FLIGHT; + read_op.wait_version = item_version; + return -1; + } + else if (item_state == ST_DEL_WRITTEN || item_state == ST_DEL_SYNCED || item_state == ST_DEL_MOVED) + { + // item is unallocated - return zeroes + memset(read_op.buf + cur_start - read_op.offset, 0, cur_end - cur_start); + return 0; + } + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) + { + // Pause until there are more requests available + read_op.wait_for = WAIT_SQE; + return -1; + } + read_op.read_vec[cur_start] = (struct iovec){ + read_op.buf + cur_start - read_op.offset, + cur_end - cur_start + }; + io_uring_prep_readv( + sqe, + IS_JOURNAL(item_state) ? journal_fd : data_fd, + // FIXME: &read_op.read_vec is forbidden + &read_op.read_vec[cur_start], 1, + (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start + ); + io_uring_sqe_set_data(sqe, 0/*read op link*/); + } + return 0; +} + int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location) { @@ -19,65 +60,16 @@ int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start } while (fulfill_near != read_op.read_vec.end() && fulfill_near->first < item_end) { - if (fulfill_near->first > cur_start) + if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first) < 0) { - if (item_state == ST_IN_FLIGHT) - { - // Pause until it's written somewhere - read_op.wait_for = WAIT_IN_FLIGHT; - read_op.wait_version = item_version; - return -1; - } - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) - { - // Pause until there are more requests available - read_op.wait_for = WAIT_SQE; - return -1; - } - read_op.read_vec[cur_start] = (struct iovec){ - read_op.buf + cur_start - read_op.offset, - fulfill_near->first - cur_start - }; - io_uring_prep_readv( - sqe, - IS_JOURNAL(item_state) ? journal_fd : data_fd, - // FIXME: &read_op.read_vec is forbidden - &read_op.read_vec[cur_start], 1, - (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start - ); - io_uring_sqe_set_data(sqe, 0/*read op link*/); + return -1; } cur_start = fulfill_near->first + fulfill_near->second.iov_len; fulfill_near++; } - if (cur_start < item_end) + if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, item_end) < 0) { - if (item_state == ST_IN_FLIGHT) - { - // Pause until it's written somewhere - read_op.wait_for = WAIT_IN_FLIGHT; - read_op.wait_version = item_version; - return -1; - } - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) - { - // Pause until there are more requests available - read_op.wait_for = WAIT_SQE; - return -1; - } - read_op.read_vec[cur_start] = (struct iovec){ - read_op.buf + cur_start - read_op.offset, - item_end - cur_start - }; - io_uring_prep_readv( - sqe, - IS_JOURNAL(item_state) ? journal_fd : data_fd, - &read_op.read_vec[cur_start], 1, - (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start - ); - io_uring_sqe_set_data(sqe, 0/*read op link*/); + return -1; } } return 0; @@ -121,6 +113,7 @@ int blockstore::read(blockstore_operation *read_op) // need to wait for something, undo added requests and requeue op ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); + // FIXME: bad implementation submit_queue.push_front(read_op); return 0; } @@ -128,11 +121,11 @@ int blockstore::read(blockstore_operation *read_op) if (!read_op->read_vec.size()) { // region is not allocated - return zeroes - free(read_op); memset(read_op->buf, 0, read_op->len); read_op->callback(read_op); return 0; } + // FIXME reap events! int ret = io_uring_submit(ring); if (ret < 0) {