Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -90,35 +90,41 @@ class TrafficSimEnv(gym.Env):
|
|
90 |
return obs, {} # Returns observation and info (info can be empty)
|
91 |
|
92 |
|
93 |
-
def
|
94 |
-
|
95 |
-
|
96 |
-
|
97 |
-
|
|
|
|
|
98 |
|
|
|
99 |
print(f"Action taken: {action}")
|
100 |
print(f"Signal before change: {self.current_signal}")
|
101 |
print(f"Congestion Level: {self.congestion_numeric}")
|
102 |
|
103 |
# Signal changes based on the action
|
104 |
-
|
105 |
-
|
106 |
-
|
|
|
|
|
|
|
107 |
# Reward Calculation based on congestion level and signal
|
108 |
reward = self._calculate_reward()
|
109 |
|
110 |
# Increment the step count
|
111 |
self.steps += 1
|
112 |
-
done = self.steps >= 10 # End condition
|
113 |
truncated = False # Default to False; can change based on custom conditions
|
114 |
|
115 |
-
#
|
116 |
obs = np.array([self.congestion_numeric, self.current_signal], dtype=np.float32)
|
117 |
print(f"Observation: {obs}")
|
118 |
|
119 |
info = {} # Additional info (can remain empty or contain any useful data)
|
120 |
|
121 |
-
return obs, reward, done, truncated, info
|
122 |
|
123 |
def _calculate_reward(self):
|
124 |
if self.congestion_level == "High":
|
|
|
90 |
return obs, {} # Returns observation and info (info can be empty)
|
91 |
|
92 |
|
93 |
+
def reset(self, seed=None, options=None):
|
94 |
+
self.steps = 0
|
95 |
+
self.current_signal = 30 # Reset the signal at the beginning
|
96 |
+
congestion_map = {"Low": 0, "Medium": 1, "High": 2}
|
97 |
+
self.congestion_numeric = congestion_map.get(self.congestion_level, 0)
|
98 |
+
obs = np.array([self.congestion_numeric, self.current_signal], dtype=np.float32)
|
99 |
+
return obs, {} # Returns observation and info (info can remain empty)
|
100 |
|
101 |
+
def step(self, action):
|
102 |
print(f"Action taken: {action}")
|
103 |
print(f"Signal before change: {self.current_signal}")
|
104 |
print(f"Congestion Level: {self.congestion_numeric}")
|
105 |
|
106 |
# Signal changes based on the action
|
107 |
+
if action == 0:
|
108 |
+
self.current_signal = max(20, self.current_signal - 5) # Decrease signal
|
109 |
+
elif action == 2:
|
110 |
+
self.current_signal = min(60, self.current_signal + 5) # Increase signal
|
111 |
+
# Action 1 is to maintain the current signal
|
112 |
+
|
113 |
# Reward Calculation based on congestion level and signal
|
114 |
reward = self._calculate_reward()
|
115 |
|
116 |
# Increment the step count
|
117 |
self.steps += 1
|
118 |
+
done = self.steps >= 10 # End condition: after 10 steps
|
119 |
truncated = False # Default to False; can change based on custom conditions
|
120 |
|
121 |
+
# Observation after the action
|
122 |
obs = np.array([self.congestion_numeric, self.current_signal], dtype=np.float32)
|
123 |
print(f"Observation: {obs}")
|
124 |
|
125 |
info = {} # Additional info (can remain empty or contain any useful data)
|
126 |
|
127 |
+
return obs, reward, done, truncated, info
|
128 |
|
129 |
def _calculate_reward(self):
|
130 |
if self.congestion_level == "High":
|