1
0

Initial implementation

This commit is contained in:
2021-04-15 13:30:43 +03:00
commit 1157182678
42 changed files with 6502 additions and 0 deletions

0
library/build.gradle Normal file
View File

View File

@ -0,0 +1,21 @@
package drones;
import java.util.ResourceBundle;
/**
* Convenience class for retrieving translated strings
*
* @author John Ahlroos
*/
public interface I18N {
/**
* Get a translated string
*
* @param key the translation key
* @return the translated message
*/
static String get(String key) {
return ResourceBundle.getBundle("drones-messages").getString(key);
}
}

View File

@ -0,0 +1,259 @@
package drones.dispatcher;
import drones.I18N;
import drones.drone.Drone;
import drones.geo.Point;
import drones.geo.TubeStation;
import drones.messages.EventBus;
import drones.messages.EventBus.Listener;
import drones.messages.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* A Drone dispatcher for controlling {@link Drone}'s
*
* @author John Ahlroos
*/
public class Dispatcher implements Runnable, Listener<Message> {
public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern(I18N.get("date.format"));
private static final DecimalFormat REPORT_NUMBER_FORMAT = new DecimalFormat(I18N.get("number.format"));
private static final Logger LOGGER = LoggerFactory.getLogger("Dispatcher");
private static final Logger REPORT_LOGGER = LoggerFactory.getLogger("Report");
private final LocalDateTime shutdownTime;
private final Map<Long, BufferedReader> dataReaders = new HashMap<>();
private final double simulatedTimeFactor;
private LocalDateTime currentTime;
private final EventBus eventBus;
private final List<TubeStation> tubeStations = new ArrayList<>();
private final Set<String> tubeStationsFound = new HashSet<>();
private int tuneStationReports = 0;
private final Map<Long, Long> waypointsSent = new HashMap<>();
/**
* Creates a new dispatcher
*
* @param currentTime the current time of the dispatcher
* @param shutdownTime the time when the dispatcher should terminate
* @param simulatedTimeFactor used for controlling simulated time. Should be between 0-1.
*/
public Dispatcher(LocalDateTime currentTime, LocalDateTime shutdownTime, double simulatedTimeFactor) {
this.currentTime = currentTime;
this.shutdownTime = shutdownTime;
this.simulatedTimeFactor = Math.max(0.0001, simulatedTimeFactor);
this.eventBus = new EventBus(simulatedTimeFactor);
this.eventBus.register(this);
LOGGER.info(I18N.get("dispatcher.running.stats.1"));
LOGGER.info(I18N.get("dispatcher.running.stats.2"), this.currentTime);
LOGGER.info(I18N.get("dispatcher.running.stats.3"), this.shutdownTime);
LOGGER.info(I18N.get("dispatcher.running.stats.4"), this.simulatedTimeFactor);
LOGGER.info("");
}
/**
* Register tube stations to report on.
* <p>
* The format is:
* "<Station name>",<latitude>,<longitude>
*
* @param reader A reader referencing a file and another data stream
* @throws IOException if reading the file fails
*/
public void registerTubeStations(BufferedReader reader) throws IOException {
try (reader) {
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
var scanner = new Scanner(line).useDelimiter(",");
var name = scanner.next().replace("\"", "");
var lat = Double.parseDouble(scanner.next().replace("\"", ""));
var lon = Double.parseDouble(scanner.next().replace("\"", ""));
var tb = new TubeStation(new Point(lat, lon, null), name);
LOGGER.info("Registered Tube Station: {}", tb);
tubeStations.add(tb);
}
}
}
/**
* Registers a new drone with the dispatcher.
*
* @param id the drone id
* @param reader the data source reader, can point to a file or another data stream.
* The format is:
* <drone id>,"<latitude>","<longitude>","<time>"
* The time format is:
* 2011-03-22 07:49:10
* @return returns a representation of the drone
*/
public Runnable registerDroneWithReader(long id, BufferedReader reader) {
dataReaders.put(id, reader);
var drone = new Drone(id, currentTime, simulatedTimeFactor, eventBus, tubeStations);
eventBus.register(drone);
LOGGER.info(I18N.get("dispatcher.drone.registration"), drone.getId());
return drone;
}
@Override
public void run() {
LOGGER.info(I18N.get("dispatcher.started"));
while (dataReaders.size() > 0) {
if (currentTime.isEqual(shutdownTime)) {
LOGGER.info(I18N.get("dispatcher.send.shutdown.signals"), dataReaders.size());
for (Long droneId : dataReaders.keySet()) {
LOGGER.info(I18N.get("dispatcher.event.drone.shutdown"), droneId);
eventBus.push(new Message.ShutdownSignal(droneId, droneId, shutdownTime));
}
}
if (Thread.currentThread().isInterrupted()) {
LOGGER.info(I18N.get("dispatcher.shutdown"));
break;
}
incrementTime();
}
LOGGER.info(I18N.get("dispatcher.terminating"));
LOGGER.info(I18N.get("dispatcher.termination.stats.1"), tubeStationsFound);
LOGGER.info(I18N.get("dispatcher.termination.stats.2"), tuneStationReports);
LOGGER.info(I18N.get("dispatcher.termination.stats.3"));
waypointsSent.forEach((k, v) -> LOGGER.info(I18N.get("dispatcher.termination.stats.4"), k, v));
}
@Override
public void onEvent(Message message) {
// No more drones
if (dataReaders.isEmpty()) {
return;
}
// Ensure we are only processing our messages
if (message.targetId() != Message.DISPATCHER_ID) {
return;
}
// Received shutdown confirmation from Drone, cleanup resources
if (message instanceof Message.ShutdownSignal shutdown) {
try {
if (dataReaders.containsKey(shutdown.droneId())) {
dataReaders.get(shutdown.droneId()).close();
dataReaders.remove(shutdown.droneId());
}
} catch (IOException e) {
LOGGER.warn(I18N.get("dispatcher.event.drone.data.stream.error"), shutdown.droneId());
}
}
// Is drone request route waypoints?
if (message instanceof Message.DroneCapacity droneCapacity) {
LOGGER.info(I18N.get("dispatcher.event.drone.capacity.request"), droneCapacity.droneId(), droneCapacity.capacity());
updateDroneRoute(droneCapacity.droneId(), droneCapacity.capacity());
return;
}
// Has drone sent a traffic condition?
if (message instanceof Message.TrafficCondition condition) {
tubeStationsFound.add(condition.tubeStation());
tuneStationReports++;
REPORT_LOGGER.info(I18N.get("dispatcher.event.drone.traffic.report"),
condition.tubeStation(),
condition.time().toLocalTime(),
condition.condition().toString(),
condition.droneId(), REPORT_NUMBER_FORMAT.format(condition.speed() * 3.6),
REPORT_NUMBER_FORMAT.format(condition.distanceToStation()));
return;
}
}
/**
* Reads the next waypoint from the data source.
*
* @param id The id of the drone
* @return If no waypoint can be read, either by reaching the end-of-file or by an error, an empty Optional is
* returned. Otherwise the read point is returned.
*/
private Optional<Point> readNextPosition(long id) {
try {
var reader = dataReaders.get(id);
if (reader == null) {
return Optional.empty();
}
var line = reader.readLine();
if (line == null) {
return Optional.empty();
}
var scanner = new Scanner(line).useDelimiter(",");
if (scanner.nextLong(10) != id) {
throw new IllegalStateException(I18N.get("dispatcher.data.file.id.mismatch"));
}
var lat = Double.parseDouble(scanner.next().replace("\"", ""));
var lon = Double.parseDouble(scanner.next().replace("\"", ""));
var time = LocalDateTime.parse(scanner.next().replace("\"", ""), DATE_FORMAT);
if (time.isAfter(shutdownTime)) {
// Ignore points after shutdown time
return Optional.empty();
}
return Optional.of(new Point(lat, lon, time));
} catch (IOException e) {
LOGGER.warn(I18N.get("dispatcher.data.file.stream.error"));
return Optional.empty();
}
}
/**
* Updates a drone with new waypoints if there is capacity
*
* @param droneId the id of the drone to update
* @param fillCapacity the amount of waypoints to send
*/
private void updateDroneRoute(long droneId, int fillCapacity) {
var addedWaypoints = 0;
while (fillCapacity > 0) {
var wp = readNextPosition(droneId);
if (wp.isEmpty()) {
break;
}
eventBus.push(new Message.PointData(droneId, wp.get()));
waypointsSent.computeIfPresent(droneId, (k, v) -> ++v);
waypointsSent.putIfAbsent(droneId, 1L);
addedWaypoints++;
fillCapacity--;
}
if (addedWaypoints > 0) {
LOGGER.info(I18N.get("dispatcher.waypoints.sent"), addedWaypoints, droneId);
}
}
/**
* Simulates latency of the dispatcher
* <p>
* Controlled by the {@link #simulatedTimeFactor} property. If using real-time (factor 1) then there is a 10
* second latency.
*/
private void incrementTime() {
try {
currentTime = currentTime.plusSeconds(1);
Thread.sleep(Math.round(simulatedTimeFactor * 1000));
} catch (InterruptedException e) {
LOGGER.debug(I18N.get("dispatcher.thread.sleep.error"));
}
}
}

View File

@ -0,0 +1,322 @@
package drones.drone;
import drones.I18N;
import drones.geo.Point;
import drones.geo.TubeStation;
import drones.messages.EventBus;
import drones.messages.EventBus.Listener;
import drones.messages.Message;
import drones.messages.Message.TrafficCondition.Condition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
* Drone implementation to be controlled by a {@link drones.dispatcher.Dispatcher}
*
* @author John Ahlroos
*/
public class Drone implements Runnable, Listener<Message> {
private static final int ROUTE_CAPACITY = 10;
private static final int REQUEST_WAYPOINT_THRESHOLD = 3; // 30% capacity remaining
public static final int TUBE_STATION_RANGE = 350;
private final long id;
private double speed = 0;
private Point position;
private final Deque<Point> route = new ConcurrentLinkedDeque<>();
private DroneState state = DroneState.HOLDING;
private LocalDateTime currentTime;
private final double simulatedTimeFactor;
private final EventBus eventBus;
private final Collection<TubeStation> tubeStations;
private final Set<TubeStation> tubeStationsInRange = new HashSet<>();
private final Set<TubeStation> tubeStationsFound = new HashSet<>();
private LocalDateTime shutdownTime;
private int waypointsVisited = 0;
private double longestDistanceTraveled = 0;
private double fastestSpeed = 0;
/**
* Creates a new drone
*
* @param id the id of the drone. Must be unique
* @param currentTime the current time
* @param simulatedTimeFactor time factor to simulate time. Should be between 0-1.
* @param eventBus the event bus for sending and recieving messages
* @param tubeStations the tube stations to report on
*/
public Drone(long id, LocalDateTime currentTime, double simulatedTimeFactor, EventBus eventBus,
Collection<TubeStation> tubeStations) {
this.id = id;
this.currentTime = currentTime;
this.simulatedTimeFactor = Math.max(0.0001, simulatedTimeFactor);
this.eventBus = eventBus;
this.tubeStations = tubeStations;
this.position = new Point(0, 0, currentTime);
getLogger().info(I18N.get("drone.running.stats.1"));
getLogger().info(I18N.get("drone.running.stats.2"), this.currentTime);
getLogger().info(I18N.get("drone.running.stats.3"), this.simulatedTimeFactor);
getLogger().info("");
}
/**
* Get the id of the drone. Must be unique.
*
* @return the drone id
*/
public long getId() {
return id;
}
/**
* Get the current position of the drone.
* <p>
* If the drone has not yet recieved any waypoints then this is <code>null</code>. This will be set initially to
* the first waypoint and then updated as the drone flies.
* </p>
*
* @return the current position
*/
public Point getPosition() {
return position;
}
/**
* Get how many waypoints remain on the current route. Might be updated by the dispatcher.
*
* @return the amount of waypoints remaining
*/
public int getWaypointsRemaining() {
return route.size();
}
/**
* The current state of the drone.
*
* <ul>
* <li>{@link DroneState#HOLDING}: The drone is currently holding at the same position</li>
* <li>{@link DroneState#MOVING}: The drone is currently moving to a waypoint</li>
* <li>{@link DroneState#TERMINATED}: The drone has terminated operation. No more waypoints will be
* processed</li>
* </ul>
*
* @return the current drone state.
*/
public DroneState getState() {
return state;
}
/**
* Returns how many waypoints have been visited by the drone
*
* @return the number of waypoints visited
*/
public int getWaypointsVisited() {
return waypointsVisited;
}
/**
* Returns which tube stations have been found while operating
*
* @return the names of the tube stations found
*/
public Set<TubeStation> getTubeStationsFound() {
return Collections.unmodifiableSet(tubeStationsFound);
}
@Override
public void run() {
getLogger().info(I18N.get("drone.started"));
// Request initial waypoints
requestWaypoints();
while (state != DroneState.TERMINATED) {
// Received termination signal
if (Thread.currentThread().isInterrupted()) {
getLogger().info(this.toString());
break;
}
// Shutdown
if (shutdownTime != null && (currentTime.isEqual(shutdownTime) || currentTime.isAfter(shutdownTime))) {
if (route.isEmpty()) {
setState(DroneState.TERMINATED);
getLogger().info(this.toString());
break;
}
}
// Log current location and drone details
getLogger().info(this.toString());
// Check if we have any more waypoints, if not hold position
var nextWaypoint = route.peekFirst();
if (nextWaypoint == null) {
getLogger().info(I18N.get("drone.holding.position"));
setState(DroneState.HOLDING);
speed = 0.0;
incrementTime();
continue;
}
// Check if we received our first waypoint, adjust dummy position
if (position.latitude() == 0 && nextWaypoint.time().isBefore(currentTime)) {
getLogger().info(I18N.get("drone.holding.position"));
setState(DroneState.HOLDING);
speed = 0.0;
incrementTime();
continue;
} else if (position.latitude() == 0) {
position = Point.adjustTime(nextWaypoint, position.time());
}
// Check for tube stations
tubeStationsInRange.removeIf(tb -> !tb.point().isInRange(position, TUBE_STATION_RANGE));
tubeStations.stream()
.filter(tb -> !tubeStationsInRange.contains(tb))
.filter(tb -> tb.point().isInRange(position, TUBE_STATION_RANGE))
.peek(tubeStationsInRange::add)
.peek(tubeStationsFound::add)
.forEach(tb -> eventBus.push(new Message.TrafficCondition(
Message.DISPATCHER_ID, getId(), tb.name(),
position.time(), speed,
Condition.values()[new Random().nextInt(Condition.values().length)],
position.distanceTo(tb.point()))));
// Check if we are at the destination waypoint
if (nextWaypoint.equals(position)) {
setState(DroneState.HOLDING);
speed = 0.0;
currentTime = nextWaypoint.time();
waypointsVisited++;
getLogger().info(I18N.get("drone.arrived.at.waypoint"), nextWaypoint);
position = route.removeFirst();
requestWaypoints();
incrementTime();
continue;
}
// Calculate new speed if we are departing to a new waypoint
var distanceToNextWaypoint = position.distanceTo(nextWaypoint);
longestDistanceTraveled = Math.max(longestDistanceTraveled, distanceToNextWaypoint);
if (state == DroneState.HOLDING) {
var timeToNextWaypoint = position.timeTo(nextWaypoint);
speed = distanceToNextWaypoint / timeToNextWaypoint.getSeconds();
getLogger().debug(I18N.get("drone.speed.calculation"), timeToNextWaypoint.getSeconds(), distanceToNextWaypoint);
}
// Prevent overshooting the waypoint by reducing speed as we approach the waypoint
if (speed > distanceToNextWaypoint) {
speed = distanceToNextWaypoint;
getLogger().info(I18N.get("drone.speed.reduced"), speed);
}
// Start moving to the new waypoint
fastestSpeed = Math.max(fastestSpeed, speed);
position = position.moveTowards(nextWaypoint, speed, 1);
setState(DroneState.MOVING);
getLogger().debug(I18N.get("drone.next.waypoint.leg"), position);
incrementTime();
}
getLogger().info(I18N.get("drone.shutdown"), position);
eventBus.push(new Message.ShutdownSignal(Message.DISPATCHER_ID, id, shutdownTime));
getLogger().info(I18N.get("drone.termination.stats.1"), waypointsVisited);
getLogger().info(I18N.get("drone.termination.stats.2"), tubeStationsFound);
getLogger().info(String.format(I18N.get("drone.termination.stats.3"), longestDistanceTraveled / 1000.0));
getLogger().info(String.format(I18N.get("drone.termination.stats.4"), fastestSpeed * 3.6));
}
private void requestWaypoints() {
var capacity = availableRouteCapacity();
if (capacity > ROUTE_CAPACITY - REQUEST_WAYPOINT_THRESHOLD) {
eventBus.push(new Message.DroneCapacity(Message.DISPATCHER_ID, getId(), capacity));
getLogger().info(I18N.get("drone.new.route.request"), capacity);
}
}
private void setState(DroneState state) {
if (this.state == DroneState.TERMINATED) {
return;
}
this.state = state;
}
@Override
public String toString() {
var lat = position.latitude();
var lon = position.longitude();
var time = position.time();
return String.format(I18N.get("drone.position"), speed, state, lat, lon, route.size(), waypointsVisited,
time.toLocalTime());
}
@Override
public void onEvent(Message message) {
// Check that the message is for this drone
if (message.targetId() != id) {
return;
}
// Check for signals
if (message instanceof Message.ShutdownSignal signal) {
getLogger().info(I18N.get("drone.event.signal"), signal);
shutdownTime = signal.time();
getLogger().info(I18N.get("drone.event.signal.shutdown"));
return;
}
// Check for new waypoints
if (message instanceof Message.PointData data) {
var point = data.point();
if (availableRouteCapacity() > 0) {
getLogger().info(I18N.get("drone.event.new.waypoint"), point);
route.add(point);
} else {
getLogger().warn(I18N.get("drone.event.new.waypoint.cache.full"), point);
}
return;
}
}
/**
* Get the amount of points we can load in the waypoint cache
*
* @return number of points we can load
*/
private int availableRouteCapacity() {
return ROUTE_CAPACITY - route.size();
}
/**
* Get the named logger for the drone (allows us to split the logs into different files)
*
* @return the logger
*/
private Logger getLogger() {
return LoggerFactory.getLogger("Drone." + id);
}
/**
* Simulates latency and delay while flying. Allows us to model in real-time the fight path
*/
private void incrementTime() {
try {
currentTime = currentTime.plusSeconds(1);
position = Point.adjustTime(position, currentTime);
Thread.sleep(Math.round(simulatedTimeFactor * 1000));
} catch (InterruptedException e) {
getLogger().debug(I18N.get("drone.thread.sleep.error"));
}
}
}

View File

@ -0,0 +1,25 @@
package drones.drone;
/**
* The state of the drone
*
* @author John Ahlroos
*/
public enum DroneState {
/**
* The drone is moving in some direction
*/
MOVING,
/**
* The drone is holding at a location
*/
HOLDING,
/**
* The drone has terminated operation
*/
TERMINATED
}

View File

@ -0,0 +1,127 @@
package drones.geo;
import drones.I18N;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* Represents a point in the world.
*
* @author John Ahlroos
*/
public record Point(double latitude, double longitude, LocalDateTime time) {
private static final double EARTH_RADIUS = 6371.01;
private static final double ACCURACY = 0.00001;
/**
* Measures the distance between this point and another point.
*
* @param point the point to measure distance to
* @return the distance in meters
*/
public double distanceTo(Point point) {
var lat1 = Math.toRadians(latitude());
var lon1 = Math.toRadians(longitude());
var lat2 = Math.toRadians(point.latitude());
var lon2 = Math.toRadians(point.longitude());
double earthRadius = EARTH_RADIUS * 1000; //Meters
return earthRadius * Math.acos(Math.sin(lat1) * Math.sin(lat2) + Math.cos(lat1) * Math.cos(lat2) * Math.cos(lon1 - lon2));
}
/**
* Checks if another point is withing a certain bounds
*
* @param point the point to check
* @param meters the range to check for
* @return <code>true</code> if it is within range, <code>false</code> otherwise.
*/
public boolean isInRange(Point point, double meters) {
return distanceTo(point) <= meters;
}
/**
* The amount of time between two point timestamps
*
* @param point the other point to compare to
* @return the duraction between the points
*/
public Duration timeTo(Point point) {
var time = Duration.between(time(), point.time().plusSeconds(1)).abs();
return time.isZero() ? Duration.ofSeconds(1) : time;
}
/**
* The bearing, measured in radians, to another points
*
* @param point the other point to point to
* @return the bearing in radians
*/
public double bearingTo(Point point) {
var dL = point.longitude() - longitude();
var x = Math.cos(point.latitude()) * Math.sin(dL);
var y = Math.cos(latitude()) * Math.sin(point.latitude()) -
Math.sin(latitude()) * Math.cos(point.latitude()) * Math.cos(dL);
return Math.atan2(x, y);
}
/**
* Calculates what the next point would be if we would move (meters) amount
* for (seconds) toward another point.
*
* @param destination The destination point to head for
* @param meters the amount of meters to travel
* @param seconds the amount of time it should take
* @return the next point we would reach
*/
public Point moveTowards(Point destination, double meters, long seconds) {
if (meters <= 0) {
throw new IllegalArgumentException(I18N.get("point.meters.positive.value"));
}
if (seconds <= 0) {
throw new IllegalArgumentException(I18N.get("point.seconds.positive.value"));
}
var brng = bearingTo(destination);
var d = meters / 1000.0;
var lat1 = Math.toRadians(latitude());
var lon1 = Math.toRadians(longitude());
var lat2 = Math.asin(Math.sin(lat1) * Math.cos(d / EARTH_RADIUS) +
Math.cos(lat1) * Math.sin(d / EARTH_RADIUS) * Math.cos(brng));
var lon2 = lon1 + Math.atan2(Math.sin(brng) * Math.sin(d / EARTH_RADIUS) * Math.cos(lat1),
Math.cos(d / EARTH_RADIUS) - Math.sin(lat1) * Math.sin(lat2));
return new Point(Math.toDegrees(lat2), Math.toDegrees(lon2), time());
}
/**
* Returns a new point where the time has been adjusted to the new time
*
* @param point the point to use as basis
* @param time the time to set
* @return a new point using the old point but with changed time
*/
public static Point adjustTime(Point point, LocalDateTime time) {
return new Point(point.latitude(), point.longitude(), time);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
var point = (Point) o;
var latDiff = Math.abs(point.latitude - latitude);
var lonDiff = Math.abs(point.longitude - longitude);
return latDiff < ACCURACY && lonDiff < ACCURACY;
}
@Override
public int hashCode() {
return Objects.hash(latitude, longitude);
}
@Override
public String toString() {
return String.format("(lat=%f, lon=%f, time=%s)", latitude, longitude, time);
}
}

View File

@ -0,0 +1,29 @@
package drones.geo;
import java.util.Objects;
/**
* Represents a tube station
*
* @author John Ahlroos
*/
public record TubeStation(Point point, String name) {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TubeStation that = (TubeStation) o;
return Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public String toString() {
return String.format("(name=%s, lat=%f, lon=%f)", name, point.latitude(), point.longitude());
}
}

View File

@ -0,0 +1,65 @@
package drones.messages;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Represent a simple event bus between the {@link drones.drone.Drone} and the {@link drones.dispatcher.Dispatcher}
*
* @author John Ahlroos
*/
public class EventBus {
private final List<Listener<Message>> listeners = new CopyOnWriteArrayList<>();
private final double simulatedTimeFactor;
public EventBus(double simulatedTimeFactor) {
this.simulatedTimeFactor = simulatedTimeFactor;
}
/**
* Push a new message to the event bus
*
* @param message the message to send
*/
public void push(Message message) {
latency();
listeners.forEach(l -> l.onEvent(message));
}
/**
* Register a receiver for the messages
*
* @param listener the listener
*/
public void register(Listener<Message> listener) {
listeners.add(listener);
}
/**
* Simulates a slight latency in the event bus (messages moving over the air)
*/
private void latency() {
try {
Thread.sleep(Math.round(simulatedTimeFactor * 300));
} catch (InterruptedException e) {
// Ignore
}
}
/**
* Listener interface for receiving messages from the event bus
*
* @param <Message> messsage on the buss
*/
@FunctionalInterface
public interface Listener<Message> {
/**
* Triggered when a new message is published on the bus
*
* @param message the message that was published
*/
void onEvent(Message message);
}
}

View File

@ -0,0 +1,63 @@
package drones.messages;
import drones.geo.Point;
import java.time.LocalDateTime;
/**
* Represent a message on the {@link EventBus}
*
* @author John Ahlroos
*/
public interface Message {
long DISPATCHER_ID = 0;
/**
* The recipient of the message
*/
long targetId();
/**
* Represent a message with point data for the drone
*/
record PointData(long targetId, Point point) implements Message {
}
/**
* Shutdown signal for the drone. Published by the {@link drones.dispatcher.Dispatcher}
*/
record ShutdownSignal(long targetId, long droneId, LocalDateTime time) implements Message {
}
/**
* Represents the traffic condition at a tube station
*/
record TrafficCondition(long targetId, long droneId, String tubeStation, LocalDateTime time,
double speed, Condition condition, double distanceToStation) implements Message {
/**
* The condition of the traffic
*/
public enum Condition {
/**
* Heavy traffic
*/
HEAVY,
/**
* Light traffic
*/
LIGHT,
/**
* Moderate traffic
*/
MODERATE
}
}
/**
* Represents a drone capacity message posted by the drone to request more waypoints
*/
record DroneCapacity(long targetId, long droneId, int capacity) implements Message {
}
}

View File

@ -0,0 +1,49 @@
date.format=yyyy-MM-dd HH:mm:ss
number.format=0
# Dispatcher
dispatcher.running.stats.1=Running with the following settings:
dispatcher.running.stats.2=Start time:\t{}
dispatcher.running.stats.3=Shutdown time:\t{}
dispatcher.running.stats.4=Simulated time factor:\t{}
dispatcher.drone.registration=Registered drone {}
dispatcher.started=Started
dispatcher.shutdown=Shutting down dispatcher
dispatcher.terminating=No more drones, terminating...
dispatcher.send.shutdown.signals=Sending shutdown signal to {} drones
dispatcher.event.drone.shutdown=Sending shutdown signal for drone {}
dispatcher.event.drone.data.stream.error=Failed to close drone {} data stream
dispatcher.event.drone.capacity.request=Received capacity request from drone {} (capacity: {})
dispatcher.event.drone.traffic.report={} @ {}: {} (drone: {},speed: {}km/h, distanceToStation: {}m)
dispatcher.data.file.id.mismatch=Data file is corrupt, id mismatch
dispatcher.data.file.stream.error=Failed to read data stream
dispatcher.waypoints.sent=Sent {} waypoints to drone {}
dispatcher.thread.sleep.error=Sleep interrupted
dispatcher.termination.stats.1=Tube stations found: {}
dispatcher.termination.stats.2=Tube station reports:\t{}
dispatcher.termination.stats.3=Waypoints sent:
dispatcher.termination.stats.4=Drone {}: {}
# Drone
drone.running.stats.1=Running with the following settings:
drone.running.stats.3=Start time:\t{}
drone.running.stats.2=Simulated time factor:\t{}
drone.started=Started
drone.shutdown=Shutdown at {}
drone.holding.position=Holding position, no new waypoints to go to...
drone.arrived.at.waypoint=Arrived to waypoint {}
drone.new.route.request=Requested new route information ({} waypoints)
drone.speed.calculation=Re-calculated speed for next waypoint (time: {} seconds, distance: {}m)
drone.speed.reduced=Reduced speed to {} m/s
drone.next.waypoint.leg=Selected next position on waypoint leg {}
drone.position=speed=%.2f m/s, state=%s, lat=%f, lon=%f, pointsRemaining=%d, pointsVisited=%d, time=%s
drone.event.signal=Received signal {}
drone.event.signal.shutdown=Received shutdown, stopping processing thread...
drone.event.new.waypoint=Received new waypoint {}
drone.event.new.waypoint.cache.full=Drone cache was full!, skipped waypoint {}
drone.thread.sleep.error=Sleep interrupted
drone.termination.stats.1=Waypoints visited:\t\t{}
drone.termination.stats.2=Tube stations found:\t{}
drone.termination.stats.3=Longest leg travelled:\t%.2f km
drone.termination.stats.4=Fastest speed:\t\t%.2f km/h
# Point
point.meters.positive.value=Meters must be a positive value
point.seconds.positive.value=Seconds must be a positive value

View File

@ -0,0 +1,102 @@
package drones.dispatcher;
import drones.drone.Drone;
import drones.drone.DroneState;
import org.junit.jupiter.api.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.time.LocalDateTime;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class DispatcherTest {
@Test
public void setupDrone() {
var start = LocalDateTime.parse("2011-03-22 07:55:20", Dispatcher.DATE_FORMAT);
var end = LocalDateTime.parse("2011-03-22 07:55:40", Dispatcher.DATE_FORMAT);
var route = """
5937,"51.476105","-0.100224","2011-03-22 07:55:26"
5937,"51.475967","-0.100368","2011-03-22 07:55:40"
""";
var dispatcher = new Dispatcher(start, end, 0);
var drone = (Drone) dispatcher.registerDroneWithReader(5937, new BufferedReader(new StringReader(route)));
assertEquals(drone.getPosition().time(), start);
assertEquals(0, drone.getWaypointsRemaining());
assertEquals(5937, drone.getId());
}
@Test
public void pushDataToDrone() throws InterruptedException {
var start = LocalDateTime.parse("2011-03-22 07:55:20", Dispatcher.DATE_FORMAT);
var end = LocalDateTime.parse("2011-03-22 07:56:00", Dispatcher.DATE_FORMAT);
var route = """
5937,"51.476105","-0.100224","2011-03-22 07:55:26"
5937,"51.475967","-0.100368","2011-03-22 07:55:27"
5937,"51.476021","-0.100246","2011-03-22 07:55:28"
5937,"51.476051","-0.100078","2011-03-22 07:55:29"
5937,"51.476009","-0.099922","2011-03-22 07:55:30"
5937,"51.476044","-0.099775","2011-03-22 07:55:31"
5937,"51.476074","-0.099968","2011-03-22 07:55:42"
5937,"51.476086","-0.100047","2011-03-22 07:55:43"
5937,"51.476074","-0.100123","2011-03-22 07:55:44"
5937,"51.476089","-0.10019","2011-03-22 07:55:45"
5937,"51.476112","-0.100246","2011-03-22 07:55:46"
5937,"51.476112","-0.100264","2011-03-22 07:55:47"
5937,"51.476116","-0.10028","2011-03-22 07:55:48"
5937,"51.476112","-0.100356","2011-03-22 07:55:49"
5937,"51.476135","-0.100378","2011-03-22 07:55:50"
5937,"51.476154","-0.100392","2011-03-22 07:55:51"
5937,"51.476189","-0.100396","2011-03-22 07:55:52"
5937,"51.476219","-0.100387","2011-03-22 07:55:53"
5937,"51.476257","-0.100379","2011-03-22 07:55:54"
""";
var dispatcher = new Dispatcher(start, end, 0);
var drone = (Drone) dispatcher.registerDroneWithReader(5937, new BufferedReader(new StringReader(route)));
var droneRunner = new Thread(drone);
droneRunner.start();
dispatcher.run();
droneRunner.join();
assertEquals(route.lines().count(), drone.getWaypointsVisited());
assertEquals(0, drone.getWaypointsRemaining());
assertEquals(DroneState.TERMINATED, drone.getState());
}
@Test
public void registerTubeStations() throws IOException, InterruptedException {
var start = LocalDateTime.parse("2011-03-22 07:55:20", Dispatcher.DATE_FORMAT);
var end = LocalDateTime.parse("2011-03-22 07:56:00", Dispatcher.DATE_FORMAT);
var dispatcher = new Dispatcher(start, end, 0);
var stations = """
"Acton Town",51.503071,-0.280303
"Aldgate",51.514342,-0.075627
"Aldgate East",51.51503,-0.073162
"All Saints (DLR)",51.510477,-0.012625
""";
dispatcher.registerTubeStations(new BufferedReader(new StringReader(stations)));
var route = """
5937,"51.476105","-0.100224","2011-03-22 07:55:26"
5937,"51.51503","-0.073162","2011-03-22 07:55:27"
5937,"51.476021","-0.100246","2011-03-22 07:55:28"
""";
var drone = (Drone) dispatcher.registerDroneWithReader(5937, new BufferedReader(new StringReader(route)));
var droneRunner = new Thread(drone);
droneRunner.start();
dispatcher.run();
droneRunner.join();
var foundTubeStations = drone.getTubeStationsFound();
assertTrue(foundTubeStations.stream().anyMatch(tb -> tb.name().equals("Aldgate")));
assertTrue(foundTubeStations.stream().anyMatch(tb -> tb.name().equals("Aldgate East")));
}
}

View File

@ -0,0 +1,92 @@
package drones.drone;
import drones.dispatcher.Dispatcher;
import drones.geo.Point;
import drones.messages.EventBus;
import drones.messages.Message;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class DroneTest {
@Test
public void setupDrone() {
var points = generatePoints();
var eventBus = new EventBus(0);
var drone = new Drone(1L, getStartTime(), 1, eventBus, Collections.emptyList());
eventBus.register(drone);
eventBus.push(new Message.PointData(drone.getId(), points.get(0)));
eventBus.push(new Message.PointData(drone.getId(), points.get(1)));
eventBus.push(new Message.PointData(drone.getId(), points.get(2)));
assertEquals(3, drone.getWaypointsRemaining());
assertEquals(0, drone.getWaypointsVisited());
}
@Test
public void initialWaypointIsSetAsPosition() throws InterruptedException {
var points = generatePoints();
var eventBus = new EventBus(0);
var drone = new Drone(1L, getStartTime(), 0, eventBus, Collections.emptyList());
eventBus.register(drone);
var droneThread = new Thread(drone);
droneThread.start();
eventBus.push(new Message.PointData(drone.getId(), points.get(0)));
eventBus.push(new Message.ShutdownSignal(drone.getId(), drone.getId(), getEndTime()));
droneThread.join();
assertEquals(drone.getPosition(), points.get(0));
assertEquals(drone.getWaypointsVisited(), 1);
assertEquals(drone.getWaypointsRemaining(), 0);
}
@Test
public void travelBetweenWaypoints() throws InterruptedException {
var points = generatePoints();
var eventBus = new EventBus(0);
var drone = new Drone(1L, getStartTime(), 0, eventBus, Collections.emptyList());
eventBus.register(drone);
var droneThread = new Thread(drone);
droneThread.start();
eventBus.push(new Message.PointData(drone.getId(), points.get(0)));
eventBus.push(new Message.PointData(drone.getId(), points.get(1)));
eventBus.push(new Message.PointData(drone.getId(), points.get(2)));
eventBus.push(new Message.ShutdownSignal(drone.getId(), drone.getId(), getEndTime()));
droneThread.join();
assertEquals(DroneState.TERMINATED, drone.getState());
assertEquals(getEndTime(), drone.getPosition().time());
assertEquals(0, drone.getWaypointsRemaining());
assertEquals(3, drone.getWaypointsVisited());
}
private List<Point> generatePoints() {
var point1 = new Point(51.476105, -0.100224, LocalDateTime.parse("2011-03-22 07:55:26", Dispatcher.DATE_FORMAT));
var point2 = new Point(51.475967, -0.100368, LocalDateTime.parse("2011-03-22 07:55:30", Dispatcher.DATE_FORMAT));
var point3 = new Point(51.476021, -0.100246, LocalDateTime.parse("2011-03-22 07:55:34", Dispatcher.DATE_FORMAT));
return List.of(point1, point2, point3);
}
private LocalDateTime getStartTime() {
return LocalDateTime.parse("2011-03-22 07:55:20", Dispatcher.DATE_FORMAT);
}
private LocalDateTime getEndTime() {
return LocalDateTime.parse("2011-03-22 07:55:40", Dispatcher.DATE_FORMAT);
}
}

View File

@ -0,0 +1,26 @@
package drones.messages;
import drones.geo.Point;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.ArrayList;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class EventBusTest {
@Test
public void listenerTriggeredOnEvent() {
var triggered = new ArrayList<Message>();
var eventBus = new EventBus(0);
eventBus.register(triggered::add);
eventBus.push(new Message.PointData(1L, new Point(1, 2, LocalDateTime.now())));
eventBus.push(new Message.ShutdownSignal(2, 2, LocalDateTime.now()));
assertEquals(2, triggered.size());
assertEquals(1, triggered.get(0).targetId());
assertEquals(2, triggered.get(1).targetId());
}
}