Merge tag 'v0.6.3'

# Conflicts:
#	src/CMakeLists.txt
pull/5/head
Alexey Shabalin 2021-05-06 00:24:51 +03:00
commit 5edb0b8607
46 changed files with 1742 additions and 447 deletions

View File

@ -12,4 +12,6 @@ else()
project(vitastor VERSION "${VERSION_STRING}")
endif()
set(VERSION "0.6.3")
add_subdirectory(src)

View File

@ -49,6 +49,7 @@ Vitastor на данный момент находится в статусе п
- Именование инодов через хранение их метаданных в etcd
- Снапшоты и copy-on-write клоны
- Сглаживание производительности случайной записи в SSD+HDD конфигурациях
- Поддержка RDMA/RoCEv2 через libibverbs
## Планы развития
@ -60,7 +61,7 @@ Vitastor на данный момент находится в статусе п
- Фоновая проверка целостности без контрольных сумм (сверка реплик)
- Контрольные суммы
- Поддержка SSD-кэширования (tiered storage)
- Поддержка RDMA и NVDIMM
- Поддержка NVDIMM
- Web-интерфейс
- Возможно, сжатие
- Возможно, поддержка кэширования данных через системный page cache
@ -314,14 +315,15 @@ Ceph:
### NBD
NBD - на данный момент единственный способ монтировать Vitastor ядром Linux, но он
приводит к дополнительным копированиям данных, поэтому немного ухудшает производительность,
правда, в основном - линейную, а случайная затрагивается слабо.
NBD расшифровывается как "сетевое блочное устройство", но на самом деле оно также
работает просто как аналог FUSE для блочных устройств, то есть, представляет собой
"блочное устройство в пространстве пользователя".
NBD - на данный момент единственный способ монтировать Vitastor ядром Linux.
NBD немного снижает производительность, так как приводит к дополнительным копированиям
данных между ядром и пространством пользователя. Тем не менее, способ достаточно оптимален,
а производительность случайного доступа вообще затрагивается слабо.
Vitastor с однопоточной NBD прокси на том же стенде:
- T1Q1 запись: 6000 iops (задержка 0.166ms)
- T1Q1 чтение: 5518 iops (задержка 0.18ms)
@ -424,23 +426,90 @@ Vitastor с однопоточной NBD прокси на том же стен
- Запустите все OSD: `systemctl start vitastor.target`
- Ваш кластер должен быть готов - один из мониторов должен уже сконфигурировать PG, а OSD должны запустить их.
- Вы можете проверить состояние PG прямо в etcd: `etcdctl --endpoints=... get --prefix /vitastor/pg/state`. Все PG должны быть 'active'.
- Пример команды для запуска тестов: `fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -pool=1 -inode=1 -size=400G`.
- Пример команды для заливки образа ВМ в vitastor через qemu-img:
```
qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648'
```
Если вы используете немодифицированный QEMU, данной команде потребуется переменная окружения `LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so`.
- Пример команды запуска QEMU:
```
qemu-system-x86_64 -enable-kvm -m 1024
-drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648',format=raw,if=none,id=drive-virtio-disk0,cache=none
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512
-vnc 0.0.0.0:0
```
- Пример команды удаления образа (инода) из Vitastor:
```
vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32
```
### Задать имя образу
```
etcdctl --endpoints=<etcd> put /vitastor/config/inode/<pool>/<inode> '{"name":"<name>","size":<size>[,"parent_id":<parent_inode_number>][,"readonly":true]}'
```
Например:
```
etcdctl --endpoints=http://10.115.0.10:2379/v3 put /vitastor/config/inode/1/1 '{"name":"testimg","size":2147483648}'
```
Если вы зададите parent_id, то образ станет CoW-клоном, т.е. все новые запросы записи пойдут в новый инод, а запросы
чтения будут проверять сначала его, а потом родительские слои по цепочке вверх. Чтобы случайно не перезаписать данные
в родительском слое, вы можете переключить его в режим "только чтение", добавив флаг `"readonly":true` в его запись
метаданных. В таком случае родительский образ становится просто снапшотом.
Таким образом, для создания снапшота вам нужно просто переименовать предыдущий inode (например, из testimg в testimg@0),
сделать его readonly и создать новый слой с исходным именем образа (testimg), ссылающийся на только что переименованный
в качестве родительского.
### Запуск тестов с fio
Пример команды для запуска тестов:
```
fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -image=testimg
```
Если вы не хотите обращаться к образу по имени, вместо `-image=testimg` можно указать номер пула, номер инода и размер:
`-pool=1 -inode=1 -size=400G`.
### Загрузить образ диска ВМ в/из Vitastor
Используйте qemu-img и строку `vitastor:etcd_host=<HOST>:image=<IMAGE>` в качестве имени файла диска. Например:
```
qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg'
```
Обратите внимание, что если вы используете немодифицированный QEMU, потребуется установить переменную окружения
`LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so`.
Если вы не хотите обращаться к образу по имени, вместо `:image=<IMAGE>` можно указать номер пула, номер инода и размер:
`:pool=<POOL>:inode=<INODE>:size=<SIZE>`.
### Запустить ВМ
Для запуска QEMU используйте опцию `-drive file=vitastor:etcd_host=<HOST>:image=<IMAGE>` (аналогично qemu-img)
и физический размер блока 4 KB.
Например:
```
qemu-system-x86_64 -enable-kvm -m 1024
-drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg',format=raw,if=none,id=drive-virtio-disk0,cache=none
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512
-vnc 0.0.0.0:0
```
Обращение по номерам (`:pool=<POOL>:inode=<INODE>:size=<SIZE>` вместо `:image=<IMAGE>`) работает аналогично qemu-img.
### Удалить образ
Используйте утилиту vitastor-rm. Например:
```
vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32
```
### NBD
Чтобы создать локальное блочное устройство, используйте NBD. Например:
```
vitastor-nbd map --etcd_address 10.115.0.10:2379/v3 --image testimg
```
Команда напечатает название устройства вида /dev/nbd0, которое потом можно будет форматировать
и использовать как обычное блочное устройство.
Для обращения по номеру инода, аналогично другим командам, можно использовать опции
`--pool <POOL> --inode <INODE> --size <SIZE>` вместо `--image testimg`.
## Известные проблемы

101
README.md
View File

@ -43,6 +43,7 @@ breaking changes in the future. However, the following is implemented:
- Inode metadata storage in etcd
- Snapshots and copy-on-write image clones
- Write throttling to smooth random write workloads in SSD+HDD configurations
- RDMA/RoCEv2 support via libibverbs
## Roadmap
@ -54,7 +55,7 @@ breaking changes in the future. However, the following is implemented:
- Scrubbing without checksums (verification of replicas)
- Checksums
- Tiered storage
- RDMA and NVDIMM support
- NVDIMM support
- Web GUI
- Compression (possibly)
- Read caching using system page cache (possibly)
@ -379,24 +380,86 @@ and calculate disk offsets almost by hand. This will be fixed in near future.
For jerasure pools the configuration should look like the following: `2:{"name":"ecpool","scheme":"jerasure","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}`.
- At this point, one of the monitors will configure PGs and OSDs will start them.
- You can check PG states with `etcdctl --endpoints=... get --prefix /vitastor/pg/state`. All PGs should become 'active'.
- Run tests with (for example): `fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -pool=1 -inode=1 -size=400G`.
- Upload VM disk image with qemu-img (for example):
```
qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648'
```
Note that the command requires to be run with `LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so qemu-img ...`
if you use unmodified QEMU.
- Run QEMU with (for example):
```
qemu-system-x86_64 -enable-kvm -m 1024
-drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648',format=raw,if=none,id=drive-virtio-disk0,cache=none
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512
-vnc 0.0.0.0:0
```
- Remove inode with (for example):
```
vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32
```
### Name an image
```
etcdctl --endpoints=<etcd> put /vitastor/config/inode/<pool>/<inode> '{"name":"<name>","size":<size>[,"parent_id":<parent_inode_number>][,"readonly":true]}'
```
For example:
```
etcdctl --endpoints=http://10.115.0.10:2379/v3 put /vitastor/config/inode/1/1 '{"name":"testimg","size":2147483648}'
```
If you specify parent_id the image becomes a CoW clone. I.e. all writes go to the new inode and reads first check it
and then upper layers. You can then make parent readonly by updating its entry with `"readonly":true` for safety and
basically treat it as a snapshot.
So to create a snapshot you basically rename the previous upper layer (for example from testimg to testimg@0), make it readonly
and create a new top layer with the original name (testimg) and the previous one as a parent.
### Run fio benchmarks
fio command example:
```
fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -image=testimg
```
If you don't want to access your image by name, you can specify pool number, inode number and size
(`-pool=1 -inode=1 -size=400G`) instead of the image name (`-image=testimg`).
### Upload VM image
Use qemu-img and `vitastor:etcd_host=<HOST>:image=<IMAGE>` disk filename. For example:
```
qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg'
```
Note that the command requires to be run with `LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so qemu-img ...`
if you use unmodified QEMU.
You can also specify `:pool=<POOL>:inode=<INODE>:size=<SIZE>` instead of `:image=<IMAGE>`
if you don't want to use inode metadata.
### Start a VM
Run QEMU with `-drive file=vitastor:etcd_host=<HOST>:image=<IMAGE>` and use 4 KB physical block size.
For example:
```
qemu-system-x86_64 -enable-kvm -m 1024
-drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg',format=raw,if=none,id=drive-virtio-disk0,cache=none
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512
-vnc 0.0.0.0:0
```
You can also specify `:pool=<POOL>:inode=<INODE>:size=<SIZE>` instead of `:image=<IMAGE>`,
just like in qemu-img.
### Remove inode
Use vitastor-rm. For example:
```
vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32
```
### NBD
To create a local block device for a Vitastor image, use NBD. For example:
```
vitastor-nbd map --etcd_address 10.115.0.10:2379/v3 --image testimg
```
It will output the device name, like /dev/nbd0 which you can then format and mount as a normal block device.
Again, you can use `--pool <POOL> --inode <INODE> --size <SIZE>` insteaf of `--image <IMAGE>` if you want.
## Known Problems

14
debian/changelog vendored
View File

@ -1,8 +1,18 @@
vitastor (0.6.2-1) unstable; urgency=medium
vitastor (0.6.3-1) unstable; urgency=medium
* RDMA support
* Bugfixes
-- Vitaliy Filippov <vitalif@yourcmc.ru> Tue, 02 Feb 2021 23:01:24 +0300
-- Vitaliy Filippov <vitalif@yourcmc.ru> Sat, 01 May 2021 18:46:10 +0300
vitastor (0.6.0-1) unstable; urgency=medium
* Snapshots and Copy-on-Write clones
* Image metadata in etcd (name, size)
* Image I/O and space statistics in etcd
* Write throttling for smoothing random write workloads in SSD+HDD configurations
-- Vitaliy Filippov <vitalif@yourcmc.ru> Sun, 11 Apr 2021 00:49:18 +0300
vitastor (0.5.1-1) unstable; urgency=medium

2
debian/control vendored
View File

@ -2,7 +2,7 @@ Source: vitastor
Section: admin
Priority: optional
Maintainer: Vitaliy Filippov <vitalif@yourcmc.ru>
Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev
Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev, libibverbs-dev
Standards-Version: 4.5.0
Homepage: https://vitastor.io/
Rules-Requires-Root: no

View File

@ -22,7 +22,7 @@ RUN apt-get -y build-dep qemu
RUN apt-get -y build-dep fio
RUN apt-get --download-only source qemu
RUN apt-get --download-only source fio
RUN apt-get -y install libjerasure-dev cmake
RUN apt-get update && apt-get -y install libjerasure-dev cmake libibverbs-dev
ADD . /root/vitastor
RUN set -e -x; \
@ -40,10 +40,10 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.6.2; \
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.2/qemu; \
ln -s /root/fio-build/fio-*/ vitastor-0.6.2/fio; \
cd vitastor-0.6.2; \
cp -r /root/vitastor vitastor-0.6.3; \
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.3/qemu; \
ln -s /root/fio-build/fio-*/ vitastor-0.6.3/fio; \
cd vitastor-0.6.3; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
sh copy-qemu-includes.sh; \
@ -59,8 +59,8 @@ RUN set -e -x; \
echo "dep:fio=$FIO" > debian/substvars; \
echo "dep:qemu=$QEMU" >> debian/substvars; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.2.orig.tar.xz vitastor-0.6.2; \
cd vitastor-0.6.2; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.3.orig.tar.xz vitastor-0.6.3; \
cd vitastor-0.6.3; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@ -41,6 +41,12 @@ const etcd_allow = new RegExp('^'+[
const etcd_tree = {
config: {
/* global: {
// WARNING: NOT ALL OF THESE ARE ACTUALLY CONFIGURABLE HERE
// THIS IS JUST A POOR'S MAN CONFIG DOCUMENTATION
// etcd connection
config_path: "/etc/vitastor/vitastor.conf",
etcd_address: "10.0.115.10:2379/v3",
etcd_prefix: "/vitastor",
// mon
etcd_mon_ttl: 30, // min: 10
etcd_mon_timeout: 1000, // ms. min: 0
@ -50,7 +56,17 @@ const etcd_tree = {
osd_out_time: 600, // seconds. min: 0
placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
// client and osd
tcp_header_buffer_size: 65536,
use_sync_send_recv: false,
use_rdma: true,
rdma_device: null, // for example, "rocep5s0f0"
rdma_port_num: 1,
rdma_gid_index: 0,
rdma_mtu: 4096,
rdma_max_sge: 128,
rdma_max_send: 32,
rdma_max_recv: 8,
rdma_max_msg: 1048576,
log_level: 0,
block_size: 131072,
disk_alignment: 4096,

View File

@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve
QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.6.2/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.2$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-0.6.3/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.3$(rpm --eval '%dist').tar.gz *

View File

@ -17,6 +17,7 @@ RUN rpm --nomd5 -i fio*.src.rpm
RUN rm -f /etc/yum.repos.d/CentOS-Media.repo
RUN cd ~/rpmbuild/SPECS && yum-builddep -y --enablerepo='*' --disablerepo=centos-sclo-rh --disablerepo=centos-sclo-rh-source --disablerepo=centos-sclo-sclo-testing qemu-kvm.spec
RUN cd ~/rpmbuild/SPECS && yum-builddep -y --enablerepo='*' --disablerepo=centos-sclo-rh --disablerepo=centos-sclo-rh-source --disablerepo=centos-sclo-sclo-testing fio.spec
RUN yum -y install rdma-core-devel
ADD https://vitastor.io/rpms/liburing-el7/liburing-0.7-2.el7.src.rpm /root
@ -37,7 +38,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.6.2.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.6.3.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@ -1,11 +1,11 @@
Name: vitastor
Version: 0.6.2
Version: 0.6.3
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.6.2.el7.tar.gz
Source0: vitastor-0.6.3.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel
@ -14,6 +14,7 @@ BuildRequires: rh-nodejs12
BuildRequires: rh-nodejs12-npm
BuildRequires: jerasure-devel
BuildRequires: gf-complete-devel
BuildRequires: libibverbs-devel
BuildRequires: cmake
Requires: fio = 3.7-1.el7
Requires: qemu-kvm = 2.0.0-1.el7.6
@ -61,8 +62,8 @@ cp -r mon %buildroot/usr/lib/vitastor/mon
%_libdir/libfio_vitastor.so
%_libdir/libfio_vitastor_blk.so
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so
%_libdir/libvitastor_client.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
/usr/lib/vitastor

View File

@ -15,6 +15,7 @@ RUN rpm --nomd5 -i qemu*.src.rpm
RUN rpm --nomd5 -i fio*.src.rpm
RUN cd ~/rpmbuild/SPECS && dnf builddep -y --enablerepo=powertools --spec qemu-kvm.spec
RUN cd ~/rpmbuild/SPECS && dnf builddep -y --enablerepo=powertools --spec fio.spec && dnf install -y cmake
RUN yum -y install libibverbs-devel
ADD https://vitastor.io/rpms/liburing-el7/liburing-0.7-2.el7.src.rpm /root
@ -35,7 +36,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.6.2.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.6.3.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@ -1,11 +1,11 @@
Name: vitastor
Version: 0.6.2
Version: 0.6.3
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.6.2.el8.tar.gz
Source0: vitastor-0.6.3.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel
@ -13,6 +13,7 @@ BuildRequires: gcc-toolset-9-gcc-c++
BuildRequires: nodejs >= 10
BuildRequires: jerasure-devel
BuildRequires: gf-complete-devel
BuildRequires: libibverbs-devel
BuildRequires: cmake
Requires: fio = 3.7-3.el8
Requires: qemu-kvm = 4.2.0-29.el8.6
@ -58,8 +59,8 @@ cp -r mon %buildroot/usr/lib/vitastor
%_libdir/libfio_vitastor.so
%_libdir/libfio_vitastor_blk.so
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so
%_libdir/libvitastor_client.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
/usr/lib/vitastor

View File

@ -9,7 +9,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif()
add_definitions(-DVERSION="0.6.2")
add_definitions(-DVERSION="0.6.3")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
@ -23,11 +23,16 @@ endif()
find_package(PkgConfig)
pkg_check_modules(LIBURING REQUIRED liburing)
pkg_check_modules(IBVERBS libibverbs)
if (IBVERBS_LIBRARIES)
add_definitions(-DWITH_RDMA)
endif (IBVERBS_LIBRARIES)
include_directories(
../
/usr/include/jerasure
${LIBURING_INCLUDE_DIRS}
${IBVERBS_INCLUDE_DIRS}
)
# libvitastor_blk.so
@ -36,20 +41,25 @@ add_library(vitastor_blk SHARED
blockstore_write.cpp blockstore_sync.cpp blockstore_stable.cpp blockstore_rollback.cpp blockstore_flush.cpp crc32c.c ringloop.cpp
)
target_link_libraries(vitastor_blk
vitastor_common
${LIBURING_LIBRARIES}
tcmalloc_minimal
# for timerfd_manager
vitastor_common
)
set_target_properties(vitastor_blk PROPERTIES VERSION ${VERSION_STRING} SOVERSION 0)
set_target_properties(vitastor_blk PROPERTIES VERSION ${VERSION} SOVERSION 0)
# libvitastor_common.a
set(MSGR_RDMA "")
if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES)
add_library(vitastor_common STATIC
epoll_manager.cpp etcd_state_client.cpp
messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA}
)
target_compile_options(vitastor_common PUBLIC -fPIC)
# vitastor-osd
add_executable(vitastor-osd
osd_main.cpp osd.cpp osd_secondary.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp
@ -60,6 +70,7 @@ target_link_libraries(vitastor-osd
vitastor_common
vitastor_blk
Jerasure
${IBVERBS_LIBRARIES}
)
@ -71,8 +82,9 @@ target_link_libraries(vitastor_client
vitastor_common
tcmalloc_minimal
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
)
set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION_STRING} SOVERSION 0)
set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION} SOVERSION 0)
# vitastor-nbd
add_executable(vitastor-nbd
@ -116,6 +128,7 @@ add_executable(stub_uring_osd
target_link_libraries(stub_uring_osd
vitastor_common
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
tcmalloc_minimal
)

View File

@ -43,11 +43,6 @@ int blockstore_t::read_bitmap(object_id oid, uint64_t target_version, void *bitm
return impl->read_bitmap(oid, target_version, bitmap, result_version);
}
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
{
return impl->unstable_writes;
}
std::map<uint64_t, uint64_t> & blockstore_t::get_inode_space_stats()
{
return impl->inode_space_stats;

View File

@ -183,9 +183,6 @@ public:
// Simplified synchronous operation: get object bitmap & current version
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
// Unstable writes are added here (map of object_id -> version)
std::unordered_map<object_id, uint64_t> & get_unstable_writes();
// Get per-inode space usage statistics
std::map<uint64_t, uint64_t> & get_inode_space_stats();

View File

@ -16,6 +16,8 @@
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
config = osd_messenger_t::read_config(config);
this->ringloop = ringloop;
this->tfd = tfd;
this->config = config;
@ -53,6 +55,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
msgr.stop_client(op->peer_fd);
delete op;
};
msgr.parse_config(this->config);
msgr.init();
st_cli.tfd = tfd;
@ -108,6 +111,115 @@ cluster_op_t::~cluster_op_t()
}
}
void cluster_client_t::calc_wait(cluster_op_t *op)
{
op->prev_wait = 0;
if (op->opcode == OSD_OP_WRITE)
{
for (auto prev = op->prev; prev; prev = prev->prev)
{
if (prev->opcode == OSD_OP_SYNC ||
prev->opcode == OSD_OP_WRITE && !(op->flags & OP_FLUSH_BUFFER) && (prev->flags & OP_FLUSH_BUFFER))
{
op->prev_wait++;
}
}
if (!op->prev_wait && pgs_loaded)
continue_rw(op);
}
else if (op->opcode == OSD_OP_SYNC)
{
for (auto prev = op->prev; prev; prev = prev->prev)
{
if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE)
{
op->prev_wait++;
}
}
if (!op->prev_wait && pgs_loaded)
continue_sync(op);
}
else
{
for (auto prev = op->prev; prev; prev = prev->prev)
{
if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER)
{
op->prev_wait++;
}
else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ)
{
// Flushes are always in the beginning
break;
}
}
if (!op->prev_wait && pgs_loaded)
continue_rw(op);
}
}
void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc)
{
if (opcode == OSD_OP_WRITE)
{
while (next)
{
auto n2 = next->next;
if (next->opcode == OSD_OP_SYNC ||
next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) ||
next->opcode == OSD_OP_READ && (flags & OP_FLUSH_BUFFER))
{
next->prev_wait += inc;
if (!next->prev_wait)
{
if (next->opcode == OSD_OP_SYNC)
continue_sync(next);
else
continue_rw(next);
}
}
next = n2;
}
}
else if (opcode == OSD_OP_SYNC)
{
while (next)
{
auto n2 = next->next;
if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE)
{
next->prev_wait += inc;
if (!next->prev_wait)
{
if (next->opcode == OSD_OP_SYNC)
continue_sync(next);
else
continue_rw(next);
}
}
next = n2;
}
}
}
void cluster_client_t::erase_op(cluster_op_t *op)
{
uint64_t opcode = op->opcode, flags = op->flags;
cluster_op_t *next = op->next;
if (op->prev)
op->prev->next = op->next;
if (op->next)
op->next->prev = op->prev;
if (op_queue_head == op)
op_queue_head = op->next;
if (op_queue_tail == op)
op_queue_tail = op->prev;
op->next = op->prev = NULL;
std::function<void(cluster_op_t*)>(op->callback)(op);
if (!immediate_commit)
inc_wait(opcode, flags, next, -1);
}
void cluster_client_t::continue_ops(bool up_retry)
{
if (!pgs_loaded)
@ -118,60 +230,25 @@ void cluster_client_t::continue_ops(bool up_retry)
if (continuing_ops)
{
// Attempt to reenter the function
continuing_ops = 2;
return;
}
restart:
continuing_ops = 1;
op_queue_pos = 0;
bool has_flushes = false, has_writes = false;
while (op_queue_pos < op_queue.size())
for (auto op = op_queue_head; op; )
{
auto op = op_queue[op_queue_pos];
bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER;
auto opcode = op->opcode;
cluster_op_t *next_op = op->next;
if (!op->up_wait || up_retry)
{
op->up_wait = false;
if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE)
if (!op->prev_wait)
{
if (is_flush || !has_flushes)
{
// Regular writes can't proceed before buffer flushes
rm = continue_rw(op);
}
}
else if (opcode == OSD_OP_SYNC)
{
if (!has_writes)
{
// SYNC can't proceed before previous writes
rm = continue_sync(op);
}
if (op->opcode == OSD_OP_SYNC)
continue_sync(op);
else
continue_rw(op);
}
}
if (opcode == OSD_OP_WRITE)
{
has_writes = has_writes || !rm;
if (is_flush)
{
has_flushes = has_writes || !rm;
}
}
else if (opcode == OSD_OP_SYNC)
{
// Postpone writes until previous SYNC completes
// ...so dirty_writes can't contain anything newer than SYNC
has_flushes = has_writes || !rm;
}
if (rm)
{
op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1);
}
else
{
op_queue_pos++;
}
op = next_op;
if (continuing_ops == 2)
{
goto restart;
@ -213,11 +290,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
{
throw std::runtime_error("Bad block size");
}
if (config["immediate_commit"] == "all")
{
// Cluster-wide immediate_commit mode
immediate_commit = true;
}
// Cluster-wide immediate_commit mode
immediate_commit = (config["immediate_commit"] == "all");
if (config.find("client_max_dirty_bytes") != config.end())
{
client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
@ -281,7 +355,7 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
{
// At this point, all pool operations should have been suspended
// And now they have to be resliced!
for (auto op: op_queue)
for (auto op = op_queue_head; op; op = op->next)
{
if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
INODE_POOL(op->cur_inode) == pool_item.first)
@ -362,9 +436,17 @@ void cluster_client_t::execute(cluster_op_t *op)
{
delete sync_op;
};
op_queue.push_back(sync_op);
sync_op->prev = op_queue_tail;
if (op_queue_tail)
{
op_queue_tail->next = sync_op;
op_queue_tail = sync_op;
}
else
op_queue_tail = op_queue_head = sync_op;
dirty_bytes = 0;
dirty_ops = 0;
calc_wait(sync_op);
}
dirty_bytes += op->len;
dirty_ops++;
@ -374,8 +456,23 @@ void cluster_client_t::execute(cluster_op_t *op)
dirty_bytes = 0;
dirty_ops = 0;
}
op_queue.push_back(op);
continue_ops();
op->prev = op_queue_tail;
if (op_queue_tail)
{
op_queue_tail->next = op;
op_queue_tail = op;
}
else
op_queue_tail = op_queue_head = op;
if (!immediate_commit)
calc_wait(op);
else if (pgs_loaded)
{
if (op->opcode == OSD_OP_SYNC)
continue_sync(op);
else
continue_rw(op);
}
}
void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
@ -474,12 +571,16 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
}
delete op;
};
op_queue.insert(op_queue.begin(), op);
if (continuing_ops)
op->next = op_queue_head;
if (op_queue_head)
{
continuing_ops = 2;
op_queue_pos++;
op_queue_head->prev = op;
op_queue_head = op;
}
else
op_queue_tail = op_queue_head = op;
inc_wait(op->opcode, op->flags, op->next, 1);
continue_rw(op);
}
int cluster_client_t::continue_rw(cluster_op_t *op)
@ -496,7 +597,7 @@ resume_0:
if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
{
@ -504,7 +605,7 @@ resume_0:
if (!pool_id)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
@ -520,7 +621,7 @@ resume_0:
if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
@ -603,13 +704,13 @@ resume_3:
}
}
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
else if (op->retval != 0 && op->retval != -EPIPE)
{
// Fatal error (not -EPIPE)
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
else
@ -849,17 +950,18 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
{
// Sync is not required in the immediate_commit mode or if there are no dirty_osds
op->retval = 0;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
// Check that all OSD connections are still alive
for (auto sync_osd: dirty_osds)
for (auto do_it = dirty_osds.begin(); do_it != dirty_osds.end(); )
{
osd_num_t sync_osd = *do_it;
auto peer_it = msgr.osd_peer_fds.find(sync_osd);
if (peer_it == msgr.osd_peer_fds.end())
{
return 0;
}
dirty_osds.erase(do_it++);
else
do_it++;
}
// Post sync to affected OSDs
for (auto & prev_op: dirty_buffers)
@ -924,7 +1026,7 @@ resume_1:
uw_it++;
}
}
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
@ -1008,7 +1110,10 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
}
if (op->inflight_count == 0)
{
continue_ops();
if (op->opcode == OSD_OP_SYNC)
continue_sync(op);
else
continue_rw(op);
}
}

View File

@ -36,7 +36,7 @@ struct cluster_op_t
std::function<void(cluster_op_t*)> callback;
~cluster_op_t();
protected:
int flags = 0;
uint64_t flags = 0;
int state = 0;
uint64_t cur_inode; // for snapshot reads
void *buf = NULL;
@ -47,6 +47,8 @@ protected:
std::vector<cluster_op_part_t> parts;
void *bitmap_buf = NULL, *part_bitmaps = NULL;
unsigned bitmap_buf_size = 0;
cluster_op_t *prev = NULL, *next = NULL;
int prev_wait = 0;
friend class cluster_client_t;
};
@ -66,7 +68,8 @@ class cluster_client_t
uint64_t bs_block_size = 0;
uint32_t bs_bitmap_granularity = 0, bs_bitmap_size = 0;
std::map<pool_id_t, uint64_t> pg_counts;
bool immediate_commit = false;
// WARNING: initially true so execute() doesn't create fake sync
bool immediate_commit = true;
// FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
uint64_t client_max_dirty_bytes = 0;
uint64_t client_max_dirty_ops = 0;
@ -76,7 +79,7 @@ class cluster_client_t
int retry_timeout_id = 0;
uint64_t op_id = 1;
std::vector<cluster_op_t*> offline_ops;
std::vector<cluster_op_t*> op_queue;
cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0, dirty_ops = 0;
@ -88,7 +91,6 @@ class cluster_client_t
ring_consumer_t consumer;
std::vector<std::function<void(void)>> on_ready_hooks;
int continuing_ops = 0;
int op_queue_pos = 0;
public:
etcd_state_client_t st_cli;
@ -117,4 +119,7 @@ protected:
void send_sync(cluster_op_t *op, cluster_op_part_t *part);
void handle_op_part(cluster_op_part_t *part);
void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part);
void erase_op(cluster_op_t *op);
void calc_wait(cluster_op_t *op);
void inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc);
};

View File

@ -50,6 +50,11 @@ void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<
void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
{
if (!etcd_addresses.size())
{
fprintf(stderr, "etcd_address is missing in Vitastor configuration\n");
exit(1);
}
std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
std::string etcd_api_path;
int pos = etcd_address.find('/');
@ -85,7 +90,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr)
}
}
void etcd_state_client_t::parse_config(json11::Json & config)
void etcd_state_client_t::parse_config(const json11::Json & config)
{
this->etcd_addresses.clear();
if (config["etcd_address"].is_string())
@ -122,6 +127,11 @@ void etcd_state_client_t::parse_config(json11::Json & config)
void etcd_state_client_t::start_etcd_watcher()
{
if (!etcd_addresses.size())
{
fprintf(stderr, "etcd_address is missing in Vitastor configuration\n");
exit(1);
}
std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
std::string etcd_api_path;
int pos = etcd_address.find('/');
@ -342,7 +352,7 @@ void etcd_state_client_t::load_pgs()
});
}
#else
void etcd_state_client_t::parse_config(json11::Json & config)
void etcd_state_client_t::parse_config(const json11::Json & config)
{
}

View File

@ -106,7 +106,7 @@ public:
void load_global_config();
void load_pgs();
void parse_state(const etcd_kv_t & kv);
void parse_config(json11::Json & config);
void parse_config(const json11::Json & config);
inode_watch_t* watch_inode(std::string name);
void close_watch(inode_watch_t* watch);
~etcd_state_client_t();

View File

@ -24,7 +24,6 @@
#include <netinet/tcp.h>
#include <vector>
#include <unordered_map>
#include "epoll_manager.h"
#include "cluster_client.h"
@ -46,6 +45,7 @@ struct sec_data
struct sec_options
{
int __pad;
char *config_path = NULL;
char *etcd_host = NULL;
char *etcd_prefix = NULL;
char *image = NULL;
@ -53,9 +53,23 @@ struct sec_options
uint64_t inode = 0;
int cluster_log = 0;
int trace = 0;
int use_rdma = 0;
char *rdma_device = NULL;
int rdma_port_num = 0;
int rdma_gid_index = 0;
int rdma_mtu = 0;
};
static struct fio_option options[] = {
{
.name = "conf",
.lname = "Vitastor config path",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct sec_options, config_path),
.help = "Vitastor config path",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "etcd",
.lname = "etcd address",
@ -121,6 +135,55 @@ static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "use_rdma",
.lname = "Use RDMA",
.type = FIO_OPT_BOOL,
.off1 = offsetof(struct sec_options, use_rdma),
.help = "Use RDMA",
.def = "-1",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_device",
.lname = "RDMA device name",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct sec_options, rdma_device),
.help = "RDMA device name",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_port_num",
.lname = "RDMA port number",
.type = FIO_OPT_INT,
.off1 = offsetof(struct sec_options, rdma_port_num),
.help = "RDMA port number",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_gid_index",
.lname = "RDMA gid index",
.type = FIO_OPT_INT,
.off1 = offsetof(struct sec_options, rdma_gid_index),
.help = "RDMA gid index",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_mtu",
.lname = "RDMA path MTU",
.type = FIO_OPT_INT,
.off1 = offsetof(struct sec_options, rdma_mtu),
.help = "RDMA path MTU",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = NULL,
},
@ -131,12 +194,6 @@ static int sec_setup(struct thread_data *td)
sec_options *o = (sec_options*)td->eo;
sec_data *bsd;
if (!o->etcd_host)
{
td_verror(td, EINVAL, "etcd address is missing");
return 1;
}
bsd = new sec_data;
if (!bsd)
{
@ -152,11 +209,26 @@ static int sec_setup(struct thread_data *td)
td->o.open_files++;
}
json11::Json cfg = json11::Json::object {
{ "etcd_address", std::string(o->etcd_host) },
{ "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") },
{ "log_level", o->cluster_log },
};
json11::Json::object cfg;
if (o->config_path)
cfg["config_path"] = std::string(o->config_path);
if (o->etcd_host)
cfg["etcd_address"] = std::string(o->etcd_host);
if (o->etcd_prefix)
cfg["etcd_prefix"] = std::string(o->etcd_prefix);
if (o->rdma_device)
cfg["rdma_device"] = std::string(o->rdma_device);
if (o->rdma_port_num)
cfg["rdma_port_num"] = o->rdma_port_num;
if (o->rdma_gid_index)
cfg["rdma_gid_index"] = o->rdma_gid_index;
if (o->rdma_mtu)
cfg["rdma_mtu"] = o->rdma_mtu;
if (o->cluster_log)
cfg["log_level"] = o->cluster_log;
if (o->use_rdma != -1)
cfg["use_rdma"] = o->use_rdma;
json11::Json cfg_json(cfg);
if (!o->image)
{
@ -181,7 +253,7 @@ static int sec_setup(struct thread_data *td)
}
bsd->ringloop = new ring_loop_t(512);
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg);
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg_json);
if (o->image)
{
while (!bsd->cli->is_ready())

View File

@ -12,6 +12,31 @@
void osd_messenger_t::init()
{
#ifdef WITH_RDMA
if (use_rdma)
{
rdma_context = msgr_rdma_context_t::create(
rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_port_num, rdma_gid_index, rdma_mtu
);
if (!rdma_context)
{
printf("[OSD %lu] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
}
else
{
rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge
? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge;
printf("[OSD %lu] RDMA initialized successfully\n", osd_num);
fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
{
handle_rdma_events();
});
handle_rdma_events();
}
}
#endif
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{
std::vector<int> to_stop;
@ -19,7 +44,7 @@ void osd_messenger_t::init()
for (auto cl_it = clients.begin(); cl_it != clients.end(); cl_it++)
{
auto cl = cl_it->second;
if (!cl->osd_num || cl->peer_state != PEER_CONNECTED)
if (!cl->osd_num || cl->peer_state != PEER_CONNECTED && cl->peer_state != PEER_RDMA)
{
// Do not run keepalive on regular clients
continue;
@ -94,32 +119,58 @@ osd_messenger_t::~osd_messenger_t()
{
stop_client(clients.begin()->first, true);
}
#ifdef WITH_RDMA
if (rdma_context)
{
delete rdma_context;
}
#endif
}
void osd_messenger_t::parse_config(const json11::Json & config)
{
#ifdef WITH_RDMA
if (!config["use_rdma"].is_null())
{
// RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
this->rdma_device = config["rdma_device"].string_value();
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
if (!this->rdma_port_num)
this->rdma_port_num = 1;
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
this->rdma_max_sge = config["rdma_max_sge"].uint64_value();
if (!this->rdma_max_sge)
this->rdma_max_sge = 128;
this->rdma_max_send = config["rdma_max_send"].uint64_value();
if (!this->rdma_max_send)
this->rdma_max_send = 32;
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
if (!this->rdma_max_recv)
this->rdma_max_recv = 8;
this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 1024*1024;
#endif
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
this->receive_buffer_size = 65536;
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
config["use_sync_send_recv"].uint64_value();
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
if (!this->peer_connect_interval)
{
this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
}
this->peer_connect_interval = 5;
this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
if (!this->peer_connect_timeout)
{
this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
}
this->peer_connect_timeout = 5;
this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value();
if (!this->osd_idle_timeout)
{
this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
}
this->osd_idle_timeout = 5;
this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value();
if (!this->osd_ping_timeout)
{
this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
}
this->osd_ping_timeout = 5;
this->log_level = config["log_level"].uint64_value();
}
@ -326,6 +377,24 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
},
},
};
#ifdef WITH_RDMA
if (rdma_context)
{
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
{
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
}
}
#endif
op->callback = [this, cl](osd_op_t *op)
{
std::string json_err;
@ -361,12 +430,50 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
}
if (err)
{
osd_num_t osd_num = cl->osd_num;
osd_num_t peer_osd = cl->osd_num;
stop_client(op->peer_fd);
on_connect_peer(osd_num, -1);
on_connect_peer(peer_osd, -1);
delete op;
return;
}
#ifdef WITH_RDMA
if (config["rdma_address"].is_string())
{
msgr_rdma_address_t addr;
if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
cl->rdma_conn->connect(&addr) != 0)
{
printf(
"Failed to connect to OSD %lu (address %s) using RDMA\n",
cl->osd_num, config["rdma_address"].string_value().c_str()
);
delete cl->rdma_conn;
cl->rdma_conn = NULL;
// FIXME: Keep TCP connection in this case
osd_num_t peer_osd = cl->osd_num;
stop_client(cl->peer_fd);
on_connect_peer(peer_osd, -1);
delete op;
return;
}
else
{
uint64_t server_max_msg = config["rdma_max_msg"].uint64_value();
if (cl->rdma_conn->max_msg > server_max_msg)
{
cl->rdma_conn->max_msg = server_max_msg;
}
if (log_level > 0)
{
printf("Connected to OSD %lu using RDMA\n", cl->osd_num);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL);
// Add the initial receive request
try_recv_rdma(cl);
}
}
#endif
osd_peer_fds[cl->osd_num] = cl->peer_fd;
on_connect_peer(cl->osd_num, cl->peer_fd);
delete op;
@ -408,3 +515,57 @@ void osd_messenger_t::accept_connections(int listen_fd)
throw std::runtime_error(std::string("accept: ") + strerror(errno));
}
}
bool osd_messenger_t::is_rdma_enabled()
{
return rdma_context != NULL;
}
json11::Json osd_messenger_t::read_config(const json11::Json & config)
{
const char *config_path = config["config_path"].string_value() != ""
? config["config_path"].string_value().c_str() : VITASTOR_CONFIG_PATH;
int fd = open(config_path, O_RDONLY);
if (fd < 0)
{
if (errno != ENOENT)
fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
return config;
}
struct stat st;
if (fstat(fd, &st) != 0)
{
fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
close(fd);
return config;
}
std::string buf;
buf.resize(st.st_size);
int done = 0;
while (done < st.st_size)
{
int r = read(fd, (void*)buf.data()+done, st.st_size-done);
if (r < 0)
{
fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
close(fd);
return config;
}
done += r;
}
close(fd);
std::string json_err;
json11::Json::object file_config = json11::Json::parse(buf, json_err).object_items();
if (json_err != "")
{
fprintf(stderr, "Invalid JSON in %s: %s\n", config_path, json_err.c_str());
return config;
}
file_config.erase("config_path");
file_config.erase("osd_num");
for (auto kv: config.object_items())
{
file_config[kv.first] = kv.second;
}
return file_config;
}

View File

@ -18,20 +18,32 @@
#include "timerfd_manager.h"
#include <ringloop.h>
#ifdef WITH_RDMA
#include "msgr_rdma.h"
#endif
#define CL_READ_HDR 1
#define CL_READ_DATA 2
#define CL_READ_REPLY_DATA 3
#define CL_WRITE_READY 1
#define CL_WRITE_REPLY 2
#define PEER_CONNECTING 1
#define PEER_CONNECTED 2
#define PEER_STOPPED 3
#define PEER_RDMA_CONNECTING 3
#define PEER_RDMA 4
#define PEER_STOPPED 5
#define DEFAULT_PEER_CONNECT_INTERVAL 5
#define DEFAULT_PEER_CONNECT_TIMEOUT 5
#define DEFAULT_OSD_PING_TIMEOUT 5
#define DEFAULT_BITMAP_GRANULARITY 4096
#define VITASTOR_CONFIG_PATH "/etc/vitastor/vitastor.conf"
#define MSGR_SENDP_HDR 1
#define MSGR_SENDP_FREE 2
struct msgr_sendp_t
{
osd_op_t *op;
int flags;
};
struct osd_client_t
{
@ -48,6 +60,10 @@ struct osd_client_t
void *in_buf = NULL;
#ifdef WITH_RDMA
msgr_rdma_connection_t *rdma_conn = NULL;
#endif
// Read state
int read_ready = 0;
osd_op_t *read_op = NULL;
@ -70,7 +86,7 @@ struct osd_client_t
msghdr write_msg = { 0 };
int write_state = 0;
std::vector<iovec> send_list, next_send_list;
std::vector<osd_op_t*> outbox, next_outbox;
std::vector<msgr_sendp_t> outbox, next_outbox;
~osd_client_t()
{
@ -104,15 +120,23 @@ struct osd_messenger_t
protected:
int keepalive_timer_id = -1;
// FIXME: make receive_buffer_size configurable
int receive_buffer_size = 64*1024;
int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
uint32_t receive_buffer_size = 0;
int peer_connect_interval = 0;
int peer_connect_timeout = 0;
int osd_idle_timeout = 0;
int osd_ping_timeout = 0;
int log_level = 0;
bool use_sync_send_recv = false;
#ifdef WITH_RDMA
bool use_rdma = true;
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
msgr_rdma_context_t *rdma_context = NULL;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 8;
uint64_t rdma_max_msg = 0;
#endif
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
std::vector<std::function<void()>> set_immediate;
@ -141,6 +165,13 @@ public:
void accept_connections(int listen_fd);
~osd_messenger_t();
static json11::Json read_config(const json11::Json & config);
#ifdef WITH_RDMA
bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
#endif
protected:
void try_connect_peer(uint64_t osd_num);
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
@ -156,8 +187,15 @@ protected:
void handle_send(int result, osd_client_t *cl);
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl);
void handle_reply_ready(osd_op_t *op);
#ifdef WITH_RDMA
bool try_send_rdma(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events();
#endif
};

View File

@ -42,3 +42,8 @@ void osd_messenger_t::read_requests()
void osd_messenger_t::send_replies()
{
}
json11::Json osd_messenger_t::read_config(const json11::Json & config)
{
return config;
}

521
src/msgr_rdma.cpp Normal file
View File

@ -0,0 +1,521 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include "msgr_rdma.h"
#include "messenger.h"
std::string msgr_rdma_address_t::to_string()
{
char msg[sizeof "0000:00000000:00000000:00000000000000000000000000000000"];
sprintf(
msg, "%04x:%06x:%06x:%016lx%016lx", lid, qpn, psn,
htobe64(((uint64_t*)&gid)[0]), htobe64(((uint64_t*)&gid)[1])
);
return std::string(msg);
}
bool msgr_rdma_address_t::from_string(const char *str, msgr_rdma_address_t *dest)
{
uint64_t* gid = (uint64_t*)&dest->gid;
int n = sscanf(
str, "%hx:%x:%x:%16lx%16lx", &dest->lid, &dest->qpn, &dest->psn, gid, gid+1
);
gid[0] = be64toh(gid[0]);
gid[1] = be64toh(gid[1]);
return n == 5;
}
msgr_rdma_context_t::~msgr_rdma_context_t()
{
if (cq)
ibv_destroy_cq(cq);
if (channel)
ibv_destroy_comp_channel(channel);
if (mr)
ibv_dereg_mr(mr);
if (pd)
ibv_dealloc_pd(pd);
if (context)
ibv_close_device(context);
}
msgr_rdma_connection_t::~msgr_rdma_connection_t()
{
ctx->used_max_cqe -= max_send+max_recv;
if (qp)
ibv_destroy_qp(qp);
}
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu)
{
int res;
ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ctx->mtu = mtu;
dev_list = ibv_get_device_list(NULL);
if (!dev_list)
{
fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
goto cleanup;
}
if (!ib_devname)
{
ctx->dev = *dev_list;
if (!ctx->dev)
{
fprintf(stderr, "No RDMA devices found\n");
goto cleanup;
}
}
else
{
int i;
for (i = 0; dev_list[i]; ++i)
if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
break;
ctx->dev = dev_list[i];
if (!ctx->dev)
{
fprintf(stderr, "RDMA device %s not found\n", ib_devname);
goto cleanup;
}
}
ctx->context = ibv_open_device(ctx->dev);
if (!ctx->context)
{
fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
ctx->ib_port = ib_port;
ctx->gid_index = gid_index;
if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0)
{
fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res));
goto cleanup;
}
ctx->my_lid = ctx->portinfo.lid;
if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
{
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
goto cleanup;
}
ctx->pd = ibv_alloc_pd(ctx->context);
if (!ctx->pd)
{
fprintf(stderr, "Couldn't allocate RDMA protection domain\n");
goto cleanup;
}
{
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup;
}
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
goto cleanup;
}
}
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup;
}
ctx->channel = ibv_create_comp_channel(ctx->context);
if (!ctx->channel)
{
fprintf(stderr, "Couldn't create RDMA completion channel\n");
goto cleanup;
}
ctx->max_cqe = 4096;
ctx->cq = ibv_create_cq(ctx->context, ctx->max_cqe, NULL, ctx->channel, 0);
if (!ctx->cq)
{
fprintf(stderr, "Couldn't create RDMA completion queue\n");
goto cleanup;
}
if (dev_list)
ibv_free_device_list(dev_list);
return ctx;
cleanup:
delete ctx;
if (dev_list)
ibv_free_device_list(dev_list);
return NULL;
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
{
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge;
conn->ctx = ctx;
conn->max_send = max_send;
conn->max_recv = max_recv;
conn->max_sge = max_sge;
conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
{
// Resize CQ
// Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
int new_max_cqe = ctx->max_cqe;
while (ctx->used_max_cqe > new_max_cqe)
{
new_max_cqe *= 2;
}
if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
{
fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
delete conn;
return NULL;
}
ctx->max_cqe = new_max_cqe;
}
ibv_qp_init_attr init_attr = {
.send_cq = ctx->cq,
.recv_cq = ctx->cq,
.cap = {
.max_send_wr = max_send,
.max_recv_wr = max_recv,
.max_send_sge = max_sge,
.max_recv_sge = max_sge,
},
.qp_type = IBV_QPT_RC,
};
conn->qp = ibv_create_qp(ctx->pd, &init_attr);
if (!conn->qp)
{
fprintf(stderr, "Couldn't create RDMA queue pair\n");
delete conn;
return NULL;
}
conn->addr.lid = ctx->my_lid;
conn->addr.gid = ctx->my_gid;
conn->addr.qpn = conn->qp->qp_num;
conn->addr.psn = lrand48() & 0xffffff;
ibv_qp_attr attr = {
.qp_state = IBV_QPS_INIT,
.qp_access_flags = 0,
.pkey_index = 0,
.port_num = ctx->ib_port,
};
if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS))
{
fprintf(stderr, "Failed to switch RDMA queue pair to INIT state\n");
delete conn;
return NULL;
}
return conn;
}
static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu)
{
switch (mtu)
{
case 256: return IBV_MTU_256;
case 512: return IBV_MTU_512;
case 1024: return IBV_MTU_1024;
case 2048: return IBV_MTU_2048;
case 4096: return IBV_MTU_4096;
}
return IBV_MTU_4096;
}
int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
{
auto conn = this;
ibv_qp_attr attr = {
.qp_state = IBV_QPS_RTR,
.path_mtu = mtu_to_ibv_mtu(conn->ctx->mtu),
.rq_psn = dest->psn,
.sq_psn = conn->addr.psn,
.dest_qp_num = dest->qpn,
.ah_attr = {
.grh = {
.dgid = dest->gid,
.sgid_index = conn->ctx->gid_index,
.hop_limit = 1, // FIXME can it vary?
},
.dlid = dest->lid,
.sl = 0, // service level
.src_path_bits = 0,
.is_global = (uint8_t)(dest->gid.global.interface_id ? 1 : 0),
.port_num = conn->ctx->ib_port,
},
.max_rd_atomic = 1,
.max_dest_rd_atomic = 1,
// Timeout and min_rnr_timer actual values seem to be 4.096us*2^(timeout+1)
.min_rnr_timer = 1,
.timeout = 14,
.retry_cnt = 7,
.rnr_retry = 7,
};
// FIXME No idea if ibv_modify_qp is a blocking operation or not. No idea if it has a timeout and what it is.
if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER))
{
fprintf(stderr, "Failed to switch RDMA queue pair to RTR (ready-to-receive) state\n");
return 1;
}
attr.qp_state = IBV_QPS_RTS;
if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC))
{
fprintf(stderr, "Failed to switch RDMA queue pair to RTS (ready-to-send) state\n");
return 1;
}
return 0;
}
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg)
{
// Try to connect to the peer using RDMA
msgr_rdma_address_t addr;
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr))
{
if (client_max_msg > rdma_max_msg)
{
client_max_msg = rdma_max_msg;
}
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
if (rdma_conn)
{
int r = rdma_conn->connect(&addr);
if (r != 0)
{
delete rdma_conn;
printf(
"Failed to connect RDMA queue pair to %s (client %d)\n",
addr.to_string().c_str(), peer_fd
);
}
else
{
// Remember connection, but switch to RDMA only after sending the configuration response
auto cl = clients.at(peer_fd);
cl->rdma_conn = rdma_conn;
cl->peer_state = PEER_RDMA_CONNECTING;
return true;
}
}
}
return false;
}
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
{
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = (uint64_t)(cl->peer_fd*2+1),
.sg_list = sge,
.num_sge = op_sge,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
printf("RDMA send failed: %s\n", strerror(err));
exit(1);
}
cl->rdma_conn->cur_send++;
}
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (!cl->send_list.size() || rc->cur_send > 0)
{
// Only send one batch at a time
return true;
}
uint64_t op_size = 0, op_sge = 0;
ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size())
{
iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{
try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
{
break;
}
}
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
};
op_size += len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
{
rc->send_pos++;
rc->send_buf_pos = 0;
}
}
if (op_sge > 0)
{
try_send_rdma_wr(cl, sge, op_sge);
}
return true;
}
static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
{
ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = {
.wr_id = (uint64_t)(cl->peer_fd*2),
.sg_list = sge,
.num_sge = op_sge,
};
int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
printf("RDMA receive failed: %s\n", strerror(err));
exit(1);
}
cl->rdma_conn->cur_recv++;
}
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv)
{
void *buf = malloc_or_die(rc->max_msg);
rc->recv_buffers.push_back(buf);
ibv_sge sge = {
.addr = (uintptr_t)buf,
.length = (uint32_t)rc->max_msg,
.lkey = rc->ctx->mr->lkey,
};
try_recv_rdma_wr(cl, &sge, 1);
}
return true;
}
#define RDMA_EVENTS_AT_ONCE 32
void osd_messenger_t::handle_rdma_events()
{
// Request next notification
ibv_cq *ev_cq;
void *ev_ctx;
// FIXME: This is inefficient as it calls read()...
if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0)
{
ibv_ack_cq_events(rdma_context->cq, 1);
}
if (ibv_req_notify_cq(rdma_context->cq, 0) != 0)
{
printf("Failed to request RDMA completion notification, exiting\n");
exit(1);
}
ibv_wc wc[RDMA_EVENTS_AT_ONCE];
int event_count;
do
{
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc);
for (int i = 0; i < event_count; i++)
{
int client_id = wc[i].wr_id >> 1;
bool is_send = wc[i].wr_id & 1;
auto cl_it = clients.find(client_id);
if (cl_it == clients.end())
{
continue;
}
osd_client_t *cl = cl_it->second;
if (wc[i].status != IBV_WC_SUCCESS)
{
printf("RDMA work request failed for client %d", client_id);
if (cl->osd_num)
{
printf(" (OSD %lu)", cl->osd_num);
}
printf(" with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status));
stop_client(client_id);
continue;
}
if (!is_send)
{
cl->rdma_conn->cur_recv--;
handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len);
free(cl->rdma_conn->recv_buffers[0]);
cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1);
try_recv_rdma(cl);
}
else
{
cl->rdma_conn->cur_send--;
if (!cl->rdma_conn->cur_send)
{
// Wait for the whole batch
for (int i = 0; i < cl->rdma_conn->send_pos; i++)
{
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[i].op;
}
}
if (cl->rdma_conn->send_pos > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos);
cl->rdma_conn->send_pos = 0;
}
if (cl->rdma_conn->send_buf_pos > 0)
{
cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos;
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
cl->rdma_conn->send_buf_pos = 0;
}
try_send_rdma(cl);
}
}
}
} while (event_count > 0);
for (auto cb: set_immediate)
{
cb();
}
set_immediate.clear();
}

58
src/msgr_rdma.h Normal file
View File

@ -0,0 +1,58 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include <infiniband/verbs.h>
#include <string>
#include <vector>
struct msgr_rdma_address_t
{
ibv_gid gid;
uint16_t lid;
uint32_t qpn;
uint32_t psn;
std::string to_string();
static bool from_string(const char *str, msgr_rdma_address_t *dest);
};
struct msgr_rdma_context_t
{
ibv_context *context = NULL;
ibv_device *dev = NULL;
ibv_device_attr_ex attrx;
ibv_pd *pd = NULL;
ibv_mr *mr = NULL;
ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL;
ibv_port_attr portinfo;
uint8_t ib_port;
uint8_t gid_index;
uint16_t my_lid;
ibv_gid my_gid;
uint32_t mtu;
int max_cqe = 0;
int used_max_cqe = 0;
static msgr_rdma_context_t *create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu);
~msgr_rdma_context_t();
};
struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
ibv_qp *qp = NULL;
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0;
uint64_t max_msg = 0;
int send_pos = 0, send_buf_pos = 0;
int recv_pos = 0, recv_buf_pos = 0;
std::vector<void*> recv_buffers;
~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
int connect(msgr_rdma_address_t *dest);
};

View File

@ -91,48 +91,9 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{
if (cl->read_iov.iov_base == cl->in_buf)
{
// Compose operation(s) from the buffer
int remain = result;
void *curbuf = cl->in_buf;
while (remain > 0)
if (!handle_read_buffer(cl, cl->in_buf, result))
{
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
goto fin;
}
}
goto fin;
}
}
else
@ -159,6 +120,52 @@ fin:
return ret;
}
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
{
// Compose operation(s) from the buffer
while (remain > 0)
{
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
return false;
}
}
}
return true;
}
bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
{
cl->recv_list.reset();
@ -254,6 +261,16 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
}
cl->read_remaining = cur_op->req.rw.len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
{
if (cur_op->req.show_conf.json_len > 0)
{
cur_op->buf = malloc_or_die(cur_op->req.show_conf.json_len+1);
((uint8_t*)cur_op->buf)[cur_op->req.show_conf.json_len] = 0;
cl->recv_list.push_back(cur_op->buf, cur_op->req.show_conf.json_len);
}
cl->read_remaining = cur_op->req.show_conf.json_len;
}
if (cl->read_remaining > 0)
{
// Read data
@ -338,11 +355,11 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
}
else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
{
assert(!op->iov.count);
delete cl->read_op;
cl->read_op = op;
cl->read_state = CL_READ_REPLY_DATA;
cl->read_remaining = op->reply.hdr.retval;
free(op->buf);
op->buf = malloc_or_die(op->reply.hdr.retval);
cl->recv_list.push_back(op->buf, op->reply.hdr.retval);
}

View File

@ -46,7 +46,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
cl->sent_ops[cur_op->req.hdr.id] = cur_op;
}
to_outbox.push_back(NULL);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_HDR });
// Bitmap
if (cur_op->op_type == OSD_OP_IN &&
cur_op->req.hdr.opcode == OSD_OP_SEC_READ &&
@ -56,7 +56,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
.iov_base = cur_op->bitmap,
.iov_len = cur_op->reply.sec_rw.attr_len,
});
to_outbox.push_back(NULL);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
else if (cur_op->op_type == OSD_OP_OUT &&
(cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
@ -66,7 +66,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
.iov_base = cur_op->bitmap,
.iov_len = cur_op->req.sec_rw.attr_len,
});
to_outbox.push_back(NULL);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
// Operation data
if ((cur_op->op_type == OSD_OP_IN
@ -78,13 +78,14 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0)
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK ||
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)) && cur_op->iov.count > 0)
{
for (int i = 0; i < cur_op->iov.count; i++)
{
assert(cur_op->iov.buf[i].iov_base);
to_send_list.push_back(cur_op->iov.buf[i]);
to_outbox.push_back(NULL);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
}
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
@ -93,13 +94,19 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval });
else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0)
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
to_outbox.push_back(NULL);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
if (cur_op->op_type == OSD_OP_IN)
{
// To free it later
to_outbox[to_outbox.size()-1] = cur_op;
to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE;
}
#ifdef WITH_RDMA
if (cl->peer_state == PEER_RDMA)
{
try_send_rdma(cl);
return;
}
#endif
if (!ringloop)
{
// FIXME: It's worse because it doesn't allow batching
@ -232,10 +239,10 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
iovec & iov = cl->send_list[done];
if (iov.iov_len <= result)
{
if (cl->outbox[done])
if (cl->outbox[done].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[done];
delete cl->outbox[done].op;
}
result -= iov.iov_len;
done++;
@ -260,6 +267,21 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
cl->next_outbox.clear();
}
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
#ifdef WITH_RDMA
if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
{
// FIXME: Do something better than just forgetting the FD
// FIXME: Ignore pings during RDMA state transition
if (log_level > 0)
{
printf("Successfully connected with client %d using RDMA\n", cl->peer_fd);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL);
// Add the initial receive request
try_recv_rdma(cl);
}
#endif
}
if (cl->write_state != 0)
{

View File

@ -122,6 +122,12 @@ void osd_messenger_t::stop_client(int peer_fd, bool force)
// And close the FD only when everything is done
// ...because peer_fd number can get reused after close()
close(peer_fd);
#ifdef WITH_RDMA
if (cl->rdma_conn)
{
delete cl->rdma_conn;
}
#endif
#endif
// Find the item again because it can be invalidated at this point
it = clients.find(peer_fd);

View File

@ -26,7 +26,10 @@ const char *exe_name = NULL;
class nbd_proxy
{
protected:
std::string image_name;
uint64_t inode = 0;
uint64_t device_size = 0;
inode_watch_t *watch = NULL;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
@ -111,9 +114,9 @@ public:
{
printf(
"Vitastor NBD proxy\n"
"(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n"
"(c) Vitaliy Filippov, 2020-2021 (VNPL-1.1)\n\n"
"USAGE:\n"
" %s map --etcd_address <etcd_address> --pool <pool> --inode <inode> --size <size in bytes>\n"
" %s map [--etcd_address <etcd_address>] (--image <image> | --pool <pool> --inode <inode> --size <size in bytes>)\n"
" %s unmap /dev/nbd0\n"
" %s list [--json]\n",
exe_name, exe_name, exe_name
@ -143,26 +146,49 @@ public:
void start(json11::Json cfg)
{
// Check options
if (cfg["etcd_address"].string_value() == "")
if (cfg["image"].string_value() != "")
{
fprintf(stderr, "etcd_address is missing\n");
exit(1);
// Use image name
image_name = cfg["image"].string_value();
inode = 0;
}
if (!cfg["size"].uint64_value())
else
{
fprintf(stderr, "device size is missing\n");
exit(1);
// Use pool, inode number and size
if (!cfg["size"].uint64_value())
{
fprintf(stderr, "device size is missing\n");
exit(1);
}
device_size = cfg["size"].uint64_value();
inode = cfg["inode"].uint64_value();
uint64_t pool = cfg["pool"].uint64_value();
if (pool)
{
inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (pool << (64-POOL_ID_BITS));
}
if (!(inode >> (64-POOL_ID_BITS)))
{
fprintf(stderr, "pool is missing\n");
exit(1);
}
}
inode = cfg["inode"].uint64_value();
uint64_t pool = cfg["pool"].uint64_value();
if (pool)
// Create client
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
if (!inode)
{
inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (pool << (64-POOL_ID_BITS));
}
if (!(inode >> (64-POOL_ID_BITS)))
{
fprintf(stderr, "pool is missing\n");
exit(1);
// Load image metadata
while (!cli->is_ready())
{
ringloop->loop();
if (cli->is_ready())
break;
ringloop->wait();
}
watch = cli->st_cli.watch_inode(image_name);
device_size = watch->cfg.size;
}
// Initialize NBD
int sockfd[2];
@ -176,7 +202,7 @@ public:
load_module();
if (!cfg["dev_num"].is_null())
{
if (run_nbd(sockfd, cfg["dev_num"].int64_value(), cfg["size"].uint64_value(), NBD_FLAG_SEND_FLUSH, 30) < 0)
if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, NBD_FLAG_SEND_FLUSH, 30) < 0)
{
perror("run_nbd");
exit(1);
@ -188,7 +214,7 @@ public:
int i = 0;
while (true)
{
int r = run_nbd(sockfd, i, cfg["size"].uint64_value(), NBD_FLAG_SEND_FLUSH, 30);
int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30);
if (r == 0)
{
printf("/dev/nbd%d\n", i);
@ -215,10 +241,6 @@ public:
{
daemonize();
}
// Create client
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
// Initialize read state
read_state = CL_READ_HDR;
recv_buf = malloc_or_die(receive_buffer_size);
@ -242,6 +264,7 @@ public:
ringloop->loop();
ringloop->wait();
}
// FIXME: Cleanup when exiting
}
void load_module()
@ -610,7 +633,7 @@ protected:
if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE)
{
op->opcode = req_type == NBD_CMD_READ ? OSD_OP_READ : OSD_OP_WRITE;
op->inode = inode;
op->inode = inode ? inode : watch->cfg.num;
op->offset = be64toh(cur_req.from);
op->len = be32toh(cur_req.len);
buf = malloc_or_die(sizeof(nbd_reply) + op->len);
@ -657,7 +680,15 @@ protected:
}
else
{
cli->execute(cur_op);
if (cur_op->opcode == OSD_OP_WRITE && watch->cfg.readonly)
{
cur_op->retval = -EROFS;
std::function<void(cluster_op_t*)>(cur_op->callback)(cur_op);
}
else
{
cli->execute(cur_op);
}
cur_op = NULL;
cur_buf = &cur_req;
cur_left = sizeof(nbd_request);

View File

@ -10,31 +10,39 @@
#include "osd.h"
#include "http_client.h"
osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop)
static blockstore_config_t json_to_bs(const json11::Json::object & config)
{
bs_block_size = strtoull(config["block_size"].c_str(), NULL, 10);
bs_bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10);
if (!bs_block_size)
bs_block_size = DEFAULT_BLOCK_SIZE;
if (!bs_bitmap_granularity)
bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;
blockstore_config_t bs;
for (auto kv: config)
{
if (kv.second.is_string())
bs[kv.first] = kv.second.string_value();
else
bs[kv.first] = kv.second.dump();
}
return bs;
}
osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
{
zero_buffer_size = 1<<20;
zero_buffer = malloc_or_die(zero_buffer_size);
memset(zero_buffer, 0, zero_buffer_size);
this->config = config;
this->ringloop = ringloop;
this->config = msgr.read_config(config).object_items();
if (this->config.find("log_level") == this->config.end())
this->config["log_level"] = 1;
parse_config(this->config);
epmgr = new epoll_manager_t(ringloop);
// FIXME: Use timerfd_interval based directly on io_uring
this->tfd = epmgr->tfd;
// FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config
this->bs = new blockstore_t(config, ringloop, tfd);
parse_config(config);
auto bs_cfg = json_to_bs(this->config);
this->bs = new blockstore_t(bs_cfg, ringloop, tfd);
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{
@ -45,11 +53,11 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop)
print_slow();
});
c_cli.tfd = this->tfd;
c_cli.ringloop = this->ringloop;
c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); };
c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); };
c_cli.init();
msgr.tfd = this->tfd;
msgr.ringloop = this->ringloop;
msgr.exec_op = [this](osd_op_t *op) { exec_op(op); };
msgr.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); };
msgr.init();
init_cluster();
@ -66,62 +74,71 @@ osd_t::~osd_t()
free(zero_buffer);
}
void osd_t::parse_config(blockstore_config_t & config)
void osd_t::parse_config(const json11::Json & config)
{
if (config.find("log_level") == config.end())
config["log_level"] = "1";
log_level = strtoull(config["log_level"].c_str(), NULL, 10);
// Initial startup configuration
json11::Json json_config = json11::Json(config);
st_cli.parse_config(json_config);
etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10);
if (etcd_report_interval <= 0)
etcd_report_interval = 30;
osd_num = strtoull(config["osd_num"].c_str(), NULL, 10);
st_cli.parse_config(config);
msgr.parse_config(config);
// OSD number
osd_num = config["osd_num"].uint64_value();
if (!osd_num)
throw std::runtime_error("osd_num is required in the configuration");
c_cli.osd_num = osd_num;
msgr.osd_num = osd_num;
// Vital Blockstore parameters
bs_block_size = config["block_size"].uint64_value();
if (!bs_block_size)
bs_block_size = DEFAULT_BLOCK_SIZE;
bs_bitmap_granularity = config["bitmap_granularity"].uint64_value();
if (!bs_bitmap_granularity)
bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;
// Bind address
bind_address = config["bind_address"].string_value();
if (bind_address == "")
bind_address = "0.0.0.0";
bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
// OSD configuration
log_level = config["log_level"].uint64_value();
etcd_report_interval = config["etcd_report_interval"].uint64_value();
if (etcd_report_interval <= 0)
etcd_report_interval = 30;
readonly = config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes";
run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no";
no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes";
no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes";
// Cluster configuration
bind_address = config["bind_address"];
if (bind_address == "")
bind_address = "0.0.0.0";
bind_port = stoull_full(config["bind_port"]);
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
allow_test_ops = config["allow_test_ops"] == "true" || config["allow_test_ops"] == "1" || config["allow_test_ops"] == "yes";
if (config["immediate_commit"] == "all")
immediate_commit = IMMEDIATE_ALL;
else if (config["immediate_commit"] == "small")
immediate_commit = IMMEDIATE_SMALL;
if (config.find("autosync_interval") != config.end())
else
immediate_commit = IMMEDIATE_NONE;
if (!config["autosync_interval"].is_null())
{
autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10);
// Allow to set it to 0
autosync_interval = config["autosync_interval"].uint64_value();
if (autosync_interval > MAX_AUTOSYNC_INTERVAL)
autosync_interval = DEFAULT_AUTOSYNC_INTERVAL;
}
if (config.find("client_queue_depth") != config.end())
if (!config["client_queue_depth"].is_null())
{
client_queue_depth = strtoull(config["client_queue_depth"].c_str(), NULL, 10);
client_queue_depth = config["client_queue_depth"].uint64_value();
if (client_queue_depth < 128)
client_queue_depth = 128;
}
recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
recovery_queue_depth = config["recovery_queue_depth"].uint64_value();
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10);
recovery_sync_batch = config["recovery_sync_batch"].uint64_value();
if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
readonly = true;
print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
print_stats_interval = config["print_stats_interval"].uint64_value();
if (!print_stats_interval)
print_stats_interval = 3;
slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
slow_log_interval = config["slow_log_interval"].uint64_value();
if (!slow_log_interval)
slow_log_interval = 10;
c_cli.parse_config(json_config);
}
void osd_t::bind_socket()
@ -174,7 +191,7 @@ void osd_t::bind_socket()
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)
{
c_cli.accept_connections(listen_fd);
msgr.accept_connections(listen_fd);
});
}
@ -191,8 +208,8 @@ bool osd_t::shutdown()
void osd_t::loop()
{
handle_peers();
c_cli.read_requests();
c_cli.send_replies();
msgr.read_requests();
msgr.send_replies();
ringloop->submit();
}
@ -276,7 +293,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
void osd_t::reset_stats()
{
c_cli.stats = { 0 };
msgr.stats = { 0 };
prev_stats = { 0 };
memset(recovery_stat_count, 0, sizeof(recovery_stat_count));
memset(recovery_stat_bytes, 0, sizeof(recovery_stat_bytes));
@ -286,11 +303,11 @@ void osd_t::print_stats()
{
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING)
if (msgr.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING)
{
uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]);
uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval;
if (c_cli.stats.op_stat_bytes[i] != 0)
uint64_t avg = (msgr.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(msgr.stats.op_stat_count[i] - prev_stats.op_stat_count[i]);
uint64_t bw = (msgr.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval;
if (msgr.stats.op_stat_bytes[i] != 0)
{
printf(
"[OSD %lu] avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", osd_num, i, osd_op_names[i], avg,
@ -302,19 +319,19 @@ void osd_t::print_stats()
{
printf("[OSD %lu] avg latency for op %d (%s): %lu us\n", osd_num, i, osd_op_names[i], avg);
}
prev_stats.op_stat_count[i] = c_cli.stats.op_stat_count[i];
prev_stats.op_stat_sum[i] = c_cli.stats.op_stat_sum[i];
prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i];
prev_stats.op_stat_count[i] = msgr.stats.op_stat_count[i];
prev_stats.op_stat_sum[i] = msgr.stats.op_stat_sum[i];
prev_stats.op_stat_bytes[i] = msgr.stats.op_stat_bytes[i];
}
}
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i])
if (msgr.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i])
{
uint64_t avg = (c_cli.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(c_cli.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]);
uint64_t avg = (msgr.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(msgr.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]);
printf("[OSD %lu] avg latency for subop %d (%s): %ld us\n", osd_num, i, osd_op_names[i], avg);
prev_stats.subop_stat_count[i] = c_cli.stats.subop_stat_count[i];
prev_stats.subop_stat_sum[i] = c_cli.stats.subop_stat_sum[i];
prev_stats.subop_stat_count[i] = msgr.stats.subop_stat_count[i];
prev_stats.subop_stat_sum[i] = msgr.stats.subop_stat_sum[i];
}
}
for (int i = 0; i < 2; i++)
@ -351,7 +368,7 @@ void osd_t::print_slow()
char alloc[1024];
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
for (auto & kv: c_cli.clients)
for (auto & kv: msgr.clients)
{
for (auto op: kv.second->received_ops)
{

View File

@ -92,7 +92,7 @@ class osd_t
{
// config
blockstore_config_t config;
json11::Json::object config;
int etcd_report_interval = 30;
bool readonly = false;
@ -104,7 +104,7 @@ class osd_t
int bind_port, listen_backlog;
// FIXME: Implement client queue depth limit
int client_queue_depth = 128;
bool allow_test_ops = true;
bool allow_test_ops = false;
int print_stats_interval = 3;
int slow_log_interval = 10;
int immediate_commit = IMMEDIATE_NONE;
@ -116,7 +116,7 @@ class osd_t
// cluster state
etcd_state_client_t st_cli;
osd_messenger_t c_cli;
osd_messenger_t msgr;
int etcd_failed_attempts = 0;
std::string etcd_lease_id;
json11::Json self_state;
@ -167,7 +167,7 @@ class osd_t
uint64_t recovery_stat_bytes[2][2] = { 0 };
// cluster connection
void parse_config(blockstore_config_t & config);
void parse_config(const json11::Json & config);
void init_cluster();
void on_change_osd_state_hook(osd_num_t peer_osd);
void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
@ -268,7 +268,7 @@ class osd_t
}
public:
osd_t(blockstore_config_t & config, ring_loop_t *ringloop);
osd_t(const json11::Json & config, ring_loop_t *ringloop);
~osd_t();
void force_stop(int exitcode);
bool shutdown();

View File

@ -21,7 +21,7 @@ void osd_t::init_cluster()
{
// Test version of clustering code with 1 pool, 1 PG and 2 peers
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
std::string peerstr = config["peers"];
std::string peerstr = config["peers"].string_value();
while (peerstr.size())
{
int pos = peerstr.find(',');
@ -104,7 +104,7 @@ void osd_t::parse_test_peer(std::string peer)
{ "addresses", json11::Json::array { addr } },
{ "port", port },
};
c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
}
json11::Json osd_t::get_osd_state()
@ -146,16 +146,16 @@ json11::Json osd_t::get_statistics()
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
op_stats[osd_op_names[i]] = json11::Json::object {
{ "count", c_cli.stats.op_stat_count[i] },
{ "usec", c_cli.stats.op_stat_sum[i] },
{ "bytes", c_cli.stats.op_stat_bytes[i] },
{ "count", msgr.stats.op_stat_count[i] },
{ "usec", msgr.stats.op_stat_sum[i] },
{ "bytes", msgr.stats.op_stat_bytes[i] },
};
}
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
subop_stats[osd_op_names[i]] = json11::Json::object {
{ "count", c_cli.stats.subop_stat_count[i] },
{ "usec", c_cli.stats.subop_stat_sum[i] },
{ "count", msgr.stats.subop_stat_count[i] },
{ "usec", msgr.stats.subop_stat_sum[i] },
};
}
st["op_stats"] = op_stats;
@ -298,9 +298,9 @@ void osd_t::report_statistics()
void osd_t::on_change_osd_state_hook(osd_num_t peer_osd)
{
if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end())
if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end())
{
c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
}
}
@ -340,21 +340,10 @@ void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
void osd_t::on_load_config_hook(json11::Json::object & global_config)
{
blockstore_config_t osd_config = this->config;
for (auto & cfg_var: global_config)
{
if (this->config.find(cfg_var.first) == this->config.end())
{
if (cfg_var.second.is_string())
{
osd_config[cfg_var.first] = cfg_var.second.string_value();
}
else
{
osd_config[cfg_var.first] = cfg_var.second.dump();
}
}
}
json11::Json::object osd_config = this->config;
for (auto & kv: global_config)
if (osd_config.find(kv.first) == osd_config.end())
osd_config[kv.first] = kv.second;
parse_config(osd_config);
bind_socket();
acquire_lease();
@ -380,7 +369,7 @@ void osd_t::acquire_lease()
etcd_lease_id = data["ID"].string_value();
create_osd_state();
});
printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].c_str(), etcd_report_interval);
printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].string_value().c_str(), etcd_report_interval);
tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
{
renew_lease();
@ -695,9 +684,9 @@ void osd_t::apply_pg_config()
// Add peers
for (auto pg_osd: all_peers)
{
if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end())
if (pg_osd != this->osd_num && msgr.osd_peer_fds.find(pg_osd) == msgr.osd_peer_fds.end())
{
c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
start_pg_peering(pg);

View File

@ -82,10 +82,10 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p
else
{
printf("Error while doing flush on OSD %lu: %d (%s)\n", osd_num, retval, strerror(-retval));
auto fd_it = c_cli.osd_peer_fds.find(peer_osd);
if (fd_it != c_cli.osd_peer_fds.end())
auto fd_it = msgr.osd_peer_fds.find(peer_osd);
if (fd_it != msgr.osd_peer_fds.end())
{
c_cli.stop_client(fd_it->second);
msgr.stop_client(fd_it->second);
}
return;
}
@ -188,7 +188,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
else
{
// Peer
int peer_fd = c_cli.osd_peer_fds[peer_osd];
int peer_fd = msgr.osd_peer_fds[peer_osd];
op->op_type = OSD_OP_OUT;
op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
op->peer_fd = peer_fd;
@ -196,7 +196,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
.sec_stab = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = (uint64_t)(rollback ? OSD_OP_SEC_ROLLBACK : OSD_OP_SEC_STABILIZE),
},
.len = count * sizeof(obj_ver_id),
@ -207,7 +207,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
delete op;
};
c_cli.outbox_push(op);
msgr.outbox_push(op);
}
}

View File

@ -29,13 +29,13 @@ int main(int narg, char *args[])
perror("BUG: too small packet size");
return 1;
}
blockstore_config_t config;
json11::Json::object config;
for (int i = 1; i < narg; i++)
{
if (args[i][0] == '-' && args[i][1] == '-' && i < narg-1)
{
char *opt = args[i]+2;
config[opt] = args[++i];
config[std::string(opt)] = std::string(args[++i]);
}
}
signal(SIGINT, handle_sigint);

View File

@ -148,6 +148,8 @@ struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
struct __attribute__((__packed__)) osd_op_show_config_t
{
osd_op_header_t header;
// JSON request length
uint64_t json_len;
};
struct __attribute__((__packed__)) osd_reply_show_config_t

View File

@ -156,7 +156,7 @@ void osd_t::start_pg_peering(pg_t & pg)
if (immediate_commit != IMMEDIATE_ALL)
{
std::vector<int> to_stop;
for (auto & cp: c_cli.clients)
for (auto & cp: msgr.clients)
{
if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
{
@ -165,7 +165,7 @@ void osd_t::start_pg_peering(pg_t & pg)
}
for (auto peer_fd: to_stop)
{
c_cli.stop_client(peer_fd);
msgr.stop_client(peer_fd);
}
}
// Calculate current write OSD set
@ -175,7 +175,7 @@ void osd_t::start_pg_peering(pg_t & pg)
for (int role = 0; role < pg.target_set.size(); role++)
{
pg.cur_set[role] = pg.target_set[role] == this->osd_num ||
c_cli.osd_peer_fds.find(pg.target_set[role]) != c_cli.osd_peer_fds.end() ? pg.target_set[role] : 0;
msgr.osd_peer_fds.find(pg.target_set[role]) != msgr.osd_peer_fds.end() ? pg.target_set[role] : 0;
if (pg.cur_set[role] != 0)
{
pg.pg_cursize++;
@ -199,7 +199,7 @@ void osd_t::start_pg_peering(pg_t & pg)
{
found = false;
if (history_osd == this->osd_num ||
c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end())
msgr.osd_peer_fds.find(history_osd) != msgr.osd_peer_fds.end())
{
found = true;
break;
@ -223,13 +223,13 @@ void osd_t::start_pg_peering(pg_t & pg)
std::set<osd_num_t> cur_peers;
for (auto pg_osd: pg.all_peers)
{
if (pg_osd == this->osd_num || c_cli.osd_peer_fds.find(pg_osd) != c_cli.osd_peer_fds.end())
if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end())
{
cur_peers.insert(pg_osd);
}
else if (c_cli.wanted_peers.find(pg_osd) == c_cli.wanted_peers.end())
else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end())
{
c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end());
@ -325,7 +325,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
else
{
// Peer
auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]);
auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]);
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = cl->peer_fd;
@ -333,7 +333,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
.sec_sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_SYNC,
},
},
@ -347,14 +347,14 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
int fail_fd = op->peer_fd;
ps->list_ops.erase(role_osd);
delete op;
c_cli.stop_client(fail_fd);
msgr.stop_client(fail_fd);
return;
}
delete op;
ps->list_ops.erase(role_osd);
submit_list_subop(role_osd, ps);
};
c_cli.outbox_push(op);
msgr.outbox_push(op);
ps->list_ops[role_osd] = op;
}
}
@ -404,12 +404,12 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
// Peer
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = c_cli.osd_peer_fds[role_osd];
op->peer_fd = msgr.osd_peer_fds[role_osd];
op->req = (osd_any_op_t){
.sec_list = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_LIST,
},
.list_pg = ps->pg_num,
@ -427,7 +427,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
int fail_fd = op->peer_fd;
ps->list_ops.erase(role_osd);
delete op;
c_cli.stop_client(fail_fd);
msgr.stop_client(fail_fd);
return;
}
printf(
@ -444,7 +444,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
ps->list_ops.erase(role_osd);
delete op;
};
c_cli.outbox_push(op);
msgr.outbox_push(op);
ps->list_ops[role_osd] = op;
}
}

View File

@ -236,14 +236,14 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Send to a remote OSD
osd_op_t *subop = op_data->subops+subop_idx;
subop->op_type = OSD_OP_OUT;
subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num);
subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num);
// FIXME: Use the pre-allocated buffer
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req = (osd_any_op_t){
.sec_read_bmp = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_READ_BMP,
},
.len = sizeof(obj_ver_id)*(i+1-prev),
@ -273,7 +273,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
}
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(subop);
msgr.outbox_push(subop);
subop_idx++;
}
prev = i+1;

View File

@ -87,14 +87,14 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
else
{
// FIXME add separate magic number for primary ops
auto cl_it = c_cli.clients.find(cur_op->peer_fd);
if (cl_it != c_cli.clients.end())
auto cl_it = msgr.clients.find(cur_op->peer_fd);
if (cl_it != msgr.clients.end())
{
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
cur_op->reply.hdr.id = cur_op->req.hdr.id;
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
cur_op->reply.hdr.retval = retval;
c_cli.outbox_push(cur_op);
msgr.outbox_push(cur_op);
}
else
{
@ -184,13 +184,13 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
else
{
subop->op_type = OSD_OP_OUT;
subop->peer_fd = c_cli.osd_peer_fds.at(role_osd_num);
subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num);
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
subop->req.sec_rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ),
},
.oid = {
@ -227,7 +227,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
{
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(subop);
msgr.outbox_push(subop);
}
i++;
}
@ -282,20 +282,20 @@ void osd_t::add_bs_subop_stats(osd_op_t *subop)
uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode];
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
c_cli.stats.op_stat_count[opcode]++;
if (!c_cli.stats.op_stat_count[opcode])
msgr.stats.op_stat_count[opcode]++;
if (!msgr.stats.op_stat_count[opcode])
{
c_cli.stats.op_stat_count[opcode] = 1;
c_cli.stats.op_stat_sum[opcode] = 0;
c_cli.stats.op_stat_bytes[opcode] = 0;
msgr.stats.op_stat_count[opcode] = 1;
msgr.stats.op_stat_sum[opcode] = 0;
msgr.stats.op_stat_bytes[opcode] = 0;
}
c_cli.stats.op_stat_sum[opcode] += (
msgr.stats.op_stat_sum[opcode] += (
(tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000
);
if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE)
{
c_cli.stats.op_stat_bytes[opcode] += subop->bs_op->len;
msgr.stats.op_stat_bytes[opcode] += subop->bs_op->len;
}
}
@ -322,7 +322,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
if (subop->peer_fd >= 0)
{
// Drop connection on any error
c_cli.stop_client(subop->peer_fd);
msgr.stop_client(subop->peer_fd);
}
}
else
@ -332,8 +332,8 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
{
uint64_t version = subop->reply.sec_rw.version;
#ifdef OSD_DEBUG
uint64_t peer_osd = c_cli.clients.find(subop->peer_fd) != c_cli.clients.end()
? c_cli.clients[subop->peer_fd]->osd_num : osd_num;
uint64_t peer_osd = msgr.clients.find(subop->peer_fd) != msgr.clients.end()
? msgr.clients[subop->peer_fd]->osd_num : osd_num;
printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version);
#endif
if (op_data->fact_ver != UINT64_MAX)
@ -465,11 +465,11 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num);
subops[i].req = (osd_any_op_t){ .sec_del = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_DELETE,
},
.oid = chunk.oid,
@ -479,7 +479,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
{
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(&subops[i]);
msgr.outbox_push(&subops[i]);
}
}
}
@ -509,14 +509,14 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
});
bs->enqueue_op(subops[i].bs_op);
}
else if ((peer_it = c_cli.osd_peer_fds.find(sync_osd)) != c_cli.osd_peer_fds.end())
else if ((peer_it = msgr.osd_peer_fds.find(sync_osd)) != msgr.osd_peer_fds.end())
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = peer_it->second;
subops[i].req = (osd_any_op_t){ .sec_sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_SYNC,
},
} };
@ -524,7 +524,7 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
{
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(&subops[i]);
msgr.outbox_push(&subops[i]);
}
else
{
@ -569,11 +569,11 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num);
subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num);
subops[i].req = (osd_any_op_t){ .sec_stab = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_STABILIZE,
},
.len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)),
@ -583,7 +583,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(&subops[i]);
msgr.outbox_push(&subops[i]);
}
}
}

View File

@ -247,8 +247,8 @@ resume_8:
finish:
if (cur_op->peer_fd)
{
auto it = c_cli.clients.find(cur_op->peer_fd);
if (it != c_cli.clients.end())
auto it = msgr.clients.find(cur_op->peer_fd);
if (it != msgr.clients.end())
it->second->dirty_pgs.clear();
}
finish_op(cur_op, 0);

View File

@ -370,8 +370,8 @@ lazy:
}
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
auto cl_it = c_cli.clients.find(cur_op->peer_fd);
if (cl_it != c_cli.clients.end())
auto cl_it = msgr.clients.find(cur_op->peer_fd);
if (cl_it != msgr.clients.end())
{
cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}

View File

@ -144,10 +144,44 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
void osd_t::exec_show_config(osd_op_t *cur_op)
{
// FIXME: Send the real config, not its source
auto cfg_copy = config;
cfg_copy["protocol_version"] = std::to_string(OSD_PROTOCOL_VERSION);
std::string cfg_str = json11::Json(cfg_copy).dump();
std::string json_err;
json11::Json req_json = cur_op->req.show_conf.json_len > 0
? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
: json11::Json();
// Expose sensitive configuration values so peers can check them
json11::Json::object wire_config = json11::Json::object {
{ "osd_num", osd_num },
{ "protocol_version", OSD_PROTOCOL_VERSION },
{ "block_size", (uint64_t)bs_block_size },
{ "bitmap_granularity", (uint64_t)bs_bitmap_granularity },
{ "primary_enabled", run_primary },
{ "blockstore_enabled", bs ? true : false },
{ "readonly", readonly },
{ "immediate_commit", (immediate_commit == IMMEDIATE_ALL ? "all" :
(immediate_commit == IMMEDIATE_SMALL ? "small" : "none")) },
{ "lease_timeout", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 },
};
#ifdef WITH_RDMA
if (msgr.is_rdma_enabled())
{
// Indicate that RDMA is enabled
wire_config["rdma_enabled"] = true;
if (req_json["connect_rdma"].is_string())
{
// Peer is trying to connect using RDMA, try to satisfy him
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), req_json["rdma_max_msg"].uint64_value());
if (ok)
{
auto rc = msgr.clients.at(cur_op->peer_fd)->rdma_conn;
wire_config["rdma_address"] = rc->addr.to_string();
wire_config["rdma_max_msg"] = rc->max_msg;
}
}
}
#endif
if (cur_op->buf)
free(cur_op->buf);
std::string cfg_str = json11::Json(wire_config).dump();
cur_op->buf = malloc_or_die(cfg_str.size()+1);
memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);
cur_op->iov.push_back(cur_op->buf, cfg_str.size()+1);

View File

@ -40,6 +40,7 @@ typedef struct VitastorClient
{
void *proxy;
void *watch;
char *config_path;
char *etcd_host;
char *etcd_prefix;
char *image;
@ -47,6 +48,10 @@ typedef struct VitastorClient
uint64_t pool;
uint64_t size;
long readonly;
char *rdma_device;
int rdma_port_num;
int rdma_gid_index;
int rdma_mtu;
QemuMutex mutex;
} VitastorClient;
@ -95,7 +100,8 @@ static void qemu_rbd_unescape(char *src)
}
// vitastor[:key=value]*
// vitastor:etcd_host=127.0.0.1:inode=1:pool=1
// vitastor[:etcd_host=127.0.0.1]:inode=1:pool=1[:rdma_gid_index=3]
// vitastor:config_path=/etc/vitastor/vitastor.conf:image=testimg
static void vitastor_parse_filename(const char *filename, QDict *options, Error **errp)
{
const char *start;
@ -123,7 +129,12 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error
qemu_rbd_unescape(name);
value = qemu_rbd_next_tok(p, ':', &p);
qemu_rbd_unescape(value);
if (!strcmp(name, "inode") || !strcmp(name, "pool") || !strcmp(name, "size"))
if (!strcmp(name, "inode") ||
!strcmp(name, "pool") ||
!strcmp(name, "size") ||
!strcmp(name, "rdma_port_num") ||
!strcmp(name, "rdma_gid_index") ||
!strcmp(name, "rdma_mtu"))
{
unsigned long long num_val;
if (parse_uint_full(value, &num_val, 0))
@ -157,11 +168,6 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error
goto out;
}
}
if (!qdict_get_str(options, "etcd_host"))
{
error_setg(errp, "etcd_host is missing");
goto out;
}
out:
g_free(buf);
@ -189,9 +195,17 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
VitastorClient *client = bs->opaque;
int64_t ret = 0;
qemu_mutex_init(&client->mutex);
client->config_path = g_strdup(qdict_get_try_str(options, "config_path"));
client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host"));
client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix"));
client->proxy = vitastor_proxy_create(bdrv_get_aio_context(bs), client->etcd_host, client->etcd_prefix);
client->rdma_device = g_strdup(qdict_get_try_str(options, "rdma_device"));
client->rdma_port_num = qdict_get_try_int(options, "rdma_port_num", 0);
client->rdma_gid_index = qdict_get_try_int(options, "rdma_gid_index", 0);
client->rdma_mtu = qdict_get_try_int(options, "rdma_mtu", 0);
client->proxy = vitastor_proxy_create(
bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix,
client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu
);
client->image = g_strdup(qdict_get_try_str(options, "image"));
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
if (client->image)
@ -241,6 +255,11 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
bs->total_sectors = client->size / BDRV_SECTOR_SIZE;
//client->aio_context = bdrv_get_aio_context(bs);
qdict_del(options, "rdma_mtu");
qdict_del(options, "rdma_gid_index");
qdict_del(options, "rdma_port_num");
qdict_del(options, "rdma_device");
qdict_del(options, "config_path");
qdict_del(options, "etcd_host");
qdict_del(options, "etcd_prefix");
qdict_del(options, "image");
@ -255,7 +274,10 @@ static void vitastor_close(BlockDriverState *bs)
VitastorClient *client = bs->opaque;
vitastor_proxy_destroy(client->proxy);
qemu_mutex_destroy(&client->mutex);
g_free(client->etcd_host);
if (client->config_path)
g_free(client->config_path);
if (client->etcd_host)
g_free(client->etcd_host);
if (client->etcd_prefix)
g_free(client->etcd_prefix);
if (client->image)
@ -478,6 +500,7 @@ static QEMUOptionParameter vitastor_create_opts[] = {
static const char *vitastor_strong_runtime_opts[] = {
"inode",
"pool",
"config_path",
"etcd_host",
"etcd_prefix",

View File

@ -34,15 +34,28 @@ public:
cluster_client_t *cli;
AioContext *ctx;
QemuProxy(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
QemuProxy(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
{
this->ctx = ctx;
json11::Json cfg = json11::Json::object {
{ "etcd_address", std::string(etcd_host) },
{ "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/vitastor") },
};
json11::Json::object cfg;
if (config_path)
cfg["config_path"] = std::string(config_path);
if (etcd_host)
cfg["etcd_address"] = std::string(etcd_host);
if (etcd_prefix)
cfg["etcd_prefix"] = std::string(etcd_prefix);
if (rdma_device)
cfg["rdma_device"] = std::string(rdma_device);
if (rdma_port_num)
cfg["rdma_port_num"] = rdma_port_num;
if (rdma_gid_index)
cfg["rdma_gid_index"] = rdma_gid_index;
if (rdma_mtu)
cfg["rdma_mtu"] = rdma_mtu;
json11::Json cfg_json(cfg);
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> callback) { set_fd_handler(fd, wr, callback); });
cli = new cluster_client_t(NULL, tfd, cfg);
cli = new cluster_client_t(NULL, tfd, cfg_json);
}
~QemuProxy()
@ -80,9 +93,10 @@ public:
extern "C" {
void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
{
QemuProxy *p = new QemuProxy(ctx, etcd_host, etcd_prefix);
QemuProxy *p = new QemuProxy(ctx, config_path, etcd_host, etcd_prefix, rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu);
return p;
}

View File

@ -16,7 +16,8 @@ extern "C" {
// Our exports
typedef void VitastorIOHandler(long retval, void *opaque);
void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix);
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu);
void vitastor_proxy_destroy(void *client);
void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque);

View File

@ -87,7 +87,7 @@ public:
"Vitastor inode removal tool\n"
"(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n"
"USAGE:\n"
" %s --etcd_address <etcd_address> --pool <pool> --inode <inode> [--wait-list]\n",
" %s [--etcd_address <etcd_address>] --pool <pool> --inode <inode> [--wait-list]\n",
exe_name
);
exit(0);
@ -95,11 +95,6 @@ public:
void run(json11::Json cfg)
{
if (cfg["etcd_address"].string_value() == "")
{
fprintf(stderr, "etcd_address is missing\n");
exit(1);
}
inode = cfg["inode"].uint64_value();
pool_id = cfg["pool"].uint64_value();
if (pool_id)

View File

@ -46,7 +46,7 @@ $ETCDCTL put /vitastor/config/inode/1/1 '{"name":"debian9@0","size":'$((2048*102
$ETCDCTL put /vitastor/config/inode/1/2 '{"parent_id":1,"name":"debian9","size":'$((2048*1024*1024))'}'
qemu-system-x86_64 -enable-kvm -m 1024 \
-drive 'file=vitastor:etcd_host=127.0.0.1\:$ETCD_PORT/v3:image=debian9',format=raw,if=none,id=drive-virtio-disk0,cache=none \
-drive 'file=vitastor:etcd_host=127.0.0.1\:'$ETCD_PORT'/v3:image=debian9',format=raw,if=none,id=drive-virtio-disk0,cache=none \
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 \
-vnc 0.0.0.0:0