ixa/
plan.rs

//! A priority queue that stores arbitrary data sorted by time and priority
//!
//! Defines a [`Queue`]`<T, P>` that is intended to store a queue of items of type
//! `T` - sorted by `f64` time and definable priority `P` - called 'plans'.
//! This queue has methods for adding plans, cancelling plans, and retrieving
//! the earliest plan in the queue. Adding a plan is *O*(log(*n*)) and
//! cancellation is an expected *O*(1) hash map removal. Retrieval pops from a
//! binary heap, which is *O*(log(*n*)) per popped entry, and lazily skips any
//! entries whose plan has been cancelled.
//!
//! This queue is used by [`Context`](crate::Context) to store future events where some callback
//! closure `FnOnce(&mut Context)` will be executed at a given point in time.
11
12use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14
15use crate::{trace, HashMap, HashMapExt};
16
17/// A priority queue that stores arbitrary data sorted by time
18///
19/// Items of type `T` are stored in order by `f64` time and called [`Plan`]`<T>`.
20/// Plans can have priorities given by some specified orderable type `P`.
21/// When plans are created they are sequentially assigned a [`PlanId`] that is a
22/// wrapped `u64`. If two plans are scheduled for the same time then the plan
23/// with the lowest priority is placed earlier. If two plans have the same time
24/// and priority then the plan that is scheduled first (i.e., that has the
25/// lowest id) is placed earlier.
26///
27/// The time, plan id, and priority are stored in a binary heap of [`PlanSchedule`]`<P>`
28/// objects. The data payload of the event is stored in a hash map by plan id.
29/// Plan cancellation occurs by removing the corresponding entry from the data
30/// hash map.
31pub struct Queue<T, P: Eq + PartialEq + Ord> {
32    queue: BinaryHeap<PlanSchedule<P>>,
33    data_map: HashMap<u64, T>,
34    plan_counter: u64,
35}
36
37impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
38    /// Create a new empty `Queue<T>`
39    #[must_use]
40    pub fn new() -> Queue<T, P> {
41        Queue {
42            queue: BinaryHeap::new(),
43            data_map: HashMap::new(),
44            plan_counter: 0,
45        }
46    }
47
48    /// Add a plan to the queue at the specified time
49    ///
50    /// Returns a [`PlanId`] for the newly-added plan that can be used to cancel it
51    /// if needed.
52    pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> PlanId {
53        trace!("adding plan at {time}");
54        // Add plan to queue, store data, and increment counter
55        let plan_id = self.plan_counter;
56        self.queue.push(PlanSchedule {
57            plan_id,
58            time,
59            priority,
60        });
61        self.data_map.insert(plan_id, data);
62        self.plan_counter += 1;
63        PlanId(plan_id)
64    }
65
66    /// Cancel a plan that has been added to the queue
67    pub fn cancel_plan(&mut self, plan_id: &PlanId) -> Option<T> {
68        trace!("cancel plan {plan_id:?}");
69        // Delete the plan from the map, but leave in the queue
70        // It will be skipped when the plan is popped from the queue
71        self.data_map.remove(&plan_id.0)
72    }
73
74    #[must_use]
75    pub fn is_empty(&self) -> bool {
76        self.queue.is_empty()
77    }
78
79    #[must_use]
80    pub fn next_time(&self) -> Option<f64> {
81        self.queue.peek().map(|e| e.time)
82    }
83
84    #[allow(dead_code)]
85    pub(crate) fn clear(&mut self) {
86        self.data_map.clear();
87        self.queue.clear();
88        self.plan_counter = 0;
89    }
90
91    #[must_use]
92    #[allow(dead_code)]
93    pub(crate) fn peek(&self) -> Option<(&PlanSchedule<P>, &T)> {
94        // Iterate over queue until we find a plan with data or queue is empty
95        for entry in &self.queue {
96            // Skip plans that have been cancelled and thus have no data
97            if let Some(data) = self.data_map.get(&entry.plan_id) {
98                return Some((entry, data));
99            }
100        }
101        None
102    }
103
104    /// Retrieve the earliest plan in the queue
105    ///
106    /// Returns the next plan if it exists or else `None` if the queue is empty
107    pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
108        trace!("getting next plan");
109        loop {
110            // Pop from queue until we find a plan with data or queue is empty
111            match self.queue.pop() {
112                Some(entry) => {
113                    // Skip plans that have been cancelled and thus have no data
114                    if let Some(data) = self.data_map.remove(&entry.plan_id) {
115                        return Some(Plan {
116                            time: entry.time,
117                            data,
118                        });
119                    }
120                }
121                None => {
122                    return None;
123                }
124            }
125        }
126    }
127
128    /// Returns a list of length `at_most`, or unbounded if `at_most=0`, of active scheduled
129    /// [`PlanSchedule`]s ordered as they are in the queue itself.
130    #[must_use]
131    pub fn list_schedules(&self, at_most: usize) -> Vec<&PlanSchedule<P>> {
132        let mut items = vec![];
133
134        // Iterate over queue until we find a plan with data or queue is empty
135        for entry in &self.queue {
136            // Skip plans that have been cancelled and thus have no data
137            if self.data_map.contains_key(&entry.plan_id) {
138                items.push(entry);
139                if items.len() == at_most {
140                    break;
141                }
142            }
143        }
144        items
145    }
146
147    #[doc(hidden)]
148    pub(crate) fn remaining_plan_count(&self) -> usize {
149        self.queue.len()
150    }
151}
152
153impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
/// A time, id, and priority object used to order plans in the [`Queue`]`<T, P>`
///
/// [`PlanSchedule`] objects are sorted in increasing order of time, priority and then
/// plan id
#[derive(PartialEq, Debug)]
pub struct PlanSchedule<P: Eq + PartialEq + Ord> {
    /// Id of the plan this entry schedules
    pub plan_id: u64,
    /// Time at which the plan is scheduled
    pub time: f64,
    /// Tie-breaker among plans with the same time; lower values come first
    pub priority: P,
}

#[allow(clippy::expl_impl_clone_on_copy)] // Clippy false positive
impl<P: Eq + PartialEq + Ord + Clone> Clone for PlanSchedule<P> {
    fn clone(&self) -> Self {
        PlanSchedule {
            priority: self.priority.clone(),
            // `plan_id` and `time` are `Copy`, so they can be taken from `*self`
            ..*self
        }
    }
}

impl<P: Eq + PartialEq + Ord + Copy + Clone> Copy for PlanSchedule<P> {}

impl<P: Eq + PartialEq + Ord> Eq for PlanSchedule<P> {}

impl<P: Eq + PartialEq + Ord> PartialOrd for PlanSchedule<P> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Entry objects are ordered in increasing order by time, priority, and then
/// plan id
///
/// The comparison is reversed so that the entry with the smallest
/// (time, priority, plan id) is the *greatest* element, which is what a
/// max-heap such as `BinaryHeap` hands out first.
impl<P: Eq + PartialEq + Ord> Ord for PlanSchedule<P> {
    /// # Panics
    ///
    /// Panics if either time is NaN, because NaN has no ordering.
    fn cmp(&self, other: &Self) -> Ordering {
        self.time
            .partial_cmp(&other.time)
            .expect("plan times must be comparable; NaN is not a valid plan time")
            // Break time ties in order of priority and then plan id
            .then_with(|| self.priority.cmp(&other.priority))
            .then_with(|| self.plan_id.cmp(&other.plan_id))
            .reverse()
    }
}
212
/// A unique identifier for a plan added to a [`Queue`]`<T, P>`
///
/// Wraps the `u64` id the queue assigned when the plan was added; pass it to
/// the queue's `cancel_plan` method to cancel that plan.
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct PlanId(pub(crate) u64);
216
/// A plan that holds data of type `T` intended to be used at the specified time
pub struct Plan<T> {
    /// Time at which the plan was scheduled
    pub time: f64,
    /// Payload that was stored with the plan
    pub data: T,
}
222
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::Queue;

    /// Asserts that the queue still reports entries and that the next plan
    /// retrieved carries the expected time and payload.
    fn assert_next_plan<P: Ord>(plan_queue: &mut Queue<i32, P>, time: f64, data: i32) {
        assert!(!plan_queue.is_empty());
        let next_plan = plan_queue.get_next_plan().unwrap();
        assert_eq!(next_plan.time, time);
        assert_eq!(next_plan.data, data);
    }

    /// Asserts that the queue reports empty and yields no further plan.
    fn assert_exhausted<P: Ord>(plan_queue: &mut Queue<i32, P>) {
        assert!(plan_queue.is_empty());
        assert!(plan_queue.get_next_plan().is_none());
    }

    #[test]
    fn empty_queue() {
        let mut plan_queue = Queue::<(), ()>::new();
        assert!(plan_queue.get_next_plan().is_none());
    }

    // Plans come back in time order regardless of insertion order
    #[test]
    fn add_plans() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        plan_queue.add_plan(3.0, 3, ());
        plan_queue.add_plan(2.0, 2, ());

        assert_next_plan(&mut plan_queue, 1.0, 1);
        assert_next_plan(&mut plan_queue, 2.0, 2);
        assert_next_plan(&mut plan_queue, 3.0, 3);
        assert_exhausted(&mut plan_queue);
    }

    // Equal time and priority: the plan added first comes back first
    #[test]
    fn add_plans_at_same_time_with_same_priority() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        plan_queue.add_plan(1.0, 2, ());

        assert_next_plan(&mut plan_queue, 1.0, 1);
        assert_next_plan(&mut plan_queue, 1.0, 2);
        assert_exhausted(&mut plan_queue);
    }

    // Equal time: the plan with the lower priority value comes back first
    #[test]
    fn add_plans_at_same_time_with_different_priority() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, 1);
        plan_queue.add_plan(1.0, 2, 0);

        assert_next_plan(&mut plan_queue, 1.0, 2);
        assert_next_plan(&mut plan_queue, 1.0, 1);
        assert_exhausted(&mut plan_queue);
    }

    // A cancelled plan is skipped on retrieval
    #[test]
    fn add_and_cancel_plans() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        let plan_to_cancel = plan_queue.add_plan(2.0, 2, ());
        plan_queue.add_plan(3.0, 3, ());
        plan_queue.cancel_plan(&plan_to_cancel);

        assert_next_plan(&mut plan_queue, 1.0, 1);
        assert_next_plan(&mut plan_queue, 3.0, 3);
        assert_exhausted(&mut plan_queue);
    }

    // A plan added after a retrieval is still ordered by its time
    #[test]
    fn add_and_get_plans() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        plan_queue.add_plan(2.0, 2, ());

        assert_next_plan(&mut plan_queue, 1.0, 1);

        plan_queue.add_plan(1.5, 3, ());

        assert_next_plan(&mut plan_queue, 1.5, 3);
        assert_next_plan(&mut plan_queue, 2.0, 2);
        assert_exhausted(&mut plan_queue);
    }

    // Cancelling a plan that was already retrieved returns nothing
    #[test]
    fn cancel_invalid_plan() {
        let mut plan_queue = Queue::new();
        let plan_to_cancel = plan_queue.add_plan(1.0, (), ());
        // is_empty just checks for a plan existing, not whether it is valid/has data
        assert!(!plan_queue.is_empty());
        plan_queue.get_next_plan();
        assert!(plan_queue.is_empty());
        let result = plan_queue.cancel_plan(&plan_to_cancel);
        assert!(result.is_none());
    }
}