Flutter Engine
The Flutter Engine
Public Member Functions | Static Public Member Functions | List of all members
fml::MessageLoopTaskQueues Class Reference

#include <message_loop_task_queues.h>

Public Member Functions

TaskQueueId CreateTaskQueue ()
 
void Dispose (TaskQueueId queue_id)
 
void DisposeTasks (TaskQueueId queue_id)
 
void RegisterTask (TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
 
bool HasPendingTasks (TaskQueueId queue_id) const
 
fml::closure GetNextTaskToRun (TaskQueueId queue_id, fml::TimePoint from_time)
 
size_t GetNumPendingTasks (TaskQueueId queue_id) const
 
void AddTaskObserver (TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
 
void RemoveTaskObserver (TaskQueueId queue_id, intptr_t key)
 
std::vector< fml::closure > GetObserversToNotify (TaskQueueId queue_id) const
 
void SetWakeable (TaskQueueId queue_id, fml::Wakeable *wakeable)
 
bool Merge (TaskQueueId owner, TaskQueueId subsumed)
 
bool Unmerge (TaskQueueId owner, TaskQueueId subsumed)
 
bool Owns (TaskQueueId owner, TaskQueueId subsumed) const
 Returns true if owner owns the subsumed task queue. More...
 
std::set< TaskQueueId > GetSubsumedTaskQueueId (TaskQueueId owner) const
 
void PauseSecondarySource (TaskQueueId queue_id)
 
void ResumeSecondarySource (TaskQueueId queue_id)
 

Static Public Member Functions

static MessageLoopTaskQueues * GetInstance ()
 
static TaskSourceGrade GetCurrentTaskSourceGrade ()
 

Detailed Description

A singleton container for all tasks and observers associated with all fml::MessageLoops.

This also wakes up the loop at the required times.

See also
fml::MessageLoop
fml::Wakeable

Definition at line 66 of file message_loop_task_queues.h.

Member Function Documentation

◆ AddTaskObserver()

void fml::MessageLoopTaskQueues::AddTaskObserver ( TaskQueueId  queue_id,
intptr_t  key,
const fml::closure & callback
)

Definition at line 167 of file message_loop_task_queues.cc.

169 {
170 std::lock_guard guard(queue_mutex_);
171 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
172 queue_entries_.at(queue_id)->task_observers[key] = callback;
173}
FlKeyEvent uint64_t FlKeyResponderAsyncCallback callback
#define FML_DCHECK(condition)
Definition: logging.h:103

◆ CreateTaskQueue()

TaskQueueId fml::MessageLoopTaskQueues::CreateTaskQueue ( )

Definition at line 49 of file message_loop_task_queues.cc.

49 {
50 std::lock_guard guard(queue_mutex_);
51 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
52 ++task_queue_id_counter_;
53 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
54 return loop_id;
55}

◆ Dispose()

void fml::MessageLoopTaskQueues::Dispose ( TaskQueueId  queue_id)

Definition at line 64 of file message_loop_task_queues.cc.

64 {
65 std::lock_guard guard(queue_mutex_);
66 const auto& queue_entry = queue_entries_.at(queue_id);
67 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
68 auto& subsumed_set = queue_entry->owner_of;
69 for (auto& subsumed : subsumed_set) {
70 queue_entries_.erase(subsumed);
71 }
72 // Erase owner queue_id at last to avoid &subsumed_set from being invalid
73 queue_entries_.erase(queue_id);
74}
static const TaskQueueId kUnmerged

◆ DisposeTasks()

void fml::MessageLoopTaskQueues::DisposeTasks ( TaskQueueId  queue_id)

Definition at line 76 of file message_loop_task_queues.cc.

76 {
77 std::lock_guard guard(queue_mutex_);
78 const auto& queue_entry = queue_entries_.at(queue_id);
79 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
80 auto& subsumed_set = queue_entry->owner_of;
81 queue_entry->task_source->ShutDown();
82 for (auto& subsumed : subsumed_set) {
83 queue_entries_.at(subsumed)->task_source->ShutDown();
84 }
85}

◆ GetCurrentTaskSourceGrade()

TaskSourceGrade fml::MessageLoopTaskQueues::GetCurrentTaskSourceGrade ( )
static

Definition at line 87 of file message_loop_task_queues.cc.

87 {
88 return tls_task_source_grade.get()->task_source_grade;
89}
static thread_local std::unique_ptr< TaskSourceGradeHolder > tls_task_source_grade

◆ GetInstance()

MessageLoopTaskQueues * fml::MessageLoopTaskQueues::GetInstance ( )
static

Definition at line 44 of file message_loop_task_queues.cc.

44 {
45 static MessageLoopTaskQueues* instance = new MessageLoopTaskQueues;
46 return instance;
47}
VkInstance instance
Definition: main.cc:48

◆ GetNextTaskToRun()

fml::closure fml::MessageLoopTaskQueues::GetNextTaskToRun ( TaskQueueId  queue_id,
fml::TimePoint  from_time 
)

Definition at line 117 of file message_loop_task_queues.cc.

118 {
119 std::lock_guard guard(queue_mutex_);
120 if (!HasPendingTasksUnlocked(queue_id)) {
121 return nullptr;
122 }
123 TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);
124
125 if (!HasPendingTasksUnlocked(queue_id)) {
126 WakeUpUnlocked(queue_id, fml::TimePoint::Max());
127 } else {
128 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
129 }
130
131 if (top.task.GetTargetTime() > from_time) {
132 return nullptr;
133 }
134 fml::closure invocation = top.task.GetTask();
135 queue_entries_.at(top.task_queue_id)
136 ->task_source->PopTask(top.task.GetTaskSourceGrade());
137 const auto task_source_grade = top.task.GetTaskSourceGrade();
138 tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
139 return invocation;
140}
static constexpr TimePoint Max()
Definition: time_point.h:39
std::function< void()> closure
Definition: closure.h:14

◆ GetNumPendingTasks()

size_t fml::MessageLoopTaskQueues::GetNumPendingTasks ( TaskQueueId  queue_id) const

Definition at line 149 of file message_loop_task_queues.cc.

149 {
150 std::lock_guard guard(queue_mutex_);
151 const auto& queue_entry = queue_entries_.at(queue_id);
152 if (queue_entry->subsumed_by != kUnmerged) {
153 return 0;
154 }
155
156 size_t total_tasks = 0;
157 total_tasks += queue_entry->task_source->GetNumPendingTasks();
158
159 auto& subsumed_set = queue_entry->owner_of;
160 for (auto& subsumed : subsumed_set) {
161 const auto& subsumed_entry = queue_entries_.at(subsumed);
162 total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
163 }
164 return total_tasks;
165}

◆ GetObserversToNotify()

std::vector< fml::closure > fml::MessageLoopTaskQueues::GetObserversToNotify ( TaskQueueId  queue_id) const

Definition at line 181 of file message_loop_task_queues.cc.

182 {
183 std::lock_guard guard(queue_mutex_);
184 std::vector<fml::closure> observers;
185
186 if (queue_entries_.at(queue_id)->subsumed_by != kUnmerged) {
187 return observers;
188 }
189
190 for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
191 observers.push_back(observer.second);
192 }
193
194 auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
195 for (auto& subsumed : subsumed_set) {
196 for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
197 observers.push_back(observer.second);
198 }
199 }
200
201 return observers;
202}

◆ GetSubsumedTaskQueueId()

std::set< TaskQueueId > fml::MessageLoopTaskQueues::GetSubsumedTaskQueueId ( TaskQueueId  owner) const

Definition at line 316 of file message_loop_task_queues.cc.

317 {
318 std::lock_guard guard(queue_mutex_);
319 return queue_entries_.at(owner)->owner_of;
320}

◆ HasPendingTasks()

bool fml::MessageLoopTaskQueues::HasPendingTasks ( TaskQueueId  queue_id) const

Definition at line 112 of file message_loop_task_queues.cc.

112 {
113 std::lock_guard guard(queue_mutex_);
114 return HasPendingTasksUnlocked(queue_id);
115}

◆ Merge()

bool fml::MessageLoopTaskQueues::Merge ( TaskQueueId  owner,
TaskQueueId  subsumed 
)

Definition at line 212 of file message_loop_task_queues.cc.

212 {
213 if (owner == subsumed) {
214 return true;
215 }
216 std::lock_guard guard(queue_mutex_);
217 auto& owner_entry = queue_entries_.at(owner);
218 auto& subsumed_entry = queue_entries_.at(subsumed);
219 auto& subsumed_set = owner_entry->owner_of;
220 if (subsumed_set.find(subsumed) != subsumed_set.end()) {
221 return true;
222 }
223
224 // Won't check owner_entry->owner_of, because it may contains items when
225 // merged with other different queues.
226
227 // Ensure owner_entry->subsumed_by being kUnmerged
228 if (owner_entry->subsumed_by != kUnmerged) {
229 FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
230 "subsumed by others, owner="
231 << owner << ", subsumed=" << subsumed
232 << ", owner->subsumed_by=" << owner_entry->subsumed_by;
233 return false;
234 }
235 // Ensure subsumed_entry->owner_of being empty
236 if (!subsumed_entry->owner_of.empty()) {
237 FML_LOG(WARNING)
238 << "Thread merging failed: subsumed_entry already owns others, owner="
239 << owner << ", subsumed=" << subsumed
240 << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
241 return false;
242 }
243 // Ensure subsumed_entry->subsumed_by being kUnmerged
244 if (subsumed_entry->subsumed_by != kUnmerged) {
245 FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
246 "subsumed by others, owner="
247 << owner << ", subsumed=" << subsumed
248 << ", subsumed->subsumed_by="
249 << subsumed_entry->subsumed_by;
250 return false;
251 }
252 // All checking is OK, set merged state.
253 owner_entry->owner_of.insert(subsumed);
254 subsumed_entry->subsumed_by = owner;
255
256 if (HasPendingTasksUnlocked(owner)) {
257 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
258 }
259
260 return true;
261}
#define FML_LOG(severity)
Definition: logging.h:82

◆ Owns()

bool fml::MessageLoopTaskQueues::Owns ( TaskQueueId  owner,
TaskQueueId  subsumed 
) const

Returns true if owner owns the subsumed task queue.

Definition at line 306 of file message_loop_task_queues.cc.

307 {
308 std::lock_guard guard(queue_mutex_);
309 if (owner == kUnmerged || subsumed == kUnmerged) {
310 return false;
311 }
312 auto& subsumed_set = queue_entries_.at(owner)->owner_of;
313 return subsumed_set.find(subsumed) != subsumed_set.end();
314}

◆ PauseSecondarySource()

void fml::MessageLoopTaskQueues::PauseSecondarySource ( TaskQueueId  queue_id)

Definition at line 322 of file message_loop_task_queues.cc.

322 {
323 std::lock_guard guard(queue_mutex_);
324 queue_entries_.at(queue_id)->task_source->PauseSecondary();
325}

◆ RegisterTask()

void fml::MessageLoopTaskQueues::RegisterTask ( TaskQueueId  queue_id,
const fml::closure & task,
fml::TimePoint  target_time,
fml::TaskSourceGrade  task_source_grade = fml::TaskSourceGrade::kUnspecified 
)

Definition at line 91 of file message_loop_task_queues.cc.

95 {
96 std::lock_guard guard(queue_mutex_);
97 size_t order = order_++;
98 const auto& queue_entry = queue_entries_.at(queue_id);
99 queue_entry->task_source->RegisterTask(
100 {order, task, target_time, task_source_grade});
101 TaskQueueId loop_to_wake = queue_id;
102 if (queue_entry->subsumed_by != kUnmerged) {
103 loop_to_wake = queue_entry->subsumed_by;
104 }
105
106 // This can happen when the secondary tasks are paused.
107 if (HasPendingTasksUnlocked(loop_to_wake)) {
108 WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
109 }
110}

◆ RemoveTaskObserver()

void fml::MessageLoopTaskQueues::RemoveTaskObserver ( TaskQueueId  queue_id,
intptr_t  key 
)

Definition at line 175 of file message_loop_task_queues.cc.

176 {
177 std::lock_guard guard(queue_mutex_);
178 queue_entries_.at(queue_id)->task_observers.erase(key);
179}

◆ ResumeSecondarySource()

void fml::MessageLoopTaskQueues::ResumeSecondarySource ( TaskQueueId  queue_id)

Definition at line 327 of file message_loop_task_queues.cc.

327 {
328 std::lock_guard guard(queue_mutex_);
329 queue_entries_.at(queue_id)->task_source->ResumeSecondary();
330 // Schedule a wake as needed.
331 if (HasPendingTasksUnlocked(queue_id)) {
332 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
333 }
334}

◆ SetWakeable()

void fml::MessageLoopTaskQueues::SetWakeable ( TaskQueueId  queue_id,
fml::Wakeable * wakeable
)

Definition at line 204 of file message_loop_task_queues.cc.

205 {
206 std::lock_guard guard(queue_mutex_);
207 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
208 << "Wakeable can only be set once.";
209 queue_entries_.at(queue_id)->wakeable = wakeable;
210}
#define FML_CHECK(condition)
Definition: logging.h:85

◆ Unmerge()

bool fml::MessageLoopTaskQueues::Unmerge ( TaskQueueId  owner,
TaskQueueId  subsumed 
)

Definition at line 263 of file message_loop_task_queues.cc.

263 {
264 std::lock_guard guard(queue_mutex_);
265 const auto& owner_entry = queue_entries_.at(owner);
266 if (owner_entry->owner_of.empty()) {
267 FML_LOG(WARNING)
268 << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
269 << owner << ", subsumed=" << subsumed;
270 return false;
271 }
272 if (owner_entry->subsumed_by != kUnmerged) {
273 FML_LOG(WARNING)
274 << "Thread unmerging failed: owner_entry was subsumed by others, owner="
275 << owner << ", subsumed=" << subsumed
276 << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
277 return false;
278 }
279 if (queue_entries_.at(subsumed)->subsumed_by == kUnmerged) {
280 FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
281 "subsumed by others, owner="
282 << owner << ", subsumed=" << subsumed;
283 return false;
284 }
285 if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
286 FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
287 "given subsumed queue id, owner="
288 << owner << ", subsumed=" << subsumed;
289 return false;
290 }
291
292 queue_entries_.at(subsumed)->subsumed_by = kUnmerged;
293 owner_entry->owner_of.erase(subsumed);
294
295 if (HasPendingTasksUnlocked(owner)) {
296 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
297 }
298
299 if (HasPendingTasksUnlocked(subsumed)) {
300 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
301 }
302
303 return true;
304}

The documentation for this class was generated from the following files: