Task in A2A Protocol: The Core Abstraction of Distributed AI Systems

Task Overview

In the Agent-to-Agent (A2A) protocol, Task is the fundamental abstraction: a complete, traceable unit of work. A Task is more than a message or request; it is an object that carries state, context, results, and metadata.

Why Do We Need Task?

In distributed AI systems, collaboration between agents often involves:

  • Long-running tasks: Requiring state tracking and progress updates
  • Multi-turn conversations: Requiring context maintenance and reference relationships
  • Asynchronous processing: Requiring non-blocking task execution
  • Error handling: Requiring task cancellation and retry mechanisms
  • Result management: Requiring storage and transmission of complex result data

The Task abstraction addresses each of these needs with a single object that can be tracked, updated, and referenced.


Core Properties of Task

1. Basic Identifiers

class Task:
    id: str                    # Unique task identifier
    context_id: str           # Conversation context identifier
    message_id: str           # Associated message identifier
    created_at: datetime      # Creation time
    updated_at: datetime      # Last update time

2. State Information

class TaskStatus:
    state: TaskState          # Current state
    message: Message          # Status message
    progress: float           # Progress as a fraction (0.0-1.0)
    error: str | None         # Error information

3. Task Content

class Task:
    artifacts: list[Artifact] # Task results/outputs
    parts: list[Part]         # Task content parts
    metadata: dict            # Metadata
    tags: list[str]           # Tags

4. Relationship Management

class Task:
    reference_task_ids: list[str]  # Referenced other tasks
    parent_task_id: str | None     # Parent task ID
    child_task_ids: list[str]      # Child task ID list
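The four groups above can be read as facets of a single object. The following is a minimal sketch using standard-library dataclasses. The field names follow the listings above, but `TaskState`, `TaskStatus`, and `Task` here are simplified stand-ins written for illustration, not the SDK's own types, and artifacts and status messages are reduced to plain strings (an assumption made to keep the example self-contained).

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional


class TaskState(str, Enum):
    # State names mirror the lifecycle described in this article.
    pending = "pending"
    working = "working"
    input_required = "input-required"
    completed = "completed"
    failed = "failed"
    canceled = "canceled"


def _now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class TaskStatus:
    state: TaskState = TaskState.pending
    message: Optional[str] = None   # simplified: plain text instead of a Message
    progress: float = 0.0           # fraction in the range 0.0-1.0
    error: Optional[str] = None


@dataclass
class Task:
    context_id: str
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    status: TaskStatus = field(default_factory=TaskStatus)
    artifacts: List[str] = field(default_factory=list)   # simplified Artifact
    metadata: Dict[str, str] = field(default_factory=dict)
    reference_task_ids: List[str] = field(default_factory=list)
    parent_task_id: Optional[str] = None
    child_task_ids: List[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=_now)
    updated_at: datetime = field(default_factory=_now)

    def add_child(self, child: "Task") -> None:
        """Link a child task to this task in both directions."""
        child.parent_task_id = self.id
        self.child_task_ids.append(child.id)
        self.updated_at = _now()
```

Creating two tasks in the same context and calling `parent.add_child(child)` sets `child.parent_task_id` and appends the child's id to `parent.child_task_ids`, which is the relationship bookkeeping described in section 4.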

Task Lifecycle

The Task lifecycle is a carefully designed state machine that ensures tasks can be properly tracked and managed.

State Transition Diagram

stateDiagram-v2
    [*] --> Pending: Task Created
    Pending --> Working: Start Execution
    Pending --> InputRequired: Input Needed
    Working --> InputRequired: Wait for Input
    Working --> Completed: Execution Complete
    InputRequired --> Working: Input Received
    InputRequired --> Completed: Direct Complete
    Working --> Failed: Execution Failed
    InputRequired --> Failed: Input Timeout
    Working --> Canceled: Canceled
    InputRequired --> Canceled: Canceled
    Completed --> [*]
    Failed --> [*]
    Canceled --> [*]
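The diagram can also be written as a transition table and checked in code. This is a sketch only: the `State` enum, `TRANSITIONS`, and the helper functions are hypothetical names, and the allowed edges are copied one-for-one from the diagram above rather than taken from any library.

```python
from enum import Enum


class State(Enum):
    PENDING = "pending"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


# Allowed transitions, one entry per arrow in the state diagram.
TRANSITIONS = {
    State.PENDING: {State.WORKING, State.INPUT_REQUIRED},
    State.WORKING: {
        State.INPUT_REQUIRED, State.COMPLETED, State.FAILED, State.CANCELED,
    },
    State.INPUT_REQUIRED: {
        State.WORKING, State.COMPLETED, State.FAILED, State.CANCELED,
    },
    State.COMPLETED: set(),
    State.FAILED: set(),
    State.CANCELED: set(),
}


def can_transition(current: State, target: State) -> bool:
    """Return True if the diagram has an arrow from current to target."""
    return target in TRANSITIONS[current]


def is_terminal(state: State) -> bool:
    """Terminal states have no outgoing arrows."""
    return not TRANSITIONS[state]


def transition(current: State, target: State) -> State:
    """Return the new state, or raise if the move is not in the diagram."""
    if not can_transition(current, target):
        raise ValueError(f"Illegal transition: {current.name} -> {target.name}")
    return target
```

Keeping the table as data makes it easy to reject an illegal update, such as moving a Completed task back to Working, before it is published.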

Detailed State Descriptions

1. Pending - Awaiting Processing

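# Task just created, waiting to start execution
# (the A2A specification names this initial state "submitted";
# this article refers to it as Pending)
task.status.state = TaskState.submitted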

2. Working - In Progress

# Task is executing, can send progress updates
await updater.update_status(
    TaskState.working,
    new_agent_text_message("Processing...", context_id, task_id)
)

3. InputRequired - Input Needed

# Task needs more input to continue
await updater.requires_input(final=True)

4. Completed - Finished Successfully

# Task successfully completed
await updater.complete()

5. Failed - Execution Error

# Task execution failed
await updater.fail("Error message")

6. Canceled - Stopped Before Completion

# Task was canceled
await updater.cancel()

Task State Management

TaskUpdater: The Core Tool for State Management

TaskUpdater is the main helper for managing Task state. It wraps an agent's EventQueue together with the task and context identifiers, and exposes one method per kind of state change.

class TaskUpdater:
    def __init__(self, event_queue: EventQueue, task_id: str, context_id: str):
        self.event_queue = event_queue
        self.task_id = task_id
        self.context_id = context_id
    
    async def submit(self) -> None:
        """Submit task, state becomes Working"""
        
    async def update_status(self, state: TaskState, message: Message, final: bool = False) -> None:
        """Update task state"""
        
    async def add_artifact(self, parts: list[Part], name: str | None = None) -> None:
        """Add task results"""
        
    async def requires_input(self, final: bool = False) -> None:
        """Require more input"""
        
    async def complete(self) -> None:
        """Complete task"""
        
    async def fail(self, error_message: str) -> None:
        """Mark task as failed"""
        
    async def cancel(self) -> None:
        """Cancel task"""

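To make the role of each method concrete, here is a toy updater that publishes one event per call onto an `asyncio.Queue`. `MiniUpdater` and `StatusEvent` are hypothetical stand-ins, not SDK classes. The state each method emits follows the docstrings in the outline above, and the choice of which events are marked final is an assumption of this sketch.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StatusEvent:
    task_id: str
    context_id: str
    state: str
    text: Optional[str] = None
    final: bool = False


class MiniUpdater:
    """Toy stand-in: every call enqueues one event on the agent's own queue."""

    def __init__(self, queue: asyncio.Queue, task_id: str, context_id: str):
        self.queue = queue
        self.task_id = task_id
        self.context_id = context_id

    async def _emit(self, state: str, text: Optional[str] = None,
                    final: bool = False) -> None:
        event = StatusEvent(self.task_id, self.context_id, state, text, final)
        await self.queue.put(event)

    async def submit(self) -> None:
        await self._emit("working")

    async def requires_input(self, final: bool = False) -> None:
        await self._emit("input-required", final=final)

    async def complete(self) -> None:
        await self._emit("completed", final=True)

    async def fail(self, error_message: str) -> None:
        await self._emit("failed", error_message, final=True)

    async def cancel(self) -> None:
        await self._emit("canceled", final=True)


async def demo() -> List[StatusEvent]:
    queue: asyncio.Queue = asyncio.Queue()
    updater = MiniUpdater(queue, "task-1", "ctx-1")
    await updater.submit()
    await updater.fail("boom")
    events = []
    while not queue.empty():
        events.append(queue.get_nowait())
    return events
```

Running `asyncio.run(demo())` yields two events in order, a non-final "working" event followed by a final "failed" event carrying the error text.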
State Update Example

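async def process_complex_task(context: RequestContext, event_queue: EventQueue):
    # Identifiers come from the incoming request context
    task_id = context.task_id
    context_id = context.context_id
    updater = TaskUpdater(event_queue, task_id, context_id)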
    
    # 1. Submit task
    await updater.submit()
    
    # 2. Update progress
    await updater.update_status(
        TaskState.working,
        new_agent_text_message("Step 1: Initializing...", context_id, task_id)
    )
    
    # 3. Add intermediate results
    await updater.add_artifact([
        Part(root=TextPart(text="Intermediate result"))
    ])
    
    # 4. Require user input
    await updater.requires_input(final=True)
    
    # 5. Continue processing (in practice this step runs once the follow-up
    #    message arrives, in a later call for the same task)
    await updater.update_status(
        TaskState.working,
        new_agent_text_message("Step 2: Processing...", context_id, task_id)
    )
    
    # 6. Complete task
    await updater.add_artifact([
        Part(root=TextPart(text="Final result"))
    ])
    await updater.complete()

Task Applications in Number Guessing Game

Let’s understand the practical application of Task through specific examples from the Number Guessing Game.

Scenario 1: Bob Sends Guess to Alice

# Bob sends guess
def _handle_guess(guess: str) -> str:
    resp_obj = send_text(AGENT_ALICE_PORT, guess)
    feedback = extract_text(resp_obj)
    return feedback

# Alice processes guess
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    raw_text = get_message_text(context.message)
    response_text = process_guess(raw_text)
    
    # Identifiers come from the incoming request context
    task_id = context.task_id
    context_id = context.context_id
    updater = TaskUpdater(event_queue, task_id, context_id)
    
    # Task lifecycle:
    await updater.submit()                    # Pending -> Working
    await updater.add_artifact([...])         # Add results
    await updater.complete()                  # Working -> Completed

Scenario 2: Bob’s Multi-turn Negotiation with Carol

# First round: Bob requests visualization
resp_task = send_text(AGENT_CAROL_PORT, json.dumps(game_history))

# Carol processes and keeps task active
async def _handle_initial(self, raw_text: str, context: RequestContext, event_queue: EventQueue):
    response_text = process_history_payload(raw_text)
    
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    await updater.add_artifact([Part(root=TextPart(text=response_text))])
    
    # Key: Require more input, keep task in InputRequired state
    await updater.requires_input(final=True)  # Working -> InputRequired

# Second round: Bob sends "Try again"
resp_obj = send_followup(AGENT_CAROL_PORT, resp_task, "Try again")

# Carol processes follow-up message
async def _handle_followup(self, context: RequestContext, raw_text: str, event_queue: EventQueue):
    # Reuse the existing task's identifiers so updates apply to the same task
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    
    if raw_text.lower().startswith('well done'):
        # Complete task
        await updater.complete()  # InputRequired -> Completed
    else:
        # Continue processing and keep active
        await updater.add_artifact([...])
        await updater.requires_input(final=True)  # Keep InputRequired
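The two handlers above imply a dispatch step: an incoming message is either the start of a new task or a follow-up to one that is waiting for input. The sketch below shows one way that decision could look. `route_message`, `apply_followup`, and the in-memory `store` dictionary are hypothetical and are not part of the game's code; the "well done" rule is taken from the follow-up handler above.

```python
from typing import Dict, Optional

# Hypothetical in-memory record kept by Carol: task_id -> current state.
TaskStore = Dict[str, str]


def route_message(store: TaskStore, task_id: Optional[str]) -> str:
    """Decide which handler should run: 'initial', 'followup', or 'reject'."""
    if task_id is None or task_id not in store:
        return "initial"          # no known task yet: start a new one
    if store[task_id] == "input-required":
        return "followup"         # task is paused and waiting for this message
    return "reject"               # task already reached a terminal state


def apply_followup(store: TaskStore, task_id: str, text: str) -> str:
    """Apply the follow-up rule: 'well done' completes, anything else waits."""
    if text.lower().startswith("well done"):
        store[task_id] = "completed"
    else:
        store[task_id] = "input-required"
    return store[task_id]
```

With `store = {"t1": "input-required"}`, a message for `t1` routes to the follow-up handler, "Try again" leaves the task in input-required, and "Well done!" completes it, after which further messages for `t1` are rejected.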

Task Routing Mechanism

1. HTTP Connection-based Routing

Task routing in the A2A protocol is not implemented through traditional message queues, but through direct HTTP connections and identifier matching.

# Client connects to specific Agent
client = _client_factory.create(
    minimal_agent_card(f'http://localhost:{port}/a2a/v1')
)

# Include routing information when sending messages
msg = Message(
    kind='message',
    role=Role.user,
    message_id=uuid.uuid4().hex,        # Message identifier
    context_id=context_id,              # Context identifier
    reference_task_ids=reference_task_ids or [],  # Referenced task IDs
    parts=[TextPart(text=text)],
    task_id=task_id,                    # Task identifier
)

2. Role of Identifiers

context_id - Conversation Context

# All messages and tasks in the same conversation share context_id
context_id = str(uuid.uuid4())  # Generate unique conversation ID

task_id - Task Identifier

# Each task has a unique task_id for state updates and references
task_id = context.task_id or str(uuid.uuid4())

reference_task_ids - Task References

# Used for multi-turn conversations, referencing previous tasks
reference_task_ids = [previous_task_id]
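A short sketch may help show how the three identifiers work together across two turns. `build_message` is a hypothetical helper that returns a plain dictionary. Its keys mirror the Message fields used in the listing above and are not meant as the exact wire format, and the task id in the usage lines is a placeholder.

```python
import uuid
from typing import List, Optional


def build_message(
    text: str,
    context_id: Optional[str] = None,
    task_id: Optional[str] = None,
    reference_task_ids: Optional[List[str]] = None,
) -> dict:
    """Build a message dict; a new context_id is generated when none is given."""
    return {
        "kind": "message",
        "role": "user",
        "message_id": uuid.uuid4().hex,               # unique per message
        "context_id": context_id or str(uuid.uuid4()),  # shared per conversation
        "task_id": task_id,                           # set when continuing a task
        "reference_task_ids": list(reference_task_ids or []),
        "parts": [{"kind": "text", "text": text}],
    }


# First turn: no identifiers yet, so a fresh conversation is started.
first = build_message("42")

# Second turn: same conversation, pointing back at the earlier task.
followup = build_message(
    "Try again",
    context_id=first["context_id"],
    reference_task_ids=["task-from-first-turn"],  # placeholder task id
)
```

Both messages share one `context_id` while each gets its own `message_id`, which is what lets the receiving agent group them into a single conversation.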

3. Routing Flow Diagram

sequenceDiagram
    participant Bob as Bob (Client)
    participant Alice as Alice (Server)
    participant EventQueue as EventQueue (Alice Internal)
    
    Bob->>Alice: HTTP POST /a2a/v1<br/>Message{task_id, context_id}
    Alice->>EventQueue: Create Task
    Alice->>EventQueue: Update Status (Working)
    Alice->>EventQueue: Add Results
    Alice->>EventQueue: Complete Task (Completed)
    EventQueue->>Alice: Event Stream
    Alice->>Bob: HTTP Response<br/>Task Object

Relationship Between Task and EventQueue

EventQueue is Not a Cross-Agent Message Queue

Important Clarification: EventQueue in the A2A protocol is not a traditional message queue, but an event bus within each Agent.

# ❌ Misunderstanding: Cross-agent message queue
Bob -> EventQueue -> Alice
Bob -> EventQueue -> Carol

# ✅ Correct understanding: Internal event bus for each Agent
Bob -> HTTP(8001) -> Alice -> EventQueue(Alice internal)
Bob -> HTTP(8003) -> Carol -> EventQueue(Carol internal)

Actual Role of EventQueue

# Role of EventQueue within Agent
class NumberGuessExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        # EventQueue is used for:
        # 1. Sending task status update events
        # 2. Sending result data events
        # 3. Managing task lifecycle events
        
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        await updater.submit()      # Send to EventQueue
        await updater.add_artifact([...])  # Send to EventQueue
        await updater.complete()    # Send to EventQueue

Event Types

EventQueue handles the following types of events:

# Main event types
Message                    # Regular message
Task                      # Task object
TaskStatusUpdateEvent     # Task status update event
TaskArtifactUpdateEvent   # Task result update event
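One way to picture what happens to these events is as a fold: the agent reads them in order and builds up the Task that is eventually returned to the caller. The classes below are simplified stand-ins that borrow the event names listed above. `TaskSnapshot` and `fold_events` are hypothetical, and stopping at the first final status event is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class TaskStatusUpdateEvent:
    task_id: str
    state: str
    final: bool = False


@dataclass
class TaskArtifactUpdateEvent:
    task_id: str
    text: str


@dataclass
class TaskSnapshot:
    task_id: str
    state: str = "pending"
    artifacts: List[str] = field(default_factory=list)


Event = Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent]


def fold_events(task_id: str, events: List[Event]) -> TaskSnapshot:
    """Apply events in order and return the resulting view of the task."""
    snapshot = TaskSnapshot(task_id)
    for event in events:
        if event.task_id != task_id:
            continue  # event belongs to a different task
        if isinstance(event, TaskStatusUpdateEvent):
            snapshot.state = event.state
            if event.final:
                break  # nothing after a final status event is applied
        elif isinstance(event, TaskArtifactUpdateEvent):
            snapshot.artifacts.append(event.text)
    return snapshot
```

Feeding it a working event, an artifact, and a final completed event produces a snapshot in the completed state holding that one artifact, which corresponds to the Task object in the last step of the routing flow.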

Advanced Task Patterns

1. Task Chain Pattern

# Create task chain
async def create_task_chain():
    # Parent task
    parent_task = await create_task("Main task")
    
    # Child tasks
    child_task1 = await create_task("Subtask 1", parent_id=parent_task.id)
    child_task2 = await create_task("Subtask 2", parent_id=parent_task.id)
    
    # Wait for all child tasks to complete
    await wait_for_tasks([child_task1.id, child_task2.id])
    
    # Complete parent task
    await complete_task(parent_task.id)
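The chain above relies on helpers that are not shown. The following self-contained sketch reproduces the same shape with in-memory state so that it can be run directly. `STATES`, `PARENT`, `create_task`, `run_child`, and `run_chain` are all hypothetical names, and the child work is simulated with a short sleep.

```python
import asyncio
import itertools
from typing import Dict, Optional

_ids = itertools.count(1)
STATES: Dict[str, str] = {}            # task_id -> state
PARENT: Dict[str, Optional[str]] = {}  # task_id -> parent task_id


def create_task(name: str, parent_id: Optional[str] = None) -> str:
    """Register a new task and return its id (the name is only descriptive)."""
    task_id = f"task-{next(_ids)}"
    STATES[task_id] = "working"
    PARENT[task_id] = parent_id
    return task_id


async def run_child(task_id: str, delay: float) -> None:
    await asyncio.sleep(delay)  # simulated work
    STATES[task_id] = "completed"


async def run_chain() -> str:
    parent = create_task("Main task")
    children = [
        create_task(f"Subtask {i}", parent_id=parent) for i in (1, 2)
    ]

    # Wait for all child tasks to finish.
    await asyncio.gather(*(run_child(child, 0.01) for child in children))

    # The parent completes only once every child has completed.
    if all(STATES[child] == "completed" for child in children):
        STATES[parent] = "completed"
    return parent
```

After `asyncio.run(run_chain())`, both children and the parent are recorded as completed, and `PARENT` links each child back to the parent id.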

2. Parallel Task Pattern

import asyncio

# Execute multiple tasks in parallel
async def parallel_tasks():
    tasks = []
    
    # Create multiple parallel tasks
    for i in range(5):
        task = await create_task(f"Parallel task {i}")
        tasks.append(task)
    
    # Wait for all tasks to complete
    results = await asyncio.gather(*[
        execute_task(task) for task in tasks
    ])
    
    return results

3. Task Retry Pattern

import asyncio

# Task retry mechanism
async def retry_task(task_id: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            result = await execute_task(task_id)
            return result
        except Exception as e:
            if attempt == max_retries - 1:
                await fail_task(task_id, str(e))
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
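The backoff schedule is easier to see when the sleep function is injectable. This sketch generalises the pattern above under stated assumptions: `retry`, its `sleep` parameter, and the `flaky` operation are hypothetical, and the delays are recorded instead of actually waited for.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def retry(
    operation: Callable[[], Awaitable[str]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    """Run operation, retrying with exponential backoff on any Exception."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await sleep(base_delay * 2 ** attempt)
    raise RuntimeError("max_retries must be at least 1")


async def demo() -> Tuple[str, List[float]]:
    delays: List[float] = []
    calls = 0

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record the delay instead of waiting

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise RuntimeError("transient failure")
        return "ok"

    result = await retry(flaky, sleep=fake_sleep)
    return result, delays
```

Here the operation fails twice and succeeds on the third call, so the recorded delays are 1.0 and 2.0 seconds. Injecting `sleep` also keeps tests for retry logic fast.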

4. Task Timeout Pattern

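import asyncio


class TaskTimeoutError(Exception):
    """Raised when a task exceeds its time budget."""


# Task timeout handling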
async def timeout_task(task_id: str, timeout_seconds: int = 30):
    try:
        result = await asyncio.wait_for(
            execute_task(task_id),
            timeout=timeout_seconds
        )
        return result
    except asyncio.TimeoutError:
        await cancel_task(task_id)
        raise TaskTimeoutError(f"Task {task_id} timed out")
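One detail worth seeing in action is that `asyncio.wait_for` cancels the awaited coroutine when the timeout expires, so the work itself gets a chance to clean up. The sketch below demonstrates this with a deliberately slow coroutine. `run_with_timeout`, `slow_work`, and `mark_canceled` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Awaitable, Callable, List


class TaskTimeoutError(Exception):
    """Raised when a task exceeds its time budget."""


async def run_with_timeout(
    work: Callable[[], Awaitable[str]],
    task_id: str,
    timeout_seconds: float,
    on_timeout: Callable[[str], Awaitable[None]],
) -> str:
    try:
        return await asyncio.wait_for(work(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        await on_timeout(task_id)  # e.g. mark the task as canceled
        raise TaskTimeoutError(f"Task {task_id} timed out") from None


async def demo() -> List[str]:
    log: List[str] = []

    async def slow_work() -> str:
        try:
            await asyncio.sleep(1.0)
            return "done"
        finally:
            log.append("cleanup")  # runs when wait_for cancels this coroutine

    async def mark_canceled(task_id: str) -> None:
        log.append(f"canceled:{task_id}")

    try:
        await run_with_timeout(slow_work, "task-1", 0.01, mark_canceled)
    except TaskTimeoutError as exc:
        log.append(str(exc))
    return log
```

The returned log contains the cleanup marker, the cancellation marker, and finally the timeout message, showing that both the work and the task state are wound down before the error reaches the caller.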

Best Practices

1. Task Design Principles

Single Responsibility

# ✅ Good design: Each task does one thing
async def process_user_input(input_text: str):
    # Only process user input
    pass

# ❌ Poor design: Task is too complex
async def process_everything(input_text: str):
    # Process input, validate, store, notify, etc.
    pass

Clear State Transitions

# ✅ Good design: Clear, logical flow
await updater.submit()                    # Pending -> Working
await updater.update_status(
    TaskState.working, 
    new_agent_text_message("Processing data...", context_id, task_id)
)
await updater.add_artifact([...])         # Add results
await updater.requires_input(final=True)  # Working -> InputRequired
# ... (wait for input)
await updater.complete()                  # InputRequired -> Completed

# ❌ Poor design: Unclear state transitions
await updater.add_artifact([...])         # What state is this? Missing submit()
await updater.requires_input()            # Working -> InputRequired (but was it Working?)
await updater.update_status(TaskState.working, ...)  # InputRequired -> Working? Confusing!
await updater.complete()                  # Working -> Completed

# ❌ Poor design: Inconsistent state management
await updater.submit()                    # Pending -> Working
await updater.add_artifact([...])         # Add results
# Missing requires_input() but task needs more data
await updater.complete()                  # Working -> Completed (but task wasn't really done)

2. Error Handling

async def robust_task_execution(context: RequestContext, event_queue: EventQueue):
    # Identifiers come from the incoming request context
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    
    try:
        await updater.submit()
        
        # Execute task logic
        result = await execute_business_logic()
        
        await updater.add_artifact([Part(root=TextPart(text=result))])
        await updater.complete()
        
    except ValidationError as e:
        await updater.fail(f"Validation error: {e}")
    except TimeoutError:
        # A timeout is reported as a cancellation rather than a failure
        await updater.cancel()
    except Exception as e:
        await updater.fail(f"Unexpected error: {e}")
        raise

3. Performance Optimization

Batch Operations

# Batch add results
parts = [Part(root=TextPart(text=result)) for result in results]
await updater.add_artifact(parts)  # Add multiple results at once

Control State Update Frequency

import time

# Avoid too frequent state updates
last_update = time.monotonic()  # monotonic clock is unaffected by system clock changes
for i, item in enumerate(items):
    # ... per-item work goes here ...
    if time.monotonic() - last_update > 1.0:  # Update at most once per second
        await updater.update_status(
            TaskState.working,
            new_agent_text_message(f"Processing {i}/{len(items)}", context_id, task_id)
        )
        last_update = time.monotonic()
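The same throttling idea can be packaged as a small reusable object. This is a sketch, not part of any SDK: `Throttle` and `count_updates` are hypothetical, the clock is injectable so the behaviour can be checked without waiting, and unlike the loop above it lets the very first update through immediately.

```python
import time
from typing import Callable, List, Optional


class Throttle:
    """Allow an action at most once per min_interval seconds."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.min_interval = min_interval
        self.clock = clock
        self._last: Optional[float] = None

    def ready(self) -> bool:
        """Return True (and record the time) if enough time has passed."""
        now = self.clock()
        if self._last is None or now - self._last >= self.min_interval:
            self._last = now
            return True
        return False


def count_updates(timestamps: List[float], min_interval: float = 1.0) -> int:
    """Count how many status updates would be sent at the given clock readings."""
    readings = iter(timestamps)
    throttle = Throttle(min_interval, clock=lambda: next(readings))
    return sum(1 for _ in timestamps if throttle.ready())
```

In a processing loop, the call to `update_status` would be wrapped in `if throttle.ready():`. For clock readings of 0.0, 0.2, 0.9, 1.0, 1.5, and 2.1 seconds, three updates pass through: at 0.0, 1.0, and 2.1.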

4. Monitoring and Debugging

# Add task monitoring
async def monitored_task_execution(context: RequestContext, event_queue: EventQueue):
    task_id = context.task_id
    updater = TaskUpdater(event_queue, task_id, context.context_id)
    start_time = time.time()
    
    try:
        await updater.submit()
        
        # Execute task
        result = await execute_task()
        
        # Record execution time
        execution_time = time.time() - start_time
        await updater.add_artifact([
            Part(root=TextPart(text=f"Execution time: {execution_time:.2f}s"))
        ])
        
        await updater.complete()
        
    except Exception as e:
        # Record error information
        error_info = {
            "error": str(e),
            "execution_time": time.time() - start_time,
            "task_id": task_id
        }
        await updater.fail(json.dumps(error_info))

Conclusion

Task in the A2A protocol is a carefully designed abstraction that addresses the core challenges of distributed AI systems:

Core Value

  1. State Management: Provides complete task lifecycle management
  2. Asynchronous Processing: Supports non-blocking task execution
  3. Multi-turn Conversations: Enables complex interactions through state persistence
  4. Error Handling: Provides comprehensive error and cancellation mechanisms
  5. Result Management: Supports complex result storage and transmission

Design Characteristics

  1. HTTP-based: Uses direct HTTP connections rather than message queues
  2. Identifier Routing: Routes through context_id, task_id, and other identifiers
  3. Event-driven: Uses EventQueue for internal event management
  4. State Machine: Clear state transitions and lifecycle management

Practical Applications

In the Number Guessing Game, Task enables:

  • Bob to have simple request-response interactions with Alice
  • Bob to have complex multi-turn negotiations with Carol
  • Each Agent to independently manage its own task states
  • The system to scale and isolate faults more easily, because no task state is shared across agents

Future Prospects

The Task abstraction lays the foundation for building more complex distributed AI systems. Possible directions include:

  • Task Orchestration: Expressing task dependencies and orchestrating multi-step workflows
  • Load Balancing: Allocating tasks dynamically across agent instances
  • Monitoring and Debugging: Building tooling on top of task states and events
  • Scalability: Scaling horizontally in a microservice-style architecture

Task in the A2A protocol is not just a technical implementation, but a design philosophy for distributed AI systems, embodying how to build scalable, maintainable, and observable intelligent systems.