Fix submission ring overflow & ring_data_t reuse conflicts
parent
4fb0579b1b
commit
19abe6227e
|
@ -37,7 +37,7 @@ journal_flusher_co::journal_flusher_co()
|
|||
{
|
||||
throw std::runtime_error(
|
||||
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
|
||||
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
|
||||
"). state "+std::to_string(wait_state)+". in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
|
||||
);
|
||||
}
|
||||
wait_count--;
|
||||
|
@ -443,7 +443,6 @@ bool journal_flusher_co::loop()
|
|||
{
|
||||
// Update journal "superblock"
|
||||
await_sqe(12);
|
||||
data->callback = simple_callback_w;
|
||||
*((journal_entry_start*)flusher->journal_superblock) = {
|
||||
.crc32 = 0,
|
||||
.magic = JOURNAL_MAGIC,
|
||||
|
@ -454,6 +453,7 @@ bool journal_flusher_co::loop()
|
|||
};
|
||||
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
|
||||
data->iov = (struct iovec){ flusher->journal_superblock, 512 };
|
||||
data->callback = simple_callback_w;
|
||||
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
|
||||
wait_count++;
|
||||
resume_13:
|
||||
|
@ -611,8 +611,8 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
|
|||
{
|
||||
// Sync batch is ready. Do it.
|
||||
await_sqe(0);
|
||||
data->callback = simple_callback_w;
|
||||
data->iov = { 0 };
|
||||
data->callback = simple_callback_w;
|
||||
my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
|
||||
cur_sync->state = 1;
|
||||
wait_count++;
|
||||
|
|
|
@ -131,8 +131,8 @@ void blockstore_impl_t::loop()
|
|||
continue;
|
||||
}
|
||||
}
|
||||
unsigned ring_space = io_uring_sq_space_left(&ringloop->ring);
|
||||
unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail;
|
||||
unsigned ring_space = ringloop->space_left();
|
||||
unsigned prev_sqe_pos = ringloop->save();
|
||||
int dequeue_op = 0;
|
||||
if ((op->flags & OP_TYPE_MASK) == OP_READ)
|
||||
{
|
||||
|
@ -172,7 +172,7 @@ void blockstore_impl_t::loop()
|
|||
}
|
||||
else
|
||||
{
|
||||
ringloop->ring.sq.sqe_tail = prev_sqe_pos;
|
||||
ringloop->restore(prev_sqe_pos);
|
||||
if (PRIV(op)->wait_for == WAIT_SQE)
|
||||
{
|
||||
PRIV(op)->wait_detail = 1 + ring_space;
|
||||
|
@ -225,7 +225,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
|
|||
{
|
||||
if (PRIV(op)->wait_for == WAIT_SQE)
|
||||
{
|
||||
if (io_uring_sq_space_left(&ringloop->ring) < PRIV(op)->wait_detail)
|
||||
if (ringloop->space_left() < PRIV(op)->wait_detail)
|
||||
{
|
||||
// stop submission if there's still no free space
|
||||
return;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "blockstore.h"
|
||||
#include "timerfd_interval.h"
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/ioctl.h>
|
||||
|
|
|
@ -133,7 +133,7 @@ static void bs_cleanup(struct thread_data *td)
|
|||
bsd->ringloop->loop();
|
||||
if (bsd->bs->is_safe_to_stop())
|
||||
goto safe;
|
||||
} while (bsd->ringloop->loop_again);
|
||||
} while (bsd->ringloop->get_loop_again());
|
||||
bsd->ringloop->wait();
|
||||
}
|
||||
safe:
|
||||
|
|
9
osd.cpp
9
osd.cpp
|
@ -194,8 +194,6 @@ void osd_t::stop_client(int peer_fd)
|
|||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||
}
|
||||
auto it = clients.find(peer_fd);
|
||||
if (it->second.read_ready)
|
||||
{
|
||||
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
|
||||
{
|
||||
if (*rit == peer_fd)
|
||||
|
@ -204,6 +202,13 @@ void osd_t::stop_client(int peer_fd)
|
|||
break;
|
||||
}
|
||||
}
|
||||
for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++)
|
||||
{
|
||||
if (*wit == peer_fd)
|
||||
{
|
||||
write_ready_clients.erase(wit);
|
||||
break;
|
||||
}
|
||||
}
|
||||
clients.erase(it);
|
||||
close(peer_fd);
|
||||
|
|
29
ringloop.cpp
29
ringloop.cpp
|
@ -7,16 +7,23 @@ ring_loop_t::ring_loop_t(int qd)
|
|||
{
|
||||
throw std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret));
|
||||
}
|
||||
ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring.sq.ring_sz);
|
||||
if (!ring_data)
|
||||
free_ring_data_ptr = *ring.cq.kring_entries;
|
||||
ring_datas = (struct ring_data_t*)malloc(sizeof(ring_data_t) * free_ring_data_ptr);
|
||||
free_ring_data = (int*)malloc(sizeof(int) * free_ring_data_ptr);
|
||||
if (!ring_datas || !free_ring_data)
|
||||
{
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
for (int i = 0; i < free_ring_data_ptr; i++)
|
||||
{
|
||||
free_ring_data[i] = i;
|
||||
}
|
||||
}
|
||||
|
||||
ring_loop_t::~ring_loop_t()
|
||||
{
|
||||
free(ring_data);
|
||||
free(free_ring_data);
|
||||
free(ring_datas);
|
||||
io_uring_queue_exit(&ring);
|
||||
}
|
||||
|
||||
|
@ -52,6 +59,7 @@ void ring_loop_t::loop()
|
|||
d->res = cqe->res;
|
||||
d->callback(d);
|
||||
}
|
||||
free_ring_data[free_ring_data_ptr++] = d - ring_datas;
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
do
|
||||
|
@ -63,3 +71,18 @@ void ring_loop_t::loop()
|
|||
}
|
||||
} while (loop_again);
|
||||
}
|
||||
|
||||
unsigned ring_loop_t::save()
|
||||
{
|
||||
return ring.sq.sqe_tail;
|
||||
}
|
||||
|
||||
void ring_loop_t::restore(unsigned sqe_tail)
|
||||
{
|
||||
assert(ring.sq.sqe_tail >= sqe_tail);
|
||||
for (unsigned i = sqe_tail; i < ring.sq.sqe_tail; i++)
|
||||
{
|
||||
free_ring_data[free_ring_data_ptr++] = ((ring_data_t*)ring.sq.sqes[i & *ring.sq.kring_mask].user_data) - ring_datas;
|
||||
}
|
||||
ring.sq.sqe_tail = sqe_tail;
|
||||
}
|
||||
|
|
37
ringloop.h
37
ringloop.h
|
@ -4,8 +4,9 @@
|
|||
#define _LARGEFILE64_SOURCE
|
||||
#endif
|
||||
|
||||
#include <liburing.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <liburing.h>
|
||||
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
|
@ -119,26 +120,26 @@ struct ring_consumer_t
|
|||
class ring_loop_t
|
||||
{
|
||||
std::vector<ring_consumer_t> consumers;
|
||||
struct ring_data_t *ring_data;
|
||||
public:
|
||||
struct ring_data_t *ring_datas;
|
||||
int *free_ring_data;
|
||||
unsigned free_ring_data_ptr;
|
||||
bool loop_again;
|
||||
struct io_uring ring;
|
||||
public:
|
||||
ring_loop_t(int qd);
|
||||
~ring_loop_t();
|
||||
int register_consumer(ring_consumer_t & consumer);
|
||||
void unregister_consumer(ring_consumer_t & consumer);
|
||||
|
||||
inline struct io_uring_sqe* get_sqe()
|
||||
{
|
||||
// FIXME: Limit inflight ops count to not overflow the completion ring
|
||||
if (free_ring_data_ptr == 0)
|
||||
return NULL;
|
||||
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
|
||||
if (sqe)
|
||||
{
|
||||
io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes));
|
||||
}
|
||||
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
|
||||
return sqe;
|
||||
}
|
||||
int register_consumer(ring_consumer_t & consumer);
|
||||
void wakeup();
|
||||
void unregister_consumer(ring_consumer_t & consumer);
|
||||
void loop();
|
||||
inline int submit()
|
||||
{
|
||||
return io_uring_submit(&ring);
|
||||
|
@ -148,4 +149,18 @@ public:
|
|||
struct io_uring_cqe *cqe;
|
||||
return io_uring_wait_cqe(&ring, &cqe);
|
||||
}
|
||||
inline unsigned space_left()
|
||||
{
|
||||
return free_ring_data_ptr;
|
||||
}
|
||||
inline bool get_loop_again()
|
||||
{
|
||||
return loop_again;
|
||||
}
|
||||
|
||||
void loop();
|
||||
void wakeup();
|
||||
|
||||
unsigned save();
|
||||
void restore(unsigned sqe_tail);
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue