@@ -14,7 +14,10 @@ def __init__(self, model, id, start_port, all_ports, dwa_config):
1414 self .destination_port = self .assign_destination (all_ports , start_port )
1515 self .global_path = self .calculate_global_path (start_port .pos , self .destination_port .pos )
1616 self .dwa_config = dwa_config
17-
17+
18+ self .heading_deviation = 0.0
19+ self .heading_drift_duration = 0
20+
1821 # Assign a random max speed within the speed range
1922 dwa_config ["max_speed" ] = self .random .uniform (self .model .max_speed_range [0 ], self .model .max_speed_range [1 ])
2023 self .original_max_speed = self .dwa_config ["max_speed" ]
@@ -43,8 +46,40 @@ def step(self):
4346 if self .global_path and len (self .global_path ) > 1 :
4447 local_goal = self .get_local_goal (self .state , self .global_path , lookahead = self .model .lookahead )
4548 self .dwa_config ["max_speed" ] = self .get_speed_limit ()
49+
50+ if self .model .speed_variation ["enabled" ]:
51+ self .dwa_config ["max_speed" ] = self .get_noisy_speed ()
52+
53+ # if self.model.directional_variation["enabled"]:
54+ # noisy_state = self.get_noisy_state()
55+ # else:
56+ # noisy_state = self.state
57+
58+ if self .model .directional_variation ["enabled" ]:
59+ # Randomly trigger heading deviation
60+ if self .heading_drift_duration > 0 :
61+ # Continue existing deviation
62+ noisy_theta = self .state [2 ] + self .heading_deviation
63+ self .heading_drift_duration -= 1
64+ logging .info (f"Continue deviation. Ship { self .unique_id } , Theta = { self .heading_deviation } " )
65+ else :
66+ # Random chance to start a new deviation
67+ if self .random .random () < self .model .deviation_chance :
68+ self .heading_deviation = self .random .uniform (- self .model .max_heading_deviation , self .model .max_heading_deviation )
69+ self .heading_drift_duration = self .model .deviation_duration
70+ noisy_theta = self .state [2 ] + self .heading_deviation
71+ logging .info (f"Starting directional deviation. Ship { self .unique_id } , Theta = { self .heading_deviation } " )
72+ else :
73+ noisy_theta = self .state [2 ]
74+
75+ # Normalize heading
76+ noisy_theta = (noisy_theta + math .pi ) % (2 * math .pi ) - math .pi
77+ noisy_state = (self .state [0 ], self .state [1 ], noisy_theta , self .state [3 ], self .state [4 ])
78+ else :
79+ noisy_state = self .state
80+
4681 control , predicted_trajectory , cost_info = dwa_control (
47- self . state , self .dwa_config , self .model .obstacle_tree ,
82+ noisy_state , self .dwa_config , self .model .obstacle_tree ,
4883 self .model .buffered_obstacles , local_goal
4984 )
5085
@@ -54,6 +89,42 @@ def step(self):
5489 else :
5590 self .state = motion (self .state , control [0 ], control [1 ], self .dwa_config ["dt" ])
5691 self .move_position ()
92+
93+ # After motion, restore true max speed
94+ self .dwa_config ["max_speed" ] = self .original_max_speed
95+
96+ def get_noisy_speed (self ):
97+ # Assign ± speed variation as a fraction of max_speed
98+ max_speed_variation = self .model .max_speed_variation
99+ variation_amount = self .dwa_config ["max_speed" ] * max_speed_variation
100+ noisy_speed = self .dwa_config ["max_speed" ] + self .random .uniform (- variation_amount , variation_amount )
101+
102+ # Clamp to valid range
103+ noisy_speed = max (self .dwa_config ["min_speed" ], min (noisy_speed , self .dwa_config ["max_speed" ]))
104+ return noisy_speed
105+
106+ def get_noisy_state (self ):
107+ # Randomly trigger heading deviation
108+ if self .heading_drift_duration > 0 :
109+ # Continue existing deviation
110+ noisy_theta = self .state [2 ] + self .heading_deviation
111+ self .heading_drift_duration -= 1
112+ logging .info (f"Continue deviation. Ship { self .unique_id } , Theta = { self .heading_deviation } " )
113+ else :
114+ # Random chance to start a new deviation
115+ if self .random .random () < self .model .deviation_chance :
116+ self .heading_deviation = self .random .uniform (- self .model .max_heading_deviation , self .model .max_heading_deviation )
117+ self .heading_drift_duration = self .model .deviation_duration
118+ noisy_theta = self .state [2 ] + self .heading_deviation
119+ logging .info (f"Starting directional deviation. Ship { self .unique_id } , Theta = { self .heading_deviation } " )
120+ else :
121+ noisy_theta = self .state [2 ]
122+
123+ # Normalize heading
124+ noisy_theta = (noisy_theta + math .pi ) % (2 * math .pi ) - math .pi
125+ noisy_state = (self .state [0 ], self .state [1 ], noisy_theta , self .state [3 ], self .state [4 ])
126+
127+ return noisy_state
57128
58129 def should_dock (self , current_speed ):
59130 """Determine if the ship should dock at its destination."""
0 commit comments