Skip to main content

Documentation Index

Fetch the complete documentation index at: https://fairforge.alquimia.ai/llms.txt

Use this file to discover all available pages before exploring further.

Custom Runners

Create custom runners to execute tests against any AI system by implementing the BaseRunner interface.

BaseRunner Interface

from fair_forge.schemas.runner import BaseRunner
from fair_forge.schemas import Dataset, Batch
from typing import Any

class BaseRunner:
    """Interface that every custom runner implements.

    Both coroutines raise ``NotImplementedError`` here; a concrete runner
    overrides them.
    """

    async def run_batch(
        self, batch: Batch, session_id: str, **kwargs: Any
    ) -> tuple[Batch, bool, float]:
        """Run one batch (a single test case) for the given session.

        Returns:
            A ``(updated_batch, success, execution_time_ms)`` tuple.
        """
        raise NotImplementedError()

    async def run_dataset(
        self, dataset: Dataset, **kwargs: Any
    ) -> tuple[Dataset, dict[str, Any]]:
        """Run every test contained in ``dataset``.

        Returns:
            An ``(updated_dataset, summary_dict)`` tuple.
        """
        raise NotImplementedError()

Example: LLM Runner

A runner for any LangChain-compatible model:
import time
from fair_forge.schemas.runner import BaseRunner
from fair_forge.schemas import Dataset, Batch
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage

class LLMRunner(BaseRunner):
    """Runner that sends each batch's query to a LangChain chat model.

    Args:
        model: Chat model invoked through ``ainvoke``.
        system_prompt: Optional system message placed before the user
            query on every request; skipped when empty.
    """

    def __init__(self, model: BaseChatModel, system_prompt: str = ""):
        self.model = model
        self.system_prompt = system_prompt

    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs
    ) -> tuple[Batch, bool, float]:
        """Send ``batch.query`` to the model and store the reply.

        ``session_id`` and ``kwargs`` are accepted for interface
        compatibility but are not used by this runner.

        Returns:
            ``(updated_batch, success, execution_time_ms)``. On failure the
            batch's ``assistant`` field holds ``"Error: <message>"`` and
            ``success`` is ``False``; the exception is not re-raised.
        """
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the wall clock is adjusted.
        start = time.perf_counter()

        try:
            messages = []
            if self.system_prompt:
                messages.append(SystemMessage(content=self.system_prompt))
            messages.append(HumanMessage(content=batch.query))

            response = await self.model.ainvoke(messages)

            # NOTE(review): assumes response.content is a plain string;
            # some models may return structured content - confirm.
            updated_batch = batch.model_copy(
                update={"assistant": response.content}
            )
            exec_time = (time.perf_counter() - start) * 1000
            return updated_batch, True, exec_time

        except Exception as e:
            # Deliberate best-effort boundary: one failing batch must not
            # abort the whole dataset run, so the error is recorded on the
            # batch instead of propagating.
            exec_time = (time.perf_counter() - start) * 1000
            updated_batch = batch.model_copy(
                update={"assistant": f"Error: {e}"}
            )
            return updated_batch, False, exec_time

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs
    ) -> tuple[Dataset, dict[str, Any]]:
        """Run every batch of ``dataset.conversation`` sequentially.

        ``kwargs`` are forwarded unchanged to :meth:`run_batch`.

        Returns:
            ``(updated_dataset, summary)`` where ``summary`` contains the
            session id, batch/success/failure counts, the total wall time in
            milliseconds and the average time per batch (``0.0`` for an
            empty conversation).
        """
        start = time.perf_counter()
        updated_batches = []
        successes = 0
        failures = 0

        for batch in dataset.conversation:
            updated_batch, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated_batch)
            if success:
                successes += 1
            else:
                failures += 1

        total_time = (time.perf_counter() - start) * 1000
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )

        batch_count = len(updated_batches)
        summary = {
            "session_id": dataset.session_id,
            "total_batches": batch_count,
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            # Guard against ZeroDivisionError when the dataset is empty.
            "avg_batch_time_ms": (
                total_time / batch_count if batch_count else 0.0
            ),
        }

        return updated_dataset, summary

Usage

from langchain_groq import ChatGroq

# Create model
# NOTE(review): the "..." api_key is presumably a placeholder - supply a real
# key (ideally read from the environment rather than hard-coded).
model = ChatGroq(model="llama-3.1-8b-instant", api_key="...")

# Create runner
runner = LLMRunner(
    model=model,
    system_prompt="You are a helpful assistant.",
)

# Execute tests
# NOTE: a bare top-level `await` only works where an event loop is already
# running (e.g. a notebook or `python -m asyncio`); in a regular script, put
# this call inside an `async def` and start it with `asyncio.run(...)`.
# `dataset` is not defined in this snippet - presumably a `Dataset` that was
# loaded beforehand.
updated_dataset, summary = await runner.run_dataset(dataset)

Example: REST API Runner

A runner for any REST API that exposes a chat endpoint (here, `POST {base_url}/chat` returning JSON with a `response` field):
import httpx
import time
from fair_forge.schemas.runner import BaseRunner

class RestAPIRunner(BaseRunner):
    """Runner that posts each batch's query to ``{base_url}/chat``.

    The runner owns an ``httpx.AsyncClient``; release it with
    :meth:`aclose` or by using the runner as an async context manager
    (``async with RestAPIRunner(...) as runner: ...``).

    Args:
        base_url: Root URL of the API; a trailing slash is tolerated.
        api_key: Token sent as ``Authorization: Bearer <api_key>``.
    """

    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"}
        )

    async def aclose(self):
        """Close the underlying HTTP client and its connections."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()

    async def run_batch(self, batch, session_id, **kwargs):
        """POST ``batch.query`` to the chat endpoint and store the reply.

        ``kwargs`` are accepted for interface compatibility but are not used.

        Returns:
            ``(updated_batch, success, execution_time_ms)``. The reply is
            read from the ``"response"`` key of the JSON body. On any failure
            (transport error, non-2xx status, malformed body) the batch's
            ``assistant`` field holds ``"Error: <message>"`` and ``success``
            is ``False``; the exception is not re-raised.
        """
        # perf_counter is monotonic, unlike the wall clock.
        start = time.perf_counter()

        try:
            response = await self.client.post(
                # rstrip avoids a "//chat" path when base_url ends with "/".
                f"{self.base_url.rstrip('/')}/chat",
                json={
                    "message": batch.query,
                    "session_id": session_id,
                }
            )
            response.raise_for_status()

            data = response.json()
            updated_batch = batch.model_copy(
                update={"assistant": data["response"]}
            )
            exec_time = (time.perf_counter() - start) * 1000
            return updated_batch, True, exec_time

        except Exception as e:
            # Best-effort boundary: record the failure on the batch so the
            # cause is visible in the results instead of being discarded.
            exec_time = (time.perf_counter() - start) * 1000
            failed_batch = batch.model_copy(
                update={"assistant": f"Error: {e}"}
            )
            return failed_batch, False, exec_time

    async def run_dataset(self, dataset, **kwargs):
        """Run every batch of ``dataset.conversation`` sequentially.

        ``kwargs`` are forwarded unchanged to :meth:`run_batch`.

        Returns:
            ``(updated_dataset, summary)`` where ``summary`` contains the
            session id, batch/success/failure counts, the total wall time in
            milliseconds and the average time per batch (``0.0`` for an
            empty conversation).
        """
        start = time.perf_counter()
        updated_batches = []
        successes = failures = 0

        for batch in dataset.conversation:
            updated, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated)
            if success:
                successes += 1
            else:
                failures += 1

        total_time = (time.perf_counter() - start) * 1000
        batch_count = len(updated_batches)

        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": batch_count,
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            # Guard against ZeroDivisionError when the dataset is empty.
            "avg_batch_time_ms": (
                total_time / batch_count if batch_count else 0.0
            ),
        }

Example: Mock Runner

For testing without external calls:
class MockRunner(BaseRunner):
    """Runner that answers from a canned mapping, with no external calls.

    Args:
        responses: Optional mapping from ``batch.qa_id`` to the reply to
            return for that batch. Defaults to an empty mapping.
    """

    def __init__(self, responses: dict[str, str] | None = None):
        self.responses = responses or {}

    async def run_batch(self, batch, session_id, **kwargs):
        """Return the predefined reply for ``batch.qa_id`` or a generated one.

        ``session_id`` and ``kwargs`` are accepted for interface
        compatibility but are not used.

        Returns:
            ``(updated_batch, True, execution_time_ms)``; this runner always
            reports success.
        """
        start = time.perf_counter()

        # Use predefined response or generate mock
        response = self.responses.get(
            batch.qa_id,
            f"Mock response for: {batch.query}"
        )

        updated_batch = batch.model_copy(update={"assistant": response})
        exec_time = (time.perf_counter() - start) * 1000
        return updated_batch, True, exec_time

    async def run_dataset(self, dataset, **kwargs):
        """Run every batch of ``dataset.conversation`` sequentially.

        ``kwargs`` are forwarded unchanged to :meth:`run_batch`.

        Returns:
            ``(updated_dataset, summary)`` where ``summary`` contains the
            session id, batch/success/failure counts, the total wall time in
            milliseconds and the average time per batch (``0.0`` for an
            empty conversation).
        """
        start = time.perf_counter()
        updated_batches = []
        successes = 0

        for batch in dataset.conversation:
            updated, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated)
            # Counted generically so a subclass overriding run_batch that
            # can fail is still summarized correctly.
            if success:
                successes += 1

        total_time = (time.perf_counter() - start) * 1000
        batch_count = len(updated_batches)

        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": batch_count,
            "successes": successes,
            "failures": batch_count - successes,
            "total_execution_time_ms": total_time,
            "avg_batch_time_ms": (
                total_time / batch_count if batch_count else 0.0
            ),
        }
        return updated_dataset, summary

# Usage
# Each key is looked up against ``batch.qa_id``; a batch whose id is not
# listed here receives the generated "Mock response for: <query>" text.
mock_runner = MockRunner(responses={
    "q1": "Paris is the capital of France.",
    "q2": "The sky is blue due to Rayleigh scattering.",
})

Parallel Execution

For faster execution with many batches:
import asyncio

class ParallelLLMRunner(BaseRunner):
    """Runner that executes batches concurrently with a concurrency cap.

    Args:
        model: Model object exposing an async ``ainvoke`` method.
        max_concurrent: Maximum number of model calls in flight at once.
    """

    def __init__(self, model, max_concurrent: int = 5):
        self.model = model
        # Caps the number of simultaneous model calls. On Python < 3.10 the
        # semaphore binds to the event loop current at construction time, so
        # create the runner inside the loop that will use it.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def _execute(self, batch, session_id, **kwargs):
        """Invoke the model for one batch and time the call.

        ``session_id`` and ``kwargs`` are accepted but not used.

        Returns:
            ``(updated_batch, success, execution_time_ms)``. On failure the
            batch's ``assistant`` field holds ``"Error: <message>"``; the
            exception is not re-raised, so one failing batch cannot cancel
            the surrounding ``asyncio.gather``.
        """
        start = time.perf_counter()
        try:
            # LangChain chat models accept a plain string as input.
            # NOTE(review): assumes response.content is a plain string.
            response = await self.model.ainvoke(batch.query)
            updated_batch = batch.model_copy(
                update={"assistant": response.content}
            )
            exec_time = (time.perf_counter() - start) * 1000
            return updated_batch, True, exec_time
        except Exception as e:
            exec_time = (time.perf_counter() - start) * 1000
            failed_batch = batch.model_copy(
                update={"assistant": f"Error: {e}"}
            )
            return failed_batch, False, exec_time

    async def run_batch(self, batch, session_id, **kwargs):
        """Run one batch once a concurrency slot is available."""
        async with self.semaphore:
            # Concurrency-limited execution
            return await self._execute(batch, session_id, **kwargs)

    async def run_dataset(self, dataset, **kwargs):
        """Run all batches of ``dataset.conversation`` concurrently.

        ``kwargs`` are forwarded unchanged to :meth:`run_batch`. Result order
        matches the order of ``dataset.conversation``.

        Returns:
            ``(updated_dataset, summary)`` where ``summary`` contains the
            session id, batch/success/failure counts, the total wall time in
            milliseconds and the average per-batch execution time (``0.0``
            for an empty conversation).
        """
        start = time.perf_counter()

        # Run all batches concurrently; gather preserves input order.
        tasks = [
            self.run_batch(batch, dataset.session_id, **kwargs)
            for batch in dataset.conversation
        ]
        results = await asyncio.gather(*tasks)
        total_time = (time.perf_counter() - start) * 1000

        updated_batches = [r[0] for r in results]
        successes = sum(1 for r in results if r[1])
        batch_count = len(results)

        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": batch_count,
            "successes": successes,
            "failures": batch_count - successes,
            "total_execution_time_ms": total_time,
            # Batches overlap in time, so the average comes from the
            # per-batch timings rather than wall time / count.
            "avg_batch_time_ms": (
                sum(r[2] for r in results) / batch_count
                if batch_count else 0.0
            ),
        }
        return updated_dataset, summary

Best Practices

Always return a valid batch even on failure:
except Exception as e:
    return batch.model_copy(
        update={"assistant": f"Error: {e}"}
    ), False, exec_time
Measure time for performance analysis:
start = time.time()
# ... execution ...
exec_time = (time.time() - start) * 1000
Make network calls async for efficiency:
async def run_batch(self, ...):
    response = await self.client.post(...)
Respect API rate limits by capping the number of concurrent requests:
self.semaphore = asyncio.Semaphore(5)
async with self.semaphore:
    # Rate-limited execution

Next Steps

Storage

Save and load results

Examples

See complete examples