Wednesday, April 2, 2025

INTEGRATED M.TECH SOFTWARE

 structured and object oriented programming 

import os
import logging
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session
from werkzeug.utils import secure_filename
import numpy as np
import tempfile
import shutil
import time

# Setup logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Import local modules
from lidar_processor import LidarProcessor
from object_detector import ObjectDetector
from utils.data_loader import DataLoader
from utils.metrics import calculate_map
from utils.visualization_utils import prepare_visualization_data

# Initialize Flask app
app = Flask(__name__)
app.secret_key = os.environ.get("SESSION_SECRET", "lidar-detection-secret")

# Configure upload folder
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
ALLOWED_EXTENSIONS = {'bin', 'pcd'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB max upload

# Create upload directory if it doesn't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Initialize components
lidar_processor = LidarProcessor()
object_detector = ObjectDetector()
data_loader = DataLoader()

def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/upload', methods=['GET', 'POST'])
def upload():
    if request.method == 'POST':
        try:
            # Check if the post request has the file part
            if 'file' not in request.files:
                flash('No file selected')
                return redirect(request.url)

            file = request.files['file']
            if file.filename == '':
                flash('No selected file')
                return redirect(request.url)

            if file and allowed_file(file.filename):
                try:
                    # Generate unique filename to prevent overwrites
                    filename = secure_filename(f"{int(time.time())}_{file.filename}")
                    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)

                    # Save the file
                    file.save(file_path)

                    # Verify file exists and is readable
                    if not os.path.exists(file_path):
                        raise FileNotFoundError("File was not saved successfully")

                    # Clean up old files from session if they exist
                    old_file = session.get('current_file')
                    if old_file and os.path.exists(old_file):
                        try:
                            os.remove(old_file)
                        except Exception as e:
                            logger.warning(f"Could not remove old file: {str(e)}")

                    # Store file path in session
                    session['current_file'] = file_path
                    logger.info(f"File uploaded successfully: {filename}")

                    return redirect(url_for('visualization'))
                except Exception as e:
                    logger.error(f"Error saving file: {str(e)}")
                    flash(f"Error saving file: {str(e)}")
                    return redirect(request.url)
            else:
                flash('Allowed file types are .bin and .pcd')
                return redirect(request.url)
        except Exception as e:
            logger.error(f"Error during file upload: {str(e)}")
            flash(f"Error during file upload: {str(e)}")
            return redirect(request.url)

    return render_template('upload.html')

@app.route('/visualization')
def visualization():
    if 'current_file' not in session:
        flash('No file uploaded yet')
        return redirect(url_for('upload'))

    file_path = session.get('current_file')

    if not file_path or not os.path.exists(file_path):
        flash('File not found. Please upload again.')
        return redirect(url_for('upload'))

    try:
        # Verify file is readable
        if not os.access(file_path, os.R_OK):
            raise PermissionError("File is not readable")

        # Process point cloud and detect objects
        point_cloud = lidar_processor.load_point_cloud(file_path)

        # Use PointPillars by default (can be changed based on user preference)
        model_type = session.get('model_type', 'PointPillars')

        # Process with selected model
        detections = object_detector.detect(point_cloud, model_type)

        # Prepare data for visualization
        visualization_data = prepare_visualization_data(point_cloud, detections)

        return render_template('visualization.html',
                               visualization_data=visualization_data,
                               model_type=model_type)

    except Exception as e:
        logger.error(f"Error during visualization: {str(e)}")
        flash(f"Error during visualization: {str(e)}")
        return redirect(url_for('upload'))

@app.route('/select_model', methods=['POST'])
def select_model():
    model_type = request.form.get('model_type')
    if model_type in ['PointPillars', 'VoxelNet']:
        session['model_type'] = model_type
        flash(f'Model changed to {model_type}')
    else:
        flash('Invalid model selection')

    return redirect(url_for('visualization') if 'current_file' in session else url_for('upload'))

@app.route('/training')
def training():
    return render_template('training.html')

@app.route('/kittidataset')
def kittidataset():
    return render_template('kittidataset.html')

@app.route('/start_training', methods=['POST'])
def start_training():
    dataset_type = request.form.get('dataset_type')
    model_type = request.form.get('model_type')
    epochs = int(request.form.get('epochs', 10))
    batch_size = int(request.form.get('batch_size', 4))

    # In a real application, this would start a background training process
    # For now, just simulate training
    result = {
        'status': 'success',
        'message': f'Started training {model_type} on {dataset_type} dataset with {epochs} epochs',
        'dataset': dataset_type,
        'model': model_type,
        'epochs': epochs,
        'batch_size': batch_size
    }

    return jsonify(result)

@app.route('/evaluation')
def evaluation():
    # In a real application, this would load actual metrics
    # For demonstration, use placeholder metrics
    metrics = {
        'PointPillars': {
            'mAP': 0.78,
            'mAP_car': 0.86,
            'mAP_pedestrian': 0.65,
            'mAP_cyclist': 0.72,
            'inference_time': 42  # ms
        },
        'VoxelNet': {
            'mAP': 0.82,
            'mAP_car': 0.89,
            'mAP_pedestrian': 0.72,
            'mAP_cyclist': 0.77,
            'inference_time': 65  # ms
        }
    }

    return render_template('evaluation.html', metrics=metrics)

@app.route('/get_point_cloud_data')
def get_point_cloud_data():
    if 'current_file' not in session:
        return jsonify({'error': 'No file uploaded'}), 400

    file_path = session['current_file']

    try:
        # Process point cloud and detect objects
        point_cloud = lidar_processor.load_point_cloud(file_path)

        # Use model from session or default to PointPillars
        model_type = session.get('model_type', 'PointPillars')

        # Get detections
        detections = object_detector.detect(point_cloud, model_type)

        # Prepare visualization data
        visualization_data = prepare_visualization_data(point_cloud, detections)

        return jsonify(visualization_data)

    except Exception as e:
        logger.error(f"Error getting point cloud data: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/toggle_mode', methods=['POST'])
def toggle_mode():
    current_mode = session.get('mode', 'light')
    new_mode = 'dark' if current_mode == 'light' else 'light'
    session['mode'] = new_mode
    return jsonify({'mode': new_mode})

@app.context_processor
def inject_mode():
    return {'mode': session.get('mode', 'light')}

# Cleanup temporary files on application shutdown
@app.teardown_appcontext
def cleanup_temp_files(exception):
    try:
        if os.path.exists(UPLOAD_FOLDER):
            shutil.rmtree(UPLOAD_FOLDER)
    except Exception as e:
        logger.error(f"Error cleaning up temporary files: {str(e)}")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

No comments:

Post a Comment