Anomaly Detection in Industrial Data
Anomaly Detection: How Does the System Know Something Is Wrong?
Imagine listening to the sound of a motor every day — after a while, you would instantly recognize any abnormal change in the sound even if you cannot describe it in words. Your brain learned the "normal pattern" and any deviation triggers your attention.
Anomaly Detection is the same concept but expressed mathematically — teaching a computer what "normal" looks like and then alerting us when something unexpected occurs. In factories, this means detecting equipment faults before they cause costly downtime.
Classical Statistical Methods
The simplest and oldest anomaly detection techniques rely on statistics — no AI or neural networks required.
Z-Score Method
Measures how far a reading deviates from the mean relative to the standard deviation:
```python
import numpy as np

def z_score_detection(data, threshold=3.0):
    """
    Anomaly detection using Z-Score.
    Rule: any reading more than `threshold` standard deviations
    from the mean = anomaly
    """
    mean = np.mean(data)
    std = np.std(data)
    if std == 0:
        return []  # No variance in data
    anomalies = []
    for i, value in enumerate(data):
        z = abs(value - mean) / std
        if z > threshold:
            anomalies.append({
                "index": i,
                "value": value,
                "z_score": z,
                "direction": "high" if value > mean else "low"
            })
    return anomalies

# Motor vibration data (mm/s RMS)
vibration_data = [2.1, 2.3, 2.0, 2.2, 2.1, 2.4, 2.0, 8.5, 2.2, 2.1, 2.3, 9.1]

# Note: the two outliers inflate the standard deviation (std ~ 2.5 here),
# so their Z-scores stay below the default 3.0 and they would be missed.
# We lower the threshold to 2.0 for this data; this weakness of the
# Z-Score is exactly why the robust methods below exist.
anomalies = z_score_detection(vibration_data, threshold=2.0)
for a in anomalies:
    print(f"Anomaly at index {a['index']}: value={a['value']} (Z={a['z_score']:.1f}, {a['direction']})")
```
When to Use Z-Score:
- Data follows a normal (Gaussian) distribution
- You need quick and simple detection
- Sensors measure a stable physical quantity (steady temperature, steady pressure)
Interquartile Range (IQR) Method
The IQR method is more robust to the outliers themselves: the quartiles, unlike the mean and standard deviation, are barely shifted by a few extreme values:
```python
def iqr_detection(data, factor=1.5):
    """
    Anomaly detection using Interquartile Range.
    Less affected by extreme values compared to Z-Score.
    """
    q1 = np.percentile(data, 25)
    q3 = np.percentile(data, 75)
    iqr = q3 - q1
    lower_fence = q1 - factor * iqr
    upper_fence = q3 + factor * iqr
    anomalies = []
    for i, value in enumerate(data):
        if value < lower_fence or value > upper_fence:
            anomalies.append({
                "index": i,
                "value": value,
                "lower_fence": lower_fence,
                "upper_fence": upper_fence
            })
    return anomalies

# Pressure data (bar) with anomalies
pressure = [4.2, 4.3, 4.1, 4.4, 4.0, 4.3, 1.2, 4.2, 4.5, 4.1, 7.8, 4.3]
results = iqr_detection(pressure)
print(f"Anomalies detected: {len(results)}")
```
| Method | Advantages | Disadvantages | Best Use |
|---|---|---|---|
| Z-Score | Simple, fast | Assumes normal distribution | Stable data, single dimension |
| IQR | Resistant to extremes | Less sensitive | Non-normally distributed data |
| Modified Z-Score (MAD) | Best of both | Slightly slower | General industrial data |
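The table mentions the Modified Z-Score but the section does not show it, so here is a minimal sketch following the standard MAD-based formulation (the 0.6745 constant rescales the MAD to match the standard deviation of a normal distribution; the 3.5 threshold is the commonly used default):

```python
import numpy as np

def modified_z_score_detection(data, threshold=3.5):
    """
    Anomaly detection using the Modified Z-Score (MAD-based).
    Uses the median instead of the mean, so a few extreme values
    do not distort the baseline the way they distort the mean/std.
    """
    data = np.asarray(data, dtype=float)
    median = np.median(data)
    mad = np.median(np.abs(data - median))  # Median Absolute Deviation
    if mad == 0:
        return []  # No spread around the median
    anomalies = []
    for i, value in enumerate(data):
        m = 0.6745 * (value - median) / mad  # consistency constant for normal data
        if abs(m) > threshold:
            anomalies.append({"index": i, "value": value, "modified_z": m})
    return anomalies

# Same vibration data that defeated the plain Z-Score at its default threshold
vibration_data = [2.1, 2.3, 2.0, 2.2, 2.1, 2.4, 2.0, 8.5, 2.2, 2.1, 2.3, 9.1]
print([a["index"] for a in modified_z_score_detection(vibration_data)])  # [7, 11]
```

On this data the median is 2.2 and the MAD is 0.1, so the two spikes score in the 40s while every normal reading stays below 1.5: both outliers are caught at the default threshold, with no tuning.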
Isolation Forest
Imagine trying to isolate a single point from a group — normal points are surrounded by similar ones and require many steps to isolate. But an anomalous point is already isolated, so one or two steps suffice.
Isolation Forest builds random trees and measures how many steps are needed to isolate each point — fewer steps means higher probability of being anomalous:
```python
from sklearn.ensemble import IsolationForest
import numpy as np

class IndustrialAnomalyDetector:
    """Industrial anomaly detector using Isolation Forest"""

    def __init__(self, contamination=0.05):
        """
        contamination: expected proportion of anomalies (5% default)
        """
        self.model = IsolationForest(
            n_estimators=200,            # Number of trees
            contamination=contamination,
            random_state=42,
            n_jobs=-1                    # Use all processors
        )

    def fit(self, normal_data):
        """Train on normal operating data"""
        self.model.fit(normal_data)

    def detect(self, new_data):
        """Detect anomalies in new data"""
        # predict: 1 = normal, -1 = anomaly
        predictions = self.model.predict(new_data)
        scores = self.model.score_samples(new_data)
        anomalies = []
        for i, (pred, score) in enumerate(zip(predictions, scores)):
            if pred == -1:
                anomalies.append({
                    "index": i,
                    "score": score,
                    "severity": "critical" if score < -0.7 else "warning"
                })
        return anomalies

# Example: monitoring a motor with three dimensions
# [temperature, vibration, electrical current]
normal_operation = np.array([
    [65, 2.1, 15.2], [67, 2.3, 15.5], [64, 2.0, 15.1],
    [66, 2.2, 15.3], [68, 2.4, 15.6], [65, 2.1, 15.0],
    [67, 2.3, 15.4], [66, 2.0, 15.2], [64, 2.1, 15.1],
    # ... hundreds of normal readings
])

detector = IndustrialAnomalyDetector(contamination=0.05)
detector.fit(normal_operation)

# New data - some anomalous
new_readings = np.array([
    [66, 2.2, 15.3],  # Normal
    [85, 5.8, 22.1],  # Anomalous: all values elevated
    [65, 2.1, 15.0],  # Normal
    [67, 8.5, 15.4],  # Anomalous: vibration very high
])

anomalies = detector.detect(new_readings)
for a in anomalies:
    print(f"Anomaly at reading {a['index']}: severity={a['severity']}")
```
Why Isolation Forest Is Excellent for Industry:
- Works with multi-dimensional data (temperature + vibration + current together)
- Does not assume any specific distribution shape
- Fast in both training and prediction
- Detects complex anomaly patterns
Autoencoders for Anomaly Detection
Imagine asking someone to memorize an image and then redraw it from memory. If the image is familiar (like a human face), they will draw it accurately. But if you give them a strange image they have never seen, the drawing will be poor.
An Autoencoder works the same way — it learns to compress normal data and reconstruct it. When anomalous data arrives, reconstruction fails, the reconstruction error rises, and an alert is triggered:
```python
import numpy as np

class SimpleAutoencoder:
    """
    Autoencoder skeleton for industrial anomaly detection.
    (In practice, the network itself is built with PyTorch or TensorFlow.)
    """
    def __init__(self, input_dim, encoding_dim):
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        self.threshold = None
        # Real implementation:
        # Encoder: input_dim -> 64 -> 32 -> encoding_dim
        # Decoder: encoding_dim -> 32 -> 64 -> input_dim

    def train(self, normal_data, epochs=100):
        """
        Train on normal data only.
        Objective: minimize reconstruction error (MSE)
        """
        # loss = MSE(input, reconstructed_output)
        # After training, set self.threshold from reconstruction errors
        # on held-out NORMAL data, never from the data being tested:
        # self.threshold = self.calculate_threshold(validation_errors)
        raise NotImplementedError("Framework-specific training goes here")

    def reconstruct(self, data):
        """Encode then decode: data -> latent -> reconstruction"""
        raise NotImplementedError("Framework-specific forward pass goes here")

    def detect_anomaly(self, data):
        """
        Anomaly detection: reconstruction error > threshold = anomaly
        """
        reconstructed = self.reconstruct(data)
        errors = np.mean((data - reconstructed) ** 2, axis=1)
        results = []
        for i, error in enumerate(errors):
            results.append({
                "index": i,
                "reconstruction_error": error,
                "is_anomaly": error > self.threshold,
                "anomaly_score": error / self.threshold  # > 1 = anomaly
            })
        return results

    def calculate_threshold(self, training_errors, percentile=95):
        """Threshold = 95th percentile of errors on normal training data"""
        return np.percentile(training_errors, percentile)
```
Autoencoder Architecture for Industrial Data:

```
Input (10 sensors)
        |
        v
+-------------------------+
|   Encoder (compress)    |
|  10 -> 64 -> 32 -> 8    |  <- Compressed representation (latent space)
+-------------------------+
|  Decoder (decompress)   |
|   8 -> 32 -> 64 -> 10   |  <- Reconstruct the input
+-------------------------+
        |
        v
Reconstruction Error = |input - output|^2
If error > threshold -> Anomaly detected!
```
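The same reconstruction-error idea can be demonstrated without a deep-learning framework: a linear autoencoder is mathematically equivalent to PCA, so PCA with `inverse_transform` gives a runnable stand-in. This is a sketch of the concept, not the author's implementation; the class name and parameter choices are illustrative:

```python
import numpy as np
from sklearn.decomposition import PCA

class PCAReconstructionDetector:
    """
    Reconstruction-error anomaly detector using PCA as a linear autoencoder:
    compress to a few components, reconstruct, and flag points whose
    reconstruction error exceeds a threshold learned on normal data.
    """
    def __init__(self, n_components=2):
        self.pca = PCA(n_components=n_components)
        self.threshold = None

    def _errors(self, data):
        # Encode (transform) then decode (inverse_transform), like an autoencoder
        reconstructed = self.pca.inverse_transform(self.pca.transform(data))
        return np.mean((data - reconstructed) ** 2, axis=1)

    def fit(self, normal_data, percentile=95):
        self.pca.fit(normal_data)
        # Threshold from errors on NORMAL data, not on the data being tested
        self.threshold = np.percentile(self._errors(normal_data), percentile)

    def detect(self, data):
        return [
            {"index": i, "error": e, "is_anomaly": e > self.threshold}
            for i, e in enumerate(self._errors(data))
        ]

# Simulated normal motor readings: [temperature, vibration, current]
rng = np.random.default_rng(0)
normal = rng.normal([65, 2.2, 15.3], [1.0, 0.1, 0.2], size=(200, 3))

detector = PCAReconstructionDetector(n_components=2)
detector.fit(normal)
results = detector.detect(np.array([[65.5, 2.2, 15.3],   # normal
                                    [85.0, 8.5, 22.0]])) # fault-like
print([r["is_anomaly"] for r in results])
```

Anomalous readings lie off the subspace learned from normal operation, so they cannot be reconstructed from two components and their error jumps; a trained neural autoencoder generalizes this to nonlinear normal-operation manifolds.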
Threshold Setting
The hardest decision in anomaly detection: where do we draw the line between normal and anomalous?
A threshold too low = many false alarms (False Positives). A threshold too high = real faults go undetected (False Negatives).
```python
import numpy as np

def optimize_threshold(scores, labels, cost_fp=1, cost_fn=10):
    """
    Optimize the threshold based on economic cost.
    cost_fp: cost of a false alarm (unnecessary inspection)
    cost_fn: cost of a missed fault (line shutdown)
    labels:  1 = known fault, 0 = normal (requires labeled history)
    """
    best_threshold = None
    min_cost = float('inf')
    for threshold in np.linspace(min(scores), max(scores), 1000):
        fp = sum(1 for s, l in zip(scores, labels) if s > threshold and l == 0)
        fn = sum(1 for s, l in zip(scores, labels) if s <= threshold and l == 1)
        total_cost = fp * cost_fp + fn * cost_fn
        if total_cost < min_cost:
            min_cost = total_cost
            best_threshold = threshold
    return best_threshold, min_cost
```
Threshold Strategies:
| Strategy | Description | When to Use |
|---|---|---|
| Fixed | Single unchanging threshold | Simple, stable systems |
| Adaptive | Adjusts with changing conditions | Systems with multiple operating modes |
| Multi-level | Warning at 2 sigma, alarm at 3 sigma | Critical systems needing escalation |
| Cost-based | Minimizes total cost | When you know the cost of each error type |
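The "Adaptive" row can be sketched as a threshold recomputed from a rolling window, so it follows slow changes in the operating point while still catching sudden jumps. The window size, warm-up length, and sigma factor below are illustrative assumptions:

```python
import numpy as np
from collections import deque

def adaptive_threshold_stream(readings, window=50, n_sigma=3.0):
    """
    Flag readings that exceed rolling_mean + n_sigma * rolling_std.
    Because the baseline slides with the data, a slow drift in the
    operating point does not trigger alerts, while sudden jumps do.
    """
    history = deque(maxlen=window)
    flags = []
    for x in readings:
        if len(history) >= 10:  # require a minimal baseline first
            mu = np.mean(history)
            sigma = max(np.std(history), 1e-9)  # avoid zero-variance division
            flags.append(abs(x - mu) > n_sigma * sigma)
        else:
            flags.append(False)  # still warming up
        history.append(x)
    return flags

pressure = [4.2] * 30 + [9.0] + [4.2] * 10  # steady line with one spike
flags = adaptive_threshold_stream(pressure)
print(flags.index(True))  # 30
```

Note the trade-off: once the spike enters the window it briefly inflates the rolling statistics, which is why persistent faults are better caught by pairing this with the persistence checks discussed below.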
False Positive Management
In factories, false alarms are a real problem — if the system triggers 50 alerts daily and 48 are false, operators will ignore all alerts including the real ones.
```python
class AlertManager:
    """Intelligent alert management system for industrial use"""

    def __init__(self):
        self.alert_history = []

    def evaluate_alert(self, anomaly_score, sensor_id, context):
        """
        Evaluate an alert before sending it.

        Relies on helper methods (is_persistent, check_correlated_sensors,
        classify_severity, was_recently_alerted, suggest_action) whose
        implementations are plant-specific and omitted here.
        """
        # 1. Is the anomaly persistent or momentary?
        if not self.is_persistent(sensor_id, duration_seconds=30):
            return {"action": "ignore", "reason": "Momentary anomaly (< 30 seconds)"}
        # 2. Is there confirmation from correlated sensors?
        correlated = self.check_correlated_sensors(sensor_id)
        if not correlated:
            return {"action": "monitor", "reason": "No confirmation from other sensors"}
        # 3. Classify severity
        severity = self.classify_severity(anomaly_score, context)
        # 4. Prevent duplicate alerts (debouncing)
        if self.was_recently_alerted(sensor_id, minutes=15):
            return {"action": "merge", "reason": "Duplicate alert within 15 minutes"}
        return {
            "action": "alert",
            "severity": severity,
            "correlated_sensors": correlated,
            "recommended_action": self.suggest_action(severity, sensor_id)
        }
```
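The persistence check is the piece that eliminates the most false alarms, so here is a minimal runnable sketch of that one helper. The class name and the 30-second default are illustrative, not from the original:

```python
import time

class PersistenceFilter:
    """
    Suppress momentary anomalies: report a sensor as anomalous only
    after it has stayed anomalous continuously for a minimum duration.
    """
    def __init__(self, min_duration_s=30.0):
        self.min_duration_s = min_duration_s
        self.first_seen = {}  # sensor_id -> start time of the current anomalous run

    def update(self, sensor_id, is_anomalous, now=None):
        """Feed one detection result; returns True once the run is long enough."""
        now = time.time() if now is None else now
        if not is_anomalous:
            self.first_seen.pop(sensor_id, None)  # run broken, reset the clock
            return False
        start = self.first_seen.setdefault(sensor_id, now)
        return (now - start) >= self.min_duration_s
```

A single noisy spike resets nothing downstream: the filter returns False until the anomaly has persisted, then keeps returning True for as long as it lasts, which maps directly onto the "ignore momentary anomaly" branch above.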
Real-World Industrial Applications
Vibration Monitoring
```python
# Detecting bearing faults from vibration data
vibration_features = {
    "rms": 2.1,            # Root Mean Square value
    "peak": 5.8,           # Peak value
    "crest_factor": 2.76,  # Peak-to-RMS ratio
    "kurtosis": 3.2,       # Kurtosis (> 3.5 = potential problem)
    "bpfo": 0.15,          # Ball Pass Frequency, Outer race
    "bpfi": 0.08,          # Ball Pass Frequency, Inner race
}
```
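These features are computed from the raw vibration waveform. A sketch of the time-domain ones follows; BPFO/BPFI require spectral analysis plus the bearing geometry, so they are omitted here:

```python
import numpy as np

def vibration_time_features(signal):
    """Compute basic time-domain vibration features from a raw waveform."""
    signal = np.asarray(signal, dtype=float)
    rms = np.sqrt(np.mean(signal ** 2))            # overall vibration energy
    peak = np.max(np.abs(signal))                  # largest excursion
    crest_factor = peak / rms                      # impulsiveness: peak relative to energy
    centered = signal - np.mean(signal)
    std = np.std(centered)
    kurtosis = np.mean(centered ** 4) / std ** 4   # ~3 for Gaussian noise; impacts push it up
    return {"rms": rms, "peak": peak,
            "crest_factor": crest_factor, "kurtosis": kurtosis}

# A clean 50 Hz sine: crest factor = sqrt(2) ~ 1.41, kurtosis = 1.5.
# Isolated impacts from a bearing defect raise both sharply.
t = np.linspace(0, 1, 1000, endpoint=False)
features = vibration_time_features(np.sin(2 * np.pi * 50 * t))
print(f"crest={features['crest_factor']:.2f}, kurtosis={features['kurtosis']:.2f}")
```

This is why crest factor and kurtosis are the classic early-warning indicators for bearings: a defect adds brief impacts that barely change the RMS but dominate the peak and the fourth moment.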
Temperature Monitoring
```python
import numpy as np

# Detecting gradual temperature rise in a power transformer
def detect_thermal_anomaly(temp_history, window=60):
    """Detect abnormal temperature rise (assumes one reading per minute)"""
    recent = temp_history[-window:]
    baseline = temp_history[-window * 3:-window]  # the two hours before that
    rate_of_change = (recent[-1] - recent[0]) / window  # degrees C per minute
    baseline_avg = np.mean(baseline)
    current_avg = np.mean(recent)
    deviation = current_avg - baseline_avg
    if rate_of_change > 0.5:  # Rapid rise
        return "ALARM: Rapid temperature increase"
    elif deviation > 10:      # Deviation from baseline
        return "WARNING: Temperature above normal"
    return "Normal"
```
Pressure Monitoring
| Anomaly Type | Description | Probable Cause |
|---|---|---|
| Sudden spike | Sharp pressure jump | Blockage or valve closure |
| Gradual drop | Slow leak | Pipe or fitting corrosion |
| Oscillation | Unstable pressure | Worn pump or stuck valve |
| Abnormal flatness | No change despite load change | Faulty sensor |
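The "Abnormal flatness" row deserves a sketch because it is the one case every threshold-based detector misses: nothing ever exceeds a limit. A rolling peak-to-peak check catches it; the window size and tolerance below are illustrative:

```python
import numpy as np

def detect_flatline(readings, window=20, tolerance=1e-6):
    """
    Flag positions where a sensor has been suspiciously flat.
    A healthy pressure signal always carries some measurement noise;
    a long run of near-identical values usually means a stuck or
    disconnected sensor, not a perfectly stable process.
    """
    readings = np.asarray(readings, dtype=float)
    flat_at = []
    for i in range(window, len(readings) + 1):
        segment = readings[i - window:i]
        if np.ptp(segment) < tolerance:  # peak-to-peak spread of the window
            flat_at.append(i - 1)        # index where the flat window ends
    return flat_at

stuck = [4.2] * 30                        # frozen sensor output
print(detect_flatline(stuck)[:3])         # first windows flagged: [19, 20, 21]
```

Pairing this with the threshold detectors above covers both failure directions: values that move too much and values that stop moving at all.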
Comprehensive Comparison of Anomaly Detection Methods
| Method | Complexity | Data Required | Multi-dimensional | Real-time | Best Use |
|---|---|---|---|---|---|
| Z-Score | Low | Small | No | Yes | Single sensor, stable data |
| IQR | Low | Small | No | Yes | Data with extremes |
| Isolation Forest | Medium | Medium | Yes | Yes | Complex anomaly patterns |
| Autoencoder | High | Large | Yes | Yes (after training) | Complex multi-sensor systems |
Practical Tips
- Start with simple statistics: Z-Score and IQR solve 70% of problems — do not jump to neural networks immediately
- Collect sufficient "normal" data: At least one month covering all operating patterns
- Classify your alerts: Not every anomaly is a fault — some are intentional operational changes
- Monitor your false positive rate: If it exceeds 20%, the system needs recalibration
- Use multi-level thresholds: Warning then alarm then emergency shutdown
- Document every true alert: This data is gold for improving the model in the future
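The multi-level tip can be sketched as escalating sigma bands, matching the 2-sigma/3-sigma convention from the threshold strategy table. The band boundaries and labels are illustrative:

```python
def classify_reading(value, baseline_mean, baseline_std):
    """
    Map a reading to an escalation level using sigma bands:
    within 2 sigma = normal, 2-3 sigma = warning, beyond 3 sigma = alarm.
    """
    if baseline_std == 0:
        return "normal" if value == baseline_mean else "alarm"
    z = abs(value - baseline_mean) / baseline_std
    if z > 3:
        return "alarm"
    if z > 2:
        return "warning"
    return "normal"

# Baseline from a month of normal vibration data (illustrative numbers)
print(classify_reading(2.25, baseline_mean=2.2, baseline_std=0.15))  # normal
print(classify_reading(2.60, baseline_mean=2.2, baseline_std=0.15))  # warning
print(classify_reading(9.00, baseline_mean=2.2, baseline_std=0.15))  # alarm
```

The warning band gives operators time to inspect before the reading crosses into alarm territory, which is the whole point of escalation over a single fixed threshold.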