Flutter Engine
 
Loading...
Searching...
No Matches
fml::MessageLoopTaskQueues Class Reference

#include <message_loop_task_queues.h>

Public Member Functions

TaskQueueId CreateTaskQueue ()
 
void Dispose (TaskQueueId queue_id)
 
void DisposeTasks (TaskQueueId queue_id)
 
void RegisterTask (TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
 
bool HasPendingTasks (TaskQueueId queue_id) const
 
fml::closure GetNextTaskToRun (TaskQueueId queue_id, fml::TimePoint from_time)
 
size_t GetNumPendingTasks (TaskQueueId queue_id) const
 
void AddTaskObserver (TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
 
void RemoveTaskObserver (TaskQueueId queue_id, intptr_t key)
 
std::vector< fml::closure > GetObserversToNotify (TaskQueueId queue_id) const
 
void SetWakeable (TaskQueueId queue_id, fml::Wakeable *wakeable)
 
bool Merge (TaskQueueId owner, TaskQueueId subsumed)
 
bool Unmerge (TaskQueueId owner, TaskQueueId subsumed)
 
bool Owns (TaskQueueId owner, TaskQueueId subsumed) const
 Returns true if owner owns the subsumed task queue.
 
std::set< TaskQueueId > GetSubsumedTaskQueueId (TaskQueueId owner) const
 
void PauseSecondarySource (TaskQueueId queue_id)
 
void ResumeSecondarySource (TaskQueueId queue_id)
 

Static Public Member Functions

static MessageLoopTaskQueues * GetInstance ()
 
static TaskSourceGrade GetCurrentTaskSourceGrade ()
 

Detailed Description

A singleton container for all tasks and observers associated with all fml::MessageLoops.

This also wakes up the loop at the required times.

See also
fml::MessageLoop
fml::Wakeable

Definition at line 65 of file message_loop_task_queues.h.

Member Function Documentation

◆ AddTaskObserver()

void fml::MessageLoopTaskQueues::AddTaskObserver ( TaskQueueId  queue_id,
intptr_t  key,
const fml::closure & callback 
)

Definition at line 167 of file message_loop_task_queues.cc.

169 {
170 std::lock_guard guard(queue_mutex_);
171 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
172 queue_entries_.at(queue_id)->task_observers[key] = callback;
173}
FlutterDesktopBinaryReply callback
#define FML_DCHECK(condition)
Definition logging.h:122

References callback, FML_DCHECK, and key.

Referenced by fml::MessageLoopImpl::AddTaskObserver(), flutter::testing::ShellTest::CreateSettingsForFixture(), FlutterDartProject::defaultBundleIdentifier, and main().

◆ CreateTaskQueue()

TaskQueueId fml::MessageLoopTaskQueues::CreateTaskQueue ( )

Definition at line 50 of file message_loop_task_queues.cc.

50 {
51 std::lock_guard guard(queue_mutex_);
52 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
53 ++task_queue_id_counter_;
54 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
55 return loop_id;
56}

◆ Dispose()

void fml::MessageLoopTaskQueues::Dispose ( TaskQueueId  queue_id)

Definition at line 65 of file message_loop_task_queues.cc.

65 {
66 std::lock_guard guard(queue_mutex_);
67 const auto& queue_entry = queue_entries_.at(queue_id);
68 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
69 auto& subsumed_set = queue_entry->owner_of;
70 for (auto& subsumed : subsumed_set) {
71 queue_entries_.erase(subsumed);
72 }
73 // Erase owner queue_id at last to avoid &subsumed_set from being invalid
74 queue_entries_.erase(queue_id);
75}
static const TaskQueueId kUnmerged

References FML_DCHECK, and fml::kUnmerged.

Referenced by fml::MessageLoopImpl::~MessageLoopImpl().

◆ DisposeTasks()

void fml::MessageLoopTaskQueues::DisposeTasks ( TaskQueueId  queue_id)

Definition at line 77 of file message_loop_task_queues.cc.

77 {
78 std::lock_guard guard(queue_mutex_);
79 const auto& queue_entry = queue_entries_.at(queue_id);
80 FML_DCHECK(queue_entry->subsumed_by == kUnmerged);
81 auto& subsumed_set = queue_entry->owner_of;
82 queue_entry->task_source->ShutDown();
83 for (auto& subsumed : subsumed_set) {
84 queue_entries_.at(subsumed)->task_source->ShutDown();
85 }
86}

References FML_DCHECK, and fml::kUnmerged.

Referenced by fml::MessageLoopImpl::DoRun().

◆ GetCurrentTaskSourceGrade()

TaskSourceGrade fml::MessageLoopTaskQueues::GetCurrentTaskSourceGrade ( )
static

Definition at line 88 of file message_loop_task_queues.cc.

88 {
89 return tls_task_source_grade.get()->task_source_grade;
90}
static thread_local std::unique_ptr< TaskSourceGradeHolder > tls_task_source_grade

References fml::tls_task_source_grade.

Referenced by flutter::testing::TEST_F().

◆ GetInstance()

◆ GetNextTaskToRun()

fml::closure fml::MessageLoopTaskQueues::GetNextTaskToRun ( TaskQueueId  queue_id,
fml::TimePoint  from_time 
)

Definition at line 118 of file message_loop_task_queues.cc.

119 {
120 std::lock_guard guard(queue_mutex_);
121 if (!HasPendingTasksUnlocked(queue_id)) {
122 return nullptr;
123 }
124 TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);
125
126 if (!HasPendingTasksUnlocked(queue_id)) {
127 WakeUpUnlocked(queue_id, fml::TimePoint::Max());
128 } else {
129 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
130 }
131
132 if (top.task.GetTargetTime() > from_time) {
133 return nullptr;
134 }
135 fml::closure invocation = top.task.GetTask();
136 const auto task_source_grade = top.task.GetTaskSourceGrade();
137 queue_entries_.at(top.task_queue_id)->task_source->PopTask(task_source_grade);
138 tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
139 return invocation;
140}
static constexpr TimePoint Max()
Definition time_point.h:39
std::function< void()> closure
Definition closure.h:14

References fml::DelayedTask::GetTargetTime(), fml::DelayedTask::GetTask(), fml::DelayedTask::GetTaskSourceGrade(), fml::TimePoint::Max(), fml::TaskSource::TopTask::task, fml::TaskSource::TopTask::task_queue_id, and fml::tls_task_source_grade.

Referenced by fml::testing::CountRemainingTasks().

◆ GetNumPendingTasks()

size_t fml::MessageLoopTaskQueues::GetNumPendingTasks ( TaskQueueId  queue_id) const

Definition at line 149 of file message_loop_task_queues.cc.

149 {
150 std::lock_guard guard(queue_mutex_);
151 const auto& queue_entry = queue_entries_.at(queue_id);
152 if (queue_entry->subsumed_by != kUnmerged) {
153 return 0;
154 }
155
156 size_t total_tasks = 0;
157 total_tasks += queue_entry->task_source->GetNumPendingTasks();
158
159 auto& subsumed_set = queue_entry->owner_of;
160 for (auto& subsumed : subsumed_set) {
161 const auto& subsumed_entry = queue_entries_.at(subsumed);
162 total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
163 }
164 return total_tasks;
165}

References fml::kUnmerged.

◆ GetObserversToNotify()

std::vector< fml::closure > fml::MessageLoopTaskQueues::GetObserversToNotify ( TaskQueueId  queue_id) const

Definition at line 181 of file message_loop_task_queues.cc.

182 {
183 std::lock_guard guard(queue_mutex_);
184 std::vector<fml::closure> observers;
185
186 if (queue_entries_.at(queue_id)->subsumed_by != kUnmerged) {
187 return observers;
188 }
189
190 for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
191 observers.push_back(observer.second);
192 }
193
194 auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
195 for (auto& subsumed : subsumed_set) {
196 for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
197 observers.push_back(observer.second);
198 }
199 }
200
201 return observers;
202}

References fml::kUnmerged.

◆ GetSubsumedTaskQueueId()

std::set< TaskQueueId > fml::MessageLoopTaskQueues::GetSubsumedTaskQueueId ( TaskQueueId  owner) const

Definition at line 316 of file message_loop_task_queues.cc.

317 {
318 std::lock_guard guard(queue_mutex_);
319 return queue_entries_.at(owner)->owner_of;
320}

◆ HasPendingTasks()

bool fml::MessageLoopTaskQueues::HasPendingTasks ( TaskQueueId  queue_id) const

Definition at line 113 of file message_loop_task_queues.cc.

113 {
114 std::lock_guard guard(queue_mutex_);
115 return HasPendingTasksUnlocked(queue_id);
116}

◆ Merge()

bool fml::MessageLoopTaskQueues::Merge ( TaskQueueId  owner,
TaskQueueId  subsumed 
)

Definition at line 212 of file message_loop_task_queues.cc.

212 {
213 if (owner == subsumed) {
214 return true;
215 }
216 std::lock_guard guard(queue_mutex_);
217 auto& owner_entry = queue_entries_.at(owner);
218 auto& subsumed_entry = queue_entries_.at(subsumed);
219 auto& subsumed_set = owner_entry->owner_of;
220 if (subsumed_set.find(subsumed) != subsumed_set.end()) {
221 return true;
222 }
223
224 // Won't check owner_entry->owner_of, because it may contains items when
225 // merged with other different queues.
226
227 // Ensure owner_entry->subsumed_by being kUnmerged
228 if (owner_entry->subsumed_by != kUnmerged) {
229 FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
230 "subsumed by others, owner="
231 << owner << ", subsumed=" << subsumed
232 << ", owner->subsumed_by=" << owner_entry->subsumed_by;
233 return false;
234 }
235 // Ensure subsumed_entry->owner_of being empty
236 if (!subsumed_entry->owner_of.empty()) {
237 FML_LOG(WARNING)
238 << "Thread merging failed: subsumed_entry already owns others, owner="
239 << owner << ", subsumed=" << subsumed
240 << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
241 return false;
242 }
243 // Ensure subsumed_entry->subsumed_by being kUnmerged
244 if (subsumed_entry->subsumed_by != kUnmerged) {
245 FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
246 "subsumed by others, owner="
247 << owner << ", subsumed=" << subsumed
248 << ", subsumed->subsumed_by="
249 << subsumed_entry->subsumed_by;
250 return false;
251 }
252 // All checking is OK, set merged state.
253 owner_entry->owner_of.insert(subsumed);
254 subsumed_entry->subsumed_by = owner;
255
256 if (HasPendingTasksUnlocked(owner)) {
257 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
258 }
259
260 return true;
261}
#define FML_LOG(severity)
Definition logging.h:101

References FML_LOG, and fml::kUnmerged.

Referenced by fml::SharedThreadMerger::MergeWithLease(), flutter::Engine::Run(), and fml::testing::TEST().

◆ Owns()

bool fml::MessageLoopTaskQueues::Owns ( TaskQueueId  owner,
TaskQueueId  subsumed 
) const

Returns true if owner owns the subsumed task queue.

Definition at line 306 of file message_loop_task_queues.cc.

307 {
308 std::lock_guard guard(queue_mutex_);
309 if (owner == kUnmerged || subsumed == kUnmerged) {
310 return false;
311 }
312 auto& subsumed_set = queue_entries_.at(owner)->owner_of;
313 return subsumed_set.find(subsumed) != subsumed_set.end();
314}

References fml::kUnmerged.

◆ PauseSecondarySource()

void fml::MessageLoopTaskQueues::PauseSecondarySource ( TaskQueueId  queue_id)

Definition at line 322 of file message_loop_task_queues.cc.

322 {
323 std::lock_guard guard(queue_mutex_);
324 queue_entries_.at(queue_id)->task_source->PauseSecondary();
325}

◆ RegisterTask()

void fml::MessageLoopTaskQueues::RegisterTask ( TaskQueueId  queue_id,
const fml::closure & task,
fml::TimePoint  target_time,
fml::TaskSourceGrade  task_source_grade = fml::TaskSourceGrade::kUnspecified 
)

Definition at line 92 of file message_loop_task_queues.cc.

96 {
97 std::lock_guard guard(queue_mutex_);
98 size_t order = order_++;
99 const auto& queue_entry = queue_entries_.at(queue_id);
100 queue_entry->task_source->RegisterTask(
101 {order, task, target_time, task_source_grade});
102 TaskQueueId loop_to_wake = queue_id;
103 if (queue_entry->subsumed_by != kUnmerged) {
104 loop_to_wake = queue_entry->subsumed_by;
105 }
106
107 // This can happen when the secondary tasks are paused.
108 if (HasPendingTasksUnlocked(loop_to_wake)) {
109 WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
110 }
111}

References fml::kUnmerged.

Referenced by fml::MessageLoopImpl::PostTask().

◆ RemoveTaskObserver()

void fml::MessageLoopTaskQueues::RemoveTaskObserver ( TaskQueueId  queue_id,
intptr_t  key 
)

Definition at line 175 of file message_loop_task_queues.cc.

176 {
177 std::lock_guard guard(queue_mutex_);
178 queue_entries_.at(queue_id)->task_observers.erase(key);
179}

References key.

Referenced by flutter::testing::ShellTest::CreateSettingsForFixture(), FlutterDartProject::defaultBundleIdentifier, main(), and fml::MessageLoopImpl::RemoveTaskObserver().

◆ ResumeSecondarySource()

void fml::MessageLoopTaskQueues::ResumeSecondarySource ( TaskQueueId  queue_id)

Definition at line 327 of file message_loop_task_queues.cc.

327 {
328 std::lock_guard guard(queue_mutex_);
329 queue_entries_.at(queue_id)->task_source->ResumeSecondary();
330 // Schedule a wake as needed.
331 if (HasPendingTasksUnlocked(queue_id)) {
332 WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
333 }
334}

◆ SetWakeable()

void fml::MessageLoopTaskQueues::SetWakeable ( TaskQueueId  queue_id,
fml::Wakeable * wakeable 
)

Definition at line 204 of file message_loop_task_queues.cc.

205 {
206 std::lock_guard guard(queue_mutex_);
207 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
208 << "Wakeable can only be set once.";
209 queue_entries_.at(queue_id)->wakeable = wakeable;
210}
#define FML_CHECK(condition)
Definition logging.h:104

References FML_CHECK.

Referenced by fml::MessageLoopImpl::MessageLoopImpl().

◆ Unmerge()

bool fml::MessageLoopTaskQueues::Unmerge ( TaskQueueId  owner,
TaskQueueId  subsumed 
)

Definition at line 263 of file message_loop_task_queues.cc.

263 {
264 std::lock_guard guard(queue_mutex_);
265 const auto& owner_entry = queue_entries_.at(owner);
266 if (owner_entry->owner_of.empty()) {
267 FML_LOG(WARNING)
268 << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
269 << owner << ", subsumed=" << subsumed;
270 return false;
271 }
272 if (owner_entry->subsumed_by != kUnmerged) {
273 FML_LOG(WARNING)
274 << "Thread unmerging failed: owner_entry was subsumed by others, owner="
275 << owner << ", subsumed=" << subsumed
276 << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
277 return false;
278 }
279 if (queue_entries_.at(subsumed)->subsumed_by == kUnmerged) {
280 FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
281 "subsumed by others, owner="
282 << owner << ", subsumed=" << subsumed;
283 return false;
284 }
285 if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
286 FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
287 "given subsumed queue id, owner="
288 << owner << ", subsumed=" << subsumed;
289 return false;
290 }
291
292 queue_entries_.at(subsumed)->subsumed_by = kUnmerged;
293 owner_entry->owner_of.erase(subsumed);
294
295 if (HasPendingTasksUnlocked(owner)) {
296 WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
297 }
298
299 if (HasPendingTasksUnlocked(subsumed)) {
300 WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
301 }
302
303 return true;
304}

References FML_LOG, and fml::kUnmerged.

Referenced by fml::testing::TEST().


The documentation for this class was generated from the following files: