Flutter Engine
The Flutter Engine
message_handler_test.cc
Go to the documentation of this file.
1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include <utility>
6
8#include "vm/port.h"
9#include "vm/unit_test.h"
10
11namespace dart {
12
14 public:
16 : handler_(handler) {}
17
18 void PostMessage(std::unique_ptr<Message> message) {
19 handler_->PostMessage(std::move(message));
20 }
21 void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
22 void CloseAllPorts() { handler_->CloseAllPorts(); }
23
24 MessageQueue* queue() const { return handler_->queue_; }
25 MessageQueue* oob_queue() const { return handler_->oob_queue_; }
26
27 private:
28 MessageHandler* handler_;
29
30 DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer);
31};
32
34 public:
36 : port_buffer_(nullptr),
37 port_buffer_size_(0),
38 notify_count_(0),
39 message_count_(0),
40 start_called_(false),
41 end_called_(false),
42 results_(nullptr),
43 monitor_() {}
44
47 delete[] port_buffer_;
48 }
49
51 MonitorLocker ml(&monitor_);
52 notify_count_++;
53 ml.Notify();
54 }
55
56 MessageStatus HandleMessage(std::unique_ptr<Message> message) {
57 // For testing purposes, keep a list of the ports
58 // for all messages we receive.
59 MonitorLocker ml(&monitor_);
60 AddPortToBuffer(message->dest_port());
61 message_count_++;
62 MessageStatus status = kOK;
63 if (results_ != nullptr) {
64 status = results_[0];
65 results_++;
66 }
67 ml.Notify();
68 return status;
69 }
70
72 start_called_ = true;
73 return kOK;
74 }
75
76 void End() {
77 MonitorLocker ml(&monitor_);
78 end_called_ = true;
79 AddPortToBuffer(-2);
80 ml.Notify();
81 }
82
83 Dart_Port* port_buffer() const { return port_buffer_; }
84 int notify_count() const { return notify_count_; }
85 int message_count() const { return message_count_; }
86 bool start_called() const { return start_called_; }
87 bool end_called() const { return end_called_; }
88
89 void set_results(MessageStatus* results) { results_ = results; }
90
91 Monitor* monitor() { return &monitor_; }
92
93 private:
94 void AddPortToBuffer(Dart_Port port) {
95 if (port_buffer_ == nullptr) {
96 port_buffer_ = new Dart_Port[10];
97 port_buffer_size_ = 10;
98 } else if (message_count_ == port_buffer_size_) {
99 int new_port_buffer_size_ = 2 * port_buffer_size_;
100 Dart_Port* new_port_buffer_ = new Dart_Port[new_port_buffer_size_];
101 for (int i = 0; i < port_buffer_size_; i++) {
102 new_port_buffer_[i] = port_buffer_[i];
103 }
104 delete[] port_buffer_;
105 port_buffer_ = new_port_buffer_;
106 port_buffer_size_ = new_port_buffer_size_;
107 }
108 port_buffer_[message_count_] = port;
109 }
110
111 Dart_Port* port_buffer_;
112 int port_buffer_size_;
113 int notify_count_;
114 int message_count_;
115 bool start_called_;
116 bool end_called_;
117 MessageStatus* results_;
118 Monitor monitor_;
119
120 DISALLOW_COPY_AND_ASSIGN(TestMessageHandler);
121};
122
124 return (reinterpret_cast<TestMessageHandler*>(data))->Start();
125}
126
128 return (reinterpret_cast<TestMessageHandler*>(data))->End();
129}
130
131static std::unique_ptr<Message> BlankMessage(Dart_Port dest,
132 Message::Priority priority) {
133 return Message::New(dest, reinterpret_cast<uint8_t*>(malloc(1)), 1, nullptr,
134 priority);
135}
136
137VM_UNIT_TEST_CASE(MessageHandler_PostMessage) {
138 TestMessageHandler handler;
139 MessageHandlerTestPeer handler_peer(&handler);
140 EXPECT_EQ(0, handler.notify_count());
141
142 // Post a message.
143 std::unique_ptr<Message> message = BlankMessage(1, Message::kNormalPriority);
144 Message* raw_message = message.get();
145 handler_peer.PostMessage(std::move(message));
146
147 // The notify callback is called.
148 EXPECT_EQ(1, handler.notify_count());
149
150 // The message has been added to the correct queue.
151 EXPECT(raw_message == handler_peer.queue()->Dequeue().get());
152 EXPECT(nullptr == handler_peer.oob_queue()->Dequeue());
153
154 // Post an oob message.
156 raw_message = message.get();
157 handler_peer.PostMessage(std::move(message));
158
159 // The notify callback is called.
160 EXPECT_EQ(2, handler.notify_count());
161
162 // The message has been added to the correct queue.
163 EXPECT(raw_message == handler_peer.oob_queue()->Dequeue().get());
164 EXPECT(nullptr == handler_peer.queue()->Dequeue());
165}
166
167VM_UNIT_TEST_CASE(MessageHandler_HasOOBMessages) {
168 TestMessageHandler handler;
169 MessageHandlerTestPeer handler_peer(&handler);
170
171 EXPECT(!handler.HasOOBMessages());
172
173 // Post a normal message.
174 std::unique_ptr<Message> message = BlankMessage(1, Message::kNormalPriority);
175 handler_peer.PostMessage(std::move(message));
176 EXPECT(!handler.HasOOBMessages());
177 {
178 // Acquire ownership of message handler queues, verify one regular message.
180 EXPECT(aq.queue()->Length() == 1);
181 }
182
183 // Post an oob message.
185 handler_peer.PostMessage(std::move(message));
186 EXPECT(handler.HasOOBMessages());
187 {
188 // Acquire ownership of message handler queues, verify one regular and one
189 // OOB message.
191 EXPECT(aq.queue()->Length() == 1);
192 EXPECT(aq.oob_queue()->Length() == 1);
193 }
194
195 // Delete all pending messages.
196 handler_peer.CloseAllPorts();
197}
198
199VM_UNIT_TEST_CASE(MessageHandler_ClosePort) {
200 TestMessageHandler handler;
201 MessageHandlerTestPeer handler_peer(&handler);
202 std::unique_ptr<Message> message;
204 Message* raw_message1 = message.get();
205 handler_peer.PostMessage(std::move(message));
207 Message* raw_message2 = message.get();
208 handler_peer.PostMessage(std::move(message));
209
210 handler_peer.ClosePort(1);
211
212 // Closing the port does not drop the messages from the queue.
213 EXPECT(raw_message1 == handler_peer.queue()->Dequeue().get());
214 EXPECT(raw_message2 == handler_peer.queue()->Dequeue().get());
215}
216
217VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
218 TestMessageHandler handler;
219 MessageHandlerTestPeer handler_peer(&handler);
222
223 handler_peer.CloseAllPorts();
224
225 // All messages are dropped from the queue.
226 EXPECT(nullptr == handler_peer.queue()->Dequeue());
227}
228
229VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage) {
230 TestMessageHandler handler;
231 MessageHandlerTestPeer handler_peer(&handler);
232 Dart_Port port1 = PortMap::CreatePort(&handler);
233 Dart_Port port2 = PortMap::CreatePort(&handler);
234 Dart_Port port3 = PortMap::CreatePort(&handler);
236 handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
238 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
239
240 // We handle both oob messages and a single normal message.
241 EXPECT_EQ(MessageHandler::kOK, handler.HandleNextMessage());
242 EXPECT_EQ(3, handler.message_count());
243 Dart_Port* ports = handler.port_buffer();
244 EXPECT_EQ(port2, ports[0]);
245 EXPECT_EQ(port3, ports[1]);
246 EXPECT_EQ(port1, ports[2]);
247}
248
249VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_ProcessOOBAfterError) {
250 TestMessageHandler handler;
252 MessageHandler::kError, // oob_message1
253 MessageHandler::kOK, // oob_message2
254 MessageHandler::kOK, // unused
255 };
256 handler.set_results(results);
257 MessageHandlerTestPeer handler_peer(&handler);
258 Dart_Port port1 = PortMap::CreatePort(&handler);
259 Dart_Port port2 = PortMap::CreatePort(&handler);
260 Dart_Port port3 = PortMap::CreatePort(&handler);
262 handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
263 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
264
265 // When we get an error, we continue processing oob messages but
266 // stop handling normal messages.
267 EXPECT_EQ(MessageHandler::kError, handler.HandleNextMessage());
268 EXPECT_EQ(2, handler.message_count());
269 Dart_Port* ports = handler.port_buffer();
270 EXPECT_EQ(port2, ports[0]); // oob_message1, error
271 EXPECT_EQ(port3, ports[1]); // oob_message2, ok
272 handler_peer.CloseAllPorts();
273}
274
275VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) {
276 TestMessageHandler handler;
278 MessageHandler::kOK, // oob_message1
279 MessageHandler::kShutdown, // oob_message2
280 MessageHandler::kOK, // unused
281 MessageHandler::kOK, // unused
282 };
283 handler.set_results(results);
284 MessageHandlerTestPeer handler_peer(&handler);
285 Dart_Port port1 = PortMap::CreatePort(&handler);
286 Dart_Port port2 = PortMap::CreatePort(&handler);
287 Dart_Port port3 = PortMap::CreatePort(&handler);
288 Dart_Port port4 = PortMap::CreatePort(&handler);
290 handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
291 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
292 handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));
293
294 // When we get a shutdown message, we stop processing all messages.
295 EXPECT_EQ(MessageHandler::kShutdown, handler.HandleNextMessage());
296 EXPECT_EQ(2, handler.message_count());
297 Dart_Port* ports = handler.port_buffer();
298 EXPECT_EQ(port2, ports[0]); // oob_message1, ok
299 EXPECT_EQ(port3, ports[1]); // oob_message2, shutdown
300 {
301 // The oob queue has been cleared. oob_message3 is gone.
303 EXPECT(aq.oob_queue()->Length() == 0);
304 }
305 handler_peer.CloseAllPorts();
306}
307
308VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
309 TestMessageHandler handler;
310 MessageHandlerTestPeer handler_peer(&handler);
311 Dart_Port port1 = PortMap::CreatePort(&handler);
312 Dart_Port port2 = PortMap::CreatePort(&handler);
313 Dart_Port port3 = PortMap::CreatePort(&handler);
314 Dart_Port port4 = PortMap::CreatePort(&handler);
317 handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
318 handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));
319
320 // We handle both oob messages but no normal messages.
321 EXPECT_EQ(MessageHandler::kOK, handler.HandleOOBMessages());
322 EXPECT_EQ(2, handler.message_count());
323 Dart_Port* ports = handler.port_buffer();
324 EXPECT_EQ(port3, ports[0]);
325 EXPECT_EQ(port4, ports[1]);
326 handler_peer.CloseAllPorts();
327}
328
332 int count;
334};
335
336static void SendMessages(uword param) {
337 ThreadStartInfo* info = reinterpret_cast<ThreadStartInfo*>(param);
339 MessageHandler* handler = info->handler;
340 MessageHandlerTestPeer handler_peer(handler);
341 for (int i = 0; i < info->count; i++) {
342 handler_peer.PostMessage(
344 }
345}
346
347VM_UNIT_TEST_CASE(MessageHandler_Run) {
348 TestMessageHandler handler;
350 MessageHandlerTestPeer handler_peer(&handler);
351
353 reinterpret_cast<uword>(&handler));
354
355 EXPECT(!PortMap::HasPorts(&handler));
357 EXPECT(PortMap::HasPorts(&handler));
358
360
361 // Wait for the first message to be handled.
362 {
363 MonitorLocker ml(handler.monitor());
364 while (handler.message_count() < 1) {
365 ml.Wait();
366 }
367 EXPECT_EQ(1, handler.message_count());
368 EXPECT(handler.start_called());
369 EXPECT(!handler.end_called());
370 Dart_Port* handler_ports = handler.port_buffer();
371 EXPECT_EQ(port, handler_ports[0]);
372 }
373
374 // Start a thread which sends more messages.
375 Dart_Port ports[10];
376 for (int i = 0; i < 10; i++) {
377 ports[i] = PortMap::CreatePort(&handler);
378 }
380 info.handler = &handler;
381 info.ports = ports;
382 info.count = 10;
384 OSThread::Start("SendMessages", SendMessages, reinterpret_cast<uword>(&info));
385
386 // Wait for the messages to be handled.
387 {
388 MonitorLocker ml(handler.monitor());
389 while (handler.message_count() < 11) {
390 ml.Wait();
391 }
392 Dart_Port* handler_ports = handler.port_buffer();
393 EXPECT_EQ(11, handler.message_count());
394 EXPECT(handler.start_called());
395 EXPECT(!handler.end_called());
396 EXPECT_EQ(port, handler_ports[0]);
397 for (int i = 1; i < 11; i++) {
398 EXPECT_EQ(ports[i - 1], handler_ports[i]);
399 }
400 }
401
402 for (int i = 0; i < 10; i++) {
403 PortMap::ClosePort(ports[i]);
404 }
406 EXPECT(!PortMap::HasPorts(&handler));
407
408 // Must join the thread or the VM shutdown is racing with any VM state the
409 // thread touched.
411 OSThread::Join(info.join_id);
412}
413
414} // namespace dart
AutoreleasePool pool
static void info(const char *fmt,...) SK_PRINTF_LIKE(1
Definition: DM.cpp:213
#define EXPECT(type, expectedAlignment, expectedSize)
MessageHandlerTestPeer(MessageHandler *handler)
MessageQueue * oob_queue() const
void PostMessage(std::unique_ptr< Message > message)
void PostMessage(std::unique_ptr< Message > message, bool before_events=false)
bool Run(ThreadPool *pool, StartCallback start_callback, EndCallback end_callback, CallbackData data)
void ClosePort(Dart_Port port)
MessageStatus HandleNextMessage()
MessageStatus HandleOOBMessages()
intptr_t Length() const
Definition: message.cc:191
std::unique_ptr< Message > Dequeue()
Definition: message.cc:142
static std::unique_ptr< Message > New(Args &&... args)
Definition: message.h:72
@ kNormalPriority
Definition: message.h:28
@ kOOBPriority
Definition: message.h:29
Monitor::WaitResult Wait(int64_t millis=Monitor::kNoTimeout)
Definition: lockers.h:172
static int Start(const char *name, ThreadStartFunction function, uword parameter)
static void Join(ThreadJoinId id)
static OSThread * Current()
Definition: os_thread.h:179
static ThreadJoinId GetCurrentThreadJoinId(OSThread *thread)
static const ThreadJoinId kInvalidThreadJoinId
Definition: os_thread.h:249
static bool ClosePort(Dart_Port id, MessageHandler **message_handler=nullptr)
Definition: port.cc:90
static void ClosePorts(MessageHandler *handler)
Definition: port.cc:128
static Dart_Port CreatePort(MessageHandler *handler)
Definition: port.cc:55
void set_results(MessageStatus *results)
Dart_Port * port_buffer() const
MessageStatus HandleMessage(std::unique_ptr< Message > message)
void MessageNotify(Message::Priority priority)
int64_t Dart_Port
Definition: dart_api.h:1525
#define ASSERT(E)
Win32Message message
Definition: dart_vm.cc:33
MessageHandler::MessageStatus TestStartFunction(uword data)
pthread_t ThreadJoinId
static void SendMessages(uword param)
void TestEndFunction(uword data)
static std::unique_ptr< Message > BlankMessage(Dart_Port dest, Message::Priority priority)
void * malloc(size_t size)
Definition: allocation.cc:19
uintptr_t uword
Definition: globals.h:501
static int8_t data[kExtLength]
VM_UNIT_TEST_CASE(DirectoryCurrentNoScope)
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition: switches.h:87
dest
Definition: zip.py:79