perkeep/lib/python/fusepy/memory3.py

128 lines
3.9 KiB
Python
Executable File

#!/usr/bin/env python
from fuse3 import FUSE, Operations, LoggingMixIn
from collections import defaultdict
from errno import ENOENT
from stat import S_IFDIR, S_IFLNK, S_IFREG
from sys import argv, exit
from time import time
import logging
class Memory(LoggingMixIn, Operations):
"""Example memory filesystem. Supports only one level of files."""
def __init__(self):
self.files = {}
self.data = defaultdict(bytearray)
self.fd = 0
now = time()
self.files['/'] = dict(st_mode=(S_IFDIR | 0o755), st_ctime=now,
st_mtime=now, st_atime=now, st_nlink=2)
def chmod(self, path, mode):
self.files[path]['st_mode'] &= 0o770000
self.files[path]['st_mode'] |= mode
return 0
def chown(self, path, uid, gid):
self.files[path]['st_uid'] = uid
self.files[path]['st_gid'] = gid
def create(self, path, mode):
self.files[path] = dict(st_mode=(S_IFREG | mode), st_nlink=1,
st_size=0, st_ctime=time(), st_mtime=time(), st_atime=time())
self.fd += 1
return self.fd
def getattr(self, path, fh=None):
if path not in self.files:
raise OSError(ENOENT, '')
st = self.files[path]
return st
def getxattr(self, path, name, position=0):
attrs = self.files[path].get('attrs', {})
try:
return attrs[name]
except KeyError:
return '' # Should return ENOATTR
def listxattr(self, path):
attrs = self.files[path].get('attrs', {})
return attrs.keys()
def mkdir(self, path, mode):
self.files[path] = dict(st_mode=(S_IFDIR | mode), st_nlink=2,
st_size=0, st_ctime=time(), st_mtime=time(), st_atime=time())
self.files['/']['st_nlink'] += 1
def open(self, path, flags):
self.fd += 1
return self.fd
def read(self, path, size, offset, fh):
return bytes(self.data[path][offset:offset + size])
def readdir(self, path, fh):
return ['.', '..'] + [x[1:] for x in self.files if x != '/']
def readlink(self, path):
return self.data[path].decode('utf-8')
def removexattr(self, path, name):
attrs = self.files[path].get('attrs', {})
try:
del attrs[name]
except KeyError:
pass # Should return ENOATTR
def rename(self, old, new):
self.files[new] = self.files.pop(old)
def rmdir(self, path):
self.files.pop(path)
self.files['/']['st_nlink'] -= 1
def setxattr(self, path, name, value, options, position=0):
# Ignore options
attrs = self.files[path].setdefault('attrs', {})
attrs[name] = value
def statfs(self, path):
return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
def symlink(self, target, source):
source = source.encode('utf-8')
self.files[target] = dict(st_mode=(S_IFLNK | 0o777), st_nlink=1,
st_size=len(source))
self.data[target] = bytearray(source)
def truncate(self, path, length, fh=None):
del self.data[path][length:]
self.files[path]['st_size'] = length
def unlink(self, path):
self.files.pop(path)
def utimens(self, path, times=None):
now = time()
atime, mtime = times if times else (now, now)
self.files[path]['st_atime'] = atime
self.files[path]['st_mtime'] = mtime
def write(self, path, data, offset, fh):
del self.data[path][offset:]
self.data[path].extend(data)
self.files[path]['st_size'] = len(self.data[path])
return len(data)
if __name__ == "__main__":
if len(argv) != 2:
print('usage: %s <mountpoint>' % argv[0])
exit(1)
logging.getLogger().setLevel(logging.DEBUG)
fuse = FUSE(Memory(), argv[1], foreground=True)