diff --git a/Makefile b/Makefile index 8a8cd180..65afb203 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,15 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ - blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o + blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so clean: rm -f *.o crc32c.o: crc32c.c g++ -fPIC -c -o $@ $< -%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h +%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h g++ -g -Wall -Wno-sign-compare -Wno-parentheses -fPIC -c -o $@ $< test: test.cpp g++ -g -O3 -o test -luring test.cpp test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp g++ -g -o test_blockstore -luring test_blockstore.cpp $(BLOCKSTORE_OBJS) libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS) - g++ -Wno-pointer-arith -fPIC -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS) + g++ -g -Wno-pointer-arith -fPIC -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS) diff --git a/fio_engine.cpp b/fio_engine.cpp index 7034d37f..f58ca7f1 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -43,7 +43,7 @@ static int bs_setup(struct thread_data *td) int r; //int64_t size; - bsd = (bs_data*)calloc(1, sizeof(*bsd)); + bsd = new bs_data; if (!bsd) { td_verror(td, errno, "calloc"); @@ -71,7 +71,7 @@ static void bs_cleanup(struct thread_data *td) if (bsd) { - free(bsd); + delete bsd; } } @@ -88,9 +88,12 @@ static int bs_init(struct thread_data *td) config["data_device"] = "./test_data.bin"; bsd->ringloop = new ring_loop_t(512); bsd->bs = new blockstore(config, bsd->ringloop); - while (!bsd->bs->is_started()) + while (1) { bsd->ringloop->loop(); + if (bsd->bs->is_started()) + break; + bsd->ringloop->wait(); } log_info("fio: blockstore initialized\n"); @@ -122,8 +125,9 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u) }; op->offset = io_u->offset % bsd->bs->block_size; op->len = io_u->xfer_buflen; - op->callback = [&](blockstore_operation *op) + op->callback = [io_u](blockstore_operation *op) { + bs_data *bsd = (bs_data*)io_u->engine_data; bsd->completed.push_back(io_u); delete op; }; @@ -137,16 +141,18 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u) }; op->offset = io_u->offset % bsd->bs->block_size; op->len = io_u->xfer_buflen; - op->callback = [&](blockstore_operation *op) + op->callback = [io_u](blockstore_operation *op) { + bs_data *bsd = (bs_data*)io_u->engine_data; bsd->completed.push_back(io_u); delete op; }; break; case DDIR_SYNC: op->flags = OP_SYNC; - op->callback = [&](blockstore_operation *op) + op->callback = [io_u](blockstore_operation *op) { + bs_data *bsd = (bs_data*)io_u->engine_data; if (bsd->bs->unstable_writes.size() > 0) { op->flags = OP_STABLE; @@ -162,8 +168,9 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u) }; } bsd->bs->enqueue_op(op); - op->callback = [&](blockstore_operation *op) + op->callback = [io_u](blockstore_operation *op) { + bs_data *bsd = (bs_data*)io_u->engine_data; bsd->completed.push_back(io_u); obj_ver_id *vers = (obj_ver_id*)op->buf; delete[] vers; @@ -192,9 +199,12 @@ static int bs_getevents(struct thread_data *td, unsigned int min, unsigned int m { bs_data *bsd = (bs_data*)td->io_ops_data; // FIXME timeout - while (bsd->completed.size() < min) + while (true) { bsd->ringloop->loop(); + if (bsd->completed.size() >= min) + break; + bsd->ringloop->wait(); } return bsd->completed.size(); } diff --git a/ringloop.cpp b/ringloop.cpp index cf495fa0..509366c2 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -62,5 +62,4 @@ void ring_loop_t::loop() consumers[i].loop(); } } while (loop_again); - io_uring_wait_cqe(&ring, &cqe); } diff --git a/ringloop.h b/ringloop.h index 3701cc7a..8201a1e8 100644 --- a/ringloop.h +++ b/ringloop.h @@ -142,4 +142,9 @@ public: { return io_uring_submit(&ring); } + inline int wait() + { + struct io_uring_cqe *cqe; + return io_uring_wait_cqe(&ring, &cqe); + } }; diff --git a/test_blockstore.cpp b/test_blockstore.cpp index 8afdf0cf..fdd5ea58 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -1,75 +1,7 @@ -#include -#include #include +#include "timerfd_interval.h" #include "blockstore.h" -class timerfd_interval -{ - int wait_state; - int timerfd; - int status; - ring_loop_t *ringloop; - ring_consumer_t consumer; - std::function callback; -public: - timerfd_interval(ring_loop_t *ringloop, int seconds, std::function cb) - { - wait_state = 0; - timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); - if (timerfd < 0) - { - throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); - } - struct itimerspec exp = { - .it_interval = { seconds, 0 }, - .it_value = { seconds, 0 }, - }; - if (timerfd_settime(timerfd, 0, &exp, NULL)) - { - throw std::runtime_error(std::string("timerfd_settime: ") + strerror(errno)); - } - consumer.loop = [this]() { loop(); }; - ringloop->register_consumer(consumer); - this->ringloop = ringloop; - this->callback = cb; - } - - ~timerfd_interval() - { - ringloop->unregister_consumer(consumer); - close(timerfd); - } - - void loop() - { - if (wait_state == 1) - { - return; - } - struct io_uring_sqe *sqe = ringloop->get_sqe(); - if (!sqe) - { - wait_state = 0; - return; - } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); - my_uring_prep_poll_add(sqe, timerfd, POLLIN); - data->callback = [&](ring_data_t *data) - { - if (data->res < 0) - { - throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res)); - } - uint64_t n; - read(timerfd, &n, 8); - wait_state = 0; - callback(); - }; - wait_state = 1; - ringloop->submit(); - } -}; - int main(int narg, char *args[]) { spp::sparse_hash_map config; @@ -141,6 +73,7 @@ int main(int narg, char *args[]) while (1) { ringloop->loop(); + ringloop->wait(); } delete bs; delete ringloop; diff --git a/timerfd_interval.cpp b/timerfd_interval.cpp new file mode 100644 index 00000000..a7903b5f --- /dev/null +++ b/timerfd_interval.cpp @@ -0,0 +1,61 @@ +#include +#include +#include +#include "timerfd_interval.h" + +timerfd_interval::timerfd_interval(ring_loop_t *ringloop, int seconds, std::function cb) +{ + wait_state = 0; + timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (timerfd < 0) + { + throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); + } + struct itimerspec exp = { + .it_interval = { seconds, 0 }, + .it_value = { seconds, 0 }, + }; + if (timerfd_settime(timerfd, 0, &exp, NULL)) + { + throw std::runtime_error(std::string("timerfd_settime: ") + strerror(errno)); + } + consumer.loop = [this]() { loop(); }; + ringloop->register_consumer(consumer); + this->ringloop = ringloop; + this->callback = cb; +} + +timerfd_interval::~timerfd_interval() +{ + ringloop->unregister_consumer(consumer); + close(timerfd); +} + +void timerfd_interval::loop() +{ + if (wait_state == 1) + { + return; + } + struct io_uring_sqe *sqe = ringloop->get_sqe(); + if (!sqe) + { + wait_state = 0; + return; + } + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + my_uring_prep_poll_add(sqe, timerfd, POLLIN); + data->callback = [&](ring_data_t *data) + { + if (data->res < 0) + { + throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res)); + } + uint64_t n; + read(timerfd, &n, 8); + wait_state = 0; + callback(); + }; + wait_state = 1; + ringloop->submit(); +} diff --git a/timerfd_interval.h b/timerfd_interval.h new file mode 100644 index 00000000..84d9587c --- /dev/null +++ b/timerfd_interval.h @@ -0,0 +1,17 @@ +#pragma once + +#include "ringloop.h" + +class timerfd_interval +{ + int wait_state; + int timerfd; + int status; + ring_loop_t *ringloop; + ring_consumer_t consumer; + std::function callback; +public: + timerfd_interval(ring_loop_t *ringloop, int seconds, std::function cb); + ~timerfd_interval(); + void loop(); +};