Automated Defect Detection System Using Classical and Deep Learning Techniques

 

Automated Defect Detection System Using Classical and Deep Learning Techniques

Overview

In industrial inspection systems, detecting surface defects accurately and efficiently is critical for maintaining high product quality. This project presents a complete pipeline to simulate, detect, and analyze defects using both traditional image processing and deep learning models. The core components include:

  1. Synthetic Defect Generation

  2. Traditional Defect Detection via Image Differencing

  3. Deep Learning-based Semantic Segmentation using U-Net

  4. Inference on Unseen Test Data with Trained DL Model


1. Synthetic Defect Generation (defect_gen.py)

This module generates synthetic defect images by applying random ellipses or polygons to a clean master image. The result is a set of artificially defected images that can be used for training and evaluation.

Code

import cv2
import numpy as np
import random
import os
from pathlib import Path

def create_random_defect(height, width, min_area=2500):
    """
    Create a random defect mask with area larger than min_area
    Returns a binary mask with the defect
    """
    # Create an empty mask
    mask = np.zeros((height, width), dtype=np.uint8)
   
    # Generate random parameters for the defect
    center_x = random.randint(0, width-1)
    center_y = random.randint(0, height-1)
   
    # Generate random shape (either ellipse or polygon)
    shape_type = random.choice(['ellipse', 'polygon'])
   
    if shape_type == 'ellipse':
        # Keep generating until we get a defect with sufficient area
        while True:
            axes_length = (
                random.randint(30, 100),  # major axis
                random.randint(30, 100)   # minor axis
            )
            angle = random.randint(0, 360)
           
            # Draw the ellipse
            cv2.ellipse(mask, (center_x, center_y), axes_length,
                       angle, 0, 360, 255, -1)
           
            # Check area
            if cv2.countNonZero(mask) >= min_area:
                break
            mask.fill(0)  # Clear and try again
           
    else:  # polygon
        while True:
            # Generate 3-6 points for the polygon
            num_points = random.randint(3, 6)
            points = []
            for _ in range(num_points):
                point_x = center_x + random.randint(-100, 100)
                point_y = center_y + random.randint(-100, 100)
                points.append([point_x, point_y])
           
            # Draw the polygon
            points = np.array(points, dtype=np.int32)
            cv2.fillPoly(mask, [points], 255)
           
            # Check area
            if cv2.countNonZero(mask) >= min_area:
                break
            mask.fill(0)  # Clear and try again
   
    return mask

def apply_defect(image, mask):
    """Apply the defect mask to the image"""
    # Create a darker region for the defect
    darkening_factor = random.uniform(0.3, 0.7)
    defect_image = image.copy()
    defect_image[mask > 0] = defect_image[mask > 0] * darkening_factor
    return defect_image

def generate_defect_images(master_image_path, output_dir, num_images=2000):
    """Generate multiple defect images from a master image"""
    # Create output directory if it doesn't exist
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
   
    # Read the master image
    master_img = cv2.imread(master_image_path)
    if master_img is None:
        raise ValueError(f"Could not read master image: {master_image_path}")
   
    height, width = master_img.shape[:2]
   
    # Generate images
    for i in range(num_images):
        # Decide number of defects (0-5)
        num_defects = random.randint(0, 5)
       
        # Create a copy of master image
        result_img = master_img.copy()
       
        # Generate and apply defects
        for _ in range(num_defects):
            defect_mask = create_random_defect(height, width)
            result_img = apply_defect(result_img, defect_mask)
       
        # Save the image
        output_path = output_dir / f"defect_{i:04d}_n{num_defects}.jpg"
        cv2.imwrite(str(output_path), result_img)
       
        # Print progress every 100 images
        if (i + 1) % 100 == 0:
            print(f"Generated {i + 1}/{num_images} images")

if __name__ == "__main__":
    master_path = "master.jpg"
    output_dir = "generated_defects"
   
    try:
        generate_defect_images(master_path, output_dir)
        print("Defect image generation completed successfully!")
    except Exception as e:
        print(f"Error occurred: {str(e)}")


2. Traditional Defect Detection (defect_detection.py)

This script performs pixel-wise comparison between the master image and the defected image using grayscale difference and morphological operations to localize and analyze defects.

Key Steps

  • Image differencing

  • Thresholding and morphological cleaning

  • Contour detection and property analysis

Code

import cv2
import numpy as np
from pathlib import Path
import json

class DefectDetector:
    def __init__(self, master_path):
        """Initialize the detector with a master image"""
        self.master_img = cv2.imread(master_path)
        if self.master_img is None:
            raise ValueError(f"Could not read master image: {master_path}")
       
        # Convert master image to grayscale
        self.master_gray = cv2.cvtColor(self.master_img, cv2.COLOR_BGR2GRAY)
   
    def detect_defects(self, defect_image_path, threshold=30):
        """
        Detect defects by comparing with master image
        Args:
            defect_image_path: Path to the defect image
            threshold: Threshold for difference detection (default: 30)
        Returns:
            contours: List of detected contours
            diff_mask: Binary mask showing differences
            annotated_image: Original image with contours drawn
        """
        # Read defect image
        defect_img = cv2.imread(str(defect_image_path))
        if defect_img is None:
            raise ValueError(f"Could not read defect image: {defect_image_path}")
       
        # Convert to grayscale
        defect_gray = cv2.cvtColor(defect_img, cv2.COLOR_BGR2GRAY)
       
        # Calculate absolute difference
        diff = cv2.absdiff(self.master_gray, defect_gray)
       
        # Apply threshold to get binary mask
        _, diff_mask = cv2.threshold(diff, threshold, 255, cv2.THRESH_BINARY)
       
        # Apply some morphological operations to clean up the mask
        kernel = np.ones((3,3), np.uint8)
        diff_mask = cv2.morphologyEx(diff_mask, cv2.MORPH_OPEN, kernel)
        diff_mask = cv2.morphologyEx(diff_mask, cv2.MORPH_CLOSE, kernel)
       
        # Find contours
        contours, _ = cv2.findContours(diff_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
       
        # Draw contours on the original image
        annotated_image = defect_img.copy()
        cv2.drawContours(annotated_image, contours, -1, (0, 255, 0), 2)
       
        return contours, diff_mask, annotated_image
   
    def analyze_defects(self, contours):
        """
        Analyze detected defects and return their properties
        """
        defect_properties = []
       
        for i, contour in enumerate(contours):
            # Calculate basic properties
            area = cv2.contourArea(contour)
            perimeter = cv2.arcLength(contour, True)
            x, y, w, h = cv2.boundingRect(contour)
           
            # Calculate center point
            M = cv2.moments(contour)
            if M["m00"] != 0:
                center_x = int(M["m10"] / M["m00"])
                center_y = int(M["m01"] / M["m00"])
            else:
                center_x = x + w//2
                center_y = y + h//2
           
            defect_properties.append({
                "id": i,
                "area": float(area),
                "perimeter": float(perimeter),
                "width": int(w),
                "height": int(h),
                "center": (int(center_x), int(center_y))
            })
       
        return defect_properties

def process_defect_images(master_path, defects_dir, output_dir, threshold=30):
    """
    Process all defect images in a directory and save results
    """
    # Create output directory
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
   
    # Initialize detector
    detector = DefectDetector(master_path)
   
    # Process all images in defects directory
    defects_dir = Path(defects_dir)
    results = {}
   
    for img_path in defects_dir.glob("*.jpg"):
        try:
            # Detect defects
            contours, diff_mask, annotated_image = detector.detect_defects(img_path, threshold)
           
            # Analyze defects
            defect_properties = detector.analyze_defects(contours)
           
            # Save results
            base_name = img_path.stem
           
            # Save annotated image
            cv2.imwrite(str(output_dir / f"{base_name}_annotated.jpg"), annotated_image)
           
            # Save difference mask
            cv2.imwrite(str(output_dir / f"{base_name}_mask.jpg"), diff_mask)
           
            # Store analysis results
            results[base_name] = {
                "num_defects": len(contours),
                "defects": defect_properties
            }
           
            print(f"Processed {img_path.name}")
           
        except Exception as e:
            print(f"Error processing {img_path.name}: {str(e)}")
   
    # Save analysis results to JSON
    with open(output_dir / "defect_analysis.json", 'w') as f:
        json.dump(results, f, indent=4)

if __name__ == "__main__":
    master_path = "master.jpg"
    defects_dir = "generated_defects"
    output_dir = "detection_results"
   
    try:
        process_defect_images(master_path, defects_dir, output_dir)
        print("Defect detection completed successfully!")
    except Exception as e:
        print(f"Error occurred: {str(e)}")

3. Deep Learning-Based Defect Detection (defect_detection_dl.py)

We leverage a modified U-Net model to perform pixel-wise binary segmentation on image pairs (master + defect). The input is a 6-channel image formed by stacking the master and defect images.

Highlights

  • Custom U-Net architecture with 6 input channels

  • PyTorch DataLoader support with preprocessing

  • Detailed training loop with checkpoint saving and validation

  • Train/validation split with accuracy tracking

Code

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import cv2
import numpy as np
from pathlib import Path
import json
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torch.nn.functional as F
from tqdm import tqdm
import time

class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class UNet(nn.Module):
    def __init__(self, in_channels=6):
        super().__init__()
       
        # Encoder
        self.enc1 = DoubleConv(in_channels, 64)
        self.enc2 = DoubleConv(64, 128)
        self.enc3 = DoubleConv(128, 256)
        self.enc4 = DoubleConv(256, 512)
       
        # Decoder
        self.dec1 = DoubleConv(512 + 256, 256)
        self.dec2 = DoubleConv(256 + 128, 128)
        self.dec3 = DoubleConv(128 + 64, 64)
       
        # Final layer
        self.final = nn.Conv2d(64, 1, kernel_size=1)
       
        # Pooling
        self.pool = nn.MaxPool2d(2)
       
    def forward(self, x):
        # Encoder
        e1 = self.enc1(x)
        e2 = self.enc2(self.pool(e1))
        e3 = self.enc3(self.pool(e2))
        e4 = self.enc4(self.pool(e3))
       
        # Decoder with dynamic resizing
        # Upsample e4 and concatenate with e3
        d1 = F.interpolate(e4, size=e3.shape[2:], mode='bilinear', align_corners=True)
        d1 = self.dec1(torch.cat([d1, e3], dim=1))
       
        # Upsample d1 and concatenate with e2
        d2 = F.interpolate(d1, size=e2.shape[2:], mode='bilinear', align_corners=True)
        d2 = self.dec2(torch.cat([d2, e2], dim=1))
       
        # Upsample d2 and concatenate with e1
        d3 = F.interpolate(d2, size=e1.shape[2:], mode='bilinear', align_corners=True)
        d3 = self.dec3(torch.cat([d3, e1], dim=1))
       
        # Final layer
        out = torch.sigmoid(self.final(d3))
        return out

class DefectDataset(Dataset):
    def __init__(self, master_path, defect_paths, mask_paths, transform=None, target_size=(512, 512)):
        self.master_path = master_path
        self.defect_paths = defect_paths
        self.mask_paths = mask_paths
        self.transform = transform
        self.target_size = target_size
       
        # Load master image once
        self.master_img = cv2.imread(str(master_path))
        if self.master_img is None:
            raise ValueError(f"Could not read master image: {master_path}")
        self.master_img = cv2.cvtColor(self.master_img, cv2.COLOR_BGR2RGB)
        self.master_img = cv2.resize(self.master_img, target_size)
       
    def __len__(self):
        return len(self.defect_paths)
   
    def __getitem__(self, idx):
        # Load defect image
        defect_img = cv2.imread(str(self.defect_paths[idx]))
        if defect_img is None:
            raise ValueError(f"Could not read defect image: {self.defect_paths[idx]}")
        defect_img = cv2.cvtColor(defect_img, cv2.COLOR_BGR2RGB)
        defect_img = cv2.resize(defect_img, self.target_size)
       
        # Load mask
        mask = cv2.imread(str(self.mask_paths[idx]), cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise ValueError(f"Could not read mask image: {self.mask_paths[idx]}")
        mask = cv2.resize(mask, self.target_size)
        mask = mask / 255.0  # Normalize to [0, 1]
       
        if self.transform:
            defect_img = self.transform(defect_img)
            self.master_img_t = self.transform(self.master_img)
            mask = torch.from_numpy(mask).float()
       
        # Stack master and defect images along channel dimension
        x = torch.cat([self.master_img_t, defect_img], dim=0)
        return x, mask.unsqueeze(0)

def train_model(model, train_loader, val_loader, device, num_epochs=3, target_accuracy=98.0):
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
   
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
   
    # Create models directory if it doesn't exist
    models_dir = Path("models")
    models_dir.mkdir(exist_ok=True)
   
    # Calculate total steps for progress tracking
    total_train_steps = len(train_loader)
    total_val_steps = len(val_loader)
   
    print("\nStarting training...")
    print(f"Total training batches per epoch: {total_train_steps}")
    print(f"Total validation batches per epoch: {total_val_steps}")
    print(f"Training on device: {device}")
    print(f"Target validation accuracy: {target_accuracy}%")
    print(f"Models will be saved in: {models_dir}/")
   
    start_time = time.time()
   
    for epoch in range(num_epochs):
        epoch_start_time = time.time()
       
        # Training phase
        model.train()
        train_loss = 0
        correct_pixels = 0
        total_pixels = 0
       
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print("Training phase:")
       
        # Training progress bar
        train_pbar = tqdm(train_loader, total=total_train_steps,
                         desc=f"Training Epoch {epoch+1}/{num_epochs}")
       
        for batch_idx, (inputs, masks) in enumerate(train_pbar):
            inputs, masks = inputs.to(device), masks.to(device)
           
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, masks)
            loss.backward()
            optimizer.step()
           
            # Calculate metrics
            train_loss += loss.item()
            pred = (outputs > 0.5).float()
            correct_pixels += (pred == masks).sum().item()
            total_pixels += masks.numel()
           
            # Update progress bar
            current_loss = train_loss / (batch_idx + 1)
            current_acc = 100. * correct_pixels / total_pixels
            train_pbar.set_postfix({
                'loss': f'{current_loss:.4f}',
                'acc': f'{current_acc:.2f}%'
            })
       
        train_pbar.close()
        avg_train_loss = train_loss / total_train_steps
        train_accuracy = 100. * correct_pixels / total_pixels
        train_losses.append(avg_train_loss)
       
        # Validation phase
        model.eval()
        val_loss = 0
        correct_pixels = 0
        total_pixels = 0
       
        print("Validation phase:")
        val_pbar = tqdm(val_loader, total=total_val_steps,
                       desc=f"Validation Epoch {epoch+1}/{num_epochs}")
       
        with torch.no_grad():
            for batch_idx, (inputs, masks) in enumerate(val_pbar):
                inputs, masks = inputs.to(device), masks.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, masks)
               
                # Calculate metrics
                val_loss += loss.item()
                pred = (outputs > 0.5).float()
                correct_pixels += (pred == masks).sum().item()
                total_pixels += masks.numel()
               
                # Update progress bar
                current_loss = val_loss / (batch_idx + 1)
                current_acc = 100. * correct_pixels / total_pixels
                val_pbar.set_postfix({
                    'loss': f'{current_loss:.4f}',
                    'acc': f'{current_acc:.2f}%'
                })
       
        val_pbar.close()
        avg_val_loss = val_loss / total_val_steps
        val_accuracy = 100. * correct_pixels / total_pixels
        val_losses.append(avg_val_loss)
       
        # Save model for this epoch
        epoch_model_path = models_dir / f"model_epoch_{epoch+1:03d}_acc_{val_accuracy:.2f}.pth"
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': avg_train_loss,
            'val_loss': avg_val_loss,
            'train_accuracy': train_accuracy,
            'val_accuracy': val_accuracy,
            'total_time': time.time() - start_time
        }, epoch_model_path)
        print(f"✓ Saved epoch model to {epoch_model_path}")
       
        # Save best model if this is the best validation loss
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_model_path = models_dir / "best_model.pth"
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'train_loss': avg_train_loss,
                'val_loss': avg_val_loss,
                'train_accuracy': train_accuracy,
                'val_accuracy': val_accuracy,
                'total_time': time.time() - start_time
            }, best_model_path)
            print("✓ Saved new best model!")
       
        # Calculate epoch time
        epoch_time = time.time() - epoch_start_time
        total_time = time.time() - start_time
       
        # Print epoch summary
        print("\nEpoch Summary:")
        print(f"Training Loss: {avg_train_loss:.4f} | Training Accuracy: {train_accuracy:.2f}%")
        print(f"Validation Loss: {avg_val_loss:.4f} | Validation Accuracy: {val_accuracy:.2f}%")
        print(f"Epoch Time: {epoch_time:.1f}s | Total Time: {total_time:.1f}s")
        print("-" * 80)
       
        # Check if validation accuracy exceeds target
        if val_accuracy >= target_accuracy:
            print(f"\n🎯 Target validation accuracy of {target_accuracy}% reached!")
            print(f"Training stopped at epoch {epoch+1}")
            print(f"Final validation accuracy: {val_accuracy:.2f}%")
            print(f"Total training time: {total_time:.1f}s")
            break
   
    # Save final model
    final_model_path = models_dir / "final_model.pth"
    torch.save({
        'epoch': epoch + 1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_loss': train_losses[-1],
        'val_loss': val_losses[-1],
        'train_accuracy': train_accuracy,
        'val_accuracy': val_accuracy,
        'total_time': time.time() - start_time
    }, final_model_path)
    print(f"\n✓ Saved final model to {final_model_path}")
   
    # Print final training summary
    print("\nTraining completed!")
    print(f"Best validation loss: {best_val_loss:.4f}")
    print(f"Total training time: {total_time:.1f}s")
    print(f"Models saved in: {models_dir}/")
   
    return train_losses, val_losses

def prepare_data(master_path, defects_dir, detection_results_dir):
    defects_dir = Path(defects_dir)
    detection_results_dir = Path(detection_results_dir)
   
    # Get all defect images and their corresponding masks
    defect_paths = list(defects_dir.glob('*.jpg'))
    mask_paths = [detection_results_dir / f"{p.stem}_mask.jpg" for p in defect_paths]
   
    # Filter out pairs where both files exist
    valid_pairs = [(d, m) for d, m in zip(defect_paths, mask_paths) if d.exists() and m.exists()]
    if not valid_pairs:
        raise ValueError("No valid image-mask pairs found!")
   
    defect_paths, mask_paths = zip(*valid_pairs)
    print(f"Found {len(defect_paths)} valid image-mask pairs")
   
    # Split into train and validation sets
    train_defects, val_defects, train_masks, val_masks = train_test_split(
        defect_paths, mask_paths, test_size=0.2, random_state=42
    )
   
    return train_defects, val_defects, train_masks, val_masks

def main():
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
   
    # Paths
    master_path = "master.jpg"
    defects_dir = "generated_defects"
    detection_results_dir = "detection_results"
    model_save_path = "defect_model.pth"
   
    try:
        # Prepare data
        train_defects, val_defects, train_masks, val_masks = prepare_data(
            master_path, defects_dir, detection_results_dir
        )
       
        # Define transforms
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
       
        # Create datasets with fixed size
        target_size = (512, 512)  # Use power of 2 for better feature map alignment
        train_dataset = DefectDataset(master_path, train_defects, train_masks, transform, target_size)
        val_dataset = DefectDataset(master_path, val_defects, val_masks, transform, target_size)
       
        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
       
        # Initialize model
        model = UNet(in_channels=6).to(device)
       
        # Train model with target accuracy
        train_losses, val_losses = train_model(
            model,
            train_loader,
            val_loader,
            device,
            target_accuracy=98.0  # Set target accuracy to 98%
        )
       
        # Save final model
        torch.save(model.state_dict(), model_save_path)
        print(f"Model saved to {model_save_path}")
       
        # Plot training history
        plt.figure(figsize=(10, 5))
        plt.plot(train_losses, label='Training Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training History')
        plt.legend()
        plt.savefig('training_history.png')
        plt.close()
       
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()

4. Inference Module (defect_inference.py)

This module loads the trained U-Net model and performs inference on new defect images. It outputs binary masks, annotated images, and JSON-formatted analysis reports.

Features

  • Automatic resizing and transformation

  • Thresholding predictions for binary masks

  • Contour-based defect analysis

  • Batch processing of test images

Code

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import cv2
import numpy as np
from pathlib import Path
from defect_detection_dl import UNet
import json

class DefectPredictor:
    def __init__(self, model_path, master_path, device=None, target_size=(512, 512)):
        if device is None:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = device
           
        self.target_size = target_size
           
        # Load master image
        self.master_img = cv2.imread(str(master_path))
        if self.master_img is None:
            raise ValueError(f"Could not read master image: {master_path}")
        self.master_img = cv2.cvtColor(self.master_img, cv2.COLOR_BGR2RGB)
        self.original_size = self.master_img.shape[:2][::-1]  # (width, height)
        self.master_img = cv2.resize(self.master_img, target_size)
       
        # Load model
        self.model = UNet(in_channels=6).to(self.device)
       
        # Load checkpoint
        checkpoint = torch.load(model_path, map_location=self.device)
        if 'model_state_dict' in checkpoint:
            # New checkpoint format
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.epoch = checkpoint['epoch']
            self.metrics = {
                'train_loss': checkpoint['train_loss'],
                'val_loss': checkpoint['val_loss'],
                'train_accuracy': checkpoint['train_accuracy'],
                'val_accuracy': checkpoint['val_accuracy']
            }
            print(f"Loaded model from epoch {self.epoch} with validation accuracy: {self.metrics['val_accuracy']:.2f}%")
        else:
            # Old format (direct state dict)
            self.model.load_state_dict(checkpoint)
            print("Loaded model (old format)")
           
        self.model.eval()
       
        # Define transform
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
       
        # Transform master image once
        self.master_tensor = self.transform(self.master_img).to(self.device)
   
    def predict(self, image_path, threshold=0.5):
        """
        Predict defects in the given image
        Returns:
            mask: Binary mask of predicted defects
            annotated_image: Original image with predicted defects outlined
            contours: List of detected contours
        """
        # Read and preprocess image
        img = cv2.imread(str(image_path))
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")
        original_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
       
        # Resize for model
        img_resized = cv2.resize(original_img, self.target_size)
        img_tensor = self.transform(img_resized).to(self.device)
       
        # Stack master and input images
        x = torch.cat([self.master_tensor, img_tensor], dim=0).unsqueeze(0)
       
        # Predict
        with torch.no_grad():
            pred = self.model(x)
            pred = pred.squeeze().cpu().numpy()
       
        # Convert to binary mask
        mask = (pred > threshold).astype(np.uint8) * 255
       
        # Resize mask back to original size
        mask = cv2.resize(mask, self.original_size)
       
        # Find contours
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
       
        # Draw contours on original image
        annotated_image = cv2.cvtColor(original_img, cv2.COLOR_RGB2BGR)
        cv2.drawContours(annotated_image, contours, -1, (0, 255, 0), 2)
       
        return mask, annotated_image, contours
   
    def analyze_defects(self, contours):
        """Analyze properties of detected defects"""
        defect_properties = []
       
        for i, contour in enumerate(contours):
            # Calculate basic properties
            area = cv2.contourArea(contour)
            perimeter = cv2.arcLength(contour, True)
            x, y, w, h = cv2.boundingRect(contour)
           
            # Calculate center point
            M = cv2.moments(contour)
            if M["m00"] != 0:
                center_x = int(M["m10"] / M["m00"])
                center_y = int(M["m01"] / M["m00"])
            else:
                center_x = x + w//2
                center_y = y + h//2
           
            defect_properties.append({
                "id": i,
                "area": float(area),
                "perimeter": float(perimeter),
                "width": int(w),
                "height": int(h),
                "center": (int(center_x), int(center_y))
            })
       
        return defect_properties

def main():
    # Paths
    model_path = "models/best_model.pth"  # Updated path to use models directory
    master_path = "master.jpg"
    test_dir = "test_defects"  # Directory containing test images
    output_dir = "dl_detection_results"
   
    try:
        # Create output directory
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
       
        # Initialize predictor
        print("Initializing model...")
        target_size = (512, 512)  # Same as training
        predictor = DefectPredictor(model_path, master_path, target_size=target_size)
        print(f"Using device: {predictor.device}")
       
        # Process test images
        test_dir = Path(test_dir)
        if not test_dir.exists():
            raise ValueError(f"Test directory not found: {test_dir}")
       
        test_images = list(test_dir.glob("*.jpg"))
        if not test_images:
            raise ValueError(f"No jpg images found in {test_dir}")
       
        print(f"Found {len(test_images)} images to process")
        for img_path in test_images:
            try:
                print(f"Processing {img_path.name}...")
                # Predict defects
                mask, annotated_image, contours = predictor.predict(img_path)
               
                # Analyze defects
                defect_properties = predictor.analyze_defects(contours)
               
                # Save results
                base_name = img_path.stem
                cv2.imwrite(str(output_dir / f"{base_name}_pred_mask.jpg"), mask)
                cv2.imwrite(str(output_dir / f"{base_name}_pred_annotated.jpg"), annotated_image)
               
                # Save analysis results
                with open(output_dir / f"{base_name}_analysis.json", 'w') as f:
                    json.dump({
                        "num_defects": len(contours),
                        "defects": defect_properties
                    }, f, indent=4)
               
                print(f"Found {len(contours)} defects")
               
            except Exception as e:
                print(f"Error processing {img_path.name}: {str(e)}")
       
        print("Processing completed!")
       
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Conclusion

This pipeline effectively demonstrates how synthetic data can be leveraged to train robust deep learning models for defect detection. By combining classical computer vision with modern DL architectures, we achieve high accuracy and practical deployment readiness.

This project is ideal for:

  • Researchers experimenting with defect segmentation tasks

  • Engineers developing industrial inspection software

  • Educators looking to demonstrate AI in manufacturing scenarios

For any questions or contributions, feel free to reach out or fork the repository!







Comments

Popular posts from this blog

🍓 Fun Fruit Math Game for Kids – Learn Multiplication & Division with Smiles!

Visualize Permutations and Combinations with Fruits!

🏞️ River Distance Explorer – Learn Trigonometry Through a Fun Interactive Game