canns.task.open_loop_navigation

Classes

ActionPolicy

Abstract base class for action policies that control agent movement.

CustomOpenLoopNavigationTask

Open-loop navigation task driven by a custom action policy.

OpenLoopNavigationData

Container for open-loop navigation trajectories and derived signals.

OpenLoopNavigationTask

Open-loop navigation task that synthesizes trajectories.

RasterScanNavigationTask

Preset open-loop task for cyclic dual-mode raster scan exploration.

StateAwareRasterScanPolicy

State-aware raster scan policy with cyclic dual-mode exploration.

TMazeOpenLoopNavigationTask

Open-loop navigation task in a T-maze environment.

TMazeRecessOpenLoopNavigationTask

Open-loop navigation task in a T-maze with recesses at the junction.

Functions

map2pi(a)

Wrap angles to the range [-pi, pi].

Module Contents

class canns.task.open_loop_navigation.ActionPolicy[source]

Bases: abc.ABC

Abstract base class for action policies that control agent movement.

Action policies compute parameters for agent.update() at each simulation step, enabling reusable and testable control strategies.

Workflow:
  • Setup -> Implement compute_action.

  • Execute -> Pass the policy into a task and call get_data().

  • Result -> Task data is generated using the policy-controlled actions.

Examples

>>> import numpy as np
>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import ActionPolicy, CustomOpenLoopNavigationTask
>>>
>>> class ConstantDriftPolicy(ActionPolicy):
...     def __init__(self, drift_direction):
...         self.drift = np.array(drift_direction, dtype=float)
...
...     def compute_action(self, step_idx, agent):
...         return {"drift_velocity": self.drift, "drift_to_random_strength_ratio": 10.0}
>>>
>>> bm.set_dt(0.1)
>>> task = CustomOpenLoopNavigationTask(
...     duration=0.5,
...     width=1.0,
...     height=1.0,
...     dt=bm.get_dt(),
...     action_policy=ConstantDriftPolicy([0.1, 0.0]),
...     progress_bar=False,
... )
>>> task.get_data()
>>> task.data.position.shape[1]
2
abstractmethod compute_action(step_idx, agent)[source]

Compute action parameters for the current simulation step.

Parameters:
  • step_idx (int) – Current simulation step (0 to total_steps-1)

  • agent (canns_lib.spatial.Agent) – Agent instance (for state-aware policies)

Returns:

Keyword arguments for agent.update()

Supported keys:
  • drift_velocity: np.ndarray of shape (2,) for 2D drift

  • drift_to_random_strength_ratio: float (typically 5.0-20.0)

  • forced_next_position: np.ndarray of shape (2,)

Return type:

dict
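
As a hedged illustration of the return contract above, the sketch below builds the keyword dictionary for a policy that reverses its drift direction every `period` steps. `SwitchingDriftPolicy`, `period`, and `ratio` are hypothetical names. The class is written without the `ActionPolicy` base so that it runs standalone; in real use it would subclass `ActionPolicy`. The `agent` argument is unused here.

```python
import numpy as np


class SwitchingDriftPolicy:
    """Hypothetical policy: drift along +x, then -x, alternating every `period` steps.

    In real use this would subclass canns.task.open_loop_navigation.ActionPolicy;
    the base class is omitted so the sketch runs without the library installed.
    """

    def __init__(self, speed=0.1, period=10, ratio=10.0):
        self.speed = float(speed)
        self.period = int(period)
        self.ratio = float(ratio)

    def compute_action(self, step_idx, agent):
        # Even-numbered blocks of steps drift right, odd-numbered blocks drift left.
        sign = 1.0 if (step_idx // self.period) % 2 == 0 else -1.0
        return {
            "drift_velocity": np.array([sign * self.speed, 0.0]),
            "drift_to_random_strength_ratio": self.ratio,
        }


policy = SwitchingDriftPolicy(speed=0.1, period=10)
first = policy.compute_action(0, agent=None)
later = policy.compute_action(10, agent=None)
```

Because the policy depends only on `step_idx`, it can be checked in isolation before it is handed to a task.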

class canns.task.open_loop_navigation.CustomOpenLoopNavigationTask(*args, action_policy=None, **kwargs)[source]

Bases: OpenLoopNavigationTask

Open-loop navigation task driven by a custom action policy.

Provide an ActionPolicy to control how the agent moves at each step.

Workflow:
  • Setup -> Implement a policy and create the task.

  • Execute -> Call get_data().

  • Result -> Trajectory data reflects the policy-driven actions.

Examples

>>> import numpy as np
>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import ActionPolicy, CustomOpenLoopNavigationTask
>>>
>>> class MyPolicy(ActionPolicy):
...     def compute_action(self, step_idx, agent):
...         return {"drift_velocity": np.array([0.05, 0.0])}
>>>
>>> bm.set_dt(0.1)
>>> task = CustomOpenLoopNavigationTask(
...     duration=0.5,
...     width=1.0,
...     height=1.0,
...     dt=bm.get_dt(),
...     action_policy=MyPolicy(),
...     progress_bar=False,
... )
>>> task.get_data()
>>> task.data.velocity.shape[1]
2

Initializes the Task instance.

Parameters:

data_class (type, optional) – A dataclass type for structured data. If provided, the task will use this class to structure the loaded or generated data.

action_policy = None[source]
class canns.task.open_loop_navigation.OpenLoopNavigationData[source]

Container for open-loop navigation trajectories and derived signals.

It stores position, velocity, speed, movement direction, head direction, and rotational velocity. Optional fields are added for theta sweep analysis.

Workflow:
  • Setup -> Create an OpenLoopNavigationTask.

  • Execute -> Call get_data().

  • Result -> Access trajectories from task.data.

Examples

>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import OpenLoopNavigationTask
>>>
>>> bm.set_dt(0.1)
>>> task = OpenLoopNavigationTask(
...     duration=1.0,
...     width=1.0,
...     height=1.0,
...     dt=bm.get_dt(),
...     progress_bar=False,
... )
>>> task.get_data()
>>> task.data.position.shape[1]
2
ang_speed_gains: numpy.ndarray | None = None[source]
ang_velocity: numpy.ndarray | None = None[source]
hd_angle: numpy.ndarray[source]
linear_speed_gains: numpy.ndarray | None = None[source]
movement_direction: numpy.ndarray[source]
position: numpy.ndarray[source]
rot_vel: numpy.ndarray[source]
speed: numpy.ndarray[source]
velocity: numpy.ndarray[source]
class canns.task.open_loop_navigation.OpenLoopNavigationTask(duration=20.0, start_pos=(2.5, 2.5), initial_head_direction=None, progress_bar=True, width=5, height=5, dimensionality='2D', boundary_conditions='solid', scale=None, dx=0.01, grid_dx=None, grid_dy=None, boundary=None, walls=None, holes=None, objects=None, dt=None, speed_mean=0.04, speed_std=0.016, speed_coherence_time=0.7, rotational_velocity_coherence_time=0.08, rotational_velocity_std=120 * np.pi / 180, head_direction_smoothing_timescale=0.15, thigmotaxis=0.5, wall_repel_distance=0.1, wall_repel_strength=1.0, rng_seed=None)[source]

Bases: canns.task.navigation_base.BaseNavigationTask

Open-loop navigation task that synthesizes trajectories.

The trajectory is generated without real-time feedback control. This is useful for producing reproducible paths for model evaluation.

Workflow:
  • Setup -> Instantiate the task with environment and motion settings.

  • Execute -> Call get_data() to generate a trajectory.

  • Result -> Read task.data for positions, velocities, and speed.

Examples

>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import OpenLoopNavigationTask
>>>
>>> bm.set_dt(0.1)
>>> task = OpenLoopNavigationTask(
...     duration=1.0,
...     width=1.0,
...     height=1.0,
...     start_pos=(0.5, 0.5),
...     dt=bm.get_dt(),
...     progress_bar=False,
... )
>>> task.get_data()
>>> task.data.position.shape[0] == task.total_steps
True

Initializes the Task instance.

Parameters:

data_class (type, optional) – A dataclass type for structured data. If provided, the task will use this class to structure the loaded or generated data.

calculate_theta_sweep_data()[source]

Calculate additional fields needed for theta sweep analysis. This should be called after get_data() to add ang_velocity, linear_speed_gains, and ang_speed_gains to the data.
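
The source does not define how the three added fields are computed. The sketch below shows one plausible derivation and is not the library's implementation: angular velocity as the time derivative of the unwrapped head direction, and both gains as signals scaled by their largest magnitude. The function name `theta_sweep_fields` and the normalization are assumptions.

```python
import numpy as np


def theta_sweep_fields(hd_angle, speed, dt):
    """Hypothetical derivation of the three optional fields; definitions are assumed."""
    hd_angle = np.asarray(hd_angle, dtype=float)
    speed = np.asarray(speed, dtype=float)
    # Unwrap first so a jump across +/-pi does not look like a fast rotation.
    ang_velocity = np.gradient(np.unwrap(hd_angle), dt)
    # Assumed normalization: scale each signal by its largest magnitude.
    linear_speed_gains = speed / max(speed.max(), 1e-12)
    ang_speed_gains = ang_velocity / max(np.abs(ang_velocity).max(), 1e-12)
    return ang_velocity, linear_speed_gains, ang_speed_gains


hd = np.array([3.0, 3.1, -3.0832, -2.9832])  # crosses the +/-pi boundary
ang_velocity, lin_gain, ang_gain = theta_sweep_fields(
    hd, speed=[0.0, 0.02, 0.04, 0.04], dt=0.1
)
```

Unwrapping before differentiating keeps the rotation rate near 1 rad/s across the boundary instead of producing a spurious spike.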

get_data()[source]

Generates the inputs for the agent based on its current position.

get_empty_trajectory()[source]

Returns an empty trajectory data structure with the same shape as the generated trajectory. Use it to initialize trajectory storage before any data has been generated.

import_data(position_data, times=None, dt=None, head_direction=None, initial_pos=None)[source]

Import external position coordinates and calculate derived features.

This method allows importing external trajectory data (e.g., from experimental recordings or other simulations) instead of using the built-in random motion model. The imported data will be processed to calculate velocity, speed, movement direction, head direction, and rotational velocity.

Parameters:
  • position_data (np.ndarray) – Array of position coordinates with shape (n_steps, 2) for 2D trajectories or (n_steps, 1) for 1D trajectories.

  • times (np.ndarray, optional) – Array of time points corresponding to position_data. If None, uniform time steps with dt will be assumed.

  • dt (float, optional) – Time step between consecutive positions. If None, self.dt is used. When times is None, one of the two must provide a time step.

  • head_direction (np.ndarray, optional) – Array of head direction angles in radians with shape (n_steps,). If None, head direction will be derived from movement direction.

  • initial_pos (np.ndarray, optional) – Initial position for the agent. If None, uses the first position from position_data.

Raises:

ValueError – If position_data has invalid dimensions or if required parameters are missing.

Example

```python
# Import experimental trajectory data
positions = np.array([[0, 0], [0.1, 0.05], [0.2, 0.1], ...])  # shape: (n_steps, 2)
times = np.array([0, 0.1, 0.2, ...])  # shape: (n_steps,)

task = OpenLoopNavigationTask(...)
task.import_data(position_data=positions, times=times)

# Or with uniform time steps
task.import_data(position_data=positions, dt=0.1)
```
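
To make the "derived features" concrete, here is a minimal NumPy sketch of how velocity, speed, and movement direction could be computed from imported 2D positions with a uniform time step. It uses forward differences and is an illustration only; `derive_features` is a hypothetical helper, and the library's actual processing (including head direction and rotational velocity) may differ.

```python
import numpy as np


def derive_features(position, dt):
    """Finite-difference sketch; not the library's actual implementation."""
    position = np.asarray(position, dtype=float)
    if position.ndim != 2 or position.shape[1] != 2:
        raise ValueError("expected positions of shape (n_steps, 2)")
    # Forward differences, repeating the last velocity so every output has n_steps rows.
    velocity = np.diff(position, axis=0) / dt
    velocity = np.vstack([velocity, velocity[-1:]])
    speed = np.linalg.norm(velocity, axis=1)
    movement_direction = np.arctan2(velocity[:, 1], velocity[:, 0])
    return velocity, speed, movement_direction


positions = np.array([[0.0, 0.0], [0.1, 0.0], [0.1, 0.1]])
velocity, speed, direction = derive_features(positions, dt=0.1)
```

In this toy path the agent moves right and then up at unit speed, so the movement direction goes from 0 to pi/2.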

reset()[source]

Resets the agent’s position to the starting position.

show_trajectory_analysis(show=True, save_path=None, figsize=(12, 3), smooth_window=50, **kwargs)[source]

Display comprehensive trajectory analysis including position, speed, and direction changes.

Parameters:
  • show (bool) – Whether to display the plot

  • save_path (str | None) – Path to save the figure

  • figsize (tuple[int, int]) – Figure size (width, height)

  • smooth_window (int) – Window size for smoothing speed and direction plots (set to 0 to disable smoothing)

  • **kwargs – Additional matplotlib parameters
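
The smoothing applied by `smooth_window` is not specified in the source. As an assumption, the sketch below uses a simple boxcar (moving-average) filter, which is a common choice for this kind of plot; `moving_average` is a hypothetical helper, not part of the library.

```python
import numpy as np


def moving_average(signal, window):
    """Boxcar smoothing sketch; a window of 0 or 1 returns the input unchanged."""
    signal = np.asarray(signal, dtype=float)
    if window <= 1:
        return signal
    kernel = np.ones(window) / window
    # mode="same" keeps the output length equal to the input length
    # (as long as window <= len(signal)).
    return np.convolve(signal, kernel, mode="same")


speed = np.array([0.0, 1.0, 0.0, 1.0, 0.0, 1.0])
smoothed = moving_average(speed, window=3)
unsmoothed = moving_average(speed, window=0)
```

With `mode="same"` the edges are averaged against implicit zeros, so values near the start and end of a trace are biased toward zero for large windows.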

duration = 20.0[source]
progress_bar = True[source]
run_steps[source]
total_steps[source]
class canns.task.open_loop_navigation.RasterScanNavigationTask(duration, width=1.0, height=1.0, step_size=0.03, margin=0.05, speed=0.15, drift_strength=15.0, **kwargs)[source]

Bases: CustomOpenLoopNavigationTask

Preset open-loop task for cyclic dual-mode raster scan exploration.

The task alternates between horizontal and vertical sweep phases to cover the environment while avoiding walls.

Workflow:
  • Setup -> Instantiate the task with scan parameters.

  • Execute -> Call get_data().

  • Result -> Access the generated trajectory in task.data.

Examples

>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import RasterScanNavigationTask
>>>
>>> bm.set_dt(0.1)
>>> task = RasterScanNavigationTask(
...     duration=0.5,
...     width=1.0,
...     height=1.0,
...     step_size=0.05,
...     dt=bm.get_dt(),
...     progress_bar=False,
... )
>>> task.get_data()
>>> task.data.position.shape[1]
2

Initializes the Task instance.

Parameters:

data_class (type, optional) – A dataclass type for structured data. If provided, the task will use this class to structure the loaded or generated data.

class canns.task.open_loop_navigation.StateAwareRasterScanPolicy(width, height, margin=0.05, step_size=0.03, speed=0.15, drift_strength=15.0)[source]

Bases: ActionPolicy

State-aware raster scan policy with cyclic dual-mode exploration.

Scanning strategy:
  1. Horizontal mode: left-right sweeps moving downward

  2. Vertical mode: up-down sweeps moving rightward

  3. Cycles continuously to avoid walls and improve coverage

Workflow:
  • Setup -> Instantiate the policy with environment size.

  • Execute -> Use it in CustomOpenLoopNavigationTask.get_data().

  • Result -> The trajectory follows a raster-scan pattern.

Examples

>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import (
...     StateAwareRasterScanPolicy,
...     CustomOpenLoopNavigationTask,
... )
>>>
>>> bm.set_dt(0.1)
>>> policy = StateAwareRasterScanPolicy(width=1.0, height=1.0)
>>> task = CustomOpenLoopNavigationTask(
...     duration=0.5,
...     width=1.0,
...     height=1.0,
...     dt=bm.get_dt(),
...     action_policy=policy,
...     progress_bar=False,
... )
>>> task.get_data()
>>> task.data.position.shape[1]
2
compute_action(step_idx, agent)[source]

Compute next action based on current agent position and scanning mode.

Implements cyclic dual-mode scanning:
  • Horizontal mode: Left-right sweeps moving downward

  • Vertical mode: Up-down sweeps moving rightward

  • Auto-switches between modes to avoid walls and maintain coverage
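
The horizontal mode can be pictured with a small standalone simulation. This sketch is an assumption about the general shape of the pattern, not the policy's actual logic: it starts at the top-left margin, sweeps along x, and drops by `step_size` whenever a side margin is reached. `horizontal_raster`, `dt`, and `n_steps` are hypothetical names, and turning dynamics and the vertical mode are omitted.

```python
import numpy as np


def horizontal_raster(width, height, margin=0.05, step_size=0.03,
                      speed=0.15, dt=0.1, n_steps=200):
    """Hypothetical horizontal-mode sweep: move along x, step down at each side margin."""
    pos = np.array([margin, height - margin])
    direction = 1.0  # +1 sweeps right, -1 sweeps left
    path = [pos.copy()]
    for _ in range(n_steps):
        next_x = pos[0] + direction * speed * dt
        if next_x > width - margin or next_x < margin:
            # Reached a side margin: drop one row and reverse the sweep.
            pos[1] -= step_size
            direction = -direction
            if pos[1] < margin:
                break  # a full policy would switch to vertical mode here
        else:
            pos[0] = next_x
        path.append(pos.copy())
    return np.array(path)


path = horizontal_raster(width=1.0, height=1.0)
```

Keeping every waypoint inside the margins is what lets a pattern like this cover the arena without running into the walls.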

current_direction = 1.0[source]
drift_strength = 15.0[source]
height[source]
is_turning = False[source]
margin = 0.05[source]
mode = 'horizontal'[source]
speed = 0.15[source]
step_size = 0.03[source]
turn_steps_remaining = 0[source]
width[source]
class canns.task.open_loop_navigation.TMazeOpenLoopNavigationTask(w=0.3, l_s=1.0, l_arm=0.75, t=0.3, start_pos=(0.0, 0.15), duration=20.0, dt=None, **kwargs)[source]

Bases: OpenLoopNavigationTask

Open-loop navigation task in a T-maze environment.

The environment boundary is configured to a classic T-maze layout.

Workflow:
  • Setup -> Instantiate the task with maze geometry.

  • Execute -> Call get_data().

  • Result -> Use task.data.position as the trajectory.

Examples

>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import TMazeOpenLoopNavigationTask
>>>
>>> bm.set_dt(0.1)
>>> task = TMazeOpenLoopNavigationTask(duration=0.5, dt=bm.get_dt(), progress_bar=False)
>>> task.get_data()
>>> task.data.position.shape[1]
2

Initialize T-maze open-loop navigation task.

Parameters:
  • w – Width of the corridor (default: 0.3)

  • l_s – Length of the stem (default: 1.0)

  • l_arm – Length of each arm (default: 0.75)

  • t – Thickness of the walls (default: 0.3)

  • start_pos – Starting position of the agent (default: (0.0, 0.15))

  • duration – Duration of the trajectory in seconds (default: 20.0)

  • dt – Time step (default: None, uses bm.get_dt())

  • **kwargs – Additional keyword arguments passed to OpenLoopNavigationTask

class canns.task.open_loop_navigation.TMazeRecessOpenLoopNavigationTask(w=0.3, l_s=1.0, l_arm=0.75, t=0.3, recess_width=None, recess_depth=None, start_pos=(0.0, 0.15), duration=20.0, dt=None, **kwargs)[source]

Bases: TMazeOpenLoopNavigationTask

Open-loop navigation task in a T-maze with recesses at the junction.

Recesses add small indentations near the stem-arm junctions, providing extra spatial structure.

Workflow:
  • Setup -> Instantiate the task with recess geometry.

  • Execute -> Call get_data().

  • Result -> Use task.data for downstream modeling.

Examples

>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import TMazeRecessOpenLoopNavigationTask
>>>
>>> bm.set_dt(0.1)
>>> task = TMazeRecessOpenLoopNavigationTask(duration=0.5, dt=bm.get_dt(), progress_bar=False)
>>> task.get_data()
>>> task.data.position.shape[1]
2

Initialize T-maze with recesses open-loop navigation task.

Parameters:
  • w – Width of the corridor (default: 0.3)

  • l_s – Length of the stem (default: 1.0)

  • l_arm – Length of each arm (default: 0.75)

  • t – Thickness of the walls (default: 0.3)

  • recess_width – Width of recesses at stem-arm junctions (default: t/4)

  • recess_depth – Depth of recesses extending downward (default: t/4)

  • start_pos – Starting position of the agent (default: (0.0, 0.15))

  • duration – Duration of the trajectory in seconds (default: 20.0)

  • dt – Time step (default: None, uses bm.get_dt())

  • **kwargs – Additional keyword arguments passed to OpenLoopNavigationTask

canns.task.open_loop_navigation.map2pi(a)[source]

Wrap angles to the range [-pi, pi].

Workflow:
  • Setup -> Provide angles (scalar or array-like).

  • Execute -> Call map2pi.

  • Result -> Angles wrapped into [-pi, pi].

Examples

>>> import numpy as np
>>> import brainpy.math as bm
>>> from canns.task.open_loop_navigation import map2pi
>>>
>>> angles = bm.array([3.5, -4.0])
>>> wrapped = map2pi(angles)
>>> bool(((wrapped >= -np.pi) & (wrapped <= np.pi)).all())
True
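
For readers who want to see the arithmetic, the following NumPy sketch shows a standard modulo-based wrap. It is a stand-in illustration, not the source of `map2pi`; `wrap_to_pi` is a hypothetical name, and this particular formula maps onto the half-open interval [-pi, pi), so its behavior at exactly +pi may differ from the library function.

```python
import numpy as np


def wrap_to_pi(a):
    """Modulo-based wrap into [-pi, pi); a stand-in sketch, not map2pi itself."""
    a = np.asarray(a, dtype=float)
    # Shift by pi, take the remainder modulo one full turn, then shift back.
    return (a + np.pi) % (2.0 * np.pi) - np.pi


wrapped = wrap_to_pi([3.5, -4.0, 0.0])
```

Each input moves by a whole number of turns: 3.5 becomes 3.5 - 2*pi, -4.0 becomes -4.0 + 2*pi, and 0.0 is unchanged.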