diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a87f8161..00400440 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -67,7 +67,7 @@ target_link_libraries(fio_vitastor_blk add_executable(vitastor-osd osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp - etcd_state_client.cpp messenger.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp + etcd_state_client.cpp messenger.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp ) target_link_libraries(vitastor-osd @@ -87,7 +87,7 @@ target_link_libraries(fio_vitastor_sec # libvitastor_client.so add_library(vitastor_client SHARED cluster_client.cpp epoll_manager.cpp etcd_state_client.cpp - messenger.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp + messenger.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ) target_link_libraries(vitastor_client @@ -162,7 +162,7 @@ target_link_libraries(osd_rmw_test Jerasure tcmalloc_minimal) # stub_uring_osd add_executable(stub_uring_osd - stub_uring_osd.cpp epoll_manager.cpp messenger.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp timerfd_manager.cpp ../json11/json11.cpp + stub_uring_osd.cpp epoll_manager.cpp messenger.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp timerfd_manager.cpp ../json11/json11.cpp ) target_link_libraries(stub_uring_osd ${LIBURING_LIBRARIES} diff --git a/src/messenger.cpp b/src/messenger.cpp index 91158578..fec99c81 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -10,22 +10,6 @@ #include "messenger.h" -osd_op_t::~osd_op_t() -{ - assert(!bs_op); - assert(!op_data); - if (rmw_buf) - { - free(rmw_buf); - } - if (buf) - { - // Note: reusing osd_op_t WILL currently lead to memory leaks - // So we don't reuse it, but free it every time - free(buf); - } -} - void osd_messenger_t::init() { keepalive_timer_id = tfd->set_timer(1000, true, [this](int) diff --git a/src/messenger.h b/src/messenger.h index 3a13efee..2c246bc4 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -14,19 +14,15 @@ #include "malloc_or_die.h" #include "json11/json11.hpp" -#include "osd_ops.h" +#include "msgr_op.h" #include "timerfd_manager.h" #include "ringloop.h" -#define OSD_OP_IN 0 -#define OSD_OP_OUT 1 - #define CL_READ_HDR 1 #define CL_READ_DATA 2 #define CL_READ_REPLY_DATA 3 #define CL_WRITE_READY 1 #define CL_WRITE_REPLY 2 -#define OSD_OP_INLINE_BUF_COUNT 16 #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 @@ -36,160 +32,6 @@ #define DEFAULT_PEER_CONNECT_TIMEOUT 5 #define DEFAULT_OSD_PING_TIMEOUT 5 -// Kind of a vector with small-list-optimisation -struct osd_op_buf_list_t -{ - int count = 0, alloc = OSD_OP_INLINE_BUF_COUNT, done = 0; - iovec *buf = NULL; - iovec inline_buf[OSD_OP_INLINE_BUF_COUNT]; - - inline osd_op_buf_list_t() - { - buf = inline_buf; - } - - inline osd_op_buf_list_t(const osd_op_buf_list_t & other) - { - buf = inline_buf; - append(other); - } - - inline osd_op_buf_list_t & operator = (const osd_op_buf_list_t & other) - { - reset(); - append(other); - return *this; - } - - inline ~osd_op_buf_list_t() - { - if (buf && buf != inline_buf) - { - free(buf); - } - } - - inline void reset() - { - count = 0; - done = 0; - } - - inline iovec* get_iovec() - { - return buf + done; - } - - inline int get_size() - { - return count - done; - } - - inline void append(const osd_op_buf_list_t & other) - { - if (count+other.count > alloc) - { - if (buf == inline_buf) - { - int old = alloc; - alloc = (((count+other.count+15)/16)*16); - buf = (iovec*)malloc(sizeof(iovec) * alloc); - if (!buf) - { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); - exit(1); - } - memcpy(buf, inline_buf, sizeof(iovec) * old); - } - else - { - alloc = (((count+other.count+15)/16)*16); - buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); - if (!buf) - { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); - exit(1); - } - } - } - for (int i = 0; i < other.count; i++) - { - buf[count++] = other.buf[i]; - } - } - - inline void push_back(void *nbuf, size_t len) - { - if (count >= alloc) - { - if (buf == inline_buf) - { - int old = alloc; - alloc = ((alloc/16)*16 + 1); - buf = (iovec*)malloc(sizeof(iovec) * alloc); - if (!buf) - { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); - exit(1); - } - memcpy(buf, inline_buf, sizeof(iovec)*old); - } - else - { - alloc = alloc < 16 ? 16 : (alloc+16); - buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); - if (!buf) - { - printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); - exit(1); - } - } - } - buf[count++] = { .iov_base = nbuf, .iov_len = len }; - } - - inline void eat(int result) - { - while (result > 0 && done < count) - { - iovec & iov = buf[done]; - if (iov.iov_len <= result) - { - result -= iov.iov_len; - done++; - } - else - { - iov.iov_len -= result; - iov.iov_base += result; - break; - } - } - } -}; - -struct blockstore_op_t; - -struct osd_primary_op_data_t; - -struct osd_op_t -{ - timespec tv_begin; - uint64_t op_type = OSD_OP_IN; - int peer_fd; - osd_any_op_t req; - osd_any_reply_t reply; - blockstore_op_t *bs_op = NULL; - void *buf = NULL; - void *rmw_buf = NULL; - osd_primary_op_data_t* op_data = NULL; - std::function callback; - - osd_op_buf_list_t iov; - - ~osd_op_t(); -}; - struct osd_client_t { int refs = 0; @@ -252,12 +94,9 @@ struct osd_op_stats_t struct osd_messenger_t { - timerfd_manager_t *tfd; - ring_loop_t *ringloop; +protected: int keepalive_timer_id = -1; - // osd_num_t is only for logging and asserts - osd_num_t osd_num; // FIXME: make receive_buffer_size configurable int receive_buffer_size = 64*1024; int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; @@ -267,19 +106,22 @@ struct osd_messenger_t int log_level = 0; bool use_sync_send_recv = false; - std::map wanted_peers; - std::map osd_peer_fds; - uint64_t next_subop_id = 1; - - std::map clients; std::vector read_ready_clients; std::vector write_ready_clients; std::vector> set_immediate; +public: + timerfd_manager_t *tfd; + ring_loop_t *ringloop; + // osd_num_t is only for logging and asserts + osd_num_t osd_num; + uint64_t next_subop_id = 1; + std::map clients; + std::map wanted_peers; + std::map osd_peer_fds; // op statistics osd_op_stats_t stats; -public: void init(); void parse_config(const json11::Json & config); void connect_peer(uint64_t osd_num, json11::Json peer_state); @@ -287,7 +129,6 @@ public: void outbox_push(osd_op_t *cur_op); std::function exec_op; std::function repeer_pgs; - void handle_peer_epoll(int peer_fd, int epoll_events); void read_requests(); void send_replies(); void accept_connections(int listen_fd); @@ -296,6 +137,7 @@ public: protected: void try_connect_peer(uint64_t osd_num); void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); + void handle_peer_epoll(int peer_fd, int epoll_events); void handle_connect_epoll(int peer_fd); void on_connect_peer(osd_num_t peer_osd, int peer_fd); void check_peer_config(osd_client_t *cl); diff --git a/src/msgr_op.cpp b/src/msgr_op.cpp new file mode 100644 index 00000000..40e8a083 --- /dev/null +++ b/src/msgr_op.cpp @@ -0,0 +1,22 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include + +#include "msgr_op.h" + +osd_op_t::~osd_op_t() +{ + assert(!bs_op); + assert(!op_data); + if (rmw_buf) + { + free(rmw_buf); + } + if (buf) + { + // Note: reusing osd_op_t WILL currently lead to memory leaks + // So we don't reuse it, but free it every time + free(buf); + } +} diff --git a/src/msgr_op.h b/src/msgr_op.h new file mode 100644 index 00000000..2ad72d48 --- /dev/null +++ b/src/msgr_op.h @@ -0,0 +1,170 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#pragma once + +#include +#include +#include +#include + +#include "osd_ops.h" + +#define OSD_OP_IN 0 +#define OSD_OP_OUT 1 + +#define OSD_OP_INLINE_BUF_COUNT 16 + +// Kind of a vector with small-list-optimisation +struct osd_op_buf_list_t +{ + int count = 0, alloc = OSD_OP_INLINE_BUF_COUNT, done = 0; + iovec *buf = NULL; + iovec inline_buf[OSD_OP_INLINE_BUF_COUNT]; + + inline osd_op_buf_list_t() + { + buf = inline_buf; + } + + inline osd_op_buf_list_t(const osd_op_buf_list_t & other) + { + buf = inline_buf; + append(other); + } + + inline osd_op_buf_list_t & operator = (const osd_op_buf_list_t & other) + { + reset(); + append(other); + return *this; + } + + inline ~osd_op_buf_list_t() + { + if (buf && buf != inline_buf) + { + free(buf); + } + } + + inline void reset() + { + count = 0; + done = 0; + } + + inline iovec* get_iovec() + { + return buf + done; + } + + inline int get_size() + { + return count - done; + } + + inline void append(const osd_op_buf_list_t & other) + { + if (count+other.count > alloc) + { + if (buf == inline_buf) + { + int old = alloc; + alloc = (((count+other.count+15)/16)*16); + buf = (iovec*)malloc(sizeof(iovec) * alloc); + if (!buf) + { + printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + exit(1); + } + memcpy(buf, inline_buf, sizeof(iovec) * old); + } + else + { + alloc = (((count+other.count+15)/16)*16); + buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); + if (!buf) + { + printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + exit(1); + } + } + } + for (int i = 0; i < other.count; i++) + { + buf[count++] = other.buf[i]; + } + } + + inline void push_back(void *nbuf, size_t len) + { + if (count >= alloc) + { + if (buf == inline_buf) + { + int old = alloc; + alloc = ((alloc/16)*16 + 1); + buf = (iovec*)malloc(sizeof(iovec) * alloc); + if (!buf) + { + printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + exit(1); + } + memcpy(buf, inline_buf, sizeof(iovec)*old); + } + else + { + alloc = alloc < 16 ? 16 : (alloc+16); + buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); + if (!buf) + { + printf("Failed to allocate %lu bytes\n", sizeof(iovec) * alloc); + exit(1); + } + } + } + buf[count++] = { .iov_base = nbuf, .iov_len = len }; + } + + inline void eat(int result) + { + while (result > 0 && done < count) + { + iovec & iov = buf[done]; + if (iov.iov_len <= result) + { + result -= iov.iov_len; + done++; + } + else + { + iov.iov_len -= result; + iov.iov_base += result; + break; + } + } + } +}; + +struct blockstore_op_t; + +struct osd_primary_op_data_t; + +struct osd_op_t +{ + timespec tv_begin; + uint64_t op_type = OSD_OP_IN; + int peer_fd; + osd_any_op_t req; + osd_any_reply_t reply; + blockstore_op_t *bs_op = NULL; + void *buf = NULL; + void *rmw_buf = NULL; + osd_primary_op_data_t* op_data = NULL; + std::function callback; + + osd_op_buf_list_t iov; + + ~osd_op_t(); +};