forked from vitalif/vitastor
Move SYNC_STAB_ALL into blockstore implementation
parent
4a52a15564
commit
c71b67f2f7
|
@ -30,7 +30,8 @@
|
|||
#define BS_OP_DELETE 5
|
||||
#define BS_OP_LIST 6
|
||||
#define BS_OP_ROLLBACK 7
|
||||
#define BS_OP_MAX 7
|
||||
#define BS_OP_SYNC_STAB_ALL 8
|
||||
#define BS_OP_MAX 8
|
||||
|
||||
#define BS_OP_PRIVATE_DATA_SIZE 256
|
||||
|
||||
|
|
|
@ -312,8 +312,47 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
|||
op->callback(op);
|
||||
return;
|
||||
}
|
||||
if (op->opcode == BS_OP_SYNC_STAB_ALL)
|
||||
{
|
||||
std::function<void(blockstore_op_t*)> *old_callback = new std::function<void(blockstore_op_t*)>(op->callback);
|
||||
op->opcode = BS_OP_SYNC;
|
||||
op->callback = [this, old_callback](blockstore_op_t *op)
|
||||
{
|
||||
if (op->retval >= 0 && unstable_writes.size() > 0)
|
||||
{
|
||||
op->opcode = BS_OP_STABLE;
|
||||
op->len = unstable_writes.size();
|
||||
obj_ver_id *vers = new obj_ver_id[op->len];
|
||||
op->buf = vers;
|
||||
int i = 0;
|
||||
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
|
||||
{
|
||||
vers[i] = {
|
||||
.oid = it->first,
|
||||
.version = it->second,
|
||||
};
|
||||
}
|
||||
unstable_writes.clear();
|
||||
op->callback = [this, old_callback](blockstore_op_t *op)
|
||||
{
|
||||
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
||||
delete[] vers;
|
||||
op->buf = NULL;
|
||||
(*old_callback)(op);
|
||||
delete old_callback;
|
||||
};
|
||||
this->enqueue_op(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
(*old_callback)(op);
|
||||
delete old_callback;
|
||||
}
|
||||
};
|
||||
}
|
||||
if (op->opcode == BS_OP_WRITE && !enqueue_write(op))
|
||||
{
|
||||
op->callback(op);
|
||||
return;
|
||||
}
|
||||
if (0 && op->opcode == BS_OP_SYNC && immediate_commit)
|
||||
|
|
|
@ -39,14 +39,12 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
|||
{
|
||||
// Invalid version requested
|
||||
op->retval = -EINVAL;
|
||||
FINISH_OP(op);
|
||||
return false;
|
||||
}
|
||||
if (deleted && is_del)
|
||||
{
|
||||
// Already deleted
|
||||
op->retval = 0;
|
||||
FINISH_OP(op);
|
||||
return false;
|
||||
}
|
||||
// Immediately add the operation into dirty_db, so subsequent reads could see it
|
||||
|
|
|
@ -211,51 +211,17 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
|||
bsd->last_sync = false;
|
||||
break;
|
||||
case DDIR_SYNC:
|
||||
op->opcode = BS_OP_SYNC;
|
||||
op->opcode = BS_OP_SYNC_STAB_ALL;
|
||||
op->callback = [io, n](blockstore_op_t *op)
|
||||
{
|
||||
bs_data *bsd = (bs_data*)io->engine_data;
|
||||
auto & unstable_writes = bsd->bs->get_unstable_writes();
|
||||
if (op->retval >= 0 && unstable_writes.size() > 0)
|
||||
{
|
||||
op->opcode = BS_OP_STABLE;
|
||||
op->len = unstable_writes.size();
|
||||
obj_ver_id *vers = new obj_ver_id[op->len];
|
||||
op->buf = vers;
|
||||
int i = 0;
|
||||
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
|
||||
{
|
||||
vers[i] = {
|
||||
.oid = it->first,
|
||||
.version = it->second,
|
||||
};
|
||||
}
|
||||
unstable_writes.clear();
|
||||
op->callback = [io, n](blockstore_op_t *op)
|
||||
{
|
||||
io->error = op->retval < 0 ? -op->retval : 0;
|
||||
bs_data *bsd = (bs_data*)io->engine_data;
|
||||
bsd->completed.push_back(io);
|
||||
bsd->inflight--;
|
||||
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
||||
delete[] vers;
|
||||
io->error = op->retval < 0 ? -op->retval : 0;
|
||||
bsd->completed.push_back(io);
|
||||
bsd->inflight--;
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
|
||||
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
|
||||
#endif
|
||||
delete op;
|
||||
};
|
||||
bsd->bs->enqueue_op(op);
|
||||
}
|
||||
else
|
||||
{
|
||||
io->error = op->retval < 0 ? -op->retval : 0;
|
||||
bsd->completed.push_back(io);
|
||||
bsd->inflight--;
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
|
||||
#endif
|
||||
delete op;
|
||||
}
|
||||
delete op;
|
||||
};
|
||||
bsd->last_sync = true;
|
||||
break;
|
||||
|
|
|
@ -84,44 +84,16 @@ void osd_t::exec_sync_stab_all(osd_op_t *cur_op)
|
|||
{
|
||||
// Sync and stabilize all objects
|
||||
// This command is only valid for tests
|
||||
// FIXME: Dedup between here & fio_engine
|
||||
if (!allow_test_ops)
|
||||
{
|
||||
cur_op->bs_op.retval = -EINVAL;
|
||||
secondary_op_callback(cur_op);
|
||||
return;
|
||||
}
|
||||
cur_op->bs_op.opcode = BS_OP_SYNC;
|
||||
cur_op->bs_op.opcode = BS_OP_SYNC_STAB_ALL;
|
||||
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *bs_op)
|
||||
{
|
||||
auto & unstable_writes = bs->get_unstable_writes();
|
||||
if (bs_op->retval >= 0 && unstable_writes.size() > 0)
|
||||
{
|
||||
bs_op->opcode = BS_OP_STABLE;
|
||||
bs_op->len = unstable_writes.size();
|
||||
obj_ver_id *vers = new obj_ver_id[bs_op->len];
|
||||
bs_op->buf = vers;
|
||||
int i = 0;
|
||||
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
|
||||
{
|
||||
vers[i] = {
|
||||
.oid = it->first,
|
||||
.version = it->second,
|
||||
};
|
||||
}
|
||||
unstable_writes.clear();
|
||||
bs_op->callback = [this, cur_op](blockstore_op_t *bs_op)
|
||||
{
|
||||
secondary_op_callback(cur_op);
|
||||
obj_ver_id *vers = (obj_ver_id*)bs_op->buf;
|
||||
delete[] vers;
|
||||
};
|
||||
bs->enqueue_op(bs_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
secondary_op_callback(cur_op);
|
||||
}
|
||||
secondary_op_callback(cur_op);
|
||||
};
|
||||
#ifdef OSD_STUB
|
||||
cur_op->bs_op.retval = 0;
|
||||
|
|
Loading…
Reference in New Issue