# Temporal

Temporal is a durable execution platform that guarantees application code runs to completion despite failures. The platform provides reliability, scalability, and visibility for long-running workflows and distributed applications.

This guide covers routing telemetry from Temporal workflows to InteractiveAI for monitoring, debugging, and evaluating AI agents and LLM-powered applications.

### Prerequisites

* InteractiveAI account with API credentials
* Temporal server (Cloud or local development server)
* OpenAI API key

***

### Installation

```bash
pip install temporalio openai openai-agents interactiveai openinference-instrumentation-openai-agents
```

***

### Configuration

Set environment variables for InteractiveAI, Temporal, and OpenAI:

```python
import os

# InteractiveAI credentials
# Obtain keys from Settings > API Keys in the dashboard
os.environ["INTERACTIVEAI_PUBLIC_KEY"] = "pk-ia-..."
os.environ["INTERACTIVEAI_SECRET_KEY"] = "sk-ia-..."
os.environ["INTERACTIVEAI_HOST"] = "https://app.interactiveai.com"

# OpenAI credentials
os.environ["OPENAI_API_KEY"] = "sk-proj-..."

# Temporal server configuration
os.environ["TEMPORAL_HOST"] = "localhost:7233"
os.environ.setdefault("TEMPORAL_NAMESPACE", "default")
os.environ.setdefault("TEMPORAL_TASK_QUEUE", "agents-task-queue")
```

***

### Enabling Trace Capture

Initialize the OpenAI Agents instrumentor to capture LLM operations:

```python
from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor

OpenAIAgentsInstrumentor().instrument()
```

Initialize the InteractiveAI client:

```python
from interactiveai import Interactive
import os

client = Interactive(
    public_key=os.environ["INTERACTIVEAI_PUBLIC_KEY"],
    secret_key=os.environ["INTERACTIVEAI_SECRET_KEY"],
    host=os.environ.get("INTERACTIVEAI_HOST", "https://app.interactiveai.com")
)

if client.auth_check():
    print("Connection established")
else:
    print("Authentication failed - verify credentials")
```

***

### Example: Building a Research Pipeline

The following sections walk through a complete implementation of a multi-step research pipeline. Temporal handles workflow orchestration and failure recovery, while OpenAI agents perform the actual research tasks. InteractiveAI captures telemetry from both layers.

The pipeline works in three stages:

1. **Planning** - An agent analyzes the research query and generates search terms
2. **Searching** - Multiple agents execute web searches in parallel and summarize findings
3. **Writing** - A final agent synthesizes all results into a comprehensive report

Each section below builds on the previous one, starting with data models, then agent definitions, workflow logic, and finally execution.

### Step 1: Define Data Models

These Pydantic models define the structure for data passed between agents. The planner outputs search items, and the writer produces the final report.

```python
from pydantic import BaseModel

class WebSearchItem(BaseModel):
    reason: str
    query: str

class WebSearchPlan(BaseModel):
    searches: list[WebSearchItem]

class ReportData(BaseModel):
    short_summary: str
    markdown_report: str
    follow_up_questions: list[str]
```

### Step 2: Create Agent Factories

Each factory function returns a configured agent for a specific task. Separating these into functions allows the workflow to create fresh agent instances as needed.

```python
from agents import Agent, WebSearchTool
from agents.model_settings import ModelSettings

def new_planner_agent():
    """Creates an agent that breaks down a query into search terms."""
    return Agent(
        name="PlannerAgent",
        instructions=(
            "You are a research assistant. Given a query, generate a set of web searches "
            "to gather information. Output between 5 and 20 search terms."
        ),
        model="gpt-4o",
        output_type=WebSearchPlan,
    )

def new_search_agent():
    """Creates an agent that executes searches and summarizes results."""
    return Agent(
        name="SearchAgent",
        instructions=(
            "You are a research assistant. Given a search term, search the web and "
            "produce a concise 2-3 paragraph summary under 300 words. Capture main points only."
        ),
        tools=[WebSearchTool()],
        model_settings=ModelSettings(tool_choice="required"),
    )

def new_writer_agent():
    """Creates an agent that synthesizes research into a final report."""
    return Agent(
        name="WriterAgent",
        instructions=(
            "You are a senior researcher writing a cohesive report. You will receive the original "
            "query and initial research. Create an outline, then generate a detailed markdown report "
            "of at least 1000 words."
        ),
        model="gpt-4o",
        output_type=ReportData,
    )
```

### Step 3: Build the Research Manager

The ResearchManager class coordinates the three-stage pipeline. It creates agents, runs them in sequence, and handles parallel search execution.

```python
import asyncio
from temporalio import workflow
from agents import RunConfig, Runner, custom_span, gen_trace_id, trace

class ResearchManager:
    def __init__(self):
        self.run_config = RunConfig()
        self.planner_agent = new_planner_agent()
        self.search_agent = new_search_agent()
        self.writer_agent = new_writer_agent()

    async def run(self, query: str) -> str:
        """Executes the full research pipeline for a given query."""
        trace_id = gen_trace_id()
        with trace("Research trace", trace_id=trace_id):
            search_plan = await self._plan_searches(query)
            search_results = await self._perform_searches(search_plan)
            report = await self._write_report(query, search_results)

        return report.markdown_report

    async def _plan_searches(self, query: str) -> WebSearchPlan:
        """Stage 1: Generate search terms from the query."""
        result = await Runner.run(
            self.planner_agent,
            f"Query: {query}",
            run_config=self.run_config,
        )
        return result.final_output_as(WebSearchPlan)

    async def _perform_searches(self, search_plan: WebSearchPlan) -> list[str]:
        """Stage 2: Execute all searches in parallel and collect summaries."""
        with custom_span("Search the web"):
            tasks = [
                asyncio.create_task(self._search(item)) 
                for item in search_plan.searches
            ]
            results = []
            for task in workflow.as_completed(tasks):
                result = await task
                if result is not None:
                    results.append(result)
            return results

    async def _search(self, item: WebSearchItem) -> str | None:
        """Executes a single search and returns the summary."""
        input_text = f"Search term: {item.query}\nReason: {item.reason}"
        try:
            result = await Runner.run(
                self.search_agent,
                input_text,
                run_config=self.run_config,
            )
            return str(result.final_output)
        except Exception:
            return None

    async def _write_report(self, query: str, search_results: list[str]) -> ReportData:
        """Stage 3: Synthesize all search results into a final report."""
        input_text = f"Original query: {query}\nSearch results: {search_results}"
        result = await Runner.run(
            self.writer_agent,
            input_text,
            run_config=self.run_config,
        )
        return result.final_output_as(ReportData)
```

### Step 4: Define the Temporal Workflow

The workflow decorator registers ResearchWorkflow with Temporal. When triggered, it creates a ResearchManager and runs the pipeline.

```python
from temporalio import workflow

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        return await ResearchManager().run(query)
```

### Step 5: Execute the Workflow

Before running, start a local Temporal development server:

```bash
temporal server start-dev
```

Then execute the workflow:

```python
import os
from temporalio.client import Client
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

async def main():
    tls = os.environ.get("TEMPORAL_TLS", "").lower() in ("1", "true", "yes")
    api_key = os.environ.get("TEMPORAL_API_KEY")

    plugin = OpenAIAgentsPlugin()

    # Connect to Temporal server
    client = await Client.connect(
        target_host=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"),
        api_key=api_key or None,
        tls=tls,
        plugins=[plugin]
    )

    # Create worker to process workflows
    worker = Worker(
        client,
        task_queue=os.environ.get("TEMPORAL_TASK_QUEUE", "agents-task-queue"),
        workflows=[ResearchWorkflow],
        workflow_runner=UnsandboxedWorkflowRunner()
    )

    # Start workflow and wait for result
    async with worker:
        handle = await client.start_workflow(
            ResearchWorkflow.run,
            id="research-workflow-01",
            task_queue=os.environ.get("TEMPORAL_TASK_QUEUE", "agents-task-queue"),
            args=["Best practices for building reliable distributed systems"],
        )
        result = await handle.result()
        print("\nWorkflow result:\n", result)

await main()
```

***

### Trace Visibility

The InteractiveAI dashboard displays:

* Workflow execution with timing and completion status
* Activity spans for each pipeline stage as nested traces
* LLM calls with prompts, completions, and token usage
* Cost estimates based on token consumption
* Latency breakdown across all components


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.interactive.ai/integrations/ai-frameworks/temporal.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
