Skip to content

dspy.Parallel

dspy.Parallel(num_threads: int | None = None, max_errors: int | None = None, access_examples: bool = True, return_failed_examples: bool = False, provide_traceback: bool | None = None, disable_progress_bar: bool = False, timeout: int = 120, straggler_limit: int = 3)

A utility class for parallel, multi-threaded execution of (module, example) pairs. Supports various example formats (e.g., Example, dict, tuple, list), robust error handling, optional progress tracking, and can optionally return failed examples and exceptions.

Parameters:

Name Type Description Default
num_threads Optional[int]

The number of threads to use. Defaults to settings.num_threads.

None
max_errors Optional[int]

The maximum number of errors allowed before raising an exception. Defaults to settings.max_errors.

None
access_examples bool

Whether to unpack Example objects via .inputs(). Defaults to True.

True
return_failed_examples bool

Whether to return failed examples. Defaults to False.

False
provide_traceback Optional[bool]

Whether to provide traceback. Defaults to None.

None
disable_progress_bar bool

Whether to disable progress bar. Defaults to False.

False
Example
import dspy
from dspy import Parallel
lm = dspy.LM("openai/gpt-4o-mini")
dspy.configure(lm=lm)

examples = [
    {"question": "What is the capital of Spain?"},
    {"question": "What is 3 * 4?"},
    {"question": "Who wrote Hamlet?"},
]

module = dspy.Predict("question->answer")
exec_pairs = [(module, example) for example in examples]
parallel = Parallel(num_threads=3, disable_progress_bar=False)
results = parallel(exec_pairs)
for i, result in enumerate(results):
    print(f"Result {i+1}: {result.answer}")

# Expected Output:
# Result 1: Madrid
# Result 2: 12
# Result 3: William Shakespeare
Source code in dspy/predict/parallel.py
def __init__(
    self,
    num_threads: int | None = None,
    max_errors: int | None = None,
    access_examples: bool = True,
    return_failed_examples: bool = False,
    provide_traceback: bool | None = None,
    disable_progress_bar: bool = False,
    timeout: int = 120,
    straggler_limit: int = 3,
):
    """
    Utility for running (module, example) pairs across multiple threads.

    Accepts several example formats (`Example`, dict, tuple, list), tracks
    errors encountered during execution, optionally shows a progress bar,
    and can hand back the failed examples together with their exceptions.

    Args:
        num_threads (Optional[int]): Number of worker threads. Falls back to
            `settings.num_threads` when not provided.
        max_errors (Optional[int]): Error budget before execution aborts with
            an exception. Falls back to `settings.max_errors` when None.
        access_examples (bool): If True, `Example` objects are unpacked via
            `.inputs()` before being passed to the module. Defaults to True.
        return_failed_examples (bool): If True, failed examples and their
            exceptions are returned alongside the results. Defaults to False.
        provide_traceback (Optional[bool]): Whether tracebacks are included
            for failures. Defaults to None.
        disable_progress_bar (bool): If True, no progress bar is shown.
            Defaults to False.

    Example:
        ```python
        import dspy
        from dspy import Parallel
        lm = dspy.LM("openai/gpt-4o-mini")
        dspy.configure(lm=lm)

        examples = [
            {"question": "What is the capital of Spain?"},
            {"question": "What is 3 * 4?"},
            {"question": "Who wrote Hamlet?"},
        ]

        module = dspy.Predict("question->answer")
        exec_pairs = [(module, example) for example in examples]
        parallel = Parallel(num_threads=3, disable_progress_bar=False)
        results = parallel(exec_pairs)
        for i, result in enumerate(results):
            print(f"Result {i+1}: {result.answer}")

        # Expected Output:
        # Result 1: Madrid
        # Result 2: 12
        # Result 3: William Shakespeare
        ```
    """

    super().__init__()

    # Fall back to the global settings when limits are not supplied.
    # NOTE: a falsy num_threads (e.g. 0) also falls back, matching `or` semantics.
    self.num_threads = num_threads if num_threads else settings.num_threads
    self.max_errors = max_errors if max_errors is not None else settings.max_errors

    # Execution-behavior flags, stored as given.
    self.access_examples = access_examples
    self.return_failed_examples = return_failed_examples
    self.provide_traceback = provide_traceback
    self.disable_progress_bar = disable_progress_bar
    self.timeout = timeout
    self.straggler_limit = straggler_limit

    # Shared mutable state for failure tracking across worker threads.
    self.error_count = 0
    self.error_lock = threading.Lock()
    self.cancel_jobs = threading.Event()
    self.failed_examples = []
    self.exceptions = []

Functions

__call__(*args: Any, **kwargs: Any) -> Any

Source code in dspy/predict/parallel.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Make the instance callable by delegating all arguments to `forward`."""
    return self.forward(*args, **kwargs)

forward(exec_pairs: list[tuple[Any, Example]], num_threads: int | None = None) -> list[Any]

Source code in dspy/predict/parallel.py
def forward(self, exec_pairs: list[tuple[Any, Example]], num_threads: int | None = None) -> list[Any]:
    """Execute each (module, example) pair in parallel and collect the results.

    Args:
        exec_pairs: Pairs of (module, example); the example may be an
            `Example`, a dict, a tuple, or — for nested `Parallel` modules —
            a list.
        num_threads: Optional per-call override of the configured thread count.

    Returns:
        The list of results, or — when `return_failed_examples` is True —
        a tuple of (results, failed_examples, exceptions).

    Raises:
        ValueError: If an example is of an unsupported type.
    """
    threads = self.num_threads if num_threads is None else num_threads

    runner = ParallelExecutor(
        num_threads=threads,
        max_errors=self.max_errors,
        provide_traceback=self.provide_traceback,
        disable_progress_bar=self.disable_progress_bar,
        timeout=self.timeout,
        straggler_limit=self.straggler_limit,
    )

    def run_single(pair):
        # Dispatch on the example's container type.
        mod, ex = pair
        if isinstance(ex, Example):
            return mod(**ex.inputs()) if self.access_examples else mod(ex)
        if isinstance(ex, dict):
            return mod(**ex)
        if isinstance(ex, list) and mod.__class__.__name__ == "Parallel":
            return mod(ex)
        if isinstance(ex, tuple):
            return mod(*ex)
        raise ValueError(
            f"Invalid example type: {type(ex)}, only supported types are Example, dict, list and tuple"
        )

    results = runner.execute(run_single, exec_pairs)

    if not self.return_failed_examples:
        return results

    # Map failure indices back to the original examples and their exceptions.
    for idx in runner.failed_indices:
        if idx < len(exec_pairs):
            _, failed_example = exec_pairs[idx]
            self.failed_examples.append(failed_example)
            if exc := runner.exceptions_map.get(idx):
                self.exceptions.append(exc)

    return results, self.failed_examples, self.exceptions