Throttling strategy for requests in a Spring Boot app deployed on GCP Cloud Run


Jun 12, 2025 - 17:30

I have a Spring Boot 3 app deployed on GCP Cloud Run that connects to GCP Cloud SQL (PostgreSQL).

A request to my API looks like this:

{
  "filters": [
    {
      "callDate": {
        "range": [{
          "from": "2024-05-01",
          "to": "2024-05-02"
        }]
      }
    },
    {
      "recordedUsers": {
        "matchType": "any",
        "users": [
          {
            "email": "a@b.com"
          },
          {
            "email": "c@d.com"
          }
        ]
      }
    }
  ],
  "features": [
    {
      "name": "metadata"
    },
    {
      "name": "audio"
    },
    {
      "name": "transcription"
    }
  ]
}

Below is the flow of a request to my API:
1. The request comes in and is registered in the database.
2. The features the client asked for in the request are registered in the database.
3. A trackingId is returned to the user as the response, so that the user can use it with the next set of APIs, while the request continues asynchronously as described below.
4. The request is passed to the Orchestrator class, which orchestrates the flow depending on the features in the request.
5. The request is sent to MetadataHandler, which publishes it to a Pub/Sub topic that a Cloud Function subscribes to.
6. The Cloud Function processes the request and puts the response on a Pub/Sub topic that my service subscribes to.
7. The database is updated accordingly and the orchestrator is informed for the next step.
8. The response to the metadata request contains multiple interaction ids, so batches are now made from those interaction ids. Between 1 and 500 interaction ids can be returned per request.
9. Each batch is sent to AudioHandler, which publishes it to a Pub/Sub topic that a Cloud Function subscribes to.
10. The Cloud Function processes the request and puts the response on a Pub/Sub topic that my service subscribes to.
11. The response, however, is not sent for the batch as a whole, but for each individual interaction id in the batch, one by one.
12. The database is updated accordingly and the orchestrator is informed for the next step.
13. From here onwards, each request carries only one interaction id rather than a batch.
14. The request is sent to FlacConversionHandler -> Pub/Sub -> Cloud Function -> Google API.
15. The response is received, the database is updated, and control passes back to the Orchestrator.
16. The request is sent to TranscriptionHandler -> Pub/Sub -> Cloud Function -> Google API.
17. The response is received and the database is updated.
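
To make step 8 concrete, the batching can be pictured roughly as below. This is only a minimal sketch: `InteractionBatcher`, `toBatches`, and the batch size of 120 are hypothetical names and values chosen for illustration, not the actual code in my service.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits the interaction ids returned by the metadata step
// into fixed-size batches, with a smaller final batch if the count does not divide evenly.
final class InteractionBatcher {

    private InteractionBatcher() {
    }

    static List<List<String>> toBatches(List<String> interactionIds, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < interactionIds.size(); start += batchSize) {
            int end = Math.min(start + batchSize, interactionIds.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(List.copyOf(interactionIds.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 1; i <= 500; i++) {
            ids.add("interaction-" + i);
        }
        // 120 is an assumed batch size; the real value comes from configuration.
        List<List<String>> batches = toBatches(ids, 120);
        System.out.println(batches.size() + " batches, last has "
                + batches.get(batches.size() - 1).size() + " ids");
    }
}
```

Each resulting batch is what gets handed to AudioHandler in step 9.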

I have implemented throttling at every step where I send a request to Pub/Sub, because I don't want to overwhelm the systems behind the respective Cloud Functions at the various stages. This is what the code looks like:

@Component
@Data
public class ThrottlingConfig {

    @Value("${throttle.audio-download}")
    private long audioDownloadThrottlingThreshold;

    @Value("${throttle.flac-conversion}")
    private long flacConversionThrottlingThreshold;

    @Value("${throttle.transcription}")
    private long transcriptionThrottlingThreshold;

    @Value("${throttle.retry.interval.audio-download.milliseconds}")
    private long retryIntervalAudioDownloadMilliseconds;

    @Value("${throttle.retry.interval.flac-conversion.milliseconds}")
    private long retryIntervalFlacConversionMilliseconds;

    @Value("${throttle.retry.interval.transcription.milliseconds}")
    private long retryIntervalTranscriptionMilliseconds;

    public Duration getRetryInterval(ThrottleFeatureEnum feature) {
        return switch (feature) {
            case DOWNLOAD_AUDIO -> Duration.ofMillis(retryIntervalAudioDownloadMilliseconds);
            case FLAC_CONVERSION -> Duration.ofMillis(retryIntervalFlacConversionMilliseconds);
            case GET_TRANSCRIPTION -> Duration.ofMillis(retryIntervalTranscriptionMilliseconds);
        };
    }
}

@Component
@Slf4j
public class FeatureRequestThrottler implements Throttler {

    private final ThrottlingConfig throttlingConfig;
    private final CallAudioDownloadBatchInteractionRepository interactionRepository;
    private final AudioRepository audioRepository;
    private final AudioFeatureRepository audioFeatureRepository;
    private final ApplicationProperties applicationProperties;
    private final RetryRegistry retryRegistry;
    private final TimeLimiter timeLimiter;
    private final MeterRegistry meterRegistry;
    private final Semaphore databaseQuerySemaphore = new Semaphore(1);

    public FeatureRequestThrottler(CallAudioDownloadBatchInteractionRepository interactionRepository,
                                   AudioRepository audioRepository,
                                   AudioFeatureRepository audioFeatureRepository,
                                   ThrottlingConfig throttlingConfig,
                                   ApplicationProperties applicationProperties,
                                   RetryRegistry retryRegistry,
                                   TimeLimiter timeLimiter,
                                   MeterRegistry meterRegistry) {
        this.interactionRepository = interactionRepository;
        this.audioRepository = audioRepository;
        this.audioFeatureRepository = audioFeatureRepository;
        this.throttlingConfig = throttlingConfig;
        this.applicationProperties = applicationProperties;
        this.retryRegistry = retryRegistry;
        this.timeLimiter = timeLimiter;
        this.meterRegistry = meterRegistry;
    }

    @Override
    public void throttle(ThrottleFeatureEnum feature, String requestId, String identifier) {

        Retry retry = retryRegistry.retry(feature.name(), createRetryConfig(feature));

        try {
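            // Typed callables: with raw Callable the Boolean assignment below does not compile.
            Callable<Boolean> throttlingCallable = () -> checkThrottleCondition(feature, requestId, identifier);

            Callable<Boolean> timeLimitedCallable = TimeLimiter.decorateFutureSupplier(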
                    timeLimiter,
                    () -> CompletableFuture.supplyAsync(() -> {
                        try {
                            return throttlingCallable.call();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    })
            );

            Boolean result = Retry.decorateCallable(retry, timeLimitedCallable).call();

            if (!result) {
                throw new ThrottlingException("Throttle condition not met within allowed time for feature " + feature);
            }

        } catch (Exception e) {
            meterRegistry.counter("throttler.failures", "feature", feature.name()).increment();
            // Pass the exception last, without a placeholder, so SLF4J logs its stack trace.
            log.error("Throttling failed for feature {}", feature, e);
        }
    }

    private RetryConfig createRetryConfig(ThrottleFeatureEnum feature) {
        return RetryConfig.custom()
                .maxAttempts(Integer.MAX_VALUE)
                .waitDuration(throttlingConfig.getRetryInterval(feature))
                .retryExceptions(ThrottlingException.class)
                .failAfterMaxAttempts(false)
                .build();
    }

    private boolean checkThrottleCondition(ThrottleFeatureEnum feature,
                                           String requestId,
                                           String identifier) {
        meterRegistry.counter("throttler.checks", "feature", feature.name()).increment();

        long outstandingRequestsCount = getOutstandingRequestsCount(feature, requestId, identifier);
        long threshold = getMaxOutstandingRequestsThreshold(feature);

        if (outstandingRequestsCount < threshold) {
            log.debug("Permit granted for {} ({} < {})", feature, outstandingRequestsCount, threshold);
            meterRegistry.counter("throttler.grants", "feature", feature.name()).increment();
            return true;
        }

        log.debug("Throttle active for {} ({} >= {})", feature, outstandingRequestsCount, threshold);
        return false;
    }

    private long getOutstandingRequestsCount(ThrottleFeatureEnum feature, String requestId, String identifier) {
        try {
            // Acquire outside the guarded block so the permit is released only if it was actually obtained.
            databaseQuerySemaphore.acquire();
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for database access for feature {}", feature, e);
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while waiting for database access", e);
        }
        try {
            return switch (feature) {
                case DOWNLOAD_AUDIO -> interactionRepository.countByDownloadSuccessIsNull();
                case FLAC_CONVERSION, GET_TRANSCRIPTION -> {
                    Audio audio = audioRepository.findByAudioKey_RequestIdAndInteractionId(requestId, identifier);
                    yield feature == ThrottleFeatureEnum.FLAC_CONVERSION
                            ? getFeatureCount(audio, AudioFeatureEnum.CONVERT_TO_FLAC)
                            : getFeatureCount(audio, AudioFeatureEnum.RAW_TRANSCRIPTION);
                }
            };
        } finally {
            databaseQuerySemaphore.release();
        }
    }

    private long getFeatureCount(Audio audio, AudioFeatureEnum featureEnum) {
        return audioFeatureRepository.countByKey_AudioIdAndKey_FeatureAndEndTimeUtcIsNull(
                audio.getAudioKey().getAudioId(), featureEnum);
    }

    private long getMaxOutstandingRequestsThreshold(ThrottleFeatureEnum feature) {
        return switch (feature) {
            case DOWNLOAD_AUDIO -> throttlingConfig.getAudioDownloadThrottlingThreshold() - applicationProperties.getBatchSize();
            case FLAC_CONVERSION -> throttlingConfig.getFlacConversionThrottlingThreshold();
            case GET_TRANSCRIPTION -> throttlingConfig.getTranscriptionThrottlingThreshold();
        };
    }
}


    // Here in my Handler class, this is how I throttle requests just before publishing them to Pub/Sub
    public void requestAudioDownload(String requestId, String audioBatchId, String runId) {

        log.info("Requesting download of audio files for requestId {}, batchId {}, runId {}", requestId, audioBatchId, runId);

        AudioDownloadRequest audioDownloadRequest = downloadRequestedTransaction.execute(requestId, audioBatchId, runId);

        throttler.throttle(ThrottleFeatureEnum.DOWNLOAD_AUDIO, requestId, audioBatchId);
        publishAudioDownloadRequest(audioDownloadRequest);

        log.info("Download requested of audio files for requestId {}, batchId {}, runId {}", requestId, audioBatchId, runId);
    }

    private void publishAudioDownloadRequest(AudioDownloadRequest audioDownloadRequest) {
        try {
            String payload = objectMapper.writeValueAsString(audioDownloadRequest);
            log.info("Publishing request for audio download {}", payload);
            audioDownloadRequestPublisher.handleMessage(MessageBuilder.withPayload(payload).build());
        } catch (JsonProcessingException ex) {
            String error = String.format("Failed to publish audio download request: %s", audioDownloadRequest);
            log.error(error, ex);
            throw new AudioDownloadBatchRequestException("Failed to publish audio download request", ex);
        }
    }
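
Stripped of the Spring and Resilience4j wiring, the intent of the throttler above is roughly the following. This is a simplified sketch in plain Java, not the code I run: `OutstandingRequestGate`, `awaitPermit`, and the `maxWait` parameter are hypothetical names, and the `LongSupplier` stands in for the repository count queries.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Simplified stand-in for the throttler: polls an outstanding-request count until it
// drops below a threshold, or gives up once a maximum wait has elapsed.
final class OutstandingRequestGate {

    private final LongSupplier outstandingCount; // assumed stand-in for a repository count query
    private final long threshold;
    private final Duration retryInterval;
    private final Duration maxWait;

    OutstandingRequestGate(LongSupplier outstandingCount, long threshold,
                           Duration retryInterval, Duration maxWait) {
        this.outstandingCount = outstandingCount;
        this.threshold = threshold;
        this.retryInterval = retryInterval;
        this.maxWait = maxWait;
    }

    /** Returns true if the count fell below the threshold in time, false on timeout or interrupt. */
    boolean awaitPermit() {
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (true) {
            if (outstandingCount.getAsLong() < threshold) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                // Sleep for the retry interval, but never past the deadline.
                Thread.sleep(Math.min(retryInterval.toMillis(), Math.max(1, remainingNanos / 1_000_000)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Note that the count is read from shared state (the database in my case), so two instances can both see a value just under the threshold and both proceed; the check is not atomic across instances.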

Considering that multiple instances (two for now) will be deployed in production, is there a better way to throttle my requests? Also, once the retries hit the time limit, my throttling code just logs the exception, but I don't feel that's the best approach, as I end up losing track of those requests. What can be improved here?
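
To illustrate the second concern, one possible direction is for the throttle step to report its outcome so the caller can park the request instead of only logging it. The sketch below is purely hypothetical: `ThrottledPublisher`, `publishOrDefer`, and `retryDeferred` are made-up names, the `BooleanSupplier` and `Consumer` stand in for the throttler and the Pub/Sub publisher, and the in-memory queue is a placeholder for durable storage (for example a status column in the database), since an in-memory queue is neither thread-safe nor shared between Cloud Run instances.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical sketch: publish only when the throttle grants a permit; otherwise
// record the request id so it can be retried later rather than being lost in a log line.
final class ThrottledPublisher {

    private final BooleanSupplier permitCheck;   // stand-in for the throttler
    private final Consumer<String> publisher;    // stand-in for the Pub/Sub publish call
    private final Queue<String> deferred = new ArrayDeque<>(); // placeholder for durable storage

    ThrottledPublisher(BooleanSupplier permitCheck, Consumer<String> publisher) {
        this.permitCheck = permitCheck;
        this.publisher = publisher;
    }

    /** Publishes the request if permitted; otherwise parks it and returns false. */
    boolean publishOrDefer(String requestId) {
        if (permitCheck.getAsBoolean()) {
            publisher.accept(requestId);
            return true;
        }
        deferred.add(requestId);
        return false;
    }

    /** Re-attempts parked requests in order, stopping as soon as the throttle is active again. */
    int retryDeferred() {
        int published = 0;
        while (!deferred.isEmpty() && permitCheck.getAsBoolean()) {
            publisher.accept(deferred.poll());
            published++;
        }
        return published;
    }

    int deferredCount() {
        return deferred.size();
    }
}
```

With something like this, a scheduled job could call `retryDeferred()` periodically, and the set of parked requests stays visible instead of disappearing after the time limit.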