Custom Consumer¶
The ChatMessagingConsumer handles the WebSocket lifecycle and event routing.
Extend it to add new event types, override connection behaviour, or add
cross-cutting concerns like rate limiting, analytics, or logging.
Creating a Custom Consumer¶
Subclass ChatMessagingConsumer:
# myapp/consumers.py
from realtime_chat_messaging.consumers import ChatMessagingConsumer
class CustomChatConsumer(ChatMessagingConsumer):
pass
Register it in settings:
REALTIME_CHAT_MESSAGING = {
"CHAT_CONSUMER_CLASS": "myapp.consumers.CustomChatConsumer",
}
Overriding connect and disconnect¶
Always call super() to preserve authentication, session setup, and
notification dispatch:
from realtime_chat_messaging.consumers import ChatMessagingConsumer
from django.utils import timezone
import logging
logger = logging.getLogger(__name__)
class CustomChatConsumer(ChatMessagingConsumer):
async def connect(self):
await super().connect() # authentication + session setup
self.connected_at = timezone.now()
logger.info(f"User {self.user.username} connected")
async def disconnect(self, code):
if hasattr(self, "connected_at"):
duration = (timezone.now() - self.connected_at).total_seconds()
logger.info(f"User {self.user.username} disconnected after {duration:.0f}s")
await super().disconnect(code)
Adding New Event Handlers¶
New event handler methods must be async and follow the same signature
pattern as the built-in handlers. Use @ExceptionHandler.exception_handler_decorator
for consistent error handling, and add permission checks as appropriate:
from realtime_chat_messaging.consumers import ChatMessagingConsumer
from realtime_chat_messaging.utils.decorators import ExceptionHandler
from channels.db import database_sync_to_async
import json
class CustomChatConsumer(ChatMessagingConsumer):
@ExceptionHandler.exception_handler_decorator
async def receive_message_pin(self, data):
"""
Pin a message in a room.
Event: message.pin
Data: {"room_id": "<uuid>", "message_id": "<uuid>"}
"""
room_id = data.get("room_id")
message_id = data.get("message_id")
# Permission check
is_permitted, room = await self.permission_handler.have_admin_privileges(
self.user, room_id, "pin"
)
if not is_permitted:
from django.core.exceptions import PermissionDenied
raise PermissionDenied("Only admins can pin messages")
# Business logic
await self._pin_message(room_id, message_id)
# Broadcast to room
group = f"group-{room_id}"
await self.send_group(group, "message.pinned", {
"message_id": message_id,
"pinned_by": self.user.username,
})
@database_sync_to_async
def _pin_message(self, room_id, message_id):
from realtime_chat_messaging.utils.loader import get_model
Message = get_model("Message")
Message.objects.filter(pk=message_id).update(is_pinned=True)
Registering New Events with the Event Mapper¶
New handler methods must also be registered in a custom event mapper so the
consumer’s receive() method can route them:
# myapp/events.py
from realtime_chat_messaging.variables.consumers import map_event_type_to_handlers
def custom_event_mapper(consumer):
"""Extend default event map with custom events."""
events = map_event_type_to_handlers(consumer)
# Add new events
events["message.pin"] = consumer.receive_message_pin
events["message.flag"] = consumer.receive_message_flag
# Remove events you want to block
# del events["room.join"]
return events
Register the mapper:
REALTIME_CHAT_MESSAGING = {
"CHAT_CONSUMER_CLASS": "myapp.consumers.CustomChatConsumer",
"EVENT_MAPPER": "myapp.events.custom_event_mapper",
}
Adding Rate Limiting¶
Override receive() to intercept all events before routing:
import json
from realtime_chat_messaging.consumers import ChatMessagingConsumer
from django.utils import timezone
class CustomChatConsumer(ChatMessagingConsumer):
RATE_LIMIT = 30 # max events
RATE_WINDOW = 60 # per 60 seconds
async def connect(self):
await super().connect()
self._timestamps = []
async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
if data.get("event_type") == "message.send":
if not self._check_rate():
await self.send(text_data=json.dumps({
"error": {
"code": 4029,
"detail": f"Rate limit: max {self.RATE_LIMIT} messages per {self.RATE_WINDOW}s"
}
}))
return
await super().receive(text_data)
def _check_rate(self):
now = timezone.now().timestamp()
cutoff = now - self.RATE_WINDOW
self._timestamps = [t for t in self._timestamps if t > cutoff]
if len(self._timestamps) >= self.RATE_LIMIT:
return False
self._timestamps.append(now)
return True
Using send_group in Custom Handlers¶
The send_group helper broadcasts to a channel layer group and uses the
broadcast_group receiver to forward the message to connected clients:
# Broadcast to all room members
await self.send_group(f"group-{room_id}", "custom.event", {"key": "value"})
# Send only to the current user (all their devices)
await self.send_group(f"user-{self.user.id}", "custom.event", {"key": "value"})
The resulting client payload:
{
"eventType": "custom.event",
"data": {"key": "value"}
}
Full Custom Consumer Example¶
A complete example combining custom events, rate limiting, analytics, and moderation features is shown below. This demonstrates the full pattern for a production-ready consumer extension:
# myapp/consumers.py
import json
import logging
from realtime_chat_messaging.consumers import ChatMessagingConsumer
from realtime_chat_messaging.utils.decorators import ExceptionHandler
from realtime_chat_messaging.permissions.handlers import PermissionHandler
from channels.db import database_sync_to_async
from django.utils import timezone
logger = logging.getLogger(__name__)
permission_handler = PermissionHandler()
class CustomChatConsumer(ChatMessagingConsumer):
RATE_LIMIT = 30
RATE_WINDOW = 60
async def connect(self):
await super().connect()
self._timestamps = []
logger.info(f"Connected: {self.scope['user'].username}")
async def disconnect(self, code):
logger.info(f"Disconnected: {self.scope['user'].username}")
await super().disconnect(code)
async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
if data.get("event_type") == "message.send" and not self._check_rate():
await self.send(text_data=json.dumps({
"error": {"code": 4029, "detail": "Rate limit exceeded"}
}))
return
await super().receive(text_data)
@ExceptionHandler.exception_handler_decorator
async def receive_message_pin(self, data):
"""Handle message.pin event."""
room_id = data.get("room_id")
message_id = data.get("message_id")
is_permitted, room = await permission_handler.have_admin_privileges(
self.user, room_id, "update"
)
if not is_permitted:
from django.core.exceptions import PermissionDenied
raise PermissionDenied("Only admins can pin messages")
await self._do_pin(message_id)
await self.send_group(f"group-{room_id}", "message.pinned", {
"message_id": message_id,
"pinned_by": self.user.username,
})
@database_sync_to_async
def _do_pin(self, message_id):
from realtime_chat_messaging.utils.loader import get_model
get_model("Message").objects.filter(pk=message_id).update(is_pinned=True)
def _check_rate(self):
now = timezone.now().timestamp()
self._timestamps = [t for t in self._timestamps
if t > now - self.RATE_WINDOW]
if len(self._timestamps) >= self.RATE_LIMIT:
return False
self._timestamps.append(now)
return True