Custom Transport
The transport abstraction lets you swap out how messages are sent and received without changing any plugin code. The default transport is AlgoChat (encrypted on-chain messaging via Algorand), but you can implement your own.
The Transport Trait
#![allow(unused)] fn main() { use anyhow::Result; use async_trait::async_trait; use nano_transport::{Message, OutboundMessage, SendResult, Transport}; #[async_trait] pub trait Transport: Send + Sync { /// Human-readable name (e.g. "algochat", "websocket", "mqtt"). fn name(&self) -> &str; /// Poll for new messages since the last sync. /// Return an empty vec if there are no new messages. async fn recv(&self) -> Result<Vec<Message>>; /// Send a message through this transport. async fn send(&self, msg: OutboundMessage) -> Result<SendResult>; /// The local agent's address on this transport. fn local_address(&self) -> &str; } }
Message Types
Inbound messages use the Message struct:
#![allow(unused)] fn main() { pub struct Message { pub sender: String, // Who sent it pub recipient: String, // Who it's for (your agent) pub content: String, // Plaintext content (already decrypted) pub timestamp: DateTime<Utc>, // When received/confirmed pub metadata: serde_json::Value, // Transport-specific extras } }
Outbound messages use OutboundMessage:
#![allow(unused)] fn main() { pub struct OutboundMessage { pub to: String, // Recipient address pub content: String, // Plaintext (transport handles encryption) } }
Send results return an ID:
#![allow(unused)] fn main() { pub struct SendResult { pub id: String, // Transport-assigned ID (tx hash, message ID, etc.) } }
Example: WebSocket Transport
Here's a complete example of a WebSocket-based transport. This uses external crates (tokio-tungstenite, futures-util) that you'd add to your own Cargo.toml:
#![allow(unused)] fn main() { use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; use tokio::sync::Mutex; use nano_transport::{Message, OutboundMessage, SendResult, Transport}; pub struct WebSocketTransport { address: String, ws_url: String, inbox: Arc<Mutex<Vec<Message>>>, } impl WebSocketTransport { pub fn new(address: String, ws_url: String) -> Self { Self { address, ws_url, inbox: Arc::new(Mutex::new(Vec::new())), } } /// Spawn a background listener that pushes messages into the inbox. pub async fn connect(&self) -> Result<()> { let inbox = self.inbox.clone(); let url = self.ws_url.clone(); let address = self.address.clone(); tokio::spawn(async move { // Connect to WebSocket and push incoming messages to inbox // (implementation depends on your WS library) let (mut ws, _) = tokio_tungstenite::connect_async(&url) .await .expect("ws connect failed"); use futures_util::StreamExt; while let Some(Ok(msg)) = ws.next().await { if let Ok(text) = msg.into_text() { inbox.lock().await.push(Message { sender: "ws-peer".into(), recipient: address.clone(), content: text, timestamp: chrono::Utc::now(), metadata: serde_json::Value::Null, }); } } }); Ok(()) } } #[async_trait] impl Transport for WebSocketTransport { fn name(&self) -> &str { "websocket" } async fn recv(&self) -> Result<Vec<Message>> { let mut inbox = self.inbox.lock().await; let messages = inbox.drain(..).collect(); Ok(messages) } async fn send(&self, msg: OutboundMessage) -> Result<SendResult> { // Send via WebSocket connection // (simplified — real impl would hold a write handle) Ok(SendResult { id: format!("ws-{}", uuid::Uuid::new_v4()), }) } fn local_address(&self) -> &str { &self.address } } }
Example: HTTP Polling Transport
A simpler transport that polls an HTTP endpoint:
#![allow(unused)] fn main() { pub struct HttpTransport { address: String, poll_url: String, send_url: String, http: reqwest::Client, last_seen: Arc<Mutex<Option<String>>>, } #[async_trait] impl Transport for HttpTransport { fn name(&self) -> &str { "http" } async fn recv(&self) -> Result<Vec<Message>> { let mut url = self.poll_url.clone(); if let Some(cursor) = self.last_seen.lock().await.as_ref() { url = format!("{}?after={}", url, cursor); } let resp: Vec<Message> = self.http .get(&url) .send().await? .json().await?; if let Some(last) = resp.last() { *self.last_seen.lock().await = Some( last.metadata["id"].as_str().unwrap_or("").to_string() ); } Ok(resp) } async fn send(&self, msg: OutboundMessage) -> Result<SendResult> { let resp = self.http .post(&self.send_url) .json(&msg) .send().await?; let id = resp.json::<serde_json::Value>().await? ["id"].as_str().unwrap_or("unknown").to_string(); Ok(SendResult { id }) } fn local_address(&self) -> &str { &self.address } } }
Built-in Transports
NullTransport
A no-op transport for offline mode and testing. recv() always returns empty, send() always succeeds.
#![allow(unused)] fn main() { use nano_transport::NullTransport; let transport = NullTransport::new("my-address"); }
MockTransport
A test transport that lets you inject messages and capture outbound ones:
#![allow(unused)] fn main() { use nano_transport::MockTransport; let transport = MockTransport::new("test-agent"); // Inject a message that recv() will return transport.inject(transport.message_from("alice", "hello")); // After running, check what was sent let sent = transport.sent_messages(); assert_eq!(sent[0].to, "alice"); // Inject multiple messages at once transport.inject_many(vec![ transport.message_from("bob", "msg 1"), transport.message_from("charlie", "msg 2"), ]); // Clear captured messages transport.clear_sent(); // Check total send count (persists across clears) assert_eq!(transport.send_count(), 1); }
MockTransport::clone() shares state — both clones see the same inbox and outbox. This is useful for passing one clone to the runtime and keeping another for assertions.
Using Your Transport
Pass your transport to the runtime:
#![allow(unused)] fn main() { use std::sync::Arc; use nano_runtime::{Runtime, RuntimeConfig}; let transport = Arc::new(WebSocketTransport::new( "my-agent".into(), "ws://localhost:8080".into(), )); transport.connect().await?; let mut runtime = Runtime::new(transport, RuntimeConfig::default()); runtime.add_plugin(Box::new(MyPlugin)).await?; let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); runtime.run(shutdown_rx).await?; }
The runtime calls transport.recv() on every poll tick (default: every 5 seconds, configurable via RuntimeConfig::poll_interval_secs). Each returned message becomes a MessageReceived event dispatched to plugins.
Design Guidelines
-
recv()should drain — return all pending messages and clear them. The runtime callsrecv()on a timer; returning the same messages twice will cause duplicate processing. -
send()should be idempotent-safe — if the runtime retries a failed send, your transport should handle it gracefully. -
Use
metadatafor transport-specific data — round numbers, transaction hashes, channel IDs, etc. Plugins can readmsg.metadatafor transport-specific context without the transport leaking into the core API. -
Handle errors gracefully — the runtime logs transport errors and continues. Don't panic in
recv()orsend().
Next Steps
- Nano Runtime — the event-driven plugin system
- Examples — complete worked examples
- Architecture Overview — system design