6#if defined(DART_HOST_OS_FUCHSIA)
17#include <sys/socket.h>
20#include <zircon/status.h>
21#include <zircon/syscalls.h>
22#include <zircon/syscalls/object.h>
23#include <zircon/syscalls/port.h>
54#define EVENTHANDLER_LOG_ERROR 1
55#if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
56#define LOG_ERR(msg, ...) \
59 Syslog::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, \
60 __LINE__, ##__VA_ARGS__); \
63#if defined(EVENTHANDLER_LOG_INFO)
64#define LOG_INFO(msg, ...) \
65 Syslog::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \
68#define LOG_INFO(msg, ...)
71#define LOG_ERR(msg, ...)
72#define LOG_INFO(msg, ...)
79 MutexLocker ml(&mutex_);
81 const int err = errno;
82 LOG_INFO(
"IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
87 (available_bytes_ >= read_bytes) ? read_bytes : available_bytes_;
95 if ((available_bytes_ == 0) || (read_bytes < 0)) {
97 read_events_enabled_ =
true;
99 LOG_ERR(
"IOHandle::Read calling AsyncWaitLocked with wait_key_ == 0");
101 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) {
102 LOG_ERR(
"IOHandle::AsyncWait failed for fd = %ld\n", fd_);
111 MutexLocker ml(&mutex_);
112 const ssize_t written_bytes =
114 const int err = errno;
115 LOG_INFO(
"IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
118 write_events_enabled_ =
true;
119 if (wait_key_ == 0) {
120 LOG_ERR(
"IOHandle::Write calling AsyncWaitLocked with wait_key_ == 0");
122 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLOUT, wait_key_)) {
123 LOG_ERR(
"IOHandle::AsyncWait failed for fd = %ld\n", fd_);
127 return written_bytes;
131 MutexLocker ml(&mutex_);
133 const int err = errno;
134 LOG_INFO(
"IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
137 read_events_enabled_ =
true;
138 if (wait_key_ == 0) {
139 LOG_ERR(
"IOHandle::Accept calling AsyncWaitLocked with wait_key_ == 0");
141 if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) {
142 LOG_ERR(
"IOHandle::AsyncWait failed for fd = %ld\n", fd_);
150 MutexLocker ml(&mutex_);
153 LOG_INFO(
"IOHandle::AvailableBytes(): fd = %ld, bytes = %ld\n", fd_,
160 available_bytes_ = available;
165 MutexLocker ml(&mutex_);
170 MutexLocker ml(&mutex_);
176 if (close_events_enabled_) {
179 if (read_events_enabled_ && ((mask & (1 <<
kInEvent)) != 0)) {
182 if (write_events_enabled_ && ((mask & (1 <<
kOutEvent)) != 0)) {
189 if ((events & POLLERR) != 0) {
191 return ((events & POLLIN) != 0) ? (1 <<
kErrorEvent) : 0;
193 intptr_t event_mask = 0;
194 if ((events & POLLIN) != 0) {
197 if ((events & POLLOUT) != 0) {
200 if ((events & (POLLHUP | POLLRDHUP)) != 0) {
206bool IOHandle::AsyncWaitLocked(zx_handle_t
port,
209 LOG_INFO(
"IOHandle::AsyncWaitLocked: fd = %ld\n", fd_);
211 LOG_ERR(
"IOHandle::AsyncWaitLocked called with key == 0");
215 if (fdio_ ==
nullptr) {
216 LOG_ERR(
"fdio_unsafe_fd_to_io(%ld) returned nullptr\n", fd_);
222 fdio_unsafe_wait_begin(fdio_, events, &handle, &
signals);
223 if (handle == ZX_HANDLE_INVALID) {
224 LOG_ERR(
"fd = %ld fdio_unsafe_wait_begin returned an invalid handle\n",
231 ASSERT((
port != ZX_HANDLE_INVALID) || (port_ != ZX_HANDLE_INVALID));
232 if ((port_ == ZX_HANDLE_INVALID) || (
port != ZX_HANDLE_INVALID)) {
238 LOG_INFO(
"zx_object_wait_async(fd = %ld, signals = %x)\n", fd_,
signals);
240 zx_object_wait_async(handle_, port_,
key,
signals, ZX_WAIT_ASYNC_ONCE);
241 if (status != ZX_OK) {
242 LOG_ERR(
"zx_object_wait_async failed: %s\n", zx_status_get_string(status));
250 MutexLocker ml(&mutex_);
251 return AsyncWaitLocked(
port, events,
key);
255 MutexLocker ml(&mutex_);
256 LOG_INFO(
"IOHandle::CancelWait: fd = %ld\n", fd_);
258 ASSERT(handle_ != ZX_HANDLE_INVALID);
260 LOG_ERR(
"IOHandle::CancelWait calling zx_port_cancel with key == 0");
262 zx_status_t status = zx_port_cancel(
port, handle_,
key);
263 if ((status != ZX_OK) && (status != ZX_ERR_NOT_FOUND)) {
264 LOG_ERR(
"zx_port_cancel failed: %s\n", zx_status_get_string(status));
269 MutexLocker ml(&mutex_);
271 fdio_unsafe_wait_end(fdio_, observed, &events);
272 LOG_INFO(
"IOHandle::WaitEnd: fd = %ld, events = %x\n", fd_, events);
282 MutexLocker ml(&mutex_);
285 if (!write_events_enabled_) {
287 "IOHandle::ToggleEvents: fd = %ld "
288 "de-asserting kOutEvent\n",
290 event_mask = event_mask & ~(1 <<
kOutEvent);
294 if ((event_mask & (1 <<
kOutEvent)) != 0) {
296 "IOHandle::ToggleEvents: fd = %ld "
297 "asserting kOutEvent and disabling\n",
299 write_events_enabled_ =
false;
304 if (!read_events_enabled_) {
306 "IOHandle::ToggleEvents: fd = %ld "
307 "de-asserting kInEvent\n",
309 event_mask = event_mask & ~(1 <<
kInEvent);
326 if ((event_mask & (1 <<
kInEvent)) != 0) {
329 "IOHandle::ToggleEvents: fd = %ld "
330 "asserting kInEvent and disabling with bytes available\n",
332 read_events_enabled_ =
false;
338 "IOHandle::ToggleEvents: fd = %ld "
339 "asserting kInEvent and disabling due to a close event\n",
341 read_events_enabled_ =
false;
347 if (!close_events_enabled_) {
349 "IOHandle::ToggleEvents: fd = %ld "
350 "de-asserting kCloseEvent\n",
359 "IOHandle::ToggleEvents: fd = %ld "
360 "asserting kCloseEvent and disabling\n",
362 close_events_enabled_ =
false;
367void EventHandlerImplementation::AddToPort(zx_handle_t port_handle,
368 DescriptorInfo* di) {
369 const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
370 const uint64_t
key =
reinterpret_cast<uint64_t
>(di);
373 "EventHandlerImplementation::AddToPort calling AsyncWait with key == "
376 if (!di->io_handle()->AsyncWait(port_handle, events,
key)) {
381void EventHandlerImplementation::RemoveFromPort(zx_handle_t port_handle,
382 DescriptorInfo* di) {
383 const uint64_t
key =
reinterpret_cast<uint64_t
>(di);
386 "EventHandlerImplementation::RemoveFromPort calling CancelWait with "
389 di->io_handle()->CancelWait(port_handle,
key);
393 : socket_map_(&SimpleHashMap::SamePointerValue, 16) {
396 port_handle_ = ZX_HANDLE_INVALID;
397 zx_status_t status = zx_port_create(0, &port_handle_);
398 if (status != ZX_OK) {
401 FATAL(
"zx_port_create failed: %s\n", zx_status_get_string(status));
403 ASSERT(port_handle_ != ZX_HANDLE_INVALID);
406static void DeleteDescriptorInfo(
void*
info) {
407 DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*
>(
info);
408 LOG_INFO(
"Closed %ld\n", di->io_handle()->fd());
413EventHandlerImplementation::~EventHandlerImplementation() {
414 socket_map_.Clear(DeleteDescriptorInfo);
415 zx_handle_close(port_handle_);
416 port_handle_ = ZX_HANDLE_INVALID;
419void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
420 DescriptorInfo* di) {
421 const intptr_t new_mask = di->Mask();
422 if ((old_mask != 0) && (new_mask == 0)) {
423 RemoveFromPort(port_handle_, di);
424 }
else if ((old_mask == 0) && (new_mask != 0)) {
425 AddToPort(port_handle_, di);
426 }
else if ((old_mask != 0) && (new_mask != 0)) {
427 ASSERT((old_mask == new_mask) || !di->IsListeningSocket());
428 RemoveFromPort(port_handle_, di);
429 AddToPort(port_handle_, di);
433DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
436 IOHandle* handle =
reinterpret_cast<IOHandle*
>(fd);
437 ASSERT(handle->fd() >= 0);
439 socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
440 GetHashmapHashFromFd(handle->fd()),
true);
442 DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*
>(entry->value);
447 di =
new DescriptorInfoMultiple(fd);
449 di =
new DescriptorInfoSingle(fd);
457void EventHandlerImplementation::WakeupHandler(intptr_t
id,
460 COMPILE_ASSERT(
sizeof(InterruptMessage) <=
sizeof(zx_packet_user_t));
461 zx_port_packet_t pkt;
462 InterruptMessage* msg =
reinterpret_cast<InterruptMessage*
>(&pkt.user);
463 pkt.key = kInterruptPacketKey;
465 msg->dart_port = dart_port;
467 zx_status_t status = zx_port_queue(port_handle_, &pkt);
468 if (status != ZX_OK) {
471 FATAL(
"zx_port_queue failed: %s\n", zx_status_get_string(status));
475void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
477 LOG_INFO(
"HandleInterrupt read timer update\n");
478 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
481 LOG_INFO(
"HandleInterrupt read shutdown\n");
486 LOG_INFO(
"HandleInterrupt command:\n");
487 Socket* socket =
reinterpret_cast<Socket*
>(msg->id);
488 RefCntReleaseScope<Socket> rs(socket);
489 if (socket->fd() == -1) {
492 IOHandle* io_handle =
reinterpret_cast<IOHandle*
>(socket->fd());
493 const intptr_t fd = io_handle->fd();
496 ASSERT(io_handle == di->io_handle());
498 ASSERT(!di->IsListeningSocket());
503 ASSERT(!di->IsListeningSocket());
510 const intptr_t old_mask = di->Mask();
513 di->RemovePort(
port);
515 const intptr_t new_mask = di->Mask();
516 UpdatePort(old_mask, di);
518 LOG_INFO(
"\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
519 if (di->IsListeningSocket()) {
523 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
525 MutexLocker locker(registry->mutex());
527 if (registry->CloseSafe(socket)) {
529 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
534 socket->SetClosedFd();
537 socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
545 LOG_INFO(
"Failed to post destroy event to port %ld\n",
port);
550 const intptr_t old_mask = di->Mask();
551 LOG_INFO(
"\t Return Token: %ld: %lx\n", fd, old_mask);
552 di->ReturnTokens(msg->dart_port,
count);
553 UpdatePort(old_mask, di);
556 const intptr_t events = msg->data &
EVENT_MASK;
559 const intptr_t old_mask = di->Mask();
560 LOG_INFO(
"\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
562 di->SetPortAndMask(msg->dart_port, msg->data &
EVENT_MASK);
563 UpdatePort(old_mask, di);
569void EventHandlerImplementation::HandlePacket(zx_port_packet_t* pkt) {
571 LOG_ERR(
"HandlePacket called with pkt->key==0");
574 LOG_INFO(
"HandlePacket: Got event packet: key=%lx\n", pkt->key);
575 LOG_INFO(
"HandlePacket: Got event packet: type=%x\n", pkt->type);
576 LOG_INFO(
"HandlePacket: Got event packet: status=%d\n", pkt->status);
578 if (pkt->type == ZX_PKT_TYPE_USER) {
579 ASSERT(pkt->key == kInterruptPacketKey);
580 InterruptMessage* msg =
reinterpret_cast<InterruptMessage*
>(&pkt->user);
581 HandleInterrupt(msg);
585 if (pkt->type != ZX_PKT_TYPE_SIGNAL_ONE) {
586 LOG_ERR(
"HandlePacket: Got unexpected packet type: key=%x\n", pkt->type);
591 LOG_INFO(
"HandlePacket: Got event packet: observed = %x\n",
592 pkt->signal.observed);
593 LOG_INFO(
"HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
594 DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*
>(pkt->key);
595 zx_signals_t observed = pkt->signal.observed;
596 const intptr_t old_mask = di->Mask();
597 const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
598 intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
600 di->NotifyAllDartPorts(event_mask);
601 }
else if (event_mask != 0) {
602 event_mask = di->io_handle()->ToggleEvents(event_mask);
603 if (event_mask != 0) {
606 bool success = DartUtils::PostInt32(
port, event_mask);
614 UpdatePort(old_mask, di);
617int64_t EventHandlerImplementation::GetTimeout()
const {
618 if (!timeout_queue_.HasTimeout()) {
622 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
623 return (millis < 0) ? 0 : millis;
626void EventHandlerImplementation::HandleTimeout() {
627 if (timeout_queue_.HasTimeout()) {
628 int64_t millis = timeout_queue_.CurrentTimeout() -
629 TimerUtils::GetCurrentMonotonicMillis();
631 DartUtils::PostNull(timeout_queue_.CurrentPort());
632 timeout_queue_.RemoveCurrent();
637void EventHandlerImplementation::Poll(
uword args) {
638 EventHandler* handler =
reinterpret_cast<EventHandler*
>(
args);
639 EventHandlerImplementation* handler_impl = &handler->delegate_;
640 ASSERT(handler_impl !=
nullptr);
642 zx_port_packet_t pkt;
643 while (!handler_impl->shutdown_) {
644 int64_t millis = handler_impl->GetTimeout();
647 LOG_INFO(
"zx_port_wait(millis = %ld)\n", millis);
648 zx_status_t status = zx_port_wait(handler_impl->port_handle_,
651 : zx_deadline_after(ZX_MSEC(millis)),
653 if (status == ZX_ERR_TIMED_OUT) {
654 handler_impl->HandleTimeout();
655 }
else if (status != ZX_OK) {
656 FATAL(
"zx_port_wait failed: %s\n", zx_status_get_string(status));
658 handler_impl->HandleTimeout();
659 handler_impl->HandlePacket(&pkt);
662 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
663 handler->NotifyShutdownDone();
666void EventHandlerImplementation::Start(EventHandler* handler) {
668 Thread::Start(
"dart:io EventHandler", &EventHandlerImplementation::Poll,
669 reinterpret_cast<uword>(handler));
671 FATAL(
"Failed to start event handler thread %d",
result);
682 WakeupHandler(
id, dart_port,
data);
685void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
687 return reinterpret_cast<void*
>(fd + 1);
690uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
static void info(const char *fmt,...) SK_PRINTF_LIKE(1
static bool read(SkStream *stream, void *buffer, size_t amount)
#define DEBUG_ASSERT(cond)
#define COMPILE_ASSERT(expr)
static uint32_t WordHash(intptr_t key)
EventHandlerImplementation()
static intptr_t AvailableBytes(intptr_t fd)
bool AsyncWait(zx_handle_t port, uint32_t events, uint64_t key)
intptr_t Write(const void *buffer, intptr_t num_bytes)
uint32_t WaitEnd(zx_signals_t observed)
static intptr_t EpollEventsToMask(intptr_t events)
intptr_t AvailableBytes()
intptr_t ToggleEvents(intptr_t event_mask)
intptr_t Read(void *buffer, intptr_t num_bytes)
intptr_t Accept(struct sockaddr *addr, socklen_t *addrlen)
void CancelWait(zx_handle_t port, uint64_t key)
uint32_t MaskToEpollEvents(intptr_t mask)
#define IS_COMMAND(data, command_bit)
#define IS_LISTENING_SOCKET(data)
#define TOKEN_COUNT(data)
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
static guint signals[kSignalLastSignal]
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
static constexpr intptr_t kTimerId
static constexpr intptr_t kShutdownId
static constexpr intptr_t kInfinityTimeout
static void Shutdown(Dart_NativeArguments args)
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service A custom Dart VM Service port The default is to pick a randomly available open port disable vm Disable the Dart VM Service The Dart VM Service is never available in release mode disable vm service Disable mDNS Dart VM Service publication Bind to the IPv6 localhost address for the Dart VM Service Ignored if vm service host is set endless trace buffer
#define NO_RETRY_EXPECTED(expression)
#define VOID_NO_RETRY_EXPECTED(expression)
void write(SkWStream *wStream, const T &text)
std::shared_ptr< const fml::Mapping > data