Browse Source

Add WRITE_STABLE operation for future replication support

Vitaliy Filippov 2 years ago
parent
commit
ec7acc8f3a
  1. 19
      blockstore.h
  2. 10
      blockstore_impl.cpp
  3. 2
      blockstore_impl.h
  4. 25
      blockstore_init.cpp
  5. 4
      blockstore_journal.h
  6. 20
      blockstore_sync.cpp
  7. 20
      blockstore_write.cpp
  8. 13
      dump_journal.cpp

19
blockstore.h

@ -27,13 +27,14 @@
#define BS_OP_MIN 1
#define BS_OP_READ 1
#define BS_OP_WRITE 2
#define BS_OP_SYNC 3
#define BS_OP_STABLE 4
#define BS_OP_DELETE 5
#define BS_OP_LIST 6
#define BS_OP_ROLLBACK 7
#define BS_OP_SYNC_STAB_ALL 8
#define BS_OP_MAX 8
#define BS_OP_WRITE_STABLE 3
#define BS_OP_SYNC 4
#define BS_OP_STABLE 5
#define BS_OP_DELETE 6
#define BS_OP_LIST 7
#define BS_OP_ROLLBACK 8
#define BS_OP_SYNC_STAB_ALL 9
#define BS_OP_MAX 9
#define BS_OP_PRIVATE_DATA_SIZE 256
@ -41,9 +42,9 @@
Blockstore opcode documentation:
## BS_OP_READ / BS_OP_WRITE
## BS_OP_READ / BS_OP_WRITE / BS_OP_WRITE_STABLE
Read or write object data.
Read or write object data. WRITE_STABLE writes a version that doesn't require marking as stable.
Input:
- oid = requested object

10
blockstore_impl.cpp

@ -130,7 +130,7 @@ void blockstore_impl_t::loop()
}
else if (PRIV(op)->wait_for)
{
if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE)
if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE)
{
has_writes = 2;
}
@ -144,7 +144,7 @@ void blockstore_impl_t::loop()
{
dequeue_op = dequeue_read(op);
}
else if (op->opcode == BS_OP_WRITE)
else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)
{
if (has_writes == 2)
{
@ -329,13 +329,13 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
{
if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX ||
((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE) && (
((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) && (
op->offset >= block_size ||
op->len > block_size-op->offset ||
(op->len % disk_alignment)
)) ||
readonly && op->opcode != BS_OP_READ && op->opcode != BS_OP_LIST ||
first && op->opcode == BS_OP_WRITE)
first && (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE))
{
// Basic verification not passed
op->retval = -EINVAL;
@ -380,7 +380,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
}
};
}
if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
{
std::function<void (blockstore_op_t*)>(op->callback)(op);
return;

2
blockstore_impl.h

@ -34,6 +34,8 @@
#define BS_ST_SYNCED 0x50
#define BS_ST_STABLE 0x60
#define BS_ST_INSTANT 0x100
#define IMMEDIATE_NONE 0
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2

25
blockstore_init.cpp

@ -454,10 +454,15 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
break;
}
}
if (je->type == JE_SMALL_WRITE)
if (je->type == JE_SMALL_WRITE || je->type == JE_SMALL_WRITE_INSTANT)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, je->small_write.offset, je->small_write.len);
printf(
"je_small_write%s oid=%lu:%lu ver=%lu offset=%u len=%u\n",
je->type == JE_SMALL_WRITE_INSTANT ? "_instant" : "",
je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version,
je->small_write.offset, je->small_write.len
);
#endif
// oid, version, offset, len
uint64_t prev_free = next_free;
@ -544,12 +549,20 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
#endif
auto & unstab = bs->unstable_writes[ov.oid];
unstab = unstab < ov.version ? ov.version : unstab;
if (je->type == JE_SMALL_WRITE_INSTANT)
{
bs->mark_stable(ov);
}
}
}
else if (je->type == JE_BIG_WRITE)
else if (je->type == JE_BIG_WRITE || je->type == JE_BIG_WRITE_INSTANT)
{
#ifdef BLOCKSTORE_DEBUG
printf("je_big_write oid=%lu:%lu ver=%lu loc=%lu\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location);
printf(
"je_big_write%s oid=%lu:%lu ver=%lu loc=%lu\n",
je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "",
je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location
);
#endif
auto clean_it = bs->clean_db.find(je->big_write.oid);
if (clean_it == bs->clean_db.end() ||
@ -575,6 +588,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
bs->journal.used_sectors[proc_pos]++;
auto & unstab = bs->unstable_writes[ov.oid];
unstab = unstab < ov.version ? ov.version : unstab;
if (je->type == JE_BIG_WRITE_INSTANT)
{
bs->mark_stable(ov);
}
}
}
else if (je->type == JE_STABLE)

4
blockstore_journal.h

@ -19,7 +19,9 @@
#define JE_STABLE 0x04
#define JE_DELETE 0x05
#define JE_ROLLBACK 0x06
#define JE_MAX 0x06
#define JE_SMALL_WRITE_INSTANT 0x07
#define JE_BIG_WRITE_INSTANT 0x08
#define JE_MAX 0x08
// crc32c comes first to ease calculation and is equal to crc32()
struct __attribute__((__packed__)) journal_entry_start

20
blockstore_sync.cpp

@ -127,8 +127,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
}
while (it != PRIV(op)->sync_big_writes.end())
{
journal_entry_big_write *je = (journal_entry_big_write*)
prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write));
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, (dirty_db[*it].state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write)
);
dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.sector_info[journal.cur_sector].dirty = false;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
@ -257,7 +259,11 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
auto & unstab = unstable_writes[it->oid];
unstab = unstab < it->version ? it->version : unstab;
auto dirty_it = dirty_db.find(*it);
dirty_it->second.state = (BS_ST_BIG_WRITE | BS_ST_SYNCED);
dirty_it->second.state = ((dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED);
if (dirty_it->second.state & BS_ST_INSTANT)
{
mark_stable(dirty_it->first);
}
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
{
@ -281,9 +287,13 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
// Deletions are treated as immediately stable
mark_stable(*it);
}
else /* BS_ST_SMALL_WRITE | BS_ST_WRITTEN */
else /* (BS_ST_INSTANT?) | BS_ST_SMALL_WRITE | BS_ST_WRITTEN */
{
dirty_db[*it].state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED);
dirty_db[*it].state = (dirty_db[*it].state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED;
if (dirty_db[*it].state & BS_ST_INSTANT)
{
mark_stable(*it);
}
}
}
in_progress_syncs.erase(PRIV(op)->in_progress_ptr);

20
blockstore_write.cpp

@ -78,7 +78,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
.state = (uint32_t)(
is_del
? (BS_ST_DELETE | BS_ST_IN_FLIGHT)
: (op->len == block_size || deleted
: (op->opcode == BS_OP_WRITE_STABLE ? BS_ST_INSTANT : 0) | (op->len == block_size || deleted
? (BS_ST_BIG_WRITE | BS_ST_IN_FLIGHT)
: (is_inflight_big ? (BS_ST_SMALL_WRITE | BS_ST_WAIT_BIG) : (BS_ST_SMALL_WRITE | BS_ST_IN_FLIGHT)))
),
@ -212,8 +212,10 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
}
}
// Then pre-fill journal entry
journal_entry_small_write *je = (journal_entry_small_write*)
prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(journal_entry_small_write));
journal_entry_small_write *je = (journal_entry_small_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_SMALL_WRITE_INSTANT : JE_SMALL_WRITE,
sizeof(journal_entry_small_write)
);
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
@ -310,7 +312,10 @@ resume_2:
{
return 0;
}
je = (journal_entry_big_write*)prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write));
je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write)
);
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.sector_info[journal.cur_sector].dirty = false;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
@ -349,7 +354,7 @@ resume_4:
}
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE)
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
{
// Deletions are treated as immediately stable
mark_stable(dirty_it->first);
@ -465,8 +470,9 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
}
}
// Pre-fill journal entry
journal_entry_del *je = (journal_entry_del*)
prefill_single_journal_entry(journal, JE_DELETE, sizeof(struct journal_entry_del));
journal_entry_del *je = (journal_entry_del*)prefill_single_journal_entry(
journal, JE_DELETE, sizeof(struct journal_entry_del)
);
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG

13
dump_journal.cpp

@ -104,10 +104,11 @@ void journal_dump_t::dump_block(void *buf)
{
printf("je_start start=%08lx\n", je->start.journal_start);
}
else if (je->type == JE_SMALL_WRITE)
else if (je->type == JE_SMALL_WRITE || je->type == JE_SMALL_WRITE_INSTANT)
{
printf(
"je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u loc=%08lx",
"je_small_write%s oid=%lu:%lu ver=%lu offset=%u len=%u loc=%08lx",
je->type == JE_SMALL_WRITE_INSTANT ? "_instant" : "",
je->small_write.oid.inode, je->small_write.oid.stripe,
je->small_write.version, je->small_write.offset, je->small_write.len,
je->small_write.data_offset
@ -139,9 +140,13 @@ void journal_dump_t::dump_block(void *buf)
);
printf("\n");
}
else if (je->type == JE_BIG_WRITE)
else if (je->type == JE_BIG_WRITE || je->type == JE_BIG_WRITE_INSTANT)
{
printf("je_big_write oid=%lu:%lu ver=%lu loc=%08lx\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location);
printf(
"je_big_write%s oid=%lu:%lu ver=%lu loc=%08lx\n",
je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "",
je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location
);
}
else if (je->type == JE_STABLE)
{

Loading…
Cancel
Save