1use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14
15use crate::{trace, HashMap, HashMapExt};
16
17pub struct Queue<T, P: Eq + PartialEq + Ord> {
32 queue: BinaryHeap<PlanSchedule<P>>,
33 data_map: HashMap<u64, T>,
34 plan_counter: u64,
35}
36
37impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
38 #[must_use]
40 pub fn new() -> Queue<T, P> {
41 Queue {
42 queue: BinaryHeap::new(),
43 data_map: HashMap::new(),
44 plan_counter: 0,
45 }
46 }
47
48 pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> PlanId {
53 trace!("adding plan at {time}");
54 let plan_id = self.plan_counter;
56 self.queue.push(PlanSchedule {
57 plan_id,
58 time,
59 priority,
60 });
61 self.data_map.insert(plan_id, data);
62 self.plan_counter += 1;
63 PlanId(plan_id)
64 }
65
66 pub fn cancel_plan(&mut self, plan_id: &PlanId) -> Option<T> {
68 trace!("cancel plan {plan_id:?}");
69 self.data_map.remove(&plan_id.0)
72 }
73
74 #[must_use]
75 pub fn is_empty(&self) -> bool {
76 self.queue.is_empty()
77 }
78
79 #[must_use]
80 pub fn next_time(&self) -> Option<f64> {
81 self.queue.peek().map(|e| e.time)
82 }
83
84 #[allow(dead_code)]
85 pub(crate) fn clear(&mut self) {
86 self.data_map.clear();
87 self.queue.clear();
88 self.plan_counter = 0;
89 }
90
91 #[must_use]
92 #[allow(dead_code)]
93 pub(crate) fn peek(&self) -> Option<(&PlanSchedule<P>, &T)> {
94 for entry in &self.queue {
96 if let Some(data) = self.data_map.get(&entry.plan_id) {
98 return Some((entry, data));
99 }
100 }
101 None
102 }
103
104 pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
108 trace!("getting next plan");
109 loop {
110 match self.queue.pop() {
112 Some(entry) => {
113 if let Some(data) = self.data_map.remove(&entry.plan_id) {
115 return Some(Plan {
116 time: entry.time,
117 data,
118 });
119 }
120 }
121 None => {
122 return None;
123 }
124 }
125 }
126 }
127
128 #[must_use]
131 pub fn list_schedules(&self, at_most: usize) -> Vec<&PlanSchedule<P>> {
132 let mut items = vec![];
133
134 for entry in &self.queue {
136 if self.data_map.contains_key(&entry.plan_id) {
138 items.push(entry);
139 if items.len() == at_most {
140 break;
141 }
142 }
143 }
144 items
145 }
146
147 #[doc(hidden)]
148 pub(crate) fn remaining_plan_count(&self) -> usize {
149 self.queue.len()
150 }
151}
152
153impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
/// Scheduling metadata of a single plan: its id, its time and its priority.
///
/// The [`Ord`] implementation is deliberately reversed so that a max-heap yields the plan with
/// the smallest time first, then the smallest priority, then the smallest id.
#[derive(PartialEq, Debug)]
pub struct PlanSchedule<P: Eq + PartialEq + Ord> {
    /// Identifier of the plan this entry belongs to.
    pub plan_id: u64,
    /// Time at which the plan is scheduled.
    pub time: f64,
    /// Tie-breaker among plans with equal time; smaller values order first.
    pub priority: P,
}

// `Clone` is implemented by hand on a type that is also `Copy`, which clippy would otherwise
// report.
#[allow(clippy::expl_impl_clone_on_copy)]
impl<P: Eq + PartialEq + Ord + Clone> Clone for PlanSchedule<P> {
    fn clone(&self) -> Self {
        PlanSchedule {
            plan_id: self.plan_id,
            time: self.time,
            priority: self.priority.clone(),
        }
    }
}

impl<P: Eq + PartialEq + Ord + Copy + Clone> Copy for PlanSchedule<P> {}

// NOTE(review): the derived `PartialEq` compares `time` with `==`, so the reflexivity promised
// by `Eq` only holds while `time` is not NaN.
impl<P: Eq + PartialEq + Ord> Eq for PlanSchedule<P> {}

impl<P: Eq + PartialEq + Ord> PartialOrd for PlanSchedule<P> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<P: Eq + PartialEq + Ord> Ord for PlanSchedule<P> {
    /// Compares by time, then priority, then plan id, and reverses the result so that the
    /// entry with the smallest (time, priority, id) is the greatest.
    ///
    /// # Panics
    ///
    /// Panics if either time is NaN.
    fn cmp(&self, other: &Self) -> Ordering {
        self.time
            .partial_cmp(&other.time)
            .expect("plan times must not be NaN")
            // `P: Ord`, so the priority can be compared totally without unwrapping.
            .then_with(|| self.priority.cmp(&other.priority))
            .then_with(|| self.plan_id.cmp(&other.plan_id))
            .reverse()
    }
}
212
/// Handle identifying a plan inside a queue; wraps the plan's numeric id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PlanId(pub(crate) u64);
216
/// A plan taken out of the queue: its payload together with its scheduled time.
pub struct Plan<T> {
    /// Time at which the plan was scheduled.
    pub time: f64,
    /// Payload supplied when the plan was added.
    pub data: T,
}
222
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use std::fmt::Debug;

    use super::Queue;

    /// Pops the next plan and checks its time and payload.
    fn assert_next<T, P>(queue: &mut Queue<T, P>, time: f64, data: T)
    where
        T: PartialEq + Debug,
        P: Eq + PartialEq + Ord,
    {
        let plan = queue.get_next_plan().unwrap();
        assert_eq!(plan.time, time);
        assert_eq!(plan.data, data);
    }

    /// Checks that the queue reports empty and has no further plan to hand out.
    fn assert_drained<T, P>(queue: &mut Queue<T, P>)
    where
        P: Eq + PartialEq + Ord,
    {
        assert!(queue.is_empty());
        assert!(queue.get_next_plan().is_none());
    }

    #[test]
    fn empty_queue() {
        let mut plan_queue = Queue::<(), ()>::new();
        assert!(plan_queue.get_next_plan().is_none());
    }

    #[test]
    fn add_plans() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        plan_queue.add_plan(3.0, 3, ());
        plan_queue.add_plan(2.0, 2, ());

        // Plans come back ordered by time, regardless of insertion order.
        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.0, 1);

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 2.0, 2);

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 3.0, 3);

        assert_drained(&mut plan_queue);
    }

    #[test]
    fn add_plans_at_same_time_with_same_priority() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        plan_queue.add_plan(1.0, 2, ());

        // With equal time and priority, insertion order decides.
        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.0, 1);

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.0, 2);

        assert_drained(&mut plan_queue);
    }

    #[test]
    fn add_plans_at_same_time_with_different_priority() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, 1);
        plan_queue.add_plan(1.0, 2, 0);

        // With equal time, the smaller priority value comes out first.
        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.0, 2);

        assert_next(&mut plan_queue, 1.0, 1);

        assert_drained(&mut plan_queue);
    }

    #[test]
    fn add_and_cancel_plans() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        let plan_to_cancel = plan_queue.add_plan(2.0, 2, ());
        plan_queue.add_plan(3.0, 3, ());
        plan_queue.cancel_plan(&plan_to_cancel);

        // The cancelled plan at time 2.0 must be skipped.
        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.0, 1);

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 3.0, 3);

        assert_drained(&mut plan_queue);
    }

    #[test]
    fn add_and_get_plans() {
        let mut plan_queue = Queue::new();
        plan_queue.add_plan(1.0, 1, ());
        plan_queue.add_plan(2.0, 2, ());

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.0, 1);

        // A plan added between retrievals is still ordered by its time.
        plan_queue.add_plan(1.5, 3, ());

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 1.5, 3);

        assert!(!plan_queue.is_empty());
        assert_next(&mut plan_queue, 2.0, 2);

        assert_drained(&mut plan_queue);
    }

    #[test]
    fn cancel_invalid_plan() {
        let mut plan_queue = Queue::new();
        let plan_to_cancel = plan_queue.add_plan(1.0, (), ());
        assert!(!plan_queue.is_empty());

        // Once the plan has been retrieved, cancelling it finds nothing.
        plan_queue.get_next_plan();
        assert!(plan_queue.is_empty());
        let result = plan_queue.cancel_plan(&plan_to_cancel);
        assert!(result.is_none());
    }
}