From bb55a7fbf4687f9bd2d71d1036d6d613832c7e2b Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Tue, 12 Nov 2019 19:30:28 +0300
Subject: [PATCH] Zero-fill new objects and write them to the main storage

---
 blockstore.cpp        |  2 ++
 blockstore.h          | 11 +++++++++--
 blockstore_stable.cpp | 15 +++------------
 blockstore_write.cpp  | 26 +++++++++++++++++++++-----
 4 files changed, 35 insertions(+), 19 deletions(-)

diff --git a/blockstore.cpp b/blockstore.cpp
index b288294d..de2e87c2 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -13,6 +13,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
     {
         throw new std::runtime_error("Bad block size");
     }
+    zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
     data_fd = meta_fd = journal.fd = -1;
     try
     {
@@ -38,6 +39,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
 
 blockstore::~blockstore()
 {
+    free(zero_object);
     ringloop->unregister_consumer(ring_consumer.number);
     if (data_fd >= 0)
         close(data_fd);
diff --git a/blockstore.h b/blockstore.h
index ae7c646a..70f9ece3 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -69,14 +69,17 @@
 #define STRIPE_REPLICA(oid) ((oid) & 0xf)
 
 #define BS_SUBMIT_GET_SQE(sqe, data) \
+    BS_SUBMIT_GET_ONLY_SQE(sqe); \
+    struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
+
+#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
     struct io_uring_sqe *sqe = get_sqe();\
     if (!sqe)\
     {\
         /* Pause until there are more requests available */\
         op->wait_for = WAIT_SQE;\
         return 0;\
-    }\
-    struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
+    }
 
 #define BS_SUBMIT_GET_SQE_DECL(sqe) \
     sqe = get_sqe();\
@@ -230,6 +233,9 @@ private:
     // Sync, write
     uint64_t min_used_journal_sector, max_used_journal_sector;
 
+    // Write
+    struct iovec iov_zerofill[3];
+
     // Sync
     std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
     std::list<blockstore_operation*>::iterator in_progress_ptr;
@@ -251,6 +257,7 @@ class blockstore
     uint32_t block_order, block_size;
     uint64_t block_count;
     allocator *data_alloc;
+    uint8_t *zero_object;
 
     int meta_fd;
     int data_fd;
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index efe152ee..c42d08da 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -174,7 +174,6 @@ class journal_flusher_t
     std::vector<copy_buffer_t> v;
     std::vector<copy_buffer_t>::iterator it;
     uint64_t offset, len, submit_len, clean_loc;
-    bool allocated;
 
 public:
     journal_flusher_t(int flush_count);
@@ -208,7 +207,6 @@ void journal_flusher_t::loop()
     v.clear();
     wait_count = 0;
     clean_loc = UINT64_MAX;
-    allocated = false;
     skip_copy = false;
     do
     {
@@ -249,7 +247,7 @@
                 }
             }
             // So subsequent stabilizers don't flush the entry again
-            dirty_it->second.state = ST_J_READ_SUBMITTED;
+            dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
         }
         else if (dirty_it->second.state == ST_D_STABLE)
         {
@@ -272,15 +270,8 @@
             auto clean_it = bs->clean_db.find(cur.oid);
             if (clean_it == bs->clean_db.end())
             {
-                // Object not present at all. We must allocate and zero it.
-                clean_loc = allocator_find_free(bs->data_alloc);
-                if (clean_loc == UINT64_MAX)
-                {
-                    throw new std::runtime_error("No space on the data device while trying to flush journal");
-                }
-                // This is an interesting part. Flushing journal results in an allocation we don't know where to put O_o.
-                allocator_set(bs->data_alloc, clean_loc, true);
-                allocated = true;
+                // Object not present at all. This is a bug.
+                throw new std::runtime_error("BUG: Object we are trying to flush is not allocated on the data device");
             }
             else
                 clean_loc = clean_it->second.location;
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index a8f310cf..46bd943b 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -37,7 +37,7 @@ void blockstore::enqueue_write(blockstore_operation *op)
     });
     // Remember the write as unsynced here, so that external consumers can get
     // the list of dirty objects to sync just before issuing a SYNC request
-    if (op->len == block_size)
+    if (op->len == block_size || op->version == 1)
     {
         // Remember big write as unsynced
         unsynced_big_writes.push_back((obj_ver_id){
@@ -62,7 +62,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
         .oid = op->oid,
         .version = op->version,
     });
-    if (op->len == block_size)
+    if (op->len == block_size || op->version == 1)
     {
         // Big (redirect) write
         uint64_t loc = allocator_find_free(data_alloc);
@@ -77,10 +77,26 @@
         dirty_it->second.location = loc << block_order;
         dirty_it->second.state = ST_D_SUBMITTED;
         allocator_set(data_alloc, loc, true);
-        data->iov = (struct iovec){ op->buf, op->len };
+        int vcnt = 0;
+        if (op->version == 1 && op->len != block_size)
+        {
+            // Zero-fill the newly allocated object
+            // FIXME: This is not ideal because it turns new small writes into big writes,
+            // but it's the first and simplest implementation
+            if (op->offset > 0)
+                op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, op->offset };
+            op->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
+            if (op->offset+op->len < block_size)
+                op->iov_zerofill[vcnt++] = (struct iovec){ zero_object, block_size - (op->offset + op->len) };
+        }
+        else
+        {
+            vcnt = 1;
+            op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
+        }
         data->op = op;
         io_uring_prep_writev(
-            sqe, data_fd, &data->iov, 1, data_offset + (loc << block_order)
+            sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
         );
         op->pending_ops = 1;
         op->min_used_journal_sector = op->max_used_journal_sector = 0;
@@ -100,7 +116,7 @@
             return 0;
         }
         // There is sufficient space. Get SQE(s)
-        BS_SUBMIT_GET_SQE(sqe1, data1);
+        BS_SUBMIT_GET_ONLY_SQE(sqe1);
         BS_SUBMIT_GET_SQE(sqe2, data2);
         // Got SQEs. Prepare journal sector write
         journal_entry_small_write *je = (journal_entry_small_write*)
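
A note on the zero-fill path added to dequeue_write() above: the first write
to an object (op->version == 1) that is smaller than block_size is padded to a
full block with a single scatter-gather write of up to three iovec entries
(zeros before the payload, the payload itself, zeros after it), all padding
sourced from the one preallocated zero_object buffer. Below is a minimal
standalone sketch of the same iovec construction; it is not part of the patch:
pwritev() stands in for io_uring_prep_writev(), and BLOCK_SIZE, ALIGN,
write_zero_filled() and the test file name are names assumed for the example.

// Minimal sketch of the zero-fill technique from dequeue_write(), NOT part of
// the patch: pwritev() replaces io_uring_prep_writev(), and BLOCK_SIZE, ALIGN
// and write_zero_filled() are names invented for this example.
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <malloc.h>
#include <string.h>
#include <stdint.h>
#include <stdio.h>

#define ALIGN      512
#define BLOCK_SIZE (128*1024)

// Write <len> bytes at <offset> inside a freshly allocated block and pad the
// rest of the block with zeros, using at most 3 iovec entries
int write_zero_filled(int fd, uint64_t block_loc, uint8_t *buf,
    uint32_t offset, uint32_t len, uint8_t *zero_object)
{
    struct iovec iov[3];
    int vcnt = 0;
    if (offset > 0)
        iov[vcnt++] = { zero_object, offset };       // zeros before the payload
    iov[vcnt++] = { buf, len };                      // the payload itself
    if (offset + len < BLOCK_SIZE)
        iov[vcnt++] = { zero_object, BLOCK_SIZE - (offset + len) }; // zeros after it
    // One contiguous block-sized write at the block's on-disk location
    return pwritev(fd, iov, vcnt, block_loc) == BLOCK_SIZE ? 0 : -1;
}

int main()
{
    // A single zeroed buffer is enough for both padding iovecs, just like the
    // zero_object member allocated in the blockstore constructor
    uint8_t *zero_object = (uint8_t*)memalign(ALIGN, BLOCK_SIZE);
    memset(zero_object, 0, BLOCK_SIZE);
    uint8_t *payload = (uint8_t*)memalign(ALIGN, 4096);
    memset(payload, 0xAA, 4096);
    int fd = open("zerofill-test.bin", O_WRONLY|O_CREAT|O_TRUNC, 0644);
    if (fd < 0)
        return 1;
    // First write to a new object: 4 KB at offset 8 KB, padded to a full block
    int r = write_zero_filled(fd, 0, payload, 8192, 4096, zero_object);
    printf("write_zero_filled: %s\n", r == 0 ? "OK" : "FAILED");
    close(fd);
    free(payload);
    free(zero_object);
    return 0;
}

The trade-off is the one the FIXME in the patch points out: padding turns a
small first write into a full block-sized write, but in exchange unwritten
regions of a new object never have to be read back as undefined garbage.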