WebSocketsWebSockets

WebSockets

After mastering Server-Sent Events for one-way communication, Batman realized he needed something more powerful. When Commissioner Gordon wanted to chat with him in real-time during crisis situations, Batman needed bidirectional communication.

"SSE is great for pushing updates to my dashboard," Batman thought, "but I need two-way communication for coordinating with my allies!"

To handle real-time bidirectional communication, Batman learned how to work with WebSockets using Robyn's modern decorator-based API:

Request

WebSocket
/web_socket
from robyn import Robyn

app = Robyn(__file__)

@app.websocket("/web_socket")
async def websocket_endpoint(websocket):
    # Connections are auto-accepted in Robyn - no need to call accept()!
    try:
        while True:
            message = await websocket.receive_text()
            websocket_id = websocket.id
            await websocket.send_text(f"Echo from {websocket_id}: {message}")
    except Exception as e:
        print(f"WebSocket {websocket.id} error: {e}")

@websocket_endpoint.on_connect
async def on_connect(websocket):
    await websocket.send_text("Connected!")
    return "Connection established"

@websocket_endpoint.on_close
async def on_close(websocket):
    print(f"WebSocket {websocket.id} disconnected")
    return "Disconnected"

WebSocket Methods

The modern WebSocket API provides clean, intuitive methods for handling real-time communication:

Note: WebSocket connections are automatically accepted in Robyn - no need to call accept()!

  • websocket.id - Get the unique connection ID
  • websocket.send_text(data) - Send text messages
  • websocket.send_json(data) - Send JSON messages
  • websocket.receive_text() - Receive text messages
  • websocket.receive_json() - Receive JSON messages
  • websocket.broadcast(data) - Broadcast to all connected clients
  • websocket.close() - Close the connection

WebSocket Methods

Methods
@app.websocket("/chat")
async def chat_handler(websocket):
      
    while True:
        message = await websocket.receive_text()
        websocket_id = websocket.id
        
        # Send to this client
        await websocket.send_text(f"You said: {message}")
        
        # Broadcast to all clients
        await websocket.broadcast(f"User {websocket_id}: {message}")

Dependency Injection

WebSockets support the same dependency injection system as HTTP routes, using global_dependencies and router_dependencies parameters:

Dependency Injection

DI
from robyn import Robyn
import logging

app = Robyn(__file__)

# Configure dependencies
app.inject_global(
    logger=logging.getLogger(__name__),
    database=DatabaseConnection(),
    metrics=MetricsCollector()
)

app.inject(
    cache=RedisCache(),
    auth_service=JWTAuthService()
)

Broadcasting Messages

Batman learned to send messages to all connected clients using the clean broadcast() method:

Broadcasting

Broadcast
@app.websocket("/notifications")
async def notification_handler(websocket):
      
    while True:
        message = await websocket.receive_text()
        
        # Broadcast to all connected clients
        await websocket.broadcast(f"Notification: {message}")
        
        # Confirm to sender
        await websocket.send_text("Notification sent!")

Query Parameters and Headers

WebSockets can access query parameters and headers for authentication and configuration:

Query Params

Params
@app.websocket("/secure_chat")
async def secure_chat(websocket):
    # Access query parameters
    token = websocket.query_params.get("token")
    room_id = websocket.query_params.get("room")
    
    if not token or not authenticate_token(token):
        await websocket.close()
        return
        
      await websocket.send_text(f"Joined room: {room_id}")
    
    while True:
        message = await websocket.receive_text()
        # Broadcast only to users in the same room
        await websocket.broadcast(f"Room {room_id} - {websocket.id}: {message}")

Connection Management

Batman learned to programmatically manage WebSocket connections with the close() method. This is useful for enforcing business rules, handling authentication failures, or managing resource limits:

Connection Management

Management
@app.websocket("/admin_panel")
async def admin_handler(websocket):
      
    while True:
        command = await websocket.receive_text()
        
        if command == "shutdown":
            await websocket.send_text("Shutting down connection...")
            await websocket.close()
            break
        elif command.startswith("kick_user"):
            user_id = command.split(":")[1]
            await websocket.broadcast(f"User {user_id} has been kicked")
            # Close specific user's connection (implementation depends on your user tracking)
        else:
            await websocket.send_text(f"Command executed: {command}")

Error Handling

Robust WebSocket applications need proper error handling for network issues, client disconnections, and application errors:

Error Handling

Errors
from robyn import WebSocketDisconnect

@app.websocket("/robust_chat")
async def robust_chat(websocket):
      
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
            
    except WebSocketDisconnect:
        print(f"Client {websocket.id} disconnected gracefully")
    except Exception as e:
        print(f"Unexpected error for {websocket.id}: {e}")
        await websocket.close()

What's next?

With real-time communication mastered, Batman was ready to scale his application. He learned about organizing his growing codebase with views and subrouters to keep everything maintainable as the Justice League joined his mission.