14 worker_timeout_millis,
16 "Free workers when they have been idle for this amount of time.");
19 int64_t worker_timeout_micros =
21 if (worker_timeout_micros <= 0) {
26 if (waited >= worker_timeout_micros) {
32 return worker_timeout_micros - waited;
38 : all_workers_dead_(
false), max_pool_size_(max_pool_size) {}
49 shutting_down_ =
true;
53 all_workers_dead_ =
true;
64 while (!all_workers_dead_) {
69 ASSERT(count_running_ == 0);
76 ObtainDeadWorkersLocked(&dead_workers_to_join);
78 JoinDeadWorkersLocked(&dead_workers_to_join);
84bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
85 Worker* new_worker =
nullptr;
91 new_worker = ScheduleTaskLocked(&ml, std::move(task));
93 if (new_worker !=
nullptr) {
94 new_worker->StartThread();
102 return worker !=
nullptr && worker->pool_ ==
this;
108 Worker* new_worker =
nullptr;
109 if (worker !=
nullptr) {
111 ASSERT(!worker->is_blocked_);
112 worker->is_blocked_ =
true;
113 if (max_pool_size_ > 0) {
119 if (idle_workers_.
IsEmpty() && pending_tasks_ > 0) {
120 new_worker =
new Worker(
this);
121 idle_workers_.
Append(new_worker);
126 if (new_worker !=
nullptr) {
127 new_worker->StartThread();
134 if (worker !=
nullptr) {
136 if (worker->is_blocked_) {
137 worker->is_blocked_ =
false;
138 if (max_pool_size_ > 0) {
140 ASSERT(max_pool_size_ > 0);
146void ThreadPool::WorkerLoop(Worker* worker) {
147 WorkerList dead_workers_to_join;
153 IdleToRunningLocked(worker);
162 RunningToIdleLocked(worker);
165 if (running_workers_.
IsEmpty()) {
173 if (shutting_down_) {
174 ObtainDeadWorkersLocked(&dead_workers_to_join);
175 IdleToDeadLocked(worker);
194 ObtainDeadWorkersLocked(&dead_workers_to_join);
195 IdleToDeadLocked(worker);
204 JoinDeadWorkersLocked(&dead_workers_to_join);
207void ThreadPool::IdleToRunningLocked(Worker* worker) {
209 idle_workers_.
Remove(worker);
210 running_workers_.
Append(worker);
215void ThreadPool::RunningToIdleLocked(Worker* worker) {
219 running_workers_.
Remove(worker);
220 idle_workers_.
Append(worker);
225void ThreadPool::IdleToDeadLocked(Worker* worker) {
229 idle_workers_.
Remove(worker);
230 dead_workers_.
Append(worker);
235 if (shutting_down_) {
237 all_workers_dead_ =
true;
238 MonitorLocker eml(&exit_monitor_);
244void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
245 dead_workers_to_join->AppendList(&dead_workers_);
250void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
251 auto it = dead_workers_to_join->begin();
252 while (it != dead_workers_to_join->end()) {
253 Worker* worker = *it;
254 it = dead_workers_to_join->Erase(it);
259 ASSERT(dead_workers_to_join->IsEmpty());
262ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
263 std::unique_ptr<Task> task) {
265 tasks_.
Append(task.release());
267 ASSERT(pending_tasks_ >= 1);
270 if (count_idle_ >= pending_tasks_) {
278 if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
279 if (!idle_workers_.
IsEmpty()) {
286 auto new_worker =
new Worker(
this);
287 idle_workers_.
Append(new_worker);
292ThreadPool::Worker::Worker(ThreadPool*
pool)
293 : pool_(
pool), join_id_(OSThread::kInvalidThreadJoinId) {}
295void ThreadPool::Worker::StartThread() {
297 reinterpret_cast<uword>(
this));
299 FATAL(
"Could not start worker thread: result = %d.",
result);
307 if (start_cb !=
nullptr) {
312 ASSERT(os_thread !=
nullptr);
314 Worker* worker =
reinterpret_cast<Worker*
>(
args);
315 ThreadPool*
pool = worker->pool_;
317 os_thread->owning_thread_pool_worker_ = worker;
318 worker->os_thread_ = os_thread;
325 MonitorLocker ml(&
pool->pool_monitor_);
326 ASSERT(
pool->idle_workers_.ContainsForDebugging(worker));
330 pool->WorkerLoop(worker);
332 worker->os_thread_ =
nullptr;
333 os_thread->owning_thread_pool_worker_ =
nullptr;
338 if (exit_cb !=
nullptr) {
static void done(const char *config, const char *src, const char *srcOptions, const char *name)
static Dart_ThreadStartCallback thread_start_callback()
static Dart_ThreadExitCallback thread_exit_callback()
bool ContainsForDebugging(const T *a)
static Isolate * Current()
Monitor::WaitResult Wait(int64_t millis=Monitor::kNoTimeout)
static int Start(const char *name, ThreadStartFunction function, uword parameter)
static void Join(ThreadJoinId id)
static OSThread * Current()
static ThreadJoinId GetCurrentThreadJoinId(OSThread *thread)
static int64_t GetCurrentMonotonicMicros()
ThreadPool(uintptr_t max_pool_size=0)
void MarkCurrentWorkerAsBlocked()
virtual void OnEnterIdleLocked(MonitorLocker *ml)
bool CurrentThreadIsWorker()
void MarkCurrentWorkerAsUnBlocked()
void(* Dart_ThreadStartCallback)(void)
void(* Dart_ThreadExitCallback)(void)
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
int Main(int argc, char **argv)
constexpr intptr_t kMicrosecondsPerMillisecond
DEFINE_FLAG(bool, print_cluster_information, false, "Print information about clusters written to snapshot")
static int64_t ComputeTimeout(int64_t idle_start)