1use crate::trace;
13use crate::{HashMap, HashMapExt};
14use std::{cmp::Ordering, collections::BinaryHeap};
15
16pub struct Queue<T, P: Eq + PartialEq + Ord> {
31 queue: BinaryHeap<PlanSchedule<P>>,
32 data_map: HashMap<u64, T>,
33 plan_counter: u64,
34}
35
36impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
37 #[must_use]
39 pub fn new() -> Queue<T, P> {
40 Queue {
41 queue: BinaryHeap::new(),
42 data_map: HashMap::new(),
43 plan_counter: 0,
44 }
45 }
46
47 pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> PlanId {
52 trace!("adding plan at {time}");
53 let plan_id = self.plan_counter;
55 self.queue.push(PlanSchedule {
56 plan_id,
57 time,
58 priority,
59 });
60 self.data_map.insert(plan_id, data);
61 self.plan_counter += 1;
62 PlanId(plan_id)
63 }
64
65 pub fn cancel_plan(&mut self, plan_id: &PlanId) -> Option<T> {
67 trace!("cancel plan {plan_id:?}");
68 self.data_map.remove(&plan_id.0)
71 }
72
73 #[must_use]
74 pub fn is_empty(&self) -> bool {
75 self.queue.is_empty()
76 }
77
78 #[must_use]
79 pub fn next_time(&self) -> Option<f64> {
80 self.queue.peek().map(|e| e.time)
81 }
82
83 #[allow(dead_code)]
84 pub(crate) fn clear(&mut self) {
85 self.data_map.clear();
86 self.queue.clear();
87 self.plan_counter = 0;
88 }
89
90 #[must_use]
91 #[allow(dead_code)]
92 pub(crate) fn peek(&self) -> Option<(&PlanSchedule<P>, &T)> {
93 for entry in &self.queue {
95 if let Some(data) = self.data_map.get(&entry.plan_id) {
97 return Some((entry, data));
98 }
99 }
100 None
101 }
102
103 pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
107 trace!("getting next plan");
108 loop {
109 match self.queue.pop() {
111 Some(entry) => {
112 if let Some(data) = self.data_map.remove(&entry.plan_id) {
114 return Some(Plan {
115 time: entry.time,
116 data,
117 });
118 }
119 }
120 None => {
121 return None;
122 }
123 }
124 }
125 }
126
127 #[must_use]
130 pub fn list_schedules(&self, at_most: usize) -> Vec<&PlanSchedule<P>> {
131 let mut items = vec![];
132
133 for entry in &self.queue {
135 if self.data_map.contains_key(&entry.plan_id) {
137 items.push(entry);
138 if items.len() == at_most {
139 break;
140 }
141 }
142 }
143 items
144 }
145
146 #[doc(hidden)]
147 pub(crate) fn remaining_plan_count(&self) -> usize {
148 self.queue.len()
149 }
150}
151
152impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
158#[derive(PartialEq, Debug)]
163pub struct PlanSchedule<P: Eq + PartialEq + Ord> {
164 pub plan_id: u64,
165 pub time: f64,
166 pub priority: P,
167}
168
169#[allow(clippy::expl_impl_clone_on_copy)] impl<P: Eq + PartialEq + Ord + Clone> Clone for PlanSchedule<P> {
171 fn clone(&self) -> Self {
172 PlanSchedule {
173 priority: self.priority.clone(),
174 ..*self
175 }
176 }
177}
178
179impl<P: Eq + PartialEq + Ord + Copy + Clone> Copy for PlanSchedule<P> {}
180
181impl<P: Eq + PartialEq + Ord> Eq for PlanSchedule<P> {}
182
183impl<P: Eq + PartialEq + Ord> PartialOrd for PlanSchedule<P> {
184 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
185 Some(self.cmp(other))
186 }
187}
188
189impl<P: Eq + PartialEq + Ord> Ord for PlanSchedule<P> {
192 fn cmp(&self, other: &Self) -> Ordering {
193 let time_ordering = self.time.partial_cmp(&other.time).unwrap().reverse();
194 match time_ordering {
195 Ordering::Equal => {
197 let priority_ordering = self
198 .priority
199 .partial_cmp(&other.priority)
200 .unwrap()
201 .reverse();
202 match priority_ordering {
203 Ordering::Equal => self.plan_id.cmp(&other.plan_id).reverse(),
204 _ => priority_ordering,
205 }
206 }
207 _ => time_ordering,
208 }
209 }
210}
211
212#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
214pub struct PlanId(pub(crate) u64);
215
216pub struct Plan<T> {
218 pub time: f64,
219 pub data: T,
220}
221
222#[cfg(test)]
223#[allow(clippy::float_cmp)]
224mod tests {
225 use super::Queue;
226
227 #[test]
228 fn empty_queue() {
229 let mut plan_queue = Queue::<(), ()>::new();
230 assert!(plan_queue.get_next_plan().is_none());
231 }
232
233 #[test]
234 fn add_plans() {
235 let mut plan_queue = Queue::new();
236 plan_queue.add_plan(1.0, 1, ());
237 plan_queue.add_plan(3.0, 3, ());
238 plan_queue.add_plan(2.0, 2, ());
239 assert!(!plan_queue.is_empty());
240
241 let next_plan = plan_queue.get_next_plan().unwrap();
242 assert_eq!(next_plan.time, 1.0);
243 assert_eq!(next_plan.data, 1);
244
245 assert!(!plan_queue.is_empty());
246 let next_plan = plan_queue.get_next_plan().unwrap();
247 assert_eq!(next_plan.time, 2.0);
248 assert_eq!(next_plan.data, 2);
249
250 assert!(!plan_queue.is_empty());
251 let next_plan = plan_queue.get_next_plan().unwrap();
252 assert_eq!(next_plan.time, 3.0);
253 assert_eq!(next_plan.data, 3);
254
255 assert!(plan_queue.is_empty());
256 assert!(plan_queue.get_next_plan().is_none());
257 }
258
259 #[test]
260 fn add_plans_at_same_time_with_same_priority() {
261 let mut plan_queue = Queue::new();
262 plan_queue.add_plan(1.0, 1, ());
263 plan_queue.add_plan(1.0, 2, ());
264 assert!(!plan_queue.is_empty());
265
266 let next_plan = plan_queue.get_next_plan().unwrap();
267 assert_eq!(next_plan.time, 1.0);
268 assert_eq!(next_plan.data, 1);
269
270 assert!(!plan_queue.is_empty());
271 let next_plan = plan_queue.get_next_plan().unwrap();
272 assert_eq!(next_plan.time, 1.0);
273 assert_eq!(next_plan.data, 2);
274
275 assert!(plan_queue.is_empty());
276 assert!(plan_queue.get_next_plan().is_none());
277 }
278
279 #[test]
280 fn add_plans_at_same_time_with_different_priority() {
281 let mut plan_queue = Queue::new();
282 plan_queue.add_plan(1.0, 1, 1);
283 plan_queue.add_plan(1.0, 2, 0);
284
285 assert!(!plan_queue.is_empty());
286 let next_plan = plan_queue.get_next_plan().unwrap();
287 assert_eq!(next_plan.time, 1.0);
288 assert_eq!(next_plan.data, 2);
289
290 let next_plan = plan_queue.get_next_plan().unwrap();
291 assert_eq!(next_plan.time, 1.0);
292 assert_eq!(next_plan.data, 1);
293
294 assert!(plan_queue.is_empty());
295 assert!(plan_queue.get_next_plan().is_none());
296 }
297
298 #[test]
299 fn add_and_cancel_plans() {
300 let mut plan_queue = Queue::new();
301 plan_queue.add_plan(1.0, 1, ());
302 let plan_to_cancel = plan_queue.add_plan(2.0, 2, ());
303 plan_queue.add_plan(3.0, 3, ());
304 plan_queue.cancel_plan(&plan_to_cancel);
305 assert!(!plan_queue.is_empty());
306
307 let next_plan = plan_queue.get_next_plan().unwrap();
308 assert_eq!(next_plan.time, 1.0);
309 assert_eq!(next_plan.data, 1);
310
311 assert!(!plan_queue.is_empty());
312 let next_plan = plan_queue.get_next_plan().unwrap();
313 assert_eq!(next_plan.time, 3.0);
314 assert_eq!(next_plan.data, 3);
315
316 assert!(plan_queue.is_empty());
317 assert!(plan_queue.get_next_plan().is_none());
318 }
319
320 #[test]
321 fn add_and_get_plans() {
322 let mut plan_queue = Queue::new();
323 plan_queue.add_plan(1.0, 1, ());
324 plan_queue.add_plan(2.0, 2, ());
325 assert!(!plan_queue.is_empty());
326
327 let next_plan = plan_queue.get_next_plan().unwrap();
328 assert_eq!(next_plan.time, 1.0);
329 assert_eq!(next_plan.data, 1);
330
331 plan_queue.add_plan(1.5, 3, ());
332
333 assert!(!plan_queue.is_empty());
334 let next_plan = plan_queue.get_next_plan().unwrap();
335 assert_eq!(next_plan.time, 1.5);
336 assert_eq!(next_plan.data, 3);
337
338 assert!(!plan_queue.is_empty());
339 let next_plan = plan_queue.get_next_plan().unwrap();
340 assert_eq!(next_plan.time, 2.0);
341 assert_eq!(next_plan.data, 2);
342
343 assert!(plan_queue.is_empty());
344 assert!(plan_queue.get_next_plan().is_none());
345 }
346
347 #[test]
348 fn cancel_invalid_plan() {
349 let mut plan_queue = Queue::new();
350 let plan_to_cancel = plan_queue.add_plan(1.0, (), ());
351 assert!(!plan_queue.is_empty());
353 plan_queue.get_next_plan();
354 assert!(plan_queue.is_empty());
355 let result = plan_queue.cancel_plan(&plan_to_cancel);
356 assert!(result.is_none());
357 }
358}