def forward(self, exec_pairs: List[Tuple[Any, Example]], num_threads: int = None) -> List[Any]:
num_threads = num_threads if num_threads is not None else self.num_threads
executor = ParallelExecutor(
num_threads=num_threads,
max_errors=self.max_errors,
provide_traceback=self.provide_traceback,
disable_progress_bar=self.disable_progress_bar,
)
def process_pair(pair):
result = None
module, example = pair
if isinstance(example, Example):
result = module(**example.inputs())
elif isinstance(example, dict):
result = module(**example)
elif isinstance(example, list) and module.__class__.__name__ == "Parallel":
result = module(example)
elif isinstance(example, tuple):
result = module(*example)
else:
raise ValueError(f"Invalid example type: {type(example)}, only supported types are Example, dict, list and tuple")
return result
# Execute the processing function over the execution pairs
results = executor.execute(process_pair, exec_pairs)
if self.return_failed_examples:
return results, self.failed_examples, self.exceptions
else:
return results