Add osd header and osd executable
parent
d3c6314d01
commit
49f8011917
4
Makefile
4
Makefile
|
@ -1,13 +1,15 @@
|
||||||
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
|
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
|
||||||
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o osd.o
|
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o osd.o
|
||||||
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
|
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
|
||||||
all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so
|
all: $(BLOCKSTORE_OBJS) test test_blockstore libfio_blockstore.so osd
|
||||||
clean:
|
clean:
|
||||||
rm -f *.o
|
rm -f *.o
|
||||||
crc32c.o: crc32c.c
|
crc32c.o: crc32c.c
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
|
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h timerfd_interval.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
|
osd: $(BLOCKSTORE_OBJS) osd_main.cpp osd.h osd_ops.h
|
||||||
|
g++ $(CXXFLAGS) -ltcmalloc_minimal -luring -o osd osd_main.cpp $(BLOCKSTORE_OBJS)
|
||||||
test: test.cpp
|
test: test.cpp
|
||||||
g++ $(CXXFLAGS) -o test -luring test.cpp
|
g++ $(CXXFLAGS) -o test -luring test.cpp
|
||||||
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
|
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
|
||||||
|
|
101
osd.cpp
101
osd.cpp
|
@ -4,10 +4,7 @@
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
|
|
||||||
#include <unordered_map>
|
#include "osd.h"
|
||||||
|
|
||||||
#include "osd_ops.h"
|
|
||||||
#include "ringloop.h"
|
|
||||||
|
|
||||||
#define CL_READ_OP 1
|
#define CL_READ_OP 1
|
||||||
#define CL_READ_DATA 2
|
#define CL_READ_DATA 2
|
||||||
|
@ -16,97 +13,15 @@
|
||||||
#define CL_WRITE_REPLY 2
|
#define CL_WRITE_REPLY 2
|
||||||
#define CL_WRITE_DATA 3
|
#define CL_WRITE_DATA 3
|
||||||
|
|
||||||
struct osd_op_t
|
osd_t::osd_t(blockstore_config_t & config, blockstore *bs, ring_loop_t *ringloop)
|
||||||
{
|
{
|
||||||
int peer_fd;
|
bind_address = config["bind_address"];
|
||||||
union
|
if (bind_address == "")
|
||||||
{
|
bind_address = "0.0.0.0";
|
||||||
osd_any_op_t op;
|
bind_port = strtoull(config["bind_port"].c_str(), NULL, 10);
|
||||||
uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 };
|
if (!bind_port || bind_port > 65535)
|
||||||
};
|
bind_port = 11203;
|
||||||
union
|
|
||||||
{
|
|
||||||
osd_any_reply_t reply;
|
|
||||||
uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 };
|
|
||||||
};
|
|
||||||
blockstore_operation bs_op;
|
|
||||||
void *buf = NULL;
|
|
||||||
|
|
||||||
~osd_op_t()
|
|
||||||
{
|
|
||||||
if (buf)
|
|
||||||
free(buf);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct osd_client_t
|
|
||||||
{
|
|
||||||
sockaddr_in peer_addr;
|
|
||||||
socklen_t peer_addr_size;
|
|
||||||
int peer_fd;
|
|
||||||
//int in_flight_ops = 0;
|
|
||||||
|
|
||||||
// Read state
|
|
||||||
bool read_ready = false;
|
|
||||||
bool reading = false;
|
|
||||||
osd_op_t *read_op = NULL;
|
|
||||||
iovec read_iov;
|
|
||||||
msghdr read_msg;
|
|
||||||
void *read_buf = NULL;
|
|
||||||
int read_remaining = 0;
|
|
||||||
int read_state = 0;
|
|
||||||
|
|
||||||
// Completed operations to send replies back to the client
|
|
||||||
std::deque<osd_op_t*> completions;
|
|
||||||
|
|
||||||
// Write state
|
|
||||||
osd_op_t *write_op = NULL;
|
|
||||||
iovec write_iov;
|
|
||||||
msghdr write_msg;
|
|
||||||
void *write_buf = NULL;
|
|
||||||
int write_remaining = 0;
|
|
||||||
int write_state = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
class osd_t
|
|
||||||
{
|
|
||||||
// config
|
|
||||||
|
|
||||||
int client_queue_depth = 128;
|
|
||||||
|
|
||||||
// fields
|
|
||||||
|
|
||||||
blockstore *bs;
|
|
||||||
ring_loop_t *ringloop;
|
|
||||||
|
|
||||||
int wait_state = 0;
|
|
||||||
int epoll_fd = 0;
|
|
||||||
int listen_fd = 0;
|
|
||||||
ring_consumer_t consumer;
|
|
||||||
|
|
||||||
std::string bind_address;
|
|
||||||
int bind_port, listen_backlog;
|
|
||||||
|
|
||||||
std::unordered_map<int,osd_client_t> clients;
|
|
||||||
std::vector<int> read_ready_clients;
|
|
||||||
std::vector<int> write_ready_clients;
|
|
||||||
|
|
||||||
void loop();
|
|
||||||
int handle_epoll_events();
|
|
||||||
void stop_client(int peer_fd);
|
|
||||||
void read_requests();
|
|
||||||
void handle_read(ring_data_t *data, int peer_fd);
|
|
||||||
void enqueue_op(osd_op_t *cur_op);
|
|
||||||
void send_replies();
|
|
||||||
void make_reply(osd_op_t *op);
|
|
||||||
void handle_send(ring_data_t *data, int peer_fd);
|
|
||||||
public:
|
|
||||||
osd_t(blockstore *bs, ring_loop_t *ringloop);
|
|
||||||
~osd_t();
|
|
||||||
};
|
|
||||||
|
|
||||||
osd_t::osd_t(blockstore *bs, ring_loop_t *ringloop)
|
|
||||||
{
|
|
||||||
this->bs = bs;
|
this->bs = bs;
|
||||||
this->ringloop = ringloop;
|
this->ringloop = ringloop;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
|
#include "ringloop.h"
|
||||||
|
#include "osd_ops.h"
|
||||||
|
|
||||||
|
struct osd_op_t
|
||||||
|
{
|
||||||
|
int peer_fd;
|
||||||
|
union
|
||||||
|
{
|
||||||
|
osd_any_op_t op;
|
||||||
|
uint8_t op_buf[OSD_OP_PACKET_SIZE] = { 0 };
|
||||||
|
};
|
||||||
|
union
|
||||||
|
{
|
||||||
|
osd_any_reply_t reply;
|
||||||
|
uint8_t reply_buf[OSD_REPLY_PACKET_SIZE] = { 0 };
|
||||||
|
};
|
||||||
|
blockstore_operation bs_op;
|
||||||
|
void *buf = NULL;
|
||||||
|
|
||||||
|
~osd_op_t()
|
||||||
|
{
|
||||||
|
if (buf)
|
||||||
|
free(buf);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct osd_client_t
|
||||||
|
{
|
||||||
|
sockaddr_in peer_addr;
|
||||||
|
socklen_t peer_addr_size;
|
||||||
|
int peer_fd;
|
||||||
|
//int in_flight_ops = 0;
|
||||||
|
|
||||||
|
// Read state
|
||||||
|
bool read_ready = false;
|
||||||
|
bool reading = false;
|
||||||
|
osd_op_t *read_op = NULL;
|
||||||
|
iovec read_iov;
|
||||||
|
msghdr read_msg;
|
||||||
|
void *read_buf = NULL;
|
||||||
|
int read_remaining = 0;
|
||||||
|
int read_state = 0;
|
||||||
|
|
||||||
|
// Completed operations to send replies back to the client
|
||||||
|
std::deque<osd_op_t*> completions;
|
||||||
|
|
||||||
|
// Write state
|
||||||
|
osd_op_t *write_op = NULL;
|
||||||
|
iovec write_iov;
|
||||||
|
msghdr write_msg;
|
||||||
|
void *write_buf = NULL;
|
||||||
|
int write_remaining = 0;
|
||||||
|
int write_state = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class osd_t
|
||||||
|
{
|
||||||
|
// config
|
||||||
|
|
||||||
|
int client_queue_depth = 128;
|
||||||
|
|
||||||
|
// fields
|
||||||
|
|
||||||
|
blockstore *bs;
|
||||||
|
ring_loop_t *ringloop;
|
||||||
|
|
||||||
|
int wait_state = 0;
|
||||||
|
int epoll_fd = 0;
|
||||||
|
int listen_fd = 0;
|
||||||
|
ring_consumer_t consumer;
|
||||||
|
|
||||||
|
std::string bind_address;
|
||||||
|
int bind_port, listen_backlog;
|
||||||
|
|
||||||
|
std::unordered_map<int,osd_client_t> clients;
|
||||||
|
std::vector<int> read_ready_clients;
|
||||||
|
std::vector<int> write_ready_clients;
|
||||||
|
|
||||||
|
void loop();
|
||||||
|
int handle_epoll_events();
|
||||||
|
void stop_client(int peer_fd);
|
||||||
|
void read_requests();
|
||||||
|
void handle_read(ring_data_t *data, int peer_fd);
|
||||||
|
void enqueue_op(osd_op_t *cur_op);
|
||||||
|
void send_replies();
|
||||||
|
void make_reply(osd_op_t *op);
|
||||||
|
void handle_send(ring_data_t *data, int peer_fd);
|
||||||
|
public:
|
||||||
|
osd_t(blockstore_config_t & config, blockstore *bs, ring_loop_t *ringloop);
|
||||||
|
~osd_t();
|
||||||
|
};
|
|
@ -0,0 +1,27 @@
|
||||||
|
#include "osd.h"
|
||||||
|
|
||||||
|
int main(int narg, char *args[])
|
||||||
|
{
|
||||||
|
if (sizeof(osd_any_op_t) >= OSD_OP_PACKET_SIZE ||
|
||||||
|
sizeof(osd_any_reply_t) >= OSD_REPLY_PACKET_SIZE)
|
||||||
|
{
|
||||||
|
perror("BUG: too small packet size");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
blockstore_config_t config;
|
||||||
|
config["meta_device"] = "./test_meta.bin";
|
||||||
|
config["journal_device"] = "./test_journal.bin";
|
||||||
|
config["data_device"] = "./test_data.bin";
|
||||||
|
ring_loop_t *ringloop = new ring_loop_t(512);
|
||||||
|
blockstore *bs = new blockstore(config, ringloop);
|
||||||
|
osd_t *osd = new osd_t(config, bs, ringloop);
|
||||||
|
while (1)
|
||||||
|
{
|
||||||
|
ringloop->loop();
|
||||||
|
ringloop->wait();
|
||||||
|
}
|
||||||
|
delete osd;
|
||||||
|
delete bs;
|
||||||
|
delete ringloop;
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -61,8 +61,6 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t
|
||||||
struct __attribute__((__packed__)) osd_reply_secondary_rw_t
|
struct __attribute__((__packed__)) osd_reply_secondary_rw_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
// buffer size
|
|
||||||
uint64_t len;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// delete object on the secondary OSD
|
// delete object on the secondary OSD
|
||||||
|
@ -115,8 +113,6 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t
|
||||||
struct __attribute__((__packed__)) osd_reply_secondary_list_t
|
struct __attribute__((__packed__)) osd_reply_secondary_list_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
// oid array length
|
|
||||||
uint64_t len;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
union osd_any_op_t
|
union osd_any_op_t
|
||||||
|
@ -138,7 +134,3 @@ union osd_any_reply_t
|
||||||
osd_reply_secondary_stabilize_t sec_stabilize;
|
osd_reply_secondary_stabilize_t sec_stabilize;
|
||||||
osd_reply_secondary_list_t sec_list;
|
osd_reply_secondary_list_t sec_list;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int size_ok = sizeof(osd_any_op_t) < OSD_OP_PACKET_SIZE &&
|
|
||||||
sizeof(osd_any_reply_t) < OSD_REPLY_PACKET_SIZE
|
|
||||||
? (perror("BUG: too small packet size"), 0) : 1;
|
|
||||||
|
|
Loading…
Reference in New Issue