Merge "pkg/server: stream images not slurp to RAM."

This commit is contained in:
Brad Fitzpatrick 2014-01-04 19:43:24 +00:00 committed by Gerrit Code Review
commit abfd09567f
1 changed files with 21 additions and 15 deletions

View File

@ -161,17 +161,12 @@ func (ih *ImageHandler) scaledCached(buf *bytes.Buffer, file blob.Ref) (format s
return format return format
} }
// These gates control the max concurrency of slurping raw images // Gate the number of concurrent image resizes to limit RAM & CPU use.
// (e.g. JPEG bytes) to RAM, and then decoding and resizing them,
// respectively. We allow more concurrency for the former because
// it's slower and less memory-intensive. The actual resizing takes
// much more CPU and RAM.
// TODO: these numbers were just guesses and not based on any // TODO: this number is just a guess and not based on any
// data. measure? make these configurable? Automatically tuned // data. measure? make these configurable? Automatically tuned
// somehow? Based on memory usage/availability? // somehow? Based on memory usage/availability?
var ( var (
scaleImageGateSlurp = syncutil.NewGate(5)
scaleImageGateResize = syncutil.NewGate(2) scaleImageGateResize = syncutil.NewGate(2)
) )
@ -180,6 +175,18 @@ type formatAndImage struct {
image []byte image []byte
} }
// TODO(wathiede): move to a common location if the pattern of TeeReader'ing
// to a statWriter proves useful.
type statWriter struct {
*expvar.Int
}
func (sw statWriter) Write(p []byte) (int, error) {
c := len(p)
sw.Add(int64(c))
return c, nil
}
func (ih *ImageHandler) scaleImage(fileRef blob.Ref) (*formatAndImage, error) { func (ih *ImageHandler) scaleImage(fileRef blob.Ref) (*formatAndImage, error) {
fr, err := schema.NewFileReader(ih.storageSeekFetcher(), fileRef) fr, err := schema.NewFileReader(ih.storageSeekFetcher(), fileRef)
if err != nil { if err != nil {
@ -187,11 +194,8 @@ func (ih *ImageHandler) scaleImage(fileRef blob.Ref) (*formatAndImage, error) {
} }
defer fr.Close() defer fr.Close()
var buf bytes.Buffer sw := statWriter{imageBytesFetchedVar}
scaleImageGateSlurp.Start() tr := io.TeeReader(fr, sw)
n, err := io.Copy(&buf, fr)
scaleImageGateSlurp.Done()
imageBytesFetchedVar.Add(n)
if err != nil { if err != nil {
return nil, fmt.Errorf("image resize: error reading image %s: %v", fileRef, err) return nil, fmt.Errorf("image resize: error reading image %s: %v", fileRef, err)
@ -200,8 +204,10 @@ func (ih *ImageHandler) scaleImage(fileRef blob.Ref) (*formatAndImage, error) {
scaleImageGateResize.Start() scaleImageGateResize.Start()
defer scaleImageGateResize.Done() defer scaleImageGateResize.Done()
i, imConfig, err := images.Decode(bytes.NewReader(buf.Bytes()), i, imConfig, err := images.Decode(tr, &images.DecodeOpts{
&images.DecodeOpts{MaxWidth: ih.MaxWidth, MaxHeight: ih.MaxHeight}) MaxWidth: ih.MaxWidth,
MaxHeight: ih.MaxHeight,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -218,9 +224,9 @@ func (ih *ImageHandler) scaleImage(fileRef blob.Ref) (*formatAndImage, error) {
b = i.Bounds() b = i.Bounds()
} }
var buf bytes.Buffer
if !useBytesUnchanged { if !useBytesUnchanged {
// Encode as a new image // Encode as a new image
buf.Reset()
switch format { switch format {
case "png": case "png":
err = png.Encode(&buf, i) err = png.Encode(&buf, i)