Flutter Engine
The Flutter Engine
eventhandler_win.cc
Go to the documentation of this file.
1// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(DART_HOST_OS_WINDOWS)
7
8#include "bin/eventhandler.h"
10
11#include <fcntl.h> // NOLINT
12#include <io.h> // NOLINT
13#include <mswsock.h> // NOLINT
14#include <winsock2.h> // NOLINT
15#include <ws2tcpip.h> // NOLINT
16#include <utility>
17
18#include "bin/builtin.h"
19#include "bin/dartutils.h"
20#include "bin/lockers.h"
21#include "bin/process.h"
22#include "bin/socket.h"
23#include "bin/thread.h"
24#include "bin/utils.h"
25#include "platform/syslog.h"
26
27#include "platform/utils.h"
28
29namespace dart {
30namespace bin {
31
// Size of the buffer allocated for ordinary overlapped reads; writes are
// clamped to this size as well.
static constexpr int kBufferSize = 64 * 1024;
// Minimum buffer size expected by the synchronous read thread, and the read
// size it uses for character-device handles.
static constexpr int kStdOverlappedBufferSize = 16 * 1024;
// Buffer size requested for a single overlapped UDP receive.
static constexpr int kMaxUDPPackageLength = 64 * 1024;
// kBufferSize must be >= kMaxUDPPackageLength so that a complete UDP packet
// can fit in the buffer. Checked at compile time so the two constants cannot
// drift apart unnoticed.
static_assert(kBufferSize >= kMaxUDPPackageLength,
              "kBufferSize must be able to hold a complete UDP packet");
37// For AcceptEx there needs to be buffer storage for address
38// information for two addresses (local and remote address). The
39// AcceptEx documentation says: "This value must be at least 16
40// bytes more than the maximum address length for the transport
41// protocol in use."
42static constexpr int kAcceptExAddressAdditionalBytes = 16;
43static constexpr int kAcceptExAddressStorageSize =
44 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
45
46OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
47 Operation operation) {
48 OverlappedBuffer* buffer =
49 new (buffer_size) OverlappedBuffer(buffer_size, operation);
50 return buffer;
51}
52
54 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept);
55 return buffer;
56}
57
59 return AllocateBuffer(buffer_size, kRead);
60}
61
63 // For calling recvfrom additional buffer space is needed for the source
64 // address information.
65 buffer_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage);
66 return AllocateBuffer(buffer_size, kRecvFrom);
67}
68
70 return AllocateBuffer(buffer_size, kWrite);
71}
72
74 return AllocateBuffer(buffer_size, kSendTo);
75}
76
78 return AllocateBuffer(0, kDisconnect);
79}
80
82 return AllocateBuffer(0, kConnect);
83}
84
85void OverlappedBuffer::DisposeBuffer(OverlappedBuffer* buffer) {
86 delete buffer;
87}
88
89OverlappedBuffer* OverlappedBuffer::GetFromOverlapped(OVERLAPPED* overlapped) {
90 OverlappedBuffer* buffer =
91 CONTAINING_RECORD(overlapped, OverlappedBuffer, overlapped_);
92 return buffer;
93}
94
95int OverlappedBuffer::Read(void* buffer, int num_bytes) {
96 if (num_bytes > GetRemainingLength()) {
97 num_bytes = GetRemainingLength();
98 }
99 memmove(buffer, GetBufferStart() + index_, num_bytes);
100 index_ += num_bytes;
101 return num_bytes;
102}
103
104int OverlappedBuffer::Write(const void* buffer, int num_bytes) {
105 ASSERT(num_bytes == buflen_);
106 memmove(GetBufferStart(), buffer, num_bytes);
107 data_length_ = num_bytes;
108 return num_bytes;
109}
110
112 ASSERT(operation_ == kRead || operation_ == kRecvFrom);
113 return data_length_ - index_;
114}
115
116Handle::Handle(intptr_t handle)
117 : ReferenceCounted(),
118 DescriptorInfoBase(handle),
119 monitor_(),
120 handle_(reinterpret_cast<HANDLE>(handle)),
121 completion_port_(INVALID_HANDLE_VALUE),
122 event_handler_(nullptr),
123 data_ready_(),
124 pending_read_(nullptr),
125 pending_write_(nullptr),
126 last_error_(NOERROR),
127 read_thread_id_(Thread::kInvalidThreadId),
128 read_thread_handle_(nullptr),
129 read_thread_starting_(false),
130 read_thread_finished_(false),
131 flags_(0) {}
132
133Handle::~Handle() {}
134
135bool Handle::CreateCompletionPort(HANDLE completion_port) {
136 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
137 // A reference to the Handle is Retained by the IO completion port.
138 // It is Released by DeleteIfClosed.
139 Retain();
140 completion_port_ = CreateIoCompletionPort(
141 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
142 return (completion_port_ != nullptr);
143}
144
145void Handle::Close() {
146 MonitorLocker ml(&monitor_);
147 if (!SupportsOverlappedIO()) {
148 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending
149 // operation before closing the handle, so the read thread is not blocked.
150 BOOL result = CancelIoEx(handle_, nullptr);
151 ASSERT(result || (GetLastError() == ERROR_NOT_FOUND));
152 }
153 if (!IsClosing()) {
154 // Close the socket and set the closing state. This close method can be
155 // called again if this socket has pending IO operations in flight.
156 MarkClosing();
157 // Perform handle type specific closing.
158 DoClose();
159 }
160 ASSERT(IsHandleClosed());
161}
162
163void Handle::DoClose() {
164 if (!IsHandleClosed()) {
165 CloseHandle(handle_);
167 }
168}
169
170bool Handle::HasPendingRead() {
171 return pending_read_ != nullptr;
172}
173
174bool Handle::HasPendingWrite() {
175 return pending_write_ != nullptr;
176}
177
178void Handle::WaitForReadThreadStarted() {
179 MonitorLocker ml(&monitor_);
180 while (read_thread_starting_) {
181 ml.Wait();
182 }
183}
184
185void Handle::WaitForReadThreadFinished() {
186 HANDLE to_join = nullptr;
187 {
188 MonitorLocker ml(&monitor_);
189 if (read_thread_id_ != Thread::kInvalidThreadId) {
190 while (!read_thread_finished_) {
191 ml.Wait();
192 }
193 read_thread_finished_ = false;
194 read_thread_id_ = Thread::kInvalidThreadId;
195 to_join = read_thread_handle_;
196 read_thread_handle_ = nullptr;
197 }
198 }
199 if (to_join != nullptr) {
200 // Join the read thread.
201 DWORD res = WaitForSingleObject(to_join, INFINITE);
202 CloseHandle(to_join);
203 ASSERT(res == WAIT_OBJECT_0);
204 }
205}
206
207void Handle::ReadComplete(OverlappedBuffer* buffer) {
208 WaitForReadThreadStarted();
209 {
210 MonitorLocker ml(&monitor_);
211 // Currently only one outstanding read at the time.
212 ASSERT(pending_read_ == buffer);
213 ASSERT(data_ready_ == nullptr);
214 if (!IsClosing()) {
215 data_ready_.reset(pending_read_);
216 } else {
217 OverlappedBuffer::DisposeBuffer(buffer);
218 }
219 pending_read_ = nullptr;
220 }
221 WaitForReadThreadFinished();
222}
223
224void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
225 ReadComplete(buffer);
226}
227
228void Handle::WriteComplete(OverlappedBuffer* buffer) {
229 MonitorLocker ml(&monitor_);
230 // Currently only one outstanding write at the time.
231 ASSERT(pending_write_ == buffer);
232 OverlappedBuffer::DisposeBuffer(buffer);
233 pending_write_ = nullptr;
234}
235
236static void ReadFileThread(uword args) {
237 Handle* handle = reinterpret_cast<Handle*>(args);
238 handle->ReadSyncCompleteAsync();
239}
240
241void Handle::NotifyReadThreadStarted() {
242 MonitorLocker ml(&monitor_);
243 ASSERT(read_thread_starting_);
244 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
245 read_thread_id_ = Thread::GetCurrentThreadId();
246 read_thread_handle_ = OpenThread(SYNCHRONIZE, false, read_thread_id_);
247 read_thread_starting_ = false;
248 ml.Notify();
249}
250
251void Handle::NotifyReadThreadFinished() {
252 MonitorLocker ml(&monitor_);
253 ASSERT(!read_thread_finished_);
254 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
255 read_thread_finished_ = true;
256 ml.Notify();
257}
258
259void Handle::ReadSyncCompleteAsync() {
260 NotifyReadThreadStarted();
261 ASSERT(HasPendingRead());
262 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
263
264 DWORD buffer_size = pending_read_->GetBufferSize();
265 if (GetFileType(handle_) == FILE_TYPE_CHAR) {
266 buffer_size = kStdOverlappedBufferSize;
267 }
268 char* buffer_start = pending_read_->GetBufferStart();
269 DWORD bytes_read = 0;
270 BOOL ok = ReadFile(handle_, buffer_start, buffer_size, &bytes_read, nullptr);
271 if (!ok) {
272 bytes_read = 0;
273 }
274 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
275 ok =
276 PostQueuedCompletionStatus(event_handler_->completion_port(), bytes_read,
277 reinterpret_cast<ULONG_PTR>(this), overlapped);
278 if (!ok) {
279 FATAL("PostQueuedCompletionStatus failed");
280 }
281 NotifyReadThreadFinished();
282}
283
284bool Handle::IssueRead() {
285 ASSERT(type_ != kListenSocket);
286 ASSERT(!HasPendingRead());
287 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
288 if (SupportsOverlappedIO()) {
289 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
290
291 BOOL ok =
292 ReadFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(),
293 nullptr, buffer->GetCleanOverlapped());
294 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
295 // Completing asynchronously.
296 pending_read_ = buffer;
297 return true;
298 }
299 OverlappedBuffer::DisposeBuffer(buffer);
300 HandleIssueError();
301 return false;
302 } else {
303 // Completing asynchronously through thread.
304 pending_read_ = buffer;
305 read_thread_starting_ = true;
306 int result = Thread::Start("dart:io ReadFile", ReadFileThread,
307 reinterpret_cast<uword>(this));
308 if (result != 0) {
309 FATAL("Failed to start read file thread %d", result);
310 }
311 return true;
312 }
313}
314
315bool Handle::IssueRecvFrom() {
316 return false;
317}
318
319bool Handle::IssueWrite() {
320 MonitorLocker ml(&monitor_);
321 ASSERT(type_ != kListenSocket);
322 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
323 ASSERT(HasPendingWrite());
324 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
325
326 OverlappedBuffer* buffer = pending_write_;
327 BOOL ok =
328 WriteFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(),
329 nullptr, buffer->GetCleanOverlapped());
330 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
331 // Completing asynchronously.
332 pending_write_ = buffer;
333 return true;
334 }
335 OverlappedBuffer::DisposeBuffer(buffer);
336 HandleIssueError();
337 return false;
338}
339
340bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
341 return false;
342}
343
344static void HandleClosed(Handle* handle) {
345 if (!handle->IsClosing()) {
346 int event_mask = 1 << kCloseEvent;
347 handle->NotifyAllDartPorts(event_mask);
348 }
349}
350
351static void HandleError(Handle* handle) {
352 handle->set_last_error(WSAGetLastError());
353 handle->MarkError();
354 if (!handle->IsClosing()) {
355 handle->NotifyAllDartPorts(1 << kErrorEvent);
356 }
357}
358
359void Handle::HandleIssueError() {
361 if (error == ERROR_BROKEN_PIPE) {
362 HandleClosed(this);
363 } else {
364 HandleError(this);
365 }
367}
368
369void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
370 MonitorLocker ml(&monitor_);
371 event_handler_ = event_handler;
372 if (completion_port_ == INVALID_HANDLE_VALUE) {
373 if (SupportsOverlappedIO()) {
374 CreateCompletionPort(event_handler_->completion_port());
375 } else {
376 // We need to retain the Handle even if overlapped IO is not supported.
377 // It is Released by DeleteIfClosed after ReadSyncCompleteAsync
378 // manually puts an event on the IO completion port.
379 Retain();
380 completion_port_ = event_handler_->completion_port();
381 }
382 }
383}
384
385bool FileHandle::IsClosed() {
386 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
387}
388
389void DirectoryWatchHandle::EnsureInitialized(
390 EventHandlerImplementation* event_handler) {
391 MonitorLocker ml(&monitor_);
392 event_handler_ = event_handler;
393 if (completion_port_ == INVALID_HANDLE_VALUE) {
394 CreateCompletionPort(event_handler_->completion_port());
395 }
396}
397
398bool DirectoryWatchHandle::IsClosed() {
399 return IsClosing() && !HasPendingRead();
400}
401
402bool DirectoryWatchHandle::IssueRead() {
403 // It may have been started before, as we start the directory-handler when
404 // we create it.
405 if (HasPendingRead() || (data_ready_ != nullptr)) {
406 return true;
407 }
408 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
409 // Set up pending_read_ before ReadDirectoryChangesW because it might be
410 // needed in ReadComplete invoked on event loop thread right away if data is
411 // also ready right away.
412 pending_read_ = buffer;
413 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
414 BOOL ok = ReadDirectoryChangesW(
415 handle_, buffer->GetBufferStart(), buffer->GetBufferSize(), recursive_,
416 events_, nullptr, buffer->GetCleanOverlapped(), nullptr);
417 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
418 // Completing asynchronously.
419 return true;
420 }
421 pending_read_ = nullptr;
422 OverlappedBuffer::DisposeBuffer(buffer);
423 return false;
424}
425
426void DirectoryWatchHandle::Stop() {
427 MonitorLocker ml(&monitor_);
428 // Stop the outstanding read, so we can close the handle.
429
430 if (HasPendingRead()) {
431 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
432 // Don't dispose of the buffer, as it will still complete (with length 0).
433 }
434
435 DoClose();
436}
437
438void SocketHandle::HandleIssueError() {
439 int error = WSAGetLastError();
440 if (error == WSAECONNRESET) {
441 HandleClosed(this);
442 } else {
443 HandleError(this);
444 }
445 WSASetLastError(error);
446}
447
448bool ListenSocket::LoadAcceptEx() {
449 // Load the AcceptEx function into memory using WSAIoctl.
450 GUID guid_accept_ex = WSAID_ACCEPTEX;
451 DWORD bytes;
452 int status = WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
453 &guid_accept_ex, sizeof(guid_accept_ex), &AcceptEx_,
454 sizeof(AcceptEx_), &bytes, nullptr, nullptr);
455 return (status != SOCKET_ERROR);
456}
457
458bool ListenSocket::LoadGetAcceptExSockaddrs() {
459 // Load the GetAcceptExSockaddrs function into memory using WSAIoctl.
460 GUID guid_get_accept_ex_sockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
461 DWORD bytes;
462 int status =
463 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
464 &guid_get_accept_ex_sockaddrs,
465 sizeof(guid_get_accept_ex_sockaddrs), &GetAcceptExSockaddrs_,
466 sizeof(GetAcceptExSockaddrs_), &bytes, nullptr, nullptr);
467 return (status != SOCKET_ERROR);
468}
469
470bool ListenSocket::IssueAccept() {
471 MonitorLocker ml(&monitor_);
472
473 OverlappedBuffer* buffer =
474 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
475 DWORD received;
476 BOOL ok;
477 ok = AcceptEx_(socket(), buffer->client(), buffer->GetBufferStart(),
478 0, // For now don't receive data with accept.
479 kAcceptExAddressStorageSize, kAcceptExAddressStorageSize,
480 &received, buffer->GetCleanOverlapped());
481 if (!ok) {
482 if (WSAGetLastError() != WSA_IO_PENDING) {
483 int error = WSAGetLastError();
484 closesocket(buffer->client());
485 OverlappedBuffer::DisposeBuffer(buffer);
486 WSASetLastError(error);
487 return false;
488 }
489 }
490
491 pending_accept_count_++;
492
493 return true;
494}
495
496void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
497 HANDLE completion_port) {
498 MonitorLocker ml(&monitor_);
499 if (!IsClosing()) {
500 // Update the accepted socket to support the full range of API calls.
501 SOCKET s = socket();
502 int rc = setsockopt(buffer->client(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
503 reinterpret_cast<char*>(&s), sizeof(s));
504 if (rc == NO_ERROR) {
505 // getpeername() returns incorrect results when used with a socket that
506 // was accepted using overlapped I/O. AcceptEx includes the remote
507 // address in its result so retrieve it using GetAcceptExSockaddrs and
508 // save it.
509 LPSOCKADDR local_addr;
510 int local_addr_length;
511 LPSOCKADDR remote_addr;
512 int remote_addr_length;
513 GetAcceptExSockaddrs_(
514 buffer->GetBufferStart(), 0, kAcceptExAddressStorageSize,
515 kAcceptExAddressStorageSize, &local_addr, &local_addr_length,
516 &remote_addr, &remote_addr_length);
517 RawAddr* raw_remote_addr = new RawAddr;
518 memmove(raw_remote_addr, remote_addr, remote_addr_length);
519
520 // Insert the accepted socket into the list.
521 ClientSocket* client_socket = new ClientSocket(
522 buffer->client(), std::unique_ptr<RawAddr>(raw_remote_addr));
523 client_socket->mark_connected();
524 client_socket->CreateCompletionPort(completion_port);
525 if (accepted_head_ == nullptr) {
526 accepted_head_ = client_socket;
527 accepted_tail_ = client_socket;
528 } else {
529 ASSERT(accepted_tail_ != nullptr);
530 accepted_tail_->set_next(client_socket);
531 accepted_tail_ = client_socket;
532 }
533 accepted_count_++;
534 } else {
535 closesocket(buffer->client());
536 }
537 } else {
538 // Close the socket, as it's already accepted.
539 closesocket(buffer->client());
540 }
541
542 pending_accept_count_--;
543 OverlappedBuffer::DisposeBuffer(buffer);
544}
545
546static void DeleteIfClosed(Handle* handle) {
547 if (handle->IsClosed()) {
548 handle->set_completion_port(INVALID_HANDLE_VALUE);
549 handle->set_event_handler(nullptr);
550 handle->NotifyAllDartPorts(1 << kDestroyedEvent);
551 handle->RemoveAllPorts();
552 // Once the Handle is closed, no further events on the IO completion port
553 // will mention it. Thus, we can drop the reference here.
554 handle->Release();
555 }
556}
557
558void ListenSocket::DoClose() {
559 closesocket(socket());
561 while (CanAccept()) {
562 // Get rid of connections already accepted.
563 ClientSocket* client = Accept();
564 if (client != nullptr) {
565 client->Close();
566 // Release the reference from the list.
567 // When an accept completes, we make a new ClientSocket (1 reference),
568 // and add it to the IO completion port (1 more reference). If an
569 // accepted connection is never requested by the Dart code, then
570 // this list owns a reference (first Release), and the IO completion
571 // port owns a reference, (second Release in DeleteIfClosed).
572 client->Release();
573 DeleteIfClosed(client);
574 } else {
575 break;
576 }
577 }
578 // To finish resetting the state of the ListenSocket back to what it was
579 // before EnsureInitialized was called, we have to reset the AcceptEx_
580 // and GetAcceptExSockaddrs_ function pointers.
581 AcceptEx_ = nullptr;
582 GetAcceptExSockaddrs_ = nullptr;
583}
584
585bool ListenSocket::CanAccept() {
586 MonitorLocker ml(&monitor_);
587 return accepted_head_ != nullptr;
588}
589
590ClientSocket* ListenSocket::Accept() {
591 MonitorLocker ml(&monitor_);
592
593 ClientSocket* result = nullptr;
594
595 if (accepted_head_ != nullptr) {
596 result = accepted_head_;
597 accepted_head_ = accepted_head_->next();
598 if (accepted_head_ == nullptr) {
599 accepted_tail_ = nullptr;
600 }
601 result->set_next(nullptr);
602 accepted_count_--;
603 }
604
605 if (pending_accept_count_ < 5) {
606 // We have less than 5 pending accepts, queue another.
607 if (!IsClosing()) {
608 if (!IssueAccept()) {
609 HandleError(this);
610 }
611 }
612 }
613
614 return result;
615}
616
617void ListenSocket::EnsureInitialized(
618 EventHandlerImplementation* event_handler) {
619 MonitorLocker ml(&monitor_);
620 if (AcceptEx_ == nullptr) {
621 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
622 ASSERT(event_handler_ == nullptr);
623 event_handler_ = event_handler;
624 CreateCompletionPort(event_handler_->completion_port());
625 bool isLoaded = LoadAcceptEx();
626 ASSERT(isLoaded);
627 }
628 if (GetAcceptExSockaddrs_ == nullptr) {
629 bool isLoaded = LoadGetAcceptExSockaddrs();
630 ASSERT(isLoaded);
631 }
632}
633
634bool ListenSocket::IsClosed() {
635 return IsClosing() && !HasPendingAccept();
636}
637
638intptr_t Handle::Available() {
639 MonitorLocker ml(&monitor_);
640 if (data_ready_ == nullptr) {
641 return 0;
642 }
643 return data_ready_->GetRemainingLength();
644}
645
646bool Handle::DataReady() {
647 return data_ready_ != nullptr;
648}
649
650intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
651 MonitorLocker ml(&monitor_);
652 if (data_ready_ == nullptr) {
653 return 0;
654 }
655 num_bytes =
656 data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
657 if (data_ready_->IsEmpty()) {
658 data_ready_ = nullptr;
659 if (!IsClosing() && !IsClosedRead()) {
660 IssueRead();
661 }
662 }
663 return num_bytes;
664}
665
666intptr_t Handle::RecvFrom(void* buffer,
667 intptr_t num_bytes,
668 struct sockaddr* sa,
669 socklen_t sa_len) {
670 MonitorLocker ml(&monitor_);
671 if (data_ready_ == nullptr) {
672 return 0;
673 }
674 num_bytes =
675 data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
676 if (data_ready_->from()->sa_family == AF_INET) {
677 ASSERT(sa_len >= sizeof(struct sockaddr_in));
678 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
679 } else {
680 ASSERT(data_ready_->from()->sa_family == AF_INET6);
681 ASSERT(sa_len >= sizeof(struct sockaddr_in6));
682 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
683 }
684 // Always dispose of the buffer, as UDP messages must be read in their
685 // entirety to match how recvfrom works in a socket.
686 data_ready_ = nullptr;
687 if (!IsClosing() && !IsClosedRead()) {
688 IssueRecvFrom();
689 }
690 return num_bytes;
691}
692
693intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
694 MonitorLocker ml(&monitor_);
695 if (HasPendingWrite()) {
696 return 0;
697 }
698 if (num_bytes > kBufferSize) {
699 num_bytes = kBufferSize;
700 }
701 ASSERT(SupportsOverlappedIO());
702 if (completion_port_ == INVALID_HANDLE_VALUE) {
703 return 0;
704 }
705 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
706 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
707 pending_write_->Write(buffer, truncated_bytes);
708 if (!IssueWrite()) {
709 return -1;
710 }
711 return truncated_bytes;
712}
713
714intptr_t Handle::SendTo(const void* buffer,
715 intptr_t num_bytes,
716 struct sockaddr* sa,
717 socklen_t sa_len) {
718 MonitorLocker ml(&monitor_);
719 if (HasPendingWrite()) {
720 return 0;
721 }
722 if (num_bytes > kBufferSize) {
723 ASSERT(kBufferSize >= kMaxUDPPackageLength);
724 // The provided buffer is larger than the maximum UDP datagram size so
725 // return an error immediately. If the buffer were larger and the data were
726 // actually passed to `WSASendTo()` then the operation would fail with
727 // ERROR_INVALID_USER_BUFFER anyway.
728 SetLastError(ERROR_INVALID_USER_BUFFER);
729 return -1;
730 }
731 ASSERT(SupportsOverlappedIO());
732 if (completion_port_ == INVALID_HANDLE_VALUE) {
733 return 0;
734 }
735 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
736 pending_write_->Write(buffer, num_bytes);
737 if (!IssueSendTo(sa, sa_len)) {
738 return -1;
739 }
740 return num_bytes;
741}
742
743Mutex* StdHandle::stdin_mutex_ = new Mutex();
744StdHandle* StdHandle::stdin_ = nullptr;
745
746StdHandle* StdHandle::Stdin(HANDLE handle) {
747 MutexLocker ml(stdin_mutex_);
748 if (stdin_ == nullptr) {
749 stdin_ = new StdHandle(handle);
750 }
751 return stdin_;
752}
753
754static void WriteFileThread(uword args) {
755 StdHandle* handle = reinterpret_cast<StdHandle*>(args);
756 handle->RunWriteLoop();
757}
758
759void StdHandle::RunWriteLoop() {
760 MonitorLocker ml(&monitor_);
761 write_thread_running_ = true;
762 thread_id_ = Thread::GetCurrentThreadId();
763 thread_handle_ = OpenThread(SYNCHRONIZE, false, thread_id_);
764 // Notify we have started.
765 ml.Notify();
766
767 while (write_thread_running_) {
768 ml.Wait(Monitor::kNoTimeout);
769 if (HasPendingWrite()) {
770 // We woke up and had a pending write. Execute it.
771 WriteSyncCompleteAsync();
772 }
773 }
774
775 write_thread_exists_ = false;
776 ml.Notify();
777}
778
779void StdHandle::WriteSyncCompleteAsync() {
780 ASSERT(HasPendingWrite());
781
782 DWORD bytes_written = -1;
783 BOOL ok = WriteFile(handle_, pending_write_->GetBufferStart(),
784 pending_write_->GetBufferSize(), &bytes_written, nullptr);
785 if (!ok) {
786 bytes_written = 0;
787 }
788 thread_wrote_ += bytes_written;
789 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
790 ok = PostQueuedCompletionStatus(
791 event_handler_->completion_port(), bytes_written,
792 reinterpret_cast<ULONG_PTR>(this), overlapped);
793 if (!ok) {
794 FATAL("PostQueuedCompletionStatus failed");
795 }
796}
797
798intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
799 MonitorLocker ml(&monitor_);
800 if (HasPendingWrite()) {
801 return 0;
802 }
803 if (num_bytes > kBufferSize) {
804 num_bytes = kBufferSize;
805 }
806 // In the case of stdout and stderr, OverlappedIO is not supported.
807 // Here we'll instead use a thread, to make it async.
808 // This code is actually never exposed to the user, as stdout and stderr is
809 // not available as a RawSocket, but only wrapped in a Socket.
810 // Note that we return '0', unless a thread have already completed a write.
811 if (thread_wrote_ > 0) {
812 if (num_bytes > thread_wrote_) {
813 num_bytes = thread_wrote_;
814 }
815 thread_wrote_ -= num_bytes;
816 return num_bytes;
817 }
818 if (!write_thread_exists_) {
819 write_thread_exists_ = true;
820 // The write thread gets a reference to the Handle, which it places in
821 // the events it puts on the IO completion port. The reference is
822 // Released by DeleteIfClosed.
823 Retain();
824 int result = Thread::Start("dart:io WriteFile", WriteFileThread,
825 reinterpret_cast<uword>(this));
826 if (result != 0) {
827 FATAL("Failed to start write file thread %d", result);
828 }
829 while (!write_thread_running_) {
830 // Wait until we the thread is running.
831 ml.Wait(Monitor::kNoTimeout);
832 }
833 }
834 // Only queue up to INT_MAX bytes.
835 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
836 // Create buffer and notify thread about the new handle.
837 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
838 pending_write_->Write(buffer, truncated_bytes);
839 ml.Notify();
840 return 0;
841}
842
843void StdHandle::DoClose() {
844 {
845 MonitorLocker ml(&monitor_);
846 if (write_thread_exists_) {
847 write_thread_running_ = false;
848 ml.Notify();
849 while (write_thread_exists_) {
850 ml.Wait(Monitor::kNoTimeout);
851 }
852 // Join the thread.
853 DWORD res = WaitForSingleObject(thread_handle_, INFINITE);
854 CloseHandle(thread_handle_);
855 ASSERT(res == WAIT_OBJECT_0);
856 }
857 Handle::DoClose();
858 }
859 MutexLocker ml(stdin_mutex_);
860 stdin_->Release();
861 StdHandle::stdin_ = nullptr;
862}
863
#if defined(DEBUG)
// Debug-only counter, incremented in IssueDisconnect() and decremented in
// DisconnectComplete().
intptr_t ClientSocket::disconnecting_ = 0;
#endif
867
868bool ClientSocket::LoadDisconnectEx() {
869 // Load the DisconnectEx function into memory using WSAIoctl.
870 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
871 DWORD bytes;
872 int status =
873 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
874 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_,
875 sizeof(DisconnectEx_), &bytes, nullptr, nullptr);
876 return (status != SOCKET_ERROR);
877}
878
879void ClientSocket::Shutdown(int how) {
880 int rc = shutdown(socket(), how);
881 if (how == SD_RECEIVE) {
882 MarkClosedRead();
883 }
884 if (how == SD_SEND) {
885 MarkClosedWrite();
886 }
887 if (how == SD_BOTH) {
888 MarkClosedRead();
889 MarkClosedWrite();
890 }
891}
892
893void ClientSocket::DoClose() {
894 // Always do a shutdown before initiating a disconnect.
895 shutdown(socket(), SD_BOTH);
896 IssueDisconnect();
898}
899
900bool ClientSocket::IssueRead() {
901 MonitorLocker ml(&monitor_);
902 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
903 ASSERT(!HasPendingRead());
904
905 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
906 // handle 64k datagrams.
907 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
908
909 DWORD flags;
910 flags = 0;
911 int rc = WSARecv(socket(), buffer->GetWASBUF(), 1, nullptr, &flags,
912 buffer->GetCleanOverlapped(), nullptr);
913 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
914 pending_read_ = buffer;
915 return true;
916 }
917 OverlappedBuffer::DisposeBuffer(buffer);
918 pending_read_ = nullptr;
919 HandleIssueError();
920 return false;
921}
922
923bool ClientSocket::IssueWrite() {
924 MonitorLocker ml(&monitor_);
925 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
926 ASSERT(HasPendingWrite());
927 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
928
929 int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1, nullptr, 0,
930 pending_write_->GetCleanOverlapped(), nullptr);
931 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
932 return true;
933 }
934 OverlappedBuffer::DisposeBuffer(pending_write_);
935 pending_write_ = nullptr;
936 HandleIssueError();
937 return false;
938}
939
940void ClientSocket::IssueDisconnect() {
941 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer();
942 BOOL ok =
943 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
944 // DisconnectEx works like other OverlappedIO APIs, where we can get either an
945 // immediate success or delayed operation by WSA_IO_PENDING being set.
946 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
947 DisconnectComplete(buffer);
948 }
949 // When the Dart side receives this event, it may decide to close its Dart
950 // ports. When all ports are closed, the VM will shut down. The EventHandler
951 // will then shut down. If the EventHandler shuts down before this
952 // asynchronous disconnect finishes, this ClientSocket will be leaked.
953 // TODO(dart:io): Retain a list of client sockets that are in the process of
954 // disconnecting. Disconnect them forcefully, and clean up their resources
955 // when the EventHandler shuts down.
956 NotifyAllDartPorts(1 << kDestroyedEvent);
957 RemoveAllPorts();
958#if defined(DEBUG)
959 disconnecting_++;
960#endif
961}
962
963void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
964 OverlappedBuffer::DisposeBuffer(buffer);
965 closesocket(socket());
966 data_ready_ = nullptr;
967 mark_closed();
968#if defined(DEBUG)
969 disconnecting_--;
970#endif
971}
972
973void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
974 OverlappedBuffer::DisposeBuffer(buffer);
975 // Update socket to support full socket API, after ConnectEx completed.
976 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, nullptr, 0);
977 // If the port is set, we already listen for this socket in Dart.
978 // Handle the cases here.
979 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) {
980 IssueRead();
981 }
982 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) {
983 Dart_Port port = NextNotifyDartPort(1 << kOutEvent);
984 DartUtils::PostInt32(port, 1 << kOutEvent);
985 }
986}
987
988void ClientSocket::EnsureInitialized(
989 EventHandlerImplementation* event_handler) {
990 MonitorLocker ml(&monitor_);
991 if (completion_port_ == INVALID_HANDLE_VALUE) {
992 ASSERT(event_handler_ == nullptr);
993 event_handler_ = event_handler;
994 CreateCompletionPort(event_handler_->completion_port());
995 }
996}
997
998bool ClientSocket::IsClosed() {
999 return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite();
1000}
1001
1002bool ClientSocket::PopulateRemoteAddr(RawAddr& addr) {
1003 if (!remote_addr_) {
1004 return false;
1005 }
1006 addr = *remote_addr_;
1007 return true;
1008}
1009
1010bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
1011 MonitorLocker ml(&monitor_);
1012 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
1013 ASSERT(HasPendingWrite());
1014 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
1015
1016 int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1, nullptr, 0, sa,
1017 sa_len, pending_write_->GetCleanOverlapped(), nullptr);
1018 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1019 return true;
1020 }
1021 OverlappedBuffer::DisposeBuffer(pending_write_);
1022 pending_write_ = nullptr;
1023 HandleIssueError();
1024 return false;
1025}
1026
1027bool DatagramSocket::IssueRecvFrom() {
1028 MonitorLocker ml(&monitor_);
1029 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
1030 ASSERT(!HasPendingRead());
1031
1032 OverlappedBuffer* buffer =
1033 OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength);
1034
1035 DWORD flags;
1036 flags = 0;
1037 int rc = WSARecvFrom(socket(), buffer->GetWASBUF(), 1, nullptr, &flags,
1038 buffer->from(), buffer->from_len_addr(),
1039 buffer->GetCleanOverlapped(), nullptr);
1040 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1041 pending_read_ = buffer;
1042 return true;
1043 }
1044 OverlappedBuffer::DisposeBuffer(buffer);
1045 pending_read_ = nullptr;
1046 HandleIssueError();
1047 return false;
1048}
1049
1050void DatagramSocket::EnsureInitialized(
1051 EventHandlerImplementation* event_handler) {
1052 MonitorLocker ml(&monitor_);
1053 if (completion_port_ == INVALID_HANDLE_VALUE) {
1054 ASSERT(event_handler_ == nullptr);
1055 event_handler_ = event_handler;
1056 CreateCompletionPort(event_handler_->completion_port());
1057 }
1058}
1059
1060bool DatagramSocket::IsClosed() {
1061 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
1062}
1063
1064void DatagramSocket::DoClose() {
1065 // Just close the socket. This will cause any queued requests to be aborted.
1066 closesocket(socket());
1067 MarkClosedRead();
1068 MarkClosedWrite();
1070}
1071
1072void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1073 if (msg->id == kTimerId) {
1074 // Change of timeout request. Just set the new timeout and port as the
1075 // completion thread will use the new timeout value for its next wait.
1076 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1077 } else if (msg->id == kShutdownId) {
1078 shutdown_ = true;
1079 } else {
1080 Socket* socket = reinterpret_cast<Socket*>(msg->id);
1081 RefCntReleaseScope<Socket> rs(socket);
1082 if (socket->fd() == -1) {
1083 return;
1084 }
1085 Handle* handle = reinterpret_cast<Handle*>(socket->fd());
1086 ASSERT(handle != nullptr);
1087
1088 if (handle->is_listen_socket()) {
1089 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle);
1090 listen_socket->EnsureInitialized(this);
1091
1092 MonitorLocker ml(&listen_socket->monitor_);
1093
1094 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1095 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1096 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1097 // `events` can only have kInEvent/kOutEvent flags set.
1098 intptr_t events = msg->data & EVENT_MASK;
1099 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1100 listen_socket->SetPortAndMask(msg->dart_port, events);
1101 TryDispatchingPendingAccepts(listen_socket);
1102 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1103 if (msg->dart_port != ILLEGAL_PORT) {
1104 listen_socket->RemovePort(msg->dart_port);
1105 }
1106
1107 // We only close the socket file descriptor from the operating
1108 // system if there are no other dart socket objects which
1109 // are listening on the same (address, port) combination.
1110 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
1111 MutexLocker locker(registry->mutex());
1112 if (registry->CloseSafe(socket)) {
1113 ASSERT(listen_socket->Mask() == 0);
1114 listen_socket->Close();
1115 socket->CloseFd();
1116 }
1117 socket->SetClosedFd();
1118 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1119 } else {
1120 UNREACHABLE();
1121 }
1122 } else {
1123 handle->EnsureInitialized(this);
1124 MonitorLocker ml(&handle->monitor_);
1125
1126 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1127 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1128 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1129 // `events` can only have kInEvent/kOutEvent flags set.
1130 intptr_t events = msg->data & EVENT_MASK;
1131 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1132
1133 handle->SetPortAndMask(msg->dart_port, events);
1134
1135 // Issue a read.
1136 if ((handle->Mask() & (1 << kInEvent)) != 0) {
1137 if (handle->is_datagram_socket()) {
1138 handle->IssueRecvFrom();
1139 } else if (handle->is_client_socket()) {
1140 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1141 handle->IssueRead();
1142 }
1143 } else {
1144 handle->IssueRead();
1145 }
1146 }
1147
1148 // If out events (can write events) have been requested, and there
1149 // are no pending writes, meaning any writes are already complete,
1150 // post an out event immediately.
1151 intptr_t out_event_mask = 1 << kOutEvent;
1152 if ((events & out_event_mask) != 0) {
1153 if (!handle->HasPendingWrite()) {
1154 if (handle->is_client_socket()) {
1155 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1156 intptr_t event_mask = 1 << kOutEvent;
1157 if ((handle->Mask() & event_mask) != 0) {
1158 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1159 DartUtils::PostInt32(port, event_mask);
1160 }
1161 }
1162 } else {
1163 if ((handle->Mask() & out_event_mask) != 0) {
1164 Dart_Port port = handle->NextNotifyDartPort(out_event_mask);
1165 DartUtils::PostInt32(port, out_event_mask);
1166 }
1167 }
1168 }
1169 }
1170 // Similarly, if in events (can read events) have been requested, and
1171 // there is pending data available, post an in event immediately.
1172 intptr_t in_event_mask = 1 << kInEvent;
1173 if ((events & in_event_mask) != 0) {
1174 if (handle->data_ready_ != nullptr &&
1175 !handle->data_ready_->IsEmpty()) {
1176 if ((handle->Mask() & in_event_mask) != 0) {
1177 Dart_Port port = handle->NextNotifyDartPort(in_event_mask);
1178 DartUtils::PostInt32(port, in_event_mask);
1179 }
1180 }
1181 }
1182 } else if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
1183 ASSERT(handle->is_client_socket());
1184
1185 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1186 client_socket->Shutdown(SD_RECEIVE);
1187 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
1188 ASSERT(handle->is_client_socket());
1189
1190 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1191 client_socket->Shutdown(SD_SEND);
1192 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1193 if (IS_SIGNAL_SOCKET(msg->data)) {
1194 Process::ClearSignalHandlerByFd(socket->fd(), socket->isolate_port());
1195 }
1196 handle->SetPortAndMask(msg->dart_port, 0);
1197 handle->Close();
1198 socket->CloseFd();
1199 } else {
1200 UNREACHABLE();
1201 }
1202 }
1203
1204 DeleteIfClosed(handle);
1205 }
1206}
1207
1208void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1209 OverlappedBuffer* buffer) {
1210 listen_socket->AcceptComplete(buffer, completion_port_);
1211
1212 {
1213 MonitorLocker ml(&listen_socket->monitor_);
1214 TryDispatchingPendingAccepts(listen_socket);
1215 }
1216
1217 DeleteIfClosed(listen_socket);
1218}
1219
1220void EventHandlerImplementation::TryDispatchingPendingAccepts(
1221 ListenSocket* listen_socket) {
1222 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1223 intptr_t event_mask = 1 << kInEvent;
1224 for (int i = 0; (i < listen_socket->accepted_count()) &&
1225 (listen_socket->Mask() == event_mask);
1226 i++) {
1227 Dart_Port port = listen_socket->NextNotifyDartPort(event_mask);
1228 DartUtils::PostInt32(port, event_mask);
1229 }
1230 }
1231}
1232
1233void EventHandlerImplementation::HandleRead(Handle* handle,
1234 int bytes,
1235 OverlappedBuffer* buffer) {
1236 buffer->set_data_length(bytes);
1237 handle->ReadComplete(buffer);
1238 if (bytes > 0) {
1239 if (!handle->IsClosing()) {
1240 int event_mask = 1 << kInEvent;
1241 if ((handle->Mask() & event_mask) != 0) {
1242 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1243 DartUtils::PostInt32(port, event_mask);
1244 }
1245 }
1246 } else {
1247 handle->MarkClosedRead();
1248 if (bytes == 0) {
1249 HandleClosed(handle);
1250 } else {
1251 HandleError(handle);
1252 }
1253 }
1254
1255 DeleteIfClosed(handle);
1256}
1257
1258void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1259 int bytes,
1260 OverlappedBuffer* buffer) {
1261 ASSERT(handle->is_datagram_socket());
1262 if (bytes >= 0) {
1263 buffer->set_data_length(bytes);
1264 handle->ReadComplete(buffer);
1265 if (!handle->IsClosing()) {
1266 int event_mask = 1 << kInEvent;
1267 if ((handle->Mask() & event_mask) != 0) {
1268 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1269 DartUtils::PostInt32(port, event_mask);
1270 }
1271 }
1272 } else {
1273 HandleError(handle);
1274 }
1275
1276 DeleteIfClosed(handle);
1277}
1278
1279void EventHandlerImplementation::HandleWrite(Handle* handle,
1280 int bytes,
1281 OverlappedBuffer* buffer) {
1282 handle->WriteComplete(buffer);
1283
1284 if (bytes >= 0) {
1285 if (!handle->IsError() && !handle->IsClosing()) {
1286 int event_mask = 1 << kOutEvent;
1287 ASSERT(!handle->is_client_socket() ||
1288 reinterpret_cast<ClientSocket*>(handle)->is_connected());
1289 if ((handle->Mask() & event_mask) != 0) {
1290 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1291 DartUtils::PostInt32(port, event_mask);
1292 }
1293 }
1294 } else {
1295 HandleError(handle);
1296 }
1297
1298 DeleteIfClosed(handle);
1299}
1300
1301void EventHandlerImplementation::HandleDisconnect(ClientSocket* client_socket,
1302 int bytes,
1303 OverlappedBuffer* buffer) {
1304 client_socket->DisconnectComplete(buffer);
1305 DeleteIfClosed(client_socket);
1306}
1307
1308void EventHandlerImplementation::HandleConnect(ClientSocket* client_socket,
1309 int bytes,
1310 OverlappedBuffer* buffer) {
1311 if (bytes < 0) {
1312 HandleError(client_socket);
1313 OverlappedBuffer::DisposeBuffer(buffer);
1314 } else {
1315 client_socket->ConnectComplete(buffer);
1316 }
1317 client_socket->mark_connected();
1318 DeleteIfClosed(client_socket);
1319}
1320
1321void EventHandlerImplementation::HandleTimeout() {
1322 if (!timeout_queue_.HasTimeout()) {
1323 return;
1324 }
1325 DartUtils::PostNull(timeout_queue_.CurrentPort());
1326 timeout_queue_.RemoveCurrent();
1327}
1328
1329void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
1330 ULONG_PTR key,
1331 OVERLAPPED* overlapped) {
1332 OverlappedBuffer* buffer = OverlappedBuffer::GetFromOverlapped(overlapped);
1333 switch (buffer->operation()) {
1334 case OverlappedBuffer::kAccept: {
1335 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(key);
1336 HandleAccept(listen_socket, buffer);
1337 break;
1338 }
1340 Handle* handle = reinterpret_cast<Handle*>(key);
1341 HandleRead(handle, bytes, buffer);
1342 break;
1343 }
1344 case OverlappedBuffer::kRecvFrom: {
1345 Handle* handle = reinterpret_cast<Handle*>(key);
1346 HandleRecvFrom(handle, bytes, buffer);
1347 break;
1348 }
1350 case OverlappedBuffer::kSendTo: {
1351 Handle* handle = reinterpret_cast<Handle*>(key);
1352 HandleWrite(handle, bytes, buffer);
1353 break;
1354 }
1355 case OverlappedBuffer::kDisconnect: {
1356 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1357 HandleDisconnect(client_socket, bytes, buffer);
1358 break;
1359 }
1360 case OverlappedBuffer::kConnect: {
1361 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1362 HandleConnect(client_socket, bytes, buffer);
1363 break;
1364 }
1365 default:
1366 UNREACHABLE();
1367 }
1368}
1369
1370void EventHandlerImplementation::HandleCompletionOrInterrupt(
1371 BOOL ok,
1372 DWORD bytes,
1373 ULONG_PTR key,
1374 OVERLAPPED* overlapped) {
1375 if (!ok) {
1376 // Treat ERROR_CONNECTION_ABORTED as connection closed.
1377 // The error ERROR_OPERATION_ABORTED is set for pending
1378 // accept requests for a listen socket which is closed.
1379 // ERROR_NETNAME_DELETED occurs when the client closes
1380 // the socket it is reading from.
1381 DWORD last_error = GetLastError();
1382 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1383 (last_error == ERROR_OPERATION_ABORTED) ||
1384 (last_error == ERROR_NETNAME_DELETED) ||
1385 (last_error == ERROR_BROKEN_PIPE)) {
1386 ASSERT(bytes == 0);
1387 HandleIOCompletion(bytes, key, overlapped);
1388 } else if (last_error == ERROR_MORE_DATA) {
1389 // Don't ASSERT no bytes in this case. This can happen if the receive
1390 // buffer for datagram sockets is too small to contain a full datagram,
1391 // and in this case bytes hold the bytes that was read.
1392 HandleIOCompletion(-1, key, overlapped);
1393 } else {
1394 ASSERT(bytes == 0);
1395 HandleIOCompletion(-1, key, overlapped);
1396 }
1397 } else if (key == NULL) {
1398 // A key of nullptr signals an interrupt message.
1399 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
1400 HandleInterrupt(msg);
1401 delete msg;
1402 } else {
1403 HandleIOCompletion(bytes, key, overlapped);
1404 }
1405}
1406
1407EventHandlerImplementation::EventHandlerImplementation() {
1408 handler_thread_id_ = Thread::kInvalidThreadId;
1409 handler_thread_handle_ = nullptr;
1410 completion_port_ =
1411 CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, NULL, 1);
1412 if (completion_port_ == nullptr) {
1413 FATAL("Completion port creation failed");
1414 }
1415 shutdown_ = false;
1416}
1417
1418EventHandlerImplementation::~EventHandlerImplementation() {
1419 // Join the handler thread.
1420 DWORD res = WaitForSingleObject(handler_thread_handle_, INFINITE);
1421 CloseHandle(handler_thread_handle_);
1422 ASSERT(res == WAIT_OBJECT_0);
1423 CloseHandle(completion_port_);
1424}
1425
1426int64_t EventHandlerImplementation::GetTimeout() {
1427 if (!timeout_queue_.HasTimeout()) {
1428 return kInfinityTimeout;
1429 }
1430 int64_t millis =
1431 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
1432 return (millis < 0) ? 0 : millis;
1433}
1434
1436 Dart_Port dart_port,
1437 int64_t data) {
1438 InterruptMessage* msg = new InterruptMessage;
1439 msg->id = id;
1440 msg->dart_port = dart_port;
1441 msg->data = data;
1442 BOOL ok = PostQueuedCompletionStatus(completion_port_, 0, NULL,
1443 reinterpret_cast<OVERLAPPED*>(msg));
1444 if (!ok) {
1445 FATAL("PostQueuedCompletionStatus failed");
1446 }
1447}
1448
1449void EventHandlerImplementation::EventHandlerEntry(uword args) {
1450 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
1451 EventHandlerImplementation* handler_impl = &handler->delegate_;
1452 ASSERT(handler_impl != nullptr);
1453
1454 {
1455 MonitorLocker ml(&handler_impl->startup_monitor_);
1456 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1457 handler_impl->handler_thread_handle_ =
1458 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_);
1459 ml.Notify();
1460 }
1461
1462 DWORD bytes;
1463 ULONG_PTR key;
1464 OVERLAPPED* overlapped;
1465 BOOL ok;
1466 while (!handler_impl->shutdown_) {
1467 int64_t millis = handler_impl->GetTimeout();
1468 ASSERT(millis == kInfinityTimeout || millis >= 0);
1469 if (millis > kMaxInt32) {
1470 millis = kMaxInt32;
1471 }
1472 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1473 DWORD timeout = static_cast<DWORD>(millis);
1474 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1475 &key, &overlapped, timeout);
1476
1477 if (!ok && (overlapped == nullptr)) {
1478 if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
1479 // The completion port should never be closed.
1480 Syslog::Print("Completion port closed\n");
1481 UNREACHABLE();
1482 } else {
1483 // Timeout is signalled by false result and nullptr in overlapped.
1484 handler_impl->HandleTimeout();
1485 }
1486 } else {
1487 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1488 }
1489 }
1490
1491// In a Debug build, drain the IO completion port to make sure we aren't
1492// leaking any (non-disconnecting) Handles. In a Release build, we don't care
1493// because the VM is going down, and the asserts below are Debug-only.
1494#if defined(DEBUG)
1495 while (true) {
1496 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1497 &key, &overlapped, 0);
1498 if (!ok && (overlapped == nullptr)) {
1499 // There was an error or nothing is ready. Assume the port is drained.
1500 break;
1501 }
1502 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1503 }
1504
1505 // The eventhandler thread is going down so there should be no more live
1506 // Handles or Sockets.
1507 // TODO(dart:io): It would be nice to be able to assert here that:
1508 // ReferenceCounted<Handle>::instances() == 0;
1509 // However, we cannot at the moment. See the TODO on:
1510 // ClientSocket::IssueDisconnect()
1511 // Furthermore, if the Dart program references stdin, but does not
1512 // explicitly close it, then the StdHandle for it will be leaked to here.
1513 const intptr_t stdin_leaked = (StdHandle::StdinPtr() == nullptr) ? 0 : 1;
1514 DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
1515 ClientSocket::disconnecting() + stdin_leaked);
1516 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
1517#endif // defined(DEBUG)
1518 handler->NotifyShutdownDone();
1519}
1520
1521void EventHandlerImplementation::Start(EventHandler* handler) {
1522 int result = Thread::Start("dart:io EventHandler", EventHandlerEntry,
1523 reinterpret_cast<uword>(handler));
1524 if (result != 0) {
1525 FATAL("Failed to start event handler thread %d", result);
1526 }
1527
1528 {
1529 MonitorLocker ml(&startup_monitor_);
1530 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1531 ml.Wait();
1532 }
1533 }
1534}
1535
1537 SendData(kShutdownId, 0, 0);
1538}
1539
1540} // namespace bin
1541} // namespace dart
1542
1543#endif // defined(DART_HOST_OS_WINDOWS)
static uint32_t buffer_size(uint32_t offset, uint32_t maxAlignment)
static bool ok(int result)
static void operation(T operation, uint32_t &a, uint32_t b, uint32_t c, uint32_t d, uint32_t x, uint8_t s, uint32_t t)
Definition: SkMD5.cpp:144
static const size_t kBufferSize
Definition: SkString.cpp:27
#define UNREACHABLE()
Definition: assert.h:248
#define DEBUG_ASSERT(cond)
Definition: assert.h:321
Handle(intptr_t handle)
int Read(void *buffer, int num_bytes)
static void DisposeBuffer(OverlappedBuffer *buffer)
static OverlappedBuffer * GetFromOverlapped(OVERLAPPED *overlapped)
static OverlappedBuffer * AllocateRecvFromBuffer(int buffer_size)
static OverlappedBuffer * AllocateAcceptBuffer(int buffer_size)
static OverlappedBuffer * AllocateConnectBuffer()
int Write(const void *buffer, int num_bytes)
static OverlappedBuffer * AllocateDisconnectBuffer()
static OverlappedBuffer * AllocateSendToBuffer(int buffer_size)
static OverlappedBuffer * AllocateReadBuffer(int buffer_size)
static OverlappedBuffer * AllocateWriteBuffer(int buffer_size)
#define ILLEGAL_PORT
Definition: dart_api.h:1535
int64_t Dart_Port
Definition: dart_api.h:1525
#define ASSERT(E)
#define IS_COMMAND(data, command_bit)
Definition: eventhandler.h:49
#define EVENT_MASK
Definition: eventhandler.h:44
#define IS_SIGNAL_SOCKET(data)
Definition: eventhandler.h:58
#define TOKEN_COUNT(data)
Definition: eventhandler.h:60
struct MyStruct s
#define FATAL(error)
FlutterSemanticsFlag flags
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
const uint8_t uint32_t uint32_t GError ** error
GAsyncResult * result
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
Definition: Response.cpp:64
SK_API bool Read(SkStreamSeekable *src, SkDocumentPage *dstArray, int dstArrayCount, const SkDeserialProcs *=nullptr)
static constexpr intptr_t kTimerId
Definition: eventhandler.h:109
void ReadFile(uint8_t **data, intptr_t *file_len, void *stream)
@ kShutdownWriteCommand
Definition: eventhandler.h:30
@ kReturnTokenCommand
Definition: eventhandler.h:31
@ kShutdownReadCommand
Definition: eventhandler.h:29
@ kDestroyedEvent
Definition: eventhandler.h:27
@ kSetEventMaskCommand
Definition: eventhandler.h:32
static constexpr intptr_t kShutdownId
Definition: eventhandler.h:110
static constexpr intptr_t kInfinityTimeout
Definition: eventhandler.h:108
static void Shutdown(Dart_NativeArguments args)
static EventHandler * event_handler
Definition: eventhandler.cc:18
bool WriteFile(const std::string &path, const char *data, ssize_t size)
Definition: files.cc:69
Definition: dart_vm.cc:33
uintptr_t uword
Definition: globals.h:501
constexpr int32_t kMaxInt32
Definition: globals.h:483
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition: switches.h:87
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service A custom Dart VM Service port The default is to pick a randomly available open port disable vm Disable the Dart VM Service The Dart VM Service is never available in release mode disable vm service Disable mDNS Dart VM Service publication Bind to the IPv6 localhost address for the Dart VM Service Ignored if vm service host is set endless trace buffer
Definition: switches.h:126
void Close(PathBuilder *builder)
Definition: tessellator.cc:38
def timeout(deadline, cmd)
fuchsia::ui::composition::ParentViewportWatcherHandle handle_
std::shared_ptr< const fml::Mapping > data
Definition: texture_gles.cc:63
const uintptr_t id
int BOOL
Definition: windows_types.h:37
#define SYNCHRONIZE
#define INVALID_HANDLE_VALUE
WINBASEAPI VOID WINAPI SetLastError(_In_ DWORD dwErrCode)
WINBASEAPI _Check_return_ _Post_equals_last_error_ DWORD WINAPI GetLastError(VOID)
void * HANDLE
Definition: windows_types.h:36
struct _OVERLAPPED OVERLAPPED
__w64 unsigned long ULONG_PTR
Definition: windows_types.h:56
unsigned long DWORD
Definition: windows_types.h:22
struct _GUID GUID