Cancelling Tasks

When working with large datasets or complex evaluations, some Ragas operations can take significant time to complete. The cancellation feature allows you to gracefully terminate these long-running tasks when needed, which is especially important in production environments.

Overview

Ragas provides cancellation support for:

  • evaluate() - Evaluation of datasets with metrics
  • generate_with_langchain_docs() - Test set generation from documents

The cancellation mechanism is thread-safe and allows for graceful termination with partial results when possible.
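
One common way to get this behaviour is a shared flag that workers check between tasks. The sketch below is not the Ragas implementation: `SketchExecutor`, `slow_square`, and everything inside them are hypothetical and only mirror the `cancel()` / `is_cancelled()` / `results()` interface described on this page, using a `threading.Event` as the thread-safe flag.

```python
import threading
import time


class SketchExecutor:
    """Hypothetical stand-in for a cancellable executor (not the Ragas class)."""

    def __init__(self, jobs):
        self._jobs = list(jobs)              # zero-argument callables
        self._cancelled = threading.Event()  # thread-safe cancellation flag

    def cancel(self):
        """Request cancellation; safe to call from any thread."""
        self._cancelled.set()

    def is_cancelled(self):
        return self._cancelled.is_set()

    def results(self):
        """Run jobs in order, stopping before the next job once cancelled."""
        completed = []
        for job in self._jobs:
            if self._cancelled.is_set():
                break  # cooperative check between jobs
            completed.append(job())
        return completed  # partial if cancellation arrived mid-run


def slow_square(n):
    time.sleep(0.05)  # simulate a slow call
    return n * n


executor = SketchExecutor([lambda n=n: slow_square(n) for n in range(100)])
threading.Timer(0.2, executor.cancel).start()  # cancel from another thread
partial = executor.results()
print(f"cancelled={executor.is_cancelled()}, completed {len(partial)} of 100 jobs")
```

Because the flag is only checked between jobs, the job that is running when `cancel()` arrives still finishes, and its result is part of the partial list.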

Basic Usage

Cancellable Evaluation

Instead of running evaluation directly, you can get an executor that allows cancellation:

from ragas import evaluate
from ragas.dataset_schema import EvaluationDataset

# Your dataset and metrics
dataset = EvaluationDataset(...)
metrics = [...]

# Get executor instead of running evaluation immediately
executor = evaluate(
    dataset=dataset,
    metrics=metrics,
    return_executor=True  # Key parameter
)

# Now you can:
# - Cancel: executor.cancel()
# - Check status: executor.is_cancelled()
# - Get results: executor.results()  # This blocks until completion

Cancellable Test Set Generation

The same approach works for test set generation:

from ragas.testset.synthesizers.generate import TestsetGenerator

generator = TestsetGenerator(...)

# Get executor for cancellable generation
executor = generator.generate_with_langchain_docs(
    documents=documents,
    testset_size=100,
    return_executor=True  # Allow access to Executor to cancel
)

# Use the same cancellation interface
executor.cancel()

Production Patterns

1. Timeout Pattern

Automatically cancel operations that exceed a time limit:

import threading
import time

from ragas import evaluate

def evaluate_with_timeout(dataset, metrics, timeout_seconds=300):
    """Run evaluation with automatic timeout.

    Returns (results, error): error is None on success, the string
    "timeout" if the run was cancelled, or the exception that was raised.
    """
    # Get cancellable executor
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

    results = None
    exception = None

    def run_evaluation():
        nonlocal results, exception
        try:
            results = executor.results()
        except Exception as e:
            exception = e

    # Start evaluation in a background thread; daemon=True keeps a stuck
    # worker from blocking interpreter exit
    thread = threading.Thread(target=run_evaluation, daemon=True)
    thread.start()

    # Wait for completion or timeout
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():
        print(f"Evaluation exceeded {timeout_seconds}s timeout, cancelling...")
        executor.cancel()
        thread.join(timeout=10)  # Grace period for shutdown; adjust as needed
        return None, "timeout"

    return results, exception

# Usage
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=600)
if error == "timeout":
    print("Evaluation was cancelled due to timeout")
elif error is not None:
    print(f"Evaluation failed: {error}")
else:
    print(f"Evaluation completed: {results}")
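
A more compact variation schedules the cancellation with `threading.Timer` instead of managing a worker thread. This is a sketch under one assumption that should be verified for your Ragas version: that `results()` returns (or raises) soon after `cancel()` is called. `results_with_deadline` and the `_StubExecutor` used to exercise it are hypothetical names, not part of Ragas.

```python
import threading


def results_with_deadline(executor, timeout_seconds):
    """Call executor.results(), requesting cancellation after timeout_seconds.

    Returns (results, cancelled). Works with any object exposing
    cancel(), is_cancelled() and results().
    """
    timer = threading.Timer(timeout_seconds, executor.cancel)
    timer.daemon = True
    timer.start()
    try:
        results = executor.results()
    finally:
        timer.cancel()  # no-op if the timer already fired
    return results, executor.is_cancelled()


class _StubExecutor:
    """Hypothetical stand-in so the helper can be tried without LLM calls."""

    def __init__(self, steps):
        self._steps = steps
        self._flag = threading.Event()

    def cancel(self):
        self._flag.set()

    def is_cancelled(self):
        return self._flag.is_set()

    def results(self):
        out = []
        for i in range(self._steps):
            if self._flag.wait(0.02):  # pause 20 ms, wake early on cancel
                break
            out.append(i)
        return out


fast, fast_cancelled = results_with_deadline(_StubExecutor(3), timeout_seconds=5)
slow, slow_cancelled = results_with_deadline(_StubExecutor(500), timeout_seconds=0.1)
print(fast, fast_cancelled)
print(len(slow), slow_cancelled)
```

The second return value reflects `is_cancelled()`, so it is also true when some other caller cancelled the executor before the deadline.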

2. Signal Handler Pattern (Ctrl+C)

Allow users to cancel with keyboard interrupt:

import signal

from ragas import evaluate

def setup_cancellation_handler():
    """Set up graceful cancellation on Ctrl+C."""
    # Mutable holder so the handler sees the executor registered later
    state = {"executor": None}

    def signal_handler(signum, frame):
        executor = state["executor"]
        if executor is not None and not executor.is_cancelled():
            print("\nReceived interrupt signal, cancelling evaluation...")
            executor.cancel()
            print("Cancellation requested. Waiting for graceful shutdown...")
        # Raise KeyboardInterrupt so the caller's except block runs
        raise KeyboardInterrupt

    # Register signal handler (must be done from the main thread)
    signal.signal(signal.SIGINT, signal_handler)

    def set_executor(executor):
        state["executor"] = executor

    return set_executor

# Usage
set_executor = setup_cancellation_handler()

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
set_executor(executor)

print("Running evaluation... Press Ctrl+C to cancel gracefully")
try:
    results = executor.results()
    print("Evaluation completed successfully")
except KeyboardInterrupt:
    print("Evaluation was cancelled")

3. Web Application Pattern

For web applications, cancel operations when requests are aborted:

from flask import Flask, request
import threading
import uuid

from ragas import evaluate

app = Flask(__name__)
active_evaluations = {}

@app.route('/evaluate', methods=['POST'])
def start_evaluation():
    # Create unique evaluation ID
    eval_id = str(uuid.uuid4())

    # Get dataset and metrics from request (application-specific helpers, not shown)
    dataset = get_dataset_from_request(request)
    metrics = get_metrics_from_request(request)

    # Start cancellable evaluation
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
    active_evaluations[eval_id] = executor

    # Start evaluation in background
    def run_eval():
        try:
            results = executor.results()
            # Store results somewhere (store_results and store_error are placeholders)
            store_results(eval_id, results)
        except Exception as e:
            store_error(eval_id, str(e))
        finally:
            active_evaluations.pop(eval_id, None)

    threading.Thread(target=run_eval).start()

    return {"evaluation_id": eval_id, "status": "started"}

@app.route('/evaluate/<eval_id>/cancel', methods=['POST'])
def cancel_evaluation(eval_id):
    executor = active_evaluations.get(eval_id)
    if executor:
        executor.cancel()
        return {"status": "cancelled"}
    return {"error": "Evaluation not found"}, 404
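
The bookkeeping in this pattern can be separated from the web framework, which makes it easier to test. The sketch below is one possible shape, not part of Ragas or Flask: `EvaluationRegistry` and `_FlagExecutor` are hypothetical names, and the lock is a precaution for dictionary access from several request and worker threads.

```python
import threading
import uuid


class EvaluationRegistry:
    """Hypothetical thread-safe map from evaluation ID to executor."""

    def __init__(self):
        self._lock = threading.Lock()
        self._executors = {}

    def register(self, executor):
        """Store the executor and return a new evaluation ID."""
        eval_id = str(uuid.uuid4())
        with self._lock:
            self._executors[eval_id] = executor
        return eval_id

    def cancel(self, eval_id):
        """Request cancellation; return False if the ID is unknown or finished."""
        with self._lock:
            executor = self._executors.get(eval_id)
        if executor is None:
            return False
        executor.cancel()
        return True

    def finish(self, eval_id):
        """Forget an evaluation once its worker thread is done."""
        with self._lock:
            self._executors.pop(eval_id, None)


class _FlagExecutor:
    """Hypothetical stub exposing only cancel() and is_cancelled()."""

    def __init__(self):
        self._flag = threading.Event()

    def cancel(self):
        self._flag.set()

    def is_cancelled(self):
        return self._flag.is_set()


registry = EvaluationRegistry()
stub = _FlagExecutor()
eval_id = registry.register(stub)
first_attempt = registry.cancel(eval_id)   # ID is still registered
registry.finish(eval_id)
second_attempt = registry.cancel(eval_id)  # ID was removed by finish()
print(first_attempt, second_attempt, stub.is_cancelled())
```

In the Flask routes, `register` would replace the direct dictionary write, `finish` would go in the worker's `finally` block, and the return value of `cancel` would decide between the success response and the 404.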

Advanced Usage

Checking Cancellation Status

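import threading
import time

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

# Watch the cancellation flag in the background. is_cancelled() reports whether
# cancellation was requested, not whether the evaluation has finished, so the
# thread is a daemon and will not keep the process alive after a normal run.
def monitor_evaluation():
    while not executor.is_cancelled():
        print("Cancellation not requested yet...")
        time.sleep(5)
    print("Evaluation was cancelled")

threading.Thread(target=monitor_evaluation, daemon=True).start()

# Cancel after some condition (some_condition is a placeholder)
if some_condition():
    executor.cancel()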

Partial Results

When cancellation occurs during execution, you may get partial results:

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

try:
    results = executor.results()
    print(f"Completed {len(results)} evaluations")
except Exception as e:
    if executor.is_cancelled():
        print("Evaluation was cancelled - may have partial results")
    else:
        print(f"Evaluation failed: {e}")

Custom Cancellation Logic

class EvaluationManager:
    def __init__(self):
        self.executors = []

    def start_evaluation(self, dataset, metrics):
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        self.executors.append(executor)
        return executor

    def cancel_all(self):
        """Cancel all evaluations that have not been cancelled yet."""
        cancelled = 0
        for executor in self.executors:
            if not executor.is_cancelled():
                executor.cancel()
                cancelled += 1
        print(f"Cancelled {cancelled} evaluations")

    def cleanup_cancelled(self):
        """Remove cancelled executors from the list."""
        self.executors = [ex for ex in self.executors if not ex.is_cancelled()]

# Usage
manager = EvaluationManager()

# Start multiple evaluations
exec1 = manager.start_evaluation(dataset1, metrics)
exec2 = manager.start_evaluation(dataset2, metrics)

# Cancel all if needed
manager.cancel_all()

Best Practices

1. Always Use Timeouts in Production

# Good: Always set reasonable timeouts
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=1800)  # 30 minutes

# Avoid: Indefinite blocking
results = executor.results()  # Could block forever

2. Handle Cancellation Gracefully

try:
    results = executor.results()
    process_results(results)
except Exception as e:
    if executor.is_cancelled():
        log_cancellation()
        cleanup_partial_work()
    else:
        log_error(e)
        handle_failure()

3. Provide User Feedback

import threading

def run_with_progress_and_cancellation(executor):
    print("Starting evaluation... Press Ctrl+C to cancel")
    finished = threading.Event()  # set once results() returns or is interrupted

    # Monitor progress in background
    def show_progress():
        while not finished.is_set() and not executor.is_cancelled():
            # Show some progress indication
            print(".", end="", flush=True)
            finished.wait(1)  # sleep up to 1s, wake early when finished

    progress_thread = threading.Thread(target=show_progress, daemon=True)
    progress_thread.start()

    try:
        return executor.results()
    except KeyboardInterrupt:
        print("\nCancelling...")
        executor.cancel()
        return None
    finally:
        finished.set()  # stop the progress dots after a normal run too

4. Clean Up Resources

def managed_evaluation(dataset, metrics):
    executor = None
    try:
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        return executor.results()
    except BaseException:
        # BaseException also covers KeyboardInterrupt (Ctrl+C)
        if executor is not None:
            executor.cancel()
        raise
    finally:
        # Clean up any temporary resources
        cleanup_temp_files()
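
The same cleanup rule can be packaged as a context manager so that every call site gets it. This is a minimal sketch, assuming only the `cancel()` / `is_cancelled()` interface shown on this page; `cancel_on_error` and `_RecordingExecutor` are hypothetical names introduced for illustration.

```python
from contextlib import contextmanager


@contextmanager
def cancel_on_error(executor):
    """Yield the executor; request cancellation if the body raises."""
    try:
        yield executor
    except BaseException:
        # Includes KeyboardInterrupt, which `except Exception` would miss.
        if not executor.is_cancelled():
            executor.cancel()
        raise


class _RecordingExecutor:
    """Hypothetical stub that only records whether cancel() was called."""

    def __init__(self, fail):
        self._fail = fail
        self._cancelled = False

    def cancel(self):
        self._cancelled = True

    def is_cancelled(self):
        return self._cancelled

    def results(self):
        if self._fail:
            raise RuntimeError("simulated failure")
        return ["ok"]


failing = _RecordingExecutor(fail=True)
try:
    with cancel_on_error(failing) as ex:
        ex.results()
except RuntimeError as err:
    print(f"caught: {err}; cancelled={failing.is_cancelled()}")

healthy = _RecordingExecutor(fail=False)
with cancel_on_error(healthy) as ex:
    healthy_results = ex.results()
print(healthy_results, healthy.is_cancelled())
```

The exception is re-raised after the cancellation request, so callers still see the original failure.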

Limitations

  • Async Operations: Cancellation works at the task level, not within individual LLM calls
  • Partial State: Cancelled operations may leave partial results or temporary files
  • Timing: Cancellation is cooperative - tasks need to check for cancellation periodically
  • Dependencies: Some external services may not respect cancellation immediately
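
The first and third points can be illustrated with plain `asyncio`. This sketch assumes nothing about Ragas internals, and `fake_llm_call` and `run_jobs` are hypothetical names: the flag is checked only between calls, so a call that is already in flight runs to completion before the loop notices the cancellation.

```python
import asyncio


async def fake_llm_call(i, log):
    """Hypothetical slow call; it never looks at the cancellation flag."""
    log.append(f"start {i}")
    await asyncio.sleep(0.05)
    log.append(f"end {i}")
    return i


async def run_jobs(n, cancel_event, log):
    """Run calls one by one, checking for cancellation only between them."""
    done = []
    for i in range(n):
        if cancel_event.is_set():
            break  # task-level check: no new call is started
        done.append(await fake_llm_call(i, log))
    return done


async def main():
    log = []
    cancel_event = asyncio.Event()
    runner = asyncio.create_task(run_jobs(10, cancel_event, log))
    await asyncio.sleep(0.12)  # let a few calls run
    cancel_event.set()         # request cancellation while a call is in flight
    done = await runner
    return done, log


done, log = asyncio.run(main())
starts = sum(entry.startswith("start") for entry in log)
ends = sum(entry.startswith("end") for entry in log)
print(f"finished {len(done)} of 10 calls; started={starts}, ended={ends}")
```

Every call that started also ended, which is why cancellation latency in this model is bounded by the duration of the slowest in-flight call.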

Troubleshooting

Cancellation Not Working

# Check if cancellation is set
if executor.is_cancelled():
    print("Cancellation was requested")
else:
    print("Cancellation not requested yet")

# Ensure you're calling cancel()
executor.cancel()
assert executor.is_cancelled()

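Tasks Still Running After Cancellation

import asyncio
import time

# Give time for graceful shutdown
executor.cancel()
time.sleep(2)  # Allow tasks to detect cancellation

# Force cleanup if needed. get_running_loop() only succeeds when called from
# inside a coroutine or callback; skip the calling task so it is not cancelled too.
try:
    loop = asyncio.get_running_loop()
    current = asyncio.current_task()
    for task in asyncio.all_tasks(loop):
        if task is not current:
            task.cancel()
except RuntimeError:
    pass  # No event loop running in this thread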

The cancellation feature gives you control over long-running Ragas operations, so production deployments can bound run time, release resources, and respond to user interrupts.

Link last verified June 7, 2026.
Source: RAGAS Docs
Link last verified: 2026-03-04