تدفقات Redis
تدفقات Redis
تدفقات Redis هي هيكل بيانات قوي مصمم لبث الرسائل عالي الأداء ومصادر الأحداث. إنه يجمع ميزات السجلات للإلحاق فقط، وقوائم انتظار الرسائل، ومجموعات المستهلكين لإنشاء نظام مراسلة موزع قوي.
ما هي تدفقات Redis؟
التدفقات هي هياكل بيانات سجل للإلحاق فقط حيث يحتوي كل إدخال على معرف فريد ويمكن أن يحتوي على أزواج حقل-قيمة متعددة:
✓ للإلحاق فقط: يتم دائماً إضافة إدخالات جديدة في النهاية ✓ دائم: يتم تخزين الرسائل على القرص (على عكس Pub/Sub) ✓ مرتب: الإدخالات لها طوابع زمنية تزايدية تلقائياً ✓ قابل للاستعلام: القراءة حسب نطاق المعرف أو نطاق الوقت ✓ مجموعات المستهلكين: معالجة التدفق بواسطة مستهلكين متعددين بالتوازي ✓ الإقرار: تتبع الرسائل التي تمت معالجتها
XADD - إضافة إدخالات إلى التدفق
إضافة إدخالات جديدة إلى تدفق مع توليد المعرف التلقائي:
# إضافة إدخال بمعرف تم إنشاؤه تلقائياً (الطابع الزمني - التسلسل) XADD orders * user_id 123 product laptop price 1299.99 # يعيد: "1705856400000-0" (الطابع الزمني بالميلي ثانية - رقم التسلسل) # إضافة إدخال بمعرف محدد XADD orders 1705856400000-1 user_id 124 product mouse price 25.99 # إضافة مع الحد الأقصى للطول (الحفاظ على آخر 1000 إدخال فقط) XADD orders MAXLEN 1000 * user_id 125 product keyboard price 79.99 # التقليم التقريبي (أسرع، يحفظ ~1000 إدخال) XADD orders MAXLEN ~ 1000 * user_id 126 product monitor price 399.99
الطابع الزمني-التسلسل. Redis ينشئ المعرفات تلقائياً باستخدام الوقت الحالي بالميلي ثانية بالإضافة إلى رقم تسلسل للإدخالات في نفس الميلي ثانية.تدفقات Laravel Redis
Laravel ليس لديه دعم مدمج للتدفقات، ولكن يمكنك استخدام أوامر Redis الخام:
use Illuminate\Support\Facades\Redis;
// إضافة حدث طلب إلى التدفق
$orderId = Redis::xadd('orders', '*', [
'user_id' => 123,
'product' => 'laptop',
'price' => 1299.99,
'status' => 'pending',
'timestamp' => now()->toIso8601String()
]);
// يعيد: "1705856400000-0"
// إضافة مع الحد الأقصى للطول
Redis::xadd('orders', 'MAXLEN', '~', 10000, '*', [
'user_id' => 124,
'product' => 'mouse',
'price' => 25.99
]);XREAD - قراءة إدخالات التدفق
قراءة الإدخالات من تدفق واحد أو عدة تدفقات:
# القراءة من البداية (المعرف 0-0 يعني البداية) XREAD COUNT 10 STREAMS orders 0-0 # قراءة الإدخالات الجديدة فقط ($ يعني الأحدث) XREAD STREAMS orders $ # القراءة المحجوبة - انتظار إدخالات جديدة (مهلة 2000 ميلي ثانية) XREAD BLOCK 2000 STREAMS orders $ # القراءة من عدة تدفقات XREAD COUNT 5 STREAMS orders notifications 0-0 0-0
class OrderStreamProcessor
{
public function processOrders()
{
$lastId = $this->getLastProcessedId() ?? '0-0';
while (true) {
// القراءة المحجوبة - انتظار حتى 5 ثوانٍ للإدخالات الجديدة
$entries = Redis::xread(['orders'], [$lastId], 10, 5000);
if (empty($entries)) {
continue; // لا توجد إدخالات جديدة، أعد المحاولة
}
foreach ($entries['orders'] ?? [] as $id => $data) {
try {
$this->processOrder($id, $data);
$lastId = $id; // تحديث آخر معرف تمت معالجته
$this->saveLastProcessedId($lastId);
} catch (\Exception $e) {
logger()->error("فشل في معالجة الطلب {$id}", [
'error' => $e->getMessage()
]);
}
}
}
}
private function processOrder(string $id, array $data)
{
logger()->info("معالجة الطلب {$id}", $data);
// معالجة الطلب...
$order = Order::create([
'user_id' => $data['user_id'],
'product' => $data['product'],
'price' => $data['price']
]);
// إرسال بريد تأكيد، تحديث المخزون، إلخ.
}
}XRANGE - الاستعلام حسب نطاق المعرف
قراءة الإدخالات ضمن نطاق معرف أو وقت محدد:
# قراءة جميع الإدخالات XRANGE orders - + # قراءة الإدخالات من معرف محدد إلى الأحدث XRANGE orders 1705856400000-0 + # قراءة آخر 10 إدخالات (نطاق عكسي) XREVRANGE orders + - COUNT 10 # قراءة الإدخالات في نطاق زمني (نافذة ساعة واحدة) XRANGE orders 1705856400000 1705860000000
public function getRecentOrders(int $count = 100): array
{
// الحصول على آخر $count إدخالات
$entries = Redis::xrevrange('orders', '+', '-', $count);
$orders = [];
foreach ($entries as $id => $data) {
$orders[] = [
'id' => $id,
'timestamp' => $this->extractTimestamp($id),
'data' => $data
];
}
return $orders;
}
private function extractTimestamp(string $id): int
{
return (int) explode('-', $id)[0];
}مجموعات المستهلكين
مجموعات المستهلكين تمكّن المعالجة المتوازية حيث يشارك مستهلكون متعددون عبء العمل:
# إنشاء مجموعة مستهلكين بدءاً من البداية XGROUP CREATE orders order-processors 0-0 # إنشاء مجموعة بدءاً من آخر الإدخالات فقط XGROUP CREATE orders order-processors $ MKSTREAM # إنشاء مجموعات متعددة لأغراض مختلفة XGROUP CREATE orders email-notifiers 0-0 XGROUP CREATE orders analytics 0-0
XREADGROUP - القراءة كمجموعة مستهلكين
قراءة الإدخالات كجزء من مجموعة مستهلكين:
# القراءة كمستهلك "worker-1" في مجموعة "order-processors" XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders > # القراءة المحجوبة مع مهلة XREADGROUP GROUP order-processors worker-1 BLOCK 2000 COUNT 5 STREAMS orders > # قراءة الرسائل غير المقرة (المعلقة لهذا المستهلك) XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders 0-0
class OrderConsumer extends Command
{
protected $signature = 'orders:consume {worker}';
public function handle()
{
$workerId = $this->argument('worker');
$group = 'order-processors';
$this->info("بدأ العامل {$workerId}");
while (true) {
// قراءة رسائل جديدة لهذا المستهلك
$messages = Redis::xreadgroup(
$group,
$workerId,
['orders'],
['>'], // > يعني رسائل غير مُسلَّمة
5, // قراءة 5 رسائل
2000 // مهلة ثانيتين
);
if (empty($messages['orders'] ?? [])) {
continue;
}
foreach ($messages['orders'] as $id => $data) {
try {
$this->processOrder($data);
// إقرار المعالجة الناجحة
Redis::xack('orders', $group, [$id]);
$this->info("تمت المعالجة والإقرار: {$id}");
} catch (\Exception $e) {
logger()->error("فشل في معالجة {$id}", [
'error' => $e->getMessage()
]);
// تظل الرسالة في قائمة الانتظار المعلقة لإعادة المحاولة
}
}
}
}
}إقرار الرسائل (XACK)
الإقرار بأن الرسالة قد تمت معالجتها بنجاح:
# إقرار رسالة واحدة XACK orders order-processors 1705856400000-0 # إقرار رسائل متعددة XACK orders order-processors 1705856400000-0 1705856400000-1 1705856400000-2 # يعيد: (integer) 3 (عدد الرسائل المقرة)
معالجة الرسائل المعلقة
مراقبة ومعالجة الرسائل التي لم يتم إقرارها:
# عرض ملخص الرسائل المعلقة XPENDING orders order-processors # عرض الرسائل المعلقة التفصيلية XPENDING orders order-processors - + 10 worker-1 # المطالبة بالرسائل المتروكة (خاملة لمدة 60000 ميلي ثانية = دقيقة واحدة) XAUTOCLAIM orders order-processors worker-2 60000 0-0 COUNT 10
class DeadLetterHandler extends Command
{
protected $signature = 'orders:dlq';
public function handle()
{
$group = 'order-processors';
$idleTimeout = 300000; // 5 دقائق
while (true) {
// الحصول على الرسائل المعلقة الخاملة لأكثر من 5 دقائق
$pending = Redis::xpending('orders', $group, '-', '+', 100);
foreach ($pending as $entry) {
[$id, $consumer, $idleTime, $deliveryCount] = $entry;
if ($idleTime > $idleTimeout && $deliveryCount > 3) {
// فشلت الرسالة 3+ مرات، انقلها إلى قائمة الانتظار الميتة
$data = Redis::xrange('orders', $id, $id);
Redis::xadd('orders:dlq', '*', $data[$id]);
Redis::xack('orders', $group, [$id]);
$this->warn("تم النقل إلى DLQ: {$id}");
}
}
sleep(60); // التحقق كل دقيقة
}
}
}تقليم التدفق
الحد من حجم التدفق لمنع النمو غير المحدود:
# التقليم الدقيق إلى 1000 إدخال (أبطأ)
XTRIM orders MAXLEN 1000
# التقليم التقريبي (أسرع، يحفظ ~1000 إدخال)
XTRIM orders MAXLEN ~ 1000
# التقليم حسب الحد الأدنى للمعرف (إزالة الإدخالات الأقدم من هذا)
XTRIM orders MINID 1705856400000-0
# Laravel - التقليم المجدول
// في App\Console\Kernel
$schedule->call(function () {
Redis::xtrim('orders', 'MAXLEN', '~', 10000);
})->daily();مصادر الأحداث مع التدفقات
استخدام التدفقات لأنماط مصادر الأحداث:
class UserAccountEventStore
{
private const STREAM = 'user:events:';
public function appendEvent(int $userId, string $eventType, array $data)
{
$streamKey = self::STREAM . $userId;
return Redis::xadd($streamKey, '*', [
'event_type' => $eventType,
'data' => json_encode($data),
'timestamp' => now()->toIso8601String()
]);
}
public function replayEvents(int $userId): array
{
$streamKey = self::STREAM . $userId;
$events = Redis::xrange($streamKey, '-', '+');
$state = [
'balance' => 0,
'status' => 'inactive'
];
foreach ($events as $id => $event) {
$data = json_decode($event['data'], true);
match($event['event_type']) {
'account_created' => $state['status'] = 'active',
'balance_credited' => $state['balance'] += $data['amount'],
'balance_debited' => $state['balance'] -= $data['amount'],
'account_suspended' => $state['status'] = 'suspended',
default => null
};
}
return $state;
}
}- إنشاء نظام معالجة الطلبات باستخدام تدفقات Redis مع مجموعات المستهلكين
- تنفيذ 3 عمليات عامل تستهلك من نفس التدفق بالتوازي
- إضافة إقرار ومعالجة أخطاء مناسبة
- بناء معالج قائمة انتظار ميتة للرسائل الفاشلة (إعادة المحاولة 3 مرات، ثم الانتقال إلى DLQ)
- تنفيذ نمط مصادر الأحداث لحالة حساب المستخدم (تم الإنشاء، تم الإيداع، تم الخصم، تم التعليق)
- إضافة مراقبة لتتبع الرسائل المعلقة، تأخر المستهلك، ومعدل المعالجة