diff --git a/blockstore.cpp b/blockstore.cpp
index 0088220c..3d2e1da5 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -93,15 +93,13 @@ void blockstore::handle_event(ring_data_t *data)
             // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
             throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
         }
-        if (op->used_journal_sector > 0)
+        if (op->min_used_journal_sector > 0)
         {
-            uint64_t s = op->used_journal_sector-1;
-            if (journal.sector_info[s].usage_count > 0)
+            for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++)
             {
-                // The last write to this journal sector was made by this op, release the buffer
-                journal.sector_info[s].usage_count--;
+                journal.sector_info[s-1].usage_count--;
             }
-            op->used_journal_sector = 0;
+            op->min_used_journal_sector = op->max_used_journal_sector = 0;
         }
         if (op->pending_ops == 0)
         {
@@ -123,6 +121,14 @@ void blockstore::handle_event(ring_data_t *data)
     }
     else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
     {
+        if (op->min_used_journal_sector > 0)
+        {
+            for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++)
+            {
+                journal.sector_info[s-1].usage_count--;
+            }
+            op->min_used_journal_sector = op->max_used_journal_sector = 0;
+        }
     }
     else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
     {
diff --git a/blockstore.h b/blockstore.h
index b625ce67..b1eaf1b4 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -69,11 +69,20 @@
     struct io_uring_sqe *sqe = get_sqe();\
     if (!sqe)\
     {\
-        // Pause until there are more requests available\
+        /* Pause until there are more requests available */\
         op->wait_for = WAIT_SQE;\
         return 0;\
     }\
-    struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
+    struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
+
+#define BS_SUBMIT_GET_SQE_DECL(sqe) \
+    sqe = get_sqe();\
+    if (!sqe)\
+    {\
+        /* Pause until there are more requests available */\
+        op->wait_for = WAIT_SQE;\
+        return 0;\
+    }
 
 // 16 bytes per object/stripe id
 // stripe includes replica number in 4 least significant bits
@@ -202,9 +211,10 @@ struct blockstore_operation
 
     // FIXME make all of these pointers and put them into a union
     std::map read_vec;
-    uint64_t used_journal_sector;
+    uint64_t min_used_journal_sector, max_used_journal_sector;
     std::deque sync_writes;
-    bool has_big_writes;
+    int big_write_count;
+    int big_write_state;
 };
 
 class blockstore;
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index dc645879..9d3ffcb2 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -18,7 +18,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
             op->callback(op);
             return 1;
         }
-        BS_GET_SQE(sqe, data);
+        BS_SUBMIT_GET_SQE(sqe, data);
         dirty_it->second.location = loc << block_order;
         dirty_it->second.state = ST_D_SUBMITTED;
         allocator_set(data_alloc, loc, true);
@@ -28,7 +28,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
             sqe, data_fd, &data->iov, 1, data_offset + (loc << block_order)
         );
         op->pending_ops = 1;
-        op->used_journal_sector = 0;
+        op->min_used_journal_sector = op->max_used_journal_sector = 0;
     }
     else
     {
@@ -38,11 +38,9 @@ int blockstore::dequeue_write(blockstore_operation *op)
         uint64_t next_pos = journal.next_free;
         if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write))
         {
-            next_pos = next_pos + 512;
             //if (journal.len - next_pos < op->len)
             //    two_sqes = true;
-            if (next_pos >= journal.len)
-                next_pos = 512;
+            next_pos = (next_pos+512) < journal.len ? next_pos+512 : 512;
             // Also check if we have an unused memory buffer for the journal sector
             if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
             {
@@ -64,8 +62,8 @@ int blockstore::dequeue_write(blockstore_operation *op)
         }
         // There is sufficient space. Get SQE(s)
         unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
-        BS_GET_SQE(sqe1, data1);
-        BS_GET_SQE(sqe2, data2);
+        BS_SUBMIT_GET_SQE(sqe1, data1);
+        BS_SUBMIT_GET_SQE(sqe2, data2);
         // Got SQEs. Prepare journal sector write
         if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write))
         {
@@ -74,7 +72,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
             journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
             journal.sector_info[journal.cur_sector].offset = journal.next_free;
             journal.in_sector_pos = 0;
-            journal.next_free = (journal.next_free + 512) >= journal.len ? journal.next_free + 512 : 512;
+            journal.next_free = (journal.next_free+512) < journal.len ? journal.next_free + 512 : 512;
             memset(journal.sector_buf + 512*journal.cur_sector, 0, 512);
         }
         journal_entry_small_write *je = (struct journal_entry_small_write*)(
@@ -92,14 +90,14 @@ int blockstore::dequeue_write(blockstore_operation *op)
             .len = op->len,
         };
         je->crc32 = je_crc32((journal_entry*)je);
+        journal.crc32_last = je->crc32;
         data1->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
         data1->op = op;
         io_uring_prep_writev(
             sqe1, journal.fd, &data1->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
         );
         // Prepare journal data write
-        if (journal.len - journal.next_free < op->len)
-            journal.next_free = 512;
+        journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free + op->len : 512;
         data2->iov = (struct iovec){ op->buf, op->len };
         data2->op = op;
         io_uring_prep_writev(
@@ -109,32 +107,31 @@ int blockstore::dequeue_write(blockstore_operation *op)
         dirty_it->second.state = ST_J_SUBMITTED;
         // Move journal.next_free and save last write for current sector
         journal.next_free += op->len;
-        if (journal.next_free >= journal.len)
-            journal.next_free = 512;
         journal.sector_info[journal.cur_sector].usage_count++;
-        journal.crc32_last = je->crc32;
         op->pending_ops = 2;
-        op->used_journal_sector = 1 + journal.cur_sector;
+        op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
     }
     return 1;
 }
 
 int blockstore::dequeue_sync(blockstore_operation *op)
 {
-    op->has_big_writes = 0x10000;
+    op->big_write_count = 0;
+    op->big_write_state = 0x10000;
     op->sync_writes.swap(unsynced_writes);
     unsynced_writes.clear();
-    auto it = sync_writes.begin();
-    while (it != sync_writes.end())
+    auto it = op->sync_writes.begin();
+    while (it != op->sync_writes.end())
     {
         uint32_t state = dirty_db[*it].state;
         if (IS_BIG_WRITE(state))
         {
-            op->has_big_writes = op->has_big_writes < state ? op->has_big_writes : state;
+            op->big_write_count++;
+            op->big_write_state = op->big_write_state < state ? op->big_write_state : state;
         }
         it++;
     }
-    if (op->has_big_writes == 0x10000 || op->has_big_writes == ST_D_META_WRITTEN)
+    if (op->big_write_count == 0 || op->big_write_state == ST_D_META_WRITTEN)
     {
         // Just fsync the journal
         BS_SUBMIT_GET_SQE(sqe, data);
@@ -142,7 +139,7 @@ int blockstore::dequeue_sync(blockstore_operation *op)
         data->op = op;
         op->pending_ops = 1;
     }
-    else if (op->has_big_writes == ST_D_WRITTEN)
+    else if (op->big_write_state == ST_D_WRITTEN)
     {
         // 1st step: fsync data
         BS_SUBMIT_GET_SQE(sqe, data);
@@ -150,10 +147,87 @@ int blockstore::dequeue_sync(blockstore_operation *op)
         data->op = op;
         op->pending_ops = 1;
     }
-    else if (op->has_big_writes == ST_D_SYNCED)
+    else if (op->big_write_state == ST_D_SYNCED)
     {
         // 2nd step: Data device is synced, prepare & write journal entries
-
+        // Check space in the journal and journal memory buffers
+        int required = op->big_write_count, sectors_required = 1;
+        uint64_t next_pos = journal.next_free, next_sector = journal.cur_sector;
+        while (1)
+        {
+            int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write);
+            required -= fits;
+            if (required <= 0)
+                break;
+            next_pos = (next_pos+512) < journal.len ? next_pos+512 : 512;
+            sectors_required++;
+            next_sector = ((next_sector + 1) % journal.sector_count);
+            if (journal.sector_info[next_sector].usage_count > 0)
+            {
+                // No memory buffer available. Wait for it.
+                op->wait_for = WAIT_JOURNAL_BUFFER;
+                return 0;
+            }
+        }
+        if (next_pos >= journal.used_start)
+        {
+            // No space in the journal. Wait for it.
+            op->wait_for = WAIT_JOURNAL;
+            op->wait_detail = next_pos;
+            return 0;
+        }
+        // Get SQEs. Don't bother about merging, submit each journal sector as a separate request
+        struct io_uring_sqe *sqe[sectors_required];
+        for (int i = 0; i < sectors_required; i++)
+        {
+            BS_SUBMIT_GET_SQE_DECL(sqe[i]);
+        }
+        // Prepare and submit journal entries
+        op->min_used_journal_sector = 1 + journal.cur_sector;
+        sectors_required = 0;
+        required = op->big_write_count;
+        it = op->sync_writes.begin();
+        while (1)
+        {
+            int fits = (512 - journal.in_sector_pos) / sizeof(journal_entry_big_write);
+            while (fits > 0 && required > 0)
+            {
+                journal_entry_big_write *je = (journal_entry_big_write*)(
+                    journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos
+                );
+                *je = {
+                    .crc32 = 0,
+                    .magic = JOURNAL_MAGIC,
+                    .type = JE_BIG_WRITE,
+                    .size = sizeof(journal_entry_big_write),
+                    .crc32_prev = journal.crc32_last,
+                    .oid = it->oid,
+                    .version = it->version,
+                    .block = dirty_db[*it].location,
+                };
+                je->crc32 = je_crc32((journal_entry*)je);
+                journal.crc32_last = je->crc32;
+                journal.in_sector_pos += sizeof(journal_entry_big_write);
+                required--;
+            }
+            if (required <= 0)
+                break;
+            journal.sector_info[journal.cur_sector].usage_count++;
+            struct ring_data_t *data = ((ring_data_t*)sqe[sectors_required]->user_data);
+            data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
+            data->op = op;
+            io_uring_prep_writev(
+                sqe[sectors_required], journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
+            );
+            journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
+            journal.sector_info[journal.cur_sector].offset = journal.next_free;
+            journal.in_sector_pos = 0;
+            journal.next_free = (journal.next_free + 512) < journal.len ? journal.next_free + 512 : 512;
+            memset(journal.sector_buf + 512*journal.cur_sector, 0, 512);
+            sectors_required++;
+        }
+        op->pending_ops = sectors_required;
+        op->max_used_journal_sector = 1 + journal.cur_sector;
     }
     return 1;
 }
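
Note on how the new fields are meant to be read: every journal sector buffer carries a usage_count, an operation records the 1-based range of sectors it wrote into (min_used_journal_sector .. max_used_journal_sector, with 0 meaning "nothing pinned"), and handle_event() releases the whole range on completion. The standalone sketch below models only that accounting, under the simplifying assumption of a non-wrapping sector range; journal_sector_t, pin_sectors() and release_sectors() are illustrative names, not the real blockstore types.

// Minimal sketch of the journal sector pinning used above (illustrative names, not the real structures)
#include <cstdint>
#include <cassert>
#include <vector>

struct journal_sector_t
{
    int usage_count = 0;        // number of in-flight ops still writing this sector buffer
};

struct op_model_t
{
    // 0 = no journal sectors pinned; otherwise 1-based sector indexes
    uint64_t min_used_journal_sector = 0, max_used_journal_sector = 0;
};

// Pin a contiguous range of sectors while the op's journal writes are in flight
static void pin_sectors(std::vector<journal_sector_t> & sectors, op_model_t & op, uint64_t first, uint64_t last)
{
    for (uint64_t s = first; s <= last; s++)
        sectors[s].usage_count++;
    op.min_used_journal_sector = 1 + first;
    op.max_used_journal_sector = 1 + last;
}

// Release the whole range on completion, the same way handle_event() does
static void release_sectors(std::vector<journal_sector_t> & sectors, op_model_t & op)
{
    if (op.min_used_journal_sector > 0)
    {
        for (uint64_t s = op.min_used_journal_sector; s <= op.max_used_journal_sector; s++)
            sectors[s-1].usage_count--;
        op.min_used_journal_sector = op.max_used_journal_sector = 0;
    }
}

int main()
{
    std::vector<journal_sector_t> sectors(8);
    op_model_t op;
    pin_sectors(sectors, op, 2, 4);   // e.g. a sync whose entries spilled over sectors 2..4
    assert(sectors[3].usage_count == 1);
    release_sectors(sectors, op);     // completion path
    assert(sectors[3].usage_count == 0 && op.min_used_journal_sector == 0);
    return 0;
}

The 1-based encoding is what lets 0 double as "nothing pinned", which is also why the release loop in handle_event() indexes sector_info[s-1].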