diff --git a/mon/mon.js b/mon/mon.js index 70b50218..a1ef2586 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -84,7 +84,8 @@ const etcd_tree = { osd_ping_timeout: 5, // seconds. min: 1 up_wait_retry_interval: 500, // ms. min: 50 // osd - etcd_report_interval: 5, + etcd_report_interval: 5, // seconds + etcd_keepalive_interval: 10, // seconds, default is etcd_report_interval*2 run_primary: true, osd_network: null, // "192.168.7.0/24" or an array of masks bind_address: "0.0.0.0", diff --git a/src/addr_util.cpp b/src/addr_util.cpp index fdab6c00..411ae4bc 100644 --- a/src/addr_util.cpp +++ b/src/addr_util.cpp @@ -1,4 +1,7 @@ #include +#include +#include +#include #include #include @@ -58,3 +61,128 @@ std::string addr_to_string(const sockaddr &addr) throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); return std::string(peer_str)+":"+std::to_string(port); } + +static bool cidr_match(const in_addr &addr, const in_addr &net, uint8_t bits) +{ + if (bits == 0) + { + // C99 6.5.7 (3): u32 << 32 is undefined behaviour + return true; + } + return !((addr.s_addr ^ net.s_addr) & htonl(0xFFFFFFFFu << (32 - bits))); +} + +static bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits) +{ + const uint32_t *a = address.s6_addr32; + const uint32_t *n = network.s6_addr32; + int bits_whole, bits_incomplete; + bits_whole = bits >> 5; // number of whole u32 + bits_incomplete = bits & 0x1F; // number of bits in incomplete u32 + if (bits_whole && memcmp(a, n, bits_whole << 2)) + return false; + if (bits_incomplete) + { + uint32_t mask = htonl((0xFFFFFFFFu) << (32 - bits_incomplete)); + if ((a[bits_whole] ^ n[bits_whole]) & mask) + return false; + } + return true; +} + +struct addr_mask_t +{ + sa_family_t family; + in_addr ipv4; + in6_addr ipv6; + uint8_t bits; +}; + +std::vector getifaddr_list(std::vector mask_cfg, bool include_v6) +{ + std::vector masks; + for (auto mask: mask_cfg) + { + unsigned bits = 0; + int p = mask.find('/'); + if (p != std::string::npos) + { + char null_byte = 0; + if (sscanf(mask.c_str()+p+1, "%u%c", &bits, &null_byte) != 1 || bits > 128) + { + throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask); + } + mask = mask.substr(0, p); + } + in_addr ipv4; + in6_addr ipv6; + if (inet_pton(AF_INET, mask.c_str(), &ipv4) == 1) + { + if (bits > 32) + { + throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask); + } + masks.push_back((addr_mask_t){ .family = AF_INET, .ipv4 = ipv4, .bits = (uint8_t)bits }); + } + else if (include_v6 && inet_pton(AF_INET6, mask.c_str(), &ipv6) == 1) + { + masks.push_back((addr_mask_t){ .family = AF_INET6, .ipv6 = ipv6, .bits = (uint8_t)bits }); + } + else + { + throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask); + } + } + std::vector addresses; + ifaddrs *list, *ifa; + if (getifaddrs(&list) == -1) + { + throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); + } + for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) + { + if (!ifa->ifa_addr) + { + continue; + } + int family = ifa->ifa_addr->sa_family; + if ((family == AF_INET || family == AF_INET6 && include_v6) && + (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) + { + void *addr_ptr; + if (family == AF_INET) + { + addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; + } + else + { + addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; + } + if (masks.size() > 0) + { + int i; + for (i = 0; i < masks.size(); i++) + { + if (masks[i].family == family && (family == AF_INET + ? cidr_match(*(in_addr*)addr_ptr, masks[i].ipv4, masks[i].bits) + : cidr6_match(*(in6_addr*)addr_ptr, masks[i].ipv6, masks[i].bits))) + { + break; + } + } + if (i >= masks.size()) + { + continue; + } + } + char addr[INET6_ADDRSTRLEN]; + if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) + { + throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); + } + addresses.push_back(std::string(addr)); + } + } + freeifaddrs(list); + return addresses; +} diff --git a/src/addr_util.h b/src/addr_util.h index 53c6eb31..b8f7da75 100644 --- a/src/addr_util.h +++ b/src/addr_util.h @@ -2,6 +2,8 @@ #include #include +#include bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr); std::string addr_to_string(const sockaddr &addr); +std::vector getifaddr_list(std::vector mask_cfg = std::vector(), bool include_v6 = false); diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index 3d18b959..66108bf3 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -5,6 +5,7 @@ #include "pg_states.h" #include "etcd_state_client.h" #ifndef __MOCK__ +#include "addr_util.h" #include "http_client.h" #include "base64.h" #endif @@ -25,9 +26,14 @@ etcd_state_client_t::~etcd_state_client_t() #ifndef __MOCK__ if (etcd_watch_ws) { - etcd_watch_ws->close(); + http_close(etcd_watch_ws); etcd_watch_ws = NULL; } + if (keepalive_client) + { + http_close(keepalive_client); + keepalive_client = NULL; + } #endif } @@ -74,14 +80,26 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t "Host: "+etcd_address+"\r\n" "Content-Type: application/json\r\n" "Content-Length: "+std::to_string(req.size())+"\r\n" - "Connection: close\r\n" + "Connection: keep-alive\r\n" + "Keep-Alive: timeout="+std::to_string(etcd_keepalive_interval)+"\r\n" "\r\n"+req; - http_request_json(tfd, etcd_address, req, timeout, [this, cur_addr = selected_etcd_address, callback](std::string err, json11::Json data) + auto cb = [this, cur_addr = selected_etcd_address, callback](const http_response_t *response) { - if (err != "" && cur_addr == selected_etcd_address) - selected_etcd_address = ""; + std::string err; + json11::Json data; + response->parse_json_response(err, data); + if (err != "") + { + if (cur_addr == selected_etcd_address) + selected_etcd_address = ""; + } callback(err, data); - }); + }; + if (!keepalive_client) + { + keepalive_client = http_init(tfd); + } + http_request(keepalive_client, etcd_address, req, { .timeout = timeout, .keepalive = true }, cb); } void etcd_state_client_t::add_etcd_url(std::string addr) @@ -155,6 +173,13 @@ void etcd_state_client_t::parse_config(const json11::Json & config) this->etcd_prefix = "/"+this->etcd_prefix; } this->log_level = config["log_level"].int64_value(); + this->etcd_keepalive_interval = config["etcd_keepalive_interval"].uint64_value(); + if (this->etcd_keepalive_interval <= 0) + { + this->etcd_keepalive_interval = config["etcd_report_interval"].uint64_value() * 2; + if (this->etcd_keepalive_interval <= 0) + this->etcd_keepalive_interval = 10; + } } void etcd_state_client_t::pick_next_etcd() @@ -200,7 +225,7 @@ void etcd_state_client_t::start_etcd_watcher() ws_alive = 1; if (etcd_watch_ws) { - etcd_watch_ws->close(); + http_close(etcd_watch_ws); etcd_watch_ws = NULL; } etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, @@ -232,7 +257,7 @@ void etcd_state_client_t::start_etcd_watcher() { fprintf(stderr, "Revisions before %lu were compacted by etcd, reloading state\n", data["result"]["compact_revision"].uint64_value()); - etcd_watch_ws->close(); + http_close(etcd_watch_ws); etcd_watch_ws = NULL; etcd_watch_revision = 0; on_reload_hook(); @@ -286,6 +311,7 @@ void etcd_state_client_t::start_etcd_watcher() { selected_etcd_address = ""; } + http_close(etcd_watch_ws); etcd_watch_ws = NULL; if (etcd_watches_initialised == 0) { @@ -302,7 +328,7 @@ void etcd_state_client_t::start_etcd_watcher() } } }); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object { { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/") }, { "range_end", base64_encode(etcd_prefix+"/config0") }, @@ -311,7 +337,7 @@ void etcd_state_client_t::start_etcd_watcher() { "progress_notify", true }, } } }).dump()); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object { { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/osd/state/") }, { "range_end", base64_encode(etcd_prefix+"/osd/state0") }, @@ -320,7 +346,7 @@ void etcd_state_client_t::start_etcd_watcher() { "progress_notify", true }, } } }).dump()); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object { { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/pg/state/") }, { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, @@ -329,7 +355,7 @@ void etcd_state_client_t::start_etcd_watcher() { "progress_notify", true }, } } }).dump()); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object { { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/pg/history/") }, { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, @@ -348,14 +374,14 @@ void etcd_state_client_t::start_etcd_watcher() } else if (!ws_alive) { - etcd_watch_ws->close(); + http_close(etcd_watch_ws); etcd_watch_ws = NULL; start_etcd_watcher(); } else { ws_alive = 0; - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + http_post_message(etcd_watch_ws, WS_TEXT, json11::Json(json11::Json::object { { "progress_request", json11::Json::object { } } }).dump()); } diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index fbd96c75..9873f993 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -71,7 +71,7 @@ struct inode_watch_t inode_config_t cfg; }; -struct websocket_t; +struct http_co_t; struct etcd_state_client_t { @@ -82,10 +82,11 @@ protected: std::string selected_etcd_address; std::vector addresses_to_try; std::vector watches; - websocket_t *etcd_watch_ws = NULL; + http_co_t *etcd_watch_ws = NULL, *keepalive_client = NULL; int ws_keepalive_timer = -1; int ws_alive = 0; uint64_t bs_block_size = DEFAULT_BLOCK_SIZE; + int etcd_keepalive_interval = 10; void add_etcd_url(std::string); void pick_next_etcd(); public: diff --git a/src/http_client.cpp b/src/http_client.cpp index 82191f16..93fa472c 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -4,9 +4,7 @@ #include #include -#include #include -#include #include #include @@ -25,11 +23,12 @@ static std::string trim(const std::string & in); static std::string ws_format_frame(int type, uint64_t size); static bool ws_parse_frame(std::string & buf, int & type, std::string & res); +static void parse_http_headers(std::string & res, http_response_t *parsed); -// FIXME: Use keepalive struct http_co_t { timerfd_manager_t *tfd; + std::function response_callback; int request_timeout = 0; std::string host; @@ -37,11 +36,12 @@ struct http_co_t std::string ws_outbox; std::string response; bool want_streaming; + bool keepalive; - http_response_t parsed; - uint64_t target_response_size = 0; + std::vector> keepalive_queue; int state = 0; + std::string connected_host; int peer_fd = -1; int timeout_id = -1; int epoll_events = 0; @@ -49,10 +49,8 @@ struct http_co_t std::vector rbuf; iovec read_iov, send_iov; msghdr read_msg = { 0 }, send_msg = { 0 }; - - std::function callback; - - websocket_t ws; + http_response_t parsed; + uint64_t target_response_size = 0; int onstack = 0; bool ended = false; @@ -62,65 +60,39 @@ struct http_co_t inline void stackout() { onstack--; if (!onstack && ended) end(); } inline void end() { ended = true; if (!onstack) { delete this; } } void start_connection(); + void close_connection(); void handle_events(); void handle_connect_result(); void submit_read(); void submit_send(); bool handle_read(); + void fill_parsed_response(); void post_message(int type, const std::string & msg); + void send_request(const std::string & host, const std::string & request, + const http_options_t & options, std::function response_callback); }; +#define HTTP_CO_CLOSED 0 #define HTTP_CO_CONNECTING 1 #define HTTP_CO_SENDING_REQUEST 2 #define HTTP_CO_REQUEST_SENT 3 #define HTTP_CO_HEADERS_RECEIVED 4 #define HTTP_CO_WEBSOCKET 5 #define HTTP_CO_CHUNKED 6 +#define HTTP_CO_KEEPALIVE 7 #define DEFAULT_TIMEOUT 5000 -void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request, - const http_options_t & options, std::function callback) +http_co_t *http_init(timerfd_manager_t *tfd) { http_co_t *handler = new http_co_t(); - handler->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout); - handler->want_streaming = options.want_streaming; handler->tfd = tfd; - handler->host = host; - handler->request = request; - handler->callback = callback; - handler->ws.co = handler; - handler->start_connection(); + handler->state = HTTP_CO_CLOSED; + return handler; } -void http_request_json(timerfd_manager_t *tfd, const std::string & host, const std::string & request, - int timeout, std::function callback) -{ - http_request(tfd, host, request, { .timeout = timeout }, [callback](const http_response_t* res) - { - if (res->error_code != 0) - { - callback("Error code: "+std::to_string(res->error_code)+" ("+std::string(strerror(res->error_code))+")", json11::Json()); - return; - } - if (res->status_code != 200) - { - callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+trim(res->body), json11::Json()); - return; - } - std::string json_err; - json11::Json data = json11::Json::parse(res->body, json_err); - if (json_err != "") - { - callback("Bad JSON: "+json_err+" (response: "+trim(res->body)+")", json11::Json()); - return; - } - callback(std::string(), data); - }); -} - -websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path, - int timeout, std::function callback) +http_co_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path, + int timeout, std::function response_callback) { std::string request = "GET "+path+" HTTP/1.1\r\n" "Host: "+host+"\r\n" @@ -130,40 +102,145 @@ websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, co "Sec-WebSocket-Version: 13\r\n" "\r\n"; http_co_t *handler = new http_co_t(); + handler->tfd = tfd; + handler->state = HTTP_CO_CLOSED; + handler->host = host; handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout); handler->want_streaming = false; - handler->tfd = tfd; - handler->host = host; + handler->keepalive = false; handler->request = request; - handler->callback = callback; - handler->ws.co = handler; + handler->response_callback = response_callback; handler->start_connection(); - return &handler->ws; + return handler; } -void websocket_t::post_message(int type, const std::string & msg) +void http_request(http_co_t *handler, const std::string & host, const std::string & request, + const http_options_t & options, std::function response_callback) { - co->post_message(type, msg); + handler->send_request(host, request, options, response_callback); } -void websocket_t::close() +void http_co_t::send_request(const std::string & host, const std::string & request, + const http_options_t & options, std::function response_callback) { - co->end(); + stackin(); + if (state == HTTP_CO_WEBSOCKET) + { + stackout(); + throw std::runtime_error("Attempt to send HTTP request into a websocket or chunked stream"); + } + else if (state != HTTP_CO_KEEPALIVE && state != HTTP_CO_CLOSED) + { + keepalive_queue.push_back([this, host, request, options, response_callback]() + { + this->send_request(host, request, options, response_callback); + }); + stackout(); + return; + } + this->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout); + this->want_streaming = options.want_streaming; + this->keepalive = options.keepalive; + this->host = host; + this->request = request; + this->response = ""; + this->sent = 0; + this->response_callback = response_callback; + if (state == HTTP_CO_KEEPALIVE && connected_host != host) + { + close_connection(); + } + if (request_timeout > 0) + { + timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id) + { + stackin(); + close_connection(); + parsed = { .error = "HTTP request timed out" }; + this->response_callback(&parsed); + this->response_callback = NULL; + stackout(); + }); + } + if (state == HTTP_CO_KEEPALIVE) + { + state = HTTP_CO_SENDING_REQUEST; + submit_send(); + } + else + { + start_connection(); + } + stackout(); +} + +void http_post_message(http_co_t *handler, int type, const std::string & msg) +{ + handler->post_message(type, msg); +} + +void http_co_t::post_message(int type, const std::string & msg) +{ + stackin(); + if (state == HTTP_CO_WEBSOCKET) + { + request += ws_format_frame(type, msg.size()); + request += msg; + submit_send(); + } + else if (state == HTTP_CO_KEEPALIVE || state == HTTP_CO_CHUNKED) + { + throw std::runtime_error("Attempt to send websocket message on a regular HTTP connection"); + } + else + { + ws_outbox += ws_format_frame(type, msg.size()); + ws_outbox += msg; + } + stackout(); +} + +void http_close(http_co_t *handler) +{ + handler->end(); +} + +void http_response_t::parse_json_response(std::string & error, json11::Json & r) const +{ + if (this->error != "") + { + error = this->error; + r = json11::Json(); + } + else if (status_code != 200) + { + error = "HTTP "+std::to_string(status_code)+" "+status_line+" body: "+trim(body); + r = json11::Json(); + } + else + { + std::string json_err; + json11::Json data = json11::Json::parse(body, json_err); + if (json_err != "") + { + error = "Bad JSON: "+json_err+" (response: "+trim(body)+")"; + r = json11::Json(); + } + else + { + error = ""; + r = data; + } + } } http_co_t::~http_co_t() { - if (timeout_id >= 0) - { - tfd->clear_timer(timeout_id); - timeout_id = -1; - } - if (peer_fd >= 0) - { - tfd->set_fd_handler(peer_fd, false, NULL); - close(peer_fd); - peer_fd = -1; - } + close_connection(); +} + +void http_co_t::fill_parsed_response() +{ if (parsed.headers["transfer-encoding"] == "chunked") { int prev = 0, pos = 0; @@ -178,8 +255,24 @@ http_co_t::~http_co_t() { std::swap(parsed.body, response); } - parsed.eof = true; - callback(&parsed); +} + +void http_co_t::close_connection() +{ + if (timeout_id >= 0) + { + tfd->clear_timer(timeout_id); + timeout_id = -1; + } + if (peer_fd >= 0) + { + tfd->set_fd_handler(peer_fd, false, NULL); + close(peer_fd); + peer_fd = -1; + } + state = HTTP_CO_CLOSED; + connected_host = ""; + response = ""; } void http_co_t::start_connection() @@ -188,39 +281,31 @@ void http_co_t::start_connection() struct sockaddr addr; if (!string_to_addr(host.c_str(), 1, 80, &addr)) { - parsed.error_code = ENXIO; + parsed = { .error = "Invalid address: "+host }; + response_callback(&parsed); + response_callback = NULL; stackout(); - end(); return; } peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (peer_fd < 0) { - parsed.error_code = errno; + parsed = { .error = std::string("socket: ")+strerror(errno) }; + response_callback(&parsed); + response_callback = NULL; stackout(); - end(); return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); - if (request_timeout > 0) - { - timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id) - { - if (response.length() == 0) - { - parsed.error_code = ETIME; - } - end(); - }); - } epoll_events = 0; // Finally call connect int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { - parsed.error_code = errno; + parsed = { .error = std::string("connect: ")+strerror(errno) }; + response_callback(&parsed); + response_callback = NULL; stackout(); - end(); return; } tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) @@ -228,6 +313,7 @@ void http_co_t::start_connection() this->epoll_events |= epoll_events; handle_events(); }); + connected_host = host; state = HTTP_CO_CONNECTING; stackout(); } @@ -250,7 +336,14 @@ void http_co_t::handle_events() } else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) { - end(); + close_connection(); + if (response_callback != NULL) + { + parsed.eof = true; + response_callback(&parsed); + response_callback = NULL; + parsed = {}; + } break; } } @@ -269,9 +362,11 @@ void http_co_t::handle_connect_result() } if (result != 0) { - parsed.error_code = result; + close_connection(); + parsed = { .error = std::string("connect: ")+strerror(result) }; + response_callback(&parsed); + response_callback = NULL; stackout(); - end(); return; } int one = 1; @@ -286,6 +381,49 @@ void http_co_t::handle_connect_result() stackout(); } +void http_co_t::submit_send() +{ + stackin(); + int res; +again: + if (sent < request.size()) + { + send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; + send_msg.msg_iov = &send_iov; + send_msg.msg_iovlen = 1; + res = sendmsg(peer_fd, &send_msg, MSG_NOSIGNAL); + if (res < 0) + { + res = -errno; + } + if (res == -EAGAIN) + { + res = 0; + } + else if (res < 0) + { + stackout(); + end(); + return; + } + sent += res; + if (state == HTTP_CO_SENDING_REQUEST) + { + if (sent >= request.size()) + state = HTTP_CO_REQUEST_SENT; + else + goto again; + } + else if (state == HTTP_CO_WEBSOCKET) + { + request = request.substr(sent); + sent = 0; + goto again; + } + } + stackout(); +} + void http_co_t::submit_read() { stackin(); @@ -321,51 +459,6 @@ void http_co_t::submit_read() stackout(); } -void http_co_t::submit_send() -{ - stackin(); - int res; -again: - if (sent < request.size()) - { - send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent }; - send_msg.msg_iov = &send_iov; - send_msg.msg_iovlen = 1; - res = sendmsg(peer_fd, &send_msg, MSG_NOSIGNAL); - if (res < 0) - { - res = -errno; - } - if (res == -EAGAIN) - { - res = 0; - } - else if (res < 0) - { - stackout(); - end(); - return; - } - sent += res; - if (state == HTTP_CO_SENDING_REQUEST) - { - if (sent >= request.size()) - { - state = HTTP_CO_REQUEST_SENT; - } - else - goto again; - } - else if (state == HTTP_CO_WEBSOCKET) - { - request = request.substr(sent); - sent = 0; - goto again; - } - } - stackout(); -} - bool http_co_t::handle_read() { stackin(); @@ -376,6 +469,7 @@ bool http_co_t::handle_read() { if (timeout_id >= 0) { + // Timeout is cleared when headers are received tfd->clear_timer(timeout_id); timeout_id = -1; } @@ -403,20 +497,27 @@ bool http_co_t::handle_read() if (!target_response_size) { // Sorry, unsupported response + close_connection(); + parsed = { .error = "Response has neither Connection: close, nor Transfer-Encoding: chunked nor Content-Length headers" }; + response_callback(&parsed); + response_callback = NULL; stackout(); - end(); return false; } } + else + { + keepalive = false; + } } } if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size) { - stackout(); - end(); - return false; + fill_parsed_response(); + response_callback(&parsed); + parsed.eof = true; } - if (state == HTTP_CO_CHUNKED && response.size() > 0) + else if (state == HTTP_CO_CHUNKED && response.size() > 0) { int prev = 0, pos = 0; while ((pos = response.find("\r\n", prev)) >= prev) @@ -439,55 +540,49 @@ bool http_co_t::handle_read() { response = response.substr(prev); } - if (parsed.eof) + if (want_streaming) { - stackout(); - end(); - return false; - } - if (want_streaming && parsed.body.size() > 0) - { - if (!ended) - { - // Don't deliver additional events after close() - callback(&parsed); - } + // Streaming response + response_callback(&parsed); parsed.body = ""; } + if (parsed.eof && !want_streaming) + { + // Normal response + response_callback(&parsed); + } } - if (state == HTTP_CO_WEBSOCKET && response.size() > 0) + else if (state == HTTP_CO_WEBSOCKET && response.size() > 0) { while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body)) { - if (!ended) - { - // Don't deliver additional events after close() - callback(&parsed); - } + response_callback(&parsed); parsed.body = ""; } } + if (parsed.eof) + { + response_callback = NULL; + parsed = {}; + if (!keepalive) + { + close_connection(); + } + else + { + state = HTTP_CO_KEEPALIVE; + if (keepalive_queue.size() > 0) + { + auto next = keepalive_queue[0]; + keepalive_queue.erase(keepalive_queue.begin(), keepalive_queue.begin()+1); + next(); + } + } + } stackout(); return true; } -void http_co_t::post_message(int type, const std::string & msg) -{ - stackin(); - if (state == HTTP_CO_WEBSOCKET) - { - request += ws_format_frame(type, msg.size()); - request += msg; - submit_send(); - } - else - { - ws_outbox += ws_format_frame(type, msg.size()); - ws_outbox += msg; - } - stackout(); -} - uint64_t stoull_full(const std::string & str, int base) { if (isspace(str[0])) @@ -503,7 +598,7 @@ uint64_t stoull_full(const std::string & str, int base) return r; } -void parse_http_headers(std::string & res, http_response_t *parsed) +static void parse_http_headers(std::string & res, http_response_t *parsed) { int pos = res.find("\r\n"); pos = pos < 0 ? res.length() : pos+2; @@ -625,136 +720,6 @@ static bool ws_parse_frame(std::string & buf, int & type, std::string & res) return true; } -static bool cidr_match(const in_addr &addr, const in_addr &net, uint8_t bits) -{ - if (bits == 0) - { - // C99 6.5.7 (3): u32 << 32 is undefined behaviour - return true; - } - return !((addr.s_addr ^ net.s_addr) & htonl(0xFFFFFFFFu << (32 - bits))); -} - -static bool cidr6_match(const in6_addr &address, const in6_addr &network, uint8_t bits) -{ - const uint32_t *a = address.s6_addr32; - const uint32_t *n = network.s6_addr32; - int bits_whole, bits_incomplete; - bits_whole = bits >> 5; // number of whole u32 - bits_incomplete = bits & 0x1F; // number of bits in incomplete u32 - if (bits_whole && memcmp(a, n, bits_whole << 2)) - return false; - if (bits_incomplete) - { - uint32_t mask = htonl((0xFFFFFFFFu) << (32 - bits_incomplete)); - if ((a[bits_whole] ^ n[bits_whole]) & mask) - return false; - } - return true; -} - -struct addr_mask_t -{ - sa_family_t family; - in_addr ipv4; - in6_addr ipv6; - uint8_t bits; -}; - -std::vector getifaddr_list(json11::Json mask_cfg, bool include_v6) -{ - std::vector masks; - if (mask_cfg.is_string()) - { - mask_cfg = json11::Json::array{ mask_cfg }; - } - for (auto mask_json: mask_cfg.array_items()) - { - std::string mask = mask_json.string_value(); - unsigned bits = 0; - int p = mask.find('/'); - if (p != std::string::npos) - { - char null_byte = 0; - if (sscanf(mask.c_str()+p+1, "%u%c", &bits, &null_byte) != 1 || bits > 128) - { - throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask); - } - mask = mask.substr(0, p); - } - in_addr ipv4; - in6_addr ipv6; - if (inet_pton(AF_INET, mask.c_str(), &ipv4) == 1) - { - if (bits > 32) - { - throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask); - } - masks.push_back((addr_mask_t){ .family = AF_INET, .ipv4 = ipv4, .bits = (uint8_t)bits }); - } - else if (include_v6 && inet_pton(AF_INET6, mask.c_str(), &ipv6) == 1) - { - masks.push_back((addr_mask_t){ .family = AF_INET6, .ipv6 = ipv6, .bits = (uint8_t)bits }); - } - else - { - throw std::runtime_error((include_v6 ? "Invalid IPv4 address mask: " : "Invalid IP address mask: ") + mask); - } - } - std::vector addresses; - ifaddrs *list, *ifa; - if (getifaddrs(&list) == -1) - { - throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); - } - for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) - { - if (!ifa->ifa_addr) - { - continue; - } - int family = ifa->ifa_addr->sa_family; - if ((family == AF_INET || family == AF_INET6 && include_v6) && - (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) - { - void *addr_ptr; - if (family == AF_INET) - { - addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; - } - else - { - addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; - } - if (masks.size() > 0) - { - int i; - for (i = 0; i < masks.size(); i++) - { - if (masks[i].family == family && (family == AF_INET - ? cidr_match(*(in_addr*)addr_ptr, masks[i].ipv4, masks[i].bits) - : cidr6_match(*(in6_addr*)addr_ptr, masks[i].ipv6, masks[i].bits))) - { - break; - } - } - if (i >= masks.size()) - { - continue; - } - } - char addr[INET6_ADDRSTRLEN]; - if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) - { - throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); - } - addresses.push_back(std::string(addr)); - } - } - freeifaddrs(list); - return addresses; -} - std::string strtolower(const std::string & in) { std::string s = in; diff --git a/src/http_client.h b/src/http_client.h index d2996553..f5eb5d19 100644 --- a/src/http_client.h +++ b/src/http_client.h @@ -21,41 +21,34 @@ struct http_options_t { int timeout; bool want_streaming; + bool keepalive; }; struct http_response_t { + std::string error; + bool eof = false; - int error_code = 0; int status_code = 0; std::string status_line; std::map headers; int ws_msg_type = -1; std::string body; + + void parse_json_response(std::string & error, json11::Json & r) const; }; +// Opened websocket or keepalive HTTP connection struct http_co_t; -struct websocket_t -{ - http_co_t *co; - void post_message(int type, const std::string & msg); - void close(); -}; - -void parse_http_headers(std::string & res, http_response_t *parsed); - -std::vector getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = true); +http_co_t* http_init(timerfd_manager_t *tfd); +http_co_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path, + int timeout, std::function on_message); +void http_request(http_co_t *handler, const std::string & host, const std::string & request, + const http_options_t & options, std::function response_callback); +void http_post_message(http_co_t *handler, int type, const std::string & msg); +void http_close(http_co_t *co); +// Utils uint64_t stoull_full(const std::string & str, int base = 10); - std::string strtolower(const std::string & in); - -void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request, - const http_options_t & options, std::function callback); - -void http_request_json(timerfd_manager_t *tfd, const std::string & host, const std::string & request, - int timeout, std::function callback); - -websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path, - int timeout, std::function callback); diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 84cad57e..924ef371 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -6,6 +6,7 @@ #include "etcd_state_client.h" #include "http_client.h" #include "osd_rmw.h" +#include "addr_util.h" // Startup sequence: // Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state