analysis

code_context_agent.tools.graph.analysis

Graph analysis algorithms for code understanding.

This module provides the CodeAnalyzer class with methods for:

- Centrality analysis (hotspots, foundations, entry points)
- Clustering (community detection, pattern-based grouping)
- Proximity/similarity analysis

CodeAnalyzer

CodeAnalyzer(graph)

Analyzer for code graphs using NetworkX algorithms.

Provides methods for finding important code (centrality), detecting logical modules (clustering), and analyzing relationships between code elements.

Initialize the analyzer with a code graph.

Parameters:

Name   Type       Description               Default
graph  CodeGraph  The CodeGraph to analyze  required
Source code in src/code_context_agent/tools/graph/analysis.py
def __init__(self, graph: CodeGraph) -> None:
    """Initialize the analyzer with a code graph.

    Args:
        graph: The CodeGraph to analyze
    """
    self.graph = graph

find_hotspots

find_hotspots(top_k=10)

Find code hotspots using betweenness centrality.

Hotspots are code elements that lie on many shortest paths between other elements - they are often bottlenecks or central integration points.

Parameters:

Name   Type  Description                       Default
top_k  int   Number of top hotspots to return  10

Returns:

Type                  Description
list[dict[str, Any]]  List of dictionaries with node info and betweenness score

Source code in src/code_context_agent/tools/graph/analysis.py
def find_hotspots(self, top_k: int = 10) -> list[dict[str, Any]]:
    """Find code hotspots using betweenness centrality.

    Hotspots are code elements that lie on many shortest paths
    between other elements - they are often bottlenecks or
    central integration points.

    Args:
        top_k: Number of top hotspots to return

    Returns:
        List of dictionaries with node info and betweenness score
    """
    view = self.graph.get_view([EdgeType.CALLS, EdgeType.REFERENCES])

    if view.number_of_nodes() == 0:
        return []

    try:
        betweenness = nx.betweenness_centrality(view, weight="weight")
    except nx.NetworkXError:
        return []

    return self._format_ranked_results(betweenness, top_k)
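
The betweenness computation above can be sketched directly in NetworkX. The graph below is a hypothetical toy call graph (the node names are illustrative, not from this module):

```python
import networkx as nx

# Hypothetical call graph: every path from the handlers to the
# backends passes through "dispatch", so it scores highest.
G = nx.DiGraph()
G.add_edges_from([
    ("handler_a", "dispatch"), ("handler_b", "dispatch"),
    ("dispatch", "db_read"), ("dispatch", "db_write"),
])

betweenness = nx.betweenness_centrality(G)
hotspot = max(betweenness, key=betweenness.get)
```

Nodes that never sit on a shortest path between two others, like the handlers here, score zero.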

find_foundations

find_foundations(top_k=10)

Find foundational code using PageRank.

Foundations are code elements that are heavily depended upon by other important code - the core infrastructure.

Parameters:

Name   Type  Description                          Default
top_k  int   Number of top foundations to return  10

Returns:

Type                  Description
list[dict[str, Any]]  List of dictionaries with node info and PageRank score

Source code in src/code_context_agent/tools/graph/analysis.py
def find_foundations(self, top_k: int = 10) -> list[dict[str, Any]]:
    """Find foundational code using PageRank.

    Foundations are code elements that are heavily depended upon
    by other important code - the core infrastructure.

    Args:
        top_k: Number of top foundations to return

    Returns:
        List of dictionaries with node info and PageRank score
    """
    view = self.graph.get_view([EdgeType.CALLS, EdgeType.IMPORTS])

    if view.number_of_nodes() == 0:
        return []

    try:
        pagerank = nx.pagerank(view, alpha=0.85, weight="weight")
    except nx.NetworkXError:
        return []

    return self._format_ranked_results(pagerank, top_k)
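
A minimal sketch of the PageRank step, on a hypothetical dependency graph where several modules all depend on one core module:

```python
import networkx as nx

# Hypothetical dependency graph: three modules all depend on
# "utils", so PageRank ranks it as the foundation.
G = nx.DiGraph()
G.add_edges_from([
    ("api", "utils"), ("cli", "utils"),
    ("worker", "utils"), ("api", "worker"),
])

pagerank = nx.pagerank(G, alpha=0.85)
foundation = max(pagerank, key=pagerank.get)
```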

find_trusted_foundations

find_trusted_foundations(seed_nodes=None, top_k=10)

Find foundational code using TrustRank (noise-resistant PageRank).

TrustRank propagates trust from seed nodes, making it more resistant to noise than standard PageRank. If no seed nodes provided, uses entry points as seeds.

Parameters:

Name        Type              Description                                           Default
seed_nodes  list[str] | None  List of trusted node IDs (defaults to entry points)  None
top_k       int               Number of top results to return                       10

Returns:

Type                  Description
list[dict[str, Any]]  List of dictionaries with node info and trust score

Source code in src/code_context_agent/tools/graph/analysis.py
def find_trusted_foundations(
    self,
    seed_nodes: list[str] | None = None,
    top_k: int = 10,
) -> list[dict[str, Any]]:
    """Find foundational code using TrustRank (noise-resistant PageRank).

    TrustRank propagates trust from seed nodes, making it more resistant
    to noise than standard PageRank. If no seed nodes provided, uses
    entry points as seeds.

    Args:
        seed_nodes: List of trusted node IDs (defaults to entry points)
        top_k: Number of top results to return

    Returns:
        List of dictionaries with node info and trust score
    """
    view = self.graph.get_view([EdgeType.CALLS, EdgeType.IMPORTS])

    if view.number_of_nodes() == 0:
        return []

    # Use entry points as default seeds
    if not seed_nodes:
        entry_points = self.find_entry_points()
        seed_nodes = [ep["id"] for ep in entry_points[:5]]

    if not seed_nodes:
        return self.find_foundations(top_k)

    # Build personalization dict for TrustRank
    trust = dict.fromkeys(view.nodes(), 0.0)
    for seed in seed_nodes:
        if seed in trust:
            trust[seed] = 1.0 / len(seed_nodes)

    try:
        scores = nx.pagerank(view, alpha=0.85, personalization=trust, weight="weight")
    except nx.NetworkXError:
        return []

    return self._format_ranked_results(scores, top_k)
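
The trust-seeding trick is personalized PageRank: all teleport mass goes to the seeds. A hypothetical sketch, seeding trust entirely at "main":

```python
import networkx as nx

# Hypothetical graph: scores reflect reachability from trusted
# code, so the unreachable "orphan" node scores near zero.
G = nx.DiGraph()
G.add_edges_from([("main", "service"), ("service", "db"), ("orphan", "service")])

trust = dict.fromkeys(G.nodes(), 0.0)
trust["main"] = 1.0
scores = nx.pagerank(G, alpha=0.85, personalization=trust)
```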

find_entry_points

find_entry_points()

Find likely entry points in the code.

Entry points are nodes with no incoming call edges but outgoing calls - they initiate execution flow.

Returns:

Type                  Description
list[dict[str, Any]]  List of dictionaries with entry point node info

Source code in src/code_context_agent/tools/graph/analysis.py
def find_entry_points(self) -> list[dict[str, Any]]:
    """Find likely entry points in the code.

    Entry points are nodes with no incoming call edges but
    outgoing calls - they initiate execution flow.

    Returns:
        List of dictionaries with entry point node info
    """
    view = self.graph.get_view([EdgeType.CALLS])

    entry_points = []
    for node in view.nodes():
        in_deg = view.in_degree(node)
        out_deg = view.out_degree(node)

        # Entry point: no callers but makes calls
        if in_deg == 0 and out_deg > 0:
            node_data = self.graph.get_node_data(node)
            entry_points.append(
                {
                    "id": node,
                    "out_degree": out_deg,
                    **(node_data or {}),
                },
            )

    # Also check for main/run/start patterns
    for node, data in self.graph.nodes(data=True):
        name = str(data.get("name", "")).lower()
        if any(p in name for p in ("main", "__main__", "run", "start", "app", "cli")):
            if not any(ep["id"] == node for ep in entry_points):
                entry_points.append(
                    {
                        "id": node,
                        "out_degree": view.out_degree(node) if view.has_node(node) else 0,
                        **data,
                    },
                )

    # Sort by out_degree (more calls = more significant entry point)
    entry_points.sort(key=lambda x: x.get("out_degree", 0), reverse=True)

    return entry_points
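
The degree test at the heart of this method can be sketched on a hypothetical call graph:

```python
import networkx as nx

# "main" has no callers but makes calls, so it matches the
# entry-point test (in-degree 0, out-degree > 0) used above.
G = nx.DiGraph()
G.add_edges_from([("main", "parse"), ("main", "run"), ("run", "parse")])

entry_points = [
    n for n in G.nodes() if G.in_degree(n) == 0 and G.out_degree(n) > 0
]
```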

detect_modules

detect_modules(resolution=1.0)

Detect logical modules using Louvain community detection.

Uses the Louvain algorithm to find communities of densely connected code elements.

Parameters:

Name        Type   Description                                                    Default
resolution  float  Clustering resolution (< 1 = larger clusters, > 1 = smaller)  1.0

Returns:

Type                  Description
list[dict[str, Any]]  List of module dictionaries with members and metrics

Source code in src/code_context_agent/tools/graph/analysis.py
def detect_modules(self, resolution: float = 1.0) -> list[dict[str, Any]]:
    """Detect logical modules using Louvain community detection.

    Uses the Louvain algorithm to find communities of densely
    connected code elements.

    Args:
        resolution: Clustering resolution (< 1 = larger clusters, > 1 = smaller)

    Returns:
        List of module dictionaries with members and metrics
    """
    view = self.graph.get_view([EdgeType.CALLS, EdgeType.IMPORTS])

    if view.number_of_nodes() < 2:
        return []

    # Louvain requires undirected graph
    undirected = view.to_undirected()

    try:
        # Try Leiden first (better community quality, requires backend)
        communities = nx.community.leiden_communities(undirected, resolution=resolution, seed=42)
    except (NotImplementedError, nx.NetworkXError, ValueError, RuntimeError):
        try:
            # Fallback to Louvain (pure NetworkX)
            communities = nx.community.louvain_communities(undirected, resolution=resolution, seed=42)
        except (nx.NetworkXError, ValueError, RuntimeError):
            return []

    modules = []
    for i, community in enumerate(communities):
        community_list = list(community)

        # Get key nodes (highest PageRank within community)
        subgraph = view.subgraph(community_list)
        if subgraph.number_of_nodes() > 0:
            try:
                local_pr = nx.pagerank(subgraph)
                key_nodes = sorted(local_pr.items(), key=lambda x: x[1], reverse=True)[:3]
            except (nx.NetworkXError, ValueError, RuntimeError):
                key_nodes = [(n, 0) for n in community_list[:3]]
        else:
            key_nodes = []

        # Calculate cohesion (internal/external edge ratio)
        cohesion = self._calculate_cohesion(view, community)

        modules.append(
            {
                "module_id": i,
                "size": len(community_list),
                "key_nodes": [{"id": n, "score": s} for n, s in key_nodes],
                "members": community_list,
                "cohesion": cohesion,
            },
        )

    # Sort by size (largest modules first)
    modules.sort(key=lambda x: x["size"], reverse=True)

    return modules
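
A small sketch of the Louvain fallback path on a hypothetical graph of two dense clusters joined by one bridge edge:

```python
import networkx as nx

# Two triangles of mutually-calling code joined by a single
# bridge; Louvain recovers them as separate communities.
G = nx.Graph()
G.add_edges_from([
    ("a1", "a2"), ("a2", "a3"), ("a1", "a3"),
    ("b1", "b2"), ("b2", "b3"), ("b1", "b3"),
    ("a3", "b1"),
])

communities = nx.community.louvain_communities(G, resolution=1.0, seed=42)
membership = {n: i for i, c in enumerate(communities) for n in c}
```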

find_clusters_by_pattern

find_clusters_by_pattern(rule_id)

Find clusters of nodes matching a specific AST-grep rule.

Groups nodes by their rule_id metadata to find related business logic patterns.

Parameters:

Name     Type  Description                       Default
rule_id  str   The rule identifier to filter by  required

Returns:

Type                  Description
list[dict[str, Any]]  List of matching nodes grouped by file

Source code in src/code_context_agent/tools/graph/analysis.py
def find_clusters_by_pattern(self, rule_id: str) -> list[dict[str, Any]]:
    """Find clusters of nodes matching a specific AST-grep rule.

    Groups nodes by their rule_id metadata to find related
    business logic patterns.

    Args:
        rule_id: The rule identifier to filter by

    Returns:
        List of matching nodes grouped by file
    """
    matching_nodes: dict[str, list[dict[str, Any]]] = {}

    for node_id, data in self.graph.nodes(data=True):
        if data.get("rule_id") == rule_id:
            file_path = data.get("file_path", "unknown")
            if file_path not in matching_nodes:
                matching_nodes[file_path] = []
            matching_nodes[file_path].append({"id": node_id, **data})

    return [{"file": f, "matches": m, "count": len(m)} for f, m in matching_nodes.items()]

find_clusters_by_category

find_clusters_by_category(category)

Find all nodes matching a business logic category.

Parameters:

Name      Type  Description                                          Default
category  str   Category to filter by (e.g., "db", "auth", "http")  required

Returns:

Type                  Description
list[dict[str, Any]]  List of matching nodes with their locations

Source code in src/code_context_agent/tools/graph/analysis.py
def find_clusters_by_category(self, category: str) -> list[dict[str, Any]]:
    """Find all nodes matching a business logic category.

    Args:
        category: Category to filter by (e.g., "db", "auth", "http")

    Returns:
        List of matching nodes with their locations
    """
    matches = []

    for node_id, data in self.graph.nodes(data=True):
        if data.get("category") == category:
            matches.append({"id": node_id, **data})

    return matches

find_triangles

find_triangles(top_k=10)

Find tightly-coupled code triads using triangle detection.

Triangles in the call/import graph indicate three pieces of code that all depend on each other — potential cohesion or coupling issues.

Parameters:

Name   Type  Description                            Default
top_k  int   Maximum number of triangles to return  10

Returns:

Type                  Description
list[dict[str, Any]]  List of triangle dictionaries with the three node IDs

Source code in src/code_context_agent/tools/graph/analysis.py
def find_triangles(self, top_k: int = 10) -> list[dict[str, Any]]:
    """Find tightly-coupled code triads using triangle detection.

    Triangles in the call/import graph indicate three pieces of code
    that all depend on each other — potential cohesion or coupling issues.

    Args:
        top_k: Maximum number of triangles to return

    Returns:
        List of triangle dictionaries with the three node IDs
    """
    view = self.graph.get_view([EdgeType.CALLS, EdgeType.IMPORTS])
    undirected = view.to_undirected()

    triangles = []
    try:
        for triangle in nx.enumerate_all_cliques(undirected):
            if len(triangle) == 3:
                triangles.append(
                    {
                        "nodes": list(triangle),
                        "node_details": [{"id": n, **(self.graph.get_node_data(n) or {})} for n in triangle],
                    },
                )
                if len(triangles) >= top_k:
                    break
    except nx.NetworkXError:
        pass  # graph structure doesn't support triangle detection (e.g. directed)

    return triangles
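
The clique-filtering idea above can be sketched on a hypothetical undirected graph:

```python
import networkx as nx

# enumerate_all_cliques yields cliques by increasing size; keeping
# the size-3 ones gives the mutually-connected triads.
G = nx.Graph()
G.add_edges_from([("a", "b"), ("b", "c"), ("a", "c"), ("c", "d")])

triangles = [set(c) for c in nx.enumerate_all_cliques(G) if len(c) == 3]
```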

get_similar_nodes

get_similar_nodes(node_id, top_k=5)

Find nodes similar to a given node based on graph structure.

Uses personalized PageRank to find nodes closely related to the target node.

Parameters:

Name     Type  Description                         Default
node_id  str   The node to find similar nodes for  required
top_k    int   Number of similar nodes to return   5

Returns:

Type                  Description
list[dict[str, Any]]  List of similar nodes with similarity scores

Source code in src/code_context_agent/tools/graph/analysis.py
def get_similar_nodes(self, node_id: str, top_k: int = 5) -> list[dict[str, Any]]:
    """Find nodes similar to a given node based on graph structure.

    Uses personalized PageRank to find nodes closely related
    to the target node.

    Args:
        node_id: The node to find similar nodes for
        top_k: Number of similar nodes to return

    Returns:
        List of similar nodes with similarity scores
    """
    view = self.graph.get_view()

    if not view.has_node(node_id):
        return []

    try:
        # Personalized PageRank with target node as seed
        ppr = nx.pagerank(view, personalization={node_id: 1}, alpha=0.85)
    except nx.NetworkXError:
        return []

    # Remove self, sort by score
    del ppr[node_id]
    ranked = sorted(ppr.items(), key=lambda x: x[1], reverse=True)[:top_k]

    return [{"id": n, "similarity": s, **(self.graph.get_node_data(n) or {})} for n, s in ranked if s > 0]
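
A sketch of the personalized-PageRank similarity idea, with hypothetical node names:

```python
import networkx as nx

# All restart mass on "auth": random walks concentrate on its
# neighborhood, so "tokens" outranks the unrelated "billing" nodes.
G = nx.DiGraph()
G.add_edges_from([
    ("auth", "tokens"), ("auth", "session"),
    ("session", "tokens"), ("tokens", "crypto"),
    ("billing", "invoice"),
])

ppr = nx.pagerank(G, personalization={"auth": 1}, alpha=0.85)
del ppr["auth"]  # drop the seed itself, as get_similar_nodes does
most_similar = max(ppr, key=ppr.get)
```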

calculate_coupling

calculate_coupling(node_a, node_b)

Calculate coupling strength between two nodes.

Considers shared neighbors, direct edges, and path length.

Parameters:

Name    Type  Description     Default
node_a  str   First node ID   required
node_b  str   Second node ID  required

Returns:

Type            Description
dict[str, Any]  Dictionary with coupling metrics

Source code in src/code_context_agent/tools/graph/analysis.py
def calculate_coupling(self, node_a: str, node_b: str) -> dict[str, Any]:
    """Calculate coupling strength between two nodes.

    Considers shared neighbors, direct edges, and path length.

    Args:
        node_a: First node ID
        node_b: Second node ID

    Returns:
        Dictionary with coupling metrics
    """
    view = self.graph.get_view()

    if not view.has_node(node_a) or not view.has_node(node_b):
        return {"error": "Node not found", "coupling": 0.0}

    # Direct edge count
    direct_edges = 0
    if view.has_edge(node_a, node_b):
        direct_edges += 1
    if view.has_edge(node_b, node_a):
        direct_edges += 1

    # Shared neighbors
    neighbors_a = set(view.successors(node_a)) | set(view.predecessors(node_a))
    neighbors_b = set(view.successors(node_b)) | set(view.predecessors(node_b))
    shared = neighbors_a & neighbors_b

    # Shortest path length
    try:
        path_length = nx.shortest_path_length(view.to_undirected(), node_a, node_b)
    except nx.NetworkXNoPath:
        path_length = float("inf")

    # Calculate coupling score (higher = more coupled)
    coupling = direct_edges * 2.0 + len(shared) * 0.5 + (1.0 / (path_length + 1))

    return {
        "node_a": node_a,
        "node_b": node_b,
        "direct_edges": direct_edges,
        "shared_neighbors": len(shared),
        "path_length": path_length if path_length != float("inf") else None,
        "coupling": coupling,
    }
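
A worked instance of the coupling formula above, using hypothetical counts:

```python
# One direct edge, three shared neighbors, shortest path length 1.
direct_edges = 1
shared_neighbors = 3
path_length = 1

# Same weights as calculate_coupling: 2.0 per direct edge,
# 0.5 per shared neighbor, plus a proximity bonus.
coupling = direct_edges * 2.0 + shared_neighbors * 0.5 + 1.0 / (path_length + 1)
# 2.0 + 1.5 + 0.5
```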

get_dependency_chain

get_dependency_chain(
    node_id, direction="outgoing", max_depth=5
)

Get the dependency chain from/to a node.

Parameters:

Name       Type  Description                                                              Default
node_id    str   "outgoing" (what this depends on) or "incoming" (what depends on this)  required
direction  str                                                                            'outgoing'
max_depth  int   Maximum depth to traverse                                                5

Returns:

Type            Description
dict[str, Any]  Dictionary with nodes and edges in the chain

Source code in src/code_context_agent/tools/graph/analysis.py
def get_dependency_chain(self, node_id: str, direction: str = "outgoing", max_depth: int = 5) -> dict[str, Any]:
    """Get the dependency chain from/to a node.

    Args:
        node_id: Starting node
        direction: "outgoing" (what this depends on) or "incoming" (what depends on this)
        max_depth: Maximum depth to traverse

    Returns:
        Dictionary with nodes and edges in the chain
    """
    view = self.graph.get_view([EdgeType.CALLS, EdgeType.IMPORTS])

    if not view.has_node(node_id):
        return {"error": "Node not found"}

    if direction == "outgoing":
        nodes = dict(nx.single_source_shortest_path_length(view, node_id, cutoff=max_depth))
    else:
        # Incoming: traverse reverse graph
        reverse = view.reverse()
        nodes = dict(nx.single_source_shortest_path_length(reverse, node_id, cutoff=max_depth))

    # Get edges within the discovered nodes
    subgraph = view.subgraph(nodes.keys())
    edges = list(subgraph.edges(data=True))

    return {
        "root": node_id,
        "direction": direction,
        "depth": max_depth,
        "nodes": [{"id": n, "distance": d, **(self.graph.get_node_data(n) or {})} for n, d in nodes.items()],
        "edges": [{"source": u, "target": v, **d} for u, v, d in edges],
    }
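
The BFS-with-cutoff traversal can be sketched on a hypothetical import chain:

```python
import networkx as nx

# BFS with a depth cutoff gives each node's distance from the
# root; the reversed graph answers "what depends on this".
G = nx.DiGraph()
G.add_edges_from([("app", "service"), ("service", "db"), ("db", "driver")])

outgoing = dict(nx.single_source_shortest_path_length(G, "app", cutoff=2))
incoming = dict(nx.single_source_shortest_path_length(G.reverse(), "db", cutoff=2))
```

Nodes beyond the cutoff ("driver" at distance 3) are simply not visited.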

find_unused_symbols

find_unused_symbols(node_types=None, exclude_patterns=None)

Find symbols with zero incoming cross-file references.

Identifies functions, classes, and methods that are defined but never referenced from other files — dead code candidates.

Parameters:

Name              Type              Description                                                    Default
node_types        list[str] | None  Filter to specific types (default: function, class, method)   None
exclude_patterns  list[str] | None  Regex patterns to exclude from results                         None

Returns:

Type                  Description
list[dict[str, Any]]  List of unused symbol dicts with id, name, file_path, node_type

Source code in src/code_context_agent/tools/graph/analysis.py
def find_unused_symbols(
    self,
    node_types: list[str] | None = None,
    exclude_patterns: list[str] | None = None,
) -> list[dict[str, Any]]:
    """Find symbols with zero incoming cross-file references.

    Identifies functions, classes, and methods that are defined but
    never referenced from other files — dead code candidates.

    Args:
        node_types: Filter to specific types (default: function, class, method)
        exclude_patterns: Regex patterns to exclude from results

    Returns:
        List of unused symbol dicts with id, name, file_path, node_type
    """
    target_types = (
        set(node_types)
        if node_types
        else {
            NodeType.FUNCTION.value,
            NodeType.CLASS.value,
            NodeType.METHOD.value,
        }
    )
    default_excludes = [r"^test_", r"^_", r"__init__", r"__main__"]
    excludes = [re.compile(p) for p in (exclude_patterns or default_excludes)]

    view = self.graph.get_view([EdgeType.REFERENCES, EdgeType.CALLS, EdgeType.IMPORTS])

    unused = []
    for node_id, data in self.graph.nodes(data=True):
        if data.get("node_type") not in target_types:
            continue

        name = str(data.get("name", ""))
        if any(pat.search(name) for pat in excludes):
            continue

        node_file = data.get("file_path", "")
        if not node_file:
            continue

        # Count incoming edges from OTHER files
        cross_file_refs = 0
        if view.has_node(node_id):
            for pred in view.predecessors(node_id):
                pred_data = self.graph.get_node_data(pred)
                pred_file = (pred_data or {}).get("file_path", "")
                if pred_file and pred_file != node_file:
                    cross_file_refs += 1
                    break  # One is enough to disqualify

        if cross_file_refs == 0:
            unused.append(
                {
                    "id": node_id,
                    "name": name,
                    "file_path": node_file,
                    "node_type": data.get("node_type"),
                    "line_start": data.get("line_start", 0),
                },
            )

    unused.sort(key=lambda x: (x["file_path"], x.get("line_start", 0)))
    return unused
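
The default exclusion filter behaves as sketched below (illustrative names):

```python
import re

# Default patterns from find_unused_symbols: test helpers, private
# names, and dunder entry points are never reported as unused.
default_excludes = [r"^test_", r"^_", r"__init__", r"__main__"]
patterns = [re.compile(p) for p in default_excludes]

def is_excluded(name: str) -> bool:
    return any(pat.search(name) for pat in patterns)
```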

find_refactoring_candidates

find_refactoring_candidates(top_k=10)

Identify refactoring opportunities by combining multiple signals.

Combines:

- Clone pairs (SIMILAR_TO edges) -> "extract shared helper"
- Code smell pattern matches (rule_id contains "code_smell") -> structural issues
- Unused symbols -> "dead code removal"

Parameters:

Name   Type  Description                             Default
top_k  int   Maximum number of candidates to return  10

Returns:

Type                  Description
list[dict[str, Any]]  Ranked list of refactoring candidates with type, files, and rationale

Source code in src/code_context_agent/tools/graph/analysis.py
def find_refactoring_candidates(self, top_k: int = 10) -> list[dict[str, Any]]:  # noqa: C901
    """Identify refactoring opportunities by combining multiple signals.

    Combines:
    - Clone pairs (SIMILAR_TO edges) -> "extract shared helper"
    - Code smell pattern matches (rule_id contains "code_smell") -> structural issues
    - Unused symbols -> "dead code removal"

    Args:
        top_k: Maximum number of candidates to return

    Returns:
        Ranked list of refactoring candidates with type, files, and rationale.
    """
    candidates: list[dict[str, Any]] = []

    # 1. Clone groups from SIMILAR_TO edges
    similar_edges = self.graph.get_edges_by_type(EdgeType.SIMILAR_TO)
    clone_groups: dict[str, list[str]] = {}
    for source, target, data in similar_edges:
        key = f"{source}|{target}" if source < target else f"{target}|{source}"
        if key not in clone_groups:
            clone_groups[key] = [source, target]
            candidates.append(
                {
                    "type": "extract_helper",
                    "pattern": f"Duplicate code between {source} and {target}",
                    "files": [source, target],
                    "occurrence_count": 2,
                    "duplicated_lines": int(data.get("duplicated_lines", 0)),
                    "score": int(data.get("duplicated_lines", 5)) * 2.0,
                },
            )

    # 2. Code smell patterns
    smell_counts: dict[str, list[str]] = {}
    for node_id, data in self.graph.nodes(data=True):
        rule_id = data.get("rule_id", "")
        note = data.get("note", "")
        if "code_smell" in note or "code_smell" in rule_id:
            if rule_id not in smell_counts:
                smell_counts[rule_id] = []
            smell_counts[rule_id].append(data.get("file_path", node_id))

    for rule_id, files in smell_counts.items():
        candidates.append(
            {
                "type": "code_smell",
                "pattern": rule_id,
                "files": list(set(files)),
                "occurrence_count": len(files),
                "duplicated_lines": 0,
                "score": len(files) * 1.5,
            },
        )

    # 3. Unused symbols
    unused = self.find_unused_symbols()
    if unused:
        # Group by file
        by_file: dict[str, list[str]] = {}
        for sym in unused:
            fp = sym["file_path"]
            if fp not in by_file:
                by_file[fp] = []
            by_file[fp].append(sym["name"])

        for fp, names in by_file.items():
            candidates.append(
                {
                    "type": "dead_code",
                    "pattern": f"{len(names)} unused symbol(s) in {fp}",
                    "files": [fp],
                    "occurrence_count": len(names),
                    "duplicated_lines": 0,
                    "score": len(names) * 1.0,
                },
            )

    # Sort by score descending, return top_k
    candidates.sort(key=lambda x: x["score"], reverse=True)
    return candidates[:top_k]
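
A worked instance of the candidate scoring above, with hypothetical inputs:

```python
# Same weights as the method: 2.0 per duplicated line for clones,
# 1.5 per affected file for smells, 1.0 per unused symbol.
clone_score = 12 * 2.0      # clone pair with 12 duplicated lines
smell_score = 4 * 1.5       # code-smell rule matched across 4 files
dead_code_score = 3 * 1.0   # 3 unused symbols in one file

candidates = [
    ("extract_helper", clone_score),
    ("code_smell", smell_score),
    ("dead_code", dead_code_score),
]
candidates.sort(key=lambda x: x[1], reverse=True)
```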