54 {
55 if (!handler) {
57 return;
58 }
59
60
61
62
63 std::shared_ptr<StreamHandler<T>> shared_handler(handler.release());
64 const MethodCodec<T>* codec = codec_;
65 const std::string channel_name = name_;
66 const BinaryMessenger* messenger = messenger_;
68 [shared_handler, codec, channel_name, messenger,
69
70 is_listening = bool(
false)](
const uint8_t*
message,
71 const size_t message_size,
73 constexpr char kOnListenMethod[] = "listen";
74 constexpr char kOnCancelMethod[] = "cancel";
75
77 codec->DecodeMethodCall(
message, message_size);
79 std::cerr
80 << "Unable to construct method call from message on channel: "
81 << channel_name << std::endl;
82 reply(nullptr, 0);
83 return;
84 }
85
86 const std::string& method =
method_call->method_name();
87 if (method.compare(kOnListenMethod) == 0) {
88 if (is_listening) {
89 std::unique_ptr<StreamHandlerError<T>>
error =
90 shared_handler->OnCancel(nullptr);
92 std::cerr << "Failed to cancel existing stream: "
93 << (
error->error_code) <<
", "
94 << (
error->error_message) <<
", "
95 << (
error->error_details);
96 }
97 }
98 is_listening = true;
99
100 std::unique_ptr<std::vector<uint8_t>>
result;
101 auto sink = std::make_unique<EventSinkImplementation>(
102 messenger, channel_name, codec);
103 std::unique_ptr<StreamHandlerError<T>>
error =
105 std::move(sink));
107 result = codec->EncodeErrorEnvelope(
error->error_code,
108 error->error_message,
109 error->error_details.get());
110 } else {
111 result = codec->EncodeSuccessEnvelope();
112 }
114 } else if (method.compare(kOnCancelMethod) == 0) {
115 std::unique_ptr<std::vector<uint8_t>>
result;
116 if (is_listening) {
117 std::unique_ptr<StreamHandlerError<T>>
error =
118 shared_handler->OnCancel(
method_call->arguments());
120 result = codec->EncodeErrorEnvelope(
error->error_code,
121 error->error_message,
122 error->error_details.get());
123 } else {
124 result = codec->EncodeSuccessEnvelope();
125 }
126 is_listening = false;
127 } else {
128 result = codec->EncodeErrorEnvelope(
129 "error", "No active stream to cancel", nullptr);
130 }
132 } else {
133 reply(nullptr, 0);
134 }
135 };
137 }
virtual void SetMessageHandler(const std::string &channel, BinaryMessageHandler handler)=0
G_BEGIN_DECLS G_MODULE_EXPORT FlMethodCall * method_call
const uint8_t uint32_t uint32_t GError ** error
std::function< void(const uint8_t *message, size_t message_size, BinaryReply reply)> BinaryMessageHandler
std::function< void(const uint8_t *reply, size_t reply_size)> BinaryReply