6#if defined(DART_HOST_OS_WINDOWS)
35static constexpr int kStdOverlappedBufferSize = 16 * 1024;
36static constexpr int kMaxUDPPackageLength = 64 * 1024;
42static constexpr int kAcceptExAddressAdditionalBytes = 16;
43static constexpr int kAcceptExAddressStorageSize =
44 sizeof(SOCKADDR_STORAGE) + kAcceptExAddressAdditionalBytes;
46OverlappedBuffer* OverlappedBuffer::AllocateBuffer(
int buffer_size,
65 buffer_size +=
sizeof(socklen_t) +
sizeof(
struct sockaddr_storage);
91 CONTAINING_RECORD(overlapped, OverlappedBuffer, overlapped_);
105 ASSERT(num_bytes == buflen_);
107 data_length_ = num_bytes;
113 return data_length_ - index_;
117 : ReferenceCounted(),
118 DescriptorInfoBase(handle),
122 event_handler_(nullptr),
124 pending_read_(nullptr),
125 pending_write_(nullptr),
126 last_error_(NOERROR),
127 read_thread_id_(Thread::kInvalidThreadId),
128 read_thread_handle_(nullptr),
129 read_thread_starting_(
false),
130 read_thread_finished_(
false),
135bool Handle::CreateCompletionPort(
HANDLE completion_port) {
140 completion_port_ = CreateIoCompletionPort(
141 handle(), completion_port,
reinterpret_cast<ULONG_PTR>(
this), 0);
142 return (completion_port_ !=
nullptr);
146 MonitorLocker ml(&monitor_);
147 if (!SupportsOverlappedIO()) {
163void Handle::DoClose() {
164 if (!IsHandleClosed()) {
170bool Handle::HasPendingRead() {
171 return pending_read_ !=
nullptr;
174bool Handle::HasPendingWrite() {
175 return pending_write_ !=
nullptr;
178void Handle::WaitForReadThreadStarted() {
179 MonitorLocker ml(&monitor_);
180 while (read_thread_starting_) {
185void Handle::WaitForReadThreadFinished() {
188 MonitorLocker ml(&monitor_);
189 if (read_thread_id_ != Thread::kInvalidThreadId) {
190 while (!read_thread_finished_) {
193 read_thread_finished_ =
false;
194 read_thread_id_ = Thread::kInvalidThreadId;
195 to_join = read_thread_handle_;
196 read_thread_handle_ =
nullptr;
199 if (to_join !=
nullptr) {
201 DWORD res = WaitForSingleObject(to_join, INFINITE);
202 CloseHandle(to_join);
203 ASSERT(res == WAIT_OBJECT_0);
207void Handle::ReadComplete(OverlappedBuffer*
buffer) {
208 WaitForReadThreadStarted();
210 MonitorLocker ml(&monitor_);
213 ASSERT(data_ready_ ==
nullptr);
215 data_ready_.reset(pending_read_);
217 OverlappedBuffer::DisposeBuffer(
buffer);
219 pending_read_ =
nullptr;
221 WaitForReadThreadFinished();
224void Handle::RecvFromComplete(OverlappedBuffer*
buffer) {
228void Handle::WriteComplete(OverlappedBuffer*
buffer) {
229 MonitorLocker ml(&monitor_);
232 OverlappedBuffer::DisposeBuffer(
buffer);
233 pending_write_ =
nullptr;
237 Handle* handle =
reinterpret_cast<Handle*
>(
args);
238 handle->ReadSyncCompleteAsync();
241void Handle::NotifyReadThreadStarted() {
242 MonitorLocker ml(&monitor_);
243 ASSERT(read_thread_starting_);
244 ASSERT(read_thread_id_ == Thread::kInvalidThreadId);
245 read_thread_id_ = Thread::GetCurrentThreadId();
246 read_thread_handle_ = OpenThread(
SYNCHRONIZE,
false, read_thread_id_);
247 read_thread_starting_ =
false;
251void Handle::NotifyReadThreadFinished() {
252 MonitorLocker ml(&monitor_);
253 ASSERT(!read_thread_finished_);
254 ASSERT(read_thread_id_ != Thread::kInvalidThreadId);
255 read_thread_finished_ =
true;
259void Handle::ReadSyncCompleteAsync() {
260 NotifyReadThreadStarted();
262 ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
265 if (GetFileType(
handle_) == FILE_TYPE_CHAR) {
268 char* buffer_start = pending_read_->GetBufferStart();
269 DWORD bytes_read = 0;
274 OVERLAPPED* overlapped = pending_read_->GetCleanOverlapped();
276 PostQueuedCompletionStatus(event_handler_->completion_port(), bytes_read,
277 reinterpret_cast<ULONG_PTR>(
this), overlapped);
279 FATAL(
"PostQueuedCompletionStatus failed");
281 NotifyReadThreadFinished();
284bool Handle::IssueRead() {
285 ASSERT(type_ != kListenSocket);
286 ASSERT(!HasPendingRead());
288 if (SupportsOverlappedIO()) {
293 nullptr,
buffer->GetCleanOverlapped());
299 OverlappedBuffer::DisposeBuffer(
buffer);
305 read_thread_starting_ =
true;
306 int result = Thread::Start(
"dart:io ReadFile", ReadFileThread,
307 reinterpret_cast<uword>(
this));
309 FATAL(
"Failed to start read file thread %d",
result);
315bool Handle::IssueRecvFrom() {
319bool Handle::IssueWrite() {
320 MonitorLocker ml(&monitor_);
321 ASSERT(type_ != kListenSocket);
323 ASSERT(HasPendingWrite());
326 OverlappedBuffer*
buffer = pending_write_;
329 nullptr,
buffer->GetCleanOverlapped());
335 OverlappedBuffer::DisposeBuffer(
buffer);
340bool Handle::IssueSendTo(
struct sockaddr* sa, socklen_t sa_len) {
344static void HandleClosed(Handle* handle) {
345 if (!handle->IsClosing()) {
347 handle->NotifyAllDartPorts(event_mask);
351static void HandleError(Handle* handle) {
352 handle->set_last_error(WSAGetLastError());
354 if (!handle->IsClosing()) {
359void Handle::HandleIssueError() {
361 if (
error == ERROR_BROKEN_PIPE) {
369void FileHandle::EnsureInitialized(EventHandlerImplementation*
event_handler) {
370 MonitorLocker ml(&monitor_);
373 if (SupportsOverlappedIO()) {
374 CreateCompletionPort(event_handler_->completion_port());
380 completion_port_ = event_handler_->completion_port();
385bool FileHandle::IsClosed() {
386 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
389void DirectoryWatchHandle::EnsureInitialized(
391 MonitorLocker ml(&monitor_);
394 CreateCompletionPort(event_handler_->completion_port());
398bool DirectoryWatchHandle::IsClosed() {
399 return IsClosing() && !HasPendingRead();
402bool DirectoryWatchHandle::IssueRead() {
405 if (HasPendingRead() || (data_ready_ !=
nullptr)) {
414 BOOL ok = ReadDirectoryChangesW(
416 events_,
nullptr,
buffer->GetCleanOverlapped(),
nullptr);
421 pending_read_ =
nullptr;
422 OverlappedBuffer::DisposeBuffer(
buffer);
426void DirectoryWatchHandle::Stop() {
427 MonitorLocker ml(&monitor_);
430 if (HasPendingRead()) {
431 CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
438void SocketHandle::HandleIssueError() {
439 int error = WSAGetLastError();
440 if (
error == WSAECONNRESET) {
445 WSASetLastError(
error);
448bool ListenSocket::LoadAcceptEx() {
450 GUID guid_accept_ex = WSAID_ACCEPTEX;
452 int status = WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
453 &guid_accept_ex,
sizeof(guid_accept_ex), &AcceptEx_,
454 sizeof(AcceptEx_), &bytes,
nullptr,
nullptr);
455 return (status != SOCKET_ERROR);
458bool ListenSocket::LoadGetAcceptExSockaddrs() {
460 GUID guid_get_accept_ex_sockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
463 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
464 &guid_get_accept_ex_sockaddrs,
465 sizeof(guid_get_accept_ex_sockaddrs), &GetAcceptExSockaddrs_,
466 sizeof(GetAcceptExSockaddrs_), &bytes,
nullptr,
nullptr);
467 return (status != SOCKET_ERROR);
470bool ListenSocket::IssueAccept() {
471 MonitorLocker ml(&monitor_);
473 OverlappedBuffer*
buffer =
474 OverlappedBuffer::AllocateAcceptBuffer(2 * kAcceptExAddressStorageSize);
477 ok = AcceptEx_(socket(),
buffer->client(),
buffer->GetBufferStart(),
479 kAcceptExAddressStorageSize, kAcceptExAddressStorageSize,
480 &received,
buffer->GetCleanOverlapped());
482 if (WSAGetLastError() != WSA_IO_PENDING) {
483 int error = WSAGetLastError();
484 closesocket(
buffer->client());
485 OverlappedBuffer::DisposeBuffer(
buffer);
486 WSASetLastError(
error);
491 pending_accept_count_++;
496void ListenSocket::AcceptComplete(OverlappedBuffer*
buffer,
498 MonitorLocker ml(&monitor_);
502 int rc = setsockopt(
buffer->client(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
503 reinterpret_cast<char*
>(&
s),
sizeof(
s));
504 if (
rc == NO_ERROR) {
509 LPSOCKADDR local_addr;
510 int local_addr_length;
511 LPSOCKADDR remote_addr;
512 int remote_addr_length;
513 GetAcceptExSockaddrs_(
514 buffer->GetBufferStart(), 0, kAcceptExAddressStorageSize,
515 kAcceptExAddressStorageSize, &local_addr, &local_addr_length,
516 &remote_addr, &remote_addr_length);
517 RawAddr* raw_remote_addr =
new RawAddr;
518 memmove(raw_remote_addr, remote_addr, remote_addr_length);
521 ClientSocket* client_socket =
new ClientSocket(
522 buffer->client(), std::unique_ptr<RawAddr>(raw_remote_addr));
523 client_socket->mark_connected();
524 client_socket->CreateCompletionPort(completion_port);
525 if (accepted_head_ ==
nullptr) {
526 accepted_head_ = client_socket;
527 accepted_tail_ = client_socket;
529 ASSERT(accepted_tail_ !=
nullptr);
530 accepted_tail_->set_next(client_socket);
531 accepted_tail_ = client_socket;
535 closesocket(
buffer->client());
539 closesocket(
buffer->client());
542 pending_accept_count_--;
543 OverlappedBuffer::DisposeBuffer(
buffer);
546static void DeleteIfClosed(Handle* handle) {
547 if (handle->IsClosed()) {
549 handle->set_event_handler(
nullptr);
551 handle->RemoveAllPorts();
558void ListenSocket::DoClose() {
559 closesocket(socket());
561 while (CanAccept()) {
563 ClientSocket* client = Accept();
564 if (client !=
nullptr) {
573 DeleteIfClosed(client);
582 GetAcceptExSockaddrs_ =
nullptr;
585bool ListenSocket::CanAccept() {
586 MonitorLocker ml(&monitor_);
587 return accepted_head_ !=
nullptr;
590ClientSocket* ListenSocket::Accept() {
591 MonitorLocker ml(&monitor_);
593 ClientSocket*
result =
nullptr;
595 if (accepted_head_ !=
nullptr) {
597 accepted_head_ = accepted_head_->next();
598 if (accepted_head_ ==
nullptr) {
599 accepted_tail_ =
nullptr;
601 result->set_next(
nullptr);
605 if (pending_accept_count_ < 5) {
608 if (!IssueAccept()) {
617void ListenSocket::EnsureInitialized(
619 MonitorLocker ml(&monitor_);
620 if (AcceptEx_ ==
nullptr) {
622 ASSERT(event_handler_ ==
nullptr);
624 CreateCompletionPort(event_handler_->completion_port());
625 bool isLoaded = LoadAcceptEx();
628 if (GetAcceptExSockaddrs_ ==
nullptr) {
629 bool isLoaded = LoadGetAcceptExSockaddrs();
634bool ListenSocket::IsClosed() {
635 return IsClosing() && !HasPendingAccept();
638intptr_t Handle::Available() {
639 MonitorLocker ml(&monitor_);
640 if (data_ready_ ==
nullptr) {
643 return data_ready_->GetRemainingLength();
646bool Handle::DataReady() {
647 return data_ready_ !=
nullptr;
651 MonitorLocker ml(&monitor_);
652 if (data_ready_ ==
nullptr) {
656 data_ready_->Read(
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
657 if (data_ready_->IsEmpty()) {
658 data_ready_ =
nullptr;
659 if (!IsClosing() && !IsClosedRead()) {
666intptr_t Handle::RecvFrom(
void*
buffer,
670 MonitorLocker ml(&monitor_);
671 if (data_ready_ ==
nullptr) {
675 data_ready_->Read(
buffer, Utils::Minimum<intptr_t>(num_bytes, INT_MAX));
676 if (data_ready_->from()->sa_family == AF_INET) {
677 ASSERT(sa_len >=
sizeof(
struct sockaddr_in));
678 memmove(sa, data_ready_->from(),
sizeof(
struct sockaddr_in));
680 ASSERT(data_ready_->from()->sa_family == AF_INET6);
681 ASSERT(sa_len >=
sizeof(
struct sockaddr_in6));
682 memmove(sa, data_ready_->from(),
sizeof(
struct sockaddr_in6));
686 data_ready_ =
nullptr;
687 if (!IsClosing() && !IsClosedRead()) {
693intptr_t Handle::Write(
const void*
buffer, intptr_t num_bytes) {
694 MonitorLocker ml(&monitor_);
695 if (HasPendingWrite()) {
701 ASSERT(SupportsOverlappedIO());
705 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
706 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
707 pending_write_->Write(
buffer, truncated_bytes);
711 return truncated_bytes;
714intptr_t Handle::SendTo(
const void*
buffer,
718 MonitorLocker ml(&monitor_);
719 if (HasPendingWrite()) {
731 ASSERT(SupportsOverlappedIO());
735 pending_write_ = OverlappedBuffer::AllocateSendToBuffer(num_bytes);
736 pending_write_->Write(
buffer, num_bytes);
737 if (!IssueSendTo(sa, sa_len)) {
743Mutex* StdHandle::stdin_mutex_ =
new Mutex();
744StdHandle* StdHandle::stdin_ =
nullptr;
746StdHandle* StdHandle::Stdin(
HANDLE handle) {
747 MutexLocker ml(stdin_mutex_);
748 if (stdin_ ==
nullptr) {
749 stdin_ =
new StdHandle(handle);
755 StdHandle* handle =
reinterpret_cast<StdHandle*
>(
args);
756 handle->RunWriteLoop();
759void StdHandle::RunWriteLoop() {
760 MonitorLocker ml(&monitor_);
761 write_thread_running_ =
true;
762 thread_id_ = Thread::GetCurrentThreadId();
763 thread_handle_ = OpenThread(
SYNCHRONIZE,
false, thread_id_);
767 while (write_thread_running_) {
768 ml.Wait(Monitor::kNoTimeout);
769 if (HasPendingWrite()) {
771 WriteSyncCompleteAsync();
775 write_thread_exists_ =
false;
779void StdHandle::WriteSyncCompleteAsync() {
780 ASSERT(HasPendingWrite());
782 DWORD bytes_written = -1;
784 pending_write_->GetBufferSize(), &bytes_written,
nullptr);
788 thread_wrote_ += bytes_written;
789 OVERLAPPED* overlapped = pending_write_->GetCleanOverlapped();
790 ok = PostQueuedCompletionStatus(
791 event_handler_->completion_port(), bytes_written,
792 reinterpret_cast<ULONG_PTR>(
this), overlapped);
794 FATAL(
"PostQueuedCompletionStatus failed");
798intptr_t StdHandle::Write(
const void*
buffer, intptr_t num_bytes) {
799 MonitorLocker ml(&monitor_);
800 if (HasPendingWrite()) {
811 if (thread_wrote_ > 0) {
812 if (num_bytes > thread_wrote_) {
813 num_bytes = thread_wrote_;
815 thread_wrote_ -= num_bytes;
818 if (!write_thread_exists_) {
819 write_thread_exists_ =
true;
824 int result = Thread::Start(
"dart:io WriteFile", WriteFileThread,
825 reinterpret_cast<uword>(
this));
827 FATAL(
"Failed to start write file thread %d",
result);
829 while (!write_thread_running_) {
831 ml.Wait(Monitor::kNoTimeout);
835 int truncated_bytes = Utils::Minimum<intptr_t>(num_bytes, INT_MAX);
837 pending_write_ = OverlappedBuffer::AllocateWriteBuffer(truncated_bytes);
838 pending_write_->Write(
buffer, truncated_bytes);
843void StdHandle::DoClose() {
845 MonitorLocker ml(&monitor_);
846 if (write_thread_exists_) {
847 write_thread_running_ =
false;
849 while (write_thread_exists_) {
850 ml.Wait(Monitor::kNoTimeout);
853 DWORD res = WaitForSingleObject(thread_handle_, INFINITE);
854 CloseHandle(thread_handle_);
855 ASSERT(res == WAIT_OBJECT_0);
859 MutexLocker ml(stdin_mutex_);
861 StdHandle::stdin_ =
nullptr;
865intptr_t ClientSocket::disconnecting_ = 0;
868bool ClientSocket::LoadDisconnectEx() {
870 GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
873 WSAIoctl(socket(), SIO_GET_EXTENSION_FUNCTION_POINTER,
874 &guid_disconnect_ex,
sizeof(guid_disconnect_ex), &DisconnectEx_,
875 sizeof(DisconnectEx_), &bytes,
nullptr,
nullptr);
876 return (status != SOCKET_ERROR);
880 int rc = shutdown(socket(), how);
881 if (how == SD_RECEIVE) {
884 if (how == SD_SEND) {
887 if (how == SD_BOTH) {
893void ClientSocket::DoClose() {
895 shutdown(socket(), SD_BOTH);
900bool ClientSocket::IssueRead() {
901 MonitorLocker ml(&monitor_);
903 ASSERT(!HasPendingRead());
907 OverlappedBuffer*
buffer = OverlappedBuffer::AllocateReadBuffer(65536);
911 int rc = WSARecv(socket(),
buffer->GetWASBUF(), 1,
nullptr, &
flags,
912 buffer->GetCleanOverlapped(),
nullptr);
913 if ((
rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
917 OverlappedBuffer::DisposeBuffer(
buffer);
918 pending_read_ =
nullptr;
923bool ClientSocket::IssueWrite() {
924 MonitorLocker ml(&monitor_);
926 ASSERT(HasPendingWrite());
929 int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1,
nullptr, 0,
930 pending_write_->GetCleanOverlapped(),
nullptr);
931 if ((
rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
934 OverlappedBuffer::DisposeBuffer(pending_write_);
935 pending_write_ =
nullptr;
940void ClientSocket::IssueDisconnect() {
941 OverlappedBuffer*
buffer = OverlappedBuffer::AllocateDisconnectBuffer();
943 DisconnectEx_(socket(),
buffer->GetCleanOverlapped(), TF_REUSE_SOCKET, 0);
946 if (
ok || (WSAGetLastError() != WSA_IO_PENDING)) {
947 DisconnectComplete(
buffer);
963void ClientSocket::DisconnectComplete(OverlappedBuffer*
buffer) {
964 OverlappedBuffer::DisposeBuffer(
buffer);
965 closesocket(socket());
966 data_ready_ =
nullptr;
973void ClientSocket::ConnectComplete(OverlappedBuffer*
buffer) {
974 OverlappedBuffer::DisposeBuffer(
buffer);
976 setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
nullptr, 0);
979 if (!IsClosedRead() && ((Mask() & (1 <<
kInEvent)) != 0)) {
982 if (!IsClosedWrite() && ((Mask() & (1 <<
kOutEvent)) != 0)) {
988void ClientSocket::EnsureInitialized(
990 MonitorLocker ml(&monitor_);
992 ASSERT(event_handler_ ==
nullptr);
994 CreateCompletionPort(event_handler_->completion_port());
998bool ClientSocket::IsClosed() {
999 return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite();
1002bool ClientSocket::PopulateRemoteAddr(RawAddr&
addr) {
1003 if (!remote_addr_) {
1006 addr = *remote_addr_;
1010bool DatagramSocket::IssueSendTo(
struct sockaddr* sa, socklen_t sa_len) {
1011 MonitorLocker ml(&monitor_);
1013 ASSERT(HasPendingWrite());
1014 ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
1016 int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1,
nullptr, 0, sa,
1017 sa_len, pending_write_->GetCleanOverlapped(),
nullptr);
1018 if ((
rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1021 OverlappedBuffer::DisposeBuffer(pending_write_);
1022 pending_write_ =
nullptr;
1027bool DatagramSocket::IssueRecvFrom() {
1028 MonitorLocker ml(&monitor_);
1030 ASSERT(!HasPendingRead());
1032 OverlappedBuffer*
buffer =
1033 OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength);
1037 int rc = WSARecvFrom(socket(),
buffer->GetWASBUF(), 1,
nullptr, &
flags,
1039 buffer->GetCleanOverlapped(),
nullptr);
1040 if ((
rc == NO_ERROR) || (WSAGetLastError() == WSA_IO_PENDING)) {
1044 OverlappedBuffer::DisposeBuffer(
buffer);
1045 pending_read_ =
nullptr;
1050void DatagramSocket::EnsureInitialized(
1052 MonitorLocker ml(&monitor_);
1054 ASSERT(event_handler_ ==
nullptr);
1056 CreateCompletionPort(event_handler_->completion_port());
1060bool DatagramSocket::IsClosed() {
1061 return IsClosing() && !HasPendingRead() && !HasPendingWrite();
1064void DatagramSocket::DoClose() {
1066 closesocket(socket());
1072void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
1076 timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
1080 Socket* socket =
reinterpret_cast<Socket*
>(msg->id);
1081 RefCntReleaseScope<Socket> rs(socket);
1082 if (socket->fd() == -1) {
1085 Handle* handle =
reinterpret_cast<Handle*
>(socket->fd());
1086 ASSERT(handle !=
nullptr);
1088 if (handle->is_listen_socket()) {
1089 ListenSocket* listen_socket =
reinterpret_cast<ListenSocket*
>(handle);
1090 listen_socket->EnsureInitialized(
this);
1092 MonitorLocker ml(&listen_socket->monitor_);
1095 listen_socket->ReturnTokens(msg->dart_port,
TOKEN_COUNT(msg->data));
1100 listen_socket->SetPortAndMask(msg->dart_port, events);
1101 TryDispatchingPendingAccepts(listen_socket);
1104 listen_socket->RemovePort(msg->dart_port);
1110 ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
1111 MutexLocker locker(registry->mutex());
1112 if (registry->CloseSafe(socket)) {
1113 ASSERT(listen_socket->Mask() == 0);
1114 listen_socket->Close();
1117 socket->SetClosedFd();
1123 handle->EnsureInitialized(
this);
1124 MonitorLocker ml(&handle->monitor_);
1127 handle->ReturnTokens(msg->dart_port,
TOKEN_COUNT(msg->data));
1133 handle->SetPortAndMask(msg->dart_port, events);
1136 if ((handle->Mask() & (1 <<
kInEvent)) != 0) {
1137 if (handle->is_datagram_socket()) {
1138 handle->IssueRecvFrom();
1139 }
else if (handle->is_client_socket()) {
1140 if (
reinterpret_cast<ClientSocket*
>(handle)->is_connected()) {
1141 handle->IssueRead();
1144 handle->IssueRead();
1151 intptr_t out_event_mask = 1 <<
kOutEvent;
1152 if ((events & out_event_mask) != 0) {
1153 if (!handle->HasPendingWrite()) {
1154 if (handle->is_client_socket()) {
1155 if (
reinterpret_cast<ClientSocket*
>(handle)->is_connected()) {
1157 if ((handle->Mask() & event_mask) != 0) {
1159 DartUtils::PostInt32(
port, event_mask);
1163 if ((handle->Mask() & out_event_mask) != 0) {
1164 Dart_Port port = handle->NextNotifyDartPort(out_event_mask);
1165 DartUtils::PostInt32(
port, out_event_mask);
1172 intptr_t in_event_mask = 1 <<
kInEvent;
1173 if ((events & in_event_mask) != 0) {
1174 if (handle->data_ready_ !=
nullptr &&
1175 !handle->data_ready_->IsEmpty()) {
1176 if ((handle->Mask() & in_event_mask) != 0) {
1178 DartUtils::PostInt32(
port, in_event_mask);
1183 ASSERT(handle->is_client_socket());
1185 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(handle);
1186 client_socket->Shutdown(SD_RECEIVE);
1188 ASSERT(handle->is_client_socket());
1190 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(handle);
1191 client_socket->Shutdown(SD_SEND);
1194 Process::ClearSignalHandlerByFd(socket->fd(), socket->isolate_port());
1196 handle->SetPortAndMask(msg->dart_port, 0);
1204 DeleteIfClosed(handle);
1208void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
1209 OverlappedBuffer*
buffer) {
1210 listen_socket->AcceptComplete(
buffer, completion_port_);
1213 MonitorLocker ml(&listen_socket->monitor_);
1214 TryDispatchingPendingAccepts(listen_socket);
1217 DeleteIfClosed(listen_socket);
1220void EventHandlerImplementation::TryDispatchingPendingAccepts(
1221 ListenSocket* listen_socket) {
1222 if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
1223 intptr_t event_mask = 1 <<
kInEvent;
1224 for (
int i = 0; (
i < listen_socket->accepted_count()) &&
1225 (listen_socket->Mask() == event_mask);
1227 Dart_Port port = listen_socket->NextNotifyDartPort(event_mask);
1228 DartUtils::PostInt32(
port, event_mask);
1233void EventHandlerImplementation::HandleRead(Handle* handle,
1235 OverlappedBuffer*
buffer) {
1236 buffer->set_data_length(bytes);
1237 handle->ReadComplete(
buffer);
1239 if (!handle->IsClosing()) {
1241 if ((handle->Mask() & event_mask) != 0) {
1243 DartUtils::PostInt32(
port, event_mask);
1247 handle->MarkClosedRead();
1249 HandleClosed(handle);
1251 HandleError(handle);
1255 DeleteIfClosed(handle);
1258void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
1260 OverlappedBuffer*
buffer) {
1261 ASSERT(handle->is_datagram_socket());
1263 buffer->set_data_length(bytes);
1264 handle->ReadComplete(
buffer);
1265 if (!handle->IsClosing()) {
1267 if ((handle->Mask() & event_mask) != 0) {
1269 DartUtils::PostInt32(
port, event_mask);
1273 HandleError(handle);
1276 DeleteIfClosed(handle);
1279void EventHandlerImplementation::HandleWrite(Handle* handle,
1281 OverlappedBuffer*
buffer) {
1282 handle->WriteComplete(
buffer);
1285 if (!handle->IsError() && !handle->IsClosing()) {
1287 ASSERT(!handle->is_client_socket() ||
1288 reinterpret_cast<ClientSocket*
>(handle)->is_connected());
1289 if ((handle->Mask() & event_mask) != 0) {
1291 DartUtils::PostInt32(
port, event_mask);
1295 HandleError(handle);
1298 DeleteIfClosed(handle);
1301void EventHandlerImplementation::HandleDisconnect(ClientSocket* client_socket,
1303 OverlappedBuffer*
buffer) {
1304 client_socket->DisconnectComplete(
buffer);
1305 DeleteIfClosed(client_socket);
1308void EventHandlerImplementation::HandleConnect(ClientSocket* client_socket,
1310 OverlappedBuffer*
buffer) {
1312 HandleError(client_socket);
1313 OverlappedBuffer::DisposeBuffer(
buffer);
1315 client_socket->ConnectComplete(
buffer);
1317 client_socket->mark_connected();
1318 DeleteIfClosed(client_socket);
1321void EventHandlerImplementation::HandleTimeout() {
1322 if (!timeout_queue_.HasTimeout()) {
1325 DartUtils::PostNull(timeout_queue_.CurrentPort());
1326 timeout_queue_.RemoveCurrent();
1329void EventHandlerImplementation::HandleIOCompletion(
DWORD bytes,
1332 OverlappedBuffer*
buffer = OverlappedBuffer::GetFromOverlapped(overlapped);
1333 switch (
buffer->operation()) {
1334 case OverlappedBuffer::kAccept: {
1335 ListenSocket* listen_socket =
reinterpret_cast<ListenSocket*
>(
key);
1336 HandleAccept(listen_socket,
buffer);
1340 Handle* handle =
reinterpret_cast<Handle*
>(
key);
1341 HandleRead(handle, bytes,
buffer);
1344 case OverlappedBuffer::kRecvFrom: {
1345 Handle* handle =
reinterpret_cast<Handle*
>(
key);
1346 HandleRecvFrom(handle, bytes,
buffer);
1350 case OverlappedBuffer::kSendTo: {
1351 Handle* handle =
reinterpret_cast<Handle*
>(
key);
1352 HandleWrite(handle, bytes,
buffer);
1355 case OverlappedBuffer::kDisconnect: {
1356 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(
key);
1357 HandleDisconnect(client_socket, bytes,
buffer);
1360 case OverlappedBuffer::kConnect: {
1361 ClientSocket* client_socket =
reinterpret_cast<ClientSocket*
>(
key);
1362 HandleConnect(client_socket, bytes,
buffer);
1370void EventHandlerImplementation::HandleCompletionOrInterrupt(
1382 if ((last_error == ERROR_CONNECTION_ABORTED) ||
1383 (last_error == ERROR_OPERATION_ABORTED) ||
1384 (last_error == ERROR_NETNAME_DELETED) ||
1385 (last_error == ERROR_BROKEN_PIPE)) {
1387 HandleIOCompletion(bytes,
key, overlapped);
1388 }
else if (last_error == ERROR_MORE_DATA) {
1392 HandleIOCompletion(-1,
key, overlapped);
1395 HandleIOCompletion(-1,
key, overlapped);
1397 }
else if (
key == NULL) {
1399 InterruptMessage* msg =
reinterpret_cast<InterruptMessage*
>(overlapped);
1400 HandleInterrupt(msg);
1403 HandleIOCompletion(bytes,
key, overlapped);
1407EventHandlerImplementation::EventHandlerImplementation() {
1408 handler_thread_id_ = Thread::kInvalidThreadId;
1409 handler_thread_handle_ =
nullptr;
1412 if (completion_port_ ==
nullptr) {
1413 FATAL(
"Completion port creation failed");
1418EventHandlerImplementation::~EventHandlerImplementation() {
1420 DWORD res = WaitForSingleObject(handler_thread_handle_, INFINITE);
1421 CloseHandle(handler_thread_handle_);
1422 ASSERT(res == WAIT_OBJECT_0);
1423 CloseHandle(completion_port_);
1426int64_t EventHandlerImplementation::GetTimeout() {
1427 if (!timeout_queue_.HasTimeout()) {
1431 timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis();
1432 return (millis < 0) ? 0 : millis;
1438 InterruptMessage* msg =
new InterruptMessage;
1440 msg->dart_port = dart_port;
1442 BOOL ok = PostQueuedCompletionStatus(completion_port_, 0, NULL,
1445 FATAL(
"PostQueuedCompletionStatus failed");
1449void EventHandlerImplementation::EventHandlerEntry(
uword args) {
1450 EventHandler* handler =
reinterpret_cast<EventHandler*
>(
args);
1451 EventHandlerImplementation* handler_impl = &handler->delegate_;
1452 ASSERT(handler_impl !=
nullptr);
1455 MonitorLocker ml(&handler_impl->startup_monitor_);
1456 handler_impl->handler_thread_id_ = Thread::GetCurrentThreadId();
1457 handler_impl->handler_thread_handle_ =
1458 OpenThread(
SYNCHRONIZE,
false, handler_impl->handler_thread_id_);
1466 while (!handler_impl->shutdown_) {
1467 int64_t millis = handler_impl->GetTimeout();
1474 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1477 if (!
ok && (overlapped ==
nullptr)) {
1480 Syslog::Print(
"Completion port closed\n");
1484 handler_impl->HandleTimeout();
1487 handler_impl->HandleCompletionOrInterrupt(
ok, bytes,
key, overlapped);
1496 ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
1497 &
key, &overlapped, 0);
1498 if (!
ok && (overlapped ==
nullptr)) {
1502 handler_impl->HandleCompletionOrInterrupt(
ok, bytes,
key, overlapped);
1513 const intptr_t stdin_leaked = (StdHandle::StdinPtr() ==
nullptr) ? 0 : 1;
1515 ClientSocket::disconnecting() + stdin_leaked);
1516 DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
1518 handler->NotifyShutdownDone();
1521void EventHandlerImplementation::Start(EventHandler* handler) {
1522 int result = Thread::Start(
"dart:io EventHandler", EventHandlerEntry,
1523 reinterpret_cast<uword>(handler));
1525 FATAL(
"Failed to start event handler thread %d",
result);
1529 MonitorLocker ml(&startup_monitor_);
1530 while (handler_thread_id_ == Thread::kInvalidThreadId) {
static uint32_t buffer_size(uint32_t offset, uint32_t maxAlignment)
static bool ok(int result)
static void operation(T operation, uint32_t &a, uint32_t b, uint32_t c, uint32_t d, uint32_t x, uint8_t s, uint32_t t)
static const size_t kBufferSize
#define DEBUG_ASSERT(cond)
int Read(void *buffer, int num_bytes)
static void DisposeBuffer(OverlappedBuffer *buffer)
static OverlappedBuffer * GetFromOverlapped(OVERLAPPED *overlapped)
static OverlappedBuffer * AllocateRecvFromBuffer(int buffer_size)
static OverlappedBuffer * AllocateAcceptBuffer(int buffer_size)
Operation operation() const
static OverlappedBuffer * AllocateConnectBuffer()
int Write(const void *buffer, int num_bytes)
static OverlappedBuffer * AllocateDisconnectBuffer()
static OverlappedBuffer * AllocateSendToBuffer(int buffer_size)
static OverlappedBuffer * AllocateReadBuffer(int buffer_size)
static OverlappedBuffer * AllocateWriteBuffer(int buffer_size)
#define IS_COMMAND(data, command_bit)
#define IS_SIGNAL_SOCKET(data)
#define TOKEN_COUNT(data)
FlutterSemanticsFlag flags
G_BEGIN_DECLS G_MODULE_EXPORT FlValue * args
const uint8_t uint32_t uint32_t GError ** error
int SendData(MHD_Connection *connection, const SkData *data, const char *type, bool setContentDisposition, const char *dispositionString)
SK_API bool Read(SkStreamSeekable *src, SkDocumentPage *dstArray, int dstArrayCount, const SkDeserialProcs *=nullptr)
static constexpr intptr_t kTimerId
void ReadFile(uint8_t **data, intptr_t *file_len, void *stream)
static constexpr intptr_t kShutdownId
static constexpr intptr_t kInfinityTimeout
static void Shutdown(Dart_NativeArguments args)
static EventHandler * event_handler
bool WriteFile(const std::string &path, const char *data, ssize_t size)
constexpr int32_t kMaxInt32
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service port
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service A custom Dart VM Service port The default is to pick a randomly available open port disable vm Disable the Dart VM Service The Dart VM Service is never available in release mode disable vm service Disable mDNS Dart VM Service publication Bind to the IPv6 localhost address for the Dart VM Service Ignored if vm service host is set endless trace buffer
void Close(PathBuilder *builder)
def timeout(deadline, cmd)
std::shared_ptr< const fml::Mapping > data
#define INVALID_HANDLE_VALUE
WINBASEAPI VOID WINAPI SetLastError(_In_ DWORD dwErrCode)
WINBASEAPI _Check_return_ _Post_equals_last_error_ DWORD WINAPI GetLastError(VOID)
struct _OVERLAPPED OVERLAPPED
__w64 unsigned long ULONG_PTR