Flutter Engine
The Flutter Engine
Loading...
Searching...
No Matches
eventhandler_fuchsia.cc
Go to the documentation of this file.
1// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(DART_HOST_OS_FUCHSIA)
7
8#include "bin/eventhandler.h"
10
11#include <errno.h>
12#include <fcntl.h>
13#include <poll.h>
14#include <pthread.h>
15#include <stdio.h>
16#include <string.h>
17#include <sys/socket.h>
18#include <sys/stat.h>
19#include <unistd.h>
20#include <zircon/status.h>
21#include <zircon/syscalls.h>
22#include <zircon/syscalls/object.h>
23#include <zircon/syscalls/port.h>
24
25#include "bin/fdutils.h"
26#include "bin/lockers.h"
27#include "bin/socket.h"
28#include "bin/thread.h"
29#include "bin/utils.h"
30#include "platform/hashmap.h"
31#include "platform/syslog.h"
32#include "platform/utils.h"
33
34// The EventHandler for Fuchsia uses its "ports v2" API:
35// https://fuchsia.googlesource.com/fuchsia/+/HEAD/zircon/docs/syscalls/port_create.md
36// This API does not have epoll()-like edge triggering (EPOLLET). Since clients
37// of the EventHandler expect edge-triggered notifications, we must simulate it.
38// When a packet from zx_port_wait() indicates that a signal is asserted for a
39// handle, we unsubscribe from that signal until the event that asserted the
40// signal can be processed. For example:
41//
42// 1. We get ZX_SOCKET_WRITABLE from zx_port_wait() for a handle.
43// 2. We send kOutEvent to the Dart thread.
44// 3. We unsubscribe from further ZX_SOCKET_WRITABLE signals for the handle.
45// 4. Some time later the Dart thread actually does a write().
46// 5. After writing, the Dart thread resubscribes to write events.
47//
48// We use the same procedure for ZX_SOCKET_READABLE, and read()/accept().
49
50// define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
51// define EVENTHANDLER_LOG_INFO to get log messages for both information and
52// errors.
53// #define EVENTHANDLER_LOG_INFO 1
54#define EVENTHANDLER_LOG_ERROR 1
55#if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
56#define LOG_ERR(msg, ...) \
57 { \
58 int err = errno; \
59 Syslog::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, \
60 __LINE__, ##__VA_ARGS__); \
61 errno = err; \
62 }
63#if defined(EVENTHANDLER_LOG_INFO)
64#define LOG_INFO(msg, ...) \
65 Syslog::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \
66 ##__VA_ARGS__)
67#else
68#define LOG_INFO(msg, ...)
69#endif // defined(EVENTHANDLER_LOG_INFO)
70#else
71#define LOG_ERR(msg, ...)
72#define LOG_INFO(msg, ...)
73#endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
74
75namespace dart {
76namespace bin {
77
78intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) {
79 MutexLocker ml(&mutex_);
80 const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes));
81 const int err = errno;
82 LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
83
84 // Track the number of bytes available to read.
85 if (read_bytes > 0) {
86 available_bytes_ -=
87 (available_bytes_ >= read_bytes) ? read_bytes : available_bytes_;
88 }
89
90 // If we have read all available bytes, or if there was an error, then
91 // re-enable read events. We re-enable read events even if read() returns
92 // an error. The error might be, e.g. EWOULDBLOCK, in which case
93 // resubscription is necessary. Logic in the caller decides which errors
94 // are real, and which are ignore-and-continue.
95 if ((available_bytes_ == 0) || (read_bytes < 0)) {
96 // Resubscribe to read events.
97 read_events_enabled_ = true;
98 if (wait_key_ == 0) {
99 LOG_ERR("IOHandle::Read calling AsyncWaitLocked with wait_key_ == 0");
100 }
101 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) {
102 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
103 }
104 }
105
106 errno = err;
107 return read_bytes;
108}
109
110intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) {
111 MutexLocker ml(&mutex_);
112 const ssize_t written_bytes =
113 NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes));
114 const int err = errno;
115 LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
116
117 // Resubscribe to write events.
118 write_events_enabled_ = true;
119 if (wait_key_ == 0) {
120 LOG_ERR("IOHandle::Write calling AsyncWaitLocked with wait_key_ == 0");
121 }
122 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLOUT, wait_key_)) {
123 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
124 }
125
126 errno = err;
127 return written_bytes;
128}
129
130intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) {
131 MutexLocker ml(&mutex_);
132 const intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen));
133 const int err = errno;
134 LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
135
136 // Re-subscribe to read events.
137 read_events_enabled_ = true;
138 if (wait_key_ == 0) {
139 LOG_ERR("IOHandle::Accept calling AsyncWaitLocked with wait_key_ == 0");
140 }
141 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) {
142 LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
143 }
144
145 errno = err;
146 return socket;
147}
148
149intptr_t IOHandle::AvailableBytes() {
150 MutexLocker ml(&mutex_);
151 ASSERT(fd_ >= 0);
152 intptr_t available = FDUtils::AvailableBytes(fd_);
153 LOG_INFO("IOHandle::AvailableBytes(): fd = %ld, bytes = %ld\n", fd_,
154 available);
155 if (available < 0) {
156 // If there is an error, we set available to 1 to trigger a read event that
157 // then propagates the error.
158 available = 1;
159 }
160 available_bytes_ = available;
161 return available;
162}
163
164void IOHandle::Close() {
165 MutexLocker ml(&mutex_);
166 VOID_NO_RETRY_EXPECTED(close(fd_));
167}
168
169uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) {
170 MutexLocker ml(&mutex_);
171 // Do not ask for POLLERR and POLLHUP explicitly as they are
172 // triggered anyway.
173 uint32_t events = 0;
174 // Do not subscribe to read closed events when kCloseEvent has already been
175 // sent to the Dart thread.
176 if (close_events_enabled_) {
177 events |= POLLRDHUP;
178 }
179 if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) {
180 events |= POLLIN;
181 }
182 if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) {
183 events |= POLLOUT;
184 }
185 return events;
186}
187
188intptr_t IOHandle::EpollEventsToMask(intptr_t events) {
189 if ((events & POLLERR) != 0) {
190 // Return error only if POLLIN is present.
191 return ((events & POLLIN) != 0) ? (1 << kErrorEvent) : 0;
192 }
193 intptr_t event_mask = 0;
194 if ((events & POLLIN) != 0) {
195 event_mask |= (1 << kInEvent);
196 }
197 if ((events & POLLOUT) != 0) {
198 event_mask |= (1 << kOutEvent);
199 }
200 if ((events & (POLLHUP | POLLRDHUP)) != 0) {
201 event_mask |= (1 << kCloseEvent);
202 }
203 return event_mask;
204}
205
206bool IOHandle::AsyncWaitLocked(zx_handle_t port,
207 uint32_t events,
208 uint64_t key) {
209 LOG_INFO("IOHandle::AsyncWaitLocked: fd = %ld\n", fd_);
210 if (key == 0) {
211 LOG_ERR("IOHandle::AsyncWaitLocked called with key == 0");
212 }
213 // The call to fdio_unsafe_fd_to_io() in the DescriptorInfo constructor may
214 // have returned nullptr. If it did, propagate the problem up to Dart.
215 if (fdio_ == nullptr) {
216 LOG_ERR("fdio_unsafe_fd_to_io(%ld) returned nullptr\n", fd_);
217 return false;
218 }
219
220 zx_handle_t handle;
221 zx_signals_t signals;
222 fdio_unsafe_wait_begin(fdio_, events, &handle, &signals);
223 if (handle == ZX_HANDLE_INVALID) {
224 LOG_ERR("fd = %ld fdio_unsafe_wait_begin returned an invalid handle\n",
225 fd_);
226 return false;
227 }
228
229 // Remember the port. Use the remembered port if the argument "port" is
230 // ZX_HANDLE_INVALID.
231 ASSERT((port != ZX_HANDLE_INVALID) || (port_ != ZX_HANDLE_INVALID));
232 if ((port_ == ZX_HANDLE_INVALID) || (port != ZX_HANDLE_INVALID)) {
233 port_ = port;
234 }
235
236 handle_ = handle;
237 wait_key_ = key;
238 LOG_INFO("zx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals);
239 zx_status_t status =
240 zx_object_wait_async(handle_, port_, key, signals, ZX_WAIT_ASYNC_ONCE);
241 if (status != ZX_OK) {
242 LOG_ERR("zx_object_wait_async failed: %s\n", zx_status_get_string(status));
243 return false;
244 }
245
246 return true;
247}
248
249bool IOHandle::AsyncWait(zx_handle_t port, uint32_t events, uint64_t key) {
250 MutexLocker ml(&mutex_);
251 return AsyncWaitLocked(port, events, key);
252}
253
254void IOHandle::CancelWait(zx_handle_t port, uint64_t key) {
255 MutexLocker ml(&mutex_);
256 LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_);
257 ASSERT(port != ZX_HANDLE_INVALID);
258 ASSERT(handle_ != ZX_HANDLE_INVALID);
259 if (key == 0) {
260 LOG_ERR("IOHandle::CancelWait calling zx_port_cancel with key == 0");
261 }
262 zx_status_t status = zx_port_cancel(port, handle_, key);
263 if ((status != ZX_OK) && (status != ZX_ERR_NOT_FOUND)) {
264 LOG_ERR("zx_port_cancel failed: %s\n", zx_status_get_string(status));
265 }
266}
267
268uint32_t IOHandle::WaitEnd(zx_signals_t observed) {
269 MutexLocker ml(&mutex_);
270 uint32_t events = 0;
271 fdio_unsafe_wait_end(fdio_, observed, &events);
272 LOG_INFO("IOHandle::WaitEnd: fd = %ld, events = %x\n", fd_, events);
273 return events;
274}
275
276// This function controls the simulation of edge-triggering. It is responsible
277// for removing events from the event mask when they should be suppressed, and
278// for suppressing future events. Events are unsuppressed by their respective
279// operations by the Dart thread on the socket---that is, where the
280// *_events_enabled_ flags are set to true.
281intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
282 MutexLocker ml(&mutex_);
283 // If write events are disabled, then remove the kOutEvent bit from the
284 // event mask.
285 if (!write_events_enabled_) {
286 LOG_INFO(
287 "IOHandle::ToggleEvents: fd = %ld "
288 "de-asserting kOutEvent\n",
289 fd_);
290 event_mask = event_mask & ~(1 << kOutEvent);
291 }
292 // If the kOutEvent bit is set, then suppress future write events until the
293 // Dart thread writes.
294 if ((event_mask & (1 << kOutEvent)) != 0) {
295 LOG_INFO(
296 "IOHandle::ToggleEvents: fd = %ld "
297 "asserting kOutEvent and disabling\n",
298 fd_);
299 write_events_enabled_ = false;
300 }
301
302 // If read events are disabled, then remove the kInEvent bit from the event
303 // mask.
304 if (!read_events_enabled_) {
305 LOG_INFO(
306 "IOHandle::ToggleEvents: fd = %ld "
307 "de-asserting kInEvent\n",
308 fd_);
309 event_mask = event_mask & ~(1 << kInEvent);
310 }
311 // We may get In events without available bytes, so we must make sure there
312 // are actually bytes, or we will never resubscribe (due to a short-circuit
313 // on the Dart side).
314 //
315 // This happens due to how packets get enqueued on the port with all signals
316 // asserted at that time. Sometimes we enqueue a packet due to
317 // zx_object_wait_async e.g. for POLLOUT (writability) while the socket is
318 // readable and while we have a Read queued up on the Dart side. This packet
319 // will also have POLLIN (readable) asserted. We may then perform the Read
320 // and drain the socket before our zx_port_wait is serviced, at which point
321 // when we process the packet for POLLOUT with its stale POLLIN (readable)
322 // signal, the socket is no longer actually readable.
323 //
324 // As a detail, negative available bytes (errors) are handled specially; see
325 // IOHandle::AvailableBytes for more information.
326 if ((event_mask & (1 << kInEvent)) != 0) {
327 if (FDUtils::AvailableBytes(fd_) != 0) {
328 LOG_INFO(
329 "IOHandle::ToggleEvents: fd = %ld "
330 "asserting kInEvent and disabling with bytes available\n",
331 fd_);
332 read_events_enabled_ = false;
333 }
334 // Also suppress future read events if we get a kCloseEvent. This is to
335 // account for POLLIN being set by Fuchsia when the socket is read-closed.
336 if ((event_mask & (1 << kCloseEvent)) != 0) {
337 LOG_INFO(
338 "IOHandle::ToggleEvents: fd = %ld "
339 "asserting kInEvent and disabling due to a close event\n",
340 fd_);
341 read_events_enabled_ = false;
342 }
343 }
344
345 // If the close events are disabled, then remove the kCloseEvent bit from the
346 // event mask.
347 if (!close_events_enabled_) {
348 LOG_INFO(
349 "IOHandle::ToggleEvents: fd = %ld "
350 "de-asserting kCloseEvent\n",
351 fd_);
352 event_mask = event_mask & ~(1 << kCloseEvent);
353 }
354 // If the kCloseEvent bit is set, then suppress future close events, they will
355 // be ignored by the Dart thread. See _NativeSocket.multiplex in
356 // socket_patch.dart.
357 if ((event_mask & (1 << kCloseEvent)) != 0) {
358 LOG_INFO(
359 "IOHandle::ToggleEvents: fd = %ld "
360 "asserting kCloseEvent and disabling\n",
361 fd_);
362 close_events_enabled_ = false;
363 }
364 return event_mask;
365}
366
367void EventHandlerImplementation::AddToPort(zx_handle_t port_handle,
368 DescriptorInfo* di) {
369 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
370 const uint64_t key = reinterpret_cast<uint64_t>(di);
371 if (key == 0) {
372 LOG_ERR(
373 "EventHandlerImplementation::AddToPort calling AsyncWait with key == "
374 "0");
375 }
376 if (!di->io_handle()->AsyncWait(port_handle, events, key)) {
377 di->NotifyAllDartPorts(1 << kCloseEvent);
378 }
379}
380
381void EventHandlerImplementation::RemoveFromPort(zx_handle_t port_handle,
382 DescriptorInfo* di) {
383 const uint64_t key = reinterpret_cast<uint64_t>(di);
384 if (key == 0) {
385 LOG_ERR(
386 "EventHandlerImplementation::RemoveFromPort calling CancelWait with "
387 "key == 0");
388 }
389 di->io_handle()->CancelWait(port_handle, key);
390}
391
393 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
394 shutdown_ = false;
395 // Create the port.
396 port_handle_ = ZX_HANDLE_INVALID;
397 zx_status_t status = zx_port_create(0, &port_handle_);
398 if (status != ZX_OK) {
399 // This is a FATAL because the VM won't work at all if we can't create this
400 // port.
401 FATAL("zx_port_create failed: %s\n", zx_status_get_string(status));
402 }
403 ASSERT(port_handle_ != ZX_HANDLE_INVALID);
404}
405
406static void DeleteDescriptorInfo(void* info) {
407 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
408 LOG_INFO("Closed %ld\n", di->io_handle()->fd());
409 di->Close();
410 delete di;
411}
412
413EventHandlerImplementation::~EventHandlerImplementation() {
414 socket_map_.Clear(DeleteDescriptorInfo);
415 zx_handle_close(port_handle_);
416 port_handle_ = ZX_HANDLE_INVALID;
417}
418
419void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
420 DescriptorInfo* di) {
421 const intptr_t new_mask = di->Mask();
422 if ((old_mask != 0) && (new_mask == 0)) {
423 RemoveFromPort(port_handle_, di);
424 } else if ((old_mask == 0) && (new_mask != 0)) {
425 AddToPort(port_handle_, di);
426 } else if ((old_mask != 0) && (new_mask != 0)) {
427 ASSERT((old_mask == new_mask) || !di->IsListeningSocket());
428 RemoveFromPort(port_handle_, di);
429 AddToPort(port_handle_, di);
430 }
431}
432
433DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
434 intptr_t fd,
435 bool is_listening) {
436 IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
437 ASSERT(handle->fd() >= 0);
438 SimpleHashMap::Entry* entry =
439 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
440 GetHashmapHashFromFd(handle->fd()), true);
441 ASSERT(entry != nullptr);
442 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
443 if (di == nullptr) {
444 // If there is no data in the hash map for this file descriptor a
445 // new DescriptorInfo for the file descriptor is inserted.
446 if (is_listening) {
447 di = new DescriptorInfoMultiple(fd);
448 } else {
449 di = new DescriptorInfoSingle(fd);
450 }
451 entry->value = di;
452 }
453 ASSERT(fd == di->fd());
454 return di;
455}
456
457void EventHandlerImplementation::WakeupHandler(intptr_t id,
458 Dart_Port dart_port,
459 int64_t data) {
460 COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(zx_packet_user_t));
461 zx_port_packet_t pkt;
462 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user);
463 pkt.key = kInterruptPacketKey;
464 msg->id = id;
465 msg->dart_port = dart_port;
466 msg->data = data;
467 zx_status_t status = zx_port_queue(port_handle_, &pkt);
468 if (status != ZX_OK) {
469 // This is a FATAL because the VM won't work at all if we can't send any
470 // messages to the EventHandler thread.
471 FATAL("zx_port_queue failed: %s\n", zx_status_get_string(status));
472 }
473}
474
475void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
476 if (msg->id == kTimerId) {
477 LOG_INFO("HandleInterrupt read timer update\n");
478 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
479 return;
480 } else if (msg->id == kShutdownId) {
481 LOG_INFO("HandleInterrupt read shutdown\n");
482 shutdown_ = true;
483 return;
484 }
485 ASSERT((msg->data & COMMAND_MASK) != 0);
486 LOG_INFO("HandleInterrupt command:\n");
487 Socket* socket = reinterpret_cast<Socket*>(msg->id);
488 RefCntReleaseScope<Socket> rs(socket);
489 if (socket->fd() == -1) {
490 return;
491 }
492 IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd());
493 const intptr_t fd = io_handle->fd();
494 DescriptorInfo* di =
495 GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data));
496 ASSERT(io_handle == di->io_handle());
497 if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
498 ASSERT(!di->IsListeningSocket());
499 // Close the socket for reading.
500 LOG_INFO("\tSHUT_RD: %ld\n", fd);
501 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD));
502 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
503 ASSERT(!di->IsListeningSocket());
504 // Close the socket for writing.
505 LOG_INFO("\tSHUT_WR: %ld\n", fd);
506 VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR));
507 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
508 // Close the socket and free system resources and move on to next
509 // message.
510 const intptr_t old_mask = di->Mask();
511 Dart_Port port = msg->dart_port;
512 if (port != ILLEGAL_PORT) {
513 di->RemovePort(port);
514 }
515 const intptr_t new_mask = di->Mask();
516 UpdatePort(old_mask, di);
517
518 LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
519 if (di->IsListeningSocket()) {
520 // We only close the socket file descriptor from the operating
521 // system if there are no other dart socket objects which
522 // are listening on the same (address, port) combination.
523 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
524
525 MutexLocker locker(registry->mutex());
526
527 if (registry->CloseSafe(socket)) {
528 ASSERT(new_mask == 0);
529 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
530 di->Close();
531 delete di;
532 socket->CloseFd();
533 }
534 socket->SetClosedFd();
535 } else {
536 ASSERT(new_mask == 0);
537 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
538 di->Close();
539 delete di;
540 socket->CloseFd();
541 }
542 if (port != 0) {
543 const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
544 if (!success) {
545 LOG_INFO("Failed to post destroy event to port %ld\n", port);
546 }
547 }
548 } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
549 const int count = TOKEN_COUNT(msg->data);
550 const intptr_t old_mask = di->Mask();
551 LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask);
552 di->ReturnTokens(msg->dart_port, count);
553 UpdatePort(old_mask, di);
554 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
555 // `events` can only have kInEvent/kOutEvent flags set.
556 const intptr_t events = msg->data & EVENT_MASK;
557 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
558
559 const intptr_t old_mask = di->Mask();
560 LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
561 msg->data & EVENT_MASK);
562 di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
563 UpdatePort(old_mask, di);
564 } else {
565 UNREACHABLE();
566 }
567}
568
569void EventHandlerImplementation::HandlePacket(zx_port_packet_t* pkt) {
570 if (pkt->key == 0) {
571 LOG_ERR("HandlePacket called with pkt->key==0");
572 return;
573 }
574 LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
575 LOG_INFO("HandlePacket: Got event packet: type=%x\n", pkt->type);
576 LOG_INFO("HandlePacket: Got event packet: status=%d\n", pkt->status);
577
578 if (pkt->type == ZX_PKT_TYPE_USER) {
579 ASSERT(pkt->key == kInterruptPacketKey);
580 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
581 HandleInterrupt(msg);
582 return;
583 }
584
585 if (pkt->type != ZX_PKT_TYPE_SIGNAL_ONE) {
586 LOG_ERR("HandlePacket: Got unexpected packet type: key=%x\n", pkt->type);
587 return;
588 }
589
590 // Handle pkt->type == ZX_PKT_TYPE_SIGNAL_ONE
591 LOG_INFO("HandlePacket: Got event packet: observed = %x\n",
592 pkt->signal.observed);
593 LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
594 DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key);
595 zx_signals_t observed = pkt->signal.observed;
596 const intptr_t old_mask = di->Mask();
597 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
598 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
599 if ((event_mask & (1 << kErrorEvent)) != 0) {
600 di->NotifyAllDartPorts(event_mask);
601 } else if (event_mask != 0) {
602 event_mask = di->io_handle()->ToggleEvents(event_mask);
603 if (event_mask != 0) {
604 Dart_Port port = di->NextNotifyDartPort(event_mask);
605 ASSERT(port != 0);
606 bool success = DartUtils::PostInt32(port, event_mask);
607 if (!success) {
608 // This can happen if e.g. the isolate that owns the port has died
609 // for some reason.
610 LOG_INFO("Failed to post event to port %ld\n", port);
611 }
612 }
613 }
614 UpdatePort(old_mask, di);
615}
616
617int64_t EventHandlerImplementation::GetTimeout() const {
618 if (!timeout_queue_.HasTimeout()) {
619 return kInfinityTimeout;
620 }
621 int64_t millis =
622 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
623 return (millis < 0) ? 0 : millis;
624}
625
626void EventHandlerImplementation::HandleTimeout() {
627 if (timeout_queue_.HasTimeout()) {
628 int64_t millis = timeout_queue_.CurrentTimeout() -
629 TimerUtils::GetCurrentMonotonicMillis();
630 if (millis <= 0) {
631 DartUtils::PostNull(timeout_queue_.CurrentPort());
632 timeout_queue_.RemoveCurrent();
633 }
634 }
635}
636
637void EventHandlerImplementation::Poll(uword args) {
638 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
639 EventHandlerImplementation* handler_impl = &handler->delegate_;
640 ASSERT(handler_impl != nullptr);
641
642 zx_port_packet_t pkt;
643 while (!handler_impl->shutdown_) {
644 int64_t millis = handler_impl->GetTimeout();
645 ASSERT((millis == kInfinityTimeout) || (millis >= 0));
646
647 LOG_INFO("zx_port_wait(millis = %ld)\n", millis);
648 zx_status_t status = zx_port_wait(handler_impl->port_handle_,
649 millis == kInfinityTimeout
650 ? ZX_TIME_INFINITE
651 : zx_deadline_after(ZX_MSEC(millis)),
652 &pkt);
653 if (status == ZX_ERR_TIMED_OUT) {
654 handler_impl->HandleTimeout();
655 } else if (status != ZX_OK) {
656 FATAL("zx_port_wait failed: %s\n", zx_status_get_string(status));
657 } else {
658 handler_impl->HandleTimeout();
659 handler_impl->HandlePacket(&pkt);
660 }
661 }
662 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
663 handler->NotifyShutdownDone();
664}
665
666void EventHandlerImplementation::Start(EventHandler* handler) {
667 int result =
668 Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll,
669 reinterpret_cast<uword>(handler));
670 if (result != 0) {
671 FATAL("Failed to start event handler thread %d", result);
672 }
673}
674
675void EventHandlerImplementation::Shutdown() {
676 SendData(kShutdownId, 0, 0);
677}
678
679void EventHandlerImplementation::SendData(intptr_t id,
680 Dart_Port dart_port,
681 int64_t data) {
682 WakeupHandler(id, dart_port, data);
683}
684
685void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
686 // The hashmap does not support keys with value 0.
687 return reinterpret_cast<void*>(fd + 1);
688}
689
690uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
691 // The hashmap does not support keys with value 0.
692 return dart::Utils::WordHash(fd + 1);
693}
694
695} // namespace bin
696} // namespace dart
697
698#endif // defined(DART_HOST_OS_FUCHSIA)
static void info(const char *fmt,...) SK_PRINTF_LIKE(1
Definition DM.cpp:213
int count
#define LOG_INFO(...)
static bool read(SkStream *stream, void *buffer, size_t amount)
#define UNREACHABLE()
Definition assert.h:248
#define DEBUG_ASSERT(cond)
Definition assert.h:321
#define COMPILE_ASSERT(expr)
Definition assert.h:339
static uint32_t WordHash(intptr_t key)
Definition utils.cc:217
static intptr_t AvailableBytes(intptr_t fd)
bool AsyncWait(zx_handle_t port, uint32_t events, uint64_t key)
intptr_t Write(const void *buffer, intptr_t num_bytes)
uint32_t WaitEnd(zx_signals_t observed)
static intptr_t EpollEventsToMask(intptr_t events)
intptr_t AvailableBytes()
intptr_t ToggleEvents(intptr_t event_mask)
intptr_t Read(void *buffer, intptr_t num_bytes)
intptr_t Accept(struct sockaddr *addr, socklen_t *addrlen)
void CancelWait(zx_handle_t port, uint64_t key)
uint32_t MaskToEpollEvents(intptr_t mask)
#define ILLEGAL_PORT
Definition dart_api.h:1530
int64_t Dart_Port
Definition dart_api.h:1524
#define ASSERT(E)
#define IS_COMMAND(data, command_bit)
#define EVENT_MASK
#define IS_LISTENING_SOCKET(data)
#define COMMAND_MASK
#define TOKEN_COUNT(data)
#define FATAL(error)
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
static const uint8_t buffer[]
static guint signals[kSignalLastSignal]
GAsyncResult * result
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
Definition Response.cpp:64
uintptr_t uword
Definition globals.h:501
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition switches.h:87
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot data
Definition switches.h:41
#define NO_RETRY_EXPECTED(expression)
#define VOID_NO_RETRY_EXPECTED(expression)
void write(SkWStream *wStream, const T &text)
Definition skqp.cpp:188
const uintptr_t id