Source code for pydhn.components.base_components_thermal

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute, EPFL
#
# SPDX-FileContributor: Roberto Boghetti <roberto.boghetti@idiap.ch>
#
# SPDX-License-Identifier: AGPL-3.0-only


"""Functions to compute average and outlet temperature for base components"""

from copy import deepcopy
from warnings import warn

import numpy as np

from pydhn.default_values import CP_FLUID
from pydhn.default_values import DEPTH
from pydhn.default_values import DISCRETIZATION
from pydhn.default_values import K_CASING
from pydhn.default_values import K_FLUID
from pydhn.default_values import K_INTERNAL_PIPE
from pydhn.default_values import K_SOIL
from pydhn.default_values import MU_FLUID
from pydhn.default_values import RHO_FLUID
from pydhn.default_values import T_SOIL
from pydhn.fluids.dimensionless_numbers import compute_nusselt
from pydhn.utilities import isiter
from pydhn.utilities import np_cache
from pydhn.utilities import safe_divide

"""
Functions are divided by element type. For each element, the upper section
contains generic functions that work with Numpy arrays. Functions that need a
Network object and a Fluid object are instead in the second section.
"""


# --------                         Base pipe                         -------- #


@np_cache(maxsize=256)
def _compute_convection_resistance(
    diameter,
    reynolds,
    length,
    friction_factor,
    cp_fluid=CP_FLUID,
    mu_fluid=MU_FLUID,
    k_fluid=K_FLUID,
):
    """
    Computes the thermal resistance due to internal convection.
    """
    nu = compute_nusselt(
        reynolds=reynolds,
        diameter=diameter,
        length=length,
        friction_factor=friction_factor,
        cp_fluid=cp_fluid,
        mu_fluid=mu_fluid,
        k_fluid=k_fluid,
    )
    return safe_divide(k_fluid * nu, diameter)


def _compute_layer_resistance(r_1, r_2, k):
    """
    Computes the thermal resistance of a single layer in a circular pipe.

    Parameters
    ----------
    r_1 : array, float
        Radius [m] of the innermost surface of the layer.
    r_2 : array, float
        Radius [m] of the outermost surface of the layer..
    k : array, float
        Thermal conductivity [W/(K·m)] of the layer material.

    Returns
    -------
    array, float
        Thermal resistance [(K·m)/W] of the layer.

    """
    if np.any(r_1 > r_2):
        raise ValueError(
            "Error: the inner radius should be greater than the \
                         outer radius!"
        )
    num = np.log(safe_divide(r_2, r_1))
    denom = 2 * k * np.pi
    return safe_divide(num, denom)


def _compute_soil_resistance(depth, rad_ext, k_soil=K_SOIL):
    x = safe_divide(depth, rad_ext)
    # depth > 3r
    res_soil_1 = safe_divide(np.log(2 * x), 2 * 3.14 * k_soil)
    # depth <= 3r
    res_soil_2 = safe_divide(np.log(x + np.power(x * x - 1, 0.5)), 2 * 3.14 * k_soil)
    res_soil = np.where(depth > 3 * rad_ext, res_soil_1, res_soil_2)
    return res_soil


def _compute_pipe_outlet_temp(
    t_in,
    mdot,
    length,
    diameter,
    k_insulation,
    thickness_ins,
    reynolds,
    friction_factor,
    delta_p_friction,
    k_internal_pipe=K_INTERNAL_PIPE,
    thickness_internal_pipe=0.0,
    k_casing=K_CASING,
    thickness_casing=0.0,
    t_soil=T_SOIL,
    depth=DEPTH,
    k_soil=K_SOIL,
    cp_fluid=CP_FLUID,
    mu_fluid=MU_FLUID,
    k_fluid=K_FLUID,
    rho_fluid=RHO_FLUID,
):
    """Computes the outlet temperature of a pipe from the inlet temperature"""
    # Mass flow must be positive, as the direction is assumed the same as the
    # pipe reference frame
    mdot = np.abs(mdot)

    # Compute radii
    rad_1 = safe_divide(diameter, 2)  # Internal radius
    rad_2 = rad_1 + thickness_internal_pipe  # Pipe external radius
    rad_3 = rad_2 + thickness_ins  # Insulation external radius
    rad_4 = rad_3 + thickness_casing  # External radius

    # Compute pipe thermal resistances
    res_1_2 = _compute_layer_resistance(rad_1, rad_2, k_internal_pipe)
    res_2_3 = _compute_layer_resistance(rad_2, rad_3, k_insulation)
    res_3_4 = _compute_layer_resistance(rad_3, rad_4, k_casing)

    # Compute soil resistance
    res_soil = _compute_soil_resistance(depth=depth, rad_ext=rad_4, k_soil=k_soil)

    # Convection resistance
    h_convection = _compute_convection_resistance(
        diameter=diameter,
        reynolds=reynolds,
        length=length,
        friction_factor=friction_factor,
        cp_fluid=cp_fluid,
        mu_fluid=mu_fluid,
        k_fluid=k_fluid,
    )

    res_convection = safe_divide(1, h_convection * 2 * np.pi * rad_1)

    # Compute losses
    r_tot = res_1_2 + res_2_3 + res_3_4 + res_soil + res_convection
    heat_gain = safe_divide(mdot, rho_fluid) * np.abs(delta_p_friction)
    loss = (t_in - t_soil) / r_tot * length

    # If the fluid velocity is close to 0, the fluid is considered still and
    # there is no friction
    loss = np.where(loss > heat_gain, loss - heat_gain, loss)
    t_out = t_in - safe_divide(loss, mdot * cp_fluid)
    t_out = np.where(
        t_soil < t_in,
        np.clip(t_out, a_min=t_soil, a_max=None),
        np.clip(t_out, a_min=None, a_max=t_soil),
    )

    t_out = np.where(mdot == 0.0, t_soil, t_out)

    t_out_der = 1 - safe_divide(length, mdot * cp_fluid * r_tot)
    t_out_der = np.where(mdot == 0.0, 0.0, t_out_der)
    t_out_der = np.where(t_out <= t_soil, 0.0, t_out_der)
    return t_out, t_out_der


[docs] def compute_pipe_temp( t_in, mdot, length, diameter, reynolds, thickness_ins, k_insulation, friction_factor, delta_p_friction, k_internal_pipe=K_INTERNAL_PIPE, thickness_internal_pipe=0.0, k_casing=K_CASING, thickness_casing=0.0, t_soil=T_SOIL, depth=DEPTH, k_soil=K_SOIL, cp_fluid=CP_FLUID, mu_fluid=MU_FLUID, k_fluid=K_FLUID, rho_fluid=RHO_FLUID, discretization=DISCRETIZATION, ): """ Computes the outlet temperature of a pipe from the inlet temperature. The pipe is split into segments of length ~ discretization for better results. """ # Get mass flow sign sign = np.sign(mdot) # Get original t_in t_in_orig = deepcopy(t_in) # Split the pipe in n segments of equal length. Segment length should be # as close as possible to the discretization length given. n_segments = np.round(length // discretization + (length % discretization != 0)) l_segments = safe_divide(length, n_segments) # Initialize empty vector for summing the averages if isiter(n_segments): t_avg_sum = np.zeros(len(n_segments)) else: t_avg_sum = 0 t_out_der_tot = 1 # Distributed pressure losses are considered linear and equally split among # segments delta_p_segments = safe_divide(delta_p_friction, n_segments) # Iterate a number of times equal to the maximum number of segments for i in range(int(np.max(n_segments))): mask = i >= n_segments # Compute t_out where there are still segments to compute t_out, t_out_der = _compute_pipe_outlet_temp( t_in=t_in, mdot=mdot * sign, length=l_segments, diameter=diameter, reynolds=reynolds, friction_factor=friction_factor, delta_p_friction=delta_p_segments, thickness_ins=thickness_ins, k_insulation=k_insulation, k_internal_pipe=k_internal_pipe, thickness_internal_pipe=thickness_internal_pipe, k_casing=k_casing, thickness_casing=thickness_casing, t_soil=t_soil, depth=depth, k_soil=k_soil, cp_fluid=cp_fluid, mu_fluid=mu_fluid, k_fluid=k_fluid, rho_fluid=rho_fluid, ) t_out = np.where(mask, t_in, t_out) # Add the averages to the total t_avg_segments = safe_divide(t_in + t_out, 2) t_avg_sum = np.where(mask, t_avg_sum, t_avg_sum + t_avg_segments) t_out_der_tot = np.where(mask, t_out_der_tot, t_out_der_tot * t_out_der) # Impose T_out as new T_in t_in = t_out # T_avg is the sum of averages divided by the number of segments t_avg = safe_divide(t_avg_sum, n_segments) # Compute thermal losses delta_q = mdot * cp_fluid * (t_out - t_in_orig) # Return t_0, t_1 and t_avg return t_out, t_avg, t_out_der_tot, delta_q
[docs] def compute_pipe_temp_net(net, fluid, soil, ts_id=None): # Get pipe attributes data = ( "mass_flow", "length", "diameter", "reynolds", "friction_factor", "delta_p_friction", "depth", "insulation_thickness", "k_insulation", "internal_pipe_thickness", "k_internal_pipe", "casing_thickness", "k_casing", "discretization", ) ( edges, mass_flow, length, diameter, reynolds, friction_factor, delta_p_friction, depth, thickness_ins, k_insulation, thickness_internal_pipe, k_internal_pipe, thickness_casing, k_casing, discretization, ) = net.edges(data=data, mask=net.pipes_mask) # Get temperature at startnode and endnode of pipe nodes, t_nodes = net.nodes(data="temperature") u, v = edges.T nodes_t_dict = dict(zip(nodes, t_nodes)) t_0 = np.fromiter((nodes_t_dict[n] for n in u), dtype=float) t_1 = np.fromiter((nodes_t_dict[n] for n in v), dtype=float) # Get inlet temperature according to the flow direction t_in = np.where(mass_flow >= 0, t_0, t_1) # Get fluid properties at inlet temperature cp_fluid = fluid.get_cp(t_in) k_fluid = fluid.get_k(t_in) mu_fluid = fluid.get_mu(t_in) rho_fluid = fluid.get_rho(t_in) # Get soil properties k_soil = soil.get_k(depth=depth) t_soil = soil.get_temp(depth=depth) # Compute t_out t_out, t_avg, t_out_der, delta_q = compute_pipe_temp( t_in=t_in, mdot=mass_flow, length=length, diameter=diameter, reynolds=reynolds, thickness_ins=thickness_ins, k_insulation=k_insulation, friction_factor=friction_factor, delta_p_friction=delta_p_friction, k_internal_pipe=k_internal_pipe, thickness_internal_pipe=thickness_internal_pipe, k_casing=k_casing, thickness_casing=thickness_casing, t_soil=t_soil, k_soil=k_soil, depth=depth, cp_fluid=cp_fluid, mu_fluid=mu_fluid, k_fluid=k_fluid, rho_fluid=rho_fluid, discretization=discretization, ) return t_in, t_out, t_avg, t_out_der, delta_q
# -------- Base Consumer and Producer -------- # """ Base Consumers and Producers share the same function for computing the outlet and average temperatures. """ def _compute_hx_outlet_temp( t_in, mass_flow, power_max, t_out_min, setpoint_type, setpoint_value, cp_fluid ): # Mass flow must be positive, as the direction is assumed the same as the # HX reference frame mass_flow = np.abs(mass_flow) # Find what setpoints are given w1 = setpoint_type == "t_out" w2 = setpoint_type == "delta_t" w3 = setpoint_type == "delta_q" t_out = np.where( # First condition: t_out is imposed w1, setpoint_value, # Other conditions: # If delta_t is imposed np.where( w2, t_in + setpoint_value, # If delta_q is used np.where( w3, t_in + safe_divide(setpoint_value, mass_flow * cp_fluid), # If none of the above, just return the inlet temperature t_in, ), ), ) t_out_der = np.where( # First condition: t_out is imposed w1, 0.0, # Other conditions: # If delta_t is imposed np.where( w2, 1.0, # If delta_q is used np.where( w3, 1, # If none of the above, just return the inlet temperature 0.0, ), ), ) # Clip if t_out lower than threshold lower_limit = np.where(np.isnan(t_out_min), -np.inf, t_out_min) t_out = np.clip(t_out, lower_limit, None) t_out_der = np.where(t_out <= lower_limit, 0.0, t_out_der) # Compute delta_q delta_q = mass_flow * cp_fluid * (t_out - t_in) # Clip if above max power limit = np.where(np.isnan(power_max), np.inf, power_max) delta_q = np.clip(delta_q, -np.abs(limit), np.abs(limit)) # Recompute t_out based on clipped values t_out = safe_divide(delta_q, mass_flow * cp_fluid) + t_in return t_out, t_out_der
[docs] def compute_hx_temp( mass_flow, t_in, setpoint_type, setpoint_type_rev, setpoint_value, setpoint_value_rev, power_max, t_out_min, cp_fluid, ts_id=None, ): # Get setpoint type and value set_t = np.where(mass_flow >= 0, setpoint_type, setpoint_type_rev) set_v = np.where(mass_flow >= 0, setpoint_value, setpoint_value_rev) # Compute outlet temperature t_out, t_out_der = _compute_hx_outlet_temp( t_in=t_in, mass_flow=mass_flow, setpoint_type=set_t, setpoint_value=set_v, power_max=power_max, t_out_min=t_out_min, cp_fluid=cp_fluid, ) # T_avg is just the average between t_0 and t_1 t_avg = safe_divide(t_in + t_out, 2) # Compute thermal losses delta_q = mass_flow * cp_fluid * (t_out - t_in) # Return t_out, t_avg and t_out_der return t_out, t_avg, t_out_der, delta_q
[docs] def compute_hx_temp_net(net, fluid, soil, mask, ts_id=None): """ Computes the outlet temperature of a heat exchanger from the inlet temperature based on the given HX type. """ # Get HX attributes data = ( "mass_flow", "power_max_hx", "t_out_min_hx", "setpoint_type_hx", "setpoint_value_hx", "setpoint_type_hx_rev", "setpoint_value_hx_rev", ) ( edges, mass_flow, power_max, t_out_min, setpoint_type, setpoint_value, setpoint_type_rev, setpoint_value_rev, ) = net.edges(data=data, mask=mask) # Get temperature at startnode and endnode of HX nodes, t_nodes = net.nodes(data="temperature") u, v = edges.T nodes_t_dict = dict(zip(nodes, t_nodes)) t_0 = np.fromiter((nodes_t_dict[n] for n in u), dtype=float) t_1 = np.fromiter((nodes_t_dict[n] for n in v), dtype=float) t_in = np.where(mass_flow >= 0, t_0, t_1) # Get fluid properties at inlet temperature cp_fluid = fluid.get_cp(t_in) t_out, t_avg, t_out_der, delta_q = compute_hx_temp( mass_flow=mass_flow, t_in=t_in, setpoint_type=setpoint_type, setpoint_type_rev=setpoint_type_rev, setpoint_value=setpoint_value, setpoint_value_rev=setpoint_value_rev, power_max=power_max, t_out_min=t_out_min, cp_fluid=cp_fluid, ) return t_in, t_out, t_avg, t_out_der, delta_q
# -------- Base Consumer and Producer -------- #
[docs] def compute_cons_temp_net(net, fluid, soil, ts_id=None): return compute_hx_temp_net( net=net, fluid=fluid, soil=soil, mask=net.consumers_mask, ts_id=ts_id )
[docs] def compute_prod_temp_net(net, fluid, soil, ts_id=None): return compute_hx_temp_net( net=net, fluid=fluid, soil=soil, mask=net.producers_mask, ts_id=ts_id )