6#if defined(DART_HOST_OS_WINDOWS)
35static constexpr int kStdOverlappedBufferSize = 16 * 1024;
36static constexpr int kMaxUDPPackageLength = 64 * 1024;
42static constexpr int kAcceptExAddressAdditionalBytes = 16;
43static constexpr int kAcceptExAddressStorageSize =
44 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
46OverlappedBuffer* OverlappedBuffer::AllocateBuffer(
int buffer_size,
65 buffer_size +=
sizeof(socklen_t) +
sizeof(
struct sockaddr_storage);
91 CONTAINING_RECORD(overlapped, OverlappedBuffer, overlapped_);
105 ASSERT(num_bytes == buflen_);
107 data_length_ = num_bytes;
113 return data_length_ - index_;
117 : ReferenceCounted(),
118 DescriptorInfoBase(handle),
122 event_handler_(nullptr),
124 pending_read_(nullptr),
125 pending_write_(nullptr),
126 last_error_(NOERROR),
127 read_thread_id_(Thread::kInvalidThreadId),
128 read_thread_handle_(nullptr),
129 read_thread_starting_(
false),
130 read_thread_finished_(
false),
135bool Handle::CreateCompletionPort(
HANDLE completion_port) {
140 completion_port_ = CreateIoCompletionPort(
141 handle(), completion_port,
reinterpret_cast<ULONG_PTR>(
this), 0);
142 return (completion_port_ !=
nullptr);
145void Handle::Close() {
146 MonitorLocker ml(&monitor_);
147 if (!SupportsOverlappedIO()) {
154 if (IsWindows10OrGreater()) {
171void Handle::DoClose() {
172 if (!IsHandleClosed()) {
178bool Handle::HasPendingRead() {
179 return pending_read_ !=
nullptr;
182bool Handle::HasPendingWrite() {
183 return pending_write_ !=
nullptr;
186void Handle::WaitForReadThreadStarted() {
187 MonitorLocker ml(&monitor_);
188 while (read_thread_starting_) {
193void Handle::WaitForReadThreadFinished() {
196 MonitorLocker ml(&monitor_);
197 if (read_thread_id_ != Thread::kInvalidThreadId) {
198 while (!read_thread_finished_) {
201 read_thread_finished_ =
false;
202 read_thread_id_ = Thread::kInvalidThreadId;
203 to_join = read_thread_handle_;
204 read_thread_handle_ =
nullptr;
207 if (to_join !=
nullptr) {
209 DWORD res = WaitForSingleObject(to_join, INFINITE);
210 CloseHandle(to_join);
211 ASSERT(res == WAIT_OBJECT_0);
215void Handle::ReadComplete(OverlappedBuffer*
buffer) {
216 WaitForReadThreadStarted();
218 MonitorLocker ml(&monitor_);
221 ASSERT(data_ready_ ==
nullptr);
223 data_ready_.reset(pending_read_);
225 OverlappedBuffer::DisposeBuffer(
buffer);
227 pending_read_ =
nullptr;
229 WaitForReadThreadFinished();
232void Handle::RecvFromComplete(OverlappedBuffer*
buffer) {
236void Handle::WriteComplete(OverlappedBuffer*
buffer) {
237 MonitorLocker ml(&monitor_);
240 OverlappedBuffer::DisposeBuffer(
buffer);
241 pending_write_ =
nullptr;
244static void ReadFileThread(uword
args) {
245 Handle* handle =
reinterpret_cast<Handle*
>(
args);
246 handle->ReadSyncCompleteAsync();
249void Handle::NotifyReadThreadStarted() {
250 MonitorLocker ml(&monitor_);
251 ASSERT(read_thread_starting_);
252 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
253 read_thread_id_ = Thread::GetCurrentThreadId();
254 read_thread_handle_ = OpenThread(
SYNCHRONIZE,
false, read_thread_id_);
255 read_thread_starting_ =
false;
259void Handle::NotifyReadThreadFinished() {
260 MonitorLocker ml(&monitor_);
261 ASSERT(!read_thread_finished_);
262 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
263 read_thread_finished_ =
true;
267void Handle::ReadSyncCompleteAsync() {
268 NotifyReadThreadStarted();
270 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
273 if (GetFileType(
handle_) == FILE_TYPE_CHAR) {
276 char* buffer_start = pending_read_->GetBufferStart();
277 DWORD bytes_read = 0;
282 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
284 PostQueuedCompletionStatus(event_handler_->completion_port(), bytes_read,
285 reinterpret_cast<ULONG_PTR>(
this), overlapped);
287 FATAL(
"PostQueuedCompletionStatus failed");
289 NotifyReadThreadFinished();
292bool Handle::IssueRead() {
293 ASSERT(type_ != kListenSocket);
294 ASSERT(!HasPendingRead());
296 if (SupportsOverlappedIO()) {
301 nullptr,
buffer->GetCleanOverlapped());
307 OverlappedBuffer::DisposeBuffer(
buffer);
313 read_thread_starting_ =
true;
314 int result = Thread::Start(
"dart:io ReadFile", ReadFileThread,
315 reinterpret_cast<uword>(
this));
317 FATAL(
"Failed to start read file thread %d",
result);
323bool Handle::IssueRecvFrom() {
327bool Handle::IssueWrite() {
328 MonitorLocker ml(&monitor_);
329 ASSERT(type_ != kListenSocket);
331 ASSERT(HasPendingWrite());
332 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
334 OverlappedBuffer*
buffer = pending_write_;
337 nullptr,
buffer->GetCleanOverlapped());
343 OverlappedBuffer::DisposeBuffer(
buffer);
348bool Handle::IssueSendTo(
struct sockaddr* sa, socklen_t sa_len) {
352static void HandleClosed(Handle* handle) {
353 if (!handle->IsClosing()) {
355 handle->NotifyAllDartPorts(event_mask);
359static void HandleError(Handle* handle) {
360 handle->set_last_error(WSAGetLastError());
362 if (!handle->IsClosing()) {
363 handle->NotifyAllDartPorts(1 << kErrorEvent);
367void Handle::HandleIssueError() {
369 if (
error == ERROR_BROKEN_PIPE) {
377void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
378 MonitorLocker ml(&monitor_);
379 event_handler_ = event_handler;
381 if (SupportsOverlappedIO()) {
382 CreateCompletionPort(event_handler_->completion_port());
388 completion_port_ = event_handler_->completion_port();
393bool FileHandle::IsClosed() {
394 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
397void DirectoryWatchHandle::EnsureInitialized(
398 EventHandlerImplementation* event_handler) {
399 MonitorLocker ml(&monitor_);
400 event_handler_ = event_handler;
402 CreateCompletionPort(event_handler_->completion_port());
406bool DirectoryWatchHandle::IsClosed() {
407 return IsClosing() && !HasPendingRead();
410bool DirectoryWatchHandle::IssueRead() {
413 if (HasPendingRead() || (data_ready_ !=
nullptr)) {
422 BOOL ok = ReadDirectoryChangesW(
424 events_,
nullptr,
buffer->GetCleanOverlapped(),
nullptr);
429 pending_read_ =
nullptr;
430 OverlappedBuffer::DisposeBuffer(
buffer);
434void DirectoryWatchHandle::Stop() {
435 MonitorLocker ml(&monitor_);
438 if (HasPendingRead()) {
439 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
446void SocketHandle::HandleIssueError() {
447 int error = WSAGetLastError();
448 if (
error == WSAECONNRESET) {
453 WSASetLastError(
error);
456bool ListenSocket::LoadAcceptEx() {
458 GUID guid_accept_ex = WSAID_ACCEPTEX;
460 int status = WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
461 &guid_accept_ex,
sizeof(guid_accept_ex), &AcceptEx_,
462 sizeof(AcceptEx_), &bytes,
nullptr,
nullptr);
463 return (status != SOCKET_ERROR);
466bool ListenSocket::LoadGetAcceptExSockaddrs() {
468 GUID guid_get_accept_ex_sockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
471 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
472 &guid_get_accept_ex_sockaddrs,
473 sizeof(guid_get_accept_ex_sockaddrs), &GetAcceptExSockaddrs_,
474 sizeof(GetAcceptExSockaddrs_), &bytes,
nullptr,
nullptr);
475 return (status != SOCKET_ERROR);
478bool ListenSocket::IssueAccept() {
479 MonitorLocker ml(&monitor_);
481 OverlappedBuffer*
buffer =
482 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
485 ok = AcceptEx_(socket(),
buffer->client(),
buffer->GetBufferStart(),
487 kAcceptExAddressStorageSize, kAcceptExAddressStorageSize,
488 &received,
buffer->GetCleanOverlapped());
490 if (WSAGetLastError() != WSA_IO_PENDING) {
491 int error = WSAGetLastError();
492 closesocket(
buffer->client());
493 OverlappedBuffer::DisposeBuffer(
buffer);
494 WSASetLastError(
error);
499 pending_accept_count_++;
504void ListenSocket::AcceptComplete(OverlappedBuffer*
buffer,
506 MonitorLocker ml(&monitor_);
510 int rc = setsockopt(
buffer->client(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
511 reinterpret_cast<char*
>(&
s),
sizeof(
s));
512 if (rc == NO_ERROR) {
517 LPSOCKADDR local_addr;
518 int local_addr_length;
519 LPSOCKADDR remote_addr;
520 int remote_addr_length;
521 GetAcceptExSockaddrs_(
522 buffer->GetBufferStart(), 0, kAcceptExAddressStorageSize,
523 kAcceptExAddressStorageSize, &local_addr, &local_addr_length,
524 &remote_addr, &remote_addr_length);
525 RawAddr* raw_remote_addr =
new RawAddr;
526 memmove(raw_remote_addr, remote_addr, remote_addr_length);
529 ClientSocket* client_socket =
new ClientSocket(
530 buffer->client(), std::unique_ptr<RawAddr>(raw_remote_addr));
531 client_socket->mark_connected();
532 client_socket->CreateCompletionPort(completion_port);
533 if (accepted_head_ ==
nullptr) {
534 accepted_head_ = client_socket;
535 accepted_tail_ = client_socket;
537 ASSERT(accepted_tail_ !=
nullptr);
538 accepted_tail_->set_next(client_socket);
539 accepted_tail_ = client_socket;
543 closesocket(
buffer->client());
547 closesocket(
buffer->client());
550 pending_accept_count_--;
551 OverlappedBuffer::DisposeBuffer(
buffer);
554static void DeleteIfClosed(Handle* handle) {
555 if (handle->IsClosed()) {
557 handle->set_event_handler(
nullptr);
558 handle->NotifyAllDartPorts(1 << kDestroyedEvent);
559 handle->RemoveAllPorts();
566void ListenSocket::DoClose() {
567 closesocket(socket());
569 while (CanAccept()) {
571 ClientSocket* client = Accept();
572 if (client !=
nullptr) {
581 DeleteIfClosed(client);
590 GetAcceptExSockaddrs_ =
nullptr;
593bool ListenSocket::CanAccept() {
594 MonitorLocker ml(&monitor_);
595 return accepted_head_ !=
nullptr;
598ClientSocket* ListenSocket::Accept() {
599 MonitorLocker ml(&monitor_);
601 ClientSocket*
result =
nullptr;
603 if (accepted_head_ !=
nullptr) {
605 accepted_head_ = accepted_head_->next();
606 if (accepted_head_ ==
nullptr) {
607 accepted_tail_ =
nullptr;
609 result->set_next(
nullptr);
613 if (pending_accept_count_ < 5) {
616 if (!IssueAccept()) {
625void ListenSocket::EnsureInitialized(
626 EventHandlerImplementation* event_handler) {
627 MonitorLocker ml(&monitor_);
628 if (AcceptEx_ ==
nullptr) {
630 ASSERT(event_handler_ ==
nullptr);
631 event_handler_ = event_handler;
632 CreateCompletionPort(event_handler_->completion_port());
633 bool isLoaded = LoadAcceptEx();
636 if (GetAcceptExSockaddrs_ ==
nullptr) {
637 bool isLoaded = LoadGetAcceptExSockaddrs();
642bool ListenSocket::IsClosed() {
643 return IsClosing() && !HasPendingAccept();
646intptr_t Handle::Available() {
647 MonitorLocker ml(&monitor_);
648 if (data_ready_ ==
nullptr) {
651 return data_ready_->GetRemainingLength();
654bool Handle::DataReady() {
655 return data_ready_ !=
nullptr;
658intptr_t Handle::Read(
void*
buffer, intptr_t num_bytes) {
659 MonitorLocker ml(&monitor_);
660 if (data_ready_ ==
nullptr) {
664 data_ready_->Read(
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
665 if (data_ready_->IsEmpty()) {
666 data_ready_ =
nullptr;
667 if (!IsClosing() && !IsClosedRead()) {
674intptr_t Handle::RecvFrom(
void*
buffer,
678 MonitorLocker ml(&monitor_);
679 if (data_ready_ ==
nullptr) {
683 data_ready_->Read(
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
684 if (data_ready_->from()->sa_family == AF_INET) {
685 ASSERT(sa_len >=
sizeof(
struct sockaddr_in));
686 memmove(sa, data_ready_->from(),
sizeof(
struct sockaddr_in));
688 ASSERT(data_ready_->from()->sa_family == AF_INET6);
689 ASSERT(sa_len >=
sizeof(
struct sockaddr_in6));
690 memmove(sa, data_ready_->from(),
sizeof(
struct sockaddr_in6));
694 data_ready_ =
nullptr;
695 if (!IsClosing() && !IsClosedRead()) {
701intptr_t Handle::Write(
const void*
buffer, intptr_t num_bytes) {
702 MonitorLocker ml(&monitor_);
703 if (HasPendingWrite()) {
709 ASSERT(SupportsOverlappedIO());
713 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
714 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
715 pending_write_->Write(
buffer, truncated_bytes);
719 return truncated_bytes;
722intptr_t Handle::SendTo(
const void*
buffer,
726 MonitorLocker ml(&monitor_);
727 if (HasPendingWrite()) {
739 ASSERT(SupportsOverlappedIO());
743 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
744 pending_write_->Write(
buffer, num_bytes);
745 if (!IssueSendTo(sa, sa_len)) {
751Mutex* StdHandle::stdin_mutex_ =
new Mutex();
752StdHandle* StdHandle::stdin_ =
nullptr;
754StdHandle* StdHandle::Stdin(
HANDLE handle) {
755 MutexLocker ml(stdin_mutex_);
756 if (stdin_ ==
nullptr) {
757 stdin_ =
new StdHandle(handle);
762static void WriteFileThread(uword
args) {
763 StdHandle* handle =
reinterpret_cast<StdHandle*
>(
args);
764 handle->RunWriteLoop();
767void StdHandle::RunWriteLoop() {
768 MonitorLocker ml(&monitor_);
769 write_thread_running_ =
true;
770 thread_id_ = Thread::GetCurrentThreadId();
771 thread_handle_ = OpenThread(
SYNCHRONIZE,
false, thread_id_);
775 while (write_thread_running_) {
776 ml.Wait(Monitor::kNoTimeout);
777 if (HasPendingWrite()) {
779 WriteSyncCompleteAsync();
783 write_thread_exists_ =
false;
787void StdHandle::WriteSyncCompleteAsync() {
788 ASSERT(HasPendingWrite());
790 DWORD bytes_written = -1;
792 pending_write_->GetBufferSize(), &bytes_written,
nullptr);
796 thread_wrote_ += bytes_written;
797 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
798 ok = PostQueuedCompletionStatus(
799 event_handler_->completion_port(), bytes_written,
800 reinterpret_cast<ULONG_PTR>(
this), overlapped);
802 FATAL(
"PostQueuedCompletionStatus failed");
806intptr_t StdHandle::Write(
const void*
buffer, intptr_t num_bytes) {
807 MonitorLocker ml(&monitor_);
808 if (HasPendingWrite()) {
819 if (thread_wrote_ > 0) {
820 if (num_bytes > thread_wrote_) {
821 num_bytes = thread_wrote_;
823 thread_wrote_ -= num_bytes;
826 if (!write_thread_exists_) {
827 write_thread_exists_ =
true;
832 int result = Thread::Start(
"dart:io WriteFile", WriteFileThread,
833 reinterpret_cast<uword>(
this));
835 FATAL(
"Failed to start write file thread %d",
result);
837 while (!write_thread_running_) {
839 ml.Wait(Monitor::kNoTimeout);
843 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
845 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
846 pending_write_->Write(
buffer, truncated_bytes);
851void StdHandle::DoClose() {
853 MonitorLocker ml(&monitor_);
854 if (write_thread_exists_) {
855 write_thread_running_ =
false;
857 while (write_thread_exists_) {
858 ml.Wait(Monitor::kNoTimeout);
861 DWORD res = WaitForSingleObject(thread_handle_, INFINITE);
862 CloseHandle(thread_handle_);
863 ASSERT(res == WAIT_OBJECT_0);
867 MutexLocker ml(stdin_mutex_);
869 StdHandle::stdin_ =
nullptr;
873intptr_t ClientSocket::disconnecting_ = 0;
876bool ClientSocket::LoadDisconnectEx() {
878 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
881 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
882 &guid_disconnect_ex,
sizeof(guid_disconnect_ex), &DisconnectEx_,
883 sizeof(DisconnectEx_), &bytes,
nullptr,
nullptr);
884 return (status != SOCKET_ERROR);
887void ClientSocket::Shutdown(
int how) {
888 int rc = shutdown(socket(), how);
889 if (how == SD_RECEIVE) {
892 if (how == SD_SEND) {
895 if (how == SD_BOTH) {
901void ClientSocket::DoClose() {
903 shutdown(socket(), SD_BOTH);
908bool ClientSocket::IssueRead() {
909 MonitorLocker ml(&monitor_);
911 ASSERT(!HasPendingRead());
915 OverlappedBuffer*
buffer = OverlappedBuffer::AllocateReadBuffer(65536);
919 int rc = WSARecv(socket(),
buffer->GetWASBUF(), 1,
nullptr, &
flags,
920 buffer->GetCleanOverlapped(),
nullptr);
921 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
925 OverlappedBuffer::DisposeBuffer(
buffer);
926 pending_read_ =
nullptr;
931bool ClientSocket::IssueWrite() {
932 MonitorLocker ml(&monitor_);
934 ASSERT(HasPendingWrite());
935 ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
937 int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1,
nullptr, 0,
938 pending_write_->GetCleanOverlapped(),
nullptr);
939 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
942 OverlappedBuffer::DisposeBuffer(pending_write_);
943 pending_write_ =
nullptr;
948void ClientSocket::IssueDisconnect() {
949 OverlappedBuffer*
buffer = OverlappedBuffer::AllocateDisconnectBuffer();
951 DisconnectEx_(socket(),
buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
954 if (
ok || (WSAGetLastError() != WSA_IO_PENDING)) {
955 DisconnectComplete(
buffer);
964 NotifyAllDartPorts(1 << kDestroyedEvent);
971void ClientSocket::DisconnectComplete(OverlappedBuffer*
buffer) {
972 OverlappedBuffer::DisposeBuffer(
buffer);
973 closesocket(socket());
974 data_ready_ =
nullptr;
981void ClientSocket::ConnectComplete(OverlappedBuffer*
buffer) {
982 OverlappedBuffer::DisposeBuffer(
buffer);
984 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
nullptr, 0);
987 if (!IsClosedRead() && ((Mask() & (1 << kInEvent)) != 0)) {
990 if (!IsClosedWrite() && ((Mask() & (1 << kOutEvent)) != 0)) {
992 DartUtils::PostInt32(port, 1 << kOutEvent);
996void ClientSocket::EnsureInitialized(
997 EventHandlerImplementation* event_handler) {
998 MonitorLocker ml(&monitor_);
1000 ASSERT(event_handler_ ==
nullptr);
1001 event_handler_ = event_handler;
1002 CreateCompletionPort(event_handler_->completion_port());
1006bool ClientSocket::IsClosed() {
1007 return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite();
1010bool ClientSocket::PopulateRemoteAddr(RawAddr& addr) {
1011 if (!remote_addr_) {
1014 addr = *remote_addr_;
1018bool DatagramSocket::IssueSendTo(
struct sockaddr* sa, socklen_t sa_len) {
1019 MonitorLocker ml(&monitor_);
1021 ASSERT(HasPendingWrite());
1022 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
1024 int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1,
nullptr, 0, sa,
1025 sa_len, pending_write_->GetCleanOverlapped(),
nullptr);
1026 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1029 OverlappedBuffer::DisposeBuffer(pending_write_);
1030 pending_write_ =
nullptr;
1035bool DatagramSocket::IssueRecvFrom() {
1036 MonitorLocker ml(&monitor_);
1038 ASSERT(!HasPendingRead());
1040 OverlappedBuffer*
buffer =
1041 OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength);
1045 int rc = WSARecvFrom(socket(),
buffer->GetWASBUF(), 1,
nullptr, &
flags,
1047 buffer->GetCleanOverlapped(),
nullptr);
1048 if ((rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1052 OverlappedBuffer::DisposeBuffer(
buffer);
1053 pending_read_ =
nullptr;
1058void DatagramSocket::EnsureInitialized(
1059 EventHandlerImplementation* event_handler) {
1060 MonitorLocker ml(&monitor_);
1062 ASSERT(event_handler_ ==
nullptr);
1063 event_handler_ = event_handler;
1064 CreateCompletionPort(event_handler_->completion_port());
1068bool DatagramSocket::IsClosed() {
1069 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
1072void DatagramSocket::DoClose() {
1074 closesocket(socket());
1080void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1081 if (msg->id == kTimerId) {
1084 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1085 }
else if (msg->id == kShutdownId) {
1088 Socket* socket =
reinterpret_cast<Socket*
>(msg->id);
1089 RefCntReleaseScope<Socket> rs(socket);
1090 if (socket->fd() == -1) {
1093 Handle* handle =
reinterpret_cast<Handle*
>(socket->fd());
1094 ASSERT(handle !=
nullptr);
1096 if (handle->is_listen_socket()) {
1097 ListenSocket* listen_socket =
reinterpret_cast<ListenSocket*
>(handle);
1098 listen_socket->EnsureInitialized(
this);
1100 MonitorLocker ml(&listen_socket->monitor_);
1102 if (
IS_COMMAND(msg->data, kReturnTokenCommand)) {
1103 listen_socket->ReturnTokens(msg->dart_port,
TOKEN_COUNT(msg->data));
1104 }
else if (
IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1107 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1108 listen_socket->SetPortAndMask(msg->dart_port, events);
1109 TryDispatchingPendingAccepts(listen_socket);
1110 }
else if (
IS_COMMAND(msg->data, kCloseCommand)) {
1112 listen_socket->RemovePort(msg->dart_port);
1118 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
1119 MutexLocker locker(registry->mutex());
1120 if (registry->CloseSafe(socket)) {
1121 ASSERT(listen_socket->Mask() == 0);
1122 listen_socket->Close();
1125 socket->SetClosedFd();
1126 DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
1131 handle->EnsureInitialized(
this);
1132 MonitorLocker ml(&handle->monitor_);
1134 if (
IS_COMMAND(msg->data, kReturnTokenCommand)) {
1135 handle->ReturnTokens(msg->dart_port,
TOKEN_COUNT(msg->data));
1136 }
else if (
IS_COMMAND(msg->data, kSetEventMaskCommand)) {
1139 ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
1141 handle->SetPortAndMask(msg->dart_port, events);
1144 if ((handle->Mask() & (1 << kInEvent)) != 0) {
1145 if (handle->is_datagram_socket()) {
1146 handle->IssueRecvFrom();
1147 }
else if (handle->is_client_socket()) {
1148 if (
reinterpret_cast<ClientSocket*
>(handle)->is_connected()) {
1149 handle->IssueRead();
1152 handle->IssueRead();
1159 intptr_t out_event_mask = 1 <<
kOutEvent;
1160 if ((events & out_event_mask) != 0) {
1161 if (!handle->HasPendingWrite()) {
1162 if (handle->is_client_socket()) {
1163 if (
reinterpret_cast<ClientSocket*
>(handle)->is_connected()) {
1165 if ((handle->Mask() & event_mask) != 0) {
1167 DartUtils::PostInt32(port, event_mask);
1171 if ((handle->Mask() & out_event_mask) != 0) {
1172 Dart_Port port = handle->NextNotifyDartPort(out_event_mask);
1173 DartUtils::PostInt32(port, out_event_mask);
1180 intptr_t in_event_mask = 1 <<
kInEvent;
1181 if ((events & in_event_mask) != 0) {
1182 if (handle->data_ready_ !=
nullptr &&
1183 !handle->data_ready_->IsEmpty()) {
1184 if ((handle->Mask() & in_event_mask) != 0) {
1186 DartUtils::PostInt32(port, in_event_mask);
1190 }
else if (
IS_COMMAND(msg->data, kShutdownReadCommand)) {
1191 ASSERT(handle->is_client_socket());
1193 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(handle);
1194 client_socket->Shutdown(SD_RECEIVE);
1195 }
else if (
IS_COMMAND(msg->data, kShutdownWriteCommand)) {
1196 ASSERT(handle->is_client_socket());
1198 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(handle);
1199 client_socket->Shutdown(SD_SEND);
1200 }
else if (
IS_COMMAND(msg->data, kCloseCommand)) {
1202 Process::ClearSignalHandlerByFd(socket->fd(), socket->isolate_port());
1204 handle->SetPortAndMask(msg->dart_port, 0);
1212 DeleteIfClosed(handle);
1216void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1217 OverlappedBuffer*
buffer) {
1218 listen_socket->AcceptComplete(
buffer, completion_port_);
1221 MonitorLocker ml(&listen_socket->monitor_);
1222 TryDispatchingPendingAccepts(listen_socket);
1225 DeleteIfClosed(listen_socket);
1228void EventHandlerImplementation::TryDispatchingPendingAccepts(
1229 ListenSocket* listen_socket) {
1230 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1231 intptr_t event_mask = 1 <<
kInEvent;
1232 for (
int i = 0; (i < listen_socket->accepted_count()) &&
1233 (listen_socket->Mask() == event_mask);
1235 Dart_Port port = listen_socket->NextNotifyDartPort(event_mask);
1236 DartUtils::PostInt32(port, event_mask);
1241void EventHandlerImplementation::HandleRead(Handle* handle,
1243 OverlappedBuffer*
buffer) {
1244 buffer->set_data_length(bytes);
1245 handle->ReadComplete(
buffer);
1247 if (!handle->IsClosing()) {
1249 if ((handle->Mask() & event_mask) != 0) {
1251 DartUtils::PostInt32(port, event_mask);
1255 handle->MarkClosedRead();
1257 HandleClosed(handle);
1259 HandleError(handle);
1263 DeleteIfClosed(handle);
1266void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1268 OverlappedBuffer*
buffer) {
1269 ASSERT(handle->is_datagram_socket());
1271 buffer->set_data_length(bytes);
1272 handle->ReadComplete(
buffer);
1273 if (!handle->IsClosing()) {
1275 if ((handle->Mask() & event_mask) != 0) {
1277 DartUtils::PostInt32(port, event_mask);
1281 HandleError(handle);
1284 DeleteIfClosed(handle);
1287void EventHandlerImplementation::HandleWrite(Handle* handle,
1289 OverlappedBuffer*
buffer) {
1290 handle->WriteComplete(
buffer);
1293 if (!handle->IsError() && !handle->IsClosing()) {
1295 ASSERT(!handle->is_client_socket() ||
1296 reinterpret_cast<ClientSocket*
>(handle)->is_connected());
1297 if ((handle->Mask() & event_mask) != 0) {
1299 DartUtils::PostInt32(port, event_mask);
1303 HandleError(handle);
1306 DeleteIfClosed(handle);
1309void EventHandlerImplementation::HandleDisconnect(ClientSocket* client_socket,
1311 OverlappedBuffer*
buffer) {
1312 client_socket->DisconnectComplete(
buffer);
1313 DeleteIfClosed(client_socket);
1316void EventHandlerImplementation::HandleConnect(ClientSocket* client_socket,
1318 OverlappedBuffer*
buffer) {
1320 HandleError(client_socket);
1321 OverlappedBuffer::DisposeBuffer(
buffer);
1323 client_socket->ConnectComplete(
buffer);
1325 client_socket->mark_connected();
1326 DeleteIfClosed(client_socket);
1329void EventHandlerImplementation::HandleTimeout() {
1330 if (!timeout_queue_.HasTimeout()) {
1333 DartUtils::PostNull(timeout_queue_.CurrentPort());
1334 timeout_queue_.RemoveCurrent();
1337void EventHandlerImplementation::HandleIOCompletion(
DWORD bytes,
1340 OverlappedBuffer*
buffer = OverlappedBuffer::GetFromOverlapped(overlapped);
1341 switch (
buffer->operation()) {
1342 case OverlappedBuffer::kAccept: {
1343 ListenSocket* listen_socket =
reinterpret_cast<ListenSocket*
>(
key);
1344 HandleAccept(listen_socket,
buffer);
1347 case OverlappedBuffer::kRead: {
1348 Handle* handle =
reinterpret_cast<Handle*
>(
key);
1349 HandleRead(handle, bytes,
buffer);
1352 case OverlappedBuffer::kRecvFrom: {
1353 Handle* handle =
reinterpret_cast<Handle*
>(
key);
1354 HandleRecvFrom(handle, bytes,
buffer);
1357 case OverlappedBuffer::kWrite:
1358 case OverlappedBuffer::kSendTo: {
1359 Handle* handle =
reinterpret_cast<Handle*
>(
key);
1360 HandleWrite(handle, bytes,
buffer);
1363 case OverlappedBuffer::kDisconnect: {
1364 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(
key);
1365 HandleDisconnect(client_socket, bytes,
buffer);
1368 case OverlappedBuffer::kConnect: {
1369 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(
key);
1370 HandleConnect(client_socket, bytes,
buffer);
1378void EventHandlerImplementation::HandleCompletionOrInterrupt(
1390 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1391 (last_error == ERROR_OPERATION_ABORTED) ||
1392 (last_error == ERROR_NETNAME_DELETED) ||
1393 (last_error == ERROR_BROKEN_PIPE)) {
1395 HandleIOCompletion(bytes,
key, overlapped);
1396 }
else if (last_error == ERROR_MORE_DATA) {
1400 HandleIOCompletion(-1,
key, overlapped);
1403 HandleIOCompletion(-1,
key, overlapped);
1405 }
else if (
key == NULL) {
1407 InterruptMessage* msg =
reinterpret_cast<InterruptMessage*
>(overlapped);
1408 HandleInterrupt(msg);
1411 HandleIOCompletion(bytes,
key, overlapped);
1415EventHandlerImplementation::EventHandlerImplementation() {
1416 handler_thread_id_ = Thread::kInvalidThreadId;
1417 handler_thread_handle_ =
nullptr;
1420 if (completion_port_ ==
nullptr) {
1421 FATAL(
"Completion port creation failed");
1426EventHandlerImplementation::~EventHandlerImplementation() {
1428 DWORD res = WaitForSingleObject(handler_thread_handle_, INFINITE);
1429 CloseHandle(handler_thread_handle_);
1430 ASSERT(res == WAIT_OBJECT_0);
1431 CloseHandle(completion_port_);
1434int64_t EventHandlerImplementation::GetTimeout() {
1435 if (!timeout_queue_.HasTimeout()) {
1436 return kInfinityTimeout;
1439 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
1440 return (millis < 0) ? 0 : millis;
1443void EventHandlerImplementation::SendData(intptr_t
id,
1446 InterruptMessage* msg =
new InterruptMessage;
1448 msg->dart_port = dart_port;
1450 BOOL ok = PostQueuedCompletionStatus(completion_port_, 0, NULL,
1453 FATAL(
"PostQueuedCompletionStatus failed");
1457void EventHandlerImplementation::EventHandlerEntry(uword
args) {
1458 EventHandler* handler =
reinterpret_cast<EventHandler*
>(
args);
1459 EventHandlerImplementation* handler_impl = &handler->delegate_;
1460 ASSERT(handler_impl !=
nullptr);
1463 MonitorLocker ml(&handler_impl->startup_monitor_);
1464 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1465 handler_impl->handler_thread_handle_ =
1466 OpenThread(
SYNCHRONIZE,
false, handler_impl->handler_thread_id_);
1474 while (!handler_impl->shutdown_) {
1475 int64_t millis = handler_impl->GetTimeout();
1476 ASSERT(millis == kInfinityTimeout || millis >= 0);
1477 if (millis > kMaxInt32) {
1482 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1483 &
key, &overlapped, timeout);
1485 if (!
ok && (overlapped ==
nullptr)) {
1488 Syslog::Print(
"Completion port closed\n");
1492 handler_impl->HandleTimeout();
1495 handler_impl->HandleCompletionOrInterrupt(
ok, bytes,
key, overlapped);
1504 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1505 &
key, &overlapped, 0);
1506 if (!
ok && (overlapped ==
nullptr)) {
1510 handler_impl->HandleCompletionOrInterrupt(
ok, bytes,
key, overlapped);
1521 const intptr_t stdin_leaked = (StdHandle::StdinPtr() ==
nullptr) ? 0 : 1;
1523 ClientSocket::disconnecting() + stdin_leaked);
1524 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
1526 handler->NotifyShutdownDone();
1529void EventHandlerImplementation::Start(EventHandler* handler) {
1530 int result = Thread::Start(
"dart:io EventHandler", EventHandlerEntry,
1531 reinterpret_cast<uword>(handler));
1533 FATAL(
"Failed to start event handler thread %d",
result);
1537 MonitorLocker ml(&startup_monitor_);
1538 while (handler_thread_id_ == Thread::kInvalidThreadId) {
1544void EventHandlerImplementation::Shutdown() {
static uint32_t buffer_size(uint32_t offset, uint32_t maxAlignment)
static bool ok(int result)
static void operation(T operation, uint32_t &a, uint32_t b, uint32_t c, uint32_t d, uint32_t x, uint8_t s, uint32_t t)
static const size_t kBufferSize
#define DEBUG_ASSERT(cond)
int Read(void *buffer, int num_bytes)
static void DisposeBuffer(OverlappedBuffer *buffer)
static OverlappedBuffer * GetFromOverlapped(OVERLAPPED *overlapped)
static OverlappedBuffer * AllocateRecvFromBuffer(int buffer_size)
static OverlappedBuffer * AllocateAcceptBuffer(int buffer_size)
Operation operation() const
static OverlappedBuffer * AllocateConnectBuffer()
int Write(const void *buffer, int num_bytes)
static OverlappedBuffer * AllocateDisconnectBuffer()
static OverlappedBuffer * AllocateSendToBuffer(int buffer_size)
static OverlappedBuffer * AllocateReadBuffer(int buffer_size)
static OverlappedBuffer * AllocateWriteBuffer(int buffer_size)
#define IS_COMMAND(data, command_bit)
#define IS_SIGNAL_SOCKET(data)
#define TOKEN_COUNT(data)
FlutterSemanticsFlag flags
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
static const uint8_t buffer[]
const uint8_t uint32_t uint32_t GError ** error
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
void ReadFile(uint8_t **data, intptr_t *file_len, void *stream)
bool WriteFile(const std::string &path, const char *data, ssize_t size)
constexpr int32_t kMaxInt32
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot data
#define INVALID_HANDLE_VALUE
WINBASEAPI VOID WINAPI SetLastError(_In_ DWORD dwErrCode)
WINBASEAPI _Check_return_ _Post_equals_last_error_ DWORD WINAPI GetLastError(VOID)
struct _OVERLAPPED OVERLAPPED
__w64 unsigned long ULONG_PTR