diff --git a/.gitignore b/.gitignore
index a1338d6..c143d61 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@
 *.so
 *.dylib
 
+*.log
+s3-benchmark
+
 # Test binary, build with `go test -c`
 *.test
diff --git a/s3-benchmark b/s3-benchmark
deleted file mode 100755
index 39c18a4..0000000
Binary files a/s3-benchmark and /dev/null differ
diff --git a/s3-benchmark.go b/s3-benchmark.go
index d6f7f7a..aae0982 100644
--- a/s3-benchmark.go
+++ b/s3-benchmark.go
@@ -12,11 +12,6 @@ import (
 	"encoding/base64"
 	"flag"
 	"fmt"
-	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/credentials"
-	"github.com/aws/aws-sdk-go/aws/session"
-	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/pivotal-golang/bytefmt"
 	"io"
 	"io/ioutil"
 	"log"
@@ -25,15 +20,20 @@
 	"net/http"
 	"os"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"code.cloudfoundry.org/bytefmt"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 )
 
 // Global variables
-var access_key, secret_key, url_host, bucket, region string
+var access_key, secret_key, url_host, bucket, region, sizeArg string
 var duration_secs, threads, loops int
 var object_size uint64
 var object_data []byte
@@ -96,10 +96,12 @@ func getS3Client() *s3.S3 {
 
 func createBucket(ignore_errors bool) {
 	// Get a client
-	client := getS3Client()
+	// client := getS3Client()
 	// Create our bucket (may already exist without error)
+	svc := s3.New(session.New(), cfg)
 	in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
-	if _, err := client.CreateBucket(in); err != nil {
+	if _, err := svc.CreateBucket(in); err != nil &&
+		!strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) {
 		if ignore_errors {
 			log.Printf("WARNING: createBucket %s error, ignoring %v", bucket, err)
 		} else {
@@ -110,55 +112,83 @@ func createBucket(ignore_errors bool) {
 
 func deleteAllObjects() {
 	// Get a client
-	client := getS3Client()
-	// Use multiple routines to do the actual delete
-	var doneDeletes sync.WaitGroup
-	// Loop deleting our versions reading as big a list as we can
-	var keyMarker, versionId *string
-	var err error
-	for loop := 1; ; loop++ {
-		// Delete all the existing objects and versions in the bucket
-		in := &s3.ListObjectVersionsInput{Bucket: aws.String(bucket), KeyMarker: keyMarker, VersionIdMarker: versionId, MaxKeys: aws.Int64(1000)}
-		if listVersions, listErr := client.ListObjectVersions(in); listErr == nil {
-			delete := &s3.Delete{Quiet: aws.Bool(true)}
-			for _, version := range listVersions.Versions {
-				delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: version.Key, VersionId: version.VersionId})
-			}
-			for _, marker := range listVersions.DeleteMarkers {
-				delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: marker.Key, VersionId: marker.VersionId})
-			}
-			if len(delete.Objects) > 0 {
-				// Start a delete routine
-				doDelete := func(bucket string, delete *s3.Delete) {
-					if _, e := client.DeleteObjects(&s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: delete}); e != nil {
-						err = fmt.Errorf("DeleteObjects unexpected failure: %s", e.Error())
-					}
-					doneDeletes.Done()
-				}
-				doneDeletes.Add(1)
-				go doDelete(bucket, delete)
-			}
-			// Advance to next versions
-			if listVersions.IsTruncated == nil || !*listVersions.IsTruncated {
-				break
-			}
-			keyMarker = listVersions.NextKeyMarker
-			versionId = listVersions.NextVersionIdMarker
-		} else {
-			// The bucket may not exist, just ignore in that case
-			if strings.HasPrefix(listErr.Error(), "NoSuchBucket") {
-				return
-			}
-			err = fmt.Errorf("ListObjectVersions unexpected failure: %v", listErr)
-			break
-		}
-	}
-	// Wait for deletes to finish
-	doneDeletes.Wait()
-	// If error, it is fatal
+	// client := getS3Client()
+	svc := s3.New(session.New(), cfg)
+	// in := &s3.DeleteBucketInput{Bucket: aws.String(bucket)}
+	// if _, err := svc.DeleteBucket(in); err != nil {
+	// 	log.Printf("FATAL: Unable to delete bucket %s : %v", bucket, err)
+	// }
+	out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket})
 	if err != nil {
-		log.Fatalf("FATAL: Unable to delete objects from bucket: %v", err)
+		log.Fatal("can't list objects")
 	}
+	n := len(out.Contents)
+	if n == 0 {
+		return
+	}
+	fmt.Printf("got existing %v objects, try to delete now...\n", n)
+
+	for _, v := range out.Contents {
+		svc.DeleteObject(&s3.DeleteObjectInput{
+			Bucket: &bucket,
+			Key:    v.Key,
+		})
+	}
+	out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket})
+	if err != nil {
+		log.Fatal("can't list objects")
+	}
+	fmt.Printf("after delete, got %v objects\n", len(out.Contents))
+
+	// // Use multiple routines to do the actual delete
+	// var doneDeletes sync.WaitGroup
+	// // Loop deleting our versions reading as big a list as we can
+	// var keyMarker, versionId *string
+	// var err error
+	// for loop := 1; ; loop++ {
+	// 	// Delete all the existing objects and versions in the bucket
+	// 	in := &s3.ListObjectVersionsInput{Bucket: aws.String(bucket), KeyMarker: keyMarker, VersionIdMarker: versionId, MaxKeys: aws.Int64(1000)}
+	// 	if listVersions, listErr := client.ListObjectVersions(in); listErr == nil {
+	// 		delete := &s3.Delete{Quiet: aws.Bool(true)}
+	// 		for _, version := range listVersions.Versions {
+	// 			delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: version.Key, VersionId: version.VersionId})
+	// 		}
+	// 		for _, marker := range listVersions.DeleteMarkers {
+	// 			delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: marker.Key, VersionId: marker.VersionId})
+	// 		}
+	// 		if len(delete.Objects) > 0 {
+	// 			// Start a delete routine
+	// 			doDelete := func(bucket string, delete *s3.Delete) {
+	// 				if _, e := client.DeleteObjects(&s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: delete}); e != nil {
+	// 					err = fmt.Errorf("DeleteObjects unexpected failure: %s", e.Error())
+	// 				}
+	// 				doneDeletes.Done()
+	// 			}
+	// 			doneDeletes.Add(1)
+	// 			go doDelete(bucket, delete)
+	// 		}
+	// 		// Advance to next versions
+	// 		if listVersions.IsTruncated == nil || !*listVersions.IsTruncated {
+	// 			break
+	// 		}
+	// 		keyMarker = listVersions.NextKeyMarker
+	// 		versionId = listVersions.NextVersionIdMarker
+	// 	} else {
+	// 		// The bucket may not exist, just ignore in that case
+	// 		if strings.HasPrefix(listErr.Error(), "NoSuchBucket") {
+	// 			return
+	// 		}
+	// 		err = fmt.Errorf("ListObjectVersions unexpected failure: %v", listErr)
+	// 		break
+	// 	}
+	// }
+	// // Wait for deletes to finish
+	// doneDeletes.Wait()
+
+	// If error, it is fatal
+	// if err != nil {
+	// 	log.Fatalf("FATAL: Unable to delete objects from bucket: %v", err)
+	// }
 }
 
 // canonicalAmzHeaders -- return the x-amz headers canonicalized
@@ -205,29 +235,57 @@ func setSignature(req *http.Request) {
 	req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
 }
 
-func runUpload(thread_num int) {
+func runUpload(thread_num int, keys *sync.Map) {
+	errcnt := 0
+	svc := s3.New(session.New(), cfg)
 	for time.Now().Before(endtime) {
 		objnum := atomic.AddInt32(&upload_count, 1)
 		fileobj := bytes.NewReader(object_data)
-		prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
-		req, _ := http.NewRequest("PUT", prefix, fileobj)
-		req.Header.Set("Content-Length", strconv.FormatUint(object_size, 10))
-		req.Header.Set("Content-MD5", object_data_md5)
-		setSignature(req)
-		if resp, err := httpClient.Do(req); err != nil {
-			log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
-		} else if resp != nil && resp.StatusCode != http.StatusOK {
-			if (resp.StatusCode == http.StatusServiceUnavailable) {
-				atomic.AddInt32(&upload_slowdown_count, 1)
-				atomic.AddInt32(&upload_count, -1)
-			} else {
-				fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp)
-				if resp.Body != nil {
-					body, _ := ioutil.ReadAll(resp.Body)
-					fmt.Printf("Body: %s\n", string(body))
-				}
-			}
+		//prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
+
+		key := fmt.Sprintf("Object-%d", objnum)
+		r := &s3.PutObjectInput{
+			Bucket: &bucket,
+			Key:    &key,
+			Body:   fileobj,
 		}
+
+		req, _ := svc.PutObjectRequest(r)
+		// Disable payload checksum calculation (very expensive)
+		req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
+		err := req.Send()
+		if err != nil {
+			errcnt++
+			atomic.AddInt32(&upload_slowdown_count, 1)
+			atomic.AddInt32(&upload_count, -1)
+			fmt.Println("upload err", err)
+			//break
+		}
+		if errcnt > 2 {
+			break
+		}
+		keys.Store(key, nil)
+		fmt.Printf("upload thread %v, %v\r", thread_num, key)
+
+		// req, _ := http.NewRequest("PUT", prefix, fileobj)
+		// req.Header.Set("Content-Length", strconv.FormatUint(object_size, 10))
+		// req.Header.Set("Content-MD5", object_data_md5)
+		// setSignature(req)
+		// if resp, err := httpClient.Do(req); err != nil {
+		// 	log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
+		// } else if resp != nil && resp.StatusCode != http.StatusOK {
+		// 	if resp.StatusCode == http.StatusServiceUnavailable {
+		// 		atomic.AddInt32(&upload_slowdown_count, 1)
+		// 		atomic.AddInt32(&upload_count, -1)
+		// 	} else {
+		// 		fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp)
+		// 		if resp.Body != nil {
+		// 			body, _ := ioutil.ReadAll(resp.Body)
+		// 			fmt.Printf("Body: %s\n", string(body))
+		// 		}
+		// 	}
+		// }
+
 	}
 	// Remember last done time
 	upload_finish = time.Now()
@@ -235,24 +293,73 @@
 	atomic.AddInt32(&running_threads, -1)
 }
 
-func runDownload(thread_num int) {
-	for time.Now().Before(endtime) {
-		atomic.AddInt32(&download_count, 1)
-		objnum := rand.Int31n(download_count) + 1
-		prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
-		req, _ := http.NewRequest("GET", prefix, nil)
-		setSignature(req)
-		if resp, err := httpClient.Do(req); err != nil {
-			log.Fatalf("FATAL: Error downloading object %s: %v", prefix, err)
-		} else if resp != nil && resp.Body != nil {
-			if (resp.StatusCode == http.StatusServiceUnavailable){
-				atomic.AddInt32(&download_slowdown_count, 1)
-				atomic.AddInt32(&download_count, -1)
-			} else {
-				io.Copy(ioutil.Discard, resp.Body)
-			}
+func runDownload(thread_num int, keys *sync.Map) {
+	errcnt := 0
+	svc := s3.New(session.New(), cfg)
+
+	keys.Range(func(k, value interface{}) bool {
+		// for {
+		// 	objnum := atomic.AddInt32(&delete_count, 1)
+		// 	if objnum > upload_count {
+		// 		delete_count = 0
+		// 	}
+		// 	key := fmt.Sprintf("Object-%d", objnum)
+
+		// for key, _ := keys.Range() {
+		if time.Now().After(endtime) {
+			// fmt.Println("time ended for download")
+			return false
 		}
-	}
+		var key string
+		var ok bool
+		if key, ok = k.(string); !ok {
+			log.Fatal("convert key back error")
+		}
+
+		fmt.Printf("download thread %v, %v\r", thread_num, key)
+
+		// atomic.AddInt32(&download_count, 1)
+		// objnum := rand.Int31n(download_count) + 1
+		// key := fmt.Sprintf("Object-%d", objnum)
+		r := &s3.GetObjectInput{
+			Bucket: &bucket,
+			Key:    &key,
+		}
+
+		req, resp := svc.GetObjectRequest(r)
+		err := req.Send()
+		if err != nil {
+			errcnt++
+			atomic.AddInt32(&download_slowdown_count, 1)
+			atomic.AddInt32(&download_count, -1)
+			fmt.Println("download err", err)
+			//break
+		}
+		if err == nil {
+			_, err = io.Copy(ioutil.Discard, resp.Body)
+		}
+		if errcnt > 2 {
+			return false
+		}
+		atomic.AddInt32(&download_count, 1)
+
+		// prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
+		// req, _ := http.NewRequest("GET", prefix, nil)
+		// setSignature(req)
+		// if resp, err := httpClient.Do(req); err != nil {
+		// 	log.Fatalf("FATAL: Error downloading object %s: %v", prefix, err)
+		// } else if resp != nil && resp.Body != nil {
+		// 	if resp.StatusCode == http.StatusServiceUnavailable {
+		// 		atomic.AddInt32(&download_slowdown_count, 1)
+		// 		atomic.AddInt32(&download_count, -1)
+		// 	} else {
+		// 		io.Copy(ioutil.Discard, resp.Body)
+		// 	}
+		// }
+
+		return true
+	})
+
 	// Remember last done time
 	download_finish = time.Now()
 	// One less thread
@@ -260,20 +367,42 @@
 }
 
 func runDelete(thread_num int) {
+	errcnt := 0
+	svc := s3.New(session.New(), cfg)
 	for {
 		objnum := atomic.AddInt32(&delete_count, 1)
 		if objnum > upload_count {
 			break
 		}
-		prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
-		req, _ := http.NewRequest("DELETE", prefix, nil)
-		setSignature(req)
-		if resp, err := httpClient.Do(req); err != nil {
-			log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err)
-		} else if (resp != nil && resp.StatusCode == http.StatusServiceUnavailable) {
+		key := fmt.Sprintf("Object-%d", objnum)
+		r := &s3.DeleteObjectInput{
+			Bucket: &bucket,
+			Key:    &key,
+		}
+
+		req, out := svc.DeleteObjectRequest(r)
+		err := req.Send()
+		if err != nil {
+			errcnt++
 			atomic.AddInt32(&delete_slowdown_count, 1)
 			atomic.AddInt32(&delete_count, -1)
+			fmt.Println("delete err", err, "out", out.String())
+			//break
 		}
+		if errcnt > 2 {
+			break
+		}
+		fmt.Printf("delete thread %v, %v\r", thread_num, key)
+
+		// prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
+		// req, _ := http.NewRequest("DELETE", prefix, nil)
+		// setSignature(req)
+		// if resp, err := httpClient.Do(req); err != nil {
+		// 	log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err)
+		// } else if resp != nil && resp.StatusCode == http.StatusServiceUnavailable {
+		// 	atomic.AddInt32(&delete_slowdown_count, 1)
+		// 	atomic.AddInt32(&delete_count, -1)
+		// }
 	}
 	// Remember last done time
 	delete_finish = time.Now()
@@ -281,21 +410,19 @@
 	atomic.AddInt32(&running_threads, -1)
 }
 
-func main() {
-	// Hello
-	fmt.Println("Wasabi benchmark program v2.0")
+var cfg *aws.Config
 
+func init() {
 	// Parse command line
 	myflag := flag.NewFlagSet("myflag", flag.ExitOnError)
-	myflag.StringVar(&access_key, "a", "", "Access key")
-	myflag.StringVar(&secret_key, "s", "", "Secret key")
-	myflag.StringVar(&url_host, "u", "http://s3.wasabisys.com", "URL for host with method prefix")
-	myflag.StringVar(&bucket, "b", "wasabi-benchmark-bucket", "Bucket for testing")
+	myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key")
+	myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key")
+	myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix")
"u", os.Getenv("AWS_HOST"), "URL for host with method prefix") + myflag.StringVar(&bucket, "b", "loadgen", "Bucket for testing") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") - var sizeArg string myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") if err := myflag.Parse(os.Args[1:]); err != nil { os.Exit(1) @@ -308,10 +435,28 @@ func main() { if secret_key == "" { log.Fatal("Missing argument -s for secret key.") } + if url_host == "" { + log.Fatal("Missing argument -s for host endpoint.") + } var err error if object_size, err = bytefmt.ToBytes(sizeArg); err != nil { log.Fatalf("Invalid -z argument for object size: %v", err) } +} + +func main() { + // Hello + fmt.Println("Wasabi benchmark program v2.0") + + //fmt.Println("accesskey:", access_key, "secretkey:", secret_key) + cfg = &aws.Config{ + Endpoint: aws.String(url_host), + Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""), + Region: aws.String(region), + // DisableParamValidation: aws.Bool(true), + DisableComputeChecksums: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + } // Echo the parameters logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, region=%s, duration=%d, threads=%d, loops=%d, size=%s", @@ -339,12 +484,14 @@ func main() { delete_count = 0 delete_slowdown_count = 0 + keys := &sync.Map{} + // Run the upload case running_threads = int32(threads) starttime := time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 1; n <= threads; n++ { - go runUpload(n) + go runUpload(n, keys) } // Wait for it to finish @@ -357,12 +504,19 @@ func main() { logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count)) + // count := 0 + // keys.Range(func(k, value interface{}) bool { + // count++ + // return true + // }) + // fmt.Println("map got ", count) + // Run the download case running_threads = int32(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 1; n <= threads; n++ { - go runDownload(n) + go runDownload(n, keys) } // Wait for it to finish diff --git a/s3-benchmark.ubuntu b/s3-benchmark.ubuntu deleted file mode 100755 index 39c18a4..0000000 Binary files a/s3-benchmark.ubuntu and /dev/null differ