|
|
@ -29,16 +29,23 @@ |
|
|
|
#include <unordered_map> |
|
|
|
|
|
|
|
#include "addr_util.h" |
|
|
|
#include "rw_blocking.h" |
|
|
|
#include "epoll_manager.h" |
|
|
|
#include "ringloop.h" |
|
|
|
#include "messenger.h" |
|
|
|
#include "osd_ops.h" |
|
|
|
#include "fio_headers.h" |
|
|
|
|
|
|
|
struct sec_data |
|
|
|
{ |
|
|
|
epoll_manager_t *epmgr; |
|
|
|
ring_loop_t *ringloop; |
|
|
|
osd_messenger_t *msgr; |
|
|
|
ring_consumer_t looper; |
|
|
|
void *bitmap_buf; |
|
|
|
int connect_fd; |
|
|
|
uint64_t op_id; |
|
|
|
/* block_size = 1 << block_order (128KB by default) */ |
|
|
|
uint64_t block_order = 17, block_size = 1 << 17; |
|
|
|
std::unordered_map<uint64_t, io_u*> queue; |
|
|
|
bool last_sync = false; |
|
|
|
/* The list of completed io_u structs. */ |
|
|
|
std::vector<io_u*> completed; |
|
|
@ -111,9 +118,6 @@ static struct fio_option options[] = { |
|
|
|
static int sec_setup(struct thread_data *td) |
|
|
|
{ |
|
|
|
sec_data *bsd; |
|
|
|
//fio_file *f;
|
|
|
|
//int r;
|
|
|
|
//int64_t size;
|
|
|
|
|
|
|
|
bsd = new sec_data; |
|
|
|
if (!bsd) |
|
|
@ -130,8 +134,6 @@ static int sec_setup(struct thread_data *td) |
|
|
|
td->o.open_files++; |
|
|
|
} |
|
|
|
|
|
|
|
//f = td->files[0];
|
|
|
|
//f->real_file_size = size;
|
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
@ -140,6 +142,10 @@ static void sec_cleanup(struct thread_data *td) |
|
|
|
sec_data *bsd = (sec_data*)td->io_ops_data; |
|
|
|
if (bsd) |
|
|
|
{ |
|
|
|
delete bsd->msgr; |
|
|
|
delete bsd->epmgr; |
|
|
|
delete bsd->ringloop; |
|
|
|
free(bsd->bitmap_buf); |
|
|
|
close(bsd->connect_fd); |
|
|
|
delete bsd; |
|
|
|
} |
|
|
@ -174,6 +180,45 @@ static int sec_init(struct thread_data *td) |
|
|
|
int one = 1; |
|
|
|
setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); |
|
|
|
|
|
|
|
fcntl(bsd->connect_fd, F_SETFL, fcntl(bsd->connect_fd, F_GETFL, 0) | O_NONBLOCK); |
|
|
|
|
|
|
|
json11::Json cfg = json11::Json::object{ { "use_rdma", 0 } }; |
|
|
|
|
|
|
|
bsd->bitmap_buf = malloc(4096); |
|
|
|
|
|
|
|
bsd->ringloop = new ring_loop_t(512); |
|
|
|
bsd->epmgr = new epoll_manager_t(bsd->ringloop); |
|
|
|
bsd->msgr = new osd_messenger_t(); |
|
|
|
bsd->msgr->tfd = bsd->epmgr->tfd; |
|
|
|
bsd->msgr->ringloop = bsd->ringloop; |
|
|
|
bsd->msgr->repeer_pgs = [](osd_num_t){}; |
|
|
|
bsd->msgr->parse_config(cfg); |
|
|
|
bsd->msgr->init(); |
|
|
|
|
|
|
|
bsd->looper.loop = [bsd]() |
|
|
|
{ |
|
|
|
bsd->msgr->read_requests(); |
|
|
|
bsd->msgr->send_replies(); |
|
|
|
bsd->ringloop->submit(); |
|
|
|
}; |
|
|
|
bsd->ringloop->register_consumer(&bsd->looper); |
|
|
|
|
|
|
|
int peer_fd = bsd->connect_fd; |
|
|
|
bsd->msgr->clients[peer_fd] = new osd_client_t(); |
|
|
|
bsd->msgr->clients[peer_fd]->peer_addr = addr; |
|
|
|
bsd->msgr->clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port); |
|
|
|
bsd->msgr->clients[peer_fd]->peer_fd = peer_fd; |
|
|
|
bsd->msgr->clients[peer_fd]->peer_state = PEER_CONNECTED; |
|
|
|
bsd->msgr->clients[peer_fd]->connect_timeout_id = -1; |
|
|
|
bsd->msgr->clients[peer_fd]->osd_num = 1; |
|
|
|
bsd->msgr->clients[peer_fd]->in_buf = malloc_or_die(bsd->msgr->receive_buffer_size); |
|
|
|
bsd->epmgr->tfd->set_fd_handler(peer_fd, true, [msgr = bsd->msgr](int peer_fd, int epoll_events) |
|
|
|
{ |
|
|
|
// Either OUT (connected) or HUP
|
|
|
|
msgr->handle_peer_epoll(peer_fd, epoll_events); |
|
|
|
}); |
|
|
|
bsd->msgr->osd_peer_fds[1] = peer_fd; |
|
|
|
|
|
|
|
// FIXME: read config (block size) from OSD
|
|
|
|
|
|
|
|
return 0; |
|
|
@ -193,7 +238,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
} |
|
|
|
|
|
|
|
io->engine_data = bsd; |
|
|
|
osd_any_op_t op = { 0 }; |
|
|
|
|
|
|
|
osd_op_t *oo = new osd_op_t(); |
|
|
|
oo->op_type = OSD_OP_OUT; |
|
|
|
oo->peer_fd = bsd->connect_fd; |
|
|
|
|
|
|
|
osd_any_op_t & op = oo->req; |
|
|
|
|
|
|
|
op.hdr.magic = SECONDARY_OSD_OP_MAGIC; |
|
|
|
op.hdr.id = n; |
|
|
@ -210,6 +260,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
op.sec_rw.version = UINT64_MAX; // last unstable
|
|
|
|
op.sec_rw.offset = io->offset % bsd->block_size; |
|
|
|
op.sec_rw.len = io->xfer_buflen; |
|
|
|
op.sec_rw.attr_len = 4; |
|
|
|
oo->bitmap = bsd->bitmap_buf; |
|
|
|
oo->bitmap_len = 4; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
@ -218,6 +271,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
op.rw.offset = io->offset; |
|
|
|
op.rw.len = io->xfer_buflen; |
|
|
|
} |
|
|
|
oo->iov.push_back(io->xfer_buf, io->xfer_buflen); |
|
|
|
bsd->last_sync = false; |
|
|
|
break; |
|
|
|
case DDIR_WRITE: |
|
|
@ -239,6 +293,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
op.rw.offset = io->offset; |
|
|
|
op.rw.len = io->xfer_buflen; |
|
|
|
} |
|
|
|
oo->iov.push_back(io->xfer_buf, io->xfer_buflen); |
|
|
|
bsd->last_sync = false; |
|
|
|
break; |
|
|
|
case DDIR_SYNC: |
|
|
@ -260,6 +315,21 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
return FIO_Q_COMPLETED; |
|
|
|
} |
|
|
|
|
|
|
|
oo->callback = [td, io](osd_op_t *oo) |
|
|
|
{ |
|
|
|
sec_options *opt = (sec_options*)td->eo; |
|
|
|
sec_data *bsd = (sec_data*)td->io_ops_data; |
|
|
|
if (opt->trace) |
|
|
|
{ |
|
|
|
printf("--- %s # %ld %ld\n", io->ddir == DDIR_READ ? "READ" : |
|
|
|
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), oo->reply.hdr.id, oo->reply.hdr.retval); |
|
|
|
} |
|
|
|
io->error = oo->reply.hdr.retval < 0 ? -oo->reply.hdr.retval : 0; |
|
|
|
bsd->completed.push_back(io); |
|
|
|
delete oo; |
|
|
|
}; |
|
|
|
bsd->msgr->outbox_push(oo); |
|
|
|
|
|
|
|
if (opt->trace) |
|
|
|
{ |
|
|
|
printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" : |
|
|
@ -269,21 +339,6 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
io->error = 0; |
|
|
|
bsd->inflight++; |
|
|
|
bsd->op_n++; |
|
|
|
bsd->queue[n] = io; |
|
|
|
|
|
|
|
iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } }; |
|
|
|
int iovcnt = 1, wtotal = OSD_PACKET_SIZE; |
|
|
|
if (io->ddir == DDIR_WRITE) |
|
|
|
{ |
|
|
|
iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen }; |
|
|
|
wtotal += io->xfer_buflen; |
|
|
|
iovcnt++; |
|
|
|
} |
|
|
|
if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal) |
|
|
|
{ |
|
|
|
perror("writev"); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
|
|
|
|
if (io->error != 0) |
|
|
|
return FIO_Q_COMPLETED; |
|
|
@ -292,57 +347,13 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) |
|
|
|
|
|
|
|
// Wait until at least `min` completed io_u entries are collected in
// bsd->completed, then return how many entries that list holds.
//
// td  - thread data; td->eo is used as sec_options and td->io_ops_data as sec_data.
// min - number of completions that must be present before returning.
// max - not referenced anywhere in the visible body.
// t   - not referenced anywhere in the visible body (no timeout is applied).
//
// Returns bsd->completed.size().
// Every protocol violation detected below (bad magic, unknown op id, short
// read/write, failed sync) prints a message to stderr and calls exit(1).
//
// NOTE(review): this span comes from a diff hunk ("-292,57 +347,13") whose
// removed and added lines appear to be merged without +/- markers. The
// read_blocking()-based reply parsing and the ringloop loop()/wait() calls at
// the end of the loop are presumably two alternative implementations rather
// than one coherent body -- confirm against the real file before relying on it.
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) |
|
|
|
{ |
|
|
|
sec_options *opt = (sec_options*)td->eo; |
|
|
|
sec_data *bsd = (sec_data*)td->io_ops_data; |
|
|
|
// FIXME: no timeout handling (should at least poll); this is the simplest possible blocking implementation
|
|
|
|
osd_any_reply_t reply; |
|
|
|
// Keep processing replies until enough completions have accumulated.
while (bsd->completed.size() < min) |
|
|
|
{ |
|
|
|
// Request one reply packet (OSD_PACKET_SIZE bytes) from the connection into reply.buf.
// NOTE(review): the result of read_blocking() is not checked here.
read_blocking(bsd->connect_fd, reply.buf, OSD_PACKET_SIZE); |
|
|
|
// Reject anything that does not carry the expected reply magic.
if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC) |
|
|
|
{ |
|
|
|
fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
// Match the reply to the io_u stored in bsd->queue under the same id.
auto it = bsd->queue.find(reply.hdr.id); |
|
|
|
if (it == bsd->queue.end()) |
|
|
|
{ |
|
|
|
fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
io_u* io = it->second; |
|
|
|
// The op is no longer pending once its reply has been matched.
bsd->queue.erase(it); |
|
|
|
if (io->ddir == DDIR_READ) |
|
|
|
{ |
|
|
|
// A read reply must report exactly the requested length.
if (reply.hdr.retval != io->xfer_buflen) |
|
|
|
{ |
|
|
|
fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
// The read payload follows the reply header on the same connection; fetch it into the io buffer.
read_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen); |
|
|
|
} |
|
|
|
else if (io->ddir == DDIR_WRITE) |
|
|
|
{ |
|
|
|
// A write reply must report exactly the submitted length.
if (reply.hdr.retval != io->xfer_buflen) |
|
|
|
{ |
|
|
|
fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
} |
|
|
|
else if (io->ddir == DDIR_SYNC) |
|
|
|
{ |
|
|
|
// A sync reply is accepted only with retval == 0.
if (reply.hdr.retval != 0) |
|
|
|
{ |
|
|
|
fprintf(stderr, "Sync failed: retval = %ld\n", reply.hdr.retval); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
} |
|
|
|
// Optional per-op trace line, enabled by opt->trace.
if (opt->trace) |
|
|
|
{ |
|
|
|
printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" : |
|
|
|
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id); |
|
|
|
} |
|
|
|
// Record the finished io_u in the completed list.
bsd->completed.push_back(io); |
|
|
|
// NOTE(review): presumably the replacement implementation -- run the ring loop,
// stop once enough completions exist, otherwise wait on the ring loop. Verify
// that these lines are not meant to coexist with the blocking reads above.
bsd->ringloop->loop(); |
|
|
|
if (bsd->completed.size() >= min) |
|
|
|
break; |
|
|
|
bsd->ringloop->wait(); |
|
|
|
} |
|
|
|
// Report the number of entries currently held in the completed list.
return bsd->completed.size(); |
|
|
|
} |
|
|
|