Author | SHA1 | Message | Date |
---|---|---|---|
|
735b97fe33 | Trace I/O operations (SQEs, recvmsg/sendmsg, uring_submit) | 8 months ago |
|
d56633843f | Replace io_uring sendmsg/recvmsg with synchronous sendmsg/recvmsg | 11 months ago |
@@ -33,6 +33,12 @@ journal_flusher_co::journal_flusher_co() | |||
); | |||
} | |||
wait_count--; | |||
if (!wait_count) | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
}; | |||
simple_callback_w = [this](ring_data_t* data) | |||
{ | |||
@@ -45,6 +51,12 @@ journal_flusher_co::journal_flusher_co() | |||
); | |||
} | |||
wait_count--; | |||
if (!wait_count) | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
}; | |||
} | |||
@@ -122,6 +134,11 @@ void journal_flusher_t::release_trim() | |||
#define await_sqe(label) \ | |||
resume_##label:\ | |||
{\ | |||
timespec now;\ | |||
clock_gettime(CLOCK_REALTIME, &now);\ | |||
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\ | |||
}\ | |||
sqe = bs->get_sqe();\ | |||
if (!sqe)\ | |||
{\ | |||
@@ -62,6 +62,11 @@ | |||
struct ring_data_t *data = ((ring_data_t*)sqe->user_data) | |||
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \ | |||
{\ | |||
timespec now;\ | |||
clock_gettime(CLOCK_REALTIME, &now);\ | |||
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\ | |||
}\ | |||
struct io_uring_sqe *sqe = get_sqe();\ | |||
if (!sqe)\ | |||
{\ | |||
@@ -71,6 +76,11 @@ | |||
} | |||
#define BS_SUBMIT_GET_SQE_DECL(sqe) \ | |||
{\ | |||
timespec now;\ | |||
clock_gettime(CLOCK_REALTIME, &now);\ | |||
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\ | |||
}\ | |||
sqe = get_sqe();\ | |||
if (!sqe)\ | |||
{\ | |||
@@ -147,6 +147,11 @@ resume_2: | |||
resume_3: | |||
if (!disable_journal_fsync) | |||
{ | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
io_uring_sqe *sqe = get_sqe(); | |||
if (!sqe) | |||
{ | |||
@@ -237,6 +242,11 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * | |||
PRIV(op)->pending_ops--; | |||
if (PRIV(op)->pending_ops == 0) | |||
{ | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
PRIV(op)->op_state++; | |||
if (!continue_stable(op)) | |||
{ | |||
@@ -302,6 +302,11 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op) | |||
return 1; | |||
resume_2: | |||
// Only for the immediate_commit mode: prepare and submit big_write journal entry | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
sqe = get_sqe(); | |||
if (!sqe) | |||
{ | |||
@@ -333,6 +338,11 @@ resume_2: | |||
return 1; | |||
resume_4: | |||
// Switch object state | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("write_done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
#ifdef BLOCKSTORE_DEBUG | |||
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); | |||
#endif | |||
@@ -57,6 +57,11 @@ void epoll_manager_t::set_fd_handler(int fd, std::function<void(int, int)> handl | |||
void epoll_manager_t::handle_epoll_events() | |||
{ | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
io_uring_sqe *sqe = ringloop->get_sqe(); | |||
if (!sqe) | |||
{ | |||
@@ -2,17 +2,10 @@ | |||
void osd_messenger_t::read_requests() | |||
{ | |||
for (int i = 0; i < read_ready_clients.size(); i++) | |||
while (read_ready_clients.size() > 0) | |||
{ | |||
int peer_fd = read_ready_clients[i]; | |||
int peer_fd = read_ready_clients[0]; | |||
auto & cl = clients[peer_fd]; | |||
io_uring_sqe* sqe = ringloop->get_sqe(); | |||
if (!sqe) | |||
{ | |||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); | |||
return; | |||
} | |||
ring_data_t* data = ((ring_data_t*)sqe->user_data); | |||
if (!cl.read_op || cl.read_remaining < receive_buffer_size) | |||
{ | |||
cl.read_iov.iov_base = cl.in_buf; | |||
@@ -25,10 +18,19 @@ void osd_messenger_t::read_requests() | |||
} | |||
cl.read_msg.msg_iov = &cl.read_iov; | |||
cl.read_msg.msg_iovlen = 1; | |||
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; | |||
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); | |||
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1); | |||
int result = recvmsg(peer_fd, &cl.read_msg, 0); | |||
if (result < 0) | |||
{ | |||
result = -errno; | |||
} | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("recvmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
handle_read(result, peer_fd); | |||
} | |||
read_ready_clients.clear(); | |||
} | |||
bool osd_messenger_t::handle_read(int result, int peer_fd) | |||
@@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) | |||
bool osd_messenger_t::try_send(osd_client_t & cl) | |||
{ | |||
int peer_fd = cl.peer_fd; | |||
io_uring_sqe* sqe = ringloop->get_sqe(); | |||
if (!sqe) | |||
{ | |||
return false; | |||
} | |||
ring_data_t* data = ((ring_data_t*)sqe->user_data); | |||
if (!cl.write_op) | |||
{ | |||
// pick next command | |||
@@ -84,23 +78,26 @@ bool osd_messenger_t::try_send(osd_client_t & cl) | |||
} | |||
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); | |||
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); | |||
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; | |||
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); | |||
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL); | |||
if (result < 0) | |||
result = -errno; | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("sendmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
handle_send(result, peer_fd); | |||
return true; | |||
} | |||
void osd_messenger_t::send_replies() | |||
{ | |||
for (int i = 0; i < write_ready_clients.size(); i++) | |||
while (write_ready_clients.size() > 0) | |||
{ | |||
int peer_fd = write_ready_clients[i]; | |||
if (!try_send(clients[peer_fd])) | |||
{ | |||
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i); | |||
return; | |||
} | |||
auto & cl = clients[write_ready_clients[0]]; | |||
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1); | |||
try_send(cl); | |||
} | |||
write_ready_clients.clear(); | |||
} | |||
void osd_messenger_t::handle_send(int result, int peer_fd) | |||
@@ -241,6 +241,11 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler) | |||
void osd_t::handle_epoll_events() | |||
{ | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
io_uring_sqe *sqe = ringloop->get_sqe(); | |||
if (!sqe) | |||
{ | |||
@@ -198,6 +198,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) | |||
else if (op_data->st == 8) goto resume_8; | |||
else if (op_data->st == 9) goto resume_9; | |||
assert(op_data->st == 0); | |||
printf("primary_write\n"); | |||
if (!check_write_queue(cur_op, pg)) | |||
{ | |||
return; | |||
@@ -389,6 +390,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op) | |||
else if (op_data->st == 5) goto resume_5; | |||
else if (op_data->st == 6) goto resume_6; | |||
assert(op_data->st == 0); | |||
printf("primary_sync\n"); | |||
if (syncs_in_progress.size() > 0) | |||
{ | |||
// Wait for previous syncs, if any | |||
@@ -4,6 +4,8 @@ | |||
#define _LARGEFILE64_SOURCE | |||
#endif | |||
#include <stdio.h> | |||
#include <time.h> | |||
#include <string.h> | |||
#include <assert.h> | |||
#include <liburing.h> | |||
@@ -158,7 +160,13 @@ public: | |||
} | |||
inline int submit() | |||
{ | |||
return io_uring_submit(&ring); | |||
int r = io_uring_submit(&ring); | |||
{ | |||
timespec now; | |||
clock_gettime(CLOCK_REALTIME, &now); | |||
printf("submit %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000); | |||
} | |||
return r; | |||
} | |||
inline int wait() | |||
{ | |||