code cleanup and optimizations

This commit is contained in:
wh1te909 2022-10-02 21:42:46 +00:00
parent fb9ec2b040
commit ade1a73966
44 changed files with 222 additions and 234 deletions

13
.vscode/settings.json vendored
View File

@ -1,7 +1,10 @@
{
"python.defaultInterpreterPath": "api/tacticalrmm/env/bin/python",
"python.defaultInterpreterPath": "api/env/bin/python",
"python.languageServer": "Pylance",
"python.analysis.extraPaths": ["api/tacticalrmm", "api/env"],
"python.analysis.extraPaths": [
"api/tacticalrmm",
"api/env"
],
"python.analysis.diagnosticSeverityOverrides": {
"reportUnusedImport": "error",
"reportDuplicateImport": "error",
@ -22,7 +25,9 @@
"**env/**"
],
"python.formatting.provider": "black",
"mypy.targets": ["api/tacticalrmm"],
"mypy.targets": [
"api/tacticalrmm"
],
"mypy.runUsingActiveInterpreter": true,
"editor.bracketPairColorization.enabled": true,
"editor.guides.bracketPairs": true,
@ -70,4 +75,4 @@
"completeUnimported": true,
"staticcheck": true
}
}
}

View File

@ -1,5 +1,6 @@
import os
import subprocess
from contextlib import suppress
import pyotp
from django.core.management.base import BaseCommand
@ -25,7 +26,7 @@ class Command(BaseCommand):
nginx = "/etc/nginx/sites-available/frontend.conf"
found = None
if os.path.exists(nginx):
try:
with suppress(Exception):
with open(nginx, "r") as f:
for line in f:
if "server_name" in line:
@ -35,8 +36,6 @@ class Command(BaseCommand):
if found:
rep = found.replace("server_name", "").replace(";", "")
domain = "".join(rep.split())
except:
pass
code = pyotp.random_base32()
user.totp_key = code

View File

@ -31,8 +31,8 @@ class RolesPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_list_roles")
else:
return _has_perm(r, "can_manage_roles")
return _has_perm(r, "can_manage_roles")
class APIKeyPerms(permissions.BasePermission):

View File

@ -64,7 +64,7 @@ class Command(BaseCommand):
try:
agent.delete()
except Exception as e:
err = f"Failed to delete agent {agent.hostname}: {str(e)}"
err = f"Failed to delete agent {agent.hostname}: {e}"
self.stdout.write(self.style.ERROR(err))
else:
deleted_count += 1

View File

@ -1,6 +1,7 @@
import asyncio
import re
from collections import Counter
from contextlib import suppress
from distutils.version import LooseVersion
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union, cast
@ -130,8 +131,8 @@ class Agent(BaseAuditModel):
# return the default timezone unless the timezone is explicity set per agent
if self.time_zone:
return self.time_zone
else:
return get_core_settings().default_time_zone
return get_core_settings().default_time_zone
@property
def is_posix(self) -> bool:
@ -232,12 +233,12 @@ class Agent(BaseAuditModel):
alert_severity = (
check.check_result.alert_severity
if check.check_type
in [
in (
CheckType.MEMORY,
CheckType.CPU_LOAD,
CheckType.DISK_SPACE,
CheckType.SCRIPT,
]
)
else check.alert_severity
)
if alert_severity == AlertSeverity.ERROR:
@ -333,8 +334,8 @@ class Agent(BaseAuditModel):
if len(ret) == 1:
return cast(str, ret[0])
else:
return ", ".join(ret) if ret else "error getting local ips"
return ", ".join(ret) if ret else "error getting local ips"
@property
def make_model(self) -> str:
@ -344,7 +345,7 @@ class Agent(BaseAuditModel):
except:
return "error getting make/model"
try:
with suppress(Exception):
comp_sys = self.wmi_detail["comp_sys"][0]
comp_sys_prod = self.wmi_detail["comp_sys_prod"][0]
make = [x["Vendor"] for x in comp_sys_prod if "Vendor" in x][0]
@ -361,14 +362,10 @@ class Agent(BaseAuditModel):
model = sysfam
return f"{make} {model}"
except:
pass
try:
with suppress(Exception):
comp_sys_prod = self.wmi_detail["comp_sys_prod"][0]
return cast(str, [x["Version"] for x in comp_sys_prod if "Version" in x][0])
except:
pass
return "unknown make/model"
@ -479,7 +476,7 @@ class Agent(BaseAuditModel):
models.prefetch_related_objects(
[
policy
for policy in [self.policy, site_policy, client_policy, default_policy]
for policy in (self.policy, site_policy, client_policy, default_policy)
if policy
],
"excluded_agents",
@ -589,7 +586,7 @@ class Agent(BaseAuditModel):
def approve_updates(self) -> None:
patch_policy = self.get_patch_policy()
severity_list = list()
severity_list = []
if patch_policy.critical == "approve":
severity_list.append("Critical")
@ -623,7 +620,7 @@ class Agent(BaseAuditModel):
policies = self.get_agent_policies()
processed_policies: List[int] = list()
processed_policies: List[int] = []
for _, policy in policies.items():
if (
policy
@ -683,7 +680,7 @@ class Agent(BaseAuditModel):
policies = self.get_agent_policies()
# loop through all policies applied to agent and return an alert_template if found
processed_policies: List[int] = list()
processed_policies: List[int] = []
for key, policy in policies.items():
# default alert_template will override a default policy with alert template applied
if (
@ -873,7 +870,7 @@ class Agent(BaseAuditModel):
return AgentAuditSerializer(agent).data
def delete_superseded_updates(self) -> None:
try:
with suppress(Exception):
pks = [] # list of pks to delete
kbs = list(self.winupdates.values_list("kb", flat=True))
d = Counter(kbs)
@ -898,8 +895,6 @@ class Agent(BaseAuditModel):
pks = list(set(pks))
self.winupdates.filter(pk__in=pks).delete()
except:
pass
def should_create_alert(
self, alert_template: "Optional[AlertTemplate]" = None
@ -1018,16 +1013,16 @@ class AgentCustomField(models.Model):
return cast(List[str], self.multiple_value)
elif self.field.type == CustomFieldType.CHECKBOX:
return self.bool_value
else:
return cast(str, self.string_value)
return cast(str, self.string_value)
def save_to_field(self, value: Union[List[Any], bool, str]) -> None:
if self.field.type in [
if self.field.type in (
CustomFieldType.TEXT,
CustomFieldType.NUMBER,
CustomFieldType.SINGLE,
CustomFieldType.DATETIME,
]:
):
self.string_value = cast(str, value)
self.save()
elif self.field.type == CustomFieldType.MULTIPLE:

View File

@ -122,5 +122,5 @@ class AgentHistoryPerms(permissions.BasePermission):
return _has_perm(r, "can_list_agent_history") and _has_perm_on_agent(
r.user, view.kwargs["agent_id"]
)
else:
return _has_perm(r, "can_list_agent_history")
return _has_perm(r, "can_list_agent_history")

View File

@ -100,21 +100,21 @@ class AgentTableSerializer(serializers.ModelSerializer):
if not obj.alert_template:
return None
else:
return {
"name": obj.alert_template.name,
"always_email": obj.alert_template.agent_always_email,
"always_text": obj.alert_template.agent_always_text,
"always_alert": obj.alert_template.agent_always_alert,
}
return {
"name": obj.alert_template.name,
"always_email": obj.alert_template.agent_always_email,
"always_text": obj.alert_template.agent_always_text,
"always_alert": obj.alert_template.agent_always_alert,
}
def get_logged_username(self, obj) -> str:
if obj.logged_in_username == "None" and obj.status == AGENT_STATUS_ONLINE:
return obj.last_logged_in_user
elif obj.logged_in_username != "None":
return obj.logged_in_username
else:
return "-"
return "-"
def get_italic(self, obj) -> bool:
return obj.logged_in_username == "None" and obj.status == AGENT_STATUS_ONLINE

View File

@ -1,6 +1,7 @@
import asyncio
import tempfile
import urllib.parse
from pathlib import Path
from django.conf import settings
from django.http import FileResponse
@ -54,9 +55,7 @@ def generate_linux_install(
f"{core.mesh_site}/meshagents?id={mesh_id}&installflags=2&meshinstall={arch_id}"
)
sh = settings.LINUX_AGENT_SCRIPT
with open(sh, "r") as f:
text = f.read()
text = Path(settings.LINUX_AGENT_SCRIPT).read_text()
replace = {
"agentDLChange": download_url,

View File

@ -4,6 +4,7 @@ import os
import random
import string
import time
from pathlib import Path
from django.conf import settings
from django.db.models import Count, Exists, OuterRef, Prefetch, Q
@ -239,11 +240,9 @@ class GetUpdateDeleteAgent(APIView):
code = "foo" # stub for windows
if agent.plat == AgentPlat.LINUX:
with open(settings.LINUX_AGENT_SCRIPT, "r") as f:
code = f.read()
code = Path(settings.LINUX_AGENT_SCRIPT).read_text()
elif agent.plat == AgentPlat.DARWIN:
with open(settings.MAC_UNINSTALL, "r") as f:
code = f.read()
code = Path(settings.MAC_UNINSTALL).read_text()
asyncio.run(agent.nats_cmd({"func": "uninstall", "code": code}, wait=False))
name = agent.hostname
@ -255,7 +254,7 @@ class GetUpdateDeleteAgent(APIView):
asyncio.run(remove_mesh_agent(uri, mesh_id))
except Exception as e:
DebugLog.error(
message=f"Unable to remove agent {name} from meshcentral database: {str(e)}",
message=f"Unable to remove agent {name} from meshcentral database: {e}",
log_type=DebugLogType.AGENT_ISSUES,
)
return Response(f"{name} will now be uninstalled.")
@ -273,7 +272,7 @@ class AgentProcesses(APIView):
agent = get_object_or_404(Agent, agent_id=agent_id)
r = asyncio.run(agent.nats_cmd(data={"func": "procs"}, timeout=5))
if r == "timeout" or r == "natsdown":
if r in ("timeout", "natsdown"):
return notify_error("Unable to contact the agent")
return Response(r)
@ -284,7 +283,7 @@ class AgentProcesses(APIView):
agent.nats_cmd({"func": "killproc", "procpid": int(pid)}, timeout=15)
)
if r == "timeout" or r == "natsdown":
if r in ("timeout", "natsdown"):
return notify_error("Unable to contact the agent")
elif r != "ok":
return notify_error(r)
@ -416,7 +415,7 @@ def get_event_log(request, agent_id, logtype, days):
},
}
r = asyncio.run(agent.nats_cmd(data, timeout=timeout + 2))
if r == "timeout" or r == "natsdown":
if r in ("timeout", "natsdown"):
return notify_error("Unable to contact the agent")
return Response(r)
@ -654,10 +653,7 @@ def install_agent(request):
elif request.data["installMethod"] == "powershell":
ps = os.path.join(settings.BASE_DIR, "core/installer.ps1")
with open(ps, "r") as f:
text = f.read()
text = Path(settings.BASE_DIR / "core" / "installer.ps1").read_text()
replace_dict = {
"innosetupchange": inno,
@ -684,8 +680,7 @@ def install_agent(request):
except Exception as e:
DebugLog.error(message=str(e))
with open(ps1, "w") as f:
f.write(text)
Path(ps1).write_text(text)
if settings.DEBUG:
with open(ps1, "r") as f:
@ -1014,10 +1009,10 @@ def agent_maintenance(request):
if count:
action = "disabled" if not request.data["action"] else "enabled"
return Response(f"Maintenance mode has been {action} on {count} agents")
else:
return Response(
f"No agents have been put in maintenance mode. You might not have permissions to the resources."
)
return Response(
f"No agents have been put in maintenance mode. You might not have permissions to the resources."
)
@api_view(["GET"])

View File

@ -617,7 +617,7 @@ class Alert(models.Model):
if not args:
return []
temp_args = list()
temp_args = []
# pattern to match for injection
pattern = re.compile(".*\\{\\{alert\\.(.*)\\}\\}.*")

View File

@ -32,7 +32,7 @@ def _has_perm_on_alert(user: "User", id: int) -> bool:
class AlertPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET" or r.method == "PATCH":
if r.method in ("GET", "PATCH"):
if "pk" in view.kwargs.keys():
return _has_perm(r, "can_list_alerts") and _has_perm_on_alert(
r.user, view.kwargs["pk"]
@ -52,5 +52,5 @@ class AlertTemplatePerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_list_alerttemplates")
else:
return _has_perm(r, "can_manage_alerttemplates")
return _has_perm(r, "can_manage_alerttemplates")

View File

@ -41,13 +41,13 @@ class GetAddAlerts(APIView):
elif any(
key
in [
in (
"timeFilter",
"clientFilter",
"severityFilter",
"resolvedFilter",
"snoozedFilter",
]
)
for key in request.data.keys()
):
clientFilter = Q()

View File

@ -218,12 +218,12 @@ class Policy(BaseAuditModel):
def get_policy_tasks(agent: "Agent") -> "List[AutomatedTask]":
# List of all tasks to be applied
tasks = list()
tasks = []
# Get policies applied to agent and agent site and client
policies = agent.get_agent_policies()
processed_policies = list()
processed_policies = []
for _, policy in policies.items():
if policy and policy.active and policy.pk not in processed_policies:
@ -244,10 +244,10 @@ class Policy(BaseAuditModel):
# Used to hold the policies that will be applied and the order in which they are applied
# Enforced policies are applied first
enforced_checks = list()
policy_checks = list()
enforced_checks = []
policy_checks = []
processed_policies = list()
processed_policies = []
for _, policy in policies.items():
if policy and policy.active and policy.pk not in processed_policies:
@ -263,24 +263,24 @@ class Policy(BaseAuditModel):
return []
# Sorted Checks already added
added_diskspace_checks: List[str] = list()
added_ping_checks: List[str] = list()
added_winsvc_checks: List[str] = list()
added_script_checks: List[int] = list()
added_eventlog_checks: List[List[str]] = list()
added_cpuload_checks: List[int] = list()
added_memory_checks: List[int] = list()
added_diskspace_checks: List[str] = []
added_ping_checks: List[str] = []
added_winsvc_checks: List[str] = []
added_script_checks: List[int] = []
added_eventlog_checks: List[List[str]] = []
added_cpuload_checks: List[int] = []
added_memory_checks: List[int] = []
# Lists all agent and policy checks that will be returned
diskspace_checks: "List[Check]" = list()
ping_checks: "List[Check]" = list()
winsvc_checks: "List[Check]" = list()
script_checks: "List[Check]" = list()
eventlog_checks: "List[Check]" = list()
cpuload_checks: "List[Check]" = list()
memory_checks: "List[Check]" = list()
diskspace_checks: "List[Check]" = []
ping_checks: "List[Check]" = []
winsvc_checks: "List[Check]" = []
script_checks: "List[Check]" = []
eventlog_checks: "List[Check]" = []
cpuload_checks: "List[Check]" = []
memory_checks: "List[Check]" = []
overridden_checks: List[int] = list()
overridden_checks: List[int] = []
# Loop over checks in with enforced policies first, then non-enforced policies
for check in enforced_checks + agent_checks + policy_checks:

View File

@ -7,5 +7,5 @@ class AutomationPolicyPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_list_automation_policies")
else:
return _has_perm(r, "can_manage_automation_policies")
return _has_perm(r, "can_manage_automation_policies")

View File

@ -1,6 +1,7 @@
import asyncio
import random
import string
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import pytz
@ -262,13 +263,13 @@ class AutomatedTask(BaseAuditModel):
else True,
}
if self.task_type in [
if self.task_type in (
TaskType.RUN_ONCE,
TaskType.DAILY,
TaskType.WEEKLY,
TaskType.MONTHLY,
TaskType.MONTHLY_DOW,
]:
):
# set runonce task in future if creating and run_asap_after_missed is set
if (
not editing
@ -432,10 +433,8 @@ class AutomatedTask(BaseAuditModel):
if r != "ok" and "The system cannot find the file specified" not in r:
task_result.sync_status = TaskSyncStatus.PENDING_DELETION
try:
with suppress(DatabaseError):
task_result.save(update_fields=["sync_status"])
except DatabaseError:
pass
DebugLog.warning(
agent=agent,

View File

@ -100,13 +100,13 @@ class TaskSerializer(serializers.ModelSerializer):
# run_time_date required
if (
data["task_type"]
in [
in (
TaskType.RUN_ONCE,
TaskType.DAILY,
TaskType.WEEKLY,
TaskType.MONTHLY,
TaskType.MONTHLY_DOW,
]
)
and not data["run_time_date"]
):
raise serializers.ValidationError(
@ -188,13 +188,12 @@ class TaskSerializer(serializers.ModelSerializer):
if not alert_template:
return None
else:
return {
"name": alert_template.name,
"always_email": alert_template.task_always_email,
"always_text": alert_template.task_always_text,
"always_alert": alert_template.task_always_alert,
}
return {
"name": alert_template.name,
"always_email": alert_template.task_always_email,
"always_text": alert_template.task_always_text,
"always_alert": alert_template.task_always_alert,
}
class Meta:
model = AutomatedTask

View File

@ -1,6 +1,7 @@
import asyncio
import datetime as dt
import random
from contextlib import suppress
from time import sleep
from typing import Optional, Union
@ -16,60 +17,64 @@ from tacticalrmm.constants import DebugLogType
@app.task
def create_win_task_schedule(pk: int, agent_id: Optional[str] = None) -> str:
try:
with suppress(
AutomatedTask.DoesNotExist,
Agent.DoesNotExist,
):
task = AutomatedTask.objects.get(pk=pk)
if agent_id:
task.create_task_on_agent(Agent.objects.get(agent_id=agent_id))
else:
task.create_task_on_agent()
except (AutomatedTask.DoesNotExist, Agent.DoesNotExist):
pass
return "ok"
@app.task
def modify_win_task(pk: int, agent_id: Optional[str] = None) -> str:
try:
with suppress(
AutomatedTask.DoesNotExist,
Agent.DoesNotExist,
):
task = AutomatedTask.objects.get(pk=pk)
if agent_id:
task.modify_task_on_agent(Agent.objects.get(agent_id=agent_id))
else:
task.modify_task_on_agent()
except (AutomatedTask.DoesNotExist, Agent.DoesNotExist):
pass
return "ok"
@app.task
def delete_win_task_schedule(pk: int, agent_id: Optional[str] = None) -> str:
try:
with suppress(
AutomatedTask.DoesNotExist,
Agent.DoesNotExist,
):
task = AutomatedTask.objects.get(pk=pk)
if agent_id:
task.delete_task_on_agent(Agent.objects.get(agent_id=agent_id))
else:
task.delete_task_on_agent()
except (AutomatedTask.DoesNotExist, Agent.DoesNotExist):
pass
return "ok"
@app.task
def run_win_task(pk: int, agent_id: Optional[str] = None) -> str:
try:
with suppress(
AutomatedTask.DoesNotExist,
Agent.DoesNotExist,
):
task = AutomatedTask.objects.get(pk=pk)
if agent_id:
task.run_win_task(Agent.objects.get(agent_id=agent_id))
else:
task.run_win_task()
except (AutomatedTask.DoesNotExist, Agent.DoesNotExist):
pass
return "ok"

View File

@ -149,8 +149,8 @@ class Check(BaseAuditModel):
def __str__(self):
if self.agent:
return f"{self.agent.hostname} - {self.readable_desc}"
else:
return f"{self.policy.name} - {self.readable_desc}"
return f"{self.policy.name} - {self.readable_desc}"
def save(self, *args, **kwargs):
@ -198,10 +198,7 @@ class Check(BaseAuditModel):
return f"{display}: Drive {self.disk} - {text}"
elif self.check_type == CheckType.PING:
return f"{display}: {self.name}"
elif (
self.check_type == CheckType.CPU_LOAD or self.check_type == CheckType.MEMORY
):
elif self.check_type in (CheckType.CPU_LOAD, CheckType.MEMORY):
text = ""
if self.warning_threshold:
text += f" Warning Threshold: {self.warning_threshold}%"
@ -215,8 +212,8 @@ class Check(BaseAuditModel):
return f"{display}: {self.name}"
elif self.check_type == CheckType.SCRIPT:
return f"{display}: {self.script.name}"
else:
return "n/a"
return "n/a"
@staticmethod
def non_editable_fields() -> list[str]:
@ -335,12 +332,12 @@ class CheckResult(models.Model):
def save(self, *args, **kwargs):
# if check is a policy check clear cache on everything
if not self.alert_severity and self.assigned_check.check_type in [
if not self.alert_severity and self.assigned_check.check_type in (
CheckType.MEMORY,
CheckType.CPU_LOAD,
CheckType.DISK_SPACE,
CheckType.SCRIPT,
]:
):
self.alert_severity = AlertSeverity.WARNING
super(CheckResult, self).save(

View File

@ -5,7 +5,7 @@ from tacticalrmm.permissions import _has_perm, _has_perm_on_agent
class ChecksPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET" or r.method == "PATCH":
if r.method in ("GET", "PATCH"):
if "agent_id" in view.kwargs.keys():
return _has_perm(r, "can_list_checks") and _has_perm_on_agent(
r.user, view.kwargs["agent_id"]

View File

@ -43,13 +43,13 @@ class CheckSerializer(serializers.ModelSerializer):
if not alert_template:
return None
else:
return {
"name": alert_template.name,
"always_email": alert_template.check_always_email,
"always_text": alert_template.check_always_text,
"always_alert": alert_template.check_always_alert,
}
return {
"name": alert_template.name,
"always_email": alert_template.check_always_email,
"always_text": alert_template.check_always_text,
"always_alert": alert_template.check_always_alert,
}
class Meta:
model = Check

View File

@ -170,5 +170,5 @@ def run_checks(request, agent_id):
return notify_error(f"Checks are already running on {agent.hostname}")
elif r == "ok":
return Response(f"Checks will now be re-run on {agent.hostname}")
else:
return notify_error("Unable to contact the agent")
return notify_error("Unable to contact the agent")

View File

@ -229,16 +229,16 @@ class ClientCustomField(models.Model):
return self.multiple_value
elif self.field.type == CustomFieldType.CHECKBOX:
return self.bool_value
else:
return self.string_value
return self.string_value
def save_to_field(self, value):
if self.field.type in [
if self.field.type in (
CustomFieldType.TEXT,
CustomFieldType.NUMBER,
CustomFieldType.SINGLE,
CustomFieldType.DATETIME,
]:
):
self.string_value = value
self.save()
elif self.field.type == CustomFieldType.MULTIPLE:
@ -280,16 +280,16 @@ class SiteCustomField(models.Model):
return self.multiple_value
elif self.field.type == CustomFieldType.CHECKBOX:
return self.bool_value
else:
return self.string_value
return self.string_value
def save_to_field(self, value):
if self.field.type in [
if self.field.type in (
CustomFieldType.TEXT,
CustomFieldType.NUMBER,
CustomFieldType.SINGLE,
CustomFieldType.DATETIME,
]:
):
self.string_value = value
self.save()
elif self.field.type == CustomFieldType.MULTIPLE:

View File

@ -12,7 +12,7 @@ class ClientsPerms(permissions.BasePermission):
)
else:
return _has_perm(r, "can_list_clients")
elif r.method == "PUT" or r.method == "DELETE":
elif r.method in ("PUT", "DELETE"):
return _has_perm(r, "can_manage_clients") and _has_perm_on_client(
r.user, view.kwargs["pk"]
)
@ -29,7 +29,7 @@ class SitesPerms(permissions.BasePermission):
)
else:
return _has_perm(r, "can_list_sites")
elif r.method == "PUT" or r.method == "DELETE":
elif r.method in ("PUT", "DELETE"):
return _has_perm(r, "can_manage_sites") and _has_perm_on_site(
r.user, view.kwargs["pk"]
)
@ -41,5 +41,5 @@ class DeploymentPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_list_deployments")
else:
return _has_perm(r, "can_manage_deployments")
return _has_perm(r, "can_manage_deployments")

View File

@ -1,6 +1,7 @@
import datetime as dt
import re
import uuid
from contextlib import suppress
from django.db.models import Count, Exists, OuterRef, Prefetch, prefetch_related_objects
from django.shortcuts import get_object_or_404
@ -338,10 +339,8 @@ class AgentDeployment(APIView):
if not _has_perm_on_site(request.user, d.site.pk):
raise PermissionDenied()
try:
with suppress(Exception):
d.auth_token.delete()
except:
pass
d.delete()
return Response("The deployment was deleted")

View File

@ -1,4 +1,5 @@
import asyncio
from contextlib import suppress
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
@ -24,10 +25,8 @@ class DashInfo(AsyncJsonWebsocketConsumer):
async def disconnect(self, close_code):
try:
with suppress(Exception):
self.dash_info.cancel()
except:
pass
self.connected = False
await self.close()

View File

@ -1,3 +1,5 @@
from contextlib import suppress
from django.core.exceptions import ValidationError
from django.core.management.base import BaseCommand
@ -8,9 +10,7 @@ class Command(BaseCommand):
help = "Populates the global site settings on first install"
def handle(self, *args, **kwargs):
try:
# can only be 1 instance of this. Prevents error when rebuilding docker container
with suppress(ValidationError):
CoreSettings().save()
self.stdout.write("Core db populated")
except ValidationError:
# can only be 1 instance of this. Prevents error when rebuilding docker container
pass

View File

@ -1,4 +1,5 @@
import smtplib
from contextlib import suppress
from email.message import EmailMessage
from typing import TYPE_CHECKING, List, Optional, cast
@ -108,12 +109,10 @@ class CoreSettings(BaseAuditModel):
# for install script
if not self.pk:
try:
with suppress(Exception):
self.mesh_site = settings.MESH_SITE
self.mesh_username = settings.MESH_USERNAME.lower()
self.mesh_token = settings.MESH_TOKEN_KEY
except:
pass
old_settings = type(self).objects.get(pk=self.pk) if self.pk else None
super(BaseAuditModel, self).save(*args, **kwargs)
@ -315,8 +314,8 @@ class CustomField(BaseAuditModel):
return self.default_values_multiple
elif self.type == CustomFieldType.CHECKBOX:
return self.default_value_bool
else:
return self.default_value_string
return self.default_value_string
def get_or_create_field_value(self, instance):
from agents.models import Agent, AgentCustomField

View File

@ -7,8 +7,8 @@ class CoreSettingsPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_view_core_settings")
else:
return _has_perm(r, "can_edit_core_settings")
return _has_perm(r, "can_edit_core_settings")
class URLActionPerms(permissions.BasePermission):
@ -30,5 +30,5 @@ class CustomFieldPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_view_customfields")
else:
return _has_perm(r, "can_manage_customfields")
return _has_perm(r, "can_manage_customfields")

View File

@ -1,4 +1,5 @@
import re
from pathlib import Path
import psutil
import pytz
@ -6,8 +7,8 @@ from cryptography import x509
from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from django.utils import timezone as djangotime
from django.views.decorators.csrf import csrf_exempt
from rest_framework.decorators import api_view, permission_classes
from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import IsAuthenticated
@ -175,8 +176,8 @@ class GetAddCustomFields(APIView):
if "model" in request.data.keys():
fields = CustomField.objects.filter(model=request.data["model"])
return Response(CustomFieldSerializer(fields, many=True).data)
else:
return notify_error("The request was invalid")
return notify_error("The request was invalid")
def post(self, request):
serializer = CustomFieldSerializer(data=request.data, partial=True)
@ -231,7 +232,7 @@ class CodeSign(APIView):
except Exception as e:
return notify_error(str(e))
if r.status_code == 400 or r.status_code == 401:
if r.status_code in (400, 401):
return notify_error(r.json()["ret"])
elif r.status_code == 200:
t = CodeSignToken.objects.first()
@ -414,8 +415,7 @@ def status(request):
mem_usage: int = round(psutil.virtual_memory().percent)
cert_file, _ = get_certs()
with open(cert_file, "rb") as f:
cert_bytes = f.read()
cert_bytes = Path(cert_file).read_bytes()
cert = x509.load_pem_x509_certificate(cert_bytes)
expires = pytz.utc.localize(cert.not_valid_after)

View File

@ -282,7 +282,7 @@ class DebugLog(models.Model):
agent: "Optional[Agent]" = None,
log_type: str = DebugLogType.SYSTEM_ISSUES,
) -> None:
if get_debug_level() in [DebugLogLevel.INFO]:
if get_debug_level() == DebugLogLevel.INFO:
cls.objects.create(
log_level=DebugLogLevel.INFO,
agent=agent,
@ -297,7 +297,7 @@ class DebugLog(models.Model):
agent: "Optional[Agent]" = None,
log_type: str = DebugLogType.SYSTEM_ISSUES,
) -> None:
if get_debug_level() in [DebugLogLevel.INFO, DebugLogLevel.WARN]:
if get_debug_level() in (DebugLogLevel.INFO, DebugLogLevel.WARN):
cls.objects.create(
log_level=DebugLogLevel.INFO,
agent=agent,
@ -312,11 +312,11 @@ class DebugLog(models.Model):
agent: "Optional[Agent]" = None,
log_type: str = DebugLogType.SYSTEM_ISSUES,
) -> None:
if get_debug_level() in [
if get_debug_level() in (
DebugLogLevel.INFO,
DebugLogLevel.WARN,
DebugLogLevel.ERROR,
]:
):
cls.objects.create(
log_level=DebugLogLevel.ERROR,
agent=agent,
@ -331,12 +331,12 @@ class DebugLog(models.Model):
agent: "Optional[Agent]" = None,
log_type: str = DebugLogType.SYSTEM_ISSUES,
) -> None:
if get_debug_level() in [
if get_debug_level() in (
DebugLogLevel.INFO,
DebugLogLevel.WARN,
DebugLogLevel.ERROR,
DebugLogLevel.CRITICAL,
]:
):
cls.objects.create(
log_level=DebugLogLevel.CRITICAL,
agent=agent,
@ -376,8 +376,8 @@ class PendingAction(models.Model):
return "Next update cycle"
elif self.action_type == PAAction.CHOCO_INSTALL:
return "ASAP"
else:
return "On next checkin"
return "On next checkin"
@property
def description(self) -> Optional[str]:
@ -390,15 +390,15 @@ class PendingAction(models.Model):
elif self.action_type == PAAction.CHOCO_INSTALL:
return f"{self.details['name']} software install"
elif self.action_type in [
elif self.action_type in (
PAAction.RUN_CMD,
PAAction.RUN_SCRIPT,
PAAction.RUN_PATCH_SCAN,
PAAction.RUN_PATCH_INSTALL,
]:
):
return f"{self.action_type}"
else:
return None
return None
class BaseAuditModel(models.Model):

View File

@ -16,8 +16,8 @@ class AuditLogSerializer(serializers.ModelSerializer):
return SiteMinimumSerializer(
Agent.objects.get(agent_id=obj.agent_id).site
).data
else:
return None
return None
class Meta:
model = AuditLog

View File

@ -47,7 +47,7 @@ class Script(BaseAuditModel):
@property
def code_no_snippets(self):
return self.script_body if self.script_body else ""
return self.script_body or ""
@property
def code(self):
@ -70,8 +70,8 @@ class Script(BaseAuditModel):
snippet.group(), value.replace("\\", "\\\\"), replaced_code
)
return replaced_code
else:
return code
return code
def hash_script_body(self):
from django.conf import settings
@ -113,14 +113,14 @@ class Script(BaseAuditModel):
else 90
)
args = script["args"] if "args" in script.keys() else list()
args = script["args"] if "args" in script.keys() else []
syntax = script["syntax"] if "syntax" in script.keys() else ""
supported_platforms = (
script["supported_platforms"]
if "supported_platforms" in script.keys()
else list()
else []
)
# if community script exists update it
@ -188,12 +188,12 @@ class Script(BaseAuditModel):
return ScriptSerializer(script).data
@classmethod
def parse_script_args(cls, agent, shell: str, args: List[str] = list()) -> list:
def parse_script_args(cls, agent, shell: str, args: List[str] = []) -> list:
if not args:
return []
temp_args = list()
temp_args = []
# pattern to match for injection
pattern = re.compile(".*\\{\\{(.*)\\}\\}.*")

View File

@ -7,5 +7,5 @@ class ScriptsPerms(permissions.BasePermission):
def has_permission(self, r, view) -> bool:
if r.method == "GET":
return _has_perm(r, "can_list_scripts")
else:
return _has_perm(r, "can_manage_scripts")
return _has_perm(r, "can_manage_scripts")

View File

@ -9,5 +9,5 @@ class WinSvcsPerms(permissions.BasePermission):
return _has_perm(r, "can_manage_winsvcs") and _has_perm_on_agent(
r.user, view.kwargs["agent_id"]
)
else:
return _has_perm(r, "can_manage_winsvcs")
return _has_perm(r, "can_manage_winsvcs")

View File

@ -41,7 +41,7 @@ class GetServices(APIView):
agent = get_object_or_404(Agent, agent_id=agent_id)
r = asyncio.run(agent.nats_cmd(data={"func": "winservices"}, timeout=10))
if r == "timeout" or r == "natsdown":
if r in ("timeout", "natsdown"):
return notify_error("Unable to contact the agent")
agent.services = r

View File

@ -13,7 +13,6 @@ class SoftwarePerms(permissions.BasePermission):
return _has_perm(r, "can_list_software")
else:
return _has_perm(r, "can_manage_software") and _has_perm_on_agent(
r.user, view.kwargs["agent_id"]
)
return _has_perm(r, "can_manage_software") and _has_perm_on_agent(
r.user, view.kwargs["agent_id"]
)

View File

@ -22,8 +22,8 @@ def chocos(request):
chocos = ChocoSoftware.objects.last()
if not chocos:
return Response({})
else:
return Response(chocos.chocos)
return Response(chocos.chocos)
class GetSoftware(APIView):
@ -79,7 +79,7 @@ class GetSoftware(APIView):
return notify_error(f"Not available for {agent.plat}")
r: Any = asyncio.run(agent.nats_cmd({"func": "softwarelist"}, timeout=15))
if r == "timeout" or r == "natsdown":
if r in ("timeout", "natsdown"):
return notify_error("Unable to contact the agent")
if not InstalledSoftware.objects.filter(agent=agent).exists():

View File

@ -1,4 +1,5 @@
import threading
from contextlib import suppress
from typing import Any, Dict, Optional
from django.conf import settings
@ -62,7 +63,7 @@ class AuditMiddleware:
request = APIView().initialize_request(request)
# check if user is authenticated
try:
with suppress(AuthenticationFailed):
if hasattr(request, "user") and request.user.is_authenticated:
try:
@ -83,8 +84,6 @@ class AuditMiddleware:
# get authenticated user after request
request_local.username = request.user.username
except AuthenticationFailed:
pass
def process_exception(self, request, exception):
request_local.debug_info = None

View File

@ -26,7 +26,7 @@ class PermissionQuerySet(models.QuerySet):
model_name = self.model._meta.label.split(".")[1]
# checks which sites and clients the user has access to and filters agents
if model_name in ["Agent", "Deployment"]:
if model_name in ("Agent", "Deployment"):
if can_view_clients:
clients_queryset = models.Q(site__client__in=can_view_clients)
@ -81,7 +81,7 @@ class PermissionQuerySet(models.QuerySet):
return self
# if model that is being filtered is a Check or Automated task we need to allow checks/tasks that are associated with policies
if model_name in ["Check", "AutomatedTask", "DebugLog"] and (
if model_name in ("Check", "AutomatedTask", "DebugLog") and (
can_view_clients or can_view_sites
):
agent_queryset = models.Q(agent=None) # dont filter if agent is None

View File

@ -1,4 +1,5 @@
import os
from contextlib import suppress
from datetime import timedelta
from pathlib import Path
@ -73,10 +74,8 @@ HOSTED = False
SWAGGER_ENABLED = False
REDIS_HOST = "127.0.0.1"
try:
with suppress(ImportError):
from .local_settings import *
except ImportError:
pass
if "GHACTIONS" in os.environ:
DEBUG = False

View File

@ -28,7 +28,7 @@ from tacticalrmm.constants import (
DebugLogType,
ScriptShell,
)
from tacticalrmm.helpers import get_certs, notify_error, get_nats_ports
from tacticalrmm.helpers import get_certs, get_nats_ports, notify_error
def generate_winagent_exe(
@ -112,7 +112,7 @@ def bitdays_to_string(day: int) -> str:
return "Every day"
for key, value in WEEK_DAYS.items():
if day & int(value):
if day & value:
ret.append(key)
return ", ".join(ret)
@ -123,7 +123,7 @@ def bitmonths_to_string(month: int) -> str:
return "Every month"
for key, value in MONTHS.items():
if month & int(value):
if month & value:
ret.append(key)
return ", ".join(ret)
@ -134,7 +134,7 @@ def bitweeks_to_string(week: int) -> str:
return "Every week"
for key, value in WEEKS.items():
if week & int(value):
if week & value:
ret.append(key)
return ", ".join(ret)
@ -144,11 +144,11 @@ def bitmonthdays_to_string(day: int) -> str:
if day == MONTH_DAYS["Last Day"]:
return "Last day"
elif day == 2147483647 or day == 4294967295:
elif day in (2147483647, 4294967295):
return "Every day"
for key, value in MONTH_DAYS.items():
if day & int(value):
if day & value:
ret.append(key)
return ", ".join(ret)
@ -157,8 +157,8 @@ def convert_to_iso_duration(string: str) -> str:
tmp = string.upper()
if "D" in tmp:
return f"P{tmp.replace('D', 'DT')}"
else:
return f"PT{tmp}"
return f"PT{tmp}"
def reload_nats() -> None:
@ -391,5 +391,5 @@ def format_shell_array(value: list[str]) -> str:
def format_shell_bool(value: bool, shell: Optional[str]) -> str:
if shell == ScriptShell.POWERSHELL:
return "$True" if value else "$False"
else:
return "1" if value else "0"
return "1" if value else "0"

View File

@ -143,8 +143,8 @@ class WinUpdatePolicy(BaseAuditModel):
def __str__(self):
if self.agent:
return self.agent.hostname
else:
return self.policy.name
return self.policy.name
@staticmethod
def serialize(policy):

View File

@ -9,5 +9,5 @@ class AgentWinUpdatePerms(permissions.BasePermission):
return _has_perm(r, "can_manage_winupdates") and _has_perm_on_agent(
r.user, view.kwargs["agent_id"]
)
else:
return _has_perm(r, "can_manage_winupdates")
return _has_perm(r, "can_manage_winupdates")

View File

@ -1,6 +1,7 @@
import asyncio
import datetime as dt
import time
from contextlib import suppress
import pytz
from django.utils import timezone as djangotime
@ -123,10 +124,10 @@ def bulk_install_updates_task(pks: list[int]) -> None:
for chunk in chunks:
for agent in chunk:
agent.delete_superseded_updates()
try:
with suppress(Exception):
agent.approve_updates()
except:
pass
nats_data = {
"func": "installwinupdates",
"guids": agent.get_approved_update_guids(),