Flutter Engine
The Flutter Engine
concurrent_message_loop.cc
Go to the documentation of this file.
1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "flutter/fml/concurrent_message_loop.h"
6
7#include <algorithm>
8
9#include "flutter/fml/thread.h"
10#include "flutter/fml/trace_event.h"
11
12namespace fml {
13
15 : worker_count_(std::max<size_t>(worker_count, 1ul)) {
16 for (size_t i = 0; i < worker_count_; ++i) {
17 workers_.emplace_back([i, this]() {
19 std::string{"io.worker." + std::to_string(i + 1)}));
20 WorkerMain();
21 });
22 }
23
24 for (const auto& worker : workers_) {
25 worker_thread_ids_.emplace_back(worker.get_id());
26 }
27}
28
30 Terminate();
31 for (auto& worker : workers_) {
32 FML_DCHECK(worker.joinable());
33 worker.join();
34 }
35}
36
38 return worker_count_;
39}
40
41std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
42 return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
43}
44
45void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
46 if (!task) {
47 return;
48 }
49
50 std::unique_lock lock(tasks_mutex_);
51
52 // Don't just drop tasks on the floor in case of shutdown.
53 if (shutdown_) {
54 FML_DLOG(WARNING)
55 << "Tried to post a task to shutdown concurrent message "
56 "loop. The task will be executed on the callers thread.";
57 lock.unlock();
58 ExecuteTask(task);
59 return;
60 }
61
62 tasks_.push(task);
63
64 // Unlock the mutex before notifying the condition variable because that mutex
65 // has to be acquired on the other thread anyway. Waiting in this scope till
66 // it is acquired there is a pessimization.
67 lock.unlock();
68
69 tasks_condition_.notify_one();
70}
71
72void ConcurrentMessageLoop::WorkerMain() {
73 while (true) {
74 std::unique_lock lock(tasks_mutex_);
75 tasks_condition_.wait(lock, [&]() {
76 return !tasks_.empty() || shutdown_ || HasThreadTasksLocked();
77 });
78
79 // Shutdown cannot be read with the task mutex unlocked.
80 bool shutdown_now = shutdown_;
81 fml::closure task;
82 std::vector<fml::closure> thread_tasks;
83
84 if (!tasks_.empty()) {
85 task = tasks_.front();
86 tasks_.pop();
87 }
88
89 if (HasThreadTasksLocked()) {
90 thread_tasks = GetThreadTasksLocked();
91 FML_DCHECK(!HasThreadTasksLocked());
92 }
93
94 // Don't hold onto the mutex while tasks are being executed as they could
95 // themselves try to post more tasks to the message loop.
96 lock.unlock();
97
98 TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
99 // Execute the primary task we woke up for.
100 if (task) {
101 ExecuteTask(task);
102 }
103
104 // Execute any thread tasks.
105 for (const auto& thread_task : thread_tasks) {
106 ExecuteTask(thread_task);
107 }
108
109 if (shutdown_now) {
110 break;
111 }
112 }
113}
114
116 task();
117}
118
120 std::scoped_lock lock(tasks_mutex_);
121 shutdown_ = true;
122 tasks_condition_.notify_all();
123}
124
126 if (!task) {
127 return;
128 }
129
130 std::scoped_lock lock(tasks_mutex_);
131 for (const auto& worker_thread_id : worker_thread_ids_) {
132 thread_tasks_[worker_thread_id].emplace_back(task);
133 }
134 tasks_condition_.notify_all();
135}
136
137bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
138 return thread_tasks_.count(std::this_thread::get_id()) > 0;
139}
140
141std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
142 auto found = thread_tasks_.find(std::this_thread::get_id());
143 FML_DCHECK(found != thread_tasks_.end());
144 std::vector<fml::closure> pending_tasks;
145 std::swap(pending_tasks, found->second);
146 thread_tasks_.erase(found);
147 return pending_tasks;
148}
149
151 std::weak_ptr<ConcurrentMessageLoop> weak_loop)
152 : weak_loop_(std::move(weak_loop)) {}
153
155
157 if (!task) {
158 return;
159 }
160
161 if (auto loop = weak_loop_.lock()) {
162 loop->PostTask(task);
163 return;
164 }
165
166 FML_DLOG(WARNING)
167 << "Tried to post to a concurrent message loop that has already died. "
168 "Executing the task on the callers thread.";
169 task();
170}
171
173 std::scoped_lock lock(tasks_mutex_);
174 for (const auto& worker_thread_id : worker_thread_ids_) {
175 if (worker_thread_id == std::this_thread::get_id()) {
176 return true;
177 }
178 }
179 return false;
180}
181
182} // namespace fml
void swap(sk_sp< T > &a, sk_sp< T > &b)
Definition: SkRefCnt.h:341
virtual void ExecuteTask(const fml::closure &task)
void PostTaskToAllWorkers(const fml::closure &task)
std::shared_ptr< ConcurrentTaskRunner > GetTaskRunner()
ConcurrentMessageLoop(size_t worker_count)
void PostTask(const fml::closure &task) override
ConcurrentTaskRunner(std::weak_ptr< ConcurrentMessageLoop > weak_loop)
static void SetCurrentThreadName(const ThreadConfig &config)
Definition: thread.cc:135
#define FML_DLOG(severity)
Definition: logging.h:102
#define FML_DCHECK(condition)
Definition: logging.h:103
static float max(float r, float g, float b)
Definition: hsl.cpp:49
Definition: ascii_trie.cc:9
std::function< void()> closure
Definition: closure.h:14
Definition: ref_ptr.h:256
static SkString to_string(int n)
Definition: nanobench.cpp:119
The ThreadConfig is the thread info include thread name, thread priority.
Definition: thread.h:35
#define TRACE_EVENT0(category_group, name)
Definition: trace_event.h:131