Improve search and ls modules

This commit is contained in:
Oleksii Shevchuk 2018-10-18 22:44:29 +03:00
parent 60c189afc0
commit f612958a66
11 changed files with 618 additions and 87 deletions

View File

@ -2,16 +2,16 @@
from datetime import datetime
def size_human_readable(num, suffix='B'):
def size_human_readable(num, suffix=''):
try:
num = int(num)
for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
for unit in [suffix or 'B','K','M','G','T','P','E','Z']:
if abs(num) < 1024.0:
return "%3.1f %s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f %s%s" % (num, 'Yi', suffix)
return "%.1f%s%s" % (num, 'Yi', suffix)
except:
return '0.00 B'
return '0.0B'
def file_timestamp(timestamp, time=False):
try:

View File

@ -4,6 +4,7 @@ from pupylib.PupyModule import config, PupyModule, PupyArgumentParser
from pupylib.PupyCompleter import remote_path_completer
from pupylib.PupyOutput import Color
from modules.lib import size_human_readable, file_timestamp, to_utf8
from argparse import REMAINDER
__class_name__="ls"
@ -25,7 +26,7 @@ T_HAS_XATTR = 14
# TODO: Rewrite using tables
def output_format(file, windows=False, archive=None, time=False):
def output_format(file, windows=False, archive=None, time=False, uid_len=0, gid_len=0):
if file[T_TYPE] == 'X':
return '--- TRUNCATED ---'
@ -37,22 +38,28 @@ def output_format(file, windows=False, archive=None, time=False):
timestamp_field = u'{:<18}' if time else u'{:<10}'
if windows:
out = u' {}{}{}{}{}'.format(
out = u' {}{}{}{}{}{}'.format(
timestamp_field.format(file_timestamp(file[T_TIMESTAMP], time)),
u'{:<3}'.format(file[T_TYPE]),
u'{:<11}'.format(size_human_readable(file[T_SIZE])),
u'{:<1}'.format('+' if file[T_HAS_XATTR] else ''),
u'{:<40}'.format(name))
u'{:<2}'.format(file[T_TYPE] + ('+' if file[T_HAS_XATTR] else '')),
unicode(file[T_UID]).rjust(uid_len)+' ' if uid_len else '',
unicode(file[T_GID]).rjust(gid_len)+' ' if gid_len else '',
u'{:>9}'.format(size_human_readable(file[T_SIZE])),
u' {:<40}'.format(name))
else:
out = u' {}{}{}{}{}{}{}{}'.format(
if not uid_len:
uid_len = 5
if not gid_len:
gid_len = 5
out = u' {}{}{}{}{}{}{}'.format(
timestamp_field.format(file_timestamp(file[T_TIMESTAMP], time)),
u'{:<3}'.format(file[T_TYPE]),
u'{:<5}'.format(file[T_UID]),
u'{:<5}'.format(file[T_GID]),
u' {:06o} '.format(file[T_MODE]),
u'{:<11}'.format(size_human_readable(file[T_SIZE])),
u'{:<1}'.format('+' if file[T_HAS_XATTR] else ''),
u'{:<40}'.format(name))
u'{:<2}'.format(file[T_TYPE] + ('+' if file[T_HAS_XATTR] else '')),
unicode(file[T_UID]).rjust(uid_len+1)+' ',
unicode(file[T_GID]).rjust(gid_len+1)+' ',
u'{:04o} '.format(file[T_MODE] & 0o7777),
u'{:>9}'.format(size_human_readable(file[T_SIZE])),
u' {:<40}'.format(name))
if archive:
out=Color(out, 'yellow')
@ -90,10 +97,11 @@ class ls(PupyModule):
dependencies = {
'all': [
'pupyutils.basic_cmds', 'scandir', 'zipfile', 'tarfile'
'pupyutils', 'scandir', 'zipfile',
'tarfile', 'scandir', 'fsutils'
],
'windows': ['junctions', 'ntfs_streams'],
'linux': ['xattr']
'windows': ['junctions', 'ntfs_streams', '_scandir'],
'linux': ['xattr', '_scandir']
}
@classmethod
@ -101,6 +109,10 @@ class ls(PupyModule):
cls.arg_parser = PupyArgumentParser(prog="ls", description=cls.__doc__)
cls.arg_parser.add_argument('-d', '--dir', action='store_false', default=True,
help='do not list directories')
cls.arg_parser.add_argument('-u', '--userinfo', action='store_true', help='show uid info')
cls.arg_parser.add_argument('-g', '--groupinfo', action='store_true', help='show gid info')
sort = cls.arg_parser.add_mutually_exclusive_group()
sort.add_argument('-L', '--limit', type=int, default=1024,
help='List no more than this amount of files (server side), '
@ -110,13 +122,18 @@ class ls(PupyModule):
sort.add_argument('-t', '--time', dest='sort', action='store_const', const=T_TIMESTAMP, help='sort by time')
cls.arg_parser.add_argument('-r', '--reverse', action='store_true', default=False, help='reverse sort order')
cls.arg_parser.add_argument(
'path', type=str, nargs='?', help='path of a specific file', completer=remote_path_completer)
'path', type=str, nargs=REMAINDER, help='path of a specific file', completer=remote_path_completer)
def run(self, args):
try:
ls = self.client.remote('pupyutils.basic_cmds', 'ls')
results = ls(args.path, args.dir, args.limit, args.archive)
path = ' '.join(args.path)
results = ls(
path, args.dir, args.limit,
args.archive, args.userinfo or args.groupinfo)
except Exception, e:
self.error(' '.join(x for x in e.args if type(x) in (str, unicode)))
return
@ -135,10 +152,31 @@ class ls(PupyModule):
show_time = args.sort == T_TIMESTAMP
for r in results:
uid_len = 0
gid_len = 0
if T_FILES in r:
archive = None
is_windows = windows
if args.userinfo or args.groupinfo:
for x in r[T_FILES]:
if args.userinfo:
uid = x[T_UID]
if type(uid) == int:
uid = str(uid)
if len(uid) > uid_len:
uid_len = len(uid)
if args.groupinfo:
gid = x[T_GID]
if type(gid) == int:
gid = str(gid)
if len(gid) > gid_len:
gid_len = len(gid)
if T_ZIPFILE in r:
self.log(Color('ZIP: '+r[T_ZIPFILE]+':', 'lightred'))
is_windows = True
@ -168,10 +206,10 @@ class ls(PupyModule):
files_cnt += 1
for f in sorted(dirs, key=lambda x: to_utf8(x.get(T_NAME)), reverse=args.reverse):
self.log(output_format(f, is_windows, time=show_time))
self.log(output_format(f, is_windows, time=show_time, uid_len=uid_len, gid_len=gid_len))
for f in sorted(files, key=lambda x: to_utf8(x.get(T_NAME)), reverse=args.reverse):
self.log(output_format(f, is_windows, time=show_time))
self.log(output_format(f, is_windows, time=show_time, uid_len=uid_len, gid_len=gid_len))
if truncated:
self.warning('Folder is too big. Not listed: {} (-L {})'.format(
@ -192,7 +230,7 @@ class ls(PupyModule):
truncated = True
continue
self.log(output_format(f, is_windows, time=show_time))
self.log(output_format(f, is_windows, time=show_time, uid_len=uid_len, gid_len=gid_len))
if truncated:
self.log('--- TRUNCATED ---')
@ -206,7 +244,23 @@ class ls(PupyModule):
elif T_TARFILE in r:
archive = 'TAR'
is_windows = False
self.log(output_format(r[T_FILE], is_windows, archive, show_time))
if args.userinfo:
uid = r[T_FILE][T_UID]
if type(uid) == int:
uid = str(uid)
uid_len = len(uid)
if args.groupinfo:
gid = r[T_FILE][T_GID]
if type(gid) == int:
gid = str(gid)
gid_len = len(gid)
self.log(output_format(r[T_FILE], is_windows, archive, show_time, uid_len=uid_len, gid_len=gid_len))
else:
self.error('Old format. Update pupyutils.basic_cmds')
return

View File

@ -5,6 +5,9 @@ from pupylib.PupyCompleter import remote_path_completer
from modules.lib.utils.download import DownloadFronted
from threading import Event
from datetime import datetime
import dateparser
__class_name__="SearchModule"
@ -14,9 +17,10 @@ class SearchModule(PupyModule):
dependencies = {
'all': [
'pupyutils.search', 'scandir', 'transfer',
'zipfile', 'tarfile'
'zipfile', 'tarfile', 'fsutils', 'scandir'
],
'windows': ['junctions'],
'windows': ['junctions', 'ntfs_streams', 'pupwinutils', '_scandir'],
'linux': ['xattr', '_scandir']
}
terminate = None
@ -25,8 +29,8 @@ class SearchModule(PupyModule):
def init_argparse(cls):
example = 'Examples:\n'
example += '- Recursively search strings in files:\n'
example += '>> run search .*ini passw.*=.*\n'
example += '>> run search .* passw.*=.* -I\n'
example += '>> run search -C .*ini passw.*=.*\n'
example += '>> run search -C .* passw.*=.* -I\n'
example += '- Recursively search string in file names:\n'
example += '>> run search pwdfile.*\n'
@ -37,6 +41,7 @@ class SearchModule(PupyModule):
help='root path to start (default: current path)')
cls.arg_parser.add_argument('-m','--max-size', type=int, default=20000000, help='max file size (default 20 Mo)')
cls.arg_parser.add_argument('-b', '--binary', action='store_true', help='search content inside binary files')
cls.arg_parser.add_argument('-v', '--verbose', action='store_true', help='show errors')
cls.arg_parser.add_argument('-C', '--content-only', action='store_true', help='show only results with content')
cls.arg_parser.add_argument('-L', '--links', action='store_true', help='follow symlinks')
cls.arg_parser.add_argument('-N', '--no-content', action='store_true', help='if string matches, output just filename')
@ -46,6 +51,17 @@ class SearchModule(PupyModule):
cls.arg_parser.add_argument('-D', '--download', action='store_true', help='download found files (imply -N)')
cls.arg_parser.add_argument('-A', '--archive', action='store_true', default=False, help='search in archive')
cls.arg_parser.add_argument('-U', '--suid', action='store_true', default=False, help='Search SUID files')
cls.arg_parser.add_argument('-G', '--sgid', action='store_true', default=False, help='Search SGID files')
cls.arg_parser.add_argument('-u', '--user', help='Search files owned by user')
cls.arg_parser.add_argument('-g', '--group', help='Search files owned by group')
cls.arg_parser.add_argument('-O', '--own-world-accessible-write', action='store_true',
help='Search accessible files for current process (write)')
cls.arg_parser.add_argument('-t', '--timestamp-newer', help='Search files which are newer than date')
cls.arg_parser.add_argument('-T', '--timestamp-older', help='Search files which are older than date')
cls.arg_parser.add_argument('-X', '--xattr', default=False, nargs='?',
help='Search files with extended attributes (can be specified)')
cls.arg_parser.add_argument('filename', type=str, metavar='filename', help='regex to search (filename)')
cls.arg_parser.add_argument('strings', nargs='*', default=[], type=str, metavar='string', help='regex to search (content)')
@ -55,6 +71,27 @@ class SearchModule(PupyModule):
search = self.client.remote('pupyutils.search')
newer = None
older = None
if args.timestamp_newer:
try:
newer = datetime.fromtimestamp(int(args.timestamp_newer))
except ValueError:
newer = dateparser.parse(args.timestamp_newer)
newer = int((newer - datetime.fromtimestamp(0)).total_seconds())
print newer
if args.timestamp_older:
try:
older = datetime.fromtimestamp(int(args.timestamp_older))
except ValueError:
older = dateparser.parse(args.timestamp_older)
older = int((older - datetime.fromtimestamp(0)).total_seconds())
s = search.Search(
args.filename,
strings=args.strings,
@ -66,7 +103,15 @@ class SearchModule(PupyModule):
binary=args.binary,
same_fs=not args.no_same_fs,
search_in_archives=args.archive,
content_only=args.content_only
content_only=args.content_only,
suid=args.suid,
sgid=args.sgid,
user=args.user,
group=args.group,
owaw=args.own_world_accessible_write,
newer=newer,
older=older,
xattr=args.xattr if args.xattr else args.xattr is not False
)
if args.download:
@ -111,7 +156,13 @@ class SearchModule(PupyModule):
self.terminate = terminate.set
self.info('Search started. Use ^C to interrupt')
s.run_cb(on_data, on_completed, self.error)
error = self.error
if not args.verbose:
def error(x):
pass
s.run_cb(on_data, on_completed, error)
terminate.wait()
s.stop()

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
__all__ = (
'readlink', 'lstat', 'has_xattrs', 'uidgid',
'username_to_uid', 'groupname_to_gid',
'NoUidGidMapping', 'NoSuchUser', 'NoSuchGroup'
)
class NoUidGidMapping(Exception):
pass
class NoSuchUser(NoUidGidMapping):
pass
class NoSuchGroup(NoUidGidMapping):
pass
from os import readlink, lstat
try:
from xattr import listxattr
def has_xattrs(path):
return listxattr(path)
except ImportError:
def has_xattrs(path):
return None
try:
from pwd import getpwuid, getpwnam
from grp import getgrgid, getgrnam
def username_to_uid(username):
try:
return getpwnam(username).pw_uid
except KeyError:
raise NoSuchUser(username)
def groupname_to_gid(groupname):
try:
return getgrnam(groupname).gr_gid
except KeyError:
raise NoSuchGroup(groupname)
def uidgid(path, item, as_text=True):
if not as_text:
return item.st_uid, item.st_gid
pw = getpwuid(item.st_uid)
gr = getgrgid(item.st_gid)
return \
pw.pw_name if pw else str(item.st_uid), \
gr.gr_name if gr else str(item.st_gid)
except ImportError:
def uidgid(path, item):
return item.st_uid, item.st_gid
def username_to_uid(username):
raise NoSuchUser(username)
def groupname_to_gid(groupname):
raise NoSuchGroup(groupname)

View File

@ -4,7 +4,6 @@ import glob
import shutil
import getpass
import stat
import sys
import datetime
import re
import codecs
@ -39,31 +38,7 @@ T_HAS_XATTR = 14
textchars = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f})
if sys.platform == 'win32':
from junctions import readlink, lstat
try:
from ntfs_streams import get_streams
def has_xattrs(path):
return get_streams(path)
except ImportError:
def has_xattrs(path):
return None
else:
from os import readlink, lstat
try:
from xattr import listxattr
def has_xattrs(path):
return listxattr(path)
except ImportError:
def has_xattrs(path):
return None
from fsutils import readlink, lstat, has_xattrs, uidgid
def is_binary(text):
return bool(text.translate(None, textchars))
@ -96,7 +71,14 @@ def safe_stat(path):
try:
return lstat(path)
except:
return FakeStat()
pass
try:
return os.stat(path)
except:
pass
return FakeStat()
def safe_listdir(path):
path = try_unicode(path)
@ -142,7 +124,7 @@ def special_to_letter(mode):
return letter
def _stat_to_ls_struct(path, name, _stat):
def _stat_to_ls_struct(path, name, _stat, resolve_uidgid=False):
if stat.S_ISLNK(_stat.st_mode):
try:
name += ' -> '+readlink(path)
@ -154,13 +136,18 @@ def _stat_to_ls_struct(path, name, _stat):
except (OSError, IOError):
f_xattrs = False
if resolve_uidgid:
uid, gid = uidgid(path, _stat)
else:
uid, gid = _stat.st_uid, _stat.st_gid
return {
T_NAME: name,
T_TYPE: mode_to_letter(_stat.st_mode),
T_SPEC: special_to_letter(_stat.st_mode),
T_MODE: _stat.st_mode,
T_UID: _stat.st_uid,
T_GID: _stat.st_gid,
T_UID: uid,
T_GID: gid,
T_SIZE: _stat.st_size,
T_TIMESTAMP: int(_stat.st_mtime),
T_HAS_XATTR: bool(f_xattrs)
@ -180,7 +167,7 @@ def _invalid_ls_struct(path, name):
}
def list_file(path):
def list_file(path, resolve_uidgid=False):
path = try_unicode(path)
if path.endswith(os.path.sep):
@ -191,7 +178,7 @@ def list_file(path):
name = os.path.basename(path)
_stat = safe_stat(path)
return _stat_to_ls_struct(path, name, _stat)
return _stat_to_ls_struct(path, name, _stat, resolve_uidgid)
def list_tar(path, max_files=None):
result = []
@ -266,7 +253,7 @@ def list_zip(path, max_files=None):
return result
def list_dir(path, max_files=None):
def list_dir(path, max_files=None, resolve_uidgid=False):
path = try_unicode(path)
result = []
@ -281,7 +268,8 @@ def list_dir(path, max_files=None):
try:
result.append(_stat_to_ls_struct(
item.path, item.name,
item.stat(follow_symlinks=False)))
item.stat(follow_symlinks=False),
resolve_uidgid=resolve_uidgid))
except OSError:
result.append(_invalid_ls_struct(item.path, item.name))
@ -365,7 +353,19 @@ def complete(path, limit=32, dirs=None):
return path, results
def ls(path=None, listdir=True, limit=4096, list_arc=False):
def safe_is_zipfile(filepath):
try:
return is_zipfile(filepath)
except (OSError, IOError):
return False
def safe_is_tarfile(filepath):
try:
return is_tarfile(filepath)
except (OSError, IOError):
return False
def ls(path=None, listdir=True, limit=4096, list_arc=False, resolve_uidgid=False):
if path:
path = try_unicode(path)
path = os.path.expanduser(path)
@ -386,16 +386,18 @@ def ls(path=None, listdir=True, limit=4096, list_arc=False):
if listdir:
results.append({
T_PATH: path,
T_FILES: list_dir(path, max_files=limit)
T_FILES: list_dir(
path, max_files=limit,
resolve_uidgid=resolve_uidgid)
})
else:
results.append({
T_PATH: path,
T_FILE: list_file(path)
T_FILE: list_file(path, resolve_uidgid)
})
elif os.path.isfile(path):
if is_zipfile(path):
if safe_is_zipfile(path):
if list_arc:
results.append({
T_ZIPFILE: path,
@ -406,7 +408,7 @@ def ls(path=None, listdir=True, limit=4096, list_arc=False):
T_ZIPFILE: path,
T_FILE: list_file(path)
})
elif is_tarfile(path):
elif safe_is_tarfile(path):
if list_arc:
results.append({
T_TARFILE: path,
@ -420,12 +422,12 @@ def ls(path=None, listdir=True, limit=4096, list_arc=False):
else:
results.append({
T_PATH: path,
T_FILE: list_file(path)
T_FILE: list_file(path, resolve_uidgid)
})
else:
results.append({
T_PATH: path,
T_FILE: list_file(path)
T_FILE: list_file(path, resolve_uidgid)
})
if not found:

View File

@ -21,18 +21,26 @@ from zipfile import ZipFile, is_zipfile
from tarfile import is_tarfile
from tarfile import open as open_tarfile
from fsutils import uidgid, username_to_uid, groupname_to_gid, has_xattrs
PERMISSION_ERRORS = [
getattr(errno, x) for x in ('EPERM', 'EACCESS') if hasattr(errno, x)
]
SEARCH_WINDOW_SIZE = 32768
from uuid import uuid4
OWAW_PROBE_NAME = str(uuid4())
class Search(object):
def __init__(
self, path,
strings=[], max_size=20000000, root_path='.', no_content=False,
case=False, binary=False, follow_symlinks=False, terminate=None,
same_fs=True, search_in_archives=False, content_only=False
same_fs=True, search_in_archives=False, content_only=False,
suid=False, sgid=False, user=False, group=False,
owaw=False, newer=None, older=None, xattr=False
):
self.max_size = int(max_size)
@ -44,6 +52,15 @@ class Search(object):
self.search_in_archives = search_in_archives
self.content_only = content_only if strings else False
self.suid = suid
self.sgid = sgid
self.user = username_to_uid(user) if user else None
self.group = groupname_to_gid(group) if group else None
self.owaw = owaw
self.newer = newer
self.older = older
self.xattr = xattr
if self.case:
i = re.IGNORECASE | re.UNICODE
else:
@ -76,6 +93,9 @@ class Search(object):
re.compile(s, i) for s in strings
]
if self.xattr and self.xattr is not True:
self.xattr = re.compile(self.xattr, i)
self.terminate = terminate
if root_path == '.':
@ -86,6 +106,12 @@ class Search(object):
if self.same_fs:
self.same_fs = os.stat(self.root_path).st_dev
self.extended = any([
self.xattr, self.suid, self.sgid,
self.user, self.group, self.owaw,
self.newer, self.older
])
def search_string_in_fileobj(self, fileobj, find_all=False, filename=None):
try:
offset = 0
@ -143,9 +169,66 @@ class Search(object):
setattr(e, 'exc', (sys.exc_type, sys.exc_value, sys.exc_traceback))
yield e
def filter_extended(self, item):
if not self.extended:
return True
path = item.path
if self.xattr:
if self.xattr is True:
if has_xattrs(path):
return True
elif any([self.xattr.match(x) for x in has_xattrs(path)]):
return True
if self.suid or self.sgid and sys.platform != 'win32':
if self.suid and item.stat().st_mode & 0o4000:
return True
if self.sgid and item.stat().st_mode & 0o2000:
return True
if self.user or self.group:
uid, gid = uidgid(path, item.stat(), as_text=False)
if self.user and self.user == uid or self.group and self.group == gid:
return True
if self.owaw:
print "TRY OWAW", path
if item.is_dir():
try:
tmp_file = os.path.join(path, OWAW_PROBE_NAME)
f = open(tmp_file, 'w')
f.close()
os.unlink(tmp_file)
return True
except (OSError, IOError):
pass
elif item.is_file():
try:
f = open(path, 'a')
f.close()
return True
except (OSError, IOError):
pass
if self.newer and item.stat().st_mtime > self.newer:
return True
if self.older and item.stat().st_mtime < self.older:
return True
return False
def search_in_archive(self, path):
any_file = not self.name or self.path
# We don't support extended search in archives
if is_zipfile(path):
zf = ZipFile(path)
try:
@ -222,24 +305,29 @@ class Search(object):
):
try:
if not self.strings or not (self.strings and entry.is_file()):
if not any_file:
if not any_file and self.filter_extended(entry):
yield entry.path
else:
size = entry.stat().st_size
if size > self.max_size:
continue
if not self.filter_extended(entry):
continue
for s in self.search_string(entry.path):
if s:
if isinstance(s, Exception):
yield s
elif self.no_content:
yield entry.path
break
if self.filter_extended(entry):
yield entry.path
break
else:
yield (entry.path, s)
if self.filter_extended(entry):
yield (entry.path, s)
except IOError, e:
if e.errno in PERMISSION_ERRORS:
continue

View File

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
__all__ = (
'readlink', 'lstat', 'has_xattrs', 'uidgid',
'username_to_uid', 'groupname_to_gid',
'NoUidGidMapping', 'NoSuchUser', 'NoSuchGroup'
)
class NoUidGidMapping(Exception):
pass
class NoSuchUser(NoUidGidMapping):
pass
class NoSuchGroup(NoUidGidMapping):
pass
from junctions import readlink, lstat
try:
from ntfs_streams import get_streams
def has_xattrs(path):
try:
return get_streams(path)
except (OSError, IOError, WindowsError):
return None
except ImportError:
def has_xattrs(path):
return None
try:
from pupwinutils.security import getfileowner, sidbyname
def uidgid(path, item, as_text=True):
try:
return getfileowner(path, as_sid=not as_text)
except (OSError, IOError, WindowsError):
return '?', '?'
def username_to_uid(username):
try:
sid = sidbyname(username)
if not sid:
raise NoSuchUser(username)
except WindowsError:
raise NoSuchUser(username)
def groupname_to_gid(groupname):
try:
sid = sidbyname(groupname)
if not sid:
raise NoSuchUser(groupname)
except WindowsError:
raise NoSuchGroup(groupname)
except ImportError:
def uidgid(path, item):
return '', ''
def username_to_uid(username):
raise NoSuchUser(username)
def groupname_to_gid(groupname):
raise NoSuchGroup(groupname)

View File

@ -155,7 +155,12 @@ class LinkStat(object):
st_rdev = 0
def lstat(path):
if islink(path):
try:
is_link = islink(path)
except WindowsError:
is_link = False
if is_link:
return LinkStat()
else:
return os.stat(path)

View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
__all__ = [
'get_streams'
]
from sys import getfilesystemencoding
from ctypes import WinDLL, c_void_p, byref, Structure
from ctypes import c_longlong as LONGLONG
from ctypes.wintypes import (
LPWSTR, DWORD, WCHAR, HANDLE, BOOL
)
kernel32 = WinDLL('kernel32')
class LARGE_INTEGER_UNION(Structure):
_fields_ = [
("QuadPart", LONGLONG),
]
class WIN32_FIND_STREAM_DATA(Structure):
_fields_ = [
("StreamSize", LARGE_INTEGER_UNION),
("cStreamName", WCHAR * (260+36+1)),
]
FindFirstStreamW = kernel32.FindFirstStreamW
FindFirstStreamW.argtypes = [
LPWSTR, DWORD, c_void_p, DWORD
]
FindFirstStreamW.restype = HANDLE
FindNextStreamW = kernel32.FindNextStreamW
FindNextStreamW.argtypes = [
HANDLE, c_void_p
]
FindNextStreamW.restype = BOOL
FindClose = kernel32.FindClose
FindClose.argtypes = [
HANDLE
]
INVALID_HANDLE_VALUE = c_void_p(-1).value
def get_streams(filename):
if type(filename) == str:
filename = filename.decode(
getfilesystemencoding())
file_infos = WIN32_FIND_STREAM_DATA()
streams = FindFirstStreamW(filename, 0, byref(file_infos), 0)
if streams == INVALID_HANDLE_VALUE:
return []
stream_name = file_infos.cStreamName
stream_list = list()
if stream_name:
if not stream_name.startswith('::'):
stream_list.append(stream_name.split(':')[1])
while FindNextStreamW(streams, byref(file_infos)):
stream_name = file_infos.cStreamName
if not stream_name.startswith('::'):
stream_list.append(stream_name.split(':')[1])
FindClose(streams)
return stream_list

View File

@ -249,11 +249,12 @@ class SECURITY_DESCRIPTOR(Structure):
('Revision', BYTE),
('Sbz1', BYTE),
('Control', WORD),
('Owner', PSID),
('Group', PSID),
('Sacl', POINTER(ACL_HEADER)),
('Dacl', POINTER(ACL_HEADER)),
('Owner', c_void_p),
('Group', c_void_p),
('Sacl', c_void_p),
('Dacl', c_void_p),
]
PSECURITY_DESCRIPTOR = POINTER(SECURITY_DESCRIPTOR)
class SECURITY_ATTRIBUTES(Structure):
_fields_ = [
@ -261,6 +262,7 @@ class SECURITY_ATTRIBUTES(Structure):
("lpSecurityDescriptor", LPVOID),
("bInheritHandle", BOOL),
]
PSECURITY_ATTRIBUTES = POINTER(SECURITY_ATTRIBUTES)
class OSVERSIONINFOEXW(Structure):
@ -287,6 +289,18 @@ class PRIVILEGE_SET_HEADER(Structure):
# advapi32
LookupAccountNameW = advapi32.LookupAccountNameW
LookupAccountNameW.restype = BOOL
LookupAccountNameW.argtypes = [
LPWSTR, LPWSTR, PSID, POINTER(DWORD), LPWSTR, POINTER(DWORD), POINTER(DWORD)
]
LookupAccountSidW = advapi32.LookupAccountSidW
LookupAccountSidW.restype = BOOL
LookupAccountSidW.argtypes = [
LPWSTR, PSID, LPWSTR, POINTER(DWORD), LPWSTR, POINTER(DWORD), POINTER(DWORD)
]
AdjustTokenPrivileges = advapi32.AdjustTokenPrivileges
AdjustTokenPrivileges.restype = BOOL
AdjustTokenPrivileges.argtypes = [HANDLE, BOOL, PTOKEN_PRIVILEGES, DWORD, PTOKEN_PRIVILEGES, POINTER(DWORD)]
@ -297,7 +311,7 @@ CheckTokenMembership.argtypes = [HANDLE, PSID, POINTER(BOOL)]
ConvertSidToStringSidA = advapi32.ConvertSidToStringSidA
ConvertSidToStringSidA.restype = BOOL
ConvertSidToStringSidA.argtypes = [DWORD, POINTER(LPTSTR)]
ConvertSidToStringSidA.argtypes = [PSID, POINTER(LPTSTR)]
CreateProcessAsUser = advapi32.CreateProcessAsUserA
CreateProcessAsUser.restype = BOOL
@ -355,6 +369,14 @@ GetFileSecurityW.argtypes = [LPWSTR, SECURITY_INFORMATION, c_void_p,
DWORD, POINTER(DWORD)]
GetFileSecurityW.restype = BOOL
GetSecurityDescriptorGroup = advapi32.GetSecurityDescriptorGroup
GetSecurityDescriptorGroup.argtypes = [c_void_p, POINTER(PSID), POINTER(BOOL)]
GetSecurityDescriptorGroup.restype = BOOL
GetSecurityDescriptorUser = advapi32.GetSecurityDescriptorGroup
GetSecurityDescriptorUser.argtypes = [c_void_p, POINTER(PSID), POINTER(BOOL)]
GetSecurityDescriptorUser.restype = BOOL
IsValidSecurityDescriptor = advapi32.IsValidSecurityDescriptor
IsValidSecurityDescriptor.argtypes = [c_void_p]
GetFileSecurityW.restype = BOOL
@ -495,7 +517,6 @@ def ListSids():
print e
return list(sids)
def getProcessToken(pid):
hProcess = OpenProcess(PROCESS_QUERY_INFORMATION, False, pid)
hToken = HANDLE(INVALID_HANDLE_VALUE)
@ -795,7 +816,7 @@ def access(path, mode):
access_desired = 0
if type(path) == str:
path = path.decode(sys.getfilesystemencoding())
path = path.decode('utf-8')
attributes = GetFileAttributesW(path)
@ -883,3 +904,108 @@ def access(path, mode):
CloseHandle(hToken)
return is_access_granted
def strsid(sid, exc=True):
StringSid = LPTSTR()
if ConvertSidToStringSidA(sid, byref(StringSid)):
return StringSid.value
if not exc:
return None
raise WinError(get_last_error())
def namebysid(sid, domain=None):
Name = LPWSTR()
cbName = DWORD(0)
ReferencedDomainName = LPWSTR()
cchReferencedDomainName = DWORD(0)
peUse = DWORD(0)
if LookupAccountSidW(domain, sid, Name, byref(cbName),
ReferencedDomainName, byref(cchReferencedDomainName), byref(peUse)) or \
get_last_error() != ERROR_INSUFFICIENT_BUFFER or cbName.value <= 0 or \
cchReferencedDomainName.value <= 0:
return ''
Name = create_unicode_buffer(cbName.value)
ReferencedDomainName = create_unicode_buffer(cchReferencedDomainName.value)
if not LookupAccountSidW(domain, sid, Name, byref(cbName),
ReferencedDomainName, byref(cchReferencedDomainName), byref(peUse)):
raise WinError(get_last_error())
return Name.value
def sidbyname(name):
if type(name) == str:
name = name.decode('utf-8')
domain = None
if '\\' in name:
domain, name = domain.split('\\', 1)
Sid = PSID()
cbSid = DWORD(0)
ReferencedDomainName = LPWSTR()
cchReferencedDomainName = DWORD(0)
peUse = DWORD(0)
if LookupAccountNameW(domain, name, Sid, byref(cbSid),
ReferencedDomainName, byref(cchReferencedDomainName), byref(peUse)) or \
get_last_error() != ERROR_INSUFFICIENT_BUFFER or cbSid.value <= 0 or \
cchReferencedDomainName.value <= 0:
return None
Sid = create_string_buffer(cbSid.value)
ReferencedDomainName = create_unicode_buffer(cchReferencedDomainName.value)
if not LookupAccountNameW(domain, name, Sid, byref(cbSid),
ReferencedDomainName, byref(cchReferencedDomainName), byref(peUse)):
raise WinError(get_last_error())
return strsid(Sid)
def getfileowner(path, as_sid=True):
if type(path) == str:
path = path.decode('utf-8')
requested_information = OWNER_SECURITY_INFORMATION | \
GROUP_SECURITY_INFORMATION | DACL_SECURITY_INFORMATION
dwSize = DWORD(0)
success = GetFileSecurityW(path, requested_information,
c_void_p(0), 0, byref(dwSize))
if not success and get_last_error() != ERROR_INSUFFICIENT_BUFFER:
raise WinError(get_last_error())
pSDBuf = create_string_buffer(dwSize.value)
can_read_access = GetFileSecurityW(
path, requested_information, pSDBuf,
dwSize, byref(dwSize))
if not can_read_access:
raise WinError(get_last_error())
if not IsValidSecurityDescriptor(pSDBuf):
raise WinError(get_last_error())
GSid = PSID()
USid = PSID()
bDefault = BOOL()
if GetSecurityDescriptorUser(pSDBuf, byref(USid), byref(bDefault)) and \
GetSecurityDescriptorGroup(pSDBuf, byref(GSid), byref(bDefault)):
if as_sid:
return strsid(GSid), strsid(USid)
else:
return namebysid(GSid), namebysid(USid)
raise WinError(get_last_error())

View File

@ -34,6 +34,7 @@ M2Crypto>=0.30.1
fusepy
defusedxml
keyboard
dateparser
puttykeys
pyelftools
-e external/pykcp