Test and fix primary-read

blocking-uring-test
Vitaliy Filippov 3 years ago
parent 235d15422c
commit 97d3fc593c
  1. 6
      Makefile
  2. 14
      osd.cpp
  3. 4
      osd.h
  4. 1
      osd_exec_secondary.cpp
  5. 22
      osd_peering.cpp
  6. 9
      osd_primary.cpp
  7. 2
      osd_send.cpp
  8. 52
      rw_blocking.cpp
  9. 6
      rw_blocking.h
  10. 200
      test_osd.cpp

@ -1,7 +1,7 @@
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd
all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd test_osd
clean:
rm -f *.o
@ -43,6 +43,10 @@ osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
g++ $(CXXFLAGS) -o osd osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
stub_osd: stub_osd.cpp osd_ops.h
g++ $(CXXFLAGS) -o stub_osd stub_osd.cpp -ltcmalloc_minimal
rw_blocking.o: rw_blocking.cpp rw_blocking.h
g++ $(CXXFLAGS) -c -o $@ $<
test_osd: test_osd.cpp osd_ops.h rw_blocking.o
g++ $(CXXFLAGS) -o test_osd test_osd.cpp rw_blocking.o -ltcmalloc_minimal
libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o libfio_sec_osd.so fio_sec_osd.cpp -luring

@ -193,19 +193,17 @@ int osd_t::handle_epoll_events()
else
{
auto & cl = clients[events[i].data.fd];
if (events[i].events & EPOLLRDHUP)
if (cl.peer_state == PEER_CONNECTING)
{
// Either OUT (connected) or HUP
handle_connect_result(cl.peer_fd);
}
else if (events[i].events & EPOLLRDHUP)
{
// Stop client
printf("osd: client %d disconnected\n", cl.peer_fd);
stop_client(cl.peer_fd);
}
else if (cl.peer_state == PEER_CONNECTING)
{
if (events[i].events & EPOLLOUT)
{
handle_connect_result(cl.peer_fd);
}
}
else if (!cl.read_ready)
{
// Mark client as ready (i.e. some data is available)

@ -126,7 +126,7 @@ struct osd_client_t
int peer_port;
int peer_fd;
int peer_state;
std::function<void(int)> connect_callback;
std::function<void(osd_num_t, int)> connect_callback;
osd_num_t osd_num = 0;
// Read state
@ -211,7 +211,7 @@ class osd_t
void outbox_push(osd_client_t & cl, osd_op_t *op);
// peer handling (primary OSD logic)
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(int)> callback);
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
void handle_connect_result(int peer_fd);
void stop_client(int peer_fd);
osd_peer_def_t parse_peer(std::string peer);

@ -159,6 +159,7 @@ void osd_t::make_reply(osd_op_t *op)
else if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST &&
op->reply.hdr.retval > 0)
{
op->buf = op->bs_op.buf; // allocated by blockstore
op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id));
}
}

@ -43,13 +43,13 @@ osd_peer_def_t osd_t::parse_peer(std::string peer)
return r;
}
void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(int)> callback)
void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
{
callback(-EINVAL);
callback(osd_num, -EINVAL);
return;
}
addr.sin_family = AF_INET;
@ -57,7 +57,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0)
{
callback(-errno);
callback(osd_num, -errno);
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
@ -65,7 +65,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
if (r < 0 && errno != EINPROGRESS)
{
close(peer_fd);
callback(-errno);
callback(osd_num, -errno);
return;
}
clients[peer_fd] = (osd_client_t){
@ -90,7 +90,8 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
void osd_t::handle_connect_result(int peer_fd)
{
auto & cl = clients[peer_fd];
std::function<void(int)> callback = cl.connect_callback;
osd_num_t osd_num = cl.osd_num;
auto callback = cl.connect_callback;
int result = 0;
socklen_t result_len = sizeof(result);
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
@ -100,7 +101,7 @@ void osd_t::handle_connect_result(int peer_fd)
if (result != 0)
{
stop_client(peer_fd);
callback(-result);
callback(osd_num, -result);
return;
}
int one = 1;
@ -115,7 +116,7 @@ void osd_t::handle_connect_result(int peer_fd)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
callback(peer_fd);
callback(osd_num, peer_fd);
}
// Peering loop
@ -130,8 +131,13 @@ void osd_t::handle_peers()
time(NULL) - peers[i].last_connect_attempt > 5)
{
peers[i].last_connect_attempt = time(NULL);
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](int peer_fd)
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd)
{
if (peer_fd < 0)
{
printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd));
return;
}
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
int i;
for (i = 0; i < peers.size(); i++)

@ -62,6 +62,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
osd_primary_read_t *op_data = (osd_primary_read_t*)calloc(
sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1
);
op_data->oid = oid;
osd_read_stripe_t *stripes = (op_data->stripes = ((osd_read_stripe_t*)(op_data+1)));
cur_op->op_data = op_data;
for (int role = 0; role < pgs[pg_num].pg_minsize; role++)
@ -219,9 +220,14 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd
int subop = 0;
for (int role = 0; role < read_pg_size; role++)
{
if (stripes[role].real_end == 0)
{
continue;
}
auto role_osd_num = target_set[role];
if (role_osd_num != 0)
{
printf("Read subop from %lu: %lu / %lu\n", role_osd_num, op_data->oid.inode, op_data->oid.stripe | role);
if (role_osd_num == this->osd_num)
{
subops[subop].bs_op = {
@ -262,8 +268,11 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd
subops[subop].buf = cur_op->buf + stripes[role].pos;
subops[subop].callback = [this, cur_op](osd_op_t *subop)
{
// so it doesn't get freed. FIXME: do it better
subop->buf = NULL;
handle_primary_read_subop(cur_op, subop->reply.hdr.retval == subop->op.sec_rw.len);
};
outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
}
subop++;
}

@ -2,6 +2,7 @@
void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op)
{
assert(cur_op->peer_fd);
if (cl.write_state == 0)
{
cl.write_state = CL_WRITE_READY;
@ -80,6 +81,7 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd)
{
// Send data
cl.write_buf = cur_op->send_list[cur_op->send_list.sent].buf;
assert(cl.write_buf);
cl.write_remaining = cur_op->send_list[cur_op->send_list.sent].len;
cur_op->send_list.sent++;
cl.write_state = CL_WRITE_DATA;

@ -0,0 +1,52 @@
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include "rw_blocking.h"
int read_blocking(int fd, void *read_buf, size_t remaining)
{
size_t done = 0;
while (done < remaining)
{
size_t r = read(fd, read_buf, remaining-done);
if (r <= 0)
{
if (!errno)
{
// EOF
return done;
}
else if (errno != EAGAIN && errno != EPIPE)
{
perror("read");
exit(1);
}
continue;
}
done += r;
read_buf += r;
}
return done;
}
int write_blocking(int fd, void *write_buf, size_t remaining)
{
size_t done = 0;
while (done < remaining)
{
size_t r = write(fd, write_buf, remaining-done);
if (r < 0)
{
if (errno != EAGAIN && errno != EPIPE)
{
perror("write");
exit(1);
}
continue;
}
done += r;
write_buf += r;
}
return done;
}

@ -0,0 +1,6 @@
#pragma once
#include <unistd.h>
int read_blocking(int fd, void *read_buf, size_t remaining);
int write_blocking(int fd, void *write_buf, size_t remaining);

@ -0,0 +1,200 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <malloc.h>
#include <stdexcept>
#include "osd_ops.h"
#include "rw_blocking.h"
int connect_osd(const char *osd_address, int osd_port);
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern);
void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len);
bool check_pattern(void *buf, uint64_t len, uint64_t pattern);
#define PATTERN0 0x8c4641acc762840e
#define PATTERN1 0x70a549add9a2280a
#define PATTERN2 (PATTERN0 ^ PATTERN1)
int main0(int narg, char *args[])
{
int connect_fd;
// Prepare data for cluster read
connect_fd = connect_osd("127.0.0.1", 11203);
test_write(connect_fd, 2, 0, 1, PATTERN0);
close(connect_fd);
connect_fd = connect_osd("127.0.0.1", 11204);
test_write(connect_fd, 2, 1, 1, PATTERN1);
close(connect_fd);
connect_fd = connect_osd("127.0.0.1", 11205);
test_write(connect_fd, 2, 2, 1, PATTERN2);
close(connect_fd);
return 0;
}
int main(int narg, char *args[])
{
int connect_fd;
void *data;
// Cluster read
connect_fd = connect_osd("127.0.0.1", 11203);
data = test_primary_read(connect_fd, 2, 0, 128*1024);
if (data && check_pattern(data, 128*1024, PATTERN0))
printf("inode=2 0-128K OK\n");
if (data)
free(data);
data = test_primary_read(connect_fd, 2, 0, 256*1024);
if (data && check_pattern(data, 128*1024, PATTERN0) &&
check_pattern(data+128*1024, 128*1024, PATTERN1))
printf("inode=2 0-256K OK\n");
if (data)
free(data);
close(connect_fd);
return 0;
}
int connect_osd(const char *osd_address, int osd_port)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1)
{
fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support");
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(osd_port);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");
return -1;
}
if (connect(connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{
perror("connect");
return -1;
}
int one = 1;
setsockopt(connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
return connect_fd;
}
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern)
{
union
{
osd_any_op_t op;
uint8_t op_buf[OSD_PACKET_SIZE] = { 0 };
};
union
{
osd_any_reply_t reply;
uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 };
};
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
op.hdr.id = 1;
op.hdr.opcode = OSD_OP_SECONDARY_WRITE;
op.sec_rw.oid = {
.inode = inode,
.stripe = stripe,
};
op.sec_rw.version = version;
op.sec_rw.offset = 0;
op.sec_rw.len = 128*1024;
void *data = memalign(512, 128*1024);
for (int i = 0; i < 128*1024/sizeof(uint64_t); i++)
((uint64_t*)data)[i] = pattern;
write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE);
write_blocking(connect_fd, data, 128*1024);
int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE);
if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_SECONDARY_WRITE ||
reply.hdr.retval != 128*1024)
{
free(data);
perror("read");
return 0;
}
version = reply.sec_rw.version;
op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
op.hdr.id = 2;
write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE);
r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE);
if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
reply.hdr.id != 2 || reply.hdr.opcode != OSD_OP_TEST_SYNC_STAB_ALL ||
reply.hdr.retval != 0)
{
free(data);
perror("read");
return 0;
}
free(data);
return version;
}
void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len)
{
union
{
osd_any_op_t op;
uint8_t op_buf[OSD_PACKET_SIZE] = { 0 };
};
union
{
osd_any_reply_t reply;
uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 };
};
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
op.hdr.id = 1;
op.hdr.opcode = OSD_OP_READ;
op.rw.inode = inode;
op.rw.offset = offset;
op.rw.len = len;
void *data = memalign(512, len);
write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE);
int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE);
if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_READ ||
reply.hdr.retval != len)
{
free(data);
perror("read");
return NULL;
}
r = read_blocking(connect_fd, data, len);
if (r != len)
{
free(data);
perror("read data");
return NULL;
}
return data;
}
bool check_pattern(void *buf, uint64_t len, uint64_t pattern)
{
for (int i = 0; i < len/sizeof(uint64_t); i++)
{
if (((uint64_t*)buf)[i] != pattern)
{
printf("(result[%d] = %lu) != %lu\n", i, ((uint64_t*)buf)[i], pattern);
return false;
}
}
return true;
}
Loading…
Cancel
Save