|
|
|
@ -10,7 +10,6 @@ import ( |
|
|
|
|
"bytes" |
|
|
|
|
"strconv" |
|
|
|
|
"time" |
|
|
|
|
"fmt" |
|
|
|
|
"os" |
|
|
|
|
"os/exec" |
|
|
|
|
"io/ioutil" |
|
|
|
@ -21,8 +20,6 @@ import ( |
|
|
|
|
"google.golang.org/grpc/codes" |
|
|
|
|
"google.golang.org/grpc/status" |
|
|
|
|
|
|
|
|
|
"go.etcd.io/etcd/clientv3" |
|
|
|
|
|
|
|
|
|
"github.com/container-storage-interface/spec/lib/go/csi" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
@ -114,6 +111,34 @@ func GetConnectionParams(params map[string]string) (map[string]string, []string, |
|
|
|
|
return ctxVars, etcdUrl, etcdPrefix |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error) |
|
|
|
|
{ |
|
|
|
|
if (ctxVars["etcdUrl"] != "") |
|
|
|
|
{ |
|
|
|
|
args = append(args, "--etcd_address", ctxVars["etcdUrl"]) |
|
|
|
|
} |
|
|
|
|
if (ctxVars["etcdPrefix"] != "") |
|
|
|
|
{ |
|
|
|
|
args = append(args, "--etcd_prefix", ctxVars["etcdPrefix"]) |
|
|
|
|
} |
|
|
|
|
if (ctxVars["configPath"] != "") |
|
|
|
|
{ |
|
|
|
|
args = append(args, "--config_path", ctxVars["configPath"]) |
|
|
|
|
} |
|
|
|
|
c := exec.Command("/usr/bin/vitastor-cli", args...) |
|
|
|
|
var stdout, stderr bytes.Buffer |
|
|
|
|
c.Stdout = &stdout |
|
|
|
|
c.Stderr = &stderr |
|
|
|
|
err := c.Run() |
|
|
|
|
stderrStr := string(stderr.Bytes()) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
klog.Errorf("vitastor-cli %s failed: %s, status %s\n", strings.Join(args, " "), stderrStr, err) |
|
|
|
|
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")") |
|
|
|
|
} |
|
|
|
|
return stdout.Bytes(), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Create the volume
|
|
|
|
|
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) |
|
|
|
|
{ |
|
|
|
@ -146,128 +171,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol |
|
|
|
|
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FIXME: The following should PROBABLY be implemented externally in a management tool
|
|
|
|
|
|
|
|
|
|
ctxVars, etcdUrl, etcdPrefix := GetConnectionParams(req.Parameters) |
|
|
|
|
ctxVars, etcdUrl, _ := GetConnectionParams(req.Parameters) |
|
|
|
|
if (len(etcdUrl) == 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Connect to etcd
|
|
|
|
|
cli, err := clientv3.New(clientv3.Config{ |
|
|
|
|
DialTimeout: ETCD_TIMEOUT, |
|
|
|
|
Endpoints: etcdUrl, |
|
|
|
|
}) |
|
|
|
|
// Create image using vitastor-cli
|
|
|
|
|
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", string(volSize), "--pool", string(poolId) }) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error()) |
|
|
|
|
} |
|
|
|
|
defer cli.Close() |
|
|
|
|
|
|
|
|
|
var imageId uint64 = 0 |
|
|
|
|
for |
|
|
|
|
{ |
|
|
|
|
// Check if the image exists
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName) |
|
|
|
|
cancel() |
|
|
|
|
if (err != nil) |
|
|
|
|
if (strings.Index(err.Error(), "already exists") > 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
if (len(resp.Kvs) > 0) |
|
|
|
|
{ |
|
|
|
|
kv := resp.Kvs[0] |
|
|
|
|
var v InodeIndex |
|
|
|
|
err := json.Unmarshal(kv.Value, &v) |
|
|
|
|
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName }) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error()) |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
poolId = v.PoolId |
|
|
|
|
imageId = v.Id |
|
|
|
|
inodeCfgKey := fmt.Sprintf("/config/inode/%d/%d", poolId, imageId) |
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
resp, err := cli.Get(ctx, etcdPrefix+inodeCfgKey) |
|
|
|
|
cancel() |
|
|
|
|
var inodeCfg []InodeConfig |
|
|
|
|
err = json.Unmarshal(stat, &inodeCfg) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
if (len(resp.Kvs) == 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "missing "+inodeCfgKey+" key in etcd") |
|
|
|
|
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
var inodeCfg InodeConfig |
|
|
|
|
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg) |
|
|
|
|
if (err != nil) |
|
|
|
|
if (len(inodeCfg) == 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) |
|
|
|
|
return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it") |
|
|
|
|
} |
|
|
|
|
if (inodeCfg.Size < uint64(volSize)) |
|
|
|
|
if (inodeCfg[0].Size < uint64(volSize)) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// Find a free ID
|
|
|
|
|
// Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free
|
|
|
|
|
maxIdKey := fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId) |
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
resp, err := cli.Get(ctx, maxIdKey) |
|
|
|
|
cancel() |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
var modRev int64 |
|
|
|
|
var nextId uint64 |
|
|
|
|
if (len(resp.Kvs) > 0) |
|
|
|
|
{ |
|
|
|
|
var err error |
|
|
|
|
nextId, err = strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, maxIdKey+" contains invalid ID") |
|
|
|
|
} |
|
|
|
|
modRev = resp.Kvs[0].ModRevision |
|
|
|
|
nextId++ |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
nextId = 1 |
|
|
|
|
} |
|
|
|
|
inodeIdxJson, _ := json.Marshal(InodeIndex{ |
|
|
|
|
Id: nextId, |
|
|
|
|
PoolId: poolId, |
|
|
|
|
}) |
|
|
|
|
inodeCfgJson, _ := json.Marshal(InodeConfig{ |
|
|
|
|
Name: volName, |
|
|
|
|
Size: uint64(volSize), |
|
|
|
|
}) |
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
txnResp, err := cli.Txn(ctx).If( |
|
|
|
|
clientv3.Compare(clientv3.ModRevision(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)), "=", modRev), |
|
|
|
|
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), "=", 0), |
|
|
|
|
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId)), "=", 0), |
|
|
|
|
).Then( |
|
|
|
|
clientv3.OpPut(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId), fmt.Sprintf("%d", nextId)), |
|
|
|
|
clientv3.OpPut(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName), string(inodeIdxJson)), |
|
|
|
|
clientv3.OpPut(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId), string(inodeCfgJson)), |
|
|
|
|
).Commit() |
|
|
|
|
cancel() |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to commit transaction in etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
if (txnResp.Succeeded) |
|
|
|
|
{ |
|
|
|
|
imageId = nextId |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
// Start over if the transaction fails
|
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -299,97 +237,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol |
|
|
|
|
} |
|
|
|
|
volName := ctxVars["name"] |
|
|
|
|
|
|
|
|
|
_, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars) |
|
|
|
|
if (len(etcdUrl) == 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
cli, err := clientv3.New(clientv3.Config{ |
|
|
|
|
DialTimeout: ETCD_TIMEOUT, |
|
|
|
|
Endpoints: etcdUrl, |
|
|
|
|
}) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error()) |
|
|
|
|
} |
|
|
|
|
defer cli.Close() |
|
|
|
|
|
|
|
|
|
// Find inode by name
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName) |
|
|
|
|
cancel() |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
if (len(resp.Kvs) == 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist") |
|
|
|
|
} |
|
|
|
|
var idx InodeIndex |
|
|
|
|
err = json.Unmarshal(resp.Kvs[0].Value, &idx) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get inode config
|
|
|
|
|
inodeCfgKey := fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id) |
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
resp, err = cli.Get(ctx, inodeCfgKey) |
|
|
|
|
cancel() |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
if (len(resp.Kvs) == 0) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist") |
|
|
|
|
} |
|
|
|
|
var inodeCfg InodeConfig |
|
|
|
|
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Delete inode data by invoking vitastor-cli
|
|
|
|
|
args := []string{ |
|
|
|
|
"rm-data", "--etcd_address", strings.Join(etcdUrl, ","), |
|
|
|
|
"--pool", fmt.Sprintf("%d", idx.PoolId), |
|
|
|
|
"--inode", fmt.Sprintf("%d", idx.Id), |
|
|
|
|
} |
|
|
|
|
if (ctxVars["configPath"] != "") |
|
|
|
|
{ |
|
|
|
|
args = append(args, "--config_path", ctxVars["configPath"]) |
|
|
|
|
} |
|
|
|
|
c := exec.Command("/usr/bin/vitastor-cli", args...) |
|
|
|
|
var stderr bytes.Buffer |
|
|
|
|
c.Stdout = nil |
|
|
|
|
c.Stderr = &stderr |
|
|
|
|
err = c.Run() |
|
|
|
|
stderrStr := string(stderr.Bytes()) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
klog.Errorf("vitastor-cli rm-data failed: %s, status %s\n", stderrStr, err) |
|
|
|
|
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")") |
|
|
|
|
} |
|
|
|
|
ctxVars, _, _ = GetConnectionParams(ctxVars) |
|
|
|
|
|
|
|
|
|
// Delete inode config in etcd
|
|
|
|
|
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT) |
|
|
|
|
txnResp, err := cli.Txn(ctx).Then( |
|
|
|
|
clientv3.OpDelete(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), |
|
|
|
|
clientv3.OpDelete(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)), |
|
|
|
|
).Commit() |
|
|
|
|
cancel() |
|
|
|
|
_, err = invokeCLI(ctxVars, []string{ "rm", volName }) |
|
|
|
|
if (err != nil) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: "+err.Error()) |
|
|
|
|
} |
|
|
|
|
if (!txnResp.Succeeded) |
|
|
|
|
{ |
|
|
|
|
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: transaction failed") |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &csi.DeleteVolumeResponse{}, nil |
|
|
|
|