Compare commits

...

3 Commits

  1. 8
      Makefile
  2. 21
      msgr_receive.cpp
  3. 24
      msgr_send.cpp
  4. 95
      osd.cpp
  5. 8
      osd.h
  6. 50
      timerfd_manager.cpp
  7. 10
      timerfd_manager.h

8
Makefile

@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
osd_rmw.o json11.o base64.o timerfd_manager.o
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread
stub_osd: stub_osd.o rw_blocking.o
g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal
@ -87,7 +87,7 @@ dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blo
g++ $(CXXFLAGS) -c -o $@ $<
epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h timerfd_manager.h
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
@ -95,7 +95,7 @@ fio_engine.o: fio_engine.cpp blockstore.h fio/fio.h fio/optgroup.h json11/json11
g++ $(CXXFLAGS) -c -o $@ $<
fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h
g++ $(CXXFLAGS) -c -o $@ $<
http_client.o: http_client.cpp http_client.h json11/json11.hpp timerfd_manager.h
http_client.o: http_client.cpp http_client.h json11/json11.hpp ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<
@ -149,5 +149,5 @@ test_blockstore.o: test_blockstore.cpp blockstore.h object_id.h ringloop.h timer
g++ $(CXXFLAGS) -c -o $@ $<
timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h
g++ $(CXXFLAGS) -c -o $@ $<
timerfd_manager.o: timerfd_manager.cpp timerfd_manager.h
timerfd_manager.o: timerfd_manager.cpp ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $<

21
msgr_receive.cpp

@ -2,17 +2,10 @@
void osd_messenger_t::read_requests()
{
for (int i = 0; i < read_ready_clients.size(); i++)
while (read_ready_clients.size() > 0)
{
int peer_fd = read_ready_clients[i];
int peer_fd = read_ready_clients[0];
auto & cl = clients[peer_fd];
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_op || cl.read_remaining < receive_buffer_size)
{
cl.read_iov.iov_base = cl.in_buf;
@ -25,10 +18,14 @@ void osd_messenger_t::read_requests()
}
cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1);
int result = recvmsg(peer_fd, &cl.read_msg, 0);
if (result < 0)
{
result = -errno;
}
handle_read(result, peer_fd);
}
read_ready_clients.clear();
}
bool osd_messenger_t::handle_read(int result, int peer_fd)

24
msgr_send.cpp

@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
bool osd_messenger_t::try_send(osd_client_t & cl)
{
int peer_fd = cl.peer_fd;
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.write_op)
{
// pick next command
@ -84,23 +78,21 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
}
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
if (result < 0)
result = -errno;
handle_send(result, peer_fd);
return true;
}
void osd_messenger_t::send_replies()
{
for (int i = 0; i < write_ready_clients.size(); i++)
while (write_ready_clients.size() > 0)
{
int peer_fd = write_ready_clients[i];
if (!try_send(clients[peer_fd]))
{
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return;
}
auto & cl = clients[write_ready_clients[0]];
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1);
try_send(cl);
}
write_ready_clients.clear();
}
void osd_messenger_t::handle_send(int result, int peer_fd)

95
osd.cpp

@ -1,5 +1,6 @@
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
@ -43,8 +44,14 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
{
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
}
event_fd = eventfd(0, EFD_NONBLOCK);
if (event_fd < 0)
{
throw std::runtime_error(std::string("eventfd: ") + strerror(errno));
}
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
this->tfd = new timerfd_manager_t(ringloop);
this->tfd->set_fd_handler = [this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); };
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{
print_stats();
@ -59,17 +66,40 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
epoll_thread = new std::thread([this]()
{
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
while (1)
{
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1);
{
std::lock_guard<std::mutex> guard(epoll_mutex);
for (int i = 0; i < nfds; i++)
{
int fd = events[i].data.fd;
int ev = events[i].events;
epoll_ready[fd] |= ev;
}
uint64_t n = 1;
write(event_fd, &n, 8);
}
}
});
}
osd_t::~osd_t()
{
close(epoll_fd);
epoll_thread->join();
delete epoll_thread;
if (tfd)
{
delete tfd;
tfd = NULL;
}
ringloop->unregister_consumer(&consumer);
close(epoll_fd);
close(event_fd);
close(listen_fd);
}
@ -188,8 +218,13 @@ void osd_t::bind_socket()
{
close(listen_fd);
close(epoll_fd);
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno));
}
epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events)
{
c_cli.accept_connections(listen_fd);
};
}
bool osd_t::shutdown()
@ -204,10 +239,23 @@ bool osd_t::shutdown()
void osd_t::loop()
{
if (!wait_state)
std::map<int,int> cur_epoll;
{
handle_epoll_events();
wait_state = 1;
std::lock_guard<std::mutex> guard(epoll_mutex);
cur_epoll.swap(epoll_ready);
}
for (auto p: cur_epoll)
{
auto cb_it = epoll_handlers.find(p.first);
if (cb_it != epoll_handlers.end())
{
cb_it->second(p.first, p.second);
}
}
if (!(wait_state & 2))
{
handle_eventfd();
wait_state = wait_state | 2;
}
handle_peers();
c_cli.read_requests();
@ -225,7 +273,7 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno));
}
epoll_handlers[fd] = handler;
}
@ -233,49 +281,36 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
{
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno));
}
epoll_handlers.erase(fd);
}
}
void osd_t::handle_epoll_events()
void osd_t::handle_eventfd()
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
throw std::runtime_error("can't get SQE, will fall out of sync with eventfd");
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
my_uring_prep_poll_add(sqe, event_fd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
}
handle_epoll_events();
handle_eventfd();
};
ringloop->submit();
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
restart:
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
for (int i = 0; i < nfds; i++)
{
if (events[i].data.fd == listen_fd)
{
c_cli.accept_connections(listen_fd);
}
else
{
auto & cb = epoll_handlers[events[i].data.fd];
cb(events[i].data.fd, events[i].events);
}
}
if (nfds == MAX_EPOLL_EVENTS)
uint64_t n = 0;
size_t res = read(event_fd, &n, 8);
if (res == 8)
{
goto restart;
// No need to do anything, the loop has already woken up
ringloop->wakeup();
}
}

8
osd.h

@ -12,6 +12,8 @@
#include <set>
#include <deque>
#include <mutex>
#include <thread>
#include "blockstore.h"
#include "ringloop.h"
@ -114,6 +116,10 @@ class osd_t
int wait_state = 0;
int epoll_fd = 0;
int event_fd = 0;
std::thread *epoll_thread = NULL;
std::mutex epoll_mutex;
std::map<int, int> epoll_ready;
int listening_port = 0;
int listen_fd = 0;
ring_consumer_t consumer;
@ -150,7 +156,7 @@ class osd_t
// event loop, socket read/write
void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_epoll_events();
void handle_eventfd();
// peer handling (primary OSD logic)
void parse_test_peer(std::string peer);

50
timerfd_manager.cpp

@ -1,29 +1,24 @@
#include <sys/timerfd.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler)
timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop)
{
this->set_fd_handler = set_fd_handler;
wait_state = 0;
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (timerfd < 0)
{
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
}
set_fd_handler(timerfd, [this](int fd, int events)
{
handle_readable();
});
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
this->ringloop = ringloop;
}
timerfd_manager_t::~timerfd_manager_t()
{
set_fd_handler(timerfd, NULL);
ringloop->unregister_consumer(&consumer);
close(timerfd);
}
@ -53,6 +48,7 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function<voi
});
inc_timer(timers[timers.size()-1]);
set_nearest();
set_wait();
return timer_id;
}
@ -73,6 +69,7 @@ void timerfd_manager_t::clear_timer(int timer_id)
nearest--;
}
set_nearest();
set_wait();
break;
}
}
@ -157,3 +154,36 @@ void timerfd_manager_t::trigger_nearest()
cb(nearest_id);
nearest = -1;
}
void timerfd_manager_t::loop()
{
if (!(wait_state & 1) && timers.size())
{
set_nearest();
}
set_wait();
}
void timerfd_manager_t::set_wait()
{
if ((wait_state & 3) == 1)
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
return;
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
}
handle_readable();
set_wait();
};
wait_state = 3;
}
}

10
timerfd_manager.h

@ -1,8 +1,7 @@
#pragma once
#include <time.h>
#include <vector>
#include <functional>
#include "ringloop.h"
struct timerfd_timer_t
{
@ -20,15 +19,20 @@ class timerfd_manager_t
int nearest = -1;
int id = 1;
std::vector<timerfd_timer_t> timers;
ring_loop_t *ringloop;
ring_consumer_t consumer;
void inc_timer(timerfd_timer_t & t);
void set_nearest();
void trigger_nearest();
void handle_readable();
void set_wait();
void loop();
public:
// FIXME shouldn't be here
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler);
timerfd_manager_t(ring_loop_t *ringloop);
~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
void clear_timer(int timer_id);

Loading…
Cancel
Save