Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

694 lines
18 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #include <netinet/tcp.h>
  4. #include <sys/epoll.h>
  5. #include <net/if.h>
  6. #include <arpa/inet.h>
  7. #include <ifaddrs.h>
  8. #include <ctype.h>
  9. #include <unistd.h>
  10. #include <fcntl.h>
  11. #include <string.h>
  12. #include <stdexcept>
  13. #include "json11/json11.hpp"
  14. #include "http_client.h"
  15. #include "timerfd_manager.h"
// Size in bytes of the buffer handed to each recvmsg() call in http_co_t::submit_read()
#define READ_BUFFER_SIZE 9000
// File-local helpers; their definitions are at the end of this file
static int extract_port(std::string & host);
static std::string strtolower(const std::string & in);
static std::string trim(const std::string & in);
static std::string ws_format_frame(int type, uint64_t size);
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
// FIXME: Use keepalive
// State of one HTTP request or WebSocket connection.
// Instances are created with new and delete themselves through end().
struct http_co_t
{
    // Used for set_timer(), clear_timer() and set_fd_handler()
    timerfd_manager_t *tfd;
    // Passed to tfd->set_timer() when > 0; no timer is armed otherwise
    int request_timeout = 0;
    // Peer address; start_connection() strips the ":port" suffix from it
    std::string host;
    // Bytes to be written to the socket: the HTTP request, later outgoing WebSocket frames
    std::string request;
    // Frames posted before the WebSocket handshake completes; moved into <request> afterwards
    std::string ws_outbox;
    // Received bytes that were not consumed yet
    std::string response;
    // When set, chunked body data is passed to <callback> as it arrives
    bool want_streaming;
    // Response object handed to <callback>
    http_response_t parsed;
    // Parsed Content-Length; the request ends once <response> reaches this size
    uint64_t target_response_size = 0;
    // One of HTTP_CO_*, 0 before the connection is started
    int state = 0;
    int peer_fd = -1;
    int timeout_id = -1;
    // Event bits reported by the fd handler that are not handled yet
    int epoll_events = 0;
    // Number of bytes of <request> already written
    int sent = 0;
    std::vector<char> rbuf;
    iovec read_iov, send_iov;
    msghdr read_msg = { 0 }, send_msg = { 0 };
    std::function<void(const http_response_t*)> callback;
    websocket_t ws;
    // Nesting depth of member functions currently running (see stackin/stackout)
    int onstack = 0;
    // Set by end(); the object is deleted as soon as <onstack> drops to zero
    bool ended = false;
    // Closes the socket, clears the timer and invokes <callback> with the final response
    ~http_co_t();
    // stackin()/stackout() bracket every member function so that end()
    // never deletes the object while one of its methods is still executing
    inline void stackin() { onstack++; }
    inline void stackout() { onstack--; if (!onstack && ended) end(); }
    // Request destruction: immediate if no method is running, deferred to stackout() otherwise
    inline void end() { ended = true; if (!onstack) { delete this; } }
    void start_connection();
    void handle_events();
    void handle_connect_result();
    void submit_read();
    void submit_send();
    bool handle_read();
    void post_message(int type, const std::string & msg);
};
// Values of http_co_t::state (0, the initial value, means "not started")
// connect() was issued, its result is not known yet
#define HTTP_CO_CONNECTING 1
// Connected, the request is being written to the socket
#define HTTP_CO_SENDING_REQUEST 2
// The whole request was written, waiting for the response headers
#define HTTP_CO_REQUEST_SENT 3
// Headers were parsed, receiving a non-chunked body
#define HTTP_CO_HEADERS_RECEIVED 4
// The connection was upgraded to WebSocket
#define HTTP_CO_WEBSOCKET 5
// Headers were parsed, receiving a "Transfer-Encoding: chunked" body
#define HTTP_CO_CHUNKED 6
// Timeout used when the caller passes 0. Presumably milliseconds - the unit is
// whatever timerfd_manager_t::set_timer() expects, TODO confirm
#define DEFAULT_TIMEOUT 5000
  65. void http_request(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
  66. const http_options_t & options, std::function<void(const http_response_t *response)> callback)
  67. {
  68. http_co_t *handler = new http_co_t();
  69. handler->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout);
  70. handler->want_streaming = options.want_streaming;
  71. handler->tfd = tfd;
  72. handler->host = host;
  73. handler->request = request;
  74. handler->callback = callback;
  75. handler->ws.co = handler;
  76. handler->start_connection();
  77. }
  78. void http_request_json(timerfd_manager_t *tfd, const std::string & host, const std::string & request,
  79. int timeout, std::function<void(std::string, json11::Json r)> callback)
  80. {
  81. http_request(tfd, host, request, { .timeout = timeout }, [callback](const http_response_t* res)
  82. {
  83. if (res->error_code != 0)
  84. {
  85. callback("Error code: "+std::to_string(res->error_code)+" ("+std::string(strerror(res->error_code))+")", json11::Json());
  86. return;
  87. }
  88. if (res->status_code != 200)
  89. {
  90. callback("HTTP "+std::to_string(res->status_code)+" "+res->status_line+" body: "+trim(res->body), json11::Json());
  91. return;
  92. }
  93. std::string json_err;
  94. json11::Json data = json11::Json::parse(res->body, json_err);
  95. if (json_err != "")
  96. {
  97. callback("Bad JSON: "+json_err+" (response: "+trim(res->body)+")", json11::Json());
  98. return;
  99. }
  100. callback(std::string(), data);
  101. });
  102. }
  103. websocket_t* open_websocket(timerfd_manager_t *tfd, const std::string & host, const std::string & path,
  104. int timeout, std::function<void(const http_response_t *msg)> callback)
  105. {
  106. std::string request = "GET "+path+" HTTP/1.1\r\n"
  107. "Host: "+host+"\r\n"
  108. "Upgrade: websocket\r\n"
  109. "Connection: upgrade\r\n"
  110. "Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
  111. "Sec-WebSocket-Version: 13\r\n"
  112. "\r\n";
  113. http_co_t *handler = new http_co_t();
  114. handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout);
  115. handler->want_streaming = false;
  116. handler->tfd = tfd;
  117. handler->host = host;
  118. handler->request = request;
  119. handler->callback = callback;
  120. handler->ws.co = handler;
  121. handler->start_connection();
  122. return &handler->ws;
  123. }
  124. void websocket_t::post_message(int type, const std::string & msg)
  125. {
  126. co->post_message(type, msg);
  127. }
  128. void websocket_t::close()
  129. {
  130. co->end();
  131. }
  132. http_co_t::~http_co_t()
  133. {
  134. if (timeout_id >= 0)
  135. {
  136. tfd->clear_timer(timeout_id);
  137. timeout_id = -1;
  138. }
  139. if (peer_fd >= 0)
  140. {
  141. tfd->set_fd_handler(peer_fd, false, NULL);
  142. close(peer_fd);
  143. peer_fd = -1;
  144. }
  145. if (parsed.headers["transfer-encoding"] == "chunked")
  146. {
  147. int prev = 0, pos = 0;
  148. while ((pos = response.find("\r\n", prev)) >= prev)
  149. {
  150. uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
  151. parsed.body += response.substr(pos+2, len);
  152. prev = pos+2+len+2;
  153. }
  154. }
  155. else
  156. {
  157. std::swap(parsed.body, response);
  158. }
  159. parsed.eof = true;
  160. callback(&parsed);
  161. }
  162. void http_co_t::start_connection()
  163. {
  164. stackin();
  165. int port = extract_port(host);
  166. struct sockaddr_in addr;
  167. int r;
  168. if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
  169. {
  170. parsed.error_code = ENXIO;
  171. stackout();
  172. end();
  173. return;
  174. }
  175. addr.sin_family = AF_INET;
  176. addr.sin_port = htons(port ? port : 80);
  177. peer_fd = socket(AF_INET, SOCK_STREAM, 0);
  178. if (peer_fd < 0)
  179. {
  180. parsed.error_code = errno;
  181. stackout();
  182. end();
  183. return;
  184. }
  185. fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
  186. if (request_timeout > 0)
  187. {
  188. timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
  189. {
  190. if (response.length() == 0)
  191. {
  192. parsed.error_code = ETIME;
  193. }
  194. end();
  195. });
  196. }
  197. epoll_events = 0;
  198. // Finally call connect
  199. r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
  200. if (r < 0 && errno != EINPROGRESS)
  201. {
  202. parsed.error_code = errno;
  203. stackout();
  204. end();
  205. return;
  206. }
  207. tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
  208. {
  209. this->epoll_events |= epoll_events;
  210. handle_events();
  211. });
  212. state = HTTP_CO_CONNECTING;
  213. stackout();
  214. }
  215. void http_co_t::handle_events()
  216. {
  217. stackin();
  218. while (epoll_events)
  219. {
  220. if (state == HTTP_CO_CONNECTING)
  221. {
  222. handle_connect_result();
  223. }
  224. else
  225. {
  226. epoll_events &= ~EPOLLOUT;
  227. if (epoll_events & EPOLLIN)
  228. {
  229. submit_read();
  230. }
  231. else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
  232. {
  233. end();
  234. break;
  235. }
  236. }
  237. }
  238. stackout();
  239. }
  240. void http_co_t::handle_connect_result()
  241. {
  242. stackin();
  243. int result = 0;
  244. socklen_t result_len = sizeof(result);
  245. if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
  246. {
  247. result = errno;
  248. }
  249. if (result != 0)
  250. {
  251. parsed.error_code = result;
  252. stackout();
  253. end();
  254. return;
  255. }
  256. int one = 1;
  257. setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
  258. tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
  259. {
  260. this->epoll_events |= epoll_events;
  261. handle_events();
  262. });
  263. state = HTTP_CO_SENDING_REQUEST;
  264. submit_send();
  265. stackout();
  266. }
  267. void http_co_t::submit_read()
  268. {
  269. stackin();
  270. int res;
  271. if (rbuf.size() != READ_BUFFER_SIZE)
  272. {
  273. rbuf.resize(READ_BUFFER_SIZE);
  274. }
  275. read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE };
  276. read_msg.msg_iov = &read_iov;
  277. read_msg.msg_iovlen = 1;
  278. res = recvmsg(peer_fd, &read_msg, 0);
  279. if (res < 0)
  280. {
  281. res = -errno;
  282. }
  283. if (res == -EAGAIN)
  284. {
  285. epoll_events = epoll_events & ~EPOLLIN;
  286. }
  287. else if (res <= 0)
  288. {
  289. // < 0 means error, 0 means EOF
  290. if (!res)
  291. epoll_events = epoll_events & ~EPOLLIN;
  292. end();
  293. }
  294. else
  295. {
  296. response += std::string(rbuf.data(), res);
  297. handle_read();
  298. }
  299. stackout();
  300. }
  301. void http_co_t::submit_send()
  302. {
  303. stackin();
  304. int res;
  305. again:
  306. if (sent < request.size())
  307. {
  308. send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent };
  309. send_msg.msg_iov = &send_iov;
  310. send_msg.msg_iovlen = 1;
  311. res = sendmsg(peer_fd, &send_msg, MSG_NOSIGNAL);
  312. if (res < 0)
  313. {
  314. res = -errno;
  315. }
  316. if (res == -EAGAIN)
  317. {
  318. res = 0;
  319. }
  320. else if (res < 0)
  321. {
  322. stackout();
  323. end();
  324. return;
  325. }
  326. sent += res;
  327. if (state == HTTP_CO_SENDING_REQUEST)
  328. {
  329. if (sent >= request.size())
  330. {
  331. state = HTTP_CO_REQUEST_SENT;
  332. }
  333. else
  334. goto again;
  335. }
  336. else if (state == HTTP_CO_WEBSOCKET)
  337. {
  338. request = request.substr(sent);
  339. sent = 0;
  340. goto again;
  341. }
  342. }
  343. stackout();
  344. }
  345. bool http_co_t::handle_read()
  346. {
  347. stackin();
  348. if (state == HTTP_CO_REQUEST_SENT)
  349. {
  350. int pos = response.find("\r\n\r\n");
  351. if (pos >= 0)
  352. {
  353. if (timeout_id >= 0)
  354. {
  355. tfd->clear_timer(timeout_id);
  356. timeout_id = -1;
  357. }
  358. state = HTTP_CO_HEADERS_RECEIVED;
  359. parse_http_headers(response, &parsed);
  360. if (parsed.status_code == 101 &&
  361. parsed.headers.find("sec-websocket-accept") != parsed.headers.end() &&
  362. parsed.headers["upgrade"] == "websocket" &&
  363. parsed.headers["connection"] == "upgrade")
  364. {
  365. // Don't care about validating the key
  366. state = HTTP_CO_WEBSOCKET;
  367. request = ws_outbox;
  368. ws_outbox = "";
  369. sent = 0;
  370. submit_send();
  371. }
  372. else if (parsed.headers["transfer-encoding"] == "chunked")
  373. {
  374. state = HTTP_CO_CHUNKED;
  375. }
  376. else if (parsed.headers["connection"] != "close")
  377. {
  378. target_response_size = stoull_full(parsed.headers["content-length"]);
  379. if (!target_response_size)
  380. {
  381. // Sorry, unsupported response
  382. stackout();
  383. end();
  384. return false;
  385. }
  386. }
  387. }
  388. }
  389. if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size)
  390. {
  391. stackout();
  392. end();
  393. return false;
  394. }
  395. if (state == HTTP_CO_CHUNKED && response.size() > 0)
  396. {
  397. int prev = 0, pos = 0;
  398. while ((pos = response.find("\r\n", prev)) >= prev)
  399. {
  400. uint64_t len = strtoull(response.c_str()+prev, NULL, 16);
  401. if (!len)
  402. {
  403. // Zero length chunk indicates EOF
  404. parsed.eof = true;
  405. break;
  406. }
  407. if (response.size() < pos+2+len+2)
  408. {
  409. break;
  410. }
  411. parsed.body += response.substr(pos+2, len);
  412. prev = pos+2+len+2;
  413. }
  414. if (prev > 0)
  415. {
  416. response = response.substr(prev);
  417. }
  418. if (parsed.eof)
  419. {
  420. stackout();
  421. end();
  422. return false;
  423. }
  424. if (want_streaming && parsed.body.size() > 0)
  425. {
  426. callback(&parsed);
  427. parsed.body = "";
  428. }
  429. }
  430. if (state == HTTP_CO_WEBSOCKET && response.size() > 0)
  431. {
  432. while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body))
  433. {
  434. callback(&parsed);
  435. parsed.body = "";
  436. }
  437. }
  438. stackout();
  439. return true;
  440. }
  441. void http_co_t::post_message(int type, const std::string & msg)
  442. {
  443. stackin();
  444. if (state == HTTP_CO_WEBSOCKET)
  445. {
  446. request += ws_format_frame(type, msg.size());
  447. request += msg;
  448. submit_send();
  449. }
  450. else
  451. {
  452. ws_outbox += ws_format_frame(type, msg.size());
  453. ws_outbox += msg;
  454. }
  455. stackout();
  456. }
  457. uint64_t stoull_full(const std::string & str, int base)
  458. {
  459. if (isspace(str[0]))
  460. {
  461. return 0;
  462. }
  463. char *end = NULL;
  464. uint64_t r = strtoull(str.c_str(), &end, base);
  465. if (end != str.c_str()+str.length())
  466. {
  467. return 0;
  468. }
  469. return r;
  470. }
  471. void parse_http_headers(std::string & res, http_response_t *parsed)
  472. {
  473. int pos = res.find("\r\n");
  474. pos = pos < 0 ? res.length() : pos+2;
  475. std::string status_line = res.substr(0, pos);
  476. int http_version;
  477. char *status_text = NULL;
  478. sscanf(status_line.c_str(), "HTTP/1.%d %d %ms", &http_version, &parsed->status_code, &status_text);
  479. if (status_text)
  480. {
  481. parsed->status_line = status_text;
  482. // %ms = allocate a buffer
  483. free(status_text);
  484. status_text = NULL;
  485. }
  486. int prev = pos;
  487. while ((pos = res.find("\r\n", prev)) >= prev)
  488. {
  489. if (pos == prev)
  490. {
  491. res = res.substr(pos+2);
  492. break;
  493. }
  494. std::string header = res.substr(prev, pos-prev);
  495. int p2 = header.find(":");
  496. if (p2 >= 0)
  497. {
  498. std::string key = strtolower(header.substr(0, p2));
  499. int p3 = p2+1;
  500. while (p3 < header.length() && isblank(header[p3]))
  501. p3++;
  502. parsed->headers[key] = key == "connection" || key == "upgrade" || key == "transfer-encoding"
  503. ? strtolower(header.substr(p3)) : header.substr(p3);
  504. }
  505. prev = pos+2;
  506. }
  507. }
  508. static std::string ws_format_frame(int type, uint64_t size)
  509. {
  510. // Always zero mask
  511. std::string res;
  512. int p = 0;
  513. res.resize(2 + (size >= 126 ? 2 : 0) + (size >= 65536 ? 6 : 0) + /*mask*/4);
  514. res[p++] = 0x80 | type;
  515. if (size < 126)
  516. res[p++] = size | /*mask*/0x80;
  517. else if (size < 65536)
  518. {
  519. res[p++] = 126 | /*mask*/0x80;
  520. res[p++] = (size >> 8) & 0xFF;
  521. res[p++] = (size >> 0) & 0xFF;
  522. }
  523. else
  524. {
  525. res[p++] = 127 | /*mask*/0x80;
  526. res[p++] = (size >> 56) & 0xFF;
  527. res[p++] = (size >> 48) & 0xFF;
  528. res[p++] = (size >> 40) & 0xFF;
  529. res[p++] = (size >> 32) & 0xFF;
  530. res[p++] = (size >> 24) & 0xFF;
  531. res[p++] = (size >> 16) & 0xFF;
  532. res[p++] = (size >> 8) & 0xFF;
  533. res[p++] = (size >> 0) & 0xFF;
  534. }
  535. res[p++] = 0;
  536. res[p++] = 0;
  537. res[p++] = 0;
  538. res[p++] = 0;
  539. return res;
  540. }
  541. static bool ws_parse_frame(std::string & buf, int & type, std::string & res)
  542. {
  543. uint64_t hdr = 2;
  544. if (buf.size() < hdr)
  545. {
  546. return false;
  547. }
  548. type = buf[0] & ~0x80;
  549. bool mask = !!(buf[1] & 0x80);
  550. hdr += mask ? 4 : 0;
  551. uint64_t len = ((uint8_t)buf[1] & ~0x80);
  552. if (len == 126)
  553. {
  554. hdr += 2;
  555. if (buf.size() < hdr)
  556. {
  557. return false;
  558. }
  559. len = ((uint64_t)(uint8_t)buf[2] << 8) | ((uint64_t)(uint8_t)buf[3] << 0);
  560. }
  561. else if (len == 127)
  562. {
  563. hdr += 8;
  564. if (buf.size() < hdr)
  565. {
  566. return false;
  567. }
  568. len = ((uint64_t)(uint8_t)buf[2] << 56) |
  569. ((uint64_t)(uint8_t)buf[3] << 48) |
  570. ((uint64_t)(uint8_t)buf[4] << 40) |
  571. ((uint64_t)(uint8_t)buf[5] << 32) |
  572. ((uint64_t)(uint8_t)buf[6] << 24) |
  573. ((uint64_t)(uint8_t)buf[7] << 16) |
  574. ((uint64_t)(uint8_t)buf[8] << 8) |
  575. ((uint64_t)(uint8_t)buf[9] << 0);
  576. }
  577. if (buf.size() < hdr+len)
  578. {
  579. return false;
  580. }
  581. if (mask)
  582. {
  583. for (int i = 0; i < len; i++)
  584. buf[hdr+i] ^= buf[hdr-4+(i & 3)];
  585. }
  586. res += buf.substr(hdr, len);
  587. buf = buf.substr(hdr+len);
  588. return true;
  589. }
  590. std::vector<std::string> getifaddr_list(bool include_v6)
  591. {
  592. std::vector<std::string> addresses;
  593. ifaddrs *list, *ifa;
  594. if (getifaddrs(&list) == -1)
  595. {
  596. throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno));
  597. }
  598. for (ifa = list; ifa != NULL; ifa = ifa->ifa_next)
  599. {
  600. if (!ifa->ifa_addr)
  601. {
  602. continue;
  603. }
  604. int family = ifa->ifa_addr->sa_family;
  605. if ((family == AF_INET || family == AF_INET6 && include_v6) &&
  606. (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING))
  607. {
  608. void *addr_ptr;
  609. if (family == AF_INET)
  610. addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr;
  611. else
  612. addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr;
  613. char addr[INET6_ADDRSTRLEN];
  614. if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN))
  615. {
  616. throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
  617. }
  618. addresses.push_back(std::string(addr));
  619. }
  620. }
  621. freeifaddrs(list);
  622. return addresses;
  623. }
  624. static int extract_port(std::string & host)
  625. {
  626. int port = 0;
  627. int pos = 0;
  628. if ((pos = host.find(':')) >= 0)
  629. {
  630. port = strtoull(host.c_str() + pos + 1, NULL, 10);
  631. if (port >= 0x10000)
  632. {
  633. port = 0;
  634. }
  635. host = host.substr(0, pos);
  636. }
  637. return port;
  638. }
  639. static std::string strtolower(const std::string & in)
  640. {
  641. std::string s = in;
  642. for (int i = 0; i < s.length(); i++)
  643. {
  644. s[i] = tolower(s[i]);
  645. }
  646. return s;
  647. }
  648. static std::string trim(const std::string & in)
  649. {
  650. int begin = in.find_first_not_of(" \n\r\t");
  651. if (begin == -1)
  652. return "";
  653. int end = in.find_last_not_of(" \n\r\t");
  654. return in.substr(begin, end+1-begin);
  655. }