Skip to content

feat(cloud_functions): add support for cloud functions stream #17214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5bbf13d
chore: add platform interface and method channel implementation for C…
SelaseKay Mar 18, 2025
24acdda
chore: add `httpsCallableStreamFromUrl` and `httpsStreamCallableWithUri`
SelaseKay Mar 18, 2025
fa00dd9
chore: resolve comments
SelaseKay Mar 21, 2025
fa750ed
chore: add Android implementation for Cloud Functions stream
SelaseKay Mar 21, 2025
3e8ac29
chore: resolve formatting issues
SelaseKay Mar 21, 2025
c248cd6
chore: correct variable name
SelaseKay Mar 21, 2025
1ac4533
chore: add support for Cloud Functions Stream(Android)
SelaseKay Mar 23, 2025
d8e0fce
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Mar 24, 2025
95236c8
chore: create dedicated StreamHandler class
SelaseKay Mar 24, 2025
785e019
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Mar 24, 2025
6161988
chore: add streamhandler implementation for ios
SelaseKay Mar 25, 2025
debdc46
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Mar 26, 2025
4d0c10e
chore: add iOS implementation for Cloud Functions stream
SelaseKay Mar 28, 2025
1190fde
chore: add license header to stream handler files
SelaseKay Mar 28, 2025
6bbde2f
chore: web Cloud Functions stream wip
SelaseKay Apr 1, 2025
66be89b
chore: push all
SelaseKay Apr 1, 2025
4804ab9
chore: update functions based on API Doc modification
SelaseKay Apr 2, 2025
4f83c36
chore: clean up code
SelaseKay Apr 3, 2025
213e283
chore: add web package
SelaseKay Apr 3, 2025
de10bcc
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Apr 3, 2025
794d441
chore: add streaming example
SelaseKay Apr 3, 2025
560e3eb
Merge branch 'feat/cloud_functions_stream_support' of github.com:fire…
SelaseKay Apr 3, 2025
6ad0820
chore: fix ci issues
SelaseKay Apr 3, 2025
a9819db
chore: fix ci
SelaseKay Apr 3, 2025
295c6c6
chore: fix cloud function test
SelaseKay Apr 3, 2025
7a5ad4e
chore: add missing doc
SelaseKay Apr 4, 2025
45fea6f
chore: fixes and clean up
SelaseKay Apr 9, 2025
047354a
chore: add e2e for Cloud Functions Stream
SelaseKay Apr 10, 2025
045150d
chore: fix formatting issue
SelaseKay Apr 10, 2025
69ef58f
chore: add more tests and fix timeout for Android
SelaseKay Apr 11, 2025
693aa89
chore: add test for map and list
SelaseKay Apr 11, 2025
aa77160
chore: fix test
SelaseKay Apr 11, 2025
06c1c9c
chore: update year to 2025 in files
SelaseKay Apr 16, 2025
1404359
chore(web): add support for abort signal
SelaseKay Apr 17, 2025
43c223c
chore: resolve comments and add test for Abort
SelaseKay Apr 17, 2025
df743ee
chore: fix test
SelaseKay Apr 17, 2025
c5136d6
chore: fix test
SelaseKay Apr 17, 2025
3964f3b
chore: update copyright year
SelaseKay Apr 18, 2025
f3af9e8
chore: print error to console
SelaseKay Apr 25, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,923 changes: 971 additions & 1,952 deletions .github/workflows/scripts/functions/package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions .github/workflows/scripts/functions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
},
"main": "lib/index.js",
"dependencies": {
"firebase-admin": "^11.5.0",
"firebase-functions": "^4.5.0"
"firebase-admin": "^13.2.0",
"firebase-functions": "^6.3.2"
},
"devDependencies": {
"firebase-functions-test": "^0.2.0",
Expand Down
127 changes: 95 additions & 32 deletions .github/workflows/scripts/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,99 @@ import * as assert from 'assert';
import * as functions from 'firebase-functions';
import * as functionsv2 from 'firebase-functions/v2';


// For example app.
// noinspection JSUnusedGlobalSymbols
export const listFruit = functions.https.onCall(() => {
return ['Apple', 'Banana', 'Cherry', 'Date', 'Fig', 'Grapes'];
});

export const listfruits2ndgen = functionsv2.https.onCall(() => {
return ['Apple', 'Banana', 'Cherry', 'Date', 'Fig', 'Grapes'];
export const listfruits2ndgen = functionsv2.https.onCall((res, req) => {
const fruitList = ['Apple', 'Banana', 'Cherry', 'Date', 'Fig', 'Grapes'];
const allFruits = fruitList.map(async (fruit) => {
if (res.acceptsStreaming) {
req?.sendChunk(fruit)
}
})
return Promise.all(allFruits);
});

// For e2e testing a custom region.
// noinspection JSUnusedGlobalSymbols
export const testFunctionCustomRegion = functions
.region('europe-west1')
.https.onCall(() => 'europe-west1');

export const testFunctionCustomRegion = functions.https.onCall(
{
region: 'europe-west1'
},
() => 'europe-west1'
);

// For e2e testing timeouts.
export const testFunctionTimeout = functions.https.onCall((data) => {
export const testFunctionTimeout = functions.https.onCall((req, res) => {
const data = req.data
console.log(JSON.stringify({ data }));
return new Promise((resolve, reject) => {
if (data && data.testTimeout) {
setTimeout(
() => resolve({ timeLimit: 'exceeded' }),
parseInt(data.testTimeout, 10)
);
} else {
reject(
new functions.https.HttpsError(
'invalid-argument',
'testTimeout must be provided.'
)
);
}

const timeoutMs = parseInt(data?.testTimeout, 10);

if (isNaN(timeoutMs)) {
throw new functions.https.HttpsError(
'invalid-argument',
'testTimeout must be provided.'
);
}

if (req.acceptsStreaming) {
setTimeout(() => {
res?.sendChunk({ timeLimit: 'exceeded' });
}, timeoutMs);

return new Promise((resolve) => {
setTimeout(resolve, timeoutMs + 100);
});
}

return new Promise((resolve) => {
setTimeout(() => resolve({ timeLimit: 'exceeded' }), timeoutMs);
});

});

// For e2e testing errors & return values.
// noinspection JSUnusedGlobalSymbols
export const testFunctionDefaultRegion = functions.https.onCall((data) => {
export const testFunctionDefaultRegion = functions.https.onCall((req, res) => {
const data = req.data;
console.log(JSON.stringify({ data }));
if (typeof data === 'undefined') {
return 'undefined';
}

const sendResponse = (value: any) => {
if (req.acceptsStreaming && res) {
res.sendChunk(value);
return value;
}
return value;
};

if (typeof data === 'string') {
return 'string';
return sendResponse('string');
}

if (typeof data === 'number') {
return 'number';
return sendResponse('number');
}

if (typeof data === 'boolean') {
return 'boolean';
return sendResponse('boolean');
}

if (data === null) {
return 'null';
return sendResponse('null');
}

if (Array.isArray(data)) {
return 'array';
return sendResponse('array');
}

if(data.type === 'rawData') {
return data;
if (data.type === 'rawData') {
return sendResponse(data);
}

const sampleData: {
Expand Down Expand Up @@ -153,9 +180,45 @@ export const testFunctionDefaultRegion = functions.https.onCall((data) => {
);
}

return outputData;
return sendResponse(outputData);
});

export const testMapConvertType = functions.https.onCall((data) => ({
foo: 'bar',
}));

export const testStream = functions.https.onCall((req, res) => {
const data = req.data;
if (data === null || undefined) {
if (req.acceptsStreaming) {
res?.sendChunk('null');
}
return
}

const results = [];
results.push(data)

const allResults = results.map(async (result) => {
if (req.acceptsStreaming) {
res?.sendChunk(result);
}
return result;
});
return Promise.all(allResults);
})

export const testStreamResponse = functions.https.onCall(async (request, response) => {
const fruits = ['Apple', 'Mango', 'Banana']

const allFruits = fruits.map(async (fruit) => {
// Stream each fruit as it resolves!
if (request.acceptsStreaming) {
response?.sendChunk(fruit);
}
return fruit;
});

// Fallback for non-streaming clients
return Promise.all(allFruits);
});
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ android {
implementation platform("com.google.firebase:firebase-bom:${getRootProjectExtOrCoreProperty("FirebaseSDKVersion", firebaseCoreProject)}")
implementation 'com.google.firebase:firebase-functions'
implementation 'androidx.annotation:annotation:1.7.0'
implementation 'org.reactivestreams:reactive-streams:1.0.4'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why we would need an external library to handle this? We managed to do all the other implementations without it AFAIK/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The other ones provided a way to add listeners directly (eg. addOnConfigUpdatedListener) whereas this one doesn't. It returns a Publisher so this is the only way around it AFAIK.
Reference: https://firebase.google.com/docs/reference/kotlin/com/google/firebase/functions/HttpsCallableReference#summary

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2025 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package io.flutter.plugins.firebase.functions;

import android.net.Uri;
import com.google.firebase.functions.FirebaseFunctions;
import com.google.firebase.functions.HttpsCallableOptions;
import com.google.firebase.functions.HttpsCallableReference;
import com.google.firebase.functions.StreamResponse;
import io.flutter.plugin.common.EventChannel;
import io.flutter.plugin.common.EventChannel.StreamHandler;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;

public class FirebaseFunctionsStreamHandler implements StreamHandler {

private final FirebaseFunctions firebaseFunctions;

private StreamResponseSubscriber subscriber;

public FirebaseFunctionsStreamHandler(FirebaseFunctions functions) {
this.firebaseFunctions = functions;
}

@Override
public void onListen(Object arguments, EventChannel.EventSink events) {
@SuppressWarnings("unchecked")
Map<String, Object> argumentsMap = (Map<String, Object>) arguments;
httpsStreamCall(argumentsMap, events);
}

@Override
public void onCancel(Object arguments) {
subscriber.cancel();
}

private void httpsStreamCall(Map<String, Object> arguments, EventChannel.EventSink events) {
try {

String functionName = (String) arguments.get("functionName");
String functionUri = (String) arguments.get("functionUri");
String origin = (String) arguments.get("origin");
Integer timeout = (Integer) arguments.get("timeout");
Object parameters = arguments.get("parameters");
boolean limitedUseAppCheckToken =
(boolean) Objects.requireNonNull(arguments.get("limitedUseAppCheckToken"));

if (origin != null) {
Uri originUri = Uri.parse(origin);
firebaseFunctions.useEmulator(originUri.getHost(), originUri.getPort());
}

HttpsCallableReference httpsCallableReference;
HttpsCallableOptions options =
new HttpsCallableOptions.Builder()
.setLimitedUseAppCheckTokens(limitedUseAppCheckToken)
.build();

Publisher<StreamResponse> publisher;
if (functionName != null) {
httpsCallableReference = firebaseFunctions.getHttpsCallable(functionName, options);
publisher = httpsCallableReference.stream(parameters);
} else if (functionUri != null) {
httpsCallableReference =
firebaseFunctions.getHttpsCallableFromUrl(new URL(functionUri), options);
publisher = httpsCallableReference.stream();
} else {
throw new IllegalArgumentException("Either functionName or functionUri must be set");
}

if (timeout != null) {
httpsCallableReference.setTimeout(timeout.longValue(), TimeUnit.MILLISECONDS);
}
subscriber = new StreamResponseSubscriber(events);
publisher.subscribe(subscriber);
} catch (Exception e) {
events.error("firebase_functions", e.getMessage(), null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.firebase.functions.HttpsCallableReference;
import com.google.firebase.functions.HttpsCallableResult;
import io.flutter.embedding.engine.plugins.FlutterPlugin;
import io.flutter.plugin.common.EventChannel;
import io.flutter.plugin.common.MethodCall;
import io.flutter.plugin.common.MethodChannel;
import io.flutter.plugin.common.MethodChannel.MethodCallHandler;
Expand All @@ -35,6 +36,7 @@ public class FlutterFirebaseFunctionsPlugin

private static final String METHOD_CHANNEL_NAME = "plugins.flutter.io/firebase_functions";
private MethodChannel channel;
private FlutterPluginBinding pluginBinding;

/**
* Default Constructor.
Expand All @@ -45,6 +47,7 @@ public FlutterFirebaseFunctionsPlugin() {}

@Override
public void onAttachedToEngine(@NonNull FlutterPluginBinding binding) {
pluginBinding = binding;
channel = new MethodChannel(binding.getBinaryMessenger(), METHOD_CHANNEL_NAME);
channel.setMethodCallHandler(this);
}
Expand All @@ -55,6 +58,16 @@ public void onDetachedFromEngine(@NonNull FlutterPluginBinding binding) {
channel = null;
}

private void registerEventChannel(Map<String, Object> arguments) {
final String eventId = (String) Objects.requireNonNull(arguments.get("eventChannelId"));
final String eventChannelName = METHOD_CHANNEL_NAME + "/" + eventId;
final EventChannel eventChannel =
new EventChannel(pluginBinding.getBinaryMessenger(), eventChannelName);
FirebaseFunctions functions = getFunctions(arguments);
FirebaseFunctionsStreamHandler streamHandler = new FirebaseFunctionsStreamHandler(functions);
eventChannel.setStreamHandler(streamHandler);
}

private FirebaseFunctions getFunctions(Map<String, Object> arguments) {
String appName = (String) Objects.requireNonNull(arguments.get("appName"));
String region = (String) Objects.requireNonNull(arguments.get("region"));
Expand Down Expand Up @@ -116,24 +129,26 @@ private Task<Object> httpsFunctionCall(Map<String, Object> arguments) {

@Override
public void onMethodCall(MethodCall call, @NonNull final Result result) {
if (!call.method.equals("FirebaseFunctions#call")) {
if (call.method.equals("FirebaseFunctions#registerEventChannel")) {
registerEventChannel(call.arguments());
result.success(null);
} else if (call.method.equals("FirebaseFunctions#call")) {
httpsFunctionCall(call.arguments())
.addOnCompleteListener(
task -> {
if (task.isSuccessful()) {
result.success(task.getResult());
} else {
Exception exception = task.getException();
result.error(
"firebase_functions",
exception != null ? exception.getMessage() : null,
getExceptionDetails(exception));
}
});
} else {
result.notImplemented();
return;
}

httpsFunctionCall(call.arguments())
.addOnCompleteListener(
task -> {
if (task.isSuccessful()) {
result.success(task.getResult());
} else {
Exception exception = task.getException();
result.error(
"firebase_functions",
exception != null ? exception.getMessage() : null,
getExceptionDetails(exception));
}
});
}

private Map<String, Object> getExceptionDetails(@Nullable Exception exception) {
Expand Down
Loading
Loading