"""Gradio front end for a traffic management system.

An uploaded traffic image is posted to a FastAPI backend (``HOSTED_API_URL``)
for analysis, and a small PPO agent trained on a toy signal-timing simulation
suggests a green/red split for the reported congestion level.
"""

import base64
import io
import logging
import os

import gradio as gr
import gymnasium as gym
import numpy as np
import requests
from geopy.geocoders import Nominatim
from gymnasium import spaces
from PIL import Image
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

# Environment variables
HOSTED_API_URL = os.getenv("HOSTED_API_URL")  # FastAPI backend URL
WEATHER_API_KEY = os.getenv("WEATHER_API_KEY")  # OpenWeatherMap API key

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Fail fast when the required configuration is missing.
if not HOSTED_API_URL:
    logging.error("HOSTED_API_URL environment variable is not set.")
    raise ValueError("HOSTED_API_URL must be set.")
if not WEATHER_API_KEY:
    logging.error("WEATHER_API_KEY environment variable is not set.")
    raise ValueError("WEATHER_API_KEY must be set.")

# OpenStreetMap setup
geolocator = Nominatim(user_agent="traffic_management_system")

# Backend endpoint; a trailing slash on HOSTED_API_URL is tolerated so the
# resulting URL never contains "//analyze_traffic/".
ANALYZE_TRAFFIC_URL = f"{HOSTED_API_URL.rstrip('/')}/analyze_traffic/"

# Without a timeout, `requests` waits forever on a stalled backend.
REQUEST_TIMEOUT_SECONDS = 30


def analyze_traffic(image_path):
    """Send the image at *image_path* to the backend for analysis.

    Args:
        image_path: Path of an image file readable in binary mode.

    Returns:
        A ``(vehicle_count, congestion_level, flow_rate, processed_image)``
        tuple. ``processed_image`` is a PIL image when the backend response
        carries a ``processed_image_url`` that downloads successfully, else
        ``None``. On any failure the tuple is ``(0, "Error", "Error", None)``.
    """
    try:
        with open(image_path, "rb") as image_file:
            response = requests.post(
                ANALYZE_TRAFFIC_URL,
                files={"file": image_file},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )

        if response.status_code != 200:
            logging.error("Error analyzing traffic: %s", response.text)
            return 0, "Error", "Error", None

        result = response.json()
        vehicle_count = result.get("vehicle_count", 0)
        congestion_level = result.get("congestion_level", "Unknown")
        flow_rate = result.get("flow_rate", "Unknown")

        # Fetch the processed image when the backend exposes one.
        processed_image = None
        processed_image_url = result.get("processed_image_url")
        if processed_image_url:
            img_response = requests.get(
                processed_image_url, timeout=REQUEST_TIMEOUT_SECONDS
            )
            if img_response.status_code == 200:
                processed_image = Image.open(io.BytesIO(img_response.content))

        return vehicle_count, congestion_level, flow_rate, processed_image
    except Exception as e:
        logging.error("Error analyzing traffic: %s", e)
        return 0, "Error", "Error", None


class TrafficSimEnv(gym.Env):
    """Toy single-intersection environment used to tune a signal duration.

    Observation: ``[congestion, signal_duration]`` as float32, bounded by
    ``[0, 0]`` and ``[10, 60]``.
    Actions: ``0`` shortens the signal by 5 (floor 10), ``1`` keeps it,
    ``2`` lengthens it by 5 (cap 60).
    """

    def __init__(self, congestion_level):
        """Create the environment starting at *congestion_level*.

        NOTE(review): the value must be convertible to float32; a textual
        label such as "Unknown" makes ``np.array`` raise ValueError here.
        """
        super().__init__()
        self.congestion_level = congestion_level

        # Observation space: [congestion_level, signal_duration]
        self.observation_space = spaces.Box(
            low=np.array([0, 0], dtype=np.float32),
            high=np.array([10, 60], dtype=np.float32),
            dtype=np.float32,
        )
        # Action space: 0 = decrease, 1 = maintain, 2 = increase
        self.action_space = spaces.Discrete(3)

        # Initial state
        self.state = np.array([congestion_level, 30], dtype=np.float32)
        self.done = False

    def reset(self, *, seed=None, options=None):
        """Restore the initial state and return ``(observation, info)``."""
        super().reset(seed=seed)
        self.state = np.array([self.congestion_level, 30], dtype=np.float32)
        self.done = False
        return self.state, {}

    def step(self, action):
        """Apply *action* and advance the simulation by one step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``;
            ``truncated`` is always ``False`` and ``info`` is empty.

        Raises:
            RuntimeError: If called after the episode has terminated.
        """
        if self.done:
            raise RuntimeError(
                "Cannot call step() on a terminated environment. "
                "Please reset the environment."
            )

        congestion, signal_duration = self.state

        # Apply action (1 = maintain leaves the duration untouched).
        if action == 0:
            signal_duration = max(10, signal_duration - 5)
        elif action == 2:
            signal_duration = min(60, signal_duration + 5)

        # Update congestion (simple simulation logic for this example).
        if signal_duration > 30:
            congestion += 1
        else:
            congestion -= 1

        # Reward: no penalty inside the 20-40 band, otherwise the distance
        # from the 30 midpoint as a negative reward.
        if 20 <= signal_duration <= 40:
            reward = 0.0
        else:
            reward = -float(abs(signal_duration - 30))

        # The episode ends once congestion clears or saturates.
        self.done = bool(congestion <= 0 or congestion >= 10)

        # Clip so the observation stays inside the declared bounds even when
        # the last update overshoots (e.g. 0 -> -1).
        congestion = min(10, max(0, congestion))
        self.state = np.array([congestion, signal_duration], dtype=np.float32)
        return self.state, reward, self.done, False, {}

    def render(self):
        """Print the current state."""
        print(f"State: {self.state}")

    def close(self):
        """Release resources (nothing to release)."""
        pass


def optimize_signal_rl(congestion_level):
    """Train a short PPO run and suggest a signal split for *congestion_level*.

    The environment is wrapped in a ``DummyVecEnv``, so the vectorized API
    applies: ``reset()`` returns only the batched observations and ``step()``
    returns ``(obs, rewards, dones, infos)``. A vectorized env also resets
    itself when an episode ends, so the last observation of a finished
    episode is read from ``info["terminal_observation"]``.

    Returns:
        ``"Green for Xs, Red for Ys"`` on success, or
        ``"Error in RL Optimization"`` when anything fails.
    """
    env = None
    try:
        env = DummyVecEnv([lambda: TrafficSimEnv(congestion_level)])

        model = PPO("MlpPolicy", env, verbose=0)
        model.learn(total_timesteps=1000)

        obs = env.reset()  # batched: one row per environment
        final_obs = obs[0]

        for _ in range(10):
            action, _ = model.predict(obs, deterministic=True)
            obs, rewards, dones, infos = env.step(action)

            done = bool(dones[0])
            info = infos[0]
            truncated = bool(info.get("TimeLimit.truncated", False))

            # After a terminal step obs[0] already holds the auto-reset
            # state, so take the real last observation from the info dict.
            if done:
                final_obs = info.get("terminal_observation", obs[0])
            else:
                final_obs = obs[0]

            logging.debug(
                "STEP: action=%s, obs=%s, reward=%s, done=%s, truncated=%s, info=%s",
                action, final_obs, rewards[0], done, truncated, info,
            )

            if done:
                break

        optimal_duration = int(final_obs[1]) if len(final_obs) > 1 else 30
        return f"Green for {optimal_duration}s, Red for {60 - optimal_duration}s"
    except Exception as e:
        logging.error("Error optimizing signal with RL: %s", e)
        return "Error in RL Optimization"
    finally:
        if env is not None:
            env.close()


def process_traffic_image(image):
    """Orchestrate the traffic analysis workflow for an uploaded image.

    Args:
        image: PIL image supplied by the Gradio input component.

    Returns:
        ``(summary_text, processed_image)``. ``processed_image`` is a PIL
        image decoded from the backend's base64 ``processed_image`` field,
        or ``None`` when it is absent, undecodable, or the request failed.
    """
    # Encode in memory rather than through a fixed temp filename, which
    # concurrent requests would overwrite or delete under each other.
    upload = image if image.mode in ("RGB", "L") else image.convert("RGB")
    buffer = io.BytesIO()
    upload.save(buffer, format="JPEG")
    buffer.seek(0)

    try:
        response = requests.post(
            ANALYZE_TRAFFIC_URL,
            files={"file": ("temp_traffic_image.jpg", buffer, "image/jpeg")},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        logging.error("Error in backend request: %s", e)
        return "Error in backend request.", None

    if response.status_code != 200:
        logging.error("Error from backend: %s", response.text)
        return f"Error from backend: {response.status_code}", None

    data = response.json()
    vehicle_count = data.get("vehicle_count", 0)
    congestion_level = data.get("congestion_level", "Unknown")
    flow_rate = data.get("flow_rate", "Unknown")
    processed_image_base64 = data.get("processed_image")

    # Decode the processed image (if provided).
    processed_image = None
    if processed_image_base64:
        try:
            processed_image = Image.open(
                io.BytesIO(base64.b64decode(processed_image_base64))
            )
        except Exception as e:
            logging.error("Error decoding processed image: %s", e)
            processed_image = None

    # Signal timing optimization
    signal_timing = optimize_signal_rl(congestion_level)

    return (
        f"Detected Vehicles: {vehicle_count}\n"
        f"Congestion Level: {congestion_level}\n"
        f"Traffic Flow: {flow_rate}\n"
        f"Signal Timing Suggestion: {signal_timing}",
        processed_image,
    )


def gradio_interface(image):
    """Gradio wrapper around :func:`process_traffic_image`.

    Any exception is logged and turned into a user-facing message so the UI
    never surfaces a traceback.
    """
    try:
        results, analyzed_image = process_traffic_image(image)
        return results, analyzed_image
    except Exception as e:
        logging.error("Error in Gradio interface: %s", e)
        return "An error occurred. Please try again with a valid traffic image.", None


if __name__ == "__main__":
    # UI
    interface = gr.Interface(
        fn=gradio_interface,
        inputs=gr.Image(type="pil", label="Upload Traffic Image"),
        outputs=[
            gr.Textbox(label="Traffic Analysis Results"),
            gr.Image(label="Analyzed Traffic Image"),
        ],
        title="Traffic Management System",
        description="Upload a traffic image to analyze congestion and get signal timing suggestions.",
    )

    # Launch Gradio app
    interface.launch()