Flutter Engine
message_loop_task_queues.cc
Go to the documentation of this file.
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #define FML_USED_ON_EMBEDDER
6 
7 #include "flutter/fml/message_loop_task_queues.h"
8 
9 #include <iostream>
10 
11 #include "flutter/fml/make_copyable.h"
12 #include "flutter/fml/message_loop_impl.h"
13 
14 namespace fml {
15 
16 std::mutex MessageLoopTaskQueues::creation_mutex_;
17 
18 const size_t TaskQueueId::kUnmerged = ULONG_MAX;
19 
20 fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
21 
23  : owner_of(_kUnmerged), subsumed_by(_kUnmerged) {
24  wakeable = NULL;
27 }
28 
30  std::scoped_lock creation(creation_mutex_);
31  if (!instance_) {
32  instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
33  }
34  return instance_;
35 }
36 
38  std::lock_guard guard(queue_mutex_);
39  TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
40  ++task_queue_id_counter_;
41  queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>();
42  return loop_id;
43 }
44 
45 MessageLoopTaskQueues::MessageLoopTaskQueues()
46  : task_queue_id_counter_(0), order_(0) {}
47 
48 MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
49 
51  std::lock_guard guard(queue_mutex_);
52  const auto& queue_entry = queue_entries_.at(queue_id);
53  FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
54  TaskQueueId subsumed = queue_entry->owner_of;
55  queue_entries_.erase(queue_id);
56  if (subsumed != _kUnmerged) {
57  queue_entries_.erase(subsumed);
58  }
59 }
60 
62  std::lock_guard guard(queue_mutex_);
63  const auto& queue_entry = queue_entries_.at(queue_id);
64  FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
65  TaskQueueId subsumed = queue_entry->owner_of;
66  queue_entry->delayed_tasks = {};
67  if (subsumed != _kUnmerged) {
68  queue_entries_.at(subsumed)->delayed_tasks = {};
69  }
70 }
71 
73  const fml::closure& task,
74  fml::TimePoint target_time) {
75  std::lock_guard guard(queue_mutex_);
76  size_t order = order_++;
77  const auto& queue_entry = queue_entries_.at(queue_id);
78  queue_entry->delayed_tasks.push({order, task, target_time});
79  TaskQueueId loop_to_wake = queue_id;
80  if (queue_entry->subsumed_by != _kUnmerged) {
81  loop_to_wake = queue_entry->subsumed_by;
82  }
83  WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
84 }
85 
87  std::lock_guard guard(queue_mutex_);
88  return HasPendingTasksUnlocked(queue_id);
89 }
90 
92  fml::TimePoint from_time) {
93  std::lock_guard guard(queue_mutex_);
94  if (!HasPendingTasksUnlocked(queue_id)) {
95  return nullptr;
96  }
97  TaskQueueId top_queue = _kUnmerged;
98  const auto& top = PeekNextTaskUnlocked(queue_id, top_queue);
99 
100  if (!HasPendingTasksUnlocked(queue_id)) {
101  WakeUpUnlocked(queue_id, fml::TimePoint::Max());
102  } else {
103  WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
104  }
105 
106  if (top.GetTargetTime() > from_time) {
107  return nullptr;
108  }
109  fml::closure invocation = top.GetTask();
110  queue_entries_.at(top_queue)->delayed_tasks.pop();
111  return invocation;
112 }
113 
114 void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
115  fml::TimePoint time) const {
116  if (queue_entries_.at(queue_id)->wakeable) {
117  queue_entries_.at(queue_id)->wakeable->WakeUp(time);
118  }
119 }
120 
122  std::lock_guard guard(queue_mutex_);
123  const auto& queue_entry = queue_entries_.at(queue_id);
124  if (queue_entry->subsumed_by != _kUnmerged) {
125  return 0;
126  }
127 
128  size_t total_tasks = 0;
129  total_tasks += queue_entry->delayed_tasks.size();
130 
131  TaskQueueId subsumed = queue_entry->owner_of;
132  if (subsumed != _kUnmerged) {
133  const auto& subsumed_entry = queue_entries_.at(subsumed);
134  total_tasks += subsumed_entry->delayed_tasks.size();
135  }
136  return total_tasks;
137 }
138 
140  intptr_t key,
141  const fml::closure& callback) {
142  std::lock_guard guard(queue_mutex_);
143  FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
144  queue_entries_.at(queue_id)->task_observers[key] = callback;
145 }
146 
148  intptr_t key) {
149  std::lock_guard guard(queue_mutex_);
150  queue_entries_.at(queue_id)->task_observers.erase(key);
151 }
152 
154  TaskQueueId queue_id) const {
155  std::lock_guard guard(queue_mutex_);
156  std::vector<fml::closure> observers;
157 
158  if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
159  return observers;
160  }
161 
162  for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
163  observers.push_back(observer.second);
164  }
165 
166  TaskQueueId subsumed = queue_entries_.at(queue_id)->owner_of;
167  if (subsumed != _kUnmerged) {
168  for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
169  observers.push_back(observer.second);
170  }
171  }
172 
173  return observers;
174 }
175 
178  std::lock_guard guard(queue_mutex_);
179  FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
180  << "Wakeable can only be set once.";
181  queue_entries_.at(queue_id)->wakeable = wakeable;
182 }
183 
185  if (owner == subsumed) {
186  return true;
187  }
188  std::lock_guard guard(queue_mutex_);
189  auto& owner_entry = queue_entries_.at(owner);
190  auto& subsumed_entry = queue_entries_.at(subsumed);
191 
192  if (owner_entry->owner_of == subsumed) {
193  return true;
194  }
195 
196  std::vector<TaskQueueId> owner_subsumed_keys = {
197  owner_entry->owner_of, owner_entry->subsumed_by, subsumed_entry->owner_of,
198  subsumed_entry->subsumed_by};
199 
200  for (auto key : owner_subsumed_keys) {
201  if (key != _kUnmerged) {
202  return false;
203  }
204  }
205 
206  owner_entry->owner_of = subsumed;
207  subsumed_entry->subsumed_by = owner;
208 
209  if (HasPendingTasksUnlocked(owner)) {
210  WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
211  }
212 
213  return true;
214 }
215 
217  std::lock_guard guard(queue_mutex_);
218  const auto& owner_entry = queue_entries_.at(owner);
219  const TaskQueueId subsumed = owner_entry->owner_of;
220  if (subsumed == _kUnmerged) {
221  return false;
222  }
223 
224  queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
225  owner_entry->owner_of = _kUnmerged;
226 
227  if (HasPendingTasksUnlocked(owner)) {
228  WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
229  }
230 
231  if (HasPendingTasksUnlocked(subsumed)) {
232  WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
233  }
234 
235  return true;
236 }
237 
239  TaskQueueId subsumed) const {
240  std::lock_guard guard(queue_mutex_);
241  return subsumed == queue_entries_.at(owner)->owner_of;
242 }
243 
244 // Subsumed queues will never have pending tasks.
245 // Owning queues will consider both their and their subsumed tasks.
246 bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
247  TaskQueueId queue_id) const {
248  const auto& entry = queue_entries_.at(queue_id);
249  bool is_subsumed = entry->subsumed_by != _kUnmerged;
250  if (is_subsumed) {
251  return false;
252  }
253 
254  if (!entry->delayed_tasks.empty()) {
255  return true;
256  }
257 
258  const TaskQueueId subsumed = entry->owner_of;
259  if (subsumed == _kUnmerged) {
260  // this is not an owner and queue is empty.
261  return false;
262  } else {
263  return !queue_entries_.at(subsumed)->delayed_tasks.empty();
264  }
265 }
266 
267 fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
268  TaskQueueId queue_id) const {
269  TaskQueueId tmp = _kUnmerged;
270  return PeekNextTaskUnlocked(queue_id, tmp).GetTargetTime();
271 }
272 
273 const DelayedTask& MessageLoopTaskQueues::PeekNextTaskUnlocked(
274  TaskQueueId owner,
275  TaskQueueId& top_queue_id) const {
276  FML_DCHECK(HasPendingTasksUnlocked(owner));
277  const auto& entry = queue_entries_.at(owner);
278  const TaskQueueId subsumed = entry->owner_of;
279  if (subsumed == _kUnmerged) {
280  top_queue_id = owner;
281  return entry->delayed_tasks.top();
282  }
283 
284  const auto& owner_tasks = entry->delayed_tasks;
285  const auto& subsumed_tasks = queue_entries_.at(subsumed)->delayed_tasks;
286 
287  // we are owning another task queue
288  const bool subsumed_has_task = !subsumed_tasks.empty();
289  const bool owner_has_task = !owner_tasks.empty();
290  if (owner_has_task && subsumed_has_task) {
291  const auto owner_task = owner_tasks.top();
292  const auto subsumed_task = subsumed_tasks.top();
293  if (owner_task > subsumed_task) {
294  top_queue_id = subsumed;
295  } else {
296  top_queue_id = owner;
297  }
298  } else if (owner_has_task) {
299  top_queue_id = owner;
300  } else {
301  top_queue_id = subsumed;
302  }
303  return queue_entries_.at(top_queue_id)->delayed_tasks.top();
304 }
305 
306 } // namespace fml
bool HasPendingTasks(TaskQueueId queue_id) const
bool Owns(TaskQueueId owner, TaskQueueId subsumed) const
#define FML_DCHECK(condition)
Definition: logging.h:86
std::map< intptr_t, fml::closure > TaskObservers
void RemoveTaskObserver(TaskQueueId queue_id, intptr_t key)
bool Merge(TaskQueueId owner, TaskQueueId subsumed)
Definition: ascii_trie.cc:9
std::function< void()> closure
Definition: closure.h:14
static fml::RefPtr< MessageLoopTaskQueues > GetInstance()
TaskObservers task_observers
static constexpr TimePoint Max()
Definition: time_point.h:32
static const TaskQueueId _kUnmerged
void DisposeTasks(TaskQueueId queue_id)
std::vector< fml::closure > GetObserversToNotify(TaskQueueId queue_id) const
void Dispose(TaskQueueId queue_id)
TaskQueueEntry()
fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time)
DelayedTaskQueue delayed_tasks
void RegisterTask(TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time)
#define FML_CHECK(condition)
Definition: logging.h:68
Wakeable * wakeable
void AddTaskObserver(TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
static const size_t kUnmerged
void SetWakeable(TaskQueueId queue_id, fml::Wakeable *wakeable)
std::priority_queue< DelayedTask, std::deque< DelayedTask >, std::greater< DelayedTask > > DelayedTaskQueue
Definition: delayed_task.h:39
size_t GetNumPendingTasks(TaskQueueId queue_id) const