-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathecho_server.py
More file actions
92 lines (76 loc) · 3.14 KB
/
echo_server.py
File metadata and controls
92 lines (76 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
"""ROAR Protocol — Echo Server Example.

A minimal ROARServer that accepts DELEGATE messages and echoes them back.
Demonstrates Layer 1 (identity), Layer 4 (exchange), and Layer 2 (discovery
registration).

Run this first, then run client.py in a second terminal.

Requirements:
    pip install -e "python/[dev]"   (from the roar-protocol repo)
    pip install uvicorn fastapi     (for the HTTP server)

Usage:
    python3 examples/python/echo_server.py
"""
import asyncio
import logging
import os
# ── Imports from the standalone roar-sdk package ────────────────────────────
from roar_sdk import (
AgentIdentity,
MessageIntent,
ROARMessage,
StreamEvent,
StreamEventType,
ROARServer,
)
# Root logging: INFO and above, rendered as "<LEVEL> <message>".
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)
# Module-wide logger used by every step of this example.
log = logging.getLogger("echo-server")
# ── Step 1: Give this server an identity ────────────────────────────────────
# The identity this server presents; its `did` is logged below so the value
# is visible at startup.
identity = AgentIdentity(
    agent_type="agent",
    display_name="echo-server",
    capabilities=["echo", "reflect"],
)
log.info("Server DID: %s", identity.did)
# ── Step 2: Create the server ───────────────────────────────────────────────
# Server bound to the loopback interface on port 8089.
# The signing secret is read from the ROAR_SIGNING_SECRET environment
# variable and falls back to an empty string when the variable is unset.
# NOTE(review): how ROARServer treats an empty signing secret is not visible
# here — confirm it is acceptable for anything beyond a local demo.
server = ROARServer(
    identity=identity,
    description="Echoes DELEGATE messages back to the sender",
    host="127.0.0.1",
    port=8089,
    signing_secret=os.getenv("ROAR_SIGNING_SECRET", ""),
)
# ── Step 3: Register a handler for DELEGATE messages ────────────────────────
@server.on(MessageIntent.DELEGATE)
async def handle_delegate(msg: ROARMessage) -> ROARMessage:
    """Echo a DELEGATE message back to its sender.

    Logs the request, emits a TASK_UPDATE stream event describing the echo,
    then builds, logs and returns the reply.

    Args:
        msg: The incoming message; this handler is registered through
            ``server.on`` for ``MessageIntent.DELEGATE``.

    Returns:
        A RESPOND message addressed to ``msg.from_identity``. Its payload
        carries the original payload under ``"echo"`` plus ``"status": "ok"``,
        and its context carries the original message id under
        ``"in_reply_to"``.
    """
    sender = msg.from_identity
    log.info("← DELEGATE from %s: %s", sender.display_name, msg.payload)

    # Publish a stream event so that any subscriber can observe the echo.
    echo_event = StreamEvent(
        type=StreamEventType.TASK_UPDATE,
        source=identity.did,
        session_id=msg.context.get("session_id", ""),
        data={"status": "echoed", "original_payload": msg.payload},
    )
    await server.emit(echo_event)

    # "from" is a reserved word in Python, so the message fields are collected
    # in a dict and unpacked rather than written as keyword arguments.
    reply_fields = {
        "from": server.identity,
        "to": sender,
        "intent": MessageIntent.RESPOND,
        "payload": {"echo": msg.payload, "status": "ok"},
        "context": {"in_reply_to": msg.id},
    }
    response = ROARMessage(**reply_fields)

    log.info("→ RESPOND: %s", response.payload)
    return response
# ── Step 4: Start the built-in HTTP server ──────────────────────────────────
if __name__ == "__main__":
    # Startup banner: each entry is emitted as its own INFO record.
    startup_banner = (
        "Starting ROAR echo server on http://127.0.0.1:8089",
        "Endpoints:",
        " POST /roar/message — receive a ROARMessage",
        " GET /roar/agents — see this server's AgentCard",
        "",
        "Now run: python3 examples/python/client.py",
    )
    for banner_line in startup_banner:
        log.info(banner_line)

    # ROARServer.serve() sets up FastAPI + uvicorn itself, so no explicit
    # app wiring is needed here.
    # Requires: pip install 'roar-sdk[server]'
    server.serve()