Flutter Engine
message_loop_unittests.cc
Go to the documentation of this file.
1 // Copyright 2013 The Flutter Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #define FML_USED_ON_EMBEDDER
6 
7 #include "flutter/fml/message_loop.h"
8 
9 #include <iostream>
10 #include <thread>
11 
12 #include "flutter/fml/build_config.h"
13 #include "flutter/fml/concurrent_message_loop.h"
14 #include "flutter/fml/synchronization/count_down_latch.h"
15 #include "flutter/fml/synchronization/waitable_event.h"
16 #include "flutter/fml/task_runner.h"
17 #include "gtest/gtest.h"
18 
19 #define TIMESENSITIVE(x) TimeSensitiveTest_##x
20 #if OS_WIN
21 #define PLATFORM_SPECIFIC_CAPTURE(...) [ __VA_ARGS__, count ]
22 #else
23 #define PLATFORM_SPECIFIC_CAPTURE(...) [__VA_ARGS__]
24 #endif
25 
26 TEST(MessageLoop, GetCurrent) {
27  std::thread thread([]() {
29  ASSERT_TRUE(fml::MessageLoop::GetCurrent().GetTaskRunner());
30  });
31  thread.join();
32 }
33 
34 TEST(MessageLoop, DifferentThreadsHaveDifferentLoops) {
35  fml::MessageLoop* loop1 = nullptr;
38  std::thread thread1([&loop1, &latch1, &term1]() {
41  latch1.Signal();
42  term1.Wait();
43  });
44 
45  fml::MessageLoop* loop2 = nullptr;
48  std::thread thread2([&loop2, &latch2, &term2]() {
51  latch2.Signal();
52  term2.Wait();
53  });
54  latch1.Wait();
55  latch2.Wait();
56  ASSERT_FALSE(loop1 == loop2);
57  term1.Signal();
58  term2.Signal();
59  thread1.join();
60  thread2.join();
61 }
62 
63 TEST(MessageLoop, CanRunAndTerminate) {
64  bool started = false;
65  bool terminated = false;
66  std::thread thread([&started, &terminated]() {
68  auto& loop = fml::MessageLoop::GetCurrent();
69  ASSERT_TRUE(loop.GetTaskRunner());
70  loop.GetTaskRunner()->PostTask([&terminated]() {
72  terminated = true;
73  });
74  loop.Run();
75  started = true;
76  });
77  thread.join();
78  ASSERT_TRUE(started);
79  ASSERT_TRUE(terminated);
80 }
81 
82 TEST(MessageLoop, NonDelayedTasksAreRunInOrder) {
83  const size_t count = 100;
84  bool started = false;
85  bool terminated = false;
86  std::thread thread([&started, &terminated, count]() {
88  auto& loop = fml::MessageLoop::GetCurrent();
89  size_t current = 0;
90  for (size_t i = 0; i < count; i++) {
91  loop.GetTaskRunner()->PostTask(
92  PLATFORM_SPECIFIC_CAPTURE(&terminated, i, &current)() {
93  ASSERT_EQ(current, i);
94  current++;
95  if (count == i + 1) {
97  terminated = true;
98  }
99  });
100  }
101  loop.Run();
102  ASSERT_EQ(current, count);
103  started = true;
104  });
105  thread.join();
106  ASSERT_TRUE(started);
107  ASSERT_TRUE(terminated);
108 }
109 
110 TEST(MessageLoop, DelayedTasksAtSameTimeAreRunInOrder) {
111  const size_t count = 100;
112  bool started = false;
113  bool terminated = false;
114  std::thread thread([&started, &terminated, count]() {
116  auto& loop = fml::MessageLoop::GetCurrent();
117  size_t current = 0;
118  const auto now_plus_some =
120  for (size_t i = 0; i < count; i++) {
121  loop.GetTaskRunner()->PostTaskForTime(
122  PLATFORM_SPECIFIC_CAPTURE(&terminated, i, &current)() {
123  ASSERT_EQ(current, i);
124  current++;
125  if (count == i + 1) {
127  terminated = true;
128  }
129  },
130  now_plus_some);
131  }
132  loop.Run();
133  ASSERT_EQ(current, count);
134  started = true;
135  });
136  thread.join();
137  ASSERT_TRUE(started);
138  ASSERT_TRUE(terminated);
139 }
140 
141 TEST(MessageLoop, CheckRunsTaskOnCurrentThread) {
144  std::thread thread([&runner, &latch]() {
146  auto& loop = fml::MessageLoop::GetCurrent();
147  runner = loop.GetTaskRunner();
148  latch.Signal();
149  ASSERT_TRUE(loop.GetTaskRunner()->RunsTasksOnCurrentThread());
150  });
151  latch.Wait();
152  ASSERT_TRUE(runner);
153  ASSERT_FALSE(runner->RunsTasksOnCurrentThread());
154  thread.join();
155 }
156 
157 TEST(MessageLoop, TIMESENSITIVE(SingleDelayedTaskByDelta)) {
158  bool checked = false;
159  std::thread thread([&checked]() {
161  auto& loop = fml::MessageLoop::GetCurrent();
162  auto begin = fml::TimePoint::Now();
163  loop.GetTaskRunner()->PostDelayedTask(
164  [begin, &checked]() {
165  auto delta = fml::TimePoint::Now() - begin;
166  auto ms = delta.ToMillisecondsF();
167  ASSERT_GE(ms, 3);
168  ASSERT_LE(ms, 7);
169  checked = true;
171  },
173  loop.Run();
174  });
175  thread.join();
176  ASSERT_TRUE(checked);
177 }
178 
179 TEST(MessageLoop, TIMESENSITIVE(SingleDelayedTaskForTime)) {
180  bool checked = false;
181  std::thread thread([&checked]() {
183  auto& loop = fml::MessageLoop::GetCurrent();
184  auto begin = fml::TimePoint::Now();
185  loop.GetTaskRunner()->PostTaskForTime(
186  [begin, &checked]() {
187  auto delta = fml::TimePoint::Now() - begin;
188  auto ms = delta.ToMillisecondsF();
189  ASSERT_GE(ms, 3);
190  ASSERT_LE(ms, 7);
191  checked = true;
193  },
195  loop.Run();
196  });
197  thread.join();
198  ASSERT_TRUE(checked);
199 }
200 
201 TEST(MessageLoop, TIMESENSITIVE(MultipleDelayedTasksWithIncreasingDeltas)) {
202  const auto count = 10;
203  int checked = false;
204  std::thread thread(PLATFORM_SPECIFIC_CAPTURE(&checked)() {
206  auto& loop = fml::MessageLoop::GetCurrent();
207  for (int target_ms = 0 + 2; target_ms < count + 2; target_ms++) {
208  auto begin = fml::TimePoint::Now();
209  loop.GetTaskRunner()->PostDelayedTask(
210  PLATFORM_SPECIFIC_CAPTURE(begin, target_ms, &checked)() {
211  auto delta = fml::TimePoint::Now() - begin;
212  auto ms = delta.ToMillisecondsF();
213  ASSERT_GE(ms, target_ms - 2);
214  ASSERT_LE(ms, target_ms + 2);
215  checked++;
216  if (checked == count) {
218  }
219  },
221  }
222  loop.Run();
223  });
224  thread.join();
225  ASSERT_EQ(checked, count);
226 }
227 
228 TEST(MessageLoop, TIMESENSITIVE(MultipleDelayedTasksWithDecreasingDeltas)) {
229  const auto count = 10;
230  int checked = false;
231  std::thread thread(PLATFORM_SPECIFIC_CAPTURE(&checked)() {
233  auto& loop = fml::MessageLoop::GetCurrent();
234  for (int target_ms = count + 2; target_ms > 0 + 2; target_ms--) {
235  auto begin = fml::TimePoint::Now();
236  loop.GetTaskRunner()->PostDelayedTask(
237  PLATFORM_SPECIFIC_CAPTURE(begin, target_ms, &checked)() {
238  auto delta = fml::TimePoint::Now() - begin;
239  auto ms = delta.ToMillisecondsF();
240  ASSERT_GE(ms, target_ms - 2);
241  ASSERT_LE(ms, target_ms + 2);
242  checked++;
243  if (checked == count) {
245  }
246  },
248  }
249  loop.Run();
250  });
251  thread.join();
252  ASSERT_EQ(checked, count);
253 }
254 
255 TEST(MessageLoop, TaskObserverFire) {
256  bool started = false;
257  bool terminated = false;
258  std::thread thread([&started, &terminated]() {
260  const size_t count = 25;
261  auto& loop = fml::MessageLoop::GetCurrent();
262  size_t task_count = 0;
263  size_t obs_count = 0;
264  auto obs = PLATFORM_SPECIFIC_CAPTURE(&obs_count)() { obs_count++; };
265  for (size_t i = 0; i < count; i++) {
266  loop.GetTaskRunner()->PostTask(
267  PLATFORM_SPECIFIC_CAPTURE(&terminated, i, &task_count)() {
268  ASSERT_EQ(task_count, i);
269  task_count++;
270  if (count == i + 1) {
272  terminated = true;
273  }
274  });
275  }
276  loop.AddTaskObserver(0, obs);
277  loop.Run();
278  ASSERT_EQ(task_count, count);
279  ASSERT_EQ(obs_count, count);
280  started = true;
281  });
282  thread.join();
283  ASSERT_TRUE(started);
284  ASSERT_TRUE(terminated);
285 }
286 
287 TEST(MessageLoop, ConcurrentMessageLoopHasNonZeroWorkers) {
289  0u /* explicitly specify zero workers */);
290  ASSERT_GT(loop->GetWorkerCount(), 0u);
291 }
292 
293 TEST(MessageLoop, CanCreateAndShutdownConcurrentMessageLoopsOverAndOver) {
294  for (size_t i = 0; i < 10; ++i) {
295  auto loop = fml::ConcurrentMessageLoop::Create(i + 1);
296  ASSERT_EQ(loop->GetWorkerCount(), i + 1);
297  }
298 }
299 
300 TEST(MessageLoop, CanCreateConcurrentMessageLoop) {
302  auto task_runner = loop->GetTaskRunner();
303  const size_t kCount = 10;
304  fml::CountDownLatch latch(kCount);
305  std::mutex thread_ids_mutex;
306  std::set<std::thread::id> thread_ids;
307  for (size_t i = 0; i < kCount; ++i) {
308  task_runner->PostTask([&]() {
309  std::this_thread::sleep_for(std::chrono::seconds(1));
310  std::cout << "Ran on thread: " << std::this_thread::get_id() << std::endl;
311  std::scoped_lock lock(thread_ids_mutex);
312  thread_ids.insert(std::this_thread::get_id());
313  latch.CountDown();
314  });
315  }
316  latch.Wait();
317  ASSERT_GE(thread_ids.size(), 1u);
318 }
#define TIMESENSITIVE(x)
static FML_EMBEDDER_ONLY MessageLoop & GetCurrent()
Definition: message_loop.cc:19
static void EnsureInitializedForCurrentThread()
Definition: message_loop.cc:27
static std::shared_ptr< ConcurrentMessageLoop > Create(size_t worker_count=std::thread::hardware_concurrency())
#define PLATFORM_SPECIFIC_CAPTURE(...)
static constexpr TimeDelta FromMilliseconds(int64_t millis)
Definition: time_delta.h:46
virtual bool RunsTasksOnCurrentThread()
Definition: task_runner.cc:43
TEST(MessageLoop, GetCurrent)
static TimePoint Now()
Definition: time_point.cc:26