Browse Source

Test & fix single-PG primary OSD

- Add support for benchmarking single primary OSD in fio_sec_osd
- Do not wait for the next event in flushers (return resume_0 back)
- Fix flushing of zero-length writes
- Print PG object count when peering
- Print journal free space when starting and when congested
blocking-uring-test
Vitaliy Filippov 2 years ago
parent
commit
1733de2db6
  1. 75
      blockstore_flush.cpp
  2. 2
      blockstore_flush.h
  3. 10
      blockstore_init.cpp
  4. 6
      blockstore_journal.cpp
  5. 80
      fio_sec_osd.cpp
  6. 7
      osd_peering_pg.cpp
  7. 2
      osd_peering_pg.h
  8. 14
      osd_primary.cpp

75
blockstore_flush.cpp

@ -154,6 +154,7 @@ bool journal_flusher_co::loop()
goto resume_17;
else if (wait_state == 18)
goto resume_18;
resume_0:
if (!flusher->flush_queue.size() ||
!flusher->start_forced && !flusher->active_flushers && flusher->flush_queue.size() < flusher->sync_threshold)
{
@ -181,7 +182,7 @@ bool journal_flusher_co::loop()
if (repeat_it->second < cur.version)
repeat_it->second = cur.version;
wait_state = 0;
return true;
goto resume_0;
}
else
flusher->sync_to_repeat[cur.oid] = 0;
@ -196,7 +197,7 @@ resume_1:
wait_state += 1;
return false;
}
if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete)
if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete && !has_empty)
{
// Nothing to flush
flusher->active_flushers--;
@ -208,7 +209,7 @@ resume_1:
}
flusher->sync_to_repeat.erase(repeat_it);
wait_state = 0;
return true;
goto resume_0;
}
// Find it in clean_db
clean_it = bs->clean_db.find(cur.oid);
@ -427,7 +428,7 @@ resume_1:
}
flusher->sync_to_repeat.erase(repeat_it);
wait_state = 0;
return true;
goto resume_0;
}
return true;
}
@ -444,6 +445,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
copy_count = 0;
clean_loc = UINT64_MAX;
has_delete = false;
has_empty = false;
skip_copy = false;
clean_init_bitmap = false;
while (1)
@ -451,40 +453,47 @@ bool journal_flusher_co::scan_dirty(int wait_base)
if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
{
// First we submit all reads
offset = dirty_it->second.offset;
end_offset = dirty_it->second.offset + dirty_it->second.len;
it = v.begin();
while (1)
if (dirty_it->second.len == 0)
{
for (; it != v.end(); it++)
if (it->offset >= offset)
break;
if (it == v.end() || it->offset > offset && it->len > 0)
has_empty = true;
}
else
{
offset = dirty_it->second.offset;
end_offset = dirty_it->second.offset + dirty_it->second.len;
it = v.begin();
while (1)
{
submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) });
copy_count++;
if (bs->journal.inmemory)
{
// Take it from memory
memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len);
}
else
for (; it != v.end(); it++)
if (it->offset >= offset)
break;
if (it == v.end() || it->offset > offset && it->len > 0)
{
// Read it from disk
await_sqe(0);
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
data->callback = simple_callback_r;
my_uring_prep_readv(
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
);
wait_count++;
submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) });
copy_count++;
if (bs->journal.inmemory)
{
// Take it from memory
memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len);
}
else
{
// Read it from disk
await_sqe(0);
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
data->callback = simple_callback_r;
my_uring_prep_readv(
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
);
wait_count++;
}
}
offset = it->offset+it->len;
if (it == v.end() || offset >= end_offset)
break;
}
offset = it->offset+it->len;
if (it == v.end() || offset >= end_offset)
break;
}
}
else if (dirty_it->second.state == ST_D_STABLE && !skip_copy)

2
blockstore_flush.h

@ -45,7 +45,7 @@ class journal_flusher_co
std::map<object_id, uint64_t>::iterator repeat_it;
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
bool skip_copy, has_delete;
bool skip_copy, has_delete, has_empty;
spp::sparse_hash_map<object_id, clean_entry>::iterator clean_it;
std::vector<copy_buffer_t> v;
std::vector<copy_buffer_t>::iterator it;

10
blockstore_init.cpp

@ -402,7 +402,15 @@ resume_1:
}
// Trim journal on start so we don't stall when all entries are older
bs->journal.trim();
printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
printf(
"Journal entries loaded: %lu, free journal space: %lu bytes (%lu..%lu is used), free blocks: %lu / %lu\n",
entries_loaded,
(bs->journal.next_free >= bs->journal.used_start
? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start)
: bs->journal.used_start - bs->journal.next_free),
bs->journal.used_start, bs->journal.next_free,
bs->data_alloc->get_free_count(), bs->block_count
);
bs->journal.crc32_last = crc32_last;
return 0;
}

6
blockstore_journal.cpp

@ -67,6 +67,12 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
if (!right_dir && next_pos >= bs->journal.used_start-bs->journal.block_size)
{
// No space in the journal. Wait until used_start changes.
printf(
"Ran out of journal space (free space: %lu bytes)\n",
(bs->journal.next_free >= bs->journal.used_start
? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start)
: bs->journal.used_start - bs->journal.next_free)
);
PRIV(op)->wait_for = WAIT_JOURNAL;
bs->flusher->force_start();
PRIV(op)->wait_detail = bs->journal.used_start;

80
fio_sec_osd.cpp

@ -5,7 +5,7 @@
// Random write:
//
// fio -thread -ioengine=./libfio_sec_osd.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
// -host=127.0.0.1 -port=11203 -size=1000M
// -host=127.0.0.1 -port=11203 [-single_primary=1] -size=1000M
//
// Linear write:
//
@ -50,6 +50,7 @@ struct sec_options
int __pad;
char *host = NULL;
int port = 0;
bool single_primary = false;
};
static struct fio_option options[] = {
@ -71,6 +72,16 @@ static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "single_primary",
.lname = "Single Primary",
.type = FIO_OPT_BOOL,
.off1 = offsetof(struct sec_options, single_primary),
.help = "Test single Primary OSD (one PG) instead of Secondary",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = NULL,
},
@ -150,6 +161,7 @@ static int sec_init(struct thread_data *td)
/* Begin read or write request. */
static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
{
sec_options *opt = (sec_options*)td->eo;
sec_data *bsd = (sec_data*)td->io_ops_data;
int n = bsd->op_n;
@ -167,31 +179,59 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
switch (io->ddir)
{
case DDIR_READ:
op.hdr.opcode = OSD_OP_SECONDARY_READ;
op.sec_rw.oid = {
.inode = 1,
.stripe = io->offset >> bsd->block_order,
};
op.sec_rw.version = UINT64_MAX; // last unstable
op.sec_rw.offset = io->offset % bsd->block_size;
op.sec_rw.len = io->xfer_buflen;
if (!opt->single_primary)
{
op.hdr.opcode = OSD_OP_SECONDARY_READ;
op.sec_rw.oid = {
.inode = 1,
.stripe = io->offset >> bsd->block_order,
};
op.sec_rw.version = UINT64_MAX; // last unstable
op.sec_rw.offset = io->offset % bsd->block_size;
op.sec_rw.len = io->xfer_buflen;
}
else
{
op.hdr.opcode = OSD_OP_READ;
op.rw.inode = 1;
op.rw.offset = io->offset;
op.rw.len = io->xfer_buflen;
}
bsd->last_sync = false;
break;
case DDIR_WRITE:
op.hdr.opcode = OSD_OP_SECONDARY_WRITE;
op.sec_rw.oid = {
.inode = 1,
.stripe = io->offset >> bsd->block_order,
};
op.sec_rw.version = 0; // assign automatically
op.sec_rw.offset = io->offset % bsd->block_size;
op.sec_rw.len = io->xfer_buflen;
if (!opt->single_primary)
{
op.hdr.opcode = OSD_OP_SECONDARY_WRITE;
op.sec_rw.oid = {
.inode = 1,
.stripe = io->offset >> bsd->block_order,
};
op.sec_rw.version = 0; // assign automatically
op.sec_rw.offset = io->offset % bsd->block_size;
op.sec_rw.len = io->xfer_buflen;
}
else
{
op.hdr.opcode = OSD_OP_WRITE;
op.rw.inode = 1;
op.rw.offset = io->offset;
op.rw.len = io->xfer_buflen;
}
bsd->last_sync = false;
break;
case DDIR_SYNC:
// Allowed only for testing: sync & stabilize all unstable object versions
op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
// fio sends 32 syncs with -fsync=32. we omit 31 of them even though it's not 100% fine (FIXME: fix fio itself)
if (!opt->single_primary)
{
// Allowed only for testing: sync & stabilize all unstable object versions
op.hdr.opcode = OSD_OP_TEST_SYNC_STAB_ALL;
}
else
{
op.hdr.opcode = OSD_OP_SYNC;
}
// fio sends 32 syncs with -fsync=32. we omit 31 of them even though
// generally it may not be 100% correct (FIXME: fix fio itself)
bsd->last_sync = true;
break;
default:

7
osd_peering_pg.cpp

@ -133,6 +133,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector<obj_ver_role> &
}
else
pg.clean_count++;
pg.total_count++;
}
// FIXME: Write at least some tests for this function
@ -167,6 +168,7 @@ void pg_t::calc_object_states()
std::sort(all.begin(), all.end());
// Walk over it and check object states
pg.clean_count = 0;
pg.total_count = 0;
pg.state = 0;
int replica = 0;
pg_obj_state_check_t st;
@ -251,12 +253,13 @@ void pg_t::calc_object_states()
pg.state = pg.state | PG_DEGRADED;
}
printf(
"PG %u is active%s%s%s%s%s\n", pg.pg_num,
"PG %u is active%s%s%s%s%s (%lu objects)\n", pg.pg_num,
(pg.state & PG_DEGRADED) ? " + degraded" : "",
(pg.state & PG_HAS_UNFOUND) ? " + has_unfound" : "",
(pg.state & PG_HAS_DEGRADED) ? " + has_degraded" : "",
(pg.state & PG_HAS_MISPLACED) ? " + has_misplaced" : "",
(pg.state & PG_HAS_UNCLEAN) ? " + has_unclean" : ""
(pg.state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
pg.total_count
);
pg.state = pg.state | PG_ACTIVE;
}

2
osd_peering_pg.h

@ -111,7 +111,7 @@ struct pg_t
int state;
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
pg_num_t pg_num;
uint64_t clean_count = 0;
uint64_t clean_count = 0, total_count = 0;
// target_set is the "correct" peer OSD set for this PG
std::vector<osd_num_t> target_set;
// cur_set is the current set of connected peer OSDs for this PG

14
osd_primary.cpp

@ -328,17 +328,18 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
return;
}
}
resume_1:
// Check if there are other write requests to the same object
{
auto vo_it = pg.ver_override.find(op_data->oid);
if (vo_it != pg.ver_override.end())
auto vo_it = pg.write_queue.find(op_data->oid);
if (vo_it != pg.write_queue.end())
{
op_data->st = 1;
pg.write_queue.emplace(op_data->oid, cur_op);
return;
}
pg.write_queue.emplace(op_data->oid, cur_op);
}
resume_1:
// Determine blocks to read
cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize);
// Read required blocks
@ -381,10 +382,13 @@ resume_5:
// Continue other write operations to the same object
{
auto next_it = pg.write_queue.find(op_data->oid);
if (next_it != pg.write_queue.end())
auto this_it = next_it;
next_it++;
pg.write_queue.erase(this_it);
if (next_it != pg.write_queue.end() &&
next_it->first == op_data->oid)
{
osd_op_t *next_op = next_it->second;
pg.write_queue.erase(next_it);
continue_primary_write(next_op);
}
}

Loading…
Cancel
Save