Implement very simple HTTP streaming to handle etcd watches

trace-sqes
Vitaliy Filippov 2020-04-25 01:29:31 +03:00
parent 895a80dfc4
commit 35481925b1
3 changed files with 79 additions and 39 deletions

4
osd.h
View File

@ -189,6 +189,8 @@ struct osd_wanted_peer_t
int address_index; int address_index;
}; };
struct http_response_t;
class osd_t class osd_t
{ {
friend struct http_co_t; friend struct http_co_t;
@ -295,7 +297,7 @@ class osd_t
void send_replies(); void send_replies();
void handle_send(ring_data_t *data, int peer_fd); void handle_send(ring_data_t *data, int peer_fd);
void outbox_push(osd_client_t & cl, osd_op_t *op); void outbox_push(osd_client_t & cl, osd_op_t *op);
void http_request(std::string host, std::string request, std::function<void(int, std::string)> callback); void http_request(std::string host, std::string request, bool streaming, std::function<void(const http_response_t *response)> callback);
void http_request_json(std::string host, std::string request, std::function<void(std::string, json11::Json data)> callback); void http_request_json(std::string host, std::string request, std::function<void(std::string, json11::Json data)> callback);
// peer handling (primary OSD logic) // peer handling (primary OSD logic)

View File

@ -6,8 +6,8 @@
#include <ctype.h> #include <ctype.h>
#include "osd_http.h"
#include "osd.h" #include "osd.h"
#include "osd_http.h"
static int extract_port(std::string & host) static int extract_port(std::string & host)
{ {
@ -67,28 +67,32 @@ struct http_co_t
std::string request; std::string request;
std::string response; std::string response;
std::vector<char> rbuf; std::vector<char> rbuf;
bool streaming;
bool headers_received = false;
http_response_t parsed;
int st = 0; int st = 0;
int peer_fd = -1; int peer_fd = -1;
int timeout_id = -1; int timeout_id = -1;
int epoll_events = 0; int epoll_events = 0;
int code = 0; int sent = 0;
int sent = 0, received = 0;
iovec iov; iovec iov;
msghdr msg = { 0 }; msghdr msg = { 0 };
int cqe_res = 0; int cqe_res = 0;
std::function<void(int, std::string)> callback; std::function<void(const http_response_t*)> callback;
std::function<void(int, int)> epoll_handler; std::function<void(int, int)> epoll_handler;
~http_co_t(); ~http_co_t();
void resume(); void resume();
}; };
void osd_t::http_request(std::string host, std::string request, std::function<void(int, std::string)> callback) void osd_t::http_request(std::string host, std::string request, bool streaming, std::function<void(const http_response_t *response)> callback)
{ {
http_co_t *handler = new http_co_t(); http_co_t *handler = new http_co_t();
handler->osd = this; handler->osd = this;
handler->streaming = streaming;
handler->host = host; handler->host = host;
handler->request = request; handler->request = request;
handler->callback = callback; handler->callback = callback;
@ -103,14 +107,13 @@ void osd_t::http_request(std::string host, std::string request, std::function<vo
void osd_t::http_request_json(std::string host, std::string request, void osd_t::http_request_json(std::string host, std::string request,
std::function<void(std::string, json11::Json r)> callback) std::function<void(std::string, json11::Json r)> callback)
{ {
http_request(host, request, [this, callback](int err, std::string txt) http_request(host, request, false, [this, callback](const http_response_t* res)
{ {
if (err != 0) if (res->error_code != 0)
{ {
callback("Error code: "+std::to_string(err)+" ("+std::string(strerror(err))+")", json11::Json()); callback("Error code: "+std::to_string(res->error_code)+" ("+std::string(strerror(res->error_code))+")", json11::Json());
return; return;
} }
std::unique_ptr<http_response_t> res(parse_http_response(txt));
if (res->status_code != 200) if (res->status_code != 200)
{ {
callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+res->body, json11::Json()); callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+res->body, json11::Json());
@ -120,16 +123,15 @@ void osd_t::http_request_json(std::string host, std::string request,
json11::Json data = json11::Json::parse(res->body, json_err); json11::Json data = json11::Json::parse(res->body, json_err);
if (json_err != "") if (json_err != "")
{ {
callback("Bad JSON: "+json_err+" (response: "+(res->body == "" ? txt : res->body)+")", json11::Json()); callback("Bad JSON: "+json_err+" (response: "+res->body+")", json11::Json());
return; return;
} }
callback(std::string(), data); callback(std::string(), data);
}); });
} }
http_response_t *parse_http_response(std::string res) void parse_headers(std::string & res, http_response_t *parsed)
{ {
http_response_t *parsed = new http_response_t();
int pos = res.find("\r\n"); int pos = res.find("\r\n");
pos = pos < 0 ? res.length() : pos+2; pos = pos < 0 ? res.length() : pos+2;
std::string status_line = res.substr(0, pos); std::string status_line = res.substr(0, pos);
@ -139,6 +141,7 @@ http_response_t *parse_http_response(std::string res)
if (status_text) if (status_text)
{ {
parsed->status_line = status_text; parsed->status_line = status_text;
// %ms = allocate a buffer
free(status_text); free(status_text);
status_text = NULL; status_text = NULL;
} }
@ -147,20 +150,7 @@ http_response_t *parse_http_response(std::string res)
{ {
if (pos == prev) if (pos == prev)
{ {
if (parsed->headers["transfer-encoding"] == "chunked") res = res.substr(pos+2);
{
prev = pos+2;
while ((pos = res.find("\r\n", prev)) >= prev)
{
uint64_t len = strtoull(res.c_str()+prev, NULL, 16);
parsed->body += res.substr(pos+2, len);
prev = pos+2+len+2;
}
}
else
{
parsed->body = res.substr(pos+2);
}
break; break;
} }
std::string header = res.substr(prev, pos-prev); std::string header = res.substr(prev, pos-prev);
@ -177,7 +167,6 @@ http_response_t *parse_http_response(std::string res)
} }
prev = pos+2; prev = pos+2;
} }
return parsed;
} }
http_co_t::~http_co_t() http_co_t::~http_co_t()
@ -187,7 +176,22 @@ http_co_t::~http_co_t()
osd->tfd->clear_timer(timeout_id); osd->tfd->clear_timer(timeout_id);
timeout_id = -1; timeout_id = -1;
} }
callback(code, response); if (parsed.headers["transfer-encoding"] == "chunked")
{
int prev = 0, pos = 0;
while ((pos = response.find("\r\n", prev)) >= prev)
{
uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
parsed.body += response.substr(pos+2, len);
prev = pos+2+len+2;
}
}
else
{
std::swap(parsed.body, response);
}
parsed.eof = true;
callback(&parsed);
if (peer_fd >= 0) if (peer_fd >= 0)
{ {
osd->epoll_handlers.erase(peer_fd); osd->epoll_handlers.erase(peer_fd);
@ -206,7 +210,7 @@ void http_co_t::resume()
int r; int r;
if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
{ {
code = ENXIO; parsed.error_code = ENXIO;
delete this; delete this;
return; return;
} }
@ -215,7 +219,7 @@ void http_co_t::resume()
peer_fd = socket(AF_INET, SOCK_STREAM, 0); peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
code = errno; parsed.error_code = errno;
delete this; delete this;
return; return;
} }
@ -226,7 +230,7 @@ void http_co_t::resume()
{ {
if (response.length() == 0) if (response.length() == 0)
{ {
code = EIO; parsed.error_code = EIO;
} }
delete this; delete this;
}); });
@ -234,7 +238,7 @@ void http_co_t::resume()
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS) if (r < 0 && errno != EINPROGRESS)
{ {
code = errno; parsed.error_code = errno;
delete this; delete this;
return; return;
} }
@ -245,7 +249,7 @@ void http_co_t::resume()
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
{ {
code = errno; parsed.error_code = errno;
delete this; delete this;
return; return;
} }
@ -265,7 +269,7 @@ void http_co_t::resume()
} }
if (result != 0) if (result != 0)
{ {
code = result; parsed.error_code = result;
delete this; delete this;
return; return;
} }
@ -277,7 +281,7 @@ void http_co_t::resume()
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) if (epoll_ctl(osd->epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
{ {
code = errno; parsed.error_code = errno;
delete this; delete this;
return; return;
} }
@ -377,7 +381,39 @@ void http_co_t::resume()
return; return;
} }
response += std::string(rbuf.data(), cqe_res); response += std::string(rbuf.data(), cqe_res);
received += cqe_res; if (!headers_received)
{
int pos = response.find("\r\n\r\n");
if (pos >= 0)
{
headers_received = true;
parse_headers(response, &parsed);
streaming = streaming && parsed.headers["transfer-encoding"] == "chunked";
}
}
if (streaming && headers_received && response.size() > 0)
{
int prev = 0, pos = 0;
while ((pos = response.find("\r\n", prev)) >= prev)
{
uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
if (response.size() < pos+2+len+2)
{
break;
}
parsed.body += response.substr(pos+2, len);
prev = pos+2+len+2;
}
if (prev > 0)
{
response = response.substr(prev);
}
if (parsed.body.size() > 0)
{
callback(&parsed);
parsed.body = "";
}
}
st = 5; st = 5;
resume(); resume();
return; return;

View File

@ -6,12 +6,14 @@
struct http_response_t struct http_response_t
{ {
int status_code; bool eof = false;
int error_code = 0;
int status_code = 0;
std::string status_line; std::string status_line;
std::map<std::string, std::string> headers; std::map<std::string, std::string> headers;
std::string body; std::string body;
}; };
http_response_t *parse_http_response(std::string res); void parse_headers(std::string & res, http_response_t *parsed);
std::vector<std::string> getifaddr_list(bool include_v6 = false); std::vector<std::string> getifaddr_list(bool include_v6 = false);
uint64_t stoull_full(const std::string & str, int base = 10); uint64_t stoull_full(const std::string & str, int base = 10);