Graph Theory and Applications in Industrial Networks
What Is Graph Theory and Why Does It Matter in Industry?
Look at any factory from above: machines connected by conveyors, pipes linking pumps to tanks, network cables connecting PLCs to each other. All of these systems can be represented mathematically as a graph -- a set of nodes and edges connecting them.
Graph theory is the branch of mathematics that studies these networks. It helps answer questions like:
- What is the shortest path to move a product from the warehouse to the shipping dock?
- If this electrical switch fails, which departments lose power?
- How do we distribute tasks across machines to minimize production time?
Fundamental Concepts: Nodes and Edges
A graph G = (V, E) consists of:
V= a set of vertices (nodes) -- representing entities (machines, stations, devices)E= a set of edges -- representing connections between nodes
# Simple graph representing a factory network
# Nodes: production stations
nodes = ["raw_warehouse", "cutting", "welding", "painting", "assembly", "inspection", "shipping"]
# Edges: transport lines between stations (with distance in meters)
edges = [
("raw_warehouse", "cutting", 30),
("raw_warehouse", "welding", 50),
("cutting", "welding", 15),
("cutting", "painting", 40),
("welding", "assembly", 20),
("painting", "assembly", 25),
("assembly", "inspection", 10),
("inspection", "shipping", 35),
("assembly", "shipping", 45),
]
print(f"Number of nodes: {len(nodes)}")
print(f"Number of edges: {len(edges)}")
print("\nConnections:")
for src, dst, weight in edges:
print(f" {src} --({weight}m)--> {dst}")
Types of Graphs: Directed and Undirected
| Type | Description | Industrial Example |
|---|---|---|
| Undirected | Edges work both ways | Factory Ethernet network |
| Directed | Edges have one direction | Production line (material flows one way) |
| Weighted | Each edge has a value (weight) | Distance, cost, transfer time |
| Unweighted | All edges are equal | Connection exists or not |
# Represent graph as an adjacency list
def build_adjacency_list(edges, directed=False):
graph = {}
for src, dst, weight in edges:
if src not in graph:
graph[src] = []
graph[src].append((dst, weight))
if not directed:
if dst not in graph:
graph[dst] = []
graph[dst].append((src, weight))
return graph
# Undirected graph (factory network)
graph_undirected = build_adjacency_list(edges, directed=False)
# Directed graph (production line)
graph_directed = build_adjacency_list(edges, directed=True)
print("Neighbors of 'cutting' (undirected):")
for neighbor, weight in graph_undirected.get("cutting", []):
print(f" -> {neighbor} ({weight}m)")
print("\nNeighbors of 'cutting' (directed):")
for neighbor, weight in graph_directed.get("cutting", []):
print(f" -> {neighbor} ({weight}m)")
Graph Representation: Adjacency Matrix vs. Adjacency List
There are two main ways to store a graph in memory:
Adjacency Matrix: An n x n matrix where M[i][j] = weight if an edge exists:
import numpy as np
# Index the nodes
node_index = {name: i for i, name in enumerate(nodes)}
n = len(nodes)
# Build adjacency matrix
adj_matrix = np.zeros((n, n))
for src, dst, weight in edges:
i, j = node_index[src], node_index[dst]
adj_matrix[i][j] = weight
adj_matrix[j][i] = weight # undirected
print("Adjacency matrix (row = source, column = target):")
print(f"{'':>15}", end="")
for name in nodes:
print(f"{name[:5]:>7}", end="")
print()
for i, name in enumerate(nodes):
print(f"{name:>15}", end="")
for j in range(n):
val = int(adj_matrix[i][j])
print(f"{val:>7}", end="")
print()
| Representation | Memory | Edge Lookup | Best For |
|---|---|---|---|
| Adjacency matrix | O(V^2) | O(1) | Dense graphs |
| Adjacency list | O(V+E) | O(degree) | Sparse graphs (most cases) |
Dijkstra's Algorithm: Shortest Path
Dijkstra's algorithm finds the shortest path from a source node to all other nodes in a weighted graph with non-negative weights.
Idea: Start from the source; at each step, pick the nearest unvisited node and update distances through it.
import heapq
def dijkstra(graph, start):
"""Dijkstra's shortest path algorithm"""
distances = {node: float('inf') for node in graph}
distances[start] = 0
previous = {node: None for node in graph}
visited = set()
# Priority queue (min-heap)
pq = [(0, start)]
while pq:
current_dist, current_node = heapq.heappop(pq)
if current_node in visited:
continue
visited.add(current_node)
for neighbor, weight in graph.get(current_node, []):
distance = current_dist + weight
if distance < distances.get(neighbor, float('inf')):
distances[neighbor] = distance
previous[neighbor] = current_node
heapq.heappush(pq, (distance, neighbor))
return distances, previous
def reconstruct_path(previous, start, end):
"""Reconstruct the path from start to end"""
path = []
current = end
while current is not None:
path.append(current)
current = previous.get(current)
path.reverse()
if path[0] == start:
return path
return [] # no path
# Apply to the factory network
distances, previous = dijkstra(graph_undirected, "raw_warehouse")
print("Shortest distances from the warehouse:")
for node in sorted(distances, key=distances.get):
path = reconstruct_path(previous, "raw_warehouse", node)
print(f" To {node}: {distances[node]:.0f}m via {' -> '.join(path)}")
Time complexity: O((V + E) log V) with a priority queue -- very fast even for large networks.
Node Degree and Centrality: Who Is Most Important?
Node degree is the number of edges connected to it. Nodes with high degree are network hubs -- their failure causes the most damage.
def calculate_degree(graph):
"""Calculate the degree of each node"""
degrees = {}
for node in graph:
degrees[node] = len(graph[node])
return degrees
def calculate_betweenness_simple(graph, nodes):
"""Simplified betweenness centrality:
how many shortest paths pass through each node
"""
betweenness = {node: 0 for node in nodes}
for start in nodes:
distances, previous = dijkstra(graph, start)
for end in nodes:
if start != end and distances.get(end, float('inf')) < float('inf'):
path = reconstruct_path(previous, start, end)
for node in path[1:-1]: # exclude source and target
betweenness[node] += 1
return betweenness
degrees = calculate_degree(graph_undirected)
betweenness = calculate_betweenness_simple(graph_undirected, nodes)
print("Node importance analysis:")
print(f"{'Station':<15} {'Degree':>8} {'Betweenness':>12}")
print("-" * 37)
for node in sorted(nodes, key=lambda n: betweenness.get(n, 0), reverse=True):
print(f"{node:<15} {degrees.get(node, 0):>8} {betweenness.get(node, 0):>12}")
# Identify bottleneck
max_betweenness_node = max(betweenness, key=betweenness.get)
print(f"\nBottleneck: {max_betweenness_node} (betweenness: {betweenness[max_betweenness_node]})")
print("Failure of this station disrupts the most paths!")
Vulnerability Analysis: What If a Node Fails?
In a factory, vulnerability analysis reveals single points of failure:
def analyze_vulnerability(graph, nodes, edges):
"""Analyze the impact of removing each node"""
results = []
for removed_node in nodes:
# Build graph without the removed node
reduced_edges = [(s, d, w) for s, d, w in edges
if s != removed_node and d != removed_node]
reduced_nodes = [n for n in nodes if n != removed_node]
if not reduced_edges:
results.append((removed_node, len(nodes) - 1, True))
continue
reduced_graph = build_adjacency_list(reduced_edges, directed=False)
# Check connectivity: are all remaining nodes still connected?
visited = set()
start = reduced_nodes[0]
queue = [start]
visited.add(start)
while queue:
current = queue.pop(0)
for neighbor, _ in reduced_graph.get(current, []):
if neighbor not in visited and neighbor in reduced_nodes:
visited.add(neighbor)
queue.append(neighbor)
disconnected = len(reduced_nodes) - len(visited)
is_critical = disconnected > 0
results.append((removed_node, disconnected, is_critical))
print("Vulnerability analysis -- impact of each station failure:")
print(f"{'Station':<15} {'Disconnected':>13} {'Critical?':>10}")
print("-" * 40)
for node, disc, critical in sorted(results, key=lambda x: x[1], reverse=True):
status = "YES!" if critical else "No"
print(f"{node:<15} {disc:>13} {status:>10}")
critical_nodes = [r[0] for r in results if r[2]]
if critical_nodes:
print(f"\nCritical failure points: {', '.join(critical_nodes)}")
print("Add redundant paths for these stations!")
else:
print("\nNetwork is robust -- no single points of failure")
analyze_vulnerability(graph_undirected, nodes, edges)
Industrial Application: Optimizing a Factory Network
Problem: A factory wants to optimize its industrial Ethernet network between controllers. Goal: minimize maximum latency while ensuring robustness.
import heapq
# Industrial Ethernet network
network_nodes = ["PLC_main", "PLC_line1", "PLC_line2", "PLC_line3",
"SCADA", "HMI_1", "HMI_2", "Switch_A", "Switch_B"]
network_edges = [
("PLC_main", "Switch_A", 2), # ms (latency)
("PLC_main", "Switch_B", 3),
("Switch_A", "PLC_line1", 1),
("Switch_A", "PLC_line2", 1),
("Switch_A", "SCADA", 2),
("Switch_B", "PLC_line3", 1),
("Switch_B", "HMI_1", 2),
("Switch_B", "HMI_2", 2),
("Switch_A", "Switch_B", 1), # inter-switch link
("SCADA", "HMI_1", 3),
]
network_graph = build_adjacency_list(network_edges, directed=False)
# Analysis: max latency from SCADA to every device
distances, _ = dijkstra(network_graph, "SCADA")
print("Latency from SCADA to each device:")
print(f"{'Device':<15} {'Latency (ms)':>13}")
print("-" * 30)
for node in sorted(distances, key=distances.get):
if node != "SCADA":
print(f"{node:<15} {distances[node]:>13.0f}")
max_latency_node = max(
[n for n in distances if n != "SCADA"],
key=lambda n: distances[n]
)
print(f"\nSlowest device: {max_latency_node} ({distances[max_latency_node]:.0f} ms)")
# Test: Is Switch_A a critical failure point?
print("\n--- Testing Switch_A failure ---")
edges_without_switch_a = [(s, d, w) for s, d, w in network_edges
if s != "Switch_A" and d != "Switch_A"]
reduced_graph = build_adjacency_list(edges_without_switch_a, directed=False)
distances_reduced, _ = dijkstra(reduced_graph, "SCADA")
unreachable = [n for n in network_nodes
if n != "SCADA" and n != "Switch_A"
and distances_reduced.get(n, float('inf')) == float('inf')]
if unreachable:
print(f"Devices that lose connectivity: {', '.join(unreachable)}")
print("Recommendation: add direct link between SCADA and PLC_line1/PLC_line2")
else:
print("All devices remain connected (network is robust)")
Minimum Spanning Tree: Lowest Cost to Connect the Network
A Minimum Spanning Tree (MST) connects all nodes at the lowest total cost without cycles. Useful for planning cable and pipe layouts.
def kruskal_mst(nodes, edges):
"""Kruskal's algorithm for minimum spanning tree"""
# Sort edges by weight
sorted_edges = sorted(edges, key=lambda e: e[2])
# Union-Find for cycle detection
parent = {node: node for node in nodes}
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(x, y):
px, py = find(x), find(y)
if px == py:
return False
parent[px] = py
return True
mst_edges = []
total_cost = 0
for src, dst, weight in sorted_edges:
if union(src, dst):
mst_edges.append((src, dst, weight))
total_cost += weight
if len(mst_edges) == len(nodes) - 1:
break
return mst_edges, total_cost
mst, cost = kruskal_mst(nodes, edges)
print(f"Minimum Spanning Tree (lowest cable cost):")
print(f"Total cost: {cost}m of cable")
print("\nRequired connections:")
for src, dst, weight in mst:
print(f" {src} --- {dst} ({weight}m)")
Summary and Practical Rules
| Concept | Description | Industrial Application |
|---|---|---|
| Graph | Nodes + edges | Modeling any network |
| Dijkstra | Shortest path | Optimizing transport routes |
| Node degree | Number of connections | Identifying critical hubs |
| Vulnerability analysis | Impact of node failure | Finding failure points |
| MST | Minimum cost connection | Planning cables and pipes |
Practical tip: Before designing any industrial network -- electrical, data, piping -- draw it as a graph first. Analyze weak points, find alternative paths, and calculate the optimal cost. One hour of analysis on paper saves weeks of re-cabling.