Browse Source

Rename blockstore_operation to blockstore_op_t

blocking-uring-test
Vitaliy Filippov 2 years ago
parent
commit
749ab6e2c6
  1. 8
      blockstore.cpp
  2. 49
      blockstore.h
  3. 2
      blockstore_journal.cpp
  4. 2
      blockstore_journal.h
  5. 8
      blockstore_read.cpp
  6. 4
      blockstore_stable.cpp
  7. 10
      blockstore_sync.cpp
  8. 8
      blockstore_write.cpp
  9. 10
      fio_engine.cpp
  10. 2
      osd.cpp
  11. 5
      osd.h
  12. 4
      test_blockstore.cpp

8
blockstore.cpp

@ -205,10 +205,10 @@ bool blockstore::is_safe_to_stop()
if (!readonly && !stop_sync_submitted)
{
// We should sync the blockstore before unmounting
blockstore_operation *op = new blockstore_operation;
blockstore_op_t *op = new blockstore_op_t;
op->flags = OP_SYNC;
op->buf = NULL;
op->callback = [](blockstore_operation *op)
op->callback = [](blockstore_op_t *op)
{
delete op;
};
@ -220,7 +220,7 @@ bool blockstore::is_safe_to_stop()
return true;
}
void blockstore::check_wait(blockstore_operation *op)
void blockstore::check_wait(blockstore_op_t *op)
{
if (op->wait_for == WAIT_SQE)
{
@ -276,7 +276,7 @@ void blockstore::check_wait(blockstore_operation *op)
}
}
void blockstore::enqueue_op(blockstore_operation *op)
void blockstore::enqueue_op(blockstore_op_t *op)
{
int type = op->flags & OP_TYPE_MASK;
if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) &&

49
blockstore.h

@ -64,9 +64,6 @@
#define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 512
#define STRIPE_NUM(oid) ((oid) >> 4)
#define STRIPE_REPLICA(oid) ((oid) & 0xf)
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
@ -91,7 +88,7 @@
class blockstore;
class blockstore_operation;
class blockstore_op_t;
// 16 bytes per object/stripe id
// stripe includes replica number in 4 least significant bits
@ -207,12 +204,12 @@ struct fulfill_read_t
uint64_t offset, len;
};
struct blockstore_operation
struct blockstore_op_t
{
// flags contain operation type and possibly other flags
uint64_t flags;
// finish callback
std::function<void (blockstore_operation*)> callback;
std::function<void (blockstore_op_t*)> callback;
// For reads, writes & deletes: oid is the requested object
object_id oid;
// For reads: version=0 -> last stable, version=UINT64_MAX -> last unstable, version=X -> specific version
@ -246,7 +243,7 @@ private:
// Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
std::list<blockstore_operation*>::iterator in_progress_ptr;
std::list<blockstore_op_t*>::iterator in_progress_ptr;
int sync_state, prev_sync_count;
};
@ -263,9 +260,9 @@ class blockstore
// Another option is https://github.com/algorithm-ninja/cpp-btree
spp::sparse_hash_map<object_id, clean_entry, oid_hash> clean_db;
std::map<obj_ver_id, dirty_entry> dirty_db;
std::list<blockstore_operation*> submit_queue; // FIXME: funny thing is that vector is better here
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
std::list<blockstore_operation*> in_progress_syncs; // ...and probably here, too
std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
allocator *data_alloc = NULL;
uint8_t *zero_object;
@ -313,32 +310,32 @@ class blockstore
blockstore_init_meta* metadata_init_reader;
blockstore_init_journal* journal_init_reader;
void check_wait(blockstore_operation *op);
void check_wait(blockstore_op_t *op);
// Read
int dequeue_read(blockstore_operation *read_op);
int fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
int dequeue_read(blockstore_op_t *read_op);
int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location);
int fulfill_read_push(blockstore_operation *op, void *buf, uint64_t offset, uint64_t len,
int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version);
void handle_read_event(ring_data_t *data, blockstore_operation *op);
void handle_read_event(ring_data_t *data, blockstore_op_t *op);
// Write
void enqueue_write(blockstore_operation *op);
int dequeue_write(blockstore_operation *op);
int dequeue_del(blockstore_operation *op);
void handle_write_event(ring_data_t *data, blockstore_operation *op);
void enqueue_write(blockstore_op_t *op);
int dequeue_write(blockstore_op_t *op);
int dequeue_del(blockstore_op_t *op);
void handle_write_event(ring_data_t *data, blockstore_op_t *op);
// Sync
int dequeue_sync(blockstore_operation *op);
void handle_sync_event(ring_data_t *data, blockstore_operation *op);
int continue_sync(blockstore_operation *op);
void ack_one_sync(blockstore_operation *op);
int ack_sync(blockstore_operation *op);
int dequeue_sync(blockstore_op_t *op);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
int continue_sync(blockstore_op_t *op);
void ack_one_sync(blockstore_op_t *op);
int ack_sync(blockstore_op_t *op);
// Stabilize
int dequeue_stable(blockstore_operation *op);
void handle_stable_event(ring_data_t *data, blockstore_operation *op);
int dequeue_stable(blockstore_op_t *op);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
public:
@ -359,7 +356,7 @@ public:
bool is_safe_to_stop();
// Submission
void enqueue_op(blockstore_operation *op);
void enqueue_op(blockstore_op_t *op);
// Unstable writes are added here (map of object_id -> version)
std::map<object_id, uint64_t> unstable_writes;

2
blockstore_journal.cpp

@ -11,7 +11,7 @@ blockstore_journal_check_t::blockstore_journal_check_t(blockstore *bs)
}
// Check if we can write <required> entries of <size> bytes and <data_after> data bytes after them to the journal
int blockstore_journal_check_t::check_available(blockstore_operation *op, int required, int size, int data_after)
int blockstore_journal_check_t::check_available(blockstore_op_t *op, int required, int size, int data_after)
{
while (1)
{

2
blockstore_journal.h

@ -144,7 +144,7 @@ struct blockstore_journal_check_t
bool right_dir; // writing to the end or the beginning of the ring buffer
blockstore_journal_check_t(blockstore *bs);
int check_available(blockstore_operation *op, int required, int size, int data_after);
int check_available(blockstore_op_t *op, int required, int size, int data_after);
};
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);

8
blockstore_read.cpp

@ -1,6 +1,6 @@
#include "blockstore.h"
int blockstore::fulfill_read_push(blockstore_operation *op, void *buf, uint64_t offset, uint64_t len,
int blockstore::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version)
{
if (IS_IN_FLIGHT(item_state))
@ -34,7 +34,7 @@ int blockstore::fulfill_read_push(blockstore_operation *op, void *buf, uint64_t
return 1;
}
int blockstore::fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
int blockstore::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location)
{
uint32_t cur_start = item_start;
@ -69,7 +69,7 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled,
return 1;
}
int blockstore::dequeue_read(blockstore_operation *read_op)
int blockstore::dequeue_read(blockstore_op_t *read_op)
{
auto clean_it = clean_db.find(read_op->oid);
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
@ -148,7 +148,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
return 1;
}
void blockstore::handle_read_event(ring_data_t *data, blockstore_operation *op)
void blockstore::handle_read_event(ring_data_t *data, blockstore_op_t *op)
{
op->pending_ops--;
if (data->res != data->iov.iov_len)

4
blockstore_stable.cpp

@ -38,7 +38,7 @@
// 4) after a while it takes his synced object list and sends stabilize requests
// to peers and to its own blockstore, thus freeing the old version
int blockstore::dequeue_stable(blockstore_operation *op)
int blockstore::dequeue_stable(blockstore_op_t *op)
{
obj_ver_id* v;
int i, todo = 0;
@ -121,7 +121,7 @@ int blockstore::dequeue_stable(blockstore_operation *op)
return 1;
}
void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op)
void blockstore::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
{
if (data->res != data->iov.iov_len)
{

10
blockstore_sync.cpp

@ -7,7 +7,7 @@
#define SYNC_JOURNAL_SYNC_SENT 5
#define SYNC_DONE 6
int blockstore::dequeue_sync(blockstore_operation *op)
int blockstore::dequeue_sync(blockstore_op_t *op)
{
if (op->sync_state == 0)
{
@ -33,7 +33,7 @@ int blockstore::dequeue_sync(blockstore_operation *op)
return r;
}
int blockstore::continue_sync(blockstore_operation *op)
int blockstore::continue_sync(blockstore_op_t *op)
{
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
if (op->sync_state == SYNC_HAS_SMALL)
@ -131,7 +131,7 @@ int blockstore::continue_sync(blockstore_operation *op)
return 1;
}
void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
void blockstore::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
{
if (data->res != data->iov.iov_len)
{
@ -173,7 +173,7 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
}
}
int blockstore::ack_sync(blockstore_operation *op)
int blockstore::ack_sync(blockstore_op_t *op)
{
if (op->sync_state == SYNC_DONE && op->prev_sync_count == 0)
{
@ -199,7 +199,7 @@ int blockstore::ack_sync(blockstore_operation *op)
return 0;
}
void blockstore::ack_one_sync(blockstore_operation *op)
void blockstore::ack_one_sync(blockstore_op_t *op)
{
// Handle states
for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++)

8
blockstore_write.cpp

@ -1,6 +1,6 @@
#include "blockstore.h"
void blockstore::enqueue_write(blockstore_operation *op)
void blockstore::enqueue_write(blockstore_op_t *op)
{
// Assign version number
bool found = false, deleted = false, is_del = (op->flags & OP_TYPE_MASK) == OP_DELETE;
@ -60,7 +60,7 @@ void blockstore::enqueue_write(blockstore_operation *op)
}
// First step of the write algorithm: dequeue operation and submit initial write(s)
int blockstore::dequeue_write(blockstore_operation *op)
int blockstore::dequeue_write(blockstore_op_t *op)
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
@ -184,7 +184,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
return 1;
}
void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
void blockstore::handle_write_event(ring_data_t *data, blockstore_op_t *op)
{
if (data->res != data->iov.iov_len)
{
@ -236,7 +236,7 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
}
}
int blockstore::dequeue_del(blockstore_operation *op)
int blockstore::dequeue_del(blockstore_op_t *op)
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,

10
fio_engine.cpp

@ -180,7 +180,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
if (io->ddir == DDIR_WRITE || io->ddir == DDIR_READ)
assert(io->xfer_buflen <= bsd->bs->get_block_size());
blockstore_operation *op = new blockstore_operation;
blockstore_op_t *op = new blockstore_op_t;
op->callback = NULL;
switch (io->ddir)
@ -194,7 +194,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
};
op->offset = io->offset % bsd->bs->get_block_size();
op->len = io->xfer_buflen;
op->callback = [io, n](blockstore_operation *op)
op->callback = [io, n](blockstore_op_t *op)
{
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;
@ -215,7 +215,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
};
op->offset = io->offset % bsd->bs->get_block_size();
op->len = io->xfer_buflen;
op->callback = [io, n](blockstore_operation *op)
op->callback = [io, n](blockstore_op_t *op)
{
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;
@ -229,7 +229,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
break;
case DDIR_SYNC:
op->flags = OP_SYNC;
op->callback = [io, n](blockstore_operation *op)
op->callback = [io, n](blockstore_op_t *op)
{
bs_data *bsd = (bs_data*)io->engine_data;
if (op->retval >= 0 && bsd->bs->unstable_writes.size() > 0)
@ -247,7 +247,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
};
}
bsd->bs->unstable_writes.clear();
op->callback = [io, n](blockstore_operation *op)
op->callback = [io, n](blockstore_op_t *op)
{
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;

2
osd.cpp

@ -309,7 +309,7 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
void osd_t::enqueue_op(osd_op_t *cur_op)
{
cur_op->bs_op.callback = [this, cur_op](blockstore_operation* bs_op)
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op)
{
auto cl_it = clients.find(cur_op->peer_fd);
if (cl_it != clients.end())

5
osd.h

@ -7,6 +7,9 @@
#include "ringloop.h"
#include "osd_ops.h"
#define STRIPE_NUM(stripe) ((stripe) >> 4)
#define STRIPE_REPLICA(stripe) ((stripe) & 0xf)
struct osd_op_t
{
int peer_fd;
@ -20,7 +23,7 @@ struct osd_op_t
osd_any_reply_t reply;
uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 };
};
blockstore_operation bs_op;
blockstore_op_t bs_op;
void *buf = NULL;
~osd_op_t()

4
test_blockstore.cpp

@ -15,11 +15,11 @@ int main(int narg, char *args[])
printf("tick 1s\n");
});
blockstore_operation op;
blockstore_op_t op;
int main_state = 0;
uint64_t version = 0;
ring_consumer_t main_cons;
op.callback = [&](blockstore_operation *op)
op.callback = [&](blockstore_op_t *op)
{
printf("op completed %d\n", op->retval);
if (main_state == 1)

Loading…
Cancel
Save