IntersectionControl

Getting started

Running an experiment/simulation

To run a simulation, you will need an implemented algorithm (which determines the behaviour of vehicles and intersection managers through the intersection) and an environment (the simulation environment in which the experiment runs). For a more detailed description of the structure of IntersectionControl, refer to the Project Structure page.

Let’s see how you would set up an experiment for a query-based intersection control algorithm in the Sumo simulation environment.

Note that you will need Sumo network (.net.xml and .sumocfg) files to run a Sumo simulation. A simple single-laned 4-way intersection network configuration can be found in intersection_control/environments/sumo/networks/single_intersection

Import the desired algorithm/environment:

from intersection_control.environments.sumo import SumoEnvironment, RandomDemandGenerator
from intersection_control.algorithms.qb_im import QBIMIntersectionManager, QBIMVehicle

Instantiate the environment:

Note

The RandomDemandGenerator here is used to programmatically add vehicles to specifically to the Sumo environment. Alternatively, Sumo based demand generation could be used

demand_generator = RandomDemandGenerator({
    "NE": 2, "NS": 2, "NW": 2, "EN": 2, "ES": 2, "EW": 2,
    "SN": 2, "SE": 2, "SW": 2, "WN": 2, "WE": 2, "WS": 2
}, 0.05)
env = SumoEnvironment("path/to/intersection.sumocfg",
                      demand_generator=demand_generator, time_step=0.05, gui=True)

Instantiate the vehicles and intersection managers:

intersection_managers = {QBIMIntersectionManager(intersection_id, env, 10, 0.05) for intersection_id in
                         env.intersections.get_ids()}  # In this Sumo network there is only one intersection
vehicles = {QBIMVehicle(vehicle_id, env, communication_range=75) for vehicle_id in env.vehicles.get_ids()}

Run the main loop:

STEP_COUNT = 360000  # 1 hour
for _ in range(STEP_COUNT):
    env.step()
    removed_vehicles = {v for v in vehicles if v.get_id() in env.get_removed_vehicles()}
    for v in removed_vehicles:
        v.destroy()
    new_vehicles = {QBIMVehicle(vehicle_id, env, communication_range=75)
                    for vehicle_id in env.get_added_vehicles()}
    vehicles = (vehicles - removed_vehicles).union(new_vehicles)
    for vehicle in vehicles:
        vehicle.step()
    for intersection_manager in intersection_managers:
        intersection_manager.step()

When run, this should result in a simulation that looks something like this: QBIM Simulation

Implementing an intersection control algorithm

This is done by creating a new subclass of intersection_control.core.algorithm.Vehicle and intersection_control.core.algorithm.IntersectionManager. Note that if you would like to implement a decentralised control algorithm (one where there is no central intersection manager), you would only have to subclass Vehicle.

Let’s create a very simple and stupid algorithm where at every step, vehicles communicate their speed to the nearby intersection manager, and then increases its speed by 1m/s:

from intersection_control.core import Vehicle, IntersectionManager, Environment, Message
from intersection_control.communication import DistanceBasedUnit


class StupidVehicle(Vehicle):
    def __init__(self, vehicle_id: str, environment: Environment):
        super().__init__(vehicle_id, environment)
        self.messaging_unit = DistanceBasedUnit(vehicle_id, 50, self.get_position)

    def step(self):
        # This assumes every intersection manager id will start with "intersection"
        for im in [im for im in self.messaging_unit.discover() if im.startswith("intersection")]:
            self.messaging_unit.send(im, Message(self.vehicle_id, {"speed": self.get_speed()}))
        self.set_desired_speed(self.get_speed() + 1)

    # Required when using the DistanceBasedUnit
    def destroy(self):
        self.messaging_unit.destroy()


class StupidIntersectionManager(IntersectionManager):
    def __init__(self, intersection_id: str, environment: Environment):
        super().__init__(intersection_id, environment)
        self.messaging_unit = DistanceBasedUnit(intersection_id, 50, self.get_position)

    def step(self):
        for message in self.messaging_unit.receive():
            print(f"Received message from {message.sender}: {message.contents}")

As you can see, implementing an intersection control algorithm simply requires defining the step() function for each vehicle and intersection manager, which will determine its behaviour at each simulation step.

Unsurprisingly, this algorithm leads to lots of crashes: Stupid Simulation

Implementing an intersection environment

Implementing your own intersection environment is much more involved that implementing a control algorithm. To do this, you will need to subclass intersection_control.core.environment.Environment, intersection_control.core.environment.VehicleHandler and intersection_control.core.environment.IntersectionHandler. Each of those classes will have a number of abstract methods that will need to be implemented.

from typing import List, Tuple
import time
from intersection_control.core.environment import Environment, IntersectionHandler, VehicleHandler


class UselessVehicleHandler(VehicleHandler):
    def get_speed(self, vehicle_id: str) -> float:
        return 0

    def get_position(self, vehicle_id) -> Tuple[float, float]:
        return 0, 0
    # ...


class UselessIntersectionHandler(IntersectionHandler):
    def get_width(self, intersection_id: str) -> float:
        return 0

    def get_height(self, intersection_id: str) -> float:
        return 0

    def get_position(self, intersection_id: str) -> Tuple[float, float]:
        return 0, 0
    # ...


class UselessEnvironment(Environment):
    def __init__(self):
        self._intersections = UselessIntersectionHandler()
        self._vehicles = UselessVehicleHandler()

    @property
    def intersections(self) -> IntersectionHandler:
        return self._intersections

    @property
    def vehicles(self) -> VehicleHandler:
        return self._vehicles

    def get_current_time(self) -> float:
        return time.time()
    # ...