# Running Workflows

> Execute workflows with Workflow.run() and process their output.

`Workflow.run()` runs the workflow and returns either a single `WorkflowRunOutput` object or, when streaming, an iterator of `WorkflowRunOutputEvent` objects.

Many of our examples use `workflow.print_response()`, a helper that prints the response in the terminal. It calls `workflow.run()` under the hood.

## Running your Workflow

Here's how to run your workflow. The result is captured in the `response` variable.

```python theme={null}
from agno.agent import Agent
from agno.models.openai import OpenAIResponses
from agno.db.sqlite import SqliteDb
from agno.team import Team
from agno.tools.hackernews import HackerNewsTools
from agno.tools.yfinance import YFinanceTools
from agno.workflow import Step, Workflow
from agno.run.workflow import WorkflowRunOutput
from agno.utils.pprint import pprint_run_response

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    role="Extract key insights and content from Hackernews posts",
)
finance_agent = Agent(
    name="Finance Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[YFinanceTools()],
    role="Get stock prices and financial data",
)

# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, finance_agent],
    instructions="Research tech topics and related stocks",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    db=SqliteDb(db_file="tmp/workflow.db"),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: WorkflowRunOutput = content_creation_workflow.run(
        input="AI trends in 2024",
        markdown=True,
    )

    pprint_run_response(response, markdown=True)
```

<Note>
  The `Workflow.run()` function returns a `WorkflowRunOutput` object when not
  streaming. Here is detailed documentation for
  [WorkflowRunOutput](/reference/workflows/run-output).
</Note>

## Async Execution

The `Workflow.arun()` function is the async version of `Workflow.run()`.

Here is an example of how to use it:

```python theme={null}
from typing import AsyncIterator
import asyncio

from agno.agent import Agent
from agno.tools.hackernews import HackerNewsTools
from agno.workflow import Condition, Step, Workflow, StepInput
from agno.run.workflow import WorkflowRunOutput, WorkflowRunOutputEvent, WorkflowRunEvent
from agno.utils.pprint import pprint_run_response

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[HackerNewsTools()],
)

summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[HackerNewsTools()],
)

writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)

# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""

    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]

    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)

summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)

# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)

write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

async def main():
    try:
        response: WorkflowRunOutput = await basic_workflow.arun(
            input="Recent breakthroughs in quantum computing",
        )
        pprint_run_response(response, markdown=True)
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```
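
Because `arun()` is awaitable, several runs can be scheduled together with `asyncio.gather`. The following is a minimal sketch, not a documented Agno pattern: `run_many` and `EchoWorkflow` are hypothetical names introduced here, and `EchoWorkflow` is only a stand-in so the snippet runs without API keys. Whether a single `Workflow` instance can safely serve concurrent runs is not covered on this page, so treat that as an assumption to verify.

```python
import asyncio
from typing import Any, List


async def run_many(workflow: Any, inputs: List[str]) -> List[Any]:
    """Run one workflow over several inputs concurrently; results keep input order."""
    # Assumption: `workflow.arun(input=...)` is awaitable when not streaming,
    # as in the example above.
    return await asyncio.gather(*(workflow.arun(input=text) for text in inputs))


class EchoWorkflow:
    """Hypothetical stand-in for a Workflow, used only to make the sketch runnable."""

    async def arun(self, input: str) -> str:
        await asyncio.sleep(0)  # yield control, as a real network call would
        return f"done: {input}"


results = asyncio.run(run_many(EchoWorkflow(), ["quantum computing", "fusion energy"]))
print(results)
```

With a real workflow, you would pass `basic_workflow` in place of `EchoWorkflow()` and receive a list of `WorkflowRunOutput` objects.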

## Streaming Responses

To enable streaming, set `stream=True` when calling `run()`. This will return an iterator of `WorkflowRunOutputEvent` objects instead of a single response.

```python theme={null}
from typing import Iterator

from agno.run.workflow import WorkflowRunOutputEvent

# Define your agents/team
...

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    db=SqliteDb(db_file="tmp/workflow.db"),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: Iterator[WorkflowRunOutputEvent] = content_creation_workflow.run(
        input="AI trends in 2024",
        markdown=True,
        stream=True,
    )

    pprint_run_response(response, markdown=True)
```

### Streaming all events

By default, when you stream a response, only the `WorkflowStartedEvent` and `WorkflowCompletedEvent` events will be streamed (together with all the Agent and Team events).

You can also stream all events by setting `stream_events=True`.

This will provide real-time updates about the workflow's internal processes:

```python theme={null}
from typing import Iterator

from agno.agent import Agent
from agno.tools.hackernews import HackerNewsTools
from agno.workflow import Condition, Step, Workflow, StepInput
from agno.run.workflow import WorkflowRunOutput, WorkflowRunOutputEvent, WorkflowRunEvent

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[HackerNewsTools()],
)

summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[HackerNewsTools()],
)

writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)

# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""

    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]

    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)

summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)

# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)

write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

if __name__ == "__main__":
    try:
        response: Iterator[WorkflowRunOutputEvent] = basic_workflow.run(
            input="Recent breakthroughs in quantum computing",
            stream=True,
            stream_events=True,
        )
        for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"Error: {e}")
        import traceback

        traceback.print_exc()
```
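
Since every branch of the `if`/`elif` chain above does the same thing, the check can be expressed as a set membership test. This is a minimal sketch under the assumption that each event exposes its type as a string in an `event` attribute, as the example above suggests. `select_events` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for real events so the snippet runs on its own.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, Set


def select_events(events: Iterable[Any], wanted: Set[str]) -> Iterator[Any]:
    """Lazily yield only the events whose `event` name is in `wanted`."""
    for event in events:
        if getattr(event, "event", None) in wanted:
            yield event


# Stand-in events; a real stream would come from workflow.run(stream=True, ...).
sample_stream = [
    SimpleNamespace(event="WorkflowStarted"),
    SimpleNamespace(event="RunContent"),
    SimpleNamespace(event="StepCompleted"),
    SimpleNamespace(event="WorkflowCompleted"),
]

wanted = {"WorkflowStarted", "WorkflowCompleted"}
selected = [event.event for event in select_events(sample_stream, wanted)]
print(selected)
```

With Agno events, `wanted` could be built from enum values such as `WorkflowRunEvent.step_started.value`, mirroring the comparisons in the example above.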

### Streaming Executor Events

The events from Agents and Teams used inside your workflow are automatically yielded during the streaming of a Workflow. You can choose not to stream these executor events by setting `stream_executor_events=False`.

The following Workflow events will be streamed in all cases:

* `WorkflowStarted`
* `WorkflowCompleted`
* `StepStarted`
* `StepCompleted`

See the following example:

```python theme={null}
from agno.agent import Agent
from agno.models.openai import OpenAIResponses
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow

agent = Agent(
    name="ResearchAgent",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions="You are a helpful research assistant. Be concise.",
)

workflow = Workflow(
    name="Research Workflow",
    steps=[Step(name="Research", agent=agent)],
    stream=True,
    stream_executor_events=False,  # <- Filter out internal executor events
)

print("\n" + "=" * 70)
print("Workflow Streaming Example: stream_executor_events=False")
print("=" * 70)
print(
    "\nThis will show only workflow and step events and will not yield RunContent and TeamRunContent events"
)
print("filtering out internal agent/team events for cleaner output.\n")

# Run workflow and display events
for event in workflow.run(
    "What is Python?",
    stream=True,
    stream_events=True,
):
    event_name = event.event if hasattr(event, "event") else type(event).__name__
    print(f"  → {event_name}")
```
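
When comparing runs with and without `stream_executor_events`, a tally per event type can be easier to read than one line per event. The sketch below is illustrative only: `count_events` is a hypothetical helper, the event names are assumed to match the ones listed above, and `SimpleNamespace` objects stand in for real events.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Dict, Iterable


def count_events(events: Iterable[Any]) -> Dict[str, int]:
    """Consume an event stream and count how many events of each type were seen."""
    counts: Counter = Counter()
    for event in events:
        # Fall back to the class name for objects without an `event` attribute,
        # matching the hasattr() check in the example above.
        counts[getattr(event, "event", type(event).__name__)] += 1
    return dict(counts)


# Stand-in events; a real stream would come from workflow.run(..., stream=True).
sample_names = ["WorkflowStarted", "StepStarted", "StepCompleted", "WorkflowCompleted"]
summary = count_events(SimpleNamespace(event=name) for name in sample_names)
print(summary)
```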

### Async Streaming

`Workflow.arun(stream=True)` returns an async iterator of `WorkflowRunOutputEvent` objects instead of a single response.
For example, to stream events asynchronously:

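```python theme={null}
import asyncio
from typing import AsyncIterator

from agno.run.workflow import WorkflowRunOutputEvent, WorkflowRunEvent

# Define your workflow
...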

async def main():
    try:
        response: AsyncIterator[WorkflowRunOutputEvent] = basic_workflow.arun(
            input="Recent breakthroughs in quantum computing",
            stream=True,
            stream_events=True,
        )
        async for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    asyncio.run(main())
```

See the [Async Streaming](/workflows/usage/async-events-streaming) example for more details.
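
If you need the full list of events after the stream finishes, for example to inspect them in a test, an async comprehension can drain the iterator. This is a sketch under stated assumptions: `collect_events` and `fake_stream` are hypothetical names, and `fake_stream` is a stand-in async generator used in place of `basic_workflow.arun(..., stream=True)`.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, List


async def collect_events(stream: AsyncIterator[Any]) -> List[Any]:
    """Drain an async event stream into a list, preserving arrival order."""
    return [event async for event in stream]


async def fake_stream() -> AsyncIterator[Any]:
    """Hypothetical stand-in for the async iterator returned when streaming."""
    for name in ("WorkflowStarted", "StepStarted", "StepCompleted", "WorkflowCompleted"):
        await asyncio.sleep(0)  # simulate waiting for the next event
        yield SimpleNamespace(event=name)


collected = asyncio.run(collect_events(fake_stream()))
names = [event.event for event in collected]
print(names)
```

Collecting everything gives up the real-time benefit of streaming, so this suits post-run inspection more than live output.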

### Event Types

The following events are yielded by the `Workflow.run()` and `Workflow.arun()` functions depending on the workflow's configuration:

#### Core Events

| Event Type          | Description                                         |
| ------------------- | --------------------------------------------------- |
| `WorkflowStarted`   | Indicates the start of a workflow run               |
| `WorkflowCompleted` | Signals successful completion of the workflow run   |
| `WorkflowError`     | Indicates an error occurred during the workflow run |

#### Step Events

| Event Type      | Description                               |
| --------------- | ----------------------------------------- |
| `StepStarted`   | Indicates the start of a step             |
| `StepCompleted` | Signals successful completion of a step   |
| `StepError`     | Indicates an error occurred during a step |

#### Step Output Events (For custom functions)

| Event Type   | Description                    |
| ------------ | ------------------------------ |
| `StepOutput` | Indicates the output of a step |

#### Parallel Execution Events

| Event Type                   | Description                                      |
| ---------------------------- | ------------------------------------------------ |
| `ParallelExecutionStarted`   | Indicates the start of a parallel step           |
| `ParallelExecutionCompleted` | Signals successful completion of a parallel step |

#### Condition Execution Events

| Event Type                    | Description                                  |
| ----------------------------- | -------------------------------------------- |
| `ConditionExecutionStarted`   | Indicates the start of a condition           |
| `ConditionExecutionCompleted` | Signals successful completion of a condition |

#### Loop Execution Events

| Event Type                    | Description                                       |
| ----------------------------- | ------------------------------------------------- |
| `LoopExecutionStarted`        | Indicates the start of a loop                     |
| `LoopIterationStarted`        | Indicates the start of a loop iteration           |
| `LoopIterationCompleted`      | Signals successful completion of a loop iteration |
| `LoopExecutionCompleted`      | Signals successful completion of a loop           |

#### Router Execution Events

| Event Type                 | Description                               |
| -------------------------- | ----------------------------------------- |
| `RouterExecutionStarted`   | Indicates the start of a router           |
| `RouterExecutionCompleted` | Signals successful completion of a router |

#### Steps Execution Events

| Event Type                | Description                                        |
| ------------------------- | -------------------------------------------------- |
| `StepsExecutionStarted`   | Indicates the start of `Steps` being executed      |
| `StepsExecutionCompleted` | Signals successful completion of `Steps` execution |

See the [WorkflowRunOutputEvent](/reference/workflows/run-output) reference for details.

### Storing Events

Workflows can automatically store all execution events for analysis, debugging, and audit purposes. Filter specific event types to reduce noise and storage overhead while maintaining essential execution records.

Access stored events via `workflow.run_response.events` and in the `runs` column of your workflow's session database (SQLite, PostgreSQL, etc.).

* `store_events=True`: Automatically stores all workflow events in the database
* `events_to_skip=[]`: Filter out specific event types to reduce storage and noise

**Available Events to Skip:**

```python theme={null}
from agno.run.workflow import WorkflowRunEvent

# Common events you might want to skip
events_to_skip = [
    WorkflowRunEvent.workflow_started,
    WorkflowRunEvent.workflow_completed,
    WorkflowRunEvent.workflow_cancelled,
    WorkflowRunEvent.step_started,
    WorkflowRunEvent.step_completed,
    WorkflowRunEvent.parallel_execution_started,
    WorkflowRunEvent.parallel_execution_completed,
    WorkflowRunEvent.condition_execution_started,
    WorkflowRunEvent.condition_execution_completed,
    WorkflowRunEvent.loop_execution_started,
    WorkflowRunEvent.loop_execution_completed,
    WorkflowRunEvent.router_execution_started,
    WorkflowRunEvent.router_execution_completed,
]
```

**Use Cases**

* **Debugging**: Store all events to analyze workflow execution flow
* **Audit Trails**: Keep records of all workflow activities for compliance
* **Performance Analysis**: Analyze timing and execution patterns
* **Error Investigation**: Review event sequences leading to failures
* **Noise Reduction**: Skip verbose events like `step_started` to focus on results

**Configuration Examples**

```python theme={null}
from agno.run.workflow import WorkflowRunEvent
from agno.workflow import Workflow

# Store everything
debug_workflow = Workflow(
    name="Debug Workflow",
    store_events=True,
    steps=[...]
)

# Store only important events
production_workflow = Workflow(
    name="Production Workflow",
    store_events=True,
    events_to_skip=[
        WorkflowRunEvent.step_started,
        WorkflowRunEvent.parallel_execution_started,
        # keep step_completed and workflow_completed
    ],
    steps=[...]
)

# No event storage
fast_workflow = Workflow(
    name="Fast Workflow",
    store_events=False,
    steps=[...]
)
```

<Tip>
  See this [example](/workflows/usage/store-events-and-events-to-skip-in-a-workflow) for more information.
</Tip>
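
To make the effect of `events_to_skip` concrete, the following sketch models the filtering on plain Python objects. It is a conceptual illustration, not Agno's implementation: `DemoEvent` is a hypothetical stand-in for `WorkflowRunEvent`, `events_to_store` is a hypothetical helper, and the event names are assumed to match those in the tables above.

```python
from enum import Enum
from types import SimpleNamespace
from typing import Any, Iterable, List


class DemoEvent(str, Enum):
    """Hypothetical stand-in for WorkflowRunEvent so the sketch runs on its own."""

    step_started = "StepStarted"
    step_completed = "StepCompleted"
    workflow_completed = "WorkflowCompleted"


def events_to_store(events: Iterable[Any], events_to_skip: Iterable[Enum]) -> List[Any]:
    """Return the events that would remain after dropping the skipped types."""
    skip_names = {member.value for member in events_to_skip}
    return [event for event in events if event.event not in skip_names]


# Stand-in run: two lifecycle events per step would be emitted, we keep the results only.
run_events = [
    SimpleNamespace(event="StepStarted"),
    SimpleNamespace(event="StepCompleted"),
    SimpleNamespace(event="WorkflowCompleted"),
]
kept = [e.event for e in events_to_store(run_events, [DemoEvent.step_started])]
print(kept)
```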

## Agno Telemetry

Agno logs which model a workflow used so we can prioritize updates to the most popular providers. You can disable this by setting `AGNO_TELEMETRY=false` in your environment or by setting `telemetry=False` on the workflow.

```bash theme={null}
export AGNO_TELEMETRY=false
```

or:

```python theme={null}
workflow = Workflow(..., telemetry=False)
```

See the [Workflow class reference](/reference/workflows/workflow) for more details.
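
If you cannot control the shell environment, the same variable can be set from Python for the current process. This is a small sketch that assumes Agno reads `AGNO_TELEMETRY` from the process environment; setting it before importing Agno or creating any workflow is the conservative choice, since the exact moment the value is read is not stated here.

```python
import os

# Equivalent to `export AGNO_TELEMETRY=false`, but only for this process.
# Assumption: Agno reads the variable from the environment after this line runs,
# so place it before importing agno or constructing a Workflow.
os.environ["AGNO_TELEMETRY"] = "false"

print(os.environ["AGNO_TELEMETRY"])
```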

## Developer Resources

* [Workflow reference](/reference/workflows/workflow)
* [WorkflowRunOutput schema](/reference/workflows/run-output)
* [Cookbook](https://github.com/agno-agi/agno/tree/main/cookbook/05_workflows/README.md)
