diff --git a/.gitignore b/.gitignore index 29c36c9..b00ed0a 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ testbed testing/ data/pwncat.sqlite-journal pwncat.sqlite-journal +linpeas.txt diff --git a/pwncat/__main__.py b/pwncat/__main__.py index 0ea3b34..0e03357 100644 --- a/pwncat/__main__.py +++ b/pwncat/__main__.py @@ -1,13 +1,12 @@ #!/usr/bin/env python3 -import argparse import logging import selectors import shlex import sys - -from sqlalchemy.exc import InvalidRequestError import warnings + from sqlalchemy import exc as sa_exc +from sqlalchemy.exc import InvalidRequestError import pwncat from pwncat import util diff --git a/pwncat/commands/enumerate.py b/pwncat/commands/enumerate.py index d6e6706..a4514bc 100644 --- a/pwncat/commands/enumerate.py +++ b/pwncat/commands/enumerate.py @@ -195,19 +195,28 @@ class Command(CommandDefinition): try: # Grab hostname hostname = pwncat.victim.enumerate.first("system.hostname").data - system_details.append(["Hostname", hostname]) + system_details.append(["Hostname", util.escape_markdown(hostname)]) except ValueError: hostname = "[unknown-hostname]" # Not provided by enumerate, but natively known due to our connection - system_details.append(["Primary Address", pwncat.victim.host.ip]) - system_details.append(["Derived Hash", pwncat.victim.host.hash]) + system_details.append( + ["Primary Address", util.escape_markdown(pwncat.victim.host.ip)] + ) + system_details.append( + ["Derived Hash", util.escape_markdown(pwncat.victim.host.hash)] + ) try: # Grab distribution distro = pwncat.victim.enumerate.first("system.distro").data system_details.append( - ["Distribution", f"{distro.name} ({distro.ident}) {distro.version}"] + [ + "Distribution", + util.escape_markdown( + f"{distro.name} ({distro.ident}) {distro.version}" + ), + ] ) except ValueError: pass @@ -215,7 +224,7 @@ class Command(CommandDefinition): try: # Grab the architecture arch = pwncat.victim.enumerate.first("system.arch").data - system_details.append(["Architecture", arch.arch]) + system_details.append(["Architecture", util.escape_markdown(arch.arch)]) except ValueError: pass @@ -225,7 +234,9 @@ class Command(CommandDefinition): system_details.append( [ "Kernel", - f"Linux Kernel {kernel.major}.{kernel.minor}.{kernel.patch}-{kernel.abi}", + util.escape_markdown( + f"Linux Kernel {kernel.major}.{kernel.minor}.{kernel.patch}-{kernel.abi}" + ), ] ) except ValueError: @@ -234,7 +245,7 @@ class Command(CommandDefinition): try: # Grab SELinux State selinux = pwncat.victim.enumerate.first("system.selinux").data - system_details.append(["SELinux", selinux.state]) + system_details.append(["SELinux", util.escape_markdown(selinux.state)]) except ValueError: pass @@ -242,7 +253,7 @@ class Command(CommandDefinition): # Grab ASLR State aslr = pwncat.victim.enumerate.first("system.aslr").data system_details.append( - ["ASLR", "DISABLED" if aslr.state == 0 else "ENABLED"] + ["ASLR", "disabled" if aslr.state == 0 else "enabled"] ) except ValueError: pass @@ -250,14 +261,14 @@ class Command(CommandDefinition): try: # Grab init system init = pwncat.victim.enumerate.first("system.init").data - system_details.append(["Init", init.init]) + system_details.append(["Init", util.escape_markdown(str(init.init))]) except ValueError: pass try: # Check if we are in a container container = pwncat.victim.enumerate.first("system.container").data - system_details.append(["Container", container.type]) + system_details.append(["Container", util.escape_markdown(container.type)]) except ValueError: pass @@ -369,12 +380,16 @@ class Command(CommandDefinition): if getattr(fact.data, "description", None) is not None: sections.append(fact) continue - filp.write(f"- {util.strip_ansi_escape(str(fact.data))}\n") + filp.write( + f"- {util.escape_markdown(util.strip_ansi_escape(str(fact.data)))}\n" + ) filp.write("\n") for section in sections: - filp.write(f"### {util.strip_ansi_escape(str(section.data))}\n\n") + filp.write( + f"### {util.escape_markdown(util.strip_ansi_escape(str(section.data)))}\n\n" + ) filp.write(f"```\n{section.data.description}\n```\n\n") def show_facts(self, typ: str, provider: str, long: bool): diff --git a/pwncat/enumerate/fstab.py b/pwncat/enumerate/fstab.py new file mode 100644 index 0000000..460b357 --- /dev/null +++ b/pwncat/enumerate/fstab.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +import dataclasses +from typing import Generator, List + +from colorama import Fore + +from pwncat.enumerate import FactData +import pwncat + +name = "pwncat.enumerate.fstab" +provides = "system.fstab" +per_user = False + + +@dataclasses.dataclass +class FstabEntry(FactData): + + spec: str + """ The FS Specification (e.g. /dev/sda1 or UUID=XXXX) """ + target: str + """ The target location for this mount (e.g. /mnt/mydisk or /home) """ + fstype: str + """ The type of filesystem being mounted (e.g. ext4 or bind) """ + options: List[str] + """ The list of options associated with this mount (split on comma) """ + freq: int + """ Whether to dump this filesystem (defaults to zero, fifth field, see fstab(5)) """ + passno: int + """ Order of fsck at boot time. See fstab(5) and fsck(8). """ + mounted: bool + """ Whether this is currently mounted (not from fstab, but cross-referenced w/ /proc/mount) """ + + def __str__(self): + if self.mounted: + return ( + f"{Fore.BLUE}{self.spec}{Fore.RESET} {Fore.GREEN}mounted{Fore.RESET} at " + f"{Fore.YELLOW}{self.target}{Fore.RESET} " + f"as {Fore.CYAN}{self.fstype}{Fore.RESET}" + ) + else: + return ( + f"{Fore.BLUE}{self.spec}{Fore.RESET} {Fore.RED}available{Fore.RESET} to " + f"mount at {Fore.YELLOW}{self.target}{Fore.RESET} " + f"as {Fore.CYAN}{self.fstype}{Fore.RESET}" + ) + + @property + def description(self): + return "\t".join( + [ + self.spec, + self.target, + self.fstype, + ",".join(self.options), + str(self.freq), + str(self.passno), + ] + ) + + +def enumerate() -> Generator[FactData, None, None]: + """ + Enumerate filesystems in /etc/fstab. At some point, this should mark + file systems with their mount status, but I'm not sure how to resolve + the UUID= entries intelligently right now, so for now it just results + in returning the entries in the fstab. + + :return: + """ + + try: + with pwncat.victim.open("/etc/fstab", "r") as filp: + for line in filp: + line = line.strip() + if line.startswith("#") or line == "": + continue + try: + spec, target, fstype, options, *entries = line.split() + # Optional entries + freq = int(entries[0]) if entries else "0" + passno = int(entries[1]) if len(entries) > 1 else "0" + except (ValueError, IndexError): + # Badly formatted line + continue + yield FstabEntry( + spec, target, fstype, options.split(","), freq, passno, False + ) + except (FileNotFoundError, PermissionError): + pass diff --git a/pwncat/enumerate/processes.py b/pwncat/enumerate/processes.py new file mode 100644 index 0000000..8b47fed --- /dev/null +++ b/pwncat/enumerate/processes.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +import dataclasses +from typing import Generator, List +import shlex + +from colorama import Fore + +import pwncat +from pwncat.enumerate import FactData + +name = "pwncat.enumerate.processes" +provides = "process" +per_user = False + + +@dataclasses.dataclass +class ProcessData(FactData): + + uid: int + pid: int + ppid: int + argv: List[str] + + def __str__(self): + if isinstance(self.uid, str): + user = self.uid + color = Fore.YELLOW + else: + if self.uid == 0: + color = Fore.RED + elif self.uid < 1000: + color = Fore.BLUE + else: + color = Fore.MAGENTA + + # Color our current user differently + if self.uid == pwncat.victim.current_user.id: + color = Fore.LIGHTBLUE_EX + + user = self.user.name + + result = f"{color}{user:>10s}{Fore.RESET} " + result += f"{Fore.MAGENTA}{self.pid:<7d}{Fore.RESET} " + result += f"{Fore.LIGHTMAGENTA_EX}{self.ppid:<7d}{Fore.RESET} " + result += f"{Fore.CYAN}{shlex.join(self.argv)}{Fore.RESET}" + + return result + + @property + def user(self) -> pwncat.db.User: + return pwncat.victim.find_user_by_id(self.uid) + + +def enumerate() -> Generator[FactData, None, None]: + """ + Enumerate all running processes. Extract command line, user, and pts information + :return: + """ + + ps = pwncat.victim.which("ps") + + if ps is not None: + with pwncat.victim.subprocess(f"{ps} -elfww --no-headers", "r") as filp: + # Skip first line... it's just the headers + try: + # next(filp) + pass + except StopIteration: + pass + + # Iterate over each process + for line in filp: + line = line.strip().decode("utf-8") + + entities = line.split() + _, _, username, pid, ppid, _, _, _, _, _, _, _, _, _, *argv = entities + if username not in pwncat.victim.users: + uid = username + else: + uid = pwncat.victim.users[username].id + + command = " ".join(argv) + # Kernel threads aren't helpful for us + if command.startswith("[") and command.endswith("]"): + continue + + pid = int(pid) + ppid = int(ppid) + + yield ProcessData(uid, pid, ppid, argv) + else: + # We should try to parse /proc. It's slow, but should work. + # I'll implement that later. + pass diff --git a/pwncat/enumerate/system/sudo.py b/pwncat/enumerate/system/sudo.py new file mode 100644 index 0000000..b3a2eb5 --- /dev/null +++ b/pwncat/enumerate/system/sudo.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +import dataclasses +from typing import Generator, List +import re + +from colorama import Fore + +from pwncat.enumerate import FactData +from pwncat import util +import pwncat + +name = "pwncat.enumerate.system" +provides = "system.sudo_version" +per_user = False + + +@dataclasses.dataclass +class SudoVersion(FactData): + """ + Version of the installed sudo binary may be useful for exploitation + + """ + + version: str + output: str + vulnerable: bool + + def __str__(self): + result = f"{Fore.YELLOW}sudo{Fore.RESET} version {Fore.CYAN}{self.version}{Fore.RESET}" + if self.vulnerable: + result += f" (may be {Fore.RED}vulnerable{Fore.RESET})" + return result + + @property + def description(self): + result = self.output + if self.vulnerable: + result = result.rstrip("\n") + "\n\n" + result += ( + f'This version may be vulnerable. Check against "searchsploit sudo"' + ) + return result + + +def enumerate() -> Generator[FactData, None, None]: + """ + Enumerate kernel/OS version information + :return: + """ + + try: + # Check the sudo version number + result = pwncat.victim.env(["sudo", "--version"]).decode("utf-8").strip() + except FileNotFoundError: + return + + # Taken from here: + # https://book.hacktricks.xyz/linux-unix/privilege-escalation#sudo-version + known_vulnerable = [ + "1.6.8p9", + "1.6.9p18", + "1.8.14", + "1.8.20", + "1.6.9p21", + "1.7.2p4", + "1.8.0", + "1.8.1", + "1.8.2", + "1.8.3", + "1.4", + "1.5", + "1.6", + ] + + # Can we match this output to a specific sudo version? + match = re.search(r"sudo version ([0-9]+\.[0-9]+\.[^\s]*)", result, re.IGNORECASE) + if match is not None and match.group(1) is not None: + vulnerable = False + # Is this in our list of known vulnerable versions? Not a guarantee, but + # a rough quick check. + for v in known_vulnerable: + if match.group(1).startswith(v): + vulnerable = True + break + + yield SudoVersion(match.group(1), result, vulnerable) + return + + # We couldn't parse the version out, but at least give the full version + # output in the long form/report of enumeration. + yield SudoVersion("unknown", result, False) diff --git a/pwncat/privesc/__init__.py b/pwncat/privesc/__init__.py index 6841067..3097c44 100644 --- a/pwncat/privesc/__init__.py +++ b/pwncat/privesc/__init__.py @@ -577,7 +577,7 @@ class Finder: authkeys_path, ("\n".join(authkeys) + "\n").encode("utf-8"), writer ) - if len(readers) == 0: + if not readers: # We couldn't read their authkeys, but log that we clobbered it. # The user asked us to. At least create an un-removable tamper # noting that we clobbered this file. @@ -773,10 +773,7 @@ class Finder: def in_chain(self, user: str, chain: List[Tuple["Technique", str]]) -> bool: """ Check if the given user is in the chain """ - for link in chain: - if link[0].user == user: - return True - return False + return any(link[0].user == user for link in chain) def unwrap(self, techniques: List[Tuple["Technique", str]]): # Work backwards to get back to the original shell diff --git a/pwncat/privesc/dirtycow.py b/pwncat/privesc/dirtycow.py index 5c3dafd..c8b9783 100644 --- a/pwncat/privesc/dirtycow.py +++ b/pwncat/privesc/dirtycow.py @@ -43,9 +43,7 @@ class Method_disabled(BaseMethod): if triplet[0] == 4 and triplet[1] == 4 and triplet[2] >= 26: raise PrivescError("kernel seemingly not vulnerable") - techniques = [Technique("root", self, None, Capability.SHELL)] - - return techniques + return [Technique("root", self, None, Capability.SHELL)] def execute(self, technique: Technique): """ Run the specified technique """ diff --git a/pwncat/remote/victim.py b/pwncat/remote/victim.py index 1a3e4c8..0ed7b51 100644 --- a/pwncat/remote/victim.py +++ b/pwncat/remote/victim.py @@ -5,8 +5,7 @@ import os import shlex import socket import sys -import time -from typing import Dict, Optional, IO, Any, List, Tuple, Iterator, Union, Generator +from typing import Dict, Optional, Any, List, Tuple, Iterator, Union, Generator import paramiko import pkg_resources @@ -16,17 +15,17 @@ from sqlalchemy.engine import Engine, create_engine from sqlalchemy.orm import Session, sessionmaker import pwncat.db -from pwncat import privesc +import pwncat.enumerate from pwncat import persist +from pwncat import privesc from pwncat import util from pwncat.commands import CommandParser from pwncat.config import Config, KeyType from pwncat.file import RemoteBinaryPipe from pwncat.gtfobins import GTFOBins, Capability, Stream from pwncat.remote import RemoteService -from pwncat.tamper import Tamper, TamperManager +from pwncat.tamper import TamperManager from pwncat.util import State -import pwncat.enumerate def remove_busybox_tamper(): @@ -381,7 +380,6 @@ class Victim: returned from ``victim.which`` vice local versions. :param url: a base url for compiled versions of busybox - :param type: str """ if self.host.busybox is not None: @@ -573,7 +571,7 @@ class Victim: cache entries for the remote host to speed up pwncat. Further, if busybox is installed, it will return busybox version of binaries without asking the remote host. - + :param name: the name of the remote binary (e.g. "touch"). :type name: str :param quote: whether to quote the returned string with shlex. @@ -614,7 +612,7 @@ class Victim: r""" Process local input from ``stdin``. This is used internally to handle keyboard shortcuts and pass data to the remote host when in raw mode. - + :param data: the newly entered data :type data: bytes """ @@ -667,14 +665,14 @@ class Victim: The current state of ``pwncat``. Changing this property has side-effects beyond just modifying a variable. Switching to RAW mode will close the local terminal automatically and enter RAW/no-echo mode in the local terminal. - + Setting command mode will not return until command mode is exited, and enters the CommandProcessor input loop. - + Setting SINGLE mode is like COMMAND mode except it will return after one local command is entered and executed. - - + + :return: pwncat.util.State """ return self._state @@ -742,10 +740,10 @@ class Victim: ``env`` command-line program. The only difference is that there is no way to clear the current environment. This will also resolve argv[0] to ensure it exists on the remote system. - + If the specified binary does not exist on the remote host, a FileNotFoundError is raised. - + :param argv: the argument list. argv[0] is the command to run. :type argv: List[str] :param envp: a dictionary of environment variables to set @@ -827,7 +825,7 @@ class Victim: and end of command output. This method will wait for the starting delimeter before returning. The output of the command can then be retrieved from the ``victim.client`` socket. - + :param cmd: the command to run on the remote host :type cmd: str :param delim: whether to wrap the output in delimeters @@ -841,11 +839,7 @@ class Victim: sdelim = util.random_string(10) edelim = util.random_string(10) - if delim: - command = f" echo; echo {sdelim}; {cmd}; echo {edelim}" - else: - command = f" {cmd}" - + command = f" echo; echo {sdelim}; {cmd}; echo {edelim}" if delim else f" {cmd}" # Send the command to the remote host self.client.send(command.encode("utf-8") + b"\n") @@ -877,14 +871,14 @@ class Victim: file-like object is closed, no other interaction with the remote host can occur (this will result in a deadlock). It is recommended to wrap uses of this object in a ``with`` statement: - + .. code-block:: python - + with pwncat.victim.subprocess("find / -name interesting", "r") as stdout: for file_path in stdout: print("Interesting file:", file_path.strip().decode("utf-8")) - - + + :param cmd: the command to execute :param mode: a mode string like with the standard "open" function :param data: data to send to the remote process prior to waiting for output @@ -962,7 +956,7 @@ class Victim: Retrieve the size of a remote file. This method raises a FileNotFoundError if the remote file does not exist. It may also raise PermissionError if the remote file is not readable. - + :param path: path to the remote file :type path: str :return: int @@ -1000,8 +994,8 @@ class Victim: the remote ``test`` command to interrogate the given path and it's parent directory. If the ``test`` and ``[`` commands are not available, Access.NONE is returned. - - + + :param path: the remote file path :type path: str :return: pwncat.util.Access flags @@ -1035,8 +1029,10 @@ class Victim: access |= util.Access.EXECUTE if b"exists" in result: access |= util.Access.EXISTS - if b"write" in result or ( - b"parent_write" in result and not b"exists" in result + if ( + b"write" in result + or b"parent_write" in result + and b"exists" not in result ): access |= util.Access.WRITE if b"read" in result: @@ -1091,7 +1087,7 @@ class Victim: This method implements the underlying read logic for the ``open`` method. It shouldn't be called directly. It may raise a FileNotFoundError or PermissionError depending on access to the requested file. - + :param path: the path to the remote file :type path: str :param mode: the open mode for the remote file (supports "b" and text modes) @@ -1175,7 +1171,7 @@ class Victim: access = self.access(path) if util.Access.DIRECTORY in access: raise IsADirectoryError(f"Is a directory: '{path}'") - if util.Access.EXISTS in access and not util.Access.WRITE in access: + if util.Access.EXISTS in access and util.Access.WRITE not in access: raise PermissionError(f"Permission denied: '{path}'") if util.Access.EXISTS not in access: if util.Access.PARENT_EXIST not in access: @@ -1236,7 +1232,7 @@ class Victim: or process stream is open. This will cause a dead-lock. This method may raise a FileNotFoundError or PermissionDenied in case of access issues with the remote file. - + :param path: remote file path :type path: str :param mode: the open mode; this cannot contain both read and write! @@ -1265,7 +1261,7 @@ class Victim: Create a remote temporary file and open it in the specified mode. The mode must contain "w", as opening a new file for reading makes not sense. If "b" is not included, the file will be opened in text mode. - + :param mode: the mode string as with ``victim.open`` :type mode: str :param length: length of the expected data (as with ``open``) @@ -1296,7 +1292,7 @@ class Victim: known and an abstract RemoteService layer is implemented for the init system. Currently, only ``systemd`` is understood by pwncat, but facilities to implement more abstracted init systems is built-in. - + :return: Iterator[RemoteService] """ @@ -1312,7 +1308,7 @@ class Victim: Locate a remote service by name. This uses the same interface as the ``services`` property, meaning a supported ``init`` system must be used on the remote host. If the service is not found, a ValueError is raised. - + :param name: the name of the remote service :type name: str :param user: whether to lookup user services (e.g. ``systemctl --user``) @@ -1341,7 +1337,7 @@ class Victim: A ValueError is raised if the init system is not understood by ``pwncat``. A PermissionError may be raised if insufficient permissions are found to create the service. - + :param name: the name of the remote service :type name: str :param description: the description for the remote service @@ -1373,9 +1369,9 @@ class Victim: and then utilized to switch the active user of your shell. If ``check`` is specified, do not actually switch users. Only check that the given password is correct. - + Raises PermissionError if the password is incorrect or the ``su`` fails. - + :param user: the user to switch to :type user: str :param password: the password for the specified user or None if currently UID=0 @@ -1542,7 +1538,7 @@ class Victim: def flush_output(self, some=False): """ Flush any data in the socket buffer. - + :param some: if true, wait for at least one byte of data before flushing. :type some: bool """ @@ -1569,7 +1565,7 @@ class Victim: """ Retrieve the currently pending data in the socket buffer without removing the data from the buffer. - + :param some: if true, wait for at least one byte of data to be received :type some: bool :return: bytes @@ -1598,7 +1594,7 @@ class Victim: """ Reset the remote terminal using the ``reset`` command. This also restores your prompt, and sets up the environment correctly for ``pwncat``. - + """ if hard: self.run("reset", wait=False) @@ -1615,7 +1611,7 @@ class Victim: Receive data from the socket until the specified string of bytes is found. There is no timeout features, so you should be 100% sure these bytes will end up in the output of the remote process at some point. - + :param needle: the bytes to search for :type needle: bytes :param flags: flags to pass to the underlying ``recv`` call @@ -1646,7 +1642,7 @@ class Victim: def whoami(self): """ Use the ``whoami`` command to retrieve the current user name. - + :return: str, the current user name """ return self.cached_user @@ -1662,7 +1658,7 @@ class Victim: def getenv(self, name: str): """ Utilize ``echo`` to get the current value of the given environment variable. - + :param name: environment variable name :type name: str :return: str @@ -1674,9 +1670,9 @@ class Victim: """ Retrieves a dictionary representing the result of the ``id`` command. The resulting dictionary looks like: - + .. code-block:: python - + { "uid": { "name": "username", "id": 1000 }, "gid": { "name": "username", "id": 1000 }, @@ -1685,7 +1681,7 @@ class Victim: "groups": [ {"name": "wheel", "id": 10} ], "context": "SELinux context" } - + :return: Dict[str,Any] """ @@ -1699,14 +1695,14 @@ class Victim: id_properties = {} for key, value in props.items(): - if key == "groups": + if key == "context": + id_properties["context"] = value.split(":") + elif key == "groups": groups = [] for group in value.split(","): p = group.split("(") groups.append({"id": int(p[0]), "name": p[1].split(")")[0]}) id_properties["groups"] = groups - elif key == "context": - id_properties["context"] = value.split(":") else: p = value.split("(") id_properties[key] = {"id": int(p[0]), "name": p[1].split(")")[0]} @@ -1726,7 +1722,7 @@ class Victim: """ Reload user and group information from /etc/passwd and /etc/group and update the local database. - + """ ident = self.id @@ -1831,7 +1827,7 @@ class Victim: """ Return a list of users from the local user database cache. If the users have not been requested yet, this willc all ``victim.reload_users``. - + :return: Dict[str, pwncat.db.User] """ @@ -1858,7 +1854,7 @@ class Victim: def find_user_by_id(self, uid: int): """ Locate a user in the database with the specified user ID. - + :param uid: the user id to look up :type uid: int :returns: str diff --git a/pwncat/util.py b/pwncat/util.py index 338056f..3c9a9e0 100644 --- a/pwncat/util.py +++ b/pwncat/util.py @@ -71,17 +71,13 @@ def isprintable(data) -> bool: if type(data) is str: data = data.encode("utf-8") - for c in data: - if c not in bytes(string.printable, "ascii"): - return False - - return True + return all(c in bytes(string.printable, "ascii") for c in data) def human_readable_size(size, decimal_places=2): for unit in ["B", "KiB", "MiB", "GiB", "TiB"]: if size < 1024.0: - break + return f"{size:.{decimal_places}f}{unit}" size /= 1024.0 return f"{size:.{decimal_places}f}{unit}" @@ -94,9 +90,7 @@ def human_readable_delta(seconds): if seconds < 60: return f"{seconds:.2f} seconds" - output = [] - output.append(f"{int(seconds%60)} seconds") - + output = [f"{int(seconds % 60)} seconds"] minutes = seconds // 60 output.append(f"{minutes % 60} minutes") @@ -142,6 +136,15 @@ def strip_ansi_escape(s: str) -> str: return ansi_escape_pattern.sub("", s) +def escape_markdown(s: str) -> str: + """ + Escape any markdown special characters + :param s: + :return: + """ + return re.sub(r"""([\\`*_}{\[\]()#+!])""", r"\\\1", s) + + def copyfileobj(src, dst, callback, nomv=False): """ Copy a file object to another file object with a callback. This method assumes that both files are binary and support readinto @@ -330,7 +333,7 @@ def log(level, message, overlay=False): "prog": f"[{Fore.CYAN}+{Fore.RESET}]", } - if overlay or (LAST_LOG_MESSAGE[1] and (level == "success" or level == "error")): + if overlay or (LAST_LOG_MESSAGE[1] and level in ["success", "error"]): erase_progress() elif LAST_LOG_MESSAGE[1]: sys.stdout.write("\n")