parent
226f5a2945
commit
437dc5b630
@ -0,0 +1,89 @@ |
||||
#include <sys/epoll.h> |
||||
#include <sys/poll.h> |
||||
#include <unistd.h> |
||||
|
||||
#include "epoll_manager.h" |
||||
|
||||
#define MAX_EPOLL_EVENTS 64 |
||||
|
||||
epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop) |
||||
{ |
||||
this->ringloop = ringloop; |
||||
|
||||
epoll_fd = epoll_create(1); |
||||
if (epoll_fd < 0) |
||||
{ |
||||
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); |
||||
} |
||||
|
||||
tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); }); |
||||
|
||||
handle_epoll_events(); |
||||
} |
||||
|
||||
epoll_manager_t::~epoll_manager_t() |
||||
{ |
||||
if (tfd) |
||||
{ |
||||
delete tfd; |
||||
tfd = NULL; |
||||
} |
||||
close(epoll_fd); |
||||
} |
||||
|
||||
void epoll_manager_t::set_fd_handler(int fd, std::function<void(int, int)> handler) |
||||
{ |
||||
if (handler != NULL) |
||||
{ |
||||
bool exists = epoll_handlers.find(fd) != epoll_handlers.end(); |
||||
epoll_event ev; |
||||
ev.data.fd = fd; |
||||
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; |
||||
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) |
||||
{ |
||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); |
||||
} |
||||
epoll_handlers[fd] = handler; |
||||
} |
||||
else |
||||
{ |
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT) |
||||
{ |
||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); |
||||
} |
||||
epoll_handlers.erase(fd); |
||||
} |
||||
} |
||||
|
||||
void epoll_manager_t::handle_epoll_events() |
||||
{ |
||||
io_uring_sqe *sqe = ringloop->get_sqe(); |
||||
if (!sqe) |
||||
{ |
||||
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); |
||||
} |
||||
ring_data_t *data = ((ring_data_t*)sqe->user_data); |
||||
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); |
||||
data->callback = [this](ring_data_t *data) |
||||
{ |
||||
if (data->res < 0) |
||||
{ |
||||
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); |
||||
} |
||||
handle_epoll_events(); |
||||
}; |
||||
ringloop->submit(); |
||||
int nfds; |
||||
epoll_event events[MAX_EPOLL_EVENTS]; |
||||
restart: |
||||
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0); |
||||
for (int i = 0; i < nfds; i++) |
||||
{ |
||||
auto & cb = epoll_handlers[events[i].data.fd]; |
||||
cb(events[i].data.fd, events[i].events); |
||||
} |
||||
if (nfds == MAX_EPOLL_EVENTS) |
||||
{ |
||||
goto restart; |
||||
} |
||||
} |
@ -0,0 +1,20 @@ |
||||
#pragma once |
||||
|
||||
#include <map> |
||||
|
||||
#include "ringloop.h" |
||||
#include "timerfd_manager.h" |
||||
|
||||
class epoll_manager_t |
||||
{ |
||||
int epoll_fd; |
||||
ring_loop_t *ringloop; |
||||
std::map<int, std::function<void(int, int)>> epoll_handlers; |
||||
public: |
||||
epoll_manager_t(ring_loop_t *ringloop); |
||||
~epoll_manager_t(); |
||||
void set_fd_handler(int fd, std::function<void(int, int)> handler); |
||||
void handle_epoll_events(); |
||||
|
||||
timerfd_manager_t *tfd; |
||||
}; |
@ -0,0 +1,298 @@ |
||||
// FIO engine to test cluster I/O
|
||||
//
|
||||
// Random write:
|
||||
//
|
||||
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
|
||||
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M
|
||||
//
|
||||
// Linear write:
|
||||
//
|
||||
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
|
||||
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M
|
||||
//
|
||||
// Random read (run with -iodepth=32 or -iodepth=1):
|
||||
//
|
||||
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
|
||||
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M
|
||||
|
||||
#include <sys/types.h> |
||||
#include <sys/socket.h> |
||||
#include <netinet/in.h> |
||||
#include <netinet/tcp.h> |
||||
|
||||
#include <vector> |
||||
#include <unordered_map> |
||||
|
||||
#include "epoll_manager.h" |
||||
#include "cluster_client.h" |
||||
extern "C" { |
||||
#define CONFIG_HAVE_GETTID |
||||
#define CONFIG_PWRITEV2 |
||||
#include "fio/fio.h" |
||||
#include "fio/optgroup.h" |
||||
} |
||||
|
||||
struct sec_data |
||||
{ |
||||
ring_loop_t *ringloop = NULL; |
||||
epoll_manager_t *epmgr = NULL; |
||||
cluster_client_t *cli = NULL; |
||||
bool last_sync = false; |
||||
/* The list of completed io_u structs. */ |
||||
std::vector<io_u*> completed; |
||||
uint64_t op_n = 0, inflight = 0; |
||||
bool trace = false; |
||||
}; |
||||
|
||||
struct sec_options |
||||
{ |
||||
int __pad; |
||||
char *etcd_host = NULL; |
||||
char *etcd_prefix = NULL; |
||||
int inode = 0; |
||||
int trace = 0; |
||||
}; |
||||
|
||||
static struct fio_option options[] = { |
||||
{ |
||||
.name = "etcd", |
||||
.lname = "etcd address", |
||||
.type = FIO_OPT_STR_STORE, |
||||
.off1 = offsetof(struct sec_options, etcd_host), |
||||
.help = "etcd address in the form HOST:PORT[/PATH]", |
||||
.category = FIO_OPT_C_ENGINE, |
||||
.group = FIO_OPT_G_FILENAME, |
||||
}, |
||||
{ |
||||
.name = "etcd", |
||||
.lname = "etcd key prefix", |
||||
.type = FIO_OPT_STR_STORE, |
||||
.off1 = offsetof(struct sec_options, etcd_prefix), |
||||
.help = "etcd key prefix, by default /microceph", |
||||
.category = FIO_OPT_C_ENGINE, |
||||
.group = FIO_OPT_G_FILENAME, |
||||
}, |
||||
{ |
||||
.name = "inode", |
||||
.lname = "inode to run tests on", |
||||
.type = FIO_OPT_INT, |
||||
.off1 = offsetof(struct sec_options, inode), |
||||
.help = "inode to run tests on (1 by default)", |
||||
.category = FIO_OPT_C_ENGINE, |
||||
.group = FIO_OPT_G_FILENAME, |
||||
}, |
||||
{ |
||||
.name = "osd_trace", |
||||
.lname = "OSD trace", |
||||
.type = FIO_OPT_BOOL, |
||||
.off1 = offsetof(struct sec_options, trace), |
||||
.help = "Trace OSD operations", |
||||
.def = "0", |
||||
.category = FIO_OPT_C_ENGINE, |
||||
.group = FIO_OPT_G_FILENAME, |
||||
}, |
||||
{ |
||||
.name = NULL, |
||||
}, |
||||
}; |
||||
|
||||
static int sec_setup(struct thread_data *td) |
||||
{ |
||||
sec_data *bsd; |
||||
|
||||
bsd = new sec_data; |
||||
if (!bsd) |
||||
{ |
||||
td_verror(td, errno, "calloc"); |
||||
return 1; |
||||
} |
||||
td->io_ops_data = bsd; |
||||
|
||||
if (!td->files_index) |
||||
{ |
||||
add_file(td, "osd_cluster", 0, 0); |
||||
td->o.nr_files = td->o.nr_files ? : 1; |
||||
td->o.open_files++; |
||||
} |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
static void sec_cleanup(struct thread_data *td) |
||||
{ |
||||
sec_data *bsd = (sec_data*)td->io_ops_data; |
||||
if (bsd) |
||||
{ |
||||
delete bsd->cli; |
||||
delete bsd->epmgr; |
||||
delete bsd->ringloop; |
||||
bsd->cli = NULL; |
||||
bsd->epmgr = NULL; |
||||
bsd->ringloop = NULL; |
||||
} |
||||
} |
||||
|
||||
/* Connect to the server from each thread. */ |
||||
static int sec_init(struct thread_data *td) |
||||
{ |
||||
sec_options *o = (sec_options*)td->eo; |
||||
sec_data *bsd = (sec_data*)td->io_ops_data; |
||||
|
||||
json11::Json cfg = json11::Json::object { |
||||
{ "etcd_address", std::string(o->etcd_host) }, |
||||
{ "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/microceph") }, |
||||
}; |
||||
|
||||
bsd->ringloop = new ring_loop_t(512); |
||||
bsd->epmgr = new epoll_manager_t(bsd->ringloop); |
||||
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg); |
||||
|
||||
bsd->trace = o->trace ? true : false; |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
/* Begin read or write request. */ |
||||
static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
||||
{ |
||||
sec_options *opt = (sec_options*)td->eo; |
||||
sec_data *bsd = (sec_data*)td->io_ops_data; |
||||
int n = bsd->op_n; |
||||
|
||||
fio_ro_check(td, io); |
||||
if (io->ddir == DDIR_SYNC && bsd->last_sync) |
||||
{ |
||||
return FIO_Q_COMPLETED; |
||||
} |
||||
|
||||
io->engine_data = bsd; |
||||
cluster_op_t *op = new cluster_op_t; |
||||
|
||||
switch (io->ddir) |
||||
{ |
||||
case DDIR_READ: |
||||
op->opcode = OSD_OP_READ; |
||||
op->inode = opt->inode; |
||||
op->offset = io->offset; |
||||
op->len = io->xfer_buflen; |
||||
op->buf = io->xfer_buf; |
||||
bsd->last_sync = false; |
||||
break; |
||||
case DDIR_WRITE: |
||||
op->opcode = OSD_OP_WRITE; |
||||
op->inode = opt->inode; |
||||
op->offset = io->offset; |
||||
op->len = io->xfer_buflen; |
||||
op->buf = io->xfer_buf; |
||||
bsd->last_sync = false; |
||||
break; |
||||
case DDIR_SYNC: |
||||
op->opcode = OSD_OP_SYNC; |
||||
bsd->last_sync = true; |
||||
break; |
||||
default: |
||||
io->error = EINVAL; |
||||
return FIO_Q_COMPLETED; |
||||
} |
||||
|
||||
op->callback = [io, n](cluster_op_t *op) |
||||
{ |
||||
io->error = op->retval < 0 ? -op->retval : 0; |
||||
sec_data *bsd = (sec_data*)io->engine_data; |
||||
bsd->inflight--; |
||||
bsd->completed.push_back(io); |
||||
if (bsd->trace) |
||||
{ |
||||
printf("--- %s n=%d retval=%d\n", io->ddir == DDIR_READ ? "READ" : |
||||
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n, op->retval); |
||||
} |
||||
delete op; |
||||
}; |
||||
|
||||
if (opt->trace) |
||||
{ |
||||
printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" : |
||||
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n); |
||||
} |
||||
|
||||
io->error = 0; |
||||
bsd->inflight++; |
||||
bsd->op_n++; |
||||
bsd->cli->execute(op); |
||||
|
||||
if (io->error != 0) |
||||
return FIO_Q_COMPLETED; |
||||
return FIO_Q_QUEUED; |
||||
} |
||||
|
||||
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) |
||||
{ |
||||
sec_data *bsd = (sec_data*)td->io_ops_data; |
||||
while (true) |
||||
{ |
||||
bsd->ringloop->loop(); |
||||
if (bsd->completed.size() >= min) |
||||
break; |
||||
bsd->ringloop->wait(); |
||||
} |
||||
return bsd->completed.size(); |
||||
} |
||||
|
||||
static struct io_u *sec_event(struct thread_data *td, int event) |
||||
{ |
||||
sec_data *bsd = (sec_data*)td->io_ops_data; |
||||
if (bsd->completed.size() == 0) |
||||
return NULL; |
||||
/* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */ |
||||
struct io_u *ev = bsd->completed.back(); |
||||
bsd->completed.pop_back(); |
||||
return ev; |
||||
} |
||||
|
||||
static int sec_io_u_init(struct thread_data *td, struct io_u *io) |
||||
{ |
||||
io->engine_data = NULL; |
||||
return 0; |
||||
} |
||||
|
||||
static void sec_io_u_free(struct thread_data *td, struct io_u *io) |
||||
{ |
||||
} |
||||
|
||||
static int sec_open_file(struct thread_data *td, struct fio_file *f) |
||||
{ |
||||
return 0; |
||||
} |
||||
|
||||
static int sec_invalidate(struct thread_data *td, struct fio_file *f) |
||||
{ |
||||
return 0; |
||||
} |
||||
|
||||
struct ioengine_ops ioengine = { |
||||
.name = "microceph_cluster", |
||||
.version = FIO_IOOPS_VERSION, |
||||
.flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND, |
||||
.setup = sec_setup, |
||||
.init = sec_init, |
||||
.queue = sec_queue, |
||||
.getevents = sec_getevents, |
||||
.event = sec_event, |
||||
.cleanup = sec_cleanup, |
||||
.open_file = sec_open_file, |
||||
.invalidate = sec_invalidate, |
||||
.io_u_init = sec_io_u_init, |
||||
.io_u_free = sec_io_u_free, |
||||
.option_struct_size = sizeof(struct sec_options), |
||||
.options = options, |
||||
}; |
||||
|
||||
static void fio_init fio_sec_register(void) |
||||
{ |
||||
register_ioengine(&ioengine); |
||||
} |
||||
|
||||
static void fio_exit fio_sec_unregister(void) |
||||
{ |
||||
unregister_ioengine(&ioengine); |
||||
} |
Loading…
Reference in new issue