Use vitastor-cli instead of direct etcd interaction in the CSI driver

rdma-flow-control
Vitaliy Filippov 2022-04-17 19:38:35 +03:00
parent 14d6acbcba
commit c1365f46c9
1 changed files with 44 additions and 191 deletions

View File

@ -10,7 +10,6 @@ import (
"bytes"
"strconv"
"time"
"fmt"
"os"
"os/exec"
"io/ioutil"
@ -21,8 +20,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.etcd.io/etcd/clientv3"
"github.com/container-storage-interface/spec/lib/go/csi"
)
@ -114,6 +111,34 @@ func GetConnectionParams(params map[string]string) (map[string]string, []string,
return ctxVars, etcdUrl, etcdPrefix
}
func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error)
{
if (ctxVars["etcdUrl"] != "")
{
args = append(args, "--etcd_address", ctxVars["etcdUrl"])
}
if (ctxVars["etcdPrefix"] != "")
{
args = append(args, "--etcd_prefix", ctxVars["etcdPrefix"])
}
if (ctxVars["configPath"] != "")
{
args = append(args, "--config_path", ctxVars["configPath"])
}
c := exec.Command("/usr/bin/vitastor-cli", args...)
var stdout, stderr bytes.Buffer
c.Stdout = &stdout
c.Stderr = &stderr
err := c.Run()
stderrStr := string(stderr.Bytes())
if (err != nil)
{
klog.Errorf("vitastor-cli %s failed: %s, status %s\n", strings.Join(args, " "), stderrStr, err)
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")")
}
return stdout.Bytes(), nil
}
// Create the volume
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
{
@ -146,128 +171,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
}
// FIXME: The following should PROBABLY be implemented externally in a management tool
ctxVars, etcdUrl, etcdPrefix := GetConnectionParams(req.Parameters)
ctxVars, etcdUrl, _ := GetConnectionParams(req.Parameters)
if (len(etcdUrl) == 0)
{
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
}
// Connect to etcd
cli, err := clientv3.New(clientv3.Config{
DialTimeout: ETCD_TIMEOUT,
Endpoints: etcdUrl,
})
// Create image using vitastor-cli
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", string(volSize), "--pool", string(poolId) })
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error())
}
defer cli.Close()
var imageId uint64 = 0
for
{
// Check if the image exists
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName)
cancel()
if (err != nil)
if (strings.Index(err.Error(), "already exists") > 0)
{
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
}
if (len(resp.Kvs) > 0)
{
kv := resp.Kvs[0]
var v InodeIndex
err := json.Unmarshal(kv.Value, &v)
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName })
if (err != nil)
{
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error())
return nil, err
}
poolId = v.PoolId
imageId = v.Id
inodeCfgKey := fmt.Sprintf("/config/inode/%d/%d", poolId, imageId)
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
resp, err := cli.Get(ctx, etcdPrefix+inodeCfgKey)
cancel()
var inodeCfg []InodeConfig
err = json.Unmarshal(stat, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
}
if (len(resp.Kvs) == 0)
if (len(inodeCfg) == 0)
{
return nil, status.Error(codes.Internal, "missing "+inodeCfgKey+" key in etcd")
return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it")
}
var inodeCfg InodeConfig
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error())
}
if (inodeCfg.Size < uint64(volSize))
if (inodeCfg[0].Size < uint64(volSize))
{
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected")
}
}
else
{
// Find a free ID
// Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free
maxIdKey := fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
resp, err := cli.Get(ctx, maxIdKey)
cancel()
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
}
var modRev int64
var nextId uint64
if (len(resp.Kvs) > 0)
{
var err error
nextId, err = strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
if (err != nil)
{
return nil, status.Error(codes.Internal, maxIdKey+" contains invalid ID")
}
modRev = resp.Kvs[0].ModRevision
nextId++
}
else
{
nextId = 1
}
inodeIdxJson, _ := json.Marshal(InodeIndex{
Id: nextId,
PoolId: poolId,
})
inodeCfgJson, _ := json.Marshal(InodeConfig{
Name: volName,
Size: uint64(volSize),
})
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
txnResp, err := cli.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)), "=", modRev),
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), "=", 0),
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId)), "=", 0),
).Then(
clientv3.OpPut(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId), fmt.Sprintf("%d", nextId)),
clientv3.OpPut(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName), string(inodeIdxJson)),
clientv3.OpPut(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId), string(inodeCfgJson)),
).Commit()
cancel()
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to commit transaction in etcd: "+err.Error())
}
if (txnResp.Succeeded)
{
imageId = nextId
break
}
// Start over if the transaction fails
return nil, err
}
}
@ -299,97 +237,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
volName := ctxVars["name"]
_, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars)
if (len(etcdUrl) == 0)
{
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
}
ctxVars, _, _ = GetConnectionParams(ctxVars)
cli, err := clientv3.New(clientv3.Config{
DialTimeout: ETCD_TIMEOUT,
Endpoints: etcdUrl,
})
_, err = invokeCLI(ctxVars, []string{ "rm", volName })
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error())
}
defer cli.Close()
// Find inode by name
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName)
cancel()
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
}
if (len(resp.Kvs) == 0)
{
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist")
}
var idx InodeIndex
err = json.Unmarshal(resp.Kvs[0].Value, &idx)
if (err != nil)
{
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error())
}
// Get inode config
inodeCfgKey := fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
resp, err = cli.Get(ctx, inodeCfgKey)
cancel()
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
}
if (len(resp.Kvs) == 0)
{
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist")
}
var inodeCfg InodeConfig
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg)
if (err != nil)
{
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error())
}
// Delete inode data by invoking vitastor-cli
args := []string{
"rm-data", "--etcd_address", strings.Join(etcdUrl, ","),
"--pool", fmt.Sprintf("%d", idx.PoolId),
"--inode", fmt.Sprintf("%d", idx.Id),
}
if (ctxVars["configPath"] != "")
{
args = append(args, "--config_path", ctxVars["configPath"])
}
c := exec.Command("/usr/bin/vitastor-cli", args...)
var stderr bytes.Buffer
c.Stdout = nil
c.Stderr = &stderr
err = c.Run()
stderrStr := string(stderr.Bytes())
if (err != nil)
{
klog.Errorf("vitastor-cli rm-data failed: %s, status %s\n", stderrStr, err)
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")")
}
// Delete inode config in etcd
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
txnResp, err := cli.Txn(ctx).Then(
clientv3.OpDelete(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)),
clientv3.OpDelete(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)),
).Commit()
cancel()
if (err != nil)
{
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: "+err.Error())
}
if (!txnResp.Succeeded)
{
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: transaction failed")
return nil, err
}
return &csi.DeleteVolumeResponse{}, nil