Pipe Diversion Around an Obstacle

Hello,
I have a Dynamo + Python script to divert pipes around an obstacle.
Selection in the script:

1- Select the pipe(s) to divert
2- Select the obstacle (here, the duct)
3- Choose between diverting from above or below
4- Choose the divert angle: 30°, 45°, or 90°
5- Enter the minimum distance above or below between the two elements

However, the script only works with 90° bends; it ignores the 30° and 45° angles.
To be honest, I didn’t write the script myself; it was written by ChatGPT (using the free version), and now it can’t find the cause of the problem.

# ==============================================================================
# SCRIPT DYNAMO - DÉVOIEMENT RÉSEAUX MEP AVEC RACCORDS COUDES
# Version : Dynamo 2.13 / Revit 2023 / CPython3
# Correction : connexion des segments via connecteurs MEP + coudes automatiques
# ==============================================================================

import clr
import math

clr.AddReference('RevitAPI')
clr.AddReference('RevitAPIUI')
clr.AddReference('RevitServices')
clr.AddReference('RevitNodes')

from Autodesk.Revit.DB import *
from Autodesk.Revit.DB.Mechanical import *
from Autodesk.Revit.DB.Plumbing import *
from Autodesk.Revit.UI import *
from RevitServices.Persistence import DocumentManager
from RevitServices.Transactions import TransactionManager

import Revit
clr.ImportExtensions(Revit.Elements)
clr.ImportExtensions(Revit.GeometryConversion)

# ==============================================================================
# ENTRÉES DYNAMO
# IN[0] : Éléments MEP à dévoyer (tubes/Pipe)
# IN[1] : Obstacle (gaine de ventilation)
# IN[2] : True = passer DESSUS  /  False = passer DESSOUS
# IN[3] : Angle en degrés (30, 45 ou 90)
# IN[4] : Garde minimale en mm
# ==============================================================================
mep_elements_input = IN[0]
obstacle_input     = IN[1]
direction_up       = IN[2]
angle_deg          = IN[3]
clearance_mm       = IN[4]

# ==============================================================================
# CONSTANTES
# ==============================================================================
MM_TO_FT   = 1.0 / 304.8
DEG_TO_RAD = math.pi / 180.0

doc   = DocumentManager.Instance.CurrentDBDocument
uiapp = DocumentManager.Instance.CurrentUIApplication

# ==============================================================================
# HELPERS XYZ — CPython3 (pas de surcharge opérateurs sur XYZ)
# ==============================================================================
def xyz_add(a, b):
    return XYZ(a.X + b.X, a.Y + b.Y, a.Z + b.Z)

def xyz_sub(a, b):
    return XYZ(a.X - b.X, a.Y - b.Y, a.Z - b.Z)

def xyz_scale(v, s):
    return XYZ(v.X * s, v.Y * s, v.Z * s)

def xyz_dot(a, b):
    return a.X * b.X + a.Y * b.Y + a.Z * b.Z

def xyz_length(v):
    return math.sqrt(v.X * v.X + v.Y * v.Y + v.Z * v.Z)

def xyz_normalize(v):
    lg = xyz_length(v)
    if lg < 1e-10:
        return XYZ(0.0, 0.0, 0.0)
    return XYZ(v.X / lg, v.Y / lg, v.Z / lg)

def xyz_dist(a, b):
    return xyz_length(xyz_sub(a, b))

def pts_equal(a, b, tol=0.001):
    return xyz_dist(a, b) < tol

# ==============================================================================
# NORMALISATION DES ENTRÉES DYNAMO
# ==============================================================================
def to_revit_element(obj):
    if hasattr(obj, 'InternalElement'):
        return obj.InternalElement
    return obj

def flatten(lst):
    result = []
    for item in lst:
        if isinstance(item, (list, tuple)):
            result.extend(flatten(item))
        else:
            result.append(item)
    return result

if not isinstance(mep_elements_input, list):
    mep_elements_input = [mep_elements_input]
mep_elements_input = flatten(mep_elements_input)

mep_elements = [to_revit_element(e) for e in mep_elements_input]
obstacle     = to_revit_element(obstacle_input)

angle_deg = float(angle_deg)
for candidate in [30.0, 45.0, 90.0]:
    if abs(angle_deg - candidate) < 1.0:
        angle_deg = candidate
        break

clearance_ft = float(clearance_mm) * MM_TO_FT

# ==============================================================================
# FONCTIONS GÉOMÉTRIQUES
# ==============================================================================
def get_pipe_curve(pipe):
    loc = pipe.Location
    if isinstance(loc, LocationCurve):
        return loc.Curve
    raise TypeError("Element {} : pas de LocationCurve.".format(pipe.Id.IntegerValue))

def get_pipe_start_end(pipe):
    curve = get_pipe_curve(pipe)
    return curve.GetEndPoint(0), curve.GetEndPoint(1)

def compute_pipe_radius(pipe):
    try:
        param = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_OUTER_DIAMETER)
        if param and param.AsDouble() > 0:
            return param.AsDouble() / 2.0
    except Exception:
        pass
    try:
        bb = pipe.get_BoundingBox(None)
        return (bb.Max.Z - bb.Min.Z) / 2.0
    except Exception:
        return 0.05

def compute_obstacle_projection_on_pipe(pipe, obs_bb):
    pipe_start, pipe_end = get_pipe_start_end(pipe)
    length_vec  = xyz_sub(pipe_end, pipe_start)
    pipe_length = xyz_length(length_vec)
    direction   = xyz_normalize(length_vec)

    bmin = obs_bb.Min
    bmax = obs_bb.Max
    corners = [
        XYZ(bmin.X, bmin.Y, bmin.Z), XYZ(bmax.X, bmin.Y, bmin.Z),
        XYZ(bmin.X, bmax.Y, bmin.Z), XYZ(bmax.X, bmax.Y, bmin.Z),
        XYZ(bmin.X, bmin.Y, bmax.Z), XYZ(bmax.X, bmin.Y, bmax.Z),
        XYZ(bmin.X, bmax.Y, bmax.Z), XYZ(bmax.X, bmax.Y, bmax.Z),
    ]
    t_values = [xyz_dot(xyz_sub(c, pipe_start), direction) for c in corners]

    t_min = max(0.0, min(t_values))
    t_max = min(pipe_length, max(t_values))
    if t_min >= t_max:
        return None

    pt_obs_start = xyz_add(pipe_start, xyz_scale(direction, t_min))
    pt_obs_end   = xyz_add(pipe_start, xyz_scale(direction, t_max))
    return (pt_obs_start, pt_obs_end, t_min, t_max, pipe_length, direction, pipe_start, pipe_end)

def compute_bypass_delta_z(pipe, obs_bb, direction_up, clearance_ft):
    start, end  = get_pipe_start_end(pipe)
    pipe_z      = (start.Z + end.Z) / 2.0
    pipe_radius = compute_pipe_radius(pipe)
    if direction_up:
        target_z = obs_bb.Max.Z + clearance_ft + pipe_radius
    else:
        target_z = obs_bb.Min.Z - clearance_ft - pipe_radius
    return target_z - pipe_z

def ramp_horizontal_distance(delta_z, angle_deg):
    if angle_deg >= 90.0:
        return 0.0
    return abs(delta_z) / math.tan(angle_deg * DEG_TO_RAD)

# ==============================================================================
# GESTION DES CONNECTEURS MEP
# ==============================================================================

def get_pipe_connectors(pipe):
    """Retourne la liste des connecteurs d'un tube."""
    connectors = []
    conn_set = pipe.ConnectorManager.Connectors
    for conn in conn_set:
        connectors.append(conn)
    return connectors

def get_connector_at_point(pipe, point, tol=0.01):
    """Retourne le connecteur du tube le plus proche du point donné."""
    best     = None
    best_dist = 1e10
    for conn in get_pipe_connectors(pipe):
        d = xyz_dist(conn.Origin, point)
        if d < best_dist:
            best_dist = d
            best      = conn
    if best_dist < tol:
        return best
    return None

def create_elbow_between_connectors(doc, conn_i, conn_j):
    """
    Crée un coude entre deux connecteurs MEP.
    Essaie plusieurs méthodes API selon la version Revit / disponibilité.
    Retourne (fitting_element, message).

    Méthodes tentées dans l'ordre :
      1. doc.Create.NewElbowFitting(conn_i, conn_j)   ← API standard Revit 2023
      2. PlumbingUtils.CreateElbowFitting(doc, ...)   ← certaines versions
      3. Connexion directe ConnectTo()                ← fallback sans géométrie
    """
    # ── Méthode 1 : doc.Create.NewElbowFitting (Revit 2019+) ─────────────────
    try:
        elbow = doc.Create.NewElbowFitting(conn_i, conn_j)
        if elbow is not None:
            return elbow, "OK (NewElbowFitting)"
    except Exception as ex1:
        pass

    # ── Méthode 2 : PlumbingUtils (certaines versions CPython3) ───────────────
    try:
        elbow = PlumbingUtils.CreateElbowFitting(doc, conn_i, conn_j)
        if elbow is not None:
            return elbow, "OK (PlumbingUtils)"
    except Exception as ex2:
        pass

    # ── Méthode 3 : connexion logique ConnectTo sans raccord géométrique ──────
    try:
        conn_i.ConnectTo(conn_j)
        return None, "WARN: connexion logique seulement (pas de coude geometrique)"
    except Exception as ex3:
        return None, "ECHEC total: ex1={} | ex3={}".format(str(ex1), str(ex3))

def get_pipe_system_type_id(pipe):
    """Récupère l'ElementId du type de système du tube."""
    try:
        p = pipe.get_Parameter(BuiltInParameter.RBS_PIPING_SYSTEM_TYPE_PARAM)
        if p:
            return p.AsElementId()
    except Exception:
        pass
    return ElementId.InvalidElementId

def get_pipe_diameter(pipe):
    """Récupère le diamètre nominal du tube (en pieds)."""
    try:
        p = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
        if p and p.AsDouble() > 0:
            return p.AsDouble()
    except Exception:
        pass
    return 0.082  # ~25 mm

def create_pipe_segment(doc, sys_type_id, pipe_type_id, level_id, diameter, pt_a, pt_b):
    """Crée un segment de tube entre pt_a et pt_b."""
    new_p = Pipe.Create(doc, sys_type_id, pipe_type_id, level_id, pt_a, pt_b)
    try:
        pd = new_p.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
        if pd and not pd.IsReadOnly:
            pd.Set(diameter)
    except Exception:
        pass
    return new_p

# ==============================================================================
# DÉVOIEMENT D'UN TUBE AVEC CONNEXIONS PAR COUDES
# ==============================================================================

def devoyer_pipe(pipe, obstacle, direction_up, angle_deg, clearance_ft):
    """
    Dévoie le tube et connecte les segments par des coudes du système de canalisation.

    Schéma du dévoiement (vue de côté) :
    ─────────────────────────────────────────────────────────────────────
    pipe_start ─── [seg0] ─── A ╗ coude1    coude4 ╔ D ─── [seg4] ─── pipe_end
                                 ╚── [seg1] ── B ─── [seg_top] ── C ─── [seg3] ──╝
                                           coude2          coude3
    ─────────────────────────────────────────────────────────────────────
    A, D : points de début/fin de rampe  (sur l'axe original Z)
    B, C : points hauts des rampes       (décalés en Z = tronçon dévoyé)
    """
    result = {
        'pipe_id'     : pipe.Id.IntegerValue,
        'success'     : False,
        'message'     : '',
        'new_elements': [],
        'elbows'      : []
    }

    # ── 1. BoundingBox obstacle ───────────────────────────────────────────────
    obs_bb = obstacle.get_BoundingBox(None)
    if obs_bb is None:
        result['message'] = "Impossible d'obtenir la BoundingBox de l'obstacle."
        return result

    # ── 2. Projection obstacle sur tube ──────────────────────────────────────
    proj = compute_obstacle_projection_on_pipe(pipe, obs_bb)
    if proj is None:
        result['message'] = "L'obstacle ne croise pas ce tube."
        result['success'] = True
        return result

    (pt_obs_start, pt_obs_end,
     t_min, t_max,
     pipe_length, direction,
     pipe_start, pipe_end) = proj

    # ── 3. Delta Z de dévoiement ──────────────────────────────────────────────
    delta_z = compute_bypass_delta_z(pipe, obs_bb, direction_up, clearance_ft)
    if abs(delta_z) < 0.001:
        result['message'] = "Delta Z negligeable, pas de devoiement necessaire."
        result['success'] = True
        return result

    # ── 4. Distance horizontale de rampe ──────────────────────────────────────
    horiz_dist = ramp_horizontal_distance(delta_z, angle_deg)

    # ── 5. Calcul des 4 points clés ───────────────────────────────────────────
    pt_A_raw = xyz_sub(pt_obs_start, xyz_scale(direction, horiz_dist))
    pt_D_raw = xyz_add(pt_obs_end,   xyz_scale(direction, horiz_dist))

    def clamp_on_pipe(pt):
        vec = xyz_sub(pt, pipe_start)
        t   = xyz_dot(vec, direction)
        t   = max(0.003, min(pipe_length - 0.003, t))
        return xyz_add(pipe_start, xyz_scale(direction, t))

    pt_A = clamp_on_pipe(pt_A_raw)
    pt_D = clamp_on_pipe(pt_D_raw)

    dz_vec = XYZ(0.0, 0.0, delta_z)
    pt_B   = xyz_add(pt_A, dz_vec)  # sommet rampe gauche
    pt_C   = xyz_add(pt_D, dz_vec)  # sommet rampe droite

    # ── 6. Paramètres du tube d'origine ──────────────────────────────────────
    pipe_type_id = pipe.GetTypeId()
    level_id     = pipe.LevelId
    sys_type_id  = get_pipe_system_type_id(pipe)
    diameter     = get_pipe_diameter(pipe)

    # ── 7. Filtrage des segments à créer ─────────────────────────────────────
    MIN_LEN = 0.005  # ~1.5 mm

    # Définition des 5 segments : (pt_debut, pt_fin, label)
    segs_def = [
        (pipe_start, pt_A, "seg_gauche"),
        (pt_A,       pt_B, "rampe_montee"),
        (pt_B,       pt_C, "seg_devoye"),
        (pt_C,       pt_D, "rampe_descente"),
        (pt_D,       pipe_end, "seg_droit"),
    ]
    segs_def = [
        (a, b, lbl) for (a, b, lbl) in segs_def
        if xyz_length(xyz_sub(b, a)) > MIN_LEN
    ]

    if not segs_def:
        result['message'] = "Segments calcules trop courts."
        return result

    # ── 8. Suppression du tube original ──────────────────────────────────────
    try:
        doc.Delete(pipe.Id)
    except Exception as ex:
        result['message'] = "Impossible de supprimer le tube original : {}".format(str(ex))
        return result

    # ── 9. Création des nouveaux segments ────────────────────────────────────
    new_pipes = []
    try:
        for (pt_a, pt_b, lbl) in segs_def:
            seg = create_pipe_segment(
                doc, sys_type_id, pipe_type_id, level_id, diameter, pt_a, pt_b
            )
            new_pipes.append((seg, pt_a, pt_b, lbl))
    except Exception as ex:
        result['message'] = "Erreur creation segment : {}".format(str(ex))
        return result

    # ── 10. Connexion par coudes entre segments adjacents ────────────────────
    #
    # Pour chaque jonction entre segment[i] et segment[i+1] :
    #   - pt_jonction = pt_b du segment[i] = pt_a du segment[i+1]
    #   - On cherche les connecteurs libres (IsConnected=False) à ce point
    #   - doc.Create.NewElbowFitting(conn_i, conn_j) crée le coude Revit natif
    #     qui utilise le type de raccord défini dans le routing preference du système
    #
    # IMPORTANT : Revit.DB.Document.Create.NewElbowFitting est l'API correcte
    #             pour Revit 2023 / CPython3. PlumbingUtils n'expose pas cette
    #             méthode dans toutes les versions.

    elbows       = []
    elbow_errors = []

    for i in range(len(new_pipes) - 1):
        seg_i, pt_a_i, pt_b_i, lbl_i = new_pipes[i]
        seg_j, pt_a_j, pt_b_j, lbl_j = new_pipes[i + 1]

        # Point de jonction commun
        pt_junction = pt_b_i

        # Connecteur de seg_i côté jonction (son extrémité "fin")
        conn_i = get_connector_at_point(seg_i, pt_junction, tol=0.02)
        # Connecteur de seg_j côté jonction (son extrémité "début")
        conn_j = get_connector_at_point(seg_j, pt_junction, tol=0.02)

        if conn_i is None or conn_j is None:
            elbow_errors.append(
                "Jonction {}/{} : connecteur introuvable (pt {:.4f},{:.4f},{:.4f})".format(
                    lbl_i, lbl_j,
                    pt_junction.X, pt_junction.Y, pt_junction.Z
                )
            )
            continue

        # Segments colinéaires → connexion directe, pas de coude
        dir_i = xyz_normalize(xyz_sub(pt_b_i, pt_a_i))
        dir_j = xyz_normalize(xyz_sub(pt_b_j, pt_a_j))
        dot   = abs(xyz_dot(dir_i, dir_j))
        if dot > 0.9999:
            try:
                conn_i.ConnectTo(conn_j)
            except Exception:
                pass
            continue

        # Segments non colinéaires → créer un coude
        elbow_elem, msg = create_elbow_between_connectors(doc, conn_i, conn_j)
        if elbow_elem is not None:
            elbows.append(elbow_elem.Id.IntegerValue)
        else:
            elbow_errors.append("Coude {}/{} : {}".format(lbl_i, lbl_j, msg))

    result['success']      = True
    result['new_elements'] = [p.Id.IntegerValue for (p, a, b, lbl) in new_pipes]
    result['elbows']       = elbows
    result['message']      = (
        "OK: {} segments, {} coudes crees. "
        "deltaZ={:.0f}mm, angle={}deg, rampe={:.0f}mm"
    ).format(
        len(new_pipes),
        len(elbows),
        delta_z    * 304.8,
        int(angle_deg),
        horiz_dist * 304.8
    )
    if elbow_errors:
        result['message'] += " | Avert coudes: {}".format("; ".join(elbow_errors))

    return result

# ==============================================================================
# BOUCLE PRINCIPALE
# ==============================================================================
results       = []
errors        = []
debug_lines   = []
success_count = 0

TransactionManager.Instance.EnsureInTransaction(doc)

try:
    for elem in mep_elements:
        if elem is None:
            errors.append("Element None ignore.")
            continue

        type_name = elem.GetType().Name
        debug_lines.append("Id={} type={}".format(elem.Id.IntegerValue, type_name))

        if not isinstance(elem, Pipe):
            errors.append(
                "Id={} ignore : '{}' non supporte (Pipe attendu).".format(
                    elem.Id.IntegerValue, type_name
                )
            )
            continue

        res = devoyer_pipe(
            elem,
            obstacle,
            bool(direction_up),
            float(angle_deg),
            clearance_ft
        )
        results.append(res)

        if res['success']:
            success_count += 1
        else:
            errors.append("Tube {} : {}".format(res['pipe_id'], res['message']))

except Exception as global_ex:
    import traceback
    errors.append("ERREUR GLOBALE : {}".format(str(global_ex)))
    errors.append(traceback.format_exc())
    TransactionManager.Instance.ForceCloseTransaction()
else:
    TransactionManager.Instance.TransactionTaskDone()

# ==============================================================================
# SORTIE
# ==============================================================================
summary = (
    "Devoiements reussis : {}/{}\n"
    "Direction : {}\n"
    "Angle     : {}deg\n"
    "Garde     : {} mm\n"
    "\nElements detectes :\n{}"
).format(
    success_count,
    len(mep_elements),
    "DESSUS" if direction_up else "DESSOUS",
    int(angle_deg),
    clearance_mm,
    "\n".join(debug_lines) if debug_lines else "(aucun)"
)

if errors:
    summary += "\n\n--- Erreurs ---\n" + "\n".join(errors)

OUT = [summary, results]

MEP_Devoiement avec raccord v2.dyn (34.6 KB)

code py.txt (19.9 KB)

Hey,

So you got some working code out, and that’s great. I think its valid for you to keep with the AI route, but you need a plan for when it doesn’t work?

For me, that’s debug info. Whatever the specifics of this code and its problems, you need to be OUTing information at each stage to see where it goes wrong, and catching Exceptions and their Information. If you combine this with a visual review of the geometry you can hopefully find the error (in devoyer_pipe?).

Something like this will return the failure… (note the use of the </> button at the top, to format the code nicely)

import traceback

elbow_errors = []

try: elbow = doc.Create.NewElbowFitting(conn_i, conn_j) 

except Exception: 
    elbow_errors.append(traceback.format_exc())

OUT = elbow_errors

It might be worth trying the same operation manually to ensure that the pipe families and elbows will allow the 45 degrees in the same way as you are trying to code? That’s a good check for the specifics of your families and their settings?

You can also look around in the API samples and forum posts to see if someone else has the same problem… Pipe Fittings: change 45 degrees elbows - Autodesk Community

If you provide the Revit file that will help encourage forum folks to assist.

Hope that helps,

Mark

Hello,
I don’t think the problem lies here, as Dynamo isn’t generating any errors.

There’s also a possible error detection feature.

I simply think the code isn’t designed to handle a 45° network.

code erreur

\# ── Méthode 2 : PlumbingUtils (certaines versions CPython3) ───────────────
**try**:
    elbow = PlumbingUtils.**CreateElbowFitting**(doc, conn_i, conn_j)
    **if** elbow **is** **not** None:
        return elbow, "OK (PlumbingUtils)"
**except** Exception **as** ex2:
    pass

# ── Méthode 3 : connexion logique ConnectTo sans raccord géométrique ──────
**try**:
    conn_i.**ConnectTo**(conn_j)
    return None, "WARN: connexion logique seulement (pas de coude geometrique)"
**except** Exception **as** ex3:
    return None, "ECHEC total: ex1={} | ex3={}".**format**(**str**(ex1), **str**(ex3))

Hey,

I think my more general point, was that if you can output the exceptions, and return the working parts of your code for you to see the results… You will be able to control the code, rather than being dependant on whatever AI gives you.

I think the pertinent part is here…

`dz_vec = XYZ(0.0, 0.0, delta_z)`

pt_B = xyz_add(pt_A, dz_vec)# sommet rampe gauche
pt_C = xyz_add(pt_D, dz_vec)

You are defining pt_B as “Point A + Z” (straight up)?

But you need to define Point B as “Point A +Z + the horizontal distance along the pipe direction” (or just an angled vector).

That is going to give you the angled join? And similar with Point C?

You should see this in dynamo if you return the points and watch the 3D preview? If the Z is changing but not the XY, it must be a problem with that point creation logic? But if you see it moving in the XY, you’d know it was more about the Revit API?

Even if I am right, it is likely that there are other issues with the code, so working through and understanding it (which AI can help you with), is going to get you where you want to be.

Hope that helps,

Mark

Modified and practically functional version.
Works for 30 and 45 mm tubes, but for 90 mm tubes, the vertical part is too close to the sheath.

The new Python script is attached.

devoiement_mep_dynamo 45_ ok 2.py (18.0 KB)

# ==============================================================================
# SCRIPT DYNAMO - DÉVOIEMENT RÉSEAUX MEP AVEC RACCORDS COUDES
# Version : Dynamo 2.13 / Revit 2023 / CPython3
# Correction : connexion des segments via connecteurs MEP + coudes automatiques
# ==============================================================================
import clr
import math
clr.AddReference('RevitAPI')
clr.AddReference('RevitAPIUI')
clr.AddReference('RevitServices')
clr.AddReference('RevitNodes')
from Autodesk.Revit.DB import *
from Autodesk.Revit.DB.Mechanical import *
from Autodesk.Revit.DB.Plumbing import *
from Autodesk.Revit.UI import *
from RevitServices.Persistence import DocumentManager
from RevitServices.Transactions import TransactionManager
import Revit
clr.ImportExtensions(Revit.Elements)
clr.ImportExtensions(Revit.GeometryConversion)

# ==============================================================================
# ENTRÉES DYNAMO
# IN[0] : Éléments MEP à dévoyer (tubes/Pipe)
# IN[1] : Obstacle (gaine de ventilation)
# IN[2] : True = passer DESSUS  /  False = passer DESSOUS
# IN[3] : Angle en degrés (30, 45 ou 90)
# IN[4] : Garde minimale en mm
# ==============================================================================
mep_elements_input = IN[0]
obstacle_input     = IN[1]
direction_up       = IN[2]
angle_deg          = IN[3]
clearance_mm       = IN[4]

# ==============================================================================
# CONSTANTES
# ==============================================================================
MM_TO_FT   = 1.0 / 304.8
DEG_TO_RAD = math.pi / 180.0

doc   = DocumentManager.Instance.CurrentDBDocument
uiapp = DocumentManager.Instance.CurrentUIApplication

# ==============================================================================
# FONCTIONS UTILITAIRES
# ==============================================================================

def unwrap_element(el):
    """Déverrouille un élément Dynamo vers un élément Revit natif."""
    if hasattr(el, 'InternalElement'):
        return el.InternalElement
    return el

def get_pipe_points(pipe):
    """Retourne les deux extrémités (start, end) d'un tube Revit."""
    lc = pipe.Location
    if isinstance(lc, LocationCurve):
        line = lc.Curve
        return line.GetEndPoint(0), line.GetEndPoint(1)
    raise Exception("L'élément n'est pas un tube avec LocationCurve.")

def get_bounding_box_z(element):
    """Retourne Zmin et Zmax de la bounding box d'un élément."""
    bb = element.get_BoundingBox(None)
    if bb is None:
        raise Exception("Impossible d'obtenir la bounding box de l'obstacle.")
    return bb.Min.Z, bb.Max.Z

def get_bounding_box_xz(element):
    """Retourne les limites X et Z de la bounding box d'un élément."""
    bb = element.get_BoundingBox(None)
    if bb is None:
        raise Exception("Impossible d'obtenir la bounding box de l'obstacle.")
    return bb.Min.X, bb.Max.X, bb.Min.Z, bb.Max.Z

def get_bounding_box_full(element):
    """Retourne la bounding box complète d'un élément."""
    bb = element.get_BoundingBox(None)
    if bb is None:
        raise Exception("Impossible d'obtenir la bounding box de l'obstacle.")
    return bb

def xyz_sub(a, b):
    """Soustraction de deux XYZ compatible CPython3."""
    return XYZ(a.X - b.X, a.Y - b.Y, a.Z - b.Z)

def xyz_add(a, b):
    """Addition de deux XYZ compatible CPython3."""
    return XYZ(a.X + b.X, a.Y + b.Y, a.Z + b.Z)

def xyz_scale(a, s):
    """Multiplication scalaire d'un XYZ."""
    return XYZ(a.X * s, a.Y * s, a.Z * s)

def xyz_dot(a, b):
    """Produit scalaire de deux XYZ."""
    return a.X * b.X + a.Y * b.Y + a.Z * b.Z

def xyz_normalize(v):
    """Normalise un vecteur XYZ."""
    length = math.sqrt(v.X**2 + v.Y**2 + v.Z**2)
    if length < 1e-12:
        return XYZ(0, 0, 0)
    return XYZ(v.X / length, v.Y / length, v.Z / length)

def project_point_on_line(pt, line_start, line_end):
    """Projette un point sur une ligne définie par deux points (en 3D)."""
    d = xyz_sub(line_end, line_start)
    length_sq = xyz_dot(d, d)
    if length_sq < 1e-12:
        return line_start
    t = xyz_dot(xyz_sub(pt, line_start), d) / length_sq
    t = max(0.0, min(1.0, t))
    return xyz_add(line_start, xyz_scale(d, t))

def find_connector_at_point(element, point, tol=0.01):
    """Trouve le connecteur d'un élément MEP le plus proche d'un point donné."""
    conn_manager = element.ConnectorManager
    if conn_manager is None:
        return None
    best = None
    best_dist = float('inf')
    for conn in conn_manager.Connectors:
        dist = conn.Origin.DistanceTo(point)
        if dist < best_dist:
            best_dist = dist
            best = conn
    if best_dist < tol:
        return best
    return None

def get_pipe_level_id(pipe):
    """Récupère l'ID du niveau associé au tube."""
    param = pipe.get_Parameter(BuiltInParameter.RBS_START_LEVEL_PARAM)
    if param is not None:
        return param.AsElementId()
    return ElementId.InvalidElementId

def get_pipe_type_id(pipe):
    """Récupère le type du tube."""
    return pipe.GetTypeId()

def get_pipe_system_type_id(pipe):
    """Récupère le type de système du tube."""
    param = pipe.get_Parameter(BuiltInParameter.RBS_PIPING_SYSTEM_TYPE_PARAM)
    if param is not None:
        return param.AsElementId()
    return ElementId.InvalidElementId

def get_pipe_outer_radius(pipe):
    """
    Retourne le rayon depuis les paramètres Revit - à titre indicatif uniquement.
    Le calcul réel de positionnement utilise la bounding box du tube (plus fiable).
    Note : RBS_PIPE_OUTER_DIAMETER retourne le rayon nominal en unités internes,
    pas le diamètre - comportement confirmé sur Revit 2023.
    """
    param = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_OUTER_DIAMETER)
    if param is not None and param.AsDouble() > 0:
        return param.AsDouble()
    param = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
    if param is not None and param.AsDouble() > 0:
        return param.AsDouble()
    return 0.0

def calculate_offset_distance(angle_deg, vertical_offset_ft):
    """
    Calcule la distance horizontale nécessaire pour le dévoiement.
    vertical_offset_ft = différence Z entre l'axe du tube dévié et l'axe du tube original.
    La distance horizontale = offset vertical / tan(angle).
    """
    angle_rad = angle_deg * DEG_TO_RAD
    horizontal_dist = abs(vertical_offset_ft) / math.tan(angle_rad)
    return horizontal_dist

# ==============================================================================
# FONCTION PRINCIPALE DE DÉVOIEMENT D'UN TUBE
# ==============================================================================

def devoyer_tube(pipe, obstacle, direction_up, angle_deg, clearance_ft):
    """
    Dévie un tube pour éviter un obstacle en créant 3 segments + 2 coudes.
    
    Stratégie :
    1. Calcul de la zone de conflit entre le tube et l'obstacle (projection sur l'axe du tube)
    2. Calcul du décalage vertical nécessaire
    3. Suppression du segment original
    4. Création de 3 segments (avant, traversée, après)
    5. Connexion avec des coudes automatiques
    """
    results = []

    # --- Géométrie du tube ---
    p_start, p_end = get_pipe_points(pipe)
    pipe_dir = xyz_normalize(xyz_sub(p_end, p_start))
    
    pipe_level_id   = get_pipe_level_id(pipe)
    pipe_type_id    = get_pipe_type_id(pipe)
    pipe_sys_type_id = get_pipe_system_type_id(pipe)
    pipe_outer_radius = get_pipe_outer_radius(pipe)  # rayon réel en pieds

    FT = 304.8  # 1 ft en mm (pour debug)

    # --------------------------------------------------------------------------
    # PRINCIPE : la clearance est mesurée entre bounding boxes.
    #
    # Pour garantir la garde entre BB tube et BB gaine, il faut connaître
    # la "demi-hauteur BB" du tube, c'est-à-dire la distance entre son AXE
    # et le bord supérieur (ou inférieur) de sa bounding box.
    # Cette valeur inclut rayon + isolant éventuel + épaisseur de paroi.
    #
    # On la calcule en lisant la BB du tube ORIGINAL avant de le supprimer.
    # --------------------------------------------------------------------------

    # BB du tube original → demi-hauteur réelle de la BB tube
    # Pour un tube horizontal, la BB en Z couvre le diamètre complet.
    # pipe_bb_half = (bb.Max.Z - bb.Min.Z) / 2  = rayon + isolant éventuel
    bb_pipe_orig = pipe.get_BoundingBox(None)
    if bb_pipe_orig is None:
        raise Exception("Impossible de lire la bounding box du tube original.")

    pipe_bb_half = (bb_pipe_orig.Max.Z - bb_pipe_orig.Min.Z) / 2.0
    # Axe Z du tube = milieu de sa BB en Z (plus fiable que p_start.Z)
    z_pipe = (bb_pipe_orig.Min.Z + bb_pipe_orig.Max.Z) / 2.0

    # BB de la gaine obstacle
    bb_obs  = get_bounding_box_full(obstacle)
    obs_bb_min_z = bb_obs.Min.Z
    obs_bb_max_z = bb_obs.Max.Z

    # --------------------------------------------------------------------------
    # z_devoi : Z de l'AXE du tube dévié tel que :
    #
    #   BB_tube_dévié.Min.Z = obs_bb_max_z + clearance_ft  (dessus)
    #   BB_tube_dévié.Max.Z = obs_bb_min_z - clearance_ft  (dessous)
    #
    #   Or BB_tube.Min.Z = z_devoi - pipe_bb_half
    #      BB_tube.Max.Z = z_devoi + pipe_bb_half
    #
    #   Donc :
    #   DESSUS  : z_devoi = obs_bb_max_z + clearance_ft + pipe_bb_half
    #   DESSOUS : z_devoi = obs_bb_min_z - clearance_ft - pipe_bb_half
    # --------------------------------------------------------------------------
    if direction_up:
        z_devoi = obs_bb_max_z + clearance_ft + pipe_bb_half
    else:
        z_devoi = obs_bb_min_z - clearance_ft - pipe_bb_half

    # Garde réelle entre bounding boxes (doit être = clearance_ft)
    if direction_up:
        clearance_reelle_ft = (z_devoi - pipe_bb_half) - obs_bb_max_z
    else:
        clearance_reelle_ft = obs_bb_min_z - (z_devoi + pipe_bb_half)

    # Offset vertical entre axe tube original et axe tube dévié
    vertical_offset = abs(z_devoi - z_pipe)
    if vertical_offset < 1e-6:
        raise Exception(
            "Offset vertical nul : le tube est déjà au bon niveau. "
            "Vérifiez positions tube et obstacle."
        )

    # Distance horizontale pour la rampe à l'angle demandé
    h_dist = calculate_offset_distance(angle_deg, vertical_offset)

    # --- Données de diagnostic (en mm) ---
    diag = {
        "z_tube_origine_mm"       : round(z_pipe           * FT, 1),
        "z_BB_basse_gaine_mm"     : round(obs_bb_min_z     * FT, 1),
        "z_BB_haute_gaine_mm"     : round(obs_bb_max_z     * FT, 1),
        "z_axe_tube_devie_mm"     : round(z_devoi          * FT, 1),
        "pipe_outer_radius_mm"    : round(pipe_outer_radius * FT, 1),
        "pipe_bb_half_mm"         : round(pipe_bb_half      * FT, 1),
        "clearance_demandee_mm"   : round(clearance_ft      * FT, 1),
        "clearance_reelle_BB_mm"  : round(clearance_reelle_ft * FT, 1),
        "offset_vertical_mm"      : round(vertical_offset   * FT, 1),
        "distance_horiz_rampe_mm" : round(h_dist            * FT, 1),
    }

    # BB de l'obstacle pour la projection horizontale (centre XY)
    obs_min = bb_obs.Min
    obs_max = bb_obs.Max
    obs_center_xy = XYZ(
        (obs_min.X + obs_max.X) / 2.0,
        (obs_min.Y + obs_max.Y) / 2.0,
        z_pipe  # on projette au niveau Z du tube pour la recherche sur l'axe
    )

    # --- Points de projection sur le tube ---
    obs_proj = project_point_on_line(obs_center_xy, p_start, p_end)

    # --- Demi-largeur de l'obstacle selon l'axe du tube (projection BB) ---
    obs_half_along = (
        abs(pipe_dir.X) * (obs_max.X - obs_min.X) / 2.0 +
        abs(pipe_dir.Y) * (obs_max.Y - obs_min.Y) / 2.0
    )

    pt_inflect_1 = xyz_sub(obs_proj, xyz_scale(pipe_dir, h_dist + obs_half_along))
    pt_inflect_2 = xyz_add(obs_proj, xyz_scale(pipe_dir, h_dist + obs_half_along))

    # Correction Z : les points d'inflexion restent au niveau Z du tube
    pt_inflect_1 = XYZ(pt_inflect_1.X, pt_inflect_1.Y, z_pipe)
    pt_inflect_2 = XYZ(pt_inflect_2.X, pt_inflect_2.Y, z_pipe)

    # Points au niveau du dévoiement (sur l'obstacle)
    pt_devoi_1 = XYZ(pt_inflect_1.X + pipe_dir.X * h_dist,
                     pt_inflect_1.Y + pipe_dir.Y * h_dist,
                     z_devoi)
    pt_devoi_2 = XYZ(pt_inflect_2.X - pipe_dir.X * h_dist,
                     pt_inflect_2.Y - pipe_dir.Y * h_dist,
                     z_devoi)

    # Vérification : les points doivent être dans l'ordre sur l'axe du tube
    def dot_dir(pt):
        return xyz_dot(xyz_sub(pt, p_start), pipe_dir)

    d_start   = dot_dir(p_start)
    d_end     = dot_dir(p_end)
    d_inf1    = dot_dir(pt_inflect_1)
    d_inf2    = dot_dir(pt_inflect_2)
    d_dv1     = dot_dir(pt_devoi_1)
    d_dv2     = dot_dir(pt_devoi_2)

    if d_inf1 <= d_start + 1e-6 or d_inf2 >= d_end - 1e-6:
        raise Exception(
            "Le tube est trop court pour le dévoiement demandé. "
            "Augmentez la longueur du tube ou réduisez la garde/l'angle."
        )

    # ===========================================================================
    # CRÉATION DES SEGMENTS ET COUDES
    # ===========================================================================
    
    # Segment 1 : p_start -> pt_inflect_1  (niveau Z original)
    seg1_start = p_start
    seg1_end   = pt_inflect_1

    # Segment 2 : pt_devoi_1 -> pt_devoi_2  (niveau Z dévié, passage sur/sous obstacle)
    seg2_start = pt_devoi_1
    seg2_end   = pt_devoi_2

    # Segment 3 : pt_inflect_2 -> p_end  (retour niveau Z original)
    seg3_start = pt_inflect_2
    seg3_end   = p_end

    # Segment de montée : pt_inflect_1 -> pt_devoi_1
    seg_up_start = pt_inflect_1
    seg_up_end   = pt_devoi_1

    # Segment de descente : pt_devoi_2 -> pt_inflect_2
    seg_dn_start = pt_devoi_2
    seg_dn_end   = pt_inflect_2

    # --- Lire le diamètre original AVANT suppression du tube ---
    # RBS_PIPE_OUTER_DIAMETER retourne le rayon en unités internes Revit,
    # mais pour SET le diamètre on utilise RBS_PIPE_DIAMETER_PARAM (diamètre nominal)
    # qui lui aussi retourne le rayon → on stocke la valeur brute telle quelle.
    param_diam_orig = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
    orig_diam_value = param_diam_orig.AsDouble() if param_diam_orig is not None else None

    # --- Suppression du tube original ---
    doc.Delete(pipe.Id)

    # --- Création des 5 segments ---
    def create_pipe_segment(start_pt, end_pt):
        new_pipe = Pipe.Create(
            doc,
            pipe_sys_type_id,
            pipe_type_id,
            pipe_level_id,
            start_pt,
            end_pt
        )
        # Réappliquer le diamètre nominal du tube original
        if orig_diam_value is not None:
            p = new_pipe.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
            if p is not None and not p.IsReadOnly:
                p.Set(orig_diam_value)
        return new_pipe

    pipe1    = create_pipe_segment(seg1_start,   seg1_end)
    pipe_up  = create_pipe_segment(seg_up_start, seg_up_end)
    pipe2    = create_pipe_segment(seg2_start,   seg2_end)
    pipe_dn  = create_pipe_segment(seg_dn_start, seg_dn_end)
    pipe3    = create_pipe_segment(seg3_start,   seg3_end)

    # --- Connexion des segments avec coudes ---
    def connect_with_elbow(pipe_a, end_a, pipe_b, start_b):
        """Connecte deux tuyaux en créant un coude au point de jonction."""
        conn_a = find_connector_at_point(pipe_a, end_a)
        conn_b = find_connector_at_point(pipe_b, start_b)
        if conn_a is None or conn_b is None:
            raise Exception(
                "Impossible de trouver les connecteurs pour créer le coude. "
                "Vérifiez les points de jonction."
            )
        elbow = doc.Create.NewElbowFitting(conn_a, conn_b)
        return elbow

    elbow1 = connect_with_elbow(pipe1,   seg1_end,   pipe_up,  seg_up_start)
    elbow2 = connect_with_elbow(pipe_up,  seg_up_end, pipe2,   seg2_start)
    elbow3 = connect_with_elbow(pipe2,   seg2_end,   pipe_dn,  seg_dn_start)
    elbow4 = connect_with_elbow(pipe_dn,  seg_dn_end, pipe3,   seg3_start)

    results.append({
        "pipes"      : [pipe1, pipe_up, pipe2, pipe_dn, pipe3],
        "elbows"     : [elbow1, elbow2, elbow3, elbow4],
        "diagnostic" : diag
    })
    return results

# ==============================================================================
# TRAITEMENT PRINCIPAL
# ==============================================================================

# Normalisation des entrées
if not isinstance(mep_elements_input, list):
    mep_elements_input = [mep_elements_input]

obstacle  = unwrap_element(obstacle_input)
clearance_ft = float(clearance_mm) * MM_TO_FT
angle     = float(angle_deg)

output_results = []
errors         = []

TransactionManager.Instance.EnsureInTransaction(doc)

try:
    for el_input in mep_elements_input:
        pipe = unwrap_element(el_input)
        
        # Vérification du type d'élément
        if not isinstance(pipe, Pipe):
            errors.append("Élément ignoré (non Pipe) : {}".format(pipe.Id.IntegerValue))
            continue
        
        try:
            res = devoyer_tube(pipe, obstacle, direction_up, angle, clearance_ft)
            output_results.extend(res)
        except Exception as e_pipe:
            errors.append(
                "Erreur sur tube {} : {}".format(pipe.Id.IntegerValue, str(e_pipe))
            )

    TransactionManager.Instance.TransactionTaskDone()

except Exception as e_global:
    TransactionManager.Instance.ForceCloseTransaction()
    errors.append("Erreur globale : {}".format(str(e_global)))

# ==============================================================================
# SORTIE
# ==============================================================================
summary = []
for r in output_results:
    pipe_ids  = [p.Id.IntegerValue for p in r["pipes"]]
    elbow_ids = [e.Id.IntegerValue for e in r["elbows"] if e is not None]
    summary.append({
        "tubes créés"  : pipe_ids,
        "coudes créés" : elbow_ids,
        "diagnostic"   : r["diagnostic"]
    })

OUT = {
    "résultats" : summary,
    "erreurs"   : errors if errors else "Aucune erreur"
}

Hi, I can do like much simpler things for things like this, I’ve tried doing it in one python code, but there are so many things that can go wrong and it can be hard to debug, so I try with small bites of the output, and then put it together, maybe just me, have you tried that?
explorer03nJnJ5NLP

I am not a MEP engineer, but that looks sweet :heart_eyes:.

hahah you should be…we have lot of fun :rofl:

Naaah, i enjoy modelling the building MEP engineers put their stuff in :sweat_smile:.

No ducts and piping hell fo rme :see_no_evil:

hahaha but in the end its just about sort a list :laughing: no matter what, so lets us call it arch, mep, struc…no matter

Hi,

You can also split the original pipe and rotate the elements that make up the slope.

Here’s an example; you need to add the calculations for the cut points.

import clr
import sys
import System
#import net library
from System import Array
from System.Collections.Generic import List, IList, Dictionary, HashSet
#
clr.AddReference('ProtoGeometry')
from Autodesk.DesignScript.Geometry import *
import Autodesk.DesignScript.Geometry as DS

#import Revit API
clr.AddReference('RevitAPI')
import Autodesk
from Autodesk.Revit.DB import *
import Autodesk.Revit.DB as DB
#import specify namespace
from Autodesk.Revit.DB.Plumbing import *
from Autodesk.Revit.DB.Mechanical import *

clr.AddReference('RevitNodes')
import Revit
clr.ImportExtensions(Revit.Elements)
clr.ImportExtensions(Revit.GeometryConversion)

#import transactionManager and DocumentManager (RevitServices is specific to Dynamo)
clr.AddReference('RevitServices')
import RevitServices
from RevitServices.Persistence import DocumentManager
from RevitServices.Transactions import TransactionManager
doc = DocumentManager.Instance.CurrentDBDocument

import math


def find_intersection(elemCurveA, elemCurveB):
    curveA = elemCurveA.Location.Curve
    curveB = elemCurveB.Location.Curve
    curveA.MakeUnbound()
    curveB.MakeUnbound()
    interResult = curveA.Intersect(curveB, CurveIntersectResultOption.Detailed)
    if interResult.Result == SetComparisonResult.Overlap:
        overlap_pt : CurveOverlapPoint  = interResult.GetOverlaps()[0]
        pt = overlap_pt.Point
        return pt
    return None
    
def move_nearest_connector(elemCurveA, pt):
    all_con = [con for con in elemCurveA.ConnectorManager.Connectors]
    all_con.sort(key = lambda c : c.Origin.DistanceTo(pt))
    con = all_con[0]
    con.Origin = pt
    return con

#Preparing input from dynamo to revit
elemA : DB.MEPCurve = UnwrapElement(IN[0])
elemB : DB.MEPCurve = UnwrapElement(IN[1])
angle = IN[2] * 0.0174533

array_pts = []
outIds = []
out = []

curveA : DB.Curve = elemA.Location.Curve
vectA = curveA.Direction
curveB : DB.Curve = elemB.Location.Curve
vectUp = XYZ(0,0,1.5) # vector offset up


interResult = curveA.Intersect(curveB, CurveIntersectResultOption.Detailed)
# compute the points to split
if interResult.Result == SetComparisonResult.Overlap:
    overlap_pt : CurveOverlapPoint  = interResult.GetOverlaps()[0]
    para1 = overlap_pt.FirstParameter
    pt = overlap_pt.Point
    for i in [-1.8, -0.9, 0.9, 1.8]:
        ptx = curveA.Evaluate(para1 + i, False)
        array_pts.append(ptx)
    
    TransactionManager.Instance.EnsureInTransaction(doc)
    # split at points
    array_pts.sort(key = lambda p : p.DistanceTo(curveA.GetEndPoint(0)) )
    pta, ptb, ptc, ptd = array_pts
    elemId = elemA.Id
    outIds.append(elemId)
    #
    for pt in array_pts:
        newfam = None
        try:
                newId = MechanicalUtils.BreakCurve(doc, elemId, pt)
        except Exception as ex:
                print(ex)
                newId = PlumbingUtils.BreakCurve(doc, elemId, pt)
        #
        doc.Regenerate()
        outIds.append(newId)
    #
    new_elems = [doc.GetElement(xId) for xId in outIds]
    new_elems.sort(key = lambda e : e.Location.Curve.GetEndPoint(0).DistanceTo(curveA.GetEndPoint(0)) )
    new_elems_up_ids = List[ElementId]([x.Id for x in new_elems[1:4]])
    # move up
    ElementTransformUtils.MoveElements(doc, new_elems_up_ids, vectUp)
    # rotate 45 degrees
    elemR1 = new_elems[1]
    elemR2 = new_elems[2]
    elemR3 = new_elems[3]
    #
    axis1 = DB.Line.CreateUnbound(ptc + vectUp, vectA.CrossProduct(XYZ.BasisZ))
    axis2 = DB.Line.CreateUnbound(ptb + vectUp, vectA.CrossProduct(XYZ.BasisZ))
    
    ElementTransformUtils.RotateElement(doc, elemR1.Id, axis1, -angle + math.pi)
    ElementTransformUtils.RotateElement(doc, elemR3.Id, axis2, angle + math.pi)
    # find intersection projection
    ptI1 = find_intersection(new_elems[0], elemR1)
    ptI2 = find_intersection(new_elems[-1], elemR3)
    # conect all
    for elem1, elem2, pt_interconect in [
                                        [new_elems[0], elemR1, ptI1], # pair 1
                                        [new_elems[-1], elemR3, ptI2], # pair 2
                                        [elemR1, elemR2, ptc + vectUp], # pair 3
                                        [elemR2, elemR3, ptb + vectUp] # pair 4
                                    ]:
                                        
        con1 = move_nearest_connector(elem1, pt_interconect)
        con2 = move_nearest_connector(elem2, pt_interconect)
        elbow = doc.Create.NewElbowFitting(con1, con2)
        out.append(elbow)
    #
    TransactionManager.Instance.TransactionTaskDone()
    
OUT = out