Documentation Index
Fetch the complete documentation index at: https://fairforge.alquimia.ai/llms.txt
Use this file to discover all available pages before exploring further.
Custom Runners
Create custom runners to execute tests against any AI system by implementing the BaseRunner interface.
BaseRunner Interface
from fair_forge.schemas.runner import BaseRunner
from fair_forge.schemas import Dataset, Batch
from typing import Any
class BaseRunner:
    """Interface that every custom runner implements.

    A runner sends test cases to an AI system and records the replies.
    Subclasses override both coroutines below; the versions defined here
    only raise ``NotImplementedError``.
    """

    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs: Any
    ) -> tuple[Batch, bool, float]:
        """Execute a single test case.

        Args:
            batch: The test case to execute.
            session_id: Identifier of the session the batch belongs to.
            **kwargs: Extra, runner-specific options.

        Returns:
            tuple: (updated_batch, success, execution_time_ms)

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs: Any
    ) -> tuple[Dataset, dict[str, Any]]:
        """Execute all tests in a dataset.

        Args:
            dataset: The dataset whose test cases should be executed.
            **kwargs: Extra, runner-specific options.

        Returns:
            tuple: (updated_dataset, summary_dict)

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError
Example: LLM Runner
A runner for any LangChain-compatible model:
import time
from fair_forge.schemas.runner import BaseRunner
from fair_forge.schemas import Dataset, Batch
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage
class LLMRunner(BaseRunner):
    """Runner that sends each batch's query to a LangChain chat model."""

    def __init__(self, model: BaseChatModel, system_prompt: str = ""):
        """Store the model and the optional system prompt.

        Args:
            model: Chat model that will answer every query.
            system_prompt: Text sent as a system message before each query.
                An empty string means no system message is sent.
        """
        self.model = model
        self.system_prompt = system_prompt

    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs: Any
    ) -> tuple[Batch, bool, float]:
        """Ask the model one query and record its answer on the batch.

        Args:
            batch: Test case whose ``query`` is sent to the model.
            session_id: Session identifier (not used by this runner).
            **kwargs: Accepted for interface compatibility; ignored.

        Returns:
            tuple: (updated_batch, success, execution_time_ms). On failure
            the batch's ``assistant`` field holds ``"Error: <message>"``
            and ``success`` is ``False``.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments the way time.time() can.
        start = time.perf_counter()
        try:
            messages = []
            if self.system_prompt:
                messages.append(SystemMessage(content=self.system_prompt))
            messages.append(HumanMessage(content=batch.query))
            response = await self.model.ainvoke(messages)
            assistant, success = response.content, True
        except Exception as e:
            # Runner boundary: one failing test case must not abort the
            # whole run, so the error is recorded on the batch instead.
            assistant, success = f"Error: {e}", False
        exec_time = (time.perf_counter() - start) * 1000
        updated_batch = batch.model_copy(update={"assistant": assistant})
        return updated_batch, success, exec_time

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs: Any
    ) -> tuple[Dataset, dict[str, Any]]:
        """Run every batch of the dataset sequentially.

        Args:
            dataset: Dataset whose ``conversation`` batches are executed.
            **kwargs: Forwarded unchanged to ``run_batch``.

        Returns:
            tuple: (updated_dataset, summary_dict). The summary holds the
            session id, batch/success/failure counts, the total run time
            and the average time per batch, both in milliseconds.
        """
        start = time.perf_counter()
        updated_batches = []
        successes = 0
        for batch in dataset.conversation:
            updated_batch, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated_batch)
            if success:
                successes += 1
        total_time = (time.perf_counter() - start) * 1000

        total_batches = len(updated_batches)
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": total_batches,
            "successes": successes,
            "failures": total_batches - successes,
            "total_execution_time_ms": total_time,
            # An empty dataset must not raise ZeroDivisionError.
            "avg_batch_time_ms": (
                total_time / total_batches if total_batches else 0.0
            ),
        }
        return updated_dataset, summary
Usage
from langchain_groq import ChatGroq
# Create model
model = ChatGroq(model="llama-3.1-8b-instant", api_key="...")
# Create runner
runner = LLMRunner(
    model=model,
    system_prompt="You are a helpful assistant.",
)
# Execute tests
# NOTE: `dataset` is not defined in this snippet; it must already exist.
# `await` at this level only works inside a coroutine or in an
# async-aware REPL/notebook.
updated_dataset, summary = await runner.run_dataset(dataset)
Example: REST API Runner
A runner for any REST API:
import httpx
import time
from fair_forge.schemas.runner import BaseRunner
class RestAPIRunner(BaseRunner):
    """Runner that posts each batch's query to a REST chat endpoint.

    Requests go to ``{base_url}/chat`` with a bearer token. The runner owns
    an ``httpx.AsyncClient``; release it with ``aclose()`` or by using the
    runner as an async context manager (``async with RestAPIRunner(...)``).
    """

    def __init__(self, base_url: str, api_key: str):
        """Create the HTTP client used for all requests.

        Args:
            base_url: Root URL of the API; ``/chat`` is appended to it.
            api_key: Token sent in the ``Authorization: Bearer`` header.
        """
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"}
        )

    async def aclose(self):
        """Close the underlying HTTP client and its connections."""
        await self.client.aclose()

    async def __aenter__(self):
        """Return the runner itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the HTTP client when the ``async with`` block exits."""
        await self.aclose()

    async def run_batch(self, batch, session_id, **kwargs):
        """Send one query to the API and record the reply on the batch.

        Args:
            batch: Test case whose ``query`` is sent as ``message``.
            session_id: Session identifier sent along with the message.
            **kwargs: Accepted for interface compatibility; ignored.

        Returns:
            tuple: (updated_batch, success, execution_time_ms). On failure
            the batch's ``assistant`` field holds ``"Error: <message>"``
            and ``success`` is ``False``.
        """
        # Monotonic clock: immune to system clock adjustments.
        start = time.perf_counter()
        try:
            response = await self.client.post(
                # rstrip avoids "//chat" when base_url ends with a slash.
                f"{self.base_url.rstrip('/')}/chat",
                json={
                    "message": batch.query,
                    "session_id": session_id,
                },
            )
            response.raise_for_status()
            assistant, success = response.json()["response"], True
        except Exception as e:
            # Runner boundary: keep the error on the batch so the failure
            # stays visible in the results instead of being dropped.
            assistant, success = f"Error: {e}", False
        exec_time = (time.perf_counter() - start) * 1000
        updated_batch = batch.model_copy(update={"assistant": assistant})
        return updated_batch, success, exec_time

    async def run_dataset(self, dataset, **kwargs):
        """Run every batch of the dataset sequentially.

        Args:
            dataset: Dataset whose ``conversation`` batches are executed.
            **kwargs: Forwarded unchanged to ``run_batch``.

        Returns:
            tuple: (updated_dataset, summary_dict). The summary holds the
            session id, batch/success/failure counts, the total run time
            and the average time per batch, both in milliseconds.
        """
        start = time.perf_counter()
        updated_batches = []
        successes = 0
        for batch in dataset.conversation:
            updated, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated)
            if success:
                successes += 1
        total_time = (time.perf_counter() - start) * 1000

        total_batches = len(updated_batches)
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": total_batches,
            "successes": successes,
            "failures": total_batches - successes,
            "total_execution_time_ms": total_time,
            # An empty dataset must not raise ZeroDivisionError.
            "avg_batch_time_ms": (
                total_time / total_batches if total_batches else 0.0
            ),
        }
        return updated_dataset, summary
Example: Mock Runner
# For testing without external calls:
class MockRunner(BaseRunner):
    """Runner that answers from a canned table instead of a real system."""

    def __init__(self, responses: dict[str, str] | None = None):
        """Store the canned responses.

        Args:
            responses: Mapping from a batch's ``qa_id`` to the answer to
                return for it. ``None`` means no predefined answers.
        """
        self.responses = responses or {}

    async def run_batch(self, batch, session_id, **kwargs):
        """Answer one batch without calling any external service.

        Args:
            batch: Test case; its ``qa_id`` selects the canned answer.
            session_id: Session identifier (not used by this runner).
            **kwargs: Accepted for interface compatibility; ignored.

        Returns:
            tuple: (updated_batch, True, execution_time_ms).
        """
        start = time.perf_counter()
        # Use predefined response or generate mock
        response = self.responses.get(
            batch.qa_id,
            f"Mock response for: {batch.query}"
        )
        updated_batch = batch.model_copy(update={"assistant": response})
        exec_time = (time.perf_counter() - start) * 1000
        return updated_batch, True, exec_time

    async def run_dataset(self, dataset, **kwargs):
        """Run every batch of the dataset sequentially.

        Args:
            dataset: Dataset whose ``conversation`` batches are executed.
            **kwargs: Forwarded unchanged to ``run_batch``.

        Returns:
            tuple: (updated_dataset, summary_dict). The summary holds the
            session id, batch/success/failure counts, the total run time
            and the average time per batch, both in milliseconds.
        """
        start = time.perf_counter()
        updated_batches = []
        successes = 0
        for batch in dataset.conversation:
            updated, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated)
            if success:
                successes += 1
        total_time = (time.perf_counter() - start) * 1000

        total_batches = len(updated_batches)
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": total_batches,
            "successes": successes,
            "failures": total_batches - successes,
            "total_execution_time_ms": total_time,
            # An empty dataset must not raise ZeroDivisionError.
            "avg_batch_time_ms": (
                total_time / total_batches if total_batches else 0.0
            ),
        }
        return updated_dataset, summary
# Usage
# Keys are matched against each batch's `qa_id`; a batch whose id is not
# listed gets the generated "Mock response for: <query>" text instead.
mock_runner = MockRunner(responses={
    "q1": "Paris is the capital of France.",
    "q2": "The sky is blue due to Rayleigh scattering.",
})
Parallel Execution
For faster execution with many batches:
import asyncio
class ParallelLLMRunner(BaseRunner):
    """Runner that queries a chat model for many batches concurrently.

    A semaphore caps how many model calls are in flight at once.
    """

    def __init__(self, model, max_concurrent: int = 5):
        """Store the model and create the concurrency limiter.

        Args:
            model: Chat model exposing an async ``ainvoke`` method.
            max_concurrent: Maximum number of simultaneous model calls.
        """
        self.model = model
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def _execute(self, batch, session_id):
        """Call the model for one batch; never raises for model errors.

        Returns:
            tuple: (updated_batch, success, execution_time_ms). On failure
            the batch's ``assistant`` field holds ``"Error: <message>"``.
        """
        # Timed here, after the semaphore is acquired, so waiting for a
        # free slot is not counted as execution time.
        start = time.perf_counter()
        try:
            # A plain string is sent to the model as a single human message.
            response = await self.model.ainvoke(batch.query)
            assistant, success = response.content, True
        except Exception as e:
            # Runner boundary: record the error on the batch so a single
            # failure cannot abort the whole gather() in run_dataset.
            assistant, success = f"Error: {e}", False
        exec_time = (time.perf_counter() - start) * 1000
        updated_batch = batch.model_copy(update={"assistant": assistant})
        return updated_batch, success, exec_time

    async def run_batch(self, batch, session_id, **kwargs):
        """Execute one batch once a concurrency slot is available.

        Args:
            batch: Test case whose ``query`` is sent to the model.
            session_id: Session identifier, passed through to ``_execute``.
            **kwargs: Accepted for interface compatibility; ignored.

        Returns:
            tuple: (updated_batch, success, execution_time_ms)
        """
        async with self.semaphore:
            # Concurrency-limited execution
            return await self._execute(batch, session_id)

    async def run_dataset(self, dataset, **kwargs):
        """Run all batches of the dataset concurrently.

        Args:
            dataset: Dataset whose ``conversation`` batches are executed.
            **kwargs: Forwarded unchanged to ``run_batch``.

        Returns:
            tuple: (updated_dataset, summary_dict). The summary holds the
            session id, batch/success/failure counts, the total wall-clock
            time and that total divided by the batch count, both in
            milliseconds.
        """
        start = time.perf_counter()
        # Run all batches concurrently; gather() returns results in the
        # same order as the input, so conversation order is preserved.
        tasks = [
            self.run_batch(batch, dataset.session_id, **kwargs)
            for batch in dataset.conversation
        ]
        results = await asyncio.gather(*tasks)
        total_time = (time.perf_counter() - start) * 1000

        updated_batches = [r[0] for r in results]
        successes = sum(1 for r in results if r[1])
        total_batches = len(results)
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": total_batches,
            "successes": successes,
            "failures": total_batches - successes,
            "total_execution_time_ms": total_time,
            # An empty dataset must not raise ZeroDivisionError.
            "avg_batch_time_ms": (
                total_time / total_batches if total_batches else 0.0
            ),
        }
        return updated_dataset, summary
Best Practices
Handle Errors Gracefully
Always return a valid batch even on failure:
except Exception as e:
return batch.model_copy(
update={"assistant": f"Error: {e}"}
), False, exec_time
Track Execution Time
Measure time for performance analysis:
# Use a monotonic clock: unlike time.time(), perf_counter() is unaffected
# by system clock adjustments, so the elapsed time is never negative.
start = time.perf_counter()
# ... execution ...
exec_time = (time.perf_counter() - start) * 1000  # milliseconds
Use Async for I/O
Make network calls async for efficiency:
async def run_batch(self, ...):
response = await self.client.post(...)
Implement Rate Limiting
Respect API rate limits by capping the number of concurrent requests:
self.semaphore = asyncio.Semaphore(5)
async with self.semaphore:
# Rate-limited execution
Next Steps
Storage
Save and load results
Examples
See complete examples