Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,26 @@ POSTHOG_PROJECT_KEY=
# DEPOT_TOKEN=<Depot org token>
# DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://0.0.0.0:4318"
# These are needed for the object store (for handling large payloads/outputs)
# Default provider (backward compatible - no protocol prefix)
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
# OBJECT_STORE_ACCESS_KEY_ID=
# OBJECT_STORE_SECRET_ACCESS_KEY=
# OBJECT_STORE_REGION=auto
# OBJECT_STORE_SERVICE=s3
# OBJECT_STORE_DEFAULT_PROTOCOL=s3 # Optional: protocol to use for new uploads (e.g., "s3", "r2")
#
# Named providers (protocol-prefixed data) - optional for multi-provider support
# OBJECT_STORE_S3_BASE_URL=https://s3.amazonaws.com
# OBJECT_STORE_S3_ACCESS_KEY_ID=
# OBJECT_STORE_S3_SECRET_ACCESS_KEY=
# OBJECT_STORE_S3_REGION=us-east-1
# OBJECT_STORE_S3_SERVICE=s3
#
# OBJECT_STORE_R2_BASE_URL=https://{bucket}.{accountId}.r2.cloudflarestorage.com
# OBJECT_STORE_R2_ACCESS_KEY_ID=
# OBJECT_STORE_R2_SECRET_ACCESS_KEY=
# OBJECT_STORE_R2_REGION=auto
# OBJECT_STORE_R2_SERVICE=s3
# CHECKPOINT_THRESHOLD_IN_MS=10000

# These control the server-side internal telemetry
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ const EnvironmentSchema = z
OBJECT_STORE_REGION: z.string().optional(),
OBJECT_STORE_SERVICE: z.string().default("s3"),

// Protocol to use for new uploads (e.g., "s3", "r2"). Data without protocol uses default provider above.
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
ARTIFACTS_OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";

const ParamsSchema = z.object({
"*": z.string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { basename } from "node:path";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireUserId } from "~/services/session.server";
import { generatePresignedRequest } from "~/v3/r2.server";
import { generatePresignedRequest } from "~/v3/objectStore.server";

const ParamSchema = z.object({
environmentId: z.string(),
Expand Down
28 changes: 17 additions & 11 deletions apps/webapp/app/runEngine/concerns/batchPayloads.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { hasObjectStoreClient, uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { startActiveSpan } from "~/v3/tracer.server";

export type BatchPayloadProcessResult = {
/** The processed payload - either the original or an R2 path */
Expand Down Expand Up @@ -31,7 +31,7 @@ export class BatchPayloadProcessor {
* If not available, large payloads will be stored inline (which may fail for very large payloads).
*/
isObjectStoreAvailable(): boolean {
return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined;
return hasObjectStoreClient();
}

/**
Expand Down Expand Up @@ -103,11 +103,17 @@ export class BatchPayloadProcessor {
};
}

// Upload to R2
// Upload to object store
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(
filename,
packet.data,
packet.dataType,
environment,
env.OBJECT_STORE_DEFAULT_PROTOCOL
)
);

if (uploadError) {
Expand All @@ -125,18 +131,18 @@ export class BatchPayloadProcessor {
);
}

logger.debug("Batch item payload offloaded to R2", {
logger.debug("Batch item payload offloaded to object store", {
batchId,
itemIndex,
filename,
filename: uploadedFilename,
size,
});

span.setAttribute("wasOffloaded", true);
span.setAttribute("offloadPath", filename);
span.setAttribute("offloadPath", uploadedFilename);

return {
payload: filename,
payload: uploadedFilename!,
payloadType: "application/store",
wasOffloaded: true,
size,
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/runEngine/concerns/payloads.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/
import { PayloadProcessor, TriggerTaskRequest } from "../types";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore } from "~/v3/r2.server";
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { ServiceValidationError } from "~/v3/services/common.server";

export class DefaultPayloadProcessor implements PayloadProcessor {
Expand Down Expand Up @@ -31,16 +31,16 @@ export class DefaultPayloadProcessor implements PayloadProcessor {

const filename = `${request.friendlyId}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment, env.OBJECT_STORE_DEFAULT_PROTOCOL)
);

if (uploadError) {
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
}

return {
data: filename,
data: uploadedFilename!,
dataType: "application/store",
};
});
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/objectStore.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { TriggerTaskService } from "../../v3/services/triggerTask.server";
import { startActiveSpan } from "../../v3/tracer.server";
Expand Down
Loading
Loading