Flutter Engine
The Flutter Engine
DartMessenger.java
Go to the documentation of this file.
1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5package io.flutter.embedding.engine.dart;
6
7import androidx.annotation.NonNull;
8import androidx.annotation.Nullable;
9import androidx.annotation.UiThread;
10import io.flutter.FlutterInjector;
11import io.flutter.Log;
12import io.flutter.embedding.engine.FlutterJNI;
13import io.flutter.plugin.common.BinaryMessenger;
14import io.flutter.util.TraceSection;
15import java.nio.ByteBuffer;
16import java.util.HashMap;
17import java.util.LinkedList;
18import java.util.List;
19import java.util.Map;
20import java.util.WeakHashMap;
21import java.util.concurrent.ConcurrentLinkedQueue;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.atomic.AtomicBoolean;
24
25/**
26 * Message conduit for 2-way communication between Android and Dart.
27 *
28 * <p>See {@link BinaryMessenger}, which sends messages from Android to Dart
29 *
30 * <p>See {@link PlatformMessageHandler}, which handles messages to Android from Dart
31 */
33 private static final String TAG = "DartMessenger";
34
35 @NonNull private final FlutterJNI flutterJNI;
36
37 /**
38 * Maps a channel name to an object that contains the task queue and the handler associated with
39 * the channel.
40 *
41 * <p>Reads and writes to this map must lock {@code handlersLock}.
42 */
43 @NonNull private final Map<String, HandlerInfo> messageHandlers = new HashMap<>();
44
45 /**
46 * Maps a channel name to an object that holds information about the incoming Dart message.
47 *
48 * <p>Reads and writes to this map must lock {@code handlersLock}.
49 */
50 @NonNull private Map<String, List<BufferedMessageInfo>> bufferedMessages = new HashMap<>();
51
52 @NonNull private final Object handlersLock = new Object();
53 @NonNull private final AtomicBoolean enableBufferingIncomingMessages = new AtomicBoolean(false);
54
55 @NonNull private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies = new HashMap<>();
56 private int nextReplyId = 1;
57
58 @NonNull private final DartMessengerTaskQueue platformTaskQueue = new PlatformTaskQueue();
59
60 @NonNull
61 private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues =
62 new WeakHashMap<TaskQueue, DartMessengerTaskQueue>();
63
64 @NonNull private TaskQueueFactory taskQueueFactory;
65
/**
 * Creates a messenger that talks to the engine through {@code flutterJNI} and builds its
 * background task queues with {@code taskQueueFactory}.
 *
 * @param flutterJNI the JNI bridge used to send messages and replies.
 * @param taskQueueFactory the factory used to create background task queues.
 */
DartMessenger(@NonNull FlutterJNI flutterJNI, @NonNull TaskQueueFactory taskQueueFactory) {
  this.taskQueueFactory = taskQueueFactory;
  this.flutterJNI = flutterJNI;
}
70
/**
 * Creates a messenger whose background task queues come from a {@code DefaultTaskQueueFactory}.
 *
 * @param flutterJNI the JNI bridge used to send messages and replies.
 */
DartMessenger(@NonNull FlutterJNI flutterJNI) {
  this(flutterJNI, new DefaultTaskQueueFactory());
}
74
75 private static class TaskQueueToken implements TaskQueue {}
76
78 void dispatch(@NonNull Runnable runnable);
79 }
80
81 interface TaskQueueFactory {
83 }
84
85 private static class DefaultTaskQueueFactory implements TaskQueueFactory {
86 ExecutorService executorService;
87
88 DefaultTaskQueueFactory() {
89 executorService = FlutterInjector.instance().executorService();
90 }
91
93 if (options.getIsSerial()) {
94 return new SerialTaskQueue(executorService);
95 } else {
96 return new ConcurrentTaskQueue(executorService);
97 }
98 }
99 }
100
101 /**
102 * Holds information about a platform handler, such as the task queue that processes messages from
103 * Dart.
104 */
105 private static class HandlerInfo {
106 @NonNull public final BinaryMessenger.BinaryMessageHandler handler;
107 @Nullable public final DartMessengerTaskQueue taskQueue;
108
109 HandlerInfo(
110 @NonNull BinaryMessenger.BinaryMessageHandler handler,
111 @Nullable DartMessengerTaskQueue taskQueue) {
112 this.handler = handler;
113 this.taskQueue = taskQueue;
114 }
115 }
116
117 /**
118 * Holds information that allows to dispatch a Dart message to a platform handler when it becomes
119 * available.
120 */
121 private static class BufferedMessageInfo {
122 @NonNull public final ByteBuffer message;
123 int replyId;
124 long messageData;
125
126 BufferedMessageInfo(@NonNull ByteBuffer message, int replyId, long messageData) {
127 this.message = message;
128 this.replyId = replyId;
129 this.messageData = messageData;
130 }
131 }
132
134 @NonNull private final ExecutorService executor;
135
/**
 * Creates a task queue that forwards every task to {@code executor}.
 *
 * @param executor the executor that runs dispatched tasks.
 */
ConcurrentTaskQueue(ExecutorService executor) {
  this.executor = executor;
}
139
140 @Override
141 public void dispatch(@NonNull Runnable runnable) {
142 executor.execute(runnable);
143 }
144 }
145
146 /** A serial task queue that can run on a concurrent ExecutorService. */
147 static class SerialTaskQueue implements DartMessengerTaskQueue {
148 @NonNull private final ExecutorService executor;
149 @NonNull private final ConcurrentLinkedQueue<Runnable> queue;
150 @NonNull private final AtomicBoolean isRunning;
151
152 SerialTaskQueue(ExecutorService executor) {
153 this.executor = executor;
154 queue = new ConcurrentLinkedQueue<>();
155 isRunning = new AtomicBoolean(false);
156 }
157
158 @Override
159 public void dispatch(@NonNull Runnable runnable) {
160 queue.add(runnable);
161 executor.execute(
162 () -> {
163 flush();
164 });
165 }
166
167 private void flush() {
168 // Don't execute if we are already executing (enforce serial execution).
169 if (isRunning.compareAndSet(false, true)) {
170 try {
171 @Nullable Runnable runnable = queue.poll();
172 if (runnable != null) {
173 runnable.run();
174 }
175 } finally {
176 isRunning.set(false);
177 if (!queue.isEmpty()) {
178 // Schedule the next event.
179 executor.execute(
180 () -> {
181 flush();
182 });
183 }
184 }
185 }
186 }
187 }
188
189 @Override
191 DartMessengerTaskQueue taskQueue = taskQueueFactory.makeBackgroundTaskQueue(options);
192 TaskQueueToken token = new TaskQueueToken();
193 createdTaskQueues.put(token, taskQueue);
194 return token;
195 }
196
197 @Override
198 public void setMessageHandler(
199 @NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler) {
200 setMessageHandler(channel, handler, null);
201 }
202
203 @Override
204 public void setMessageHandler(
205 @NonNull String channel,
206 @Nullable BinaryMessenger.BinaryMessageHandler handler,
207 @Nullable TaskQueue taskQueue) {
208 if (handler == null) {
209 Log.v(TAG, "Removing handler for channel '" + channel + "'");
210 synchronized (handlersLock) {
211 messageHandlers.remove(channel);
212 }
213 return;
214 }
215 DartMessengerTaskQueue dartMessengerTaskQueue = null;
216 if (taskQueue != null) {
217 dartMessengerTaskQueue = createdTaskQueues.get(taskQueue);
218 if (dartMessengerTaskQueue == null) {
219 throw new IllegalArgumentException(
220 "Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue).");
221 }
222 }
223 Log.v(TAG, "Setting handler for channel '" + channel + "'");
224
226 synchronized (handlersLock) {
227 messageHandlers.put(channel, new HandlerInfo(handler, dartMessengerTaskQueue));
228 list = bufferedMessages.remove(channel);
229 if (list == null) {
230 return;
231 }
232 }
233 for (BufferedMessageInfo info : list) {
234 dispatchMessageToQueue(
235 channel, messageHandlers.get(channel), info.message, info.replyId, info.messageData);
236 }
237 }
238
239 @Override
241 enableBufferingIncomingMessages.set(true);
242 }
243
244 @Override
246 Map<String, List<BufferedMessageInfo>> pendingMessages;
247 synchronized (handlersLock) {
248 enableBufferingIncomingMessages.set(false);
249 pendingMessages = bufferedMessages;
250 bufferedMessages = new HashMap<>();
251 }
252 for (Map.Entry<String, List<BufferedMessageInfo>> channel : pendingMessages.entrySet()) {
253 for (BufferedMessageInfo info : channel.getValue()) {
254 dispatchMessageToQueue(
255 channel.getKey(), null, info.message, info.replyId, info.messageData);
256 }
257 }
258 }
259
260 @Override
261 @UiThread
262 public void send(@NonNull String channel, @NonNull ByteBuffer message) {
263 Log.v(TAG, "Sending message over channel '" + channel + "'");
264 send(channel, message, null);
265 }
266
267 @Override
268 public void send(
269 @NonNull String channel,
270 @Nullable ByteBuffer message,
272 try (TraceSection e = TraceSection.scoped("DartMessenger#send on " + channel)) {
273 Log.v(TAG, "Sending message with callback over channel '" + channel + "'");
274 int replyId = nextReplyId++;
275 if (callback != null) {
276 pendingReplies.put(replyId, callback);
277 }
278 if (message == null) {
279 flutterJNI.dispatchEmptyPlatformMessage(channel, replyId);
280 } else {
281 flutterJNI.dispatchPlatformMessage(channel, message, message.position(), replyId);
282 }
283 }
284 }
285
286 private void invokeHandler(
287 @Nullable HandlerInfo handlerInfo, @Nullable ByteBuffer message, final int replyId) {
288 // Called from any thread.
289 if (handlerInfo != null) {
290 try {
291 Log.v(TAG, "Deferring to registered handler to process message.");
292 handlerInfo.handler.onMessage(message, new Reply(flutterJNI, replyId));
293 } catch (Exception ex) {
294 Log.e(TAG, "Uncaught exception in binary message listener", ex);
295 flutterJNI.invokePlatformMessageEmptyResponseCallback(replyId);
296 } catch (Error err) {
297 handleError(err);
298 }
299 } else {
300 Log.v(TAG, "No registered handler for message. Responding to Dart with empty reply message.");
301 flutterJNI.invokePlatformMessageEmptyResponseCallback(replyId);
302 }
303 }
304
305 private void dispatchMessageToQueue(
306 @NonNull String channel,
307 @Nullable HandlerInfo handlerInfo,
308 @Nullable ByteBuffer message,
309 int replyId,
310 long messageData) {
311 // Called from any thread.
312 final DartMessengerTaskQueue taskQueue = (handlerInfo != null) ? handlerInfo.taskQueue : null;
313 TraceSection.beginAsyncSection("PlatformChannel ScheduleHandler on " + channel, replyId);
314 Runnable myRunnable =
315 () -> {
316 TraceSection.endAsyncSection("PlatformChannel ScheduleHandler on " + channel, replyId);
317 try (TraceSection e =
318 TraceSection.scoped("DartMessenger#handleMessageFromDart on " + channel)) {
319 invokeHandler(handlerInfo, message, replyId);
320 if (message != null && message.isDirect()) {
321 // This ensures that if a user retains an instance to the ByteBuffer and it
322 // happens to be direct they will get a deterministic error.
323 message.limit(0);
324 }
325 } finally {
326 // This is deleting the data underneath the message object.
327 flutterJNI.cleanupMessageData(messageData);
328 }
329 };
330 final DartMessengerTaskQueue nonnullTaskQueue =
331 taskQueue == null ? platformTaskQueue : taskQueue;
332 nonnullTaskQueue.dispatch(myRunnable);
333 }
334
335 @Override
337 @NonNull String channel, @Nullable ByteBuffer message, int replyId, long messageData) {
338 // Called from any thread.
339 Log.v(TAG, "Received message from Dart over channel '" + channel + "'");
340
341 HandlerInfo handlerInfo;
342 boolean messageDeferred;
343 // This lock can potentially be a bottleneck and could be replaced with a
344 // read/write lock.
345 synchronized (handlersLock) {
346 handlerInfo = messageHandlers.get(channel);
347 messageDeferred = (enableBufferingIncomingMessages.get() && handlerInfo == null);
348 if (messageDeferred) {
349 // The channel is not defined when the Dart VM sends a message before the channels are
350 // registered.
351 //
352 // This is possible if the Dart VM starts before channel registration, and if the thread
353 // that registers the channels is busy or slow at registering the channel handlers.
354 //
355 // In such cases, the task dispatchers are queued, and processed when the channel is
356 // defined.
357 if (!bufferedMessages.containsKey(channel)) {
358 bufferedMessages.put(channel, new LinkedList<>());
359 }
360 List<BufferedMessageInfo> buffer = bufferedMessages.get(channel);
361 buffer.add(new BufferedMessageInfo(message, replyId, messageData));
362 }
363 }
364 if (!messageDeferred) {
365 dispatchMessageToQueue(channel, handlerInfo, message, replyId, messageData);
366 }
367 }
368
369 @Override
370 public void handlePlatformMessageResponse(int replyId, @Nullable ByteBuffer reply) {
371 Log.v(TAG, "Received message reply from Dart.");
372 BinaryMessenger.BinaryReply callback = pendingReplies.remove(replyId);
373 if (callback != null) {
374 try {
375 Log.v(TAG, "Invoking registered callback for reply from Dart.");
376 callback.reply(reply);
377 if (reply != null && reply.isDirect()) {
378 // This ensures that if a user retains an instance to the ByteBuffer and it happens to
379 // be direct they will get a deterministic error.
380 reply.limit(0);
381 }
382 } catch (Exception ex) {
383 Log.e(TAG, "Uncaught exception in binary message reply handler", ex);
384 } catch (Error err) {
385 handleError(err);
386 }
387 }
388 }
389
390 /**
391 * Returns the number of pending channel callback replies.
392 *
393 * <p>When sending messages to the Flutter application using {@link BinaryMessenger#send(String,
394 * ByteBuffer, io.flutter.plugin.common.BinaryMessenger.BinaryReply)}, developers can optionally
395 * specify a reply callback if they expect a reply from the Flutter application.
396 *
397 * <p>This method tracks all the pending callbacks that are waiting for a response, and is
398 * supposed to be called from the main thread (like the other methods). Calling it from a
399 * different thread could capture a nondeterministic internal state, so don't do it.
400 */
401 @UiThread
403 return pendingReplies.size();
404 }
405
406 // Handles `Error` objects which are not supposed to be caught.
407 //
408 // We forward them to the thread's uncaught exception handler if there is one. If not, they
409 // are rethrown.
410 private static void handleError(Error err) {
411 Thread currentThread = Thread.currentThread();
412 if (currentThread.getUncaughtExceptionHandler() == null) {
413 throw err;
414 }
415 currentThread.getUncaughtExceptionHandler().uncaughtException(currentThread, err);
416 }
417
418 static class Reply implements BinaryMessenger.BinaryReply {
419 @NonNull private final FlutterJNI flutterJNI;
420 private final int replyId;
421 private final AtomicBoolean done = new AtomicBoolean(false);
422
423 Reply(@NonNull FlutterJNI flutterJNI, int replyId) {
424 this.flutterJNI = flutterJNI;
425 this.replyId = replyId;
426 }
427
428 @Override
429 public void reply(@Nullable ByteBuffer reply) {
430 if (done.getAndSet(true)) {
431 throw new IllegalStateException("Reply already submitted");
432 }
433 if (reply == null) {
434 flutterJNI.invokePlatformMessageEmptyResponseCallback(replyId);
435 } else {
436 flutterJNI.invokePlatformMessageResponseCallback(replyId, reply, reply.position());
437 }
438 }
439 }
440}
const char * options
static void info(const char *fmt,...) SK_PRINTF_LIKE(1
Definition: DM.cpp:213
static void v(@NonNull String tag, @NonNull String message)
Definition: Log.java:40
static void e(@NonNull String tag, @NonNull String message)
Definition: Log.java:84
Reply(@NonNull FlutterJNI flutterJNI, int replyId)
void setMessageHandler( @NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler)
void handleMessageFromDart( @NonNull String channel, @Nullable ByteBuffer message, int replyId, long messageData)
TaskQueue makeBackgroundTaskQueue(TaskQueueOptions options)
void send(@NonNull String channel, @NonNull ByteBuffer message)
void setMessageHandler( @NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler, @Nullable TaskQueue taskQueue)
void send( @NonNull String channel, @Nullable ByteBuffer message, @Nullable BinaryMessenger.BinaryReply callback)
void handlePlatformMessageResponse(int replyId, @Nullable ByteBuffer reply)
DartMessenger(@NonNull FlutterJNI flutterJNI)
DartMessenger(@NonNull FlutterJNI flutterJNI, @NonNull TaskQueueFactory taskQueueFactory)
static TraceSection scoped(String name)
VkQueue queue
Definition: main.cc:55
FlKeyEvent uint64_t FlKeyResponderAsyncCallback callback
DartMessengerTaskQueue makeBackgroundTaskQueue(TaskQueueOptions options)
Win32Message message
void Log(const char *format,...) SK_PRINTF_LIKE(1
Definition: TestRunner.cpp:137
DEF_SWITCHES_START aot vmservice shared library Name of the *so containing AOT compiled Dart assets for launching the service isolate vm snapshot The VM snapshot data that will be memory mapped as read only SnapshotAssetPath must be present isolate snapshot The isolate snapshot data that will be memory mapped as read only SnapshotAssetPath must be present cache dir Path to the cache directory This is different from the persistent_cache_path in embedder which is used for Skia shader cache icu native lib Path to the library file that exports the ICU data vm service The hostname IP address on which the Dart VM Service should be served If not defaults to or::depending on whether ipv6 is specified vm service A custom Dart VM Service port The default is to pick a randomly available open port disable vm Disable the Dart VM Service The Dart VM Service is never available in release mode disable vm service Disable mDNS Dart VM Service publication Bind to the IPv6 localhost address for the Dart VM Service Ignored if vm service host is set endless trace buffer
Definition: switches.h:126
#define TAG()