Add PG structures, begin peer connection handling code

blocking-uring-test
Vitaliy Filippov 2019-12-26 14:06:03 +03:00
parent 8a386270bd
commit 3134b7729a
7 changed files with 323 additions and 77 deletions

View File

@ -20,5 +20,5 @@ test_allocator: test_allocator.cpp allocator.o
g++ $(CXXFLAGS) -o test_allocator test_allocator.cpp allocator.o
libfio_blockstore.so: fio_engine.cpp $(BLOCKSTORE_OBJS)
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_blockstore.so fio_engine.cpp $(BLOCKSTORE_OBJS)
libfio_sec_osd.so: fio_sec_osd.cpp
libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -luring -o libfio_sec_osd.so fio_sec_osd.cpp

View File

@ -154,7 +154,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
union
{
osd_any_op_t op;
uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 };
uint8_t op_buf[OSD_PACKET_SIZE] = { 0 };
};
op.hdr.magic = SECONDARY_OSD_OP_MAGIC;
@ -195,7 +195,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
bsd->op_n++;
bsd->queue[n] = io;
if (write(bsd->connect_fd, op_buf, OSD_OP_PACKET_SIZE) != OSD_OP_PACKET_SIZE)
if (write(bsd->connect_fd, op_buf, OSD_PACKET_SIZE) != OSD_PACKET_SIZE)
{
perror("write");
exit(1);
@ -249,11 +249,11 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
union
{
osd_any_reply_t reply;
uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 };
uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 };
};
while (bsd->completed.size() < min)
{
read_blocking(bsd->connect_fd, reply_buf, OSD_REPLY_PACKET_SIZE);
read_blocking(bsd->connect_fd, reply_buf, OSD_PACKET_SIZE);
if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC)
{
fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC);

254
osd.cpp
View File

@ -10,11 +10,14 @@
#define CL_READ_OP 1
#define CL_READ_DATA 2
#define CL_READ_REPLY_DATA 3
#define SQE_SENT 0x100l
#define CL_WRITE_READY 1
#define CL_WRITE_REPLY 2
#define CL_WRITE_DATA 3
// FIXME: Split into more files
osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
{
bind_address = config["bind_address"];
@ -91,7 +94,8 @@ osd_op_t::~osd_op_t()
if (buf)
{
// Note: reusing osd_op_t WILL currently lead to memory leaks
if (op.hdr.opcode == OSD_OP_SHOW_CONFIG)
if (op_type == OSD_OP_IN &&
op.hdr.opcode == OSD_OP_SHOW_CONFIG)
{
std::string *str = (std::string*)buf;
delete str;
@ -252,10 +256,16 @@ void osd_t::read_requests()
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_buf)
{
// no reads in progress, so this is probably a new command
cl.read_op = new osd_op_t;
// no reads in progress
// so this is either a new command or a reply to a previously sent command
if (!cl.read_op)
{
cl.read_op = new osd_op_t;
cl.read_op->peer_fd = peer_fd;
}
cl.read_op->op_type = OSD_OP_IN;
cl.read_buf = &cl.read_op->op_buf;
cl.read_remaining = OSD_OP_PACKET_SIZE;
cl.read_remaining = OSD_PACKET_SIZE;
cl.read_state = CL_READ_OP;
}
cl.read_iov.iov_base = cl.read_buf;
@ -294,55 +304,120 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
cl.read_buf += data->res;
if (cl.read_remaining <= 0)
{
osd_op_t *cur_op = cl.read_op;
cl.read_buf = NULL;
if (cl.read_state == CL_READ_OP)
{
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
if (cl.read_op->op.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
{
// Allocate a buffer
cur_op->buf = memalign(512, cur_op->op.sec_rw.len);
}
else if (cur_op->op.hdr.opcode == OSD_OP_READ ||
cur_op->op.hdr.opcode == OSD_OP_WRITE)
{
cur_op->buf = memalign(512, cur_op->op.rw.len);
}
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ||
cur_op->op.hdr.opcode == OSD_OP_WRITE)
{
// Read data
cl.read_buf = cur_op->buf;
cl.read_remaining = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE
? cur_op->op.sec_rw.len
: cur_op->op.rw.len);
cl.read_state = CL_READ_DATA;
handle_read_reply(&cl);
}
else
{
// Command is ready
cur_op->peer_fd = peer_fd;
enqueue_op(cur_op);
cl.read_op = NULL;
cl.read_state = 0;
handle_read_op(&cl);
}
}
else if (cl.read_state == CL_READ_DATA)
{
// Command is ready
cur_op->peer_fd = peer_fd;
enqueue_op(cur_op);
// Operation is ready
enqueue_op(cl.read_op);
cl.read_op = NULL;
cl.read_state = 0;
}
else if (cl.read_state == CL_READ_REPLY_DATA)
{
// Reply is ready
auto req_it = cl.sent_ops.find(cl.read_reply_id);
osd_op_t *request = req_it->second;
cl.sent_ops.erase(req_it);
cl.read_reply_id = 0;
cl.read_state = 0;
handle_reply(request);
}
}
}
}
}
// Process a fully received request header stored in cl->read_op.
//
// For opcodes that work with a data buffer, the buffer is allocated here
// (512-byte aligned). Opcodes whose payload follows the header on the wire
// (SECONDARY_WRITE, SECONDARY_STABILIZE, WRITE) switch the client into the
// CL_READ_DATA state so that the payload is read into that buffer; every
// other opcode is enqueued for execution immediately.
//
// The payload length is taken from the union member that matches the opcode:
// sec_rw.len for secondary read/write, sec_stab.len for stabilize and rw.len
// for primary read/write. The same length is used for both the allocation and
// the number of bytes to read, so the two can never disagree.
void osd_t::handle_read_op(osd_client_t *cl)
{
    osd_op_t *cur_op = cl->read_op;
    const auto opcode = cur_op->op.hdr.opcode;
    bool needs_buffer = true;
    uint64_t payload_len = 0;
    if (opcode == OSD_OP_SECONDARY_READ ||
        opcode == OSD_OP_SECONDARY_WRITE)
    {
        payload_len = cur_op->op.sec_rw.len;
    }
    else if (opcode == OSD_OP_SECONDARY_STABILIZE)
    {
        // Stabilize requests carry their length in sec_stab, not in sec_rw/rw
        // (enqueue_op() and handle_send() read the same field)
        payload_len = cur_op->op.sec_stab.len;
    }
    else if (opcode == OSD_OP_READ ||
        opcode == OSD_OP_WRITE)
    {
        payload_len = cur_op->op.rw.len;
    }
    else
    {
        needs_buffer = false;
    }
    if (needs_buffer)
    {
        // Allocate a buffer
        cur_op->buf = memalign(512, payload_len);
    }
    if (opcode == OSD_OP_SECONDARY_WRITE ||
        opcode == OSD_OP_SECONDARY_STABILIZE ||
        opcode == OSD_OP_WRITE)
    {
        // Read data
        cl->read_buf = cur_op->buf;
        cl->read_remaining = payload_len;
        cl->read_state = CL_READ_DATA;
    }
    else
    {
        // Operation is ready
        cl->read_op = NULL;
        cl->read_state = 0;
        enqueue_op(cur_op);
    }
}
// Process a fully received reply header stored in cl->read_op.
//
// The reply is matched by id against cl->sent_ops and copied into the
// matching request's reply_buf. If the reply announces a payload
// (SECONDARY_READ or SECONDARY_LIST with retval > 0), the client is switched
// to CL_READ_REPLY_DATA so that the payload is read into the request's buffer.
// Otherwise the request is removed from sent_ops and passed to handle_reply().
void osd_t::handle_read_reply(osd_client_t *cl)
{
    osd_op_t *reply_op = cl->read_op;
    auto sent_it = cl->sent_ops.find(reply_op->op.hdr.id);
    if (sent_it == cl->sent_ops.end())
    {
        // Command out of sync. Drop connection
        // FIXME This is probably a peer, so handle all previously sent operations carefully
        stop_client(cl->peer_fd);
        return;
    }
    osd_op_t *sent = sent_it->second;
    memcpy(sent->reply_buf, reply_op->op_buf, OSD_PACKET_SIZE);
    const bool has_payload = sent->reply.hdr.retval > 0;
    const bool is_read = has_payload && sent->reply.hdr.opcode == OSD_OP_SECONDARY_READ;
    const bool is_list = has_payload && !is_read && sent->reply.hdr.opcode == OSD_OP_SECONDARY_LIST;
    if (!is_read && !is_list)
    {
        // No payload follows: the reply is complete
        cl->read_state = 0;
        cl->sent_ops.erase(sent_it);
        handle_reply(sent);
        return;
    }
    if (is_list)
    {
        // Listings get a fresh buffer sized for retval entries
        sent->buf = memalign(512, sizeof(obj_ver_id) * sent->reply.hdr.retval);
    }
    // Read data
    // FIXME: for SECONDARY_READ, request->buf must be allocated
    cl->read_state = CL_READ_REPLY_DATA;
    cl->read_reply_id = sent->op.hdr.id;
    cl->read_buf = sent->buf;
    if (is_list)
    {
        cl->read_remaining = sizeof(obj_ver_id) * sent->reply.hdr.retval;
    }
    else
    {
        cl->read_remaining = sent->reply.hdr.retval;
    }
}
// Hook invoked with an outbound request once its reply has been fully received.
// Not implemented yet: the body is empty, so the request is neither processed
// nor freed here.
void osd_t::handle_reply(osd_op_t *cur_op)
{
}
void osd_t::secondary_op_callback(osd_op_t *cur_op)
{
inflight_ops--;
@ -355,6 +430,7 @@ void osd_t::secondary_op_callback(osd_op_t *cur_op)
cl.write_state = CL_WRITE_READY;
write_ready_clients.push_back(cur_op->peer_fd);
}
make_reply(cur_op);
cl.completions.push_back(cur_op);
ringloop->wakeup();
}
@ -438,10 +514,24 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
auto & cl = clients[cur_op->peer_fd];
cl.write_state = CL_WRITE_READY;
write_ready_clients.push_back(cur_op->peer_fd);
make_reply(cur_op);
cl.completions.push_back(cur_op);
ringloop->wakeup();
return;
}
else if (cur_op->op.hdr.opcode == OSD_OP_READ)
{
// Primary OSD also works with individual stripes, but they're twice the size of the blockstore's stripe
// - convert offset & len to stripe number
// - fail operation if offset & len span multiple stripes
// - calc stripe hash and determine PG
// - check if this is our PG
// - redirect or fail operation if not
// - determine whether we need to read A and B or just A or just B or A + parity or B + parity
// and determine read ranges for both objects
// - send read requests
// - reconstruct result
}
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE
@ -466,7 +556,7 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
}
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
{
cur_op->bs_op.len = cur_op->op.sec_stabilize.len/sizeof(obj_ver_id);
cur_op->bs_op.len = cur_op->op.sec_stab.len/sizeof(obj_ver_id);
cur_op->bs_op.buf = cur_op->buf;
}
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
@ -495,10 +585,18 @@ void osd_t::send_replies()
// pick next command
cl.write_op = cl.completions.front();
cl.completions.pop_front();
make_reply(cl.write_op);
cl.write_buf = &cl.write_op->reply_buf;
cl.write_remaining = OSD_REPLY_PACKET_SIZE;
cl.write_state = CL_WRITE_REPLY;
if (cl.write_op->op_type == OSD_OP_OUT)
{
cl.write_buf = &cl.write_op->op_buf;
cl.write_remaining = OSD_PACKET_SIZE;
cl.write_state = CL_WRITE_REPLY;
}
else
{
cl.write_buf = &cl.write_op->reply_buf;
cl.write_remaining = OSD_PACKET_SIZE;
cl.write_state = CL_WRITE_REPLY;
}
}
cl.write_iov.iov_base = cl.write_buf;
cl.write_iov.iov_len = cl.write_remaining;
@ -515,6 +613,7 @@ void osd_t::make_reply(osd_op_t *op)
{
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->op.hdr.id;
op->reply.hdr.opcode = op->op.hdr.opcode;
if (op->op.hdr.opcode == OSD_OP_SHOW_CONFIG)
{
std::string *str = (std::string*)op->buf;
@ -553,37 +652,72 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd)
if (cl.write_state == CL_WRITE_REPLY)
{
// Send data
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ &&
cur_op->reply.hdr.retval > 0)
if (cur_op->op_type == OSD_OP_IN)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->reply.hdr.retval;
cl.write_state = CL_WRITE_DATA;
}
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST &&
cur_op->reply.hdr.retval > 0)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id);
cl.write_state = CL_WRITE_DATA;
}
else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG &&
cur_op->reply.hdr.retval > 0)
{
cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str();
cl.write_remaining = cur_op->reply.hdr.retval;
cl.write_state = CL_WRITE_DATA;
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ &&
cur_op->reply.hdr.retval > 0)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->reply.hdr.retval;
cl.write_state = CL_WRITE_DATA;
}
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST &&
cur_op->reply.hdr.retval > 0)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->reply.hdr.retval * sizeof(obj_ver_id);
cl.write_state = CL_WRITE_DATA;
}
else if (cur_op->op.hdr.opcode == OSD_OP_SHOW_CONFIG &&
cur_op->reply.hdr.retval > 0)
{
cl.write_buf = (void*)((std::string*)cur_op->buf)->c_str();
cl.write_remaining = cur_op->reply.hdr.retval;
cl.write_state = CL_WRITE_DATA;
}
else
{
goto op_done;
}
}
else
{
goto op_done;
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->op.sec_rw.len;
cl.write_state = CL_WRITE_DATA;
}
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->op.sec_stab.len;
cl.write_state = CL_WRITE_DATA;
}
else if (cur_op->op.hdr.opcode == OSD_OP_WRITE)
{
cl.write_buf = cur_op->buf;
cl.write_remaining = cur_op->op.rw.len;
cl.write_state = CL_WRITE_DATA;
}
else
{
goto op_done;
}
}
}
else if (cl.write_state == CL_WRITE_DATA)
{
op_done:
// Done
delete cur_op;
if (cur_op->op_type == OSD_OP_IN)
{
delete cur_op;
}
else
{
cl.sent_ops[cl.write_op->op.hdr.id] = cl.write_op;
}
cl.write_op = NULL;
cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0;
}

78
osd.h
View File

@ -16,21 +16,27 @@
#include "ringloop.h"
#include "osd_ops.h"
#include "sparsepp/sparsepp/spp.h"
#define STRIPE_NUM(stripe) ((stripe) >> 4)
#define STRIPE_REPLICA(stripe) ((stripe) & 0xf)
#define OSD_OP_IN 0
#define OSD_OP_OUT 1
struct osd_op_t
{
int op_type;
int peer_fd;
union
{
osd_any_op_t op;
uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 };
uint8_t op_buf[OSD_PACKET_SIZE] = { 0 };
};
union
{
osd_any_reply_t reply;
uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 };
uint8_t reply_buf[OSD_PACKET_SIZE] = { 0 };
};
blockstore_op_t bs_op;
void *buf = NULL;
@ -49,12 +55,16 @@ struct osd_client_t
bool read_ready = false;
bool reading = false;
osd_op_t *read_op = NULL;
int read_reply_id = 0;
iovec read_iov;
msghdr read_msg;
void *read_buf = NULL;
int read_remaining = 0;
int read_state = 0;
// Outbound operations sent to this client (which is probably an OSD peer)
std::map<int, osd_op_t*> sent_ops;
// Completed operations to send replies back to the client
std::deque<osd_op_t*> completions;
@ -67,17 +77,71 @@ struct osd_client_t
int write_state = 0;
};
// Pairs a role number with an OSD number.
// NOTE(review): the meaning of `role` values is defined by code not shown here.
struct osd_pg_role_t
{
    int role;
    uint64_t osd_num;
};

// A list of (role, OSD number) pairs
using osd_acting_set_t = std::vector<osd_pg_role_t>;

namespace std
{
    // Hash for osd_acting_set_t so that it can be used as a key of unordered containers.
    // Every entry contributes its role and then its osd_num, in list order.
    template<> struct hash<osd_acting_set_t>
    {
        inline size_t operator()(const osd_acting_set_t &s) const
        {
            size_t acc = 0;
            for (const osd_pg_role_t &entry : s)
            {
                // Mixing step taken from spp::hash_combine()
                acc ^= (entry.role + 0xc6a4a7935bd1e995 + (acc << 6) + (acc >> 2));
                acc ^= (entry.osd_num + 0xc6a4a7935bd1e995 + (acc << 6) + (acc >> 2));
            }
            return acc;
        }
    };
}
#define PG_ST_OFFLINE 1
#define PG_ST_PEERING 2
#define PG_ST_INCOMPLETE 3
#define PG_ST_DEGRADED 4
#define PG_ST_MISPLACED 5
#define PG_ST_ACTIVE 6
// Per-PG bookkeeping record; osd_t keeps a std::vector of these.
struct osd_pg_t
{
// NOTE(review): presumably one of the PG_ST_* constants - confirm against the code that sets it
int state;
// NOTE(review): presumably the PG number - confirm
unsigned num;
// The set of (role, OSD) pairs on which objects are considered to reside by default
std::vector<osd_pg_role_t> target_set;
// Moved object map. By default, each object is considered to reside on the target_set.
// This map stores all objects that differ.
// It may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario,
// which is up to ~192 MB per 1 TB.
// NOTE(review): acting_set_ids and acting_sets look like the two directions of a
// set <-> integer id mapping, with object_map storing an id per object - confirm.
std::unordered_map<osd_acting_set_t, int> acting_set_ids;
std::map<int, osd_acting_set_t> acting_sets;
spp::sparse_hash_map<object_id, int> object_map;
};
class osd_t
{
// config
uint64_t osd_num = 0;
blockstore_config_t config;
std::string bind_address;
int bind_port, listen_backlog;
int client_queue_depth = 128;
bool allow_test_ops = true;
// fields
// peer OSDs
std::map<uint64_t, int> osd_peer_fds;
std::vector<osd_pg_t> pgs;
unsigned pg_count;
// client & peer I/O
bool stopping = false;
int inflight_ops = 0;
@ -93,15 +157,21 @@ class osd_t
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
// methods
void loop();
int handle_epoll_events();
void stop_client(int peer_fd);
void read_requests();
void handle_read(ring_data_t *data, int peer_fd);
void enqueue_op(osd_op_t *cur_op);
void handle_read_op(osd_client_t *cl);
void handle_read_reply(osd_client_t *cl);
void send_replies();
void make_reply(osd_op_t *op);
void handle_send(ring_data_t *data, int peer_fd);
void handle_reply(osd_op_t *cur_op);
void enqueue_op(osd_op_t *cur_op);
void secondary_op_callback(osd_op_t *cur_op);
public:
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);

40
osd_client.cpp Normal file
View File

@ -0,0 +1,40 @@
// Draft: slice a primary OSD read/write request (cur_op->op.rw) into
// blockstore requests to individual objects.
//
// Primary OSD still operates individual stripes, except they're twice the size
// of the blockstore's stripe: the first half [0, block) of every stripe maps to
// the object with role 1 and the second half [block, 2*block) maps to the object
// with role 2. For each touched stripe up to two parts are produced, each with
// an offset and a length relative to the beginning of its own object.
//
// NOTE(review): cur_op, bs and STRIPE_ROLE_BITS are not declared in this file,
// and read_parts has no element type yet - this is a sketch which presumably
// has to become a method with a proper part structure before it compiles.
void slice()
{
    std::vector read_parts;
    if (cur_op->op.rw.len == 0)
    {
        // Nothing to slice. Also prevents stripe2 from wrapping around to
        // UINT64_MAX for an empty request at offset 0.
        return;
    }
    int block = bs->get_block_size();
    // First and last stripe touched by [offset, offset+len)
    uint64_t stripe1 = cur_op->op.rw.offset / block / 2;
    uint64_t stripe2 = (cur_op->op.rw.offset + cur_op->op.rw.len + block*2 - 1) / block / 2 - 1;
    for (uint64_t s = stripe1; s <= stripe2; s++)
    {
        // Requested range within this stripe, relative to the stripe start
        uint64_t start = s == stripe1 ? cur_op->op.rw.offset - stripe1*block*2 : 0;
        uint64_t end = s == stripe2 ? cur_op->op.rw.offset + cur_op->op.rw.len - stripe2*block*2 : block*2;
        if (start < block)
        {
            // The range touches the first half of the stripe
            read_parts.push_back({
                .role = 1,
                .oid = {
                    .inode = cur_op->op.rw.inode,
                    .stripe = (s << STRIPE_ROLE_BITS) | 1,
                },
                .version = UINT64_MAX,
                .offset = start,
                .len = (block < end ? block : end) - start,
            });
        }
        if (end > block)
        {
            // The range touches the second half of the stripe.
            // Both the offset and the length are relative to the second object,
            // so the first half (block bytes) is subtracted from both ends.
            uint64_t part_offset = (start > block ? start-block : 0);
            read_parts.push_back({
                .role = 2,
                .oid = {
                    .inode = cur_op->op.rw.inode,
                    .stripe = (s << STRIPE_ROLE_BITS) | 2,
                },
                .version = UINT64_MAX,
                .offset = part_offset,
                .len = (end - block) - part_offset,
            });
        }
    }
}

View File

@ -2,8 +2,8 @@
int main(int narg, char *args[])
{
if (sizeof(osd_any_op_t) >= OSD_OP_PACKET_SIZE ||
sizeof(osd_any_reply_t) >= OSD_REPLY_PACKET_SIZE)
if (sizeof(osd_any_op_t) >= OSD_PACKET_SIZE ||
sizeof(osd_any_reply_t) >= OSD_PACKET_SIZE)
{
perror("BUG: too small packet size");
return 1;

View File

@ -5,9 +5,8 @@
// Magic numbers
#define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l
#define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l
// Operation request headers and operation reply headers have fixed size after which comes data
#define OSD_OP_PACKET_SIZE 0x80
#define OSD_REPLY_PACKET_SIZE 0x40
// Operation request / reply headers have fixed size after which comes data
#define OSD_PACKET_SIZE 0x80
// Opcodes
#define OSD_OP_MIN 1
#define OSD_OP_SECONDARY_READ 1
@ -42,6 +41,8 @@ struct __attribute__((__packed__)) osd_reply_header_t
uint64_t magic;
// operation id
uint64_t id;
// operation type
uint64_t opcode;
// return value
int64_t retval;
};
@ -127,10 +128,11 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t
{
osd_reply_header_t header;
// stable object version count. header.retval = total object version count
// FIXME: maybe change to the number of bytes in the reply...
uint64_t stable_count;
};
// read or write to the primary OSD
// read or write to the primary OSD (must be within individual stripe)
struct __attribute__((__packed__)) osd_op_rw_t
{
osd_op_header_t header;
@ -153,7 +155,7 @@ union osd_any_op_t
osd_op_secondary_rw_t sec_rw;
osd_op_secondary_del_t sec_del;
osd_op_secondary_sync_t sec_sync;
osd_op_secondary_stabilize_t sec_stabilize;
osd_op_secondary_stabilize_t sec_stab;
osd_op_secondary_list_t sec_list;
osd_op_show_config_t show_conf;
osd_op_rw_t rw;
@ -165,7 +167,7 @@ union osd_any_reply_t
osd_reply_secondary_rw_t sec_rw;
osd_reply_secondary_del_t sec_del;
osd_reply_secondary_sync_t sec_sync;
osd_reply_secondary_stabilize_t sec_stabilize;
osd_reply_secondary_stabilize_t sec_stab;
osd_reply_secondary_list_t sec_list;
osd_reply_show_config_t show_conf;
osd_reply_rw_t rw;