Browse Source

Copy io_uring_prep_* to my_uring_prep_* so they do not clear user_data

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
c2de733e35
  1. 14
      blockstore_flush.cpp
  2. 6
      blockstore_init.cpp
  3. 2
      blockstore_journal.cpp
  4. 2
      blockstore_read.cpp
  5. 6
      blockstore_sync.cpp
  6. 4
      blockstore_write.cpp
  7. 10
      ringloop.cpp
  8. 103
      ringloop.h
  9. 12
      test_blockstore.cpp

14
blockstore_flush.cpp

@@ -168,7 +168,7 @@ resume_0:
v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
data->callback = simple_callback;
io_uring_prep_readv(
my_uring_prep_readv(
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
);
wait_count++;
@@ -250,7 +250,7 @@ resume_0:
meta_it->second.state = 1;
wait_count--;
};
io_uring_prep_readv(
my_uring_prep_readv(
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
);
wait_count++;
@@ -267,7 +267,7 @@ resume_0:
await_sqe(4);
data->iov = (struct iovec){ it->buf, (size_t)it->len };
data->callback = simple_callback;
io_uring_prep_writev(
my_uring_prep_writev(
sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
);
wait_count++;
@@ -289,7 +289,7 @@ resume_0:
await_sqe(6);
data->iov = (struct iovec){ meta_it->second.buf, 512 };
data->callback = simple_callback;
io_uring_prep_writev(
my_uring_prep_writev(
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
);
wait_count++;
@@ -325,13 +325,13 @@ resume_0:
// Sync batch is ready. Do it.
await_sqe(9);
data->callback = simple_callback;
io_uring_prep_fsync(sqe, bs->data_fd, 0);
my_uring_prep_fsync(sqe, bs->data_fd, 0);
wait_count++;
if (bs->meta_fd != bs->data_fd)
{
await_sqe(10);
data->callback = simple_callback;
io_uring_prep_fsync(sqe, bs->meta_fd, 0);
my_uring_prep_fsync(sqe, bs->meta_fd, 0);
wait_count++;
}
wait_state = 11;
@@ -423,7 +423,7 @@ resume_0:
};
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
data->iov = (struct iovec){ flusher->journal_superblock, 512 };
io_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
wait_state = 13;
resume_13:

6
blockstore_init.cpp

@@ -43,7 +43,7 @@ int blockstore_init_meta::loop()
bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
};
data->callback = [this](ring_data_t *data) { handle_event(data); };
io_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read);
my_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read);
bs->ringloop->submit();
submitted = (prev == 1 ? 2 : 1);
prev = submitted;
@@ -192,7 +192,7 @@ int blockstore_init_journal::loop()
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = { journal_buffer, 512 };
data->callback = [this](ring_data_t *data) { handle_event(data); };
io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
bs->ringloop->submit();
step = 1;
}
@@ -225,7 +225,7 @@ int blockstore_init_journal::loop()
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
data->callback = [this](ring_data_t *data) { handle_event(data); };
io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
bs->ringloop->submit();
submitted = done_buf == 1 ? 2 : 1;
}

2
blockstore_journal.cpp

@@ -53,7 +53,7 @@ void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::f
ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
data->callback = cb;
io_uring_prep_writev(
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
);
}

2
blockstore_read.cpp

@@ -25,7 +25,7 @@ int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled,
};
// FIXME: use simple std::vector instead of map for read_vec
op->read_vec[cur_start] = data->iov;
io_uring_prep_readv(
my_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal.fd : data_fd,
&data->iov, 1,

6
blockstore_sync.cpp

@@ -42,7 +42,7 @@ int blockstore::continue_sync(blockstore_operation *op)
{
// No big writes, just fsync the journal
BS_SUBMIT_GET_SQE(sqe, data);
io_uring_prep_fsync(sqe, journal.fd, 0);
my_uring_prep_fsync(sqe, journal.fd, 0);
data->callback = cb;
op->pending_ops = 1;
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
@@ -51,7 +51,7 @@ int blockstore::continue_sync(blockstore_operation *op)
{
// 1st step: fsync data
BS_SUBMIT_GET_SQE(sqe, data);
io_uring_prep_fsync(sqe, data_fd, 0);
my_uring_prep_fsync(sqe, data_fd, 0);
data->callback = cb;
op->pending_ops = 1;
op->sync_state = SYNC_DATA_SYNC_SENT;
@@ -96,7 +96,7 @@ int blockstore::continue_sync(blockstore_operation *op)
}
op->max_used_journal_sector = 1 + journal.cur_sector;
// ... And a journal fsync
io_uring_prep_fsync(sqe[s], journal.fd, 0);
my_uring_prep_fsync(sqe[s], journal.fd, 0);
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
data->callback = cb;
op->pending_ops = 1 + s;

4
blockstore_write.cpp

@@ -97,7 +97,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
}
data->callback = cb;
io_uring_prep_writev(
my_uring_prep_writev(
sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
);
op->pending_ops = 1;
@@ -137,7 +137,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512;
data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb;
io_uring_prep_writev(
my_uring_prep_writev(
sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
);
dirty_it->second.location = journal.next_free;

10
ringloop.cpp

@@ -20,16 +20,6 @@ ring_loop_t::~ring_loop_t()
io_uring_queue_exit(&ring);
}
// Out-of-line definition removed by this commit; an inline version is kept in
// ringloop.h instead (see the ringloop.h hunk in this same diff).
struct io_uring_sqe* ring_loop_t::get_sqe()
{
// Grab the next free submission queue entry; NULL means the SQ ring is full
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
if (sqe)
{
// Pre-attach the ring_data_t element that corresponds to this SQE's slot
// (same index as the SQE within the submission array) via user_data
io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes));
}
return sqe;
}
int ring_loop_t::register_consumer(ring_consumer_t & consumer)
{
consumer.number = consumers.size();

103
ringloop.h

@@ -10,6 +10,99 @@
#include <functional>
#include <vector>
// Fill an SQE like liburing's io_uring_prep_rw(), but WITHOUT touching
// sqe->user_data: ring_loop_t::get_sqe() stores a ring_data_t pointer there
// before the request is prepared, and the stock liburing helper would wipe it.
static inline void my_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd, const void *addr, unsigned len, off_t offset)
{
    // Request description: opcode, target fd, buffer/iovec pointer, length, file offset
    sqe->opcode = op;
    sqe->fd = fd;
    sqe->addr = (unsigned long) addr;
    sqe->len = len;
    sqe->off = offset;
    // No modifiers by default; per-opcode wrappers overwrite the flag union
    // (rw_flags/fsync_flags/poll_events/...) after this call when needed
    sqe->flags = 0;
    sqe->ioprio = 0;
    sqe->rw_flags = 0;
    // Zero the trailing padding of the SQE
    sqe->__pad2[0] = 0;
    sqe->__pad2[1] = 0;
    sqe->__pad2[2] = 0;
}
// Vectored read from fd at file_offset (io_uring_prep_readv minus the user_data reset).
static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const struct iovec *iov, unsigned iovcnt, off_t file_offset)
{
    my_uring_prep_rw(IORING_OP_READV, sqe, fd, iov, iovcnt, file_offset);
}
// Read into a pre-registered buffer; buf_index selects which one (user_data kept).
static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t file_offset, int buf_index)
{
    my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, file_offset);
    sqe->buf_index = buf_index;
}
// Vectored write to fd at file_offset (io_uring_prep_writev minus the user_data reset).
static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iov, unsigned iovcnt, off_t file_offset)
{
    my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iov, iovcnt, file_offset);
}
// Write from a pre-registered buffer; buf_index selects which one (user_data kept).
static inline void my_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t file_offset, int buf_index)
{
    my_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, file_offset);
    sqe->buf_index = buf_index;
}
// Socket recvmsg(); mflags goes into msg_flags (user_data kept).
static inline void my_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd, struct msghdr *msg, unsigned mflags)
{
    my_uring_prep_rw(IORING_OP_RECVMSG, sqe, fd, msg, 1, 0);
    sqe->msg_flags = mflags;
}
// Socket sendmsg(); mflags goes into msg_flags (user_data kept).
static inline void my_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd, const struct msghdr *msg, unsigned mflags)
{
    my_uring_prep_rw(IORING_OP_SENDMSG, sqe, fd, msg, 1, 0);
    sqe->msg_flags = mflags;
}
// One-shot poll on fd for the given POLL* event mask (user_data kept).
static inline void my_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short events)
{
    my_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, NULL, 0, 0);
    sqe->poll_events = events;
}
// Cancel a pending poll request, identified by the user_data it was submitted with.
static inline void my_uring_prep_poll_remove(struct io_uring_sqe *sqe, void *target_user_data)
{
    my_uring_prep_rw(IORING_OP_POLL_REMOVE, sqe, 0, target_user_data, 0, 0);
}
// fsync of fd; fsync_flags may carry IORING_FSYNC_DATASYNC (user_data kept).
static inline void my_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, unsigned fsync_flags)
{
    my_uring_prep_rw(IORING_OP_FSYNC, sqe, fd, NULL, 0, 0);
    sqe->fsync_flags = fsync_flags;
}
// No-op request — completes immediately, useful for waking the ring (user_data kept).
static inline void my_uring_prep_nop(struct io_uring_sqe *sqe)
{
    my_uring_prep_rw(IORING_OP_NOP, sqe, 0, NULL, 0, 0);
}
// Timeout request: fires after *ts, or earlier once `count` completions occur.
// Note `count` rides in the SQE's offset field, as the kernel ABI requires.
static inline void my_uring_prep_timeout(struct io_uring_sqe *sqe, struct __kernel_timespec *ts, unsigned count, unsigned flags)
{
    my_uring_prep_rw(IORING_OP_TIMEOUT, sqe, 0, ts, 1, count);
    sqe->timeout_flags = flags;
}
// Cancel a previously submitted timeout, identified by its user_data value.
// The target user_data is written straight into sqe->addr instead of being
// round-tripped through (void *): on 32-bit hosts casting a __u64 to a
// pointer truncates the upper half and the wrong timeout would be targeted.
static inline void my_uring_prep_timeout_remove(struct io_uring_sqe *sqe, __u64 user_data, unsigned flags)
{
    my_uring_prep_rw(IORING_OP_TIMEOUT_REMOVE, sqe, 0, NULL, 0, 0);
    sqe->addr = user_data;
    sqe->timeout_flags = flags;
}
// accept4()-style request; addrlen pointer travels in the SQE offset field
// per the kernel ABI, accept_flags carries SOCK_* flags (user_data kept).
static inline void my_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
{
    my_uring_prep_rw(IORING_OP_ACCEPT, sqe, fd, addr, 0, (__u64) addrlen);
    sqe->accept_flags = flags;
}
// Cancel an in-flight request, identified by the user_data it was submitted with.
static inline void my_uring_prep_cancel(struct io_uring_sqe *sqe, void *target_user_data, int flags)
{
    my_uring_prep_rw(IORING_OP_ASYNC_CANCEL, sqe, 0, target_user_data, 0, 0);
    sqe->cancel_flags = flags;
}
struct ring_data_t
{
struct iovec iov; // for single-entry read/write operations
@@ -31,7 +124,15 @@ public:
struct io_uring ring;
ring_loop_t(int qd);
~ring_loop_t();
struct io_uring_sqe* get_sqe();
inline struct io_uring_sqe* get_sqe()
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
if (sqe)
{
io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes));
}
return sqe;
}
int register_consumer(ring_consumer_t & consumer);
void unregister_consumer(int number);
void loop(bool sleep);

12
test_blockstore.cpp

@@ -51,11 +51,17 @@ public:
return;
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
io_uring_prep_poll_add(sqe, timerfd, POLLIN);
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [&](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
}
uint64_t n;
read(timerfd, &n, 8);
wait_state = 0;
printf("tick\n");
printf("tick 1s\n");
};
wait_state = 1;
ringloop->submit();
@@ -69,7 +75,7 @@ int main(int narg, char *args[])
config["journal_device"] = "./test_journal.bin";
config["data_device"] = "./test_data.bin";
ring_loop_t *ringloop = new ring_loop_t(512);
// print "tick" each second
// print "tick" every second
timerfd_interval tick_tfd(ringloop, 1);
while (true)
{

Loading…
Cancel
Save