Source code for pydhn.utilities.matrices

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute, EPFL
#
# SPDX-FileContributor: Roberto Boghetti <roberto.boghetti@idiap.ch>
#
# SPDX-License-Identifier: AGPL-3.0-only


"""List of utilities for computing network matrices"""

import networkx as nx
import numpy as np


def _paths_to_matrix(G, paths):
    """
    Converts a list of paths in the form of list of nodes into an adjacency
    matrix.
    """
    idxs = dict([x[::-1] for x in list(enumerate(G.edges()))])

    matrix = np.array(np.zeros((len(paths), len(G.edges()))))

    for i, path in enumerate(paths):
        for j in range(len(path)):
            start = paths[i][j - 1]
            end = paths[i][j]
            if G.has_edge(start, end):
                idx = idxs[(start, end)]
                matrix[i, idx] = 1
            elif G.has_edge(end, start):
                idx = idxs[(end, start)]
                matrix[i, idx] = -1
            else:
                msg = f"Attempted to add edge {(start, end)} not in graph"
                raise ValueError(msg)
    return matrix


[docs] def compute_consumers_cycle_matrix(net): """ Computes the edge-loop incidence matrix, or cycle matrix, of the network containing loops from the producer to each consumer. """ producers = list(net.producers_pointer.edges()) main_prod_return, main_prod_supply = producers[0] paths = [] for u, v in net.consumers_pointer.edges(): sup_path = nx.shortest_path(net.supply_line_pointer, main_prod_supply, u) ret_path = nx.shortest_path(net.return_line_pointer, v, main_prod_return) paths.append(sup_path + ret_path) for u, v in producers: if not (u, v) == (main_prod_return, main_prod_supply): sup_path = nx.shortest_path( net.supply_line_pointer.to_undirected(), main_prod_supply, v ) ret_path = nx.shortest_path( net.return_line_pointer.to_undirected(), u, main_prod_return ) paths.append(sup_path + ret_path) G = net._graph return _paths_to_matrix(G=G, paths=paths)
def _compute_cycle_matrix_nx(G): """ Computes the edge-loop incidence matrix, or cycle matrix, of a cycle basis of graph G. """ cycles = list(nx.cycle_basis(G.to_undirected())) return _paths_to_matrix(G=G, paths=cycles)
[docs] def compute_imposed_mdot_matrix(net): """ Computes the edge-loop incidence matrix, or cycle matrix, of the network containing loops from the producer to each consumer. """ edges = net.edges() fixed_mdot_edges = edges[net.mass_flow_setpoints_mask] main_prod_return, main_prod_supply = edges[net.pressure_setpoints_mask][0] paths = [] for u, v in fixed_mdot_edges: ret_path = nx.shortest_path( net.return_line_pointer.to_undirected(), main_prod_return, u ) sup_path = nx.shortest_path( net.supply_line_pointer.to_undirected(), v, main_prod_supply ) paths.append(sup_path + ret_path) G = net._graph return _paths_to_matrix(G=G, paths=paths)
[docs] def compute_network_cycle_matrix(net): """ Computes the edge-loop incidence matrix, or cycle matrix, of the network graph such that: - There is a single cycle passing from each consumer, which includes only pipes and the main consumer - There is a single cycle passing from each secondary producer, which includes only pipes and the main consumer This allows for a better flexibility on how to impose setpoints to the simulations. """ # Find spanning tree starting from main edge # Split the two lines and find the direction of the main "out of line" edge G = net._graph S = net.branch_components_pointer main_idx = net.main_edge_mask[0] # TODO:; assert is not empty G0, G1 = tuple(S.subgraph(c) for c in nx.connected_components(S.to_undirected())) main_start, main_end = np.asarray(G.edges())[main_idx] if main_start in G1.nodes(): G0, G1 = G1, G0 # Convert G0 and G1 to trees, starting from the main edge tree_0 = nx.bfs_tree(G0.to_undirected(), source=main_start) tree_0 = tree_0.to_undirected() tree_1 = nx.bfs_tree(G1.to_undirected(), source=main_end) tree_1 = tree_1.to_undirected() # Get masks leaves = net.leaf_components_mask main_idx = net.main_edge_mask[0] secondary = np.setdiff1d(leaves, main_idx) # Find cycles cycles = [] # Add all other "out of line" edges edges = np.array(G.edges()) for u, v in edges[secondary]: if u in tree_0.nodes: path_0 = nx.shortest_path(tree_0, u, main_start) path_1 = nx.shortest_path(tree_1, main_end, v) else: path_0 = nx.shortest_path(tree_0, v, main_start) path_1 = nx.shortest_path(tree_1, main_end, u) cycle = path_0 + path_1 # cycle = nx.shortest_path(S, main_start, main_end) cycles.append(cycle) # Add internal loops for e in G0.edges(): if e not in tree_0.edges(): G_copy = tree_0.copy() G_copy.add_edge(*e) cycle_edges = nx.find_cycle(G_copy) cycles.append([u for u, v in cycle_edges]) for e in G1.edges(): if e not in tree_1.edges(): G_copy = tree_1.copy() G_copy.add_edge(*e) cycle_edges = nx.find_cycle(G_copy) cycles.append([u for u, v in cycle_edges]) return _paths_to_matrix(G=G, paths=cycles)
[docs] def compute_cycle_matrix(net, method="nx"): """ Computes the edge-loop incidence matrix, or cycle matrix, of a cycle basis of the Network net. """ if method == "net_spanning_tree": return compute_network_cycle_matrix(net) else: return _compute_cycle_matrix_nx(net._graph)
[docs] def compute_adjacency_matrix(net) -> np.ndarray: nodelist = net._graph.nodes() return nx.adjacency_matrix(net._graph, nodelist=nodelist).toarray()
[docs] def compute_incidence_matrix(net, oriented: bool = True) -> np.ndarray: nodelist = net._graph.nodes() edgelist = net._graph.edges() return nx.incidence_matrix( net._graph, nodelist=nodelist, edgelist=edgelist, oriented=oriented ).toarray()