Custom Runners

Create custom runners to execute tests against any AI system by implementing the BaseRunner interface.

BaseRunner Interface

# BaseRunner is importable from fair_forge.schemas.runner; its interface:
from typing import Any

from fair_forge.schemas import Dataset, Batch

class BaseRunner:
    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs: Any
    ) -> tuple[Batch, bool, float]:
        """Execute a single test case.

        Returns:
            tuple: (updated_batch, success, execution_time_ms)
        """
        raise NotImplementedError

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs: Any
    ) -> tuple[Dataset, dict[str, Any]]:
        """Execute all tests in a dataset.

        Returns:
            tuple: (updated_dataset, summary_dict)
        """
        raise NotImplementedError

Example: LLM Runner

A runner for any LangChain-compatible model:
import time
from typing import Any

from fair_forge.schemas.runner import BaseRunner
from fair_forge.schemas import Dataset, Batch
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage

class LLMRunner(BaseRunner):
    def __init__(self, model: BaseChatModel, system_prompt: str = ""):
        self.model = model
        self.system_prompt = system_prompt

    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs
    ) -> tuple[Batch, bool, float]:
        start = time.time()

        try:
            messages = []
            if self.system_prompt:
                messages.append(SystemMessage(content=self.system_prompt))
            messages.append(HumanMessage(content=batch.query))

            response = await self.model.ainvoke(messages)

            updated_batch = batch.model_copy(
                update={"assistant": response.content}
            )
            exec_time = (time.time() - start) * 1000
            return updated_batch, True, exec_time

        except Exception as e:
            exec_time = (time.time() - start) * 1000
            updated_batch = batch.model_copy(
                update={"assistant": f"Error: {str(e)}"}
            )
            return updated_batch, False, exec_time

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs
    ) -> tuple[Dataset, dict[str, Any]]:
        start = time.time()
        updated_batches = []
        successes = 0
        failures = 0

        for batch in dataset.conversation:
            updated_batch, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated_batch)
            if success:
                successes += 1
            else:
                failures += 1

        total_time = (time.time() - start) * 1000
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )

        batch_count = len(dataset.conversation)
        summary = {
            "session_id": dataset.session_id,
            "total_batches": batch_count,
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            # Guard against division by zero on an empty dataset
            "avg_batch_time_ms": total_time / batch_count if batch_count else 0.0,
        }

        return updated_dataset, summary

Usage

from langchain_groq import ChatGroq

# Create model
model = ChatGroq(model="llama-3.1-8b-instant", api_key="...")

# Create runner
runner = LLMRunner(
    model=model,
    system_prompt="You are a helpful assistant.",
)

# Execute tests
updated_dataset, summary = await runner.run_dataset(dataset)
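
The summary returned by run_dataset uses the keys built above, so results can be reported directly:
print(f"{summary['successes']}/{summary['total_batches']} batches succeeded")
print(f"Average batch time: {summary['avg_batch_time_ms']:.1f} ms")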

Example: REST API Runner

A runner for any REST API:
import httpx
import time
from fair_forge.schemas.runner import BaseRunner

class RestAPIRunner(BaseRunner):
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"}
        )

    async def run_batch(self, batch, session_id, **kwargs):
        start = time.time()

        try:
            response = await self.client.post(
                f"{self.base_url}/chat",
                json={
                    "message": batch.query,
                    "session_id": session_id,
                }
            )
            response.raise_for_status()

            data = response.json()
            updated_batch = batch.model_copy(
                update={"assistant": data["response"]}
            )
            exec_time = (time.time() - start) * 1000
            return updated_batch, True, exec_time

        except Exception as e:
            exec_time = (time.time() - start) * 1000
            # Return a valid batch even on failure (see Best Practices)
            updated_batch = batch.model_copy(
                update={"assistant": f"Error: {e}"}
            )
            return updated_batch, False, exec_time

    async def run_dataset(self, dataset, **kwargs):
        # Standard implementation
        start = time.time()
        updated_batches = []
        successes = failures = 0

        for batch in dataset.conversation:
            updated, success, _ = await self.run_batch(
                batch, dataset.session_id
            )
            updated_batches.append(updated)
            successes += success
            failures += not success

        total_time = (time.time() - start) * 1000

        batch_count = len(dataset.conversation)

        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": batch_count,
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            # Same empty-dataset guard as in LLMRunner
            "avg_batch_time_ms": total_time / batch_count if batch_count else 0.0,
        }
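
Usage is the same as for any runner; since the runner owns a shared httpx.AsyncClient, close it when finished (the base URL below is a placeholder):
runner = RestAPIRunner(base_url="https://api.example.com", api_key="...")
try:
    updated_dataset, summary = await runner.run_dataset(dataset)
finally:
    await runner.client.aclose()  # release pooled connections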

Example: Mock Runner

For testing without external calls:
import time
from fair_forge.schemas.runner import BaseRunner

class MockRunner(BaseRunner):
    def __init__(self, responses: dict[str, str] | None = None):
        self.responses = responses or {}

    async def run_batch(self, batch, session_id, **kwargs):
        start = time.time()

        # Use a predefined response, or fall back to a generated mock
        response = self.responses.get(
            batch.qa_id,
            f"Mock response for: {batch.query}"
        )

        updated_batch = batch.model_copy(update={"assistant": response})
        exec_time = (time.time() - start) * 1000
        return updated_batch, True, exec_time

    async def run_dataset(self, dataset, **kwargs):
        start = time.time()
        updated_batches = [
            (await self.run_batch(batch, dataset.session_id))[0]
            for batch in dataset.conversation
        ]
        total_time = (time.time() - start) * 1000

        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": len(dataset.conversation),
            # MockRunner.run_batch never fails
            "successes": len(updated_batches),
            "failures": 0,
            "total_execution_time_ms": total_time,
        }

# Usage
mock_runner = MockRunner(responses={
    "q1": "Paris is the capital of France.",
    "q2": "The sky is blue due to Rayleigh scattering.",
})
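
A minimal sketch of exercising MockRunner from a test, assuming pytest with the pytest-asyncio plugin and a dataset fixture of your own:
import pytest

@pytest.mark.asyncio
async def test_mock_runner(dataset):  # `dataset` is a hypothetical fixture
    runner = MockRunner(responses={"q1": "Paris is the capital of France."})
    updated_dataset, summary = await runner.run_dataset(dataset)
    assert summary["failures"] == 0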

Parallel Execution

For faster execution with many batches, run them concurrently behind a semaphore (here by extending the LLMRunner defined above):
import asyncio
import time

class ParallelLLMRunner(LLMRunner):
    def __init__(self, model, max_concurrent: int = 5):
        super().__init__(model)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def run_batch(self, batch, session_id, **kwargs):
        # Rate-limited execution: at most max_concurrent batches in flight
        async with self.semaphore:
            return await super().run_batch(batch, session_id, **kwargs)

    async def run_dataset(self, dataset, **kwargs):
        start = time.time()

        # Run all batches concurrently
        tasks = [
            self.run_batch(batch, dataset.session_id, **kwargs)
            for batch in dataset.conversation
        ]
        results = await asyncio.gather(*tasks)

        updated_batches = [r[0] for r in results]
        successes = sum(1 for r in results if r[1])
        total_time = (time.time() - start) * 1000

        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": len(dataset.conversation),
            "successes": successes,
            "failures": len(results) - successes,
            "total_execution_time_ms": total_time,
        }
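
Because LLMRunner.run_batch catches its own exceptions, asyncio.gather never sees a raised error; partial failures simply show up in the summary. Usage mirrors the sequential runner:
runner = ParallelLLMRunner(model, max_concurrent=8)
updated_dataset, summary = await runner.run_dataset(dataset)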

Best Practices

Always return a valid batch even on failure:
except Exception as e:
    return batch.model_copy(
        update={"assistant": f"Error: {e}"}
    ), False, exec_time
Measure time for performance analysis:
start = time.time()
# ... execution ...
exec_time = (time.time() - start) * 1000
Make network calls async for efficiency:
async def run_batch(self, ...):
    response = await self.client.post(...)
Respect API rate limits:
self.semaphore = asyncio.Semaphore(5)
async with self.semaphore:
    # Rate-limited execution
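
As a runnable sketch of that last pattern outside a runner class (names here are illustrative, not part of fair_forge), a module-level semaphore can wrap any async call:
import asyncio

semaphore = asyncio.Semaphore(5)

async def rate_limited(coro_fn, *args, **kwargs):
    # At most five calls run concurrently; the rest wait here
    async with semaphore:
        return await coro_fn(*args, **kwargs)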

Next Steps