""" InferencePipeline Usage Examples ================================ This file demonstrates how to use the InferencePipeline for various scenarios: 1. Single stage (equivalent to MultiDongle) 2. Two-stage cascade (detection -> classification) 3. Multi-stage complex pipeline """ import cv2 import numpy as np import time from InferencePipeline import ( InferencePipeline, StageConfig, create_feature_extractor_preprocessor, create_result_aggregator_postprocessor ) from Multidongle import PreProcessor, PostProcessor, WebcamSource, RTSPSource # ============================================================================= # Example 1: Single Stage Pipeline (Basic Usage) # ============================================================================= def example_single_stage(): """Single stage pipeline - equivalent to using MultiDongle directly""" print("=== Single Stage Pipeline Example ===") # Create stage configuration stage_config = StageConfig( stage_id="fire_detection", port_ids=[28, 32], scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="fire_detection_520.nef", upload_fw=True, max_queue_size=30 # Note: No inter-stage processors needed for single stage # MultiDongle will handle internal preprocessing/postprocessing ) # Create pipeline with single stage pipeline = InferencePipeline( stage_configs=[stage_config], pipeline_name="SingleStageFireDetection" ) # Initialize and start pipeline.initialize() pipeline.start() # Process some data data_source = WebcamSource(camera_id=0) data_source.start() def handle_result(pipeline_data): result = pipeline_data.stage_results.get("fire_detection", {}) print(f"Fire Detection: {result.get('result', 'Unknown')} " f"(Prob: {result.get('probability', 0.0):.3f})") def handle_error(pipeline_data): print(f"āŒ Error: {pipeline_data.stage_results}") pipeline.set_result_callback(handle_result) pipeline.set_error_callback(handle_error) try: print("šŸš€ Starting single stage pipeline...") for i in range(100): # Process 100 frames frame = data_source.get_frame() if frame is not None: success = pipeline.put_data(frame, timeout=1.0) if not success: print("Pipeline input queue full, dropping frame") time.sleep(0.1) except KeyboardInterrupt: print("\nStopping...") finally: data_source.stop() pipeline.stop() print("Single stage pipeline test completed") # ============================================================================= # Example 2: Two-Stage Cascade Pipeline # ============================================================================= def example_two_stage_cascade(): """Two-stage cascade: Object Detection -> Fire Classification""" print("=== Two-Stage Cascade Pipeline Example ===") # Custom preprocessor for second stage def roi_extraction_preprocess(frame, target_size): """Extract ROI from detection results and prepare for classification""" # This would normally extract bounding box from first stage results # For demo, we'll just do center crop h, w = frame.shape[:2] if len(frame.shape) == 3 else frame.shape center_x, center_y = w // 2, h // 2 crop_size = min(w, h) // 2 x1 = max(0, center_x - crop_size // 2) y1 = max(0, center_y - crop_size // 2) x2 = min(w, center_x + crop_size // 2) y2 = min(h, center_y + crop_size // 2) if len(frame.shape) == 3: cropped = frame[y1:y2, x1:x2] else: cropped = frame[y1:y2, x1:x2] return cv2.resize(cropped, target_size) # Custom postprocessor for combining results def combine_detection_classification(raw_output, **kwargs): """Combine detection and classification results""" if raw_output.size > 0: classification_prob = float(raw_output[0]) # Get detection result from metadata (would be passed from first stage) detection_confidence = kwargs.get('detection_conf', 0.5) # Combined confidence combined_prob = (classification_prob * 0.7) + (detection_confidence * 0.3) return { 'combined_probability': combined_prob, 'classification_prob': classification_prob, 'detection_conf': detection_confidence, 'result': 'Fire Detected' if combined_prob > 0.6 else 'No Fire', 'confidence': 'High' if combined_prob > 0.8 else 'Medium' if combined_prob > 0.5 else 'Low' } return {'combined_probability': 0.0, 'result': 'No Fire', 'confidence': 'Low'} # Set up callbacks def handle_cascade_result(pipeline_data): """Handle results from cascade pipeline""" detection_result = pipeline_data.stage_results.get("object_detection", {}) classification_result = pipeline_data.stage_results.get("fire_classification", {}) print(f"Detection: {detection_result.get('result', 'Unknown')} " f"(Prob: {detection_result.get('probability', 0.0):.3f})") print(f"Classification: {classification_result.get('result', 'Unknown')} " f"(Combined: {classification_result.get('combined_probability', 0.0):.3f})") print(f"Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s") print("-" * 50) def handle_pipeline_stats(stats): """Handle pipeline statistics""" print(f"\nšŸ“Š Pipeline Stats:") print(f" Submitted: {stats['pipeline_input_submitted']}") print(f" Completed: {stats['pipeline_completed']}") print(f" Errors: {stats['pipeline_errors']}") for stage_stat in stats['stage_statistics']: print(f" Stage {stage_stat['stage_id']}: " f"Processed={stage_stat['processed_count']}, " f"AvgTime={stage_stat['avg_processing_time']:.3f}s") # Stage 1: Object Detection stage1_config = StageConfig( stage_id="object_detection", port_ids=[28, 30], # First set of dongles scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="object_detection_520.nef", upload_fw=True, max_queue_size=30 ) # Stage 2: Fire Classification stage2_config = StageConfig( stage_id="fire_classification", port_ids=[32, 34], # Second set of dongles scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="fire_classification_520.nef", upload_fw=True, max_queue_size=30, # Inter-stage processing input_preprocessor=PreProcessor(resize_fn=roi_extraction_preprocess), output_postprocessor=PostProcessor(process_fn=combine_detection_classification) ) # Create two-stage pipeline pipeline = InferencePipeline( stage_configs=[stage1_config, stage2_config], pipeline_name="TwoStageCascade" ) pipeline.set_result_callback(handle_cascade_result) pipeline.set_stats_callback(handle_pipeline_stats) # Initialize and start pipeline.initialize() pipeline.start() pipeline.start_stats_reporting(interval=10.0) # Stats every 10 seconds # Process data # data_source = RTSPSource("rtsp://your-camera-url") data_source = WebcamSource(0) data_source.start() try: frame_count = 0 while frame_count < 200: frame = data_source.get_frame() if frame is not None: if pipeline.put_data(frame, timeout=1.0): frame_count += 1 else: print("Pipeline input queue full, dropping frame") time.sleep(0.05) except KeyboardInterrupt: print("\nStopping cascade pipeline...") finally: data_source.stop() pipeline.stop() # ============================================================================= # Example 3: Complex Multi-Stage Pipeline # ============================================================================= def example_complex_pipeline(): """Complex multi-stage pipeline with feature extraction and fusion""" print("=== Complex Multi-Stage Pipeline Example ===") # Custom processors for different stages def edge_detection_preprocess(frame, target_size): """Extract edge features""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150) edges_3ch = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR) return cv2.resize(edges_3ch, target_size) def thermal_simulation_preprocess(frame, target_size): """Simulate thermal-like processing""" # Convert to HSV and extract V channel as pseudo-thermal hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) thermal_like = hsv[:, :, 2] # Value channel thermal_3ch = cv2.cvtColor(thermal_like, cv2.COLOR_GRAY2BGR) return cv2.resize(thermal_3ch, target_size) def fusion_postprocess(raw_output, **kwargs): """Fuse results from multiple modalities""" if raw_output.size > 0: current_prob = float(raw_output[0]) # This would get previous stage results from pipeline metadata # For demo, we'll simulate rgb_confidence = kwargs.get('rgb_conf', 0.5) edge_confidence = kwargs.get('edge_conf', 0.5) # Weighted fusion fused_prob = (current_prob * 0.5) + (rgb_confidence * 0.3) + (edge_confidence * 0.2) return { 'fused_probability': fused_prob, 'individual_probs': { 'thermal': current_prob, 'rgb': rgb_confidence, 'edge': edge_confidence }, 'result': 'Fire Detected' if fused_prob > 0.6 else 'No Fire', 'confidence': 'Very High' if fused_prob > 0.9 else 'High' if fused_prob > 0.7 else 'Medium' if fused_prob > 0.5 else 'Low' } return {'fused_probability': 0.0, 'result': 'No Fire', 'confidence': 'Low'} # Stage 1: RGB Analysis rgb_stage = StageConfig( stage_id="rgb_analysis", port_ids=[28, 30], scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="rgb_fire_detection_520.nef", upload_fw=True ) # Stage 2: Edge Feature Analysis edge_stage = StageConfig( stage_id="edge_analysis", port_ids=[32, 34], scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="edge_fire_detection_520.nef", upload_fw=True, input_preprocessor=PreProcessor(resize_fn=edge_detection_preprocess) ) # Stage 3: Thermal-like Analysis thermal_stage = StageConfig( stage_id="thermal_analysis", port_ids=[36, 38], scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="thermal_fire_detection_520.nef", upload_fw=True, input_preprocessor=PreProcessor(resize_fn=thermal_simulation_preprocess) ) # Stage 4: Fusion fusion_stage = StageConfig( stage_id="result_fusion", port_ids=[40, 42], scpu_fw_path="fw_scpu.bin", ncpu_fw_path="fw_ncpu.bin", model_path="fusion_520.nef", upload_fw=True, output_postprocessor=PostProcessor(process_fn=fusion_postprocess) ) # Create complex pipeline pipeline = InferencePipeline( stage_configs=[rgb_stage, edge_stage, thermal_stage, fusion_stage], pipeline_name="ComplexMultiModalPipeline" ) # Advanced result handling def handle_complex_result(pipeline_data): """Handle complex pipeline results""" print(f"\nšŸ”„ Multi-Modal Fire Detection Results:") print(f" Pipeline ID: {pipeline_data.pipeline_id}") for stage_id, result in pipeline_data.stage_results.items(): if 'probability' in result: print(f" {stage_id}: {result.get('result', 'Unknown')} " f"(Prob: {result.get('probability', 0.0):.3f})") # Final fused result if 'result_fusion' in pipeline_data.stage_results: fusion_result = pipeline_data.stage_results['result_fusion'] print(f" šŸŽÆ FINAL: {fusion_result.get('result', 'Unknown')} " f"(Fused: {fusion_result.get('fused_probability', 0.0):.3f})") print(f" Confidence: {fusion_result.get('confidence', 'Unknown')}") print(f" Total Processing Time: {pipeline_data.metadata.get('total_processing_time', 0.0):.3f}s") print("=" * 60) def handle_error(pipeline_data): """Handle pipeline errors""" print(f"āŒ Pipeline Error for {pipeline_data.pipeline_id}") for stage_id, result in pipeline_data.stage_results.items(): if 'error' in result: print(f" Stage {stage_id} error: {result['error']}") pipeline.set_result_callback(handle_complex_result) pipeline.set_error_callback(handle_error) # Initialize and start try: pipeline.initialize() pipeline.start() # Simulate data input data_source = WebcamSource(camera_id=0) data_source.start() print("šŸš€ Complex pipeline started. Processing frames...") frame_count = 0 start_time = time.time() while frame_count < 50: # Process 50 frames for demo frame = data_source.get_frame() if frame is not None: if pipeline.put_data(frame): frame_count += 1 if frame_count % 10 == 0: elapsed = time.time() - start_time fps = frame_count / elapsed print(f"šŸ“ˆ Processed {frame_count} frames, Pipeline FPS: {fps:.2f}") time.sleep(0.1) except Exception as e: print(f"Error in complex pipeline: {e}") finally: data_source.stop() pipeline.stop() # Final statistics final_stats = pipeline.get_pipeline_statistics() print(f"\nšŸ“Š Final Pipeline Statistics:") print(f" Total Input: {final_stats['pipeline_input_submitted']}") print(f" Completed: {final_stats['pipeline_completed']}") print(f" Success Rate: {final_stats['pipeline_completed']/max(final_stats['pipeline_input_submitted'], 1)*100:.1f}%") # ============================================================================= # Main Function - Run Examples # ============================================================================= if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="InferencePipeline Examples") parser.add_argument("--example", choices=["single", "cascade", "complex"], default="single", help="Which example to run") args = parser.parse_args() if args.example == "single": example_single_stage() elif args.example == "cascade": example_two_stage_cascade() elif args.example == "complex": example_complex_pipeline() else: print("Available examples:") print(" python pipeline_example.py --example single") print(" python pipeline_example.py --example cascade") print(" python pipeline_example.py --example complex")