Browse Source

Fix client connection recovery bugs, add dirty_ops limit

rel-0.5
Vitaliy Filippov 2 years ago
parent
commit
f6d705383a
  1. 134
      src/cluster_client.cpp
  2. 13
      src/cluster_client.h
  3. 23
      src/messenger.cpp
  4. 9
      src/msgr_stop.cpp
  5. 47
      src/test_cluster_client.cpp
  6. 4
      src/timerfd_manager.cpp

134
src/cluster_client.cpp

@ -5,6 +5,9 @@
#include <assert.h>
#include "cluster_client.h"
#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define CACHE_DIRTY 1
#define CACHE_FLUSHING 2
#define CACHE_REPEATING 4
@ -30,7 +33,6 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
dirty_osds.erase(peer_osd);
for (auto & wr: dirty_buffers)
{
if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
@ -97,49 +99,42 @@ void cluster_client_t::continue_ops(bool up_retry)
int j = 0;
for (int i = 0; i < op_queue.size(); i++)
{
bool rm = false;
bool rm = false, is_flush = op_queue[i]->flags & OP_FLUSH_BUFFER;
auto opcode = op_queue[i]->opcode;
if (!op_queue[i]->up_wait || up_retry)
{
op_queue[i]->up_wait = false;
if (op_queue[i]->opcode == OSD_OP_READ)
if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
{
rm = continue_rw(op_queue[i]);
}
else if (op_queue[i]->opcode == OSD_OP_WRITE)
{
if (op_queue[i]->flags & OP_FLUSH_BUFFER)
if (is_flush || !has_flushes)
{
// Regular writes can't proceed before buffer flushes
rm = continue_rw(op_queue[i]);
if (!rm)
{
// Regular writes can't proceed before buffer flushes
has_flushes = true;
}
}
else if (!has_flushes)
{
rm = continue_rw(op_queue[i]);
}
if (!rm)
{
has_writes = true;
}
}
else if (op_queue[i]->opcode == OSD_OP_SYNC)
else if (opcode == OSD_OP_SYNC)
{
if (!has_writes)
{
// SYNC can't proceed before previous writes
rm = continue_sync(op_queue[i]);
if (!rm)
{
// Postpone writes until previous SYNC completes
// ...so dirty_writes can't contain anything newer than SYNC
has_flushes = true;
}
}
}
}
if (opcode == OSD_OP_WRITE)
{
has_writes = has_writes || !rm;
if (is_flush)
{
has_flushes = has_writes || !rm;
}
}
else if (opcode == OSD_OP_SYNC)
{
// Postpone writes until previous SYNC completes
// ...so dirty_writes can't contain anything newer than SYNC
has_flushes = has_writes || !rm;
}
if (!rm)
{
op_queue[j++] = op_queue[i];
@ -185,13 +180,26 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
// Cluster-wide immediate_commit mode
immediate_commit = true;
}
if (config.find("client_max_dirty_bytes") != config.end())
{
client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
}
else if (config.find("client_dirty_limit") != config.end())
{
client_dirty_limit = config["client_dirty_limit"].uint64_value();
// Old name
client_max_dirty_bytes = config["client_dirty_limit"].uint64_value();
}
if (!client_dirty_limit)
if (config.find("client_max_dirty_ops") != config.end())
{
client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value();
}
if (!client_max_dirty_bytes)
{
client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES;
}
if (!client_max_dirty_ops)
{
client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS;
}
up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value();
if (!up_wait_retry_interval)
@ -306,7 +314,7 @@ void cluster_client_t::execute(cluster_op_t *op)
op->retval = 0;
if (op->opcode == OSD_OP_WRITE && !immediate_commit)
{
if (dirty_bytes >= client_dirty_limit)
if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
{
// Push an extra SYNC operation to flush previous writes
cluster_op_t *sync_op = new cluster_op_t;
@ -317,12 +325,15 @@ void cluster_client_t::execute(cluster_op_t *op)
};
op_queue.push_back(sync_op);
dirty_bytes = 0;
dirty_ops = 0;
}
dirty_bytes += op->len;
dirty_ops++;
}
else if (op->opcode == OSD_OP_SYNC)
{
dirty_bytes = 0;
dirty_ops = 0;
}
op_queue.push_back(op);
continue_ops();
@ -457,7 +468,7 @@ resume_0:
}
if (op->opcode == OSD_OP_WRITE)
{
if (!immediate_commit)
if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
{
copy_write(op, dirty_buffers);
}
@ -469,13 +480,33 @@ resume_1:
resume_2:
// Send unsent parts, if they're not subject to change
op->state = 3;
if (op->needs_reslice)
{
for (int i = 0; i < op->parts.size(); i++)
{
if (!(op->parts[i].flags & PART_SENT) && op->retval)
{
op->retval = -EPIPE;
}
}
goto resume_3;
}
for (int i = 0; i < op->parts.size(); i++)
{
if (!op->parts[i].sent && !op->parts[i].done)
if (!(op->parts[i].flags & PART_SENT))
{
if (!try_send(op, i))
{
// We'll need to retry again
op->up_wait = true;
if (!retry_timeout_id)
{
retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int)
{
retry_timeout_id = 0;
continue_ops(true);
});
}
op->state = 2;
}
}
@ -485,7 +516,7 @@ resume_2:
return 0;
}
resume_3:
if (op->sent_count > 0)
if (op->inflight_count > 0)
{
op->state = 3;
return 0;
@ -517,6 +548,10 @@ resume_3:
}
else
{
for (int i = 0; i < op->parts.size(); i++)
{
op->parts[i].flags = 0;
}
goto resume_2;
}
}
@ -548,8 +583,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
.offset = begin,
.len = (uint32_t)(end - begin),
.pg_num = pg_num,
.sent = false,
.done = false,
.flags = 0,
};
int left = end-begin;
while (left > 0 && iov_idx < op->iov.count)
@ -606,8 +640,8 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
{
int peer_fd = peer_it->second;
part->osd_num = primary_osd;
part->sent = true;
op->sent_count++;
part->flags |= PART_SENT;
op->inflight_count++;
part->op = (osd_op_t){
.op_type = OSD_OP_OUT,
.peer_fd = peer_fd,
@ -675,8 +709,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
op->parts[i] = {
.parent = op,
.osd_num = sync_osd,
.sent = false,
.done = false,
.flags = 0,
};
send_sync(op, &op->parts[i]);
i++;
@ -684,7 +717,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
}
dirty_osds.clear();
resume_1:
if (op->sent_count > 0)
if (op->inflight_count > 0)
{
op->state = 1;
return 0;
@ -703,7 +736,7 @@ resume_1:
// Retry later
op->parts.clear();
op->retval = 0;
op->sent_count = 0;
op->inflight_count = 0;
op->done_count = 0;
op->state = 0;
return 0;
@ -730,8 +763,8 @@ void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
{
auto peer_it = msgr.osd_peer_fds.find(part->osd_num);
assert(peer_it != msgr.osd_peer_fds.end());
part->sent = true;
op->sent_count++;
part->flags |= PART_SENT;
op->inflight_count++;
part->op = (osd_op_t){
.op_type = OSD_OP_OUT,
.peer_fd = peer_it->second,
@ -753,8 +786,7 @@ void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
void cluster_client_t::handle_op_part(cluster_op_part_t *part)
{
cluster_op_t *op = part->parent;
part->sent = false;
op->sent_count--;
op->inflight_count--;
int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
if (part->op.reply.hdr.retval != expected)
{
@ -763,9 +795,9 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
"%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
);
msgr.stop_client(part->op.peer_fd);
if (part->op.reply.hdr.retval == -EPIPE)
{
// Mark op->up_wait = true before stopping the client
op->up_wait = true;
if (!retry_timeout_id)
{
@ -781,15 +813,17 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
// Don't overwrite other errors with -EPIPE
op->retval = part->op.reply.hdr.retval;
}
msgr.stop_client(part->op.peer_fd);
part->flags |= PART_ERROR;
}
else
{
// OK
dirty_osds.insert(part->osd_num);
part->done = true;
part->flags |= PART_DONE;
op->done_count++;
}
if (op->sent_count == 0)
if (op->inflight_count == 0)
{
continue_ops();
}

13
src/cluster_client.h

@ -10,7 +10,8 @@
#define MAX_BLOCK_SIZE 128*1024*1024
#define DEFAULT_DISK_ALIGNMENT 4096
#define DEFAULT_BITMAP_GRANULARITY 4096
#define DEFAULT_CLIENT_DIRTY_LIMIT 32*1024*1024
#define DEFAULT_CLIENT_MAX_DIRTY_BYTES 32*1024*1024
#define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024
struct cluster_op_t;
@ -22,8 +23,7 @@ struct cluster_op_part_t
pg_num_t pg_num;
osd_num_t osd_num;
osd_op_buf_list_t iov;
bool sent;
bool done;
unsigned flags;
osd_op_t op;
};
@ -43,7 +43,7 @@ protected:
cluster_op_t *orig_op = NULL;
bool needs_reslice = false;
bool up_wait = false;
int sent_count = 0, done_count = 0;
int inflight_count = 0, done_count = 0;
std::vector<cluster_op_part_t> parts;
friend class cluster_client_t;
};
@ -66,7 +66,8 @@ class cluster_client_t
std::map<pool_id_t, uint64_t> pg_counts;
bool immediate_commit = false;
// FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
uint64_t client_dirty_limit = 0;
uint64_t client_max_dirty_bytes = 0;
uint64_t client_max_dirty_ops = 0;
int log_level;
int up_wait_retry_interval = 500; // ms
@ -76,7 +77,7 @@ class cluster_client_t
std::deque<cluster_op_t*> op_queue;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0;
uint64_t dirty_bytes = 0, dirty_ops = 0;
bool pgs_loaded = false;
ring_consumer_t consumer;

23
src/messenger.cpp

@ -180,23 +180,12 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
on_connect_peer(peer_osd, -errno);
return;
}
int timeout_id = -1;
if (peer_connect_timeout > 0)
{
timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
{
osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
stop_client(peer_fd, true);
on_connect_peer(peer_osd, -EIO);
return;
});
}
clients[peer_fd] = new osd_client_t((osd_client_t){
.peer_addr = addr,
.peer_port = peer_port,
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTING,
.connect_timeout_id = timeout_id,
.connect_timeout_id = -1,
.osd_num = peer_osd,
.in_buf = malloc_or_die(receive_buffer_size),
});
@ -205,6 +194,16 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
// Either OUT (connected) or HUP
handle_connect_epoll(peer_fd);
});
if (peer_connect_timeout > 0)
{
clients[peer_fd]->connect_timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
{
osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
stop_client(peer_fd, true);
on_connect_peer(peer_osd, -EIO);
return;
});
}
}
void osd_messenger_t::handle_connect_epoll(int peer_fd)

9
src/msgr_stop.cpp

@ -8,12 +8,19 @@
void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
{
std::vector<osd_op_t*> cancel_ops;
cancel_ops.resize(cl->sent_ops.size());
int i = 0;
for (auto p: cl->sent_ops)
{
cancel_op(p.second);
cancel_ops[i++] = p.second;
}
cl->sent_ops.clear();
cl->outbox.clear();
for (auto op: cancel_ops)
{
cancel_op(op);
}
}
void osd_messenger_t::cancel_op(osd_op_t *op)

47
src/test_cluster_client.cpp

@ -124,6 +124,15 @@ void pretend_disconnected(cluster_client_t *cli, osd_num_t osd_num)
cli->msgr.stop_client(cli->msgr.osd_peer_fds.at(osd_num));
}
void check_disconnected(cluster_client_t *cli, osd_num_t osd_num)
{
if (cli->msgr.osd_peer_fds.find(osd_num) != cli->msgr.osd_peer_fds.end())
{
printf("OSD %lu not disconnected as it ought to be\n", osd_num);
assert(0);
}
}
void check_op_count(cluster_client_t *cli, osd_num_t osd_num, int ops)
{
int peer_fd = cli->msgr.osd_peer_fds.at(osd_num);
@ -152,20 +161,20 @@ osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uin
return NULL;
}
void pretend_op_completed(cluster_client_t *cli, osd_op_t *op, int retval)
void pretend_op_completed(cluster_client_t *cli, osd_op_t *op, int64_t retval)
{
assert(op);
printf("Pretend completed %s %lx+%x\n", op->req.hdr.opcode == OSD_OP_SYNC
? "sync" : (op->req.hdr.opcode == OSD_OP_WRITE ? "write" : "read"), op->req.rw.offset, op->req.rw.len);
uint64_t op_id = op->req.hdr.id;
int peer_fd = op->peer_fd;
cli->msgr.clients[peer_fd]->sent_ops.erase(op_id);
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = retval < 0 ? retval : (op->req.hdr.opcode == OSD_OP_SYNC ? 0 : op->req.rw.len);
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op);
cli->msgr.clients[peer_fd]->sent_ops.erase(op_id);
}
void test1()
@ -177,6 +186,7 @@ void test1()
int *r1 = test_write(cli, 0, 4096, 0x55);
configure_single_pg_pool(cli);
pretend_connected(cli, 1);
cli->continue_ops(true);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
@ -184,6 +194,8 @@ void test1()
pretend_disconnected(cli, 1);
int *r2 = test_sync(cli);
pretend_connected(cli, 1);
check_op_count(cli, 1, 0);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_op_count(cli, 1, 1);
@ -226,8 +238,8 @@ void test1()
r1 = test_write(cli, 0x10000, 0x4000, 0x58);
pretend_disconnected(cli, 1);
cli->continue_ops(true);
pretend_connected(cli, 1);
cli->continue_ops(true);
// Check replay
{
@ -260,8 +272,11 @@ void test1()
assert(offset == op->req.rw.offset+op->req.rw.len);
replay_ops.push_back(op);
}
assert(replay_start == 0);
assert(replay_end == 0x14000);
if (replay_start != 0 || replay_end != 0x14000)
{
printf("Write replay: range mismatch: %lx-%lx\n", replay_start, replay_end);
assert(0);
}
for (auto op: replay_ops)
{
pretend_op_completed(cli, op, 0);
@ -273,6 +288,28 @@ void test1()
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x10000, 0x4000), 0);
check_completed(r1);
check_op_count(cli, 1, 0);
// Check sync
r2 = test_sync(cli);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_completed(r2);
// Check disconnect during write
r1 = test_write(cli, 0, 4096, 0x59);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), -EPIPE);
check_disconnected(cli, 1);
pretend_connected(cli, 1);
check_op_count(cli, 1, 0);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1);
can_complete(r1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_completed(r1);
// Free client
delete cli;
delete tfd;

4
src/timerfd_manager.cpp

@ -121,7 +121,7 @@ again:
exp.it_value.tv_sec--;
exp.it_value.tv_nsec += 1000000000;
}
if (exp.it_value.tv_sec < 0 || !exp.it_value.tv_sec && !exp.it_value.tv_nsec)
if (exp.it_value.tv_sec < 0 || exp.it_value.tv_sec == 0 && exp.it_value.tv_nsec <= 0)
{
// It already happened
trigger_nearest();
@ -159,6 +159,6 @@ void timerfd_manager_t::trigger_nearest()
{
timers.erase(timers.begin()+nearest, timers.begin()+nearest+1);
}
cb(nearest_id);
nearest = -1;
cb(nearest_id);
}

Loading…
Cancel
Save