Flutter Engine — message_loop_task_queues.cc
Doxygen-generated page header for this implementation file; the source body follows below.
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #define FML_USED_ON_EMBEDDER
6 
7 #include "flutter/fml/message_loop_task_queues.h"
8 
9 #include <iostream>
10 #include <memory>
11 #include <optional>
12 
13 #include "flutter/fml/make_copyable.h"
14 #include "flutter/fml/task_source.h"
15 #include "flutter/fml/thread_local.h"
16 
17 namespace fml {
18 
19 std::mutex MessageLoopTaskQueues::creation_mutex_;
20 
21 const size_t TaskQueueId::kUnmerged = ULONG_MAX;
22 
23 // Guarded by creation_mutex_.
24 fml::RefPtr<MessageLoopTaskQueues> MessageLoopTaskQueues::instance_;
25 
26 namespace {
27 
28 // iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
29 // having us resort to boxed enum containers.
30 class TaskSourceGradeHolder {
31  public:
32  TaskSourceGrade task_source_grade;
33 
34  explicit TaskSourceGradeHolder(TaskSourceGrade task_source_grade_arg)
35  : task_source_grade(task_source_grade_arg) {}
36 };
37 } // namespace
38 
39 // Guarded by creation_mutex_.
40 FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
42 
44  : subsumed_by(_kUnmerged), created_for(created_for_arg) {
45  wakeable = NULL;
47  task_source = std::make_unique<TaskSource>(created_for);
48 }
49 
51  std::scoped_lock creation(creation_mutex_);
52  if (!instance_) {
53  instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
55  new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified});
56  }
57  return instance_;
58 }
59 
61  std::lock_guard guard(queue_mutex_);
62  TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
63  ++task_queue_id_counter_;
64  queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
65  return loop_id;
66 }
67 
68 MessageLoopTaskQueues::MessageLoopTaskQueues()
69  : task_queue_id_counter_(0), order_(0) {}
70 
71 MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
72 
74  std::lock_guard guard(queue_mutex_);
75  const auto& queue_entry = queue_entries_.at(queue_id);
76  FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
77  auto& subsumed_set = queue_entry->owner_of;
78  for (auto& subsumed : subsumed_set) {
79  queue_entries_.erase(subsumed);
80  }
81  // Erase owner queue_id at last to avoid &subsumed_set from being invalid
82  queue_entries_.erase(queue_id);
83 }
84 
86  std::lock_guard guard(queue_mutex_);
87  const auto& queue_entry = queue_entries_.at(queue_id);
88  FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
89  auto& subsumed_set = queue_entry->owner_of;
90  queue_entry->task_source->ShutDown();
91  for (auto& subsumed : subsumed_set) {
92  queue_entries_.at(subsumed)->task_source->ShutDown();
93  }
94 }
95 
97  std::scoped_lock creation(creation_mutex_);
98  return tls_task_source_grade.get()->task_source_grade;
99 }
100 
102  TaskQueueId queue_id,
103  const fml::closure& task,
104  fml::TimePoint target_time,
105  fml::TaskSourceGrade task_source_grade) {
106  std::lock_guard guard(queue_mutex_);
107  size_t order = order_++;
108  const auto& queue_entry = queue_entries_.at(queue_id);
109  queue_entry->task_source->RegisterTask(
110  {order, task, target_time, task_source_grade});
111  TaskQueueId loop_to_wake = queue_id;
112  if (queue_entry->subsumed_by != _kUnmerged) {
113  loop_to_wake = queue_entry->subsumed_by;
114  }
115 
116  // This can happen when the secondary tasks are paused.
117  if (HasPendingTasksUnlocked(loop_to_wake)) {
118  WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
119  }
120 }
121 
123  std::lock_guard guard(queue_mutex_);
124  return HasPendingTasksUnlocked(queue_id);
125 }
126 
128  fml::TimePoint from_time) {
129  std::lock_guard guard(queue_mutex_);
130  if (!HasPendingTasksUnlocked(queue_id)) {
131  return nullptr;
132  }
133  TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);
134 
135  if (!HasPendingTasksUnlocked(queue_id)) {
136  WakeUpUnlocked(queue_id, fml::TimePoint::Max());
137  } else {
138  WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
139  }
140 
141  if (top.task.GetTargetTime() > from_time) {
142  return nullptr;
143  }
144  fml::closure invocation = top.task.GetTask();
145  queue_entries_.at(top.task_queue_id)
146  ->task_source->PopTask(top.task.GetTaskSourceGrade());
147  {
148  std::scoped_lock creation(creation_mutex_);
149  const auto task_source_grade = top.task.GetTaskSourceGrade();
150  tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
151  }
152  return invocation;
153 }
154 
155 void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
156  fml::TimePoint time) const {
157  if (queue_entries_.at(queue_id)->wakeable) {
158  queue_entries_.at(queue_id)->wakeable->WakeUp(time);
159  }
160 }
161 
163  std::lock_guard guard(queue_mutex_);
164  const auto& queue_entry = queue_entries_.at(queue_id);
165  if (queue_entry->subsumed_by != _kUnmerged) {
166  return 0;
167  }
168 
169  size_t total_tasks = 0;
170  total_tasks += queue_entry->task_source->GetNumPendingTasks();
171 
172  auto& subsumed_set = queue_entry->owner_of;
173  for (auto& subsumed : subsumed_set) {
174  const auto& subsumed_entry = queue_entries_.at(subsumed);
175  total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
176  }
177  return total_tasks;
178 }
179 
181  intptr_t key,
182  const fml::closure& callback) {
183  std::lock_guard guard(queue_mutex_);
184  FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
185  queue_entries_.at(queue_id)->task_observers[key] = callback;
186 }
187 
189  intptr_t key) {
190  std::lock_guard guard(queue_mutex_);
191  queue_entries_.at(queue_id)->task_observers.erase(key);
192 }
193 
195  TaskQueueId queue_id) const {
196  std::lock_guard guard(queue_mutex_);
197  std::vector<fml::closure> observers;
198 
199  if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
200  return observers;
201  }
202 
203  for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
204  observers.push_back(observer.second);
205  }
206 
207  auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
208  for (auto& subsumed : subsumed_set) {
209  for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
210  observers.push_back(observer.second);
211  }
212  }
213 
214  return observers;
215 }
216 
219  std::lock_guard guard(queue_mutex_);
220  FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
221  << "Wakeable can only be set once.";
222  queue_entries_.at(queue_id)->wakeable = wakeable;
223 }
224 
226  if (owner == subsumed) {
227  return true;
228  }
229  std::lock_guard guard(queue_mutex_);
230  auto& owner_entry = queue_entries_.at(owner);
231  auto& subsumed_entry = queue_entries_.at(subsumed);
232  auto& subsumed_set = owner_entry->owner_of;
233  if (subsumed_set.find(subsumed) != subsumed_set.end()) {
234  return true;
235  }
236 
237  // Won't check owner_entry->owner_of, because it may contains items when
238  // merged with other different queues.
239 
240  // Ensure owner_entry->subsumed_by being _kUnmerged
241  if (owner_entry->subsumed_by != _kUnmerged) {
242  FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
243  "subsumed by others, owner="
244  << owner << ", subsumed=" << subsumed
245  << ", owner->subsumed_by=" << owner_entry->subsumed_by;
246  return false;
247  }
248  // Ensure subsumed_entry->owner_of being empty
249  if (!subsumed_entry->owner_of.empty()) {
250  FML_LOG(WARNING)
251  << "Thread merging failed: subsumed_entry already owns others, owner="
252  << owner << ", subsumed=" << subsumed
253  << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
254  return false;
255  }
256  // Ensure subsumed_entry->subsumed_by being _kUnmerged
257  if (subsumed_entry->subsumed_by != _kUnmerged) {
258  FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
259  "subsumed by others, owner="
260  << owner << ", subsumed=" << subsumed
261  << ", subsumed->subsumed_by="
262  << subsumed_entry->subsumed_by;
263  return false;
264  }
265  // All checking is OK, set merged state.
266  owner_entry->owner_of.insert(subsumed);
267  subsumed_entry->subsumed_by = owner;
268 
269  if (HasPendingTasksUnlocked(owner)) {
270  WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
271  }
272 
273  return true;
274 }
275 
277  std::lock_guard guard(queue_mutex_);
278  const auto& owner_entry = queue_entries_.at(owner);
279  if (owner_entry->owner_of.empty()) {
280  FML_LOG(WARNING)
281  << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
282  << owner << ", subsumed=" << subsumed;
283  return false;
284  }
285  if (owner_entry->subsumed_by != _kUnmerged) {
286  FML_LOG(WARNING)
287  << "Thread unmerging failed: owner_entry was subsumed by others, owner="
288  << owner << ", subsumed=" << subsumed
289  << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
290  return false;
291  }
292  if (queue_entries_.at(subsumed)->subsumed_by == _kUnmerged) {
293  FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
294  "subsumed by others, owner="
295  << owner << ", subsumed=" << subsumed;
296  return false;
297  }
298  if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
299  FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
300  "given subsumed queue id, owner="
301  << owner << ", subsumed=" << subsumed;
302  return false;
303  }
304 
305  queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
306  owner_entry->owner_of.erase(subsumed);
307 
308  if (HasPendingTasksUnlocked(owner)) {
309  WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
310  }
311 
312  if (HasPendingTasksUnlocked(subsumed)) {
313  WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
314  }
315 
316  return true;
317 }
318 
320  TaskQueueId subsumed) const {
321  std::lock_guard guard(queue_mutex_);
322  if (owner == _kUnmerged || subsumed == _kUnmerged) {
323  return false;
324  }
325  auto& subsumed_set = queue_entries_.at(owner)->owner_of;
326  return subsumed_set.find(subsumed) != subsumed_set.end();
327 }
328 
330  TaskQueueId owner) const {
331  std::lock_guard guard(queue_mutex_);
332  return queue_entries_.at(owner)->owner_of;
333 }
334 
336  std::lock_guard guard(queue_mutex_);
337  queue_entries_.at(queue_id)->task_source->PauseSecondary();
338 }
339 
341  std::lock_guard guard(queue_mutex_);
342  queue_entries_.at(queue_id)->task_source->ResumeSecondary();
343  // Schedule a wake as needed.
344  if (HasPendingTasksUnlocked(queue_id)) {
345  WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
346  }
347 }
348 
349 // Subsumed queues will never have pending tasks.
350 // Owning queues will consider both their and their subsumed tasks.
351 bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
352  TaskQueueId queue_id) const {
353  const auto& entry = queue_entries_.at(queue_id);
354  bool is_subsumed = entry->subsumed_by != _kUnmerged;
355  if (is_subsumed) {
356  return false;
357  }
358 
359  if (!entry->task_source->IsEmpty()) {
360  return true;
361  }
362 
363  auto& subsumed_set = entry->owner_of;
364  return std::any_of(
365  subsumed_set.begin(), subsumed_set.end(), [&](const auto& subsumed) {
366  return !queue_entries_.at(subsumed)->task_source->IsEmpty();
367  });
368 }
369 
370 fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
371  TaskQueueId queue_id) const {
372  return PeekNextTaskUnlocked(queue_id).task.GetTargetTime();
373 }
374 
375 TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
376  TaskQueueId owner) const {
377  FML_DCHECK(HasPendingTasksUnlocked(owner));
378  const auto& entry = queue_entries_.at(owner);
379  if (entry->owner_of.empty()) {
380  FML_CHECK(!entry->task_source->IsEmpty());
381  return entry->task_source->Top();
382  }
383 
384  // Use optional for the memory of TopTask object.
385  std::optional<TaskSource::TopTask> top_task;
386 
387  std::function<void(const TaskSource*)> top_task_updater =
388  [&top_task](const TaskSource* source) {
389  if (source && !source->IsEmpty()) {
390  TaskSource::TopTask other_task = source->Top();
391  if (!top_task.has_value() || top_task->task > other_task.task) {
392  top_task.emplace(other_task);
393  }
394  }
395  };
396 
397  TaskSource* owner_tasks = entry->task_source.get();
398  top_task_updater(owner_tasks);
399 
400  for (TaskQueueId subsumed : entry->owner_of) {
401  TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
402  top_task_updater(subsumed_tasks);
403  }
404  // At least one task at the top because PeekNextTaskUnlocked() is called after
405  // HasPendingTasksUnlocked()
406  FML_CHECK(top_task.has_value());
407  return top_task.value();
408 }
409 
410 } // namespace fml
bool HasPendingTasks(TaskQueueId queue_id) const
static TaskSourceGrade GetCurrentTaskSourceGrade()
bool Owns(TaskQueueId owner, TaskQueueId subsumed) const
Returns true if owner owns the subsumed task queue.
#define FML_DCHECK(condition)
Definition: logging.h:86
void PauseSecondarySource(TaskQueueId queue_id)
std::map< intptr_t, fml::closure > TaskObservers
void RemoveTaskObserver(TaskQueueId queue_id, intptr_t key)
std::set< TaskQueueId > GetSubsumedTaskQueueId(TaskQueueId owner) const
bool Merge(TaskQueueId owner, TaskQueueId subsumed)
TaskQueueEntry(TaskQueueId created_for)
FML_THREAD_LOCAL ThreadLocalUniquePtr< TaskSourceGradeHolder > tls_task_source_grade
#define FML_LOG(severity)
Definition: logging.h:65
FlKeyEvent FlKeyResponderAsyncCallback callback
Definition: ascii_trie.cc:9
bool Unmerge(TaskQueueId owner, TaskQueueId subsumed)
void ResumeSecondarySource(TaskQueueId queue_id)
std::function< void()> closure
Definition: closure.h:14
TaskQueueId task_queue_id
Definition: task_source.h:38
static fml::RefPtr< MessageLoopTaskQueues > GetInstance()
TaskQueueId created_for
TaskObservers task_observers
#define FML_THREAD_LOCAL
Definition: thread_local.h:61
std::unique_ptr< TaskSource > task_source
static constexpr TimePoint Max()
Definition: time_point.h:34
static const TaskQueueId _kUnmerged
void DisposeTasks(TaskQueueId queue_id)
std::vector< fml::closure > GetObserversToNotify(TaskQueueId queue_id) const
void Dispose(TaskQueueId queue_id)
fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time)
const fml::closure & GetTask() const
Definition: delayed_task.cc:24
#define FML_CHECK(condition)
Definition: logging.h:68
Wakeable * wakeable
void AddTaskObserver(TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
fml::TimePoint GetTargetTime() const
Definition: delayed_task.cc:28
static const size_t kUnmerged
Definition: task_queue_id.h:19
void SetWakeable(TaskQueueId queue_id, fml::Wakeable *wakeable)
size_t GetNumPendingTasks(TaskQueueId queue_id) const
fml::TaskSourceGrade GetTaskSourceGrade() const
Definition: delayed_task.cc:32
void RegisterTask(TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
const DelayedTask & task
Definition: task_source.h:39
The absence of a specialized TaskSourceGrade.