def __call__(
    self,
    program,
    metric=None,
    devset=None,
    num_threads=None,
    display_progress=None,
    display_table=None,
    return_all_scores=None,
    return_outputs=None,
):
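    """Run `program` on each example in `devset` and score every output with `metric`.

    Arguments left as None fall back to the defaults configured on this instance.
    Returns the average score as a percentage; depending on `return_outputs` and
    `return_all_scores`, the per-example results and/or the list of raw scores are
    returned alongside it.
    """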
    metric = metric if metric is not None else self.metric
    devset = devset if devset is not None else self.devset
    num_threads = num_threads if num_threads is not None else self.num_threads
    display_progress = display_progress if display_progress is not None else self.display_progress
    display_table = display_table if display_table is not None else self.display_table
    return_all_scores = return_all_scores if return_all_scores is not None else self.return_all_scores
    return_outputs = return_outputs if return_outputs is not None else self.return_outputs
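
    # Clear any leftover tqdm progress-bar instances from previous runs before starting a new one.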
    tqdm.tqdm._instances.clear()
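
    # Run the evaluation across `num_threads` workers; the executor handles progress
    # display, the error budget (`max_errors`), and traceback reporting.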
    executor = ParallelExecutor(
        num_threads=num_threads,
        disable_progress_bar=not display_progress,
        max_errors=self.max_errors,
        provide_traceback=self.provide_traceback,
        compare_results=True,
    )
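
    # Worker function: run the program on a single example and score its prediction.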
    def process_item(example):
        prediction = program(**example.inputs())
        score = metric(example, prediction)

        # Accumulate assertion and suggestion failure counts onto the program's attributes.
        if hasattr(program, "_assert_failures"):
            program._assert_failures += dspy.settings.get("assert_failures")
        if hasattr(program, "_suggest_failures"):
            program._suggest_failures += dspy.settings.get("suggest_failures")

        return prediction, score
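
    # Execute over the devset; examples that failed come back as None and are replaced
    # below with an empty prediction scored at `self.failure_score`.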
    results = executor.execute(process_item, devset)
    assert len(devset) == len(results)

    results = [((dspy.Prediction(), self.failure_score) if r is None else r) for r in results]
    results = [(example, prediction, score) for example, (prediction, score) in zip(devset, results)]
    ncorrect, ntotal = sum(score for *_, score in results), len(devset)

    logger.info(f"Average Metric: {ncorrect} / {ntotal} ({round(100 * ncorrect / ntotal, 1)}%)")
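
    # Build a tabular view of the results for optional display.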
    def prediction_is_dictlike(prediction):
        # Downstream logic for displaying dictionary-like predictions depends solely on the
        # predictions having a method called `items()` for iterating through key/value pairs.
        return hasattr(prediction, "items") and callable(getattr(prediction, "items"))
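
    # Flatten each (example, prediction, score) triple into a single row dict.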
    data = [
        (
            merge_dicts(example, prediction) | {"correct": score}
            if prediction_is_dictlike(prediction)
            else dict(example) | {"prediction": prediction, "correct": score}
        )
        for example, prediction, score in results
    ]

    # Truncate every cell in the DataFrame (DataFrame.applymap was renamed to DataFrame.map in Pandas 2.1.0).
    result_df = pd.DataFrame(data)
    result_df = result_df.map(truncate_cell) if hasattr(result_df, "map") else result_df.applymap(truncate_cell)

    # Rename the 'correct' column to the name of the metric object.
    metric_name = metric.__name__ if isinstance(metric, types.FunctionType) else metric.__class__.__name__
    result_df = result_df.rename(columns={"correct": metric_name})
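
    # Optionally render the results table; an integer `display_table` caps the number of rows shown.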
    if display_table:
        if isinstance(display_table, bool):
            df_to_display = result_df.copy()
            truncated_rows = 0
        else:
            df_to_display = result_df.head(display_table).copy()
            truncated_rows = len(result_df) - display_table

        df_to_display = stylize_metric_name(df_to_display, metric_name)
        display_dataframe(df_to_display)

        if truncated_rows > 0:
            # Show a short HTML notice for the rows omitted from the truncated table.
            message = f"""
            <div style='
                text-align: center;
                font-size: 16px;
                font-weight: bold;
                color: #555;
                margin: 10px 0;'>
                ... {truncated_rows} more rows not displayed ...
            </div>
            """
            display(HTML(message))
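
    # Return the overall percentage score, optionally with per-example results and/or raw scores.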
    if return_all_scores and return_outputs:
        return round(100 * ncorrect / ntotal, 2), results, [score for *_, score in results]
    if return_all_scores:
        return round(100 * ncorrect / ntotal, 2), [score for *_, score in results]
    if return_outputs:
        return round(100 * ncorrect / ntotal, 2), results

    return round(100 * ncorrect / ntotal, 2)