Merge pull request #10 from markhpc/wip-threaded-init

s3-benchmark: make bucket initialization multi-threaded.
master
Mark Nelson 2019-08-15 12:23:58 -05:00 committed by GitHub
commit ce911b7ad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 29 deletions

View File

@ -36,10 +36,10 @@ import (
// Global variables
var access_key, secret_key, url_host, bucket_prefix, object_prefix, region, modes, sizeArg string
var buckets []string
var duration_secs, threads, loops, bucket_count int
var duration_secs, threads, loops int
var object_data []byte
var object_data_md5 string
var running_threads, object_count, object_size, op_count int64
var running_threads, bucket_count, object_count, object_size, op_counter int64
var endtime, upload_finish, download_finish, delete_finish time.Time
var interval float64
@ -96,7 +96,7 @@ func getS3Client() *s3.S3 {
return client
}
func createBucket(bucket_num int, ignore_errors bool) {
func createBucket(bucket_num int64, ignore_errors bool) {
svc := s3.New(session.New(), cfg)
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
if _, err := svc.CreateBucket(in); err != nil {
@ -112,7 +112,7 @@ func createBucket(bucket_num int, ignore_errors bool) {
}
}
func deleteAllObjects(bucket_num int) {
func deleteAllObjects(bucket_num int64) {
svc := s3.New(session.New(), cfg)
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
@ -419,10 +419,10 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
if duration_secs > -1 && time.Now().After(endtime) {
break
}
objnum := atomic.AddInt64(&op_count, 1)
objnum := atomic.AddInt64(&op_counter, 1)
bucket_num := objnum % int64(bucket_count)
if object_count > -1 && objnum > object_count {
objnum = atomic.AddInt64(&op_count, -1)
if object_count > -1 && objnum >= object_count {
objnum = atomic.AddInt64(&op_counter, -1)
break
}
fileobj := bytes.NewReader(object_data)
@ -444,7 +444,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
if err != nil {
errcnt++
stats.addSlowDown(thread_num);
atomic.AddInt64(&op_count, -1)
atomic.AddInt64(&op_counter, -1)
fmt.Println("upload err", err)
} else {
// Update the stats
@ -470,9 +470,9 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
break
}
objnum := atomic.AddInt64(&op_count, 1)
if objnum > object_count {
atomic.AddInt64(&op_count, -1)
objnum := atomic.AddInt64(&op_counter, 1)
if objnum >= object_count {
atomic.AddInt64(&op_counter, -1)
break
}
@ -519,9 +519,9 @@ func runDelete(thread_num int, stats *Stats) {
svc := s3.New(session.New(), cfg)
for {
objnum := atomic.AddInt64(&op_count, 1)
if objnum > object_count {
atomic.AddInt64(&op_count, -1)
objnum := atomic.AddInt64(&op_counter, 1)
if objnum >= object_count {
atomic.AddInt64(&op_counter, -1)
break
}
@ -561,21 +561,27 @@ func runDelete(thread_num int, stats *Stats) {
var cfg *aws.Config
func initBuckets(loop int, stats *Stats) {
func initBuckets(thread_num int, stats *Stats) {
// Create the buckets and delete all the objects
for i := 0; i < bucket_count; i++ {
start := time.Now().UnixNano()
createBucket(i, true)
deleteAllObjects(i)
end := time.Now().UnixNano()
stats.updateIntervals(0)
stats.addOp(0, 0, end-start)
for {
bucket_num := atomic.AddInt64(&op_counter, 1)
if bucket_num >= bucket_count {
atomic.AddInt64(&op_counter, -1)
break
}
start := time.Now().UnixNano()
createBucket(bucket_num, true)
deleteAllObjects(bucket_num)
end := time.Now().UnixNano()
stats.updateIntervals(thread_num)
stats.addOp(thread_num, 0, end-start)
}
stats.finish(0)
atomic.AddInt64(&running_threads, -1)
stats.finish(thread_num)
}
func runWrapper(loop int, r rune) {
op_count = 0
op_counter = -1
running_threads = int64(threads)
intervalNano := int64(interval*1000000000)
endtime = time.Now().Add(time.Second * time.Duration(duration_secs))
@ -584,9 +590,10 @@ func runWrapper(loop int, r rune) {
switch r {
case 'i':
log.Printf("Running Loop %d Init", loop)
stats = makeStats(loop, "INIT", 1, intervalNano)
initBuckets(loop, &stats);
running_threads = 0;
stats = makeStats(loop, "INIT", threads, intervalNano)
for n := 0; n < threads; n++ {
go initBuckets(n, &stats);
}
case 'p':
log.Printf("Running Loop %d Put Test", loop)
stats = makeStats(loop, "PUT", threads, intervalNano)
@ -624,7 +631,7 @@ func init() {
myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info")
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
@ -719,7 +726,7 @@ func main() {
initData()
// Setup the slice of buckets
for i := 0; i < bucket_count; i++ {
for i := int64(0); i < bucket_count; i++ {
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
}