Added/improved unit tests
This commit is contained in:
parent
a0e6e2c073
commit
8dccfdff77
|
@ -14,3 +14,4 @@ pwncat.sqlite-journal
|
|||
linpeas.txt
|
||||
NOTES.md
|
||||
db/pwncat*
|
||||
windows/
|
||||
|
|
|
@ -1302,7 +1302,10 @@ class Linux(Platform):
|
|||
# Retrieve the response (this may take some time if wrong)
|
||||
result = proc.stdout.readline().lower()
|
||||
|
||||
if result == "password: \n":
|
||||
proc.stdin.write("\n")
|
||||
proc.stdin.flush()
|
||||
|
||||
if result.strip() == "":
|
||||
result = proc.stdout.readline().lower()
|
||||
|
||||
# Check for keywords indicating failure
|
||||
|
|
|
@ -536,7 +536,7 @@ function prompt {
|
|||
|
||||
# Decode the base64 to the actual dll
|
||||
self.channel.send(
|
||||
f"""certutil -decode "{str(loader_remote_path)}" "{good_dir}\{loader_encoded_name}.dll"\n""".encode(
|
||||
f"""certutil -decode "{str(loader_remote_path)}" "{good_dir}\\{loader_encoded_name}.dll"\n""".encode(
|
||||
"utf-8"
|
||||
)
|
||||
)
|
||||
|
@ -549,7 +549,7 @@ function prompt {
|
|||
|
||||
# Search for all instances of InstallUtil within all installed .Net versions
|
||||
self.channel.send(
|
||||
"""cmd /c "dir \Windows\Microsoft.NET\* /s/b | findstr InstallUtil.exe$"\n""".encode(
|
||||
"""cmd /c "dir \\Windows\\Microsoft.NET\\* /s/b | findstr InstallUtil.exe$"\n""".encode(
|
||||
"utf-8"
|
||||
)
|
||||
)
|
||||
|
@ -570,7 +570,7 @@ function prompt {
|
|||
|
||||
# Execute Install-Util to bypass AppLocker/CLM
|
||||
self.channel.send(
|
||||
f"""{install_utils} /logfile= /LogToConsole=false /U "{good_dir}\{loader_encoded_name}.dll"\n""".encode(
|
||||
f"""{install_utils} /logfile= /LogToConsole=false /U "{good_dir}\\{loader_encoded_name}.dll"\n""".encode(
|
||||
"utf-8"
|
||||
)
|
||||
)
|
||||
|
@ -978,6 +978,15 @@ function prompt {
|
|||
|
||||
try:
|
||||
result = self.powershell(f'Get-ChildItem -Force -Path "{path}" | Select ')
|
||||
|
||||
# Check if there were no entries
|
||||
if not result:
|
||||
return []
|
||||
|
||||
# Check if there was one entry
|
||||
if isinstance(result[0], dict):
|
||||
return [result[0]["Name"]]
|
||||
|
||||
return [r["Name"] for r in (result[0] if len(result) else [])]
|
||||
except PowershellError as exc:
|
||||
if "not exist" in str(exc):
|
||||
|
@ -992,12 +1001,15 @@ function prompt {
|
|||
|
||||
raise PlatformError("lstat not implemented for Windows")
|
||||
|
||||
def mkdir(self, path: str):
|
||||
def mkdir(self, path: str, mode: int = 0o777, parents: bool = True):
|
||||
"""Create a new directory. This is implemented with the New-Item
|
||||
commandlet.
|
||||
|
||||
:param path: path to the new directory
|
||||
:type path: str
|
||||
:param mode: permissions for the directory (ignored for windows)
|
||||
:type mode: int
|
||||
:param parents: whether to create all items (defaults to True for windows)
|
||||
"""
|
||||
|
||||
self.new_item(ItemType="Directory", Path=path)
|
||||
|
@ -1103,7 +1115,7 @@ function prompt {
|
|||
return result
|
||||
|
||||
def tempfile(
|
||||
self, mode: str, length: Optional[int] = None, suffix: Optional[str] = None
|
||||
self, mode: str, length: Optional[int] = 8, suffix: Optional[str] = None
|
||||
):
|
||||
""" Create a temporary file in a safe directory. Optionally provide a suffix """
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ echo "[+] centos bind port: $CENTOS_BIND_PORT"
|
|||
echo "[+] ubuntu bind port: $UBUNTU_BIND_PORT"
|
||||
|
||||
CENTOS_HOST="127.0.0.1" CENTOS_BIND_PORT=$CENTOS_BIND_PORT UBUNTU_HOST="127.0.0.1" UBUNTU_BIND_PORT=$UBUNTU_BIND_PORT \
|
||||
pytest
|
||||
pytest $@
|
||||
|
||||
podman container kill "$CENTOS_CONTAINER"""
|
||||
echo "[+] killed centos container"
|
||||
|
|
7
test.py
7
test.py
|
@ -16,9 +16,8 @@ with pwncat.manager.Manager("data/pwncatrc") as manager:
|
|||
|
||||
# Establish a session
|
||||
# session = manager.create_session("windows", host="192.168.56.10", port=4444)
|
||||
session = manager.create_session("windows", host="192.168.122.11", port=4444)
|
||||
# session = manager.create_session("windows", host="192.168.122.11", port=4444)
|
||||
# session = manager.create_session("linux", host="pwncat-ubuntu", port=4444)
|
||||
# session = manager.create_session("windows", host="0.0.0.0", port=4444)
|
||||
session = manager.create_session("linux", host="127.0.0.1", port=4445)
|
||||
|
||||
with session.platform.open("C:\\Users\\caleb\\test", "w") as filp:
|
||||
filp.write("WAT")
|
||||
session.platform.su("john", "asdfasdfasdf")
|
||||
|
|
|
@ -7,37 +7,93 @@ import pytest
|
|||
from pwncat.util import random_string
|
||||
|
||||
|
||||
def test_file_read_write(session):
|
||||
def test_platform_file_io(session):
|
||||
""" Test file read/write of printable data """
|
||||
|
||||
# Generate random binary data
|
||||
contents = os.urandom(1024)
|
||||
|
||||
# Create a new temporary file
|
||||
with session.platform.tempfile(mode="wb") as filp:
|
||||
filp.write(contents)
|
||||
path = filp.name
|
||||
|
||||
# Ensure it exists
|
||||
assert session.platform.Path(path).exists()
|
||||
|
||||
# Read the data back and ensure it matches
|
||||
with session.platform.open(path, "rb") as filp:
|
||||
assert contents == filp.read()
|
||||
|
||||
|
||||
def test_platform_mkdir(session):
|
||||
""" Test creating a directory """
|
||||
def test_platform_dir_io(session):
|
||||
""" Test creating a directory and interacting with the contents """
|
||||
|
||||
# Create a path object representing the new remote directory
|
||||
path = session.platform.Path(random_string())
|
||||
|
||||
# Create the directory
|
||||
path.mkdir()
|
||||
|
||||
# We construct a new path object to avoid cached stat results
|
||||
assert session.platform.Path(str(path)).is_dir()
|
||||
|
||||
# Create a file
|
||||
(path / "test.txt").touch()
|
||||
|
||||
assert "test.txt" in [item.name for item in path.iterdir()]
|
||||
|
||||
|
||||
def test_platform_run(session):
|
||||
|
||||
# Ensure command output works
|
||||
output_remote = session.platform.run(
|
||||
["echo", "hello world"], capture_output=True, text=True, check=True
|
||||
["echo", "hello world"], shell=True, capture_output=True, text=True, check=True
|
||||
)
|
||||
assert output_remote.stdout == "hello world\n"
|
||||
|
||||
# Ensure we capture the process return code properly
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
session.platform.run("this_command_doesnt_exist", shell=True, check=True)
|
||||
|
||||
|
||||
def test_platform_su(session):
|
||||
""" Test running `su` """
|
||||
|
||||
try:
|
||||
session.platform.su("john", "P@ssw0rd")
|
||||
session.platform.refresh_uid()
|
||||
|
||||
assert session.current_user().name == "john"
|
||||
|
||||
with pytest.raises(PermissionError):
|
||||
session.platform.su("caleb", "wrongpassword")
|
||||
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
|
||||
def test_platform_sudo(session):
|
||||
""" Testing running `sudo` """
|
||||
|
||||
try:
|
||||
|
||||
# We have permission to run `/bin/sh *`, so this should succeed
|
||||
proc = session.platform.sudo(
|
||||
"whoami", user="john", shell=True, stdout=subprocess.PIPE, text=True
|
||||
)
|
||||
output = proc.stdout.read().strip()
|
||||
|
||||
assert proc.wait() == 0
|
||||
assert output == "john"
|
||||
|
||||
# We don't have permission to run a bare `whoami`, so this should fail
|
||||
proc = session.platform.sudo(
|
||||
["whoami"], user="john", shell=False, stdout=subprocess.PIPE, text=True
|
||||
)
|
||||
output = proc.stdout.read().strip()
|
||||
|
||||
assert proc.wait() != 0
|
||||
assert output != "john"
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import pytest
|
||||
from pwncat.modules import IncorrectPlatformError
|
||||
|
||||
|
||||
def test_session_iter_users(session):
|
||||
"""Test the ability to iterate users. This happens
|
||||
implicitly with session.current_user(), but it's worth
|
||||
testing separately."""
|
||||
|
||||
assert "john" in [user.name for user in session.iter_users()]
|
||||
|
||||
|
||||
def test_session_find_user_name(session):
|
||||
""" Test that locating a user by name works """
|
||||
|
||||
assert session.find_user(name="john") is not None
|
||||
|
||||
|
||||
def test_session_find_user_uid(linux):
|
||||
""" Test locating a user by their UID (for linux only) """
|
||||
|
||||
user = linux.find_user(uid=0)
|
||||
|
||||
assert user is not None
|
||||
assert user.name == "root"
|
||||
|
||||
|
||||
def test_session_find_user_sid(windows):
|
||||
""" Test locating a user by their SID (for windows only) """
|
||||
|
||||
# This is the SID of the Administrator in the windows servercore image...
|
||||
# This will only work from the testing container, but I've decided that's fine.
|
||||
user = windows.find_user(uid="S-1-5-21-1417486881-3347836355-822217238-500")
|
||||
|
||||
assert user is not None
|
||||
assert user.name == "Administrator"
|
||||
|
||||
|
||||
def test_session_find_module(session):
|
||||
""" Test that locating modules works """
|
||||
|
||||
assert len(list(session.find_module("enumerate.*"))) > 0
|
||||
assert len(list(session.find_module("enumerate.user"))) == 1
|
||||
assert len(list(session.find_module("module_does_not_exist"))) == 0
|
||||
|
||||
|
||||
def test_session_run_module(session):
|
||||
""" Test running a module within a session """
|
||||
|
||||
# We should be able to enumerate a hostname
|
||||
facts = session.run("enumerate", types=["system.hostname"])
|
||||
assert len(facts) > 0
|
||||
|
||||
|
||||
def test_session_wrong_platform_linux(linux):
|
||||
""" Test that windows modules don't run in linux """
|
||||
|
||||
with pytest.raises(IncorrectPlatformError):
|
||||
linux.run("windows.enumerate.user")
|
||||
|
||||
|
||||
def test_session_wrong_platform_windows(windows):
|
||||
""" Test that linux modules don't run on windows """
|
||||
|
||||
with pytest.raises(IncorrectPlatformError):
|
||||
windows.run("linux.enumerate.user")
|
Loading…
Reference in New Issue