mirror of https://github.com/perkeep/perkeep.git
463 lines
11 KiB
Go
463 lines
11 KiB
Go
/*
|
|
Copyright 2018 The Perkeep Authors
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
/*
|
|
Package sftp registers the "sftp" blobserver storage type, which stores
blobs one per file, in a forest of sharded directories, on a remote SFTP
server reached over an SSH connection. It uses the same directory and file
layout as the "localdisk" storage type.
|
|
|
|
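Example low-level config:

	"/storage/": {
		"handler": "storage-sftp",
		"handlerArgs": {
			"user": "alice",
			"addr": "10.1.2.3",
			"dir": "/remote/path/to/store/blobs/in",
			"serverFingerprint": "SHA256:fBkTSuUzQVnVMJ9+e74XNTCnQKSHldbfFiOI9kBMemc",
			"pass": "s3cr3thunteR1!"
		}
	},

The "pass" and "passFile" options are mutually exclusive. Instead of
"pass", you may set "passFile": "/home/alice/keys/sftp.password". The
password is then read from that file, with surrounding whitespace trimmed.
If "addr" has no port, port 22 is used. Setting "serverFingerprint" to
"insecure-skip-verify" accepts any host key; only do that if you understand
the risk.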
|
|
*/
|
|
package sftp // import "perkeep.org/pkg/blobserver/sftp"
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/sftp"
|
|
|
|
"go4.org/jsonconfig"
|
|
"go4.org/syncutil"
|
|
"go4.org/syncutil/singleflight"
|
|
"go4.org/wkfs"
|
|
"golang.org/x/crypto/ssh"
|
|
"perkeep.org/internal/osutil"
|
|
"perkeep.org/pkg/blob"
|
|
"perkeep.org/pkg/blobserver"
|
|
"perkeep.org/pkg/blobserver/files"
|
|
)
|
|
|
|
// Storage implements the blobserver.Storage interface using an SFTP server.
|
|
type Storage struct {
|
|
blobserver.Storage
|
|
blob.SubFetcher
|
|
|
|
addr, root string
|
|
cc *ssh.ClientConfig
|
|
|
|
getClientGroup singleflight.Group
|
|
|
|
mu sync.Mutex
|
|
lastGet time.Time // time last fetched
|
|
sc *sftp.Client
|
|
connCloser io.Closer // ssh.Conn or net.Conn
|
|
}
|
|
|
|
// Validate we implement expected interfaces.
|
|
var (
|
|
_ blobserver.Storage = (*Storage)(nil)
|
|
_ blob.SubFetcher = (*Storage)(nil)
|
|
)
|
|
|
|
func (s *Storage) String() string {
|
|
return fmt.Sprintf("\"sftp\" file-per-blob at %s@%s, dir %s", s.cc.User, s.addr, s.root)
|
|
}
|
|
|
|
const (
	// recommendedFDLimit is the file-descriptor limit (ulimit -n) we
	// suggest to users whose limit is below minFDLimit.
	recommendedFDLimit = 1024

	// minFDLimit is the lowest file-descriptor limit with which we agree
	// to create a Storage. 100 is ridiculously low. However, the default on
	// macOS is 256 and we don't want to fail out of the box there, so the
	// minimum has to stay below 256.
	minFDLimit = 100
)
|
|
|
|
// NewStorage returns a new SFTP storage implementation at the provided
// TCP addr (host:port) in the named directory. An empty dir means ".".
//
// The provided SSH client config is required. A nil cc is rejected with an
// error here rather than causing a nil-pointer panic on first use.
//
// NewStorage also installs a new-file gate on the underlying files.Storage,
// sized from the file-descriptor limit (see adjustFDLimit). It fails if that
// limit cannot be read or is too low.
func NewStorage(addr, dir string, cc *ssh.ClientConfig) (*Storage, error) {
	if cc == nil {
		return nil, errors.New("sftp: NewStorage requires a non-nil *ssh.ClientConfig")
	}
	if dir == "" {
		dir = "."
	}
	s := &Storage{
		addr: addr,
		root: dir,
		cc:   cc,
	}
	// files.Storage implements the blob operations on top of sftpFS.
	// sftpFS sends each filesystem call over this Storage's SFTP client.
	fs := files.NewStorage(sftpFS{s}, dir)
	s.Storage = fs
	s.SubFetcher = fs

	if err := s.adjustFDLimit(fs); err != nil {
		return nil, err
	}
	return s, nil
}
|
|
|
|
// adjustFDLimit installs a new-file gate on fs sized to 80% of the
// file-descriptor limit reported by osutil.MaxFD (ulimit -n). The remaining
// 20% leaves room for other file operations happening in Perkeep.
//
// It does nothing when the platform cannot report the limit. It returns an
// error if the limit cannot be read or is below minFDLimit.
func (s *Storage) adjustFDLimit(fs *files.Storage) error {
	ul, err := osutil.MaxFD()
	switch {
	case errors.Is(err, osutil.ErrNotSupported):
		return nil
	case err != nil:
		return fmt.Errorf("sftp failed to determine system's file descriptor limit: %w", err)
	case ul < minFDLimit:
		return fmt.Errorf("the max number of open file descriptors on your system (ulimit -n) is too low. Please fix it with 'ulimit -S -n X' with X being at least %d", recommendedFDLimit)
	}
	// TODO(mpl): make this used and enforced Perkeep-wide. Issue #837.
	gateSize := int(ul * 80 / 100)
	fs.SetNewFileGate(syncutil.NewGate(gateSize))
	return nil
}
|
|
|
|
// newFromConfig is the blobserver.StorageConstructor for the "sftp"
// storage type.
//
// Required handler args: "user", "dir", "addr" and "serverFingerprint".
// Optional handler args: "pass" or "passFile", which are mutually exclusive.
// With "passFile", the file's contents, with surrounding whitespace trimmed,
// are used as the password. If "addr" has no port, port 22 is used.
//
// The server's host key must match "serverFingerprint", which is compared
// against ssh.FingerprintSHA256. The value "insecure-skip-verify" accepts
// any key and logs a warning.
//
// On error the returned Storage is a true nil interface.
func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
	user := config.RequiredString("user")
	dir := config.RequiredString("dir")
	addr := config.RequiredString("addr")
	pass := config.OptionalString("pass", "")
	passFile := config.OptionalString("passFile", "")
	wantFingerprint := config.RequiredString("serverFingerprint")
	if err := config.Validate(); err != nil {
		return nil, err
	}

	if pass != "" && passFile != "" {
		return nil, errors.New(`the "pass" and "passFile" options are mutually exclusive`)
	}
	if passFile != "" {
		slurp, err := wkfs.ReadFile(passFile)
		if err != nil {
			return nil, err
		}
		pass = strings.TrimSpace(string(slurp))
	}

	if _, _, err := net.SplitHostPort(addr); err != nil {
		// No port given. Unwrap a bracketed IPv6 literal such as "[::1]"
		// so JoinHostPort doesn't produce "[[::1]]:22".
		host := addr
		if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
			host = host[1 : len(host)-1]
		}
		addr = net.JoinHostPort(host, "22")
	}

	hostKeyCallback := func(hostname string, remote net.Addr, key ssh.PublicKey) error {
		keyPrint := ssh.FingerprintSHA256(key)
		if keyPrint == wantFingerprint {
			log.Printf("sftp: connected to %s at %v@%v, with expected fingerprint %v",
				hostname, user, remote, keyPrint)
			return nil
		}
		if wantFingerprint == "insecure-skip-verify" {
			log.Printf(`sftp: WARNING: using "insecure-skip-verify", connected to %s at %v@%v, with untrusted fingerprint %v`,
				hostname, user, remote, keyPrint)
			return nil
		}
		return fmt.Errorf(`sftp: unexpected fingerprint %q connecting to %v/%v; want %q (or "insecure-skip-verify")`,
			keyPrint, hostname, remote, wantFingerprint)
	}
	cc := &ssh.ClientConfig{
		User:            user,
		HostKeyCallback: hostKeyCallback,
		Timeout:         10 * time.Second,
	}
	if pass != "" {
		cc.Auth = []ssh.AuthMethod{ssh.Password(pass)}
	}

	// Don't return NewStorage's results directly. A nil *Storage converted
	// to blobserver.Storage would be a non-nil interface.
	sto, err := NewStorage(addr, dir, cc)
	if err != nil {
		return nil, err
	}
	return sto, nil
}
|
|
|
|
// markConnDead clears the cached SFTP connection after the caller detects
|
|
// a connection failure.
|
|
func (s *Storage) markConnDead() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.markConnDeadLocked()
|
|
}
|
|
|
|
func (s *Storage) markConnDeadLocked() {
|
|
if s.connCloser != nil {
|
|
go s.connCloser.Close()
|
|
}
|
|
s.sc = nil
|
|
s.connCloser = nil
|
|
}
|
|
|
|
func (s *Storage) monitorSSHConn(wait func() error) {
|
|
err := wait()
|
|
log.Printf("sftp: marking SSH connection dead: %v", err)
|
|
s.markConnDead()
|
|
}
|
|
|
|
func (s *Storage) dialSFTP() (sc *sftp.Client, waiter func() error, toClose io.Closer, err error) {
|
|
// Special case for testing:
|
|
if s.cc.User == "RAWSFTPNOSSH" {
|
|
var c net.Conn
|
|
c, err = net.Dial("tcp", s.addr)
|
|
if err != nil {
|
|
return
|
|
}
|
|
sc, err = sftp.NewClientPipe(c, c)
|
|
if err != nil {
|
|
go c.Close()
|
|
return
|
|
}
|
|
toClose = c
|
|
return
|
|
}
|
|
|
|
var pw io.WriteCloser
|
|
var pr io.Reader
|
|
|
|
// Another special case for testing:
|
|
user := s.cc.User
|
|
const sysPrefix = "use-system-ssh:"
|
|
if strings.HasPrefix(user, sysPrefix) {
|
|
user = strings.TrimPrefix(user, sysPrefix)
|
|
cmd := exec.Command("ssh", user+"@"+strings.TrimSuffix(s.addr, ":22"), "-s", "sftp")
|
|
cmd.Stderr = os.Stderr
|
|
// get stdin and stdout
|
|
pw, err = cmd.StdinPipe()
|
|
if err != nil {
|
|
err = fmt.Errorf("%v: %w", cmd.Args, err)
|
|
return
|
|
}
|
|
pr, err = cmd.StdoutPipe()
|
|
if err != nil {
|
|
err = fmt.Errorf("%v: %w", cmd.Args, err)
|
|
return
|
|
}
|
|
|
|
// start the process
|
|
if err = cmd.Start(); err != nil {
|
|
err = fmt.Errorf("%v: %w", cmd.Args, err)
|
|
return
|
|
}
|
|
log.Printf("using sftp directly")
|
|
sc, err = sftp.NewClientPipe(pr, pw)
|
|
return
|
|
}
|
|
|
|
var sshc *ssh.Client
|
|
sshc, err = ssh.Dial("tcp", s.addr, s.cc)
|
|
if err != nil {
|
|
log.Printf("sftp: Dial: %v", err)
|
|
return
|
|
}
|
|
|
|
var sess *ssh.Session
|
|
sess, err = sshc.NewSession()
|
|
if err != nil {
|
|
log.Printf("sftp: ssh NewSession: %v", err)
|
|
go sshc.Close()
|
|
return
|
|
}
|
|
if err = sess.RequestSubsystem("sftp"); err != nil {
|
|
log.Printf("sftp: RequestSubsystem: %v", err)
|
|
go sshc.Close()
|
|
return
|
|
}
|
|
pw, err = sess.StdinPipe()
|
|
if err != nil {
|
|
go sshc.Close()
|
|
return
|
|
}
|
|
pr, err = sess.StdoutPipe()
|
|
if err != nil {
|
|
go sshc.Close()
|
|
return
|
|
}
|
|
|
|
sc, err = sftp.NewClientPipe(pr, pw)
|
|
if err != nil {
|
|
go sshc.Close()
|
|
return
|
|
}
|
|
toClose = sshc
|
|
waiter = sshc.Wait
|
|
return
|
|
}
|
|
|
|
// sftp returns the *sftp.Client to the server, handling reconnects and coalesced dialing
|
|
// for concurrent callers.
|
|
func (s *Storage) sftp() (*sftp.Client, error) {
|
|
s.mu.Lock()
|
|
if s.sc != nil {
|
|
defer s.mu.Unlock()
|
|
// TODO: remove all this "lastGet" stuff once the wait
|
|
// mechanism has test coverage and we're sending
|
|
// periodic heartbeats over the SSH channel.
|
|
if now := time.Now(); s.lastGet.After(now.Add(-30 * time.Second)) {
|
|
s.lastGet = now
|
|
return s.sc, nil
|
|
} else {
|
|
// It's been awhile. Let's see if it's still good.
|
|
_, err := s.sc.Stat(".")
|
|
if err != nil {
|
|
s.markConnDeadLocked()
|
|
} else {
|
|
s.lastGet = now
|
|
return s.sc, nil
|
|
}
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
ci, err := s.getClientGroup.Do("", func() (interface{}, error) {
|
|
sc, waiter, toClose, err := s.dialSFTP()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if waiter != nil {
|
|
go s.monitorSSHConn(waiter)
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.connCloser = toClose
|
|
s.sc = sc
|
|
s.lastGet = time.Now()
|
|
return sc, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return ci.(*sftp.Client), nil
|
|
}
|
|
|
|
func init() {
|
|
blobserver.RegisterStorageConstructor("sftp", blobserver.StorageConstructor(newFromConfig))
|
|
}
|
|
|
|
type sftpFS struct {
|
|
*Storage
|
|
}
|
|
|
|
// Remove deletes the named file on the server.
func (s sftpFS) Remove(file string) error {
	remote := filepath.ToSlash(file)
	client, err := s.sftp()
	if err != nil {
		return err
	}
	return client.Remove(remote)
}
|
|
|
|
// RemoveDir removes the named directory on the server using the client's
// RemoveDirectory request.
func (s sftpFS) RemoveDir(dir string) error {
	remote := filepath.ToSlash(dir)
	client, err := s.sftp()
	if err != nil {
		return err
	}
	return client.RemoveDirectory(remote)
}
|
|
|
|
// Open opens the named file on the server for reading.
//
// On failure it returns a true nil files.ReadableFile, not an interface
// holding a nil *sftp.File, so comparisons against nil behave correctly.
func (s sftpFS) Open(file string) (files.ReadableFile, error) {
	sc, err := s.sftp()
	if err != nil {
		return nil, err
	}
	f, err := sc.Open(filepath.ToSlash(file))
	if err != nil {
		return nil, err
	}
	return f, nil
}
|
|
|
|
// Rename renames oldname to newname on the server with the client's
// PosixRename, which uses the posix-rename@openssh.com extension. Per the
// pkg/sftp docs, PosixRename replaces newname if it already exists.
func (s sftpFS) Rename(oldname, newname string) error {
	from, to := filepath.ToSlash(oldname), filepath.ToSlash(newname)
	client, err := s.sftp()
	if err != nil {
		return err
	}
	return client.PosixRename(from, to)
}
|
|
|
|
func (s sftpFS) TempFile(dir, prefix string) (files.WritableFile, error) {
|
|
sc, err := s.sftp()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dir = filepath.ToSlash(dir)
|
|
for tries := 0; tries < 5; tries++ {
|
|
sufRand := make([]byte, 5)
|
|
_, _ = rand.Read(sufRand)
|
|
suffix := fmt.Sprintf("%x", sufRand)
|
|
name := path.Join(dir, prefix+suffix)
|
|
f, err := sc.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR)
|
|
if err == nil {
|
|
return writableFile{
|
|
name: name,
|
|
WriteCloser: f,
|
|
}, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("sftp: failed to open temp file in %s:%s/%s with prefix %s", s.addr, s.root, dir, prefix)
|
|
}
|
|
|
|
type writableFile struct {
|
|
io.WriteCloser
|
|
name string
|
|
}
|
|
|
|
func (f writableFile) Name() string { return f.name }
|
|
func (f writableFile) Sync() error { return nil } // TODO: send fsync
|
|
|
|
// ReadDirNames lists dir on the server and returns only the entry names.
// The result is a non-nil slice even when the directory is empty.
func (s sftpFS) ReadDirNames(dir string) ([]string, error) {
	client, err := s.sftp()
	if err != nil {
		return nil, err
	}
	entries, err := client.ReadDir(filepath.ToSlash(dir))
	if err != nil {
		return nil, err
	}
	// TODO: it's wasteful that we throw all this info away and have the
	// files package restat each file. Change the interface or add an
	// optional one that sftp can implement, and make the files package
	// smarter about not asking for duplicate work when possible.
	names := make([]string, 0, len(entries))
	for _, entry := range entries {
		names = append(names, entry.Name())
	}
	return names, nil
}
|
|
|
|
// Stat returns file information for name on the server via the client's
// Stat. Use Lstat to avoid following a final symbolic link.
func (s sftpFS) Stat(name string) (os.FileInfo, error) {
	remote := filepath.ToSlash(name)
	client, err := s.sftp()
	if err != nil {
		return nil, err
	}
	return client.Stat(remote)
}
|
|
|
|
// Lstat returns file information for name on the server via the client's
// Lstat, which does not follow a final symbolic link.
func (s sftpFS) Lstat(name string) (os.FileInfo, error) {
	remote := filepath.ToSlash(name)
	client, err := s.sftp()
	if err != nil {
		return nil, err
	}
	return client.Lstat(remote)
}
|
|
|
|
// MkdirAll creates dir on the server, along with any missing parents.
//
// The permission argument is accepted for interface compatibility but
// ignored. The client's MkdirAll takes no mode, so the mode of any created
// directory is left to the server.
func (s sftpFS) MkdirAll(dir string, _ os.FileMode) error {
	remote := filepath.ToSlash(dir)
	client, err := s.sftp()
	if err != nil {
		return err
	}
	return client.MkdirAll(remote)
}
|