def compile(self, student, *, trainset, eval_kwargs):
"""
optimizes `signature` of `student` program - note that it may be zero-shot or already pre-optimized (demos already chosen - `demos != []`)
parameters:
student: program to optimize and left modified.
trainset: iterable of `Example`s
eval_kwargs: optional, dict
Additional keywords to go into `Evaluate` for the metric.
Returns optimized version of `student`.
"""
module = student.deepcopy()
evaluate = Evaluate(devset=trainset, metric=self.metric, **eval_kwargs)
total_calls = 0
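    # Per-predictor score statistics (populated only when track_stats is enabled):
    # `results_best` summarizes the top candidates found so far, `results_latest` only the newest batch.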
results_best = {
id(p): {"depth": [], "max": [], "average": [], "min": [], "std": []} for p in module.predictors()
}
results_latest = {
id(p): {"depth": [], "max": [], "average": [], "min": [], "std": []} for p in module.predictors()
}
if self.track_stats:
import numpy as np
candidates = {}
evaluated_candidates = defaultdict(dict)
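    # `candidates` maps each predictor id to the Completions proposing new instructions/prefixes;
    # `evaluated_candidates` maps each predictor id to {(instruction, prefix): score/program record}.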
# Seed the prompt optimizer zero shot with just the instruction, generate BREADTH new prompts
for predictor in module.predictors():
        *_, last_key = self._get_signature(predictor).fields.keys()
        basic_instruction = self._get_signature(predictor).instructions
        basic_prefix = self._get_signature(predictor).fields[last_key].json_schema_extra["prefix"]
if self.prompt_model:
with dspy.settings.context(lm=self.prompt_model):
instruct = dspy.Predict(
BasicGenerateInstruction,
n=self.breadth - 1,
temperature=self.init_temperature,
)(basic_instruction=basic_instruction)
else:
instruct = dspy.Predict(
BasicGenerateInstruction,
n=self.breadth - 1,
temperature=self.init_temperature,
)(basic_instruction=basic_instruction)
# Add in our initial prompt as a candidate as well
instruct.completions.proposed_instruction.append(basic_instruction)
instruct.completions.proposed_prefix_for_output_field.append(basic_prefix)
candidates[id(predictor)] = instruct.completions
evaluated_candidates[id(predictor)] = {}
if self.prompt_model:
logger.debug(f"{self.prompt_model.inspect_history(n=1)}")
latest_candidates = candidates
all_candidates = candidates
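    # `latest_candidates` holds only the most recently generated batch per predictor, while
    # `all_candidates` accumulates every proposal made across depth iterations.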
module_clone = module.deepcopy()
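    # `module_clone` is the working copy whose predictor signatures get swapped with candidate
    # instructions/prefixes during evaluation, so `module` itself stays untouched.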
# For each iteration in depth...
    # TODO: fix this so that we eval the new batch of predictors with the new best following predictors
    for d in range(self.depth):
        logger.info(f"Iteration Depth: {d+1}/{self.depth}.")
latest_scores = []
# Go through our module's predictors
for p_i, (p_old, p_new) in enumerate(zip(module.predictors(), module_clone.predictors())):
candidates_ = latest_candidates[id(p_old)] # Use the most recently generated candidates for evaluation
            if len(module.predictors()) > 1:
                # With multiple predictors, reevaluate all of this predictor's candidates, since the
                # best prompts chosen for the other predictors may have changed since the last scoring.
                candidates_ = all_candidates[id(p_old)]
# For each candidate
for c_i, c in enumerate(candidates_):
# Get the candidate instruction and prefix
instruction, prefix = (
c.proposed_instruction.strip('"').strip(),
c.proposed_prefix_for_output_field.strip('"').strip(),
)
# Set this new module with our instruction / prefix
*_, last_key = self._get_signature(p_new).fields.keys()
updated_signature = (
self._get_signature(p_new)
.with_instructions(instruction)
.with_updated_fields(last_key, prefix=prefix)
)
self._set_signature(p_new, updated_signature)
# Score the instruction / prefix
for i, predictor in enumerate(module_clone.predictors()):
logger.debug(f"Predictor {i+1}")
self._print_signature(predictor)
logger.info(
f"At Depth {d+1}/{self.depth}, Evaluating Prompt Candidate #{c_i+1}/{len(candidates_)} for "
f"Predictor {p_i+1} of {len(module.predictors())}.",
)
score = evaluate(module_clone, devset=trainset, **eval_kwargs)
if self.prompt_model:
logger.debug(f"prompt_model.inspect_history(n=1) {self.prompt_model.inspect_history(n=1)}")
total_calls += 1
replace_entry = True
logger.debug(f"(instruction, prefix) {(instruction, prefix)}")
if (instruction, prefix) in evaluated_candidates[id(p_old)]:
if evaluated_candidates[id(p_old)][(instruction, prefix)]["score"] >= score:
replace_entry = False
if replace_entry:
# Add it to our evaluated candidates list
evaluated_candidates[id(p_old)][(instruction, prefix)] = {
"score": score,
"program": module_clone.deepcopy(),
"instruction": instruction,
"prefix": prefix,
"depth": d,
}
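                    # Only the newest `breadth` candidates (the tail of `candidates_`) count toward
                    # this round's latest-batch statistics.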
if len(candidates_) - self.breadth <= c_i:
latest_scores.append(score)
if self.track_stats:
results_latest[id(p_old)]["depth"].append(d)
results_latest[id(p_old)]["max"].append(max(latest_scores))
results_latest[id(p_old)]["average"].append(sum(latest_scores) / len(latest_scores))
results_latest[id(p_old)]["min"].append(min(latest_scores))
results_latest[id(p_old)]["std"].append(np.std(latest_scores))
# Now that we've evaluated the candidates, set this predictor to the best performing version
# to ensure the next round of scores reflect the best possible version
best_candidate = max(evaluated_candidates[id(p_old)].values(), key=lambda candidate: candidate["score"])
*_, last_key = self._get_signature(p_old).fields.keys()
updated_signature = (
self._get_signature(p_new)
.with_instructions(best_candidate["instruction"])
.with_updated_fields(last_key, prefix=best_candidate["prefix"])
)
self._set_signature(p_new, updated_signature)
logger.debug(
f"Updating Predictor {id(p_old)} to:\ni: {best_candidate['instruction']}\n"
f"p: {best_candidate['prefix']}",
)
logger.debug("Full predictor with update: ")
for i, predictor in enumerate(module_clone.predictors()):
logger.debug(f"Predictor {i}")
self._print_signature(predictor)
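        # On the final depth iteration, stop here: a new batch of candidates would never be evaluated.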
if d == self.depth - 1:
break
new_candidates = {}
for p_base in module.predictors():
# Build Few-Shot Example of Optimized Prompts
attempts = []
            shortest_len = min(len(evaluated_candidates[id(p_base)]), self.breadth)
best_predictors = list(evaluated_candidates[id(p_base)].values())
best_predictors.sort(key=lambda x: x["score"], reverse=True)
if self.track_stats:
scores = [x["score"] for x in best_predictors][:10]
results_best[id(p_base)]["depth"].append(d)
results_best[id(p_base)]["max"].append(max(scores))
results_best[id(p_base)]["average"].append(sum(scores) / len(scores))
results_best[id(p_base)]["min"].append(min(scores))
results_best[id(p_base)]["std"].append(np.std(scores))
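            # List attempts in ascending order of score, so the best-scoring instruction/prefix appears last.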
for i in range(shortest_len - 1, -1, -1):
attempts.append(f'Instruction #{shortest_len-i}: {best_predictors[i]["instruction"]}')
attempts.append(f'Prefix #{shortest_len-i}: {best_predictors[i]["prefix"]}')
attempts.append(f'Resulting Score #{shortest_len-i}: {best_predictors[i]["score"]}')
# Generate next batch of potential prompts to optimize, with previous attempts as input
if self.prompt_model:
with dspy.settings.context(lm=self.prompt_model):
instr = dspy.Predict(
GenerateInstructionGivenAttempts,
n=self.breadth,
temperature=self.init_temperature,
)(attempted_instructions=attempts)
else:
instr = dspy.Predict(
GenerateInstructionGivenAttempts,
n=self.breadth,
temperature=self.init_temperature,
)(attempted_instructions=attempts)
if self.prompt_model:
                logger.debug(f"{self.prompt_model.inspect_history(n=1)}")
# Get candidates for each predictor
new_candidates[id(p_base)] = instr.completions
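            # Fold the new proposals into the running pool so later rounds can reevaluate all of them.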
all_candidates[id(p_base)].proposed_instruction.extend(instr.completions.proposed_instruction)
all_candidates[id(p_base)].proposed_prefix_for_output_field.extend(
instr.completions.proposed_prefix_for_output_field,
)
if self.prompt_model:
logger.debug(f"{self.prompt_model.inspect_history(n=1)}")
latest_candidates = new_candidates
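    # Collect every evaluated candidate across predictors and pick the best-scoring program overall.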
candidates = []
for predictor in module.predictors():
candidates.extend(list(evaluated_candidates[id(predictor)].values()))
if self.track_stats:
best_predictors = list(evaluated_candidates[id(predictor)].values())
best_predictors.sort(key=lambda x: x["score"], reverse=True)
scores = [x["score"] for x in best_predictors][:10]
results_best[id(predictor)]["depth"].append(d)
results_best[id(predictor)]["max"].append(max(scores))
results_best[id(predictor)]["average"].append(sum(scores) / len(scores))
results_best[id(predictor)]["min"].append(min(scores))
results_best[id(predictor)]["std"].append(np.std(scores))
candidates.sort(key=lambda x: x["score"], reverse=True)
candidates = self._drop_duplicates(candidates)
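    # Attach the full candidate list and call count to the winning program for later inspection.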
best_program = candidates[0]["program"]
best_program.candidate_programs = candidates
best_program.total_calls = total_calls
if self.track_stats:
best_program.results_best = results_best
best_program.results_latest = results_latest
return best_program