From c44d89e22fefed8f26697759a758197ea84aaaf1 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Sat, 3 Aug 2013 19:08:17 -0700
Subject: [PATCH] blob: more porting blobref to new pkg blob

Change-Id: Id7f0dd80dd6ce5fa55a2c04e2e2a882a5ef8add6
---
 pkg/blob/chanpeek.go |  78 ++++++++++++++
 pkg/blob/fetcher.go  | 235 +++++++++++++++++++++++++++++++++++++++++++
 pkg/blob/ref.go      | 163 ++++++++++++++++++++++++++----
 pkg/blob/ref_test.go | 101 +++++++++++++++++--
 4 files changed, 548 insertions(+), 29 deletions(-)
 create mode 100644 pkg/blob/chanpeek.go
 create mode 100644 pkg/blob/fetcher.go

diff --git a/pkg/blob/chanpeek.go b/pkg/blob/chanpeek.go
new file mode 100644
index 000000000..b08f389c5
--- /dev/null
+++ b/pkg/blob/chanpeek.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2011 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package blob
+
+// TODO: use Generics if/when available
+type ChanPeeker struct {
+	Ch <-chan SizedRef
+
+	// A channel should either have a peek value or be closed:
+	peek   *SizedRef
+	closed bool
+}
+
+func (cp *ChanPeeker) MustPeek() SizedRef {
+	sr, ok := cp.Peek()
+	if !ok {
+		panic("No Peek value available")
+	}
+	return sr
+}
+
+func (cp *ChanPeeker) Peek() (sr SizedRef, ok bool) {
+	if cp.closed {
+		return
+	}
+	if cp.peek != nil {
+		return *cp.peek, true
+	}
+	v, ok := <-cp.Ch
+	if !ok {
+		cp.closed = true
+		return
+	}
+	cp.peek = &v
+	return *cp.peek, true
+}
+
+func (cp *ChanPeeker) Closed() bool {
+	cp.Peek()
+	return cp.closed
+}
+
+func (cp *ChanPeeker) MustTake() SizedRef {
+	sr, ok := cp.Take()
+	if !ok {
+		panic("MustTake called on empty channel")
+	}
+	return sr
+}
+
+func (cp *ChanPeeker) Take() (sr SizedRef, ok bool) {
+	v, ok := cp.Peek()
+	if !ok {
+		return
+	}
+	cp.peek = nil
+	return v, true
+}
+
+func (cp *ChanPeeker) ConsumeAll() {
+	for !cp.Closed() {
+		cp.Take()
+	}
+}
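
A minimal usage sketch of the ChanPeeker added above (not part of the patch; the camlistore.org/pkg/blob import path and the printSmall helper are assumed for illustration):

package main

import (
	"fmt"

	"camlistore.org/pkg/blob" // assumed import path for this package
)

// printSmall prints refs of blobs smaller than 1 MB and stops at the
// first larger one, showing Peek (non-consuming) vs. Take (consuming).
func printSmall(ch <-chan blob.SizedRef) {
	cp := &blob.ChanPeeker{Ch: ch}
	for !cp.Closed() {
		sr, ok := cp.Peek() // look at the head without consuming it
		if !ok || sr.Size >= 1<<20 {
			cp.ConsumeAll() // drain the rest so the sender can finish
			return
		}
		fmt.Println(cp.MustTake()) // consume the value we just peeked
	}
}

func main() {
	ch := make(chan blob.SizedRef, 2)
	ch <- blob.SizedRef{Ref: blob.SHA1FromString("hello"), Size: 5}
	ch <- blob.SizedRef{Ref: blob.SHA1FromString("a big blob"), Size: 10 << 20}
	close(ch)
	printSmall(ch)
}
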
diff --git a/pkg/blob/fetcher.go b/pkg/blob/fetcher.go
new file mode 100644
index 000000000..bb4b4cede
--- /dev/null
+++ b/pkg/blob/fetcher.go
@@ -0,0 +1,235 @@
+/*
+Copyright 2011 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package blob
+
+import (
+	"bytes"
+	"crypto"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+
+	"camlistore.org/pkg/osutil"
+	"camlistore.org/pkg/types"
+)
+
+// TODO: rename StreamingFetcher to be Fetcher (the common case)
+
+// TODO: add FetcherAt / FetchAt (for HTTP range requests). But then how
+// to make all SeekFetchers also be a FetchAt? By hand?
+
+type SeekFetcher interface {
+	// Fetch returns a blob. If the blob is not found then
+	// os.ErrNotExist should be returned for the error (not a wrapped
+	// error with an ErrNotExist inside).
+	//
+	// The caller should close blob.
+	Fetch(Ref) (blob types.ReadSeekCloser, size int64, err error)
+}
+
+// SeekTester is the interface implemented by storage implementations that don't
+// know until runtime whether or not their StreamingFetcher happens to also
+// return a ReadCloser that's also a ReadSeekCloser.
+type SeekTester interface {
+	IsFetcherASeeker() bool
+}
+
+// fetcherToSeekerWrapper wraps a StreamingFetcher and converts it into
+// a SeekFetcher if SeekTester has confirmed the interface conversion
+// is safe.
+type fetcherToSeekerWrapper struct {
+	StreamingFetcher
+}
+
+func (w *fetcherToSeekerWrapper) Fetch(r Ref) (file types.ReadSeekCloser, size int64, err error) {
+	rc, size, err := w.StreamingFetcher.FetchStreaming(r)
+	if err != nil {
+		return
+	}
+	file = rc.(types.ReadSeekCloser)
+	return
+}
+
+type StreamingFetcher interface {
+	// FetchStreaming returns a blob. If the blob is not found then
+	// os.ErrNotExist should be returned for the error (not a wrapped
+	// error with an ErrNotExist inside).
+	//
+	// The caller should close blob.
+	FetchStreaming(Ref) (blob io.ReadCloser, size int64, err error)
+}
+
+func NewSerialFetcher(fetchers ...SeekFetcher) SeekFetcher {
+	return &serialFetcher{fetchers}
+}
+
+func NewSerialStreamingFetcher(fetchers ...StreamingFetcher) StreamingFetcher {
+	return &serialStreamingFetcher{fetchers}
+}
+
+func NewSimpleDirectoryFetcher(dir string) *DirFetcher {
+	return &DirFetcher{dir, "camli"}
+}
+
+func NewConfigDirFetcher() *DirFetcher {
+	configDir := filepath.Join(osutil.CamliConfigDir(), "keyblobs")
+	return NewSimpleDirectoryFetcher(configDir)
+}
+
+type serialFetcher struct {
+	fetchers []SeekFetcher
+}
+
+func (sf *serialFetcher) Fetch(r Ref) (file types.ReadSeekCloser, size int64, err error) {
+	for _, fetcher := range sf.fetchers {
+		file, size, err = fetcher.Fetch(r)
+		if err == nil {
+			return
+		}
+	}
+	return
+
+}
+
+type serialStreamingFetcher struct {
+	fetchers []StreamingFetcher
+}
+
+func (sf *serialStreamingFetcher) FetchStreaming(r Ref) (file io.ReadCloser, size int64, err error) {
+	for _, fetcher := range sf.fetchers {
+		file, size, err = fetcher.FetchStreaming(r)
+		if err == nil {
+			return
+		}
+	}
+	return
+}
+
+type DirFetcher struct {
+	directory, extension string
+}
+
+func (df *DirFetcher) FetchStreaming(r Ref) (file io.ReadCloser, size int64, err error) {
+	return df.Fetch(r)
+}
+
+func (df *DirFetcher) Fetch(r Ref) (file types.ReadSeekCloser, size int64, err error) {
+	fileName := fmt.Sprintf("%s/%s.%s", df.directory, r.String(), df.extension)
+	var stat os.FileInfo
+	stat, err = os.Stat(fileName)
+	if err != nil {
+		return
+	}
+	file, err = os.Open(fileName)
+	if err != nil {
+		return
+	}
+	size = stat.Size()
+	return
+}
+
+// MemoryStore stores blobs in memory and is a Fetcher and
+// StreamingFetcher. Its zero value is usable.
+type MemoryStore struct {
+	lk sync.Mutex
+	m  map[string]string
+}
+
+func (s *MemoryStore) AddBlob(hashtype crypto.Hash, data string) (Ref, error) {
+	if hashtype != crypto.SHA1 {
+		return Ref{}, errors.New("blobref: unsupported hash type")
+	}
+	hash := hashtype.New()
+	hash.Write([]byte(data))
+	bstr := fmt.Sprintf("sha1-%x", hash.Sum(nil))
+	s.lk.Lock()
+	defer s.lk.Unlock()
+	if s.m == nil {
+		s.m = make(map[string]string)
+	}
+	s.m[bstr] = data
+	return MustParse(bstr), nil
+}
+
+func (s *MemoryStore) FetchStreaming(b Ref) (file io.ReadCloser, size int64, err error) {
+	s.lk.Lock()
+	defer s.lk.Unlock()
+	if s.m == nil {
+		return nil, 0, os.ErrNotExist
+	}
+	str, ok := s.m[b.String()]
+	if !ok {
+		return nil, 0, os.ErrNotExist
+	}
+	return ioutil.NopCloser(strings.NewReader(str)), int64(len(str)), nil
+}
+
+// SeekerFromStreamingFetcher returns the most efficient implementation of a seeking fetcher
+// from a provided streaming fetcher.
+func SeekerFromStreamingFetcher(f StreamingFetcher) SeekFetcher {
+	if sk, ok := f.(SeekFetcher); ok {
+		return sk
+	}
+	if tester, ok := f.(SeekTester); ok && tester.IsFetcherASeeker() {
+		return &fetcherToSeekerWrapper{f}
+	}
+	return bufferingSeekFetcherWrapper{f}
+}
+
+// bufferingSeekFetcherWrapper is a SeekFetcher that implements
+// seeking on a wrapped streaming-only fetcher by buffering the
+// content into memory, optionally spilling to disk if local disk is
+// available. In practice, most blobs will be "small" (able to fit in
+// memory).
+type bufferingSeekFetcherWrapper struct {
+	sf StreamingFetcher
+}
+
+func (b bufferingSeekFetcherWrapper) Fetch(br Ref) (rsc types.ReadSeekCloser, size int64, err error) {
+	rc, size, err := b.sf.FetchStreaming(br)
+	if err != nil {
+		return nil, 0, err
+	}
+	defer rc.Close()
+
+	const tryDiskThreshold = 32 << 20
+	if size > tryDiskThreshold {
+		// TODO(bradfitz): disk spilling, if a temp file can be made
+	}
+
+	// Buffer all to memory
+	var buf bytes.Buffer
+	n, err := io.Copy(&buf, rc)
+	if err != nil {
+		return nil, 0, fmt.Errorf("Error reading blob %s: %v", br, err)
+	}
+	if n != size {
+		return nil, 0, fmt.Errorf("Read %d bytes of %s; expected %d", n, br, size)
+	}
+	return struct {
+		io.ReadSeeker
+		io.Closer
+	}{
+		ReadSeeker: io.NewSectionReader(bytes.NewReader(buf.Bytes()), 0, size),
+		Closer:     ioutil.NopCloser(nil),
+	}, size, nil
+}
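
A small sketch of how the fetcher pieces compose: MemoryStore acts as a StreamingFetcher, and SeekerFromStreamingFetcher upgrades it to a SeekFetcher via the buffering wrapper (not part of the patch; the import path and error handling are illustrative):

package main

import (
	"crypto"
	_ "crypto/sha1" // registers SHA-1 so crypto.SHA1.New works
	"fmt"
	"io/ioutil"
	"log"

	"camlistore.org/pkg/blob" // assumed import path for this package
)

func main() {
	ms := new(blob.MemoryStore) // zero value is usable
	ref, err := ms.AddBlob(crypto.SHA1, "hello, world")
	if err != nil {
		log.Fatal(err)
	}

	// MemoryStore only implements FetchStreaming, so this returns the
	// in-memory buffering wrapper that adds seeking on top of it.
	sf := blob.SeekerFromStreamingFetcher(ms)
	rsc, size, err := sf.Fetch(ref)
	if err != nil {
		log.Fatal(err)
	}
	defer rsc.Close()

	// Seek past "hello, " and read the rest.
	if _, err := rsc.Seek(7, 0); err != nil {
		log.Fatal(err)
	}
	rest, _ := ioutil.ReadAll(rsc)
	fmt.Printf("%s is %d bytes; tail = %q\n", ref, size, rest)
}
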
diff --git a/pkg/blob/ref.go b/pkg/blob/ref.go
index 2980eb5e7..89c84810a 100644
--- a/pkg/blob/ref.go
+++ b/pkg/blob/ref.go
@@ -18,6 +18,7 @@ limitations under the License.
 package blob
 
 import (
+	"bytes"
 	"crypto/sha1"
 	"fmt"
 	"hash"
@@ -47,12 +48,17 @@ type SizedRef struct {
 	Size int64
 }
 
+func (sr SizedRef) String() string {
+	return fmt.Sprintf("[%s; %d bytes]", sr.Ref.String(), sr.Size)
+}
+
 // digestType is an interface type, but any type implementing it must
 // be of concrete type [N]byte, so it supports equality with ==,
 // which is a requirement for ref.
 type digestType interface {
 	bytes() []byte
 	digestName() string
+	newHash() hash.Hash
 }
 
 func (r Ref) String() string {
@@ -62,7 +68,8 @@
 	// TODO: maybe memoize this.
 	dname := r.digest.digestName()
 	bs := r.digest.bytes()
-	buf := make([]byte, 0, len(dname)+1+len(bs)*2)
+	buf := getBuf(len(dname) + 1 + len(bs)*2)[:0]
+	defer putBuf(buf)
 	buf = append(buf, dname...)
 	buf = append(buf, '-')
 	for _, b := range bs {
@@ -71,11 +78,79 @@
 	return string(buf)
 }
 
+// HashName returns the lowercase hash function name of the reference.
+// It panics if r is zero.
+func (r Ref) HashName() string {
+	if r.digest == nil {
+		panic("HashName called on invalid Ref")
+	}
+	return r.digest.digestName()
+}
+
+// Digest returns the lower hex digest of the blobref, without
+// the e.g. "sha1-" prefix. It panics if r is zero.
+func (r Ref) Digest() string {
+	if r.digest == nil {
+		panic("Digest called on invalid Ref")
+	}
+	bs := r.digest.bytes()
+	buf := getBuf(len(bs) * 2)[:0]
+	defer putBuf(buf)
+	for _, b := range bs {
+		buf = append(buf, hexDigit[b>>4], hexDigit[b&0xf])
+	}
+	return string(buf)
+}
+
+func (r Ref) DigestPrefix(digits int) string {
+	v := r.Digest()
+	if len(v) < digits {
+		return v
+	}
+	return v[:digits]
+}
+
+func (r Ref) DomID() string {
+	if !r.Valid() {
+		return ""
+	}
+	return "camli-" + r.String()
+}
+
+func (r Ref) Sum32() uint32 {
+	var v uint32
+	for _, b := range r.digest.bytes()[:4] {
+		v = v<<8 | uint32(b)
+	}
+	return v
+}
+
+func (r Ref) Sum64() uint64 {
+	var v uint64
+	for _, b := range r.digest.bytes()[:8] {
+		v = v<<8 | uint64(b)
+	}
+	return v
+}
+
+// Hash returns a new hash.Hash of r's type.
+// It panics if r is zero.
+func (r Ref) Hash() hash.Hash {
+	return r.digest.newHash()
+}
+
+func (r Ref) HashMatches(h hash.Hash) bool {
+	if r.digest == nil {
+		return false
+	}
+	return bytes.Equal(h.Sum(nil), r.digest.bytes())
+}
+
 const hexDigit = "0123456789abcdef"
 
-func (r *Ref) Valid() bool { return r.digest != nil }
+func (r Ref) Valid() bool { return r.digest != nil }
 
-func (r *Ref) IsSupported() bool {
+func (r Ref) IsSupported() bool {
 	if !r.Valid() {
 		return false
 	}
@@ -94,13 +169,14 @@ func Parse(s string) (ref Ref, ok bool) {
 	hex := s[i+1:]
 	meta, ok := metaFromString[name]
 	if !ok {
-		return parseUnknown(s)
+		return parseUnknown(name, hex)
+	}
+	if len(hex) != meta.size*2 {
+		ok = false
+		return
 	}
 	buf := getBuf(meta.size)
 	defer putBuf(buf)
-	if len(hex) != len(buf)*2 {
-		return
-	}
 	bad := false
 	for i := 0; i < len(hex); i += 2 {
 		buf[i/2] = hexVal(hex[i], &bad)<<4 | hexVal(hex[i+1], &bad)
@@ -142,10 +218,24 @@ func hexVal(b byte, bad *bool) byte {
 	return 0
 }
 
-// parseUnknown parses s where s is a blobref of a digest type not known
-// to this server. e.g. ("foo-ababab")
-func parseUnknown(s string) (ref Ref, ok bool) {
-	panic("TODO")
+// parseUnknown parses a blobref where the digest type isn't known to this server.
+// e.g. ("foo-ababab")
+func parseUnknown(digest, hex string) (ref Ref, ok bool) {
+	if len(hex) < 2 || len(hex)%2 != 0 || len(hex) > maxOtherDigestLen*2 {
+		return
+	}
+	o := otherDigest{
+		name:   digest,
+		sumLen: len(hex) / 2,
+	}
+	bad := false
+	for i := 0; i < len(hex); i += 2 {
+		o.sum[i/2] = hexVal(hex[i], &bad)<<4 | hexVal(hex[i+1], &bad)
+	}
+	if bad {
+		return
+	}
+	return Ref{o}, true
 }
 
 func fromSHA1Bytes(b []byte) digestType {
@@ -157,9 +247,9 @@
 	return a
 }
 
-// FromHash returns a blobref representing the given hash.
+// RefFromHash returns a blobref representing the given hash.
 // It panics if the hash isn't of a known type.
-func FromHash(h hash.Hash) Ref {
+func RefFromHash(h hash.Hash) Ref {
 	meta, ok := metaFromType[reflect.TypeOf(h)]
 	if !ok {
 		panic(fmt.Sprintf("Currently-unsupported hash type %T", h))
@@ -171,26 +261,33 @@ func SHA1FromString(s string) Ref {
 	s1 := sha1.New()
 	s1.Write([]byte(s))
-	return FromHash(s1)
+	return RefFromHash(s1)
 }
 
 // SHA1FromBytes returns a SHA-1 blobref of the provided bytes.
 func SHA1FromBytes(b []byte) Ref {
 	s1 := sha1.New()
 	s1.Write(b)
-	return FromHash(s1)
+	return RefFromHash(s1)
 }
 
 type sha1Digest [20]byte
 
+func (s sha1Digest) digestName() string { return "sha1" }
+func (s sha1Digest) bytes() []byte { return s[:] }
+func (s sha1Digest) newHash() hash.Hash { return sha1.New() }
+
+const maxOtherDigestLen = 128
+
 type otherDigest struct {
 	name   string
-	sum    [128]byte
+	sum    [maxOtherDigestLen]byte
 	sumLen int // bytes in sum that are valid
 }
 
-func (s sha1Digest) digestName() string { return "sha1" }
-func (s sha1Digest) bytes() []byte { return s[:] }
+func (d otherDigest) digestName() string { return d.name }
+func (d otherDigest) bytes() []byte { return d.sum[:d.sumLen] }
+func (d otherDigest) newHash() hash.Hash { return nil }
 
 var sha1Meta = &digestMeta{
 	ctor: fromSHA1Bytes,
@@ -220,3 +317,33 @@ func getBuf(size int) []byte {
 func putBuf(b []byte) {
 	// TODO: pool
 }
+
+// NewHash returns a new hash.Hash of the currently recommended hash type.
+// Currently this is just SHA-1, but will likely change within the next
+// year or so.
+func NewHash() hash.Hash {
+	return sha1.New()
+}
+
+func ValidRefString(s string) bool {
+	// TODO: optimize to not allocate
+	return ParseOrZero(s).Valid()
+}
+
+func (r *Ref) UnmarshalJSON(d []byte) error {
+	if len(d) < 2 || d[0] != '"' || d[len(d)-1] != '"' {
+		return fmt.Errorf("blob: expecting a JSON string to unmarshal, got %q", d)
+	}
+	refStr := string(d[1 : len(d)-1])
+	p, ok := Parse(refStr)
+	if !ok {
+		return fmt.Errorf("blobref: invalid blobref %q (%d)", refStr, len(refStr))
+	}
+	*r = p
+	return nil
+}
+
+func (r Ref) MarshalJSON() ([]byte, error) {
+	// TODO: do just one allocation here if we cared.
+	return []byte(fmt.Sprintf("%q", r.String())), nil
+}
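
A short sketch of the new Ref surface in ref.go — Parse, HashName/DigestPrefix/DomID, RefFromHash via SHA1FromString, and the JSON marshaling hooks (not part of the patch; the surrounding program is illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"camlistore.org/pkg/blob" // assumed import path for this package
)

func main() {
	// Parse a well-formed blobref; unknown digest names like "foo-..."
	// are also accepted now, via the otherDigest path.
	ref, ok := blob.Parse("sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
	if !ok {
		log.Fatal("bad blobref")
	}
	fmt.Println(ref.HashName())       // "sha1"
	fmt.Println(ref.DigestPrefix(10)) // first 10 hex digits of the digest
	fmt.Println(ref.DomID())          // "camli-sha1-0beec7..."

	// SHA1FromString hashes a string and wraps it via RefFromHash;
	// this digest is sha1("foo"), so the two refs compare equal.
	fmt.Println(blob.SHA1FromString("foo") == ref) // true

	// Ref marshals to and from a JSON string.
	b, err := json.Marshal(ref)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b)) // "sha1-0beec7..." quoted as a JSON string

	var back blob.Ref
	if err := json.Unmarshal(b, &back); err != nil {
		log.Fatal(err)
	}
	fmt.Println(back == ref) // true
}
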
diff --git a/pkg/blob/ref_test.go b/pkg/blob/ref_test.go
index 20c8443fc..5c6ac2f88 100644
--- a/pkg/blob/ref_test.go
+++ b/pkg/blob/ref_test.go
@@ -17,21 +17,48 @@ limitations under the License.
 package blob
 
 import (
+	"encoding/json"
+	"strings"
 	"testing"
 )
 
+var parseTests = []struct {
+	in  string
+	bad bool
+}{
+	{in: "", bad: true},
+	{in: "foo", bad: true},
+	{in: "-0f", bad: true},
+	{in: "sha1-xx", bad: true},
+	{in: "-", bad: true},
+	{in: "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"},
+	{in: "sha1-0b", bad: true},
+	{in: "foo-0b"},
+	{in: "foo-0b0c"},
+	{in: "foo-0b0cd", bad: true}, // odd number
+}
+
 func TestParse(t *testing.T) {
-	in := "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
-	r, ok := Parse(in)
-	if !ok {
-		t.Fatal("failed to parse")
-	}
-	if !r.Valid() {
-		t.Error("not Valid")
-	}
-	got := r.String()
-	if got != in {
-		t.Errorf("parse(%q).String = %q; want same input", in, got)
+	for _, tt := range parseTests {
+		r, ok := Parse(tt.in)
+		if r.Valid() != ok {
+			t.Errorf("Valid != ok for %q", tt.in)
+		}
+		if !ok {
+			if !tt.bad {
+				t.Errorf("Parse(%q) failed to parse", tt.in)
+				continue
+			}
+			continue
+		}
+		str := r.String()
+		if str != tt.in {
+			t.Errorf("Parsed %q but String() value differs: %q", tt.in, str)
+		}
+		wantDig := str[strings.Index(str, "-")+1:]
+		if dig := r.Digest(); dig != wantDig {
+			t.Errorf("Digest(%q) = %q; want %q", tt.in, dig, wantDig)
+		}
 	}
 }
 
@@ -51,3 +78,55 @@ func TestEquality(t *testing.T) {
 		t.Errorf("r and r3 should not be equal")
 	}
 }
+
+func TestSum32(t *testing.T) {
+	got := MustParse("sha1-1234567800000000000000000000000000000000").Sum32()
+	want := uint32(0x12345678)
+	if got != want {
+		t.Errorf("Sum32 = %x, want %x", got, want)
+	}
+}
+
+func TestSum64(t *testing.T) {
+	got := MustParse("sha1-12345678876543210000000000000000000000ff").Sum64()
+	want := uint64(0x1234567887654321)
+	if got != want {
+		t.Errorf("Sum64 = %x, want %x", got, want)
+	}
+}
+
+type Foo struct {
+	B Ref `json:"foo"`
+}
+
+func TestJSONUnmarshal(t *testing.T) {
+	var f Foo
+	if err := json.Unmarshal([]byte(`{"foo": "abc-def123", "other": 123}`), &f); err != nil {
+		t.Fatalf("Unmarshal: %v", err)
+	}
+	if !f.B.Valid() {
+		t.Fatal("blobref is nil")
+	}
+	if g, e := f.B.String(), "abc-def123"; g != e {
+		t.Errorf("got %q, want %q", g, e)
+	}
+}
+
+func TestJSONMarshal(t *testing.T) {
+	f := &Foo{B: MustParse("def-1234abcd")}
+	bs, err := json.Marshal(f)
+	if err != nil {
+		t.Fatalf("Marshal: %v", err)
+	}
+	if g, e := string(bs), `{"foo":"def-1234abcd"}`; g != e {
+		t.Errorf("got %q, want %q", g, e)
+	}
+}
+
+func TestSizedBlobRefString(t *testing.T) {
+	sr := SizedRef{Ref: MustParse("abc-1234"), Size: 456}
+	want := "[abc-1234; 456 bytes]"
+	if got := sr.String(); got != want {
+		t.Errorf("SizedRef.String() = %q, want %q", got, want)
+	}
+}
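
Finally, a sketch of how a caller might verify fetched content against its blobref, tying Ref.Hash and Ref.HashMatches from ref.go to the fetchers above (not part of the patch; the store setup and import path are illustrative):

package main

import (
	"crypto"
	_ "crypto/sha1" // registers SHA-1 for crypto.SHA1.New
	"fmt"
	"io"
	"log"

	"camlistore.org/pkg/blob" // assumed import path for this package
)

func main() {
	ms := new(blob.MemoryStore)
	ref, err := ms.AddBlob(crypto.SHA1, "some blob bytes")
	if err != nil {
		log.Fatal(err)
	}

	rc, _, err := ms.FetchStreaming(ref)
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()

	// Hash the fetched content with the ref's own hash type and compare
	// the sum against the digest embedded in the ref.
	h := ref.Hash()
	if _, err := io.Copy(h, rc); err != nil {
		log.Fatal(err)
	}
	fmt.Println("content matches ref:", ref.HashMatches(h))
}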