Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)

#include <stdexcept>
#include "cluster_client.h"

cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
    this->ringloop = ringloop;
    this->tfd = tfd;
    log_level = config["log_level"].int64_value();
    msgr.osd_num = 0;
    msgr.tfd = tfd;
    msgr.ringloop = ringloop;
    msgr.log_level = log_level;
    msgr.repeer_pgs = [this](osd_num_t peer_osd)
    {
        if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end())
        {
            // peer_osd just connected
            continue_ops();
        }
        else if (unsynced_writes.size())
        {
            // peer_osd just dropped connection
            for (auto op: syncing_writes)
            {
                for (auto & part: op->parts)
                {
                    if (part.osd_num == peer_osd && part.done)
                    {
                        // repeat this operation
                        part.osd_num = 0;
                        part.done = false;
                        assert(!part.sent);
                        op->done_count--;
                    }
                }
            }
            for (auto op: unsynced_writes)
            {
                for (auto & part: op->parts)
                {
                    if (part.osd_num == peer_osd && part.done)
                    {
                        // repeat this operation
                        part.osd_num = 0;
                        part.done = false;
                        assert(!part.sent);
                        op->done_count--;
                    }
                }
                if (op->done_count < op->parts.size())
                {
                    cur_ops.insert(op);
                }
            }
            continue_ops();
        }
    };
    msgr.exec_op = [this](osd_op_t *op)
    {
        // Garbage in
        printf("Incoming garbage from peer %d\n", op->peer_fd);
        msgr.stop_client(op->peer_fd);
        delete op;
    };
    msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
        config["use_sync_send_recv"].uint64_value();
    st_cli.tfd = tfd;
    st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
    st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
    st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_hook(changes); };
    st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
    st_cli.parse_config(config);
    st_cli.load_global_config();
    // Temporary implementation: discard all bitmaps.
    // It will of course be replaced by the real snapshot implementation.
    scrap_bitmap_size = 4096;
    scrap_bitmap = malloc_or_die(scrap_bitmap_size);
    if (ringloop)
    {
        consumer.loop = [this]()
        {
            msgr.read_requests();
            msgr.send_replies();
            this->ringloop->submit();
        };
        ringloop->register_consumer(&consumer);
    }
}

cluster_client_t::~cluster_client_t()
{
    if (ringloop)
    {
        ringloop->unregister_consumer(&consumer);
    }
    free(scrap_bitmap);
}

void cluster_client_t::stop()
{
    while (msgr.clients.size() > 0)
    {
        msgr.stop_client(msgr.clients.begin()->first);
    }
}

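// Resume postponed operations. Ops waiting in the <up_wait> state (their PG was
// not up or their peer dropped) are only retried when <up_retry> is true, i.e.
// from the retry timer armed in handle_op_part().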
void cluster_client_t::continue_ops(bool up_retry)
{
    for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); )
    {
        if ((*op_it)->up_wait)
        {
            if (up_retry)
            {
                (*op_it)->up_wait = false;
                continue_rw(*op_it++);
            }
            else
                op_it++;
        }
        else
            continue_rw(*op_it++);
    }
}

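// Despite the name, this returns log2(value) if <value> is a power of two,
// and 64 (an impossible shift amount) otherwise, so callers treat any result
// >= 64 as "not a power of two".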
static uint32_t is_power_of_two(uint64_t value)
{
    uint32_t l = 0;
    while (value > 1)
    {
        if (value & 1)
        {
            return 64;
        }
        value = value >> 1;
        l++;
    }
    return l;
}

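// Apply the global configuration loaded via st_cli: fill in defaults for missing
// values, validate that the block size is a power of two within bounds, clamp
// the retry interval, then request the PG configuration.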
void cluster_client_t::on_load_config_hook(json11::Json::object & config)
{
    bs_block_size = config["block_size"].uint64_value();
    bs_bitmap_granularity = config["bitmap_granularity"].uint64_value();
    if (!bs_block_size)
    {
        bs_block_size = DEFAULT_BLOCK_SIZE;
    }
    if (!bs_bitmap_granularity)
    {
        bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
    }
    uint32_t block_order;
    if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE)
    {
        throw std::runtime_error("Bad block size");
    }
    if (config["immediate_commit"] == "all")
    {
        // Cluster-wide immediate_commit mode
        immediate_commit = true;
    }
    else if (config.find("client_dirty_limit") != config.end())
    {
        client_dirty_limit = config["client_dirty_limit"].uint64_value();
    }
    if (!client_dirty_limit)
    {
        client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
    }
    up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value();
    if (!up_wait_retry_interval)
    {
        up_wait_retry_interval = 500;
    }
    else if (up_wait_retry_interval < 50)
    {
        up_wait_retry_interval = 50;
    }
    msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
    if (!msgr.peer_connect_interval)
    {
        msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
    }
    msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
    if (!msgr.peer_connect_timeout)
    {
        msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
    }
    st_cli.load_pgs();
}

void cluster_client_t::on_load_pgs_hook(bool success)
{
    for (auto pool_item: st_cli.pool_config)
    {
        pg_counts[pool_item.first] = pool_item.second.real_pg_count;
    }
    pgs_loaded = true;
    for (auto fn: on_ready_hooks)
    {
        fn();
    }
    on_ready_hooks.clear();
    for (auto op: offline_ops)
    {
        execute(op);
    }
    offline_ops.clear();
    continue_ops();
}

void cluster_client_t::on_change_hook(json11::Json::object & changes)
{
    for (auto pool_item: st_cli.pool_config)
    {
        if (pg_counts[pool_item.first] != pool_item.second.real_pg_count)
        {
            // At this point, all pool operations should have been suspended
            // And now they have to be resliced!
            for (auto op: cur_ops)
            {
                if (INODE_POOL(op->inode) == pool_item.first)
                {
                    op->needs_reslice = true;
                }
            }
            for (auto op: unsynced_writes)
            {
                if (INODE_POOL(op->inode) == pool_item.first)
                {
                    op->needs_reslice = true;
                }
            }
            for (auto op: syncing_writes)
            {
                if (INODE_POOL(op->inode) == pool_item.first)
                {
                    op->needs_reslice = true;
                }
            }
            pg_counts[pool_item.first] = pool_item.second.real_pg_count;
        }
    }
    continue_ops();
}

void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
{
    if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end())
    {
        msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
    }
}

void cluster_client_t::on_ready(std::function<void(void)> fn)
{
    if (pgs_loaded)
    {
        fn();
    }
    else
    {
        on_ready_hooks.push_back(fn);
    }
}

/**
 * How writes are synced when immediate_commit is false
 *
 * 1) accept up to <client_dirty_limit> write operations for execution,
 *    queue all subsequent writes into <next_writes>
 * 2) accept exactly one SYNC, queue all subsequent SYNCs into <next_writes>, too
 * 3) "continue" all accepted writes
 *
 * "Continue" WRITE:
 * 1) if the operation is not a copy yet - copy it (required for replay)
 * 2) if the operation is not sliced yet - slice it
 * 3) if the operation doesn't require reslicing - try to connect & send all remaining parts
 * 4) if any of them fail due to disconnected peers or PGs not being up, repeat after reconnecting or a small timeout
 * 5) if any of them fail due to other errors, fail the operation and remove it from the current "unsynced batch"
 * 6) if the PG count changes before all parts are done, wait for all in-progress parts to finish,
 *    throw all results away, then reslice and resubmit the op
 * 7) when all parts are done, try to "continue" the current SYNC
 * 8) if the operation succeeds, but then some OSDs drop their connections, repeat the
 *    parts of the current "unsynced batch" previously sent to those OSDs, in any order
 *
 * "Continue" current SYNC:
 * 1) take all unsynced operations from the current batch
 * 2) check if all affected OSDs are still alive
 * 3) if yes, send SYNCs to all of them; otherwise, leave the current SYNC as is
 * 4) if any of them fail due to disconnected peers, repeat the SYNC after repeating all writes
 * 5) if any of them fail due to other errors, fail the SYNC operation
 */

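// Example (illustrative sketch, not part of this file): a caller fills a
// cluster_op_t and submits it through execute(). <cli>, <inode_id> and <buf>
// below are hypothetical; only the cluster_op_t fields come from this API.
//
//     cluster_op_t *op = new cluster_op_t();
//     op->opcode = OSD_OP_WRITE;
//     op->inode = inode_id;      // the pool ID is encoded in the inode number
//     op->offset = 0;
//     op->len = 4096;            // must be a multiple of bitmap_granularity
//     op->iov.push_back(buf, 4096);
//     op->callback = [](cluster_op_t *op) { /* check op->retval here */ };
//     cli->execute(op);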
void cluster_client_t::execute(cluster_op_t *op)
{
    if (!pgs_loaded)
    {
        // We're offline
        offline_ops.push_back(op);
        return;
    }
    op->retval = 0;
    if ((op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE) ||
        ((op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len ||
        op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)))
    {
        op->retval = -EINVAL;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return;
    }
    if (op->opcode == OSD_OP_SYNC)
    {
        execute_sync(op);
        return;
    }
    if (op->opcode == OSD_OP_WRITE && !immediate_commit)
    {
        if (next_writes.size() > 0)
        {
            assert(cur_sync);
            next_writes.push_back(op);
            return;
        }
        if (queued_bytes >= client_dirty_limit)
        {
            // Push an extra SYNC operation to flush previous writes
            next_writes.push_back(op);
            cluster_op_t *sync_op = new cluster_op_t;
            sync_op->is_internal = true;
            sync_op->opcode = OSD_OP_SYNC;
            sync_op->callback = [](cluster_op_t* sync_op) {};
            execute_sync(sync_op);
            return;
        }
        queued_bytes += op->len;
    }
    cur_ops.insert(op);
    continue_rw(op);
}

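// Advance one read/write operation through its state machine: copy writes for
// later replay, slice the request into per-PG parts, (re)send unsent parts, and
// either complete the operation, fail it, or reslice it after a PG count change.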
void cluster_client_t::continue_rw(cluster_op_t *op)
{
    pool_id_t pool_id = INODE_POOL(op->inode);
    if (!pool_id)
    {
        op->retval = -EINVAL;
        std::function<void(cluster_op_t*)>(op->callback)(op);
        return;
    }
    if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
        st_cli.pool_config[pool_id].real_pg_count == 0)
    {
        // Postpone operations to unknown pools
        return;
    }
    if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal)
    {
        // Save the operation for replay when a PG goes out of sync
        // (the primary OSD drops our connection in this case)
        cluster_op_t *op_copy = new cluster_op_t();
        op_copy->is_internal = true;
        op_copy->orig_op = op;
        op_copy->opcode = op->opcode;
        op_copy->inode = op->inode;
        op_copy->offset = op->offset;
        op_copy->len = op->len;
        op_copy->buf = malloc_or_die(op->len);
        op_copy->iov.push_back(op_copy->buf, op->len);
        op_copy->callback = [](cluster_op_t* op_copy)
        {
            if (op_copy->orig_op)
            {
                // Acknowledge the write and forget the original pointer
                op_copy->orig_op->retval = op_copy->retval;
                std::function<void(cluster_op_t*)>(op_copy->orig_op->callback)(op_copy->orig_op);
                op_copy->orig_op = NULL;
            }
        };
        void *cur_buf = op_copy->buf;
        for (int i = 0; i < op->iov.count; i++)
        {
            memcpy(cur_buf, op->iov.buf[i].iov_base, op->iov.buf[i].iov_len);
            cur_buf += op->iov.buf[i].iov_len;
        }
        unsynced_writes.push_back(op_copy);
        cur_ops.erase(op);
        cur_ops.insert(op_copy);
        op = op_copy;
    }
    if (!op->parts.size())
    {
        // Slice the operation into parts
        slice_rw(op);
    }
    if (!op->needs_reslice)
    {
        // Send unsent parts, if they're not subject to change
        for (auto & op_part: op->parts)
        {
            if (!op_part.sent && !op_part.done)
            {
                try_send(op, &op_part);
            }
        }
    }
    if (!op->sent_count)
    {
        if (op->done_count >= op->parts.size())
        {
            // Finished successfully.
            // Even if the PG count has changed in the meantime, we treat it as success
            // because if some operations were invalid for the new PG count we'd get errors
            cur_ops.erase(op);
            op->retval = op->len;
            std::function<void(cluster_op_t*)>(op->callback)(op);
            continue_sync();
            return;
        }
        else if (op->retval != 0 && op->retval != -EPIPE)
        {
            // Fatal error (not -EPIPE)
            cur_ops.erase(op);
            if (!immediate_commit && op->opcode == OSD_OP_WRITE)
            {
                for (int i = 0; i < unsynced_writes.size(); i++)
                {
                    if (unsynced_writes[i] == op)
                    {
                        unsynced_writes.erase(unsynced_writes.begin()+i, unsynced_writes.begin()+i+1);
                        break;
                    }
                }
            }
            bool del = op->is_internal;
            std::function<void(cluster_op_t*)>(op->callback)(op);
            if (del)
            {
                if (op->buf)
                    free(op->buf);
                delete op;
            }
            continue_sync();
            return;
        }
        else
        {
            // -EPIPE or no error - clear the error
            op->retval = 0;
            if (op->needs_reslice)
            {
                op->parts.clear();
                op->done_count = 0;
                op->needs_reslice = false;
                continue_rw(op);
            }
        }
    }
}

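// Slice a read/write into parts, one per PG block it touches. A worked example
// (hypothetical numbers, assuming a replicated pool with pg_block_size = 128 KiB):
// a write with offset = 192 KiB and len = 128 KiB covers the stripes at 128 KiB
// and 256 KiB, so it is sliced into two parts, [192 KiB, 256 KiB) and
// [256 KiB, 320 KiB), each sent to the primary OSD of its own PG.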
void cluster_client_t::slice_rw(cluster_op_t *op)
{
    // Slice the request into individual object stripe requests.
    // Primary OSDs still operate on individual stripes, but their size is multiplied
    // by the number of data chunks (pg_size - parity_chunks) in case of EC
    auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
    uint64_t pg_block_size = bs_block_size * (
        pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks
    );
    uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    op->retval = 0;
    op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
    int iov_idx = 0;
    size_t iov_pos = 0;
    int i = 0;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        pg_num_t pg_num = (op->inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1;
        uint64_t begin = (op->offset < stripe ? stripe : op->offset);
        uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
            ? (stripe + pg_block_size) : (op->offset + op->len);
        op->parts[i] = (cluster_op_part_t){
            .parent = op,
            .offset = begin,
            .len = (uint32_t)(end - begin),
            .pg_num = pg_num,
            .sent = false,
            .done = false,
        };
        int left = end-begin;
        while (left > 0 && iov_idx < op->iov.count)
        {
            if (op->iov.buf[iov_idx].iov_len - iov_pos < left)
            {
                op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, op->iov.buf[iov_idx].iov_len - iov_pos);
                left -= (op->iov.buf[iov_idx].iov_len - iov_pos);
                iov_pos = 0;
                iov_idx++;
            }
            else
            {
                op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, left);
                iov_pos += left;
                left = 0;
            }
        }
        assert(left == 0);
        i++;
    }
}

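// Try to send one part to the primary OSD of its PG. Returns false if the PG
// is paused, has no primary, or the primary is not connected yet (in which case
// a connection attempt is initiated).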
bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
{
    auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
    auto pg_it = pool_cfg.pg_config.find(part->pg_num);
    if (pg_it != pool_cfg.pg_config.end() &&
        !pg_it->second.pause && pg_it->second.cur_primary)
    {
        osd_num_t primary_osd = pg_it->second.cur_primary;
        auto peer_it = msgr.osd_peer_fds.find(primary_osd);
        if (peer_it != msgr.osd_peer_fds.end())
        {
            int peer_fd = peer_it->second;
            part->osd_num = primary_osd;
            part->sent = true;
            op->sent_count++;
            part->op = (osd_op_t){
                .op_type = OSD_OP_OUT,
                .peer_fd = peer_fd,
                .req = { .rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = op_id++,
                        .opcode = op->opcode,
                    },
                    .inode = op->inode,
                    .offset = part->offset,
                    .len = part->len,
                } },
                .bitmap = scrap_bitmap,
                .bitmap_len = scrap_bitmap_size,
                .callback = [this, part](osd_op_t *op_part)
                {
                    handle_op_part(part);
                },
            };
            part->op.iov = part->iov;
            msgr.outbox_push(&part->op);
            return true;
        }
        else if (msgr.wanted_peers.find(primary_osd) == msgr.wanted_peers.end())
        {
            msgr.connect_peer(primary_osd, st_cli.peer_states[primary_osd]);
        }
    }
    return false;
}

void cluster_client_t::execute_sync(cluster_op_t *op)
{
    if (immediate_commit)
    {
        // Syncs are not required in the immediate_commit mode
        op->retval = 0;
        std::function<void(cluster_op_t*)>(op->callback)(op);
    }
    else if (cur_sync != NULL)
    {
        next_writes.push_back(op);
    }
    else
    {
        cur_sync = op;
        continue_sync();
    }
}

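// Try to submit the current SYNC: wait until all unsynced writes are finished,
// collect the set of OSDs they touched, verify that all of them are still
// connected, and then post one SYNC sub-operation per affected OSD.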
void cluster_client_t::continue_sync()
{
    if (!cur_sync || cur_sync->parts.size() > 0)
    {
        // Already submitted
        return;
    }
    cur_sync->retval = 0;
    std::set<osd_num_t> sync_osds;
    for (auto prev_op: unsynced_writes)
    {
        if (prev_op->done_count < prev_op->parts.size())
        {
            // Writes are not finished yet
            return;
        }
        for (auto & part: prev_op->parts)
        {
            if (part.osd_num)
            {
                sync_osds.insert(part.osd_num);
            }
        }
    }
    if (!sync_osds.size())
    {
        // No dirty writes
        finish_sync();
        return;
    }
    // Check that all OSD connections are still alive
    for (auto sync_osd: sync_osds)
    {
        auto peer_it = msgr.osd_peer_fds.find(sync_osd);
        if (peer_it == msgr.osd_peer_fds.end())
        {
            // It's pointless to send a SYNC to a disconnected OSD
            return;
        }
    }
    syncing_writes.swap(unsynced_writes);
    // Post syncs to the affected OSDs
    cur_sync->parts.resize(sync_osds.size());
    int i = 0;
    for (auto sync_osd: sync_osds)
    {
        cur_sync->parts[i] = {
            .parent = cur_sync,
            .osd_num = sync_osd,
            .sent = false,
            .done = false,
        };
        send_sync(cur_sync, &cur_sync->parts[i]);
        i++;
    }
}

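// Complete the current SYNC: on failure, requeue the syncing writes for replay;
// on -EPIPE, reset the SYNC for a later retry; on success, free the internal
// replay copies. Finally, re-submit all operations queued in <next_writes>.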
void cluster_client_t::finish_sync()
{
    int retval = cur_sync->retval;
    if (retval != 0)
    {
        for (auto op: syncing_writes)
        {
            if (op->done_count < op->parts.size())
            {
                cur_ops.insert(op);
            }
        }
        unsynced_writes.insert(unsynced_writes.begin(), syncing_writes.begin(), syncing_writes.end());
        syncing_writes.clear();
    }
    if (retval == -EPIPE)
    {
        // Retry later
        cur_sync->parts.clear();
        cur_sync->retval = 0;
        cur_sync->sent_count = 0;
        cur_sync->done_count = 0;
        return;
    }
    std::function<void(cluster_op_t*)>(cur_sync->callback)(cur_sync);
    if (!retval)
    {
        for (auto op: syncing_writes)
        {
            assert(op->sent_count == 0);
            if (op->is_internal)
            {
                if (op->buf)
                    free(op->buf);
                delete op;
            }
        }
        syncing_writes.clear();
    }
    cur_sync = NULL;
    queued_bytes = 0;
    std::vector<cluster_op_t*> next_wr_copy;
    next_wr_copy.swap(next_writes);
    for (auto next_op: next_wr_copy)
    {
        execute(next_op);
    }
}

void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)
{
    auto peer_it = msgr.osd_peer_fds.find(part->osd_num);
    assert(peer_it != msgr.osd_peer_fds.end());
    part->sent = true;
    op->sent_count++;
    part->op = (osd_op_t){
        .op_type = OSD_OP_OUT,
        .peer_fd = peer_it->second,
        .req = {
            .hdr = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = op_id++,
                .opcode = OSD_OP_SYNC,
            },
        },
        .callback = [this, part](osd_op_t *op_part)
        {
            handle_op_part(part);
        },
    };
    msgr.outbox_push(&part->op);
}

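// Handle a reply to one sub-operation (a rw part or a per-OSD SYNC). On failure,
// the peer connection is dropped so the operation is retried through the normal
// reconnect path; -EPIPE additionally arms the retry timer.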
void cluster_client_t::handle_op_part(cluster_op_part_t *part)
{
    cluster_op_t *op = part->parent;
    part->sent = false;
    op->sent_count--;
    int expected = part->op.req.hdr.opcode == OSD_OP_SYNC ? 0 : part->op.req.rw.len;
    if (part->op.reply.hdr.retval != expected)
    {
        // Operation failed, retry
        printf(
            "Operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
            part->osd_num, part->op.reply.hdr.retval, expected
        );
        msgr.stop_client(part->op.peer_fd);
        if (part->op.reply.hdr.retval == -EPIPE)
        {
            op->up_wait = true;
            if (!retry_timeout_id)
            {
                retry_timeout_id = tfd->set_timer(up_wait_retry_interval, false, [this](int)
                {
                    retry_timeout_id = 0;
                    continue_ops(true);
                });
            }
        }
        if (!op->retval || op->retval == -EPIPE)
        {
            // Don't overwrite other errors with -EPIPE
            op->retval = part->op.reply.hdr.retval;
        }
    }
    else
    {
        // OK
        part->done = true;
        op->done_count++;
    }
    if (op->sent_count == 0)
    {
        if (op->opcode == OSD_OP_SYNC)
        {
            assert(op == cur_sync);
            finish_sync();
        }
        else if (!op->up_wait)
        {
            continue_rw(op);
        }
    }
}