/*
Copyright 2013 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Package diskpacked registers the "diskpacked" blobserver storage type,
storing blobs in a sequence of monolithic data files indexed by a kvfile index.

Example low-level config:

	"/storage/": {
	    "handler": "storage-diskpacked",
	    "handlerArgs": {
	        "path": "/var/camlistore/blobs"
	    }
	},

*/
package diskpacked

import (
	"bytes"
	"errors"
	"expvar"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
	"camlistore.org/pkg/blobserver/local"
	"camlistore.org/pkg/context"
	"camlistore.org/pkg/jsonconfig"
	"camlistore.org/pkg/sorted"
	"camlistore.org/pkg/sorted/kvfile"
	"camlistore.org/pkg/syncutil"
	"camlistore.org/pkg/types"

	"camlistore.org/third_party/github.com/camlistore/lock"
)

// TODO(wathiede): replace with glog.V(2) when we decide our logging story.
type debugT bool

var debug = debugT(false)

func (d debugT) Printf(format string, args ...interface{}) {
	if bool(d) {
		log.Printf(format, args...)
	}
}

func (d debugT) Println(args ...interface{}) {
	if bool(d) {
		log.Println(args...)
	}
}

const defaultMaxFileSize = 512 << 20 // 512MB

type storage struct {
	root        string
	index       sorted.KeyValue
	maxFileSize int64

	writeLock io.Closer // Provided by lock.Lock, and guards other processes from accessing the file open for writes.

	mu     sync.Mutex // Guards all I/O state.
	closed bool
	writer *os.File
	fds    []*os.File
	size   int64

	*local.Generationer
}

func (s *storage) String() string {
	return fmt.Sprintf("\"diskpacked\" blob packs at %s", s.root)
}

var (
	readVar     = expvar.NewMap("diskpacked-read-bytes")
	readTotVar  = expvar.NewMap("diskpacked-total-read-bytes")
	openFdsVar  = expvar.NewMap("diskpacked-open-fds")
	writeVar    = expvar.NewMap("diskpacked-write-bytes")
	writeTotVar = expvar.NewMap("diskpacked-total-write-bytes")
)

const indexKV = "index.kv"

// IsDir reports whether dir is a diskpacked directory.
func IsDir(dir string) (bool, error) {
	_, err := os.Stat(filepath.Join(dir, indexKV))
	if os.IsNotExist(err) {
		return false, nil
	}
	return err == nil, err
}

// New returns a diskpacked storage implementation, adding blobs to
// the provided directory. It doesn't delete any existing blob pack
// files.
func New(dir string) (blobserver.Storage, error) {
	var maxSize int64
	if ok, _ := IsDir(dir); ok {
		// TODO: detect existing max size from size of files, if obvious,
		// and set maxSize to that?
	}
	return newStorage(dir, maxSize)
}
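
// A minimal usage sketch (the path is the one from the example config in the
// package comment): callers normally reach this storage through the
// "storage-diskpacked" handler configuration, but an existing directory can
// also be opened directly:
//
//	sto, err := diskpacked.New("/var/camlistore/blobs")
//	if err != nil {
//		log.Fatal(err)
//	}
//	// sto satisfies blobserver.Storage.
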
// newStorage returns a new storage in path root with the given maxFileSize,
// or defaultMaxFileSize (512MB) if maxFileSize <= 0.
func newStorage(root string, maxFileSize int64) (s *storage, err error) {
	fi, err := os.Stat(root)
	if os.IsNotExist(err) {
		return nil, fmt.Errorf("storage root %q doesn't exist", root)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to stat directory %q: %v", root, err)
	}
	if !fi.IsDir() {
		return nil, fmt.Errorf("storage root %q exists but is not a directory", root)
	}
	index, err := kvfile.NewStorage(filepath.Join(root, indexKV))
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			index.Close()
		}
	}()
	if maxFileSize <= 0 {
		maxFileSize = defaultMaxFileSize
	}
	// Be consistent with trailing slashes. Makes expvar stats for total
	// reads/writes consistent across diskpacked targets, regardless of what
	// people put in their low level config.
	root = strings.TrimRight(root, `\/`)
	s = &storage{
		root:         root,
		index:        index,
		maxFileSize:  maxFileSize,
		Generationer: local.NewGenerationer(root),
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.openAllPacks(); err != nil {
		return nil, err
	}
	if _, _, err := s.StorageGeneration(); err != nil {
		return nil, fmt.Errorf("error initializing generation for %q: %v", root, err)
	}
	return s, nil
}

func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
	path := config.RequiredString("path")
	maxFileSize := config.OptionalInt("maxFileSize", 0)
	if err := config.Validate(); err != nil {
		return nil, err
	}
	return newStorage(path, int64(maxFileSize))
}

func init() {
	blobserver.RegisterStorageConstructor("diskpacked", blobserver.StorageConstructor(newFromConfig))
}

// openForRead will open pack file n for read and keep a handle to it in
// s.fds. An os.IsNotExist error is returned if n >= the number of pack
// files in s.root.
// This function is not thread safe; s.mu should be locked by the caller.
func (s *storage) openForRead(n int) error {
	if n > len(s.fds) {
		panic(fmt.Sprintf("openForRead called out of order: got %d, expected %d", n, len(s.fds)))
	}

	fn := s.filename(n)
	f, err := os.Open(fn)
	if err != nil {
		return err
	}
	openFdsVar.Add(s.root, 1)
	debug.Printf("diskpacked: opened for read %q", fn)
	s.fds = append(s.fds, f)
	return nil
}

// openForWrite will create or open pack file n for writes, create a lock
// visible external to the process, and seek to the end of the file ready for
// appending new data.
// This function is not thread safe; s.mu should be locked by the caller.
func (s *storage) openForWrite(n int) error {
	fn := s.filename(n)
	l, err := lock.Lock(fn + ".lock")
	if err != nil {
		return err
	}
	f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		l.Close()
		return err
	}
	openFdsVar.Add(s.root, 1)
	debug.Printf("diskpacked: opened for write %q", fn)

	s.size, err = f.Seek(0, os.SEEK_END)
	if err != nil {
		return err
	}

	s.writer = f
	s.writeLock = l
	return nil
}
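
// For orientation, a sketch (path hypothetical, matching the package
// comment's example config) of what a freshly initialized root contains at
// minimum, given filename's "pack-%05d.blobs" scheme, the indexKV file, and
// the external ".lock" taken by openForWrite:
//
//	/var/camlistore/blobs/index.kv
//	/var/camlistore/blobs/pack-00000.blobs
//	/var/camlistore/blobs/pack-00000.blobs.lock
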
// nextPack will close the current writer and release its lock if open,
// open the next pack file in sequence for writing, grab its lock, set it
// to the currently active writer, and open another copy for read-only use.
// This function is not thread safe; s.mu should be locked by the caller.
func (s *storage) nextPack() error {
	debug.Println("diskpacked: nextPack")
	s.size = 0
	if s.writeLock != nil {
		err := s.writeLock.Close()
		if err != nil {
			return err
		}
		s.writeLock = nil
	}
	if s.writer != nil {
		if err := s.writer.Close(); err != nil {
			return err
		}
		openFdsVar.Add(s.root, -1)
	}

	n := len(s.fds)
	if err := s.openForWrite(n); err != nil {
		return err
	}
	return s.openForRead(n)
}

// openAllPacks opens read-only each pack file in s.root, populating s.fds.
// The latest pack file will also have a writable handle opened.
// This function is not thread safe; s.mu should be locked by the caller.
func (s *storage) openAllPacks() error {
	debug.Println("diskpacked: openAllPacks")
	n := 0
	for {
		err := s.openForRead(n)
		if os.IsNotExist(err) {
			break
		}
		if err != nil {
			s.Close()
			return err
		}
		n++
	}
	if n == 0 {
		// If no pack files are found, we create one open for read and write.
		return s.nextPack()
	}

	// If 1 or more pack files are found, open the last one for read and write.
	return s.openForWrite(n - 1)
}

func (s *storage) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	var closeErr error
	if !s.closed {
		s.closed = true
		if err := s.index.Close(); err != nil {
			log.Println("diskpacked: closing index:", err)
		}
		for _, f := range s.fds {
			if err := f.Close(); err != nil {
				closeErr = err
			}
			openFdsVar.Add(s.root, -1)
		}
		s.writer = nil
		if l := s.writeLock; l != nil {
			err := l.Close()
			if closeErr == nil {
				closeErr = err
			}
			s.writeLock = nil
		}
	}
	return closeErr
}

func (s *storage) FetchStreaming(br blob.Ref) (io.ReadCloser, uint32, error) {
	return s.Fetch(br)
}

func (s *storage) Fetch(br blob.Ref) (types.ReadSeekCloser, uint32, error) {
	meta, err := s.meta(br)
	if err != nil {
		return nil, 0, err
	}

	if meta.file >= len(s.fds) {
		return nil, 0, fmt.Errorf("diskpacked: attempt to fetch blob from out of range pack file %d >= %d", meta.file, len(s.fds))
	}
	rac := s.fds[meta.file]
	var rs io.ReadSeeker = io.NewSectionReader(rac, meta.offset, int64(meta.size))
	fn := rac.Name()
	// Ensure entry is in map.
	readVar.Add(fn, 0)
	if v, ok := readVar.Get(fn).(*expvar.Int); ok {
		rs = types.NewStatsReadSeeker(v, rs)
	}
	readTotVar.Add(s.root, 0)
	if v, ok := readTotVar.Get(s.root).(*expvar.Int); ok {
		rs = types.NewStatsReadSeeker(v, rs)
	}
	rsc := struct {
		io.ReadSeeker
		io.Closer
	}{
		rs,
		types.NopCloser,
	}
	return rsc, meta.size, nil
}

func (s *storage) filename(file int) string {
	return filepath.Join(s.root, fmt.Sprintf("pack-%05d.blobs", file))
}

func (s *storage) RemoveBlobs(blobs []blob.Ref) error {
	// TODO(adg): remove blob from index and pad data with spaces
	return blobserver.ErrNotImplemented
}

var statGate = syncutil.NewGate(20) // arbitrary

func (s *storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (err error) {
	var wg syncutil.Group

	for _, br := range blobs {
		br := br
		statGate.Start()
		wg.Go(func() error {
			defer statGate.Done()

			m, err := s.meta(br)
			if err == nil {
				dest <- m.SizedRef(br)
				return nil
			}
			if err == os.ErrNotExist {
				return nil
			}
			return err
		})
	}
	return wg.Err()
}

func (s *storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
	defer close(dest)

	t := s.index.Find(after, "")
	defer func() {
		closeErr := t.Close()
		if err == nil {
			err = closeErr
		}
	}()
	for i := 0; i < limit && t.Next(); {
		key := t.Key()
		if key <= after {
			// EnumerateBlobs' semantics are '>', but sorted.KeyValue.Find is '>='.
			continue
		}
		br, ok := blob.Parse(key)
		if !ok {
			return fmt.Errorf("diskpacked: couldn't parse index key %q", key)
		}
		m, ok := parseBlobMeta(t.Value())
		if !ok {
			return fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value())
		}
		select {
		case dest <- m.SizedRef(br):
		case <-ctx.Done():
			return context.ErrCanceled
		}
		i++
	}
	return nil
}
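
// A minimal enumeration sketch (sto and ctx construction are assumed, e.g. a
// *context.Context from camlistore.org/pkg/context): EnumerateBlobs closes
// dest when it finishes, so ranging over the channel drains it, and the
// error is collected once the channel is exhausted:
//
//	dest := make(chan blob.SizedRef)
//	errc := make(chan error, 1)
//	go func() { errc <- sto.EnumerateBlobs(ctx, dest, "", 100) }()
//	for sb := range dest {
//		fmt.Println(sb.Ref, sb.Size)
//	}
//	if err := <-errc; err != nil {
//		log.Fatal(err)
//	}
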
func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {
	var b bytes.Buffer
	n, err := b.ReadFrom(source)
	if err != nil {
		return
	}
	sbr = blob.SizedRef{Ref: br, Size: uint32(n)}

	// Check if it's a dup. Still accept it if the pack file on disk seems to be corrupt
	// or truncated.
	if m, err := s.meta(br); err == nil {
		fi, err := os.Stat(s.filename(m.file))
		if err == nil && fi.Size() >= m.offset+int64(m.size) {
			return sbr, nil
		}
	}

	err = s.append(sbr, &b)
	return
}

// append writes the provided blob to the current data file.
func (s *storage) append(br blob.SizedRef, r io.Reader) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errors.New("diskpacked: write to closed storage")
	}

	fn := s.writer.Name()
	n, err := fmt.Fprintf(s.writer, "[%v %v]", br.Ref.String(), br.Size)
	s.size += int64(n)
	writeVar.Add(fn, int64(n))
	writeTotVar.Add(s.root, int64(n))
	if err != nil {
		return err
	}

	// TODO(adg): remove this seek and the offset check once confident
	offset, err := s.writer.Seek(0, os.SEEK_CUR)
	if err != nil {
		return err
	}
	if offset != s.size {
		return fmt.Errorf("diskpacked: seek says offset = %d, we think %d", offset, s.size)
	}
	offset = s.size // make this a declaration once the above is removed

	n2, err := io.Copy(s.writer, r)
	s.size += n2
	writeVar.Add(fn, n2)
	writeTotVar.Add(s.root, n2)
	if err != nil {
		return err
	}
	if n2 != int64(br.Size) {
		return fmt.Errorf("diskpacked: written blob size %d didn't match size %d", n2, br.Size)
	}
	if err = s.writer.Sync(); err != nil {
		return err
	}

	packIdx := len(s.fds) - 1
	if s.size > s.maxFileSize {
		if err := s.nextPack(); err != nil {
			return err
		}
	}
	return s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String())
}

// meta fetches the metadata for the specified blob from the index.
func (s *storage) meta(br blob.Ref) (m blobMeta, err error) {
	ms, err := s.index.Get(br.String())
	if err != nil {
		if err == sorted.ErrNotFound {
			err = os.ErrNotExist
		}
		return
	}
	m, ok := parseBlobMeta(ms)
	if !ok {
		err = fmt.Errorf("diskpacked: bad blob metadata: %q", ms)
	}
	return
}

// blobMeta is the blob metadata stored in the index.
type blobMeta struct {
	file   int
	offset int64
	size   uint32
}

func parseBlobMeta(s string) (m blobMeta, ok bool) {
	n, err := fmt.Sscan(s, &m.file, &m.offset, &m.size)
	return m, n == 3 && err == nil
}

func (m blobMeta) String() string {
	return fmt.Sprintf("%v %v %v", m.file, m.offset, m.size)
}

func (m blobMeta) SizedRef(br blob.Ref) blob.SizedRef {
	return blob.SizedRef{Ref: br, Size: m.size}
}
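
// On-disk format recap, as written by append and recorded by blobMeta (the
// blobref, offset, and sizes below are illustrative): a pack file is a
// sequence of records of the form
//
//	[sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 3]foo
//
// i.e. a "[<blobref> <size>]" header followed by exactly <size> bytes of
// blob data, while the index maps that blobref to a value like "0 1234 3":
// pack file pack-00000.blobs, data offset 1234 (just past the header), size
// 3 bytes.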