Implement regular automatic syncs, split osd_t constructor into some methods

trace-sqes
Vitaliy Filippov 2020-04-02 22:16:46 +03:00
parent 0f43f6d3f6
commit afe2e76c87
5 changed files with 147 additions and 78 deletions

View File

@ -124,6 +124,7 @@ void journal_flusher_t::force_start()
}\
data = ((ring_data_t*)sqe->user_data);
// FIXME: Implement batch flushing
bool journal_flusher_co::loop()
{
// This is much better than implementing the whole function as an FSM

174
osd.cpp
View File

@ -28,49 +28,64 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
this->config = config;
this->bs = bs;
this->ringloop = ringloop;
this->tick_tfd = new timerfd_interval(ringloop, 3, [this]()
{
for (int i = 0; i <= OSD_OP_MAX; i++)
{
if (op_stat_count[i] != 0)
{
printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], op_stat_sum[i]/op_stat_count[i]);
op_stat_count[i] = 0;
op_stat_sum[i] = 0;
}
}
for (int i = 0; i <= OSD_OP_MAX; i++)
{
if (subop_stat_count[i] != 0)
{
printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], subop_stat_sum[i]/subop_stat_count[i]);
subop_stat_count[i] = 0;
subop_stat_sum[i] = 0;
}
}
if (send_stat_count != 0)
{
printf("avg latency to send stabilize subop: %ld us\n", send_stat_sum/send_stat_count);
send_stat_count = 0;
send_stat_sum = 0;
}
if (incomplete_objects > 0)
{
printf("%lu object(s) incomplete\n", incomplete_objects);
}
if (degraded_objects > 0)
{
printf("%lu object(s) degraded\n", degraded_objects);
}
if (misplaced_objects > 0)
{
printf("%lu object(s) misplaced\n", misplaced_objects);
}
});
this->bs_block_size = bs->get_block_size();
// FIXME: use bitmap granularity instead
this->bs_disk_alignment = bs->get_disk_alignment();
parse_config(config);
bind_socket();
this->stats_tfd = new timerfd_interval(ringloop, 3, [this]()
{
print_stats();
});
if (run_primary)
init_primary();
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
}
osd_t::~osd_t()
{
delete stats_tfd;
if (sync_tfd)
{
delete sync_tfd;
sync_tfd = NULL;
}
ringloop->unregister_consumer(&consumer);
close(epoll_fd);
close(listen_fd);
}
osd_op_t::~osd_op_t()
{
if (bs_op)
{
delete bs_op;
}
if (op_data)
{
free(op_data);
}
if (rmw_buf)
{
free(rmw_buf);
}
if (buf)
{
// Note: reusing osd_op_t WILL currently lead to memory leaks
// So we don't reuse it, but free it every time
free(buf);
}
}
void osd_t::parse_config(blockstore_config_t & config)
{
bind_address = config["bind_address"];
if (bind_address == "")
bind_address = "0.0.0.0";
@ -85,9 +100,13 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
else if (config["immediate_commit"] == "small")
immediate_commit = IMMEDIATE_SMALL;
run_primary = config["run_primary"] == "true" || config["run_primary"] == "1" || config["run_primary"] == "yes";
if (run_primary)
init_primary();
autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10);
if (autosync_interval < 0 || autosync_interval > MAX_AUTOSYNC_INTERVAL)
autosync_interval = DEFAULT_AUTOSYNC_INTERVAL;
}
void osd_t::bind_socket()
{
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
@ -136,39 +155,6 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
close(epoll_fd);
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
}
osd_t::~osd_t()
{
delete tick_tfd;
ringloop->unregister_consumer(&consumer);
close(epoll_fd);
close(listen_fd);
}
osd_op_t::~osd_op_t()
{
if (bs_op)
{
delete bs_op;
}
if (op_data)
{
free(op_data);
}
if (rmw_buf)
{
free(rmw_buf);
}
if (buf)
{
// Note: reusing osd_op_t WILL currently lead to memory leaks
// So we don't reuse it, but free it every time
free(buf);
}
}
bool osd_t::shutdown()
@ -423,3 +409,43 @@ void osd_t::exec_op(osd_op_t *cur_op)
exec_secondary(cur_op);
}
}
void osd_t::print_stats()
{
for (int i = 0; i <= OSD_OP_MAX; i++)
{
if (op_stat_count[i] != 0)
{
printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], op_stat_sum[i]/op_stat_count[i]);
op_stat_count[i] = 0;
op_stat_sum[i] = 0;
}
}
for (int i = 0; i <= OSD_OP_MAX; i++)
{
if (subop_stat_count[i] != 0)
{
printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], subop_stat_sum[i]/subop_stat_count[i]);
subop_stat_count[i] = 0;
subop_stat_sum[i] = 0;
}
}
if (send_stat_count != 0)
{
printf("avg latency to send stabilize subop: %ld us\n", send_stat_sum/send_stat_count);
send_stat_count = 0;
send_stat_sum = 0;
}
if (incomplete_objects > 0)
{
printf("%lu object(s) incomplete\n", incomplete_objects);
}
if (degraded_objects > 0)
{
printf("%lu object(s) degraded\n", degraded_objects);
}
if (misplaced_objects > 0)
{
printf("%lu object(s) misplaced\n", misplaced_objects);
}
}

11
osd.h
View File

@ -42,6 +42,9 @@
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2
#define MAX_AUTOSYNC_INTERVAL 3600
#define DEFAULT_AUTOSYNC_INTERVAL 5
//#define OSD_STUB
extern const char* osd_op_names[];
@ -191,6 +194,7 @@ class osd_t
bool allow_test_ops = true;
int receive_buffer_size = 9000;
int immediate_commit = IMMEDIATE_NONE;
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
// peer OSDs
@ -201,6 +205,7 @@ class osd_t
unsigned pg_count = 0;
uint64_t next_subop_id = 1;
osd_recovery_state_t recovery_state;
osd_op_t *autosync_op = NULL;
// Unstable writes
std::map<osd_object_id_t, uint64_t> unstable_writes;
@ -214,7 +219,7 @@ class osd_t
uint32_t bs_block_size, bs_disk_alignment;
uint64_t pg_stripe_size = 4*1024*1024; // 4 MB by default
ring_loop_t *ringloop;
timerfd_interval *tick_tfd;
timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL;
int wait_state = 0;
int epoll_fd = 0;
@ -232,6 +237,9 @@ class osd_t
uint64_t send_stat_count = 0;
// methods
void parse_config(blockstore_config_t & config);
void bind_socket();
void print_stats();
// event loop, socket read/write
void loop();
@ -274,6 +282,7 @@ class osd_t
void secondary_op_callback(osd_op_t *cur_op);
// primary ops
void autosync();
bool prepare_primary_rw(osd_op_t *cur_op);
void continue_primary_read(osd_op_t *cur_op);
void continue_primary_write(osd_op_t *cur_op);

View File

@ -31,6 +31,13 @@ void osd_t::init_primary()
pgs[1].print_state();
pg_count = 1;
peering_state = OSD_CONNECTING_PEERS;
if (autosync_interval > 0)
{
this->sync_tfd = new timerfd_interval(ringloop, 3, [this]()
{
autosync();
});
}
}
osd_peer_def_t osd_t::parse_peer(std::string peer)

View File

@ -71,10 +71,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
// But we must not use K in the process of calculating the PG number
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1;
// FIXME: Postpone operations in inactive PGs
if (pgs.find(pg_num) == pgs.end() || !(pgs[pg_num].state & PG_ACTIVE))
{
finish_op(cur_op, -EINVAL);
finish_op(cur_op, -EPIPE);
return false;
}
uint64_t pg_block_size = bs_block_size * pgs[pg_num].pg_minsize;
@ -620,8 +619,35 @@ resume_7:
}
}
void osd_t::autosync()
{
if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
{
autosync_op = new osd_op_t();
autosync_op->op_type = OSD_OP_IN;
autosync_op->req = {
.sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = 1,
.opcode = OSD_OP_SYNC,
},
},
};
autosync_op->callback = [this](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
}
delete autosync_op;
autosync_op = NULL;
};
exec_op(autosync_op);
}
}
// Save and clear unstable_writes -> SYNC all -> STABLE all
// FIXME: Run regular automatic syncs based on the number of unstable writes and/or system time
void osd_t::continue_primary_sync(osd_op_t *cur_op)
{
if (!cur_op->op_data)