Begin implementation of the STABLE operation

blocking-uring-test
Vitaliy Filippov 2019-11-10 14:37:45 +03:00
parent 7aabe11ef9
commit 890335bff6
4 changed files with 65 additions and 3 deletions

View File

@ -1,4 +1,5 @@
all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o blockstore_write.o blockstore_sync.o crc32c.o ringloop.o test
all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o crc32c.o ringloop.o test
clean:
rm -f *.o
crc32c.o: crc32c.c

View File

@ -81,7 +81,7 @@ void blockstore::handle_event(ring_data_t *data)
}
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{
handle_stable_event(data, op);
}
}
}
@ -169,7 +169,7 @@ void blockstore::loop()
}
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{
dequeue_op = dequeue_stable(op);
}
if (dequeue_op)
{

View File

@ -162,6 +162,8 @@ public:
// to calculate parity for subsequent writes
// - Writes may be submitted in any order, because they don't overlap. Each write
// goes into a new location - either on the journal device or on the data device
// - Stable (stabilize) must be submitted after sync of that object is completed
// It's even OK to return an error to the caller if that object is not synced yet
// - Journal trim may be processed only after all versions are moved to
// the main storage AND after all read operations for older versions complete
// - If an operation can not be submitted because the ring is full
@ -286,6 +288,10 @@ class blockstore
int continue_sync(blockstore_operation *op);
int ack_sync(blockstore_operation *op);
// Stable
int dequeue_stable(blockstore_operation *op);
void handle_stable_event(ring_data_t *data, blockstore_operation *op);
public:
blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop);

55
blockstore_stable.cpp Normal file
View File

@ -0,0 +1,55 @@
#include "blockstore.h"
int blockstore::dequeue_stable(blockstore_operation *op)
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
if (dirty_it == dirty_db.end())
{
auto clean_it = object_db.find(op->oid);
if (clean_it == object_db.end() || clean_it->second.version < op->version)
{
// No such object version
op->retval = EINVAL;
}
else
{
// Already stable
op->retval = 0;
}
op->callback(op);
return 1;
}
else if (IS_UNSYNCED(dirty_it->second.state))
{
// Object not synced yet. Caller must sync it first
op->retval = EAGAIN;
op->callback(op);
return 1;
}
else if (IS_STABLE(dirty_it->second.state))
{
// Already stable
op->retval = 0;
op->callback(op);
return 1;
}
return 0;
}
void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op)
{
if (data->res < 0)
{
// sync error
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
}
op->pending_ops--;
if (op->pending_ops == 0)
{
}
}