Compare commits

...

2 Commits

10 changed files with 95 additions and 29 deletions

View File

@ -33,6 +33,12 @@ journal_flusher_co::journal_flusher_co()
); );
} }
wait_count--; wait_count--;
if (!wait_count)
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
}; };
simple_callback_w = [this](ring_data_t* data) simple_callback_w = [this](ring_data_t* data)
{ {
@ -45,6 +51,12 @@ journal_flusher_co::journal_flusher_co()
); );
} }
wait_count--; wait_count--;
if (!wait_count)
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
}; };
} }
@ -122,6 +134,11 @@ void journal_flusher_t::release_trim()
#define await_sqe(label) \ #define await_sqe(label) \
resume_##label:\ resume_##label:\
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = bs->get_sqe();\ sqe = bs->get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\

View File

@ -62,6 +62,11 @@
struct ring_data_t *data = ((ring_data_t*)sqe->user_data) struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
#define BS_SUBMIT_GET_ONLY_SQE(sqe) \ #define BS_SUBMIT_GET_ONLY_SQE(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
struct io_uring_sqe *sqe = get_sqe();\ struct io_uring_sqe *sqe = get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\
@ -71,6 +76,11 @@
} }
#define BS_SUBMIT_GET_SQE_DECL(sqe) \ #define BS_SUBMIT_GET_SQE_DECL(sqe) \
{\
timespec now;\
clock_gettime(CLOCK_REALTIME, &now);\
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);\
}\
sqe = get_sqe();\ sqe = get_sqe();\
if (!sqe)\ if (!sqe)\
{\ {\

View File

@ -147,6 +147,11 @@ resume_2:
resume_3: resume_3:
if (!disable_journal_fsync) if (!disable_journal_fsync)
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = get_sqe(); io_uring_sqe *sqe = get_sqe();
if (!sqe) if (!sqe)
{ {
@ -237,6 +242,11 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
PRIV(op)->pending_ops--; PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0) if (PRIV(op)->pending_ops == 0)
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("finished %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
PRIV(op)->op_state++; PRIV(op)->op_state++;
if (!continue_stable(op)) if (!continue_stable(op))
{ {

View File

@ -302,6 +302,11 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
return 1; return 1;
resume_2: resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry // Only for the immediate_commit mode: prepare and submit big_write journal entry
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("get_sqe %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
sqe = get_sqe(); sqe = get_sqe();
if (!sqe) if (!sqe)
{ {
@ -333,6 +338,11 @@ resume_2:
return 1; return 1;
resume_4: resume_4:
// Switch object state // Switch object state
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("write_done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif #endif

View File

@ -57,6 +57,11 @@ void epoll_manager_t::set_fd_handler(int fd, std::function<void(int, int)> handl
void epoll_manager_t::handle_epoll_events() void epoll_manager_t::handle_epoll_events()
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {

View File

@ -2,17 +2,10 @@
void osd_messenger_t::read_requests() void osd_messenger_t::read_requests()
{ {
for (int i = 0; i < read_ready_clients.size(); i++) while (read_ready_clients.size() > 0)
{ {
int peer_fd = read_ready_clients[i]; int peer_fd = read_ready_clients[0];
auto & cl = clients[peer_fd]; auto & cl = clients[peer_fd];
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_op || cl.read_remaining < receive_buffer_size) if (!cl.read_op || cl.read_remaining < receive_buffer_size)
{ {
cl.read_iov.iov_base = cl.in_buf; cl.read_iov.iov_base = cl.in_buf;
@ -25,10 +18,19 @@ void osd_messenger_t::read_requests()
} }
cl.read_msg.msg_iov = &cl.read_iov; cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1; cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1);
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); int result = recvmsg(peer_fd, &cl.read_msg, 0);
if (result < 0)
{
result = -errno;
}
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("recvmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
handle_read(result, peer_fd);
} }
read_ready_clients.clear();
} }
bool osd_messenger_t::handle_read(int result, int peer_fd) bool osd_messenger_t::handle_read(int result, int peer_fd)

View File

@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
bool osd_messenger_t::try_send(osd_client_t & cl) bool osd_messenger_t::try_send(osd_client_t & cl)
{ {
int peer_fd = cl.peer_fd; int peer_fd = cl.peer_fd;
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.write_op) if (!cl.write_op)
{ {
// pick next command // pick next command
@ -84,23 +78,26 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
} }
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); if (result < 0)
result = -errno;
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("sendmsg done %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
handle_send(result, peer_fd);
return true; return true;
} }
void osd_messenger_t::send_replies() void osd_messenger_t::send_replies()
{ {
for (int i = 0; i < write_ready_clients.size(); i++) while (write_ready_clients.size() > 0)
{ {
int peer_fd = write_ready_clients[i]; auto & cl = clients[write_ready_clients[0]];
if (!try_send(clients[peer_fd])) write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1);
{ try_send(cl);
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return;
}
} }
write_ready_clients.clear();
} }
void osd_messenger_t::handle_send(int result, int peer_fd) void osd_messenger_t::handle_send(int result, int peer_fd)

View File

@ -241,6 +241,11 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
void osd_t::handle_epoll_events() void osd_t::handle_epoll_events()
{ {
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("epoll %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {

View File

@ -198,6 +198,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
else if (op_data->st == 8) goto resume_8; else if (op_data->st == 8) goto resume_8;
else if (op_data->st == 9) goto resume_9; else if (op_data->st == 9) goto resume_9;
assert(op_data->st == 0); assert(op_data->st == 0);
printf("primary_write\n");
if (!check_write_queue(cur_op, pg)) if (!check_write_queue(cur_op, pg))
{ {
return; return;
@ -389,6 +390,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
else if (op_data->st == 5) goto resume_5; else if (op_data->st == 5) goto resume_5;
else if (op_data->st == 6) goto resume_6; else if (op_data->st == 6) goto resume_6;
assert(op_data->st == 0); assert(op_data->st == 0);
printf("primary_sync\n");
if (syncs_in_progress.size() > 0) if (syncs_in_progress.size() > 0)
{ {
// Wait for previous syncs, if any // Wait for previous syncs, if any

View File

@ -4,6 +4,8 @@
#define _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE
#endif #endif
#include <stdio.h>
#include <time.h>
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <liburing.h> #include <liburing.h>
@ -158,7 +160,13 @@ public:
} }
inline int submit() inline int submit()
{ {
return io_uring_submit(&ring); int r = io_uring_submit(&ring);
{
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
printf("submit %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
}
return r;
} }
inline int wait() inline int wait()
{ {