diff --git a/blockstore.cpp b/blockstore.cpp index 44c3b605..fe950730 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -13,7 +13,7 @@ blockstore::blockstore(spp::sparse_hash_map & config, { throw new std::runtime_error("Bad block size"); } - data_fd = meta_fd = journal_fd = -1; + data_fd = meta_fd = journal.fd = -1; try { open_data(config); @@ -30,8 +30,8 @@ blockstore::blockstore(spp::sparse_hash_map & config, close(data_fd); if (meta_fd >= 0 && meta_fd != data_fd) close(meta_fd); - if (journal_fd >= 0 && journal_fd != meta_fd) - close(journal_fd); + if (journal.fd >= 0 && journal.fd != meta_fd) + close(journal.fd); throw e; } } @@ -43,8 +43,10 @@ blockstore::~blockstore() close(data_fd); if (meta_fd >= 0 && meta_fd != data_fd) close(meta_fd); - if (journal_fd >= 0 && journal_fd != meta_fd) - close(journal_fd); + if (journal.fd >= 0 && journal.fd != meta_fd) + close(journal.fd); + free(journal.sector_buf); + free(journal.sector_info); } // main event loop - handle requests @@ -81,6 +83,32 @@ void blockstore::handle_event(ring_data_t *data) in_process_ops.erase(op); } } + else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || + (op->flags & OP_TYPE_MASK) == OP_DELETE) + { + op->pending_ops--; + if (data->res < 0) + { + // write error + // FIXME: our state becomes corrupted after a write error. maybe do something better than just die + throw new std::runtime_error("write operation failed. in-memory state is corrupted. 
AAAAAAAaaaaaaaaa!!!111"); + op->retval = data->res; + } + if (op->used_journal_sector > 0) + { + uint64_t s = op->used_journal_sector-1; + if (journal.sector_info[s].usage_count > 0) + { + // The last write to this journal sector was made by this op, release the buffer + journal.sector_info[s].usage_count--; + } + op->used_journal_sector = 0; + } + if (op->pending_ops == 0) + { + + } + } } } @@ -180,14 +208,14 @@ int blockstore::enqueue_op(blockstore_operation *op) auto dirty_it = dirty_queue.find(op->oid); if (dirty_it != dirty_queue.end()) { - op->version = (*dirty_it).back().version + 1; + op->version = dirty_it->second.back().version + 1; } else { auto clean_it = object_db.find(op->oid); if (clean_it != object_db.end()) { - op->version = (*clean_it).version + 1; + op->version = clean_it->second.version + 1; } else { @@ -196,7 +224,7 @@ int blockstore::enqueue_op(blockstore_operation *op) dirty_it = dirty_queue.emplace(op->oid, dirty_list()).first; } // Immediately add the operation into the dirty queue, so subsequent reads could see it - (*dirty_it).push_back((dirty_entry){ + dirty_it->second.push_back((dirty_entry){ .version = op->version, .state = ST_IN_FLIGHT, .flags = 0, diff --git a/blockstore.h b/blockstore.h index e2609d4f..def7b741 100644 --- a/blockstore.h +++ b/blockstore.h @@ -139,8 +139,14 @@ public: #define OP_DELETE 6 #define OP_TYPE_MASK 0x7 +// Suspend operation until there are more free SQEs #define WAIT_SQE 1 +// Suspend operation until version of object is written #define WAIT_IN_FLIGHT 2 +// Suspend operation until there are bytes of free space in the journal on disk +#define WAIT_JOURNAL 3 +// Suspend operation until the next journal sector buffer is free +#define WAIT_JOURNAL_BUFFER 4 struct blockstore_operation { @@ -158,6 +164,7 @@ struct blockstore_operation int pending_ops; int wait_for; uint64_t wait_detail; + uint64_t used_journal_sector; }; class blockstore; @@ -176,16 +183,13 @@ public: uint64_t block_count; allocator *data_alloc; 
- int journal_fd; int meta_fd; int data_fd; - uint64_t journal_offset, journal_size, journal_len; uint64_t meta_offset, meta_size, meta_area, meta_len; uint64_t data_offset, data_size, data_len; - uint64_t journal_start, journal_end; - uint32_t journal_crc32_last; + struct journal_t journal; ring_loop_t *ringloop; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index ab4141c3..a2c5c677 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -112,8 +112,9 @@ void blockstore_init_journal::handle_event(ring_data_t *data) if (iszero((uint64_t*)journal_buffer, 3)) { // Journal is empty - bs->journal_start = 512; - bs->journal_end = 512; + // FIXME handle this wrapping to 512 better + bs->journal.used_start = 512; + bs->journal.next_free = 512; step = 99; } else @@ -128,7 +129,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data) // Entry is corrupt throw new std::runtime_error("first entry of the journal is corrupt"); } - journal_pos = bs->journal_start = je->journal_start; + journal_pos = bs->journal.used_start = je->journal_start; crc32_last = je->crc32_replaced; step = 2; } @@ -147,7 +148,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data) done_buf = submitted; done_len = data->res; journal_pos += data->res; - if (journal_pos >= bs->journal_len) + if (journal_pos >= bs->journal.len) { // Continue from the beginning journal_pos = 512; @@ -177,7 +178,7 @@ int blockstore_init_journal::loop() } struct ring_data_t *data = ((ring_data_t*)sqe->user_data); data->iov = { journal_buffer, 512 }; - io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset); + io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); bs->ringloop->submit(); step = 1; } @@ -188,7 +189,7 @@ int blockstore_init_journal::loop() { if (step != 3) { - if (journal_pos == bs->journal_start && wrapped) + if (journal_pos == bs->journal.used_start && wrapped) { step = 3; } @@ -200,16 +201,16 @@ int blockstore_init_journal::loop() 
throw new std::runtime_error("io_uring is full while trying to read journal"); } struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - uint64_t end = bs->journal_len; - if (journal_pos < bs->journal_start) + uint64_t end = bs->journal.len; + if (journal_pos < bs->journal.used_start) { - end = bs->journal_start; + end = bs->journal.used_start; } data->iov = { journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, }; - io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset + journal_pos); + io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); bs->ringloop->submit(); submitted = done_buf == 1 ? 2 : 1; } @@ -233,7 +234,7 @@ int blockstore_init_journal::loop() if (step == 99) { free(journal_buffer); - bs->journal_crc32_last = crc32_last; + bs->journal.crc32_last = crc32_last; journal_buffer = NULL; step = 100; } @@ -261,7 +262,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) if (pos == 0) { // invalid entry in the beginning, this is definitely the end of the journal - bs->journal_end = done_pos + total_pos + pos; + // FIXME handle the edge case when the journal is full + bs->journal.next_free = done_pos + total_pos; return 0; } else @@ -276,7 +278,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) { // oid, version, offset, len uint64_t location; - if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal_len) + if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal.len) { // data continues from the beginning of the journal location = 512 + cur_skip; diff --git a/blockstore_journal.h b/blockstore_journal.h index 64a9aba0..5bc3b042 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -98,3 +98,27 @@ inline uint32_t je_crc32(journal_entry *je) { return crc32c_zero4(((uint8_t*)je)+4, je->size-4); } + +struct 
journal_sector_info_t +{ + uint64_t offset; + uint64_t usage_count; +}; + +struct journal_t +{ + int fd; + uint64_t device_size; + + uint64_t offset, len; + uint64_t next_free = 512; + uint64_t used_start = 512; + uint32_t crc32_last = 0; + + // Current sector(s) used for writing + uint8_t *sector_buf; + journal_sector_info_t *sector_info; + uint64_t sector_count; + uint64_t cur_sector = 0; + uint64_t in_sector_pos = 0; +}; diff --git a/blockstore_open.cpp b/blockstore_open.cpp index c9f47152..b9cbddaa 100644 --- a/blockstore_open.cpp +++ b/blockstore_open.cpp @@ -8,10 +8,10 @@ void blockstore::calc_lengths(spp::sparse_hash_map & c { data_len = meta_offset - data_offset; } - if (data_fd == journal_fd && data_offset < journal_offset) + if (data_fd == journal.fd && data_offset < journal.offset) { - data_len = data_len < journal_offset-data_offset - ? data_len : journal_offset-data_offset; + data_len = data_len < journal.offset-data_offset + ? data_len : journal.offset-data_offset; } // meta meta_area = (meta_fd == data_fd ? data_size : meta_size) - meta_offset; @@ -19,21 +19,21 @@ void blockstore::calc_lengths(spp::sparse_hash_map & c { meta_area = data_offset - meta_offset; } - if (meta_fd == journal_fd && meta_offset < journal_offset) + if (meta_fd == journal.fd && meta_offset < journal.offset) { - meta_area = meta_area < journal_offset-meta_offset - ? meta_area : journal_offset-meta_offset; + meta_area = meta_area < journal.offset-meta_offset + ? meta_area : journal.offset-meta_offset; } // journal - journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset; - if (journal_fd == data_fd && journal_offset < data_offset) + journal.len = (journal.fd == data_fd ? data_size : (journal.fd == meta_fd ? 
meta_size : journal.device_size)) - journal.offset; + if (journal.fd == data_fd && journal.offset < data_offset) { - journal_len = data_offset - journal_offset; + journal.len = data_offset - journal.offset; } - if (journal_fd == meta_fd && journal_offset < meta_offset) + if (journal.fd == meta_fd && journal.offset < meta_offset) { - journal_len = journal_len < meta_offset-journal_offset - ? journal_len : meta_offset-journal_offset; + journal.len = journal.len < meta_offset-journal.offset + ? journal.len : meta_offset-journal.offset; } // required metadata size block_count = data_len / block_size; @@ -49,15 +49,15 @@ void blockstore::calc_lengths(spp::sparse_hash_map & c } // requested journal size uint64_t journal_wanted = stoull(config["journal_size"]); - if (journal_wanted > journal_len) + if (journal_wanted > journal.len) { throw new std::runtime_error("Requested journal_size is too large"); } else if (journal_wanted > 0) { - journal_len = journal_wanted; + journal.len = journal_wanted; } - if (journal_len < MIN_JOURNAL_SIZE) + if (journal.len < MIN_JOURNAL_SIZE) { throw new std::runtime_error("Journal is too small"); } @@ -129,20 +129,20 @@ void blockstore::open_meta(spp::sparse_hash_map & conf void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & config) { int sectsize; - journal_offset = stoull(config["journal_offset"]); - if (journal_offset % DISK_ALIGNMENT) + journal.offset = stoull(config["journal_offset"]); + if (journal.offset % DISK_ALIGNMENT) { throw new std::runtime_error("journal_offset not aligned"); } if (config["journal_device"] != "") { - journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR); - if (journal_fd == -1) + journal.fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR); + if (journal.fd == -1) { throw new std::runtime_error("Failed to open journal device"); } - if (ioctl(journal_fd, BLKSSZGET, &sectsize) < 0 || - ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 || + if (ioctl(journal.fd, BLKSSZGET, &sectsize) < 0 || + 
ioctl(journal.fd, BLKGETSIZE64, &journal.device_size) < 0 || sectsize != 512) { throw new std::runtime_error("Journal device sector is not equal to 512 bytes"); @@ -150,11 +150,22 @@ void blockstore::open_journal(spp::sparse_hash_map & c } else { - journal_fd = meta_fd; - journal_size = 0; - if (journal_offset >= data_size) + journal.fd = meta_fd; + journal.device_size = 0; + if (journal.offset >= data_size) { throw new std::runtime_error("journal_offset exceeds device size"); } } + journal.sector_count = stoull(config["journal_sector_buffer_count"]); + if (!journal.sector_count) + { + journal.sector_count = 32; + } + journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512); + journal.sector_info = (journal_sector_info_t*)calloc(journal.sector_count, sizeof(journal_sector_info_t)); + if (!journal.sector_buf || !journal.sector_info) + { + throw new std::bad_alloc(); + } } diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 5a384014..23a547dc 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -33,9 +33,9 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s read_op->read_vec[cur_start] = data->iov; io_uring_prep_readv( sqe, - IS_JOURNAL(item_state) ? journal_fd : data_fd, + IS_JOURNAL(item_state) ? journal.fd : data_fd, &data->iov, 1, - (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start + (IS_JOURNAL(item_state) ? 
journal.offset : data_offset) + item_location + cur_start - item_start ); data->op = read_op; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 57f10d42..48724697 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -32,29 +32,39 @@ int blockstore::dequeue_write(blockstore_operation *op) sqe, data_fd, &data->iov, 1, data_offset + (loc << block_order) ); op->pending_ops = 1; + op->used_journal_sector = 0; } else { // Small (journaled) write // First check if the journal has sufficient space - bool two_sqes = false; - uint64_t next_pos = journal_data_pos; - if (512 - journal_sector_pos < sizeof(struct journal_entry_small_write)) + // FIXME Always two SQEs for now. Although it's possible to send 1 + bool two_sqes = true; + uint64_t next_pos = journal.next_free; + if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write)) { next_pos = next_pos + 512; - if (journal_len - next_pos < op->len) - two_sqes = true; - if (next_pos >= journal_len) + //if (journal.len - next_pos < op->len) + // two_sqes = true; + if (next_pos >= journal.len) next_pos = 512; + // Also check if we have an unused memory buffer for the journal sector + if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0) + { + // No memory buffer available. Wait for it. + op->wait_for = WAIT_JOURNAL_BUFFER; + return 0; + } } - else if (journal_sector + 512 != journal_data_pos || journal_len - journal_data_pos < op->len) - two_sqes = true; - next_pos = (journal_len - next_pos < op->len ? 512 : next_pos) + op->len; - if (next_pos >= journal_start) + //else if (journal.sector_info[journal.cur_sector].offset + 512 != journal.next_free || + // journal.len - next_pos < op->len) + // two_sqes = true; + next_pos = (journal.len - next_pos < op->len ? 512 : next_pos) + op->len; + if (next_pos >= journal.used_start) { - // No space in the journal. Wait until it's available + // No space in the journal. Wait for it. 
op->wait_for = WAIT_JOURNAL; - op->wait_detail = next_pos - journal_start; + op->wait_detail = next_pos - journal.used_start; return 0; } // There is sufficient space. Get SQE(s) @@ -68,50 +78,54 @@ int blockstore::dequeue_write(blockstore_operation *op) struct ring_data_t *data1 = ((ring_data_t*)sqe1->user_data); struct ring_data_t *data2 = two_sqes ? ((ring_data_t*)sqe2->user_data) : NULL; // Got SQEs. Prepare journal sector write - if (512 - journal_sector_pos < sizeof(struct journal_entry_small_write)) + if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write)) { // Move to the next journal sector - next_pos = journal_data_pos + 512; - if (next_pos >= journal_len) - next_pos = 512; - journal_sector = journal_data_pos; - journal_sector_pos = 0; - journal_data_pos = next_pos; - memset(journal_sector_buf, 0, 512); + // Also select next sector buffer in memory + journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count); + journal.sector_info[journal.cur_sector].offset = journal.next_free; + journal.in_sector_pos = 0; + journal.next_free = (journal.next_free + 512) < journal.len ? 
journal.next_free + 512 : 512; + memset(journal.sector_buf + 512*journal.cur_sector, 0, 512); } - journal_entry_small_write *je = (struct journal_entry_small_write*)(journal_sector_buf + journal_sector_pos); + journal_entry_small_write *je = (struct journal_entry_small_write*)( + journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos + ); *je = { .crc32 = 0, .magic = JOURNAL_MAGIC, .type = JE_SMALL_WRITE, .size = sizeof(struct journal_entry_small_write), - .crc32_prev = journal_crc32_last, + .crc32_prev = journal.crc32_last, .oid = op->oid, .version = op->version, .offset = op->offset, .len = op->len, }; - je.crc32 = je_crc32((journal_entry*)je); - data1->iov = (struct iovec){ journal_sector_buf, 512 }; + je->crc32 = je_crc32((journal_entry*)je); + data1->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 }; data1->op = op; io_uring_prep_writev( - sqe1, journal_fd, &data1->iov, 1, journal_offset + journal_sector + sqe1, journal.fd, &data1->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset ); // Prepare journal data write - if (journal_len - journal_data_pos < op->len) - journal_data_pos = 512; + if (journal.len - journal.next_free < op->len) + journal.next_free = 512; data2->iov = (struct iovec){ op->buf, op->len }; data2->op = op; io_uring_prep_writev( - sqe2, journal_fd, &data2->iov, 1, journal_offset + journal_data_pos + sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free ); - (*dirty_it).location = journal_data_pos; + (*dirty_it).location = journal.next_free; //(*dirty_it).state = ST_J_SUBMITTED; - // Move journal_data_pos - journal_data_pos += op->len; - if (journal_data_pos >= journal_len) - journal_data_pos = 512; + // Move journal.next_free and save last write for current sector + journal.next_free += op->len; + if (journal.next_free >= journal.len) + journal.next_free = 512; + journal.sector_info[journal.cur_sector].usage_count++; + journal.crc32_last = je->crc32; op->pending_ops = 2; 
+ op->used_journal_sector = 1 + journal.cur_sector; } in_process_ops.insert(op); int ret = ringloop->submit();