Detect dependency cycles beyond a depth = 2 (#274)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Bernát Gábor <bgabor8@bloomberg.net>
This commit is contained in:
Kemal Zebari 2023-07-29 18:52:51 -07:00 committed by GitHub
parent 616618fd7d
commit 1ca7fe7472
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 28 deletions

View File

@ -5,6 +5,8 @@ from collections import defaultdict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pipdeptree._models.package import Package
from ._cli import Options
from ._models import DistPackage, PackageDAG, ReqPackage
@ -59,36 +61,61 @@ def render_conflicts_text(conflicts: dict[DistPackage, list[ReqPackage]]) -> Non
print(f" - {req_str}", file=sys.stderr) # noqa: T201
def cyclic_deps(tree: PackageDAG) -> list[tuple[DistPackage, ReqPackage, ReqPackage]]:
def cyclic_deps(tree: PackageDAG) -> list[list[Package]]:
"""
Return cyclic dependencies as list of tuples.
Return cyclic dependencies as list of lists.
:param tree: package tree/dag
:returns: list of tuples representing cyclic dependencies
:returns: list of lists, where each list represents a cycle
"""
index = {p.key: {r.key for r in rs} for p, rs in tree.items()}
cyclic: list[tuple[DistPackage, ReqPackage, ReqPackage]] = []
for p, rs in tree.items():
for r in rs:
if p.key in index.get(r.key, []):
val = tree.get_node_as_parent(r.key)
if val is not None:
entry = tree.get(val)
if entry is not None:
p_as_dep_of_r = next(x for x in entry if x.key == p.key)
cyclic.append((p, r, p_as_dep_of_r))
return cyclic
def dfs(root: DistPackage, current: Package, visited: set[str], cdeps: list[Package]) -> bool:
if current.key not in visited:
visited.add(current.key)
current_dist = tree.get_node_as_parent(current.key)
if not current_dist:
return False
reqs = tree.get(current_dist)
if not reqs:
return False
for req in reqs:
if dfs(root, req, visited, cdeps):
cdeps.append(current)
return True
elif current.key == root.key:
cdeps.append(current)
return True
return False
cycles: list[list[Package]] = []
for p in tree:
cdeps: list[Package] = []
visited: set[str] = set()
if dfs(p, p, visited, cdeps):
cdeps.reverse()
cycles.append(cdeps)
return cycles
def render_cycles_text(cycles: list[tuple[DistPackage, ReqPackage, ReqPackage]]) -> None:
def render_cycles_text(cycles: list[list[Package]]) -> None:
if cycles:
print("Warning!! Cyclic dependencies found:", file=sys.stderr) # noqa: T201
# List in alphabetical order of the dependency that's cycling
# (2nd item in the tuple)
cycles = sorted(cycles, key=lambda xs: xs[1].key)
for a, b, c in cycles:
print(f"* {a.project_name} => {b.project_name} => {c.project_name}", file=sys.stderr) # noqa: T201
# List in alphabetical order the dependency that caused the cycle (i.e. the second-to-last Package element)
cycles = sorted(cycles, key=lambda c: c[len(c) - 2].key)
for cycle in cycles:
print("*", end=" ", file=sys.stderr) # noqa: T201
size = len(cycle) - 1
for idx, pkg in enumerate(cycle):
if idx == size:
print(f"{pkg.project_name}", end="", file=sys.stderr) # noqa: T201
else:
print(f"{pkg.project_name} =>", end=" ", file=sys.stderr) # noqa: T201
print(file=sys.stderr) # noqa: T201
__all__ = [

View File

@ -16,35 +16,67 @@ if TYPE_CHECKING:
@pytest.mark.parametrize(
("mpkgs", "expected_keys", "expected_output"),
[
(
pytest.param(
{
("a", "1.0.1"): [("b", [(">=", "2.0.0")])],
("b", "2.3.0"): [("a", [(">=", "1.0.1")])],
("c", "4.5.0"): [("d", [("==", "2.0")])],
("d", "2.0"): [],
},
[("a", "b", "a"), ("b", "a", "b")],
[["a", "b", "a"], ["b", "a", "b"]],
["Warning!! Cyclic dependencies found:", "* b => a => b", "* a => b => a"],
id="depth-of-2",
),
( # if a dependency isn't installed, cannot verify cycles
pytest.param(
{
("a", "1.0.1"): [("b", [(">=", "2.0.0")]), ("e", [("==", "2.0")])],
("b", "2.3.0"): [("c", [(">=", "4.5.0")])],
("c", "4.5.0"): [("d", [("==", "1.0.1")])],
("d", "1.0.0"): [("a", [("==", "1.0.1")]), ("e", [("==", "2.0")])],
("e", "2.0"): [],
},
[
["b", "c", "d", "a", "b"],
["c", "d", "a", "b", "c"],
["d", "a", "b", "c", "d"],
["a", "b", "c", "d", "a"],
],
[
"Warning!! Cyclic dependencies found:",
"* b => c => d => a => b",
"* c => d => a => b => c",
"* d => a => b => c => d",
"* a => b => c => d => a",
],
id="depth-greater-than-2",
),
pytest.param(
{("a", "1.0.1"): [("b", [(">=", "2.0.0")])], ("b", "2.0.0"): []},
[],
[],
id="no-cycle",
),
pytest.param(
{
("a", "1.0.1"): [("b", [(">=", "2.0.0")])],
},
[],
[], # no output expected
[],
id="dependency-not-installed",
),
pytest.param({("a", "1.0.1"): []}, [], [], id="no-dependencies"),
],
)
def test_cyclic_deps(
capsys: pytest.CaptureFixture[str],
mpkgs: dict[tuple[str, str], list[tuple[str, list[tuple[str, str]]]]],
expected_keys: list[tuple[str, ...]],
expected_keys: list[list[str]],
expected_output: list[str],
mock_pkgs: Callable[[MockGraph], Iterator[Mock]],
) -> None:
tree = PackageDAG.from_pkgs(list(mock_pkgs(mpkgs)))
result = cyclic_deps(tree)
result_keys = [(a.key, b.key, c.key) for (a, b, c) in result]
result_keys = [[dep.key for dep in deps] for deps in result]
assert sorted(expected_keys) == sorted(result_keys)
render_cycles_text(result)
captured = capsys.readouterr()