Fix repeated syncs
parent
1080cbdf12
commit
2831d40edb
|
@ -141,14 +141,15 @@ resume_0:
|
||||||
{
|
{
|
||||||
// We don't flush different parts of history of the same object in parallel
|
// We don't flush different parts of history of the same object in parallel
|
||||||
// So we check if someone is already flushing this object
|
// So we check if someone is already flushing this object
|
||||||
// In that case we set sync_to_repeat to 2 and pick another object
|
// In that case we set sync_to_repeat and pick another object
|
||||||
// Another coroutine will see this "2" and re-queue the object after it finishes
|
// Another coroutine will see it and re-queue the object after it finishes
|
||||||
|
if (repeat_it->second < cur.version)
|
||||||
repeat_it->second = cur.version;
|
repeat_it->second = cur.version;
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
goto resume_0;
|
goto resume_0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
repeat_it->second = 0;
|
flusher->sync_to_repeat[cur.oid] = 0;
|
||||||
dirty_it = dirty_end;
|
dirty_it = dirty_end;
|
||||||
flusher->active_flushers++;
|
flusher->active_flushers++;
|
||||||
flusher->active_until_sync++;
|
flusher->active_until_sync++;
|
||||||
|
|
|
@ -25,13 +25,10 @@ int blockstore::dequeue_sync(blockstore_operation *op)
|
||||||
}
|
}
|
||||||
int r = continue_sync(op);
|
int r = continue_sync(op);
|
||||||
if (r)
|
if (r)
|
||||||
{
|
|
||||||
int done = ack_sync(op);
|
|
||||||
if (!done)
|
|
||||||
{
|
{
|
||||||
op->prev_sync_count = in_progress_syncs.size();
|
op->prev_sync_count = in_progress_syncs.size();
|
||||||
op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
|
op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
|
||||||
}
|
ack_sync(op);
|
||||||
}
|
}
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
@ -39,7 +36,6 @@ int blockstore::dequeue_sync(blockstore_operation *op)
|
||||||
int blockstore::continue_sync(blockstore_operation *op)
|
int blockstore::continue_sync(blockstore_operation *op)
|
||||||
{
|
{
|
||||||
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
|
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
|
||||||
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
|
||||||
if (op->sync_state == SYNC_HAS_SMALL)
|
if (op->sync_state == SYNC_HAS_SMALL)
|
||||||
{
|
{
|
||||||
// No big writes, just fsync the journal
|
// No big writes, just fsync the journal
|
||||||
|
@ -47,6 +43,7 @@ int blockstore::continue_sync(blockstore_operation *op)
|
||||||
my_uring_prep_fsync(sqe, journal.fd, 0);
|
my_uring_prep_fsync(sqe, journal.fd, 0);
|
||||||
data->iov = { 0 };
|
data->iov = { 0 };
|
||||||
data->callback = cb;
|
data->callback = cb;
|
||||||
|
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
||||||
op->pending_ops = 1;
|
op->pending_ops = 1;
|
||||||
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
||||||
}
|
}
|
||||||
|
@ -57,6 +54,7 @@ int blockstore::continue_sync(blockstore_operation *op)
|
||||||
my_uring_prep_fsync(sqe, data_fd, 0);
|
my_uring_prep_fsync(sqe, data_fd, 0);
|
||||||
data->iov = { 0 };
|
data->iov = { 0 };
|
||||||
data->callback = cb;
|
data->callback = cb;
|
||||||
|
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
||||||
op->pending_ops = 1;
|
op->pending_ops = 1;
|
||||||
op->sync_state = SYNC_DATA_SYNC_SENT;
|
op->sync_state = SYNC_DATA_SYNC_SENT;
|
||||||
}
|
}
|
||||||
|
@ -108,10 +106,6 @@ int blockstore::continue_sync(blockstore_operation *op)
|
||||||
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
op->sync_state = SYNC_JOURNAL_SYNC_SENT;
|
||||||
ringloop->submit();
|
ringloop->submit();
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ struct bs_data
|
||||||
ring_loop_t *ringloop;
|
ring_loop_t *ringloop;
|
||||||
/* The list of completed io_u structs. */
|
/* The list of completed io_u structs. */
|
||||||
std::vector<io_u*> completed;
|
std::vector<io_u*> completed;
|
||||||
int op_n = 0;
|
int op_n = 0, inflight = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct bs_options
|
struct bs_options
|
||||||
|
@ -151,6 +151,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
||||||
{
|
{
|
||||||
io->error = op->retval < 0 ? -op->retval : 0;
|
io->error = op->retval < 0 ? -op->retval : 0;
|
||||||
bs_data *bsd = (bs_data*)io->engine_data;
|
bs_data *bsd = (bs_data*)io->engine_data;
|
||||||
|
bsd->inflight--;
|
||||||
bsd->completed.push_back(io);
|
bsd->completed.push_back(io);
|
||||||
if (DEBUG)
|
if (DEBUG)
|
||||||
printf("--- OP_WRITE %llx n=%d retval=%d\n", io, n, op->retval);
|
printf("--- OP_WRITE %llx n=%d retval=%d\n", io, n, op->retval);
|
||||||
|
@ -182,6 +183,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
||||||
io->error = op->retval < 0 ? -op->retval : 0;
|
io->error = op->retval < 0 ? -op->retval : 0;
|
||||||
bs_data *bsd = (bs_data*)io->engine_data;
|
bs_data *bsd = (bs_data*)io->engine_data;
|
||||||
bsd->completed.push_back(io);
|
bsd->completed.push_back(io);
|
||||||
|
bsd->inflight--;
|
||||||
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
||||||
delete[] vers;
|
delete[] vers;
|
||||||
if (DEBUG)
|
if (DEBUG)
|
||||||
|
@ -193,6 +195,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
||||||
{
|
{
|
||||||
io->error = op->retval < 0 ? -op->retval : 0;
|
io->error = op->retval < 0 ? -op->retval : 0;
|
||||||
bsd->completed.push_back(io);
|
bsd->completed.push_back(io);
|
||||||
|
bsd->inflight--;
|
||||||
if (DEBUG)
|
if (DEBUG)
|
||||||
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
|
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
|
||||||
delete op;
|
delete op;
|
||||||
|
@ -207,6 +210,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
||||||
if (DEBUG)
|
if (DEBUG)
|
||||||
printf("+++ %s %llx\n", op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC", io);
|
printf("+++ %s %llx\n", op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC", io);
|
||||||
io->error = 0;
|
io->error = 0;
|
||||||
|
bsd->inflight++;
|
||||||
bsd->bs->enqueue_op(op);
|
bsd->bs->enqueue_op(op);
|
||||||
bsd->op_n++;
|
bsd->op_n++;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue