Home Wiki Engineering Mathematics Graph Theory and Applications in Industrial Networks
Engineering Mathematics

Graph Theory and Applications in Industrial Networks

What Is Graph Theory and Why Does It Matter in Industry?

Look at any factory from above: machines connected by conveyors, pipes linking pumps to tanks, network cables connecting PLCs to each other. All of these systems can be represented mathematically as a graph -- a set of nodes and edges connecting them.

Graph theory is the branch of mathematics that studies these networks. It helps answer questions like:

  • What is the shortest path to move a product from the warehouse to the shipping dock?
  • If this electrical switch fails, which departments lose power?
  • How do we distribute tasks across machines to minimize production time?

Fundamental Concepts: Nodes and Edges

A graph G = (V, E) consists of:

  • V = a set of vertices (nodes) -- representing entities (machines, stations, devices)
  • E = a set of edges -- representing connections between nodes
# Simple graph representing a factory network
# Nodes: production stations
nodes = ["raw_warehouse", "cutting", "welding", "painting", "assembly", "inspection", "shipping"]

# Edges: transport lines between stations (with distance in meters)
edges = [
    ("raw_warehouse", "cutting", 30),
    ("raw_warehouse", "welding", 50),
    ("cutting", "welding", 15),
    ("cutting", "painting", 40),
    ("welding", "assembly", 20),
    ("painting", "assembly", 25),
    ("assembly", "inspection", 10),
    ("inspection", "shipping", 35),
    ("assembly", "shipping", 45),
]

print(f"Number of nodes: {len(nodes)}")
print(f"Number of edges: {len(edges)}")
print("\nConnections:")
for src, dst, weight in edges:
    print(f"  {src} --({weight}m)--> {dst}")

Types of Graphs: Directed and Undirected

Type Description Industrial Example
Undirected Edges work both ways Factory Ethernet network
Directed Edges have one direction Production line (material flows one way)
Weighted Each edge has a value (weight) Distance, cost, transfer time
Unweighted All edges are equal Connection exists or not
# Represent graph as an adjacency list
def build_adjacency_list(edges, directed=False):
    graph = {}
    for src, dst, weight in edges:
        if src not in graph:
            graph[src] = []
        graph[src].append((dst, weight))
        if not directed:
            if dst not in graph:
                graph[dst] = []
            graph[dst].append((src, weight))
    return graph

# Undirected graph (factory network)
graph_undirected = build_adjacency_list(edges, directed=False)

# Directed graph (production line)
graph_directed = build_adjacency_list(edges, directed=True)

print("Neighbors of 'cutting' (undirected):")
for neighbor, weight in graph_undirected.get("cutting", []):
    print(f"  -> {neighbor} ({weight}m)")

print("\nNeighbors of 'cutting' (directed):")
for neighbor, weight in graph_directed.get("cutting", []):
    print(f"  -> {neighbor} ({weight}m)")

Graph Representation: Adjacency Matrix vs. Adjacency List

There are two main ways to store a graph in memory:

Adjacency Matrix: An n x n matrix where M[i][j] = weight if an edge exists:

import numpy as np

# Index the nodes
node_index = {name: i for i, name in enumerate(nodes)}
n = len(nodes)

# Build adjacency matrix
adj_matrix = np.zeros((n, n))
for src, dst, weight in edges:
    i, j = node_index[src], node_index[dst]
    adj_matrix[i][j] = weight
    adj_matrix[j][i] = weight  # undirected

print("Adjacency matrix (row = source, column = target):")
print(f"{'':>15}", end="")
for name in nodes:
    print(f"{name[:5]:>7}", end="")
print()
for i, name in enumerate(nodes):
    print(f"{name:>15}", end="")
    for j in range(n):
        val = int(adj_matrix[i][j])
        print(f"{val:>7}", end="")
    print()
Representation Memory Edge Lookup Best For
Adjacency matrix O(V^2) O(1) Dense graphs
Adjacency list O(V+E) O(degree) Sparse graphs (most cases)

Dijkstra's Algorithm: Shortest Path

Dijkstra's algorithm finds the shortest path from a source node to all other nodes in a weighted graph with non-negative weights.

Idea: Start from the source; at each step, pick the nearest unvisited node and update distances through it.

import heapq

def dijkstra(graph, start):
    """Dijkstra's shortest path algorithm"""
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    previous = {node: None for node in graph}
    visited = set()

    # Priority queue (min-heap)
    pq = [(0, start)]

    while pq:
        current_dist, current_node = heapq.heappop(pq)

        if current_node in visited:
            continue
        visited.add(current_node)

        for neighbor, weight in graph.get(current_node, []):
            distance = current_dist + weight
            if distance < distances.get(neighbor, float('inf')):
                distances[neighbor] = distance
                previous[neighbor] = current_node
                heapq.heappush(pq, (distance, neighbor))

    return distances, previous

def reconstruct_path(previous, start, end):
    """Reconstruct the path from start to end"""
    path = []
    current = end
    while current is not None:
        path.append(current)
        current = previous.get(current)
    path.reverse()
    if path[0] == start:
        return path
    return []  # no path

# Apply to the factory network
distances, previous = dijkstra(graph_undirected, "raw_warehouse")

print("Shortest distances from the warehouse:")
for node in sorted(distances, key=distances.get):
    path = reconstruct_path(previous, "raw_warehouse", node)
    print(f"  To {node}: {distances[node]:.0f}m via {' -> '.join(path)}")

Time complexity: O((V + E) log V) with a priority queue -- very fast even for large networks.

Node Degree and Centrality: Who Is Most Important?

Node degree is the number of edges connected to it. Nodes with high degree are network hubs -- their failure causes the most damage.

def calculate_degree(graph):
    """Calculate the degree of each node"""
    degrees = {}
    for node in graph:
        degrees[node] = len(graph[node])
    return degrees

def calculate_betweenness_simple(graph, nodes):
    """Simplified betweenness centrality:
    how many shortest paths pass through each node
    """
    betweenness = {node: 0 for node in nodes}

    for start in nodes:
        distances, previous = dijkstra(graph, start)
        for end in nodes:
            if start != end and distances.get(end, float('inf')) < float('inf'):
                path = reconstruct_path(previous, start, end)
                for node in path[1:-1]:  # exclude source and target
                    betweenness[node] += 1

    return betweenness

degrees = calculate_degree(graph_undirected)
betweenness = calculate_betweenness_simple(graph_undirected, nodes)

print("Node importance analysis:")
print(f"{'Station':<15} {'Degree':>8} {'Betweenness':>12}")
print("-" * 37)
for node in sorted(nodes, key=lambda n: betweenness.get(n, 0), reverse=True):
    print(f"{node:<15} {degrees.get(node, 0):>8} {betweenness.get(node, 0):>12}")

# Identify bottleneck
max_betweenness_node = max(betweenness, key=betweenness.get)
print(f"\nBottleneck: {max_betweenness_node} (betweenness: {betweenness[max_betweenness_node]})")
print("Failure of this station disrupts the most paths!")

Vulnerability Analysis: What If a Node Fails?

In a factory, vulnerability analysis reveals single points of failure:

def analyze_vulnerability(graph, nodes, edges):
    """Analyze the impact of removing each node"""
    results = []

    for removed_node in nodes:
        # Build graph without the removed node
        reduced_edges = [(s, d, w) for s, d, w in edges
                        if s != removed_node and d != removed_node]
        reduced_nodes = [n for n in nodes if n != removed_node]

        if not reduced_edges:
            results.append((removed_node, len(nodes) - 1, True))
            continue

        reduced_graph = build_adjacency_list(reduced_edges, directed=False)

        # Check connectivity: are all remaining nodes still connected?
        visited = set()
        start = reduced_nodes[0]
        queue = [start]
        visited.add(start)

        while queue:
            current = queue.pop(0)
            for neighbor, _ in reduced_graph.get(current, []):
                if neighbor not in visited and neighbor in reduced_nodes:
                    visited.add(neighbor)
                    queue.append(neighbor)

        disconnected = len(reduced_nodes) - len(visited)
        is_critical = disconnected > 0

        results.append((removed_node, disconnected, is_critical))

    print("Vulnerability analysis -- impact of each station failure:")
    print(f"{'Station':<15} {'Disconnected':>13} {'Critical?':>10}")
    print("-" * 40)
    for node, disc, critical in sorted(results, key=lambda x: x[1], reverse=True):
        status = "YES!" if critical else "No"
        print(f"{node:<15} {disc:>13} {status:>10}")

    critical_nodes = [r[0] for r in results if r[2]]
    if critical_nodes:
        print(f"\nCritical failure points: {', '.join(critical_nodes)}")
        print("Add redundant paths for these stations!")
    else:
        print("\nNetwork is robust -- no single points of failure")

analyze_vulnerability(graph_undirected, nodes, edges)

Industrial Application: Optimizing a Factory Network

Problem: A factory wants to optimize its industrial Ethernet network between controllers. Goal: minimize maximum latency while ensuring robustness.

import heapq

# Industrial Ethernet network
network_nodes = ["PLC_main", "PLC_line1", "PLC_line2", "PLC_line3",
                 "SCADA", "HMI_1", "HMI_2", "Switch_A", "Switch_B"]

network_edges = [
    ("PLC_main", "Switch_A", 2),    # ms (latency)
    ("PLC_main", "Switch_B", 3),
    ("Switch_A", "PLC_line1", 1),
    ("Switch_A", "PLC_line2", 1),
    ("Switch_A", "SCADA", 2),
    ("Switch_B", "PLC_line3", 1),
    ("Switch_B", "HMI_1", 2),
    ("Switch_B", "HMI_2", 2),
    ("Switch_A", "Switch_B", 1),     # inter-switch link
    ("SCADA", "HMI_1", 3),
]

network_graph = build_adjacency_list(network_edges, directed=False)

# Analysis: max latency from SCADA to every device
distances, _ = dijkstra(network_graph, "SCADA")

print("Latency from SCADA to each device:")
print(f"{'Device':<15} {'Latency (ms)':>13}")
print("-" * 30)
for node in sorted(distances, key=distances.get):
    if node != "SCADA":
        print(f"{node:<15} {distances[node]:>13.0f}")

max_latency_node = max(
    [n for n in distances if n != "SCADA"],
    key=lambda n: distances[n]
)
print(f"\nSlowest device: {max_latency_node} ({distances[max_latency_node]:.0f} ms)")

# Test: Is Switch_A a critical failure point?
print("\n--- Testing Switch_A failure ---")
edges_without_switch_a = [(s, d, w) for s, d, w in network_edges
                          if s != "Switch_A" and d != "Switch_A"]
reduced_graph = build_adjacency_list(edges_without_switch_a, directed=False)

distances_reduced, _ = dijkstra(reduced_graph, "SCADA")
unreachable = [n for n in network_nodes
               if n != "SCADA" and n != "Switch_A"
               and distances_reduced.get(n, float('inf')) == float('inf')]

if unreachable:
    print(f"Devices that lose connectivity: {', '.join(unreachable)}")
    print("Recommendation: add direct link between SCADA and PLC_line1/PLC_line2")
else:
    print("All devices remain connected (network is robust)")

Minimum Spanning Tree: Lowest Cost to Connect the Network

A Minimum Spanning Tree (MST) connects all nodes at the lowest total cost without cycles. Useful for planning cable and pipe layouts.

def kruskal_mst(nodes, edges):
    """Kruskal's algorithm for minimum spanning tree"""
    # Sort edges by weight
    sorted_edges = sorted(edges, key=lambda e: e[2])

    # Union-Find for cycle detection
    parent = {node: node for node in nodes}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    def union(x, y):
        px, py = find(x), find(y)
        if px == py:
            return False
        parent[px] = py
        return True

    mst_edges = []
    total_cost = 0

    for src, dst, weight in sorted_edges:
        if union(src, dst):
            mst_edges.append((src, dst, weight))
            total_cost += weight
            if len(mst_edges) == len(nodes) - 1:
                break

    return mst_edges, total_cost

mst, cost = kruskal_mst(nodes, edges)
print(f"Minimum Spanning Tree (lowest cable cost):")
print(f"Total cost: {cost}m of cable")
print("\nRequired connections:")
for src, dst, weight in mst:
    print(f"  {src} --- {dst} ({weight}m)")

Summary and Practical Rules

Concept Description Industrial Application
Graph Nodes + edges Modeling any network
Dijkstra Shortest path Optimizing transport routes
Node degree Number of connections Identifying critical hubs
Vulnerability analysis Impact of node failure Finding failure points
MST Minimum cost connection Planning cables and pipes

Practical tip: Before designing any industrial network -- electrical, data, piping -- draw it as a graph first. Analyze weak points, find alternative paths, and calculate the optimal cost. One hour of analysis on paper saves weeks of re-cabling.

graph-theory nodes edges shortest-path topology network-analysis نظرية البيان العقد الأضلاع أقصر مسار الطوبولوجيا تحليل الشبكات