Skip to content

dspy.streamify

dspy.streamify(program: Module) -> Callable[[Any, Any], Awaitable[Any]]

Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them all at once.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| program | Module | The DSPy program to wrap with streaming functionality. | required |

Returns:

| Type | Description |
| --- | --- |
| Callable[[Any, Any], Awaitable[Any]] | A function that takes the same arguments as the original program, but returns an async generator that yields the program's outputs incrementally. |

Example:

# Signature with a single input field and a single output field.
class TestSignature(dspy.Signature):
    input_text: str = dspy.InputField()
    output_text: str = dspy.OutputField()

# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict(TestSignature))

# Use the program with streaming output
async def use_streaming():
    # Calling the wrapped program is not awaited: it returns an async generator.
    output_stream = program(input_text="Test")
    # Values arrive one at a time as the program produces them.
    async for value in output_stream:
        print(value)  # Print each streamed value incrementally
Source code in dspy/utils/streaming.py
def streamify(program: Module) -> Callable[[Any, Any], Awaitable[Any]]:
    """
    Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
    all at once.

    The wrapped program runs as a background task with a memory object send stream installed in
    ``dspy.settings`` (as ``send_stream``) for the duration of the call. Everything written to
    that stream is yielded to the caller, followed by the program's return value as the last
    item. Iteration ends after a ``Prediction`` has been yielded, or when the background task has
    finished and the stream is drained, whichever comes first.

    Args:
        program: The DSPy program to wrap with streaming functionality. If it is not a coroutine
            function it is converted with ``asyncify`` first.

    Returns:
        A function that takes the same arguments as the original program, but returns an async
            generator that yields the program's outputs incrementally.

    Example:

    ```python
    class TestSignature(dspy.Signature):
        input_text: str = dspy.InputField()
        output_text: str = dspy.OutputField()

    # Create the program and wrap it with streaming functionality
    program = dspy.streamify(dspy.Predict(TestSignature))

    # Use the program with streaming output
    async def use_streaming():
        output_stream = program(input_text="Test")
        async for value in output_stream:
            print(value)  # Print each streamed value incrementally
    ```
    """
    # Function-scope import kept from the original (presumably avoids a circular import).
    import dspy

    if not iscoroutinefunction(program):
        program = asyncify(program)

    async def generator(args, kwargs, stream: MemoryObjectSendStream):
        # Run the program, then push its result as the final stream item.
        # Closing the send side when this task finishes (normally or with an error) lets the
        # consumer's ``async for`` terminate on end-of-stream. Without it, a program whose
        # return value is not a ``Prediction`` would leave the consumer waiting forever.
        async with stream:
            with dspy.settings.context(send_stream=stream):
                prediction = await program(*args, **kwargs)

            await stream.send(prediction)

    async def streamer(*args, **kwargs):
        # 16 is the buffer size handed to the memory object stream.
        send_stream, receive_stream = create_memory_object_stream(16)
        async with create_task_group() as tg, send_stream, receive_stream:
            tg.start_soon(generator, args, kwargs, send_stream)

            async for value in receive_stream:
                yield value
                # The Prediction is the program's final output; nothing follows it.
                if isinstance(value, Prediction):
                    return

    return streamer