أدوات التزامن المتقدّمة

المجموعات المتزامنة

15 دقيقة الدرس 7 من 13

المجموعات المتزامنة

المجموعات القياسية في Java — HashMap وArrayList وLinkedList — ليست آمنة للخيوط المتعددة (thread-safe). حين تقرأ وتكتب عدة خيوط فيها في وقت واحد دون مزامنة خارجية تحصل على حالة تالفة، أو حلقات لا نهاية لها، أو فقدان صامت للبيانات. تشحن حزمة java.util.concurrent مجموعة من المجموعات المُصمَّمة خصيصًا للتزامن تُعالج الأمر داخليًا دون أن يحتاج المستدعي إلى حيازة أقفال. هذا الدرس يغطّي الثلاثة التي ستلجأ إليها أكثر من غيرها: ConcurrentHashMap وCopyOnWriteArrayList وعائلة BlockingQueue.

ConcurrentHashMap

ConcurrentHashMap هي خريطة هاش آمنة للخيوط تحقّق تزامنًا عاليًا بتقسيم الجدول الداخلي إلى مقاطع مستقلة (أو باستخدام عمليات CAS على المجموعات الفردية منذ Java 8). القراءة لا تحجب أبدًا. الكتابة تقفل المجموعة المحدّدة التي تُعدَّل فقط، لذا تعمل الخيوط على مفاتيح مختلفة بالتوازي دون تنافس.

import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public class WordCounter { private final ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>(); // آمن لاستدعائه من أي خيط في وقت واحد public void record(String word) { counts.computeIfAbsent(word, k -> new AtomicInteger()) .incrementAndGet(); } public int count(String word) { AtomicInteger val = counts.get(word); return val == null ? 0 : val.get(); } }

النقطة المحورية في المثال أعلاه هي computeIfAbsent. هذه عملية ذرية: تُدرج الخريطة AtomicInteger الجديد فقط إن كان المفتاح غائبًا، وتفعل ذلك دون أي قفل خارجي. استدعاء get ثم put بشكل منفصل يُنشئ سباق البيانات — استخدم دائمًا العمليات الذرية المركّبة: putIfAbsent وcomputeIfAbsent وcompute وmerge.

الفخ الشائع — فحص ثم تصرّف: الكود مثل if (!map.containsKey(k)) map.put(k, v) ليس آمنًا للخيوط حتى على ConcurrentHashMap. يمكن لخيط آخر إدراج المفتاح بين الفحص والإدراج. استخدم دائمًا putIfAbsent أو computeIfAbsent للإدراج الشرطي الذري.

تكشف ConcurrentHashMap أيضًا عن دوال تجميع تجتاز الخريطة بدرجة تجانس قابلة للتهيئة:

ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>(); scores.put("Alice", 95); scores.put("Bob", 82); scores.put("Carol", 91); // تقليص متوازٍ — عتبة التوازي 1 تعني استخدام كل الأنوية int total = scores.reduceValues(1, Integer::sum); System.out.println("Total: " + total); // 268
دقة الحجم: size() على ConcurrentHashMap تقريبية في حالة التعديل المتزامن. افضّل mappingCount() حين تحتاج إلى long والخريطة قد تتجاوز Integer.MAX_VALUE. لعدادات دقيقة استخدم AtomicLong خارجيًا.

CopyOnWriteArrayList

تتبنّى CopyOnWriteArrayList المقايضة المعاكسة لـ ConcurrentHashMap: فهي مُحسَّنة لأعباء عمل تُهيمن فيها القراءات على الكتابات. كل عملية تغيير (add أو set أو remove) تُنشئ نسخة جديدة من المصفوفة الداخلية بالكامل، تُجري التغيير على النسخة، ثم تستبدل المرجع ذريًا. القرّاء لا يُحجبون أبدًا ولا يرون كتابات جزئية.

import java.util.concurrent.CopyOnWriteArrayList; public class EventListeners { private final CopyOnWriteArrayList<Runnable> listeners = new CopyOnWriteArrayList<>(); public void addListener(Runnable r) { listeners.add(r); // تنسخ المصفوفة } public void removeListener(Runnable r) { listeners.remove(r); // تنسخ المصفوفة } // يمكن استدعاؤها من خيوط عديدة في آنٍ واحد — لا حجب public void fireAll() { for (Runnable r : listeners) { // التكرار على لقطة — لا يرمي ConcurrentModificationException أبدًا r.run(); } } }

يعمل المُكرِّر الذي تُعيده CopyOnWriteArrayList على اللقطة المأخوذة لحظة استدعاء iterator(). إذا أضاف خيط آخر عنصرًا بعد بدء التكرار فلن يراه المُكرِّر الحالي — وهذا مقصود وآمن.

متى تستخدم CopyOnWriteArrayList: سجلات مستمعي الأحداث، سجلات الإضافات، نمط المراقب — أي قائمة تُسجَّل مرة واحدة أو نادرًا لكنها تُكرَّر عليها مرات كثيرة (مثلًا مع كل طلب وارد). تجنّبها حين تكون الكتابات متكررة: نسخ المصفوفة الكاملة يجعل الكتابات O(n) زمنًا وذاكرة.

BlockingQueue

BlockingQueue هي أساس أنماط المنتج-المستهلك في Java. تمدّ Queue بعمليات تحجب الخيط المستدعي حين تكون القائمة فارغة (عند الأخذ) أو ممتلئة (عند الوضع). هذا يُلغي الحاجة إلى كتابة حلقات wait/notifyAll يدويًا.

للواجهة أربع فئات من العمليات حسب كيفية تعاملها مع حدود السعة:

  • ترمي استثناءadd وremove وelement
  • تُعيد قيمة خاصةoffer وpoll وpeek
  • تحجب إلى أجل غير مسمىput وtake
  • تنتهي بعد مهلةoffer(e, timeout, unit) وpoll(timeout, unit)

التنفيذ الأكثر شيوعًا هو LinkedBlockingQueue. لقوائم العمل المحدودة استخدم ArrayBlockingQueue:

import java.util.concurrent.*; public class ProducerConsumerDemo { public static void main(String[] args) throws InterruptedException { // قائمة محدودة — المنتج يُحجب حين تمتلئ، المستهلك يُحجب حين تفرغ BlockingQueue<String> queue = new ArrayBlockingQueue<>(10); // خيط المنتج Thread producer = new Thread(() -> { String[] items = {"task-1", "task-2", "task-3", "POISON"}; for (String item : items) { try { queue.put(item); // يحجب إن كانت القائمة ممتلئة System.out.println("Produced: " + item); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }); // خيط المستهلك Thread consumer = new Thread(() -> { while (true) { try { String item = queue.take(); // يحجب إن كانت القائمة فارغة if ("POISON".equals(item)) { System.out.println("Consumer done."); return; } System.out.println("Consumed: " + item); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } }); producer.start(); consumer.start(); producer.join(); consumer.join(); } }

نمط حبة السُّم (Poison Pill) أعلاه — إرسال قيمة حارسة للإشارة إلى الانتهاء — هو الطريقة الأنظف لإيقاف حلقة المستهلك دون استخدام علامات volatile أو المقاطعات.

تنفيذات BlockingQueue الأخرى الجديرة بالملاحظة:

  • PriorityBlockingQueue — غير محدودة، تُرتّب العناصر بالترتيب الطبيعي أو Comparator. المهام ذات الأولوية الأعلى تُستهلك أولًا.
  • SynchronousQueue — سعة صفر؛ كل put يحجب حتى يتوفّر take. تُستخدم داخليًا من مصنع مجمع الخيوط ذي التخزين المؤقت لتسليم المهام مباشرة.
  • DelayQueue — العناصر لا تتاح إلا بعد انقضاء تأخير مُحدَّد لكل عنصر. مفيدة لإعادة المحاولة المجدوَلة ومخابئ TTL.
  • LinkedTransferQueue — تجمع ميزات SynchronousQueue وLinkedBlockingQueue، وتوفّر زمن استجابة أقل في السيناريوهات عالية الإنتاجية.
الاختيار بين تنفيذات BlockingQueue: استخدم ArrayBlockingQueue حين تحتاج إلى حدٍّ صارم لضغط التدفق (محدودة). استخدم LinkedBlockingQueue (يمكن تحديدها) حين يكون المنتجون متقطّعين. استخدم PriorityBlockingQueue حين تتفاوت مهامك في الإلحاح. لا تستخدم قائمة غير محدودة لمهام العمل في نظام حقيقي — القائمة غير المحدودة تمتص ضغط التدفق بصمت حتى ينفد ذاكرة العملية.

الخلاصة

تُزيل المجموعات المتزامنة الحاجة إلى كتل synchronized خشنة الحبيبة حول المجموعات القياسية. ConcurrentHashMap هي خريطة الخيوط الافتراضية: استخدم عملياتها الذرية المركّبة ولا تجرِ فحصًا ثم تصرّفًا بنفسك. CopyOnWriteArrayList تتألّق في قوائم المستمعين الكثيفة القراءة لكنها مُكلفة عند التعديل. BlockingQueue هي العمود الفقري لمسارات المنتج-المستهلك، إذ توفّر ضغط التدفق المدمج ودلالات التسليم النظيفة. اختيار المجموعة المتزامنة الصحيحة لا يقلّ أهمية عن اختيار الخوارزمية الصحيحة — كل منها تُرسّخ عقدًا محدّدًا للتزامن ومقايضةً مُحدَّدة في الأداء.