Flutter Engine
 
Loading...
Searching...
No Matches
message_loop_task_queues.cc
Go to the documentation of this file.
1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#define FML_USED_ON_EMBEDDER
6
8
9#include <algorithm>
10#include <iostream>
11#include <memory>
12#include <optional>
13
16
17namespace fml {
18
19const size_t TaskQueueId::kUnmerged = ULONG_MAX;
20const size_t TaskQueueId::kInvalid = ULONG_MAX - 1;
21
22namespace {
23
24// iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
25// having us resort to boxed enum containers.
26class TaskSourceGradeHolder {
27 public:
28 TaskSourceGrade task_source_grade;
29
30 explicit TaskSourceGradeHolder(TaskSourceGrade task_source_grade_arg)
31 : task_source_grade(task_source_grade_arg) {}
32};
33} // namespace
34
35static thread_local std::unique_ptr<TaskSourceGradeHolder>
37
39 : subsumed_by(kUnmerged), created_for(created_for_arg) {
40 wakeable = NULL;
42 task_source = std::make_unique<TaskSource>(created_for);
43}
44
49
51 std::lock_guard guard(queue_mutex_);
52 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
53 ++task_queue_id_counter_;
54 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
55 return loop_id;
56}
57
58MessageLoopTaskQueues::MessageLoopTaskQueues() : order_(0) {
60 new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified});
61}
62
63MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
64
66 std::lock_guard guard(queue_mutex_);
67 const auto& queue_entry = queue_entries_.at(queue_id);
68 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
69 auto& subsumed_set = queue_entry->owner_of;
70 for (auto& subsumed : subsumed_set) {
71 queue_entries_.erase(subsumed);
72 }
73 // Erase owner queue_id at last to avoid &subsumed_set from being invalid
74 queue_entries_.erase(queue_id);
75}
76
78 std::lock_guard guard(queue_mutex_);
79 const auto& queue_entry = queue_entries_.at(queue_id);
80 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
81 auto& subsumed_set = queue_entry->owner_of;
82 queue_entry->task_source->ShutDown();
83 for (auto& subsumed : subsumed_set) {
84 queue_entries_.at(subsumed)->task_source->ShutDown();
85 }
86}
87
91
93 TaskQueueId queue_id,
94 const fml::closure& task,
95 fml::TimePoint target_time,
96 fml::TaskSourceGrade task_source_grade) {
97 std::lock_guard guard(queue_mutex_);
98 size_t order = order_++;
99 const auto& queue_entry = queue_entries_.at(queue_id);
100 queue_entry->task_source->RegisterTask(
101 {order, task, target_time, task_source_grade});
102 TaskQueueId loop_to_wake = queue_id;
103 if (queue_entry->subsumed_by != kUnmerged) {
104 loop_to_wake = queue_entry->subsumed_by;
105 }
106
107 // This can happen when the secondary tasks are paused.
108 if (HasPendingTasksUnlocked(loop_to_wake)) {
109 WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
110 }
111}
112
114 std::lock_guard guard(queue_mutex_);
115 return HasPendingTasksUnlocked(queue_id);
116}
117
119 fml::TimePoint from_time) {
120 std::lock_guard guard(queue_mutex_);
121 if (!HasPendingTasksUnlocked(queue_id)) {
122 return nullptr;
123 }
124 TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);
125
126 if (!HasPendingTasksUnlocked(queue_id)) {
127 WakeUpUnlocked(queue_id, fml::TimePoint::Max());
128 } else {
129 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
130 }
131
132 if (top.task.GetTargetTime() > from_time) {
133 return nullptr;
134 }
135 fml::closure invocation = top.task.GetTask();
136 const auto task_source_grade = top.task.GetTaskSourceGrade();
137 queue_entries_.at(top.task_queue_id)->task_source->PopTask(task_source_grade);
138 tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
139 return invocation;
140}
141
142void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
143 fml::TimePoint time) const {
144 if (queue_entries_.at(queue_id)->wakeable) {
145 queue_entries_.at(queue_id)->wakeable->WakeUp(time);
146 }
147}
148
150 std::lock_guard guard(queue_mutex_);
151 const auto& queue_entry = queue_entries_.at(queue_id);
152 if (queue_entry->subsumed_by != kUnmerged) {
153 return 0;
154 }
155
156 size_t total_tasks = 0;
157 total_tasks += queue_entry->task_source->GetNumPendingTasks();
158
159 auto& subsumed_set = queue_entry->owner_of;
160 for (auto& subsumed : subsumed_set) {
161 const auto& subsumed_entry = queue_entries_.at(subsumed);
162 total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
163 }
164 return total_tasks;
165}
166
168 intptr_t key,
169 const fml::closure& callback) {
170 std::lock_guard guard(queue_mutex_);
171 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
172 queue_entries_.at(queue_id)->task_observers[key] = callback;
173}
174
176 intptr_t key) {
177 std::lock_guard guard(queue_mutex_);
178 queue_entries_.at(queue_id)->task_observers.erase(key);
179}
180
182 TaskQueueId queue_id) const {
183 std::lock_guard guard(queue_mutex_);
184 std::vector<fml::closure> observers;
185
186 if (queue_entries_.at(queue_id)->subsumed_by != kUnmerged) {
187 return observers;
188 }
189
190 for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
191 observers.push_back(observer.second);
192 }
193
194 auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
195 for (auto& subsumed : subsumed_set) {
196 for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
197 observers.push_back(observer.second);
198 }
199 }
200
201 return observers;
202}
203
205 fml::Wakeable* wakeable) {
206 std::lock_guard guard(queue_mutex_);
207 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
208 << "Wakeable can only be set once.";
209 queue_entries_.at(queue_id)->wakeable = wakeable;
210}
211
213 if (owner == subsumed) {
214 return true;
215 }
216 std::lock_guard guard(queue_mutex_);
217 auto& owner_entry = queue_entries_.at(owner);
218 auto& subsumed_entry = queue_entries_.at(subsumed);
219 auto& subsumed_set = owner_entry->owner_of;
220 if (subsumed_set.find(subsumed) != subsumed_set.end()) {
221 return true;
222 }
223
224 // Won't check owner_entry->owner_of, because it may contains items when
225 // merged with other different queues.
226
227 // Ensure owner_entry->subsumed_by being kUnmerged
228 if (owner_entry->subsumed_by != kUnmerged) {
229 FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
230 "subsumed by others, owner="
231 << owner << ", subsumed=" << subsumed
232 << ", owner->subsumed_by=" << owner_entry->subsumed_by;
233 return false;
234 }
235 // Ensure subsumed_entry->owner_of being empty
236 if (!subsumed_entry->owner_of.empty()) {
237 FML_LOG(WARNING)
238 << "Thread merging failed: subsumed_entry already owns others, owner="
239 << owner << ", subsumed=" << subsumed
240 << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
241 return false;
242 }
243 // Ensure subsumed_entry->subsumed_by being kUnmerged
244 if (subsumed_entry->subsumed_by != kUnmerged) {
245 FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
246 "subsumed by others, owner="
247 << owner << ", subsumed=" << subsumed
248 << ", subsumed->subsumed_by="
249 << subsumed_entry->subsumed_by;
250 return false;
251 }
252 // All checking is OK, set merged state.
253 owner_entry->owner_of.insert(subsumed);
254 subsumed_entry->subsumed_by = owner;
255
256 if (HasPendingTasksUnlocked(owner)) {
257 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
258 }
259
260 return true;
261}
262
264 std::lock_guard guard(queue_mutex_);
265 const auto& owner_entry = queue_entries_.at(owner);
266 if (owner_entry->owner_of.empty()) {
267 FML_LOG(WARNING)
268 << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
269 << owner << ", subsumed=" << subsumed;
270 return false;
271 }
272 if (owner_entry->subsumed_by != kUnmerged) {
273 FML_LOG(WARNING)
274 << "Thread unmerging failed: owner_entry was subsumed by others, owner="
275 << owner << ", subsumed=" << subsumed
276 << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
277 return false;
278 }
279 if (queue_entries_.at(subsumed)->subsumed_by == kUnmerged) {
280 FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
281 "subsumed by others, owner="
282 << owner << ", subsumed=" << subsumed;
283 return false;
284 }
285 if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
286 FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
287 "given subsumed queue id, owner="
288 << owner << ", subsumed=" << subsumed;
289 return false;
290 }
291
292 queue_entries_.at(subsumed)->subsumed_by = kUnmerged;
293 owner_entry->owner_of.erase(subsumed);
294
295 if (HasPendingTasksUnlocked(owner)) {
296 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
297 }
298
299 if (HasPendingTasksUnlocked(subsumed)) {
300 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
301 }
302
303 return true;
304}
305
307 TaskQueueId subsumed) const {
308 std::lock_guard guard(queue_mutex_);
309 if (owner == kUnmerged || subsumed == kUnmerged) {
310 return false;
311 }
312 auto& subsumed_set = queue_entries_.at(owner)->owner_of;
313 return subsumed_set.find(subsumed) != subsumed_set.end();
314}
315
317 TaskQueueId owner) const {
318 std::lock_guard guard(queue_mutex_);
319 return queue_entries_.at(owner)->owner_of;
320}
321
323 std::lock_guard guard(queue_mutex_);
324 queue_entries_.at(queue_id)->task_source->PauseSecondary();
325}
326
328 std::lock_guard guard(queue_mutex_);
329 queue_entries_.at(queue_id)->task_source->ResumeSecondary();
330 // Schedule a wake as needed.
331 if (HasPendingTasksUnlocked(queue_id)) {
332 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
333 }
334}
335
336// Subsumed queues will never have pending tasks.
337// Owning queues will consider both their and their subsumed tasks.
338bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
339 TaskQueueId queue_id) const {
340 const auto& entry = queue_entries_.at(queue_id);
341 bool is_subsumed = entry->subsumed_by != kUnmerged;
342 if (is_subsumed) {
343 return false;
344 }
345
346 if (!entry->task_source->IsEmpty()) {
347 return true;
348 }
349
350 auto& subsumed_set = entry->owner_of;
351 return std::any_of(
352 subsumed_set.begin(), subsumed_set.end(), [&](const auto& subsumed) {
353 return !queue_entries_.at(subsumed)->task_source->IsEmpty();
354 });
355}
356
357fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
358 TaskQueueId queue_id) const {
359 return PeekNextTaskUnlocked(queue_id).task.GetTargetTime();
360}
361
362TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
363 TaskQueueId owner) const {
364 FML_DCHECK(HasPendingTasksUnlocked(owner));
365 const auto& entry = queue_entries_.at(owner);
366 if (entry->owner_of.empty()) {
367 FML_CHECK(!entry->task_source->IsEmpty());
368 return entry->task_source->Top();
369 }
370
371 // Use optional for the memory of TopTask object.
372 std::optional<TaskSource::TopTask> top_task;
373
374 std::function<void(const TaskSource*)> top_task_updater =
375 [&top_task](const TaskSource* source) {
376 if (source && !source->IsEmpty()) {
377 TaskSource::TopTask other_task = source->Top();
378 if (!top_task.has_value() || top_task->task > other_task.task) {
379 top_task.emplace(other_task);
380 }
381 }
382 };
383
384 TaskSource* owner_tasks = entry->task_source.get();
385 top_task_updater(owner_tasks);
386
387 for (TaskQueueId subsumed : entry->owner_of) {
388 TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
389 top_task_updater(subsumed_tasks);
390 }
391 // At least one task at the top because PeekNextTaskUnlocked() is called after
392 // HasPendingTasksUnlocked()
393 FML_CHECK(top_task.has_value());
394 // Covered by FML_CHECK.
395 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
396 return top_task.value();
397}
398
399} // namespace fml
fml::TimePoint GetTargetTime() const
const fml::closure & GetTask() const
fml::TaskSourceGrade GetTaskSourceGrade() const
fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time)
bool HasPendingTasks(TaskQueueId queue_id) const
size_t GetNumPendingTasks(TaskQueueId queue_id) const
static TaskSourceGrade GetCurrentTaskSourceGrade()
static MessageLoopTaskQueues * GetInstance()
void ResumeSecondarySource(TaskQueueId queue_id)
void DisposeTasks(TaskQueueId queue_id)
bool Merge(TaskQueueId owner, TaskQueueId subsumed)
bool Unmerge(TaskQueueId owner, TaskQueueId subsumed)
void RegisterTask(TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
std::vector< fml::closure > GetObserversToNotify(TaskQueueId queue_id) const
void PauseSecondarySource(TaskQueueId queue_id)
bool Owns(TaskQueueId owner, TaskQueueId subsumed) const
Returns true if owner owns the subsumed task queue.
void AddTaskObserver(TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
void RemoveTaskObserver(TaskQueueId queue_id, intptr_t key)
std::set< TaskQueueId > GetSubsumedTaskQueueId(TaskQueueId owner) const
void SetWakeable(TaskQueueId queue_id, fml::Wakeable *wakeable)
void Dispose(TaskQueueId queue_id)
TaskQueueId created_for
std::unique_ptr< TaskSource > task_source
Wakeable * wakeable
TaskQueueEntry(TaskQueueId created_for)
TaskObservers task_observers
std::map< intptr_t, fml::closure > TaskObservers
static const size_t kUnmerged
static const size_t kInvalid
static constexpr TimePoint Max()
Definition time_point.h:39
VkInstance instance
Definition main.cc:64
FlutterDesktopBinaryReply callback
#define FML_LOG(severity)
Definition logging.h:101
#define FML_CHECK(condition)
Definition logging.h:104
#define FML_DCHECK(condition)
Definition logging.h:122
static const TaskQueueId kUnmerged
std::function< void()> closure
Definition closure.h:14
@ kUnspecified
The absence of a specialized TaskSourceGrade.
static thread_local std::unique_ptr< TaskSourceGradeHolder > tls_task_source_grade
const DelayedTask & task
Definition task_source.h:39