Flutter Engine
concurrent_message_loop.cc
Go to the documentation of this file.
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "flutter/fml/concurrent_message_loop.h"
6 
7 #include <algorithm>
8 
9 #include "flutter/fml/thread.h"
10 #include "flutter/fml/trace_event.h"
11 
12 namespace fml {
13 
14 std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
15  size_t worker_count) {
16  return std::shared_ptr<ConcurrentMessageLoop>{
17  new ConcurrentMessageLoop(worker_count)};
18 }
19 
20 ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
21  : worker_count_(std::max<size_t>(worker_count, 1ul)) {
22  for (size_t i = 0; i < worker_count_; ++i) {
23  workers_.emplace_back([i, this]() {
25  std::string{"io.flutter.worker." + std::to_string(i + 1)});
26  WorkerMain();
27  });
28  }
29 
30  for (const auto& worker : workers_) {
31  worker_thread_ids_.emplace_back(worker.get_id());
32  }
33 }
34 
36  Terminate();
37  for (auto& worker : workers_) {
38  worker.join();
39  }
40 }
41 
43  return worker_count_;
44 }
45 
46 std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
47  return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
48 }
49 
50 void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
51  if (!task) {
52  return;
53  }
54 
55  std::unique_lock lock(tasks_mutex_);
56 
57  // Don't just drop tasks on the floor in case of shutdown.
58  if (shutdown_) {
59  FML_DLOG(WARNING)
60  << "Tried to post a task to shutdown concurrent message "
61  "loop. The task will be executed on the callers thread.";
62  lock.unlock();
63  task();
64  return;
65  }
66 
67  tasks_.push(task);
68 
69  // Unlock the mutex before notifying the condition variable because that mutex
70  // has to be acquired on the other thread anyway. Waiting in this scope till
71  // it is acquired there is a pessimization.
72  lock.unlock();
73 
74  tasks_condition_.notify_one();
75 }
76 
77 void ConcurrentMessageLoop::WorkerMain() {
78  while (true) {
79  std::unique_lock lock(tasks_mutex_);
80  tasks_condition_.wait(lock, [&]() {
81  return tasks_.size() > 0 || shutdown_ || HasThreadTasksLocked();
82  });
83 
84  // Shutdown cannot be read with the task mutex unlocked.
85  bool shutdown_now = shutdown_;
86  fml::closure task;
87  std::vector<fml::closure> thread_tasks;
88 
89  if (tasks_.size() != 0) {
90  task = tasks_.front();
91  tasks_.pop();
92  }
93 
94  if (HasThreadTasksLocked()) {
95  thread_tasks = GetThreadTasksLocked();
96  FML_DCHECK(!HasThreadTasksLocked());
97  }
98 
99  // Don't hold onto the mutex while tasks are being executed as they could
100  // themselves try to post more tasks to the message loop.
101  lock.unlock();
102 
103  TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
104  // Execute the primary task we woke up for.
105  if (task) {
106  task();
107  }
108 
109  // Execute any thread tasks.
110  for (const auto& thread_task : thread_tasks) {
111  thread_task();
112  }
113 
114  if (shutdown_now) {
115  break;
116  }
117  }
118 }
119 
121  std::scoped_lock lock(tasks_mutex_);
122  shutdown_ = true;
123  tasks_condition_.notify_all();
124 }
125 
127  if (!task) {
128  return;
129  }
130 
131  std::scoped_lock lock(tasks_mutex_);
132  for (const auto& worker_thread_id : worker_thread_ids_) {
133  thread_tasks_[worker_thread_id].emplace_back(task);
134  }
135  tasks_condition_.notify_all();
136 }
137 
138 bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
139  return thread_tasks_.count(std::this_thread::get_id()) > 0;
140 }
141 
142 std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
143  auto found = thread_tasks_.find(std::this_thread::get_id());
144  FML_DCHECK(found != thread_tasks_.end());
145  std::vector<fml::closure> pending_tasks;
146  std::swap(pending_tasks, found->second);
147  thread_tasks_.erase(found);
148  return pending_tasks;
149 }
150 
152  std::weak_ptr<ConcurrentMessageLoop> weak_loop)
153  : weak_loop_(std::move(weak_loop)) {}
154 
156 
158  if (!task) {
159  return;
160  }
161 
162  if (auto loop = weak_loop_.lock()) {
163  loop->PostTask(task);
164  return;
165  }
166 
167  FML_DLOG(WARNING)
168  << "Tried to post to a concurrent message loop that has already died. "
169  "Executing the task on the callers thread.";
170  task();
171 }
172 
173 } // namespace fml
#define TRACE_EVENT0(category_group, name)
Definition: trace_event.h:75
#define FML_DCHECK(condition)
Definition: logging.h:86
std::shared_ptr< ConcurrentTaskRunner > GetTaskRunner()
Definition: ref_ptr.h:252
void PostTask(const fml::closure &task)
void swap(scoped_nsprotocol< C > &p1, scoped_nsprotocol< C > &p2)
Definition: ascii_trie.cc:9
void PostTaskToAllWorkers(fml::closure task)
std::function< void()> closure
Definition: closure.h:14
static std::shared_ptr< ConcurrentMessageLoop > Create(size_t worker_count=std::thread::hardware_concurrency())
ConcurrentTaskRunner(std::weak_ptr< ConcurrentMessageLoop > weak_loop)
static void SetCurrentThreadName(const std::string &name)
Definition: thread.cc:70
#define FML_DLOG(severity)
Definition: logging.h:85