WebSockets & Real-Time Apps
Real-Time Database Integration
Introduction to Real-Time Database Integration
Modern databases provide built-in real-time capabilities that enable applications to react immediately to data changes. This lesson explores various database features for building real-time systems without complex custom infrastructure.
Database Real-Time Features Overview
Different databases offer different real-time capabilities:
- MongoDB Change Streams: Watch collections for changes
- PostgreSQL LISTEN/NOTIFY: Pub/Sub built into the database
- Firebase Realtime Database: Real-time sync as a core feature
- Database Triggers: Execute code when data changes
- Event Sourcing: Store all changes as events
Key Benefit: Database-native real-time features reduce complexity by eliminating the need for separate message brokers or event systems.
MongoDB Change Streams
Change streams allow applications to watch for changes in MongoDB collections:
// Install MongoDB driver
// npm install mongodb
const { MongoClient } = require('mongodb');
// Connect to MongoDB
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');
const collection = db.collection('messages');
// Create a change stream
const changeStream = collection.watch();
// Listen for changes
changeStream.on('change', (change) => {
console.log('Change detected:', change);
// Change types: insert, update, replace, delete
switch (change.operationType) {
case 'insert':
console.log('New document:', change.fullDocument);
// Broadcast to WebSocket clients
io.emit('message_added', change.fullDocument);
break;
case 'update':
console.log('Updated document:', change.documentKey);
io.emit('message_updated', {
id: change.documentKey._id,
changes: change.updateDescription.updatedFields
});
break;
case 'delete':
console.log('Deleted document:', change.documentKey);
io.emit('message_deleted', { id: change.documentKey._id });
break;
}
});
// Insert a document (triggers the change stream)
await collection.insertOne({
text: 'Hello World',
author: 'John',
timestamp: new Date()
});
Filtering Change Streams
Filter change streams to watch specific changes:
// Watch only insert operations
const insertStream = collection.watch([
{ $match: { operationType: 'insert' } }
]);
// Watch specific fields
const fieldStream = collection.watch([
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
]);
// Watch documents matching criteria
const userStream = collection.watch([
{
$match: {
$or: [
{ 'fullDocument.userId': 'user123' },
{ 'fullDocument.recipientId': 'user123' }
]
}
}
]);
userStream.on('change', (change) => {
// Only changes for user123
console.log('User-specific change:', change);
});
Best Practice: Use aggregation pipeline filters in change streams to reduce network traffic and processing load.
Real-Time Chat with MongoDB Change Streams
const express = require('express');
const { Server } = require('socket.io');
const { MongoClient } = require('mongodb');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// MongoDB connection
const mongoClient = new MongoClient('mongodb://localhost:27017');
await mongoClient.connect();
const db = mongoClient.db('chat');
const messages = db.collection('messages');
// Watch for new messages
const changeStream = messages.watch([
{ $match: { operationType: 'insert' } }
]);
changeStream.on('change', (change) => {
const message = change.fullDocument;
// Broadcast to all connected clients
io.to(`room-${message.roomId}`).emit('new_message', {
id: message._id,
text: message.text,
author: message.author,
timestamp: message.timestamp
});
});
// Socket.io connection
io.on('connection', (socket) => {
socket.on('join_room', (roomId) => {
socket.join(`room-${roomId}`);
});
socket.on('send_message', async (data) => {
// Insert into MongoDB (change stream will trigger)
await messages.insertOne({
roomId: data.roomId,
text: data.text,
author: socket.userId,
timestamp: new Date()
});
});
});
PostgreSQL LISTEN/NOTIFY
PostgreSQL has built-in pub/sub with LISTEN and NOTIFY:
// Install pg driver
// npm install pg
const { Client } = require('pg');
// Publisher connection
const publisher = new Client({
host: 'localhost',
port: 5432,
database: 'myapp',
user: 'postgres',
password: 'password'
});
await publisher.connect();
// Subscriber connection
const subscriber = new Client({
host: 'localhost',
port: 5432,
database: 'myapp',
user: 'postgres',
password: 'password'
});
await subscriber.connect();
// Listen to a channel
await subscriber.query('LISTEN new_orders');
// Handle notifications
subscriber.on('notification', (msg) => {
console.log('Notification on channel:', msg.channel);
console.log('Payload:', msg.payload);
const data = JSON.parse(msg.payload);
io.emit('order_created', data);
});
// Publish a notification
await publisher.query(
"NOTIFY new_orders, '{\"orderId\": 123, \"total\": 99.99}'"
);
Database Triggers with NOTIFY
Automatically notify when data changes using triggers:
-- Create a function that sends notifications
CREATE OR REPLACE FUNCTION notify_order_change()
RETURNS trigger AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify(
'order_changes',
json_build_object(
'operation', 'insert',
'order_id', NEW.id,
'user_id', NEW.user_id,
'total', NEW.total
)::text
);
ELSIF TG_OP = 'UPDATE' THEN
PERFORM pg_notify(
'order_changes',
json_build_object(
'operation', 'update',
'order_id', NEW.id,
'status', NEW.status
)::text
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger on orders table
CREATE TRIGGER order_change_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION notify_order_change();
Node.js with PostgreSQL Triggers
const { Client } = require('pg');
const express = require('express');
const { Server } = require('socket.io');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// PostgreSQL listener
const listener = new Client({
host: 'localhost',
database: 'myapp',
user: 'postgres',
password: 'password'
});
await listener.connect();
await listener.query('LISTEN order_changes');
listener.on('notification', (msg) => {
const data = JSON.parse(msg.payload);
// Emit to specific user
if (data.user_id) {
io.to(`user-${data.user_id}`).emit('order_update', data);
}
// Emit to admin dashboard
io.to('admin').emit('order_change', data);
});
// API endpoint to create order
app.post('/orders', async (req, res) => {
const client = new Client({ /* config */ });
await client.connect();
// Insert order (trigger will fire automatically)
const result = await client.query(
'INSERT INTO orders (user_id, total, status) VALUES ($1, $2, $3) RETURNING *',
[req.body.userId, req.body.total, 'pending']
);
await client.end();
res.json(result.rows[0]);
});
Important: PostgreSQL LISTEN/NOTIFY doesn't guarantee delivery if the listening connection is lost. For critical messages, combine with polling or message queues.
Firebase Realtime Database Concept
Firebase provides real-time sync as a core feature:
// Install Firebase
// npm install firebase
import { initializeApp } from 'firebase/app';
import { getDatabase, ref, onValue, push, set } from 'firebase/database';
// Initialize Firebase
const firebaseConfig = {
apiKey: 'your-api-key',
databaseURL: 'https://your-project.firebaseio.com'
};
const app = initializeApp(firebaseConfig);
const db = getDatabase(app);
// Listen to real-time updates
const messagesRef = ref(db, 'messages');
onValue(messagesRef, (snapshot) => {
const data = snapshot.val();
console.log('Messages updated:', data);
// Update UI with new data
updateMessageList(data);
});
// Add a new message (automatically syncs to all listeners)
const newMessageRef = push(messagesRef);
await set(newMessageRef, {
text: 'Hello World',
author: 'John',
timestamp: Date.now()
});
Event Sourcing Basics
Event sourcing stores all changes as a sequence of events:
// Event store (can be MongoDB, PostgreSQL, etc.)
class EventStore {
constructor(db) {
this.events = db.collection('events');
}
// Append an event
async append(event) {
const eventData = {
aggregateId: event.aggregateId,
eventType: event.type,
data: event.data,
timestamp: new Date(),
version: await this.getNextVersion(event.aggregateId)
};
const result = await this.events.insertOne(eventData);
// Publish event to real-time listeners
this.emit('event_appended', eventData);
return result;
}
// Get all events for an aggregate
async getEvents(aggregateId) {
return await this.events
.find({ aggregateId })
.sort({ version: 1 })
.toArray();
}
// Rebuild state from events
async replayEvents(aggregateId) {
const events = await this.getEvents(aggregateId);
let state = {};
for (const event of events) {
state = this.applyEvent(state, event);
}
return state;
}
applyEvent(state, event) {
switch (event.eventType) {
case 'ORDER_CREATED':
return { ...state, ...event.data, status: 'pending' };
case 'ORDER_PAID':
return { ...state, status: 'paid', paidAt: event.timestamp };
case 'ORDER_SHIPPED':
return { ...state, status: 'shipped', shippedAt: event.timestamp };
default:
return state;
}
}
async getNextVersion(aggregateId) {
const result = await this.events
.find({ aggregateId })
.sort({ version: -1 })
.limit(1)
.toArray();
return result.length > 0 ? result[0].version + 1 : 1;
}
}
// Usage
const eventStore = new EventStore(db);
// Create order
await eventStore.append({
aggregateId: 'order-123',
type: 'ORDER_CREATED',
data: { userId: 'user-456', total: 99.99, items: [...] }
});
// Payment received
await eventStore.append({
aggregateId: 'order-123',
type: 'ORDER_PAID',
data: { transactionId: 'txn-789' }
});
// Get current order state
const orderState = await eventStore.replayEvents('order-123');
console.log('Current order:', orderState);
Event Sourcing Benefits:
- Complete audit trail of all changes
- Ability to replay events and rebuild state
- Natural fit for real-time updates (new events = new notifications)
- Temporal queries (what was the state at time X?)
Real-Time Event Projection
// Listen to event store and project to read models
eventStore.on('event_appended', async (event) => {
// Update read model
switch (event.eventType) {
case 'ORDER_CREATED':
await db.orders.insertOne({
_id: event.aggregateId,
...event.data,
status: 'pending',
createdAt: event.timestamp
});
// Notify WebSocket clients
io.emit('order_created', {
orderId: event.aggregateId,
...event.data
});
break;
case 'ORDER_PAID':
await db.orders.updateOne(
{ _id: event.aggregateId },
{ $set: { status: 'paid', paidAt: event.timestamp } }
);
io.emit('order_paid', {
orderId: event.aggregateId
});
break;
}
});
Combining Database Features with WebSockets
const express = require('express');
const { Server } = require('socket.io');
const { MongoClient } = require('mongodb');
const { Client: PgClient } = require('pg');
const app = express();
const server = app.listen(3000);
const io = new Server(server);
// MongoDB change streams for messages
const mongoClient = new MongoClient('mongodb://localhost:27017');
await mongoClient.connect();
const messages = mongoClient.db('app').collection('messages');
messages.watch().on('change', (change) => {
if (change.operationType === 'insert') {
io.emit('new_message', change.fullDocument);
}
});
// PostgreSQL NOTIFY for orders
const pgClient = new PgClient({
host: 'localhost',
database: 'app',
user: 'postgres',
password: 'password'
});
await pgClient.connect();
await pgClient.query('LISTEN order_updates');
pgClient.on('notification', (msg) => {
const order = JSON.parse(msg.payload);
io.to(`user-${order.userId}`).emit('order_update', order);
});
console.log('Real-time database integration running...');
Exercise:
- Set up MongoDB and create a change stream watcher
- Create a real-time notification system:
- Insert notifications into MongoDB
- Watch for changes with change streams
- Broadcast to Socket.io clients
- Add filtering to only send notifications to relevant users
- (Optional) Set up PostgreSQL and implement LISTEN/NOTIFY
- Create a simple event sourcing system with event replay
Summary
Database-native real-time features simplify real-time systems:
- MongoDB Change Streams watch collections for changes
- PostgreSQL LISTEN/NOTIFY provides pub/sub in the database
- Database triggers can automatically notify applications
- Firebase offers real-time sync as a core feature
- Event sourcing creates natural real-time event streams
- Combining database features with WebSockets creates powerful real-time apps