diff --git a/Makefile b/Makefile
index 0d5efce4..51f82cf7 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,6 @@
-all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o test
+all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o crc32c.o test
+crc32c.o: crc32c.c
+	gcc -c -o $@ $<
 %.o: %.cpp
 	gcc -c -o $@ $<
 test: test.cpp
diff --git a/blockstore.h b/blockstore.h
index 61d3d742..6d17daf8 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -147,7 +147,7 @@ public:
     spp::sparse_hash_map dirty_queue;
     std::deque submit_queue;
     std::set in_process_ops;
-    int block_order, block_size;
+    uint32_t block_order, block_size;
     uint64_t block_count;
     allocator *data_alloc;
 
@@ -160,6 +160,7 @@ public:
 
     uint64_t data_offset, data_size, data_len;
     uint64_t journal_start, journal_end;
+    uint32_t journal_crc32_last;
 
     struct io_uring *ring;
 
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index c56535d8..2992f91d 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -1,4 +1,6 @@
 #include "blockstore.h"
+#include "crc32c.h"
+#include <new>
 
 blockstore_init_meta::blockstore_init_meta(blockstore *bs)
 {
@@ -92,12 +94,255 @@ blockstore_init_journal::blockstore_init_journal(blockstore *bs)
     this->bs = bs;
 }
 
+// Returns true if the first <len> 64-bit words of <buf> are all zero
+bool iszero(uint64_t *buf, int len)
+{
+    for (int i = 0; i < len; i++)
+        if (buf[i] != 0)
+            return false;
+    return true;
+}
+
+// CRC32C of a journal entry calculated as if its leading crc32 field was zero
+inline uint32_t je_crc32(journal_entry *je)
+{
+    return crc32c_zero4(((uint8_t*)je)+4, je->size-4);
+}
+
+// Size of one half of the journal read buffer. Parenthesized to stay safe in any expression
+#define JOURNAL_BUFFER_SIZE (4*1024*1024)
+// Fields shared by all journal entries: crc32, magic, type, size, crc32_prev
+#define JOURNAL_ENTRY_HEADER_SIZE (sizeof(uint32_t) + 2*sizeof(uint16_t) + 2*sizeof(uint32_t))
+
+// Journal reading state machine: returns 1 while it has to be called again
+// and 0 when reading is finished (step == 100)
 int blockstore_init_journal::read_loop()
 {
+    if (step == 100)
+    {
+        return 0;
+    }
     if (!journal_buffer)
     {
-        journal_buffer = new uint8_t[4*1024*1024];
+        // Twice the buffer size: reads alternate between the two halves
+        journal_buffer = (uint8_t*)memalign(DISK_ALIGNMENT, 2*JOURNAL_BUFFER_SIZE);
+        if (!journal_buffer)
+        {
+            // new[] which was used here before reported failures the same way
+            throw std::bad_alloc();
+        }
     }
-
-    return 0;
+    if (step == 0)
+    {
+        // Step 1: Read first block of the journal
+        struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
+        if (!sqe)
+        {
+            throw new std::runtime_error("io_uring is full while trying to read journal");
+        }
+        submit_iov = { journal_buffer, 512 };
+        io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset);
+        io_uring_submit(bs->ring);
+        step = 1;
+    }
+    if (step == 1)
+    {
+        // Step 2: Get the completion event and check the beginning for entry
+        struct io_uring_cqe *cqe = NULL;
+        io_uring_peek_cqe(bs->ring, &cqe);
+        if (cqe)
+        {
+            // Save the result and release the CQE, otherwise the next peek returns it again
+            int res = cqe->res;
+            io_uring_cqe_seen(bs->ring, cqe);
+            if (res < 0)
+            {
+                throw new std::runtime_error(
+                    std::string("read journal failed at offset ") + std::to_string(0) +
+                    std::string(": ") + strerror(-res)
+                );
+            }
+            if (iszero((uint64_t*)journal_buffer, 3))
+            {
+                // Journal is empty
+                bs->journal_start = 0;
+                bs->journal_end = 0;
+                step = 99;
+            }
+            else
+            {
+                // First block always contains a single JE_START entry
+                journal_entry_start *je = (journal_entry_start*)journal_buffer;
+                if (je->magic != JOURNAL_MAGIC ||
+                    je->type != JE_START ||
+                    je->size != sizeof(journal_entry_start) ||
+                    je_crc32((journal_entry*)je) != je->crc32)
+                {
+                    // Entry is corrupt
+                    throw new std::runtime_error("first entry of the journal is corrupt");
+                }
+                journal_pos = bs->journal_start = je->journal_start;
+                crc32_last = je->crc32_replaced;
+                step = 2;
+            }
+        }
+    }
+    if (step == 2 || step == 3)
+    {
+        // Step 3: Read journal
+        if (submitted)
+        {
+            struct io_uring_cqe *cqe = NULL;
+            io_uring_peek_cqe(bs->ring, &cqe);
+            if (cqe)
+            {
+                // Save the result and release the CQE, otherwise the next peek returns it again
+                int res = cqe->res;
+                io_uring_cqe_seen(bs->ring, cqe);
+                if (res < 0)
+                {
+                    throw new std::runtime_error(
+                        std::string("read journal failed at offset ") + std::to_string(journal_pos) +
+                        std::string(": ") + strerror(-res)
+                    );
+                }
+                done_pos = journal_pos;
+                done_buf = submitted;
+                done_len = res;
+                journal_pos += res;
+                if (journal_pos >= bs->journal_len)
+                {
+                    // Continue from the beginning
+                    journal_pos = 512;
+                }
+                submitted = 0;
+            }
+        }
+        if (!submitted && step != 3)
+        {
+            struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
+            if (!sqe)
+            {
+                throw new std::runtime_error("io_uring is full while trying to read journal");
+            }
+            uint64_t end = bs->journal_len;
+            if (journal_pos < bs->journal_start)
+            {
+                end = bs->journal_start;
+            }
+            submit_iov = {
+                journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
+                end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
+            };
+            io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset + journal_pos);
+            io_uring_submit(bs->ring);
+            submitted = done_buf == 1 ? 2 : 1;
+        }
+        if (done_buf && step != 3)
+        {
+            // handle journal entries
+            if (handle_journal(journal_buffer + (done_buf == 1 ? 0 : JOURNAL_BUFFER_SIZE), done_len) == 0)
+            {
+                // finish
+                step = 3;
+            }
+            done_buf = 0;
+        }
+        if (step == 3 && !submitted)
+        {
+            // End of the journal is found and no read is in flight - the buffer may be freed
+            step = 99;
+        }
+    }
+    if (step == 99)
+    {
+        free(journal_buffer);
+        journal_buffer = NULL;
+        step = 100;
+    }
+    return 1;
+}
+
+// Replay journal entries from <len> bytes of <buf> read at journal position done_pos.
+// Returns 1 if the whole buffer is processed and 0 if the end of the journal is found
+int blockstore_init_journal::handle_journal(void *buf, int len)
+{
+    int total_pos = 0;
+    while (total_pos < len)
+    {
+        int pos = 0, skip = 0;
+        while (pos < 512)
+        {
+            journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos);
+            // je->size comes from the disk: check it before je_crc32() uses it as a length,
+            // a zero size would also make this loop spin forever
+            if (je->magic != JOURNAL_MAGIC ||
+                je->size < JOURNAL_ENTRY_HEADER_SIZE || je->size > (uint32_t)(512 - pos) ||
+                je_crc32(je) != je->crc32 ||
+                je->type < JE_SMALL_WRITE || je->type > JE_DELETE || je->crc32_prev != crc32_last)
+            {
+                // Invalid entry - end of the journal
+                bs->journal_end = done_pos + total_pos + pos;
+                // FIXME: save
+                return 0;
+            }
+            pos += je->size;
+            // The next entry must refer to this one in its crc32_prev
+            crc32_last = je->crc32;
+            if (je->type == JE_SMALL_WRITE)
+            {
+                // oid, version, offset, len
+                bs->dirty_queue[je->small_write.oid].push_back((dirty_entry){
+                    .version = je->small_write.version,
+                    .state = ST_J_SYNCED,
+                    .flags = 0,
+                    // FIXME: data in journal may never be non-contiguous
+                    .location = done_pos + total_pos + 512 + skip,
+                    .offset = je->small_write.offset,
+                    .size = je->small_write.len,
+                });
+                skip += je->small_write.len;
+            }
+            else if (je->type == JE_BIG_WRITE)
+            {
+                // oid, version, block
+                bs->dirty_queue[je->big_write.oid].push_back((dirty_entry){
+                    .version = je->big_write.version,
+                    .state = ST_D_META_SYNCED,
+                    .flags = 0,
+                    .location = je->big_write.block,
+                    .offset = 0,
+                    .size = bs->block_size,
+                });
+            }
+            else if (je->type == JE_STABLE)
+            {
+                // oid, version
+                auto it = bs->dirty_queue.find(je->stable.oid);
+                if (it == bs->dirty_queue.end())
+                {
+                    // FIXME ignore entry, but warn
+                }
+                else
+                {
+                    auto & lst = it->second;
+                    for (size_t i = 0; i < lst.size(); i++)
+                    {
+                        if (lst[i].version == je->stable.version)
+                        {
+                            lst[i].state = (lst[i].state == ST_D_META_SYNCED ? ST_D_STABLE : ST_J_STABLE);
+                            break;
+                        }
+                    }
+                }
+            }
+            else if (je->type == JE_DELETE)
+            {
+                // oid, version
+                // FIXME
+            }
+        }
+        total_pos += 512 + skip;
+    }
+    return 1;
 }
diff --git a/blockstore_init.h b/blockstore_init.h
index 115291f3..51fde6dd 100644
--- a/blockstore_init.h
+++ b/blockstore_init.h
@@ -3,20 +3,29 @@
 class blockstore_init_meta
 {
     blockstore *bs;
-    uint8_t *metadata_buffer;
+    uint8_t *metadata_buffer = NULL;
     uint64_t metadata_read = 0;
     struct iovec submit_iov;
     int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
+    void handle_entries(struct clean_disk_entry* entries, int count);
 public:
     blockstore_init_meta(blockstore* bs);
     int read_loop();
-    void handle_entries(struct clean_disk_entry* entries, int count);
 };
 
 class blockstore_init_journal
 {
     blockstore *bs;
-    uint8_t *journal_buffer;
+    uint8_t *journal_buffer = NULL;
+    // State of read_loop(): 0 and 1 read the first block, 2 and 3 read the journal, 99 and 100 finish
+    int step = 0;
+    // crc32 which the next journal entry must have in its crc32_prev
+    uint32_t crc32_last = 0;
+    struct iovec submit_iov;
+    uint64_t done_pos = 0, journal_pos = 0;
+    uint64_t cur_skip = 0;
+    int submitted = 0, done_buf = 0, done_len = 0;
+    int handle_journal(void *buf, int len);
 public:
     blockstore_init_journal(blockstore* bs);
     int read_loop();
diff --git a/blockstore_journal.h b/blockstore_journal.h
index d12f7273..1314a1a0 100644
--- a/blockstore_journal.h
+++ b/blockstore_journal.h
@@ -12,22 +12,23 @@
 #define JE_STABLE 0x04
 #define JE_DELETE 0x05
 
+// crc32c comes first to ease calculation and is equal to crc32()
 struct __attribute__((__packed__)) journal_entry_start
 {
+    uint32_t crc32;
     uint16_t magic;
     uint16_t type;
     uint32_t size;
-    uint32_t crc32;
-    uint32_t reserved1;
-    uint64_t offset;
+    uint32_t crc32_replaced;
+    uint64_t journal_start;
 };
 
 struct __attribute__((__packed__)) journal_entry_small_write
 {
+    uint32_t crc32;
     uint16_t magic;
     uint16_t type;
     uint32_t size;
-    uint32_t crc32;
     uint32_t crc32_prev;
     object_id oid;
     uint64_t version;
@@ -38,10 +39,10 @@ struct __attribute__((__packed__)) journal_entry_small_write
 
 struct __attribute__((__packed__)) journal_entry_big_write
 {
+    uint32_t crc32;
     uint16_t magic;
     uint16_t type;
     uint32_t size;
-    uint32_t crc32;
     uint32_t crc32_prev;
     object_id oid;
     uint64_t version;
@@ -50,10 +51,10 @@ struct __attribute__((__packed__)) journal_entry_big_write
 
 struct __attribute__((__packed__)) journal_entry_stable
 {
+    uint32_t crc32;
     uint16_t magic;
     uint16_t type;
     uint32_t size;
-    uint32_t crc32;
     uint32_t crc32_prev;
     object_id oid;
     uint64_t version;
@@ -61,10 +62,10 @@ struct __attribute__((__packed__)) journal_entry_stable
 
 struct __attribute__((__packed__)) journal_entry_del
 {
+    uint32_t crc32;
     uint16_t magic;
     uint16_t type;
     uint32_t size;
-    uint32_t crc32;
     uint32_t crc32_prev;
     object_id oid;
     uint64_t version;
@@ -76,10 +77,11 @@ struct __attribute__((__packed__)) journal_entry
     {
         struct __attribute__((__packed__))
         {
+            uint32_t crc32;
             uint16_t magic;
             uint16_t type;
             uint32_t size;
-            uint32_t crc32;
+            uint32_t crc32_prev;
         };
         journal_entry_start start;
         journal_entry_small_write small_write;
diff --git a/crc32c.c b/crc32c.c
index b3dab597..7bc6d491 100644
--- a/crc32c.c
+++ b/crc32c.c
@@ -96,3 +96,18 @@ uint32_t crc32c(uint8_t *buf, int len)
     }
     return crc^0xffffffff;
 }
+
+uint32_t crc32c_zero4(uint8_t *buf, int len)
+{
+    uint32_t crc = 0xffffffff;
+    // pretend that first 4 bytes are zero
+    crc = (crc>>8) ^ crctable[(crc ^ 0) & 0xFF];
+    crc = (crc>>8) ^ crctable[(crc ^ 0) & 0xFF];
+    crc = (crc>>8) ^ crctable[(crc ^ 0) & 0xFF];
+    crc = (crc>>8) ^ crctable[(crc ^ 0) & 0xFF];
+    while (len-- > 0)
+    {
+        crc = (crc>>8) ^ crctable[(crc ^ (*buf++)) & 0xFF];
+    }
+    return crc^0xffffffff;
+}
diff --git a/crc32c.h b/crc32c.h
index 5b69b4e9..c3aa9583 100644
--- a/crc32c.h
+++ b/crc32c.h
@@ -9,3 +9,4 @@
 // unsigned int _mm_crc32_u8 (unsigned int crc, unsigned char v)
 
 uint32_t crc32c(uint8_t *buf, int len);
+uint32_t crc32c_zero4(uint8_t *buf, int len);