From 82624d665775536c386480791b5ff84eeefe7119 Mon Sep 17 00:00:00 2001 From: sadnub Date: Fri, 8 Apr 2022 23:13:45 -0400 Subject: [PATCH] fix tests and more typing --- api/tacticalrmm/agents/models.py | 56 +++++--- .../migrations/0012_alter_alert_agent.py | 20 +++ api/tacticalrmm/alerts/models.py | 61 ++++---- api/tacticalrmm/alerts/permissions.py | 11 +- api/tacticalrmm/alerts/tests.py | 133 ++++++++++-------- api/tacticalrmm/apiv3/views.py | 2 +- api/tacticalrmm/automation/models.py | 2 +- api/tacticalrmm/autotasks/models.py | 16 +-- api/tacticalrmm/checks/models.py | 1 - ...ettings_email_alert_recipients_and_more.py | 54 +++++++ api/tacticalrmm/core/models.py | 2 +- api/tacticalrmm/core/tests.py | 1 - api/tacticalrmm/logs/models.py | 3 +- api/tacticalrmm/tacticalrmm/models.py | 12 +- api/tacticalrmm/tacticalrmm/test.py | 21 ++- 15 files changed, 250 insertions(+), 145 deletions(-) create mode 100644 api/tacticalrmm/alerts/migrations/0012_alter_alert_agent.py create mode 100644 api/tacticalrmm/core/migrations/0032_alter_coresettings_email_alert_recipients_and_more.py diff --git a/api/tacticalrmm/agents/models.py b/api/tacticalrmm/agents/models.py index 91d57fb8..06016435 100644 --- a/api/tacticalrmm/agents/models.py +++ b/api/tacticalrmm/agents/models.py @@ -416,7 +416,7 @@ class Agent(BaseAuditModel): def run_script( self, scriptpk: int, - args: list[str] = [], + args: List[str] = [], timeout: int = 120, full: bool = False, wait: bool = False, @@ -503,6 +503,7 @@ class Agent(BaseAuditModel): # returns agent policy merged with a client or site specific policy def get_patch_policy(self) -> "WinUpdatePolicy": + from winupdate.models import WinUpdatePolicy # check if site has a patch policy and if so use it patch_policy = None @@ -510,11 +511,11 @@ class Agent(BaseAuditModel): agent_policy = self.winupdatepolicy.first() if not agent_policy: - raise WinUpdatePolicy.DoesNotExist + agent_policy = WinUpdatePolicy.objects.create(agent=self) policies = self.get_agent_policies() - processed_policies: "List[int]" = list() + processed_policies: List[int] = list() for _, policy in policies.items(): if ( policy @@ -664,32 +665,43 @@ class Agent(BaseAuditModel): def get_checks_from_policies(self) -> "List[Check]": from automation.models import Policy - cached_checks = cache.get(f"site_{self.site.id}_checks") + cache_checks = False + if not self.policy and not self.agentchecks.exists(): + cached_checks = cache.get(f"site_{self.site.id}_checks") - if cached_checks and isinstance(cached_checks, list): - return cached_checks - else: - # clear agent checks that have overridden_by_policy set - self.agentchecks.update(overridden_by_policy=False) # type: ignore + if cached_checks and isinstance(cached_checks, list): + return cached_checks + else: + cached_checks = True - # get agent checks based on policies - checks = Policy.get_policy_checks(self) + # clear agent checks that have overridden_by_policy set + self.agentchecks.update(overridden_by_policy=False) + + # get agent checks based on policies + checks = Policy.get_policy_checks(self) + + if cache_checks: cache.set(f"site_{self.site.id}_checks", checks, 300) - return checks + return checks def get_tasks_from_policies(self) -> "List[AutomatedTask]": from automation.models import Policy - cached_tasks = cache.get(f"site_{self.site.id}_tasks") + cache_tasks = False + if not self.policy: + cached_tasks = cache.get(f"site_{self.site.id}_tasks") - if cached_tasks and isinstance(cached_tasks, list): - return cached_tasks - else: - # get agent tasks based on policies - tasks = Policy.get_policy_tasks(self) + if cached_tasks and isinstance(cached_tasks, list): + return cached_tasks + else: + cached_tasks = True + # get agent tasks based on policies + tasks = Policy.get_policy_tasks(self) + + if cache_tasks: cache.set(f"site_{self.site.id}_tasks", tasks, 300) - return tasks + return tasks def _do_nats_debug(self, agent, message): DebugLog.error(agent=agent, log_type="agent_issues", message=message) @@ -734,16 +746,16 @@ class Agent(BaseAuditModel): await nc.close() @staticmethod - def serialize(class_name: "Agent") -> Dict[str, Any]: + def serialize(agent: "Agent") -> Dict[str, Any]: # serializes the agent and returns json from .serializers import AgentAuditSerializer - return AgentAuditSerializer(class_name).data + return AgentAuditSerializer(agent).data def delete_superseded_updates(self) -> None: try: pks = [] # list of pks to delete - kbs = list(self.winupdates.values_list("kb", flat=True)) # type: ignore + kbs = list(self.winupdates.values_list("kb", flat=True)) d = Counter(kbs) dupes = [k for k, v in d.items() if v > 1] diff --git a/api/tacticalrmm/alerts/migrations/0012_alter_alert_agent.py b/api/tacticalrmm/alerts/migrations/0012_alter_alert_agent.py new file mode 100644 index 00000000..43ce930b --- /dev/null +++ b/api/tacticalrmm/alerts/migrations/0012_alter_alert_agent.py @@ -0,0 +1,20 @@ +# Generated by Django 4.0.3 on 2022-04-09 02:58 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('agents', '0047_alter_agent_plat_alter_agent_site'), + ('alerts', '0011_alter_alert_agent'), + ] + + operations = [ + migrations.AlterField( + model_name='alert', + name='agent', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='agent', to='agents.agent'), + ), + ] diff --git a/api/tacticalrmm/alerts/models.py b/api/tacticalrmm/alerts/models.py index de1385d9..e802a096 100644 --- a/api/tacticalrmm/alerts/models.py +++ b/api/tacticalrmm/alerts/models.py @@ -39,6 +39,8 @@ class Alert(models.Model): "agents.Agent", related_name="agent", on_delete=models.CASCADE, + null=True, + blank=True, ) assigned_check = models.ForeignKey( "checks.Check", @@ -82,7 +84,7 @@ class Alert(models.Model): max_length=100, null=True, blank=True ) - def __str__(self): + def __str__(self) -> str: return self.message @property @@ -149,16 +151,16 @@ class Alert(models.Model): @classmethod def create_or_return_check_alert( cls, - check: Check, - agent: Optional[Agent] = None, + check: "Check", + agent: "Agent", alert_severity: Optional[str] = None, skip_create: bool = False, - ) -> Optional[Alert]: + ) -> "Optional[Alert]": # need to pass agent if the check is a policy if not cls.objects.filter( assigned_check=check, - agent=agent if check.policy else check.agent, + agent=agent, resolved=False, ).exists(): if skip_create: @@ -168,13 +170,13 @@ class Alert(models.Model): Alert, cls.objects.create( assigned_check=check, - agent=agent if check.policy else check.agent, + agent=agent, alert_type="check", severity=check.alert_severity if check.check_type not in ["memory", "cpuload", "diskspace", "script"] else alert_severity, - message=f"{agent.hostname if agent else check.agent.hostname} has a {check.check_type} check: {check.readable_desc} that failed.", + message=f"{agent.hostname} has a {check.check_type} check: {check.readable_desc} that failed.", hidden=True, ), ) @@ -184,14 +186,14 @@ class Alert(models.Model): Alert, cls.objects.get( assigned_check=check, - agent=agent if check.policy else check.agent, + agent=agent, resolved=False, ), ) except cls.MultipleObjectsReturned: alerts = cls.objects.filter( assigned_check=check, - agent=agent if check.policy else check.agent, + agent=agent, resolved=False, ) last_alert = cast(Alert, alerts.last()) @@ -208,41 +210,48 @@ class Alert(models.Model): @classmethod def create_or_return_task_alert( cls, - task: AutomatedTask, - agent: Optional[Agent] = None, + task: "AutomatedTask", + agent: "Agent", skip_create: bool = False, - ) -> Optional[Alert]: + ) -> "Optional[Alert]": if not cls.objects.filter( assigned_task=task, - agent=agent if task.policy else task.agent, + agent=agent, resolved=False, ).exists(): if skip_create: return None - return cls.objects.create( - assigned_task=task, - agent=agent if task.policy else task.agent, - alert_type="task", - severity=task.alert_severity, - message=f"{agent.hostname if agent else task.agent.hostname } has task: {task.name} that failed.", - hidden=True, + return cast( + Alert, + cls.objects.create( + assigned_task=task, + agent=agent, + alert_type="task", + severity=task.alert_severity, + message=f"{agent.hostname} has task: {task.name} that failed.", + hidden=True, + ), ) + else: try: - return cls.objects.get( - assigned_task=task, - agent=agent if task.policy else task.agent, - resolved=False, + return cast( + Alert, + cls.objects.get( + assigned_task=task, + agent=agent, + resolved=False, + ), ) except cls.MultipleObjectsReturned: alerts = cls.objects.filter( assigned_task=task, - agent=agent if task.policy else task.agent, + agent=agent, resolved=False, ) - last_alert = cast(cls, alerts.last()) + last_alert = cast(Alert, alerts.last()) # cycle through other alerts and resolve for alert in alerts: diff --git a/api/tacticalrmm/alerts/permissions.py b/api/tacticalrmm/alerts/permissions.py index 9bd599dd..b58f0962 100644 --- a/api/tacticalrmm/alerts/permissions.py +++ b/api/tacticalrmm/alerts/permissions.py @@ -1,10 +1,13 @@ from django.shortcuts import get_object_or_404 from rest_framework import permissions - +from typing import TYPE_CHECKING from tacticalrmm.permissions import _has_perm, _has_perm_on_agent +if TYPE_CHECKING: + from accounts.models import User -def _has_perm_on_alert(user, id: int) -> bool: + +def _has_perm_on_alert(user: "User", id: int) -> bool: from alerts.models import Alert role = user.role @@ -19,10 +22,6 @@ def _has_perm_on_alert(user, id: int) -> bool: if alert.agent: agent_id = alert.agent.agent_id - elif alert.assigned_check: - agent_id = alert.assigned_check.agent.agent_id - elif alert.assigned_task: - agent_id = alert.assigned_task.agent.agent_id else: return True diff --git a/api/tacticalrmm/alerts/tests.py b/api/tacticalrmm/alerts/tests.py index 1c1ada4d..ceb23a77 100644 --- a/api/tacticalrmm/alerts/tests.py +++ b/api/tacticalrmm/alerts/tests.py @@ -1506,11 +1506,16 @@ class TestAlertPermissions(TacticalTestCase): checks = baker.make("checks.Check", agent=cycle(agents), _quantity=3) tasks = baker.make("autotasks.AutomatedTask", agent=cycle(agents), _quantity=3) baker.make( - "alerts.Alert", alert_type="task", assigned_task=cycle(tasks), _quantity=3 + "alerts.Alert", + alert_type="task", + agent=cycle(agents), + assigned_task=cycle(tasks), + _quantity=3, ) baker.make( "alerts.Alert", alert_type="check", + agent=cycle(agents), assigned_check=cycle(checks), _quantity=3, ) @@ -1560,11 +1565,16 @@ class TestAlertPermissions(TacticalTestCase): checks = baker.make("checks.Check", agent=cycle(agents), _quantity=3) tasks = baker.make("autotasks.AutomatedTask", agent=cycle(agents), _quantity=3) alert_tasks = baker.make( - "alerts.Alert", alert_type="task", assigned_task=cycle(tasks), _quantity=3 + "alerts.Alert", + alert_type="task", + agent=cycle(agents), + assigned_task=cycle(tasks), + _quantity=3, ) alert_checks = baker.make( "alerts.Alert", alert_type="check", + agent=cycle(agents), assigned_check=cycle(checks), _quantity=3, ) @@ -1644,72 +1654,75 @@ class TestAlertPermissions(TacticalTestCase): for url in unauthorized_urls: self.check_authorized(method, url) + def test_handling_multiple_availability_alerts_returned(self): + agent = baker.make_recipe("agents.agent") + alerts = baker.make( + "alerts.Alert", + alert_type="availability", + agent=agent, + resolved=False, + _quantity=3, + ) -def test_handling_multiple_availability_alerts_returned(self): - agent = baker.make_recipe("agents.agent") - alerts = baker.make( - "alerts.Alerts", - alert_type="availability", - agent=agent, - resolved=False, - _quantity=3, - ) + alert = Alert.create_or_return_availability_alert(agent, skip_create=True) - alert = Alert.create_or_return_availability_alert(agent, skip_create=True) + # make sure last alert is returned + self.assertEqual(alert, alerts[-1]) - # make sure last alert is returned - self.assertEqual(alert, alerts[-1]) + # make sure only 1 alert is not resolved + self.assertEqual( + Alert.objects.filter( + alert_type="availability", agent=agent, resolved=False + ).count(), + 1, + ) - # make sure only 1 alert is not resolved - self.assertEqual( - Alert.objects.filter( - alert_type="availability", agent=agent, resolved=False - ).count(), - 1, - ) + def test_handling_multiple_check_alerts_returned(self): + agent = baker.make_recipe("agents.agent") + check = baker.make_recipe("checks.diskspace_check", agent=agent) + alerts = baker.make( + "alerts.Alert", + alert_type="check", + assigned_check=check, + agent=agent, + resolved=False, + _quantity=3, + ) + alert = Alert.create_or_return_check_alert(check, agent=agent, skip_create=True) -def test_handling_multiple_check_alerts_returned(self): - agent = baker.make_recipe("agents.agent") - check = baker.make_recipe("checks.diskspace_check", agent=agent) - alerts = baker.make( - "alerts.Alerts", - alert_type="check", - assigned_check=check, - agent=agent, - resolved=False, - _quantity=3, - ) + # make sure last alert is returned + self.assertEqual(alert, alerts[-1]) - alert = Alert.create_or_return_check_alert(check, skip_create=True) + # make sure only 1 alert is not resolved + self.assertEqual( + Alert.objects.filter( + alert_type="check", agent=agent, resolved=False + ).count(), + 1, + ) - # make sure last alert is returned - self.assertEqual(alert, alerts[-1]) + def test_handling_multiple_task_alerts_returned(self): + agent = baker.make_recipe("agents.agent") + task = baker.make("autotasks.AutomatedTask", agent=agent) + alerts = baker.make( + "alerts.Alert", + alert_type="task", + assigned_task=task, + agent=agent, + resolved=False, + _quantity=3, + ) - # make sure only 1 alert is not resolved - self.assertEqual( - Alert.objects.filter(alert_type="check", agent=agent, resolved=False).count(), 1 - ) + alert = Alert.create_or_return_task_alert(task, agent=agent, skip_create=True) + # make sure last alert is returned + self.assertEqual(alert, alerts[-1]) -def test_handling_multiple_task_alerts_returned(self): - agent = baker.make_recipe("agents.agent") - task = baker.make_recipe("autotasks.AutomatedTask", agent=agent) - alerts = baker.make( - "alerts.Alerts", - alert_type="check", - assigned_task=task, - agent=agent, - resolved=False, - _quantity=3, - ) - - alert = Alert.create_or_return_task_alert(task, skip_create=True) - - # make sure last alert is returned - self.assertEqual(alert, alerts[-1]) - - # make sure only 1 alert is not resolved - self.assertEqual( - Alert.objects.filter(alert_type="task", agent=agent, resolved=False).count(), 1 - ) + # make sure only 1 alert is not resolved + self.assertEqual( + Alert.objects.filter( + alert_type="task", agent=agent, resolved=False + ).count(), + 1, + ) diff --git a/api/tacticalrmm/apiv3/views.py b/api/tacticalrmm/apiv3/views.py index f82a650c..1f45240c 100644 --- a/api/tacticalrmm/apiv3/views.py +++ b/api/tacticalrmm/apiv3/views.py @@ -312,7 +312,7 @@ class TaskRunner(APIView): task.save(update_fields=["status"]) if status == "passing": - if Alert.create_or_return_task_alert(task, skip_create=True): + if Alert.create_or_return_task_alert(task, agent=agent, skip_create=True): Alert.handle_alert_resolve(task_result) else: Alert.handle_alert_failure(task_result) diff --git a/api/tacticalrmm/automation/models.py b/api/tacticalrmm/automation/models.py index 5778f7cb..f11b6953 100644 --- a/api/tacticalrmm/automation/models.py +++ b/api/tacticalrmm/automation/models.py @@ -249,7 +249,7 @@ class Policy(BaseAuditModel): cpuload_checks: "List[Check]" = list() memory_checks: "List[Check]" = list() - overridden_checks = list() + overridden_checks: List[int] = list() # Loop over checks in with enforced policies first, then non-enforced policies for check in enforced_checks + agent_checks + policy_checks: diff --git a/api/tacticalrmm/autotasks/models.py b/api/tacticalrmm/autotasks/models.py index 7a42cbce..06a7c24d 100644 --- a/api/tacticalrmm/autotasks/models.py +++ b/api/tacticalrmm/autotasks/models.py @@ -2,11 +2,10 @@ import asyncio import random import string import pytz -from typing import TYPE_CHECKING, List, Dict, Optional, Union +from typing import TYPE_CHECKING, List, Dict, Any, Optional, Union from alerts.models import SEVERITY_CHOICES from django.core.validators import MaxValueValidator, MinValueValidator -from django.contrib.postgres.fields import ArrayField from django.utils import timezone as djangotime from django.db import models from django.db.models.fields import DateTimeField @@ -17,7 +16,6 @@ from core.utils import get_core_settings if TYPE_CHECKING: from automation.models import Policy - from autotasks.models import AutomatedTask from alerts.models import Alert, AlertTemplate from agents.models import Agent from checks.models import Check @@ -146,12 +144,12 @@ class AutomatedTask(BaseAuditModel): managed_by_policy = models.BooleanField(default=False) # non-database property - task_result: "Union[TaskResult, Dict]" = {} + task_result: "Union[TaskResult, Dict[None, None]]" = {} - def __str__(self): + def __str__(self) -> str: return self.name - def save(self, *args, **kwargs): + def save(self, *args, **kwargs) -> None: # get old task if exists old_task = AutomatedTask.objects.get(pk=self.pk) if self.pk else None super(AutomatedTask, self).save(old_model=old_task, *args, **kwargs) @@ -170,7 +168,7 @@ class AutomatedTask(BaseAuditModel): ) @property - def schedule(self): + def schedule(self) -> Optional[str]: if self.task_type == "manual": return "Manual" elif self.task_type == "checkfailure": @@ -225,7 +223,7 @@ class AutomatedTask(BaseAuditModel): ] @staticmethod - def generate_task_name(): + def generate_task_name() -> str: chars = string.ascii_letters return "TacticalRMM_" + "".join(random.choice(chars) for i in range(35)) @@ -284,7 +282,7 @@ class AutomatedTask(BaseAuditModel): # agent version >= 1.8.0 def generate_nats_task_payload( self, agent: "Optional[Agent]" = None, editing: bool = False - ) -> Dict: + ) -> Dict[str, Any]: task = { "pk": self.pk, "type": "rmm", diff --git a/api/tacticalrmm/checks/models.py b/api/tacticalrmm/checks/models.py index 189300f9..d92cc168 100644 --- a/api/tacticalrmm/checks/models.py +++ b/api/tacticalrmm/checks/models.py @@ -13,7 +13,6 @@ from core.utils import get_core_settings if TYPE_CHECKING: from alerts.models import Alert, AlertTemplate from automation.models import Policy - from checks.models import CheckResult CHECK_TYPE_CHOICES = [ ("diskspace", "Disk Space Check"), diff --git a/api/tacticalrmm/core/migrations/0032_alter_coresettings_email_alert_recipients_and_more.py b/api/tacticalrmm/core/migrations/0032_alter_coresettings_email_alert_recipients_and_more.py new file mode 100644 index 00000000..fd9fb929 --- /dev/null +++ b/api/tacticalrmm/core/migrations/0032_alter_coresettings_email_alert_recipients_and_more.py @@ -0,0 +1,54 @@ +# Generated by Django 4.0.3 on 2022-04-08 03:16 + +import django.contrib.postgres.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0031_coresettings_date_format'), + ] + + operations = [ + migrations.AlterField( + model_name='coresettings', + name='email_alert_recipients', + field=django.contrib.postgres.fields.ArrayField(base_field=models.EmailField(blank=True, max_length=254, null=True), blank=True, default=list, size=None), + ), + migrations.AlterField( + model_name='coresettings', + name='sms_alert_recipients', + field=django.contrib.postgres.fields.ArrayField(base_field=models.CharField(blank=True, max_length=255, null=True), blank=True, default=list, size=None), + ), + migrations.AlterField( + model_name='coresettings', + name='smtp_from_email', + field=models.CharField(blank=True, default='from@example.com', max_length=255), + ), + migrations.AlterField( + model_name='coresettings', + name='smtp_host', + field=models.CharField(blank=True, default='smtp.gmail.com', max_length=255), + ), + migrations.AlterField( + model_name='coresettings', + name='smtp_host_password', + field=models.CharField(blank=True, default='changeme', max_length=255), + ), + migrations.AlterField( + model_name='coresettings', + name='smtp_host_user', + field=models.CharField(blank=True, default='admin@example.com', max_length=255), + ), + migrations.AlterField( + model_name='coresettings', + name='smtp_port', + field=models.PositiveIntegerField(blank=True, default=587), + ), + migrations.AlterField( + model_name='customfield', + name='name', + field=models.CharField(max_length=30), + ), + ] diff --git a/api/tacticalrmm/core/models.py b/api/tacticalrmm/core/models.py index 70a048b0..ae3e532f 100644 --- a/api/tacticalrmm/core/models.py +++ b/api/tacticalrmm/core/models.py @@ -261,7 +261,7 @@ class CustomField(BaseAuditModel): blank=True, default=list, ) - name = models.CharField(max_length=30, blank=True) + name = models.CharField(max_length=30) required = models.BooleanField(blank=True, default=False) default_value_string = models.TextField(null=True, blank=True) default_value_bool = models.BooleanField(default=False) diff --git a/api/tacticalrmm/core/tests.py b/api/tacticalrmm/core/tests.py index 861ad107..ecb76aea 100644 --- a/api/tacticalrmm/core/tests.py +++ b/api/tacticalrmm/core/tests.py @@ -179,7 +179,6 @@ class TestCoreTasks(TacticalTestCase): serializer = CustomFieldSerializer(custom_fields, many=True) self.assertEqual(r.status_code, 200) self.assertEqual(len(r.data), 5) - self.assertEqual(r.data, serializer.data) self.check_not_authenticated("patch", url) diff --git a/api/tacticalrmm/logs/models.py b/api/tacticalrmm/logs/models.py index 3b72d7e3..ac18feea 100644 --- a/api/tacticalrmm/logs/models.py +++ b/api/tacticalrmm/logs/models.py @@ -7,9 +7,9 @@ from tacticalrmm.middleware import get_debug_info, get_username from tacticalrmm.models import PermissionQuerySet if TYPE_CHECKING: - from agents.models import Agent from clients.models import Client, Site from core.models import URLAction + from agents.models import Agent def get_debug_level() -> str: @@ -240,6 +240,7 @@ class AuditLog(models.Model): instance: "Union[Agent, Client, Site]", debug_info: Dict[Any, Any] = {}, ) -> None: + from agents.models import Agent name = instance.hostname if isinstance(instance, Agent) else instance.name classname = type(instance).__name__ diff --git a/api/tacticalrmm/tacticalrmm/models.py b/api/tacticalrmm/tacticalrmm/models.py index 4490295c..97401ca3 100644 --- a/api/tacticalrmm/tacticalrmm/models.py +++ b/api/tacticalrmm/tacticalrmm/models.py @@ -63,17 +63,9 @@ class PermissionQuerySet(models.QuerySet): custom_alert_queryset = models.Q() if can_view_clients: - clients_queryset = ( - models.Q(agent__site__client__in=can_view_clients) - | models.Q(assigned_check__agent__site__client__in=can_view_clients) - | models.Q(assigned_task__agent__site__client__in=can_view_clients) - ) + clients_queryset = models.Q(agent__site__client__in=can_view_clients) if can_view_sites: - sites_queryset = ( - models.Q(agent__site__in=can_view_sites) - | models.Q(assigned_check__agent__site__in=can_view_sites) - | models.Q(assigned_task__agent__site__in=can_view_sites) - ) + sites_queryset = models.Q(agent__site__in=can_view_sites) if can_view_clients or can_view_sites: custom_alert_queryset = models.Q( agent=None, assigned_check=None, assigned_task=None diff --git a/api/tacticalrmm/tacticalrmm/test.py b/api/tacticalrmm/tacticalrmm/test.py index 70b27fb3..52d1e1cd 100644 --- a/api/tacticalrmm/tacticalrmm/test.py +++ b/api/tacticalrmm/tacticalrmm/test.py @@ -16,6 +16,12 @@ if TYPE_CHECKING: from checks.models import Check from scripts.models import Script +TEST_CACHE = { + "default": { + "BACKEND": "django.core.cache.backends.dummy.DummyCache", + } +} + class TacticalTestCase(TestCase): client: APIClient @@ -38,7 +44,7 @@ class TacticalTestCase(TestCase): password=User.objects.make_random_password(60), # type: ignore ) - def setup_client(self): + def setup_client(self) -> None: self.client = APIClient() def setup_agent_auth(self, agent: "Agent") -> None: @@ -50,9 +56,10 @@ class TacticalTestCase(TestCase): # fixes tests waiting 2 minutes for mesh token to appear @override_settings( - MESH_TOKEN_KEY="41410834b8bb4481446027f87d88ec6f119eb9aa97860366440b778540c7399613f7cabfef4f1aa5c0bd9beae03757e17b2e990e5876b0d9924da59bdf24d3437b3ed1a8593b78d65a72a76c794160d9" + MESH_TOKEN_KEY="41410834b8bb4481446027f87d88ec6f119eb9aa97860366440b778540c7399613f7cabfef4f1aa5c0bd9beae03757e17b2e990e5876b0d9924da59bdf24d3437b3ed1a8593b78d65a72a76c794160d9", + CACHES=TEST_CACHE, ) - def setup_coresettings(self): + def setup_coresettings(self) -> None: self.coresettings = CoreSettings.objects.create() def check_not_authenticated(self, method: str, url: str) -> None: @@ -90,7 +97,7 @@ class TacticalTestCase(TestCase): return checks def check_not_authorized( - self, method: str, url: str, data: Optional[Dict] = {} + self, method: str, url: str, data: Optional[Dict[Any, Any]] = {} ) -> None: try: r = getattr(self.client, method)(url, data, format="json") @@ -98,7 +105,9 @@ class TacticalTestCase(TestCase): except KeyError: pass - def check_authorized(self, method: str, url: str, data: Optional[Dict] = {}) -> Any: + def check_authorized( + self, method: str, url: str, data: Optional[Dict[Any, Any]] = {} + ) -> Any: try: r = getattr(self.client, method)(url, data, format="json") self.assertNotEqual(r.status_code, 403) @@ -107,7 +116,7 @@ class TacticalTestCase(TestCase): pass def check_authorized_superuser( - self, method: str, url: str, data: Optional[Dict] = {} + self, method: str, url: str, data: Optional[Dict[Any, Any]] = {} ) -> Any: try: