Flutter Engine
The Flutter Engine
eventhandler_linux.cc
Go to the documentation of this file.
1// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(DART_HOST_OS_LINUX) || defined(DART_HOST_OS_ANDROID)
7
8#include "bin/eventhandler.h"
10
11#include <errno.h> // NOLINT
12#include <fcntl.h> // NOLINT
13#include <pthread.h> // NOLINT
14#include <stdio.h> // NOLINT
15#include <string.h> // NOLINT
16#include <sys/epoll.h> // NOLINT
17#include <sys/stat.h> // NOLINT
18#include <sys/timerfd.h> // NOLINT
19#include <unistd.h> // NOLINT
20
21#include "bin/dartutils.h"
22#include "bin/fdutils.h"
23#include "bin/lockers.h"
24#include "bin/process.h"
25#include "bin/socket.h"
26#include "bin/thread.h"
27#include "platform/syslog.h"
28#include "platform/utils.h"
29
30namespace dart {
31namespace bin {
32
34 // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
35 // triggered anyway.
36 intptr_t events = 0;
37 if ((Mask() & (1 << kInEvent)) != 0) {
38 events |= EPOLLIN;
39 }
40 if ((Mask() & (1 << kOutEvent)) != 0) {
41 events |= EPOLLOUT;
42 }
43 return events;
44}
45
46// Unregister the file descriptor for a DescriptorInfo structure with
47// epoll.
48static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
50 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), nullptr));
51}
52
53static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
54 struct epoll_event event;
55 event.events = EPOLLRDHUP | di->GetPollEvents();
56 if (!di->IsListeningSocket()) {
57 event.events |= EPOLLET;
58 }
59 event.data.ptr = di;
60 int status =
61 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
62 if (status == -1) {
63 // TODO(dart:io): Verify that the dart end is handling this correctly.
64
65 // Epoll does not accept the file descriptor. It could be due to
66 // already closed file descriptor, or unsupported devices, such
67 // as /dev/null. In such case, mark the file descriptor as closed,
68 // so dart will handle it accordingly.
69 di->NotifyAllDartPorts(1 << kCloseEvent);
70 }
71}
72
74 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
75 intptr_t result;
76 result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
77 if (result != 0) {
78 FATAL("Pipe creation failed");
79 }
80 if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
81 FATAL("Failed to set pipe fd non blocking\n");
82 }
83 if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
84 FATAL("Failed to set pipe fd close on exec\n");
85 }
86 if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
87 FATAL("Failed to set pipe fd close on exec\n");
88 }
89 shutdown_ = false;
90 // The initial size passed to epoll_create is ignore on newer (>=
91 // 2.6.8) Linux versions
92 const int kEpollInitialSize = 64;
93 epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
94 if (epoll_fd_ == -1) {
95 FATAL("Failed creating epoll file descriptor: %i", errno);
96 }
97 if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
98 FATAL("Failed to set epoll fd close on exec\n");
99 }
100 // Register the interrupt_fd with the epoll instance.
101 struct epoll_event event;
102 event.events = EPOLLIN;
103 event.data.ptr = nullptr;
104 int status = NO_RETRY_EXPECTED(
105 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
106 if (status == -1) {
107 FATAL("Failed adding interrupt fd to epoll instance");
108 }
109 timer_fd_ = NO_RETRY_EXPECTED(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC));
110 if (timer_fd_ == -1) {
111 FATAL("Failed creating timerfd file descriptor: %i", errno);
112 }
113 // Register the timer_fd_ with the epoll instance.
114 event.events = EPOLLIN;
115 event.data.fd = timer_fd_;
116 status =
117 NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &event));
118 if (status == -1) {
119 FATAL("Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_,
120 errno);
121 }
122}
123
124static void DeleteDescriptorInfo(void* info) {
125 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
126 di->Close();
127 delete di;
128}
129
130EventHandlerImplementation::~EventHandlerImplementation() {
131 socket_map_.Clear(DeleteDescriptorInfo);
132 close(epoll_fd_);
133 close(timer_fd_);
134 close(interrupt_fds_[0]);
135 close(interrupt_fds_[1]);
136}
137
138void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
139 DescriptorInfo* di) {
140 intptr_t new_mask = di->Mask();
141 if ((old_mask != 0) && (new_mask == 0)) {
142 RemoveFromEpollInstance(epoll_fd_, di);
143 } else if ((old_mask == 0) && (new_mask != 0)) {
144 AddToEpollInstance(epoll_fd_, di);
145 } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
146 ASSERT(!di->IsListeningSocket());
147 RemoveFromEpollInstance(epoll_fd_, di);
148 AddToEpollInstance(epoll_fd_, di);
149 }
150}
151
152DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
153 intptr_t fd,
154 bool is_listening) {
155 ASSERT(fd >= 0);
156 SimpleHashMap::Entry* entry = socket_map_.Lookup(
157 GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
158 ASSERT(entry != nullptr);
159 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
160 if (di == nullptr) {
161 // If there is no data in the hash map for this file descriptor a
162 // new DescriptorInfo for the file descriptor is inserted.
163 if (is_listening) {
164 di = new DescriptorInfoMultiple(fd);
165 } else {
166 di = new DescriptorInfoSingle(fd);
167 }
168 entry->value = di;
169 }
170 ASSERT(fd == di->fd());
171 return di;
172}
173
174void EventHandlerImplementation::WakeupHandler(intptr_t id,
175 Dart_Port dart_port,
176 int64_t data) {
177 InterruptMessage msg;
178 msg.id = id;
179 msg.dart_port = dart_port;
180 msg.data = data;
181 // WriteToBlocking will write up to 512 bytes atomically, and since our msg
182 // is smaller than 512, we don't need a thread lock.
183 // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
184 ASSERT(kInterruptMessageSize < PIPE_BUF);
185 intptr_t result =
186 FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
188 if (result == -1) {
189 FATAL("Interrupt message failure: %s", strerror(errno));
190 } else {
191 FATAL("Interrupt message failure: expected to write %" Pd
192 " bytes, but wrote %" Pd ".",
194 }
195 }
196}
197
198void EventHandlerImplementation::HandleInterruptFd() {
199 const intptr_t MAX_MESSAGES = kInterruptMessageSize;
200 InterruptMessage msg[MAX_MESSAGES];
202 read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
203 for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
204 if (msg[i].id == kTimerId) {
205 timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
206 UpdateTimerFd();
207 } else if (msg[i].id == kShutdownId) {
208 shutdown_ = true;
209 } else {
210 ASSERT((msg[i].data & COMMAND_MASK) != 0);
211 Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
212 RefCntReleaseScope<Socket> rs(socket);
213 if (socket->fd() == -1) {
214 continue;
215 }
216 DescriptorInfo* di =
217 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
219 ASSERT(!di->IsListeningSocket());
220 // Close the socket for reading.
221 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
222 } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
223 ASSERT(!di->IsListeningSocket());
224 // Close the socket for writing.
225 VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
226 } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
227 // Close the socket and free system resources and move on to next
228 // message.
229 if (IS_SIGNAL_SOCKET(msg[i].data)) {
230 Process::ClearSignalHandlerByFd(di->fd(), socket->isolate_port());
231 }
232 intptr_t old_mask = di->Mask();
233 Dart_Port port = msg[i].dart_port;
234 if (port != ILLEGAL_PORT) {
235 di->RemovePort(port);
236 }
237 intptr_t new_mask = di->Mask();
238 UpdateEpollInstance(old_mask, di);
239
240 intptr_t fd = di->fd();
241 ASSERT(fd == socket->fd());
242 if (di->IsListeningSocket()) {
243 // We only close the socket file descriptor from the operating
244 // system if there are no other dart socket objects which
245 // are listening on the same (address, port) combination.
246 ListeningSocketRegistry* registry =
247 ListeningSocketRegistry::Instance();
248
249 MutexLocker locker(registry->mutex());
250
251 if (registry->CloseSafe(socket)) {
252 ASSERT(new_mask == 0);
253 socket_map_.Remove(GetHashmapKeyFromFd(fd),
254 GetHashmapHashFromFd(fd));
255 di->Close();
256 delete di;
257 }
258 socket->CloseFd();
259 } else {
260 ASSERT(new_mask == 0);
261 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
262 di->Close();
263 delete di;
264 socket->CloseFd();
265 }
266 DartUtils::PostInt32(port, 1 << kDestroyedEvent);
267 } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
268 int count = TOKEN_COUNT(msg[i].data);
269 intptr_t old_mask = di->Mask();
270 di->ReturnTokens(msg[i].dart_port, count);
271 UpdateEpollInstance(old_mask, di);
272 } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
273 // `events` can only have kInEvent/kOutEvent flags set.
274 intptr_t events = msg[i].data & EVENT_MASK;
275 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
276
277 intptr_t old_mask = di->Mask();
278 di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
279 UpdateEpollInstance(old_mask, di);
280 } else {
281 UNREACHABLE();
282 }
283 }
284 }
285}
286
287void EventHandlerImplementation::UpdateTimerFd() {
288 struct itimerspec it;
289 memset(&it, 0, sizeof(it));
290 if (timeout_queue_.HasTimeout()) {
291 int64_t millis = timeout_queue_.CurrentTimeout();
292 it.it_value.tv_sec = millis / 1000;
293 it.it_value.tv_nsec = (millis % 1000) * 1000000;
294 }
296 timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, nullptr));
297}
298
299#ifdef DEBUG_POLL
300static void PrintEventMask(intptr_t fd, intptr_t events) {
301 Syslog::Print("%d ", fd);
302 if ((events & EPOLLIN) != 0) {
303 Syslog::Print("EPOLLIN ");
304 }
305 if ((events & EPOLLPRI) != 0) {
306 Syslog::Print("EPOLLPRI ");
307 }
308 if ((events & EPOLLOUT) != 0) {
309 Syslog::Print("EPOLLOUT ");
310 }
311 if ((events & EPOLLERR) != 0) {
312 Syslog::Print("EPOLLERR ");
313 }
314 if ((events & EPOLLHUP) != 0) {
315 Syslog::Print("EPOLLHUP ");
316 }
317 if ((events & EPOLLRDHUP) != 0) {
318 Syslog::Print("EPOLLRDHUP ");
319 }
320 int all_events =
321 EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
322 if ((events & ~all_events) != 0) {
323 Syslog::Print("(and %08x) ", events & ~all_events);
324 }
325 Syslog::Print("(available %d) ", FDUtils::AvailableBytes(fd));
326
327 Syslog::Print("\n");
328}
329#endif
330
331intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
332 DescriptorInfo* di) {
333#ifdef DEBUG_POLL
334 PrintEventMask(di->fd(), events);
335#endif
336 if ((events & EPOLLERR) != 0) {
337 // Return error only if EPOLLIN is present.
338 return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
339 }
340 intptr_t event_mask = 0;
341 if ((events & EPOLLIN) != 0) {
342 event_mask |= (1 << kInEvent);
343 }
344 if ((events & EPOLLOUT) != 0) {
345 event_mask |= (1 << kOutEvent);
346 }
347 if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
348 event_mask |= (1 << kCloseEvent);
349 }
350 return event_mask;
351}
352
353void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
354 int size) {
355 bool interrupt_seen = false;
356 for (int i = 0; i < size; i++) {
357 if (events[i].data.ptr == nullptr) {
358 interrupt_seen = true;
359 } else if (events[i].data.fd == timer_fd_) {
360 int64_t val;
362 read(timer_fd_, &val, sizeof(val)));
363 if (timeout_queue_.HasTimeout()) {
364 DartUtils::PostNull(timeout_queue_.CurrentPort());
365 timeout_queue_.RemoveCurrent();
366 }
367 UpdateTimerFd();
368 } else {
369 DescriptorInfo* di =
370 reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
371 const intptr_t old_mask = di->Mask();
372 const intptr_t event_mask = GetPollEvents(events[i].events, di);
373 if ((event_mask & (1 << kErrorEvent)) != 0) {
374 di->NotifyAllDartPorts(event_mask);
375 UpdateEpollInstance(old_mask, di);
376 } else if (event_mask != 0) {
377 Dart_Port port = di->NextNotifyDartPort(event_mask);
378 ASSERT(port != 0);
379 UpdateEpollInstance(old_mask, di);
380 DartUtils::PostInt32(port, event_mask);
381 }
382 }
383 }
384 if (interrupt_seen) {
385 // Handle after socket events, so we avoid closing a socket before we handle
386 // the current events.
387 HandleInterruptFd();
388 }
389}
390
391void EventHandlerImplementation::Poll(uword args) {
392 ThreadSignalBlocker signal_blocker(SIGPROF);
393 const intptr_t kMaxEvents = 16;
394 struct epoll_event events[kMaxEvents];
395 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
396 EventHandlerImplementation* handler_impl = &handler->delegate_;
397 ASSERT(handler_impl != nullptr);
398
399 while (!handler_impl->shutdown_) {
401 epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, -1));
402 ASSERT(EAGAIN == EWOULDBLOCK);
403 if (result <= 0) {
404 if (errno != EWOULDBLOCK) {
405 perror("Poll failed");
406 }
407 } else {
408 handler_impl->HandleEvents(events, result);
409 }
410 }
411 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
412 handler->NotifyShutdownDone();
413}
414
415void EventHandlerImplementation::Start(EventHandler* handler) {
416 int result =
417 Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll,
418 reinterpret_cast<uword>(handler));
419 if (result != 0) {
420 FATAL("Failed to start event handler thread %d", result);
421 }
422}
423
425 SendData(kShutdownId, 0, 0);
426}
427
429 Dart_Port dart_port,
430 int64_t data) {
431 WakeupHandler(id, dart_port, data);
432}
433
434void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
435 // The hashmap does not support keys with value 0.
436 return reinterpret_cast<void*>(fd + 1);
437}
438
439uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
440 // The hashmap does not support keys with value 0.
441 return dart::Utils::WordHash(fd + 1);
442}
443
444} // namespace bin
445} // namespace dart
446
447#endif // defined(DART_HOST_OS_LINUX) || defined(DART_HOST_OS_ANDROID)
static void info(const char *fmt,...) SK_PRINTF_LIKE(1
Definition: DM.cpp:213
int count
Definition: FontMgrTest.cpp:50
static bool read(SkStream *stream, void *buffer, size_t amount)
#define UNREACHABLE()
Definition: assert.h:248
#define DEBUG_ASSERT(cond)
Definition: assert.h:321
static uint32_t WordHash(intptr_t key)
Definition: utils.cc:217
virtual intptr_t Mask()=0
#define ILLEGAL_PORT
Definition: dart_api.h:1535
int64_t Dart_Port
Definition: dart_api.h:1525
#define ASSERT(E)
#define IS_COMMAND(data, command_bit)
Definition: eventhandler.h:49
#define EVENT_MASK
Definition: eventhandler.h:44
#define IS_LISTENING_SOCKET(data)
Definition: eventhandler.h:56
#define COMMAND_MASK
Definition: eventhandler.h:39
#define IS_SIGNAL_SOCKET(data)
Definition: eventhandler.h:58
#define TOKEN_COUNT(data)
Definition: eventhandler.h:60
#define FATAL(error)
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
FlKeyEvent * event
GAsyncResult * result
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
Definition: Response.cpp:64
static constexpr intptr_t kTimerId
Definition: eventhandler.h:109
@ kShutdownWriteCommand
Definition: eventhandler.h:30
@ kReturnTokenCommand
Definition: eventhandler.h:31
@ kShutdownReadCommand
Definition: eventhandler.h:29
@ kDestroyedEvent
Definition: eventhandler.h:27
@ kSetEventMaskCommand
Definition: eventhandler.h:32
static constexpr intptr_t kShutdownId
Definition: eventhandler.h:110
static constexpr intptr_t kInterruptMessageSize
Definition: eventhandler.h:107
static void Shutdown(Dart_NativeArguments args)
Definition: dart_vm.cc:33
uintptr_t uword
Definition: globals.h:501
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition: switches.h:87
it will be possible to load the file into Perfetto s trace viewer disable asset Prevents usage of any non test fonts unless they were explicitly Loaded via prefetched default font Indicates whether the embedding started a prefetch of the default font manager before creating the engine run In non interactive keep the shell running after the Dart script has completed enable serial On low power devices with low core running concurrent GC tasks on threads can cause them to contend with the UI thread which could potentially lead to jank This option turns off all concurrent GC activities domain network JSON encoded network policy per domain This overrides the DisallowInsecureConnections switch Embedder can specify whether to allow or disallow insecure connections at a domain level old gen heap size
Definition: switches.h:259
#define Pd
Definition: globals.h:408
#define TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(expression)
#define NO_RETRY_EXPECTED(expression)
#define VOID_TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(expression)
#define VOID_NO_RETRY_EXPECTED(expression)
std::shared_ptr< const fml::Mapping > data
Definition: texture_gles.cc:63
const uintptr_t id
int timerfd_settime(int ufc, int flags, const struct itimerspec *utmr, struct itimerspec *otmr)
Definition: timerfd.cc:23
int timerfd_create(int clockid, int flags)
Definition: timerfd.cc:19
#define TFD_TIMER_ABSTIME
Definition: timerfd.h:29
#define TFD_CLOEXEC
Definition: timerfd.h:32