|
| 1 | +import logging |
| 2 | +import numpy as np |
| 3 | +from mesa import Agent |
| 4 | +import math |
| 5 | +from shapely.geometry import Point, Polygon |
| 6 | +from shapely.strtree import STRtree |
| 7 | + |
| 8 | +from a_star import astar |
| 9 | +from dynamic_window_approach import dwa_control, motion |
| 10 | + |
| 11 | +class Ship(Agent): |
| 12 | + def __init__(self, model, id, start_port, all_ports, dwa_config): |
| 13 | + super().__init__(self, model) |
| 14 | + self.unique_id = id |
| 15 | + self.destination_port = self.assign_destination(all_ports, start_port) |
| 16 | + self.global_path = self.calculate_global_path(start_port.pos, self.destination_port.pos) |
| 17 | + self.dwa_config = dwa_config |
| 18 | + |
| 19 | + self.heading_deviation = 0.0 |
| 20 | + self.heading_drift_duration = 0 |
| 21 | + |
| 22 | + # Assign a random max speed within the speed range |
| 23 | + dwa_config["max_speed"] = self.random.uniform(self.model.max_speed_range[0], self.model.max_speed_range[1]) |
| 24 | + self.original_max_speed = self.dwa_config["max_speed"] |
| 25 | + logging.info(f"Ship {self.unique_id} has a maximum speed of {dwa_config['max_speed']}.") |
| 26 | + |
| 27 | + if self.global_path and len(self.global_path) > 1: |
| 28 | + first_waypoint = self.global_path[1] # Ensure it doesn't use the port position |
| 29 | + dx = first_waypoint[0] - start_port.pos[0] |
| 30 | + dy = first_waypoint[1] - start_port.pos[1] |
| 31 | + |
| 32 | + # Set initial heading (theta) towards the first waypoint |
| 33 | + initial_theta = math.atan2(dy, dx) |
| 34 | + else: |
| 35 | + initial_theta = 0.0 |
| 36 | + |
| 37 | + # Ship's state (x, y, theta, v, w) |
| 38 | + self.state = (start_port.pos[0], start_port.pos[1], initial_theta, 0.0, 0.0) |
| 39 | + |
| 40 | + self.current_wp_idx = 0 |
| 41 | + |
| 42 | + def step(self): |
| 43 | + if self.pos[0] == self.destination_port.pos[0] and self.pos[1] == self.destination_port.pos[1]: |
| 44 | + return |
| 45 | + |
| 46 | + """Move the ship along the calculated global path.""" |
| 47 | + if self.global_path and len(self.global_path) > 1: |
| 48 | + local_goal = self.get_local_goal(self.state, self.global_path, lookahead=self.model.lookahead) |
| 49 | + self.dwa_config["max_speed"] = self.get_speed_limit() |
| 50 | + |
| 51 | + if self.model.speed_variation["enabled"]: |
| 52 | + self.dwa_config["max_speed"] = self.get_noisy_speed() |
| 53 | + |
| 54 | + # if self.model.directional_variation["enabled"]: |
| 55 | + # noisy_state = self.get_noisy_state() |
| 56 | + # else: |
| 57 | + # noisy_state = self.state |
| 58 | + |
| 59 | + if self.model.directional_variation["enabled"]: |
| 60 | + # Randomly trigger heading deviation |
| 61 | + if self.heading_drift_duration > 0: |
| 62 | + # Continue existing deviation |
| 63 | + noisy_theta = self.state[2] + self.heading_deviation |
| 64 | + self.heading_drift_duration -= 1 |
| 65 | + logging.info(f"Continue deviation. Ship {self.unique_id}, Theta = {self.heading_deviation}") |
| 66 | + else: |
| 67 | + # Random chance to start a new deviation |
| 68 | + if self.random.random() < self.model.deviation_chance: |
| 69 | + self.heading_deviation = self.random.uniform(-self.model.max_heading_deviation, self.model.max_heading_deviation) |
| 70 | + self.heading_drift_duration = self.model.deviation_duration |
| 71 | + noisy_theta = self.state[2] + self.heading_deviation |
| 72 | + logging.info(f"Starting directional deviation. Ship {self.unique_id}, Theta = {self.heading_deviation}") |
| 73 | + else: |
| 74 | + noisy_theta = self.state[2] |
| 75 | + |
| 76 | + # Normalize heading |
| 77 | + noisy_theta = (noisy_theta + math.pi) % (2 * math.pi) - math.pi |
| 78 | + noisy_state = (self.state[0], self.state[1], noisy_theta, self.state[3], self.state[4]) |
| 79 | + else: |
| 80 | + noisy_state = self.state |
| 81 | + |
| 82 | + control, predicted_trajectory, cost_info = dwa_control( |
| 83 | + noisy_state, self.dwa_config, self.model.obstacle_tree, |
| 84 | + self.model.buffered_obstacles, local_goal |
| 85 | + ) |
| 86 | + |
| 87 | + # Check if we should dock |
| 88 | + if self.should_dock(control[0]): |
| 89 | + self.move_to_destination() |
| 90 | + else: |
| 91 | + self.state = motion(self.state, control[0], control[1], self.dwa_config["dt"]) |
| 92 | + self.move_position() |
| 93 | + |
| 94 | + # After motion, restore true max speed |
| 95 | + self.dwa_config["max_speed"] = self.original_max_speed |
| 96 | + |
| 97 | + def get_noisy_speed(self): |
| 98 | + # Assign ± speed variation as a fraction of max_speed |
| 99 | + max_speed_variation = self.model.max_speed_variation |
| 100 | + variation_amount = self.dwa_config["max_speed"] * max_speed_variation |
| 101 | + noisy_speed = self.dwa_config["max_speed"] + self.random.uniform(-variation_amount, variation_amount) |
| 102 | + |
| 103 | + # Clamp to valid range |
| 104 | + noisy_speed = max(self.dwa_config["min_speed"], min(noisy_speed, self.dwa_config["max_speed"])) |
| 105 | + return noisy_speed |
| 106 | + |
| 107 | + def should_dock(self, current_speed): |
| 108 | + """Determine if the ship should dock at its destination.""" |
| 109 | + remaining_distance = math.hypot( |
| 110 | + self.state[0] - self.destination_port.pos[0], |
| 111 | + self.state[1] - self.destination_port.pos[1] |
| 112 | + ) |
| 113 | + |
| 114 | + # Dock if the remaining distance is smaller than what the next step would move |
| 115 | + if remaining_distance <= current_speed*self.dwa_config['dt']: |
| 116 | + return True |
| 117 | + |
| 118 | + return False |
| 119 | + |
| 120 | + def move_position(self): |
| 121 | + """Update the ship's position in the simulation space.""" |
| 122 | + self.pos = (float(self.state[0]), float(self.state[1])) |
| 123 | + new_pos = np.array([self.pos[0], self.pos[1]]) |
| 124 | + self.model.space.move_agent(self, new_pos) |
| 125 | + logging.info( |
| 126 | + f"Ship {self.unique_id} moving towards {self.destination_port.pos}. " |
| 127 | + f"Current position: {new_pos}." |
| 128 | + f"Speed: {self.state[3]}." |
| 129 | + ) |
| 130 | + |
| 131 | + def move_to_destination(self): |
| 132 | + """Move the ship directly to its destination.""" |
| 133 | + self.pos = self.destination_port.pos |
| 134 | + self.model.space.move_agent(self, self.destination_port.pos) |
| 135 | + logging.info( |
| 136 | + f"Ship {self.unique_id} arrived at port {self.destination_port.pos}." |
| 137 | + ) |
| 138 | + |
| 139 | + def assign_destination(self, all_ports, start_port): |
| 140 | + """Select a destination port different from the starting port.""" |
| 141 | + possible_destinations = [port for port in all_ports if port != start_port] |
| 142 | + return self.random.choice(possible_destinations) if possible_destinations else start_port |
| 143 | + |
| 144 | + def calculate_global_path(self, start, destination): |
| 145 | + """Calculate a path using A* algorithm.""" |
| 146 | + grid_start = (int(start[0] / self.model.resolution), int(start[1] / self.model.resolution)) |
| 147 | + grid_goal = (int(destination[0] / self.model.resolution), int(destination[1] / self.model.resolution)) |
| 148 | + global_path_indices = astar(self.model.occupancy_grid, grid_start, grid_goal) |
| 149 | + |
| 150 | + if global_path_indices is None: |
| 151 | + logging.info(f"No global path for ship {self.unique_id}.") |
| 152 | + return |
| 153 | + |
| 154 | + global_path = [((i) * self.model.resolution, (j) * self.model.resolution) |
| 155 | + for (i, j) in global_path_indices] |
| 156 | + return global_path |
| 157 | + |
| 158 | + def get_speed_limit(self): |
| 159 | + """Check if the ship is inside a speed-limited zone and return the max speed.""" |
| 160 | + ship_position = Point(self.state[0], self.state[1]) |
| 161 | + |
| 162 | + for zone in self.model.speed_limit_zones: |
| 163 | + polygon = Polygon(zone["zone"]) |
| 164 | + if polygon.contains(ship_position): |
| 165 | + return zone["max_speed"] # Apply the speed limit |
| 166 | + |
| 167 | + return self.original_max_speed # Restore original speed if outside all zones |
| 168 | + |
| 169 | + def get_local_goal(self, state, global_path, lookahead=3.0): |
| 170 | + """ |
| 171 | + Given the robot's current state and the global path, return a local goal. |
| 172 | + Returns a tuple (local_goal, new_wp_idx) where local_goal is a waypoint (x, y) |
| 173 | + that is at least 'lookahead' distance ahead of the current position. |
| 174 | + """ |
| 175 | + x, y = state[0], state[1] |
| 176 | + for idx in range(self.current_wp_idx, len(global_path)): |
| 177 | + wp = global_path[idx] |
| 178 | + if math.hypot(wp[0] - x, wp[1] - y) > lookahead: |
| 179 | + self.current_wp_idx = idx |
| 180 | + return wp |
| 181 | + self.current_wp_idx = len(global_path) - 1 |
| 182 | + return global_path[-1] |
0 commit comments