Flutter Engine
The Flutter Engine
Loading...
Searching...
No Matches
eventhandler_win.cc
Go to the documentation of this file.
1// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2// for details. All rights reserved. Use of this source code is governed by a
3// BSD-style license that can be found in the LICENSE file.
4
5#include "platform/globals.h"
6#if defined(DART_HOST_OS_WINDOWS)
7
8#include "bin/eventhandler.h"
10
11#include <fcntl.h> // NOLINT
12#include <io.h> // NOLINT
13#include <mswsock.h> // NOLINT
14#include <winsock2.h> // NOLINT
15#include <ws2tcpip.h> // NOLINT
16#include <utility>
17
18#include "bin/builtin.h"
19#include "bin/dartutils.h"
20#include "bin/lockers.h"
21#include "bin/process.h"
22#include "bin/socket.h"
23#include "bin/thread.h"
24#include "bin/utils.h"
25#include "platform/syslog.h"
26
27#include "platform/utils.h"
28
29namespace dart {
30namespace bin {
31
32// kBufferSize must be >= kMaxUDPPackageLength so that a complete UDP packet
33// can fit in the buffer.
34static constexpr int kBufferSize = 64 * 1024;
35static constexpr int kStdOverlappedBufferSize = 16 * 1024;
36static constexpr int kMaxUDPPackageLength = 64 * 1024;
37// For AcceptEx there needs to be buffer storage for address
38// information for two addresses (local and remote address). The
39// AcceptEx documentation says: "This value must be at least 16
40// bytes more than the maximum address length for the transport
41// protocol in use."
42static constexpr int kAcceptExAddressAdditionalBytes = 16;
43static constexpr int kAcceptExAddressStorageSize =
44 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
45
46OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
47 Operation operation) {
48 OverlappedBuffer* buffer =
49 new (buffer_size) OverlappedBuffer(buffer_size, operation);
50 return buffer;
51}
52
54 OverlappedBuffer* buffer = AllocateBuffer(buffer_size, kAccept);
55 return buffer;
56}
57
59 return AllocateBuffer(buffer_size, kRead);
60}
61
63 // For calling recvfrom additional buffer space is needed for the source
64 // address information.
65 buffer_size += sizeof(socklen_t) + sizeof(struct sockaddr_storage);
66 return AllocateBuffer(buffer_size, kRecvFrom);
67}
68
70 return AllocateBuffer(buffer_size, kWrite);
71}
72
74 return AllocateBuffer(buffer_size, kSendTo);
75}
76
78 return AllocateBuffer(0, kDisconnect);
79}
80
82 return AllocateBuffer(0, kConnect);
83}
84
85void OverlappedBuffer::DisposeBuffer(OverlappedBuffer* buffer) {
86 delete buffer;
87}
88
89OverlappedBuffer* OverlappedBuffer::GetFromOverlapped(OVERLAPPED* overlapped) {
90 OverlappedBuffer* buffer =
91 CONTAINING_RECORD(overlapped, OverlappedBuffer, overlapped_);
92 return buffer;
93}
94
95int OverlappedBuffer::Read(void* buffer, int num_bytes) {
96 if (num_bytes > GetRemainingLength()) {
97 num_bytes = GetRemainingLength();
98 }
99 memmove(buffer, GetBufferStart() + index_, num_bytes);
100 index_ += num_bytes;
101 return num_bytes;
102}
103
104int OverlappedBuffer::Write(const void* buffer, int num_bytes) {
105 ASSERT(num_bytes == buflen_);
106 memmove(GetBufferStart(), buffer, num_bytes);
107 data_length_ = num_bytes;
108 return num_bytes;
109}
110
112 ASSERT(operation_ == kRead || operation_ == kRecvFrom);
113 return data_length_ - index_;
114}
115
116Handle::Handle(intptr_t handle)
117 : ReferenceCounted(),
118 DescriptorInfoBase(handle),
119 monitor_(),
120 handle_(reinterpret_cast<HANDLE>(handle)),
121 completion_port_(INVALID_HANDLE_VALUE),
122 event_handler_(nullptr),
123 data_ready_(),
124 pending_read_(nullptr),
125 pending_write_(nullptr),
126 last_error_(NOERROR),
127 read_thread_id_(Thread::kInvalidThreadId),
128 read_thread_handle_(nullptr),
129 read_thread_starting_(false),
130 read_thread_finished_(false),
131 flags_(0) {}
132
133Handle::~Handle() {}
134
135bool Handle::CreateCompletionPort(HANDLE completion_port) {
136 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
137 // A reference to the Handle is Retained by the IO completion port.
138 // It is Released by DeleteIfClosed.
139 Retain();
140 completion_port_ = CreateIoCompletionPort(
141 handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
142 return (completion_port_ != nullptr);
143}
144
145void Handle::Close() {
146 MonitorLocker ml(&monitor_);
147 if (!SupportsOverlappedIO()) {
148 // If the handle uses synchronous I/O (e.g. stdin), cancel any pending
149 // operation before closing the handle, so the read thread is not blocked.
150 BOOL result = CancelIoEx(handle_, nullptr);
151// The Dart code 'stdin.listen(() {}).cancel()' causes this assert to be
152// triggered on Windows 7, but not on Windows 10.
153#if defined(DEBUG)
154 if (IsWindows10OrGreater()) {
155 ASSERT(result || (GetLastError() == ERROR_NOT_FOUND));
156 }
157#else
158 USE(result);
159#endif
160 }
161 if (!IsClosing()) {
162 // Close the socket and set the closing state. This close method can be
163 // called again if this socket has pending IO operations in flight.
164 MarkClosing();
165 // Perform handle type specific closing.
166 DoClose();
167 }
168 ASSERT(IsHandleClosed());
169}
170
171void Handle::DoClose() {
172 if (!IsHandleClosed()) {
173 CloseHandle(handle_);
175 }
176}
177
178bool Handle::HasPendingRead() {
179 return pending_read_ != nullptr;
180}
181
182bool Handle::HasPendingWrite() {
183 return pending_write_ != nullptr;
184}
185
186void Handle::WaitForReadThreadStarted() {
187 MonitorLocker ml(&monitor_);
188 while (read_thread_starting_) {
189 ml.Wait();
190 }
191}
192
193void Handle::WaitForReadThreadFinished() {
194 HANDLE to_join = nullptr;
195 {
196 MonitorLocker ml(&monitor_);
197 if (read_thread_id_ != Thread::kInvalidThreadId) {
198 while (!read_thread_finished_) {
199 ml.Wait();
200 }
201 read_thread_finished_ = false;
202 read_thread_id_ = Thread::kInvalidThreadId;
203 to_join = read_thread_handle_;
204 read_thread_handle_ = nullptr;
205 }
206 }
207 if (to_join != nullptr) {
208 // Join the read thread.
209 DWORD res = WaitForSingleObject(to_join, INFINITE);
210 CloseHandle(to_join);
211 ASSERT(res == WAIT_OBJECT_0);
212 }
213}
214
215void Handle::ReadComplete(OverlappedBuffer* buffer) {
216 WaitForReadThreadStarted();
217 {
218 MonitorLocker ml(&monitor_);
219 // Currently only one outstanding read at the time.
220 ASSERT(pending_read_ == buffer);
221 ASSERT(data_ready_ == nullptr);
222 if (!IsClosing()) {
223 data_ready_.reset(pending_read_);
224 } else {
225 OverlappedBuffer::DisposeBuffer(buffer);
226 }
227 pending_read_ = nullptr;
228 }
229 WaitForReadThreadFinished();
230}
231
232void Handle::RecvFromComplete(OverlappedBuffer* buffer) {
233 ReadComplete(buffer);
234}
235
236void Handle::WriteComplete(OverlappedBuffer* buffer) {
237 MonitorLocker ml(&monitor_);
238 // Currently only one outstanding write at the time.
239 ASSERT(pending_write_ == buffer);
240 OverlappedBuffer::DisposeBuffer(buffer);
241 pending_write_ = nullptr;
242}
243
244static void ReadFileThread(uword args) {
245 Handle* handle = reinterpret_cast<Handle*>(args);
246 handle->ReadSyncCompleteAsync();
247}
248
249void Handle::NotifyReadThreadStarted() {
250 MonitorLocker ml(&monitor_);
251 ASSERT(read_thread_starting_);
252 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
253 read_thread_id_ = Thread::GetCurrentThreadId();
254 read_thread_handle_ = OpenThread(SYNCHRONIZE, false, read_thread_id_);
255 read_thread_starting_ = false;
256 ml.Notify();
257}
258
259void Handle::NotifyReadThreadFinished() {
260 MonitorLocker ml(&monitor_);
261 ASSERT(!read_thread_finished_);
262 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
263 read_thread_finished_ = true;
264 ml.Notify();
265}
266
267void Handle::ReadSyncCompleteAsync() {
268 NotifyReadThreadStarted();
269 ASSERT(HasPendingRead());
270 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
271
272 DWORD buffer_size = pending_read_->GetBufferSize();
273 if (GetFileType(handle_) == FILE_TYPE_CHAR) {
274 buffer_size = kStdOverlappedBufferSize;
275 }
276 char* buffer_start = pending_read_->GetBufferStart();
277 DWORD bytes_read = 0;
278 BOOL ok = ReadFile(handle_, buffer_start, buffer_size, &bytes_read, nullptr);
279 if (!ok) {
280 bytes_read = 0;
281 }
282 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
283 ok =
284 PostQueuedCompletionStatus(event_handler_->completion_port(), bytes_read,
285 reinterpret_cast<ULONG_PTR>(this), overlapped);
286 if (!ok) {
287 FATAL("PostQueuedCompletionStatus failed");
288 }
289 NotifyReadThreadFinished();
290}
291
292bool Handle::IssueRead() {
293 ASSERT(type_ != kListenSocket);
294 ASSERT(!HasPendingRead());
295 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
296 if (SupportsOverlappedIO()) {
297 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
298
299 BOOL ok =
300 ReadFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(),
301 nullptr, buffer->GetCleanOverlapped());
302 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
303 // Completing asynchronously.
304 pending_read_ = buffer;
305 return true;
306 }
307 OverlappedBuffer::DisposeBuffer(buffer);
308 HandleIssueError();
309 return false;
310 } else {
311 // Completing asynchronously through thread.
312 pending_read_ = buffer;
313 read_thread_starting_ = true;
314 int result = Thread::Start("dart:io ReadFile", ReadFileThread,
315 reinterpret_cast<uword>(this));
316 if (result != 0) {
317 FATAL("Failed to start read file thread %d", result);
318 }
319 return true;
320 }
321}
322
323bool Handle::IssueRecvFrom() {
324 return false;
325}
326
327bool Handle::IssueWrite() {
328 MonitorLocker ml(&monitor_);
329 ASSERT(type_ != kListenSocket);
330 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
331 ASSERT(HasPendingWrite());
332 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
333
334 OverlappedBuffer* buffer = pending_write_;
335 BOOL ok =
336 WriteFile(handle_, buffer->GetBufferStart(), buffer->GetBufferSize(),
337 nullptr, buffer->GetCleanOverlapped());
338 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
339 // Completing asynchronously.
340 pending_write_ = buffer;
341 return true;
342 }
343 OverlappedBuffer::DisposeBuffer(buffer);
344 HandleIssueError();
345 return false;
346}
347
348bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
349 return false;
350}
351
352static void HandleClosed(Handle* handle) {
353 if (!handle->IsClosing()) {
354 int event_mask = 1 << kCloseEvent;
355 handle->NotifyAllDartPorts(event_mask);
356 }
357}
358
359static void HandleError(Handle* handle) {
360 handle->set_last_error(WSAGetLastError());
361 handle->MarkError();
362 if (!handle->IsClosing()) {
363 handle->NotifyAllDartPorts(1 << kErrorEvent);
364 }
365}
366
367void Handle::HandleIssueError() {
369 if (error == ERROR_BROKEN_PIPE) {
370 HandleClosed(this);
371 } else {
372 HandleError(this);
373 }
375}
376
377void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
378 MonitorLocker ml(&monitor_);
379 event_handler_ = event_handler;
380 if (completion_port_ == INVALID_HANDLE_VALUE) {
381 if (SupportsOverlappedIO()) {
382 CreateCompletionPort(event_handler_->completion_port());
383 } else {
384 // We need to retain the Handle even if overlapped IO is not supported.
385 // It is Released by DeleteIfClosed after ReadSyncCompleteAsync
386 // manually puts an event on the IO completion port.
387 Retain();
388 completion_port_ = event_handler_->completion_port();
389 }
390 }
391}
392
393bool FileHandle::IsClosed() {
394 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
395}
396
397void DirectoryWatchHandle::EnsureInitialized(
398 EventHandlerImplementation* event_handler) {
399 MonitorLocker ml(&monitor_);
400 event_handler_ = event_handler;
401 if (completion_port_ == INVALID_HANDLE_VALUE) {
402 CreateCompletionPort(event_handler_->completion_port());
403 }
404}
405
406bool DirectoryWatchHandle::IsClosed() {
407 return IsClosing() && !HasPendingRead();
408}
409
410bool DirectoryWatchHandle::IssueRead() {
411 // It may have been started before, as we start the directory-handler when
412 // we create it.
413 if (HasPendingRead() || (data_ready_ != nullptr)) {
414 return true;
415 }
416 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
417 // Set up pending_read_ before ReadDirectoryChangesW because it might be
418 // needed in ReadComplete invoked on event loop thread right away if data is
419 // also ready right away.
420 pending_read_ = buffer;
421 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
422 BOOL ok = ReadDirectoryChangesW(
423 handle_, buffer->GetBufferStart(), buffer->GetBufferSize(), recursive_,
424 events_, nullptr, buffer->GetCleanOverlapped(), nullptr);
425 if (ok || (GetLastError() == ERROR_IO_PENDING)) {
426 // Completing asynchronously.
427 return true;
428 }
429 pending_read_ = nullptr;
430 OverlappedBuffer::DisposeBuffer(buffer);
431 return false;
432}
433
434void DirectoryWatchHandle::Stop() {
435 MonitorLocker ml(&monitor_);
436 // Stop the outstanding read, so we can close the handle.
437
438 if (HasPendingRead()) {
439 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
440 // Don't dispose of the buffer, as it will still complete (with length 0).
441 }
442
443 DoClose();
444}
445
446void SocketHandle::HandleIssueError() {
447 int error = WSAGetLastError();
448 if (error == WSAECONNRESET) {
449 HandleClosed(this);
450 } else {
451 HandleError(this);
452 }
453 WSASetLastError(error);
454}
455
456bool ListenSocket::LoadAcceptEx() {
457 // Load the AcceptEx function into memory using WSAIoctl.
458 GUID guid_accept_ex = WSAID_ACCEPTEX;
459 DWORD bytes;
460 int status = WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
461 &guid_accept_ex, sizeof(guid_accept_ex), &AcceptEx_,
462 sizeof(AcceptEx_), &bytes, nullptr, nullptr);
463 return (status != SOCKET_ERROR);
464}
465
466bool ListenSocket::LoadGetAcceptExSockaddrs() {
467 // Load the GetAcceptExSockaddrs function into memory using WSAIoctl.
468 GUID guid_get_accept_ex_sockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
469 DWORD bytes;
470 int status =
471 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
472 &guid_get_accept_ex_sockaddrs,
473 sizeof(guid_get_accept_ex_sockaddrs), &GetAcceptExSockaddrs_,
474 sizeof(GetAcceptExSockaddrs_), &bytes, nullptr, nullptr);
475 return (status != SOCKET_ERROR);
476}
477
478bool ListenSocket::IssueAccept() {
479 MonitorLocker ml(&monitor_);
480
481 OverlappedBuffer* buffer =
482 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
483 DWORD received;
484 BOOL ok;
485 ok = AcceptEx_(socket(), buffer->client(), buffer->GetBufferStart(),
486 0, // For now don't receive data with accept.
487 kAcceptExAddressStorageSize, kAcceptExAddressStorageSize,
488 &received, buffer->GetCleanOverlapped());
489 if (!ok) {
490 if (WSAGetLastError() != WSA_IO_PENDING) {
491 int error = WSAGetLastError();
492 closesocket(buffer->client());
493 OverlappedBuffer::DisposeBuffer(buffer);
494 WSASetLastError(error);
495 return false;
496 }
497 }
498
499 pending_accept_count_++;
500
501 return true;
502}
503
504void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
505 HANDLE completion_port) {
506 MonitorLocker ml(&monitor_);
507 if (!IsClosing()) {
508 // Update the accepted socket to support the full range of API calls.
509 SOCKET s = socket();
510 int rc = setsockopt(buffer->client(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
511 reinterpret_cast<char*>(&s), sizeof(s));
512 if (rc == NO_ERROR) {
513 // getpeername() returns incorrect results when used with a socket that
514 // was accepted using overlapped I/O. AcceptEx includes the remote
515 // address in its result so retrieve it using GetAcceptExSockaddrs and
516 // save it.
517 LPSOCKADDR local_addr;
518 int local_addr_length;
519 LPSOCKADDR remote_addr;
520 int remote_addr_length;
521 GetAcceptExSockaddrs_(
522 buffer->GetBufferStart(), 0, kAcceptExAddressStorageSize,
523 kAcceptExAddressStorageSize, &local_addr, &local_addr_length,
524 &remote_addr, &remote_addr_length);
525 RawAddr* raw_remote_addr = new RawAddr;
526 memmove(raw_remote_addr, remote_addr, remote_addr_length);
527
528 // Insert the accepted socket into the list.
529 ClientSocket* client_socket = new ClientSocket(
530 buffer->client(), std::unique_ptr<RawAddr>(raw_remote_addr));
531 client_socket->mark_connected();
532 client_socket->CreateCompletionPort(completion_port);
533 if (accepted_head_ == nullptr) {
534 accepted_head_ = client_socket;
535 accepted_tail_ = client_socket;
536 } else {
537 ASSERT(accepted_tail_ != nullptr);
538 accepted_tail_->set_next(client_socket);
539 accepted_tail_ = client_socket;
540 }
541 accepted_count_++;
542 } else {
543 closesocket(buffer->client());
544 }
545 } else {
546 // Close the socket, as it's already accepted.
547 closesocket(buffer->client());
548 }
549
550 pending_accept_count_--;
551 OverlappedBuffer::DisposeBuffer(buffer);
552}
553
554static void DeleteIfClosed(Handle* handle) {
555 if (handle->IsClosed()) {
556 handle->set_completion_port(INVALID_HANDLE_VALUE);
557 handle->set_event_handler(nullptr);
558 handle->NotifyAllDartPorts(1 << kDestroyedEvent);
559 handle->RemoveAllPorts();
560 // Once the Handle is closed, no further events on the IO completion port
561 // will mention it. Thus, we can drop the reference here.
562 handle->Release();
563 }
564}
565
566void ListenSocket::DoClose() {
567 closesocket(socket());
569 while (CanAccept()) {
570 // Get rid of connections already accepted.
571 ClientSocket* client = Accept();
572 if (client != nullptr) {
573 client->Close();
574 // Release the reference from the list.
575 // When an accept completes, we make a new ClientSocket (1 reference),
576 // and add it to the IO completion port (1 more reference). If an
577 // accepted connection is never requested by the Dart code, then
578 // this list owns a reference (first Release), and the IO completion
579 // port owns a reference, (second Release in DeleteIfClosed).
580 client->Release();
581 DeleteIfClosed(client);
582 } else {
583 break;
584 }
585 }
586 // To finish resetting the state of the ListenSocket back to what it was
587 // before EnsureInitialized was called, we have to reset the AcceptEx_
588 // and GetAcceptExSockaddrs_ function pointers.
589 AcceptEx_ = nullptr;
590 GetAcceptExSockaddrs_ = nullptr;
591}
592
593bool ListenSocket::CanAccept() {
594 MonitorLocker ml(&monitor_);
595 return accepted_head_ != nullptr;
596}
597
598ClientSocket* ListenSocket::Accept() {
599 MonitorLocker ml(&monitor_);
600
601 ClientSocket* result = nullptr;
602
603 if (accepted_head_ != nullptr) {
604 result = accepted_head_;
605 accepted_head_ = accepted_head_->next();
606 if (accepted_head_ == nullptr) {
607 accepted_tail_ = nullptr;
608 }
609 result->set_next(nullptr);
610 accepted_count_--;
611 }
612
613 if (pending_accept_count_ < 5) {
614 // We have less than 5 pending accepts, queue another.
615 if (!IsClosing()) {
616 if (!IssueAccept()) {
617 HandleError(this);
618 }
619 }
620 }
621
622 return result;
623}
624
625void ListenSocket::EnsureInitialized(
626 EventHandlerImplementation* event_handler) {
627 MonitorLocker ml(&monitor_);
628 if (AcceptEx_ == nullptr) {
629 ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
630 ASSERT(event_handler_ == nullptr);
631 event_handler_ = event_handler;
632 CreateCompletionPort(event_handler_->completion_port());
633 bool isLoaded = LoadAcceptEx();
634 ASSERT(isLoaded);
635 }
636 if (GetAcceptExSockaddrs_ == nullptr) {
637 bool isLoaded = LoadGetAcceptExSockaddrs();
638 ASSERT(isLoaded);
639 }
640}
641
642bool ListenSocket::IsClosed() {
643 return IsClosing() && !HasPendingAccept();
644}
645
646intptr_t Handle::Available() {
647 MonitorLocker ml(&monitor_);
648 if (data_ready_ == nullptr) {
649 return 0;
650 }
651 return data_ready_->GetRemainingLength();
652}
653
654bool Handle::DataReady() {
655 return data_ready_ != nullptr;
656}
657
658intptr_t Handle::Read(void* buffer, intptr_t num_bytes) {
659 MonitorLocker ml(&monitor_);
660 if (data_ready_ == nullptr) {
661 return 0;
662 }
663 num_bytes =
664 data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
665 if (data_ready_->IsEmpty()) {
666 data_ready_ = nullptr;
667 if (!IsClosing() && !IsClosedRead()) {
668 IssueRead();
669 }
670 }
671 return num_bytes;
672}
673
674intptr_t Handle::RecvFrom(void* buffer,
675 intptr_t num_bytes,
676 struct sockaddr* sa,
677 socklen_t sa_len) {
678 MonitorLocker ml(&monitor_);
679 if (data_ready_ == nullptr) {
680 return 0;
681 }
682 num_bytes =
683 data_ready_->Read(buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
684 if (data_ready_->from()->sa_family == AF_INET) {
685 ASSERT(sa_len >= sizeof(struct sockaddr_in));
686 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in));
687 } else {
688 ASSERT(data_ready_->from()->sa_family == AF_INET6);
689 ASSERT(sa_len >= sizeof(struct sockaddr_in6));
690 memmove(sa, data_ready_->from(), sizeof(struct sockaddr_in6));
691 }
692 // Always dispose of the buffer, as UDP messages must be read in their
693 // entirety to match how recvfrom works in a socket.
694 data_ready_ = nullptr;
695 if (!IsClosing() && !IsClosedRead()) {
696 IssueRecvFrom();
697 }
698 return num_bytes;
699}
700
701intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
702 MonitorLocker ml(&monitor_);
703 if (HasPendingWrite()) {
704 return 0;
705 }
706 if (num_bytes > kBufferSize) {
707 num_bytes = kBufferSize;
708 }
709 ASSERT(SupportsOverlappedIO());
710 if (completion_port_ == INVALID_HANDLE_VALUE) {
711 return 0;
712 }
713 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
714 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
715 pending_write_->Write(buffer, truncated_bytes);
716 if (!IssueWrite()) {
717 return -1;
718 }
719 return truncated_bytes;
720}
721
722intptr_t Handle::SendTo(const void* buffer,
723 intptr_t num_bytes,
724 struct sockaddr* sa,
725 socklen_t sa_len) {
726 MonitorLocker ml(&monitor_);
727 if (HasPendingWrite()) {
728 return 0;
729 }
730 if (num_bytes > kBufferSize) {
731 ASSERT(kBufferSize >= kMaxUDPPackageLength);
732 // The provided buffer is larger than the maximum UDP datagram size so
733 // return an error immediately. If the buffer were larger and the data were
734 // actually passed to `WSASendTo()` then the operation would fail with
735 // ERROR_INVALID_USER_BUFFER anyway.
736 SetLastError(ERROR_INVALID_USER_BUFFER);
737 return -1;
738 }
739 ASSERT(SupportsOverlappedIO());
740 if (completion_port_ == INVALID_HANDLE_VALUE) {
741 return 0;
742 }
743 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
744 pending_write_->Write(buffer, num_bytes);
745 if (!IssueSendTo(sa, sa_len)) {
746 return -1;
747 }
748 return num_bytes;
749}
750
751Mutex* StdHandle::stdin_mutex_ = new Mutex();
752StdHandle* StdHandle::stdin_ = nullptr;
753
754StdHandle* StdHandle::Stdin(HANDLE handle) {
755 MutexLocker ml(stdin_mutex_);
756 if (stdin_ == nullptr) {
757 stdin_ = new StdHandle(handle);
758 }
759 return stdin_;
760}
761
762static void WriteFileThread(uword args) {
763 StdHandle* handle = reinterpret_cast<StdHandle*>(args);
764 handle->RunWriteLoop();
765}
766
767void StdHandle::RunWriteLoop() {
768 MonitorLocker ml(&monitor_);
769 write_thread_running_ = true;
770 thread_id_ = Thread::GetCurrentThreadId();
771 thread_handle_ = OpenThread(SYNCHRONIZE, false, thread_id_);
772 // Notify we have started.
773 ml.Notify();
774
775 while (write_thread_running_) {
776 ml.Wait(Monitor::kNoTimeout);
777 if (HasPendingWrite()) {
778 // We woke up and had a pending write. Execute it.
779 WriteSyncCompleteAsync();
780 }
781 }
782
783 write_thread_exists_ = false;
784 ml.Notify();
785}
786
787void StdHandle::WriteSyncCompleteAsync() {
788 ASSERT(HasPendingWrite());
789
790 DWORD bytes_written = -1;
791 BOOL ok = WriteFile(handle_, pending_write_->GetBufferStart(),
792 pending_write_->GetBufferSize(), &bytes_written, nullptr);
793 if (!ok) {
794 bytes_written = 0;
795 }
796 thread_wrote_ += bytes_written;
797 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
798 ok = PostQueuedCompletionStatus(
799 event_handler_->completion_port(), bytes_written,
800 reinterpret_cast<ULONG_PTR>(this), overlapped);
801 if (!ok) {
802 FATAL("PostQueuedCompletionStatus failed");
803 }
804}
805
806intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
807 MonitorLocker ml(&monitor_);
808 if (HasPendingWrite()) {
809 return 0;
810 }
811 if (num_bytes > kBufferSize) {
812 num_bytes = kBufferSize;
813 }
814 // In the case of stdout and stderr, OverlappedIO is not supported.
815 // Here we'll instead use a thread, to make it async.
816 // This code is actually never exposed to the user, as stdout and stderr is
817 // not available as a RawSocket, but only wrapped in a Socket.
818 // Note that we return '0', unless a thread have already completed a write.
819 if (thread_wrote_ > 0) {
820 if (num_bytes > thread_wrote_) {
821 num_bytes = thread_wrote_;
822 }
823 thread_wrote_ -= num_bytes;
824 return num_bytes;
825 }
826 if (!write_thread_exists_) {
827 write_thread_exists_ = true;
828 // The write thread gets a reference to the Handle, which it places in
829 // the events it puts on the IO completion port. The reference is
830 // Released by DeleteIfClosed.
831 Retain();
832 int result = Thread::Start("dart:io WriteFile", WriteFileThread,
833 reinterpret_cast<uword>(this));
834 if (result != 0) {
835 FATAL("Failed to start write file thread %d", result);
836 }
837 while (!write_thread_running_) {
838 // Wait until we the thread is running.
839 ml.Wait(Monitor::kNoTimeout);
840 }
841 }
842 // Only queue up to INT_MAX bytes.
843 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
844 // Create buffer and notify thread about the new handle.
845 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
846 pending_write_->Write(buffer, truncated_bytes);
847 ml.Notify();
848 return 0;
849}
850
851void StdHandle::DoClose() {
852 {
853 MonitorLocker ml(&monitor_);
854 if (write_thread_exists_) {
855 write_thread_running_ = false;
856 ml.Notify();
857 while (write_thread_exists_) {
858 ml.Wait(Monitor::kNoTimeout);
859 }
860 // Join the thread.
861 DWORD res = WaitForSingleObject(thread_handle_, INFINITE);
862 CloseHandle(thread_handle_);
863 ASSERT(res == WAIT_OBJECT_0);
864 }
865 Handle::DoClose();
866 }
867 MutexLocker ml(stdin_mutex_);
868 stdin_->Release();
869 StdHandle::stdin_ = nullptr;
870}
871
872#if defined(DEBUG)
873intptr_t ClientSocket::disconnecting_ = 0;
874#endif
875
876bool ClientSocket::LoadDisconnectEx() {
877 // Load the DisconnectEx function into memory using WSAIoctl.
878 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
879 DWORD bytes;
880 int status =
881 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
882 &guid_disconnect_ex, sizeof(guid_disconnect_ex), &DisconnectEx_,
883 sizeof(DisconnectEx_), &bytes, nullptr, nullptr);
884 return (status != SOCKET_ERROR);
885}
886
887void ClientSocket::Shutdown(int how) {
888 int rc = shutdown(socket(), how);
889 if (how == SD_RECEIVE) {
890 MarkClosedRead();
891 }
892 if (how == SD_SEND) {
893 MarkClosedWrite();
894 }
895 if (how == SD_BOTH) {
896 MarkClosedRead();
897 MarkClosedWrite();
898 }
899}
900
901void ClientSocket::DoClose() {
902 // Always do a shutdown before initiating a disconnect.
903 shutdown(socket(), SD_BOTH);
904 IssueDisconnect();
906}
907
908bool ClientSocket::IssueRead() {
909 MonitorLocker ml(&monitor_);
910 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
911 ASSERT(!HasPendingRead());
912
913 // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
914 // handle 64k datagrams.
915 OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(65536);
916
917 DWORD flags;
918 flags = 0;
919 int rc = WSARecv(socket(), buffer->GetWASBUF(), 1, nullptr, &flags,
920 buffer->GetCleanOverlapped(), nullptr);
921 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
922 pending_read_ = buffer;
923 return true;
924 }
925 OverlappedBuffer::DisposeBuffer(buffer);
926 pending_read_ = nullptr;
927 HandleIssueError();
928 return false;
929}
930
931bool ClientSocket::IssueWrite() {
932 MonitorLocker ml(&monitor_);
933 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
934 ASSERT(HasPendingWrite());
935 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
936
937 int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1, nullptr, 0,
938 pending_write_->GetCleanOverlapped(), nullptr);
939 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
940 return true;
941 }
942 OverlappedBuffer::DisposeBuffer(pending_write_);
943 pending_write_ = nullptr;
944 HandleIssueError();
945 return false;
946}
947
948void ClientSocket::IssueDisconnect() {
949 OverlappedBuffer* buffer = OverlappedBuffer::AllocateDisconnectBuffer();
950 BOOL ok =
951 DisconnectEx_(socket(), buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
952 // DisconnectEx works like other OverlappedIO APIs, where we can get either an
953 // immediate success or delayed operation by WSA_IO_PENDING being set.
954 if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
955 DisconnectComplete(buffer);
956 }
957 // When the Dart side receives this event, it may decide to close its Dart
958 // ports. When all ports are closed, the VM will shut down. The EventHandler
959 // will then shut down. If the EventHandler shuts down before this
960 // asynchronous disconnect finishes, this ClientSocket will be leaked.
961 // TODO(dart:io): Retain a list of client sockets that are in the process of
962 // disconnecting. Disconnect them forcefully, and clean up their resources
963 // when the EventHandler shuts down.
964 NotifyAllDartPorts(1 << kDestroyedEvent);
965 RemoveAllPorts();
966#if defined(DEBUG)
967 disconnecting_++;
968#endif
969}
970
971void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
972 OverlappedBuffer::DisposeBuffer(buffer);
973 closesocket(socket());
974 data_ready_ = nullptr;
975 mark_closed();
976#if defined(DEBUG)
977 disconnecting_--;
978#endif
979}
980
981void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
982 OverlappedBuffer::DisposeBuffer(buffer);
983 // Update socket to support full socket API, after ConnectEx completed.
984 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, nullptr, 0);
985 // If the port is set, we already listen for this socket in Dart.
986 // Handle the cases here.
987 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) {
988 IssueRead();
989 }
990 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) {
991 Dart_Port port = NextNotifyDartPort(1 << kOutEvent);
992 DartUtils::PostInt32(port, 1 << kOutEvent);
993 }
994}
995
996void ClientSocket::EnsureInitialized(
997 EventHandlerImplementation* event_handler) {
998 MonitorLocker ml(&monitor_);
999 if (completion_port_ == INVALID_HANDLE_VALUE) {
1000 ASSERT(event_handler_ == nullptr);
1001 event_handler_ = event_handler;
1002 CreateCompletionPort(event_handler_->completion_port());
1003 }
1004}
1005
1006bool ClientSocket::IsClosed() {
1007 return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite();
1008}
1009
1010bool ClientSocket::PopulateRemoteAddr(RawAddr& addr) {
1011 if (!remote_addr_) {
1012 return false;
1013 }
1014 addr = *remote_addr_;
1015 return true;
1016}
1017
1018bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
1019 MonitorLocker ml(&monitor_);
1020 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
1021 ASSERT(HasPendingWrite());
1022 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
1023
1024 int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1, nullptr, 0, sa,
1025 sa_len, pending_write_->GetCleanOverlapped(), nullptr);
1026 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1027 return true;
1028 }
1029 OverlappedBuffer::DisposeBuffer(pending_write_);
1030 pending_write_ = nullptr;
1031 HandleIssueError();
1032 return false;
1033}
1034
1035bool DatagramSocket::IssueRecvFrom() {
1036 MonitorLocker ml(&monitor_);
1037 ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
1038 ASSERT(!HasPendingRead());
1039
1040 OverlappedBuffer* buffer =
1041 OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength);
1042
1043 DWORD flags;
1044 flags = 0;
1045 int rc = WSARecvFrom(socket(), buffer->GetWASBUF(), 1, nullptr, &flags,
1046 buffer->from(), buffer->from_len_addr(),
1047 buffer->GetCleanOverlapped(), nullptr);
1048 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1049 pending_read_ = buffer;
1050 return true;
1051 }
1052 OverlappedBuffer::DisposeBuffer(buffer);
1053 pending_read_ = nullptr;
1054 HandleIssueError();
1055 return false;
1056}
1057
1058void DatagramSocket::EnsureInitialized(
1059 EventHandlerImplementation* event_handler) {
1060 MonitorLocker ml(&monitor_);
1061 if (completion_port_ == INVALID_HANDLE_VALUE) {
1062 ASSERT(event_handler_ == nullptr);
1063 event_handler_ = event_handler;
1064 CreateCompletionPort(event_handler_->completion_port());
1065 }
1066}
1067
1068bool DatagramSocket::IsClosed() {
1069 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
1070}
1071
1072void DatagramSocket::DoClose() {
1073 // Just close the socket. This will cause any queued requests to be aborted.
1074 closesocket(socket());
1075 MarkClosedRead();
1076 MarkClosedWrite();
1078}
1079
1080void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1081 if (msg->id == kTimerId) {
1082 // Change of timeout request. Just set the new timeout and port as the
1083 // completion thread will use the new timeout value for its next wait.
1084 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1085 } else if (msg->id == kShutdownId) {
1086 shutdown_ = true;
1087 } else {
1088 Socket* socket = reinterpret_cast<Socket*>(msg->id);
1089 RefCntReleaseScope<Socket> rs(socket);
1090 if (socket->fd() == -1) {
1091 return;
1092 }
1093 Handle* handle = reinterpret_cast<Handle*>(socket->fd());
1094 ASSERT(handle != nullptr);
1095
1096 if (handle->is_listen_socket()) {
1097 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(handle);
1098 listen_socket->EnsureInitialized(this);
1099
1100 MonitorLocker ml(&listen_socket->monitor_);
1101
1102 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1103 listen_socket->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1104 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1105 // `events` can only have kInEvent/kOutEvent flags set.
1106 intptr_t events = msg->data & EVENT_MASK;
1107 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1108 listen_socket->SetPortAndMask(msg->dart_port, events);
1109 TryDispatchingPendingAccepts(listen_socket);
1110 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1111 if (msg->dart_port != ILLEGAL_PORT) {
1112 listen_socket->RemovePort(msg->dart_port);
1113 }
1114
1115 // We only close the socket file descriptor from the operating
1116 // system if there are no other dart socket objects which
1117 // are listening on the same (address, port) combination.
1118 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
1119 MutexLocker locker(registry->mutex());
1120 if (registry->CloseSafe(socket)) {
1121 ASSERT(listen_socket->Mask() == 0);
1122 listen_socket->Close();
1123 socket->CloseFd();
1124 }
1125 socket->SetClosedFd();
1126 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1127 } else {
1128 UNREACHABLE();
1129 }
1130 } else {
1131 handle->EnsureInitialized(this);
1132 MonitorLocker ml(&handle->monitor_);
1133
1134 if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
1135 handle->ReturnTokens(msg->dart_port, TOKEN_COUNT(msg->data));
1136 } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1137 // `events` can only have kInEvent/kOutEvent flags set.
1138 intptr_t events = msg->data & EVENT_MASK;
1139 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1140
1141 handle->SetPortAndMask(msg->dart_port, events);
1142
1143 // Issue a read.
1144 if ((handle->Mask() & (1 << kInEvent)) != 0) {
1145 if (handle->is_datagram_socket()) {
1146 handle->IssueRecvFrom();
1147 } else if (handle->is_client_socket()) {
1148 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1149 handle->IssueRead();
1150 }
1151 } else {
1152 handle->IssueRead();
1153 }
1154 }
1155
1156 // If out events (can write events) have been requested, and there
1157 // are no pending writes, meaning any writes are already complete,
1158 // post an out event immediately.
1159 intptr_t out_event_mask = 1 << kOutEvent;
1160 if ((events & out_event_mask) != 0) {
1161 if (!handle->HasPendingWrite()) {
1162 if (handle->is_client_socket()) {
1163 if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
1164 intptr_t event_mask = 1 << kOutEvent;
1165 if ((handle->Mask() & event_mask) != 0) {
1166 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1167 DartUtils::PostInt32(port, event_mask);
1168 }
1169 }
1170 } else {
1171 if ((handle->Mask() & out_event_mask) != 0) {
1172 Dart_Port port = handle->NextNotifyDartPort(out_event_mask);
1173 DartUtils::PostInt32(port, out_event_mask);
1174 }
1175 }
1176 }
1177 }
1178 // Similarly, if in events (can read events) have been requested, and
1179 // there is pending data available, post an in event immediately.
1180 intptr_t in_event_mask = 1 << kInEvent;
1181 if ((events & in_event_mask) != 0) {
1182 if (handle->data_ready_ != nullptr &&
1183 !handle->data_ready_->IsEmpty()) {
1184 if ((handle->Mask() & in_event_mask) != 0) {
1185 Dart_Port port = handle->NextNotifyDartPort(in_event_mask);
1186 DartUtils::PostInt32(port, in_event_mask);
1187 }
1188 }
1189 }
1190 } else if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
1191 ASSERT(handle->is_client_socket());
1192
1193 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1194 client_socket->Shutdown(SD_RECEIVE);
1195 } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
1196 ASSERT(handle->is_client_socket());
1197
1198 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(handle);
1199 client_socket->Shutdown(SD_SEND);
1200 } else if (IS_COMMAND(msg->data, kCloseCommand)) {
1201 if (IS_SIGNAL_SOCKET(msg->data)) {
1202 Process::ClearSignalHandlerByFd(socket->fd(), socket->isolate_port());
1203 }
1204 handle->SetPortAndMask(msg->dart_port, 0);
1205 handle->Close();
1206 socket->CloseFd();
1207 } else {
1208 UNREACHABLE();
1209 }
1210 }
1211
1212 DeleteIfClosed(handle);
1213 }
1214}
1215
1216void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1217 OverlappedBuffer* buffer) {
1218 listen_socket->AcceptComplete(buffer, completion_port_);
1219
1220 {
1221 MonitorLocker ml(&listen_socket->monitor_);
1222 TryDispatchingPendingAccepts(listen_socket);
1223 }
1224
1225 DeleteIfClosed(listen_socket);
1226}
1227
1228void EventHandlerImplementation::TryDispatchingPendingAccepts(
1229 ListenSocket* listen_socket) {
1230 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1231 intptr_t event_mask = 1 << kInEvent;
1232 for (int i = 0; (i < listen_socket->accepted_count()) &&
1233 (listen_socket->Mask() == event_mask);
1234 i++) {
1235 Dart_Port port = listen_socket->NextNotifyDartPort(event_mask);
1236 DartUtils::PostInt32(port, event_mask);
1237 }
1238 }
1239}
1240
1241void EventHandlerImplementation::HandleRead(Handle* handle,
1242 int bytes,
1243 OverlappedBuffer* buffer) {
1244 buffer->set_data_length(bytes);
1245 handle->ReadComplete(buffer);
1246 if (bytes > 0) {
1247 if (!handle->IsClosing()) {
1248 int event_mask = 1 << kInEvent;
1249 if ((handle->Mask() & event_mask) != 0) {
1250 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1251 DartUtils::PostInt32(port, event_mask);
1252 }
1253 }
1254 } else {
1255 handle->MarkClosedRead();
1256 if (bytes == 0) {
1257 HandleClosed(handle);
1258 } else {
1259 HandleError(handle);
1260 }
1261 }
1262
1263 DeleteIfClosed(handle);
1264}
1265
1266void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1267 int bytes,
1268 OverlappedBuffer* buffer) {
1269 ASSERT(handle->is_datagram_socket());
1270 if (bytes >= 0) {
1271 buffer->set_data_length(bytes);
1272 handle->ReadComplete(buffer);
1273 if (!handle->IsClosing()) {
1274 int event_mask = 1 << kInEvent;
1275 if ((handle->Mask() & event_mask) != 0) {
1276 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1277 DartUtils::PostInt32(port, event_mask);
1278 }
1279 }
1280 } else {
1281 HandleError(handle);
1282 }
1283
1284 DeleteIfClosed(handle);
1285}
1286
1287void EventHandlerImplementation::HandleWrite(Handle* handle,
1288 int bytes,
1289 OverlappedBuffer* buffer) {
1290 handle->WriteComplete(buffer);
1291
1292 if (bytes >= 0) {
1293 if (!handle->IsError() && !handle->IsClosing()) {
1294 int event_mask = 1 << kOutEvent;
1295 ASSERT(!handle->is_client_socket() ||
1296 reinterpret_cast<ClientSocket*>(handle)->is_connected());
1297 if ((handle->Mask() & event_mask) != 0) {
1298 Dart_Port port = handle->NextNotifyDartPort(event_mask);
1299 DartUtils::PostInt32(port, event_mask);
1300 }
1301 }
1302 } else {
1303 HandleError(handle);
1304 }
1305
1306 DeleteIfClosed(handle);
1307}
1308
1309void EventHandlerImplementation::HandleDisconnect(ClientSocket* client_socket,
1310 int bytes,
1311 OverlappedBuffer* buffer) {
1312 client_socket->DisconnectComplete(buffer);
1313 DeleteIfClosed(client_socket);
1314}
1315
1316void EventHandlerImplementation::HandleConnect(ClientSocket* client_socket,
1317 int bytes,
1318 OverlappedBuffer* buffer) {
1319 if (bytes < 0) {
1320 HandleError(client_socket);
1321 OverlappedBuffer::DisposeBuffer(buffer);
1322 } else {
1323 client_socket->ConnectComplete(buffer);
1324 }
1325 client_socket->mark_connected();
1326 DeleteIfClosed(client_socket);
1327}
1328
1329void EventHandlerImplementation::HandleTimeout() {
1330 if (!timeout_queue_.HasTimeout()) {
1331 return;
1332 }
1333 DartUtils::PostNull(timeout_queue_.CurrentPort());
1334 timeout_queue_.RemoveCurrent();
1335}
1336
1337void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
1338 ULONG_PTR key,
1339 OVERLAPPED* overlapped) {
1340 OverlappedBuffer* buffer = OverlappedBuffer::GetFromOverlapped(overlapped);
1341 switch (buffer->operation()) {
1342 case OverlappedBuffer::kAccept: {
1343 ListenSocket* listen_socket = reinterpret_cast<ListenSocket*>(key);
1344 HandleAccept(listen_socket, buffer);
1345 break;
1346 }
1347 case OverlappedBuffer::kRead: {
1348 Handle* handle = reinterpret_cast<Handle*>(key);
1349 HandleRead(handle, bytes, buffer);
1350 break;
1351 }
1352 case OverlappedBuffer::kRecvFrom: {
1353 Handle* handle = reinterpret_cast<Handle*>(key);
1354 HandleRecvFrom(handle, bytes, buffer);
1355 break;
1356 }
1357 case OverlappedBuffer::kWrite:
1358 case OverlappedBuffer::kSendTo: {
1359 Handle* handle = reinterpret_cast<Handle*>(key);
1360 HandleWrite(handle, bytes, buffer);
1361 break;
1362 }
1363 case OverlappedBuffer::kDisconnect: {
1364 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1365 HandleDisconnect(client_socket, bytes, buffer);
1366 break;
1367 }
1368 case OverlappedBuffer::kConnect: {
1369 ClientSocket* client_socket = reinterpret_cast<ClientSocket*>(key);
1370 HandleConnect(client_socket, bytes, buffer);
1371 break;
1372 }
1373 default:
1374 UNREACHABLE();
1375 }
1376}
1377
1378void EventHandlerImplementation::HandleCompletionOrInterrupt(
1379 BOOL ok,
1380 DWORD bytes,
1381 ULONG_PTR key,
1382 OVERLAPPED* overlapped) {
1383 if (!ok) {
1384 // Treat ERROR_CONNECTION_ABORTED as connection closed.
1385 // The error ERROR_OPERATION_ABORTED is set for pending
1386 // accept requests for a listen socket which is closed.
1387 // ERROR_NETNAME_DELETED occurs when the client closes
1388 // the socket it is reading from.
1389 DWORD last_error = GetLastError();
1390 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1391 (last_error == ERROR_OPERATION_ABORTED) ||
1392 (last_error == ERROR_NETNAME_DELETED) ||
1393 (last_error == ERROR_BROKEN_PIPE)) {
1394 ASSERT(bytes == 0);
1395 HandleIOCompletion(bytes, key, overlapped);
1396 } else if (last_error == ERROR_MORE_DATA) {
1397 // Don't ASSERT no bytes in this case. This can happen if the receive
1398 // buffer for datagram sockets is too small to contain a full datagram,
1399 // and in this case bytes hold the bytes that was read.
1400 HandleIOCompletion(-1, key, overlapped);
1401 } else {
1402 ASSERT(bytes == 0);
1403 HandleIOCompletion(-1, key, overlapped);
1404 }
1405 } else if (key == NULL) {
1406 // A key of nullptr signals an interrupt message.
1407 InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
1408 HandleInterrupt(msg);
1409 delete msg;
1410 } else {
1411 HandleIOCompletion(bytes, key, overlapped);
1412 }
1413}
1414
1415EventHandlerImplementation::EventHandlerImplementation() {
1416 handler_thread_id_ = Thread::kInvalidThreadId;
1417 handler_thread_handle_ = nullptr;
1418 completion_port_ =
1419 CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, NULL, 1);
1420 if (completion_port_ == nullptr) {
1421 FATAL("Completion port creation failed");
1422 }
1423 shutdown_ = false;
1424}
1425
1426EventHandlerImplementation::~EventHandlerImplementation() {
1427 // Join the handler thread.
1428 DWORD res = WaitForSingleObject(handler_thread_handle_, INFINITE);
1429 CloseHandle(handler_thread_handle_);
1430 ASSERT(res == WAIT_OBJECT_0);
1431 CloseHandle(completion_port_);
1432}
1433
1434int64_t EventHandlerImplementation::GetTimeout() {
1435 if (!timeout_queue_.HasTimeout()) {
1436 return kInfinityTimeout;
1437 }
1438 int64_t millis =
1439 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
1440 return (millis < 0) ? 0 : millis;
1441}
1442
1443void EventHandlerImplementation::SendData(intptr_t id,
1444 Dart_Port dart_port,
1445 int64_t data) {
1446 InterruptMessage* msg = new InterruptMessage;
1447 msg->id = id;
1448 msg->dart_port = dart_port;
1449 msg->data = data;
1450 BOOL ok = PostQueuedCompletionStatus(completion_port_, 0, NULL,
1451 reinterpret_cast<OVERLAPPED*>(msg));
1452 if (!ok) {
1453 FATAL("PostQueuedCompletionStatus failed");
1454 }
1455}
1456
1457void EventHandlerImplementation::EventHandlerEntry(uword args) {
1458 EventHandler* handler = reinterpret_cast<EventHandler*>(args);
1459 EventHandlerImplementation* handler_impl = &handler->delegate_;
1460 ASSERT(handler_impl != nullptr);
1461
1462 {
1463 MonitorLocker ml(&handler_impl->startup_monitor_);
1464 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1465 handler_impl->handler_thread_handle_ =
1466 OpenThread(SYNCHRONIZE, false, handler_impl->handler_thread_id_);
1467 ml.Notify();
1468 }
1469
1470 DWORD bytes;
1471 ULONG_PTR key;
1472 OVERLAPPED* overlapped;
1473 BOOL ok;
1474 while (!handler_impl->shutdown_) {
1475 int64_t millis = handler_impl->GetTimeout();
1476 ASSERT(millis == kInfinityTimeout || millis >= 0);
1477 if (millis > kMaxInt32) {
1478 millis = kMaxInt32;
1479 }
1480 ASSERT(sizeof(int32_t) == sizeof(DWORD));
1481 DWORD timeout = static_cast<DWORD>(millis);
1482 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1483 &key, &overlapped, timeout);
1484
1485 if (!ok && (overlapped == nullptr)) {
1486 if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
1487 // The completion port should never be closed.
1488 Syslog::Print("Completion port closed\n");
1489 UNREACHABLE();
1490 } else {
1491 // Timeout is signalled by false result and nullptr in overlapped.
1492 handler_impl->HandleTimeout();
1493 }
1494 } else {
1495 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1496 }
1497 }
1498
1499// In a Debug build, drain the IO completion port to make sure we aren't
1500// leaking any (non-disconnecting) Handles. In a Release build, we don't care
1501// because the VM is going down, and the asserts below are Debug-only.
1502#if defined(DEBUG)
1503 while (true) {
1504 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1505 &key, &overlapped, 0);
1506 if (!ok && (overlapped == nullptr)) {
1507 // There was an error or nothing is ready. Assume the port is drained.
1508 break;
1509 }
1510 handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
1511 }
1512
1513 // The eventhandler thread is going down so there should be no more live
1514 // Handles or Sockets.
1515 // TODO(dart:io): It would be nice to be able to assert here that:
1516 // ReferenceCounted<Handle>::instances() == 0;
1517 // However, we cannot at the moment. See the TODO on:
1518 // ClientSocket::IssueDisconnect()
1519 // Furthermore, if the Dart program references stdin, but does not
1520 // explicitly close it, then the StdHandle for it will be leaked to here.
1521 const intptr_t stdin_leaked = (StdHandle::StdinPtr() == nullptr) ? 0 : 1;
1522 DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
1523 ClientSocket::disconnecting() + stdin_leaked);
1524 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
1525#endif // defined(DEBUG)
1526 handler->NotifyShutdownDone();
1527}
1528
1529void EventHandlerImplementation::Start(EventHandler* handler) {
1530 int result = Thread::Start("dart:io EventHandler", EventHandlerEntry,
1531 reinterpret_cast<uword>(handler));
1532 if (result != 0) {
1533 FATAL("Failed to start event handler thread %d", result);
1534 }
1535
1536 {
1537 MonitorLocker ml(&startup_monitor_);
1538 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1539 ml.Wait();
1540 }
1541 }
1542}
1543
1544void EventHandlerImplementation::Shutdown() {
1545 SendData(kShutdownId, 0, 0);
1546}
1547
1548} // namespace bin
1549} // namespace dart
1550
1551#endif // defined(DART_HOST_OS_WINDOWS)
static uint32_t buffer_size(uint32_t offset, uint32_t maxAlignment)
static bool ok(int result)
static void operation(T operation, uint32_t &a, uint32_t b, uint32_t c, uint32_t d, uint32_t x, uint8_t s, uint32_t t)
Definition SkMD5.cpp:144
static const size_t kBufferSize
Definition SkString.cpp:27
#define UNREACHABLE()
Definition assert.h:248
#define DEBUG_ASSERT(cond)
Definition assert.h:321
Handle(intptr_t handle)
int Read(void *buffer, int num_bytes)
static void DisposeBuffer(OverlappedBuffer *buffer)
static OverlappedBuffer * GetFromOverlapped(OVERLAPPED *overlapped)
static OverlappedBuffer * AllocateRecvFromBuffer(int buffer_size)
static OverlappedBuffer * AllocateAcceptBuffer(int buffer_size)
static OverlappedBuffer * AllocateConnectBuffer()
int Write(const void *buffer, int num_bytes)
static OverlappedBuffer * AllocateDisconnectBuffer()
static OverlappedBuffer * AllocateSendToBuffer(int buffer_size)
static OverlappedBuffer * AllocateReadBuffer(int buffer_size)
static OverlappedBuffer * AllocateWriteBuffer(int buffer_size)
#define ILLEGAL_PORT
Definition dart_api.h:1530
int64_t Dart_Port
Definition dart_api.h:1524
#define ASSERT(E)
#define IS_COMMAND(data, command_bit)
#define EVENT_MASK
#define IS_SIGNAL_SOCKET(data)
#define TOKEN_COUNT(data)
struct MyStruct s
#define FATAL(error)
FlutterSemanticsFlag flags
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
static const uint8_t buffer[]
const uint8_t uint32_t uint32_t GError ** error
GAsyncResult * result
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
Definition Response.cpp:64
void ReadFile(uint8_t **data, intptr_t *file_len, void *stream)
bool WriteFile(const std::string &path, const char *data, ssize_t size)
Definition files.cc:69
uintptr_t uword
Definition globals.h:501
constexpr int32_t kMaxInt32
Definition globals.h:483
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
Definition switches.h:87
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot data
Definition switches.h:41
timeout(deadline, cmd)
fuchsia::ui::composition::ParentViewportWatcherHandle handle_
const uintptr_t id
int BOOL
#define SYNCHRONIZE
#define INVALID_HANDLE_VALUE
WINBASEAPI VOID WINAPI SetLastError(_In_ DWORD dwErrCode)
WINBASEAPI _Check_return_ _Post_equals_last_error_ DWORD WINAPI GetLastError(VOID)
void * HANDLE
struct _OVERLAPPED OVERLAPPED
__w64 unsigned long ULONG_PTR
unsigned long DWORD
struct _GUID GUID