WebSockets & Real-Time Apps

Real-Time Database Integration

19 min Lesson 25 of 35

Introduction to Real-Time Database Integration

Modern databases provide built-in real-time capabilities that enable applications to react immediately to data changes. This lesson explores various database features for building real-time systems without complex custom infrastructure.

Database Real-Time Features Overview

Different databases offer different real-time capabilities:

  • MongoDB Change Streams: Watch collections for changes
  • PostgreSQL LISTEN/NOTIFY: Pub/Sub built into the database
  • Firebase Realtime Database: Real-time sync as a core feature
  • Database Triggers: Execute code when data changes
  • Event Sourcing: Store all changes as events
Key Benefit: Database-native real-time features reduce complexity by eliminating the need for separate message brokers or event systems.

MongoDB Change Streams

Change streams let applications subscribe to changes in MongoDB collections (note that they require a replica set or sharded cluster; a standalone server does not support them):

// Install the MongoDB driver:
// npm install mongodb
const { MongoClient } = require('mongodb');

// Connect to MongoDB
// (top-level await: run as an ES module or wrap this in an async function)
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');
const collection = db.collection('messages');

// Create a change stream
const changeStream = collection.watch();

// Listen for changes
changeStream.on('change', (change) => {
  console.log('Change detected:', change);

  // Change types: insert, update, replace, delete
  switch (change.operationType) {
    case 'insert':
      console.log('New document:', change.fullDocument);
      // Broadcast to WebSocket clients
      io.emit('message_added', change.fullDocument);
      break;
    case 'update':
      console.log('Updated document:', change.documentKey);
      io.emit('message_updated', {
        id: change.documentKey._id,
        changes: change.updateDescription.updatedFields
      });
      break;
    case 'delete':
      console.log('Deleted document:', change.documentKey);
      io.emit('message_deleted', { id: change.documentKey._id });
      break;
  }
});

// Insert a document (triggers the change stream)
await collection.insertOne({
  text: 'Hello World',
  author: 'John',
  timestamp: new Date()
});

Filtering Change Streams

Use an aggregation pipeline to filter a change stream down to the changes you care about:

// Watch only insert operations
const insertStream = collection.watch([
  { $match: { operationType: 'insert' } }
]);

// Watch updates to a specific field
const fieldStream = collection.watch([
  {
    $match: {
      operationType: 'update',
      'updateDescription.updatedFields.status': { $exists: true }
    }
  }
]);

// Watch documents matching criteria
const userStream = collection.watch([
  {
    $match: {
      $or: [
        { 'fullDocument.userId': 'user123' },
        { 'fullDocument.recipientId': 'user123' }
      ]
    }
  }
]);

userStream.on('change', (change) => {
  // Only changes for user123
  console.log('User-specific change:', change);
});
Best Practice: Use aggregation pipeline filters in change streams to reduce network traffic and processing load.

Real-Time Chat with MongoDB Change Streams

const express = require('express');
const { Server } = require('socket.io');
const { MongoClient } = require('mongodb');

const app = express();
const server = app.listen(3000);
const io = new Server(server);

// MongoDB connection
const mongoClient = new MongoClient('mongodb://localhost:27017');
await mongoClient.connect();
const db = mongoClient.db('chat');
const messages = db.collection('messages');

// Watch for new messages
const changeStream = messages.watch([
  { $match: { operationType: 'insert' } }
]);

changeStream.on('change', (change) => {
  const message = change.fullDocument;

  // Broadcast to all connected clients in the room
  io.to(`room-${message.roomId}`).emit('new_message', {
    id: message._id,
    text: message.text,
    author: message.author,
    timestamp: message.timestamp
  });
});

// Socket.io connection
io.on('connection', (socket) => {
  socket.on('join_room', (roomId) => {
    socket.join(`room-${roomId}`);
  });

  socket.on('send_message', async (data) => {
    // Insert into MongoDB (the change stream will pick it up)
    await messages.insertOne({
      roomId: data.roomId,
      text: data.text,
      author: socket.userId,
      timestamp: new Date()
    });
  });
});

PostgreSQL LISTEN/NOTIFY

PostgreSQL has built-in pub/sub with LISTEN and NOTIFY:

// Install the pg driver:
// npm install pg
const { Client } = require('pg');

// Publisher connection
const publisher = new Client({
  host: 'localhost',
  port: 5432,
  database: 'myapp',
  user: 'postgres',
  password: 'password'
});
await publisher.connect();

// Subscriber connection (use a dedicated, long-lived connection for LISTEN)
const subscriber = new Client({
  host: 'localhost',
  port: 5432,
  database: 'myapp',
  user: 'postgres',
  password: 'password'
});
await subscriber.connect();

// Listen to a channel
await subscriber.query('LISTEN new_orders');

// Handle notifications
subscriber.on('notification', (msg) => {
  console.log('Notification on channel:', msg.channel);
  console.log('Payload:', msg.payload);

  const data = JSON.parse(msg.payload);
  io.emit('order_created', data);
});

// Publish a notification
await publisher.query(
  "NOTIFY new_orders, '{\"orderId\": 123, \"total\": 99.99}'"
);

Database Triggers with NOTIFY

Automatically notify when data changes using triggers:

-- Create a function that sends notifications
CREATE OR REPLACE FUNCTION notify_order_change()
RETURNS trigger AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    PERFORM pg_notify(
      'order_changes',
      json_build_object(
        'operation', 'insert',
        'order_id', NEW.id,
        'user_id', NEW.user_id,
        'total', NEW.total
      )::text
    );
  ELSIF TG_OP = 'UPDATE' THEN
    PERFORM pg_notify(
      'order_changes',
      json_build_object(
        'operation', 'update',
        'order_id', NEW.id,
        'status', NEW.status
      )::text
    );
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create the trigger on the orders table
CREATE TRIGGER order_change_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW
EXECUTE FUNCTION notify_order_change();

Node.js with PostgreSQL Triggers

const { Client } = require('pg');
const express = require('express');
const { Server } = require('socket.io');

const app = express();
app.use(express.json()); // parse JSON request bodies
const server = app.listen(3000);
const io = new Server(server);

// PostgreSQL listener
const listener = new Client({
  host: 'localhost',
  database: 'myapp',
  user: 'postgres',
  password: 'password'
});
await listener.connect();
await listener.query('LISTEN order_changes');

listener.on('notification', (msg) => {
  const data = JSON.parse(msg.payload);

  // Emit to the specific user
  if (data.user_id) {
    io.to(`user-${data.user_id}`).emit('order_update', data);
  }

  // Emit to the admin dashboard
  io.to('admin').emit('order_change', data);
});

// API endpoint to create an order
app.post('/orders', async (req, res) => {
  const client = new Client({ /* config */ });
  await client.connect();

  // Insert the order (the trigger fires automatically)
  const result = await client.query(
    'INSERT INTO orders (user_id, total, status) VALUES ($1, $2, $3) RETURNING *',
    [req.body.userId, req.body.total, 'pending']
  );

  await client.end();
  res.json(result.rows[0]);
});
Important: PostgreSQL LISTEN/NOTIFY doesn't guarantee delivery if the listening connection is lost. For critical messages, combine with polling or message queues.
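One common mitigation is to track the id of the last delivered row and, whenever the LISTEN connection is re-established, poll for anything that arrived in the gap. A minimal sketch of that catch-up logic, detached from any particular driver (`fetchSince` is a hypothetical stand-in for a query such as `SELECT * FROM orders WHERE id > $1 ORDER BY id`):

```javascript
// Catch-up delivery: live NOTIFY payloads while connected,
// plus a poll after each reconnect to fill in missed rows.
class CatchUpDelivery {
  constructor(fetchSince, deliver) {
    this.fetchSince = fetchSince; // async (lastId) => rows with id > lastId, ordered
    this.deliver = deliver;       // called once per row, in order
    this.lastId = 0;              // highest row id delivered so far
  }

  // Called for each live NOTIFY payload while the connection is up
  onNotification(row) {
    if (row.id > this.lastId) {   // id check also dedupes replays
      this.lastId = row.id;
      this.deliver(row);
    }
  }

  // Called after the LISTEN connection is re-established:
  // fetch and deliver anything that arrived while we were disconnected
  async onReconnect() {
    const missed = await this.fetchSince(this.lastId);
    for (const row of missed) {
      this.onNotification(row);
    }
  }
}
```

In practice you would wire `onNotification` to the pg client's `notification` event and `onReconnect` to whatever reconnect hook your connection-management code exposes; the monotonic-id check is what makes duplicate delivery harmless.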

Firebase Realtime Database Concept

Firebase provides real-time sync as a core feature:

// Install Firebase:
// npm install firebase
import { initializeApp } from 'firebase/app';
import { getDatabase, ref, onValue, push, set } from 'firebase/database';

// Initialize Firebase
const firebaseConfig = {
  apiKey: 'your-api-key',
  databaseURL: 'https://your-project.firebaseio.com'
};
const app = initializeApp(firebaseConfig);
const db = getDatabase(app);

// Listen to real-time updates
const messagesRef = ref(db, 'messages');
onValue(messagesRef, (snapshot) => {
  const data = snapshot.val();
  console.log('Messages updated:', data);

  // Update the UI with the new data
  updateMessageList(data);
});

// Add a new message (automatically syncs to all listeners)
const newMessageRef = push(messagesRef);
await set(newMessageRef, {
  text: 'Hello World',
  author: 'John',
  timestamp: Date.now()
});

Event Sourcing Basics

Event sourcing stores all changes as a sequence of events:

const { EventEmitter } = require('events');

// Event store (the backing store can be MongoDB, PostgreSQL, etc.)
// Extends EventEmitter so appended events can be published to listeners.
class EventStore extends EventEmitter {
  constructor(db) {
    super();
    this.events = db.collection('events');
  }

  // Append an event
  async append(event) {
    const eventData = {
      aggregateId: event.aggregateId,
      eventType: event.type,
      data: event.data,
      timestamp: new Date(),
      version: await this.getNextVersion(event.aggregateId)
    };

    const result = await this.events.insertOne(eventData);

    // Publish the event to real-time listeners
    this.emit('event_appended', eventData);

    return result;
  }

  // Get all events for an aggregate
  async getEvents(aggregateId) {
    return await this.events
      .find({ aggregateId })
      .sort({ version: 1 })
      .toArray();
  }

  // Rebuild state from events
  async replayEvents(aggregateId) {
    const events = await this.getEvents(aggregateId);

    let state = {};
    for (const event of events) {
      state = this.applyEvent(state, event);
    }
    return state;
  }

  applyEvent(state, event) {
    switch (event.eventType) {
      case 'ORDER_CREATED':
        return { ...state, ...event.data, status: 'pending' };
      case 'ORDER_PAID':
        return { ...state, status: 'paid', paidAt: event.timestamp };
      case 'ORDER_SHIPPED':
        return { ...state, status: 'shipped', shippedAt: event.timestamp };
      default:
        return state;
    }
  }

  async getNextVersion(aggregateId) {
    const result = await this.events
      .find({ aggregateId })
      .sort({ version: -1 })
      .limit(1)
      .toArray();
    return result.length > 0 ? result[0].version + 1 : 1;
  }
}

// Usage
const eventStore = new EventStore(db);

// Create order
await eventStore.append({
  aggregateId: 'order-123',
  type: 'ORDER_CREATED',
  data: { userId: 'user-456', total: 99.99, items: [ /* ... */ ] }
});

// Payment received
await eventStore.append({
  aggregateId: 'order-123',
  type: 'ORDER_PAID',
  data: { transactionId: 'txn-789' }
});

// Get the current order state
const orderState = await eventStore.replayEvents('order-123');
console.log('Current order:', orderState);
Event Sourcing Benefits:
  • Complete audit trail of all changes
  • Ability to replay events and rebuild state
  • Natural fit for real-time updates (new events = new notifications)
  • Temporal queries (what was the state at time X?)
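The last benefit is easy to demonstrate: because every event carries a timestamp, answering "what was the state at time X?" is just a replay that stops at X. A small sketch using the same reducer pattern as `applyEvent` above (the in-memory event array and numeric timestamps are illustrative stand-ins for rows loaded from the event store):

```javascript
// Same reducer pattern as the event store's applyEvent method
function applyEvent(state, event) {
  switch (event.eventType) {
    case 'ORDER_CREATED':
      return { ...state, ...event.data, status: 'pending' };
    case 'ORDER_PAID':
      return { ...state, status: 'paid', paidAt: event.timestamp };
    case 'ORDER_SHIPPED':
      return { ...state, status: 'shipped', shippedAt: event.timestamp };
    default:
      return state;
  }
}

// Temporal query: fold only the events at or before `asOf`
function stateAt(events, asOf) {
  return events
    .filter((event) => event.timestamp <= asOf)
    .reduce(applyEvent, {});
}

// Example: an order created at t=100 and paid at t=200
const events = [
  { eventType: 'ORDER_CREATED', data: { total: 99.99 }, timestamp: 100 },
  { eventType: 'ORDER_PAID', data: {}, timestamp: 200 }
];

console.log(stateAt(events, 150).status); // 'pending' -- not yet paid at t=150
console.log(stateAt(events, 250).status); // 'paid'
```

A current-state query is just the special case `asOf = now`, which is exactly what `replayEvents` computes.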

Real-Time Event Projection

// Listen to the event store and project events into read models
eventStore.on('event_appended', async (event) => {
  switch (event.eventType) {
    case 'ORDER_CREATED':
      // Update the read model
      await db.collection('orders').insertOne({
        _id: event.aggregateId,
        ...event.data,
        status: 'pending',
        createdAt: event.timestamp
      });

      // Notify WebSocket clients
      io.emit('order_created', {
        orderId: event.aggregateId,
        ...event.data
      });
      break;

    case 'ORDER_PAID':
      await db.collection('orders').updateOne(
        { _id: event.aggregateId },
        { $set: { status: 'paid', paidAt: event.timestamp } }
      );

      io.emit('order_paid', { orderId: event.aggregateId });
      break;
  }
});

Combining Database Features with WebSockets

const express = require('express');
const { Server } = require('socket.io');
const { MongoClient } = require('mongodb');
const { Client: PgClient } = require('pg');

const app = express();
const server = app.listen(3000);
const io = new Server(server);

// MongoDB change streams for messages
const mongoClient = new MongoClient('mongodb://localhost:27017');
await mongoClient.connect();
const messages = mongoClient.db('app').collection('messages');

messages.watch().on('change', (change) => {
  if (change.operationType === 'insert') {
    io.emit('new_message', change.fullDocument);
  }
});

// PostgreSQL NOTIFY for orders
const pgClient = new PgClient({
  host: 'localhost',
  database: 'app',
  user: 'postgres',
  password: 'password'
});
await pgClient.connect();
await pgClient.query('LISTEN order_updates');

pgClient.on('notification', (msg) => {
  const order = JSON.parse(msg.payload);
  io.to(`user-${order.userId}`).emit('order_update', order);
});

console.log('Real-time database integration running...');
Exercise:
  1. Set up MongoDB and create a change stream watcher
  2. Create a real-time notification system:
    • Insert notifications into MongoDB
    • Watch for changes with change streams
    • Broadcast to Socket.io clients
  3. Add filtering to only send notifications to relevant users
  4. (Optional) Set up PostgreSQL and implement LISTEN/NOTIFY
  5. Create a simple event sourcing system with event replay

Summary

Database-native real-time features simplify real-time systems:

  • MongoDB Change Streams watch collections for changes
  • PostgreSQL LISTEN/NOTIFY provides pub/sub in the database
  • Database triggers can automatically notify applications
  • Firebase offers real-time sync as a core feature
  • Event sourcing creates natural real-time event streams
  • Combining database features with WebSockets creates powerful real-time apps