Hello,
I have a Dynamo + Python script to divert pipes around an obstacle.
Selection in the script:
1- Select the pipe(s) to divert
2- Select the obstacle (here, the duct)
3- Choose between diverting from above or below
4- Choose the divert angle: 30°, 45°, or 90°
5- Enter the minimum distance above or below between the two elements
However, the script only works with 90° bends; it ignores the 30° and 45° angles.
To be honest, I didn’t write the script myself; it was written by ChatGPT (using the free version), and now it can’t find the cause of the problem.
# ==============================================================================
# SCRIPT DYNAMO - DÉVOIEMENT RÉSEAUX MEP AVEC RACCORDS COUDES
# Version : Dynamo 2.13 / Revit 2023 / CPython3
# Correction : connexion des segments via connecteurs MEP + coudes automatiques
# ==============================================================================
import clr
import math
clr.AddReference('RevitAPI')
clr.AddReference('RevitAPIUI')
clr.AddReference('RevitServices')
clr.AddReference('RevitNodes')
from Autodesk.Revit.DB import *
from Autodesk.Revit.DB.Mechanical import *
from Autodesk.Revit.DB.Plumbing import *
from Autodesk.Revit.UI import *
from RevitServices.Persistence import DocumentManager
from RevitServices.Transactions import TransactionManager
import Revit
clr.ImportExtensions(Revit.Elements)
clr.ImportExtensions(Revit.GeometryConversion)
# ==============================================================================
# ENTRÉES DYNAMO
# IN[0] : Éléments MEP à dévoyer (tubes/Pipe)
# IN[1] : Obstacle (gaine de ventilation)
# IN[2] : True = passer DESSUS / False = passer DESSOUS
# IN[3] : Angle en degrés (30, 45 ou 90)
# IN[4] : Garde minimale en mm
# ==============================================================================
mep_elements_input = IN[0]
obstacle_input = IN[1]
direction_up = IN[2]
angle_deg = IN[3]
clearance_mm = IN[4]
# ==============================================================================
# CONSTANTES
# ==============================================================================
MM_TO_FT = 1.0 / 304.8
DEG_TO_RAD = math.pi / 180.0
doc = DocumentManager.Instance.CurrentDBDocument
uiapp = DocumentManager.Instance.CurrentUIApplication
# ==============================================================================
# HELPERS XYZ — CPython3 (pas de surcharge opérateurs sur XYZ)
# ==============================================================================
def xyz_add(a, b):
return XYZ(a.X + b.X, a.Y + b.Y, a.Z + b.Z)
def xyz_sub(a, b):
return XYZ(a.X - b.X, a.Y - b.Y, a.Z - b.Z)
def xyz_scale(v, s):
return XYZ(v.X * s, v.Y * s, v.Z * s)
def xyz_dot(a, b):
return a.X * b.X + a.Y * b.Y + a.Z * b.Z
def xyz_length(v):
return math.sqrt(v.X * v.X + v.Y * v.Y + v.Z * v.Z)
def xyz_normalize(v):
lg = xyz_length(v)
if lg < 1e-10:
return XYZ(0.0, 0.0, 0.0)
return XYZ(v.X / lg, v.Y / lg, v.Z / lg)
def xyz_dist(a, b):
return xyz_length(xyz_sub(a, b))
def pts_equal(a, b, tol=0.001):
return xyz_dist(a, b) < tol
# ==============================================================================
# NORMALISATION DES ENTRÉES DYNAMO
# ==============================================================================
def to_revit_element(obj):
if hasattr(obj, 'InternalElement'):
return obj.InternalElement
return obj
def flatten(lst):
result = []
for item in lst:
if isinstance(item, (list, tuple)):
result.extend(flatten(item))
else:
result.append(item)
return result
if not isinstance(mep_elements_input, list):
mep_elements_input = [mep_elements_input]
mep_elements_input = flatten(mep_elements_input)
mep_elements = [to_revit_element(e) for e in mep_elements_input]
obstacle = to_revit_element(obstacle_input)
angle_deg = float(angle_deg)
for candidate in [30.0, 45.0, 90.0]:
if abs(angle_deg - candidate) < 1.0:
angle_deg = candidate
break
clearance_ft = float(clearance_mm) * MM_TO_FT
# ==============================================================================
# FONCTIONS GÉOMÉTRIQUES
# ==============================================================================
def get_pipe_curve(pipe):
loc = pipe.Location
if isinstance(loc, LocationCurve):
return loc.Curve
raise TypeError("Element {} : pas de LocationCurve.".format(pipe.Id.IntegerValue))
def get_pipe_start_end(pipe):
curve = get_pipe_curve(pipe)
return curve.GetEndPoint(0), curve.GetEndPoint(1)
def compute_pipe_radius(pipe):
try:
param = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_OUTER_DIAMETER)
if param and param.AsDouble() > 0:
return param.AsDouble() / 2.0
except Exception:
pass
try:
bb = pipe.get_BoundingBox(None)
return (bb.Max.Z - bb.Min.Z) / 2.0
except Exception:
return 0.05
def compute_obstacle_projection_on_pipe(pipe, obs_bb):
pipe_start, pipe_end = get_pipe_start_end(pipe)
length_vec = xyz_sub(pipe_end, pipe_start)
pipe_length = xyz_length(length_vec)
direction = xyz_normalize(length_vec)
bmin = obs_bb.Min
bmax = obs_bb.Max
corners = [
XYZ(bmin.X, bmin.Y, bmin.Z), XYZ(bmax.X, bmin.Y, bmin.Z),
XYZ(bmin.X, bmax.Y, bmin.Z), XYZ(bmax.X, bmax.Y, bmin.Z),
XYZ(bmin.X, bmin.Y, bmax.Z), XYZ(bmax.X, bmin.Y, bmax.Z),
XYZ(bmin.X, bmax.Y, bmax.Z), XYZ(bmax.X, bmax.Y, bmax.Z),
]
t_values = [xyz_dot(xyz_sub(c, pipe_start), direction) for c in corners]
t_min = max(0.0, min(t_values))
t_max = min(pipe_length, max(t_values))
if t_min >= t_max:
return None
pt_obs_start = xyz_add(pipe_start, xyz_scale(direction, t_min))
pt_obs_end = xyz_add(pipe_start, xyz_scale(direction, t_max))
return (pt_obs_start, pt_obs_end, t_min, t_max, pipe_length, direction, pipe_start, pipe_end)
def compute_bypass_delta_z(pipe, obs_bb, direction_up, clearance_ft):
start, end = get_pipe_start_end(pipe)
pipe_z = (start.Z + end.Z) / 2.0
pipe_radius = compute_pipe_radius(pipe)
if direction_up:
target_z = obs_bb.Max.Z + clearance_ft + pipe_radius
else:
target_z = obs_bb.Min.Z - clearance_ft - pipe_radius
return target_z - pipe_z
def ramp_horizontal_distance(delta_z, angle_deg):
if angle_deg >= 90.0:
return 0.0
return abs(delta_z) / math.tan(angle_deg * DEG_TO_RAD)
# ==============================================================================
# GESTION DES CONNECTEURS MEP
# ==============================================================================
def get_pipe_connectors(pipe):
"""Retourne la liste des connecteurs d'un tube."""
connectors = []
conn_set = pipe.ConnectorManager.Connectors
for conn in conn_set:
connectors.append(conn)
return connectors
def get_connector_at_point(pipe, point, tol=0.01):
"""Retourne le connecteur du tube le plus proche du point donné."""
best = None
best_dist = 1e10
for conn in get_pipe_connectors(pipe):
d = xyz_dist(conn.Origin, point)
if d < best_dist:
best_dist = d
best = conn
if best_dist < tol:
return best
return None
def create_elbow_between_connectors(doc, conn_i, conn_j):
"""
Crée un coude entre deux connecteurs MEP.
Essaie plusieurs méthodes API selon la version Revit / disponibilité.
Retourne (fitting_element, message).
Méthodes tentées dans l'ordre :
1. doc.Create.NewElbowFitting(conn_i, conn_j) ← API standard Revit 2023
2. PlumbingUtils.CreateElbowFitting(doc, ...) ← certaines versions
3. Connexion directe ConnectTo() ← fallback sans géométrie
"""
# ── Méthode 1 : doc.Create.NewElbowFitting (Revit 2019+) ─────────────────
try:
elbow = doc.Create.NewElbowFitting(conn_i, conn_j)
if elbow is not None:
return elbow, "OK (NewElbowFitting)"
except Exception as ex1:
pass
# ── Méthode 2 : PlumbingUtils (certaines versions CPython3) ───────────────
try:
elbow = PlumbingUtils.CreateElbowFitting(doc, conn_i, conn_j)
if elbow is not None:
return elbow, "OK (PlumbingUtils)"
except Exception as ex2:
pass
# ── Méthode 3 : connexion logique ConnectTo sans raccord géométrique ──────
try:
conn_i.ConnectTo(conn_j)
return None, "WARN: connexion logique seulement (pas de coude geometrique)"
except Exception as ex3:
return None, "ECHEC total: ex1={} | ex3={}".format(str(ex1), str(ex3))
def get_pipe_system_type_id(pipe):
"""Récupère l'ElementId du type de système du tube."""
try:
p = pipe.get_Parameter(BuiltInParameter.RBS_PIPING_SYSTEM_TYPE_PARAM)
if p:
return p.AsElementId()
except Exception:
pass
return ElementId.InvalidElementId
def get_pipe_diameter(pipe):
"""Récupère le diamètre nominal du tube (en pieds)."""
try:
p = pipe.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
if p and p.AsDouble() > 0:
return p.AsDouble()
except Exception:
pass
return 0.082 # ~25 mm
def create_pipe_segment(doc, sys_type_id, pipe_type_id, level_id, diameter, pt_a, pt_b):
"""Crée un segment de tube entre pt_a et pt_b."""
new_p = Pipe.Create(doc, sys_type_id, pipe_type_id, level_id, pt_a, pt_b)
try:
pd = new_p.get_Parameter(BuiltInParameter.RBS_PIPE_DIAMETER_PARAM)
if pd and not pd.IsReadOnly:
pd.Set(diameter)
except Exception:
pass
return new_p
# ==============================================================================
# DÉVOIEMENT D'UN TUBE AVEC CONNEXIONS PAR COUDES
# ==============================================================================
def devoyer_pipe(pipe, obstacle, direction_up, angle_deg, clearance_ft):
"""
Dévoie le tube et connecte les segments par des coudes du système de canalisation.
Schéma du dévoiement (vue de côté) :
─────────────────────────────────────────────────────────────────────
pipe_start ─── [seg0] ─── A ╗ coude1 coude4 ╔ D ─── [seg4] ─── pipe_end
╚── [seg1] ── B ─── [seg_top] ── C ─── [seg3] ──╝
coude2 coude3
─────────────────────────────────────────────────────────────────────
A, D : points de début/fin de rampe (sur l'axe original Z)
B, C : points hauts des rampes (décalés en Z = tronçon dévoyé)
"""
result = {
'pipe_id' : pipe.Id.IntegerValue,
'success' : False,
'message' : '',
'new_elements': [],
'elbows' : []
}
# ── 1. BoundingBox obstacle ───────────────────────────────────────────────
obs_bb = obstacle.get_BoundingBox(None)
if obs_bb is None:
result['message'] = "Impossible d'obtenir la BoundingBox de l'obstacle."
return result
# ── 2. Projection obstacle sur tube ──────────────────────────────────────
proj = compute_obstacle_projection_on_pipe(pipe, obs_bb)
if proj is None:
result['message'] = "L'obstacle ne croise pas ce tube."
result['success'] = True
return result
(pt_obs_start, pt_obs_end,
t_min, t_max,
pipe_length, direction,
pipe_start, pipe_end) = proj
# ── 3. Delta Z de dévoiement ──────────────────────────────────────────────
delta_z = compute_bypass_delta_z(pipe, obs_bb, direction_up, clearance_ft)
if abs(delta_z) < 0.001:
result['message'] = "Delta Z negligeable, pas de devoiement necessaire."
result['success'] = True
return result
# ── 4. Distance horizontale de rampe ──────────────────────────────────────
horiz_dist = ramp_horizontal_distance(delta_z, angle_deg)
# ── 5. Calcul des 4 points clés ───────────────────────────────────────────
pt_A_raw = xyz_sub(pt_obs_start, xyz_scale(direction, horiz_dist))
pt_D_raw = xyz_add(pt_obs_end, xyz_scale(direction, horiz_dist))
def clamp_on_pipe(pt):
vec = xyz_sub(pt, pipe_start)
t = xyz_dot(vec, direction)
t = max(0.003, min(pipe_length - 0.003, t))
return xyz_add(pipe_start, xyz_scale(direction, t))
pt_A = clamp_on_pipe(pt_A_raw)
pt_D = clamp_on_pipe(pt_D_raw)
dz_vec = XYZ(0.0, 0.0, delta_z)
pt_B = xyz_add(pt_A, dz_vec) # sommet rampe gauche
pt_C = xyz_add(pt_D, dz_vec) # sommet rampe droite
# ── 6. Paramètres du tube d'origine ──────────────────────────────────────
pipe_type_id = pipe.GetTypeId()
level_id = pipe.LevelId
sys_type_id = get_pipe_system_type_id(pipe)
diameter = get_pipe_diameter(pipe)
# ── 7. Filtrage des segments à créer ─────────────────────────────────────
MIN_LEN = 0.005 # ~1.5 mm
# Définition des 5 segments : (pt_debut, pt_fin, label)
segs_def = [
(pipe_start, pt_A, "seg_gauche"),
(pt_A, pt_B, "rampe_montee"),
(pt_B, pt_C, "seg_devoye"),
(pt_C, pt_D, "rampe_descente"),
(pt_D, pipe_end, "seg_droit"),
]
segs_def = [
(a, b, lbl) for (a, b, lbl) in segs_def
if xyz_length(xyz_sub(b, a)) > MIN_LEN
]
if not segs_def:
result['message'] = "Segments calcules trop courts."
return result
# ── 8. Suppression du tube original ──────────────────────────────────────
try:
doc.Delete(pipe.Id)
except Exception as ex:
result['message'] = "Impossible de supprimer le tube original : {}".format(str(ex))
return result
# ── 9. Création des nouveaux segments ────────────────────────────────────
new_pipes = []
try:
for (pt_a, pt_b, lbl) in segs_def:
seg = create_pipe_segment(
doc, sys_type_id, pipe_type_id, level_id, diameter, pt_a, pt_b
)
new_pipes.append((seg, pt_a, pt_b, lbl))
except Exception as ex:
result['message'] = "Erreur creation segment : {}".format(str(ex))
return result
# ── 10. Connexion par coudes entre segments adjacents ────────────────────
#
# Pour chaque jonction entre segment[i] et segment[i+1] :
# - pt_jonction = pt_b du segment[i] = pt_a du segment[i+1]
# - On cherche les connecteurs libres (IsConnected=False) à ce point
# - doc.Create.NewElbowFitting(conn_i, conn_j) crée le coude Revit natif
# qui utilise le type de raccord défini dans le routing preference du système
#
# IMPORTANT : Revit.DB.Document.Create.NewElbowFitting est l'API correcte
# pour Revit 2023 / CPython3. PlumbingUtils n'expose pas cette
# méthode dans toutes les versions.
elbows = []
elbow_errors = []
for i in range(len(new_pipes) - 1):
seg_i, pt_a_i, pt_b_i, lbl_i = new_pipes[i]
seg_j, pt_a_j, pt_b_j, lbl_j = new_pipes[i + 1]
# Point de jonction commun
pt_junction = pt_b_i
# Connecteur de seg_i côté jonction (son extrémité "fin")
conn_i = get_connector_at_point(seg_i, pt_junction, tol=0.02)
# Connecteur de seg_j côté jonction (son extrémité "début")
conn_j = get_connector_at_point(seg_j, pt_junction, tol=0.02)
if conn_i is None or conn_j is None:
elbow_errors.append(
"Jonction {}/{} : connecteur introuvable (pt {:.4f},{:.4f},{:.4f})".format(
lbl_i, lbl_j,
pt_junction.X, pt_junction.Y, pt_junction.Z
)
)
continue
# Segments colinéaires → connexion directe, pas de coude
dir_i = xyz_normalize(xyz_sub(pt_b_i, pt_a_i))
dir_j = xyz_normalize(xyz_sub(pt_b_j, pt_a_j))
dot = abs(xyz_dot(dir_i, dir_j))
if dot > 0.9999:
try:
conn_i.ConnectTo(conn_j)
except Exception:
pass
continue
# Segments non colinéaires → créer un coude
elbow_elem, msg = create_elbow_between_connectors(doc, conn_i, conn_j)
if elbow_elem is not None:
elbows.append(elbow_elem.Id.IntegerValue)
else:
elbow_errors.append("Coude {}/{} : {}".format(lbl_i, lbl_j, msg))
result['success'] = True
result['new_elements'] = [p.Id.IntegerValue for (p, a, b, lbl) in new_pipes]
result['elbows'] = elbows
result['message'] = (
"OK: {} segments, {} coudes crees. "
"deltaZ={:.0f}mm, angle={}deg, rampe={:.0f}mm"
).format(
len(new_pipes),
len(elbows),
delta_z * 304.8,
int(angle_deg),
horiz_dist * 304.8
)
if elbow_errors:
result['message'] += " | Avert coudes: {}".format("; ".join(elbow_errors))
return result
# ==============================================================================
# BOUCLE PRINCIPALE
# ==============================================================================
results = []
errors = []
debug_lines = []
success_count = 0
TransactionManager.Instance.EnsureInTransaction(doc)
try:
for elem in mep_elements:
if elem is None:
errors.append("Element None ignore.")
continue
type_name = elem.GetType().Name
debug_lines.append("Id={} type={}".format(elem.Id.IntegerValue, type_name))
if not isinstance(elem, Pipe):
errors.append(
"Id={} ignore : '{}' non supporte (Pipe attendu).".format(
elem.Id.IntegerValue, type_name
)
)
continue
res = devoyer_pipe(
elem,
obstacle,
bool(direction_up),
float(angle_deg),
clearance_ft
)
results.append(res)
if res['success']:
success_count += 1
else:
errors.append("Tube {} : {}".format(res['pipe_id'], res['message']))
except Exception as global_ex:
import traceback
errors.append("ERREUR GLOBALE : {}".format(str(global_ex)))
errors.append(traceback.format_exc())
TransactionManager.Instance.ForceCloseTransaction()
else:
TransactionManager.Instance.TransactionTaskDone()
# ==============================================================================
# SORTIE
# ==============================================================================
summary = (
"Devoiements reussis : {}/{}\n"
"Direction : {}\n"
"Angle : {}deg\n"
"Garde : {} mm\n"
"\nElements detectes :\n{}"
).format(
success_count,
len(mep_elements),
"DESSUS" if direction_up else "DESSOUS",
int(angle_deg),
clearance_mm,
"\n".join(debug_lines) if debug_lines else "(aucun)"
)
if errors:
summary += "\n\n--- Erreurs ---\n" + "\n".join(errors)
OUT = [summary, results]
MEP_Devoiement avec raccord v2.dyn (34.6 KB)
code py.txt (19.9 KB)


