The DeerFlow agent system uses a sophisticated middleware chain that processes every agent invocation through 11 specialized middleware components. Each middleware executes at specific lifecycle hooks (before_agent, after_agent, before_model, after_model, wrap_model_call, wrap_tool_call) to augment agent behavior without modifying core logic.

Execution Order

Middlewares execute in strict order defined in backend/src/agents/lead_agent/agent.py:217-250:
middlewares = [
    ThreadDataMiddleware(),
    UploadsMiddleware(),
    SandboxMiddleware(),
    DanglingToolCallMiddleware(),
    # Conditionally added:
    SummarizationMiddleware(),  # if enabled
    TodoListMiddleware(),        # if is_plan_mode
    TitleMiddleware(),
    MemoryMiddleware(),
    ViewImageMiddleware(),       # if model supports vision
    SubagentLimitMiddleware(),   # if subagent_enabled
    ClarificationMiddleware()    # must be last
]

Middleware Components

1. ThreadDataMiddleware

Purpose: Creates per-thread isolated directory structure for workspace, uploads, and output files.
Lifecycle: before_agent
Implementation (backend/src/agents/middlewares/thread_data_middleware.py):
class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
    def __init__(self, base_dir: str | None = None, lazy_init: bool = True):
        # lazy_init=True: Only compute paths, defer directory creation
        # lazy_init=False: Eagerly create directories
        self._paths = Paths(base_dir) if base_dir else get_paths()
        self._lazy_init = lazy_init

    def before_agent(self, state, runtime):
        thread_id = runtime.context.get("thread_id")
        if self._lazy_init:
            paths = self._get_thread_paths(thread_id)
        else:
            paths = self._create_thread_directories(thread_id)
        
        return {
            "thread_data": {
                "workspace_path": str(paths["workspace_path"]),
                "uploads_path": str(paths["uploads_path"]),
                "outputs_path": str(paths["outputs_path"])
            }
        }
Directory Structure Created:
backend/.deer-flow/threads/{thread_id}/user-data/
├── workspace/  # Agent's working directory
├── uploads/    # User-uploaded files
└── outputs/    # Files presented to user via present_files tool
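For reference, the layout above can be computed with a small helper. This is a sketch only; `thread_paths` is an illustrative name, not the actual `Paths` API:

```python
from pathlib import Path

def thread_paths(base_dir: str, thread_id: str) -> dict[str, Path]:
    """Compute the per-thread directory layout shown above (hypothetical helper)."""
    root = Path(base_dir) / "threads" / thread_id / "user-data"
    return {
        "workspace_path": root / "workspace",
        "uploads_path": root / "uploads",
        "outputs_path": root / "outputs",
    }

paths = thread_paths("backend/.deer-flow", "thread-123")
# With lazy_init=True only the paths are computed; directories would be
# created on first use, e.g. paths["workspace_path"].mkdir(parents=True, exist_ok=True)
```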

2. UploadsMiddleware

Purpose: Injects uploaded file information into the conversation, tracking new uploads across turns.
Lifecycle: before_agent
Implementation (backend/src/agents/middlewares/uploads_middleware.py:139-220):
class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
    def before_agent(self, state, runtime):
        thread_id = runtime.context.get("thread_id")
        messages = state.get("messages", [])
        
        # Track previously shown files from message history
        shown_files = self._extract_files_from_previous_messages(messages[:-1])
        
        # List only newly uploaded files
        new_files = self._list_newly_uploaded_files(thread_id, shown_files)
        
        if new_files:
            # Prepend file list to last human message
            original_content = messages[-1].content
            files_message = self._create_files_message(new_files)
            updated_message = HumanMessage(
                content=f"{files_message}\n\n{original_content}"
            )
            messages[-1] = updated_message
        
        return {"uploaded_files": new_files, "messages": messages}
Key Features:
  • Deduplicates files already shown in previous turns
  • Formats file list with size and virtual path: /mnt/user-data/uploads/<filename>
  • Supports filenames with spaces via regex r"^-\s+(.+?)\s*\("
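The quoted regex can be checked against a sample line (standalone snippet; `FILE_LINE` is just a local name for the pattern):

```python
import re

# The filename-matching pattern quoted above: captures names (including
# spaces) lazily, up to the trailing "(size)" annotation.
FILE_LINE = re.compile(r"^-\s+(.+?)\s*\(")

line = "- quarterly report.pdf (1.2 MB)"
match = FILE_LINE.match(line)
name = match.group(1)  # captures "quarterly report.pdf", spaces intact
```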

3. SandboxMiddleware

Purpose: Acquires and manages isolated execution environments for agent tool calls.
Lifecycle: before_agent
Implementation (backend/src/sandbox/middleware.py:18-61):
class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]):
    def __init__(self, lazy_init: bool = True):
        # lazy_init=True: Acquire on first tool call
        # lazy_init=False: Acquire in before_agent()
        self._lazy_init = lazy_init

    def before_agent(self, state, runtime):
        if self._lazy_init:
            return None  # Defer acquisition
        
        if "sandbox" not in state or state["sandbox"] is None:
            thread_id = runtime.context["thread_id"]
            sandbox_id = self._acquire_sandbox(thread_id)
            return {"sandbox": {"sandbox_id": sandbox_id}}
Sandbox Lifecycle:
  • Sandbox reused across turns within same thread (not released after each call)
  • Cleanup occurs at application shutdown via SandboxProvider.shutdown()
  • Supports local filesystem (LocalSandboxProvider) and Docker (AioSandboxProvider)
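The lazy-acquisition behavior can be sketched in isolation. Names here are illustrative, not the actual `SandboxProvider` API:

```python
# Minimal sketch of lazy acquisition: provision on first use, then reuse
# across subsequent tool calls within the same thread.
class LazySandbox:
    def __init__(self, acquire):
        self._acquire = acquire      # callable that provisions a sandbox
        self._sandbox_id = None

    def get(self, thread_id: str) -> str:
        if self._sandbox_id is None:
            self._sandbox_id = self._acquire(thread_id)
        return self._sandbox_id

calls = []
sb = LazySandbox(lambda tid: calls.append(tid) or f"sbx-{tid}")
first = sb.get("t1")
second = sb.get("t1")
# acquire ran once; both calls return the same sandbox id
```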

4. DanglingToolCallMiddleware

Purpose: Fixes message history gaps caused by interrupted tool calls (e.g., user cancellation).
Lifecycle: wrap_model_call
Implementation (backend/src/agents/middlewares/dangling_tool_call_middleware.py:28-111):
class DanglingToolCallMiddleware(AgentMiddleware[AgentState]):
    def wrap_model_call(self, request, handler):
        # Scan for AIMessages with tool_calls that lack ToolMessage responses
        patched = self._build_patched_messages(request.messages)
        if patched:
            request = request.override(messages=patched)
        return handler(request)
    
    def _build_patched_messages(self, messages):
        existing_tool_msg_ids = {msg.tool_call_id for msg in messages 
                                  if isinstance(msg, ToolMessage)}
        
        patched = []
        for msg in messages:
            patched.append(msg)
            if getattr(msg, "type", None) == "ai":
                for tc in getattr(msg, "tool_calls", []):
                    if tc["id"] not in existing_tool_msg_ids:
                        # Inject placeholder ToolMessage
                        patched.append(ToolMessage(
                            content="[Tool call was interrupted and did not return a result.]",
                            tool_call_id=tc["id"],
                            status="error"
                        ))
        return patched
Why wrap_model_call instead of before_model: Ensures patches are inserted immediately after each dangling AIMessage, not appended to the end (which before_model + add_messages reducer would do).
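The patching behavior can be illustrated standalone, using plain dicts in place of LangChain message objects:

```python
# Standalone illustration of the patching logic above.
def patch_dangling(messages: list[dict]) -> list[dict]:
    answered = {m["tool_call_id"] for m in messages if m["type"] == "tool"}
    patched = []
    for m in messages:
        patched.append(m)
        if m["type"] == "ai":
            for tc in m.get("tool_calls", []):
                if tc["id"] not in answered:
                    # Inject the placeholder right after the dangling AI message
                    patched.append({
                        "type": "tool",
                        "tool_call_id": tc["id"],
                        "content": "[Tool call was interrupted and did not return a result.]",
                    })
    return patched

history = [
    {"type": "ai", "tool_calls": [{"id": "call_1"}]},  # interrupted: no tool reply
    {"type": "human", "content": "continue"},
]
fixed = patch_dangling(history)
# The placeholder lands at index 1, directly after the dangling AI message,
# not at the end of the history.
```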

5. SummarizationMiddleware (Optional)

Purpose: Automatic context reduction when approaching token limits.
Lifecycle: before_model, after_model
Configuration (backend/src/config/summarization_config.py):
class SummarizationConfig(BaseModel):
    enabled: bool = False
    model_name: str | None = None  # None = use lightweight model
    trigger: ContextSize | list[ContextSize] | None
    keep: ContextSize = ContextSize(type="messages", value=20)
    trim_tokens_to_summarize: int | None = 4000
    summary_prompt: str | None = None
Trigger Types:
  • {"type": "fraction", "value": 0.8} - 80% of model’s max input tokens
  • {"type": "tokens", "value": 4000} - 4000 tokens
  • {"type": "messages", "value": 50} - 50 messages
Keep Policies: Same types as triggers; defines how much context to preserve after summarization.
Creation (backend/src/agents/lead_agent/agent.py:41-80):
def _create_summarization_middleware():
    config = get_summarization_config()
    if not config.enabled:
        return None
    
    # Convert config to middleware parameters
    trigger = ([t.to_tuple() for t in config.trigger]
               if isinstance(config.trigger, list)
               else config.trigger.to_tuple())
    keep = config.keep.to_tuple()
    model = config.model_name or create_chat_model(thinking_enabled=False)
    
    return SummarizationMiddleware(
        model=model,
        trigger=trigger,
        keep=keep,
        trim_tokens_to_summarize=config.trim_tokens_to_summarize
    )
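How a trigger might be evaluated, as a hedged sketch (the real `SummarizationMiddleware` internals may differ):

```python
# Illustrative trigger check for the three trigger types listed above.
def should_summarize(trigger: dict, token_count: int, msg_count: int,
                     max_input_tokens: int) -> bool:
    if trigger["type"] == "fraction":
        return token_count >= trigger["value"] * max_input_tokens
    if trigger["type"] == "tokens":
        return token_count >= trigger["value"]
    if trigger["type"] == "messages":
        return msg_count >= trigger["value"]
    return False

# 80% of a 128k-token window -> summarization fires at 102,400 tokens
fire = should_summarize({"type": "fraction", "value": 0.8},
                        token_count=110_000, msg_count=42,
                        max_input_tokens=128_000)
```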

6. TodoListMiddleware (Optional)

Purpose: Provides write_todos tool for structured task tracking in complex multi-step workflows.
Lifecycle: Tool injection + state management
Activation: Enabled when config.configurable.is_plan_mode = True
Custom Configuration (backend/src/agents/lead_agent/agent.py:83-195):
def _create_todo_list_middleware(is_plan_mode: bool):
    if not is_plan_mode:
        return None
    
    system_prompt = """
    <todo_list_system>
    **CRITICAL RULES:**
    - Mark todos as completed IMMEDIATELY after finishing each step
    - Keep EXACTLY ONE task as `in_progress` at any time
    - Update in REAL-TIME - gives users visibility
    - DO NOT use for simple tasks (< 3 steps)
    </todo_list_system>
    """
    
    tool_description = """Use for complex tasks (3+ steps) only..."""
    
    return TodoListMiddleware(
        system_prompt=system_prompt,
        tool_description=tool_description
    )
Task States:
  • pending - Not started
  • in_progress - Currently being worked on (the system prompt asks for exactly one at a time)
  • completed - Finished successfully
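An illustrative todo payload consistent with these states (the exact `write_todos` schema is assumed, not taken from the source):

```python
# Hypothetical write_todos payload following the task states above.
todos = [
    {"content": "Collect requirements", "status": "completed"},
    {"content": "Draft implementation plan", "status": "in_progress"},
    {"content": "Review plan with user", "status": "pending"},
]

in_progress = [t for t in todos if t["status"] == "in_progress"]
# Exactly one task is in_progress, per the CRITICAL RULES above
```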

7. TitleMiddleware

Purpose: Auto-generates thread title after first complete user-assistant exchange.
Lifecycle: after_agent
Implementation (backend/src/agents/middlewares/title_middleware.py:19-94):
class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
    def after_agent(self, state, runtime):
        if self._should_generate_title(state):
            title = self._generate_title(state)
            return {"title": title}
        return None
    
    def _should_generate_title(self, state):
        config = get_title_config()
        if not config.enabled or state.get("title"):
            return False
        
        messages = state.get("messages", [])
        user_messages = [m for m in messages if m.type == "human"]
        assistant_messages = [m for m in messages if m.type == "ai"]
        
        # Generate after first complete exchange
        return len(user_messages) == 1 and len(assistant_messages) >= 1
    
    def _generate_title(self, state):
        config = get_title_config()
        model = create_chat_model(thinking_enabled=False)  # Lightweight model
        messages = state.get("messages", [])
        
        user_msg = next(m.content for m in messages if m.type == "human")
        assistant_msg = next(m.content for m in messages if m.type == "ai")
        
        prompt = config.prompt_template.format(
            max_words=config.max_words,
            user_msg=user_msg[:500],
            assistant_msg=assistant_msg[:500]
        )
        
        response = model.invoke(prompt)
        title = response.content.strip()[:config.max_chars]
        return title
Fallback: If LLM fails, uses first 50 characters of user message.

8. MemoryMiddleware

Purpose: Queues conversation for asynchronous memory extraction and updates.
Lifecycle: after_agent
Implementation (backend/src/agents/middlewares/memory_middleware.py:53-117):
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
    def __init__(self, agent_name: str | None = None):
        # agent_name: If provided, uses per-agent memory storage
        self._agent_name = agent_name
    
    def after_agent(self, state, runtime):
        config = get_memory_config()
        if not config.enabled:
            return None
        
        thread_id = runtime.context.get("thread_id")
        messages = state.get("messages", [])
        
        # Filter to user inputs + final assistant responses (no tool calls)
        filtered_messages = _filter_messages_for_memory(messages)
        
        # Queue for debounced background processing
        queue = get_memory_queue()
        queue.add(
            thread_id=thread_id,
            messages=filtered_messages,
            agent_name=self._agent_name
        )
        
        return None  # No state changes
Message Filtering (backend/src/agents/middlewares/memory_middleware.py:19-50):
def _filter_messages_for_memory(messages):
    filtered = []
    for msg in messages:
        if msg.type == "human":
            filtered.append(msg)  # Always keep user messages
        elif msg.type == "ai" and not getattr(msg, "tool_calls", None):
            filtered.append(msg)  # Only keep final AI responses
    return filtered
Memory Workflow:
  1. Middleware queues conversation after agent completes
  2. Queue debounces (30s default) and batches updates
  3. Background thread invokes LLM to extract facts and context
  4. Updates stored atomically in backend/.deer-flow/memory.json
  5. Next interaction injects top 15 facts into system prompt
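Step 2's debouncing can be sketched with `threading.Timer` (illustrative only; the actual memory queue implementation may differ):

```python
import threading
import time

# Repeated add() calls within the debounce window collapse into one flush.
class DebouncedQueue:
    def __init__(self, delay: float, flush):
        self._delay, self._flush = delay, flush
        self._timer = None
        self._pending = []

    def add(self, item):
        self._pending.append(item)
        if self._timer:
            self._timer.cancel()  # restart the debounce window
        self._timer = threading.Timer(self._delay, self._drain)
        self._timer.start()

    def _drain(self):
        batch, self._pending = self._pending, []
        self._flush(batch)

flushed = []
q = DebouncedQueue(0.05, flushed.append)
q.add("turn-1")
q.add("turn-2")
time.sleep(0.2)
# Both items arrive as a single batch
```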

9. ViewImageMiddleware (Optional)

Purpose: Injects base64 image data into conversation when view_image tool completes.
Lifecycle: before_model
Activation: Only added if model_config.supports_vision = true
Implementation (backend/src/agents/middlewares/view_image_middleware.py:19-222):
class ViewImageMiddleware(AgentMiddleware[ViewImageMiddlewareState]):
    def before_model(self, state, runtime):
        if not self._should_inject_image_message(state):
            return None
        return self._inject_image_message(state)
    
    def _should_inject_image_message(self, state):
        messages = state.get("messages", [])
        last_assistant_msg = self._get_last_assistant_message(messages)
        
        if not last_assistant_msg:
            return False
        
        # Check if it has view_image tool calls
        if not self._has_view_image_tool(last_assistant_msg):
            return False
        
        # Check if all tools completed
        if not self._all_tools_completed(messages, last_assistant_msg):
            return False
        
        # Check if we already injected the message
        # (prevents duplicate injections)
        return not self._already_injected(messages, last_assistant_msg)
    
    def _create_image_details_message(self, state):
        viewed_images = state.get("viewed_images", {})
        content_blocks = [
            {"type": "text", "text": "Here are the images you've viewed:"}
        ]
        
        for image_path, image_data in viewed_images.items():
            content_blocks.append({
                "type": "text",
                "text": f"\n- **{image_path}** ({image_data['mime_type']})"
            })
            content_blocks.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:{image_data['mime_type']};base64,{image_data['base64']}"
                }
            })
        
        return content_blocks
State Management: Uses viewed_images dict in ThreadState with custom reducer:
def merge_viewed_images(existing, new):
    if new == {}:  # Empty dict clears all viewed images
        return {}
    return {**existing, **new}  # Merge dictionaries
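A quick check of the reducer's semantics:

```python
# Demonstrating the merge_viewed_images reducer behavior described above.
def merge_viewed_images(existing, new):
    if new == {}:  # Empty dict clears all viewed images
        return {}
    return {**existing, **new}  # Merge dictionaries

state = {"a.png": {"mime_type": "image/png"}}
merged = merge_viewed_images(state, {"b.jpg": {"mime_type": "image/jpeg"}})
cleared = merge_viewed_images(merged, {})
# merged holds both images; passing {} clears the whole dict
```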

10. SubagentLimitMiddleware (Optional)

Purpose: Enforces maximum concurrent subagent calls by truncating excess task tool calls.
Lifecycle: after_model
Activation: Only added if config.configurable.subagent_enabled = True
Implementation (backend/src/agents/middlewares/subagent_limit_middleware.py:24-76):
class SubagentLimitMiddleware(AgentMiddleware[AgentState]):
    def __init__(self, max_concurrent: int = MAX_CONCURRENT_SUBAGENTS):
        # max_concurrent clamped to [2, 4]
        self.max_concurrent = _clamp_subagent_limit(max_concurrent)
    
    def after_model(self, state, runtime):
        return self._truncate_task_calls(state)
    
    def _truncate_task_calls(self, state):
        messages = state.get("messages", [])
        if not messages:
            return None
        last_msg = messages[-1]
        
        if getattr(last_msg, "type", None) != "ai":
            return None
        
        tool_calls = getattr(last_msg, "tool_calls", None)
        if not tool_calls:
            return None
        task_indices = [i for i, tc in enumerate(tool_calls) 
                        if tc.get("name") == "task"]
        
        if len(task_indices) <= self.max_concurrent:
            return None
        
        # Keep only first max_concurrent task calls
        indices_to_drop = set(task_indices[self.max_concurrent:])
        truncated = [tc for i, tc in enumerate(tool_calls) 
                     if i not in indices_to_drop]
        
        logger.warning(f"Truncated {len(indices_to_drop)} excess task calls")
        
        updated_msg = last_msg.model_copy(update={"tool_calls": truncated})
        return {"messages": [updated_msg]}
Why This Works: Truncating in code is more reliable than prompt-based limits. The model may generate any number of task calls, but the middleware enforces the cap deterministically.
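The truncation step can be exercised standalone on plain dicts:

```python
# Standalone illustration of the truncation logic above.
def truncate_task_calls(tool_calls: list[dict], max_concurrent: int) -> list[dict]:
    task_indices = [i for i, tc in enumerate(tool_calls) if tc["name"] == "task"]
    drop = set(task_indices[max_concurrent:])
    return [tc for i, tc in enumerate(tool_calls) if i not in drop]

calls = [{"name": "task", "id": str(i)} for i in range(5)] + [{"name": "bash", "id": "b"}]
kept = truncate_task_calls(calls, max_concurrent=3)
# First 3 task calls survive; non-task calls are untouched
```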

11. ClarificationMiddleware

Purpose: Intercepts ask_clarification tool calls and interrupts execution to present questions to the user.
Lifecycle: wrap_tool_call
Position: MUST BE LAST in middleware chain to intercept after all other processing.
Implementation (backend/src/agents/middlewares/clarification_middleware.py:20-174):
class ClarificationMiddleware(AgentMiddleware[ClarificationMiddlewareState]):
    def wrap_tool_call(self, request, handler):
        if request.tool_call.get("name") != "ask_clarification":
            return handler(request)  # Pass through
        
        return self._handle_clarification(request)
    
    def _handle_clarification(self, request):
        args = request.tool_call.get("args", {})
        formatted_message = self._format_clarification_message(args)
        
        tool_message = ToolMessage(
            content=formatted_message,
            tool_call_id=request.tool_call.get("id"),
            name="ask_clarification"
        )
        
        # Return Command that interrupts execution
        return Command(
            update={"messages": [tool_message]},
            goto=END  # Stop execution, wait for user response
        )
    
    def _format_clarification_message(self, args):
        question = args.get("question", "")
        clarification_type = args.get("clarification_type", "missing_info")
        context = args.get("context")
        options = args.get("options", [])
        
        type_icons = {
            "missing_info": "❓",
            "ambiguous_requirement": "🤔",
            "approach_choice": "🔀",
            "risk_confirmation": "⚠️",
            "suggestion": "💡"
        }
        
        icon = type_icons.get(clarification_type, "❓")
        
        message_parts = []
        if context:
            message_parts.append(f"{icon} {context}")
            message_parts.append(f"\n{question}")
        else:
            message_parts.append(f"{icon} {question}")
        
        if options:
            message_parts.append("")
            for i, option in enumerate(options, 1):
                message_parts.append(f"  {i}. {option}")
        
        return "\n".join(message_parts)
Key Behavior: Uses Command(goto=END) to interrupt graph execution, forcing wait for user input.
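Rendering a sample ask_clarification call through a condensed, standalone copy of the formatting logic shows the text the user sees:

```python
# Condensed copy of _format_clarification_message for illustration only
# (subset of the icon table; no context handling).
def format_clarification(args: dict) -> str:
    question = args.get("question", "")
    icon = {"missing_info": "❓", "approach_choice": "🔀"}.get(
        args.get("clarification_type", "missing_info"), "❓")
    parts = [f"{icon} {question}"]
    if args.get("options"):
        parts.append("")
        parts += [f"  {i}. {o}" for i, o in enumerate(args["options"], 1)]
    return "\n".join(parts)

text = format_clarification({
    "question": "Which database should I target?",
    "clarification_type": "approach_choice",
    "options": ["PostgreSQL", "SQLite"],
})
```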

Middleware Ordering Rationale

The strict order ensures correct dependency resolution:
  1. ThreadDataMiddleware → Creates thread directories first (required by UploadsMiddleware, SandboxMiddleware)
  2. UploadsMiddleware → Injects file info before sandbox/model sees it
  3. SandboxMiddleware → Acquires environment before tool execution
  4. DanglingToolCallMiddleware → Patches message history before model sees it
  5. SummarizationMiddleware → Reduces context early (before other processing)
  6. TodoListMiddleware → Enables task tracking (before clarification)
  7. TitleMiddleware → Generates title after first exchange
  8. MemoryMiddleware → Queues after title generation (complete turn)
  9. ViewImageMiddleware → Injects images before model call (if vision supported)
  10. SubagentLimitMiddleware → Truncates after model generates tool calls
  11. ClarificationMiddleware → MUST BE LAST to intercept all tool calls

Runtime Configuration

Middlewares can be conditionally enabled via config.configurable:
config = {
    "configurable": {
        "thinking_enabled": True,
        "model_name": "gpt-4o",
        "is_plan_mode": False,      # Enables TodoListMiddleware
        "subagent_enabled": True,    # Enables SubagentLimitMiddleware
        "max_concurrent_subagents": 3
    }
}

agent = make_lead_agent(config)

State Schema Compatibility

All middlewares use state schemas compatible with ThreadState (backend/src/agents/thread_state.py:48-56):
class ThreadState(AgentState):
    sandbox: NotRequired[SandboxState | None]
    thread_data: NotRequired[ThreadDataState | None]
    title: NotRequired[str | None]
    artifacts: Annotated[list[str], merge_artifacts]
    todos: NotRequired[list | None]
    uploaded_files: NotRequired[list[dict] | None]
    viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images]
Custom Reducers:
  • merge_artifacts - Deduplicates artifact paths while preserving order
  • merge_viewed_images - Merges image dicts, empty dict {} clears all
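A sketch of merge_artifacts consistent with that description (order-preserving dedup; the actual implementation may differ):

```python
# Hypothetical merge_artifacts reducer: deduplicates while preserving order.
def merge_artifacts(existing: list[str], new: list[str]) -> list[str]:
    seen = set()
    merged = []
    for path in [*existing, *new]:
        if path not in seen:
            seen.add(path)
            merged.append(path)
    return merged

merged = merge_artifacts(["out/a.md", "out/b.csv"], ["out/b.csv", "out/c.png"])
# duplicate "out/b.csv" kept once, in its original position
```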

Debugging Middlewares

Each middleware logs key actions:
logger.warning(f"Injecting {count} placeholder ToolMessage(s) for dangling tool calls")
logger.warning(f"Truncated {count} excess task call(s) from model response (limit: {limit})")
print(f"[ViewImageMiddleware] Injecting image details message with images before LLM call")
print(f"[ClarificationMiddleware] Intercepted clarification request")
View logs via:
cd backend
make dev  # Watch logs in terminal

See Also