Flutter Engine
The Flutter Engine
eventhandler_macos.cc
Go to the documentation of this file.
1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(DART_HOST_OS_MACOS)
7
8#include "bin/eventhandler.h"
10
11#include <errno.h> // NOLINT
12#include <fcntl.h> // NOLINT
13#include <pthread.h> // NOLINT
14#include <stdio.h> // NOLINT
15#include <string.h> // NOLINT
16#include <sys/event.h> // NOLINT
17#include <unistd.h> // NOLINT
18
19#include "bin/dartutils.h"
20#include "bin/fdutils.h"
21#include "bin/lockers.h"
22#include "bin/process.h"
23#include "bin/socket.h"
24#include "bin/thread.h"
25#include "bin/utils.h"
26#include "platform/hashmap.h"
27#include "platform/syslog.h"
28#include "platform/utils.h"
29
30namespace dart {
31namespace bin {
32
34 return (Mask() & (1 << kInEvent)) != 0;
35}
36
38 return (Mask() & (1 << kOutEvent)) != 0;
39}
40
41// Unregister the file descriptor for a SocketData structure with kqueue.
42static void RemoveFromKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
43 if (!di->tracked_by_kqueue()) {
44 return;
45 }
46 const intptr_t kMaxChanges = 2;
47 struct kevent events[kMaxChanges];
48 EV_SET(events, di->fd(), EVFILT_READ, EV_DELETE, 0, 0, nullptr);
49 VOID_NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, 1, nullptr, 0, nullptr));
50 EV_SET(events, di->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
51 VOID_NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, 1, nullptr, 0, nullptr));
52 di->set_tracked_by_kqueue(false);
53}
54
55// Update the kqueue registration for SocketData structure to reflect
56// the events currently of interest.
57static void AddToKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
58 ASSERT(!di->tracked_by_kqueue());
59 const intptr_t kMaxChanges = 2;
60 intptr_t changes = 0;
61 struct kevent events[kMaxChanges];
62 int flags = EV_ADD;
63 if (!di->IsListeningSocket()) {
64 flags |= EV_CLEAR;
65 }
66
67 ASSERT(di->HasReadEvent() || di->HasWriteEvent());
68
69 // Register or unregister READ filter if needed.
70 if (di->HasReadEvent()) {
71 EV_SET(events + changes, di->fd(), EVFILT_READ, flags, 0, 0, di);
72 ++changes;
73 }
74 // Register or unregister WRITE filter if needed.
75 if (di->HasWriteEvent()) {
76 EV_SET(events + changes, di->fd(), EVFILT_WRITE, flags, 0, 0, di);
77 ++changes;
78 }
79 ASSERT(changes > 0);
80 ASSERT(changes <= kMaxChanges);
81 int status = NO_RETRY_EXPECTED(
82 kevent(kqueue_fd_, events, changes, nullptr, 0, nullptr));
83 if (status == -1) {
84 // TODO(dart:io): Verify that the dart end is handling this correctly.
85
86 // kQueue does not accept the file descriptor. It could be due to
87 // already closed file descriptor, or unsupported devices, such
88 // as /dev/null. In such case, mark the file descriptor as closed,
89 // so dart will handle it accordingly.
90 di->NotifyAllDartPorts(1 << kCloseEvent);
91 } else {
92 di->set_tracked_by_kqueue(true);
93 }
94}
95
97 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
98 intptr_t result;
99 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
100 if (result != 0) {
101 FATAL("Pipe creation failed");
102 }
103 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
104 FATAL("Failed to set pipe fd non-blocking\n");
105 }
106 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
107 FATAL("Failed to set pipe fd close on exec\n");
108 }
109 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
110 FATAL("Failed to set pipe fd close on exec\n");
111 }
112 shutdown_ = false;
113
114 kqueue_fd_ = NO_RETRY_EXPECTED(kqueue());
115 if (kqueue_fd_ == -1) {
116 FATAL("Failed creating kqueue");
117 }
118 if (!FDUtils::SetCloseOnExec(kqueue_fd_)) {
119 FATAL("Failed to set kqueue fd close on exec\n");
120 }
121 // Register the interrupt_fd with the kqueue.
122 struct kevent event;
123 EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, nullptr);
124 int status =
125 NO_RETRY_EXPECTED(kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr));
126 if (status == -1) {
127 const int kBufferSize = 1024;
128 char error_message[kBufferSize];
129 Utils::StrError(errno, error_message, kBufferSize);
130 FATAL("Failed adding interrupt fd to kqueue: %s\n", error_message);
131 }
132}
133
134static void DeleteDescriptorInfo(void* info) {
135 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
136 di->Close();
137 delete di;
138}
139
140EventHandlerImplementation::~EventHandlerImplementation() {
141 socket_map_.Clear(DeleteDescriptorInfo);
142 close(kqueue_fd_);
143 close(interrupt_fds_[0]);
144 close(interrupt_fds_[1]);
145}
146
147void EventHandlerImplementation::UpdateKQueueInstance(intptr_t old_mask,
148 DescriptorInfo* di) {
149 intptr_t new_mask = di->Mask();
150 if (old_mask != 0 && new_mask == 0) {
151 RemoveFromKqueue(kqueue_fd_, di);
152 } else if ((old_mask == 0) && (new_mask != 0)) {
153 AddToKqueue(kqueue_fd_, di);
154 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
155 ASSERT(!di->IsListeningSocket());
156 RemoveFromKqueue(kqueue_fd_, di);
157 AddToKqueue(kqueue_fd_, di);
158 }
159}
160
161DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
162 intptr_t fd,
163 bool is_listening) {
164 ASSERT(fd >= 0);
165 SimpleHashMap::Entry* entry = socket_map_.Lookup(
166 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
167 ASSERT(entry != nullptr);
168 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
169 if (di == nullptr) {
170 // If there is no data in the hash map for this file descriptor a
171 // new DescriptorInfo for the file descriptor is inserted.
172 if (is_listening) {
173 di = new DescriptorInfoMultiple(fd);
174 } else {
175 di = new DescriptorInfoSingle(fd);
176 }
177 entry->value = di;
178 }
179 ASSERT(fd == di->fd());
180 return di;
181}
182
183void EventHandlerImplementation::WakeupHandler(intptr_t id,
184 Dart_Port dart_port,
185 int64_t data) {
186 InterruptMessage msg;
187 msg.id = id;
188 msg.dart_port = dart_port;
189 msg.data = data;
190 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
191 // is smaller than 512, we don't need a thread lock.
192 ASSERT(kInterruptMessageSize < PIPE_BUF);
193 intptr_t result =
194 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
196 if (result == -1) {
197 FATAL("Interrupt message failure: %s", strerror(errno));
198 } else {
199 FATAL("Interrupt message failure: expected to write %" Pd
200 " bytes, but wrote %" Pd ".",
202 }
203 }
204}
205
206void EventHandlerImplementation::HandleInterruptFd() {
207 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
208 InterruptMessage msg[MAX_MESSAGES];
209 ssize_t bytes = TEMP_FAILURE_RETRY(
210 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
211 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
212 if (msg[i].id == kTimerId) {
213 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
214 } else if (msg[i].id == kShutdownId) {
215 shutdown_ = true;
216 } else {
217 ASSERT((msg[i].data & COMMAND_MASK) != 0);
218 Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
219 RefCntReleaseScope<Socket> rs(socket);
220 if (socket->fd() == -1) {
221 continue;
222 }
223 DescriptorInfo* di =
224 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
226 ASSERT(!di->IsListeningSocket());
227 // Close the socket for reading.
228 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
229 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
230 ASSERT(!di->IsListeningSocket());
231 // Close the socket for writing.
232 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
233 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
234 // Close the socket and free system resources and move on to next
235 // message.
236 if (IS_SIGNAL_SOCKET(msg[i].data)) {
237 Process::ClearSignalHandlerByFd(di->fd(), socket->isolate_port());
238 }
239 intptr_t old_mask = di->Mask();
240 Dart_Port port = msg[i].dart_port;
241 if (port != ILLEGAL_PORT) {
242 di->RemovePort(port);
243 }
244 intptr_t new_mask = di->Mask();
245 UpdateKQueueInstance(old_mask, di);
246
247 intptr_t fd = di->fd();
248 if (di->IsListeningSocket()) {
249 // We only close the socket file descriptor from the operating
250 // system if there are no other dart socket objects which
251 // are listening on the same (address, port) combination.
252 ListeningSocketRegistry* registry =
253 ListeningSocketRegistry::Instance();
254
255 MutexLocker locker(registry->mutex());
256
257 if (registry->CloseSafe(socket)) {
258 ASSERT(new_mask == 0);
259 socket_map_.Remove(GetHashmapKeyFromFd(fd),
260 GetHashmapHashFromFd(fd));
261 di->Close();
262 delete di;
263 }
264 socket->CloseFd();
265 } else {
266 ASSERT(new_mask == 0);
267 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
268 di->Close();
269 delete di;
270 socket->CloseFd();
271 }
272
273 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
274 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
275 intptr_t old_mask = di->Mask();
276 di->ReturnTokens(msg[i].dart_port, TOKEN_COUNT(msg[i].data));
277 UpdateKQueueInstance(old_mask, di);
278 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
279 // `events` can only have kInEvent/kOutEvent flags set.
280 intptr_t events = msg[i].data & EVENT_MASK;
281 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
282
283 intptr_t old_mask = di->Mask();
284 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
285 UpdateKQueueInstance(old_mask, di);
286 } else {
287 UNREACHABLE();
288 }
289 }
290 }
291}
292
293#ifdef DEBUG_KQUEUE
294static void PrintEventMask(intptr_t fd, struct kevent* event) {
295 Syslog::Print("%d ", static_cast<int>(fd));
296
297 Syslog::Print("filter=0x%x:", event->filter);
298 if (event->filter == EVFILT_READ) {
299 Syslog::Print("EVFILT_READ ");
300 }
301 if (event->filter == EVFILT_WRITE) {
302 Syslog::Print("EVFILT_WRITE ");
303 }
304
305 Syslog::Print("flags: %x: ", event->flags);
306 if ((event->flags & EV_EOF) != 0) {
307 Syslog::Print("EV_EOF ");
308 }
309 if ((event->flags & EV_ERROR) != 0) {
310 Syslog::Print("EV_ERROR ");
311 }
312 if ((event->flags & EV_CLEAR) != 0) {
313 Syslog::Print("EV_CLEAR ");
314 }
315 if ((event->flags & EV_ADD) != 0) {
316 Syslog::Print("EV_ADD ");
317 }
318 if ((event->flags & EV_DELETE) != 0) {
319 Syslog::Print("EV_DELETE ");
320 }
321
322 Syslog::Print("- fflags: %d ", event->fflags);
323 Syslog::Print("- data: %ld ", event->data);
324 Syslog::Print("(available %d) ",
325 static_cast<int>(FDUtils::AvailableBytes(fd)));
326 Syslog::Print("\n");
327}
328#endif
329
330intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
331 DescriptorInfo* di) {
332#ifdef DEBUG_KQUEUE
333 PrintEventMask(di->fd(), event);
334#endif
335 intptr_t event_mask = 0;
336 if (di->IsListeningSocket()) {
337 // On a listening socket the READ event means that there are
338 // connections ready to be accepted.
339 if (event->filter == EVFILT_READ) {
340 if ((event->flags & EV_EOF) != 0) {
341 if (event->fflags != 0) {
342 event_mask |= (1 << kErrorEvent);
343 } else {
344 event_mask |= (1 << kCloseEvent);
345 }
346 }
347 if (event_mask == 0) {
348 event_mask |= (1 << kInEvent);
349 }
350 } else {
351 UNREACHABLE();
352 }
353 } else {
354 // Prioritize data events over close and error events.
355 if (event->filter == EVFILT_READ) {
356 event_mask = (1 << kInEvent);
357 if ((event->flags & EV_EOF) != 0) {
358 if (event->fflags != 0) {
359 event_mask = (1 << kErrorEvent);
360 } else {
361 event_mask |= (1 << kCloseEvent);
362 }
363 }
364 } else if (event->filter == EVFILT_WRITE) {
365 event_mask |= (1 << kOutEvent);
366 if ((event->flags & EV_EOF) != 0) {
367 if (event->fflags != 0) {
368 event_mask = (1 << kErrorEvent);
369 }
370 }
371 } else {
372 UNREACHABLE();
373 }
374 }
375
376 return event_mask;
377}
378
379void EventHandlerImplementation::HandleEvents(struct kevent* events, int size) {
380 bool interrupt_seen = false;
381 for (int i = 0; i < size; i++) {
382 // If flag EV_ERROR is set it indicates an error in kevent processing.
383 if ((events[i].flags & EV_ERROR) != 0) {
384 const int kBufferSize = 1024;
385 char error_message[kBufferSize];
386 Utils::StrError(events[i].data, error_message, kBufferSize);
387 FATAL("kevent failed %s\n", error_message);
388 }
389 if (events[i].udata == nullptr) {
390 interrupt_seen = true;
391 } else {
392 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(events[i].udata);
393 const intptr_t old_mask = di->Mask();
394 const intptr_t event_mask = GetEvents(events + i, di);
395 if ((event_mask & (1 << kErrorEvent)) != 0) {
396 di->NotifyAllDartPorts(event_mask);
397 UpdateKQueueInstance(old_mask, di);
398 } else if (event_mask != 0) {
399 Dart_Port port = di->NextNotifyDartPort(event_mask);
400 ASSERT(port != 0);
401 UpdateKQueueInstance(old_mask, di);
402 DartUtils::PostInt32(port, event_mask);
403 }
404 }
405 }
406 if (interrupt_seen) {
407 // Handle after socket events, so we avoid closing a socket before we handle
408 // the current events.
409 HandleInterruptFd();
410 }
411}
412
413int64_t EventHandlerImplementation::GetTimeout() {
414 if (!timeout_queue_.HasTimeout()) {
415 return kInfinityTimeout;
416 }
417 int64_t millis =
418 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
419 return (millis < 0) ? 0 : millis;
420}
421
422void EventHandlerImplementation::HandleTimeout() {
423 if (timeout_queue_.HasTimeout()) {
424 int64_t millis = timeout_queue_.CurrentTimeout() -
425 TimerUtils::GetCurrentMonotonicMillis();
426 if (millis <= 0) {
427 DartUtils::PostNull(timeout_queue_.CurrentPort());
428 timeout_queue_.RemoveCurrent();
429 }
430 }
431}
432
433void EventHandlerImplementation::EventHandlerEntry(uword args) {
434 const intptr_t kMaxEvents = 16;
435 struct kevent events[kMaxEvents];
436 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
437 EventHandlerImplementation* handler_impl = &handler->delegate_;
438 ASSERT(handler_impl != nullptr);
439
440 while (!handler_impl->shutdown_) {
441 int64_t millis = handler_impl->GetTimeout();
442 ASSERT(millis == kInfinityTimeout || millis >= 0);
443 if (millis > kMaxInt32) {
444 millis = kMaxInt32;
445 }
446 // nullptr pointer timespec for infinite timeout.
448 struct timespec* timeout = nullptr;
449 struct timespec ts;
450 if (millis >= 0) {
451 int32_t millis32 = static_cast<int32_t>(millis);
452 int32_t secs = millis32 / 1000;
453 ts.tv_sec = secs;
454 ts.tv_nsec = (millis32 - (secs * 1000)) * 1000000;
455 timeout = &ts;
456 }
457 // We have to use TEMP_FAILURE_RETRY for mac, as kevent can modify the
458 // current sigmask.
459 intptr_t result = TEMP_FAILURE_RETRY(kevent(
460 handler_impl->kqueue_fd_, nullptr, 0, events, kMaxEvents, timeout));
461 if (result == -1) {
462 const int kBufferSize = 1024;
463 char error_message[kBufferSize];
464 Utils::StrError(errno, error_message, kBufferSize);
465 FATAL("kevent failed %s\n", error_message);
466 } else {
467 handler_impl->HandleTimeout();
468 handler_impl->HandleEvents(events, result);
469 }
470 }
471 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
472 handler->NotifyShutdownDone();
473}
474
475void EventHandlerImplementation::Start(EventHandler* handler) {
476 int result = Thread::Start("dart:io EventHandler",
477 &EventHandlerImplementation::EventHandlerEntry,
478 reinterpret_cast<uword>(handler));
479 if (result != 0) {
480 FATAL("Failed to start event handler thread %d", result);
481 }
482}
483
485 SendData(kShutdownId, 0, 0);
486}
487
489 Dart_Port dart_port,
490 int64_t data) {
491 WakeupHandler(id, dart_port, data);
492}
493
494void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
495 // The hashmap does not support keys with value 0.
496 return reinterpret_cast<void*>(fd + 1);
497}
498
499uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
500 // The hashmap does not support keys with value 0.
501 return dart::Utils::WordHash(fd + 1);
502}
503
504} // namespace bin
505} // namespace dart
506
507#endif // defined(DART_HOST_OS_MACOS)
static void info(const char *fmt,...) SK_PRINTF_LIKE(1
Definition: DM.cpp:213
static bool read(SkStream *stream, void *buffer, size_t amount)
static const size_t kBufferSize
Definition: SkString.cpp:27
#define UNREACHABLE()
Definition: assert.h:248
#define DEBUG_ASSERT(cond)
Definition: assert.h:321
static uint32_t WordHash(intptr_t key)
Definition: utils.cc:217
virtual intptr_t Mask()=0
#define ILLEGAL_PORT
Definition: dart_api.h:1535
int64_t Dart_Port
Definition: dart_api.h:1525
#define ASSERT(E)
#define IS_COMMAND(data, command_bit)
Definition: eventhandler.h:49
#define EVENT_MASK
Definition: eventhandler.h:44
#define IS_LISTENING_SOCKET(data)
Definition: eventhandler.h:56
#define COMMAND_MASK
Definition: eventhandler.h:39
#define IS_SIGNAL_SOCKET(data)
Definition: eventhandler.h:58
#define TOKEN_COUNT(data)
Definition: eventhandler.h:60
#define FATAL(error)
FlutterSemanticsFlag flags
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
FlKeyEvent * event
GAsyncResult * result
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
Definition: Response.cpp:64
static constexpr intptr_t kTimerId
Definition: eventhandler.h:109
@ kShutdownWriteCommand
Definition: eventhandler.h:30
@ kReturnTokenCommand
Definition: eventhandler.h:31
@ kShutdownReadCommand
Definition: eventhandler.h:29
@ kDestroyedEvent
Definition: eventhandler.h:27
@ kSetEventMaskCommand
Definition: eventhandler.h:32
static constexpr intptr_t kShutdownId
Definition: eventhandler.h:110
static constexpr intptr_t kInfinityTimeout
Definition: eventhandler.h:108
static constexpr intptr_t kInterruptMessageSize
Definition: eventhandler.h:107
static void Shutdown(Dart_NativeArguments args)
Definition: dart_vm.cc:33
uintptr_t uword
Definition: globals.h:501
constexpr int32_t kMaxInt32
Definition: globals.h:483
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition: switches.h:87
it will be possible to load the file into Perfetto s trace viewer disable asset Prevents usage of any non test fonts unless they were explicitly Loaded via prefetched default font Indicates whether the embedding started a prefetch of the default font manager before creating the engine run In non interactive keep the shell running after the Dart script has completed enable serial On low power devices with low core running concurrent GC tasks on threads can cause them to contend with the UI thread which could potentially lead to jank This option turns off all concurrent GC activities domain network JSON encoded network policy per domain This overrides the DisallowInsecureConnections switch Embedder can specify whether to allow or disallow insecure connections at a domain level old gen heap size
Definition: switches.h:259
def timeout(deadline, cmd)
#define Pd
Definition: globals.h:408
#define NO_RETRY_EXPECTED(expression)
#define VOID_NO_RETRY_EXPECTED(expression)
#define TEMP_FAILURE_RETRY(expression)
std::shared_ptr< const fml::Mapping > data
Definition: texture_gles.cc:63
const uintptr_t id