use crate::debugger::enter_debugger;
use crate::plan::{PlanId, PlanSchedule, Queue};
use crate::{error, trace};
use crate::{HashMap, HashMapExt};
use std::fmt::{Display, Formatter};
use std::{
any::{Any, TypeId},
collections::VecDeque,
rc::Rc,
};
/// A one-shot unit of work that receives mutable access to the `Context`.
/// Boxed values of this type are what the plan queue, the breakpoint queue and
/// the immediate callback queue store.
type Callback = dyn FnOnce(&mut Context);
/// A reusable event handler: called with the `Context` and a copy of an event
/// of type `E`.
type EventHandler<E> = dyn Fn(&mut Context, E);
/// Trait required of every event type passed to `Context::subscribe_to_event`
/// and `Context::emit_event`.
pub trait IxaEvent {
/// Hook invoked by `Context::subscribe_to_event` after a handler for this
/// event type has been registered. The default implementation does nothing.
fn on_subscribe(_context: &mut Context) {}
}
/// Phase label attached to a plan or breakpoint when it is scheduled.
///
/// The derived ordering is `First < Normal < Last`. The unit tests in this
/// file expect plans scheduled for the same time to run in that order; the
/// ordering itself is applied by the `Queue` type, which is defined elsewhere.
#[derive(PartialEq, Eq, Ord, Clone, Copy, PartialOrd, Hash, Debug)]
pub enum ExecutionPhase {
/// Lowest phase; also the default used by `Context::schedule_debugger`.
First,
/// The phase used by `Context::add_plan`.
Normal,
/// Highest phase.
Last,
}
impl Display for ExecutionPhase {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
/// Central simulation state: the timed plan queue, the immediate callback
/// queue, registered event handlers, per-plugin data containers, and the
/// breakpoint/debugger bookkeeping consulted by `execute`.
pub struct Context {
/// Timed plans, each stored together with the `ExecutionPhase` it was
/// scheduled under.
plan_queue: Queue<Box<Callback>, ExecutionPhase>,
/// Immediate callbacks, run front-to-back before the next timed plan.
callback_queue: VecDeque<Box<Callback>>,
/// Keyed by the event's `TypeId`; each value is a boxed
/// `Vec<Rc<EventHandler<E>>>` for that event type `E`.
event_handlers: HashMap<TypeId, Box<dyn Any>>,
/// Keyed by the plugin's `TypeId`; each value is that plugin's boxed
/// `DataContainer`.
data_plugins: HashMap<TypeId, Box<dyn Any>>,
/// Scheduled debugger breakpoints, kept separately from ordinary plans.
breakpoints_scheduled: Queue<Box<Callback>, ExecutionPhase>,
/// Time of the most recently executed plan; starts at 0.0.
current_time: f64,
/// When set, `execute` leaves its loop.
shutdown_requested: bool,
/// When set, `execute` calls `enter_debugger` instead of stepping.
break_requested: bool,
/// When false, due breakpoints are still removed from the queue but do
/// not request the debugger.
breakpoints_enabled: bool,
}
impl Context {
/// Creates an empty context: no plans, callbacks, event handlers, data
/// containers or breakpoints; the clock at 0.0; no shutdown or debugger
/// request pending; breakpoints enabled.
#[must_use]
pub fn new() -> Context {
    Context {
        // Clock and control flags.
        current_time: 0.0,
        shutdown_requested: false,
        break_requested: false,
        breakpoints_enabled: true,
        // Work queues.
        plan_queue: Queue::new(),
        callback_queue: VecDeque::new(),
        breakpoints_scheduled: Queue::new(),
        // Type-keyed registries.
        event_handlers: HashMap::new(),
        data_plugins: HashMap::new(),
    }
}
/// Adds a breakpoint to the breakpoint queue at `time`.
///
/// `priority` selects the phase the breakpoint is filed under; `None` means
/// `ExecutionPhase::First`. `callback` is stored with the breakpoint.
///
/// NOTE(review): unlike `add_plan_with_phase`, `time` is not validated here.
pub fn schedule_debugger(
    &mut self,
    time: f64,
    priority: Option<ExecutionPhase>,
    callback: Box<Callback>,
) {
    trace!("scheduling debugger");
    let phase = match priority {
        Some(requested) => requested,
        None => ExecutionPhase::First,
    };
    self.breakpoints_scheduled.add_plan(time, callback, phase);
}
/// Registers `handler` to be called for every subsequently emitted event of
/// type `E`, then invokes `E::on_subscribe`.
///
/// Handlers are kept per event type in registration order. The `unwrap`
/// cannot fail as long as entries keyed by `TypeId::of::<E>()` are only ever
/// created by this method, which always stores a `Vec<Rc<EventHandler<E>>>`.
#[allow(clippy::missing_panics_doc)]
pub fn subscribe_to_event<E: IxaEvent + Copy + 'static>(
    &mut self,
    handler: impl Fn(&mut Context, E) + 'static,
) {
    let event_type = TypeId::of::<E>();
    let registered = self
        .event_handlers
        .entry(event_type)
        .or_insert_with(|| Box::new(Vec::<Rc<EventHandler<E>>>::new()))
        .downcast_mut::<Vec<Rc<EventHandler<E>>>>()
        .unwrap();
    registered.push(Rc::new(handler));
    E::on_subscribe(self);
}
#[allow(clippy::missing_panics_doc)]
pub fn emit_event<E: IxaEvent + Copy + 'static>(&mut self, event: E) {
let Context {
event_handlers,
callback_queue,
..
} = self;
if let Some(handler_vec) = event_handlers.get(&TypeId::of::<E>()) {
let handler_vec: &Vec<Rc<EventHandler<E>>> = handler_vec.downcast_ref().unwrap();
for handler in handler_vec {
let handler_clone = Rc::clone(handler);
callback_queue.push_back(Box::new(move |context| handler_clone(context, event)));
}
}
}
/// Schedules `callback` to run at `time` in the `Normal` phase and returns the
/// id of the new plan.
///
/// # Panics
///
/// Panics with "Time is invalid" under the same conditions as
/// `add_plan_with_phase`.
pub fn add_plan(&mut self, time: f64, callback: impl FnOnce(&mut Context) + 'static) -> PlanId {
    let phase = ExecutionPhase::Normal;
    self.add_plan_with_phase(time, callback, phase)
}
pub fn add_plan_with_phase(
&mut self,
time: f64,
callback: impl FnOnce(&mut Context) + 'static,
phase: ExecutionPhase,
) -> PlanId {
assert!(
!time.is_nan() && !time.is_infinite() && time >= self.current_time,
"Time is invalid"
);
self.plan_queue.add_plan(time, Box::new(callback), phase)
}
/// Runs one occurrence of a periodic plan and, if any other plan is still
/// queued afterwards, schedules the next occurrence `period` later in the
/// same `phase`.
///
/// Rescheduling stops once the plan queue is empty after `callback` has run.
fn evaluate_periodic_and_schedule_next(
    &mut self,
    period: f64,
    callback: impl Fn(&mut Context) + 'static,
    phase: ExecutionPhase,
) {
    trace!(
        "evaluate periodic at {} (period={})",
        self.current_time,
        period
    );
    callback(self);
    if self.plan_queue.is_empty() {
        return;
    }
    let next_time = self.current_time + period;
    let next_occurrence = move |context: &mut Context| {
        context.evaluate_periodic_and_schedule_next(period, callback, phase);
    };
    self.add_plan_with_phase(next_time, next_occurrence, phase);
}
/// Schedules `callback` to run at time 0.0 and then every `period` time units
/// in the given `phase`, for as long as other plans remain in the queue.
///
/// # Panics
///
/// Panics with "Period must be greater than 0" if `period` is not a finite,
/// strictly positive number. Because the first occurrence is always scheduled
/// at 0.0, it also panics with "Time is invalid" when called after the
/// current time has advanced past 0.0.
pub fn add_periodic_plan_with_phase(
    &mut self,
    period: f64,
    callback: impl Fn(&mut Context) + 'static,
    phase: ExecutionPhase,
) {
    let period_is_valid = period.is_finite() && period > 0.0;
    assert!(period_is_valid, "Period must be greater than 0");
    let first_occurrence = move |context: &mut Context| {
        context.evaluate_periodic_and_schedule_next(period, callback, phase);
    };
    self.add_plan_with_phase(0.0, first_occurrence, phase);
}
/// Removes the plan identified by `plan_id` from the plan queue.
///
/// If the queue reports no such plan, an error is logged; nothing else
/// happens in that case.
pub fn cancel_plan(&mut self, plan_id: &PlanId) {
    trace!("canceling plan {plan_id:?}");
    if self.plan_queue.cancel_plan(plan_id).is_none() {
        error!("Tried to cancel nonexistent plan with ID = {plan_id:?}");
    }
}
/// Returns the number of plans the plan queue reports as remaining.
#[doc(hidden)]
#[allow(dead_code)]
pub(crate) fn remaining_plan_count(&self) -> usize {
    let queue = &self.plan_queue;
    queue.remaining_plan_count()
}
pub fn queue_callback(&mut self, callback: impl FnOnce(&mut Context) + 'static) {
trace!("queuing callback");
self.callback_queue.push_back(Box::new(callback));
}
/// Returns a mutable reference to plugin `T`'s data container, creating it
/// with `T::create_data_container()` on first access.
///
/// The `_data_plugin` value is only used to select `T`. The `unwrap` cannot
/// fail as long as the entry for `TypeId::of::<T>()` always holds a
/// `T::DataContainer`, which is what this method inserts.
#[must_use]
#[allow(clippy::missing_panics_doc)]
#[allow(clippy::needless_pass_by_value)]
pub fn get_data_container_mut<T: DataPlugin>(
    &mut self,
    _data_plugin: T,
) -> &mut T::DataContainer {
    let plugin_key = TypeId::of::<T>();
    let slot = self
        .data_plugins
        .entry(plugin_key)
        .or_insert_with(|| Box::new(T::create_data_container()));
    slot.downcast_mut::<T::DataContainer>().unwrap()
}
/// Returns a shared reference to plugin `T`'s data container, or `None` if it
/// has not been created yet (or the stored value is not a
/// `T::DataContainer`). Never creates the container.
///
/// The `_data_plugin` value is only used to select `T`.
#[must_use]
#[allow(clippy::needless_pass_by_value)]
pub fn get_data_container<T: DataPlugin>(&self, _data_plugin: T) -> Option<&T::DataContainer> {
    self.data_plugins
        .get(&TypeId::of::<T>())
        .and_then(|boxed| boxed.downcast_ref::<T::DataContainer>())
}
/// Requests that the event loop stop. `execute` checks this flag before each
/// step, so plans and callbacks still queued are left unexecuted.
pub fn shutdown(&mut self) {
    trace!("shutdown context");
    self.shutdown_requested = true;
}
/// Returns the current simulation time: the time of the most recently
/// executed plan, or 0.0 if no plan has run yet.
#[must_use]
pub fn get_current_time(&self) -> f64 {
    self.current_time
}
/// Sets the debugger-request flag, so `execute` calls `enter_debugger` on its
/// next loop iteration instead of stepping.
pub fn request_debugger(&mut self) {
    self.break_requested = true;
}
/// Clears the debugger-request flag set by `request_debugger` or by a
/// breakpoint that came due.
pub fn cancel_debugger_request(&mut self) {
    self.break_requested = false;
}
/// Disables breakpoints. Due breakpoints are still removed from the queue by
/// `execute_single_step`, but they no longer request the debugger.
pub fn disable_breakpoints(&mut self) {
    self.breakpoints_enabled = false;
}
/// Enables breakpoints, so a due breakpoint requests the debugger again.
pub fn enable_breakpoints(&mut self) {
    self.breakpoints_enabled = true;
}
/// Reports whether breakpoints are currently enabled.
#[must_use]
pub fn breakpoints_are_enabled(&self) -> bool {
    self.breakpoints_enabled
}
/// Cancels the scheduled breakpoint whose plan id is `breakpoint_id` and
/// returns the result of the queue's `cancel_plan`: the breakpoint's callback,
/// or `None` when the queue has no such entry.
pub fn delete_breakpoint(&mut self, breakpoint_id: u64) -> Option<Box<Callback>> {
    let plan_id = PlanId(breakpoint_id);
    self.breakpoints_scheduled.cancel_plan(&plan_id)
}
/// Returns the schedules of pending breakpoints as reported by the breakpoint
/// queue's `list_schedules(at_most)`.
#[must_use]
pub fn list_breakpoints(&self, at_most: usize) -> Vec<&PlanSchedule<ExecutionPhase>> {
    let breakpoints = &self.breakpoints_scheduled;
    breakpoints.list_schedules(at_most)
}
/// Removes every scheduled breakpoint from the breakpoint queue.
pub fn clear_breakpoints(&mut self) {
    self.breakpoints_scheduled.clear();
}
/// Runs the event loop until shutdown is requested.
///
/// On each iteration a pending debugger request takes precedence: the
/// debugger is entered even if shutdown has also been requested. Otherwise the
/// loop exits on a shutdown request, or executes a single step.
///
/// NOTE(review): the loop relies on `enter_debugger` (or something it calls)
/// to clear the debugger request; confirm against the debugger module.
pub fn execute(&mut self) {
    trace!("entering event loop");
    loop {
        if self.break_requested {
            enter_debugger(self);
            continue;
        }
        if self.shutdown_requested {
            break;
        }
        self.execute_single_step();
    }
}
/// Performs one step of the event loop.
///
/// 1. If the breakpoint at the head of the breakpoint queue is due, it is
///    removed. With breakpoints enabled this sets the debugger-request flag
///    and returns without running anything else; with breakpoints disabled
///    the step continues.
/// 2. Otherwise the oldest immediate callback runs, if there is one.
/// 3. Otherwise the next plan runs, after the clock is moved to its time.
/// 4. Otherwise nothing is left to do and shutdown is requested.
///
/// A breakpoint is due when no plan is queued, or when its phase is `First`
/// and its time is at or before the next plan's time, or when its phase is
/// `Last` and its time is strictly before the next plan's time.
///
/// NOTE(review): a breakpoint scheduled with `ExecutionPhase::Normal` is never
/// due while any plan is queued, and (assuming `peek` only exposes the head of
/// the queue) it also holds back breakpoints behind it. Confirm this is
/// intended.
pub fn execute_single_step(&mut self) {
    let breakpoint_due = match self.breakpoints_scheduled.peek() {
        None => false,
        Some((bp, _)) => match self.plan_queue.next_time() {
            // No plans left: the pending breakpoint is released regardless
            // of its phase.
            None => true,
            Some(plan_time) => match bp.priority {
                ExecutionPhase::First => bp.time <= plan_time,
                ExecutionPhase::Last => bp.time < plan_time,
                ExecutionPhase::Normal => false,
            },
        },
    };

    if breakpoint_due {
        // The breakpoint is consumed even when breakpoints are disabled.
        self.breakpoints_scheduled.get_next_plan();
        if self.breakpoints_enabled {
            self.break_requested = true;
            return;
        }
    }

    match self.callback_queue.pop_front() {
        Some(callback) => {
            trace!("calling callback");
            callback(self);
        }
        None => match self.plan_queue.get_next_plan() {
            Some(plan) => {
                trace!("calling plan at {:.6}", plan.time);
                self.current_time = plan.time;
                (plan.data)(self);
            }
            None => {
                trace!("No callbacks or plans; exiting event loop");
                self.shutdown_requested = true;
            }
        },
    }
}
}
/// Runs `f` with simultaneous mutable access to `context` and to plugin `T`'s
/// data container.
///
/// The container is taken out of `context` for the duration of the call and
/// put back afterwards, which is what lets `f` borrow both mutably. As a
/// consequence, `f` must not access `T`'s container through `context`: a
/// container created inside `f` (for example via `get_data_container_mut`)
/// is overwritten when the original is reinserted.
///
/// `f` is called exactly once, so any `FnOnce` is accepted; closures that
/// implement `Fn` continue to work unchanged.
///
/// # Panics
///
/// Panics if `T`'s data container has not been created yet, or if the stored
/// value is not a `T::DataContainer`.
pub(crate) fn run_with_plugin<T: DataPlugin>(
    context: &mut Context,
    f: impl FnOnce(&mut Context, &mut T::DataContainer),
) {
    let plugin_key = TypeId::of::<T>();

    // Temporarily take the container out so `context` can be borrowed
    // mutably alongside it.
    let mut data_container_box = context
        .data_plugins
        .remove(&plugin_key)
        .expect("run_with_plugin: data container has not been initialized for this plugin");
    let data_container = data_container_box
        .downcast_mut::<T::DataContainer>()
        .expect("run_with_plugin: stored data container has an unexpected type");

    f(context, data_container);

    // Put the container back under the same key.
    context.data_plugins.insert(plugin_key, data_container_box);
}
impl Default for Context {
fn default() -> Self {
Self::new()
}
}
/// Describes a piece of per-context state. The implementing type is used only
/// as a key (through its `TypeId`); the state itself is a `DataContainer`
/// stored inside the `Context`.
pub trait DataPlugin: Any {
/// The type of the state stored in the `Context` for this plugin.
type DataContainer;
/// Builds the initial container; called by `Context::get_data_container_mut`
/// the first time the plugin's container is requested.
fn create_data_container() -> Self::DataContainer;
}
/// Declares a unit struct named `$plugin` and implements `DataPlugin` for it,
/// with `$data_container` as the container type and `$default` as the
/// expression returned by `create_data_container`.
///
/// The tests in this file use it as
/// `define_data_plugin!(ComponentA, Vec<u32>, vec![]);`.
#[macro_export]
macro_rules! define_data_plugin {
($plugin:ident, $data_container:ty, $default: expr) => {
struct $plugin;
impl $crate::context::DataPlugin for $plugin {
type DataContainer = $data_container;
fn create_data_container() -> Self::DataContainer {
$default
}
}
};
}
pub use define_data_plugin;
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
use std::cell::RefCell;
use super::*;
use ixa_derive::IxaEvent;
define_data_plugin!(ComponentA, Vec<u32>, vec![]);
/// Executing a context with nothing scheduled terminates and leaves the clock
/// at 0.0.
#[test]
fn empty_context() {
    let mut ctx = Context::new();
    ctx.execute();
    let final_time = ctx.get_current_time();
    assert_eq!(final_time, 0.0);
}
/// A value pushed through the mutable accessor is visible through the shared
/// accessor.
#[test]
fn get_data_container() {
    let mut ctx = Context::new();
    ctx.get_data_container_mut(ComponentA).push(1);
    let stored = ctx.get_data_container(ComponentA).unwrap();
    assert_eq!(*stored, vec![1]);
}
/// The shared accessor does not create a container that was never requested
/// mutably.
#[test]
fn get_uninitialized_data_container() {
    let ctx = Context::new();
    let missing = ctx.get_data_container(ComponentA);
    assert!(missing.is_none());
}
/// Test helper: schedules a `Normal`-phase plan at `time` that appends `value`
/// to the `ComponentA` container, and returns the plan's id.
fn add_plan(context: &mut Context, time: f64, value: u32) -> PlanId {
    let record_value = move |inner: &mut Context| {
        inner.get_data_container_mut(ComponentA).push(value);
    };
    context.add_plan(time, record_value)
}
/// Test helper: like `add_plan`, but schedules the plan under an explicit
/// `phase`.
fn add_plan_with_phase(
    context: &mut Context,
    time: f64,
    value: u32,
    phase: ExecutionPhase,
) -> PlanId {
    let record_value = move |inner: &mut Context| {
        inner.get_data_container_mut(ComponentA).push(value);
    };
    context.add_plan_with_phase(time, record_value, phase)
}
/// A plan scheduled before the current time (0.0) is rejected.
#[test]
#[should_panic(expected = "Time is invalid")]
fn negative_plan_time() {
    let mut ctx = Context::new();
    let before_start = -1.0;
    add_plan(&mut ctx, before_start, 0);
}
/// A plan scheduled at an infinite time is rejected.
#[test]
#[should_panic(expected = "Time is invalid")]
fn infinite_plan_time() {
    let mut ctx = Context::new();
    let never = f64::INFINITY;
    add_plan(&mut ctx, never, 0);
}
/// A plan scheduled at NaN is rejected.
#[test]
#[should_panic(expected = "Time is invalid")]
fn nan_plan_time() {
    let mut ctx = Context::new();
    let not_a_time = f64::NAN;
    add_plan(&mut ctx, not_a_time, 0);
}
/// A single timed plan runs and moves the clock to its time.
#[test]
fn timed_plan_only() {
    let mut ctx = Context::new();
    add_plan(&mut ctx, 1.0, 1);
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![1];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// An immediate callback runs without advancing the clock.
#[test]
fn callback_only() {
    let mut ctx = Context::new();
    ctx.queue_callback(|inner| {
        inner.get_data_container_mut(ComponentA).push(1);
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 0.0);
    let expected: Vec<u32> = vec![1];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// A queued callback runs before a timed plan.
#[test]
fn callback_before_timed_plan() {
    let mut ctx = Context::new();
    ctx.queue_callback(|inner| {
        inner.get_data_container_mut(ComponentA).push(1);
    });
    add_plan(&mut ctx, 1.0, 2);
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![1, 2];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// A plan added from inside a callback runs only after that callback has
/// finished.
#[test]
fn callback_adds_timed_plan() {
    let mut ctx = Context::new();
    ctx.queue_callback(|inner| {
        inner.get_data_container_mut(ComponentA).push(1);
        add_plan(inner, 1.0, 2);
        inner.get_data_container_mut(ComponentA).push(3);
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![1, 3, 2];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// A callback queued from inside a callback runs after the outer callback
/// finishes but before a timed plan added in the same place.
#[test]
fn callback_adds_callback_and_timed_plan() {
    let mut ctx = Context::new();
    ctx.queue_callback(|inner| {
        inner.get_data_container_mut(ComponentA).push(1);
        add_plan(inner, 1.0, 2);
        inner.queue_callback(|innermost| {
            innermost.get_data_container_mut(ComponentA).push(4);
        });
        inner.get_data_container_mut(ComponentA).push(3);
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![1, 3, 4, 2];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// A callback queued from inside a timed plan runs before a later plan added
/// by the same plan.
#[test]
fn timed_plan_adds_callback_and_timed_plan() {
    let mut ctx = Context::new();
    ctx.add_plan(1.0, |inner| {
        inner.get_data_container_mut(ComponentA).push(1);
        add_plan(inner, 2.0, 3);
        inner.queue_callback(|innermost| {
            innermost.get_data_container_mut(ComponentA).push(2);
        });
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 2.0);
    let expected: Vec<u32> = vec![1, 2, 3];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// A plan cancelled by an earlier plan never runs, and the clock stops at the
/// time of the cancelling plan.
#[test]
fn cancel_plan() {
    let mut ctx = Context::new();
    let doomed = add_plan(&mut ctx, 2.0, 1);
    ctx.add_plan(1.0, move |inner| {
        inner.cancel_plan(&doomed);
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let nothing_recorded: Vec<u32> = Vec::new();
    assert_eq!(*ctx.get_data_container_mut(ComponentA), nothing_recorded);
}
/// A plan may be scheduled for the current time from inside a running plan; a
/// callback queued alongside it still runs first.
#[test]
fn add_plan_with_current_time() {
    let mut ctx = Context::new();
    ctx.add_plan(1.0, move |inner| {
        inner.get_data_container_mut(ComponentA).push(1);
        add_plan(inner, 1.0, 2);
        inner.queue_callback(|innermost| {
            innermost.get_data_container_mut(ComponentA).push(3);
        });
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![1, 3, 2];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// Two same-phase plans at the same time run in the order they were added.
#[test]
fn plans_at_same_time_fire_in_order() {
    let mut ctx = Context::new();
    add_plan(&mut ctx, 1.0, 1);
    add_plan(&mut ctx, 1.0, 2);
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![1, 2];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// The derived ordering on phases is `First < Normal < Last`.
#[test]
fn check_plan_phase_ordering() {
    let first = ExecutionPhase::First;
    let normal = ExecutionPhase::Normal;
    let last = ExecutionPhase::Last;
    assert!(first < normal);
    assert!(normal < last);
}
/// Plans at the same time run grouped by phase (`First`, then `Normal`, then
/// `Last`), keeping insertion order within each phase.
#[test]
fn plans_at_same_time_follow_phase() {
    let mut ctx = Context::new();
    add_plan(&mut ctx, 1.0, 1);
    add_plan_with_phase(&mut ctx, 1.0, 5, ExecutionPhase::Last);
    add_plan_with_phase(&mut ctx, 1.0, 3, ExecutionPhase::First);
    add_plan(&mut ctx, 1.0, 2);
    add_plan_with_phase(&mut ctx, 1.0, 6, ExecutionPhase::Last);
    add_plan_with_phase(&mut ctx, 1.0, 4, ExecutionPhase::First);
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.0);
    let expected: Vec<u32> = vec![3, 4, 1, 2, 5, 6];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// First event type used by the event tests; handlers record its `data`.
#[derive(Copy, Clone, IxaEvent)]
struct Event1 {
pub data: usize,
}
/// Second event type, used to check that handlers are kept separate per event
/// type.
#[derive(Copy, Clone, IxaEvent)]
struct Event2 {
pub data: usize,
}
/// A subscribed handler receives an emitted event once the context executes.
#[test]
fn simple_event() {
    let mut ctx = Context::new();
    let observed = Rc::new(RefCell::new(0));
    let sink = Rc::clone(&observed);
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *sink.borrow_mut() = event.data;
    });
    ctx.emit_event(Event1 { data: 1 });
    ctx.execute();
    assert_eq!(*observed.borrow(), 1);
}
/// Each emitted event is delivered to the handler: two events with payloads
/// 1 and 2 accumulate to 3.
#[test]
fn multiple_events() {
    let mut ctx = Context::new();
    let total = Rc::new(RefCell::new(0));
    let sink = Rc::clone(&total);
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *sink.borrow_mut() += event.data;
    });
    ctx.emit_event(Event1 { data: 1 });
    ctx.emit_event(Event1 { data: 2 });
    ctx.execute();
    assert_eq!(*total.borrow(), 3);
}
/// Every handler subscribed to an event type receives the event.
#[test]
fn multiple_event_handlers() {
    let mut ctx = Context::new();
    let first_seen = Rc::new(RefCell::new(0));
    let second_seen = Rc::new(RefCell::new(0));
    let first_sink = Rc::clone(&first_seen);
    let second_sink = Rc::clone(&second_seen);
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *first_sink.borrow_mut() = event.data;
    });
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *second_sink.borrow_mut() = event.data;
    });
    ctx.emit_event(Event1 { data: 1 });
    ctx.execute();
    assert_eq!(*first_seen.borrow(), 1);
    assert_eq!(*second_seen.borrow(), 1);
}
/// Handlers only receive events of the type they subscribed to.
#[test]
fn multiple_event_types() {
    let mut ctx = Context::new();
    let event1_seen = Rc::new(RefCell::new(0));
    let event2_seen = Rc::new(RefCell::new(0));
    let event1_sink = Rc::clone(&event1_seen);
    let event2_sink = Rc::clone(&event2_seen);
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *event1_sink.borrow_mut() = event.data;
    });
    ctx.subscribe_to_event::<Event2>(move |_, event| {
        *event2_sink.borrow_mut() = event.data;
    });
    ctx.emit_event(Event1 { data: 1 });
    ctx.emit_event(Event2 { data: 2 });
    ctx.execute();
    assert_eq!(*event1_seen.borrow(), 1);
    assert_eq!(*event2_seen.borrow(), 2);
}
/// A handler registered after an event was emitted does not receive that
/// event.
#[test]
fn subscribe_after_event() {
    let mut ctx = Context::new();
    let observed = Rc::new(RefCell::new(0));
    let sink = Rc::clone(&observed);
    ctx.emit_event(Event1 { data: 1 });
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *sink.borrow_mut() = event.data;
    });
    ctx.execute();
    assert_eq!(*observed.borrow(), 0);
}
/// Plans scheduled after a shutdown plan never run.
#[test]
fn shutdown_cancels_plans() {
    let mut ctx = Context::new();
    add_plan(&mut ctx, 1.0, 1);
    ctx.add_plan(1.5, Context::shutdown);
    add_plan(&mut ctx, 2.0, 2);
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.5);
    let expected: Vec<u32> = vec![1];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// A callback queued by the same plan that requests shutdown never runs.
#[test]
fn shutdown_cancels_callbacks() {
    let mut ctx = Context::new();
    add_plan(&mut ctx, 1.0, 1);
    ctx.add_plan(1.5, |inner| {
        inner.queue_callback(|innermost| {
            innermost.get_data_container_mut(ComponentA).push(3);
        });
        inner.shutdown();
    });
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 1.5);
    let expected: Vec<u32> = vec![1];
    assert_eq!(*ctx.get_data_container_mut(ComponentA), expected);
}
/// An event emitted before shutdown is requested is not delivered if the
/// context shuts down before executing.
#[test]
fn shutdown_cancels_events() {
    let mut ctx = Context::new();
    let observed = Rc::new(RefCell::new(0));
    let sink = Rc::clone(&observed);
    ctx.subscribe_to_event::<Event1>(move |_, event| {
        *sink.borrow_mut() = event.data;
    });
    ctx.emit_event(Event1 { data: 1 });
    ctx.shutdown();
    ctx.execute();
    assert_eq!(*observed.borrow(), 0);
}
/// A periodic plan keeps rescheduling itself only while other plans remain.
/// With plans at 1.0 and 1.5 it fires at 0.0, 1.0 and 2.0; after the 2.0
/// occurrence the queue is empty, so it stops.
#[test]
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_truncation)]
fn periodic_plan_self_schedules() {
    let mut ctx = Context::new();
    let record_time = |inner: &mut Context| {
        let now = inner.get_current_time();
        inner.get_data_container_mut(ComponentA).push(now as u32);
    };
    ctx.add_periodic_plan_with_phase(1.0, record_time, ExecutionPhase::Last);
    ctx.add_plan(1.0, move |_inner| {});
    ctx.add_plan(1.5, move |_inner| {});
    ctx.execute();
    assert_eq!(ctx.get_current_time(), 2.0);
    let expected: Vec<u32> = vec![0, 1, 2];
    assert_eq!(*ctx.get_data_container(ComponentA).unwrap(), expected);
}
}