structured and object oriented programming
import os
import logging
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, session
from werkzeug.utils import secure_filename
import numpy as np
import tempfile
import shutil
import time
# Setup logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Import local modules
from lidar_processor import LidarProcessor
from object_detector import ObjectDetector
from utils.data_loader import DataLoader
from utils.metrics import calculate_map
from utils.visualization_utils import prepare_visualization_data
# Initialize Flask app
app = Flask(__name__)
app.secret_key = os.environ.get("SESSION_SECRET", "lidar-detection-secret")
# Configure upload folder
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
ALLOWED_EXTENSIONS = {'bin', 'pcd'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max upload
# Create upload directory if it doesn't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Initialize components
lidar_processor = LidarProcessor()
object_detector = ObjectDetector()
data_loader = DataLoader()
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/')
def index():
return render_template('index.html')
@app.route('/upload', methods=['GET', 'POST'])
def upload():
if request.method == 'POST':
try:
# Check if the post request has the file part
if 'file' not in request.files:
flash('No file selected')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
try:
# Generate unique filename to prevent overwrites
filename = secure_filename(f"{int(time.time())}_{file.filename}")
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
# Save the file
file.save(file_path)
# Verify file exists and is readable
if not os.path.exists(file_path):
raise FileNotFoundError("File was not saved successfully")
# Clean up old files from session if they exist
old_file = session.get('current_file')
if old_file and os.path.exists(old_file):
try:
os.remove(old_file)
except Exception as e:
logger.warning(f"Could not remove old file: {str(e)}")
# Store file path in session
session['current_file'] = file_path
logger.info(f"File uploaded successfully: {filename}")
return redirect(url_for('visualization'))
except Exception as e:
logger.error(f"Error saving file: {str(e)}")
flash(f"Error saving file: {str(e)}")
return redirect(request.url)
else:
flash('Allowed file types are .bin and .pcd')
return redirect(request.url)
except Exception as e:
logger.error(f"Error during file upload: {str(e)}")
flash(f"Error during file upload: {str(e)}")
return redirect(request.url)
return render_template('upload.html')
@app.route('/visualization')
def visualization():
if 'current_file' not in session:
flash('No file uploaded yet')
return redirect(url_for('upload'))
file_path = session.get('current_file')
if not file_path or not os.path.exists(file_path):
flash('File not found. Please upload again.')
return redirect(url_for('upload'))
try:
# Verify file is readable
if not os.access(file_path, os.R_OK):
raise PermissionError("File is not readable")
# Process point cloud and detect objects
point_cloud = lidar_processor.load_point_cloud(file_path)
# Use PointPillars by default (can be changed based on user preference)
model_type = session.get('model_type', 'PointPillars')
# Process with selected model
detections = object_detector.detect(point_cloud, model_type)
# Prepare data for visualization
visualization_data = prepare_visualization_data(point_cloud, detections)
return render_template('visualization.html',
visualization_data=visualization_data,
model_type=model_type)
except Exception as e:
logger.error(f"Error during visualization: {str(e)}")
flash(f"Error during visualization: {str(e)}")
return redirect(url_for('upload'))
@app.route('/select_model', methods=['POST'])
def select_model():
model_type = request.form.get('model_type')
if model_type in ['PointPillars', 'VoxelNet']:
session['model_type'] = model_type
flash(f'Model changed to {model_type}')
else:
flash('Invalid model selection')
return redirect(url_for('visualization') if 'current_file' in session else url_for('upload'))
@app.route('/training')
def training():
return render_template('training.html')
@app.route('/kittidataset')
def kittidataset():
return render_template('kittidataset.html')
@app.route('/start_training', methods=['POST'])
def start_training():
dataset_type = request.form.get('dataset_type')
model_type = request.form.get('model_type')
epochs = int(request.form.get('epochs', 10))
batch_size = int(request.form.get('batch_size', 4))
# In a real application, this would start a background training process
# For now, just simulate training
result = {
'status': 'success',
'message': f'Started training {model_type} on {dataset_type} dataset with {epochs} epochs',
'dataset': dataset_type,
'model': model_type,
'epochs': epochs,
'batch_size': batch_size
}
return jsonify(result)
@app.route('/evaluation')
def evaluation():
# In a real application, this would load actual metrics
# For demonstration, use placeholder metrics
metrics = {
'PointPillars': {
'mAP': 0.78,
'mAP_car': 0.86,
'mAP_pedestrian': 0.65,
'mAP_cyclist': 0.72,
'inference_time': 42 # ms
},
'VoxelNet': {
'mAP': 0.82,
'mAP_car': 0.89,
'mAP_pedestrian': 0.72,
'mAP_cyclist': 0.77,
'inference_time': 65 # ms
}
}
return render_template('evaluation.html', metrics=metrics)
@app.route('/get_point_cloud_data')
def get_point_cloud_data():
if 'current_file' not in session:
return jsonify({'error': 'No file uploaded'}), 400
file_path = session['current_file']
try:
# Process point cloud and detect objects
point_cloud = lidar_processor.load_point_cloud(file_path)
# Use model from session or default to PointPillars
model_type = session.get('model_type', 'PointPillars')
# Get detections
detections = object_detector.detect(point_cloud, model_type)
# Prepare visualization data
visualization_data = prepare_visualization_data(point_cloud, detections)
return jsonify(visualization_data)
except Exception as e:
logger.error(f"Error getting point cloud data: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/toggle_mode', methods=['POST'])
def toggle_mode():
current_mode = session.get('mode', 'light')
new_mode = 'dark' if current_mode == 'light' else 'light'
session['mode'] = new_mode
return jsonify({'mode': new_mode})
@app.context_processor
def inject_mode():
return {'mode': session.get('mode', 'light')}
# Cleanup temporary files on application shutdown
@app.teardown_appcontext
def cleanup_temp_files(exception):
try:
if os.path.exists(UPLOAD_FOLDER):
shutil.rmtree(UPLOAD_FOLDER)
except Exception as e:
logger.error(f"Error cleaning up temporary files: {str(e)}")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True)
No comments:
Post a Comment