Skip to content

dspy.ProgramOfThought

dspy.ProgramOfThought(signature, max_iters=3)

Bases: Module

Source code in dspy/predict/program_of_thought.py
def __init__(self, signature, max_iters=3):
    super().__init__()
    self.signature = signature = ensure_signature(signature)
    self.max_iters = max_iters

    self.input_fields = signature.input_fields
    self.output_fields = signature.output_fields

    assert len(self.output_fields) == 1, "PoT only supports one output field."

    self.output_field_name = next(iter(self.output_fields))
    inputs_ = ", ".join(
        [f"`{field_name}`" for field_name in self.input_fields.keys()],
    )
    outputs_ = f"`{self.output_field_name}`"

    assert len(self.output_fields) == 1, "PoT only supports one output field."

    instr = []
    instr.append(
        f"You will be given {inputs_} and you will respond with {outputs_}.",
    )
    instr.append(
        f"Generating executable Python code that programmatically computes the correct {outputs_}.",
    )
    instr.append(
        f"After you're done with the computation, make sure the last line in your code evaluates to the correct value for {outputs_}.",
    )
    instr = "\n".join(instr)

    self.code_generate = dspy.ChainOfThought(
        dspy.Signature(
            self._generate_signature("generate").fields,
            self._generate_instruction("generate"),
        ),
    )
    self.code_regenerate = dspy.ChainOfThought(
        dspy.Signature(
            self._generate_signature("regenerate").fields,
            self._generate_instruction("regenerate"),
        ),
    )
    self.generate_answer = dspy.ChainOfThought(
        dspy.Signature(
            self._generate_signature("answer").fields,
            self._generate_instruction("answer"),
        ),
    )

Functions

execute_code(code)

Source code in dspy/predict/program_of_thought.py
def execute_code(self, code):
    if not code:
        return code, None, "Error: Empty code before execution."
    interpreter = PythonInterpreter()
    try:
        output = str(interpreter.execute(code))
        return code, output, None
    except Exception as e:
        return code, None, str(e)

forward(**kwargs)

Source code in dspy/predict/program_of_thought.py
def forward(self, **kwargs):
    input_kwargs = {
        field_name: kwargs[field_name] for field_name in self.input_fields
    }
    code_data = self.code_generate(**input_kwargs)
    parsed_code, error = self.parse_code(code_data)
    # FIXME: Don't try to execute the code if it didn't parse
    code, output, error = self.execute_code(parsed_code)
    hop = 0
    while hop < self.max_iters and error:
        print("Error in code execution")
        input_kwargs.update({"previous_code": code, "error": error})
        code_data = self.code_regenerate(**input_kwargs)
        parsed_code, error = self.parse_code(code_data)
        # FIXME: Don't try to execute the code if it didn't parse
        code, output, error = self.execute_code(parsed_code)
        hop += 1
        if hop == self.max_iters:
            print("Max hops reached. Error persists.")
            return None
    input_kwargs.update({"final_generated_code": code, "code_output": output})
    answer_gen_result = self.generate_answer(**input_kwargs)
    return answer_gen_result

parse_code(code_data)

Source code in dspy/predict/program_of_thought.py
def parse_code(self, code_data):
    code = (
        code_data.get("generated_code", "").split("---", 1)[0].split("\n\n\n", 1)[0]
    )
    code_match = re.search(r"```python[ \n](.*?)[ \n]```?", code, re.DOTALL)
    code_block = (code_match.group(1) if code_match else code).replace("\\n", "\n")
    if not code_block:
        return code, "Error: Empty code after parsing."
    if "\n" not in code_block and code_block.count("=") > 1:
        return code, "Error: Code format is not correct."
    lines = code_block.split("\n")
    last_line_match = re.match(r"^(\w+)\s*=", lines[-1].strip())
    if last_line_match and len(lines) > 1:
        code_block += "\n" + last_line_match.group(1)
    else:
        code_block = re.sub(
            r"([a-zA-Z_]\w* *=.*?)(?=[a-zA-Z_]\w* *=)", r"\1\n", code_block,
        )
        code_block = re.sub(
            r"([a-zA-Z_]\w* *=.*?)([a-zA-Z_]\w*)$", r"\1\n\2", code_block,
        )
    return code_block, None