بنية التخزين المؤقت والمراسلة

منتجو Kafka ومستهلكوه في بيئة الإنتاج

18 دقيقة الدرس 5 من 30

منتجو Kafka ومستهلكوه في بيئة الإنتاج

يتطلب تشغيل Kafka على نطاق واسع التحكم الدقيق في كيفية كتابة المنتجين وقراءة المستهلكين. الركائز الأربع لهذا الدرس — دلالات الإقرار، والإنتاج غير القابل للتكرار، ومجموعات المستهلكين، ومراقبة التأخر — تحكم كل قرار موثوقية وإنتاجية على المسار الحرج. هنا تكسب المجموعة التي "تعمل في الاختبار" ثقة الإنتاج أو تفقدها.

إقرارات المنتج (acks)

يتحكم إعداد acks في عدد النسخ المتماثلة من الوسيط التي يجب أن تؤكد الكتابة قبل أن يعتبرها العميل ناجحة. توجد ثلاث قيم عملية:

  • acks=0 — أرسل وانسَ. يرسل المنتج ويستمر دون أي ضمان للمتانة. مناسب فقط للبيانات عالية الحجم المتسامحة مع الفقد (تتبع النقرات، مقاييس ping).
  • acks=1 — يُقرّ قائد القسم. أسرع من all، لكن إذا فشل القائد قبل نسخ الأتباع، تُفقد الرسالة بصمت. شائع في الكود القديم؛ تجنبه لأي شيء مالي أو مرتبط بالتدقيق.
  • acks=all (أو acks=-1) — ينتظر القائد حتى تُقرّ جميع النسخ المتزامنة (ISR). بالاقتران مع min.insync.replicas=2 على الوسيط، هذا هو الحد الأدنى للمتانة على مستوى الإنتاج. يضيف حوالي 5–15 ملي ثانية من زمن الاستجابة مقارنة بـ acks=1، وهي تكلفة تستحق الدفع دائمًا تقريبًا.
فخ min.insync.replicas: ضبط acks=all لا يوفر أي حماية إذا كان min.insync.replicas=1 (الإعداد الافتراضي للوسيط). اضبط دائمًا min.insync.replicas=2 (أو 3 لمواضيع RF=3 التي تحمل بيانات مرتبطة بـ SLO). عدم التطابق هذا هو أحد أكثر تكوينات فقدان البيانات الصامتة شيوعًا في مجموعات Kafka الإنتاجية.
# إعداد المنتج — الأولوية للمتانة (خصائص عميل Java أو ما يعادلها في librdkafka) acks=all min.insync.replicas=2 # يُضبط على الموضوع أو الوسيط، وليس على العميل retries=2147483647 # إعادة المحاولة إلى أجل غير مسمى؛ الاعتماد على delivery.timeout.ms delivery.timeout.ms=120000 # حد صارم لمدة دقيقتين على نافذة إعادة المحاولة الكاملة enable.idempotence=true # يقترن مع acks=all تلقائيًا linger.ms=5 # نافذة تجميع صغيرة للإنتاجية batch.size=65536 # دفعة 64 كيلوبايت؛ اضبطها مع مقاييس المنتج compression.type=snappy # رخيص حسابيًا، ضغط ~3x لحمولات JSON

المنتجون غير القابلون للتكرار (Idempotent Producers)

إعادة المحاولات الشبكية دون idempotence تُنشئ تكرارات. عند تعيين enable.idempotence=true، يُعيّن الوسيط لكل منتج PID (معرّف المنتج) ويتتبع رقم تسلسليًا متصاعدًا لكل قسم. إذا تلقى الوسيط دفعة مُعاد إرسالها سبق أن التزم بها، يزيل تكرارها بصمت — دلالة exactly-once ضمن جلسة منتج واحدة، دون أي تكلفة للمعاملات.

تتطلب Idempotence: acks=all، وmax.in.flight.requests.per.connection <= 5، وretries > 0. يُطبّق عميل Kafka هذه المتطلبات تلقائيًا عند ضبط enable.idempotence=true؛ لا تتجاوزها بشكل فردي وإلا سيرفض الوسيط تسجيل المنتج.

Idempotence مقابل المعاملات: تُزيل المنتجات غير القابلة للتكرار التكرارات في الكتابات إلى قسم-موضوع واحد من نسخة منتج واحدة. إذا احتجت إلى كتابات ذرية عبر أقسام أو مواضيع متعددة (مثل خطوط أنابيب read-process-write)، استخدم معاملات Kafka (transactional.id + initTransactions()). تضيف المعاملات زمن استجابة ملحوظًا — احتفظ بها لتدفقات الدفع والمخزون ودفاتر الأستاذ.

مجموعات المستهلكين وتوزيع الأقسام

مجموعة المستهلكين هي وحدة الاستهلاك المتوازي في Kafka. يشترك كل مستهلك في المجموعة في نفس الموضوع (الموضوعات)؛ يُعيّن منسق المجموعة (وسيط) كل قسم لمستهلك واحد بالضبط. يمنح هذا التوسع الأفقي: موضوع من 60 قسمًا يمكن تصريفه بواسطة ما يصل إلى 60 مستهلكًا في نفس المجموعة في وقت واحد. المستهلكون الزائدون عن عدد الأقسام يجلسون خاملين — خطأ شائع في الإفراط في التوفير.

Kafka Consumer Group Partition Assignment Topic: orders Partition 0 Partition 1 Partition 2 Partition 3 Partition 4 Partition 5 Consumer Group: order-processors Consumer A P0, P1 pod-0 Consumer B P2, P3 pod-1 Consumer C P4, P5 pod-2 __consumer_offsets الإزاحات المُلتزم بها لكل قسم المتقطع = التزام بالإزاحة
ستة أقسام موزعة بالتساوي على ثلاثة مستهلكين في المجموعة؛ كل مستهلك يُلتزم بالإزاحات بشكل مستقل.

إعادة التوازن: الأسباب والتكلفة والتخفيف

تُشغَّل إعادة التوازن عند تغيير عضوية المجموعة: ينضم مستهلك أو يتعطل أو يفشل في إرسال النبضات خلال session.timeout.ms. خلال إعادة التوازن المتحمسة (الافتراضي قبل الإصدار 2.4)، يُسقط جميع المستهلكين جميع الأقسام ويُعيدون التعيين من الصفر — توقف كامل يوقف الاستهلاك على مستوى المجموعة لثوانٍ إلى عشرات الثواني في المجموعات الكبيرة.

تخفيفات الإنتاج، بترتيب التأثير:

  1. إعادة التوازن التعاونية اللاصقة — اضبط partition.assignment.strategy=CooperativeStickyAssignor. يُلغى فقط الأقسام التي تحتاج إلى الانتقال؛ يستمر الباقي في المعالجة. متوفر منذ Kafka 2.4 والافتراضي المقبول الوحيد للخدمات الجديدة.
  2. عضوية المجموعة الثابتة — عيّن group.instance.id ثابتًا (مثل اسم الـ pod). يحتفظ الوسيط بتعيين قسم المستهلك لمدة session.timeout.ms بعد قطع الاتصال قبل تشغيل إعادة التوازن. إعادة تشغيل الـ pod في عمليات النشر المتدرجة لم تعد تسبب إعادة ترتيب كاملة للمجموعة.
  3. ضبط مهلات النبضات/الجلسة — الافتراضي session.timeout.ms=45000 طويل جدًا لاكتشاف الفشل السريع. الضبط الشائع في الإنتاج: session.timeout.ms=15000، heartbeat.interval.ms=4000، max.poll.interval.ms=300000. ارفع max.poll.interval.ms إذا استغرقت معالجة دفعة واحدة دقائق بشكل مشروع (استدلال ML، كتابات بطيئة إلى قاعدة البيانات).
# إعداد المستهلك — خط الأساس الإنتاجي group.id=order-processors group.instance.id=${HOSTNAME} # عضوية ثابتة؛ تُضبط لكل pod partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor session.timeout.ms=15000 heartbeat.interval.ms=4000 max.poll.interval.ms=300000 max.poll.records=500 # اضبط مع زمن استجابة المعالجة auto.offset.reset=earliest # إعادة التشغيل من البداية للمجموعة الجديدة enable.auto.commit=false # التزام صريح فقط في الإنتاج isolation.level=read_committed # مطلوب عند استخدام المنبع للمعاملات

التزامات الإزاحة: اليدوية مقابل التلقائية

enable.auto.commit=true يُلتزم بالإزاحات كل auto.commit.interval.ms (الافتراضي 5000 ملي ثانية) بغض النظر عما إذا كان تطبيقك قد نجح في معالجة تلك الرسائل. التعطل بين الالتزام التلقائي وإكمال المعالجة يعني تخطي تلك الرسائل بصمت عند إعادة التشغيل. هذا هو السبب الأساسي لفقدان البيانات في تطبيقات مستهلك Kafka.

مع الالتزامات اليدوية، استخدم الالتزام بعد المعالجة: استدعِ commitSync() أو commitAsync() فقط بعد نجاح الكتابة الأسفل (upsert في قاعدة البيانات، استدعاء API أسفل، إلخ). للدلالة exactly-once من البداية إلى النهاية، الخيار الوحيد الموثوق هو معاملات Kafka أو الكتابات الأسفل غير القابلة للتكرار مع الالتزامات اليدوية.

At-least-once مع سجلات idempotent: تستخدم معظم أنظمة الإنتاج commitAsync اليدوي مع احتياطي متزامن عند الفشل، مقترنًا بكتابات أسفل غير قابلة للتكرار (upsert بمعرف الحدث). هذا أبسط من معاملات Kafka ويتعامل بشكل أنيق مع حالة 99.99% من التكرارات الأحادية الرسالة.

تأخر المستهلك: أهم مقياس تشغيلي

تأخر المستهلك هو الفرق بين أحدث إزاحة في قسم (إزاحة نهاية السجل) والإزاحة الأخيرة الملتزم بها لمجموعة مستهلكين. يُقاس بـ عدد الرسائل، وليس بالوقت — تأخر 50,000 بمعدل 10,000 رسالة/ثانية يعني 5 ثوانٍ من الأعمال المتراكمة؛ نفس التأخر بمعدل 100 رسالة/ثانية يعني 8 دقائق.

إشارات التأخر الرئيسية وعتباتها (اضبطها وفق SLO):

  • تأخر متنامٍ — معدل الاستهلاك < معدل الإنتاج. وسّع المستهلكين (حتى عدد الأقسام) أو زد max.poll.records.
  • تأخر ثابت غير صفري — المستهلك يواكب البث المباشر لكن لا يُصفّي الأعمال المتراكمة أبدًا. يحدث عادةً بسبب موجة لم تتعافَ منها بالكامل، أو إعادة تشغيل مستهلك تأخر.
  • ارتفاع مفاجئ في التأخر — إعادة توازن جارية، تعطل مستهلك، أو تبعية أسفل بطيئة. تنبيه فوري إذا تجاوز ما يعادل 60 ثانية.
# مراقبة التأخر من سطر الأوامر (Kafka 3.x، وضع KRaft أو ZK) kafka-consumer-groups.sh \ --bootstrap-server kafka:9092 \ --group order-processors \ --describe # أعمدة الإخراج: TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID # إعادة ضبط الإزاحات إلى earliest (تصفية الأعمال المتراكمة من البداية — استخدم بحذر) kafka-consumer-groups.sh \ --bootstrap-server kafka:9092 \ --group order-processors \ --topic orders \ --reset-offsets --to-earliest --execute # مقياس Prometheus JMX للتنبيه # kafka.consumer:type=consumer-fetch-manager-metrics, # attribute=records-lag-max # تنبيه: records-lag-max > 50000 لأكثر من دقيقتين

على نطاق واسع، يُكشف تأخر كل قسم عبر JMX exporter ويُعرض في Grafana (انظر درس المراقبة). اضبط عتبتَي تنبيه: تحذير عند ما يعادل 60 ثانية من التأخر، وحرج عند نقطة خرق SLO. اربط التنبيه الحرج مباشرةً بدورة المناوبة — مجموعة مستهلكين تتوقف عن الالتزام بالإزاحات في خط أنابيب المدفوعات هي حادثة P1.

التأخر بالوقت مقابل الرسائل: لا يكشف Kafka بطبيعته عن التأخر بالثواني. أدوات مثل Burrow (LinkedIn) أو Confluent Control Center تحسب سرعة التأخر (هل المستهلكون يلحقون أم يتأخرون أكثر؟) وهو أكثر فائدة تشغيليًا من عدد الرسائل الخام. في الشركات التي تُشغّل >1 تيرابايت/يوم، يُعدّ التوسع التلقائي بناءً على التأخر (KEDA Kafka scaler على Kubernetes) معيارًا — يتوسع نشر المستهلك عند تجاوز التأخر حدًا ثم يتقلص عند التصفية.

تجميع الأمر: قائمة التحقق الإنتاجية

  • اضبط acks=all + min.insync.replicas=2 على جميع المواضيع المرتبطة بـ SLO.
  • مكّن enable.idempotence=true على جميع المنتجين افتراضيًا؛ أضف المعاملات فقط حيثما تطلب الذرية عبر أقسام متعددة.
  • استخدم CooperativeStickyAssignor والعضوية الثابتة في المجموعة لجميع خدمات المستهلك طويلة الأمد.
  • عطّل الالتزام التلقائي؛ التزم بالإزاحات فقط بعد المعالجة الأسفل الناجحة.
  • تنبيه على سرعة تأخر المستهلك (وليس فقط العدد المطلق) واربط التنبيهات الحرجة بالمناوبة.
  • اختبر سلوك إعادة التوازن في الاختبار: أوقف pod مستهلك في منتصف الدفعة وتحقق من عدم فقدان أي رسائل أو تخطيها بشكل دائم.