Anomaly Detection in Industrial Data

Anomaly Detection: How Does the System Know Something Is Wrong?

Imagine listening to the sound of a motor every day — after a while, you would instantly recognize any abnormal change in the sound even if you cannot describe it in words. Your brain learned the "normal pattern" and any deviation triggers your attention.

Anomaly Detection is the same concept but expressed mathematically — teaching a computer what "normal" looks like and then alerting us when something unexpected occurs. In factories, this means detecting equipment faults before they cause costly downtime.

Classical Statistical Methods

The simplest and oldest anomaly detection techniques rely on statistics — no AI or neural networks required.

Z-Score Method

Measures how far a reading deviates from the mean relative to the standard deviation:

import numpy as np

def z_score_detection(data, threshold=3.0):
    """
    Anomaly detection using Z-Score.
    Rule: any reading more than 3 standard deviations from the mean = anomaly
    """
    mean = np.mean(data)
    std = np.std(data)

    if std == 0:
        return []  # No variance in data

    anomalies = []
    for i, value in enumerate(data):
        z = abs(value - mean) / std
        if z > threshold:
            anomalies.append({
                "index": i,
                "value": value,
                "z_score": z,
                "direction": "high" if value > mean else "low"
            })
    return anomalies

# Motor vibration data (mm/s RMS)
vibration_data = [2.1, 2.3, 2.0, 2.2, 2.1, 2.4, 2.0, 8.5, 2.2, 2.1, 2.3, 9.1]
anomalies = z_score_detection(vibration_data)
for a in anomalies:
    print(f"Anomaly at index {a['index']}: value={a['value']} (Z={a['z_score']:.1f}, {a['direction']})")

When to Use Z-Score:

  • Data follows a normal (Gaussian) distribution
  • You need quick and simple detection
  • Sensors measure a stable physical quantity (steady temperature, steady pressure)

Interquartile Range (IQR) Method

Because it relies on quartiles rather than the mean, the IQR method is not skewed by the very outliers it is trying to detect:

def iqr_detection(data, factor=1.5):
    """
    Anomaly detection using Interquartile Range.
    Less affected by extreme values compared to Z-Score.
    """
    q1 = np.percentile(data, 25)
    q3 = np.percentile(data, 75)
    iqr = q3 - q1

    lower_fence = q1 - factor * iqr
    upper_fence = q3 + factor * iqr

    anomalies = []
    for i, value in enumerate(data):
        if value < lower_fence or value > upper_fence:
            anomalies.append({
                "index": i,
                "value": value,
                "lower_fence": lower_fence,
                "upper_fence": upper_fence
            })
    return anomalies

# Pressure data (bar) with anomalies
pressure = [4.2, 4.3, 4.1, 4.4, 4.0, 4.3, 1.2, 4.2, 4.5, 4.1, 7.8, 4.3]
results = iqr_detection(pressure)
print(f"Anomalies detected: {len(results)}")

Method                  Advantages             Disadvantages                 Best Use
Z-Score                 Simple, fast           Assumes normal distribution   Stable data, single dimension
IQR                     Resistant to extremes  Less sensitive                Non-normally distributed data
Modified Z-Score (MAD)  Best of both           Slightly slower               General industrial data
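
The table mentions the Modified Z-Score but no code for it appears above. A minimal sketch using the Median Absolute Deviation (MAD); the 0.6745 constant and the 3.5 threshold are the conventional choices for this method:

```python
import numpy as np

def modified_z_score_detection(data, threshold=3.5):
    """
    Modified Z-Score using the Median Absolute Deviation (MAD).
    Combines Z-Score simplicity with IQR robustness: the median and
    MAD are barely affected by the very outliers being hunted.
    """
    data = np.asarray(data, dtype=float)
    median = np.median(data)
    mad = np.median(np.abs(data - median))
    if mad == 0:
        return []  # No variation around the median

    # 0.6745 makes the score comparable to a standard Z-Score
    # for normally distributed data
    modified_z = 0.6745 * (data - median) / mad
    return [
        {"index": i, "value": v, "modified_z": z}
        for i, (v, z) in enumerate(zip(data, modified_z))
        if abs(z) > threshold
    ]

# Same vibration data as the Z-Score example above
vibration_data = [2.1, 2.3, 2.0, 2.2, 2.1, 2.4, 2.0, 8.5, 2.2, 2.1, 2.3, 9.1]
for a in modified_z_score_detection(vibration_data):
    print(f"Anomaly at index {a['index']}: value={a['value']}")
```

On this data it flags the same two readings (8.5 and 9.1) as the Z-Score method, but unlike the mean and standard deviation, its median and MAD stay stable even when the outliers make up a larger share of the data.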

Isolation Forest

Imagine trying to isolate a single point from a group — normal points are surrounded by similar ones and require many steps to isolate. But an anomalous point is already isolated, so one or two steps suffice.

Isolation Forest builds random trees and measures how many steps are needed to isolate each point — fewer steps means higher probability of being anomalous:

from sklearn.ensemble import IsolationForest
import numpy as np

class IndustrialAnomalyDetector:
    """Industrial anomaly detector using Isolation Forest"""

    def __init__(self, contamination=0.05):
        """
        contamination: expected proportion of anomalies (5% default)
        """
        self.model = IsolationForest(
            n_estimators=200,        # Number of trees
            contamination=contamination,
            random_state=42,
            n_jobs=-1                # Use all processors
        )

    def fit(self, normal_data):
        """Train on normal operating data"""
        self.model.fit(normal_data)

    def detect(self, new_data):
        """Detect anomalies in new data"""
        # predict: 1 = normal, -1 = anomaly
        predictions = self.model.predict(new_data)
        scores = self.model.score_samples(new_data)

        anomalies = []
        for i, (pred, score) in enumerate(zip(predictions, scores)):
            if pred == -1:
                anomalies.append({
                    "index": i,
                    "score": score,
                    "severity": "critical" if score < -0.7 else "warning"
                })
        return anomalies

# Example: monitoring a motor with three dimensions
# [temperature, vibration, electrical current]
normal_operation = np.array([
    [65, 2.1, 15.2], [67, 2.3, 15.5], [64, 2.0, 15.1],
    [66, 2.2, 15.3], [68, 2.4, 15.6], [65, 2.1, 15.0],
    [67, 2.3, 15.4], [66, 2.0, 15.2], [64, 2.1, 15.1],
    # ... hundreds of normal readings
])

detector = IndustrialAnomalyDetector(contamination=0.05)
detector.fit(normal_operation)

# New data - some anomalous
new_readings = np.array([
    [66, 2.2, 15.3],   # Normal
    [85, 5.8, 22.1],   # Anomalous: all values elevated
    [65, 2.1, 15.0],   # Normal
    [67, 8.5, 15.4],   # Anomalous: vibration very high
])

anomalies = detector.detect(new_readings)
for a in anomalies:
    print(f"Anomaly at reading {a['index']}: severity={a['severity']}")

Why Isolation Forest Is Excellent for Industry:

  • Works with multi-dimensional data (temperature + vibration + current together)
  • Does not assume any specific distribution shape
  • Fast in both training and prediction
  • Detects complex anomaly patterns

Autoencoders for Anomaly Detection

Imagine asking someone to memorize an image and then redraw it from memory. If the image is familiar (like a human face), they will draw it accurately. But if you give them a strange image they have never seen, the drawing will be poor.

An Autoencoder works the same way — it learns to compress normal data and reconstruct it. When anomalous data arrives, reconstruction fails, the reconstruction error rises, and an alert is triggered:

import numpy as np

class SimpleAutoencoder:
    """
    Simple autoencoder for industrial anomaly detection.
    (In practice, built with PyTorch or TensorFlow)
    """

    def __init__(self, input_dim, encoding_dim):
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        # Real implementation:
        # Encoder: input_dim -> 64 -> 32 -> encoding_dim
        # Decoder: encoding_dim -> 32 -> 64 -> input_dim

    def train(self, normal_data, epochs=100):
        """
        Train on normal data only.
        Objective: minimize reconstruction error (MSE)
        """
        # loss = MSE(input, reconstructed_output)
        # After training, keep the reconstruction errors on the normal
        # data - the detection threshold is derived from them
        self.training_errors = None

    def reconstruct(self, data):
        """Forward pass: encode then decode (placeholder for the real network)"""
        raise NotImplementedError

    def detect_anomaly(self, data, threshold=None):
        """
        Anomaly detection: if reconstruction error > threshold = anomaly
        """
        reconstructed = self.reconstruct(data)
        errors = np.mean((data - reconstructed) ** 2, axis=1)

        if threshold is None:
            # The threshold must come from reconstruction errors on the
            # *training* data, not the data under test - otherwise a fixed
            # fraction of every batch would always be flagged
            threshold = self.calculate_threshold(self.training_errors)

        results = []
        for i, error in enumerate(errors):
            results.append({
                "index": i,
                "reconstruction_error": error,
                "is_anomaly": error > threshold,
                "anomaly_score": error / threshold  # > 1 = anomaly
            })
        return results

    def calculate_threshold(self, training_errors, percentile=95):
        """Calculate threshold from training data"""
        return np.percentile(training_errors, percentile)

Autoencoder Architecture for Industrial Data:

Input (10 sensors)
    |
    v
+-------------------------+
| Encoder (compress)      |
| 10 -> 64 -> 32 -> 8    |  <- Compressed representation (Latent Space)
+-------------------------+
| Decoder (decompress)    |
| 8 -> 32 -> 64 -> 10    |  <- Reconstruct input
+-------------------------+
    |
    v
Reconstruction Error = |input - output|^2
If error > threshold -> Anomaly detected!
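
The class above is only a skeleton. To see the reconstruction-error principle run end to end without a deep learning framework, here is a self-contained sketch that uses PCA as a linear autoencoder: projecting onto the top principal components is the "encoder", projecting back is the "decoder". The function names, sensor values, and single-component choice are illustrative:

```python
import numpy as np

def fit_linear_autoencoder(normal_data, n_components=1):
    """Learn the mean and principal directions of normal operation."""
    mean = normal_data.mean(axis=0)
    centered = normal_data - mean
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    components = vt[:n_components]  # (n_components, n_features)
    return mean, components

def reconstruction_errors(data, mean, components):
    """Encode then decode; return per-sample mean squared error."""
    encoded = (data - mean) @ components.T   # compress
    decoded = encoded @ components + mean    # reconstruct
    return np.mean((data - decoded) ** 2, axis=1)

# Synthetic normal operation: three correlated sensor readings
# (temperature, vibration, current) driven by one shared load factor
rng = np.random.default_rng(0)
base = rng.normal(0, 1, size=(200, 1))
normal = np.hstack([65 + 2 * base, 2.2 + 0.1 * base, 15.3 + 0.3 * base])
normal += rng.normal(0, 0.05, size=normal.shape)

mean, comps = fit_linear_autoencoder(normal, n_components=1)
threshold = np.percentile(reconstruction_errors(normal, mean, comps), 95)

# A reading that breaks the learned correlation reconstructs poorly
test = np.array([[65.4, 2.22, 15.36],   # consistent with training
                 [65.4, 8.50, 15.36]])  # vibration out of pattern
errors = reconstruction_errors(test, mean, comps)
print(errors > threshold)  # only the second reading exceeds the threshold
```

Note that the second reading is caught not because any single value is impossible, but because the combination violates the correlation learned from normal data. That is exactly what a deep autoencoder does, just with a nonlinear encoder and decoder.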

Threshold Setting

The hardest decision in anomaly detection: where do we draw the line between normal and anomalous?

A threshold too low = many false alarms (False Positives). A threshold too high = real faults go undetected (False Negatives).

def optimize_threshold(scores, labels, cost_fp=1, cost_fn=10):
    """
    Optimize threshold based on economic cost.
    cost_fp: cost of a false alarm (unnecessary inspection)
    cost_fn: cost of a missed fault (line shutdown)
    """
    best_threshold = None
    min_cost = float('inf')

    for threshold in np.linspace(min(scores), max(scores), 1000):
        fp = sum(1 for s, l in zip(scores, labels) if s > threshold and l == 0)
        fn = sum(1 for s, l in zip(scores, labels) if s <= threshold and l == 1)
        total_cost = fp * cost_fp + fn * cost_fn

        if total_cost < min_cost:
            min_cost = total_cost
            best_threshold = threshold

    return best_threshold, min_cost

Threshold Strategies:

Strategy      Description                            When to Use
Fixed         Single unchanging threshold            Simple, stable systems
Adaptive      Adjusts with changing conditions       Systems with multiple operating modes
Multi-level   Warning at 2 sigma, alarm at 3 sigma   Critical systems needing escalation
Cost-based    Minimizes total cost                   When you know the cost of each error type
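
The multi-level strategy from the table can be sketched in a few lines. The sigma levels and labels here are illustrative and should be tuned per machine:

```python
def severity_level(value, mean, std):
    """
    Multi-level thresholds: warning at 2 sigma, alarm at 3 sigma,
    and (illustratively) emergency at 4 sigma.
    """
    deviation = abs(value - mean) / std
    if deviation > 4:
        return "emergency"
    if deviation > 3:
        return "alarm"
    if deviation > 2:
        return "warning"
    return "normal"

# Bearing temperature with mean 65 C and standard deviation 2 C
for reading in [66, 70, 72, 75]:
    print(reading, severity_level(reading, mean=65, std=2))
```

The escalation gives operators a graded response: a warning prompts a check on the next round, an alarm prompts immediate inspection, and only the top level justifies interrupting production.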

False Positive Management

In factories, false alarms are a real problem — if the system triggers 50 alerts daily and 48 are false, operators will ignore all alerts including the real ones.

class AlertManager:
    """Intelligent alert management system for industrial use"""

    def __init__(self):
        self.alert_history = []

    def evaluate_alert(self, anomaly_score, sensor_id, context):
        """Evaluate an alert before sending it"""

        # 1. Is the anomaly persistent or momentary?
        if not self.is_persistent(sensor_id, duration_seconds=30):
            return {"action": "ignore", "reason": "Momentary anomaly (< 30 seconds)"}

        # 2. Is there confirmation from correlated sensors?
        correlated = self.check_correlated_sensors(sensor_id)
        if not correlated:
            return {"action": "monitor", "reason": "No confirmation from other sensors"}

        # 3. Classify severity
        severity = self.classify_severity(anomaly_score, context)

        # 4. Prevent duplicate alerts (Debouncing)
        if self.was_recently_alerted(sensor_id, minutes=15):
            return {"action": "merge", "reason": "Duplicate alert within 15 minutes"}

        return {
            "action": "alert",
            "severity": severity,
            "correlated_sensors": correlated,
            "recommended_action": self.suggest_action(severity, sensor_id)
        }
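
Helper methods such as is_persistent are only referenced above, not defined. A minimal runnable sketch of that persistence check; timestamps are passed in as arguments for testability, whereas a production version would read the system clock:

```python
class PersistenceFilter:
    """
    Suppress momentary spikes: an anomaly only passes through once it
    has been continuously present for duration_seconds.
    """

    def __init__(self, duration_seconds=30):
        self.duration = duration_seconds
        self.first_seen = {}  # sensor_id -> start of the current anomaly streak

    def update(self, sensor_id, is_anomalous, now):
        if not is_anomalous:
            self.first_seen.pop(sensor_id, None)  # streak broken, reset
            return False
        start = self.first_seen.setdefault(sensor_id, now)
        return (now - start) >= self.duration

f = PersistenceFilter(duration_seconds=30)
print(f.update("motor_1", True, now=0))    # False: streak just started
print(f.update("motor_1", True, now=15))   # False: only 15 seconds so far
print(f.update("motor_1", True, now=31))   # True: persisted for 31 seconds
```

The same streak-tracking pattern works for the debouncing step: instead of the start of an anomaly streak, store the time of the last sent alert per sensor and suppress new ones within the cooldown window.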

Real-World Industrial Applications

Vibration Monitoring

# Detecting bearing faults from vibration data
vibration_features = {
    "rms": 2.1,              # Root Mean Square value
    "peak": 5.8,             # Peak value
    "crest_factor": 2.76,    # Peak to RMS ratio
    "kurtosis": 3.2,         # Kurtosis (> 3.5 = potential problem)
    "bpfo": 0.15,            # Ball Pass Frequency Outer race
    "bpfi": 0.08,            # Ball Pass Frequency Inner race
}
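
The time-domain features in this dictionary can be computed directly from a raw signal. A sketch; BPFO/BPFI are omitted because they require the bearing geometry and a frequency-domain analysis:

```python
import numpy as np

def vibration_features(signal):
    """Time-domain features from a raw vibration signal."""
    signal = np.asarray(signal, dtype=float)
    rms = np.sqrt(np.mean(signal ** 2))
    peak = np.max(np.abs(signal))
    centered = signal - signal.mean()
    # Kurtosis is 3.0 for Gaussian noise; impacts from a damaged
    # bearing add sharp spikes that push it higher
    kurtosis = np.mean(centered ** 4) / centered.std() ** 4
    return {
        "rms": rms,
        "peak": peak,
        "crest_factor": peak / rms,
        "kurtosis": kurtosis,
    }

# A pure sinusoid (an idealized smooth machine signal) has a crest
# factor of sqrt(2) and a kurtosis of 1.5
t = np.linspace(0, 1, 1000, endpoint=False)
print(vibration_features(np.sin(2 * np.pi * 50 * t)))
```

Rising crest factor and kurtosis with a roughly constant RMS is a classic early signature of bearing damage: the impacts are still too small to move the overall energy, but they are sharp.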

Temperature Monitoring

# Detecting gradual temperature rise in a power transformer
def detect_thermal_anomaly(temp_history, window=60):
    """Detect abnormal temperature rise"""
    recent = temp_history[-window:]
    baseline = temp_history[-window*3:-window]

    rate_of_change = (recent[-1] - recent[0]) / window  # C/minute, assuming one reading per minute
    baseline_avg = np.mean(baseline)
    current_avg = np.mean(recent)
    deviation = current_avg - baseline_avg

    if rate_of_change > 0.5:  # Rapid rise
        return "ALARM: Rapid temperature increase"
    elif deviation > 10:       # Deviation from baseline
        return "WARNING: Temperature above normal"
    return "Normal"

Pressure Monitoring

Anomaly Type        Description                     Probable Cause
Sudden spike        Sharp pressure jump             Blockage or valve closure
Gradual drop        Slow leak                       Pipe or fitting corrosion
Oscillation         Unstable pressure               Worn pump or stuck valve
Abnormal flatness   No change despite load change   Faulty sensor
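
The "abnormal flatness" case is easy to detect in code: a live sensor always shows small natural fluctuations, so a near-zero standard deviation over a recent window suggests a stuck or faulty sensor. A sketch; the min_std floor is illustrative and must be tuned to the sensor's normal noise level:

```python
import numpy as np

def detect_flatline(readings, window=20, min_std=0.01):
    """Flag a sensor whose recent readings are suspiciously constant."""
    recent = np.asarray(readings[-window:], dtype=float)
    if len(recent) < window:
        return False  # Not enough data to judge
    return recent.std() < min_std

rng = np.random.default_rng(1)
live_sensor = 4.2 + rng.normal(0, 0.05, 100)   # healthy sensor noise
stuck_sensor = np.full(100, 4.2)               # frozen at one value
print(detect_flatline(live_sensor))   # False
print(detect_flatline(stuck_sensor))  # True
```

Note the inversion of the usual logic: here the absence of variation is the anomaly, which is why a pure outlier detector like Z-Score or IQR would never catch it.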

Comprehensive Comparison of Anomaly Detection Methods

Method             Complexity   Data Required   Multi-dimensional   Real-time              Best Use
Z-Score            Low          Small           No                  Yes                    Single sensor, stable data
IQR                Low          Small           No                  Yes                    Data with extremes
Isolation Forest   Medium       Medium          Yes                 Yes                    Complex anomaly patterns
Autoencoder        High         Large           Yes                 Yes (after training)   Complex multi-sensor systems

Practical Tips

  1. Start with simple statistics: Z-Score and IQR solve 70% of problems — do not jump to neural networks immediately
  2. Collect sufficient "normal" data: At least one month covering all operating patterns
  3. Classify your alerts: Not every anomaly is a fault — some are intentional operational changes
  4. Monitor your false positive rate: If it exceeds 20%, the system needs recalibration
  5. Use multi-level thresholds: Warning then alarm then emergency shutdown
  6. Document every true alert: This data is gold for improving the model in the future

anomaly-detection outlier autoencoder threshold isolation-forest z-score