Task in A2A Protocol: The Core Abstraction of Distributed AI Systems
Task Overview
In the Agent-to-Agent (A2A) protocol, Task is the fundamental abstraction: a complete, traceable unit of work. A Task is more than a message or a request; it is a stateful object that carries status, context, results, and metadata.
Why Do We Need Task?
In distributed AI systems, collaboration between agents often involves:
- Long-running tasks: Requiring state tracking and progress updates
- Multi-turn conversations: Requiring context maintenance and reference relationships
- Asynchronous processing: Requiring non-blocking task execution
- Error handling: Requiring task cancellation and retry mechanisms
- Result management: Requiring storage and transmission of complex result data
The Task abstraction addresses each of these needs by giving every unit of work an identity, a tracked state, and a place to attach results.
Core Properties of Task
1. Basic Identifiers
class Task:
    id: str               # Unique task identifier
    context_id: str       # Conversation context identifier
    message_id: str       # Associated message identifier
    created_at: datetime  # Creation time
    updated_at: datetime  # Last update time
2. State Information
class TaskStatus:
    state: TaskState   # Current state
    message: Message   # Status message
    progress: float    # Progress as a fraction (0.0-1.0)
    error: str | None  # Error information
3. Task Content
class Task:
    artifacts: list[Artifact]  # Task results/outputs
    parts: list[Part]          # Task content parts
    metadata: dict             # Metadata
    tags: list[str]            # Tags
4. Relationship Management
class Task:
    reference_task_ids: list[str]  # Referenced other tasks
    parent_task_id: str | None     # Parent task ID
    child_task_ids: list[str]      # Child task ID list
Task Lifecycle
The Task lifecycle is a carefully designed state machine that ensures tasks can be properly tracked and managed.
State Transition Diagram
stateDiagram-v2
    [*] --> Pending: Task Created
    Pending --> Working: Start Execution
    Pending --> InputRequired: Input Needed
    Working --> InputRequired: Wait for Input
    Working --> Completed: Execution Complete
    InputRequired --> Working: Input Received
    InputRequired --> Completed: Direct Complete
    Working --> Failed: Execution Failed
    InputRequired --> Failed: Input Timeout
    Working --> Canceled: Canceled
    InputRequired --> Canceled: Canceled
    Completed --> [*]
    Failed --> [*]
    Canceled --> [*]
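The diagram above can also be written down as a transition table, which makes illegal moves easy to reject. The following is a minimal sketch and not part of any A2A SDK: the `State` enum, the `ALLOWED` table, and the `transition` and `is_terminal` helpers are hypothetical names, and the table simply mirrors the arrows in the diagram.

```python
from enum import Enum


class State(Enum):
    """Hypothetical mirror of the states in the diagram above."""
    PENDING = "pending"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


# Each key maps to the states reachable from it, one entry per arrow in the diagram.
ALLOWED = {
    State.PENDING: {State.WORKING, State.INPUT_REQUIRED},
    State.WORKING: {State.INPUT_REQUIRED, State.COMPLETED, State.FAILED, State.CANCELED},
    State.INPUT_REQUIRED: {State.WORKING, State.COMPLETED, State.FAILED, State.CANCELED},
    # Terminal states have no outgoing transitions.
    State.COMPLETED: set(),
    State.FAILED: set(),
    State.CANCELED: set(),
}


def transition(current: State, target: State) -> State:
    """Return `target` if the diagram allows the move, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.name} -> {target.name}")
    return target


def is_terminal(state: State) -> bool:
    """A state is terminal when nothing can follow it."""
    return not ALLOWED[state]
```

Keeping the rules in one table means a new state only needs a new row, rather than changes scattered across handlers.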
Detailed State Descriptions
1. Pending - Awaiting Processing
# Task just created, waiting to start execution.
# The A2A TaskState enum names this initial state `submitted`; it has no `pending` member.
task.status.state = TaskState.submitted
2. Working - In Progress
# Task is executing, can send progress updates
await updater.update_status(
    TaskState.working,
    new_agent_text_message("Processing...", context_id, task_id)
)
3. InputRequired - Input Needed
# Task needs more input to continue
await updater.requires_input(final=True)
4. Completed - Completed
# Task successfully completed
await updater.complete()
5. Failed - Failed
# Task execution failed
await updater.fail("Error message")
6. Canceled - Canceled
# Task was canceled
await updater.cancel()
Task State Management
TaskUpdater: The Core Tool for State Management
TaskUpdater is the main tool for managing Task state in the A2A protocol. It wraps the event queue and exposes one method per state change.
# Simplified outline of the interface
class TaskUpdater:
    def __init__(self, event_queue: EventQueue, task_id: str, context_id: str):
        self.event_queue = event_queue
        self.task_id = task_id
        self.context_id = context_id

    async def submit(self) -> None:
        """Submit task, state becomes Working"""

    async def update_status(self, state: TaskState, message: Message, final: bool = False) -> None:
        """Update task state"""

    async def add_artifact(self, parts: list[Part], name: str | None = None) -> None:
        """Add task results"""

    async def requires_input(self, final: bool = False) -> None:
        """Require more input"""

    async def complete(self) -> None:
        """Complete task"""

    async def fail(self, error_message: str) -> None:
        """Mark task as failed"""

    async def cancel(self) -> None:
        """Cancel task"""
State Update Example
async def process_complex_task(context: RequestContext, event_queue: EventQueue):
    # IDs come from the incoming request context
    task_id, context_id = context.task_id, context.context_id
    updater = TaskUpdater(event_queue, task_id, context_id)
    # 1. Submit task
    await updater.submit()
    # 2. Update progress
    await updater.update_status(
        TaskState.working,
        new_agent_text_message("Step 1: Initializing...", context_id, task_id)
    )
    # 3. Add intermediate results
    await updater.add_artifact([
        Part(root=TextPart(text="Intermediate result"))
    ])
    # 4. Require user input
    await updater.requires_input(final=True)
    # 5. Continue processing (in practice, after the follow-up message arrives)
    await updater.update_status(
        TaskState.working,
        new_agent_text_message("Step 2: Processing...", context_id, task_id)
    )
    # 6. Complete task
    await updater.add_artifact([
        Part(root=TextPart(text="Final result"))
    ])
    await updater.complete()
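To make the flow of events concrete, here is a stdlib-only sketch of what an updater does when it is called. `FakeUpdater`, its tuple-shaped events, and `run_once` are assumptions made for illustration; the real SDK emits typed event objects rather than tuples, and its method set is larger.

```python
import asyncio


class FakeUpdater:
    """Hypothetical stand-in for TaskUpdater: every call enqueues a (task_id, kind, payload) tuple."""

    def __init__(self, queue: asyncio.Queue, task_id: str, context_id: str):
        self.queue = queue
        self.task_id = task_id
        self.context_id = context_id

    async def _emit(self, kind: str, payload: str) -> None:
        await self.queue.put((self.task_id, kind, payload))

    async def submit(self) -> None:
        # Follows this article's convention that submit() moves the task to Working.
        await self._emit("status", "working")

    async def add_artifact(self, text: str) -> None:
        await self._emit("artifact", text)

    async def complete(self) -> None:
        await self._emit("status", "completed")


async def run_once() -> list:
    """Drive one task through submit -> artifact -> complete and return the emitted events."""
    queue: asyncio.Queue = asyncio.Queue()
    updater = FakeUpdater(queue, task_id="t-1", context_id="c-1")
    await updater.submit()
    await updater.add_artifact("42 is too high")
    await updater.complete()
    events = []
    while not queue.empty():
        events.append(queue.get_nowait())
    return events
```

Calling `asyncio.run(run_once())` returns the three events in the order they were emitted, which is the order a consumer of the queue would observe.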
Task Applications in Number Guessing Game
Let’s understand the practical application of Task through specific examples from the Number Guessing Game.
Scenario 1: Bob Sends Guess to Alice
# Bob sends guess
def _handle_guess(guess: str) -> str:
    resp_obj = send_text(AGENT_ALICE_PORT, guess)
    feedback = extract_text(resp_obj)
    return feedback

# Alice processes guess
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    raw_text = get_message_text(context.message)
    response_text = process_guess(raw_text)
    # IDs come from the incoming request context
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    # Task lifecycle:
    await updater.submit()             # Pending -> Working
    await updater.add_artifact([...])  # Add results
    await updater.complete()           # Working -> Completed
Scenario 2: Bob’s Multi-turn Negotiation with Carol
# First round: Bob requests visualization
resp_task = send_text(AGENT_CAROL_PORT, json.dumps(game_history))

# Carol processes and keeps task active
async def _handle_initial(self, raw_text: str, context: RequestContext, event_queue: EventQueue):
    response_text = process_history_payload(raw_text)
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    await updater.add_artifact([Part(root=TextPart(text=response_text))])
    # Key: Require more input, keep task in InputRequired state
    await updater.requires_input(final=True)  # Working -> InputRequired

# Second round: Bob sends "Try again"
resp_obj = send_followup(AGENT_CAROL_PORT, resp_task, "Try again")

# Carol processes follow-up message
async def _handle_followup(self, context: RequestContext, raw_text: str, event_queue: EventQueue):
    # Reattach to the same task using the IDs carried by the follow-up message
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    if raw_text.lower().startswith('well done'):
        # Complete task
        await updater.complete()  # InputRequired -> Completed
    else:
        # Continue processing and keep active
        await updater.add_artifact([...])
        await updater.requires_input(final=True)  # Keep InputRequired
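The multi-turn flow above depends on the task staying open between rounds. The following is a simplified, stdlib-only sketch of that idea; `TASK_STATES`, `handle_initial`, and `handle_followup` are hypothetical names, and a plain dict stands in for whatever task store Carol actually uses.

```python
# In-memory stand-in for Carol's task store: task_id -> current state string.
TASK_STATES: dict = {}


def handle_initial(task_id: str, history: str) -> str:
    """First round: produce a result and park the task in input-required."""
    TASK_STATES[task_id] = "input-required"
    return f"visualization of {history}"


def handle_followup(task_id: str, text: str) -> str:
    """Later rounds: finish on 'well done', otherwise produce another attempt."""
    if TASK_STATES.get(task_id) != "input-required":
        raise ValueError(f"task {task_id} is not waiting for input")
    if text.lower().startswith("well done"):
        TASK_STATES[task_id] = "completed"
        return "done"
    # Any other reply keeps the task open for another round.
    return "another attempt"
```

Because the state lives under the task ID rather than in a running coroutine, each round can be served by a separate request.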
Task Routing Mechanism
1. HTTP Connection-based Routing
Task routing in the A2A protocol is not implemented through traditional message queues, but through direct HTTP connections and identifier matching.
# Client connects to specific Agent
client = _client_factory.create(
    minimal_agent_card(f'http://localhost:{port}/a2a/v1')
)

# Include routing information when sending messages
msg = Message(
    kind='message',
    role=Role.user,
    message_id=uuid.uuid4().hex,                  # Message identifier
    context_id=context_id,                        # Context identifier
    reference_task_ids=reference_task_ids or [],  # Referenced task IDs
    parts=[TextPart(text=text)],
    task_id=task_id,                              # Task identifier
)
2. Role of Identifiers
context_id - Conversation Context
# All messages and tasks in the same conversation share context_id
context_id = str(uuid.uuid4()) # Generate unique conversation ID
task_id - Task Identifier
# Each task has a unique task_id for state updates and references
task_id = context.task_id or str(uuid.uuid4())
reference_task_ids - Task References
# Used for multi-turn conversations, referencing previous tasks
reference_task_ids = [previous_task_id]
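How the three identifiers combine is easiest to see side by side. The sketch below uses a hypothetical `Envelope` dataclass as a simplified view of the routing fields; it is not the SDK's `Message` type, and the three helper functions are illustrative names only.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Envelope:
    """Hypothetical, simplified view of the routing fields on a message."""
    text: str
    context_id: str
    task_id: Optional[str] = None
    reference_task_ids: List[str] = field(default_factory=list)
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex)


def first_message(text: str) -> Envelope:
    """Start a new conversation: fresh context_id, no task yet."""
    return Envelope(text=text, context_id=str(uuid.uuid4()))


def followup(previous: Envelope, task_id: str, text: str) -> Envelope:
    """Continue the same task: reuse context_id and task_id, get a new message_id."""
    return Envelope(text=text, context_id=previous.context_id, task_id=task_id)


def new_task_referencing(previous: Envelope, old_task_id: str, text: str) -> Envelope:
    """Start a new task in the same conversation that points back at an earlier one."""
    return Envelope(
        text=text,
        context_id=previous.context_id,
        reference_task_ids=[old_task_id],
    )
```

A follow-up shares both `context_id` and `task_id` with the earlier exchange, while a new task in the same conversation shares only `context_id` and names the earlier task in `reference_task_ids`.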
3. Routing Flow Diagram
sequenceDiagram
    participant Bob as Bob (Client)
    participant Alice as Alice (Server)
    participant EventQueue as EventQueue (Alice Internal)
    Bob->>Alice: HTTP POST /a2a/v1<br/>Message{task_id, context_id}
    Alice->>EventQueue: Create Task
    Alice->>EventQueue: Update Status (Working)
    Alice->>EventQueue: Add Results
    Alice->>EventQueue: Complete Task (Completed)
    EventQueue->>Alice: Event Stream
    Alice->>Bob: HTTP Response<br/>Task Object
Relationship Between Task and EventQueue
EventQueue is Not a Cross-Agent Message Queue
Important Clarification: EventQueue in the A2A protocol is not a traditional message queue, but an event bus within each Agent.
# ❌ Misunderstanding: Cross-agent message queue
Bob -> EventQueue -> Alice
Bob -> EventQueue -> Carol
# ✅ Correct understanding: Internal event bus for each Agent
Bob -> HTTP(8001) -> Alice -> EventQueue(Alice internal)
Bob -> HTTP(8003) -> Carol -> EventQueue(Carol internal)
Actual Role of EventQueue
# Role of EventQueue within Agent
class NumberGuessExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue):
        # EventQueue is used for:
        # 1. Sending task status update events
        # 2. Sending result data events
        # 3. Managing task lifecycle events
        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        await updater.submit()             # Send to EventQueue
        await updater.add_artifact([...])  # Send to EventQueue
        await updater.complete()           # Send to EventQueue
Event Types
EventQueue handles the following types of events:
# Main event types
Message # Regular message
Task # Task object
TaskStatusUpdateEvent # Task status update event
TaskArtifactUpdateEvent # Task result update event
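One way to picture how these events become the Task object returned to the caller is to fold them into a snapshot. This is a sketch under stated assumptions: `StatusEvent`, `ArtifactEvent`, `TaskSnapshot`, and `apply_events` are hypothetical, simplified stand-ins for the event types listed above, not the SDK's classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StatusEvent:
    """Simplified stand-in for a task status update event."""
    task_id: str
    state: str


@dataclass
class ArtifactEvent:
    """Simplified stand-in for a task artifact update event."""
    task_id: str
    text: str


@dataclass
class TaskSnapshot:
    task_id: str
    state: str = "pending"
    artifacts: List[str] = field(default_factory=list)


def apply_events(task_id: str, events: list) -> TaskSnapshot:
    """Fold an agent's internal event stream into the task view sent back to the caller."""
    snapshot = TaskSnapshot(task_id)
    for event in events:
        if event.task_id != task_id:
            continue  # Events for other tasks are ignored.
        if isinstance(event, StatusEvent):
            snapshot.state = event.state
        elif isinstance(event, ArtifactEvent):
            snapshot.artifacts.append(event.text)
    return snapshot
```

The last status event wins and artifacts accumulate in order, which matches the sequence shown in the routing flow diagram.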
Advanced Task Patterns
1. Task Chain Pattern
# Create task chain
# (create_task, wait_for_tasks and complete_task are application-level helpers, not A2A APIs)
async def create_task_chain():
    # Parent task
    parent_task = await create_task("Main task")
    # Child tasks
    child_task1 = await create_task("Subtask 1", parent_id=parent_task.id)
    child_task2 = await create_task("Subtask 2", parent_id=parent_task.id)
    # Wait for all child tasks to complete
    await wait_for_tasks([child_task1.id, child_task2.id])
    # Complete parent task
    await complete_task(parent_task.id)
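The chain pattern can be tried end to end with an in-memory store. In this sketch the `TASKS` dict, `create_task`, `run_child`, and `run_chain` are all hypothetical; they stand in for whatever task store and child execution an application provides, and `asyncio.sleep` simulates the child work.

```python
import asyncio
import uuid
from typing import Optional

# Hypothetical in-memory store: task_id -> {"name", "parent_id", "state"}.
TASKS: dict = {}


async def create_task(name: str, parent_id: Optional[str] = None) -> str:
    """Register a task and return its ID."""
    task_id = uuid.uuid4().hex
    TASKS[task_id] = {"name": name, "parent_id": parent_id, "state": "working"}
    return task_id


async def run_child(task_id: str) -> None:
    """Pretend to do some work, then mark the child completed."""
    await asyncio.sleep(0.01)
    TASKS[task_id]["state"] = "completed"


async def run_chain() -> str:
    """Create a parent with two children and complete the parent last."""
    parent = await create_task("Main task")
    children = [await create_task(f"Subtask {i}", parent_id=parent) for i in (1, 2)]
    # The parent completes only after every child has finished.
    await asyncio.gather(*(run_child(child) for child in children))
    if all(TASKS[child]["state"] == "completed" for child in children):
        TASKS[parent]["state"] = "completed"
    return parent
```

Running `asyncio.run(run_chain())` leaves one completed parent and two completed children in `TASKS`, linked through `parent_id`.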
2. Parallel Task Pattern
# Execute multiple tasks in parallel
async def parallel_tasks():
    tasks = []
    # Create multiple parallel tasks
    for i in range(5):
        task = await create_task(f"Parallel task {i}")
        tasks.append(task)
    # Wait for all tasks to complete
    results = await asyncio.gather(*[
        execute_task(task) for task in tasks
    ])
    return results
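When the number of parallel tasks is large, it is often useful to cap how many run at once. The following sketch shows one common way to do that with `asyncio.Semaphore`; `run_bounded` and `make_job` are hypothetical helpers, and the squaring job is only a placeholder for real task execution.

```python
import asyncio


async def run_bounded(jobs, limit: int = 2):
    """Run coroutine functions concurrently, at most `limit` at a time, preserving input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job):
        # Each job waits for a free slot before it starts.
        async with semaphore:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


def make_job(i: int):
    """Build a placeholder job that sleeps briefly and returns i squared."""
    async def job() -> int:
        await asyncio.sleep(0.01)
        return i * i
    return job
```

`asyncio.gather` returns results in the order the jobs were passed in, regardless of which one finishes first.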
3. Task Retry Pattern
# Task retry mechanism
async def retry_task(task_id: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            result = await execute_task(task_id)
            return result
        except Exception as e:
            if attempt == max_retries - 1:
                await fail_task(task_id, str(e))
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
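The retry loop can be exercised without any agent by passing in the operation to run. This is a self-contained sketch: `retry` and `make_flaky` are hypothetical names, and the short `base_delay` is chosen only so the example finishes quickly.

```python
import asyncio


async def retry(operation, max_retries: int = 3, base_delay: float = 0.01):
    """Call `operation` until it succeeds or `max_retries` attempts are used up."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error.
            await asyncio.sleep(base_delay * 2 ** attempt)  # Exponential backoff.


def make_flaky(failures: int):
    """Build a coroutine function that fails `failures` times, then returns its call count."""
    calls = {"n": 0}

    async def operation() -> int:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise RuntimeError("transient failure")
        return calls["n"]

    return operation
```

With two simulated failures and three allowed attempts the call succeeds on the third try; with more failures than attempts, the last error propagates to the caller.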
4. Task Timeout Pattern
# Task timeout handling
class TaskTimeoutError(Exception):
    """Raised when a task exceeds its time budget"""

async def timeout_task(task_id: str, timeout_seconds: int = 30):
    try:
        result = await asyncio.wait_for(
            execute_task(task_id),
            timeout=timeout_seconds
        )
        return result
    except asyncio.TimeoutError:
        await cancel_task(task_id)
        raise TaskTimeoutError(f"Task {task_id} timed out")
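The timeout pattern can be checked in isolation with a slow coroutine. In this sketch `run_with_timeout`, `slow`, and the `on_timeout` callback are hypothetical; the callback stands in for whatever cancellation call the application makes.

```python
import asyncio


class TaskTimeoutError(Exception):
    """Hypothetical error type raised when a task exceeds its time budget."""


async def run_with_timeout(coro, timeout_seconds: float, on_timeout):
    """Await `coro`; on timeout, call `on_timeout` and raise TaskTimeoutError."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        on_timeout()
        raise TaskTimeoutError(f"timed out after {timeout_seconds}s") from None


async def slow(delay: float) -> str:
    """Placeholder for real work: sleep, then return a result."""
    await asyncio.sleep(delay)
    return "done"
```

`asyncio.wait_for` cancels the awaited coroutine when the timeout expires, so the callback only needs to record the cancellation, not stop the work itself.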
Best Practices
1. Task Design Principles
Single Responsibility
# ✅ Good design: Each task does one thing
async def process_user_input(input_text: str):
    # Only process user input
    pass

# ❌ Poor design: Task is too complex
async def process_everything(input_text: str):
    # Process input, validate, store, notify, etc.
    pass
Clear State Transitions
# ✅ Good design: Clear, logical flow
await updater.submit() # Pending -> Working
await updater.update_status(
    TaskState.working,
    new_agent_text_message("Processing data...", context_id, task_id)
)
await updater.add_artifact([...]) # Add results
await updater.requires_input(final=True) # Working -> InputRequired
# ... (wait for input)
await updater.complete() # InputRequired -> Completed
# ❌ Poor design: Unclear state transitions
await updater.add_artifact([...]) # What state is this? Missing submit()
await updater.requires_input() # Working -> InputRequired (but was it Working?)
await updater.update_status(TaskState.working, ...) # InputRequired -> Working? Confusing!
await updater.complete() # Working -> Completed
# ❌ Poor design: Inconsistent state management
await updater.submit() # Pending -> Working
await updater.add_artifact([...]) # Add results
# Missing requires_input() but task needs more data
await updater.complete() # Working -> Completed (but task wasn't really done)
2. Error Handling
async def robust_task_execution(context: RequestContext, event_queue: EventQueue):
    updater = TaskUpdater(event_queue, context.task_id, context.context_id)
    try:
        await updater.submit()
        # Execute task logic
        result = await execute_business_logic()
        await updater.add_artifact([Part(root=TextPart(text=result))])
        await updater.complete()
    except ValidationError as e:
        await updater.fail(f"Validation error: {e}")
    except TimeoutError:
        await updater.cancel()
    except Exception as e:
        await updater.fail(f"Unexpected error: {e}")
        raise
3. Performance Optimization
Batch Operations
# Batch add results
parts = [Part(root=TextPart(text=result)) for result in results]
await updater.add_artifact(parts) # Add multiple results at once
Control State Update Frequency
# Avoid too frequent state updates
import time

last_update = time.time()
for i, item in enumerate(items):
    # ... process item here ...
    if time.time() - last_update > 1.0:  # Update at most once per second
        await updater.update_status(
            TaskState.working,
            new_agent_text_message(f"Processing {i}/{len(items)}", context_id, task_id)
        )
        last_update = time.time()
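The same rate limit can be pulled into a small reusable helper. This is a sketch, not an SDK feature: `Throttle` is a hypothetical class, and the injectable `clock` parameter exists only so the behavior can be tested without waiting.

```python
import time


class Throttle:
    """Allow an action at most once per `interval` seconds."""

    def __init__(self, interval: float, clock=time.monotonic):
        self.interval = interval
        self.clock = clock
        self._last = None  # None means "never fired", so the first call always passes.

    def ready(self) -> bool:
        """Return True and record the time if enough time has passed since the last True."""
        now = self.clock()
        if self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

Inside a processing loop, a status update would then be guarded by `if throttle.ready():`. The default clock is `time.monotonic`, which is not affected by system clock adjustments.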
4. Monitoring and Debugging
# Add task monitoring
import json
import time

async def monitored_task_execution(context: RequestContext, event_queue: EventQueue):
    task_id = context.task_id
    updater = TaskUpdater(event_queue, task_id, context.context_id)
    start_time = time.time()
    try:
        await updater.submit()
        # Execute task
        result = await execute_task()
        # Record execution time
        execution_time = time.time() - start_time
        await updater.add_artifact([
            Part(root=TextPart(text=f"Execution time: {execution_time:.2f}s"))
        ])
        await updater.complete()
    except Exception as e:
        # Record error information
        error_info = {
            "error": str(e),
            "execution_time": time.time() - start_time,
            "task_id": task_id
        }
        await updater.fail(json.dumps(error_info))
Conclusion
Task in the A2A protocol is a carefully designed abstraction that addresses the core challenges of distributed AI systems:
Core Value
- State Management: Provides complete task lifecycle management
- Asynchronous Processing: Supports non-blocking task execution
- Multi-turn Conversations: Enables complex interactions through state persistence
- Error Handling: Provides comprehensive error and cancellation mechanisms
- Result Management: Supports complex result storage and transmission
Design Characteristics
- HTTP-based: Uses direct HTTP connections rather than message queues
- Identifier Routing: Routes through context_id, task_id, and other identifiers
- Event-driven: Uses EventQueue for internal event management
- State Machine: Clear state transitions and lifecycle management
Practical Applications
In the Number Guessing Game, Task enables:
- Bob to have simple request-response interactions with Alice
- Bob to have complex multi-turn negotiations with Carol
- Each Agent to independently manage its own task states
- The system to scale and contain failures, since no agent depends on a shared queue or on another agent's internal state
Future Prospects
The Task abstraction lays the foundation for building more complex distributed AI systems:
- Task Orchestration: Supports complex task dependencies and orchestration
- Load Balancing: Supports dynamic task allocation and load balancing
- Monitoring and Debugging: Provides rich monitoring and debugging capabilities
- Scalability: Supports horizontal scaling and microservice architecture
Task in the A2A protocol is not just a technical implementation, but a design philosophy for distributed AI systems, embodying how to build scalable, maintainable, and observable intelligent systems.