Browse Source

Implement blockstore rollback operation

Rollback operation is required for the primary OSD to kill unstable
object versions in OSD peers so they don't occupy journal space
blocking-uring-test
Vitaliy Filippov 2 years ago
parent
commit
2b09710d6f
  1. 2
      Makefile
  2. 4
      blockstore.h
  3. 36
      blockstore_flush.cpp
  4. 2
      blockstore_flush.h
  5. 29
      blockstore_impl.cpp
  6. 9
      blockstore_impl.h
  7. 53
      blockstore_init.cpp
  8. 13
      blockstore_journal.h
  9. 187
      blockstore_rollback.cpp
  10. 2
      blockstore_stable.cpp
  11. 2
      blockstore_write.cpp
  12. 4
      osd_exec_secondary.cpp
  13. 3
      osd_ops.h

2
Makefile

@ -1,5 +1,5 @@
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd
clean:

4
blockstore.h

@ -25,8 +25,8 @@
#define BS_OP_STABLE 4
#define BS_OP_DELETE 5
#define BS_OP_LIST 6
#define BS_OP_MAX 6
#define BS_OP_TYPE_MASK 0x7
#define BS_OP_ROLLBACK 7
#define BS_OP_MAX 7
#define BS_OP_PRIVATE_DATA_SIZE 256

36
blockstore_flush.cpp

@ -438,7 +438,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
{
goto resume_0;
}
dirty_it = dirty_end;
dirty_it = dirty_start = dirty_end;
v.clear();
wait_count = 0;
copy_count = 0;
@ -511,6 +511,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
);
throw std::runtime_error(err);
}
dirty_start = dirty_it;
if (dirty_it == bs->dirty_db.begin())
{
break;
@ -593,38 +594,7 @@ void journal_flusher_co::update_clean_db()
.location = clean_loc,
};
}
dirty_it = dirty_end;
while (1)
{
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
{
#ifdef BLOCKSTORE_DEBUG
printf("Free block %lu\n", dirty_it->second.location >> bs->block_order);
#endif
bs->data_alloc->set(dirty_it->second.location >> bs->block_order, false);
}
#ifdef BLOCKSTORE_DEBUG
printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
if (used == 0)
{
bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
}
if (dirty_it == bs->dirty_db.begin())
{
break;
}
dirty_it--;
if (dirty_it->first.oid != cur.oid)
{
break;
}
}
// Then, basically, remove everything up to the current version from dirty_db...
if (dirty_it->first.oid != cur.oid)
dirty_it++;
bs->dirty_db.erase(dirty_it, std::next(dirty_end));
bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
}
bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)

2
blockstore_flush.h

@ -41,7 +41,7 @@ class journal_flusher_co
std::list<flusher_sync_t>::iterator cur_sync;
obj_ver_id cur;
std::map<obj_ver_id, dirty_entry>::iterator dirty_it, dirty_end;
std::map<obj_ver_id, dirty_entry>::iterator dirty_it, dirty_start, dirty_end;
std::map<object_id, uint64_t>::iterator repeat_it;
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;

29
blockstore_impl.cpp

@ -127,8 +127,7 @@ void blockstore_impl_t::loop()
}
else if (PRIV(op)->wait_for)
{
if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE ||
(op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE)
if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE)
{
has_writes = 2;
}
@ -138,12 +137,11 @@ void blockstore_impl_t::loop()
unsigned ring_space = ringloop->space_left();
unsigned prev_sqe_pos = ringloop->save();
int dequeue_op = 0;
if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_READ)
if (op->opcode == BS_OP_READ)
{
dequeue_op = dequeue_read(op);
}
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE ||
(op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE)
else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE)
{
if (has_writes == 2)
{
@ -153,7 +151,7 @@ void blockstore_impl_t::loop()
dequeue_op = dequeue_write(op);
has_writes = dequeue_op ? 1 : 2;
}
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_SYNC)
else if (op->opcode == BS_OP_SYNC)
{
// wait for all small writes to be submitted
// wait for all big writes to complete, submit data device fsync
@ -166,11 +164,15 @@ void blockstore_impl_t::loop()
}
dequeue_op = dequeue_sync(op);
}
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_STABLE)
else if (op->opcode == BS_OP_STABLE)
{
dequeue_op = dequeue_stable(op);
}
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_LIST)
else if (op->opcode == BS_OP_ROLLBACK)
{
dequeue_op = dequeue_rollback(op);
}
else if (op->opcode == BS_OP_LIST)
{
process_list(op);
dequeue_op = true;
@ -296,15 +298,14 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
{
int type = op->opcode & BS_OP_TYPE_MASK;
if (type < BS_OP_MIN || type > BS_OP_MAX ||
((type == BS_OP_READ || type == BS_OP_WRITE) && (
if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX ||
((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE) && (
op->offset >= block_size ||
op->len > block_size-op->offset ||
(op->len % disk_alignment)
)) ||
readonly && type != BS_OP_READ ||
first && type == BS_OP_WRITE)
readonly && op->opcode != BS_OP_READ ||
first && op->opcode == BS_OP_WRITE)
{
// Basic verification not passed
op->retval = -EINVAL;
@ -324,7 +325,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
{
submit_queue.push_front(op);
}
if (type == BS_OP_WRITE)
if (op->opcode == BS_OP_WRITE)
{
enqueue_write(op);
}

9
blockstore_impl.h

@ -167,6 +167,8 @@ struct blockstore_op_private_t
int sync_state, prev_sync_count;
};
typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
#include "blockstore_init.h"
#include "blockstore_flush.h"
@ -199,7 +201,7 @@ class blockstore_impl_t
// Another option is https://github.com/algorithm-ninja/cpp-btree
spp::sparse_hash_map<object_id, clean_entry> clean_db;
uint8_t *clean_bitmap = NULL;
std::map<obj_ver_id, dirty_entry> dirty_db;
blockstore_dirty_db_t dirty_db;
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
@ -278,6 +280,11 @@ class blockstore_impl_t
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
// Rollback
int dequeue_rollback(blockstore_op_t *op);
void handle_rollback_event(ring_data_t *data, blockstore_op_t *op);
void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc);
// List
void process_list(blockstore_op_t *op);

53
blockstore_init.cpp

@ -559,7 +559,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
if (it == bs->dirty_db.end())
{
// journal contains a legitimate STABLE entry for a non-existing dirty write
// this probably means that journal was trimmed between WRITTEN and STABLE entries
// this probably means that journal was trimmed between WRITE and STABLE entries
// skip it
}
else
@ -583,6 +583,57 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
bs->unstable_writes.erase(unstab_it);
}
}
else if (je->type == JE_ROLLBACK)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_rollback oid=%lu:%lu ver=%lu\n", je->rollback.oid.inode, je->rollback.oid.stripe, je->rollback.version);
#endif
// rollback dirty writes of <oid> up to <version>
auto it = bs->dirty_db.lower_bound((obj_ver_id){
.oid = je->rollback.oid,
.version = UINT64_MAX,
});
if (it != bs->dirty_db.begin())
{
uint64_t max_unstable = 0;
auto rm_start = it;
auto rm_end = it;
it--;
while (it->first.oid == je->rollback.oid &&
it->first.version > je->rollback.version &&
!IS_IN_FLIGHT(it->second.state) &&
!IS_STABLE(it->second.state))
{
if (it->first.oid != je->rollback.oid)
break;
else if (it->first.version <= je->rollback.version)
{
if (!IS_STABLE(it->second.state))
max_unstable = it->first.version;
break;
}
else if (IS_STABLE(it->second.state))
break;
// Remove entry
rm_start = it;
if (it == bs->dirty_db.begin())
break;
it--;
}
if (rm_start != rm_end)
{
bs->erase_dirty(rm_start, rm_end, UINT64_MAX);
}
auto unstab_it = bs->unstable_writes.find(je->rollback.oid);
if (unstab_it != bs->unstable_writes.end())
{
if (max_unstable == 0)
bs->unstable_writes.erase(unstab_it);
else
unstab_it->second = max_unstable;
}
}
}
else if (je->type == JE_DELETE)
{
#ifdef BLOCKSTORE_DEBUG

13
blockstore_journal.h

@ -17,6 +17,7 @@
#define JE_BIG_WRITE 0x03
#define JE_STABLE 0x04
#define JE_DELETE 0x05
#define JE_ROLLBACK 0x06
// crc32c comes first to ease calculation and is equal to crc32()
struct __attribute__((__packed__)) journal_entry_start
@ -71,6 +72,17 @@ struct __attribute__((__packed__)) journal_entry_stable
uint64_t version;
};
struct __attribute__((__packed__)) journal_entry_rollback
{
uint32_t crc32;
uint16_t magic;
uint16_t type;
uint32_t size;
uint32_t crc32_prev;
object_id oid;
uint64_t version;
};
struct __attribute__((__packed__)) journal_entry_del
{
uint32_t crc32;
@ -98,6 +110,7 @@ struct __attribute__((__packed__)) journal_entry
journal_entry_small_write small_write;
journal_entry_big_write big_write;
journal_entry_stable stable;
journal_entry_rollback rollback;
journal_entry_del del;
};
};

187
blockstore_rollback.cpp

@ -0,0 +1,187 @@
#include "blockstore_impl.h"
int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
{
obj_ver_id* v;
int i, todo = op->len;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// Check that there are some versions greater than v->version (which may be zero),
// check that they're unstable, synced, and not currently written to
auto dirty_it = dirty_db.lower_bound((obj_ver_id){
.oid = v->oid,
.version = UINT64_MAX,
});
if (dirty_it == dirty_db.begin())
{
bad_op:
op->retval = -EINVAL;
FINISH_OP(op);
return 1;
}
else
{
dirty_it--;
if (dirty_it->first.oid != v->oid || dirty_it->first.version < v->version)
{
goto bad_op;
}
while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version)
{
if (!IS_SYNCED(dirty_it->second.state) ||
IS_STABLE(dirty_it->second.state))
{
goto bad_op;
}
if (dirty_it == dirty_db.begin())
{
break;
}
dirty_it--;
}
}
}
// Check journal space
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, todo, sizeof(journal_entry_rollback), 0))
{
return 0;
}
// There is sufficient space. Get SQEs
struct io_uring_sqe *sqe[space_check.sectors_required];
for (i = 0; i < space_check.sectors_required; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
int s = 0, cur_sector = -1;
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_rollback) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
}
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// FIXME This is here only for the purpose of tracking unstable_writes. Remove if not required
// FIXME ...aaaand this is similar to blockstore_init.cpp - maybe dedup it?
auto dirty_it = dirty_db.lower_bound((obj_ver_id){
.oid = v->oid,
.version = UINT64_MAX,
});
uint64_t max_unstable = 0;
while (dirty_it != dirty_db.begin())
{
dirty_it--;
if (dirty_it->first.oid != v->oid)
break;
else if (dirty_it->first.version <= v->version)
{
if (!IS_STABLE(dirty_it->second.state))
max_unstable = dirty_it->first.version;
break;
}
}
auto unstab_it = unstable_writes.find(v->oid);
if (unstab_it != unstable_writes.end())
{
if (max_unstable == 0)
unstable_writes.erase(unstab_it);
else
unstab_it->second = max_unstable;
}
journal_entry_rollback *je = (journal_entry_rollback*)
prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
journal.sector_info[journal.cur_sector].dirty = false;
je->oid = v->oid;
je->version = v->version;
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
if (cur_sector != journal.cur_sector)
{
if (cur_sector == -1)
PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
}
}
PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
return 1;
}
void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
release_journal_sectors(op);
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// Erase dirty_db entries
auto rm_end = dirty_db.lower_bound((obj_ver_id){
.oid = v->oid,
.version = UINT64_MAX,
});
rm_end--;
auto rm_start = rm_end;
while (1)
{
if (rm_end->first.oid != v->oid)
break;
else if (rm_end->first.version <= v->version)
break;
rm_start = rm_end;
if (rm_end == dirty_db.begin())
break;
rm_end--;
}
if (rm_end != rm_start)
erase_dirty(rm_start, rm_end, UINT64_MAX);
}
journal.trim();
// Acknowledge op
op->retval = 0;
FINISH_OP(op);
}
}
void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc)
{
auto dirty_it = dirty_end;
while (dirty_it != dirty_start)
{
dirty_it--;
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
{
#ifdef BLOCKSTORE_DEBUG
printf("Free block %lu\n", dirty_it->second.location >> block_order);
#endif
data_alloc->set(dirty_it->second.location >> block_order, false);
}
#ifdef BLOCKSTORE_DEBUG
printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector,
dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
int used = --journal.used_sectors[dirty_it->second.journal_sector];
if (used == 0)
{
journal.used_sectors.erase(dirty_it->second.journal_sector);
}
}
dirty_db.erase(dirty_start, dirty_end);
}

2
blockstore_stable.cpp

@ -145,7 +145,7 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
{
// Release used journal sectors
release_journal_sectors(op);
// First step: mark dirty_db entries as stable, acknowledge op completion
// Mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)

2
blockstore_write.cpp

@ -3,7 +3,7 @@
void blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
// Check or assign version number
bool found = false, deleted = false, is_del = (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE;
bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
uint64_t version = 1;
if (dirty_db.size() > 0)
{

4
osd_exec_secondary.cpp

@ -139,5 +139,9 @@ void osd_t::make_reply(osd_op_t *op)
op->reply.hdr.retval = op->bs_op.retval;
if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
op->reply.sec_list.stable_count = op->bs_op.version;
else if (op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
op->reply.sec_rw.version = op->bs_op.version;
else if (op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE)
op->reply.sec_del.version = op->bs_op.version;
}
}

3
osd_ops.h

@ -64,6 +64,8 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t
struct __attribute__((__packed__)) osd_reply_secondary_rw_t
{
osd_reply_header_t header;
// for writes: assigned version number
uint64_t version;
};
// delete object on the secondary OSD
@ -79,6 +81,7 @@ struct __attribute__((__packed__)) osd_op_secondary_del_t
struct __attribute__((__packed__)) osd_reply_secondary_del_t
{
osd_reply_header_t header;
uint64_t version;
};
// sync to the secondary OSD

Loading…
Cancel
Save