ixa/
plan.rs

1//! A priority queue that stores arbitrary data sorted by time and priority
2//!
3//! Defines a [`Queue`]`<T, P>` that is intended to store a queue of items of type
4//! `T` - sorted by `f64` time and definable priority `P` - called 'plans'.
5//! This queue has methods for adding plans, cancelling plans, and retrieving
6//! the earliest plan in the queue. Adding a plan is *O*(log(*n*)) while
7//! cancellation and retrieval are *O*(1).
8//!
9//! This queue is used by [`Context`](crate::Context) to store future events where some callback
10//! closure `FnOnce(&mut Context)` will be executed at a given point in time.
11
12use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14
15use crate::{trace, HashMap, HashMapExt};
16
17/// A priority queue that stores arbitrary data sorted by time
18///
19/// Items of type `T` are stored in order by `f64` time and called [`Plan`]`<T>`.
20/// Plans can have priorities given by some specified orderable type `P`.
21/// When plans are created they are sequentially assigned a [`PlanId`] that is a
22/// wrapped `u64`. If two plans are scheduled for the same time then the plan
23/// with the lowest priority is placed earlier. If two plans have the same time
24/// and priority then the plan that is scheduled first (i.e., that has the
25/// lowest id) is placed earlier.
26///
27/// The time, plan id, and priority are stored in a binary heap of [`PlanSchedule`]`<P>`
28/// objects. The data payload of the event is stored in a hash map by plan id.
29/// Plan cancellation occurs by removing the corresponding entry from the data
30/// hash map.
31pub struct Queue<T, P: Eq + PartialEq + Ord> {
32    queue: BinaryHeap<PlanSchedule<P>>,
33    data_map: HashMap<u64, T>,
34    /// The number of plans that have been added; equivalently, the next plan ID that
35    /// will be issued.
36    plan_counter: u64,
37    /// Tracks the high water mark of plans in flight (scheduled but not yet executed).
38    /// This is the max of `self.queue.len()`, not of `self.data_map.len()`.
39    #[cfg(feature = "profiling")]
40    pub(crate) max_plans_in_flight: u64,
41    #[cfg(feature = "profiling")]
42    pub(crate) max_memory_in_use: u64,
43}
44
45impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
46    /// Create a new empty `Queue<T>`
47    #[must_use]
48    pub fn new() -> Queue<T, P> {
49        Queue {
50            queue: BinaryHeap::new(),
51            data_map: HashMap::new(),
52            plan_counter: 0,
53            #[cfg(feature = "profiling")]
54            max_plans_in_flight: 0,
55            #[cfg(feature = "profiling")]
56            max_memory_in_use: 0,
57        }
58    }
59
60    /// Add a plan to the queue at the specified time
61    ///
62    /// Returns a [`PlanId`] for the newly-added plan that can be used to cancel it
63    /// if needed.
64    pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> PlanId {
65        trace!("adding plan at {time}");
66        // Add plan to queue, store data, and increment counter
67        let plan_id = self.plan_counter;
68        self.queue.push(PlanSchedule {
69            plan_id,
70            time,
71            priority,
72        });
73        self.data_map.insert(plan_id, data);
74        self.plan_counter += 1;
75        #[cfg(feature = "profiling")]
76        {
77            self.max_plans_in_flight = self.max_plans_in_flight.max(self.queue.len() as u64);
78            self.max_memory_in_use = self
79                .max_memory_in_use
80                .max(self.estimated_memory_in_use() as u64);
81        }
82
83        PlanId(plan_id)
84    }
85
86    /// Cancel a plan that has been added to the queue
87    pub fn cancel_plan(&mut self, plan_id: &PlanId) -> Option<T> {
88        trace!("cancel plan {plan_id:?}");
89        // Delete the plan from the map, but leave in the queue
90        // It will be skipped when the plan is popped from the queue
91        self.data_map.remove(&plan_id.0)
92    }
93
94    #[must_use]
95    pub fn is_empty(&self) -> bool {
96        self.queue.is_empty()
97    }
98
99    #[must_use]
100    pub fn next_time(&self) -> Option<f64> {
101        self.queue.peek().map(|e| e.time)
102    }
103
104    #[allow(dead_code)]
105    pub(crate) fn clear(&mut self) {
106        self.data_map.clear();
107        self.queue.clear();
108        self.plan_counter = 0;
109    }
110
111    #[must_use]
112    #[allow(dead_code)]
113    pub(crate) fn peek(&self) -> Option<(&PlanSchedule<P>, &T)> {
114        // Iterate over queue until we find a plan with data or queue is empty
115        for entry in &self.queue {
116            // Skip plans that have been cancelled and thus have no data
117            if let Some(data) = self.data_map.get(&entry.plan_id) {
118                return Some((entry, data));
119            }
120        }
121        None
122    }
123
124    /// Retrieve the earliest plan in the queue
125    ///
126    /// Returns the next plan if it exists or else `None` if the queue is empty
127    pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
128        trace!("getting next plan");
129        loop {
130            // Pop from queue until we find a plan with data or queue is empty
131            match self.queue.pop() {
132                Some(entry) => {
133                    // Skip plans that have been cancelled and thus have no data
134                    if let Some(data) = self.data_map.remove(&entry.plan_id) {
135                        return Some(Plan {
136                            time: entry.time,
137                            data,
138                        });
139                    }
140                }
141                None => {
142                    return None;
143                }
144            }
145        }
146    }
147
148    /// Returns a list of length `at_most`, or unbounded if `at_most=0`, of active scheduled
149    /// [`PlanSchedule`]s ordered as they are in the queue itself.
150    #[must_use]
151    pub fn list_schedules(&self, at_most: usize) -> Vec<&PlanSchedule<P>> {
152        let mut items = vec![];
153
154        // Iterate over queue until we find a plan with data or queue is empty
155        for entry in &self.queue {
156            // Skip plans that have been cancelled and thus have no data
157            if self.data_map.contains_key(&entry.plan_id) {
158                items.push(entry);
159                if items.len() == at_most {
160                    break;
161                }
162            }
163        }
164        items
165    }
166
167    #[doc(hidden)]
168    pub(crate) fn remaining_plan_count(&self) -> usize {
169        self.queue.len()
170    }
171
172    #[cfg(feature = "profiling")]
173    fn estimated_memory_in_use(&self) -> usize {
174        let queue_bytes = self.queue.capacity() * size_of::<PlanSchedule<P>>();
175
176        let map_entry_bytes = self.data_map.capacity() * size_of::<(u64, T)>();
177
178        queue_bytes + map_entry_bytes
179    }
180}
181
182impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188/// A time, id, and priority object used to order plans in the [`Queue`]`<T>`
189///
190/// [`PlanSchedule`] objects are sorted in increasing order of time, priority and then
191/// plan id
192#[derive(PartialEq, Debug)]
193pub struct PlanSchedule<P: Eq + PartialEq + Ord> {
194    pub plan_id: u64,
195    pub time: f64,
196    pub priority: P,
197}
198
199#[allow(clippy::expl_impl_clone_on_copy)] // Clippy false positive
200impl<P: Eq + PartialEq + Ord + Clone> Clone for PlanSchedule<P> {
201    fn clone(&self) -> Self {
202        PlanSchedule {
203            priority: self.priority.clone(),
204            ..*self
205        }
206    }
207}
208
209impl<P: Eq + PartialEq + Ord + Copy + Clone> Copy for PlanSchedule<P> {}
210
211impl<P: Eq + PartialEq + Ord> Eq for PlanSchedule<P> {}
212
213impl<P: Eq + PartialEq + Ord> PartialOrd for PlanSchedule<P> {
214    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
215        Some(self.cmp(other))
216    }
217}
218
219/// Entry objects are ordered in increasing order by time, priority, and then
220/// plan id
221impl<P: Eq + PartialEq + Ord> Ord for PlanSchedule<P> {
222    fn cmp(&self, other: &Self) -> Ordering {
223        let time_ordering = self.time.partial_cmp(&other.time).unwrap().reverse();
224        match time_ordering {
225            // Break time ties in order of priority and then plan id
226            Ordering::Equal => {
227                let priority_ordering = self
228                    .priority
229                    .partial_cmp(&other.priority)
230                    .unwrap()
231                    .reverse();
232                match priority_ordering {
233                    Ordering::Equal => self.plan_id.cmp(&other.plan_id).reverse(),
234                    _ => priority_ordering,
235                }
236            }
237            _ => time_ordering,
238        }
239    }
240}
241
242/// A unique identifier for a plan added to a [`Queue`]`<T>`
243#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
244pub struct PlanId(pub(crate) u64);
245
246/// A plan that holds data of type `T` intended to be used at the specified time
247pub struct Plan<T> {
248    pub time: f64,
249    pub data: T,
250}
251
252#[cfg(test)]
253#[allow(clippy::float_cmp)]
254mod tests {
255    use super::Queue;
256
257    #[test]
258    fn empty_queue() {
259        let mut plan_queue = Queue::<(), ()>::new();
260        assert!(plan_queue.get_next_plan().is_none());
261    }
262
263    #[test]
264    fn add_plans() {
265        let mut plan_queue = Queue::new();
266        plan_queue.add_plan(1.0, 1, ());
267        plan_queue.add_plan(3.0, 3, ());
268        plan_queue.add_plan(2.0, 2, ());
269        assert!(!plan_queue.is_empty());
270
271        let next_plan = plan_queue.get_next_plan().unwrap();
272        assert_eq!(next_plan.time, 1.0);
273        assert_eq!(next_plan.data, 1);
274
275        assert!(!plan_queue.is_empty());
276        let next_plan = plan_queue.get_next_plan().unwrap();
277        assert_eq!(next_plan.time, 2.0);
278        assert_eq!(next_plan.data, 2);
279
280        assert!(!plan_queue.is_empty());
281        let next_plan = plan_queue.get_next_plan().unwrap();
282        assert_eq!(next_plan.time, 3.0);
283        assert_eq!(next_plan.data, 3);
284
285        assert!(plan_queue.is_empty());
286        assert!(plan_queue.get_next_plan().is_none());
287    }
288
289    #[test]
290    fn add_plans_at_same_time_with_same_priority() {
291        let mut plan_queue = Queue::new();
292        plan_queue.add_plan(1.0, 1, ());
293        plan_queue.add_plan(1.0, 2, ());
294        assert!(!plan_queue.is_empty());
295
296        let next_plan = plan_queue.get_next_plan().unwrap();
297        assert_eq!(next_plan.time, 1.0);
298        assert_eq!(next_plan.data, 1);
299
300        assert!(!plan_queue.is_empty());
301        let next_plan = plan_queue.get_next_plan().unwrap();
302        assert_eq!(next_plan.time, 1.0);
303        assert_eq!(next_plan.data, 2);
304
305        assert!(plan_queue.is_empty());
306        assert!(plan_queue.get_next_plan().is_none());
307    }
308
309    #[test]
310    fn add_plans_at_same_time_with_different_priority() {
311        let mut plan_queue = Queue::new();
312        plan_queue.add_plan(1.0, 1, 1);
313        plan_queue.add_plan(1.0, 2, 0);
314
315        assert!(!plan_queue.is_empty());
316        let next_plan = plan_queue.get_next_plan().unwrap();
317        assert_eq!(next_plan.time, 1.0);
318        assert_eq!(next_plan.data, 2);
319
320        let next_plan = plan_queue.get_next_plan().unwrap();
321        assert_eq!(next_plan.time, 1.0);
322        assert_eq!(next_plan.data, 1);
323
324        assert!(plan_queue.is_empty());
325        assert!(plan_queue.get_next_plan().is_none());
326    }
327
328    #[test]
329    fn add_and_cancel_plans() {
330        let mut plan_queue = Queue::new();
331        plan_queue.add_plan(1.0, 1, ());
332        let plan_to_cancel = plan_queue.add_plan(2.0, 2, ());
333        plan_queue.add_plan(3.0, 3, ());
334        plan_queue.cancel_plan(&plan_to_cancel);
335        assert!(!plan_queue.is_empty());
336
337        let next_plan = plan_queue.get_next_plan().unwrap();
338        assert_eq!(next_plan.time, 1.0);
339        assert_eq!(next_plan.data, 1);
340
341        assert!(!plan_queue.is_empty());
342        let next_plan = plan_queue.get_next_plan().unwrap();
343        assert_eq!(next_plan.time, 3.0);
344        assert_eq!(next_plan.data, 3);
345
346        assert!(plan_queue.is_empty());
347        assert!(plan_queue.get_next_plan().is_none());
348    }
349
350    #[test]
351    fn add_and_get_plans() {
352        let mut plan_queue = Queue::new();
353        plan_queue.add_plan(1.0, 1, ());
354        plan_queue.add_plan(2.0, 2, ());
355        assert!(!plan_queue.is_empty());
356
357        let next_plan = plan_queue.get_next_plan().unwrap();
358        assert_eq!(next_plan.time, 1.0);
359        assert_eq!(next_plan.data, 1);
360
361        plan_queue.add_plan(1.5, 3, ());
362
363        assert!(!plan_queue.is_empty());
364        let next_plan = plan_queue.get_next_plan().unwrap();
365        assert_eq!(next_plan.time, 1.5);
366        assert_eq!(next_plan.data, 3);
367
368        assert!(!plan_queue.is_empty());
369        let next_plan = plan_queue.get_next_plan().unwrap();
370        assert_eq!(next_plan.time, 2.0);
371        assert_eq!(next_plan.data, 2);
372
373        assert!(plan_queue.is_empty());
374        assert!(plan_queue.get_next_plan().is_none());
375    }
376
377    #[test]
378    fn cancel_invalid_plan() {
379        let mut plan_queue = Queue::new();
380        let plan_to_cancel = plan_queue.add_plan(1.0, (), ());
381        // is_empty just checks for a plan existing, not whether it is valid/has data
382        assert!(!plan_queue.is_empty());
383        plan_queue.get_next_plan();
384        assert!(plan_queue.is_empty());
385        let result = plan_queue.cancel_plan(&plan_to_cancel);
386        assert!(result.is_none());
387    }
388}