Flutter Engine
fml::MessageLoopTaskQueues Class Reference

#include <message_loop_task_queues.h>

Inheritance diagram for fml::MessageLoopTaskQueues:
fml::RefCountedThreadSafe< MessageLoopTaskQueues > fml::internal::RefCountedThreadSafeBase

Public Member Functions

TaskQueueId CreateTaskQueue ()
 
void Dispose (TaskQueueId queue_id)
 
void DisposeTasks (TaskQueueId queue_id)
 
void RegisterTask (TaskQueueId queue_id, const fml::closure &task, fml::TimePoint target_time, fml::TaskSourceGrade task_source_grade=fml::TaskSourceGrade::kUnspecified)
 
bool HasPendingTasks (TaskQueueId queue_id) const
 
fml::closure GetNextTaskToRun (TaskQueueId queue_id, fml::TimePoint from_time)
 
size_t GetNumPendingTasks (TaskQueueId queue_id) const
 
void AddTaskObserver (TaskQueueId queue_id, intptr_t key, const fml::closure &callback)
 
void RemoveTaskObserver (TaskQueueId queue_id, intptr_t key)
 
std::vector< fml::closureGetObserversToNotify (TaskQueueId queue_id) const
 
void SetWakeable (TaskQueueId queue_id, fml::Wakeable *wakeable)
 
bool Merge (TaskQueueId owner, TaskQueueId subsumed)
 
bool Unmerge (TaskQueueId owner, TaskQueueId subsumed)
 
bool Owns (TaskQueueId owner, TaskQueueId subsumed) const
 Returns true if owner owns the subsumed task queue. More...
 
std::set< TaskQueueIdGetSubsumedTaskQueueId (TaskQueueId owner) const
 
void PauseSecondarySource (TaskQueueId queue_id)
 
void ResumeSecondarySource (TaskQueueId queue_id)
 
- Public Member Functions inherited from fml::RefCountedThreadSafe< MessageLoopTaskQueues >
void Release () const
 
- Public Member Functions inherited from fml::internal::RefCountedThreadSafeBase
void AddRef () const
 
bool HasOneRef () const
 
void AssertHasOneRef () const
 

Static Public Member Functions

static fml::RefPtr< MessageLoopTaskQueuesGetInstance ()
 
static TaskSourceGrade GetCurrentTaskSourceGrade ()
 

Additional Inherited Members

- Protected Member Functions inherited from fml::RefCountedThreadSafe< MessageLoopTaskQueues >
 RefCountedThreadSafe ()
 
 ~RefCountedThreadSafe ()
 
- Protected Member Functions inherited from fml::internal::RefCountedThreadSafeBase
 RefCountedThreadSafeBase ()
 
 ~RefCountedThreadSafeBase ()
 
bool Release () const
 
void Adopt ()
 

Detailed Description

A singleton container for all tasks and observers associated with all fml::MessageLoops.

This also wakes up the loop at the required times.

See also
fml::MessageLoop
fml::Wakeable

Definition at line 65 of file message_loop_task_queues.h.

Member Function Documentation

◆ AddTaskObserver()

void fml::MessageLoopTaskQueues::AddTaskObserver ( TaskQueueId  queue_id,
intptr_t  key,
const fml::closure callback 
)

Definition at line 180 of file message_loop_task_queues.cc.

References callback, FML_DCHECK, and key.

182  {
183  std::lock_guard guard(queue_mutex_);
184  FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
185  queue_entries_.at(queue_id)->task_observers[key] = callback;
186 }
#define FML_DCHECK(condition)
Definition: logging.h:86
FlKeyEvent FlKeyResponderAsyncCallback callback

◆ CreateTaskQueue()

TaskQueueId fml::MessageLoopTaskQueues::CreateTaskQueue ( )

Definition at line 60 of file message_loop_task_queues.cc.

60  {
61  std::lock_guard guard(queue_mutex_);
62  TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
63  ++task_queue_id_counter_;
64  queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(loop_id);
65  return loop_id;
66 }

◆ Dispose()

void fml::MessageLoopTaskQueues::Dispose ( TaskQueueId  queue_id)

Definition at line 73 of file message_loop_task_queues.cc.

References fml::_kUnmerged, and FML_DCHECK.

73  {
74  std::lock_guard guard(queue_mutex_);
75  const auto& queue_entry = queue_entries_.at(queue_id);
76  FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
77  auto& subsumed_set = queue_entry->owner_of;
78  for (auto& subsumed : subsumed_set) {
79  queue_entries_.erase(subsumed);
80  }
81  // Erase owner queue_id at last to avoid &subsumed_set from being invalid
82  queue_entries_.erase(queue_id);
83 }
#define FML_DCHECK(condition)
Definition: logging.h:86
static const TaskQueueId _kUnmerged

◆ DisposeTasks()

void fml::MessageLoopTaskQueues::DisposeTasks ( TaskQueueId  queue_id)

Definition at line 85 of file message_loop_task_queues.cc.

References fml::_kUnmerged, and FML_DCHECK.

85  {
86  std::lock_guard guard(queue_mutex_);
87  const auto& queue_entry = queue_entries_.at(queue_id);
88  FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
89  auto& subsumed_set = queue_entry->owner_of;
90  queue_entry->task_source->ShutDown();
91  for (auto& subsumed : subsumed_set) {
92  queue_entries_.at(subsumed)->task_source->ShutDown();
93  }
94 }
#define FML_DCHECK(condition)
Definition: logging.h:86
static const TaskQueueId _kUnmerged

◆ GetCurrentTaskSourceGrade()

TaskSourceGrade fml::MessageLoopTaskQueues::GetCurrentTaskSourceGrade ( )
static

Definition at line 96 of file message_loop_task_queues.cc.

References fml::tls_task_source_grade.

Referenced by flutter::testing::TEST_F().

96  {
97  std::scoped_lock creation(creation_mutex_);
98  return tls_task_source_grade.get()->task_source_grade;
99 }
FML_THREAD_LOCAL ThreadLocalUniquePtr< TaskSourceGradeHolder > tls_task_source_grade

◆ GetInstance()

fml::RefPtr< MessageLoopTaskQueues > fml::MessageLoopTaskQueues::GetInstance ( )
static

Definition at line 50 of file message_loop_task_queues.cc.

References fml::kUnspecified, and fml::tls_task_source_grade.

Referenced by fml::benchmarking::BM_RegisterAndGetTasks(), flutter::VsyncWaiter::FireCallback(), flutter::DartIsolate::LoadLoadingUnitError(), fml::TaskRunnerChecker::RunsOnTheSameThread(), fml::testing::TEST(), and fml::testing::TestNotifyObservers().

50  {
51  std::scoped_lock creation(creation_mutex_);
52  if (!instance_) {
53  instance_ = fml::MakeRefCounted<MessageLoopTaskQueues>();
55  new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified});
56  }
57  return instance_;
58 }
FML_THREAD_LOCAL ThreadLocalUniquePtr< TaskSourceGradeHolder > tls_task_source_grade
The absence of a specialized TaskSourceGrade.

◆ GetNextTaskToRun()

fml::closure fml::MessageLoopTaskQueues::GetNextTaskToRun ( TaskQueueId  queue_id,
fml::TimePoint  from_time 
)

Definition at line 127 of file message_loop_task_queues.cc.

References fml::DelayedTask::GetTargetTime(), fml::DelayedTask::GetTask(), fml::DelayedTask::GetTaskSourceGrade(), fml::TimePoint::Max(), fml::TaskSource::TopTask::task, fml::TaskSource::TopTask::task_queue_id, and fml::tls_task_source_grade.

128  {
129  std::lock_guard guard(queue_mutex_);
130  if (!HasPendingTasksUnlocked(queue_id)) {
131  return nullptr;
132  }
133  TaskSource::TopTask top = PeekNextTaskUnlocked(queue_id);
134 
135  if (!HasPendingTasksUnlocked(queue_id)) {
136  WakeUpUnlocked(queue_id, fml::TimePoint::Max());
137  } else {
138  WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
139  }
140 
141  if (top.task.GetTargetTime() > from_time) {
142  return nullptr;
143  }
144  fml::closure invocation = top.task.GetTask();
145  queue_entries_.at(top.task_queue_id)
146  ->task_source->PopTask(top.task.GetTaskSourceGrade());
147  {
148  std::scoped_lock creation(creation_mutex_);
149  const auto task_source_grade = top.task.GetTaskSourceGrade();
150  tls_task_source_grade.reset(new TaskSourceGradeHolder{task_source_grade});
151  }
152  return invocation;
153 }
FML_THREAD_LOCAL ThreadLocalUniquePtr< TaskSourceGradeHolder > tls_task_source_grade
std::function< void()> closure
Definition: closure.h:14
static constexpr TimePoint Max()
Definition: time_point.h:34

◆ GetNumPendingTasks()

size_t fml::MessageLoopTaskQueues::GetNumPendingTasks ( TaskQueueId  queue_id) const

Definition at line 162 of file message_loop_task_queues.cc.

References fml::_kUnmerged.

162  {
163  std::lock_guard guard(queue_mutex_);
164  const auto& queue_entry = queue_entries_.at(queue_id);
165  if (queue_entry->subsumed_by != _kUnmerged) {
166  return 0;
167  }
168 
169  size_t total_tasks = 0;
170  total_tasks += queue_entry->task_source->GetNumPendingTasks();
171 
172  auto& subsumed_set = queue_entry->owner_of;
173  for (auto& subsumed : subsumed_set) {
174  const auto& subsumed_entry = queue_entries_.at(subsumed);
175  total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
176  }
177  return total_tasks;
178 }
static const TaskQueueId _kUnmerged

◆ GetObserversToNotify()

std::vector< fml::closure > fml::MessageLoopTaskQueues::GetObserversToNotify ( TaskQueueId  queue_id) const

Definition at line 194 of file message_loop_task_queues.cc.

References fml::_kUnmerged.

195  {
196  std::lock_guard guard(queue_mutex_);
197  std::vector<fml::closure> observers;
198 
199  if (queue_entries_.at(queue_id)->subsumed_by != _kUnmerged) {
200  return observers;
201  }
202 
203  for (const auto& observer : queue_entries_.at(queue_id)->task_observers) {
204  observers.push_back(observer.second);
205  }
206 
207  auto& subsumed_set = queue_entries_.at(queue_id)->owner_of;
208  for (auto& subsumed : subsumed_set) {
209  for (const auto& observer : queue_entries_.at(subsumed)->task_observers) {
210  observers.push_back(observer.second);
211  }
212  }
213 
214  return observers;
215 }
static const TaskQueueId _kUnmerged

◆ GetSubsumedTaskQueueId()

std::set< TaskQueueId > fml::MessageLoopTaskQueues::GetSubsumedTaskQueueId ( TaskQueueId  owner) const

Definition at line 329 of file message_loop_task_queues.cc.

330  {
331  std::lock_guard guard(queue_mutex_);
332  return queue_entries_.at(owner)->owner_of;
333 }

◆ HasPendingTasks()

bool fml::MessageLoopTaskQueues::HasPendingTasks ( TaskQueueId  queue_id) const

Definition at line 122 of file message_loop_task_queues.cc.

122  {
123  std::lock_guard guard(queue_mutex_);
124  return HasPendingTasksUnlocked(queue_id);
125 }

◆ Merge()

bool fml::MessageLoopTaskQueues::Merge ( TaskQueueId  owner,
TaskQueueId  subsumed 
)

Definition at line 225 of file message_loop_task_queues.cc.

References fml::_kUnmerged, and FML_LOG.

Referenced by fml::SharedThreadMerger::MergeWithLease().

225  {
226  if (owner == subsumed) {
227  return true;
228  }
229  std::lock_guard guard(queue_mutex_);
230  auto& owner_entry = queue_entries_.at(owner);
231  auto& subsumed_entry = queue_entries_.at(subsumed);
232  auto& subsumed_set = owner_entry->owner_of;
233  if (subsumed_set.find(subsumed) != subsumed_set.end()) {
234  return true;
235  }
236 
237  // Won't check owner_entry->owner_of, because it may contains items when
238  // merged with other different queues.
239 
240  // Ensure owner_entry->subsumed_by being _kUnmerged
241  if (owner_entry->subsumed_by != _kUnmerged) {
242  FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
243  "subsumed by others, owner="
244  << owner << ", subsumed=" << subsumed
245  << ", owner->subsumed_by=" << owner_entry->subsumed_by;
246  return false;
247  }
248  // Ensure subsumed_entry->owner_of being empty
249  if (!subsumed_entry->owner_of.empty()) {
250  FML_LOG(WARNING)
251  << "Thread merging failed: subsumed_entry already owns others, owner="
252  << owner << ", subsumed=" << subsumed
253  << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
254  return false;
255  }
256  // Ensure subsumed_entry->subsumed_by being _kUnmerged
257  if (subsumed_entry->subsumed_by != _kUnmerged) {
258  FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
259  "subsumed by others, owner="
260  << owner << ", subsumed=" << subsumed
261  << ", subsumed->subsumed_by="
262  << subsumed_entry->subsumed_by;
263  return false;
264  }
265  // All checking is OK, set merged state.
266  owner_entry->owner_of.insert(subsumed);
267  subsumed_entry->subsumed_by = owner;
268 
269  if (HasPendingTasksUnlocked(owner)) {
270  WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
271  }
272 
273  return true;
274 }
#define FML_LOG(severity)
Definition: logging.h:65
static const TaskQueueId _kUnmerged

◆ Owns()

bool fml::MessageLoopTaskQueues::Owns ( TaskQueueId  owner,
TaskQueueId  subsumed 
) const

Returns true if owner owns the subsumed task queue.

Definition at line 319 of file message_loop_task_queues.cc.

References fml::_kUnmerged.

320  {
321  std::lock_guard guard(queue_mutex_);
322  if (owner == _kUnmerged || subsumed == _kUnmerged) {
323  return false;
324  }
325  auto& subsumed_set = queue_entries_.at(owner)->owner_of;
326  return subsumed_set.find(subsumed) != subsumed_set.end();
327 }
static const TaskQueueId _kUnmerged

◆ PauseSecondarySource()

void fml::MessageLoopTaskQueues::PauseSecondarySource ( TaskQueueId  queue_id)

Definition at line 335 of file message_loop_task_queues.cc.

335  {
336  std::lock_guard guard(queue_mutex_);
337  queue_entries_.at(queue_id)->task_source->PauseSecondary();
338 }

◆ RegisterTask()

void fml::MessageLoopTaskQueues::RegisterTask ( TaskQueueId  queue_id,
const fml::closure task,
fml::TimePoint  target_time,
fml::TaskSourceGrade  task_source_grade = fml::TaskSourceGrade::kUnspecified 
)

Definition at line 101 of file message_loop_task_queues.cc.

References fml::_kUnmerged.

105  {
106  std::lock_guard guard(queue_mutex_);
107  size_t order = order_++;
108  const auto& queue_entry = queue_entries_.at(queue_id);
109  queue_entry->task_source->RegisterTask(
110  {order, task, target_time, task_source_grade});
111  TaskQueueId loop_to_wake = queue_id;
112  if (queue_entry->subsumed_by != _kUnmerged) {
113  loop_to_wake = queue_entry->subsumed_by;
114  }
115 
116  // This can happen when the secondary tasks are paused.
117  if (HasPendingTasksUnlocked(loop_to_wake)) {
118  WakeUpUnlocked(loop_to_wake, GetNextWakeTimeUnlocked(loop_to_wake));
119  }
120 }
static const TaskQueueId _kUnmerged

◆ RemoveTaskObserver()

void fml::MessageLoopTaskQueues::RemoveTaskObserver ( TaskQueueId  queue_id,
intptr_t  key 
)

Definition at line 188 of file message_loop_task_queues.cc.

189  {
190  std::lock_guard guard(queue_mutex_);
191  queue_entries_.at(queue_id)->task_observers.erase(key);
192 }

◆ ResumeSecondarySource()

void fml::MessageLoopTaskQueues::ResumeSecondarySource ( TaskQueueId  queue_id)

Definition at line 340 of file message_loop_task_queues.cc.

References fml::_kUnmerged, FML_CHECK, FML_DCHECK, and fml::TaskSource::TopTask::task.

340  {
341  std::lock_guard guard(queue_mutex_);
342  queue_entries_.at(queue_id)->task_source->ResumeSecondary();
343  // Schedule a wake as needed.
344  if (HasPendingTasksUnlocked(queue_id)) {
345  WakeUpUnlocked(queue_id, GetNextWakeTimeUnlocked(queue_id));
346  }
347 }

◆ SetWakeable()

void fml::MessageLoopTaskQueues::SetWakeable ( TaskQueueId  queue_id,
fml::Wakeable wakeable 
)

Definition at line 217 of file message_loop_task_queues.cc.

References FML_CHECK, and fml::TaskQueueEntry::wakeable.

218  {
219  std::lock_guard guard(queue_mutex_);
220  FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
221  << "Wakeable can only be set once.";
222  queue_entries_.at(queue_id)->wakeable = wakeable;
223 }
#define FML_CHECK(condition)
Definition: logging.h:68

◆ Unmerge()

bool fml::MessageLoopTaskQueues::Unmerge ( TaskQueueId  owner,
TaskQueueId  subsumed 
)

Definition at line 276 of file message_loop_task_queues.cc.

References fml::_kUnmerged, and FML_LOG.

Referenced by fml::SharedThreadMerger::MergeWithLease().

276  {
277  std::lock_guard guard(queue_mutex_);
278  const auto& owner_entry = queue_entries_.at(owner);
279  if (owner_entry->owner_of.empty()) {
280  FML_LOG(WARNING)
281  << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
282  << owner << ", subsumed=" << subsumed;
283  return false;
284  }
285  if (owner_entry->subsumed_by != _kUnmerged) {
286  FML_LOG(WARNING)
287  << "Thread unmerging failed: owner_entry was subsumed by others, owner="
288  << owner << ", subsumed=" << subsumed
289  << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
290  return false;
291  }
292  if (queue_entries_.at(subsumed)->subsumed_by == _kUnmerged) {
293  FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
294  "subsumed by others, owner="
295  << owner << ", subsumed=" << subsumed;
296  return false;
297  }
298  if (owner_entry->owner_of.find(subsumed) == owner_entry->owner_of.end()) {
299  FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
300  "given subsumed queue id, owner="
301  << owner << ", subsumed=" << subsumed;
302  return false;
303  }
304 
305  queue_entries_.at(subsumed)->subsumed_by = _kUnmerged;
306  owner_entry->owner_of.erase(subsumed);
307 
308  if (HasPendingTasksUnlocked(owner)) {
309  WakeUpUnlocked(owner, GetNextWakeTimeUnlocked(owner));
310  }
311 
312  if (HasPendingTasksUnlocked(subsumed)) {
313  WakeUpUnlocked(subsumed, GetNextWakeTimeUnlocked(subsumed));
314  }
315 
316  return true;
317 }
#define FML_LOG(severity)
Definition: logging.h:65
static const TaskQueueId _kUnmerged

The documentation for this class was generated from the following files: