نمط CQRS والبنية المُحرَّكة بالأحداث
يُعدّ CQRS (فصل مسؤولية الأوامر والاستعلامات) نمطًا معماريًا يفصل العمليات التي تُغيّر الحالة (الأوامر) عن العمليات التي تقرأ الحالة (الاستعلامات). بالاقتران مع ناقل الأحداث، يُتيح بناء أنظمة مؤسسية متقارنة الاقتران وقابلة للتوسع بشكل كبير. يُوفّر NestJS حزمة @nestjs/cqrs من الدرجة الأولى تُربط النمط بأكمله بأقل قدر من الكود المتكرر.
لماذا CQRS؟
- فصل المخاوف — تحتاج مسارات الكتابة والقراءة إلى تحجيم وتحقّق وتحسين مختلفة.
- قابلية التدقيق — كل تغيير في الحالة أمرٌ صريح؛ يمكنك تسجيله وإعادة تشغيله أو إسقاطه.
- قابلية الاختبار — المعالجات فئات عادية؛ يتم اختبار معالج الأوامر دون طبقة HTTP.
- التوافق مع مصادر الأحداث — CQRS هو الرفيق الطبيعي لمصادر الأحداث (Event Sourcing) والـ Sagas.
إعداد @nestjs/cqrs
npm install @nestjs/cqrs
استورد CqrsModule في وحدة الميزة. يُسجّل CommandBus وQueryBus وEventBus كمزوّدين يمكن حقنهم في أي مكان.
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { OrdersController } from './orders.controller';
import { PlaceOrderHandler } from './commands/place-order.handler';
import { GetOrderHandler } from './queries/get-order.handler';
import { OrderPlacedHandler } from './events/order-placed.handler';
@Module({
imports: [CqrsModule],
controllers: [OrdersController],
providers: [PlaceOrderHandler, GetOrderHandler, OrderPlacedHandler],
})
export class OrdersModule {}
الأوامر ومعالجاتها
الأمر (Command) هو فئة عادية تحمل النيّة والبيانات الخاصة بإجراء واحد لتغيير الحالة. معالج الأمر مُزيَّن بـ@CommandHandler ويُنفّذ الواجهة ICommandHandler:
// place-order.command.ts
export class PlaceOrderCommand {
constructor(
public readonly userId: string,
public readonly items: { productId: string; qty: number }[],
) {}
}
// place-order.handler.ts
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { PlaceOrderCommand } from './place-order.command';
import { OrderPlacedEvent } from '../events/order-placed.event';
@CommandHandler(PlaceOrderCommand)
export class PlaceOrderHandler implements ICommandHandler<PlaceOrderCommand> {
constructor(
private readonly orderRepo: OrderRepository,
private readonly eventBus: EventBus,
) {}
async execute(command: PlaceOrderCommand): Promise<string> {
const order = await this.orderRepo.create(command.userId, command.items);
this.eventBus.publish(new OrderPlacedEvent(order.id, command.userId));
return order.id;
}
}
معالج واحد لكل أمر. تتوافق كل فئة أمر مع معالج واحد بالضبط. هذا مُطبَّق في وقت التشغيل — تسجيل معالجَين لنفس الأمر يُطلق خطأً.
الاستعلامات ومعالجاتها
تتبع الاستعلامات النمط ذاته، لكنها يجب ألّا تُطفّر الحالة أبدًا. تُنفّذ IQueryHandler ويمكنها إعادة كائنات DTO مباشرةً من مخزن مُحسَّن للقراءة:
// get-order.query.ts
export class GetOrderQuery {
constructor(public readonly orderId: string) {}
}
// get-order.handler.ts
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { GetOrderQuery } from './get-order.query';
@QueryHandler(GetOrderQuery)
export class GetOrderHandler implements IQueryHandler<GetOrderQuery> {
constructor(private readonly readStore: OrderReadRepository) {}
async execute(query: GetOrderQuery) {
return this.readStore.findById(query.orderId);
}
}
الأحداث ومعالجاتها
عندما يُنشر حدثٌ عبر EventBus، يُستدعى أي عدد من معالجات الأحداث المُزيَّنة بـ@EventsHandler. هذا هو العمود الفقري للتأثيرات الجانبية المُحرَّكة بالأحداث (إرسال البريد، تحديث الإسقاطات، تشغيل سياقات محدودة أخرى):
// order-placed.event.ts
export class OrderPlacedEvent {
constructor(public readonly orderId: string, public readonly userId: string) {}
}
// order-placed.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { OrderPlacedEvent } from './order-placed.event';
@EventsHandler(OrderPlacedEvent)
export class OrderPlacedHandler implements IEventHandler<OrderPlacedEvent> {
constructor(private readonly notifier: NotificationService) {}
handle(event: OrderPlacedEvent) {
this.notifier.sendConfirmation(event.userId, event.orderId);
}
}
Sagas — تنسيق العمليات طويلة الأمد
السيجا (Saga) تدفّق تفاعلي مبني على RxJS يستمع إلى حدث أو أكثر ويُصدر أوامر جديدة استجابةً له. الـ Sagas مثالية لسير العمل متعدد الخطوات (مثل: الدفع → التنفيذ → الشحن):
import { Injectable } from '@nestjs/common';
import { Saga, ICommand, ofType } from '@nestjs/cqrs';
import { Observable, map } from 'rxjs';
import { OrderPlacedEvent } from '../events/order-placed.event';
import { StartFulfillmentCommand } from '../commands/start-fulfillment.command';
@Injectable()
export class OrderSaga {
@Saga()
orderPlaced = (events$: Observable<any>): Observable<ICommand> =>
events$.pipe(
ofType(OrderPlacedEvent),
map((event) => new StartFulfillmentCommand(event.orderId)),
);
}
أبقِ الـ Sagas عديمة الحالة. يجب على السيجا فقط تحويل الأحداث إلى أوامر؛ لا تُخزّن الحالة داخل فئة السيجا أبدًا. تنتمي حالة العملية طويلة الأمد إلى قاعدة البيانات (جدول Process Manager).
الإرسال من المتحكم
يبقى المتحكم نحيفًا — يبني كائن الأمر أو الاستعلام فحسب ثم يرسله:
import { Controller, Post, Get, Body, Param } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { PlaceOrderCommand } from './commands/place-order.command';
import { GetOrderQuery } from './queries/get-order.query';
@Controller('orders')
export class OrdersController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post()
placeOrder(@Body() dto: PlaceOrderDto) {
return this.commandBus.execute(new PlaceOrderCommand(dto.userId, dto.items));
}
@Get(':id')
getOrder(@Param('id') id: string) {
return this.queryBus.execute(new GetOrderQuery(id));
}
}
الخلاصة
تجمع حزمة @nestjs/cqrs الأوامر والاستعلامات والأحداث والـ Sagas تحت نموذج واحد متسق. تُطفّر الأوامر الحالة عبر معالج وحيد؛ وتقرأ الاستعلامات دون آثار جانبية؛ وتنشر الأحداث النتائج لمعالجات كثيرة؛ وتستخدم الـ Sagas RxJS لتنسيق سير العمل المعقد متعدد الخطوات. معًا، تُشكّل بنية معمارية محرَّكة بالأحداث قابلة للتوسع وقابلة للتدقيق وقابلة للاختبار — وهي أساس تطبيقات NestJS المؤسسية.