This is a Jupyter notebook

Trace Temporal Workflows with Langfuse

This notebook demonstrates how to integrate Langfuse into your Temporal workflows to monitor, debug, and evaluate your AI agents and LLM-powered applications.

What is Temporal?: Temporal is a durable execution platform that guarantees the execution of your application code, even in the presence of failures. It provides reliability, scalability, and visibility into long-running workflows and distributed applications.

What is Langfuse?: Langfuse is an open-source observability platform for AI agents and LLM applications. It helps you visualize and monitor LLM calls, tool usage, cost, latency, and more.

Use Case: Deep Research Agent with Temporal

In this example, we’ll build a deep research agent that:

  • Uses Temporal workflows to orchestrate long-running research tasks
  • Leverages the OpenAI Agents SDK for research planning and content generation
  • Sends all observability data to Langfuse via OpenTelemetry

This setup allows you to:

  • Track workflow execution: See all workflow runs, activities, and their status
  • Monitor LLM calls: View prompts, completions, token usage, and costs
  • Debug failures: Identify bottlenecks and errors in your research pipeline
  • Evaluate quality: Assess the quality of research outputs over time

1. Install Dependencies

Install the Temporal SDK, the OpenAI client, Langfuse, and the OpenInference instrumentation for the OpenAI Agents SDK:

%pip install temporalio openai langfuse openinference-instrumentation-openai-agents

2. Configure Environment & API Keys

Set up your Langfuse, Temporal, and OpenAI credentials. You can get Langfuse keys by signing up for a free Langfuse Cloud account or by self-hosting Langfuse.

import os
 
# Get keys for your project from the project settings page: https://cloud.langfuse.com
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..." 
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..." 
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com" # 🇪🇺 EU region
# os.environ["LANGFUSE_HOST"] = "https://us.cloud.langfuse.com" # 🇺🇸 US region
 
# Your openai key
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
 
# Temporal server address (use Temporal Cloud or local dev server)
os.environ["TEMPORAL_ADDRESS"] = "localhost:7233"
os.environ.setdefault("TEMPORAL_NAMESPACE", "default")
os.environ.setdefault("TEMPORAL_TASK_QUEUE", "agents-task-queue")
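
Because the key values above are placeholders, a quick check before continuing can catch one that was never filled in. The sketch below is one way to do that with the standard library. `find_unset_keys` is a hypothetical helper, not part of Langfuse or Temporal, and it assumes that a placeholder is any value ending in `...`.

```python
import os

# Variables this notebook expects to be set (names taken from the cell above).
REQUIRED_KEYS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "OPENAI_API_KEY")


def find_unset_keys(env, required=REQUIRED_KEYS):
    """Return the required names that are missing, empty, or still a '...' placeholder."""
    unset = []
    for name in required:
        value = env.get(name, "").strip()
        if not value or value.endswith("..."):
            unset.append(name)
    return unset


# Hypothetical usage: report anything that still needs a real value.
for name in find_unset_keys(os.environ):
    print(f"{name} is not configured yet")
```

The function takes any mapping, so it can be exercised with a plain dictionary instead of the real environment.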

3. OpenTelemetry Tracing for OpenAI Agents

Use the OpenAIAgentsInstrumentor from the OpenInference library to instrument the OpenAI Agents SDK and send OpenTelemetry spans to Langfuse.

from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor
 
OpenAIAgentsInstrumentor().instrument()

4. Initialize Langfuse Client

Verify the Langfuse connection:

from langfuse import get_client
 
langfuse = get_client()
 
# Verify connection
if langfuse.auth_check():
    print("✅ Langfuse client is authenticated and ready!")
else:
    print("❌ Authentication failed. Please check your credentials and host.")

5. Define Temporal Activities

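Define the pieces that each step of the research process relies on: Pydantic models for the agents' structured outputs, and factory functions for the planner, search, and writer agents. With the OpenAIAgentsPlugin configured in step 7, the agents' model calls are run as Temporal activities, so no activity functions are declared by hand here.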

from __future__ import annotations
 
import asyncio
from temporalio import workflow
from agents import Agent, RunConfig, Runner, WebSearchTool, custom_span, gen_trace_id, trace
from agents.model_settings import ModelSettings
from pydantic import BaseModel
 
# Planner Agent Models
class WebSearchItem(BaseModel):
    reason: str
    "Your reasoning for why this search is important to the query."
 
    query: str
    "The search term to use for the web search."
 
class WebSearchPlan(BaseModel):
    searches: list[WebSearchItem]
    """A list of web searches to perform to best answer the query."""
 
# Writer Agent Models
class ReportData(BaseModel):
    short_summary: str
    """A short 2-3 sentence summary of the findings."""
 
    markdown_report: str
    """The final report"""
 
    follow_up_questions: list[str]
    """Suggested topics to research further"""
 
# Agent factory functions
def new_planner_agent():
    return Agent(
        name="PlannerAgent",
        instructions=(
            "You are a helpful research assistant. Given a query, come up with a set of web searches "
            "to perform to best answer the query. Output between 5 and 20 terms to query for."
        ),
        model="gpt-4o",
        output_type=WebSearchPlan,
    )
 
def new_search_agent():
    return Agent(
        name="Search agent",
        instructions=(
            "You are a research assistant. Given a search term, you search the web for that term and "
            "produce a concise summary of the results. The summary must be 2-3 paragraphs and less than 300 "
            "words. Capture the main points. Write succinctly, no need to have complete sentences or good "
            "grammar. This will be consumed by someone synthesizing a report, so it's vital you capture the "
            "essence and ignore any fluff. Do not include any additional commentary other than the summary "
            "itself."
        ),
        tools=[WebSearchTool()],
        model_settings=ModelSettings(tool_choice="required"),
    )
 
def new_writer_agent():
    return Agent(
        name="WriterAgent",
        instructions=(
            "You are a senior researcher tasked with writing a cohesive report for a research query. "
            "You will be provided with the original query, and some initial research done by a research "
            "assistant.\n"
            "You should first come up with an outline for the report that describes the structure and "
            "flow of the report. Then, generate the report and return that as your final output.\n"
            "The final output should be in markdown format, and it should be lengthy and detailed. Aim "
            "for 5-10 pages of content, at least 1000 words."
        ),
        model="o3-mini",
        output_type=ReportData,
    )

6. Define Temporal Workflow

Create a workflow that orchestrates the research activities. Temporal ensures the workflow executes reliably, even if failures occur.

class ResearchManager:
    def __init__(self):
        self.run_config = RunConfig()
        self.search_agent = new_search_agent()
        self.planner_agent = new_planner_agent()
        self.writer_agent = new_writer_agent()
 
    async def run(self, query: str) -> str:
        trace_id = gen_trace_id()
        with trace("Research trace", trace_id=trace_id):
            search_plan = await self._plan_searches(query)
            search_results = await self._perform_searches(search_plan)
            report = await self._write_report(query, search_results)
 
        return report.markdown_report
 
    async def _plan_searches(self, query: str) -> WebSearchPlan:
        result = await Runner.run(
            self.planner_agent,
            f"Query: {query}",
            run_config=self.run_config,
        )
        return result.final_output_as(WebSearchPlan)
 
    async def _perform_searches(self, search_plan: WebSearchPlan) -> list[str]:
        with custom_span("Search the web"):
            num_completed = 0
            tasks = [
                asyncio.create_task(self._search(item)) for item in search_plan.searches
            ]
            results = []
            for task in workflow.as_completed(tasks):
                result = await task
                if result is not None:
                    results.append(result)
                num_completed += 1
            return results
 
    async def _search(self, item: WebSearchItem) -> str | None:
        input = f"Search term: {item.query}\nReason for searching: {item.reason}"
        try:
            result = await Runner.run(
                self.search_agent,
                input,
                run_config=self.run_config,
            )
            return str(result.final_output)
        except Exception:
            return None
 
    async def _write_report(self, query: str, search_results: list[str]) -> ReportData:
        input = f"Original query: {query}\nSummarized search results: {search_results}"
        result = await Runner.run(
            self.writer_agent,
            input,
            run_config=self.run_config,
        )
 
        return result.final_output_as(ReportData)

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        return await ResearchManager().run(query)
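
The fan-out in `_perform_searches` starts every search at once, collects results as they finish, and drops any search that returned `None`. The sketch below shows the same pattern outside Temporal, as an illustration only: it swaps in plain `asyncio.as_completed` for `workflow.as_completed`, and `fake_search` and `gather_summaries` are hypothetical stand-ins for the agent run and the manager method.

```python
import asyncio


async def fake_search(term):
    """Stand-in for an agent run: returns a summary, or None when the search fails."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    if term == "bad term":
        return None  # mirrors _search returning None after a caught exception
    return f"summary of {term}"


async def gather_summaries(terms):
    """Start every search at once and keep only the ones that produced a result."""
    tasks = [asyncio.create_task(fake_search(term)) for term in terms]
    summaries = []
    for next_done in asyncio.as_completed(tasks):
        summary = await next_done
        if summary is not None:
            summaries.append(summary)
    return summaries


summaries = asyncio.run(gather_summaries(["surfing", "bad term", "hiking"]))
print(sorted(summaries))
```

Results arrive in completion order rather than submission order, which is why the example sorts them before printing. Inside a notebook, where an event loop is already running, `await gather_summaries(...)` would take the place of `asyncio.run(...)`.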

7. Run the Workflow

With the instrumentation from step 3 alone, only OpenAI Agents runs send OpenTelemetry spans to Langfuse. To also capture Temporal workflow steps, enable Temporal's OpenTelemetry integration by importing:

from temporalio.contrib.opentelemetry import TracingInterceptor

and adding interceptors=[TracingInterceptor()] to the Temporal client and worker.

When you execute the research workflow, both Temporal and OpenAI Agents will send OTel spans to Langfuse.

Note: This requires a running Temporal server. You can start a local dev server with:

temporal server start-dev

import os
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
from temporalio.worker import UnsandboxedWorkflowRunner
 
async def main():
    tls = os.environ.get("TEMPORAL_TLS", "").lower() in ("1", "true", "yes")
    api_key = os.environ.get("TEMPORAL_API_KEY")
 
    plugin = OpenAIAgentsPlugin()
 
    client = await Client.connect(
        target_host=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"),
        api_key=api_key or None,
        tls=tls,
        plugins=[plugin],
        interceptors=[TracingInterceptor()],  # Expose Temporal OTel spans
    )
 
    worker = Worker(
        client,
        task_queue=os.environ.get("TEMPORAL_TASK_QUEUE", "openai-agents-task-queue"),
        workflows=[ResearchWorkflow],
        workflow_runner=UnsandboxedWorkflowRunner(),
        interceptors=[TracingInterceptor()],  # Expose Temporal OTel spans
    )
 
    async with worker:
        handle = await client.start_workflow(
            ResearchWorkflow.run,
            id="research-workflow-01",
            task_queue=os.environ.get("TEMPORAL_TASK_QUEUE", "openai-agents-task-queue"),
            args=["Caribbean vacation spots in April, optimizing for surfing, hiking and water sports"],
        )
        result = await handle.result()
        print("\nWorkflow result:\n", result)
 
await main()
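
The workflow ID in the cell above is hard-coded, so starting the cell again while an earlier run is still open would collide with it, because Temporal does not allow two running workflows with the same ID in one namespace. A minimal sketch of generating a fresh ID per run follows. `new_workflow_id` is a hypothetical helper, and the `research-workflow` prefix is only a convention borrowed from the example.

```python
import uuid


def new_workflow_id(prefix="research-workflow"):
    """Build a workflow ID that differs on each call by appending a random suffix."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"


workflow_id = new_workflow_id()
print(workflow_id)
```

The result could then be passed as `id=new_workflow_id()` in the `client.start_workflow` call.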

8. View Traces in Langfuse

After running the workflow, you can view the complete trace in Langfuse. The trace will show:

  • Workflow execution: The entire ResearchWorkflow with timing and status
  • Agent spans: Each agent run (PlannerAgent, Search agent, WriterAgent) as nested spans
  • LLM calls: OpenAI API calls with prompts, completions, and token usage
  • Cost tracking: Estimated costs based on token usage
  • Latency metrics: Time spent in each component
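
A cost estimate of this kind comes down to multiplying token counts by a per-token price. The arithmetic can be sketched as below. The prices and token counts are made-up numbers for illustration, not real model pricing, and `estimate_cost` is a hypothetical helper rather than a Langfuse function.

```python
def estimate_cost(input_tokens, output_tokens, input_price_per_1m, output_price_per_1m):
    """Estimate cost in USD from token counts and per-million-token prices."""
    input_cost = input_tokens / 1_000_000 * input_price_per_1m
    output_cost = output_tokens / 1_000_000 * output_price_per_1m
    return input_cost + output_cost


# Made-up example: 1,200 input tokens at $2.50 per 1M, 800 output tokens at $10.00 per 1M.
cost = estimate_cost(1_200, 800, 2.50, 10.00)
print(f"${cost:.4f}")
```

Input and output tokens are priced separately here because the two rates usually differ.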

Example trace in Langfuse

Example Trace: View in Langfuse
