Add an OSD stub to compare sync socket I/O with io_uring + skip multiple fsyncs that fio issues

blocking-uring-test
Vitaliy Filippov 2020-01-15 22:03:27 +03:00
parent a3d3949dce
commit 8ea1ccc192
3 changed files with 284 additions and 24 deletions

View File

@ -1,7 +1,7 @@
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore
all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd
clean:
rm -f *.o
@ -34,6 +34,8 @@ osd.o: osd.cpp osd.h osd_ops.h
g++ $(CXXFLAGS) -c -o $@ $<
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o
g++ $(CXXFLAGS) -o osd osd_main.cpp osd.o osd_exec_secondary.o osd_read.o osd_send.o json11.o ./libblockstore.so -ltcmalloc_minimal -luring
stub_osd: stub_osd.cpp osd_ops.h
g++ $(CXXFLAGS) -o stub_osd stub_osd.cpp -ltcmalloc_minimal
libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o libfio_sec_osd.so fio_sec_osd.cpp -luring

View File

@ -38,6 +38,7 @@ struct sec_data
/* block_size = 1 << block_order (128KB by default) */
uint64_t block_order = 17, block_size = 1 << 17;
std::unordered_map<uint64_t, io_u*> queue;
bool last_sync = false;
/* The list of completed io_u structs. */
std::vector<io_u*> completed;
uint64_t op_n = 0, inflight = 0;
@ -74,6 +75,9 @@ static struct fio_option options[] = {
},
};
static int read_blocking(int fd, void *read_buf, size_t remaining);
static int write_blocking(int fd, void *write_buf, size_t remaining);
static int sec_setup(struct thread_data *td)
{
sec_data *bsd;
@ -152,6 +156,10 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
int n = bsd->op_n;
fio_ro_check(td, io);
if (io->ddir == DDIR_SYNC && bsd->last_sync)
{
return FIO_Q_COMPLETED;
}
io->engine_data = bsd;
union
@ -173,6 +181,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
op.sec_rw.version = UINT64_MAX; // last unstable
op.sec_rw.offset = io->offset % bsd->block_size;
op.sec_rw.len = io->xfer_buflen;
bsd->last_sync = false;
break;
case DDIR_WRITE:
op.hdr.opcode = OSD_OP_SECONDARY_WRITE;
@ -183,10 +192,13 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
op.sec_rw.version = 0; // assign automatically
op.sec_rw.offset = io->offset % bsd->block_size;
op.sec_rw.len = io->xfer_buflen;
bsd->last_sync = false;
break;
case DDIR_SYNC:
// Allowed only for testing: sync & stabilize all unstable object versions
op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
// fio sends 32 syncs with -fsync=32. we omit 31 of them even though it's not 100% fine (FIXME: fix fio itself)
bsd->last_sync = true;
break;
default:
io->error = EINVAL;
@ -206,23 +218,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
if (io->ddir == DDIR_WRITE)
{
// Send data
void *send_buf = io->xfer_buf;
size_t remaining = io->xfer_buflen;
while (remaining > 0)
{
size_t r = write(bsd->connect_fd, send_buf, remaining);
if (r < 0)
{
if (r != EAGAIN)
{
perror("write");
exit(1);
}
continue;
}
remaining -= r;
send_buf += r;
}
write_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen);
}
if (io->error != 0)
@ -230,19 +226,52 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
return FIO_Q_QUEUED;
}
void read_blocking(int fd, void *read_buf, size_t remaining)
static int read_blocking(int fd, void *read_buf, size_t remaining)
{
while (remaining > 0)
size_t done = 0;
while (done < remaining)
{
size_t r = read(fd, read_buf, remaining);
size_t r = read(fd, read_buf, remaining-done);
if (r <= 0)
{
perror("read");
exit(1);
if (!errno)
{
// EOF
return done;
}
else if (errno != EAGAIN && errno != EPIPE)
{
perror("read");
exit(1);
}
continue;
}
remaining -= r;
done += r;
read_buf += r;
}
return done;
}
static int write_blocking(int fd, void *write_buf, size_t remaining)
{
size_t done = 0;
while (done < remaining)
{
size_t r = write(fd, write_buf, remaining-done);
if (r < 0)
{
if (errno != EAGAIN && errno != EPIPE)
{
perror("write");
exit(1);
}
continue;
}
done += r;
write_buf += r;
}
return done;
}
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)

229
stub_osd.cpp Normal file
View File

@ -0,0 +1,229 @@
/**
* Stub "OSD" to test & compare network performance with sync read/write and io_uring
*
* Core i7-6700HQ laptop
*
* stub_osd:
* randwrite Q1 S1: 36900 iops
* randwrite Q32 S32: 71000 iops
* randwrite Q32 S32 (multi-fsync fix): 113000 iops
* randread Q1: 67300 iops
* randread Q32: 144000 iops
*
* io_uring osd with #define OSD_STUB:
* randwrite Q1 S1: 30000 iops
* randwrite Q32 S32: 78600 iops
* randwrite Q32 S32 (multi-fsync fix): 125000 iops
* randread Q1: 50700 iops
* randread Q32: 86100 iops
*
* It seems io_uring is fine :)
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <stdexcept>
#include "osd_ops.h"
int bind_stub(const char *bind_address, int bind_port);
void run_stub(int peer_fd);
static int write_blocking(int fd, void *write_buf, size_t remaining);
static int read_blocking(int fd, void *read_buf, size_t remaining);
int main(int narg, char *args[])
{
int listen_fd = bind_stub("0.0.0.0", 11203);
// Accept new connections
sockaddr_in addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while (1)
{
printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
if (peer_fd == -1)
{
if (errno == EAGAIN)
continue;
else
throw std::runtime_error(std::string("accept: ") + strerror(errno));
}
char peer_str[256];
printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
run_stub(peer_fd);
close(peer_fd);
printf("stub_osd: client %d disconnected\n", peer_fd);
// Try to accept next connection
peer_addr_size = sizeof(addr);
}
return 0;
}
int bind_stub(const char *bind_address, int bind_port)
{
int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
return listen_fd;
}
void run_stub(int peer_fd)
{
union
{
osd_any_op_t op;
uint8_t op_buf[OSD_PACKET_SIZE] = { 0 };
};
union
{
osd_any_reply_t reply;
uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 };
};
void *buf = NULL;
while (1)
{
int r = read_blocking(peer_fd, op_buf, OSD_PACKET_SIZE);
if (r < OSD_PACKET_SIZE)
{
break;
}
if (op.hdr.magic != SECONDARY_OSD_OP_MAGIC)
{
printf("client %d: bad magic number in operation header\n", peer_fd);
break;
}
reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
reply.hdr.id = op.hdr.id;
reply.hdr.opcode = op.hdr.opcode;
if (op.hdr.opcode == OSD_OP_SECONDARY_READ)
{
reply.hdr.retval = op.sec_rw.len;
buf = malloc(op.sec_rw.len);
r = write_blocking(peer_fd, reply_buf, OSD_PACKET_SIZE);
if (r == OSD_PACKET_SIZE)
r = write_blocking(peer_fd, &buf, op.sec_rw.len);
free(buf);
if (r < op.sec_rw.len)
break;
}
else if (op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
{
buf = malloc(op.sec_rw.len);
r = read_blocking(peer_fd, buf, op.sec_rw.len);
free(buf);
reply.hdr.retval = op.sec_rw.len;
if (r == op.sec_rw.len)
r = write_blocking(peer_fd, reply_buf, OSD_PACKET_SIZE);
else
r = 0;
if (r < OSD_PACKET_SIZE)
break;
}
else if (op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
{
reply.hdr.retval = 0;
r = write_blocking(peer_fd, reply_buf, OSD_PACKET_SIZE);
if (r < OSD_PACKET_SIZE)
break;
}
else
{
printf("client %d: unsupported stub opcode: %lu\n", peer_fd, op.hdr.opcode);
break;
}
}
free(buf);
}
static int read_blocking(int fd, void *read_buf, size_t remaining)
{
size_t done = 0;
while (done < remaining)
{
size_t r = read(fd, read_buf, remaining-done);
if (r <= 0)
{
if (!errno)
{
// EOF
return done;
}
else if (errno != EAGAIN && errno != EPIPE)
{
perror("read");
exit(1);
}
continue;
}
done += r;
read_buf += r;
}
return done;
}
static int write_blocking(int fd, void *write_buf, size_t remaining)
{
size_t done = 0;
while (done < remaining)
{
size_t r = write(fd, write_buf, remaining-done);
if (r < 0)
{
if (errno != EAGAIN && errno != EPIPE)
{
perror("write");
exit(1);
}
continue;
}
done += r;
write_buf += r;
}
return done;
}