ixa/
plan.rs

1//! A priority queue that stores arbitrary data sorted by time and priority
2//!
3//! Defines a `Queue<T, P>` that is intended to store a queue of items of type
4//! T - sorted by `f64` time and definable priority `P` - called 'plans'.
5//! This queue has methods for adding plans, cancelling plans, and retrieving
//! the earliest plan in the queue. Adding a plan and retrieving the earliest
//! plan are both *O*(log(*n*)) in the worst case, while cancellation is *O*(1).
8//!
9//! This queue is used by `Context` to store future events where some callback
10//! closure `FnOnce(&mut Context)` will be executed at a given point in time.
11
12use crate::trace;
13use crate::{HashMap, HashMapExt};
14use std::{cmp::Ordering, collections::BinaryHeap};
15
16/// A priority queue that stores arbitrary data sorted by time
17///
18/// Items of type `T` are stored in order by `f64` time and called `Plan<T>`.
19/// Plans can have priorities given by some specified orderable type `P`.
20/// When plans are created they are sequentially assigned a `PlanId` that is a
21/// wrapped `u64`. If two plans are scheduled for the same time then the plan
22/// with the lowest priority is placed earlier. If two plans have the same time
23/// and priority then the plan that is scheduled first (i.e., that has the
24/// lowest id) is placed earlier.
25///
26/// The time, plan id, and priority are stored in a binary heap of `Entry<P>`
27/// objects. The data payload of the event is stored in a hash map by plan id.
28/// Plan cancellation occurs by removing the corresponding entry from the data
29/// hash map.
30pub struct Queue<T, P: Eq + PartialEq + Ord> {
31    queue: BinaryHeap<PlanSchedule<P>>,
32    data_map: HashMap<u64, T>,
33    plan_counter: u64,
34}
35
36impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
37    /// Create a new empty `Queue<T>`
38    #[must_use]
39    pub fn new() -> Queue<T, P> {
40        Queue {
41            queue: BinaryHeap::new(),
42            data_map: HashMap::new(),
43            plan_counter: 0,
44        }
45    }
46
47    /// Add a plan to the queue at the specified time
48    ///
49    /// Returns a `PlanId` for the newly-added plan that can be used to cancel it
50    /// if needed.
51    pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> PlanId {
52        trace!("adding plan at {time}");
53        // Add plan to queue, store data, and increment counter
54        let plan_id = self.plan_counter;
55        self.queue.push(PlanSchedule {
56            plan_id,
57            time,
58            priority,
59        });
60        self.data_map.insert(plan_id, data);
61        self.plan_counter += 1;
62        PlanId(plan_id)
63    }
64
65    /// Cancel a plan that has been added to the queue
66    pub fn cancel_plan(&mut self, plan_id: &PlanId) -> Option<T> {
67        trace!("cancel plan {plan_id:?}");
68        // Delete the plan from the map, but leave in the queue
69        // It will be skipped when the plan is popped from the queue
70        self.data_map.remove(&plan_id.0)
71    }
72
73    #[must_use]
74    pub fn is_empty(&self) -> bool {
75        self.queue.is_empty()
76    }
77
78    #[must_use]
79    pub fn next_time(&self) -> Option<f64> {
80        self.queue.peek().map(|e| e.time)
81    }
82
83    #[allow(dead_code)]
84    pub(crate) fn clear(&mut self) {
85        self.data_map.clear();
86        self.queue.clear();
87        self.plan_counter = 0;
88    }
89
90    #[must_use]
91    #[allow(dead_code)]
92    pub(crate) fn peek(&self) -> Option<(&PlanSchedule<P>, &T)> {
93        // Iterate over queue until we find a plan with data or queue is empty
94        for entry in &self.queue {
95            // Skip plans that have been cancelled and thus have no data
96            if let Some(data) = self.data_map.get(&entry.plan_id) {
97                return Some((entry, data));
98            }
99        }
100        None
101    }
102
103    /// Retrieve the earliest plan in the queue
104    ///
105    /// Returns the next plan if it exists or else `None` if the queue is empty
106    pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
107        trace!("getting next plan");
108        loop {
109            // Pop from queue until we find a plan with data or queue is empty
110            match self.queue.pop() {
111                Some(entry) => {
112                    // Skip plans that have been cancelled and thus have no data
113                    if let Some(data) = self.data_map.remove(&entry.plan_id) {
114                        return Some(Plan {
115                            time: entry.time,
116                            data,
117                        });
118                    }
119                }
120                None => {
121                    return None;
122                }
123            }
124        }
125    }
126
127    /// Returns a list of length `at_most`, or unbounded if `at_most=0`, of active scheduled
128    /// `PlanSchedule`s ordered as they are in the queue itself.
129    #[must_use]
130    pub fn list_schedules(&self, at_most: usize) -> Vec<&PlanSchedule<P>> {
131        let mut items = vec![];
132
133        // Iterate over queue until we find a plan with data or queue is empty
134        for entry in &self.queue {
135            // Skip plans that have been cancelled and thus have no data
136            if self.data_map.contains_key(&entry.plan_id) {
137                items.push(entry);
138                if items.len() == at_most {
139                    break;
140                }
141            }
142        }
143        items
144    }
145
146    #[doc(hidden)]
147    pub(crate) fn remaining_plan_count(&self) -> usize {
148        self.queue.len()
149    }
150}
151
152impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
/// A time, id, and priority object used to order plans in the `Queue<T, P>`
///
/// `PlanSchedule` objects are ordered so that the entry with the earliest
/// time, then the lowest priority value, then the lowest plan id compares as
/// the *greatest*. A max-heap such as `BinaryHeap` therefore yields entries
/// in increasing order of time, priority and then plan id.
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct PlanSchedule<P: Eq + PartialEq + Ord> {
    /// Sequential id assigned to the plan when it was added
    pub plan_id: u64,
    /// Time at which the plan is scheduled
    pub time: f64,
    /// Tie-breaker among plans scheduled for the same time
    pub priority: P,
}

// `Eq` cannot be derived because `f64` is only `PartialEq`; times are expected
// never to be NaN.
impl<P: Eq + PartialEq + Ord> Eq for PlanSchedule<P> {}

impl<P: Eq + PartialEq + Ord> PartialOrd for PlanSchedule<P> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Entries compare by time, then priority, then plan id, with the whole
/// comparison reversed so that the earliest entry is the greatest.
impl<P: Eq + PartialEq + Ord> Ord for PlanSchedule<P> {
    /// # Panics
    ///
    /// Panics if either time is NaN.
    fn cmp(&self, other: &Self) -> Ordering {
        self.time
            .partial_cmp(&other.time)
            .expect("plan times must be comparable (not NaN)")
            // Break time ties in order of priority and then plan id
            .then_with(|| self.priority.cmp(&other.priority))
            .then_with(|| self.plan_id.cmp(&other.plan_id))
            // Reverse so that a max-heap pops the earliest entry first
            .reverse()
    }
}
211
/// Handle identifying one plan added to a `Queue<T, P>`
///
/// Wraps the `u64` id that the queue assigned to the plan when it was added.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PlanId(pub(crate) u64);
215
/// A plan that holds data of type `T` intended to be used at the specified time
#[derive(Debug)]
pub struct Plan<T> {
    /// Time at which the plan was scheduled
    pub time: f64,
    /// Payload supplied when the plan was added
    pub data: T,
}
221
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::Queue;

    /// Checks that the queue is non-empty and that the next plan retrieved
    /// has the given time and payload
    #[track_caller]
    fn expect_plan<P: Ord>(queue: &mut Queue<i32, P>, time: f64, data: i32) {
        assert!(!queue.is_empty());
        let plan = queue.get_next_plan().unwrap();
        assert_eq!(plan.time, time);
        assert_eq!(plan.data, data);
    }

    /// Checks that the queue reports empty and yields no further plan
    #[track_caller]
    fn expect_drained<T, P: Ord>(queue: &mut Queue<T, P>) {
        assert!(queue.is_empty());
        assert!(queue.get_next_plan().is_none());
    }

    #[test]
    fn empty_queue() {
        let mut queue = Queue::<(), ()>::new();
        assert!(queue.get_next_plan().is_none());
    }

    #[test]
    fn add_plans() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        queue.add_plan(3.0, 3, ());
        queue.add_plan(2.0, 2, ());

        // Plans come back in time order regardless of insertion order
        expect_plan(&mut queue, 1.0, 1);
        expect_plan(&mut queue, 2.0, 2);
        expect_plan(&mut queue, 3.0, 3);
        expect_drained(&mut queue);
    }

    #[test]
    fn add_plans_at_same_time_with_same_priority() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        queue.add_plan(1.0, 2, ());

        // Equal time and priority: insertion order decides
        expect_plan(&mut queue, 1.0, 1);
        expect_plan(&mut queue, 1.0, 2);
        expect_drained(&mut queue);
    }

    #[test]
    fn add_plans_at_same_time_with_different_priority() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, 1);
        queue.add_plan(1.0, 2, 0);

        // Equal time: the lower priority value comes first
        expect_plan(&mut queue, 1.0, 2);
        expect_plan(&mut queue, 1.0, 1);
        expect_drained(&mut queue);
    }

    #[test]
    fn add_and_cancel_plans() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        let cancelled = queue.add_plan(2.0, 2, ());
        queue.add_plan(3.0, 3, ());
        queue.cancel_plan(&cancelled);

        // The cancelled plan at time 2.0 is skipped
        expect_plan(&mut queue, 1.0, 1);
        expect_plan(&mut queue, 3.0, 3);
        expect_drained(&mut queue);
    }

    #[test]
    fn add_and_get_plans() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        queue.add_plan(2.0, 2, ());

        expect_plan(&mut queue, 1.0, 1);

        // A plan added between retrievals is still ordered by time
        queue.add_plan(1.5, 3, ());

        expect_plan(&mut queue, 1.5, 3);
        expect_plan(&mut queue, 2.0, 2);
        expect_drained(&mut queue);
    }

    #[test]
    fn cancel_invalid_plan() {
        let mut queue = Queue::new();
        let already_retrieved = queue.add_plan(1.0, (), ());
        // is_empty just checks for a plan existing, not whether it is valid/has data
        assert!(!queue.is_empty());
        queue.get_next_plan();
        assert!(queue.is_empty());
        let outcome = queue.cancel_plan(&already_retrieved);
        assert!(outcome.is_none());
    }
}