Flutter Engine
The Flutter Engine
message_handler.cc
Go to the documentation of this file.
1// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include <utility>
6
8
9#include "vm/dart.h"
10#include "vm/heap/safepoint.h"
11#include "vm/isolate.h"
12#include "vm/lockers.h"
13#include "vm/object.h"
14#include "vm/object_store.h"
15#include "vm/os.h"
16#include "vm/port.h"
18
19namespace dart {
20
21DECLARE_FLAG(bool, trace_service_pause_events);
22
24 public:
25 explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
26 ASSERT(handler != nullptr);
27 }
28
29 virtual void Run() {
30 ASSERT(handler_ != nullptr);
31 handler_->TaskCallback();
32 }
33
34 private:
35 MessageHandler* handler_;
36
37 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
38};
39
40// static
42 switch (status) {
43 case kOK:
44 return "OK";
45 case kError:
46 return "Error";
47 case kShutdown:
48 return "Shutdown";
49 default:
51 return "Illegal";
52 }
53}
54
56 : queue_(new MessageQueue()),
57 oob_queue_(new MessageQueue()),
58 oob_message_handling_allowed_(true),
59 paused_for_messages_(false),
60 paused_(0),
61#if !defined(PRODUCT)
62 should_pause_on_start_(false),
63 should_pause_on_exit_(false),
64 is_paused_on_start_(false),
65 is_paused_on_exit_(false),
66 remembered_paused_on_exit_status_(kOK),
67 paused_timestamp_(-1),
68#endif
69 task_running_(false),
70 delete_me_(false),
71 pool_(nullptr),
72 start_callback_(nullptr),
73 end_callback_(nullptr),
74 callback_data_(0) {
75 ASSERT(queue_ != nullptr);
76 ASSERT(oob_queue_ != nullptr);
77}
78
80 delete queue_;
81 delete oob_queue_;
82 queue_ = nullptr;
83 oob_queue_ = nullptr;
84 pool_ = nullptr;
85}
86
87const char* MessageHandler::name() const {
88 return "<unnamed>";
89}
90
91#if defined(DEBUG)
92void MessageHandler::CheckAccess() const {
93 // By default there is no checking.
94}
95#endif
96
98 // By default, there is no custom message notification.
99}
100
102 StartCallback start_callback,
103 EndCallback end_callback,
105 MonitorLocker ml(&monitor_);
106 if (FLAG_trace_isolates) {
108 "[+] Starting message handler:\n"
109 "\thandler: %s\n",
110 name());
111 }
112 ASSERT(pool_ == nullptr);
113 ASSERT(!delete_me_);
114 pool_ = pool;
115 start_callback_ = start_callback;
116 end_callback_ = end_callback;
117 callback_data_ = data;
118 task_running_ = true;
119 bool result = pool_->Run<MessageHandlerTask>(this);
120 if (!result) {
121 pool_ = nullptr;
122 start_callback_ = nullptr;
123 end_callback_ = nullptr;
124 callback_data_ = 0;
125 task_running_ = false;
126 }
127 return result;
128}
129
130void MessageHandler::PostMessage(std::unique_ptr<Message> message,
131 bool before_events) {
132 Message::Priority saved_priority;
133
134 {
135 MonitorLocker ml(&monitor_);
136 if (FLAG_trace_isolates) {
137 Isolate* source_isolate = Isolate::Current();
138 if (source_isolate != nullptr) {
140 "[>] Posting message:\n"
141 "\tlen: %" Pd "\n\tsource: (%" Pd64
142 ") %s\n\tdest: %s\n"
143 "\tdest_port: %" Pd64 "\n",
144 message->Size(), static_cast<int64_t>(source_isolate->main_port()),
145 source_isolate->name(), name(), message->dest_port());
146 } else {
148 "[>] Posting message:\n"
149 "\tlen: %" Pd
150 "\n\tsource: <native code>\n"
151 "\tdest: %s\n"
152 "\tdest_port: %" Pd64 "\n",
153 message->Size(), name(), message->dest_port());
154 }
155 }
156
157 saved_priority = message->priority();
158 if (message->IsOOB()) {
159 oob_queue_->Enqueue(std::move(message), before_events);
160 } else {
161 queue_->Enqueue(std::move(message), before_events);
162 }
163 if (paused_for_messages_) {
164 ml.Notify();
165 }
166
167 if (pool_ != nullptr && !task_running_) {
168 ASSERT(!delete_me_);
169 task_running_ = true;
170 const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
171 ASSERT(launched_successfully);
172 }
173 }
174
175 // Invoke any custom message notification.
176 MessageNotify(saved_priority);
177}
178
179std::unique_ptr<Message> MessageHandler::DequeueMessage(
180 Message::Priority min_priority) {
181 // TODO(turnidge): Add assert that monitor_ is held here.
182 std::unique_ptr<Message> message = oob_queue_->Dequeue();
183 if ((message == nullptr) && (min_priority < Message::kOOBPriority)) {
184 message = queue_->Dequeue();
185 }
186 return message;
187}
188
189void MessageHandler::ClearOOBQueue() {
190 oob_queue_->Clear();
191}
192
193MessageHandler::MessageStatus MessageHandler::HandleMessages(
194 MonitorLocker* ml,
195 bool allow_normal_messages,
196 bool allow_multiple_normal_messages) {
197 ASSERT(monitor_.IsOwnedByCurrentThread());
198
199 // Scheduling of the mutator thread during the isolate start can cause this
200 // thread to safepoint.
201 // We want to avoid holding the message handler monitor during the safepoint
202 // operation to avoid possible deadlocks, which can occur if other threads are
203 // sending messages to this message handler.
204 //
205 // If isolate() returns nullptr [StartIsolateScope] does nothing.
206 ml->Exit();
207 StartIsolateScope start_isolate(isolate());
208 ml->Enter();
209
210 auto idle_time_handler =
211 isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;
212
213 MessageStatus max_status = kOK;
214 Message::Priority min_priority =
215 ((allow_normal_messages && !paused()) ? Message::kNormalPriority
217 std::unique_ptr<Message> message = DequeueMessage(min_priority);
218 while (message != nullptr) {
219 intptr_t message_len = message->Size();
220 if (FLAG_trace_isolates) {
222 "[<] Handling message:\n"
223 "\tlen: %" Pd
224 "\n"
225 "\thandler: %s\n"
226 "\tport: %" Pd64 "\n",
227 message_len, name(), message->dest_port());
228 }
229
230 // Release the monitor_ temporarily while we handle the message.
231 // The monitor was acquired in MessageHandler::TaskCallback().
232 ml->Exit();
233 Message::Priority saved_priority = message->priority();
234 Dart_Port saved_dest_port = message->dest_port();
235 MessageStatus status = kOK;
236 {
237 DisableIdleTimerScope disable_idle_timer(idle_time_handler);
238 status = HandleMessage(std::move(message));
239 }
240 if (status > max_status) {
241 max_status = status;
242 }
243 ml->Enter();
244 if (FLAG_trace_isolates) {
246 "[.] Message handled (%s):\n"
247 "\tlen: %" Pd
248 "\n"
249 "\thandler: %s\n"
250 "\tport: %" Pd64 "\n",
251 MessageStatusString(status), message_len, name(), saved_dest_port);
252 }
253 // If we are shutting down, do not process any more messages.
254 if (status == kShutdown) {
255 ClearOOBQueue();
256 break;
257 }
258
259 // Remember time since the last message. Don't consider OOB messages so
260 // using Observatory doesn't trigger additional idle tasks.
261 if ((FLAG_idle_timeout_micros != 0) &&
262 (saved_priority == Message::kNormalPriority)) {
263 if (idle_time_handler != nullptr) {
264 idle_time_handler->UpdateStartIdleTime();
265 }
266 }
267
268 // Some callers want to process only one normal message and then quit. At
269 // the same time it is OK to process multiple OOB messages.
270 if ((saved_priority == Message::kNormalPriority) &&
271 !allow_multiple_normal_messages) {
272 // We processed one normal message. Allow no more.
273 allow_normal_messages = false;
274 }
275
276 // Reevaluate the minimum allowable priority. The paused state
277 // may have changed as part of handling the message. We may also
278 // have encountered an error during message processing.
279 //
280 // Even if we encounter an error, we still process pending OOB
281 // messages so that we don't lose the message notification.
282 min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
285 message = DequeueMessage(min_priority);
286 }
287 return max_status;
288}
289
291 // We can only call HandleNextMessage when this handler is not
292 // assigned to a thread pool.
293 MonitorLocker ml(&monitor_);
294 ASSERT(pool_ == nullptr);
295 ASSERT(!delete_me_);
296#if defined(DEBUG)
297 CheckAccess();
298#endif
299 return HandleMessages(&ml, true, false);
300}
301
303 int64_t timeout_millis) {
304 MonitorLocker ml(&monitor_, /*no_safepoint_scope=*/false);
305 ASSERT(task_running_);
306 ASSERT(!delete_me_);
307#if defined(DEBUG)
308 CheckAccess();
309#endif
310 paused_for_messages_ = true;
311 while (queue_->IsEmpty() && oob_queue_->IsEmpty()) {
313 {
314 // Ensure this thread is at a safepoint while we wait for new messages to
315 // arrive.
317 wr = ml.Wait(timeout_millis);
318 }
319 ASSERT(task_running_);
320 ASSERT(!delete_me_);
321 if (wr == Monitor::kTimedOut) {
322 break;
323 }
324 if (queue_->IsEmpty()) {
325 // There are only OOB messages. Handle them and then continue waiting for
326 // normal messages unless there is an error.
327 MessageStatus status = HandleMessages(&ml, false, false);
328 if (status != kOK) {
329 paused_for_messages_ = false;
330 return status;
331 }
332 }
333 }
334 paused_for_messages_ = false;
335 return HandleMessages(&ml, true, true);
336}
337
339 if (!oob_message_handling_allowed_) {
340 return kOK;
341 }
342 MonitorLocker ml(&monitor_);
343 ASSERT(!delete_me_);
344#if defined(DEBUG)
345 CheckAccess();
346#endif
347 return HandleMessages(&ml, false, false);
348}
349
350#if !defined(PRODUCT)
352 Isolate* owning_isolate = isolate();
353 if (owning_isolate == nullptr) {
354 return false;
355 }
356 // If we are restarting or shutting down, we do not want to honor
357 // should_pause_on_start or should_pause_on_exit.
358 return (status != MessageHandler::kShutdown) && should_pause_on_start() &&
359 owning_isolate->is_runnable();
360}
361
363 Isolate* owning_isolate = isolate();
364 if (owning_isolate == nullptr) {
365 return false;
366 }
367 return (status != MessageHandler::kShutdown) && should_pause_on_exit() &&
368 owning_isolate->is_runnable();
369}
370#endif
371
373 MonitorLocker ml(&monitor_);
374 return !oob_queue_->IsEmpty();
375}
376
377#if defined(TESTING)
378std::unique_ptr<Message> MessageHandler::StealOOBMessage() {
379 MonitorLocker ml(&monitor_);
380 ASSERT(!oob_queue_->IsEmpty());
381 return oob_queue_->Dequeue();
382}
383#endif
384
386 MonitorLocker ml(&monitor_);
387 return !queue_->IsEmpty();
388}
389
390void MessageHandler::TaskCallback() {
391 ASSERT(Isolate::Current() == nullptr);
392 MessageStatus status = kOK;
393 bool run_end_callback = false;
394 bool delete_me = false;
395 EndCallback end_callback = nullptr;
396 CallbackData callback_data = 0;
397 {
398 // We will occasionally release and reacquire this monitor in this
399 // function. Whenever we reacquire the monitor we *must* process
400 // all pending OOB messages, or we may miss a request for vm
401 // shutdown.
402 MonitorLocker ml(&monitor_);
403
404 // This method is running on the message handler task. Which means no
405 // other message handler tasks will be started until this one sets
406 // [task_running_] to false.
407 ASSERT(task_running_);
408
409#if !defined(PRODUCT)
410 if (ShouldPauseOnStart(kOK)) {
411 if (!is_paused_on_start()) {
412 PausedOnStartLocked(&ml, true);
413 }
414 // More messages may have come in before we (re)acquired the monitor.
415 status = HandleMessages(&ml, false, false);
416 if (ShouldPauseOnStart(status)) {
417 // Still paused.
418 ASSERT(oob_queue_->IsEmpty());
419 task_running_ = false; // No task in queue.
420 return;
421 } else {
422 PausedOnStartLocked(&ml, false);
423 }
424 }
425 if (is_paused_on_exit()) {
426 status = HandleMessages(&ml, false, false);
427 if (ShouldPauseOnExit(status)) {
428 // Still paused.
429 ASSERT(oob_queue_->IsEmpty());
430 task_running_ = false; // No task in queue.
431 return;
432 } else {
433 PausedOnExitLocked(&ml, false);
434 if (status != kShutdown) {
435 status = remembered_paused_on_exit_status_;
436 }
437 }
438 }
439#endif // !defined(PRODUCT)
440
441 if (status == kOK) {
442 if (start_callback_ != nullptr) {
443 // Initialize the message handler by running its start function,
444 // if we have one. For an isolate, this will run the isolate's
445 // main() function.
446 //
447 // Release the monitor_ temporarily while we call the start callback.
448 ml.Exit();
449 status = start_callback_(callback_data_);
450 ASSERT(Isolate::Current() == nullptr);
451 start_callback_ = nullptr;
452 ml.Enter();
453 }
454
455 // Handle any pending messages for this message handler.
456 if (status != kShutdown) {
457 status = HandleMessages(&ml, (status == kOK), true);
458 }
459 }
460
461 // The isolate exits when it encounters an error or when it no
462 // longer has live ports.
463 if (status != kOK || !KeepAliveLocked()) {
464#if !defined(PRODUCT)
465 if (ShouldPauseOnExit(status)) {
466 if (FLAG_trace_service_pause_events) {
468 "Isolate %s paused before exiting. "
469 "Use the Observatory to release it.\n",
470 name());
471 }
472 remembered_paused_on_exit_status_ = status;
473 PausedOnExitLocked(&ml, true);
474 // More messages may have come in while we released the monitor.
475 status = HandleMessages(&ml, /*allow_normal_messages=*/false,
476 /*allow_multiple_normal_messagesfalse=*/false);
477 if (ShouldPauseOnExit(status)) {
478 // Still paused.
479 ASSERT(oob_queue_->IsEmpty());
480 task_running_ = false; // No task in queue.
481 return;
482 } else {
483 PausedOnExitLocked(&ml, false);
484 }
485 }
486#endif // !defined(PRODUCT)
487 if (FLAG_trace_isolates) {
488 if (status != kOK && thread() != nullptr) {
489 const Error& error = Error::Handle(thread()->sticky_error());
491 "[-] Stopping message handler (%s):\n"
492 "\thandler: %s\n"
493 "\terror: %s\n",
494 MessageStatusString(status), name(), error.ToCString());
495 } else {
497 "[-] Stopping message handler (%s):\n"
498 "\thandler: %s\n",
499 MessageStatusString(status), name());
500 }
501 }
502 pool_ = nullptr;
503 // Decide if we have a callback before releasing the monitor.
504 end_callback = end_callback_;
505 callback_data = callback_data_;
506 run_end_callback = end_callback_ != nullptr;
507 delete_me = delete_me_;
508 }
509
510 // Clear task_running_ last. This allows other tasks to potentially start
511 // for this message handler.
512 ASSERT(oob_queue_->IsEmpty());
513 task_running_ = false;
514 }
515
516 // The handler may have been deleted by another thread here if it is a native
517 // message handler.
518
519 // Message handlers either use delete_me or end_callback but not both.
520 ASSERT(!delete_me || !run_end_callback);
521
522 if (run_end_callback) {
523 ASSERT(end_callback != nullptr);
524 end_callback(callback_data);
525 // The handler may have been deleted after this point.
526 }
527 if (delete_me) {
528 delete this;
529 }
530}
531
533 if (FLAG_trace_isolates) {
534 MonitorLocker ml(&monitor_);
536 "[-] Closing port:\n"
537 "\thandler: %s\n"
538 "\tport: %" Pd64 "\n",
539 name(), port);
540 }
541}
542
544 MonitorLocker ml(&monitor_);
545 if (FLAG_trace_isolates) {
547 "[-] Closing all ports:\n"
548 "\thandler: %s\n",
549 name());
550 }
551 queue_->Clear();
552 oob_queue_->Clear();
553}
554
556 {
557 MonitorLocker ml(&monitor_);
558 if (task_running_) {
559 // This message handler currently has a task running on the thread pool.
560 delete_me_ = true;
561 return;
562 }
563 }
564
565 // This message handler has no current task. Delete it.
566 delete this;
567}
568
569#if !defined(PRODUCT)
572}
573
575 MonitorLocker ml(&monitor_);
576 PausedOnStartLocked(&ml, paused);
577}
578
579void MessageHandler::PausedOnStartLocked(MonitorLocker* ml, bool paused) {
580 if (paused) {
581 ASSERT(!is_paused_on_start_);
582 ASSERT(paused_timestamp_ == -1);
583 paused_timestamp_ = OS::GetCurrentTimeMillis();
584 // Temporarily release the monitor when calling out to
585 // NotifyPauseOnStart. This avoids a dead lock that can occur
586 // when this message handler tries to post a message while a
587 // message is being posted to it.
588 ml->Exit();
590 ml->Enter();
591 is_paused_on_start_ = true;
592 } else {
593 ASSERT(is_paused_on_start_);
594 ASSERT(paused_timestamp_ != -1);
595 paused_timestamp_ = -1;
596 // Resumed. Clear the resume request of the owning isolate.
597 Isolate* owning_isolate = isolate();
598 if (owning_isolate != nullptr) {
599 owning_isolate->GetAndClearResumeRequest();
600 }
601 is_paused_on_start_ = false;
602 }
603}
604
606 MonitorLocker ml(&monitor_);
607 PausedOnExitLocked(&ml, paused);
608}
609
610void MessageHandler::PausedOnExitLocked(MonitorLocker* ml, bool paused) {
611 if (paused) {
612 ASSERT(!is_paused_on_exit_);
613 ASSERT(paused_timestamp_ == -1);
614 paused_timestamp_ = OS::GetCurrentTimeMillis();
615 // Temporarily release the monitor when calling out to
616 // NotifyPauseOnExit. This avoids a dead lock that can
617 // occur when this message handler tries to post a message
618 // while a message is being posted to it.
619 ml->Exit();
621 ml->Enter();
622 is_paused_on_exit_ = true;
623 } else {
624 ASSERT(is_paused_on_exit_);
625 ASSERT(paused_timestamp_ != -1);
626 paused_timestamp_ = -1;
627 // Resumed. Clear the resume request of the owning isolate.
628 Isolate* owning_isolate = isolate();
629 if (owning_isolate != nullptr) {
630 owning_isolate->GetAndClearResumeRequest();
631 }
632 is_paused_on_exit_ = false;
633 }
634}
635#endif // !defined(PRODUCT)
636
638 : handler_(handler), ml_(&handler->monitor_) {
639 ASSERT(handler != nullptr);
640 handler_->oob_message_handling_allowed_ = false;
641}
642
644 ASSERT(handler_ != nullptr);
645 handler_->oob_message_handling_allowed_ = true;
646}
647
648} // namespace dart
AutoreleasePool pool
#define UNREACHABLE()
Definition: assert.h:248
IdleTimeHandler * idle_time_handler()
Definition: isolate.h:318
static Isolate * Current()
Definition: isolate.h:986
IsolateGroup * group() const
Definition: isolate.h:1037
bool is_runnable() const
Definition: isolate.h:1095
Dart_Port main_port() const
Definition: isolate.h:1048
const char * name() const
Definition: isolate.h:1043
MessageHandlerTask(MessageHandler *handler)
AcquiredQueues(MessageHandler *handler)
void PausedOnStart(bool paused)
virtual bool KeepAliveLocked()
void PostMessage(std::unique_ptr< Message > message, bool before_events=false)
void(* EndCallback)(CallbackData data)
void PausedOnExit(bool paused)
virtual void NotifyPauseOnExit()
virtual const char * name() const
bool ShouldPauseOnStart(MessageStatus status) const
Thread * thread() const
static const char * MessageStatusString(MessageStatus status)
bool Run(ThreadPool *pool, StartCallback start_callback, EndCallback end_callback, CallbackData data)
bool is_paused_on_start() const
virtual Isolate * isolate() const
virtual void NotifyPauseOnStart()
void ClosePort(Dart_Port port)
MessageStatus HandleNextMessage()
MessageStatus PauseAndHandleAllMessages(int64_t timeout_millis)
bool ShouldPauseOnExit(MessageStatus status) const
bool is_paused_on_exit() const
MessageStatus HandleOOBMessages()
bool should_pause_on_start() const
virtual MessageStatus HandleMessage(std::unique_ptr< Message > message)=0
bool should_pause_on_exit() const
virtual void MessageNotify(Message::Priority priority)
void Enqueue(std::unique_ptr< Message > msg, bool before_events)
Definition: message.cc:98
std::unique_ptr< Message > Dequeue()
Definition: message.cc:142
@ kNormalPriority
Definition: message.h:28
@ kOOBPriority
Definition: message.h:29
Monitor::WaitResult Wait(int64_t millis=Monitor::kNoTimeout)
Definition: lockers.h:172
void Enter() const
Definition: lockers.h:155
void Exit() const
Definition: lockers.h:163
bool IsOwnedByCurrentThread() const
Definition: os_thread.h:371
static int64_t GetCurrentTimeMillis()
static void static void PrintErr(const char *format,...) PRINTF_ATTRIBUTE(1
static Object & Handle()
Definition: object.h:407
static void DebugDumpForMessageHandler(MessageHandler *handler)
Definition: port.cc:305
bool Run(Args &&... args)
Definition: thread_pool.h:45
static Thread * Current()
Definition: thread.h:362
int64_t Dart_Port
Definition: dart_api.h:1525
#define ASSERT(E)
if(end==-1)
const uint8_t uint32_t uint32_t GError ** error
GAsyncResult * result
Win32Message message
Definition: dart_vm.cc:33
static int8_t data[kExtLength]
DECLARE_FLAG(bool, show_invisible_frames)
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition: switches.h:87
#define Pd64
Definition: globals.h:416
#define Pd
Definition: globals.h:408