mirror of https://github.com/n1nj4sec/pupy.git
Merge branch 'search' of https://github.com/AlessandroZ/pupy into dev
This commit is contained in:
commit
32f739159f
|
@ -1,5 +1,7 @@
|
|||
# -*- coding: UTF8 -*-
|
||||
from pupylib.PupyModule import *
|
||||
import os
|
||||
from pupylib.utils.term import colorize
|
||||
|
||||
__class_name__="SearchModule"
|
||||
|
||||
|
@ -9,18 +11,30 @@ class SearchModule(PupyModule):
|
|||
daemon=True
|
||||
def init_argparse(self):
|
||||
self.arg_parser = PupyArgumentParser(prog="search", description=self.__doc__)
|
||||
self.arg_parser.add_argument('path', help='path')
|
||||
self.arg_parser.add_argument('-e','--extensions',metavar='ext1,ext2,...', help='limit to some extensions')
|
||||
self.arg_parser.add_argument('--path', default='.', help='root path to start (default: current path)')
|
||||
self.arg_parser.add_argument('-e','--extensions',metavar='ext1,ext2,...', default= '', help='limit to some extensions')
|
||||
self.arg_parser.add_argument('strings', nargs='+', metavar='string', help='strings to search')
|
||||
self.arg_parser.add_argument('-m','--max-size', type=int, default=None, help='max file size')
|
||||
self.arg_parser.add_argument('-m','--max-size', type=int, default=20000000, help='max file size (default 20 Mo)')
|
||||
self.arg_parser.add_argument('--content', action='store_true', help='check inside files (such as grep)')
|
||||
|
||||
def run(self, args):
|
||||
self.client.load_package("pupyutils.search", force=True)
|
||||
exts=[]
|
||||
self.client.load_package("scandir")
|
||||
|
||||
if args.extensions:
|
||||
exts=args.extensions.split(',')
|
||||
self.info("searching strings %s in %s ..."%(args.strings, args.path))
|
||||
for res in self.client.conn.modules['pupyutils.search'].search_path(args.path, args.strings, files_extensions=exts, max_size=args.max_size):
|
||||
self.success("%s:%s > %s"%(res[0],res[1],res[2]))
|
||||
self.info("search finished !")
|
||||
args.extensions = tuple(f.strip() for f in args.extensions.split(','))
|
||||
# if not extension is provided for find commad, try to extract it to gain time during the research
|
||||
elif not args.content:
|
||||
args.extensions = tuple(os.path.splitext(s)[1].strip() for s in args.strings)
|
||||
|
||||
search_str = [s.lower() for s in args.strings]
|
||||
|
||||
s = self.client.conn.modules['pupyutils.search'].Search(files_extensions=args.extensions, max_size=args.max_size, check_content=args.content, root_path=args.path, search_str=search_str)
|
||||
self.info("searching strings %s in %s ..."%(args.strings, args.path))
|
||||
for res in s.run():
|
||||
# add color
|
||||
for s in search_str:
|
||||
if s in res:
|
||||
res = res.replace(s, colorize(s,"green"))
|
||||
self.success("%s" % res)
|
||||
self.info("search finished !")
|
|
@ -1,42 +1,75 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: UTF8 -*-
|
||||
# -*- coding: utf-8 -*-
|
||||
from scandir import scandir, walk
|
||||
import time
|
||||
import os
|
||||
import os.path
|
||||
import re
|
||||
import sys
|
||||
|
||||
def search_file(path, search_strings):
|
||||
buf=b""
|
||||
line_nb=0
|
||||
try:
|
||||
with open(path, 'rb') as f:
|
||||
for line in f:
|
||||
line=line.lower()
|
||||
for s in search_strings:
|
||||
start=0
|
||||
while True:
|
||||
i=line.find(s.lower(), start)
|
||||
if i==-1:
|
||||
break
|
||||
start=i+1
|
||||
yield (line_nb+1, line[i:i+50].strip())
|
||||
line_nb+=1
|
||||
except Exception:
|
||||
pass
|
||||
class Search():
|
||||
def __init__(self, files_extensions='', max_size=20000000, check_content=False, root_path='.', search_str=[]):
|
||||
# By default max size is 20 Mo
|
||||
self.max_size = max_size
|
||||
self.files_extensions = files_extensions
|
||||
self.check_content = check_content
|
||||
if root_path == '.':
|
||||
self.root_path = os.getcwd()
|
||||
else:
|
||||
self.root_path = root_path
|
||||
self.search_str = search_str
|
||||
|
||||
def search_string(self, path, search_str):
|
||||
buffer_size = 4096
|
||||
buffer = None
|
||||
try:
|
||||
with open(path, 'rb') as f:
|
||||
while True:
|
||||
buffer = f.read(buffer_size)
|
||||
if buffer:
|
||||
for string in search_str:
|
||||
# no case sensitive on regex
|
||||
indexes = [m.start() for m in re.finditer(string, buffer, flags=re.IGNORECASE)]
|
||||
for i in indexes:
|
||||
# return the entire line
|
||||
yield buffer[i:].strip().split('\n')[0]
|
||||
else:
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
def search_path(path, search_strings, files_extensions=None, max_size=None):
|
||||
""" search recursively for a string in all files in the path """
|
||||
if not files_extensions:
|
||||
files_extensions=None
|
||||
if files_extensions is not None:
|
||||
files_extensions=tuple(files_extensions)
|
||||
for root, dirs, files in os.walk(path):
|
||||
for f in files:
|
||||
if files_extensions is None or f.lower().endswith(files_extensions):
|
||||
if max_size is None or os.path.getsize(os.path.join(root,f))<max_size:
|
||||
for res in search_file(os.path.join(root,f),search_strings):
|
||||
yield (os.path.join(root,f), res[0], res[1])
|
||||
|
||||
if __name__=="__main__":
|
||||
import sys
|
||||
search_path(sys.argv[1],[sys.argv[2]])
|
||||
def scanwalk(self, path, followlinks=False):
|
||||
''' lists of DirEntries instead of lists of strings '''
|
||||
dirs, nondirs = [], []
|
||||
try:
|
||||
for entry in scandir(path):
|
||||
if entry.is_dir(follow_symlinks=followlinks):
|
||||
dirs.append(entry)
|
||||
else:
|
||||
if self.max_size > entry.stat(follow_symlinks=False).st_size:
|
||||
if entry.name.endswith(self.files_extensions):
|
||||
nondirs.append(entry)
|
||||
yield path, dirs, nondirs
|
||||
# try / except used for permission denied
|
||||
except:
|
||||
pass
|
||||
|
||||
for dir in dirs:
|
||||
for res in self.scanwalk(dir.path, followlinks=followlinks):
|
||||
yield res
|
||||
|
||||
def run(self):
|
||||
for root, dirs, files in self.scanwalk(self.root_path):
|
||||
for f in files:
|
||||
# such as find command
|
||||
for s in self.search_str:
|
||||
if f.name.lower().find(s) != -1:
|
||||
yield 'File: %s\n\n' % os.path.join(root, f.name)
|
||||
|
||||
# such as grep command
|
||||
if self.check_content:
|
||||
for res in self.search_string(os.path.join(root, f.name), self.search_str):
|
||||
try:
|
||||
res = res.encode('utf-8')
|
||||
yield 'File: %s > %s\n\n' % (os.path.join(root, f.name), res)
|
||||
except:
|
||||
pass
|
Loading…
Reference in New Issue