Flutter Engine
The Flutter Engine
message_loop_task_queues.cc
Go to the documentation of this file.
1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#define FML_USED_ON_EMBEDDER
6
7#include "flutter/fml/message_loop_task_queues.h"
8
9#include <algorithm>
10#include <iostream>
11#include <memory>
12#include <optional>
13
14#include "flutter/fml/make_copyable.h"
15#include "flutter/fml/task_source.h"
16
17namespace fml {
18
19const size_t TaskQueueId::kUnmerged = ULONG_MAX;
20
21namespace {
22
23// iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
24// having us resort to boxed enum containers.
25class TaskSourceGradeHolder {
26 public:
27 TaskSourceGrade task_source_grade;
28
29 explicit TaskSourceGradeHolder(TaskSourceGrade task_source_grade_arg)
30 : task_source_grade(task_source_grade_arg) {}
31};
32} // namespace
33
34static thread_local std::unique_ptr<TaskSourceGradeHolder>
36
38 : subsumed_by(kUnmerged), created_for(created_for_arg) {
39 wakeable = NULL;
41 task_source = std::make_unique<TaskSource>(created_for);
42}
43
46 return instance;
47}
48
50 std::lock_guard guard(queue_mutex_);
51 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
52 ++task_queue_id_counter_;
53 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
54 return loop_id;
55}
56
57MessageLoopTaskQueues::MessageLoopTaskQueues() : order_(0) {
59 new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified});
60}
61
62MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
63
65 std::lock_guard guard(queue_mutex_);
66 const auto& queue_entry = queue_entries_.at(queue_id);
67 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
68 auto& subsumed_set = queue_entry->owner_of;
69 for (auto& subsumed : subsumed_set) {
70 queue_entries_.erase(subsumed);
71 }
72 // Erase owner queue_id at last to avoid &subsumed_set from being invalid
73 queue_entries_.erase(queue_id);
74}
75
77 std::lock_guard guard(queue_mutex_);
78 const auto& queue_entry = queue_entries_.at(queue_id);
79 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
80 auto& subsumed_set = queue_entry->owner_of;
81 queue_entry->task_source->ShutDown();
82 for (auto& subsumed : subsumed_set) {
83 queue_entries_.at(subsumed)->task_source->ShutDown();
84 }
85}
86
88 return tls_task_source_grade.get()->task_source_grade;
89}
90
92 TaskQueueId queue_id,
93 const fml::closure& task,
94 fml::TimePoint target_time,
95 fml::TaskSourceGrade task_source_grade) {
96 std::lock_guard guard(queue_mutex_);
97 size_t order = order_++;
98 const auto& queue_entry = queue_entries_.at(queue_id);
99 queue_entry->task_source->RegisterTask(
100 {order, task, target_time, task_source_grade});
101 TaskQueueId loop_to_wake = queue_id;
102 if (queue_entry->subsumed_by != kUnmerged) {
103 loop_to_wake = queue_entry->subsumed_by;
104 }
105
106 // This can happen when the secondary tasks are paused.
107 if (HasPendingTasksUnlocked(loop_to_wake)) {
108 WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
109 }
110}
111
113 std::lock_guard guard(queue_mutex_);
114 return HasPendingTasksUnlocked(queue_id);
115}
116
118 fml::TimePoint from_time) {
119 std::lock_guard guard(queue_mutex_);
120 if (!HasPendingTasksUnlocked(queue_id)) {
121 return nullptr;
122 }
123 TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);
124
125 if (!HasPendingTasksUnlocked(queue_id)) {
126 WakeUpUnlocked(queue_id, fml::TimePoint::Max());
127 } else {
128 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
129 }
130
131 if (top.task.GetTargetTime() > from_time) {
132 return nullptr;
133 }
134 fml::closure invocation = top.task.GetTask();
135 queue_entries_.at(top.task_queue_id)
136 ->task_source->PopTask(top.task.GetTaskSourceGrade());
137 const auto task_source_grade = top.task.GetTaskSourceGrade();
138 tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
139 return invocation;
140}
141
142void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
143 fml::TimePoint time) const {
144 if (queue_entries_.at(queue_id)->wakeable) {
145 queue_entries_.at(queue_id)->wakeable->WakeUp(time);
146 }
147}
148
150 std::lock_guard guard(queue_mutex_);
151 const auto& queue_entry = queue_entries_.at(queue_id);
152 if (queue_entry->subsumed_by != kUnmerged) {
153 return 0;
154 }
155
156 size_t total_tasks = 0;
157 total_tasks += queue_entry->task_source->GetNumPendingTasks();
158
159 auto& subsumed_set = queue_entry->owner_of;
160 for (auto& subsumed : subsumed_set) {
161 const auto& subsumed_entry = queue_entries_.at(subsumed);
162 total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
163 }
164 return total_tasks;
165}
166
168 intptr_t key,
169 const fml::closure& callback) {
170 std::lock_guard guard(queue_mutex_);
171 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
172 queue_entries_.at(queue_id)->task_observers[key] = callback;
173}
174
176 intptr_t key) {
177 std::lock_guard guard(queue_mutex_);
178 queue_entries_.at(queue_id)->task_observers.erase(key);
179}
180
182 TaskQueueId queue_id) const {
183 std::lock_guard guard(queue_mutex_);
184 std::vector<fml::closure> observers;
185
186 if (queue_entries_.at(queue_id)->subsumed_by != kUnmerged) {
187 return observers;
188 }
189
190 for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
191 observers.push_back(observer.second);
192 }
193
194 auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
195 for (auto& subsumed : subsumed_set) {
196 for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
197 observers.push_back(observer.second);
198 }
199 }
200
201 return observers;
202}
203
205 fml::Wakeable* wakeable) {
206 std::lock_guard guard(queue_mutex_);
207 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
208 << "Wakeable can only be set once.";
209 queue_entries_.at(queue_id)->wakeable = wakeable;
210}
211
213 if (owner == subsumed) {
214 return true;
215 }
216 std::lock_guard guard(queue_mutex_);
217 auto& owner_entry = queue_entries_.at(owner);
218 auto& subsumed_entry = queue_entries_.at(subsumed);
219 auto& subsumed_set = owner_entry->owner_of;
220 if (subsumed_set.find(subsumed) != subsumed_set.end()) {
221 return true;
222 }
223
224 // Won't check owner_entry->owner_of, because it may contain items when
225 // merged with other different queues.
226
227 // Ensure owner_entry->subsumed_by being kUnmerged
228 if (owner_entry->subsumed_by != kUnmerged) {
229 FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
230 "subsumed by others, owner="
231 << owner << ", subsumed=" << subsumed
232 << ", owner->subsumed_by=" << owner_entry->subsumed_by;
233 return false;
234 }
235 // Ensure subsumed_entry->owner_of being empty
236 if (!subsumed_entry->owner_of.empty()) {
237 FML_LOG(WARNING)
238 << "Thread merging failed: subsumed_entry already owns others, owner="
239 << owner << ", subsumed=" << subsumed
240 << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
241 return false;
242 }
243 // Ensure subsumed_entry->subsumed_by being kUnmerged
244 if (subsumed_entry->subsumed_by != kUnmerged) {
245 FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
246 "subsumed by others, owner="
247 << owner << ", subsumed=" << subsumed
248 << ", subsumed->subsumed_by="
249 << subsumed_entry->subsumed_by;
250 return false;
251 }
252 // All checking is OK, set merged state.
253 owner_entry->owner_of.insert(subsumed);
254 subsumed_entry->subsumed_by = owner;
255
256 if (HasPendingTasksUnlocked(owner)) {
257 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
258 }
259
260 return true;
261}
262
264 std::lock_guard guard(queue_mutex_);
265 const auto& owner_entry = queue_entries_.at(owner);
266 if (owner_entry->owner_of.empty()) {
267 FML_LOG(WARNING)
268 << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
269 << owner << ", subsumed=" << subsumed;
270 return false;
271 }
272 if (owner_entry->subsumed_by != kUnmerged) {
273 FML_LOG(WARNING)
274 << "Thread unmerging failed: owner_entry was subsumed by others, owner="
275 << owner << ", subsumed=" << subsumed
276 << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
277 return false;
278 }
279 if (queue_entries_.at(subsumed)->subsumed_by == kUnmerged) {
280 FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
281 "subsumed by others, owner="
282 << owner << ", subsumed=" << subsumed;
283 return false;
284 }
285 if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
286 FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
287 "given subsumed queue id, owner="
288 << owner << ", subsumed=" << subsumed;
289 return false;
290 }
291
292 queue_entries_.at(subsumed)->subsumed_by = kUnmerged;
293 owner_entry->owner_of.erase(subsumed);
294
295 if (HasPendingTasksUnlocked(owner)) {
296 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
297 }
298
299 if (HasPendingTasksUnlocked(subsumed)) {
300 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
301 }
302
303 return true;
304}
305
307 TaskQueueId subsumed) const {
308 std::lock_guard guard(queue_mutex_);
309 if (owner == kUnmerged || subsumed == kUnmerged) {
310 return false;
311 }
312 auto& subsumed_set = queue_entries_.at(owner)->owner_of;
313 return subsumed_set.find(subsumed) != subsumed_set.end();
314}
315
317 TaskQueueId owner) const {
318 std::lock_guard guard(queue_mutex_);
319 return queue_entries_.at(owner)->owner_of;
320}
321
323 std::lock_guard guard(queue_mutex_);
324 queue_entries_.at(queue_id)->task_source->PauseSecondary();
325}
326
328 std::lock_guard guard(queue_mutex_);
329 queue_entries_.at(queue_id)->task_source->ResumeSecondary();
330 // Schedule a wake as needed.
331 if (HasPendingTasksUnlocked(queue_id)) {
332 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
333 }
334}
335
336// Subsumed queues will never have pending tasks.
337// Owning queues will consider both their and their subsumed tasks.
338bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
339 TaskQueueId queue_id) const {
340 const auto& entry = queue_entries_.at(queue_id);
341 bool is_subsumed = entry->subsumed_by != kUnmerged;
342 if (is_subsumed) {
343 return false;
344 }
345
346 if (!entry->task_source->IsEmpty()) {
347 return true;
348 }
349
350 auto& subsumed_set = entry->owner_of;
351 return std::any_of(
352 subsumed_set.begin(), subsumed_set.end(), [&](const auto& subsumed) {
353 return !queue_entries_.at(subsumed)->task_source->IsEmpty();
354 });
355}
356
357fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
358 TaskQueueId queue_id) const {
359 return PeekNextTaskUnlocked(queue_id).task.GetTargetTime();
360}
361
362TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
363 TaskQueueId owner) const {
364 FML_DCHECK(HasPendingTasksUnlocked(owner));
365 const auto& entry = queue_entries_.at(owner);
366 if (entry->owner_of.empty()) {
367 FML_CHECK(!entry->task_source->IsEmpty());
368 return entry->task_source->Top();
369 }
370
371 // Use optional for the memory of TopTask object.
372 std::optional<TaskSource::TopTask> top_task;
373
374 std::function<void(const TaskSource*)> top_task_updater =
375 [&top_task](const TaskSource* source) {
376 if (source && !source->IsEmpty()) {
377 TaskSource::TopTask other_task = source->Top();
378 if (!top_task.has_value() || top_task->task > other_task.task) {
379 top_task.emplace(other_task);
380 }
381 }
382 };
383
384 TaskSource* owner_tasks = entry->task_source.get();
385 top_task_updater(owner_tasks);
386
387 for (TaskQueueId subsumed : entry->owner_of) {
388 TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
389 top_task_updater(subsumed_tasks);
390 }
391 // At least one task at the top because PeekNextTaskUnlocked() is called after
392 // HasPendingTasksUnlocked()
393 FML_CHECK(top_task.has_value());
394 // Covered by FML_CHECK.
395 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
396 return top_task.value();
397}
398
399} // namespace fml
fml::TimePoint GetTargetTime() const
Definition: delayed_task.cc:28
const fml::closure & GetTask() const
Definition: delayed_task.cc:24
fml::TaskSourceGrade GetTaskSourceGrade() const
Definition: delayed_task.cc:32
fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time)
bool HasPendingTasks(TaskQueueId queue_id) const
size_t GetNumPendingTasks(TaskQueueId queue_id) const
static TaskSourceGrade GetCurrentTaskSourceGrade()
static MessageLoopTaskQueues * GetInstance()
void ResumeSecondarySource(TaskQueueId queue_id)
void DisposeTasks(TaskQueueId queue_id)
bool Merge(TaskQueueId owner, TaskQueueId subsumed)
bool Unmerge(TaskQueueId owner, TaskQueueId subsumed)
void RegisterTask(TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
std::vector< fml::closure > GetObserversToNotify(TaskQueueId queue_id) const
void PauseSecondarySource(TaskQueueId queue_id)
bool Owns(TaskQueueId owner, TaskQueueId subsumed) const
Returns true if owner owns the subsumed task queue.
void AddTaskObserver(TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
void RemoveTaskObserver(TaskQueueId queue_id, intptr_t key)
std::set< TaskQueueId > GetSubsumedTaskQueueId(TaskQueueId owner) const
void SetWakeable(TaskQueueId queue_id, fml::Wakeable *wakeable)
void Dispose(TaskQueueId queue_id)
std::unique_ptr< TaskSource > task_source
TaskQueueEntry(TaskQueueId created_for)
std::map< intptr_t, fml::closure > TaskObservers
static const size_t kUnmerged
Definition: task_queue_id.h:19
static constexpr TimePoint Max()
Definition: time_point.h:39
VkInstance instance
Definition: main.cc:48
SkBitmap source
Definition: examples.cpp:28
FlKeyEvent uint64_t FlKeyResponderAsyncCallback callback
#define FML_LOG(severity)
Definition: logging.h:82
#define FML_CHECK(condition)
Definition: logging.h:85
#define FML_DCHECK(condition)
Definition: logging.h:103
Dart_NativeFunction function
Definition: fuchsia.cc:51
Definition: ascii_trie.cc:9
static const TaskQueueId kUnmerged
std::function< void()> closure
Definition: closure.h:14
@ kUnspecified
The absence of a specialized TaskSourceGrade.
static thread_local std::unique_ptr< TaskSourceGradeHolder > tls_task_source_grade
static double time(int loops, Benchmark *bench, Target *target)
Definition: nanobench.cpp:394
const DelayedTask & task
Definition: task_source.h:39
TaskQueueId task_queue_id
Definition: task_source.h:38