diff --git a/Makefile b/Makefile index 75e365914..8dcc3890d 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd +all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd test_osd clean: rm -f *.o @@ -43,6 +43,10 @@ osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) g++ $(CXXFLAGS) -o osd osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring stub_osd: stub_osd.cpp osd_ops.h g++ $(CXXFLAGS) -o stub_osd stub_osd.cpp -ltcmalloc_minimal +rw_blocking.o: rw_blocking.cpp rw_blocking.h + g++ $(CXXFLAGS) -c -o $@ $< +test_osd: test_osd.cpp osd_ops.h rw_blocking.o + g++ $(CXXFLAGS) -o test_osd test_osd.cpp rw_blocking.o -ltcmalloc_minimal libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o libfio_sec_osd.so fio_sec_osd.cpp -luring diff --git a/osd.cpp b/osd.cpp index b4099bfac..e028fd5e1 100644 --- a/osd.cpp +++ b/osd.cpp @@ -193,19 +193,17 @@ int osd_t::handle_epoll_events() else { auto & cl = clients[events[i].data.fd]; - if (events[i].events & EPOLLRDHUP) + if (cl.peer_state == PEER_CONNECTING) + { + // Either OUT (connected) or HUP + handle_connect_result(cl.peer_fd); + } + else if (events[i].events & EPOLLRDHUP) { // Stop client printf("osd: client %d disconnected\n", cl.peer_fd); stop_client(cl.peer_fd); } - else if (cl.peer_state == PEER_CONNECTING) - { - if (events[i].events & EPOLLOUT) - { - handle_connect_result(cl.peer_fd); - } - } else if (!cl.read_ready) { // Mark client as ready (i.e. some data is available) diff --git a/osd.h b/osd.h index e1fb29e9c..a11223c02 100644 --- a/osd.h +++ b/osd.h @@ -126,7 +126,7 @@ struct osd_client_t int peer_port; int peer_fd; int peer_state; - std::function connect_callback; + std::function connect_callback; osd_num_t osd_num = 0; // Read state @@ -211,7 +211,7 @@ class osd_t void outbox_push(osd_client_t & cl, osd_op_t *op); // peer handling (primary OSD logic) - void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); + void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback); void handle_connect_result(int peer_fd); void stop_client(int peer_fd); osd_peer_def_t parse_peer(std::string peer); diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index 9c3c1889d..8e197b5b2 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -159,6 +159,7 @@ void osd_t::make_reply(osd_op_t *op) else if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST && op->reply.hdr.retval > 0) { + op->buf = op->bs_op.buf; // allocated by blockstore op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id)); } } diff --git a/osd_peering.cpp b/osd_peering.cpp index 6f724789e..c8da16e64 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -43,13 +43,13 @@ osd_peer_def_t osd_t::parse_peer(std::string peer) return r; } -void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback) +void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function callback) { struct sockaddr_in addr; int r; if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) { - callback(-EINVAL); + callback(osd_num, -EINVAL); return; } addr.sin_family = AF_INET; @@ -57,7 +57,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port int peer_fd = socket(AF_INET, SOCK_STREAM, 0); if (peer_fd < 0) { - callback(-errno); + callback(osd_num, -errno); return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); @@ -65,7 +65,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port if (r < 0 && errno != EINPROGRESS) { close(peer_fd); - callback(-errno); + callback(osd_num, -errno); return; } clients[peer_fd] = (osd_client_t){ @@ -90,7 +90,8 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port void osd_t::handle_connect_result(int peer_fd) { auto & cl = clients[peer_fd]; - std::function callback = cl.connect_callback; + osd_num_t osd_num = cl.osd_num; + auto callback = cl.connect_callback; int result = 0; socklen_t result_len = sizeof(result); if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) @@ -100,7 +101,7 @@ void osd_t::handle_connect_result(int peer_fd) if (result != 0) { stop_client(peer_fd); - callback(-result); + callback(osd_num, -result); return; } int one = 1; @@ -115,7 +116,7 @@ void osd_t::handle_connect_result(int peer_fd) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } - callback(peer_fd); + callback(osd_num, peer_fd); } // Peering loop @@ -130,8 +131,13 @@ void osd_t::handle_peers() time(NULL) - peers[i].last_connect_attempt > 5) { peers[i].last_connect_attempt = time(NULL); - connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](int peer_fd) + connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd) { + if (peer_fd < 0) + { + printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd)); + return; + } printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); int i; for (i = 0; i < peers.size(); i++) diff --git a/osd_primary.cpp b/osd_primary.cpp index 084be9e40..dd04aff57 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -62,6 +62,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) osd_primary_read_t *op_data = (osd_primary_read_t*)calloc( sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 ); + op_data->oid = oid; osd_read_stripe_t *stripes = (op_data->stripes = ((osd_read_stripe_t*)(op_data+1))); cur_op->op_data = op_data; for (int role = 0; role < pgs[pg_num].pg_minsize; role++) @@ -219,9 +220,14 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd int subop = 0; for (int role = 0; role < read_pg_size; role++) { + if (stripes[role].real_end == 0) + { + continue; + } auto role_osd_num = target_set[role]; if (role_osd_num != 0) { + printf("Read subop from %lu: %lu / %lu\n", role_osd_num, op_data->oid.inode, op_data->oid.stripe | role); if (role_osd_num == this->osd_num) { subops[subop].bs_op = { @@ -262,8 +268,11 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd subops[subop].buf = cur_op->buf + stripes[role].pos; subops[subop].callback = [this, cur_op](osd_op_t *subop) { + // so it doesn't get freed. FIXME: do it better + subop->buf = NULL; handle_primary_read_subop(cur_op, subop->reply.hdr.retval == subop->op.sec_rw.len); }; + outbox_push(clients[subops[subop].peer_fd], &subops[subop]); } subop++; } diff --git a/osd_send.cpp b/osd_send.cpp index 85da80ba9..04b1657f9 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -2,6 +2,7 @@ void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op) { + assert(cur_op->peer_fd); if (cl.write_state == 0) { cl.write_state = CL_WRITE_READY; @@ -80,6 +81,7 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) { // Send data cl.write_buf = cur_op->send_list[cur_op->send_list.sent].buf; + assert(cl.write_buf); cl.write_remaining = cur_op->send_list[cur_op->send_list.sent].len; cur_op->send_list.sent++; cl.write_state = CL_WRITE_DATA; diff --git a/rw_blocking.cpp b/rw_blocking.cpp new file mode 100644 index 000000000..6e7356812 --- /dev/null +++ b/rw_blocking.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +#include "rw_blocking.h" + +int read_blocking(int fd, void *read_buf, size_t remaining) +{ + size_t done = 0; + while (done < remaining) + { + size_t r = read(fd, read_buf, remaining-done); + if (r <= 0) + { + if (!errno) + { + // EOF + return done; + } + else if (errno != EAGAIN && errno != EPIPE) + { + perror("read"); + exit(1); + } + continue; + } + done += r; + read_buf += r; + } + return done; +} + +int write_blocking(int fd, void *write_buf, size_t remaining) +{ + size_t done = 0; + while (done < remaining) + { + size_t r = write(fd, write_buf, remaining-done); + if (r < 0) + { + if (errno != EAGAIN && errno != EPIPE) + { + perror("write"); + exit(1); + } + continue; + } + done += r; + write_buf += r; + } + return done; +} diff --git a/rw_blocking.h b/rw_blocking.h new file mode 100644 index 000000000..015595f32 --- /dev/null +++ b/rw_blocking.h @@ -0,0 +1,6 @@ +#pragma once + +#include + +int read_blocking(int fd, void *read_buf, size_t remaining); +int write_blocking(int fd, void *write_buf, size_t remaining); diff --git a/test_osd.cpp b/test_osd.cpp new file mode 100644 index 000000000..15ffb3f39 --- /dev/null +++ b/test_osd.cpp @@ -0,0 +1,200 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "osd_ops.h" +#include "rw_blocking.h" + +int connect_osd(const char *osd_address, int osd_port); + +uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern); + +void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len); + +bool check_pattern(void *buf, uint64_t len, uint64_t pattern); + +#define PATTERN0 0x8c4641acc762840e +#define PATTERN1 0x70a549add9a2280a +#define PATTERN2 (PATTERN0 ^ PATTERN1) + +int main0(int narg, char *args[]) +{ + int connect_fd; + // Prepare data for cluster read + connect_fd = connect_osd("127.0.0.1", 11203); + test_write(connect_fd, 2, 0, 1, PATTERN0); + close(connect_fd); + connect_fd = connect_osd("127.0.0.1", 11204); + test_write(connect_fd, 2, 1, 1, PATTERN1); + close(connect_fd); + connect_fd = connect_osd("127.0.0.1", 11205); + test_write(connect_fd, 2, 2, 1, PATTERN2); + close(connect_fd); + return 0; +} + +int main(int narg, char *args[]) +{ + int connect_fd; + void *data; + // Cluster read + connect_fd = connect_osd("127.0.0.1", 11203); + data = test_primary_read(connect_fd, 2, 0, 128*1024); + if (data && check_pattern(data, 128*1024, PATTERN0)) + printf("inode=2 0-128K OK\n"); + if (data) + free(data); + data = test_primary_read(connect_fd, 2, 0, 256*1024); + if (data && check_pattern(data, 128*1024, PATTERN0) && + check_pattern(data+128*1024, 128*1024, PATTERN1)) + printf("inode=2 0-256K OK\n"); + if (data) + free(data); + close(connect_fd); + return 0; +} + +int connect_osd(const char *osd_address, int osd_port) +{ + struct sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1) + { + fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support"); + return -1; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(osd_port); + + int connect_fd = socket(AF_INET, SOCK_STREAM, 0); + if (connect_fd < 0) + { + perror("socket"); + return -1; + } + if (connect(connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + { + perror("connect"); + return -1; + } + int one = 1; + setsockopt(connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + + return connect_fd; +} + +uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern) +{ + union + { + osd_any_op_t op; + uint8_t op_buf[OSD_PACKET_SIZE] = { 0 }; + }; + union + { + osd_any_reply_t reply; + uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 }; + }; + op.hdr.magic = SECONDARY_OSD_OP_MAGIC; + op.hdr.id = 1; + op.hdr.opcode = OSD_OP_SECONDARY_WRITE; + op.sec_rw.oid = { + .inode = inode, + .stripe = stripe, + }; + op.sec_rw.version = version; + op.sec_rw.offset = 0; + op.sec_rw.len = 128*1024; + void *data = memalign(512, 128*1024); + for (int i = 0; i < 128*1024/sizeof(uint64_t); i++) + ((uint64_t*)data)[i] = pattern; + write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE); + write_blocking(connect_fd, data, 128*1024); + int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE); + if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || + reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_SECONDARY_WRITE || + reply.hdr.retval != 128*1024) + { + free(data); + perror("read"); + return 0; + } + version = reply.sec_rw.version; + op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL; + op.hdr.id = 2; + write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE); + r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE); + if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || + reply.hdr.id != 2 || reply.hdr.opcode != OSD_OP_TEST_SYNC_STAB_ALL || + reply.hdr.retval != 0) + { + free(data); + perror("read"); + return 0; + } + free(data); + return version; +} + +void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_t len) +{ + union + { + osd_any_op_t op; + uint8_t op_buf[OSD_PACKET_SIZE] = { 0 }; + }; + union + { + osd_any_reply_t reply; + uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 }; + }; + op.hdr.magic = SECONDARY_OSD_OP_MAGIC; + op.hdr.id = 1; + op.hdr.opcode = OSD_OP_READ; + op.rw.inode = inode; + op.rw.offset = offset; + op.rw.len = len; + void *data = memalign(512, len); + write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE); + int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE); + if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC || + reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_READ || + reply.hdr.retval != len) + { + free(data); + perror("read"); + return NULL; + } + r = read_blocking(connect_fd, data, len); + if (r != len) + { + free(data); + perror("read data"); + return NULL; + } + return data; +} + +bool check_pattern(void *buf, uint64_t len, uint64_t pattern) +{ + for (int i = 0; i < len/sizeof(uint64_t); i++) + { + if (((uint64_t*)buf)[i] != pattern) + { + printf("(result[%d] = %lu) != %lu\n", i, ((uint64_t*)buf)[i], pattern); + return false; + } + } + return true; +}