diff --git a/blockstore.cpp b/blockstore.cpp index c354956b..a262c2b5 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -3,7 +3,6 @@ blockstore::blockstore(spp::sparse_hash_map & config, ring_loop_t *ringloop) { this->ringloop = ringloop; - ring_consumer.handle_event = [this](ring_data_t *d) { handle_event(d); }; ring_consumer.loop = [this]() { loop(); }; ringloop->register_consumer(ring_consumer); initialized = 0; @@ -56,48 +55,6 @@ blockstore::~blockstore() free(journal.sector_info); } -// main event loop - handle requests -void blockstore::handle_event(ring_data_t *data) -{ - if (initialized != 10) - { - if (metadata_init_reader) - { - metadata_init_reader->handle_event(data); - } - else if (journal_init_reader) - { - journal_init_reader->handle_event(data); - } - } - else - { - struct blockstore_operation* op = (struct blockstore_operation*)data->op; - if ((op->flags & OP_TYPE_MASK) == OP_READ) - { - handle_read_event(data, op); - } - else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || - (op->flags & OP_TYPE_MASK) == OP_DELETE) - { - handle_write_event(data, op); - } - else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) - { - handle_sync_event(data, op); - } - else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) - { - handle_stable_event(data, op); - } - else if ((op->flags & OP_TYPE_MASK) == OP_INTERNAL_FLUSH) - { - // Operation is not a blockstore_operation at all - - } - } -} - // main event loop - produce requests void blockstore::loop() { diff --git a/blockstore.h b/blockstore.h index c7e2f149..dc60f6e2 100644 --- a/blockstore.h +++ b/blockstore.h @@ -188,7 +188,6 @@ public: #define OP_SYNC 3 #define OP_STABLE 4 #define OP_DELETE 5 -#define OP_INTERNAL_FLUSH 6 #define OP_TYPE_MASK 0x7 // Suspend operation until there are more free SQEs @@ -221,7 +220,7 @@ struct blockstore_operation // FIXME: Move internal fields somewhere friend class blockstore; friend class blockstore_journal_check_t; - friend void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe); + friend void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function cb); private: // Wait status int wait_for; @@ -328,7 +327,6 @@ public: ~blockstore(); // Event loop - void handle_event(ring_data_t* data); void loop(); // Returns true when it's safe to destroy the instance. If destroying the instance diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index dea46c7c..bbc9addf 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -88,7 +88,10 @@ void journal_flusher_co::loop() v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); data = ((ring_data_t*)sqe->user_data); data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len }; - data->op = this; + data->callback = [this](ring_data_t* data) + { + wait_count--; + }; io_uring_prep_readv( sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset ); @@ -156,7 +159,11 @@ void journal_flusher_co::loop() } data = ((ring_data_t*)sqe->user_data); data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->op = this; + data->callback = [this](ring_data_t* data) + { + + wait_count--; + }; io_uring_prep_writev( sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector ); @@ -181,7 +188,10 @@ void journal_flusher_co::loop() } data = ((ring_data_t*)sqe->user_data); data->iov = (struct iovec){ it->buf, (size_t)it->len }; - data->op = this; + data->callback = [this](ring_data_t* data) + { + wait_count--; + }; io_uring_prep_writev( sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset ); @@ -210,7 +220,10 @@ void journal_flusher_co::loop() } data = ((ring_data_t*)sqe->user_data); data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->op = this; + data->callback = [this](ring_data_t* data) + { + wait_count--; + }; io_uring_prep_writev( sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector ); diff --git a/blockstore_flush.h b/blockstore_flush.h index 05f74448..31d37801 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -15,7 +15,7 @@ struct meta_sector_t class journal_flusher_t; // Journal flusher coroutine -class journal_flusher_co +struct journal_flusher_co { blockstore *bs; journal_flusher_t *flusher; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 08508c2f..11eeab37 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -42,6 +42,7 @@ int blockstore_init_meta::loop() metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0), bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read, }; + data->callback = [this](ring_data_t *data) { handle_event(data); }; io_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read); bs->ringloop->submit(); submitted = (prev == 1 ? 2 : 1); @@ -179,6 +180,7 @@ int blockstore_init_journal::loop() } struct ring_data_t *data = ((ring_data_t*)sqe->user_data); data->iov = { journal_buffer, 512 }; + data->callback = [this](ring_data_t *data) { handle_event(data); }; io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); bs->ringloop->submit(); step = 1; @@ -211,6 +213,7 @@ int blockstore_init_journal::loop() journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, }; + data->callback = [this](ring_data_t *data) { handle_event(data); }; io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos); bs->ringloop->submit(); submitted = done_buf == 1 ? 2 : 1; diff --git a/blockstore_init.h b/blockstore_init.h index 477f5a67..9e71fc88 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -7,9 +7,9 @@ class blockstore_init_meta uint64_t metadata_read = 0; int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0; void handle_entries(struct clean_disk_entry* entries, int count); + void handle_event(ring_data_t *data); public: blockstore_init_meta(blockstore *bs); - void handle_event(ring_data_t *data); int loop(); }; @@ -24,8 +24,8 @@ class blockstore_init_journal bool wrapped = false; int submitted = 0, done_buf = 0, done_len = 0; int handle_journal_part(void *buf, uint64_t len); + void handle_event(ring_data_t *data); public: blockstore_init_journal(blockstore* bs); - void handle_event(ring_data_t *data); int loop(); }; diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index e02794d1..87685005 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -47,12 +47,12 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re return 1; } -void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe) +void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function cb) { journal.sector_info[journal.cur_sector].usage_count++; - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + ring_data_t *data = ((ring_data_t*)sqe->user_data); data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 }; - data->op = op; + data->callback = cb; io_uring_prep_writev( sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset ); diff --git a/blockstore_journal.h b/blockstore_journal.h index 7afda812..17e8928d 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -157,4 +157,4 @@ inline journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t } // FIXME: make inline -void prepare_journal_sector_write(blockstore_operation *op, journal_t & journal, io_uring_sqe *sqe); +void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function cb); diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 84dae115..21c3f464 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -31,7 +31,7 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled, &data->iov, 1, (IS_JOURNAL(item_state) ? journal.offset : data_offset) + item_location + cur_start - item_start ); - data->op = op; + data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); }; fulfilled += cur_end-cur_start; } return 1; diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 2e926a58..b8310a33 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -85,6 +85,7 @@ int blockstore::dequeue_stable(blockstore_operation *op) BS_SUBMIT_GET_SQE_DECL(sqe[i]); } // Prepare and submit journal entries + auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); }; int s = 0, cur_sector = -1; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { @@ -99,7 +100,7 @@ int blockstore::dequeue_stable(blockstore_operation *op) if (cur_sector == -1) op->min_used_journal_sector = 1 + journal.cur_sector; cur_sector = journal.cur_sector; - prepare_journal_sector_write(op, journal, sqe[s++]); + prepare_journal_sector_write(journal, sqe[s++], cb); } } op->max_used_journal_sector = 1 + journal.cur_sector; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 6e83191a..63e991bb 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -37,12 +37,13 @@ int blockstore::dequeue_sync(blockstore_operation *op) int blockstore::continue_sync(blockstore_operation *op) { + auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; if (op->sync_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal BS_SUBMIT_GET_SQE(sqe, data); io_uring_prep_fsync(sqe, journal.fd, 0); - data->op = op; + data->callback = cb; op->pending_ops = 1; op->sync_state = SYNC_JOURNAL_SYNC_SENT; } @@ -51,7 +52,7 @@ int blockstore::continue_sync(blockstore_operation *op) // 1st step: fsync data BS_SUBMIT_GET_SQE(sqe, data); io_uring_prep_fsync(sqe, data_fd, 0); - data->op = op; + data->callback = cb; op->pending_ops = 1; op->sync_state = SYNC_DATA_SYNC_SENT; } @@ -88,14 +89,14 @@ int blockstore::continue_sync(blockstore_operation *op) if (cur_sector == -1) op->min_used_journal_sector = 1 + journal.cur_sector; cur_sector = journal.cur_sector; - prepare_journal_sector_write(op, journal, sqe[s++]); + prepare_journal_sector_write(journal, sqe[s++], cb); } } op->max_used_journal_sector = 1 + journal.cur_sector; // ... And a journal fsync io_uring_prep_fsync(sqe[s], journal.fd, 0); struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data); - data->op = op; + data->callback = cb; op->pending_ops = 1 + s; op->sync_state = SYNC_JOURNAL_SYNC_SENT; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index de5220ce..52ca1869 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -58,6 +58,7 @@ void blockstore::enqueue_write(blockstore_operation *op) // First step of the write algorithm: dequeue operation and submit initial write(s) int blockstore::dequeue_write(blockstore_operation *op) { + auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, @@ -94,7 +95,7 @@ int blockstore::dequeue_write(blockstore_operation *op) vcnt = 1; op->iov_zerofill[0] = (struct iovec){ op->buf, op->len }; } - data->op = op; + data->callback = cb; io_uring_prep_writev( sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order) ); @@ -127,12 +128,12 @@ int blockstore::dequeue_write(blockstore_operation *op) je->len = op->len; je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; - prepare_journal_sector_write(op, journal, sqe1); + prepare_journal_sector_write(journal, sqe1, cb); op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; // Prepare journal data write journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512; data2->iov = (struct iovec){ op->buf, op->len }; - data2->op = op; + data2->callback = cb; io_uring_prep_writev( sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free ); diff --git a/ringloop.cpp b/ringloop.cpp index 521fa6c0..3cc86dd8 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -52,7 +52,6 @@ void ring_loop_t::unregister_consumer(int number) { if (number < consumers.size()) { - consumers[number].handle_event = NULL; consumers[number].loop = NULL; } } @@ -67,14 +66,9 @@ void ring_loop_t::loop(bool sleep) while ((io_uring_peek_cqe(ring, &cqe), cqe)) { struct ring_data_t *d = (struct ring_data_t*)cqe->user_data; - if (d->source < consumers.size()) + if (d->callback) { - d->res = cqe->res; - ring_consumer_t & c = consumers[d->source]; - if (c.handle_event != NULL) - { - c.handle_event(d); - } + d->callback(d); } io_uring_cqe_seen(ring, cqe); } diff --git a/ringloop.h b/ringloop.h index 42ea265b..b2529dde 100644 --- a/ringloop.h +++ b/ringloop.h @@ -15,13 +15,12 @@ struct ring_data_t uint64_t source; struct iovec iov; // for single-entry read/write operations int res; - void *op; + std::function callback; }; struct ring_consumer_t { int number; - std::function handle_event; std::function loop; };