Compare commits

..

6 Commits

Author SHA1 Message Date
Vitaliy Filippov 20a4406acc Support IPv6 OSD addresses 2021-12-19 10:42:17 +03:00
Vitaliy Filippov f93491bc6c Implement journal write batching and slightly refactor journal writes
Slightly reduces WA. For example, in 4K T1Q128 replicated randwrite tests
WA is reduced from ~3.6 to ~3.1, in T1Q64 from ~3.8 to ~3.4.

Only effective without no_same_sector_overwrites.
2021-12-16 00:27:17 +03:00
Vitaliy Filippov 999bed8514 Fix opening regular files as blockstore 2021-12-15 02:08:58 +03:00
Vitaliy Filippov 3f33095fd7 Do not try to initialize client in simple-offsets 2021-12-15 02:07:27 +03:00
Vitaliy Filippov dd74c5ce1b Fix OSDs marking PGs incomplete instead of trying to connect with peers 2021-12-14 01:57:51 +03:00
Vitaliy Filippov c6d104ecd6 Print object version on fatal overwrite 2021-12-14 01:57:04 +03:00
25 changed files with 342 additions and 358 deletions

View File

@ -88,8 +88,8 @@ if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES)
add_library(vitastor_common STATIC
epoll_manager.cpp etcd_state_client.cpp
messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp
msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA}
)
target_compile_options(vitastor_common PUBLIC -fPIC)
@ -112,6 +112,7 @@ if (${WITH_FIO})
add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp
rw_blocking.cpp
addr_util.cpp
)
target_link_libraries(fio_vitastor_sec
tcmalloc_minimal
@ -189,11 +190,11 @@ endif (${WITH_QEMU})
### Test stubs
# stub_osd, stub_bench, osd_test
add_executable(stub_osd stub_osd.cpp rw_blocking.cpp)
add_executable(stub_osd stub_osd.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(stub_osd tcmalloc_minimal)
add_executable(stub_bench stub_bench.cpp rw_blocking.cpp)
add_executable(stub_bench stub_bench.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(stub_bench tcmalloc_minimal)
add_executable(osd_test osd_test.cpp rw_blocking.cpp)
add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(osd_test tcmalloc_minimal)
# osd_rmw_test

60
src/addr_util.cpp Normal file
View File

@ -0,0 +1,60 @@
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <stdexcept>
#include "addr_util.h"
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr)
{
if (parse_port)
{
int p = str.rfind(':');
if (p != std::string::npos && !(str.length() > 0 && str[p-1] == ']')) // "[ipv6]" which contains ':'
{
char null_byte = 0;
int n = sscanf(str.c_str()+p+1, "%d%c", &default_port, &null_byte);
if (n != 1 || default_port >= 0x10000)
return false;
str = str.substr(0, p);
}
}
if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1)
{
addr->sa_family = AF_INET;
((struct sockaddr_in*)addr)->sin_port = htons(default_port);
return true;
}
if (str.length() >= 2 && str[0] == '[' && str[str.length()-1] == ']')
str = str.substr(1, str.length()-2);
if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1)
{
addr->sa_family = AF_INET6;
((struct sockaddr_in6*)addr)->sin6_port = htons(default_port);
return true;
}
return false;
}
std::string addr_to_string(const sockaddr &addr)
{
char peer_str[256];
bool ok = false;
int port;
if (addr.sa_family == AF_INET)
{
ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256);
port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else if (addr.sa_family == AF_INET6)
{
ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256);
port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
}
else
throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family));
if (!ok)
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
return std::string(peer_str)+":"+std::to_string(port);
}

7
src/addr_util.h Normal file
View File

@ -0,0 +1,7 @@
#pragma once
#include <sys/socket.h>
#include <string>
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
std::string addr_to_string(const sockaddr &addr);

View File

@ -547,8 +547,9 @@ resume_1:
clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
{
printf("Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx) while deleting %lx:%lx\n",
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
printf("Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx v%lu) while deleting %lx:%lx\n",
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe,
new_entry->version, cur.oid.inode, cur.oid.stripe);
exit(1);
}
// zero out new metadata entry
@ -559,8 +560,9 @@ resume_1:
clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
{
printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx) with %lx:%lx\n",
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx v%lu) with %lx:%lx v%lu\n",
clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, new_entry->version,
cur.oid.inode, cur.oid.stripe, cur.version);
exit(1);
}
new_entry->oid = cur.oid;

View File

@ -235,6 +235,12 @@ void blockstore_impl_t::loop()
{
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
for (auto s: journal.submitting_sectors)
{
// Mark journal sector writes as submitted
journal.sector_info[s].submit_id = 0;
}
journal.submitting_sectors.clear();
if ((initial_ring_space - ringloop->space_left()) > 0)
{
live = true;

View File

@ -54,6 +54,14 @@
#define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE)
#define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE)
#define BS_SUBMIT_CHECK_SQES(n) \
if (ringloop->space_left() < (n))\
{\
/* Pause until there are more requests available */\
PRIV(op)->wait_for = WAIT_SQE;\
return 0;\
}
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \
struct ring_data_t *data = ((ring_data_t*)sqe->user_data)
@ -170,7 +178,7 @@ struct blockstore_op_private_t
std::vector<fulfill_read_t> read_vec;
// Sync, write
uint64_t min_flushed_journal_sector, max_flushed_journal_sector;
int min_flushed_journal_sector, max_flushed_journal_sector;
// Write
struct iovec iov_zerofill[3];
@ -283,6 +291,10 @@ class blockstore_impl_t
void open_journal();
uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);
// Journaling
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
void handle_journal_write(ring_data_t *data, uint64_t flush_id);
// Asynchronous init
int initialized;
int metadata_buf_size;
@ -310,21 +322,18 @@ class blockstore_impl_t
// Sync
int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
void ack_sync(blockstore_op_t *op);
// Stabilize
int dequeue_stable(blockstore_op_t *op);
int continue_stable(blockstore_op_t *op);
void mark_stable(const obj_ver_id & ov, bool forget_dirty = false);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
// Rollback
int dequeue_rollback(blockstore_op_t *op);
int continue_rollback(blockstore_op_t *op);
void mark_rolled_back(const obj_ver_id & ov);
void handle_rollback_event(ring_data_t *data, blockstore_op_t *op);
void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc);
// List

View File

@ -153,22 +153,73 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
return je;
}
void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
void blockstore_impl_t::prepare_journal_sector_write(int cur_sector, blockstore_op_t *op)
{
// Don't submit the same sector twice in the same batch
if (!journal.sector_info[cur_sector].submit_id)
{
io_uring_sqe *sqe = get_sqe();
// Caller must ensure availability of an SQE
assert(sqe != NULL);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
journal.sector_info[cur_sector].written = true;
journal.sector_info[cur_sector].submit_id = ++journal.submit_id;
journal.submitting_sectors.push_back(cur_sector);
journal.sector_info[cur_sector].flush_count++;
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[cur_sector].offset
: journal.sector_buf + journal.block_size*cur_sector),
journal.block_size
};
data->callback = [this, flush_id = journal.submit_id](ring_data_t *data) { handle_journal_write(data, flush_id); };
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
);
}
journal.sector_info[cur_sector].dirty = false;
journal.sector_info[cur_sector].written = true;
journal.sector_info[cur_sector].flush_count++;
ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[cur_sector].offset
: journal.sector_buf + journal.block_size*cur_sector),
journal.block_size
};
data->callback = cb;
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
);
// But always remember that this operation has to wait until this exact journal write is finished
journal.flushing_ops.insert((pending_journaling_t){
.flush_id = journal.sector_info[cur_sector].submit_id,
.sector = cur_sector,
.op = op,
});
auto priv = PRIV(op);
priv->pending_ops++;
if (!priv->min_flushed_journal_sector)
priv->min_flushed_journal_sector = 1+cur_sector;
priv->max_flushed_journal_sector = 1+cur_sector;
}
void blockstore_impl_t::handle_journal_write(ring_data_t *data, uint64_t flush_id)
{
live = true;
if (data->res != data->iov.iov_len)
{
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
throw std::runtime_error(
"journal write failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
auto fl_it = journal.flushing_ops.upper_bound((pending_journaling_t){ .flush_id = flush_id });
if (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id)
{
journal.sector_info[fl_it->sector].flush_count--;
}
while (fl_it != journal.flushing_ops.end() && fl_it->flush_id == flush_id)
{
auto priv = PRIV(fl_it->op);
priv->pending_ops--;
assert(priv->pending_ops >= 0);
if (priv->pending_ops == 0)
{
release_journal_sectors(fl_it->op);
priv->op_state++;
ringloop->wakeup();
}
journal.flushing_ops.erase(fl_it++);
}
}
journal_t::~journal_t()

View File

@ -4,6 +4,7 @@
#pragma once
#include "crc32c.h"
#include <set>
#define MIN_JOURNAL_SIZE 4*1024*1024
#define JOURNAL_MAGIC 0x4A33
@ -145,8 +146,21 @@ struct journal_sector_info_t
uint64_t flush_count;
bool written;
bool dirty;
uint64_t submit_id;
};
struct pending_journaling_t
{
uint64_t flush_id;
int sector;
blockstore_op_t *op;
};
inline bool operator < (const pending_journaling_t & a, const pending_journaling_t & b)
{
return a.flush_id < b.flush_id || a.flush_id == b.flush_id && a.op < b.op;
}
struct journal_t
{
int fd;
@ -172,6 +186,9 @@ struct journal_t
bool no_same_sector_overwrites = false;
int cur_sector = 0;
int in_sector_pos = 0;
std::vector<int> submitting_sectors;
std::set<pending_journaling_t> flushing_ops;
uint64_t submit_id = 0;
// Used sector map
// May use ~ 80 MB per 1 GB of used journal space in the worst case
@ -200,5 +217,3 @@ struct blockstore_journal_check_t
};
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);
void prepare_journal_sector_write(journal_t & journal, int sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);

View File

@ -306,6 +306,10 @@ static void check_size(int fd, uint64_t *size, uint64_t *sectsize, std::string n
if (S_ISREG(st.st_mode))
{
*size = st.st_size;
if (sectsize)
{
*sectsize = st.st_blksize;
}
}
else if (S_ISBLK(st.st_mode))
{

View File

@ -74,24 +74,17 @@ skip_ov:
{
return 0;
}
// There is sufficient space. Get SQEs
struct io_uring_sqe *sqe[space_check.sectors_to_write];
for (i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// There is sufficient space. Check SQEs
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
int s = 0, cur_sector = -1;
int s = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
journal_entry_rollback *je = (journal_entry_rollback*)
prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
@ -100,12 +93,9 @@ skip_ov:
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
}
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1;
return 1;
}
@ -114,30 +104,23 @@ int blockstore_impl_t::continue_rollback(blockstore_op_t *op)
{
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 3)
goto resume_3;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else if (PRIV(op)->op_state == 4)
goto resume_4;
else
return 1;
resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe;
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4;
PRIV(op)->op_state = 3;
return 1;
}
resume_5:
resume_4:
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
@ -196,24 +179,6 @@ void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov)
}
}
void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
PRIV(op)->op_state++;
ringloop->wakeup();
}
}
void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc)
{
if (dirty_end == dirty_start)

View File

@ -97,25 +97,18 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
{
return 0;
}
// There is sufficient space. Get SQEs
struct io_uring_sqe *sqe[space_check.sectors_to_write];
for (i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// There is sufficient space. Check SQEs
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
int s = 0, cur_sector = -1;
int s = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// FIXME: Only stabilize versions that aren't stable yet
if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
journal_entry_stable *je = (journal_entry_stable*)
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
@ -124,12 +117,9 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
}
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1;
return 1;
}
@ -138,30 +128,23 @@ int blockstore_impl_t::continue_stable(blockstore_op_t *op)
{
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 3)
goto resume_3;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else if (PRIV(op)->op_state == 4)
goto resume_4;
else
return 1;
resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe;
BS_SUBMIT_GET_SQE_DECL(sqe);
ring_data_t *data = ((ring_data_t*)sqe->user_data);
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4;
PRIV(op)->op_state = 3;
return 1;
}
resume_5:
resume_4:
// Mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;
@ -257,21 +240,3 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
unstable_writes.erase(unstab_it);
}
}
void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
PRIV(op)->op_state++;
ringloop->wakeup();
}
}

View File

@ -44,10 +44,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
if (journal.sector_info[journal.cur_sector].dirty)
{
// Write out the last journal sector if it happens to be dirty
BS_SUBMIT_GET_ONLY_SQE(sqe);
prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
BS_SUBMIT_CHECK_SQES(1);
prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
@ -64,7 +62,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
@ -85,24 +83,18 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
{
return 0;
}
// Get SQEs. Don't bother about merging, submit each journal sector as a separate request
struct io_uring_sqe *sqe[space_check.sectors_to_write];
for (int i = 0; i < space_check.sectors_to_write; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqe[i]);
}
// Check SQEs. Don't bother about merging, submit each journal sector as a separate request
BS_SUBMIT_CHECK_SQES(space_check.sectors_to_write);
// Prepare and submit journal entries
auto it = PRIV(op)->sync_big_writes.begin();
int s = 0, cur_sector = -1;
int s = 0;
while (it != PRIV(op)->sync_big_writes.end())
{
if (!journal.entry_fits(sizeof(journal_entry_big_write) + clean_entry_bitmap_size) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
auto & dirty_entry = dirty_db.at(*it);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
@ -129,12 +121,9 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
journal.crc32_last = je->crc32;
it++;
}
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
prepare_journal_sector_write(journal.cur_sector, op);
s++;
assert(s == space_check.sectors_to_write);
if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
@ -145,7 +134,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
@ -164,42 +153,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_prog
return 1;
}
void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
if (data->res != data->iov.iov_len)
{
throw std::runtime_error(
"write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
"). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
);
}
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
release_journal_sectors(op);
// Handle states
if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT)
{
PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
}
else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT)
{
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
}
else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
{
PRIV(op)->op_state = SYNC_DONE;
}
else
{
throw std::runtime_error("BUG: unexpected sync op state");
}
ringloop->wakeup();
}
}
void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
// Handle states

View File

@ -268,8 +268,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
cancel_all_writes(op, dirty_it, -ENOSPC);
return 2;
}
write_iodepth++;
BS_SUBMIT_GET_SQE(sqe, data);
write_iodepth++;
dirty_it->second.location = loc << block_order;
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG
@ -324,29 +324,21 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
return 0;
}
write_iodepth++;
// There is sufficient space. Get SQE(s)
struct io_uring_sqe *sqe1 = NULL;
if (immediate_commit != IMMEDIATE_NONE ||
!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
{
// There is sufficient space. Check SQE(s)
BS_SUBMIT_CHECK_SQES(
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe1);
}
struct io_uring_sqe *sqe2 = NULL;
if (op->len > 0)
{
BS_SUBMIT_GET_SQE_DECL(sqe2);
}
(immediate_commit != IMMEDIATE_NONE ||
!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size) ? 1 : 0) +
(op->len > 0 ? 1 : 0)
);
write_iodepth++;
// Got SQEs. Prepare previous journal sector write if required
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (immediate_commit == IMMEDIATE_NONE)
{
if (sqe1)
if (!journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
else
{
@ -380,9 +372,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
journal.crc32_last = je->crc32;
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
if (op->len > 0)
{
@ -392,7 +382,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Copy data
memcpy(journal.buffer + journal.next_free, op->buf, op->len);
}
ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
BS_SUBMIT_GET_SQE(sqe2, data2);
data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb;
my_uring_prep_writev(
@ -441,13 +431,12 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
{
BS_SUBMIT_CHECK_SQES(1);
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
io_uring_sqe *sqe = NULL;
BS_SUBMIT_GET_SQE_DECL(sqe);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + clean_entry_bitmap_size
@ -469,10 +458,7 @@ resume_2:
memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe,
[this, op](ring_data_t *data) { handle_write_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
prepare_journal_sector_write(journal.cur_sector, op);
PRIV(op)->op_state = 3;
return 1;
}
@ -587,6 +573,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
);
}
PRIV(op)->pending_ops--;
assert(PRIV(op)->pending_ops >= 0);
if (PRIV(op)->pending_ops == 0)
{
release_journal_sectors(op);
@ -604,7 +591,6 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
uint64_t s = PRIV(op)->min_flushed_journal_sector;
while (1)
{
journal.sector_info[s-1].flush_count--;
if (s != (1+journal.cur_sector) && journal.sector_info[s-1].flush_count == 0)
{
// We know for sure that we won't write into this sector anymore
@ -644,23 +630,19 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
return 0;
}
write_iodepth++;
io_uring_sqe *sqe = NULL;
if (immediate_commit != IMMEDIATE_NONE ||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty)
{
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe);
}
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_CHECK_SQES(
(immediate_commit != IMMEDIATE_NONE ||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty) ? 1 : 0
);
// Prepare journal sector write
if (immediate_commit == IMMEDIATE_NONE)
{
if (sqe)
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
else
{
@ -687,9 +669,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED;
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
prepare_journal_sector_write(journal.cur_sector, op);
}
if (!PRIV(op)->pending_ops)
{

View File

@ -305,6 +305,10 @@ void cli_tool_t::run(json11::Json cfg)
fprintf(stderr, "unknown command: %s\n", cmd[0].string_value().c_str());
exit(1);
}
if (action_cb == NULL)
{
return;
}
color = !cfg["no-color"].bool_value();
json_output = cfg["json"].bool_value();
iodepth = cfg["iodepth"].uint64_value();

View File

@ -28,6 +28,7 @@
#include <vector>
#include <unordered_map>
#include "addr_util.h"
#include "rw_blocking.h"
#include "osd_ops.h"
#include "fio_headers.h"
@ -152,17 +153,14 @@ static int sec_init(struct thread_data *td)
bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
bsd->block_size = 1 << o->block_order;
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, o->host ? o->host : "127.0.0.1", &addr.sin_addr)) != 1)
sockaddr addr;
if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr))
{
fprintf(stderr, "server address: %s%s\n", o->host ? o->host : "127.0.0.1", r == 0 ? " is not valid" : ": no ipv4 support");
fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1");
return 1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(o->port ? o->port : 11203);
bsd->connect_fd = socket(AF_INET, SOCK_STREAM, 0);
bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (bsd->connect_fd < 0)
{
perror("socket");

View File

@ -15,13 +15,13 @@
#include <stdexcept>
#include "addr_util.h"
#include "json11/json11.hpp"
#include "http_client.h"
#include "timerfd_manager.h"
#define READ_BUFFER_SIZE 9000
static int extract_port(std::string & host);
static std::string trim(const std::string & in);
static std::string ws_format_frame(int type, uint64_t size);
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
@ -185,19 +185,15 @@ http_co_t::~http_co_t()
void http_co_t::start_connection()
{
stackin();
int port = extract_port(host);
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(host.c_str(), 1, 80, &addr))
{
parsed.error_code = ENXIO;
stackout();
end();
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port ? port : 80);
peer_fd = socket(AF_INET, SOCK_STREAM, 0);
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (peer_fd < 0)
{
parsed.error_code = errno;
@ -219,7 +215,7 @@ void http_co_t::start_connection()
}
epoll_events = 0;
// Finally call connect
r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
parsed.error_code = errno;
@ -759,22 +755,6 @@ std::vector<std::string> getifaddr_list(json11::Json mask_cfg, bool include_v6)
return addresses;
}
static int extract_port(std::string & host)
{
int port = 0;
int pos = 0;
if ((pos = host.find(':')) >= 0)
{
port = strtoull(host.c_str() + pos + 1, NULL, 10);
if (port >= 0x10000)
{
port = 0;
}
host = host.substr(0, pos);
}
return port;
}
std::string strtolower(const std::string & in)
{
std::string s = in;

View File

@ -45,7 +45,7 @@ struct websocket_t
void parse_http_headers(std::string & res, http_response_t *parsed);
std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = false);
std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = true);
uint64_t stoull_full(const std::string & str, int base = 10);

View File

@ -8,6 +8,7 @@
#include <netinet/tcp.h>
#include <stdexcept>
#include "addr_util.h"
#include "messenger.h"
void osd_messenger_t::init()
@ -220,23 +221,20 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
assert(peer_osd != this->osd_num);
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr))
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(peer_port ? peer_port : 11203);
int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (peer_fd < 0)
{
on_connect_peer(peer_osd, -errno);
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
close(peer_fd);
@ -485,21 +483,20 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
void osd_messenger_t::accept_connections(int listen_fd)
{
// Accept new connections
sockaddr_in addr;
sockaddr addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0)
{
assert(peer_fd != 0);
char peer_str[256];
fprintf(stderr, "[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,
addr_to_string(addr).c_str());
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = new osd_client_t();
clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = ntohs(addr.sin_port);
clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);

View File

@ -49,7 +49,7 @@ struct osd_client_t
{
int refs = 0;
sockaddr_in peer_addr;
sockaddr peer_addr;
int peer_port;
int peer_fd;
int peer_state;

View File

@ -7,6 +7,7 @@
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "addr_util.h"
#include "blockstore_impl.h"
#include "osd_primary.h"
#include "osd.h"
@ -156,14 +157,6 @@ void osd_t::parse_config(const json11::Json & config)
void osd_t::bind_socket()
{
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (config["osd_network"].is_string() ||
config["osd_network"].is_array())
{
@ -173,7 +166,7 @@ void osd_t::bind_socket()
else
for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value());
auto matched_addrs = getifaddr_list(mask, false);
auto matched_addrs = getifaddr_list(mask);
if (matched_addrs.size() > 1)
{
fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str());
@ -192,17 +185,21 @@ void osd_t::bind_socket()
// FIXME Support multiple listening sockets
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
close(listen_fd);
throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
@ -215,7 +212,7 @@ void osd_t::bind_socket()
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
listening_port = ntohs(addr.sin_port);
listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{

View File

@ -194,6 +194,24 @@ void osd_t::start_pg_peering(pg_t & pg)
});
}
}
if (pg.pg_cursize < pg.pg_minsize)
{
pg.state = PG_INCOMPLETE;
report_pg_state(pg);
return;
}
std::set<osd_num_t> cur_peers;
for (auto pg_osd: pg.all_peers)
{
if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end())
{
cur_peers.insert(pg_osd);
}
else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end())
{
msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
if (pg.target_history.size())
{
// Refuse to start PG if no peers are available from any of the historical OSD sets
@ -222,24 +240,6 @@ void osd_t::start_pg_peering(pg_t & pg)
}
}
}
if (pg.pg_cursize < pg.pg_minsize)
{
pg.state = PG_INCOMPLETE;
report_pg_state(pg);
return;
}
std::set<osd_num_t> cur_peers;
for (auto pg_osd: pg.all_peers)
{
if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end())
{
cur_peers.insert(pg_osd);
}
else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end())
{
msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end());
if (pg.peering_state)
{

View File

@ -16,6 +16,7 @@
#include <stdexcept>
#include "addr_util.h"
#include "osd_ops.h"
#include "rw_blocking.h"
#include "test_pattern.h"
@ -133,17 +134,14 @@ int main(int narg, char *args[])
int connect_osd(const char *osd_address, int osd_port)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(osd_address, 0, osd_port, &addr))
{
fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support");
fprintf(stderr, "server address: %s is not valid\n", osd_address);
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(osd_port);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");

View File

@ -21,6 +21,7 @@
#include <stdexcept>
#include "addr_util.h"
#include "rw_blocking.h"
#include "osd_ops.h"
@ -66,16 +67,14 @@ int main(int narg, char *args[])
int connect_stub(const char *server_address, int server_port)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, server_address, &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(server_address, 0, server_port, &addr))
{
fprintf(stderr, "server address: %s%s\n", server_address, r == 0 ? " is not valid" : ": no ipv4 support");
fprintf(stderr, "server address: %s is not valid\n", server_address);
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");

View File

@ -37,10 +37,11 @@
#include <stdexcept>
#include "addr_util.h"
#include "rw_blocking.h"
#include "osd_ops.h"
int bind_stub(const char *bind_address, int bind_port);
int bind_stub(std::string bind_address, int bind_port);
void run_stub(int peer_fd);
@ -48,13 +49,13 @@ int main(int narg, char *args[])
{
int listen_fd = bind_stub("0.0.0.0", 11203);
// Accept new connections
sockaddr_in addr;
sockaddr addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while (1)
{
printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
peer_fd = accept(listen_fd, &addr, &peer_addr_size);
if (peer_fd == -1)
{
if (errno == EAGAIN)
@ -62,9 +63,8 @@ int main(int narg, char *args[])
else
throw std::runtime_error(std::string("accept: ") + strerror(errno));
}
char peer_str[256];
printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
printf("stub_osd: new client %d: connection from %s\n", peer_fd,
addr_to_string(addr).c_str());
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
run_stub(peer_fd);
@ -76,11 +76,17 @@ int main(int narg, char *args[])
return 0;
}
int bind_stub(const char *bind_address, int bind_port)
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
@ -88,17 +94,7 @@ int bind_stub(const char *bind_address, int bind_port)
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));

View File

@ -20,11 +20,12 @@
#include <stdexcept>
#include "addr_util.h"
#include "ringloop.h"
#include "epoll_manager.h"
#include "messenger.h"
int bind_stub(const char *bind_address, int bind_port);
int bind_stub(std::string bind_address, int bind_port);
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
@ -66,11 +67,17 @@ int main(int narg, char *args[])
return 0;
}
int bind_stub(const char *bind_address, int bind_port)
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
@ -78,17 +85,7 @@ int bind_stub(const char *bind_address, int bind_port)
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));