33 private static final String TAG =
"DartMessenger";
43 @NonNull
private final Map<String, HandlerInfo> messageHandlers =
new HashMap<>();
50 @NonNull
private Map<String, List<BufferedMessageInfo>> bufferedMessages =
new HashMap<>();
52 @NonNull
private final Object handlersLock =
new Object();
53 @NonNull
private final AtomicBoolean enableBufferingIncomingMessages =
new AtomicBoolean(
false);
55 @NonNull
private final Map<Integer, BinaryMessenger.BinaryReply> pendingReplies =
new HashMap<>();
56 private int nextReplyId = 1;
61 private WeakHashMap<TaskQueue, DartMessengerTaskQueue> createdTaskQueues =
62 new WeakHashMap<TaskQueue, DartMessengerTaskQueue>();
67 this.flutterJNI = flutterJNI;
68 this.taskQueueFactory = taskQueueFactory;
72 this(flutterJNI,
new DefaultTaskQueueFactory());
75 private static class TaskQueueToken
implements TaskQueue {}
86 ExecutorService executorService;
88 DefaultTaskQueueFactory() {
92 public DartMessengerTaskQueue makeBackgroundTaskQueue(TaskQueueOptions
options) {
94 return new SerialTaskQueue(executorService);
96 return new ConcurrentTaskQueue(executorService);
105 private static class HandlerInfo {
106 @NonNull
public final BinaryMessenger.BinaryMessageHandler handler;
107 @Nullable
public final DartMessengerTaskQueue taskQueue;
110 @NonNull BinaryMessenger.BinaryMessageHandler handler,
111 @Nullable DartMessengerTaskQueue taskQueue) {
112 this.handler = handler;
113 this.taskQueue = taskQueue;
121 private static class BufferedMessageInfo {
122 @NonNull
public final ByteBuffer
message;
126 BufferedMessageInfo(@NonNull ByteBuffer message,
int replyId,
long messageData) {
128 this.replyId = replyId;
129 this.messageData = messageData;
134 @NonNull
private final ExecutorService executor;
137 this.executor = executor;
142 executor.execute(runnable);
148 @NonNull
private final ExecutorService executor;
149 @NonNull
private final ConcurrentLinkedQueue<Runnable> queue;
150 @NonNull
private final AtomicBoolean isRunning;
153 this.executor = executor;
154 queue =
new ConcurrentLinkedQueue<>();
155 isRunning =
new AtomicBoolean(
false);
167 private void flush() {
169 if (isRunning.compareAndSet(
false,
true)) {
171 @Nullable Runnable runnable =
queue.poll();
172 if (runnable !=
null) {
176 isRunning.set(
false);
177 if (!
queue.isEmpty()) {
192 TaskQueueToken token =
new TaskQueueToken();
193 createdTaskQueues.put(token, taskQueue);
199 @NonNull String channel, @Nullable BinaryMessenger.BinaryMessageHandler handler) {
205 @NonNull String channel,
206 @Nullable BinaryMessenger.BinaryMessageHandler handler,
207 @Nullable TaskQueue taskQueue) {
208 if (handler ==
null) {
209 Log.
v(
TAG,
"Removing handler for channel '" + channel +
"'");
210 synchronized (handlersLock) {
211 messageHandlers.remove(channel);
216 if (taskQueue !=
null) {
217 dartMessengerTaskQueue = createdTaskQueues.get(taskQueue);
218 if (dartMessengerTaskQueue ==
null) {
219 throw new IllegalArgumentException(
220 "Unrecognized TaskQueue, use BinaryMessenger to create your TaskQueue (ex makeBackgroundTaskQueue).");
223 Log.
v(
TAG,
"Setting handler for channel '" + channel +
"'");
226 synchronized (handlersLock) {
227 messageHandlers.put(channel,
new HandlerInfo(handler, dartMessengerTaskQueue));
228 list = bufferedMessages.remove(channel);
233 for (BufferedMessageInfo
info : list) {
234 dispatchMessageToQueue(
235 channel, messageHandlers.get(channel),
info.message,
info.replyId,
info.messageData);
241 enableBufferingIncomingMessages.set(
true);
246 Map<String, List<BufferedMessageInfo>> pendingMessages;
247 synchronized (handlersLock) {
248 enableBufferingIncomingMessages.set(
false);
249 pendingMessages = bufferedMessages;
250 bufferedMessages =
new HashMap<>();
253 for (BufferedMessageInfo
info : channel.getValue()) {
254 dispatchMessageToQueue(
255 channel.getKey(),
null,
info.message,
info.replyId,
info.messageData);
262 public void send(@NonNull String channel, @NonNull ByteBuffer
message) {
263 Log.
v(
TAG,
"Sending message over channel '" + channel +
"'");
269 @NonNull String channel,
271 @Nullable BinaryMessenger.BinaryReply
callback) {
272 try (TraceSection e = TraceSection.scoped(
"DartMessenger#send on " + channel)) {
273 Log.
v(
TAG,
"Sending message with callback over channel '" + channel +
"'");
274 int replyId = nextReplyId++;
276 pendingReplies.put(replyId,
callback);
279 flutterJNI.dispatchEmptyPlatformMessage(channel, replyId);
281 flutterJNI.dispatchPlatformMessage(channel,
message,
message.position(), replyId);
286 private void invokeHandler(
287 @Nullable HandlerInfo handlerInfo, @Nullable ByteBuffer
message,
final int replyId) {
289 if (handlerInfo !=
null) {
291 Log.
v(TAG,
"Deferring to registered handler to process message.");
292 handlerInfo.handler.onMessage(
message,
new Reply(flutterJNI, replyId));
293 }
catch (Exception ex) {
294 Log.
e(TAG,
"Uncaught exception in binary message listener", ex);
295 flutterJNI.invokePlatformMessageEmptyResponseCallback(replyId);
296 }
catch (Error err) {
300 Log.v(TAG,
"No registered handler for message. Responding to Dart with empty reply message.");
301 flutterJNI.invokePlatformMessageEmptyResponseCallback(replyId);
305 private void dispatchMessageToQueue(
306 @NonNull String channel,
307 @Nullable HandlerInfo handlerInfo,
312 final DartMessengerTaskQueue taskQueue = (handlerInfo !=
null) ? handlerInfo.taskQueue : null;
313 TraceSection.beginAsyncSection(
"PlatformChannel ScheduleHandler on " + channel, replyId);
314 Runnable myRunnable =
316 TraceSection.endAsyncSection(
"PlatformChannel ScheduleHandler on " + channel, replyId);
317 try (TraceSection e =
318 TraceSection.scoped(
"DartMessenger#handleMessageFromDart on " + channel)) {
319 invokeHandler(handlerInfo,
message, replyId);
327 flutterJNI.cleanupMessageData(messageData);
330 final DartMessengerTaskQueue nonnullTaskQueue =
331 taskQueue ==
null ? platformTaskQueue : taskQueue;
332 nonnullTaskQueue.dispatch(myRunnable);
337 @NonNull String channel, @Nullable ByteBuffer
message,
int replyId,
long messageData) {
339 Log.
v(
TAG,
"Received message from Dart over channel '" + channel +
"'");
341 HandlerInfo handlerInfo;
342 boolean messageDeferred;
345 synchronized (handlersLock) {
346 handlerInfo = messageHandlers.get(channel);
347 messageDeferred = (enableBufferingIncomingMessages.get() && handlerInfo ==
null);
348 if (messageDeferred) {
357 if (!bufferedMessages.containsKey(channel)) {
358 bufferedMessages.put(channel,
new LinkedList<>());
364 if (!messageDeferred) {
365 dispatchMessageToQueue(channel, handlerInfo,
message, replyId, messageData);
371 Log.
v(
TAG,
"Received message reply from Dart.");
372 BinaryMessenger.BinaryReply
callback = pendingReplies.remove(replyId);
375 Log.
v(
TAG,
"Invoking registered callback for reply from Dart.");
377 if (reply !=
null && reply.isDirect()) {
382 }
catch (Exception ex) {
383 Log.
e(
TAG,
"Uncaught exception in binary message reply handler", ex);
384 }
catch (Error err) {
403 return pendingReplies.size();
410 private static void handleError(Error err) {
411 Thread currentThread = Thread.currentThread();
412 if (currentThread.getUncaughtExceptionHandler() ==
null) {
415 currentThread.getUncaughtExceptionHandler().uncaughtException(currentThread, err);
418 static class Reply implements BinaryMessenger.BinaryReply {
420 private final int replyId;
421 private final AtomicBoolean done =
new AtomicBoolean(
false);
424 this.flutterJNI = flutterJNI;
425 this.replyId = replyId;
430 if (done.getAndSet(
true)) {
431 throw new IllegalStateException(
"Reply already submitted");
434 flutterJNI.invokePlatformMessageEmptyResponseCallback(replyId);
436 flutterJNI.invokePlatformMessageResponseCallback(replyId,
reply,
reply.position());
static const uint8_t buffer[]