Fzina committed
Commit 90c82b4 · verified · 1 Parent(s): 3021d5b

Update app.py
Files changed (1): app.py (+35 -54)
app.py CHANGED
@@ -66,59 +66,43 @@ class TrafficSimEnv(gym.Env):
     def __init__(self, congestion_level):
         super(TrafficSimEnv, self).__init__()
         self.congestion_level = congestion_level
-        self.action_space = spaces.Discrete(3)  # Actions: 0 (decrease signal), 1 (maintain), 2 (increase signal)
-        self.observation_space = spaces.Box(
-            low=np.array([0, 20]),
-            high=np.array([2, 60]),
-            dtype=np.float32
-        )
-        self.current_signal = 30  # Starting signal duration (in seconds)
+        self.action_space = spaces.Discrete(3)
+        self.observation_space = spaces.Box(low=np.array([0, 20]), high=np.array([2, 60]), dtype=np.float32)
+        self.current_signal = 30
         self.steps = 0
 
-    def reset(self, *, seed=None, options=None):
-        """
-        Reset the environment and return the initial observation.
-        """
+    def reset(self, seed=None, options=None):
         self.steps = 0
-        self.current_signal = 30  # Reset the signal at the beginning
+        self.current_signal = 30
         congestion_map = {"Low": 0, "Medium": 1, "High": 2}
         self.congestion_numeric = congestion_map.get(self.congestion_level, 0)
+
         obs = np.array([self.congestion_numeric, self.current_signal], dtype=np.float32)
-        return obs, {}  # Return as single observation (no batch)
+        info = {}  # Empty dictionary as info
+        return obs, info
 
     def step(self, action):
-        """
-        Execute a step in the environment, taking an action and returning the next state and reward.
-        """
-        # Signal changes based on the action
-        if action == 0:
-            self.current_signal = max(20, self.current_signal - 5)  # Decrease signal
-        elif action == 2:
-            self.current_signal = min(60, self.current_signal + 5)  # Increase signal
-        # Action 1 is to maintain the current signal
-
-        # Reward Calculation based on congestion level and signal
+        signal_change = {0: -5, 1: 0, 2: 5}[action]
+        self.current_signal = max(20, min(60, self.current_signal + signal_change))
+
         reward = self._calculate_reward()
 
-        # Increment the step count
         self.steps += 1
-        done = self.steps >= 10  # End condition: after 10 steps
-        truncated = False  # Default to False; can change based on custom conditions
+        done = self.steps >= 10
+        truncated = False
 
-        # Observation after the action
         obs = np.array([self.congestion_numeric, self.current_signal], dtype=np.float32)
+        info = {}  # Info dictionary (can be populated with useful debugging data)
 
-        info = {}  # Additional info (can remain empty or contain any useful data)
-
-        return obs, reward, done, truncated, info  # Return as individual values
+        return obs, reward, done, truncated, info
 
     def _calculate_reward(self):
         if self.congestion_level == "High":
-            return -abs(40 - self.current_signal)  # Negative reward if signal is far from 40s
+            return -abs(40 - self.current_signal)
         elif self.congestion_level == "Medium":
-            return -abs(30 - self.current_signal)  # Negative reward if signal is far from 30s
+            return -abs(30 - self.current_signal)
         else:
-            return -abs(20 - self.current_signal)  # Negative reward if signal is far from 20s
+            return -abs(20 - self.current_signal)
 
     def render(self, mode="human"):
         print(f"Current Signal: {self.current_signal}s")
@@ -129,40 +113,37 @@ class TrafficSimEnv(gym.Env):
 # Prior to commit had a lot of errors regarding expected output errors
 def optimize_signal_rl(congestion_level):
     try:
-        # Create the environment with DummyVecEnv to wrap TrafficSimEnv
+        # Create the environment wrapped in DummyVecEnv
         env = DummyVecEnv([lambda: TrafficSimEnv(congestion_level)])
 
-        # Initialize PPO model (policy = "MlpPolicy", for multi-layer perceptron model)
+        # Initialize PPO model
         model = PPO("MlpPolicy", env, verbose=0)
 
-        # Train the model on the environment for 1000 timesteps
+        # Train the model
         model.learn(total_timesteps=1000)
 
-        # Reset environment to start the simulation
-        obs, info = env.reset()  # Reset the environment and get initial observation (info is empty by default)
-
-        # Since env.reset() returns a batch, extract the first observation and info
-        obs = obs[0]  # Extract the first observation from the batch
-        info = info[0]  # Extract the first info from the batch
+        # Reset environment
+        obs, info = env.reset()  # For Gymnasium 1.0.0, reset() returns (obs, info)
+        obs = obs[0]  # Extract first observation from batch
 
-        # Loop through to simulate for 10 timesteps
         for _ in range(10):
+            # Predict action
             action, _ = model.predict(obs)
 
-            # Step through the environment with the predicted action
-            obs, reward, done, truncated, info = env.step(action)  # Step returns 5 values
-            obs = obs[0]  # Extract the first element of the batch
-            reward = reward[0]  # Extract the first element of the reward batch
-            done = done[0]  # Extract the first element of the done flag batch
-            truncated = truncated[0]  # Extract the first element of the truncated batch
-            info = info[0]  # Extract the first element of the info batch
+            # Take a step
+            obs, reward, done, truncated, info = env.step(action)
+            obs = obs[0]  # Extract first observation from batch
+            reward = reward[0]  # Extract first reward from batch
+            done = done[0]  # Extract first done flag from batch
+            truncated = truncated[0]  # Extract first truncated flag from batch
+            info = info[0]  # Extract first info from batch
 
-            # Stop when the environment signals that the episode is done or truncated
+            # End simulation if episode is done or truncated
             if done or truncated:
                 break
 
-        # Extract the optimal signal duration (second observation value) from `obs`
-        optimal_duration = int(obs[1]) if len(obs) > 1 else 30  # Ensure the signal value is within range
+        # Extract optimal signal duration
+        optimal_duration = int(obs[1]) if len(obs) > 1 else 30
         return f"Green for {optimal_duration}s, Red for {60 - optimal_duration}s"
     except Exception as e:
         logging.error(f"Error optimizing signal with RL: {e}")
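
A caveat on the rollout loop above: stable-baselines3's DummyVecEnv exposes the older VecEnv interface, in which reset() returns only the batched observations and step() returns four values (episode ends are reported through dones and infos). A minimal sketch of the same rollout written against that interface, assuming stable-baselines3 2.x with Gymnasium and the TrafficSimEnv defined above ("Medium" is an arbitrary example level; this is not the committed code):

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv

vec_env = DummyVecEnv([lambda: TrafficSimEnv("Medium")])
model = PPO("MlpPolicy", vec_env, verbose=0)
model.learn(total_timesteps=1000)

obs = vec_env.reset()                                  # VecEnv.reset() returns only the batched observations
optimal_duration = int(obs[0][1])                      # second observation value = current signal duration
for _ in range(10):
    action, _ = model.predict(obs)                     # batched action, shape (n_envs,)
    obs, rewards, dones, infos = vec_env.step(action)  # VecEnv.step() returns four values
    if dones[0]:
        break                                          # DummyVecEnv auto-resets on episode end, so stop before reusing obs
    optimal_duration = int(obs[0][1])
print(f"Green for {optimal_duration}s, Red for {60 - optimal_duration}s")
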