Flutter Engine
The Flutter Engine
Loading...
Searching...
No Matches
message_handler.cc
Go to the documentation of this file.
1// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include <utility>
6
8
9#include "vm/dart.h"
10#include "vm/heap/safepoint.h"
11#include "vm/isolate.h"
12#include "vm/lockers.h"
13#include "vm/object.h"
14#include "vm/object_store.h"
15#include "vm/os.h"
16#include "vm/port.h"
18
19namespace dart {
20
21DECLARE_FLAG(bool, trace_service_pause_events);
22
24 public:
25 explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
26 ASSERT(handler != nullptr);
27 }
28
29 virtual void Run() {
30 ASSERT(handler_ != nullptr);
31 handler_->TaskCallback();
32 }
33
34 private:
35 MessageHandler* handler_;
36
38};
39
40// static
42 switch (status) {
43 case kOK:
44 return "OK";
45 case kError:
46 return "Error";
47 case kShutdown:
48 return "Shutdown";
49 default:
51 return "Illegal";
52 }
53}
54
56 : queue_(new MessageQueue()),
57 oob_queue_(new MessageQueue()),
58 oob_message_handling_allowed_(true),
59 paused_for_messages_(false),
60 paused_(0),
61#if !defined(PRODUCT)
62 should_pause_on_start_(false),
63 should_pause_on_exit_(false),
64 is_paused_on_start_(false),
65 is_paused_on_exit_(false),
66 remembered_paused_on_exit_status_(kOK),
67 paused_timestamp_(-1),
68#endif
69 task_running_(false),
70 delete_me_(false),
71 pool_(nullptr),
72 start_callback_(nullptr),
73 end_callback_(nullptr),
74 callback_data_(0) {
75 ASSERT(queue_ != nullptr);
76 ASSERT(oob_queue_ != nullptr);
77}
78
80 delete queue_;
81 delete oob_queue_;
82 queue_ = nullptr;
83 oob_queue_ = nullptr;
84 pool_ = nullptr;
85}
86
87const char* MessageHandler::name() const {
88 return "<unnamed>";
89}
90
91#if defined(DEBUG)
// Debug-only access-check hook. The message-handling entry points in this
// file invoke it (inside #if defined(DEBUG)) after taking monitor_ and before
// processing any messages. This implementation deliberately does nothing.
92void MessageHandler::CheckAccess() const {
93 // By default there is no checking.
94}
95#endif
96
98 // By default, there is no custom message notification.
99}
100
102 StartCallback start_callback,
103 EndCallback end_callback,
105 MonitorLocker ml(&monitor_);
106 if (FLAG_trace_isolates) {
108 "[+] Starting message handler:\n"
109 "\thandler: %s\n",
110 name());
111 }
112 ASSERT(pool_ == nullptr);
113 ASSERT(!delete_me_);
114 pool_ = pool;
115 start_callback_ = start_callback;
116 end_callback_ = end_callback;
117 callback_data_ = data;
118 task_running_ = true;
119 bool result = pool_->Run<MessageHandlerTask>(this);
120 if (!result) {
121 pool_ = nullptr;
122 start_callback_ = nullptr;
123 end_callback_ = nullptr;
124 callback_data_ = 0;
125 task_running_ = false;
126 }
127 return result;
128}
129
130void MessageHandler::PostMessage(std::unique_ptr<Message> message,
131 bool before_events) {
132 Message::Priority saved_priority;
133
134 {
135 MonitorLocker ml(&monitor_);
136 if (FLAG_trace_isolates) {
137 Isolate* source_isolate = Isolate::Current();
138 if (source_isolate != nullptr) {
140 "[>] Posting message:\n"
141 "\tlen: %" Pd "\n\tsource: (%" Pd64
142 ") %s\n\tdest: %s\n"
143 "\tdest_port: %" Pd64 "\n",
144 message->Size(), static_cast<int64_t>(source_isolate->main_port()),
145 source_isolate->name(), name(), message->dest_port());
146 } else {
148 "[>] Posting message:\n"
149 "\tlen: %" Pd
150 "\n\tsource: <native code>\n"
151 "\tdest: %s\n"
152 "\tdest_port: %" Pd64 "\n",
153 message->Size(), name(), message->dest_port());
154 }
155 }
156
157 saved_priority = message->priority();
158 if (message->IsOOB()) {
159 oob_queue_->Enqueue(std::move(message), before_events);
160 } else {
161 queue_->Enqueue(std::move(message), before_events);
162 }
163 if (paused_for_messages_) {
164 ml.Notify();
165 }
166
167 if (pool_ != nullptr && !task_running_) {
168 ASSERT(!delete_me_);
169 task_running_ = true;
170 const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
171 ASSERT(launched_successfully);
172 }
173 }
174
175 // Invoke any custom message notification.
176 MessageNotify(saved_priority);
177}
178
179std::unique_ptr<Message> MessageHandler::DequeueMessage(
180 Message::Priority min_priority) {
181 // TODO(turnidge): Add assert that monitor_ is held here.
182 std::unique_ptr<Message> message = oob_queue_->Dequeue();
183 if ((message == nullptr) && (min_priority < Message::kOOBPriority)) {
184 message = queue_->Dequeue();
185 }
186 return message;
187}
188
// Clears the OOB message queue by calling Clear() on oob_queue_.
// HandleMessages uses this when a handled message reports kShutdown, just
// before it stops processing further messages.
189void MessageHandler::ClearOOBQueue() {
190 oob_queue_->Clear();
191}
192
193MessageHandler::MessageStatus MessageHandler::HandleMessages(
194 MonitorLocker* ml,
195 bool allow_normal_messages,
196 bool allow_multiple_normal_messages) {
197 ASSERT(monitor_.IsOwnedByCurrentThread());
198
199 // Scheduling of the mutator thread during the isolate start can cause this
200 // thread to safepoint.
201 // We want to avoid holding the message handler monitor during the safepoint
202 // operation to avoid possible deadlocks, which can occur if other threads are
203 // sending messages to this message handler.
204 //
205 // If isolate() returns nullptr [StartIsolateScope] does nothing.
206 ml->Exit();
207 StartIsolateScope start_isolate(isolate());
208 ml->Enter();
209
210 auto idle_time_handler =
211 isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;
212
213 MessageStatus max_status = kOK;
214 Message::Priority min_priority =
215 ((allow_normal_messages && !paused()) ? Message::kNormalPriority
216 : Message::kOOBPriority);
217 std::unique_ptr<Message> message = DequeueMessage(min_priority);
218 while (message != nullptr) {
219 intptr_t message_len = message->Size();
220 if (FLAG_trace_isolates) {
222 "[<] Handling message:\n"
223 "\tlen: %" Pd
224 "\n"
225 "\thandler: %s\n"
226 "\tport: %" Pd64 "\n",
227 message_len, name(), message->dest_port());
228 }
229
230 // Release the monitor_ temporarily while we handle the message.
231 // The monitor was acquired in MessageHandler::TaskCallback().
232 ml->Exit();
233 Message::Priority saved_priority = message->priority();
234 Dart_Port saved_dest_port = message->dest_port();
235 MessageStatus status = kOK;
236 {
237 DisableIdleTimerScope disable_idle_timer(idle_time_handler);
238 status = HandleMessage(std::move(message));
239 }
240 if (status > max_status) {
241 max_status = status;
242 }
243 ml->Enter();
244 if (FLAG_trace_isolates) {
246 "[.] Message handled (%s):\n"
247 "\tlen: %" Pd
248 "\n"
249 "\thandler: %s\n"
250 "\tport: %" Pd64 "\n",
251 MessageStatusString(status), message_len, name(), saved_dest_port);
252 }
253 // If we are shutting down, do not process any more messages.
254 if (status == kShutdown) {
255 ClearOOBQueue();
256 break;
257 }
258
259 // Remember time since the last message. Don't consider OOB messages so
260 // using Observatory doesn't trigger additional idle tasks.
261 if ((FLAG_idle_timeout_micros != 0) &&
262 (saved_priority == Message::kNormalPriority)) {
263 if (idle_time_handler != nullptr) {
264 idle_time_handler->UpdateStartIdleTime();
265 }
266 }
267
268 // Some callers want to process only one normal message and then quit. At
269 // the same time it is OK to process multiple OOB messages.
270 if ((saved_priority == Message::kNormalPriority) &&
271 !allow_multiple_normal_messages) {
272 // We processed one normal message. Allow no more.
273 allow_normal_messages = false;
274 }
275
276 // Reevaluate the minimum allowable priority. The paused state
277 // may have changed as part of handling the message. We may also
278 // have encountered an error during message processing.
279 //
280 // Even if we encounter an error, we still process pending OOB
281 // messages so that we don't lose the message notification.
282 min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
284 : Message::kOOBPriority);
285 message = DequeueMessage(min_priority);
286 }
287 return max_status;
288}
289
291 // We can only call HandleNextMessage when this handler is not
292 // assigned to a thread pool.
293 MonitorLocker ml(&monitor_);
294 ASSERT(pool_ == nullptr);
295 ASSERT(!delete_me_);
296#if defined(DEBUG)
297 CheckAccess();
298#endif
299 return HandleMessages(&ml, true, false);
300}
301
303 int64_t timeout_millis) {
304 MonitorLocker ml(&monitor_, /*no_safepoint_scope=*/false);
305 ASSERT(task_running_);
306 ASSERT(!delete_me_);
307#if defined(DEBUG)
308 CheckAccess();
309#endif
310 paused_for_messages_ = true;
311 while (queue_->IsEmpty() && oob_queue_->IsEmpty()) {
313 {
314 // Ensure this thread is at a safepoint while we wait for new messages to
315 // arrive.
317 wr = ml.Wait(timeout_millis);
318 }
319 ASSERT(task_running_);
320 ASSERT(!delete_me_);
321 if (wr == Monitor::kTimedOut) {
322 break;
323 }
324 if (queue_->IsEmpty()) {
325 // There are only OOB messages. Handle them and then continue waiting for
326 // normal messages unless there is an error.
327 MessageStatus status = HandleMessages(&ml, false, false);
328 if (status != kOK) {
329 paused_for_messages_ = false;
330 return status;
331 }
332 }
333 }
334 paused_for_messages_ = false;
335 return HandleMessages(&ml, true, true);
336}
337
339 if (!oob_message_handling_allowed_) {
340 return kOK;
341 }
342 MonitorLocker ml(&monitor_);
343 ASSERT(!delete_me_);
344#if defined(DEBUG)
345 CheckAccess();
346#endif
347 return HandleMessages(&ml, false, false);
348}
349
350#if !defined(PRODUCT)
352 Isolate* owning_isolate = isolate();
353 if (owning_isolate == nullptr) {
354 return false;
355 }
356 // If we are restarting or shutting down, we do not want to honor
357 // should_pause_on_start or should_pause_on_exit.
358 return (status != MessageHandler::kShutdown) && should_pause_on_start() &&
359 owning_isolate->is_runnable();
360}
361
363 Isolate* owning_isolate = isolate();
364 if (owning_isolate == nullptr) {
365 return false;
366 }
367 return (status != MessageHandler::kShutdown) && should_pause_on_exit() &&
368 owning_isolate->is_runnable();
369}
370#endif
371
373 MonitorLocker ml(&monitor_);
374 return !oob_queue_->IsEmpty();
375}
376
#if defined(TESTING)
// Test-only helper: while holding monitor_, dequeues one message directly
// from the OOB queue and hands ownership of it to the caller. The OOB queue
// is asserted to be non-empty beforehand.
std::unique_ptr<Message> MessageHandler::StealOOBMessage() {
  MonitorLocker locker(&monitor_);
  ASSERT(!oob_queue_->IsEmpty());
  std::unique_ptr<Message> stolen = oob_queue_->Dequeue();
  return stolen;
}
#endif
384
386 MonitorLocker ml(&monitor_);
387 return !queue_->IsEmpty();
388}
389
390void MessageHandler::TaskCallback() {
391 ASSERT(Isolate::Current() == nullptr);
392 MessageStatus status = kOK;
393 bool run_end_callback = false;
394 bool delete_me = false;
395 EndCallback end_callback = nullptr;
396 CallbackData callback_data = 0;
397 {
398 // We will occasionally release and reacquire this monitor in this
399 // function. Whenever we reacquire the monitor we *must* process
400 // all pending OOB messages, or we may miss a request for vm
401 // shutdown.
402 MonitorLocker ml(&monitor_);
403
404 // This method is running on the message handler task. Which means no
405 // other message handler tasks will be started until this one sets
406 // [task_running_] to false.
407 ASSERT(task_running_);
408
409#if !defined(PRODUCT)
410 if (ShouldPauseOnStart(kOK)) {
411 if (!is_paused_on_start()) {
412 PausedOnStartLocked(&ml, true);
413 }
414 // More messages may have come in before we (re)acquired the monitor.
415 status = HandleMessages(&ml, false, false);
416 if (ShouldPauseOnStart(status)) {
417 // Still paused.
418 ASSERT(oob_queue_->IsEmpty());
419 task_running_ = false; // No task in queue.
420 return;
421 } else {
422 PausedOnStartLocked(&ml, false);
423 }
424 }
425 if (is_paused_on_exit()) {
426 status = HandleMessages(&ml, false, false);
427 if (ShouldPauseOnExit(status)) {
428 // Still paused.
429 ASSERT(oob_queue_->IsEmpty());
430 task_running_ = false; // No task in queue.
431 return;
432 } else {
433 PausedOnExitLocked(&ml, false);
434 if (status != kShutdown) {
435 status = remembered_paused_on_exit_status_;
436 }
437 }
438 }
439#endif // !defined(PRODUCT)
440
441 if (status == kOK) {
442 if (start_callback_ != nullptr) {
443 // Initialize the message handler by running its start function,
444 // if we have one. For an isolate, this will run the isolate's
445 // main() function.
446 //
447 // Release the monitor_ temporarily while we call the start callback.
448 ml.Exit();
449 status = start_callback_(callback_data_);
450 ASSERT(Isolate::Current() == nullptr);
451 start_callback_ = nullptr;
452 ml.Enter();
453 }
454
455 // Handle any pending messages for this message handler.
456 if (status != kShutdown) {
457 status = HandleMessages(&ml, (status == kOK), true);
458 }
459 }
460
461 // The isolate exits when it encounters an error or when it no
462 // longer has live ports.
463 if (status != kOK || !KeepAliveLocked()) {
464#if !defined(PRODUCT)
465 if (ShouldPauseOnExit(status)) {
466 if (FLAG_trace_service_pause_events) {
468 "Isolate %s paused before exiting. "
469 "Use the Observatory to release it.\n",
470 name());
471 }
472 remembered_paused_on_exit_status_ = status;
473 PausedOnExitLocked(&ml, true);
474 // More messages may have come in while we released the monitor.
475 status = HandleMessages(&ml, /*allow_normal_messages=*/false,
476 /*allow_multiple_normal_messages=*/false);
477 if (ShouldPauseOnExit(status)) {
478 // Still paused.
479 ASSERT(oob_queue_->IsEmpty());
480 task_running_ = false; // No task in queue.
481 return;
482 } else {
483 PausedOnExitLocked(&ml, false);
484 }
485 }
486#endif // !defined(PRODUCT)
487 if (FLAG_trace_isolates) {
488 if (status != kOK && thread() != nullptr) {
489 const Error& error = Error::Handle(thread()->sticky_error());
491 "[-] Stopping message handler (%s):\n"
492 "\thandler: %s\n"
493 "\terror: %s\n",
494 MessageStatusString(status), name(), error.ToCString());
495 } else {
497 "[-] Stopping message handler (%s):\n"
498 "\thandler: %s\n",
499 MessageStatusString(status), name());
500 }
501 }
502 pool_ = nullptr;
503 // Decide if we have a callback before releasing the monitor.
504 end_callback = end_callback_;
505 callback_data = callback_data_;
506 run_end_callback = end_callback_ != nullptr;
507 delete_me = delete_me_;
508 }
509
510 // Clear task_running_ last. This allows other tasks to potentially start
511 // for this message handler.
512 ASSERT(oob_queue_->IsEmpty());
513 task_running_ = false;
514 }
515
516 // The handler may have been deleted by another thread here if it is a native
517 // message handler.
518
519 // Message handlers either use delete_me or end_callback but not both.
520 ASSERT(!delete_me || !run_end_callback);
521
522 if (run_end_callback) {
523 ASSERT(end_callback != nullptr);
524 end_callback(callback_data);
525 // The handler may have been deleted after this point.
526 }
527 if (delete_me) {
528 delete this;
529 }
530}
531
533 if (FLAG_trace_isolates) {
534 MonitorLocker ml(&monitor_);
536 "[-] Closing port:\n"
537 "\thandler: %s\n"
538 "\tport: %" Pd64 "\n",
539 name(), port);
540 }
541}
542
544 MonitorLocker ml(&monitor_);
545 if (FLAG_trace_isolates) {
547 "[-] Closing all ports:\n"
548 "\thandler: %s\n",
549 name());
550 }
551 queue_->Clear();
552 oob_queue_->Clear();
553}
554
556 {
557 MonitorLocker ml(&monitor_);
558 if (task_running_) {
559 // This message handler currently has a task running on the thread pool.
560 delete_me_ = true;
561 return;
562 }
563 }
564
565 // This message handler has no current task. Delete it.
566 delete this;
567}
568
569#if !defined(PRODUCT)
573
575 MonitorLocker ml(&monitor_);
576 PausedOnStartLocked(&ml, paused);
577}
578
579void MessageHandler::PausedOnStartLocked(MonitorLocker* ml, bool paused) {
580 if (paused) {
581 ASSERT(!is_paused_on_start_);
582 ASSERT(paused_timestamp_ == -1);
583 paused_timestamp_ = OS::GetCurrentTimeMillis();
584 // Temporarily release the monitor when calling out to
585 // NotifyPauseOnStart. This avoids a deadlock that can occur
586 // when this message handler tries to post a message while a
587 // message is being posted to it.
588 ml->Exit();
590 ml->Enter();
591 is_paused_on_start_ = true;
592 } else {
593 ASSERT(is_paused_on_start_);
594 ASSERT(paused_timestamp_ != -1);
595 paused_timestamp_ = -1;
596 // Resumed. Clear the resume request of the owning isolate.
597 Isolate* owning_isolate = isolate();
598 if (owning_isolate != nullptr) {
599 owning_isolate->GetAndClearResumeRequest();
600 }
601 is_paused_on_start_ = false;
602 }
603}
604
606 MonitorLocker ml(&monitor_);
607 PausedOnExitLocked(&ml, paused);
608}
609
610void MessageHandler::PausedOnExitLocked(MonitorLocker* ml, bool paused) {
611 if (paused) {
612 ASSERT(!is_paused_on_exit_);
613 ASSERT(paused_timestamp_ == -1);
614 paused_timestamp_ = OS::GetCurrentTimeMillis();
615 // Temporarily release the monitor when calling out to
616 // NotifyPauseOnExit. This avoids a deadlock that can
617 // occur when this message handler tries to post a message
618 // while a message is being posted to it.
619 ml->Exit();
621 ml->Enter();
622 is_paused_on_exit_ = true;
623 } else {
624 ASSERT(is_paused_on_exit_);
625 ASSERT(paused_timestamp_ != -1);
626 paused_timestamp_ = -1;
627 // Resumed. Clear the resume request of the owning isolate.
628 Isolate* owning_isolate = isolate();
629 if (owning_isolate != nullptr) {
630 owning_isolate->GetAndClearResumeRequest();
631 }
632 is_paused_on_exit_ = false;
633 }
634}
635#endif // !defined(PRODUCT)
636
638 : handler_(handler), ml_(&handler->monitor_) {
639 ASSERT(handler != nullptr);
640 handler_->oob_message_handling_allowed_ = false;
641}
642
644 ASSERT(handler_ != nullptr);
645 handler_->oob_message_handling_allowed_ = true;
646}
647
648} // namespace dart
AutoreleasePool pool
#define UNREACHABLE()
Definition assert.h:248
IdleTimeHandler * idle_time_handler()
Definition isolate.h:317
static Isolate * Current()
Definition isolate.h:939
IsolateGroup * group() const
Definition isolate.h:990
bool is_runnable() const
Definition isolate.h:1048
Dart_Port main_port() const
Definition isolate.h:1001
const char * name() const
Definition isolate.h:996
MessageHandlerTask(MessageHandler *handler)
AcquiredQueues(MessageHandler *handler)
void PausedOnStart(bool paused)
virtual bool KeepAliveLocked()
void PostMessage(std::unique_ptr< Message > message, bool before_events=false)
void(* EndCallback)(CallbackData data)
void PausedOnExit(bool paused)
virtual void NotifyPauseOnExit()
virtual const char * name() const
bool ShouldPauseOnStart(MessageStatus status) const
Thread * thread() const
static const char * MessageStatusString(MessageStatus status)
bool Run(ThreadPool *pool, StartCallback start_callback, EndCallback end_callback, CallbackData data)
bool is_paused_on_start() const
virtual Isolate * isolate() const
virtual void NotifyPauseOnStart()
void ClosePort(Dart_Port port)
MessageStatus HandleNextMessage()
MessageStatus PauseAndHandleAllMessages(int64_t timeout_millis)
bool ShouldPauseOnExit(MessageStatus status) const
bool is_paused_on_exit() const
MessageStatus HandleOOBMessages()
bool should_pause_on_start() const
virtual MessageStatus HandleMessage(std::unique_ptr< Message > message)=0
bool should_pause_on_exit() const
virtual void MessageNotify(Message::Priority priority)
void Enqueue(std::unique_ptr< Message > msg, bool before_events)
Definition message.cc:98
std::unique_ptr< Message > Dequeue()
Definition message.cc:142
@ kNormalPriority
Definition message.h:28
Monitor::WaitResult Wait(int64_t millis=Monitor::kNoTimeout)
Definition lockers.h:172
void Enter() const
Definition lockers.h:155
void Exit() const
Definition lockers.h:163
bool IsOwnedByCurrentThread() const
Definition os_thread.h:370
static int64_t GetCurrentTimeMillis()
static void static void PrintErr(const char *format,...) PRINTF_ATTRIBUTE(1
static Object & Handle()
Definition object.h:407
static void DebugDumpForMessageHandler(MessageHandler *handler)
Definition port.cc:305
bool Run(Args &&... args)
Definition thread_pool.h:45
static Thread * Current()
Definition thread.h:361
int64_t Dart_Port
Definition dart_api.h:1524
#define ASSERT(E)
if(end==-1)
const uint8_t uint32_t uint32_t GError ** error
GAsyncResult * result
#define DECLARE_FLAG(type, name)
Definition flags.h:14
Win32Message message
static int8_t data[kExtLength]
#define Pd64
Definition globals.h:416
#define Pd
Definition globals.h:408
#define DISALLOW_COPY_AND_ASSIGN(TypeName)
Definition globals.h:581