# Custom Runners

Create custom runners to execute tests against any AI system by implementing the `BaseRunner` interface.
## BaseRunner Interface

```python
from typing import Any

from fair_forge.schemas import Dataset, Batch


# Importable as: from fair_forge.schemas.runner import BaseRunner
class BaseRunner:
    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs: Any,
    ) -> tuple[Batch, bool, float]:
        """Execute a single test case.

        Returns:
            tuple: (updated_batch, success, execution_time_ms)
        """
        raise NotImplementedError

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs: Any,
    ) -> tuple[Dataset, dict[str, Any]]:
        """Execute all tests in a dataset.

        Returns:
            tuple: (updated_dataset, summary_dict)
        """
        raise NotImplementedError
```
## Example: LLM Runner

A runner for any LangChain-compatible model:
```python
import time
from typing import Any

from fair_forge.schemas import Dataset, Batch
from fair_forge.schemas.runner import BaseRunner
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage, SystemMessage


class LLMRunner(BaseRunner):
    def __init__(self, model: BaseChatModel, system_prompt: str = ""):
        self.model = model
        self.system_prompt = system_prompt

    async def run_batch(
        self,
        batch: Batch,
        session_id: str,
        **kwargs,
    ) -> tuple[Batch, bool, float]:
        start = time.time()
        try:
            messages = []
            if self.system_prompt:
                messages.append(SystemMessage(content=self.system_prompt))
            messages.append(HumanMessage(content=batch.query))

            response = await self.model.ainvoke(messages)

            updated_batch = batch.model_copy(
                update={"assistant": response.content}
            )
            exec_time = (time.time() - start) * 1000
            return updated_batch, True, exec_time
        except Exception as e:
            exec_time = (time.time() - start) * 1000
            updated_batch = batch.model_copy(
                update={"assistant": f"Error: {e}"}
            )
            return updated_batch, False, exec_time

    async def run_dataset(
        self,
        dataset: Dataset,
        **kwargs,
    ) -> tuple[Dataset, dict[str, Any]]:
        start = time.time()
        updated_batches = []
        successes = 0
        failures = 0

        for batch in dataset.conversation:
            updated_batch, success, _ = await self.run_batch(
                batch, dataset.session_id, **kwargs
            )
            updated_batches.append(updated_batch)
            if success:
                successes += 1
            else:
                failures += 1

        total_time = (time.time() - start) * 1000
        updated_dataset = dataset.model_copy(
            update={"conversation": updated_batches}
        )
        summary = {
            "session_id": dataset.session_id,
            "total_batches": len(dataset.conversation),
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            "avg_batch_time_ms": total_time / len(dataset.conversation),
        }
        return updated_dataset, summary
```
## Usage

```python
from langchain_groq import ChatGroq

# Create model
model = ChatGroq(model="llama-3.1-8b-instant", api_key="...")

# Create runner
runner = LLMRunner(
    model=model,
    system_prompt="You are a helpful assistant.",
)

# Execute tests
updated_dataset, summary = await runner.run_dataset(dataset)
```
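Here `dataset` is an ordinary `Dataset` from `fair_forge.schemas`. A minimal construction sketch, assuming `Dataset` and `Batch` accept the fields used throughout this page (`session_id`, `conversation`, `qa_id`, `query`) as keyword arguments:

```python
from fair_forge.schemas import Dataset, Batch

# Hypothetical two-turn dataset; adjust fields to your schema version
dataset = Dataset(
    session_id="demo-session",
    conversation=[
        Batch(qa_id="q1", query="What is the capital of France?"),
        Batch(qa_id="q2", query="Why is the sky blue?"),
    ],
)
```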
## Example: REST API Runner

A runner for any REST API:
```python
import time

import httpx

from fair_forge.schemas.runner import BaseRunner


class RestAPIRunner(BaseRunner):
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {api_key}"}
        )

    async def run_batch(self, batch, session_id, **kwargs):
        start = time.time()
        try:
            response = await self.client.post(
                f"{self.base_url}/chat",
                json={
                    "message": batch.query,
                    "session_id": session_id,
                },
            )
            response.raise_for_status()
            data = response.json()

            updated_batch = batch.model_copy(
                update={"assistant": data["response"]}
            )
            exec_time = (time.time() - start) * 1000
            return updated_batch, True, exec_time
        except Exception:
            # On failure, return the original batch unchanged
            exec_time = (time.time() - start) * 1000
            return batch, False, exec_time

    async def run_dataset(self, dataset, **kwargs):
        # Standard implementation
        start = time.time()
        updated_batches = []
        successes = 0
        failures = 0

        for batch in dataset.conversation:
            updated, success, _ = await self.run_batch(
                batch, dataset.session_id
            )
            updated_batches.append(updated)
            if success:
                successes += 1
            else:
                failures += 1

        total_time = (time.time() - start) * 1000
        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": len(dataset.conversation),
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            "avg_batch_time_ms": total_time / len(dataset.conversation),
        }
```
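Because the runner owns an `httpx.AsyncClient`, close it once testing is done to release pooled connections. A usage sketch (the base URL is a placeholder):

```python
runner = RestAPIRunner(base_url="https://api.example.com", api_key="...")
try:
    updated_dataset, summary = await runner.run_dataset(dataset)
finally:
    # httpx clients created outside a context manager should be closed explicitly
    await runner.client.aclose()
```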
## Example: Mock Runner

For testing without external calls:

```python
import time

from fair_forge.schemas.runner import BaseRunner


class MockRunner(BaseRunner):
    def __init__(self, responses: dict[str, str] | None = None):
        self.responses = responses or {}

    async def run_batch(self, batch, session_id, **kwargs):
        start = time.time()
        # Use the predefined response or generate a mock one
        response = self.responses.get(
            batch.qa_id,
            f"Mock response for: {batch.query}",
        )
        updated_batch = batch.model_copy(update={"assistant": response})
        exec_time = (time.time() - start) * 1000
        return updated_batch, True, exec_time

    async def run_dataset(self, dataset, **kwargs):
        # Same pattern as LLMRunner.run_dataset; mock batches always succeed
        start = time.time()
        updated_batches = []
        for batch in dataset.conversation:
            updated_batch, _, _ = await self.run_batch(batch, dataset.session_id)
            updated_batches.append(updated_batch)
        total_time = (time.time() - start) * 1000
        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": len(dataset.conversation),
            "successes": len(updated_batches),
            "failures": 0,
            "total_execution_time_ms": total_time,
            "avg_batch_time_ms": total_time / len(dataset.conversation),
        }


# Usage
mock_runner = MockRunner(responses={
    "q1": "Paris is the capital of France.",
    "q2": "The sky is blue due to Rayleigh scattering.",
})
```
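A mock runner slots straight into ordinary unit tests. A sketch using pytest with the pytest-asyncio plugin, again assuming `Dataset` and `Batch` accept these keyword arguments:

```python
import pytest
from fair_forge.schemas import Dataset, Batch


@pytest.mark.asyncio
async def test_mock_runner_fills_assistant():
    dataset = Dataset(
        session_id="test",
        conversation=[Batch(qa_id="q1", query="What is the capital of France?")],
    )
    runner = MockRunner(responses={"q1": "Paris is the capital of France."})

    updated, summary = await runner.run_dataset(dataset)

    assert updated.conversation[0].assistant == "Paris is the capital of France."
    assert summary["failures"] == 0
```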
## Parallel Execution

For faster execution with many batches, wrap each batch in a semaphore and run them concurrently. This variant reuses `LLMRunner` from above:

```python
import asyncio
import time


class ParallelLLMRunner(LLMRunner):
    """LLMRunner variant that runs batches concurrently, capped by a semaphore."""

    def __init__(self, model, system_prompt: str = "", max_concurrent: int = 5):
        super().__init__(model, system_prompt)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def run_batch(self, batch, session_id, **kwargs):
        # Rate-limited execution: at most max_concurrent calls in flight
        async with self.semaphore:
            return await super().run_batch(batch, session_id, **kwargs)

    async def run_dataset(self, dataset, **kwargs):
        start = time.time()
        # Run all batches concurrently
        tasks = [
            self.run_batch(batch, dataset.session_id)
            for batch in dataset.conversation
        ]
        results = await asyncio.gather(*tasks)

        # Process results
        updated_batches = [r[0] for r in results]
        successes = sum(1 for r in results if r[1])
        failures = len(results) - successes
        total_time = (time.time() - start) * 1000
        return dataset.model_copy(
            update={"conversation": updated_batches}
        ), {
            "session_id": dataset.session_id,
            "total_batches": len(dataset.conversation),
            "successes": successes,
            "failures": failures,
            "total_execution_time_ms": total_time,
            "avg_batch_time_ms": total_time / len(dataset.conversation),
        }
```
## Best Practices

**Handle Errors Gracefully.** Always return a valid batch even on failure:

```python
except Exception as e:
    return batch.model_copy(
        update={"assistant": f"Error: {e}"}
    ), False, exec_time
```

**Track Execution Time.** Measure time for performance analysis:

```python
start = time.time()
# ... execution ...
exec_time = (time.time() - start) * 1000
```

**Use Async for I/O.** Make network calls async for efficiency:

```python
async def run_batch(self, ...):
    response = await self.client.post(...)
```

**Implement Rate Limiting.** Respect API rate limits:

```python
self.semaphore = asyncio.Semaphore(5)

async with self.semaphore:
    # Rate-limited execution
    ...
```
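A semaphore caps the number of in-flight requests; if the target API instead enforces a requests-per-second limit, an interval-based throttle is a common complement. A minimal sketch; the `Throttle` helper is a hypothetical name, not part of fair_forge:

```python
import asyncio
import time


class Throttle:
    """Allow at most one call per `interval` seconds (hypothetical helper)."""

    def __init__(self, interval: float):
        self.interval = interval
        self._last = 0.0
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        async with self._lock:
            # Sleep until at least `interval` seconds have passed since the last call
            delay = self._last + self.interval - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._last = time.monotonic()


# e.g. inside run_batch, before issuing the request:
# await self.throttle.wait()
```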