Source code for langchain.smith.evaluation.runner_utils

"""Utilities for running language models or Chains over datasets."""

from __future__ import annotations

import asyncio
import functools
import itertools
import logging
from datetime import datetime
from typing import (
    Any,
    Callable,
    Coroutine,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)
from urllib.parse import urlparse, urlunparse

from langsmith import Client, RunEvaluator
from langsmith.schemas import Dataset, DataType, Example

from langchain.callbacks.base import BaseCallbackHandler
from langchain.callbacks.manager import Callbacks
from langchain.callbacks.tracers.base import BaseTracer
from langchain.callbacks.tracers.evaluation import EvaluatorCallbackHandler
from langchain.callbacks.tracers.langchain import LangChainTracer
from langchain.chains.base import Chain
from langchain.chat_models.openai import ChatOpenAI
from langchain.evaluation.loading import load_evaluator
from langchain.evaluation.schema import EvaluatorType, StringEvaluator
from langchain.schema import ChatResult, LLMResult
from langchain.schema.language_model import BaseLanguageModel
from langchain.schema.messages import BaseMessage, messages_from_dict
from langchain.smith.evaluation.config import EvalConfig, RunEvalConfig
from langchain.smith.evaluation.string_run_evaluator import StringRunEvaluatorChain

logger = logging.getLogger(__name__)

MODEL_OR_CHAIN_FACTORY = Union[Callable[[], Chain], BaseLanguageModel]


[docs]class InputFormatError(Exception): """Raised when the input format is invalid."""
## Shared Utilities def _get_eval_project_url(api_url: str, project_id: str) -> str: """Get the project url from the api url.""" parsed = urlparse(api_url) hostname = parsed.hostname or "" if "api." in hostname: hostname = hostname.replace("api.", "", 1) if "localhost" in hostname: # Remove the port hostname = "localhost" url = urlunparse(parsed._replace(netloc=hostname)) return f"{url}/projects/p/{project_id}?eval=true" def _wrap_in_chain_factory( llm_or_chain_factory: Union[Chain, MODEL_OR_CHAIN_FACTORY], dataset_name: str = "<my_dataset>", ) -> MODEL_OR_CHAIN_FACTORY: """Forgive the user if they pass in a chain without memory instead of a chain factory. It's a common mistake. Raise a more helpful error message as well.""" if isinstance(llm_or_chain_factory, Chain): chain = llm_or_chain_factory chain_class = chain.__class__.__name__ if llm_or_chain_factory.memory is not None: memory_class = chain.memory.__class__.__name__ raise ValueError( "Cannot directly evaluate a chain with stateful memory." " To evaluate this chain, pass in a chain constructor" " that initializes fresh memory each time it is called." " This will safegaurd against information" " leakage between dataset examples." "\nFor example:\n\n" "def chain_constructor():\n" f" new_memory = {memory_class}(...)\n" f" return {chain_class}" "(memory=new_memory, ...)\n\n" f'run_on_dataset("{dataset_name}", chain_constructor, ...)' ) logger.warning( "Directly passing in a chain is not recommended as chains may have state." " This can lead to unexpected behavior as the " "same chain instance could be used across multiple datasets. Instead," " please pass a chain constructor that creates a new " "chain with fresh memory each time it is called. This will safeguard" " against information leakage between dataset examples. " "\nFor example:\n\n" "def chain_constructor():\n" f" return {chain_class}(memory=new_memory, ...)\n\n" f'run_on_dataset("{dataset_name}", chain_constructor, ...)' ) return lambda: chain elif isinstance(llm_or_chain_factory, BaseLanguageModel): return llm_or_chain_factory elif callable(llm_or_chain_factory): _model = llm_or_chain_factory() if isinstance(_model, BaseLanguageModel): return _model return llm_or_chain_factory return llm_or_chain_factory def _first_example(examples: Iterator[Example]) -> Tuple[Example, Iterator[Example]]: """Get the first example while chaining it back and preserving the iterator.""" try: example: Example = next(examples) except StopIteration: raise ValueError("No examples provided.") return example, itertools.chain([example], examples) def _get_prompt(inputs: Dict[str, Any]) -> str: """Get prompt from inputs. Args: inputs: The input dictionary. Returns: A string prompt. Raises: InputFormatError: If the input format is invalid. """ if not inputs: raise InputFormatError("Inputs should not be empty.") prompts = [] if "prompt" in inputs: if not isinstance(inputs["prompt"], str): raise InputFormatError( "Expected string for 'prompt', got" f" {type(inputs['prompt']).__name__}" ) prompts = [inputs["prompt"]] elif "prompts" in inputs: if not isinstance(inputs["prompts"], list) or not all( isinstance(i, str) for i in inputs["prompts"] ): raise InputFormatError( "Expected list of strings for 'prompts'," f" got {type(inputs['prompts']).__name__}" ) prompts = inputs["prompts"] elif len(inputs) == 1: prompt_ = next(iter(inputs.values())) if isinstance(prompt_, str): prompts = [prompt_] elif isinstance(prompt_, list) and all(isinstance(i, str) for i in prompt_): prompts = prompt_ else: raise InputFormatError(f"LLM Run expects string prompt input. Got {inputs}") else: raise InputFormatError( f"LLM Run expects 'prompt' or 'prompts' in inputs. Got {inputs}" ) if len(prompts) == 1: return prompts[0] else: raise InputFormatError( f"LLM Run expects single prompt input. Got {len(prompts)} prompts." ) def _get_messages(inputs: Dict[str, Any]) -> List[BaseMessage]: """Get Chat Messages from inputs. Args: inputs: The input dictionary. Returns: A list of chat messages. Raises: InputFormatError: If the input format is invalid. """ if not inputs: raise InputFormatError("Inputs should not be empty.") if "messages" in inputs: single_input = inputs["messages"] elif len(inputs) == 1: single_input = next(iter(inputs.values())) else: raise InputFormatError( f"Chat Run expects 'messages' in inputs when example has multiple" f" input keys. Got {inputs}" ) if isinstance(single_input, list) and all( isinstance(i, dict) for i in single_input ): raw_messages = [single_input] elif isinstance(single_input, list) and all( isinstance(i, list) for i in single_input ): raw_messages = single_input else: raise InputFormatError( f"Chat Run expects List[dict] or List[List[dict]] values for" f" 'messages' key input. Got {inputs}" ) if len(raw_messages) == 1: return messages_from_dict(raw_messages[0]) else: raise InputFormatError( f"Chat Run expects single List[dict] or List[List[dict]] 'messages'" f" input. Got {len(raw_messages)} messages from inputs {inputs}" ) def _get_project_name( project_name: Optional[str], llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, ) -> str: """ Get the project name. Args: project_name: The project name if manually specified. llm_or_chain_factory: The Chain or language model constructor. Returns: The project name. """ if project_name is not None: return project_name current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") if isinstance(llm_or_chain_factory, BaseLanguageModel): model_name = llm_or_chain_factory.__class__.__name__ else: model_name = llm_or_chain_factory().__class__.__name__ return f"{current_time}-{model_name}" ## Shared Validation Utilities def _validate_example_inputs_for_language_model( first_example: Example, input_mapper: Optional[Callable[[Dict], Any]], ) -> None: if input_mapper: prompt_input = input_mapper(first_example.inputs) if not isinstance(prompt_input, str) and not ( isinstance(prompt_input, list) and all(isinstance(msg, BaseMessage) for msg in prompt_input) ): raise InputFormatError( "When using an input_mapper to prepare dataset example inputs" " for an LLM or chat model, the output must a single string or" " a list of chat messages." f"\nGot: {prompt_input} of type {type(prompt_input)}." ) else: try: _get_prompt(first_example.inputs) except InputFormatError: try: _get_messages(first_example.inputs) except InputFormatError: raise InputFormatError( "Example inputs do not match language model input format. " "Expected a dictionary with messages or a single prompt." f" Got: {first_example.inputs}" " Please update your dataset OR provide an input_mapper" " to convert the example.inputs to a compatible format" " for the llm or chat model you wish to evaluate." ) def _validate_example_inputs_for_chain( first_example: Example, chain: Chain, input_mapper: Optional[Callable[[Dict], Any]], ) -> None: """Validate that the example inputs match the chain input keys.""" if input_mapper: first_inputs = input_mapper(first_example.inputs) if not isinstance(first_inputs, dict): raise InputFormatError( "When using an input_mapper to prepare dataset example" " inputs for a chain, the mapped value must be a dictionary." f"\nGot: {first_inputs} of type {type(first_inputs)}." ) if not set(first_inputs.keys()) == set(chain.input_keys): raise InputFormatError( "When using an input_mapper to prepare dataset example inputs" " for a chain mapped value must have keys that match the chain's" " expected input keys." f"\nExpected: {chain.input_keys}. Got: {first_inputs.keys()}" ) else: first_inputs = first_example.inputs if len(first_inputs) == 1 and len(chain.input_keys) == 1: # We can pass this through the run method. # Refrain from calling to validate. pass elif not set(first_inputs.keys()) == set(chain.input_keys): raise InputFormatError( "Example inputs do not match chain input keys." " Please provide an input_mapper to convert the example.inputs" " to a compatible format for the chain you wish to evaluate." f"Expected: {chain.input_keys}. " f"Got: {first_inputs.keys()}" ) def _validate_example_inputs( examples: Iterator[Example], llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, input_mapper: Optional[Callable[[Dict], Any]], ) -> Iterator[Example]: """Validate that the example inputs are valid for the model.""" first_example, examples = _first_example(examples) if isinstance(llm_or_chain_factory, BaseLanguageModel): _validate_example_inputs_for_language_model(first_example, input_mapper) else: chain = llm_or_chain_factory() _validate_example_inputs_for_chain(first_example, chain, input_mapper) return examples ## Shared Evaluator Setup Utilities def _setup_evaluation( llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, examples: Iterator[Example], evaluation: Optional[RunEvalConfig], data_type: DataType, ) -> Tuple[Optional[List[RunEvaluator]], Iterator[Example]]: """Configure the evaluators to run on the results of the chain.""" if evaluation: first_example, examples = _first_example(examples) if isinstance(llm_or_chain_factory, BaseLanguageModel): run_inputs, run_outputs = None, None run_type = "llm" else: run_type = "chain" if data_type in (DataType.chat, DataType.llm): raise ValueError( "Cannot evaluate a chain on dataset with " f"data_type={data_type.value}. " "Please specify a dataset with the default 'kv' data type." ) chain = llm_or_chain_factory() run_inputs = chain.input_keys run_outputs = chain.output_keys run_evaluators = _load_run_evaluators( evaluation, run_type, data_type, list(first_example.outputs) if first_example.outputs else None, run_inputs, run_outputs, ) else: # TODO: Create a default helpfulness evaluator run_evaluators = None return run_evaluators, examples def _determine_input_key( config: RunEvalConfig, run_inputs: Optional[List[str]], run_type: str, ) -> Optional[str]: if config.input_key: input_key = config.input_key if run_inputs and input_key not in run_inputs: raise ValueError(f"Input key {input_key} not in run inputs {run_inputs}") elif run_type == "llm": input_key = None elif run_inputs and len(run_inputs) == 1: input_key = run_inputs[0] else: raise ValueError( f"Must specify input key for model with multiple inputs: {run_inputs}" ) return input_key def _determine_prediction_key( config: RunEvalConfig, run_outputs: Optional[List[str]], run_type: str, ) -> Optional[str]: if config.prediction_key: prediction_key = config.prediction_key if run_outputs and prediction_key not in run_outputs: raise ValueError( f"Prediction key {prediction_key} not in run outputs {run_outputs}" ) elif run_type == "llm": prediction_key = None elif run_outputs and len(run_outputs) == 1: prediction_key = run_outputs[0] else: raise ValueError( f"Must specify prediction key for model" f" with multiple outputs: {run_outputs}" ) return prediction_key def _determine_reference_key( config: RunEvalConfig, example_outputs: Optional[List[str]], ) -> Optional[str]: if config.reference_key: reference_key = config.reference_key if example_outputs and reference_key not in example_outputs: raise ValueError( f"Reference key {reference_key} not in Dataset" f" example outputs: {example_outputs}" ) elif example_outputs and len(example_outputs) == 1: reference_key = list(example_outputs)[0] else: reference_key = None return reference_key def _construct_run_evaluator( eval_config: Union[EvaluatorType, EvalConfig], eval_llm: BaseLanguageModel, run_type: str, data_type: DataType, example_outputs: Optional[List[str]], reference_key: Optional[str], input_key: Optional[str], prediction_key: Optional[str], ) -> RunEvaluator: if isinstance(eval_config, EvaluatorType): evaluator_ = load_evaluator(eval_config, llm=eval_llm) eval_type_tag = eval_config.value else: kwargs = {"llm": eval_llm, **eval_config.get_kwargs()} evaluator_ = load_evaluator(eval_config.evaluator_type, **kwargs) eval_type_tag = eval_config.evaluator_type.value if isinstance(evaluator_, StringEvaluator): if evaluator_.requires_reference and reference_key is None: raise ValueError( f"Must specify reference_key in RunEvalConfig to use" f" evaluator of type {eval_type_tag} with" f" dataset with multiple output keys: {example_outputs}." ) run_evaluator = StringRunEvaluatorChain.from_run_and_data_type( evaluator_, run_type, data_type, input_key=input_key, prediction_key=prediction_key, reference_key=reference_key, tags=[eval_type_tag], ) else: raise NotImplementedError( f"Run evaluator for {eval_type_tag} is not implemented" ) return run_evaluator def _load_run_evaluators( config: RunEvalConfig, run_type: str, data_type: DataType, example_outputs: Optional[List[str]], run_inputs: Optional[List[str]], run_outputs: Optional[List[str]], ) -> List[RunEvaluator]: """ Load run evaluators from a configuration. Args: config: Configuration for the run evaluators. Returns: A list of run evaluators. """ eval_llm = config.eval_llm or ChatOpenAI(model="gpt-4", temperature=0.0) run_evaluators = [] input_key = _determine_input_key(config, run_inputs, run_type) prediction_key = _determine_prediction_key(config, run_outputs, run_type) reference_key = _determine_reference_key(config, example_outputs) for eval_config in config.evaluators: run_evaluator = _construct_run_evaluator( eval_config, eval_llm, run_type, data_type, example_outputs, reference_key, input_key, prediction_key, ) run_evaluators.append(run_evaluator) custom_evaluators = config.custom_evaluators or [] for custom_evaluator in custom_evaluators: if isinstance(custom_evaluator, RunEvaluator): run_evaluators.append(custom_evaluator) elif isinstance(custom_evaluator, StringEvaluator): run_evaluators.append( StringRunEvaluatorChain.from_run_and_data_type( custom_evaluator, run_type, data_type, input_key=input_key, prediction_key=prediction_key, reference_key=reference_key, ) ) else: raise ValueError( f"Unsupported custom evaluator: {custom_evaluator}." f" Expected RunEvaluator or StringEvaluator." ) return run_evaluators ### Async Helpers async def _arun_llm( llm: BaseLanguageModel, inputs: Dict[str, Any], *, tags: Optional[List[str]] = None, callbacks: Callbacks = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Union[str, BaseMessage]: """Asynchronously run the language model. Args: llm: The language model to run. inputs: The input dictionary. tags: Optional tags to add to the run. callbacks: Optional callbacks to use during the run. input_mapper: Optional function to map inputs to the expected format. Returns: The LLMResult or ChatResult. Raises: ValueError: If the LLM type is unsupported. InputFormatError: If the input format is invalid. """ if input_mapper is not None: prompt_or_messages = input_mapper(inputs) if isinstance(prompt_or_messages, str): return await llm.apredict( prompt_or_messages, callbacks=callbacks, tags=tags ) elif isinstance(prompt_or_messages, list) and all( isinstance(msg, BaseMessage) for msg in prompt_or_messages ): return await llm.apredict_messages( prompt_or_messages, callbacks=callbacks, tags=tags ) else: raise InputFormatError( "Input mapper returned invalid format" f" {prompt_or_messages}" "\nExpected a single string or list of chat messages." ) else: try: prompt = _get_prompt(inputs) llm_output: Union[str, BaseMessage] = await llm.apredict( prompt, callbacks=callbacks, tags=tags ) except InputFormatError: messages = _get_messages(inputs) llm_output = await llm.apredict_messages( messages, callbacks=callbacks, tags=tags ) return llm_output async def _arun_chain( chain: Chain, inputs: Dict[str, Any], callbacks: Callbacks, *, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Union[dict, str]: """Run a chain asynchronously on inputs.""" if input_mapper is not None: inputs_ = input_mapper(inputs) output: Union[dict, str] = await chain.acall( inputs_, callbacks=callbacks, tags=tags ) else: if len(inputs) == 1: inputs_ = next(iter(inputs.values())) output = await chain.arun(inputs_, callbacks=callbacks, tags=tags) else: output = await chain.acall(inputs, callbacks=callbacks, tags=tags) return output async def _arun_llm_or_chain( example: Example, llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, n_repetitions: int, *, tags: Optional[List[str]] = None, callbacks: Optional[List[BaseCallbackHandler]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Union[List[dict], List[str], List[LLMResult], List[ChatResult]]: """Asynchronously run the Chain or language model. Args: example: The example to run. llm_or_chain_factory: The Chain or language model constructor to run. n_repetitions: The number of times to run the model on each example. tags: Optional tags to add to the run. callbacks: Optional callbacks to use during the run. input_mapper: Optional function to map the input to the expected format. Returns: A list of outputs. """ if callbacks: previous_example_ids = [ getattr(tracer, "example_id", None) for tracer in callbacks ] for tracer in callbacks: if hasattr(tracer, "example_id"): tracer.example_id = example.id else: previous_example_ids = None outputs = [] chain_or_llm = ( "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain" ) for _ in range(n_repetitions): try: if isinstance(llm_or_chain_factory, BaseLanguageModel): output: Any = await _arun_llm( llm_or_chain_factory, example.inputs, tags=tags, callbacks=callbacks, input_mapper=input_mapper, ) else: chain = llm_or_chain_factory() output = await _arun_chain( chain, example.inputs, tags=tags, callbacks=callbacks, input_mapper=input_mapper, ) outputs.append(output) except Exception as e: logger.warning( f"{chain_or_llm} failed for example {example.id}. Error: {e}" ) outputs.append({"Error": str(e)}) if callbacks and previous_example_ids: for example_id, tracer in zip(previous_example_ids, callbacks): if hasattr(tracer, "example_id"): tracer.example_id = example_id return outputs async def _gather_with_concurrency( n: int, initializer: Callable[[], Coroutine[Any, Any, Any]], *async_funcs: Callable[ [Sequence[BaseCallbackHandler], Dict], Coroutine[Any, Any, Any] ], ) -> List[Any]: """Run coroutines with a concurrency limit. Args: n: The maximum number of concurrent tasks. initializer: A coroutine that initializes shared resources for the tasks. async_funcs: The async_funcs to be run concurrently. Returns: A list of results from the coroutines. """ semaphore = asyncio.Semaphore(n) job_state = {"num_processed": 0} callback_queue: asyncio.Queue[Sequence[BaseCallbackHandler]] = asyncio.Queue() for _ in range(n): callback_queue.put_nowait(await initializer()) async def run_coroutine_with_semaphore( async_func: Callable[ [Sequence[BaseCallbackHandler], Dict], Coroutine[Any, Any, Any] ] ) -> Any: async with semaphore: callbacks = await callback_queue.get() try: result = await async_func(callbacks, job_state) finally: callback_queue.put_nowait(callbacks) return result results = await asyncio.gather( *(run_coroutine_with_semaphore(function) for function in async_funcs) ) while callback_queue: try: callbacks = callback_queue.get_nowait() except asyncio.QueueEmpty: break for callback in callbacks: if isinstance(callback, (LangChainTracer, EvaluatorCallbackHandler)): callback.wait_for_futures() return results async def _callbacks_initializer( project_name: Optional[str], client: Client, run_evaluators: Sequence[RunEvaluator], evaluation_handler_collector: List[EvaluatorCallbackHandler], ) -> List[BaseTracer]: """ Initialize a tracer to share across tasks. Args: project_name: The project name for the tracer. client: The client to use for the tracer. run_evaluators: The evaluators to run. evaluation_handler_collector: A list to collect the evaluators. Used to wait for the evaluators to finish. Returns: The callbacks for this thread. """ callbacks: List[BaseTracer] = [] if project_name: callbacks.append( LangChainTracer( project_name=project_name, client=client, use_threading=False ) ) evaluator_project_name = f"{project_name}-evaluators" if project_name else None if run_evaluators: callback = EvaluatorCallbackHandler( client=client, evaluators=run_evaluators, # We already have concurrency, don't want to overload the machine max_workers=1, project_name=evaluator_project_name, ) callbacks.append(callback) evaluation_handler_collector.append(callback) return callbacks async def _arun_on_examples( client: Client, examples: Iterator[Example], llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, *, evaluation: Optional[RunEvalConfig] = None, concurrency_level: int = 5, num_repetitions: int = 1, project_name: Optional[str] = None, verbose: bool = False, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, data_type: DataType = DataType.kv, ) -> Dict[str, Any]: """ Asynchronously run the chain on examples and store traces to the specified project name. Args: client: LangSmith client to use to log feedback and runs. examples: Examples to run the model or chain over. llm_or_chain_factory: Language model or Chain constructor to run over the dataset. The Chain constructor is used to permit independent calls on each example without carrying over state. evaluation: Optional evaluation configuration to use when evaluating concurrency_level: The number of async tasks to run concurrently. num_repetitions: Number of times to run the model on each example. This is useful when testing success rates or generating confidence intervals. project_name: Project name to use when tracing runs. Defaults to {dataset_name}-{chain class name}-{datetime}. verbose: Whether to print progress. tags: Tags to add to each run in the project. input_mapper: function to map to the inputs dictionary from an Example to the format expected by the model to be evaluated. This is useful if your model needs to deserialize more complex schema or if your dataset has inputs with keys that differ from what is expected by your chain or agent. data_type: The dataset's data type. This is used to determine determine how to deserialize the reference data and model compatibility. Returns: A dictionary mapping example ids to the model outputs. """ llm_or_chain_factory = _wrap_in_chain_factory(llm_or_chain_factory) project_name = _get_project_name(project_name, llm_or_chain_factory) run_evaluators, examples = _setup_evaluation( llm_or_chain_factory, examples, evaluation, data_type ) examples = _validate_example_inputs(examples, llm_or_chain_factory, input_mapper) results: Dict[str, List[Any]] = {} async def process_example( example: Example, callbacks: List[BaseCallbackHandler], job_state: dict ) -> None: """Process a single example.""" result = await _arun_llm_or_chain( example, llm_or_chain_factory, num_repetitions, tags=tags, callbacks=callbacks, input_mapper=input_mapper, ) results[str(example.id)] = result job_state["num_processed"] += 1 if verbose: print( f"Processed examples: {job_state['num_processed']}", end="\r", flush=True, ) evaluation_handlers: List[EvaluatorCallbackHandler] = [] await _gather_with_concurrency( concurrency_level, functools.partial( _callbacks_initializer, project_name=project_name, client=client, evaluation_handler_collector=evaluation_handlers, run_evaluators=run_evaluators or [], ), *(functools.partial(process_example, e) for e in examples), ) for handler in evaluation_handlers: handler.wait_for_futures() return results ## Sync Utilities def _run_llm( llm: BaseLanguageModel, inputs: Dict[str, Any], callbacks: Callbacks, *, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Union[str, BaseMessage]: """ Run the language model on the example. Args: llm: The language model to run. inputs: The input dictionary. callbacks: The callbacks to use during the run. tags: Optional tags to add to the run. input_mapper: function to map to the inputs dictionary from an Example Returns: The LLMResult or ChatResult. Raises: ValueError: If the LLM type is unsupported. InputFormatError: If the input format is invalid. """ if input_mapper is not None: prompt_or_messages = input_mapper(inputs) if isinstance(prompt_or_messages, str): llm_output: Union[str, BaseMessage] = llm.predict( prompt_or_messages, callbacks=callbacks, tags=tags ) elif isinstance(prompt_or_messages, list) and all( isinstance(msg, BaseMessage) for msg in prompt_or_messages ): llm_output = llm.predict_messages( prompt_or_messages, callbacks=callbacks, tags=tags ) else: raise InputFormatError( "Input mapper returned invalid format: " f" {prompt_or_messages}" "\nExpected a single string or list of chat messages." ) else: try: llm_prompts = _get_prompt(inputs) llm_output = llm.predict(llm_prompts, callbacks=callbacks, tags=tags) except InputFormatError: llm_messages = _get_messages(inputs) llm_output = llm.predict_messages(llm_messages, callbacks=callbacks) return llm_output def _run_chain( chain: Chain, inputs: Dict[str, Any], callbacks: Callbacks, *, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Union[Dict, str]: """Run a chain on inputs.""" if input_mapper is not None: inputs_ = input_mapper(inputs) output: Union[dict, str] = chain(inputs_, callbacks=callbacks, tags=tags) else: if len(inputs) == 1: inputs_ = next(iter(inputs.values())) output = chain.run(inputs_, callbacks=callbacks, tags=tags) else: output = chain(inputs, callbacks=callbacks, tags=tags) return output def _run_llm_or_chain( example: Example, llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, n_repetitions: int, *, tags: Optional[List[str]] = None, callbacks: Optional[List[BaseCallbackHandler]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Union[List[dict], List[str], List[LLMResult], List[ChatResult]]: """ Run the Chain or language model synchronously. Args: example: The example to run. llm_or_chain_factory: The Chain or language model constructor to run. n_repetitions: The number of times to run the model on each example. tags: Optional tags to add to the run. callbacks: Optional callbacks to use during the run. Returns: Union[List[dict], List[str], List[LLMResult], List[ChatResult]]: The outputs of the model or chain. """ if callbacks: previous_example_ids = [ getattr(tracer, "example_id", None) for tracer in callbacks ] for tracer in callbacks: if hasattr(tracer, "example_id"): tracer.example_id = example.id else: previous_example_ids = None outputs = [] chain_or_llm = ( "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain" ) for _ in range(n_repetitions): try: if isinstance(llm_or_chain_factory, BaseLanguageModel): output: Any = _run_llm( llm_or_chain_factory, example.inputs, callbacks, tags=tags, input_mapper=input_mapper, ) else: chain = llm_or_chain_factory() output = _run_chain( chain, example.inputs, callbacks, tags=tags, input_mapper=input_mapper, ) outputs.append(output) except Exception as e: logger.warning( f"{chain_or_llm} failed for example {example.id}. Error: {e}" ) outputs.append({"Error": str(e)}) if callbacks and previous_example_ids: for example_id, tracer in zip(previous_example_ids, callbacks): if hasattr(tracer, "example_id"): tracer.example_id = example_id return outputs def _run_on_examples( client: Client, examples: Iterator[Example], llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, *, evaluation: Optional[RunEvalConfig] = None, num_repetitions: int = 1, project_name: Optional[str] = None, verbose: bool = False, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, data_type: DataType = DataType.kv, ) -> Dict[str, Any]: """ Run the Chain or language model on examples and store traces to the specified project name. Args: client: LangSmith client to use to log feedback and runs. examples: Examples to run the model or chain over. llm_or_chain_factory: Language model or Chain constructor to run over the dataset. The Chain constructor is used to permit independent calls on each example without carrying over state. evaluation: Optional evaluation configuration to use when evaluating num_repetitions: Number of times to run the model on each example. This is useful when testing success rates or generating confidence intervals. project_name: Name of the project to store the traces in. Defaults to {dataset_name}-{chain class name}-{datetime}. verbose: Whether to print progress. tags: Tags to add to each run in the project. input_mapper: A function to map to the inputs dictionary from an Example to the format expected by the model to be evaluated. This is useful if your model needs to deserialize more complex schema or if your dataset has inputs with keys that differ from what is expected by your chain or agent. data_type: The dataset's data type. This is used to determine determine how to deserialize the reference data and model compatibility. Returns: A dictionary mapping example ids to the model outputs. """ results: Dict[str, Any] = {} llm_or_chain_factory = _wrap_in_chain_factory(llm_or_chain_factory) project_name = _get_project_name(project_name, llm_or_chain_factory) tracer = LangChainTracer( project_name=project_name, client=client, use_threading=False ) evaluator_project_name = f"{project_name}-evaluators" run_evaluators, examples = _setup_evaluation( llm_or_chain_factory, examples, evaluation, data_type ) examples = _validate_example_inputs(examples, llm_or_chain_factory, input_mapper) evalution_handler = EvaluatorCallbackHandler( evaluators=run_evaluators or [], client=client, project_name=evaluator_project_name, ) callbacks: List[BaseCallbackHandler] = [tracer, evalution_handler] for i, example in enumerate(examples): result = _run_llm_or_chain( example, llm_or_chain_factory, num_repetitions, tags=tags, callbacks=callbacks, input_mapper=input_mapper, ) if verbose: print(f"{i+1} processed", flush=True, end="\r") results[str(example.id)] = result tracer.wait_for_futures() evalution_handler.wait_for_futures() return results ## Public API def _prepare_eval_run( client: Client, dataset_name: str, llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, project_name: Optional[str], ) -> Tuple[MODEL_OR_CHAIN_FACTORY, str, Dataset, Iterator[Example]]: llm_or_chain_factory = _wrap_in_chain_factory(llm_or_chain_factory, dataset_name) project_name = _get_project_name(project_name, llm_or_chain_factory) try: project = client.create_project(project_name) except ValueError as e: if "already exists " not in str(e): raise e raise ValueError( f"Project {project_name} already exists. Please use a different name." ) project_url = _get_eval_project_url(client.api_url, project.id) print( f"View the evaluation results for project '{project_name}' at:\n{project_url}" ) dataset = client.read_dataset(dataset_name=dataset_name) examples = client.list_examples(dataset_id=str(dataset.id)) return llm_or_chain_factory, project_name, dataset, examples
[docs]async def arun_on_dataset( client: Client, dataset_name: str, llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, *, evaluation: Optional[RunEvalConfig] = None, concurrency_level: int = 5, num_repetitions: int = 1, project_name: Optional[str] = None, verbose: bool = False, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Dict[str, Any]: """ Asynchronously run the Chain or language model on a dataset and store traces to the specified project name. Args: client: LangSmith client to use to read the dataset, and to log feedback and run traces. dataset_name: Name of the dataset to run the chain on. llm_or_chain_factory: Language model or Chain constructor to run over the dataset. The Chain constructor is used to permit independent calls on each example without carrying over state. evaluation: Optional evaluation configuration to use when evaluating concurrency_level: The number of async tasks to run concurrently. num_repetitions: Number of times to run the model on each example. This is useful when testing success rates or generating confidence intervals. project_name: Name of the project to store the traces in. Defaults to {dataset_name}-{chain class name}-{datetime}. verbose: Whether to print progress. tags: Tags to add to each run in the project. input_mapper: A function to map to the inputs dictionary from an Example to the format expected by the model to be evaluated. This is useful if your model needs to deserialize more complex schema or if your dataset has inputs with keys that differ from what is expected by your chain or agent. Returns: A dictionary containing the run's project name and the resulting model outputs. For the synchronous version, see :func:`run_on_dataset`. Examples -------- .. code-block:: python from langsmith import Client from langchain.chat_models import ChatOpenAI from langchain.chains import LLMChain from langchain.smith import RunEvalConfig, arun_on_dataset # Chains may have memory. Passing in a constructor function lets the # evaluation framework avoid cross-contamination between runs. def construct_chain(): llm = ChatOpenAI(temperature=0) chain = LLMChain.from_string( llm, "What's the answer to {your_input_key}" ) return chain # Load off-the-shelf evaluators via config or the EvaluatorType (string or enum) evaluation_config = RunEvalConfig( evaluators=[ "qa", # "Correctness" against a reference answer "embedding_distance", RunEvalConfig.Criteria("helpfulness"), RunEvalConfig.Criteria({ "fifth-grader-score": "Do you have to be smarter than a fifth grader to answer this question?" }), ] ) client = Client() await arun_on_dataset( client, "<my_dataset_name>", construct_chain, evaluation=evaluation_config, ) You can also create custom evaluators by subclassing the :class:`StringEvaluator <langchain.evaluation.schema.StringEvaluator>` or LangSmith's `RunEvaluator` classes. .. code-block:: python from typing import Optional from langchain.evaluation import StringEvaluator class MyStringEvaluator(StringEvaluator): @property def requires_input(self) -> bool: return False @property def requires_reference(self) -> bool: return True @property def evaluation_name(self) -> str: return "exact_match" def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> dict: return {"score": prediction == reference} evaluation_config = RunEvalConfig( custom_evaluators = [MyStringEvaluator()], ) await arun_on_dataset( client, "<my_dataset_name>", construct_chain, evaluation=evaluation_config, ) """ # noqa: E501 llm_or_chain_factory, project_name, dataset, examples = _prepare_eval_run( client, dataset_name, llm_or_chain_factory, project_name ) results = await _arun_on_examples( client, examples, llm_or_chain_factory, concurrency_level=concurrency_level, num_repetitions=num_repetitions, project_name=project_name, verbose=verbose, tags=tags, evaluation=evaluation, input_mapper=input_mapper, data_type=dataset.data_type, ) return { "project_name": project_name, "results": results, }
[docs]def run_on_dataset( client: Client, dataset_name: str, llm_or_chain_factory: MODEL_OR_CHAIN_FACTORY, *, evaluation: Optional[RunEvalConfig] = None, num_repetitions: int = 1, project_name: Optional[str] = None, verbose: bool = False, tags: Optional[List[str]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, ) -> Dict[str, Any]: """ Run the Chain or language model on a dataset and store traces to the specified project name. Args: client: LangSmith client to use to access the dataset and to log feedback and run traces. dataset_name: Name of the dataset to run the chain on. llm_or_chain_factory: Language model or Chain constructor to run over the dataset. The Chain constructor is used to permit independent calls on each example without carrying over state. evaluation: Configuration for evaluators to run on the results of the chain num_repetitions: Number of times to run the model on each example. This is useful when testing success rates or generating confidence intervals. project_name: Name of the project to store the traces in. Defaults to {dataset_name}-{chain class name}-{datetime}. verbose: Whether to print progress. tags: Tags to add to each run in the project. input_mapper: A function to map to the inputs dictionary from an Example to the format expected by the model to be evaluated. This is useful if your model needs to deserialize more complex schema or if your dataset has inputs with keys that differ from what is expected by your chain or agent. Returns: A dictionary containing the run's project name and the resulting model outputs. For the (usually faster) async version of this function, see :func:`arun_on_dataset`. Examples -------- .. code-block:: python from langsmith import Client from langchain.chat_models import ChatOpenAI from langchain.chains import LLMChain from langchain.smith import RunEvalConfig, run_on_dataset # Chains may have memory. Passing in a constructor function lets the # evaluation framework avoid cross-contamination between runs. def construct_chain(): llm = ChatOpenAI(temperature=0) chain = LLMChain.from_string( llm, "What's the answer to {your_input_key}" ) return chain # Load off-the-shelf evaluators via config or the EvaluatorType (string or enum) evaluation_config = RunEvalConfig( evaluators=[ "qa", # "Correctness" against a reference answer "embedding_distance", RunEvalConfig.Criteria("helpfulness"), RunEvalConfig.Criteria({ "fifth-grader-score": "Do you have to be smarter than a fifth grader to answer this question?" }), ] ) client = Client() run_on_dataset( client, "<my_dataset_name>", construct_chain, evaluation=evaluation_config, ) You can also create custom evaluators by subclassing the :class:`StringEvaluator <langchain.evaluation.schema.StringEvaluator>` or LangSmith's `RunEvaluator` classes. .. code-block:: python from typing import Optional from langchain.evaluation import StringEvaluator class MyStringEvaluator(StringEvaluator): @property def requires_input(self) -> bool: return False @property def requires_reference(self) -> bool: return True @property def evaluation_name(self) -> str: return "exact_match" def _evaluate_strings(self, prediction, reference=None, input=None, **kwargs) -> dict: return {"score": prediction == reference} evaluation_config = RunEvalConfig( custom_evaluators = [MyStringEvaluator()], ) run_on_dataset( client, "<my_dataset_name>", construct_chain, evaluation=evaluation_config, ) """ # noqa: E501 llm_or_chain_factory, project_name, dataset, examples = _prepare_eval_run( client, dataset_name, llm_or_chain_factory, project_name ) results = _run_on_examples( client, examples, llm_or_chain_factory, num_repetitions=num_repetitions, project_name=project_name, verbose=verbose, tags=tags, evaluation=evaluation, input_mapper=input_mapper, data_type=dataset.data_type, ) return { "project_name": project_name, "results": results, }