1use std::cmp::Ordering;
13use std::collections::BinaryHeap;
14
15use crate::{trace, HashMap, HashMapExt};
16
17pub struct Queue<T, P: Eq + PartialEq + Ord> {
32 queue: BinaryHeap<PlanSchedule<P>>,
33 data_map: HashMap<u64, T>,
34 plan_counter: u64,
37 #[cfg(feature = "profiling")]
40 pub(crate) max_plans_in_flight: u64,
41 #[cfg(feature = "profiling")]
42 pub(crate) max_memory_in_use: u64,
43}
44
45impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
46 #[must_use]
48 pub fn new() -> Queue<T, P> {
49 Queue {
50 queue: BinaryHeap::new(),
51 data_map: HashMap::new(),
52 plan_counter: 0,
53 #[cfg(feature = "profiling")]
54 max_plans_in_flight: 0,
55 #[cfg(feature = "profiling")]
56 max_memory_in_use: 0,
57 }
58 }
59
60 pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> PlanId {
65 trace!("adding plan at {time}");
66 let plan_id = self.plan_counter;
68 self.queue.push(PlanSchedule {
69 plan_id,
70 time,
71 priority,
72 });
73 self.data_map.insert(plan_id, data);
74 self.plan_counter += 1;
75 #[cfg(feature = "profiling")]
76 {
77 self.max_plans_in_flight = self.max_plans_in_flight.max(self.queue.len() as u64);
78 self.max_memory_in_use = self
79 .max_memory_in_use
80 .max(self.estimated_memory_in_use() as u64);
81 }
82
83 PlanId(plan_id)
84 }
85
86 pub fn cancel_plan(&mut self, plan_id: &PlanId) -> Option<T> {
88 trace!("cancel plan {plan_id:?}");
89 self.data_map.remove(&plan_id.0)
92 }
93
94 #[must_use]
95 pub fn is_empty(&self) -> bool {
96 self.queue.is_empty()
97 }
98
99 #[must_use]
100 pub fn next_time(&self) -> Option<f64> {
101 self.queue.peek().map(|e| e.time)
102 }
103
104 #[allow(dead_code)]
105 pub(crate) fn clear(&mut self) {
106 self.data_map.clear();
107 self.queue.clear();
108 self.plan_counter = 0;
109 }
110
111 #[must_use]
112 #[allow(dead_code)]
113 pub(crate) fn peek(&self) -> Option<(&PlanSchedule<P>, &T)> {
114 for entry in &self.queue {
116 if let Some(data) = self.data_map.get(&entry.plan_id) {
118 return Some((entry, data));
119 }
120 }
121 None
122 }
123
124 pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
128 trace!("getting next plan");
129 loop {
130 match self.queue.pop() {
132 Some(entry) => {
133 if let Some(data) = self.data_map.remove(&entry.plan_id) {
135 return Some(Plan {
136 time: entry.time,
137 data,
138 });
139 }
140 }
141 None => {
142 return None;
143 }
144 }
145 }
146 }
147
148 #[must_use]
151 pub fn list_schedules(&self, at_most: usize) -> Vec<&PlanSchedule<P>> {
152 let mut items = vec![];
153
154 for entry in &self.queue {
156 if self.data_map.contains_key(&entry.plan_id) {
158 items.push(entry);
159 if items.len() == at_most {
160 break;
161 }
162 }
163 }
164 items
165 }
166
167 #[doc(hidden)]
168 pub(crate) fn remaining_plan_count(&self) -> usize {
169 self.queue.len()
170 }
171
172 #[cfg(feature = "profiling")]
173 fn estimated_memory_in_use(&self) -> usize {
174 let queue_bytes = self.queue.capacity() * size_of::<PlanSchedule<P>>();
175
176 let map_entry_bytes = self.data_map.capacity() * size_of::<(u64, T)>();
177
178 queue_bytes + map_entry_bytes
179 }
180}
181
182impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
/// Scheduling record for one plan: which plan it is, when it runs, and the
/// priority used to order it against other plans at the same time.
///
/// The payload of the plan is not stored here; it is looked up by `plan_id`.
#[derive(Debug, PartialEq)]
pub struct PlanSchedule<P>
where
    P: Ord,
{
    /// Numeric id of the plan this entry belongs to.
    pub plan_id: u64,
    /// Time at which the plan is scheduled.
    pub time: f64,
    /// Tie-breaker between plans scheduled for the same time.
    pub priority: P,
}
198
199#[allow(clippy::expl_impl_clone_on_copy)] impl<P: Eq + PartialEq + Ord + Clone> Clone for PlanSchedule<P> {
201 fn clone(&self) -> Self {
202 PlanSchedule {
203 priority: self.priority.clone(),
204 ..*self
205 }
206 }
207}
208
209impl<P: Eq + PartialEq + Ord + Copy + Clone> Copy for PlanSchedule<P> {}
210
211impl<P: Eq + PartialEq + Ord> Eq for PlanSchedule<P> {}
212
213impl<P: Eq + PartialEq + Ord> PartialOrd for PlanSchedule<P> {
214 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
215 Some(self.cmp(other))
216 }
217}
218
219impl<P: Eq + PartialEq + Ord> Ord for PlanSchedule<P> {
222 fn cmp(&self, other: &Self) -> Ordering {
223 let time_ordering = self.time.partial_cmp(&other.time).unwrap().reverse();
224 match time_ordering {
225 Ordering::Equal => {
227 let priority_ordering = self
228 .priority
229 .partial_cmp(&other.priority)
230 .unwrap()
231 .reverse();
232 match priority_ordering {
233 Ordering::Equal => self.plan_id.cmp(&other.plan_id).reverse(),
234 _ => priority_ordering,
235 }
236 }
237 _ => time_ordering,
238 }
239 }
240}
241
/// Handle identifying a plan that was added to a queue.
///
/// Wraps the numeric id assigned when the plan was added; the inner value is
/// only accessible within the crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PlanId(pub(crate) u64);
245
/// A plan taken out of the queue: its payload together with the time it was
/// scheduled for.
pub struct Plan<T> {
    /// Time the plan was scheduled for.
    pub time: f64,
    /// Payload supplied when the plan was added.
    pub data: T,
}
251
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::Queue;
    use std::fmt::Debug;

    /// Pops the next plan and checks both its scheduled time and its payload.
    fn expect_next<T, P>(queue: &mut Queue<T, P>, time: f64, data: T)
    where
        T: PartialEq + Debug,
        P: Ord,
    {
        let plan = queue.get_next_plan().unwrap();
        assert_eq!(plan.time, time);
        assert_eq!(plan.data, data);
    }

    /// Checks that the queue reports empty and yields no further plans.
    fn expect_drained<T, P: Ord>(queue: &mut Queue<T, P>) {
        assert!(queue.is_empty());
        assert!(queue.get_next_plan().is_none());
    }

    #[test]
    fn empty_queue() {
        let mut queue = Queue::<(), ()>::new();
        assert!(queue.get_next_plan().is_none());
    }

    #[test]
    fn add_plans() {
        // Plans are inserted out of order and must come back sorted by time.
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        queue.add_plan(3.0, 3, ());
        queue.add_plan(2.0, 2, ());

        for (time, data) in [(1.0, 1), (2.0, 2), (3.0, 3)] {
            assert!(!queue.is_empty());
            expect_next(&mut queue, time, data);
        }

        expect_drained(&mut queue);
    }

    #[test]
    fn add_plans_at_same_time_with_same_priority() {
        // With equal time and priority, insertion order decides.
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        queue.add_plan(1.0, 2, ());

        for (time, data) in [(1.0, 1), (1.0, 2)] {
            assert!(!queue.is_empty());
            expect_next(&mut queue, time, data);
        }

        expect_drained(&mut queue);
    }

    #[test]
    fn add_plans_at_same_time_with_different_priority() {
        // The lower priority value wins even though it was added second.
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, 1);
        queue.add_plan(1.0, 2, 0);

        assert!(!queue.is_empty());
        expect_next(&mut queue, 1.0, 2);
        expect_next(&mut queue, 1.0, 1);

        expect_drained(&mut queue);
    }

    #[test]
    fn add_and_cancel_plans() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        let cancelled = queue.add_plan(2.0, 2, ());
        queue.add_plan(3.0, 3, ());
        queue.cancel_plan(&cancelled);

        // The cancelled plan at time 2.0 must be skipped.
        for (time, data) in [(1.0, 1), (3.0, 3)] {
            assert!(!queue.is_empty());
            expect_next(&mut queue, time, data);
        }

        expect_drained(&mut queue);
    }

    #[test]
    fn add_and_get_plans() {
        let mut queue = Queue::new();
        queue.add_plan(1.0, 1, ());
        queue.add_plan(2.0, 2, ());

        assert!(!queue.is_empty());
        expect_next(&mut queue, 1.0, 1);

        // A plan added between pops is ordered against the remaining ones.
        queue.add_plan(1.5, 3, ());

        for (time, data) in [(1.5, 3), (2.0, 2)] {
            assert!(!queue.is_empty());
            expect_next(&mut queue, time, data);
        }

        expect_drained(&mut queue);
    }

    #[test]
    fn cancel_invalid_plan() {
        let mut queue = Queue::new();
        let already_run = queue.add_plan(1.0, (), ());
        assert!(!queue.is_empty());

        // Once the plan has been taken, cancelling it finds nothing.
        queue.get_next_plan();
        assert!(queue.is_empty());
        assert!(queue.cancel_plan(&already_run).is_none());
    }
}