Merge pull request #11 from markhpc/wip-bucket-tests
s3-benchmark: refactor and separate out bucket testsmaster
commit
35b1231801
187
s3-benchmark.go
187
s3-benchmark.go
|
@ -40,7 +40,7 @@ var duration_secs, threads, loops int
|
|||
var object_data []byte
|
||||
var object_data_md5 string
|
||||
var running_threads, bucket_count, object_count, object_size, op_counter int64
|
||||
var endtime, upload_finish, download_finish, delete_finish time.Time
|
||||
var endtime time.Time
|
||||
var interval float64
|
||||
|
||||
func logit(msg string) {
|
||||
|
@ -96,47 +96,6 @@ func getS3Client() *s3.S3 {
|
|||
return client
|
||||
}
|
||||
|
||||
func createBucket(bucket_num int64, ignore_errors bool) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
||||
if _, err := svc.CreateBucket(in); err != nil {
|
||||
if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) ||
|
||||
strings.Contains(err.Error(), "BucketAlreadyExists") {
|
||||
return
|
||||
}
|
||||
if ignore_errors {
|
||||
log.Printf("WARNING: createBucket %s error, ignoring %v", buckets[bucket_num], err)
|
||||
} else {
|
||||
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func deleteAllObjects(bucket_num int64) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
log.Fatal("can't list objects")
|
||||
}
|
||||
n := len(out.Contents)
|
||||
for n > 0 {
|
||||
fmt.Printf("got existing %v objects, try to delete now...\n", n)
|
||||
|
||||
for _, v := range out.Contents {
|
||||
svc.DeleteObject(&s3.DeleteObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: v.Key,
|
||||
})
|
||||
}
|
||||
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
log.Fatal("can't list objects")
|
||||
}
|
||||
n = len(out.Contents)
|
||||
fmt.Printf("after delete, got %v objects\n", n)
|
||||
}
|
||||
}
|
||||
|
||||
// canonicalAmzHeaders -- return the x-amz headers canonicalized
|
||||
func canonicalAmzHeaders(req *http.Request) string {
|
||||
// Parse out all x-amz headers
|
||||
|
@ -445,7 +404,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
errcnt++
|
||||
stats.addSlowDown(thread_num);
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
fmt.Println("upload err", err)
|
||||
log.Printf("upload err", err)
|
||||
} else {
|
||||
// Update the stats
|
||||
stats.addOp(thread_num, object_size, end-start)
|
||||
|
@ -454,11 +413,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
break
|
||||
}
|
||||
}
|
||||
// Remember last done time
|
||||
upload_finish = time.Now()
|
||||
// One less thread
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
// stats are done
|
||||
stats.finish(thread_num)
|
||||
}
|
||||
|
||||
|
@ -492,7 +447,7 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num);
|
||||
fmt.Println("download err", err)
|
||||
log.Printf("download err", err)
|
||||
} else {
|
||||
// Update the stats
|
||||
stats.addOp(thread_num, object_size, end-start)
|
||||
|
@ -506,11 +461,7 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
|
|||
}
|
||||
|
||||
}
|
||||
// Remember last done time
|
||||
download_finish = time.Now()
|
||||
// One less thread
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
// stats are done
|
||||
stats.finish(thread_num)
|
||||
}
|
||||
|
||||
|
@ -542,7 +493,7 @@ func runDelete(thread_num int, stats *Stats) {
|
|||
if err != nil {
|
||||
errcnt++
|
||||
stats.addSlowDown(thread_num);
|
||||
fmt.Println("delete err", err, "out", out.String())
|
||||
log.Printf("delete err", err, "out", out.String())
|
||||
} else {
|
||||
// Update the stats
|
||||
stats.addOp(thread_num, object_size, end-start)
|
||||
|
@ -551,35 +502,104 @@ func runDelete(thread_num int, stats *Stats) {
|
|||
break
|
||||
}
|
||||
}
|
||||
// Remember last done time
|
||||
delete_finish = time.Now()
|
||||
// One less thread
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
// stats are done
|
||||
stats.finish(thread_num)
|
||||
}
|
||||
|
||||
func runBucketDelete(thread_num int, stats *Stats) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
r := &s3.DeleteBucketInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
}
|
||||
|
||||
start := time.Now().UnixNano()
|
||||
_, err := svc.DeleteBucket(r)
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
stats.addOp(thread_num, 0, end-start)
|
||||
}
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
stats.finish(thread_num)
|
||||
}
|
||||
|
||||
var cfg *aws.Config
|
||||
|
||||
func initBuckets(thread_num int, stats *Stats) {
|
||||
// Create the buckets and delete all the objects
|
||||
func runBucketsInit(thread_num int, stats *Stats) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
start := time.Now().UnixNano()
|
||||
createBucket(bucket_num, true)
|
||||
deleteAllObjects(bucket_num)
|
||||
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
||||
_, err := svc.CreateBucket(in)
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) &&
|
||||
!strings.Contains(err.Error(), "BucketAlreadyExists") {
|
||||
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
|
||||
}
|
||||
}
|
||||
stats.addOp(thread_num, 0, end-start)
|
||||
}
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
stats.finish(thread_num)
|
||||
}
|
||||
|
||||
func runBucketsClear(thread_num int, stats *Stats) {
|
||||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
for {
|
||||
bucket_num := atomic.AddInt64(&op_counter, 1)
|
||||
if bucket_num >= bucket_count {
|
||||
atomic.AddInt64(&op_counter, -1)
|
||||
break
|
||||
}
|
||||
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
n := len(out.Contents)
|
||||
for n > 0 {
|
||||
for _, v := range out.Contents {
|
||||
start := time.Now().UnixNano()
|
||||
svc.DeleteObject(&s3.DeleteObjectInput{
|
||||
Bucket: &buckets[bucket_num],
|
||||
Key: v.Key,
|
||||
})
|
||||
end := time.Now().UnixNano()
|
||||
stats.updateIntervals(thread_num)
|
||||
stats.addOp(thread_num, *v.Size, end-start)
|
||||
|
||||
}
|
||||
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
n = len(out.Contents)
|
||||
}
|
||||
}
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
stats.finish(thread_num)
|
||||
}
|
||||
|
||||
|
||||
func runWrapper(loop int, r rune) {
|
||||
op_counter = -1
|
||||
running_threads = int64(threads)
|
||||
|
@ -588,31 +608,44 @@ func runWrapper(loop int, r rune) {
|
|||
var stats Stats
|
||||
|
||||
switch r {
|
||||
case 'c':
|
||||
log.Printf("Running Loop %d BUCKET CLEAR TEST", loop)
|
||||
stats = makeStats(loop, "BCLR", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketsClear(n, &stats);
|
||||
}
|
||||
case 'x':
|
||||
log.Printf("Running Loop %d BUCKET DELETE TEST", loop)
|
||||
stats = makeStats(loop, "BDEL", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runBucketDelete(n, &stats);
|
||||
}
|
||||
case 'i':
|
||||
log.Printf("Running Loop %d Init", loop)
|
||||
stats = makeStats(loop, "INIT", threads, intervalNano)
|
||||
log.Printf("Running Loop %d BUCKET INIT TEST", loop)
|
||||
stats = makeStats(loop, "BINIT", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go initBuckets(n, &stats);
|
||||
go runBucketsInit(n, &stats);
|
||||
}
|
||||
case 'p':
|
||||
log.Printf("Running Loop %d Put Test", loop)
|
||||
log.Printf("Running Loop %d OBJECT PUT TEST", loop)
|
||||
stats = makeStats(loop, "PUT", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runUpload(n, endtime, &stats);
|
||||
}
|
||||
case 'g':
|
||||
log.Printf("Running Loop %d Get Test", loop)
|
||||
log.Printf("Running Loop %d OBJECT GET TEST", loop)
|
||||
stats = makeStats(loop, "GET", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runDownload(n, endtime, &stats);
|
||||
}
|
||||
case 'd':
|
||||
log.Printf("Running Loop %d Del Test", loop)
|
||||
log.Printf("Running Loop %d OBJECT DELETE TEST", loop)
|
||||
stats = makeStats(loop, "DEL", threads, intervalNano)
|
||||
for n := 0; n < threads; n++ {
|
||||
go runDelete(n, &stats);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for it to finish
|
||||
for atomic.LoadInt64(&running_threads) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
|
@ -629,7 +662,7 @@ func init() {
|
|||
myflag.StringVar(&object_prefix, "op", "", "Prefix for objects")
|
||||
myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets")
|
||||
myflag.StringVar(®ion, "r", "us-east-1", "Region for testing")
|
||||
myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info")
|
||||
myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info")
|
||||
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
|
||||
myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
|
||||
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
|
||||
|
@ -642,10 +675,12 @@ func init() {
|
|||
`
|
||||
NOTES:
|
||||
- Valid mode types for the -m mode string are:
|
||||
i: initialize buckets and clear any existing objects
|
||||
c: clear all existing objects from buckets (requires lookups)
|
||||
x: delete buckets
|
||||
i: initialize buckets
|
||||
p: put objects in buckets
|
||||
g: get objects from buckets
|
||||
d: delete objects from buckets
|
||||
d: delete objects from buckets
|
||||
|
||||
These modes are processed in-order and can be repeated, ie "ippgd" will
|
||||
initialize the buckets, put the objects, reput the objects, get the
|
||||
|
@ -678,7 +713,13 @@ NOTES:
|
|||
}
|
||||
invalid_mode := false
|
||||
for _, r := range modes {
|
||||
if (r != 'i' && r != 'p' && r != 'g' && r != 'd') {
|
||||
if (
|
||||
r != 'i' &&
|
||||
r != 'c' &&
|
||||
r != 'p' &&
|
||||
r != 'g' &&
|
||||
r != 'd' &&
|
||||
r != 'x') {
|
||||
s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r))
|
||||
log.Printf(s)
|
||||
invalid_mode = true
|
||||
|
|
Loading…
Reference in New Issue