Compare commits

...

6 Commits

Author SHA1 Message Date
Vitaliy Filippov 20a4406acc Support IPv6 OSD addresses 2021-12-19 10:42:17 +03:00
Vitaliy Filippov f93491bc6c Implement journal write batching and slightly refactor journal writes
Slightly reduces WA. For example, in 4K T1Q128 replicated randwrite tests
WA is reduced from ~3.6 to ~3.1, in T1Q64 from ~3.8 to ~3.4.

Only effective without no_same_sector_overwrites.
2021-12-16 00:27:17 +03:00
Vitaliy Filippov 999bed8514 Fix opening regular files as blockstore 2021-12-15 02:08:58 +03:00
Vitaliy Filippov 3f33095fd7 Do not try to initialize client in simple-offsets 2021-12-15 02:07:27 +03:00
Vitaliy Filippov dd74c5ce1b Fix OSDs marking PGs incomplete instead of trying to connect with peers 2021-12-14 01:57:51 +03:00
Vitaliy Filippov c6d104ecd6 Print object version on fatal overwrite 2021-12-14 01:57:04 +03:00
25 changed files with 342 additions and 358 deletions

View File

@ -88,8 +88,8 @@ if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp") set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES) endif (IBVERBS_LIBRARIES)
add_library(vitastor_common STATIC add_library(vitastor_common STATIC
epoll_manager.cpp etcd_state_client.cpp epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp
messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA} http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA}
) )
target_compile_options(vitastor_common PUBLIC -fPIC) target_compile_options(vitastor_common PUBLIC -fPIC)
@ -112,6 +112,7 @@ if (${WITH_FIO})
add_library(fio_vitastor_sec SHARED add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp fio_sec_osd.cpp
rw_blocking.cpp rw_blocking.cpp
addr_util.cpp
) )
target_link_libraries(fio_vitastor_sec target_link_libraries(fio_vitastor_sec
tcmalloc_minimal tcmalloc_minimal
@ -189,11 +190,11 @@ endif (${WITH_QEMU})
### Test stubs ### Test stubs
# stub_osd, stub_bench, osd_test # stub_osd, stub_bench, osd_test
add_executable(stub_osd stub_osd.cpp rw_blocking.cpp) add_executable(stub_osd stub_osd.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(stub_osd tcmalloc_minimal) target_link_libraries(stub_osd tcmalloc_minimal)
add_executable(stub_bench stub_bench.cpp rw_blocking.cpp) add_executable(stub_bench stub_bench.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(stub_bench tcmalloc_minimal) target_link_libraries(stub_bench tcmalloc_minimal)
add_executable(osd_test osd_test.cpp rw_blocking.cpp) add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(osd_test tcmalloc_minimal) target_link_libraries(osd_test tcmalloc_minimal)
# osd_rmw_test # osd_rmw_test

60
src/addr_util.cpp Normal file
View File

@ -0,0 +1,60 @@
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <string.h>
#include <stdio.h>
#include <stdexcept>
#include "addr_util.h"
// Parse a textual IP address, optionally followed by a port, into a socket address.
//
// Accepted forms: "a.b.c.d", "a.b.c.d:port", "ipv6", "[ipv6]", "[ipv6]:port" and,
// for compatibility, the unbracketed "ipv6:port".
//
// str          - text to parse (taken by value because it is trimmed while parsing)
// parse_port   - when true, the text after the last ':' is parsed as the port number,
//                unless the whole string ends with ']' (bracketed IPv6 literal without a port).
//                Note that an unbracketed IPv6 address without a port is ambiguous in this mode:
//                its last group is taken as the port. Use "[ipv6]" to avoid that.
// default_port - port stored when <str> carries no port or when parse_port is false
// addr         - output. Only the family, port and address fields are written; other fields
//                are left as they were, so the caller should zero the structure beforehand.
//                IPv6 results are written through a sockaddr_in6 pointer, so the storage must
//                be large enough for sockaddr_in6 (e.g. a sockaddr_storage).
//
// Returns true on success, false if the port or the address can't be parsed.
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr)
{
    if (parse_port)
    {
        const size_t colon = str.rfind(':');
        // "[ipv6]" contains ':' but has no port. Look at the LAST character of the string:
        // in "[ipv6]:port" the ']' precedes the last colon and the port must still be parsed.
        const bool ends_with_bracket = !str.empty() && str[str.length()-1] == ']';
        if (colon != std::string::npos && !ends_with_bracket)
        {
            int port = -1;
            char trailing = 0;
            // %c only converts when something follows the number, so exactly 1 conversion means "clean port"
            int n = sscanf(str.c_str()+colon+1, "%d%c", &port, &trailing);
            if (n != 1 || port < 0 || port >= 0x10000)
                return false;
            default_port = port;
            str = str.substr(0, colon);
        }
    }
    // Try IPv4 first
    if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1)
    {
        addr->sa_family = AF_INET;
        ((struct sockaddr_in*)addr)->sin_port = htons(default_port);
        return true;
    }
    // Then IPv6, with or without the surrounding brackets
    if (str.length() >= 2 && str[0] == '[' && str[str.length()-1] == ']')
        str = str.substr(1, str.length()-2);
    if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1)
    {
        addr->sa_family = AF_INET6;
        ((struct sockaddr_in6*)addr)->sin6_port = htons(default_port);
        return true;
    }
    return false;
}
// Format an IPv4 or IPv6 socket address as "<address>:<port>".
// IPv6 addresses are printed without square brackets (e.g. "::1:8080").
// Throws std::runtime_error for any other address family or if inet_ntop() fails.
std::string addr_to_string(const sockaddr &addr)
{
    char text[256];
    const char *printed = NULL;
    int port_number = 0;
    switch (addr.sa_family)
    {
    case AF_INET:
    {
        const sockaddr_in *v4 = (const sockaddr_in*)&addr;
        printed = inet_ntop(AF_INET, &v4->sin_addr, text, sizeof(text));
        port_number = ntohs(v4->sin_port);
        break;
    }
    case AF_INET6:
    {
        const sockaddr_in6 *v6 = (const sockaddr_in6*)&addr;
        printed = inet_ntop(AF_INET6, &v6->sin6_addr, text, sizeof(text));
        port_number = ntohs(v6->sin6_port);
        break;
    }
    default:
        throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family));
    }
    if (printed == NULL)
    {
        // inet_ntop() reports the reason through errno
        throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
    }
    std::string result(text);
    result += ":";
    result += std::to_string(port_number);
    return result;
}

7
src/addr_util.h Normal file
View File

@ -0,0 +1,7 @@
#pragma once
#include <sys/socket.h>
#include <string>
// Parse a textual IPv4 or IPv6 address (IPv6 may be enclosed in square brackets) into <addr>.
// When parse_port is true, a recognized trailing ":port" suffix replaces default_port.
// On success the address family, port and address fields of <addr> are filled and true is returned;
// false is returned when the text can't be parsed.
// IPv6 results are written through a sockaddr_in6 pointer, so <addr> must point to storage
// that is large enough for a sockaddr_in6.
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
// Format an AF_INET or AF_INET6 socket address as "<address>:<port>".
// Throws std::runtime_error for other address families or when inet_ntop() fails.
std::string addr_to_string(const sockaddr &addr);

View File

@ -547,8 +547,9 @@ resume_1:
clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size); clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid) if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
{ {
printf("Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx) while deleting %lx:%lx\n", printf("Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx v%lu) while deleting %lx:%lx\n",
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe); clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe,
new_entry->version, cur.oid.inode, cur.oid.stripe);
exit(1); exit(1);
} }
// zero out new metadata entry // zero out new metadata entry
@ -559,8 +560,9 @@ resume_1:
clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size); clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid) if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
{ {
printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx) with %lx:%lx\n", printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx v%lu) with %lx:%lx v%lu\n",
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe); clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, new_entry->version,
cur.oid.inode, cur.oid.stripe, cur.version);
exit(1); exit(1);
} }
new_entry->oid = cur.oid; new_entry->oid = cur.oid;

View File

@ -235,6 +235,12 @@ void blockstore_impl_t::loop()
{ {
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
} }
for (auto s: journal.submitting_sectors)
{
// Mark journal sector writes as submitted
journal.sector_info[s].submit_id = 0;
}
journal.submitting_sectors.clear();
if ((initial_ring_space - ringloop->space_left()) > 0) if ((initial_ring_space - ringloop->space_left()) > 0)
{ {
live = true; live = true;

View File

@ -54,6 +54,14 @@
#define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE) #define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE)
#define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE) #define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE)
// Make sure that at least <n> submission queue entries are free in the ring.
// If not, mark the operation as waiting for SQEs and return 0 from the ENCLOSING function.
// Expects `ringloop` and `op` to be in scope at the point of use.
// Wrapped in do { } while (0) so that `BS_SUBMIT_CHECK_SQES(n);` is a single statement
// and stays safe as the unbraced body of an if/else.
#define BS_SUBMIT_CHECK_SQES(n) \
    do\
    {\
        if (ringloop->space_left() < (n))\
        {\
            /* Pause until there are more requests available */\
            PRIV(op)->wait_for = WAIT_SQE;\
            return 0;\
        }\
    } while (0)
#define BS_SUBMIT_GET_SQE(sqe, data) \ #define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \ BS_SUBMIT_GET_ONLY_SQE(sqe); \
struct ring_data_t *data = ((ring_data_t*)sqe->user_data) struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
@ -170,7 +178,7 @@ struct blockstore_op_private_t
std::vector<fulfill_read_t> read_vec; std::vector<fulfill_read_t> read_vec;
// Sync, write // Sync, write
uint64_t min_flushed_journal_sector, max_flushed_journal_sector; int min_flushed_journal_sector, max_flushed_journal_sector;
// Write // Write
struct iovec iov_zerofill[3]; struct iovec iov_zerofill[3];
@ -283,6 +291,10 @@ class blockstore_impl_t
void open_journal(); void open_journal();
uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset); uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);
// Journaling
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
void handle_journal_write(ring_data_t *data, uint64_t flush_id);
// Asynchronous init // Asynchronous init
int initialized; int initialized;
int metadata_buf_size; int metadata_buf_size;
@ -310,21 +322,18 @@ class blockstore_impl_t
// Sync // Sync
int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync); int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
void ack_sync(blockstore_op_t *op); void ack_sync(blockstore_op_t *op);
// Stabilize // Stabilize
int dequeue_stable(blockstore_op_t *op); int dequeue_stable(blockstore_op_t *op);
int continue_stable(blockstore_op_t *op); int continue_stable(blockstore_op_t *op);
void mark_stable(const obj_ver_id & ov, bool forget_dirty = false); void mark_stable(const obj_ver_id & ov, bool forget_dirty = false);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver); void stabilize_object(object_id oid, uint64_t max_ver);
// Rollback // Rollback
int dequeue_rollback(blockstore_op_t *op); int dequeue_rollback(blockstore_op_t *op);
int continue_rollback(blockstore_op_t *op); int continue_rollback(blockstore_op_t *op);
void mark_rolled_back(const obj_ver_id & ov); void mark_rolled_back(const obj_ver_id & ov);
void handle_rollback_event(ring_data_t *data, blockstore_op_t *op);
void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc); void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc);
// List // List

View File

@ -153,22 +153,73 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
return je; return je;
} }
void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb) void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_op_t *op)
{ {
// Don't submit the same sector twice in the same batch
if (!journal.sector_info[cur_sector].submit_id)
{
io_uring_sqe *sqe = get_sqe();
// Caller must ensure availability of an SQE
assert(sqe != NULL);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
journal.sector_info[cur_sector].written = true;
journal.sector_info[cur_sector].submit_id = ++journal.submit_id;
journal.submitting_sectors.push_back(cur_sector);
journal.sector_info[cur_sector].flush_count++;
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[cur_sector].offset
: journal.sector_buf + journal.block_size*cur_sector),
journal.block_size
};
data->callback = [this, flush_id = journal.submit_id](ring_data_t *data) { handle_journal_write(data, flush_id); };
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
);
}
journal.sector_info[cur_sector].dirty = false; journal.sector_info[cur_sector].dirty = false;
journal.sector_info[cur_sector].written = true; // But always remember that this operation has to wait until this exact journal write is finished
journal.sector_info[cur_sector].flush_count++; journal.flushing_ops.insert((pending_journaling_t){
ring_data_t *data = ((ring_data_t*)sqe->user_data); .flush_id = journal.sector_info[cur_sector].submit_id,
data->iov = (struct iovec){ .sector = cur_sector,
(journal.inmemory .op = op,
? journal.buffer + journal.sector_info[cur_sector].offset });
: journal.sector_buf + journal.block_size*cur_sector), auto priv = PRIV(op);
journal.block_size priv->pending_ops++;
}; if (!priv->min_flushed_journal_sector)
data->callback = cb; priv->min_flushed_journal_sector = 1+cur_sector;
my_uring_prep_writev( priv->max_flushed_journal_sector = 1+cur_sector;
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset }
);
// Completion handler for one submitted journal sector write.
// <data> is the ring request that has completed, <flush_id> is the submit id that was
// captured when the write was queued. Every operation registered in journal.flushing_ops
// under this flush_id is notified; an operation whose pending_ops counter drops to zero
// has its journal sectors released, its op_state advanced by one and the ring loop woken up.
// Throws std::runtime_error when the write result differs from the requested length.
void blockstore_impl_t::handle_journal_write(ring_data_t *data, uint64_t flush_id)
{
live = true;
// Anything other than a complete write of the whole iovec is treated as fatal
if (data->res != data->iov.iov_len)
{
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
throw std::runtime_error(
"journal write failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
// The search key leaves .op unset. NOTE(review): presumably it therefore sorts before every real
// record with the same flush_id, so upper_bound() lands on the first of them - confirm against
// operator < of pending_journaling_t
auto fl_it = journal.flushing_ops.upper_bound((pending_journaling_t){ .flush_id = flush_id });
// The sector's flush counter is decremented once per completed write, not once per waiting operation
if (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id)
{
journal.sector_info[fl_it->sector].flush_count--;
}
// Notify every operation that was waiting for exactly this write
while (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id)
{
auto priv = PRIV(fl_it->op);
priv->pending_ops--;
assert(priv->pending_ops >= 0);
if (priv->pending_ops == 0)
{
// This was the last request the operation was waiting for
release_journal_sectors(fl_it->op);
priv->op_state++;
ringloop->wakeup();
}
// Post-increment: the iterator is advanced before the current record is erased
journal.flushing_ops.erase(fl_it++);
}
} }
journal_t::~journal_t() journal_t::~journal_t()

View File

@ -4,6 +4,7 @@
#pragma once #pragma once
#include "crc32c.h" #include "crc32c.h"
#include <set>
#define MIN_JOURNAL_SIZE 4*1024*1024 #define MIN_JOURNAL_SIZE 4*1024*1024
#define JOURNAL_MAGIC 0x4A33 #define JOURNAL_MAGIC 0x4A33
@ -145,8 +146,21 @@ struct journal_sector_info_t
uint64_t flush_count; uint64_t flush_count;
bool written; bool written;
bool dirty; bool dirty;
uint64_t submit_id;
}; };
// One "operation <op> waits for a journal sector write" record.
// Records are kept in journal_t::flushing_ops (a std::set ordered by operator < below).
struct pending_journaling_t
{
// Submit id of the journal sector write the operation is waiting for
uint64_t flush_id;
// Index of the journal sector being written
int sector;
// The waiting operation
blockstore_op_t *op;
};
// Strict ordering of pending_journaling_t records: by flush_id first, then by the op pointer.
// The sector field does not take part in the comparison.
inline bool operator < (const pending_journaling_t & a, const pending_journaling_t & b)
{
    if (a.flush_id != b.flush_id)
    {
        return a.flush_id < b.flush_id;
    }
    return a.op < b.op;
}
struct journal_t struct journal_t
{ {
int fd; int fd;
@ -172,6 +186,9 @@ struct journal_t
bool no_same_sector_overwrites = false; bool no_same_sector_overwrites = false;
int cur_sector = 0; int cur_sector = 0;
int in_sector_pos = 0; int in_sector_pos = 0;
std::vector<int> submitting_sectors;
std::set<pending_journaling_t> flushing_ops;
uint64_t submit_id = 0;
// Used sector map // Used sector map
// May use ~ 80 MB per 1 GB of used journal space in the worst case // May use ~ 80 MB per 1 GB of used journal space in the worst case
@ -200,5 +217,3 @@ struct blockstore_journal_check_t
}; };
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size); journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);
void prepare_journal_sector_write(journal_t & journal, int sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);

View File

@ -306,6 +306,10 @@ static void check_size(int fd, uint64_t *size, uint64_t *sectsize, std::string n
if (S_ISREG(st.st_mode)) if (S_ISREG(st.st_mode))
{ {
*size = st.st_size; *size = st.st_size;
if (sectsize)
{
*sectsize = st.st_blksize;
}
} }
else if (S_ISBLK(st.st_mode)) else if (S_ISBLK(st.st_mode))
{ {

View File

@ -74,24 +74,17 @@ skip_ov:
{ {
return 0; return 0;
} }
// There is sufficient space. Get SQEs // There is sufficient space. Check SQEs
struct io_uring_sqe *sqe[space_check.sectors_to_write]; BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
for (i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// Prepare and submit journal entries // Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); }; int s = 0;
int s = 0, cur_sector = -1;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{ {
if (!journal.entry_fits(sizeof(journal_entry_rollback)) && if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
journal.sector_info[journal.cur_sector].dirty) journal.sector_info[journal.cur_sector].dirty)
{ {
if (cur_sector == -1) prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; s++;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
cur_sector = journal.cur_sector;
} }
journal_entry_rollback *je = (journal_entry_rollback*) journal_entry_rollback *je = (journal_entry_rollback*)
prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback)); prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
@ -100,12 +93,9 @@ skip_ov:
je->crc32 = je_crc32((journal_entry*)je); je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
} }
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write); assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1; PRIV(op)->op_state = 1;
return 1; return 1;
} }
@ -114,30 +104,23 @@ int blockstore_impl_t::continue_rollback(blockstore_op_t *op)
{ {
if (PRIV(op)->op_state == 2) if (PRIV(op)->op_state == 2)
goto resume_2; goto resume_2;
else if (PRIV(op)->op_state == 3) else if (PRIV(op)->op_state == 4)
goto resume_3; goto resume_4;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else else
return 1; return 1;
resume_2: resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync) if (!disable_journal_fsync)
{ {
io_uring_sqe *sqe; BS_SUBMIT_GET_SQE(sqe, data);
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); }; data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4; PRIV(op)->op_state = 3;
return 1; return 1;
} }
resume_5: resume_4:
obj_ver_id* v; obj_ver_id* v;
int i; int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
@ -196,24 +179,6 @@ void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov)
} }
} }
void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
PRIV(op)->op_state++;
ringloop->wakeup();
}
}
void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc) void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc)
{ {
if (dirty_end == dirty_start) if (dirty_end == dirty_start)

View File

@ -97,25 +97,18 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
{ {
return 0; return 0;
} }
// There is sufficient space. Get SQEs // There is sufficient space. Check SQEs
struct io_uring_sqe *sqe[space_check.sectors_to_write]; BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
for (i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// Prepare and submit journal entries // Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); }; int s = 0;
int s = 0, cur_sector = -1;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{ {
// FIXME: Only stabilize versions that aren't stable yet // FIXME: Only stabilize versions that aren't stable yet
if (!journal.entry_fits(sizeof(journal_entry_stable)) && if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
journal.sector_info[journal.cur_sector].dirty) journal.sector_info[journal.cur_sector].dirty)
{ {
if (cur_sector == -1) prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; s++;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
cur_sector = journal.cur_sector;
} }
journal_entry_stable *je = (journal_entry_stable*) journal_entry_stable *je = (journal_entry_stable*)
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable)); prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
@ -124,12 +117,9 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
je->crc32 = je_crc32((journal_entry*)je); je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
} }
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write); assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1; PRIV(op)->op_state = 1;
return 1; return 1;
} }
@ -138,30 +128,23 @@ int blockstore_impl_t::continue_stable(blockstore_op_t *op)
{ {
if (PRIV(op)->op_state == 2) if (PRIV(op)->op_state == 2)
goto resume_2; goto resume_2;
else if (PRIV(op)->op_state == 3) else if (PRIV(op)->op_state == 4)
goto resume_3; goto resume_4;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else else
return 1; return 1;
resume_2: resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync) if (!disable_journal_fsync)
{ {
io_uring_sqe *sqe; BS_SUBMIT_GET_SQE(sqe, data);
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); }; data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4; PRIV(op)->op_state = 3;
return 1; return 1;
} }
resume_5: resume_4:
// Mark dirty_db entries as stable, acknowledge op completion // Mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v; obj_ver_id* v;
int i; int i;
@ -257,21 +240,3 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
unstable_writes.erase(unstab_it); unstable_writes.erase(unstab_it);
} }
} }
void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
PRIV(op)->op_state++;
ringloop->wakeup();
}
}

View File

@ -44,10 +44,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
if (journal.sector_info[journal.cur_sector].dirty) if (journal.sector_info[journal.cur_sector].dirty)
{ {
// Write out the last journal sector if it happens to be dirty // Write out the last journal sector if it happens to be dirty
BS_SUBMIT_GET_ONLY_SQE(sqe); BS_SUBMIT_CHECK_SQES(1);
prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); }); prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1; return 1;
} }
@ -64,7 +62,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_DATA_SYNC_SENT; PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
@ -85,24 +83,18 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
{ {
return 0; return 0;
} }
// Get SQEs. Don't bother about merging, submit each journal sector as a separate request // Check SQEs. Don't bother about merging, submit each journal sector as a separate request
struct io_uring_sqe *sqe[space_check.sectors_to_write]; BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
for (int i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// Prepare and submit journal entries // Prepare and submit journal entries
auto it = PRIV(op)->sync_big_writes.begin(); auto it = PRIV(op)->sync_big_writes.begin();
int s = 0, cur_sector = -1; int s = 0;
while (it != PRIV(op)->sync_big_writes.end()) while (it != PRIV(op)->sync_big_writes.end())
{ {
if (!journal.entry_fits(sizeof(journal_entry_big_write) + clean_entry_bitmap_size) && if (!journal.entry_fits(sizeof(journal_entry_big_write) + clean_entry_bitmap_size) &&
journal.sector_info[journal.cur_sector].dirty) journal.sector_info[journal.cur_sector].dirty)
{ {
if (cur_sector == -1) prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; s++;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
cur_sector = journal.cur_sector;
} }
auto & dirty_entry = dirty_db.at(*it); auto & dirty_entry = dirty_db.at(*it);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
@ -129,12 +121,9 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
it++; it++;
} }
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write); assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1; return 1;
} }
@ -145,7 +134,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT; PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
@ -164,42 +153,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
return 1; return 1;
} }
void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
release_journal_sectors(op);
// Handle states
if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT)
{
PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
}
else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT)
{
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
}
else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
{
PRIV(op)->op_state = SYNC_DONE;
}
else
{
throw std::runtime_error("BUG: unexpected sync op state");
}
ringloop->wakeup();
}
}
void blockstore_impl_t::ack_sync(blockstore_op_t *op) void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{ {
// Handle states // Handle states

View File

@ -268,8 +268,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
cancel_all_writes(op, dirty_it, -ENOSPC); cancel_all_writes(op, dirty_it, -ENOSPC);
return 2; return 2;
} }
write_iodepth++;
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
write_iodepth++;
dirty_it->second.location = loc << block_order; dirty_it->second.location = loc << block_order;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED; dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
@ -324,29 +324,21 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{ {
return 0; return 0;
} }
write_iodepth++; // There is sufficient space. Check SQE(s)
// There is sufficient space. Get SQE(s) BS_SUBMIT_CHECK_SQES(
struct io_uring_sqe *sqe1 = NULL;
if (immediate_commit != IMMEDIATE_NONE ||
!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
{
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode // Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe1); (immediate_commit != IMMEDIATE_NONE ||
} !journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size) ? 1 : 0) +
struct io_uring_sqe *sqe2 = NULL; (op->len > 0 ? 1 : 0)
if (op->len > 0) );
{ write_iodepth++;
BS_SUBMIT_GET_SQE_DECL(sqe2);
}
// Got SQEs. Prepare previous journal sector write if required // Got SQEs. Prepare previous journal sector write if required
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (immediate_commit == IMMEDIATE_NONE) if (immediate_commit == IMMEDIATE_NONE)
{ {
if (sqe1) if (!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
{ {
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
} }
else else
{ {
@ -380,9 +372,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
if (immediate_commit != IMMEDIATE_NONE) if (immediate_commit != IMMEDIATE_NONE)
{ {
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb); prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
} }
if (op->len > 0) if (op->len > 0)
{ {
@ -392,7 +382,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Copy data // Copy data
memcpy(journal.buffer + journal.next_free, op->buf, op->len); memcpy(journal.buffer + journal.next_free, op->buf, op->len);
} }
ring_data_t *data2 = ((ring_data_t*)sqe2->user_data); BS_SUBMIT_GET_SQE(sqe2, data2);
data2->iov = (struct iovec){ op->buf, op->len }; data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb; data2->callback = cb;
my_uring_prep_writev( my_uring_prep_writev(
@ -441,13 +431,12 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
resume_2: resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry // Only for the immediate_commit mode: prepare and submit big_write journal entry
{ {
BS_SUBMIT_CHECK_SQES(1);
auto dirty_it = dirty_db.find((obj_ver_id){ auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid, .oid = op->oid,
.version = op->version, .version = op->version,
}); });
assert(dirty_it != dirty_db.end()); assert(dirty_it != dirty_db.end());
io_uring_sqe *sqe = NULL;
BS_SUBMIT_GET_SQE_DECL(sqe);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + clean_entry_bitmap_size sizeof(journal_entry_big_write) + clean_entry_bitmap_size
@ -469,10 +458,7 @@ resume_2:
memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size); memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
je->crc32 = je_crc32((journal_entry*)je); je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe, prepare_journal_sector_write(journal.cur_sector, op);
[this, op](ring_data_t *data) { handle_write_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 3; PRIV(op)->op_state = 3;
return 1; return 1;
} }
@ -587,6 +573,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
); );
} }
PRIV(op)->pending_ops--; PRIV(op)->pending_ops--;
assert(PRIV(op)->pending_ops >= 0);
if (PRIV(op)->pending_ops == 0) if (PRIV(op)->pending_ops == 0)
{ {
release_journal_sectors(op); release_journal_sectors(op);
@ -604,7 +591,6 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
uint64_t s = PRIV(op)->min_flushed_journal_sector; uint64_t s = PRIV(op)->min_flushed_journal_sector;
while (1) while (1)
{ {
journal.sector_info[s-1].flush_count--;
if (s != (1+journal.cur_sector) && journal.sector_info[s-1].flush_count == 0) if (s != (1+journal.cur_sector) && journal.sector_info[s-1].flush_count == 0)
{ {
// We know for sure that we won't write into this sector anymore // We know for sure that we won't write into this sector anymore
@ -644,23 +630,19 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
return 0; return 0;
} }
write_iodepth++; write_iodepth++;
io_uring_sqe *sqe = NULL; // Write current journal sector only if it's dirty and full, or in the immediate_commit mode
if (immediate_commit != IMMEDIATE_NONE || BS_SUBMIT_CHECK_SQES(
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) && (immediate_commit != IMMEDIATE_NONE ||
journal.sector_info[journal.cur_sector].dirty) (journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
{ journal.sector_info[journal.cur_sector].dirty) ? 1 : 0
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode );
BS_SUBMIT_GET_SQE_DECL(sqe);
}
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
// Prepare journal sector write // Prepare journal sector write
if (immediate_commit == IMMEDIATE_NONE) if (immediate_commit == IMMEDIATE_NONE)
{ {
if (sqe) if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty)
{ {
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
} }
else else
{ {
@ -687,9 +669,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED; dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED;
if (immediate_commit != IMMEDIATE_NONE) if (immediate_commit != IMMEDIATE_NONE)
{ {
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
} }
if (!PRIV(op)->pending_ops) if (!PRIV(op)->pending_ops)
{ {

View File

@ -305,6 +305,10 @@ void cli_tool_t::run(json11::Json cfg)
fprintf(stderr, "unknown command: %s\n", cmd[0].string_value().c_str()); fprintf(stderr, "unknown command: %s\n", cmd[0].string_value().c_str());
exit(1); exit(1);
} }
if (action_cb == NULL)
{
return;
}
color = !cfg["no-color"].bool_value(); color = !cfg["no-color"].bool_value();
json_output = cfg["json"].bool_value(); json_output = cfg["json"].bool_value();
iodepth = cfg["iodepth"].uint64_value(); iodepth = cfg["iodepth"].uint64_value();

View File

@ -28,6 +28,7 @@
#include <vector> #include <vector>
#include <unordered_map> #include <unordered_map>
#include "addr_util.h"
#include "rw_blocking.h" #include "rw_blocking.h"
#include "osd_ops.h" #include "osd_ops.h"
#include "fio_headers.h" #include "fio_headers.h"
@ -152,17 +153,14 @@ static int sec_init(struct thread_data *td)
bsd->block_order = o->block_order == 0 ? 17 : o->block_order; bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
bsd->block_size = 1 << o->block_order; bsd->block_size = 1 << o->block_order;
struct sockaddr_in addr; sockaddr addr;
int r; if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr))
if ((r = inet_pton(AF_INET, o->host ? o->host : "127.0.0.1", &addr.sin_addr)) != 1)
{ {
fprintf(stderr, "server address: %s%s\n", o->host ? o->host : "127.0.0.1", r == 0 ? " is not valid" : ": no ipv4 support"); fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1");
return 1; return 1;
} }
addr.sin_family = AF_INET;
addr.sin_port = htons(o->port ? o->port : 11203);
bsd->connect_fd = socket(AF_INET, SOCK_STREAM, 0); bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (bsd->connect_fd < 0) if (bsd->connect_fd < 0)
{ {
perror("socket"); perror("socket");

View File

@ -15,13 +15,13 @@
#include <stdexcept> #include <stdexcept>
#include "addr_util.h"
#include "json11/json11.hpp" #include "json11/json11.hpp"
#include "http_client.h" #include "http_client.h"
#include "timerfd_manager.h" #include "timerfd_manager.h"
#define READ_BUFFER_SIZE 9000 #define READ_BUFFER_SIZE 9000
static int extract_port(std::string & host);
static std::string trim(const std::string & in); static std::string trim(const std::string & in);
static std::string ws_format_frame(int type, uint64_t size); static std::string ws_format_frame(int type, uint64_t size);
static bool ws_parse_frame(std::string & buf, int & type, std::string & res); static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
@ -185,19 +185,15 @@ http_co_t::~http_co_t()
void http_co_t::start_connection() void http_co_t::start_connection()
{ {
stackin(); stackin();
int port = extract_port(host); struct sockaddr addr;
struct sockaddr_in addr; if (!string_to_addr(host.c_str(), 1, 80, &addr))
int r;
if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
{ {
parsed.error_code = ENXIO; parsed.error_code = ENXIO;
stackout(); stackout();
end(); end();
return; return;
} }
addr.sin_family = AF_INET; peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
addr.sin_port = htons(port ? port : 80);
peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
parsed.error_code = errno; parsed.error_code = errno;
@ -219,7 +215,7 @@ void http_co_t::start_connection()
} }
epoll_events = 0; epoll_events = 0;
// Finally call connect // Finally call connect
r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS) if (r < 0 && errno != EINPROGRESS)
{ {
parsed.error_code = errno; parsed.error_code = errno;
@ -759,22 +755,6 @@ std::vector<std::string> getifaddr_list(json11::Json mask_cfg, bool include_v6)
return addresses; return addresses;
} }
static int extract_port(std::string & host)
{
int port = 0;
int pos = 0;
if ((pos = host.find(':')) >= 0)
{
port = strtoull(host.c_str() + pos + 1, NULL, 10);
if (port >= 0x10000)
{
port = 0;
}
host = host.substr(0, pos);
}
return port;
}
std::string strtolower(const std::string & in) std::string strtolower(const std::string & in)
{ {
std::string s = in; std::string s = in;

View File

@ -45,7 +45,7 @@ struct websocket_t
void parse_http_headers(std::string & res, http_response_t *parsed); void parse_http_headers(std::string & res, http_response_t *parsed);
std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = false); std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = true);
uint64_t stoull_full(const std::string & str, int base = 10); uint64_t stoull_full(const std::string & str, int base = 10);

View File

@ -8,6 +8,7 @@
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <stdexcept> #include <stdexcept>
#include "addr_util.h"
#include "messenger.h" #include "messenger.h"
void osd_messenger_t::init() void osd_messenger_t::init()
@ -220,23 +221,20 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{ {
assert(peer_osd != this->osd_num); assert(peer_osd != this->osd_num);
struct sockaddr_in addr; struct sockaddr addr;
int r; if (!string_to_addr(peer_host, 0, peer_port, &addr))
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
{ {
on_connect_peer(peer_osd, -EINVAL); on_connect_peer(peer_osd, -EINVAL);
return; return;
} }
addr.sin_family = AF_INET; int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
addr.sin_port = htons(peer_port ? peer_port : 11203);
int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
on_connect_peer(peer_osd, -errno); on_connect_peer(peer_osd, -errno);
return; return;
} }
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS) if (r < 0 && errno != EINPROGRESS)
{ {
close(peer_fd); close(peer_fd);
@ -485,21 +483,20 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
void osd_messenger_t::accept_connections(int listen_fd) void osd_messenger_t::accept_connections(int listen_fd)
{ {
// Accept new connections // Accept new connections
sockaddr_in addr; sockaddr addr;
socklen_t peer_addr_size = sizeof(addr); socklen_t peer_addr_size = sizeof(addr);
int peer_fd; int peer_fd;
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0)
{ {
assert(peer_fd != 0); assert(peer_fd != 0);
char peer_str[256]; fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,
fprintf(stderr, "[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, addr_to_string(addr).c_str());
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = new osd_client_t(); clients[peer_fd] = new osd_client_t();
clients[peer_fd]->peer_addr = addr; clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = ntohs(addr.sin_port); clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
clients[peer_fd]->peer_fd = peer_fd; clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED; clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);

View File

@ -49,7 +49,7 @@ struct osd_client_t
{ {
int refs = 0; int refs = 0;
sockaddr_in peer_addr; sockaddr peer_addr;
int peer_port; int peer_port;
int peer_fd; int peer_fd;
int peer_state; int peer_state;

View File

@ -7,6 +7,7 @@
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include "addr_util.h"
#include "blockstore_impl.h" #include "blockstore_impl.h"
#include "osd_primary.h" #include "osd_primary.h"
#include "osd.h" #include "osd.h"
@ -156,14 +157,6 @@ void osd_t::parse_config(const json11::Json & config)
void osd_t::bind_socket() void osd_t::bind_socket()
{ {
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (config["osd_network"].is_string() || if (config["osd_network"].is_string() ||
config["osd_network"].is_array()) config["osd_network"].is_array())
{ {
@ -173,7 +166,7 @@ void osd_t::bind_socket()
else else
for (auto v: config["osd_network"].array_items()) for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value()); mask.push_back(v.string_value());
auto matched_addrs = getifaddr_list(mask, false); auto matched_addrs = getifaddr_list(mask);
if (matched_addrs.size() > 1) if (matched_addrs.size() > 1)
{ {
fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str()); fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str());
@ -192,17 +185,21 @@ void osd_t::bind_socket()
// FIXME Support multiple listening sockets // FIXME Support multiple listening sockets
sockaddr_in addr; sockaddr addr;
int r; if (!string_to_addr(bind_address, 0, bind_port, &addr))
if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
{ {
close(listen_fd); throw std::runtime_error("bind address "+bind_address+" is not valid");
throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
} }
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port); listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{ {
close(listen_fd); close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno)); throw std::runtime_error(std::string("bind: ") + strerror(errno));
@ -215,7 +212,7 @@ void osd_t::bind_socket()
close(listen_fd); close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno)); throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
} }
listening_port = ntohs(addr.sin_port); listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
} }
else else
{ {

View File

@ -194,6 +194,24 @@ void osd_t::start_pg_peering(pg_t & pg)
}); });
} }
} }
if (pg.pg_cursize < pg.pg_minsize)
{
pg.state = PG_INCOMPLETE;
report_pg_state(pg);
return;
}
std::set<osd_num_t> cur_peers;
for (auto pg_osd: pg.all_peers)
{
if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end())
{
cur_peers.insert(pg_osd);
}
else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end())
{
msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
if (pg.target_history.size()) if (pg.target_history.size())
{ {
// Refuse to start PG if no peers are available from any of the historical OSD sets // Refuse to start PG if no peers are available from any of the historical OSD sets
@ -222,24 +240,6 @@ void osd_t::start_pg_peering(pg_t & pg)
} }
} }
} }
if (pg.pg_cursize < pg.pg_minsize)
{
pg.state = PG_INCOMPLETE;
report_pg_state(pg);
return;
}
std::set<osd_num_t> cur_peers;
for (auto pg_osd: pg.all_peers)
{
if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end())
{
cur_peers.insert(pg_osd);
}
else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end())
{
msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end()); pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end());
if (pg.peering_state) if (pg.peering_state)
{ {

View File

@ -16,6 +16,7 @@
#include <stdexcept> #include <stdexcept>
#include "addr_util.h"
#include "osd_ops.h" #include "osd_ops.h"
#include "rw_blocking.h" #include "rw_blocking.h"
#include "test_pattern.h" #include "test_pattern.h"
@ -133,17 +134,14 @@ int main(int narg, char *args[])
int connect_osd(const char *osd_address, int osd_port) int connect_osd(const char *osd_address, int osd_port)
{ {
struct sockaddr_in addr; struct sockaddr addr;
int r; if (!string_to_addr(osd_address, 0, osd_port, &addr))
if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1)
{ {
fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support"); fprintf(stderr, "server address: %s is not valid\n", osd_address);
return -1; return -1;
} }
addr.sin_family = AF_INET;
addr.sin_port = htons(osd_port);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0); int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0) if (connect_fd < 0)
{ {
perror("socket"); perror("socket");

View File

@ -21,6 +21,7 @@
#include <stdexcept> #include <stdexcept>
#include "addr_util.h"
#include "rw_blocking.h" #include "rw_blocking.h"
#include "osd_ops.h" #include "osd_ops.h"
@ -66,16 +67,14 @@ int main(int narg, char *args[])
int connect_stub(const char *server_address, int server_port) int connect_stub(const char *server_address, int server_port)
{ {
struct sockaddr_in addr; struct sockaddr addr;
int r; if (!string_to_addr(server_address, 0, server_port, &addr))
if ((r = inet_pton(AF_INET, server_address, &addr.sin_addr)) != 1)
{ {
fprintf(stderr, "server address: %s%s\n", server_address, r == 0 ? " is not valid" : ": no ipv4 support"); fprintf(stderr, "server address: %s is not valid\n", server_address);
return -1; return -1;
} }
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port); int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
if (connect_fd < 0) if (connect_fd < 0)
{ {
perror("socket"); perror("socket");

View File

@ -37,10 +37,11 @@
#include <stdexcept> #include <stdexcept>
#include "addr_util.h"
#include "rw_blocking.h" #include "rw_blocking.h"
#include "osd_ops.h" #include "osd_ops.h"
int bind_stub(const char *bind_address, int bind_port); int bind_stub(std::string bind_address, int bind_port);
void run_stub(int peer_fd); void run_stub(int peer_fd);
@ -48,13 +49,13 @@ int main(int narg, char *args[])
{ {
int listen_fd = bind_stub("0.0.0.0", 11203); int listen_fd = bind_stub("0.0.0.0", 11203);
// Accept new connections // Accept new connections
sockaddr_in addr; sockaddr addr;
socklen_t peer_addr_size = sizeof(addr); socklen_t peer_addr_size = sizeof(addr);
int peer_fd; int peer_fd;
while (1) while (1)
{ {
printf("stub_osd: waiting for 1 client\n"); printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size); peer_fd = accept(listen_fd, &addr, &peer_addr_size);
if (peer_fd == -1) if (peer_fd == -1)
{ {
if (errno == EAGAIN) if (errno == EAGAIN)
@ -62,9 +63,8 @@ int main(int narg, char *args[])
else else
throw std::runtime_error(std::string("accept: ") + strerror(errno)); throw std::runtime_error(std::string("accept: ") + strerror(errno));
} }
char peer_str[256]; printf("stub_osd: new client %d: connection from %s\n", peer_fd,
printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd, addr_to_string(addr).c_str());
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
int one = 1; int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
run_stub(peer_fd); run_stub(peer_fd);
@ -76,11 +76,17 @@ int main(int narg, char *args[])
return 0; return 0;
} }
int bind_stub(const char *bind_address, int bind_port) int bind_stub(std::string bind_address, int bind_port)
{ {
int listen_backlog = 128; int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0); sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0) if (listen_fd < 0)
{ {
throw std::runtime_error(std::string("socket: ") + strerror(errno)); throw std::runtime_error(std::string("socket: ") + strerror(errno));
@ -88,17 +94,7 @@ int bind_stub(const char *bind_address, int bind_port)
int enable = 1; int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr; if (bind(listen_fd, &addr, sizeof(addr)) < 0)
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{ {
close(listen_fd); close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno)); throw std::runtime_error(std::string("bind: ") + strerror(errno));

View File

@ -20,11 +20,12 @@
#include <stdexcept> #include <stdexcept>
#include "addr_util.h"
#include "ringloop.h" #include "ringloop.h"
#include "epoll_manager.h" #include "epoll_manager.h"
#include "messenger.h" #include "messenger.h"
int bind_stub(const char *bind_address, int bind_port); int bind_stub(std::string bind_address, int bind_port);
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op); void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
@ -66,11 +67,17 @@ int main(int narg, char *args[])
return 0; return 0;
} }
int bind_stub(const char *bind_address, int bind_port) int bind_stub(std::string bind_address, int bind_port)
{ {
int listen_backlog = 128; int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0); sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0) if (listen_fd < 0)
{ {
throw std::runtime_error(std::string("socket: ") + strerror(errno)); throw std::runtime_error(std::string("socket: ") + strerror(errno));
@ -78,17 +85,7 @@ int bind_stub(const char *bind_address, int bind_port)
int enable = 1; int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr; if (bind(listen_fd, &addr, sizeof(addr)) < 0)
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{ {
close(listen_fd); close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno)); throw std::runtime_error(std::string("bind: ") + strerror(errno));