5#define FML_USED_ON_EMBEDDER
7#include "flutter/fml/message_loop_task_queues.h"
14#include "flutter/fml/make_copyable.h"
15#include "flutter/fml/task_source.h"
25class TaskSourceGradeHolder {
30 : task_source_grade(task_source_grade_arg) {}
34static thread_local std::unique_ptr<TaskSourceGradeHolder>
38 : subsumed_by(
kUnmerged), created_for(created_for_arg) {
50 std::lock_guard guard(queue_mutex_);
52 ++task_queue_id_counter_;
53 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
57MessageLoopTaskQueues::MessageLoopTaskQueues() : order_(0) {
62MessageLoopTaskQueues::~MessageLoopTaskQueues() =
default;
65 std::lock_guard guard(queue_mutex_);
66 const auto& queue_entry = queue_entries_.at(queue_id);
68 auto& subsumed_set = queue_entry->owner_of;
69 for (
auto& subsumed : subsumed_set) {
70 queue_entries_.erase(subsumed);
73 queue_entries_.erase(queue_id);
77 std::lock_guard guard(queue_mutex_);
78 const auto& queue_entry = queue_entries_.at(queue_id);
80 auto& subsumed_set = queue_entry->owner_of;
81 queue_entry->task_source->ShutDown();
82 for (
auto& subsumed : subsumed_set) {
83 queue_entries_.at(subsumed)->task_source->ShutDown();
96 std::lock_guard guard(queue_mutex_);
97 size_t order = order_++;
98 const auto& queue_entry = queue_entries_.at(queue_id);
99 queue_entry->task_source->RegisterTask(
100 {order, task, target_time, task_source_grade});
102 if (queue_entry->subsumed_by !=
kUnmerged) {
103 loop_to_wake = queue_entry->subsumed_by;
107 if (HasPendingTasksUnlocked(loop_to_wake)) {
108 WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
113 std::lock_guard guard(queue_mutex_);
114 return HasPendingTasksUnlocked(queue_id);
119 std::lock_guard guard(queue_mutex_);
120 if (!HasPendingTasksUnlocked(queue_id)) {
125 if (!HasPendingTasksUnlocked(queue_id)) {
128 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
142void MessageLoopTaskQueues::WakeUpUnlocked(
TaskQueueId queue_id,
144 if (queue_entries_.at(queue_id)->wakeable) {
145 queue_entries_.at(queue_id)->wakeable->WakeUp(
time);
150 std::lock_guard guard(queue_mutex_);
151 const auto& queue_entry = queue_entries_.at(queue_id);
152 if (queue_entry->subsumed_by !=
kUnmerged) {
156 size_t total_tasks = 0;
157 total_tasks += queue_entry->task_source->GetNumPendingTasks();
159 auto& subsumed_set = queue_entry->owner_of;
160 for (
auto& subsumed : subsumed_set) {
161 const auto& subsumed_entry = queue_entries_.at(subsumed);
162 total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
170 std::lock_guard guard(queue_mutex_);
172 queue_entries_.at(queue_id)->task_observers[
key] =
callback;
177 std::lock_guard guard(queue_mutex_);
178 queue_entries_.at(queue_id)->task_observers.erase(
key);
183 std::lock_guard guard(queue_mutex_);
184 std::vector<fml::closure> observers;
186 if (queue_entries_.at(queue_id)->subsumed_by !=
kUnmerged) {
190 for (
const auto& observer : queue_entries_.at(queue_id)->task_observers) {
191 observers.push_back(observer.second);
194 auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
195 for (
auto& subsumed : subsumed_set) {
196 for (
const auto& observer : queue_entries_.at(subsumed)->task_observers) {
197 observers.push_back(observer.second);
206 std::lock_guard guard(queue_mutex_);
207 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
208 <<
"Wakeable can only be set once.";
209 queue_entries_.at(queue_id)->wakeable = wakeable;
213 if (owner == subsumed) {
216 std::lock_guard guard(queue_mutex_);
217 auto& owner_entry = queue_entries_.at(owner);
218 auto& subsumed_entry = queue_entries_.at(subsumed);
219 auto& subsumed_set = owner_entry->owner_of;
220 if (subsumed_set.find(subsumed) != subsumed_set.end()) {
228 if (owner_entry->subsumed_by !=
kUnmerged) {
229 FML_LOG(WARNING) <<
"Thread merging failed: owner_entry was already "
230 "subsumed by others, owner="
231 << owner <<
", subsumed=" << subsumed
232 <<
", owner->subsumed_by=" << owner_entry->subsumed_by;
236 if (!subsumed_entry->owner_of.empty()) {
238 <<
"Thread merging failed: subsumed_entry already owns others, owner="
239 << owner <<
", subsumed=" << subsumed
240 <<
", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
244 if (subsumed_entry->subsumed_by !=
kUnmerged) {
245 FML_LOG(WARNING) <<
"Thread merging failed: subsumed_entry was already "
246 "subsumed by others, owner="
247 << owner <<
", subsumed=" << subsumed
248 <<
", subsumed->subsumed_by="
249 << subsumed_entry->subsumed_by;
253 owner_entry->owner_of.insert(subsumed);
254 subsumed_entry->subsumed_by = owner;
256 if (HasPendingTasksUnlocked(owner)) {
257 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
264 std::lock_guard guard(queue_mutex_);
265 const auto& owner_entry = queue_entries_.at(owner);
266 if (owner_entry->owner_of.empty()) {
268 <<
"Thread unmerging failed: owner_entry doesn't own anyone, owner="
269 << owner <<
", subsumed=" << subsumed;
272 if (owner_entry->subsumed_by !=
kUnmerged) {
274 <<
"Thread unmerging failed: owner_entry was subsumed by others, owner="
275 << owner <<
", subsumed=" << subsumed
276 <<
", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
279 if (queue_entries_.at(subsumed)->subsumed_by ==
kUnmerged) {
280 FML_LOG(WARNING) <<
"Thread unmerging failed: subsumed_entry wasn't "
281 "subsumed by others, owner="
282 << owner <<
", subsumed=" << subsumed;
285 if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
286 FML_LOG(WARNING) <<
"Thread unmerging failed: owner_entry didn't own the "
287 "given subsumed queue id, owner="
288 << owner <<
", subsumed=" << subsumed;
292 queue_entries_.at(subsumed)->subsumed_by =
kUnmerged;
293 owner_entry->owner_of.erase(subsumed);
295 if (HasPendingTasksUnlocked(owner)) {
296 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
299 if (HasPendingTasksUnlocked(subsumed)) {
300 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
308 std::lock_guard guard(queue_mutex_);
312 auto& subsumed_set = queue_entries_.at(owner)->owner_of;
313 return subsumed_set.find(subsumed) != subsumed_set.end();
318 std::lock_guard guard(queue_mutex_);
319 return queue_entries_.at(owner)->owner_of;
323 std::lock_guard guard(queue_mutex_);
324 queue_entries_.at(queue_id)->task_source->PauseSecondary();
328 std::lock_guard guard(queue_mutex_);
329 queue_entries_.at(queue_id)->task_source->ResumeSecondary();
331 if (HasPendingTasksUnlocked(queue_id)) {
332 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
338bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
340 const auto& entry = queue_entries_.at(queue_id);
341 bool is_subsumed = entry->subsumed_by !=
kUnmerged;
346 if (!entry->task_source->IsEmpty()) {
350 auto& subsumed_set = entry->owner_of;
352 subsumed_set.begin(), subsumed_set.end(), [&](
const auto& subsumed) {
353 return !queue_entries_.at(subsumed)->task_source->IsEmpty();
358 TaskQueueId queue_id)
const {
362TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
363 TaskQueueId owner)
const {
365 const auto& entry = queue_entries_.at(owner);
366 if (entry->owner_of.empty()) {
367 FML_CHECK(!entry->task_source->IsEmpty());
368 return entry->task_source->Top();
372 std::optional<TaskSource::TopTask> top_task;
375 [&top_task](
const TaskSource*
source) {
377 TaskSource::TopTask other_task =
source->Top();
378 if (!top_task.has_value() || top_task->task > other_task.task) {
379 top_task.emplace(other_task);
384 TaskSource* owner_tasks = entry->task_source.get();
385 top_task_updater(owner_tasks);
387 for (TaskQueueId subsumed : entry->owner_of) {
388 TaskSource* subsumed_tasks = queue_entries_.at(subsumed)->task_source.get();
389 top_task_updater(subsumed_tasks);
396 return top_task.value();
fml::TimePoint GetTargetTime() const
const fml::closure & GetTask() const
fml::TaskSourceGrade GetTaskSourceGrade() const
fml::closure GetNextTaskToRun(TaskQueueId queue_id, fml::TimePoint from_time)
bool HasPendingTasks(TaskQueueId queue_id) const
size_t GetNumPendingTasks(TaskQueueId queue_id) const
static TaskSourceGrade GetCurrentTaskSourceGrade()
static MessageLoopTaskQueues * GetInstance()
void ResumeSecondarySource(TaskQueueId queue_id)
void DisposeTasks(TaskQueueId queue_id)
bool Merge(TaskQueueId owner, TaskQueueId subsumed)
bool Unmerge(TaskQueueId owner, TaskQueueId subsumed)
TaskQueueId CreateTaskQueue()
void RegisterTask(TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
std::vector< fml::closure > GetObserversToNotify(TaskQueueId queue_id) const
void PauseSecondarySource(TaskQueueId queue_id)
bool Owns(TaskQueueId owner, TaskQueueId subsumed) const
Returns true if owner owns the subsumed task queue.
void AddTaskObserver(TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
void RemoveTaskObserver(TaskQueueId queue_id, intptr_t key)
std::set< TaskQueueId > GetSubsumedTaskQueueId(TaskQueueId owner) const
void SetWakeable(TaskQueueId queue_id, fml::Wakeable *wakeable)
void Dispose(TaskQueueId queue_id)
std::unique_ptr< TaskSource > task_source
TaskQueueEntry(TaskQueueId created_for)
TaskObservers task_observers
std::map< intptr_t, fml::closure > TaskObservers
static const size_t kUnmerged
static constexpr TimePoint Max()
FlKeyEvent uint64_t FlKeyResponderAsyncCallback callback
#define FML_LOG(severity)
#define FML_CHECK(condition)
#define FML_DCHECK(condition)
Dart_NativeFunction function
static const TaskQueueId kUnmerged
std::function< void()> closure
@ kUnspecified
The absence of a specialized TaskSourceGrade.
static thread_local std::unique_ptr< TaskSourceGradeHolder > tls_task_source_grade
static double time(int loops, Benchmark *bench, Target *target)
TaskQueueId task_queue_id