s3-benchmark: refactor and separate out bucket tests

Signed-off-by: Mark Nelson <mnelson@redhat.com>
master
Mark Nelson 2019-08-15 16:06:28 -04:00
parent ce911b7ad5
commit 4652f6d43e
1 changed files with 114 additions and 73 deletions

View File

@ -40,7 +40,7 @@ var duration_secs, threads, loops int
var object_data []byte
var object_data_md5 string
var running_threads, bucket_count, object_count, object_size, op_counter int64
var endtime, upload_finish, download_finish, delete_finish time.Time
var endtime time.Time
var interval float64
func logit(msg string) {
@ -96,47 +96,6 @@ func getS3Client() *s3.S3 {
return client
}
func createBucket(bucket_num int64, ignore_errors bool) {
svc := s3.New(session.New(), cfg)
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
if _, err := svc.CreateBucket(in); err != nil {
if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) ||
strings.Contains(err.Error(), "BucketAlreadyExists") {
return
}
if ignore_errors {
log.Printf("WARNING: createBucket %s error, ignoring %v", buckets[bucket_num], err)
} else {
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
}
}
}
func deleteAllObjects(bucket_num int64) {
svc := s3.New(session.New(), cfg)
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
log.Fatal("can't list objects")
}
n := len(out.Contents)
for n > 0 {
fmt.Printf("got existing %v objects, try to delete now...\n", n)
for _, v := range out.Contents {
svc.DeleteObject(&s3.DeleteObjectInput{
Bucket: &buckets[bucket_num],
Key: v.Key,
})
}
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
log.Fatal("can't list objects")
}
n = len(out.Contents)
fmt.Printf("after delete, got %v objects\n", n)
}
}
// canonicalAmzHeaders -- return the x-amz headers canonicalized
func canonicalAmzHeaders(req *http.Request) string {
// Parse out all x-amz headers
@ -445,7 +404,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
errcnt++
stats.addSlowDown(thread_num);
atomic.AddInt64(&op_counter, -1)
fmt.Println("upload err", err)
log.Printf("upload err", err)
} else {
// Update the stats
stats.addOp(thread_num, object_size, end-start)
@ -454,11 +413,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
break
}
}
// Remember last done time
upload_finish = time.Now()
// One less thread
atomic.AddInt64(&running_threads, -1)
// stats are done
stats.finish(thread_num)
}
@ -492,7 +447,7 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
if err != nil {
errcnt++
stats.addSlowDown(thread_num);
fmt.Println("download err", err)
log.Printf("download err", err)
} else {
// Update the stats
stats.addOp(thread_num, object_size, end-start)
@ -506,11 +461,7 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
}
}
// Remember last done time
download_finish = time.Now()
// One less thread
atomic.AddInt64(&running_threads, -1)
// stats are done
stats.finish(thread_num)
}
@ -542,7 +493,7 @@ func runDelete(thread_num int, stats *Stats) {
if err != nil {
errcnt++
stats.addSlowDown(thread_num);
fmt.Println("delete err", err, "out", out.String())
log.Printf("delete err", err, "out", out.String())
} else {
// Update the stats
stats.addOp(thread_num, object_size, end-start)
@ -551,35 +502,104 @@ func runDelete(thread_num int, stats *Stats) {
break
}
}
// Remember last done time
delete_finish = time.Now()
// One less thread
atomic.AddInt64(&running_threads, -1)
// stats are done
stats.finish(thread_num)
}
func runBucketDelete(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg)
for {
bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1)
break
}
r := &s3.DeleteBucketInput{
Bucket: &buckets[bucket_num],
}
start := time.Now().UnixNano()
_, err := svc.DeleteBucket(r)
end := time.Now().UnixNano()
stats.updateIntervals(thread_num)
if err != nil {
break
}
stats.addOp(thread_num, 0, end-start)
}
atomic.AddInt64(&running_threads, -1)
stats.finish(thread_num)
}
var cfg *aws.Config
func initBuckets(thread_num int, stats *Stats) {
// Create the buckets and delete all the objects
func runBucketsInit(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg)
for {
bucket_num := atomic.AddInt64(&op_counter, 1)
bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1)
break
}
start := time.Now().UnixNano()
createBucket(bucket_num, true)
deleteAllObjects(bucket_num)
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
_, err := svc.CreateBucket(in)
end := time.Now().UnixNano()
stats.updateIntervals(thread_num)
if err != nil {
if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) &&
!strings.Contains(err.Error(), "BucketAlreadyExists") {
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
}
}
stats.addOp(thread_num, 0, end-start)
}
atomic.AddInt64(&running_threads, -1)
stats.finish(thread_num)
}
func runBucketsClear(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg)
for {
bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1)
break
}
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
break
}
n := len(out.Contents)
for n > 0 {
for _, v := range out.Contents {
start := time.Now().UnixNano()
svc.DeleteObject(&s3.DeleteObjectInput{
Bucket: &buckets[bucket_num],
Key: v.Key,
})
end := time.Now().UnixNano()
stats.updateIntervals(thread_num)
stats.addOp(thread_num, *v.Size, end-start)
}
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
break
}
n = len(out.Contents)
}
}
atomic.AddInt64(&running_threads, -1)
stats.finish(thread_num)
}
func runWrapper(loop int, r rune) {
op_counter = -1
running_threads = int64(threads)
@ -588,31 +608,44 @@ func runWrapper(loop int, r rune) {
var stats Stats
switch r {
case 'c':
log.Printf("Running Loop %d BUCKET CLEAR TEST", loop)
stats = makeStats(loop, "BCLR", threads, intervalNano)
for n := 0; n < threads; n++ {
go runBucketsClear(n, &stats);
}
case 'x':
log.Printf("Running Loop %d BUCKET DELETE TEST", loop)
stats = makeStats(loop, "BDEL", threads, intervalNano)
for n := 0; n < threads; n++ {
go runBucketDelete(n, &stats);
}
case 'i':
log.Printf("Running Loop %d Init", loop)
stats = makeStats(loop, "INIT", threads, intervalNano)
log.Printf("Running Loop %d BUCKET INIT TEST", loop)
stats = makeStats(loop, "BINIT", threads, intervalNano)
for n := 0; n < threads; n++ {
go initBuckets(n, &stats);
go runBucketsInit(n, &stats);
}
case 'p':
log.Printf("Running Loop %d Put Test", loop)
log.Printf("Running Loop %d OBJECT PUT TEST", loop)
stats = makeStats(loop, "PUT", threads, intervalNano)
for n := 0; n < threads; n++ {
go runUpload(n, endtime, &stats);
}
case 'g':
log.Printf("Running Loop %d Get Test", loop)
log.Printf("Running Loop %d OBJECT GET TEST", loop)
stats = makeStats(loop, "GET", threads, intervalNano)
for n := 0; n < threads; n++ {
go runDownload(n, endtime, &stats);
}
case 'd':
log.Printf("Running Loop %d Del Test", loop)
log.Printf("Running Loop %d OBJECT DELETE TEST", loop)
stats = makeStats(loop, "DEL", threads, intervalNano)
for n := 0; n < threads; n++ {
go runDelete(n, &stats);
}
}
}
// Wait for it to finish
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
@ -629,7 +662,7 @@ func init() {
myflag.StringVar(&object_prefix, "op", "", "Prefix for objects")
myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets")
myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info")
myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info")
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
@ -642,10 +675,12 @@ func init() {
`
NOTES:
- Valid mode types for the -m mode string are:
i: initialize buckets and clear any existing objects
c: clear all existing objects from buckets (requires lookups)
x: delete buckets
i: initialize buckets
p: put objects in buckets
g: get objects from buckets
d: delete objects from buckets
d: delete objects from buckets
These modes are processed in-order and can be repeated, ie "ippgd" will
initialize the buckets, put the objects, reput the objects, get the
@ -678,7 +713,13 @@ NOTES:
}
invalid_mode := false
for _, r := range modes {
if (r != 'i' && r != 'p' && r != 'g' && r != 'd') {
if (
r != 'i' &&
r != 'c' &&
r != 'p' &&
r != 'g' &&
r != 'd' &&
r != 'x') {
s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r))
log.Printf(s)
invalid_mode = true