Browse Source

Make blockstore object state a combination of type and workflow

Vitaliy Filippov 2 years ago
parent
commit
416a80b099
  1. 6
      blockstore_flush.cpp
  2. 44
      blockstore_impl.h
  3. 6
      blockstore_init.cpp
  4. 9
      blockstore_read.cpp
  5. 14
      blockstore_stable.cpp
  6. 14
      blockstore_sync.cpp
  7. 51
      blockstore_write.cpp

6
blockstore_flush.cpp

@ -530,7 +530,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
clean_init_bitmap = false;
while (1)
{
if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
if (dirty_it->second.state == (BS_ST_SMALL_WRITE | BS_ST_STABLE) && !skip_copy)
{
// First we submit all reads
has_writes = true;
@ -573,7 +573,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
}
}
}
else if (dirty_it->second.state == ST_D_STABLE && !skip_copy)
else if (dirty_it->second.state == (BS_ST_BIG_WRITE | BS_ST_STABLE) && !skip_copy)
{
// There is an unflushed big write. Copy small writes in its position
has_writes = true;
@ -583,7 +583,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
clean_bitmap_len = dirty_it->second.len;
skip_copy = true;
}
else if (dirty_it->second.state == ST_DEL_STABLE && !skip_copy)
else if (dirty_it->second.state == (BS_ST_DELETE | BS_ST_STABLE) && !skip_copy)
{
// There is an unflushed delete
has_delete = true;

44
blockstore_impl.h

@ -22,40 +22,30 @@
//#define BLOCKSTORE_DEBUG
// States are not stored on disk. Instead, they're deduced from the journal
// FIXME: Rename to BS_ST_*
#define ST_J_WAIT_BIG 1
#define ST_J_IN_FLIGHT 2
#define ST_J_SUBMITTED 3
#define ST_J_WRITTEN 4
#define ST_J_SYNCED 5
#define ST_J_STABLE 6
// Object type: occupies the low 4 bits of a state value (see BS_ST_TYPE_MASK, 0x0F).
// A complete state is one type value OR-ed with one workflow value (0x10..0x60).
// These are enumerated values, not independent bit flags: 0x03 == 0x01 | 0x02,
// so test the type with ((st) & 0x0F) == BS_ST_xxx, never with a bare bitwise AND.
#define BS_ST_SMALL_WRITE 0x01
#define BS_ST_BIG_WRITE 0x02
#define BS_ST_DELETE 0x03
#define ST_D_IN_FLIGHT 15
#define ST_D_SUBMITTED 16
#define ST_D_WRITTEN 17
#define ST_D_SYNCED 20
#define ST_D_STABLE 21
#define ST_DEL_IN_FLIGHT 31
#define ST_DEL_SUBMITTED 32
#define ST_DEL_WRITTEN 33
#define ST_DEL_SYNCED 34
#define ST_DEL_STABLE 35
#define ST_CURRENT 48
// Workflow stage: occupies the high 4 bits of a state value (see BS_ST_WORKFLOW_MASK, 0xF0).
// The numeric order is significant: IS_IN_FLIGHT and IS_SYNCED compare the masked
// value with <= / >=, so the stages must stay in ascending order.
#define BS_ST_WAIT_BIG 0x10
#define BS_ST_IN_FLIGHT 0x20
#define BS_ST_SUBMITTED 0x30
#define BS_ST_WRITTEN 0x40
#define BS_ST_SYNCED 0x50
#define BS_ST_STABLE 0x60
// Values compared against immediate_commit.
// In the write-ack path a completed big write goes straight to the "synced" workflow
// stage only when immediate_commit == IMMEDIATE_ALL; any other entry type does so
// whenever immediate_commit != IMMEDIATE_NONE. Otherwise the entry becomes "written".
#define IMMEDIATE_NONE 0
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2
#define IS_IN_FLIGHT(st) (st == ST_J_WAIT_BIG || st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT)
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_SYNCED || st == ST_DEL_SYNCED)
#define IS_JOURNAL(st) (st >= ST_J_WAIT_BIG && st <= ST_J_STABLE)
#define IS_BIG_WRITE(st) (st >= ST_D_IN_FLIGHT && st <= ST_D_STABLE)
#define IS_DELETE(st) (st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_STABLE)
#define IS_UNSYNCED(st) (st >= ST_J_WAIT_BIG && st <= ST_J_WRITTEN || st >= ST_D_IN_FLIGHT && st <= ST_D_WRITTEN|| st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_WRITTEN)
// Bit masks that split a state value into its two fields:
// low nibble = object type, high nibble = workflow stage.
#define BS_ST_TYPE_MASK 0x0F
#define BS_ST_WORKFLOW_MASK 0xF0
// State predicates. Each one masks out a single field and compares it.
// NOTE: the literals 0xF0 / 0x0F below duplicate BS_ST_WORKFLOW_MASK / BS_ST_TYPE_MASK;
// keep them in sync if the masks ever change.
// True while the workflow stage is at or below "submitted"
// (wait-big, in-flight, submitted; a zero workflow nibble also matches).
#define IS_IN_FLIGHT(st) (((st) & 0xF0) <= BS_ST_SUBMITTED)
// True only when the workflow stage is exactly "stable".
#define IS_STABLE(st) (((st) & 0xF0) == BS_ST_STABLE)
// True when the workflow stage is "synced" or anything numerically above it,
// which includes "stable".
#define IS_SYNCED(st) (((st) & 0xF0) >= BS_ST_SYNCED)
// True when the object type is a small write (the old journal-write states).
#define IS_JOURNAL(st) (((st) & 0x0F) == BS_ST_SMALL_WRITE)
// True when the object type is a big write.
#define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE)
// True when the object type is a delete.
#define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE)
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \

6
blockstore_init.cpp

@ -528,7 +528,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
.version = je->small_write.version,
};
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_J_SYNCED,
.state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED),
.flags = 0,
.location = location,
.offset = je->small_write.offset,
@ -561,7 +561,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
.version = je->big_write.version,
};
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_D_SYNCED,
.state = (BS_ST_BIG_WRITE | BS_ST_SYNCED),
.flags = 0,
.location = je->big_write.location,
.offset = je->big_write.offset,
@ -616,7 +616,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
.version = je->del.version,
};
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_DEL_SYNCED,
.state = (BS_ST_DELETE | BS_ST_SYNCED),
.flags = 0,
.location = 0,
.offset = 0,

9
blockstore_read.cpp

@ -157,7 +157,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
{
if (!clean_entry_bitmap_size)
{
if (!fulfill_read(read_op, fulfilled, 0, block_size, ST_CURRENT, 0, clean_it->second.location))
if (!fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_BIG_WRITE | BS_ST_STABLE), 0, clean_it->second.location))
{
// need to wait. undo added requests, don't dequeue op
PRIV(read_op)->read_vec.clear();
@ -189,7 +189,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
{
// fill with zeroes
fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
bmp_end * bitmap_granularity, ST_DEL_STABLE, 0, 0);
bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
}
bmp_start = bmp_end;
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
@ -199,7 +199,8 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
if (bmp_end > bmp_start)
{
if (!fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
bmp_end * bitmap_granularity, ST_CURRENT, 0, clean_it->second.location + bmp_start * bitmap_granularity))
bmp_end * bitmap_granularity, (BS_ST_BIG_WRITE | BS_ST_STABLE), 0,
clean_it->second.location + bmp_start * bitmap_granularity))
{
// need to wait. undo added requests, don't dequeue op
PRIV(read_op)->read_vec.clear();
@ -214,7 +215,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
else if (fulfilled < read_op->len)
{
// fill remaining parts with zeroes
fulfill_read(read_op, fulfilled, 0, block_size, ST_DEL_STABLE, 0, 0);
fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
}
assert(fulfilled == read_op->len);
read_op->version = result_version;

14
blockstore_stable.cpp

@ -64,7 +64,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
// Already stable
}
}
else if (IS_UNSYNCED(dirty_it->second.state))
else if (!IS_SYNCED(dirty_it->second.state))
{
// Object not synced yet. Caller must sync it first
op->retval = -EBUSY;
@ -184,17 +184,9 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v)
{
while (1)
{
if (dirty_it->second.state == ST_J_SYNCED)
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_SYNCED)
{
dirty_it->second.state = ST_J_STABLE;
}
else if (dirty_it->second.state == ST_D_SYNCED)
{
dirty_it->second.state = ST_D_STABLE;
}
else if (dirty_it->second.state == ST_DEL_SYNCED)
{
dirty_it->second.state = ST_DEL_STABLE;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_STABLE;
}
else if (IS_STABLE(dirty_it->second.state))
{

14
blockstore_sync.cpp

@ -257,13 +257,13 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
auto & unstab = unstable_writes[it->oid];
unstab = unstab < it->version ? it->version : unstab;
auto dirty_it = dirty_db.find(*it);
dirty_it->second.state = ST_D_SYNCED;
dirty_it->second.state = (BS_ST_BIG_WRITE | BS_ST_SYNCED);
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
{
if (dirty_it->second.state == ST_J_WAIT_BIG)
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
{
dirty_it->second.state = ST_J_IN_FLIGHT;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
}
dirty_it++;
}
@ -275,15 +275,15 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
#endif
auto & unstab = unstable_writes[it->oid];
unstab = unstab < it->version ? it->version : unstab;
if (dirty_db[*it].state == ST_DEL_WRITTEN)
if (dirty_db[*it].state == (BS_ST_DELETE | BS_ST_WRITTEN))
{
dirty_db[*it].state = ST_DEL_SYNCED;
dirty_db[*it].state = (BS_ST_DELETE | BS_ST_SYNCED);
// Deletions are treated as immediately stable
mark_stable(*it);
}
else /* == ST_J_WRITTEN */
else /* BS_ST_SMALL_WRITE | BS_ST_WRITTEN */
{
dirty_db[*it].state = ST_J_SYNCED;
dirty_db[*it].state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED);
}
}
in_progress_syncs.erase(PRIV(op)->in_progress_ptr);

51
blockstore_write.cpp

@ -18,9 +18,9 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
found = true;
version = dirty_it->first.version + 1;
deleted = IS_DELETE(dirty_it->second.state);
is_inflight_big = dirty_it->second.state >= ST_D_IN_FLIGHT &&
dirty_it->second.state < ST_D_SYNCED ||
dirty_it->second.state == ST_J_WAIT_BIG;
is_inflight_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
? !IS_SYNCED(dirty_it->second.state)
: ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG);
}
}
if (!found)
@ -77,8 +77,10 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
}, (dirty_entry){
.state = (uint32_t)(
is_del
? ST_DEL_IN_FLIGHT
: (op->len == block_size || deleted ? ST_D_IN_FLIGHT : (is_inflight_big ? ST_J_WAIT_BIG : ST_J_IN_FLIGHT))
? (BS_ST_DELETE | BS_ST_IN_FLIGHT)
: (op->len == block_size || deleted
? (BS_ST_BIG_WRITE | BS_ST_IN_FLIGHT)
: (is_inflight_big ? (BS_ST_SMALL_WRITE | BS_ST_WAIT_BIG) : (BS_ST_SMALL_WRITE | BS_ST_IN_FLIGHT)))
),
.flags = 0,
.location = 0,
@ -101,11 +103,12 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
.version = op->version,
});
assert(dirty_it != dirty_db.end());
if (dirty_it->second.state == ST_J_WAIT_BIG)
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
{
// Don't dequeue
return 0;
}
else if (dirty_it->second.state == ST_D_IN_FLIGHT)
else if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
{
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
@ -129,7 +132,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
}
BS_SUBMIT_GET_SQE(sqe, data);
dirty_it->second.location = loc << block_order;
dirty_it->second.state = ST_D_SUBMITTED;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG
printf("Allocate block %lu\n", loc);
#endif
@ -169,7 +172,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
PRIV(op)->op_state = 1;
}
}
else
else /* if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_SMALL_WRITE) */
{
// Small (journaled) write
// First check if the journal has sufficient space
@ -257,7 +260,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data
}
dirty_it->second.location = journal.next_free;
dirty_it->second.state = ST_J_SUBMITTED;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
journal.next_free += op->len;
if (journal.next_free >= journal.len)
{
@ -336,7 +339,7 @@ resume_4:
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif
bool imm = dirty_it->second.state == ST_D_SUBMITTED
bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
? (immediate_commit == IMMEDIATE_ALL)
: (immediate_commit != IMMEDIATE_NONE);
if (imm)
@ -344,31 +347,21 @@ resume_4:
auto & unstab = unstable_writes[op->oid];
unstab = unstab < op->version ? op->version : unstab;
}
if (dirty_it->second.state == ST_J_SUBMITTED)
{
dirty_it->second.state = imm ? ST_J_SYNCED : ST_J_WRITTEN;
}
else if (dirty_it->second.state == ST_D_SUBMITTED)
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE)
{
dirty_it->second.state = imm ? ST_D_SYNCED : ST_D_WRITTEN;
}
else if (dirty_it->second.state == ST_DEL_SUBMITTED)
{
dirty_it->second.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN;
if (imm)
{
// Deletions are treated as immediately stable
mark_stable(dirty_it->first);
}
// Deletions are treated as immediately stable
mark_stable(dirty_it->first);
}
if (immediate_commit == IMMEDIATE_ALL)
{
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
{
if (dirty_it->second.state == ST_J_WAIT_BIG)
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
{
dirty_it->second.state = ST_J_IN_FLIGHT;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
}
dirty_it++;
}
@ -487,7 +480,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
je->version = op->version;
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
dirty_it->second.state = ST_DEL_SUBMITTED;
dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED;
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);

Loading…
Cancel
Save