5#include "flutter/fml/concurrent_message_loop.h"
9#include "flutter/fml/thread.h"
10#include "flutter/fml/trace_event.h"
15 : worker_count_(
std::
max<size_t>(worker_count, 1ul)) {
16 for (
size_t i = 0;
i < worker_count_; ++
i) {
17 workers_.emplace_back([
i,
this]() {
24 for (
const auto& worker : workers_) {
25 worker_thread_ids_.emplace_back(worker.get_id());
31 for (
auto& worker : workers_) {
42 return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
45void ConcurrentMessageLoop::PostTask(
const fml::closure& task) {
50 std::unique_lock lock(tasks_mutex_);
55 <<
"Tried to post a task to shutdown concurrent message "
56 "loop. The task will be executed on the callers thread.";
69 tasks_condition_.notify_one();
72void ConcurrentMessageLoop::WorkerMain() {
74 std::unique_lock lock(tasks_mutex_);
75 tasks_condition_.wait(lock, [&]() {
76 return !tasks_.empty() || shutdown_ || HasThreadTasksLocked();
80 bool shutdown_now = shutdown_;
82 std::vector<fml::closure> thread_tasks;
84 if (!tasks_.empty()) {
85 task = tasks_.front();
89 if (HasThreadTasksLocked()) {
90 thread_tasks = GetThreadTasksLocked();
105 for (
const auto& thread_task : thread_tasks) {
120 std::scoped_lock lock(tasks_mutex_);
122 tasks_condition_.notify_all();
130 std::scoped_lock lock(tasks_mutex_);
131 for (
const auto& worker_thread_id : worker_thread_ids_) {
132 thread_tasks_[worker_thread_id].emplace_back(task);
134 tasks_condition_.notify_all();
137bool ConcurrentMessageLoop::HasThreadTasksLocked()
const {
138 return thread_tasks_.count(std::this_thread::get_id()) > 0;
141std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
142 auto found = thread_tasks_.find(std::this_thread::get_id());
144 std::vector<fml::closure> pending_tasks;
146 thread_tasks_.erase(found);
147 return pending_tasks;
151 std::weak_ptr<ConcurrentMessageLoop> weak_loop)
152 : weak_loop_(
std::move(weak_loop)) {}
161 if (
auto loop = weak_loop_.lock()) {
162 loop->PostTask(task);
167 <<
"Tried to post to a concurrent message loop that has already died. "
168 "Executing the task on the callers thread.";
173 std::scoped_lock lock(tasks_mutex_);
174 for (
const auto& worker_thread_id : worker_thread_ids_) {
175 if (worker_thread_id == std::this_thread::get_id()) {
void swap(sk_sp< T > &a, sk_sp< T > &b)
size_t GetWorkerCount() const
virtual ~ConcurrentMessageLoop()
virtual void ExecuteTask(const fml::closure &task)
bool RunsTasksOnCurrentThread()
void PostTaskToAllWorkers(const fml::closure &task)
std::shared_ptr< ConcurrentTaskRunner > GetTaskRunner()
ConcurrentMessageLoop(size_t worker_count)
void PostTask(const fml::closure &task) override
ConcurrentTaskRunner(std::weak_ptr< ConcurrentMessageLoop > weak_loop)
virtual ~ConcurrentTaskRunner()
static void SetCurrentThreadName(const ThreadConfig &config)
#define FML_DLOG(severity)
#define FML_DCHECK(condition)
static float max(float r, float g, float b)
std::function< void()> closure
static SkString to_string(int n)
The ThreadConfig is the thread info include thread name, thread priority.
#define TRACE_EVENT0(category_group, name)