أدوات التزامن المتقدّمة

مشروع: معالج مهام متزامن

15 دقيقة الدرس 10 من 13

مشروع: معالج مهام متزامن

على مدار هذه الوحدة الدراسية بنيتَ صندوق أدوات كاملاً: مجمّعات الخيوط، وCallable وFuture، وCompletableFuture، والمجموعات المتزامنة، والأقفال، ومزامِنات التنسيق. في هذا المشروع الختامي ستجمع هذه القطع في تطبيق صغير لكنه واقعي — معالج مهام متزامن — يستقبل دُفعةً من عناصر العمل، ويعالجها بالتوازي عبر مجمّع خيوط محدود الحجم، ويجمع النتائج، ويُبلّغ عن التقدم فور اكتمال كل مهمة.

يتكوّن المشروع من ثلاث طبقات:

  1. تعريف المهمة — سجلّ يصف وحدة العمل الواحدة.
  2. المعالج — يُرسل المهام إلى ExecutorService، ويجمع مقابض CompletableFuture، ويعالج النتائج المنتهية.
  3. المُشغّل — يربط كل شيء معًا، ويُغذّي المهام، ويطبع ملخصًا نهائيًا.

الخطوة الأولى: نمذجة العمل

يجعل سجلّ Java تعريف المهمة موجزًا وغير قابل للتعديل. كل WorkItem له معرّف وحمولة وهمية:

import java.util.concurrent.ThreadLocalRandom; public record WorkItem(int id, String payload) { /** محاكاة عمل كثيف المعالج أو كثيف الإدخال/الإخراج؛ قد يرمي استثناءً على إدخال خاطئ. */ public String process() { int delay = ThreadLocalRandom.current().nextInt(50, 300); try { Thread.sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Work interrupted", e); } if (payload.isBlank()) { throw new IllegalArgumentException("Empty payload for item " + id); } return "Item-%d: processed '%s' in %d ms".formatted(id, payload, delay); } }
لماذا سجلّ (record)؟ تمنحك السجلات equals وhashCode وtoString وحقولًا نهائية مجانًا. البيانات غير القابلة للتعديل التي تتدفق عبر الكود المتزامن دائمًا أأمن — لا حالة مشتركة قابلة للتعديل تحتاج إلى حماية.

الخطوة الثانية: بناء المعالج

تمتلك فئة المعالج مجمّع خيوط ثابت الحجم وتُعرّض طريقة واحدة تستقبل قائمة عناصر العمل وتُعيد قائمة من CompletableFuture<String>. استخدام CompletableFuture بدلًا من Future الخام يتيح ربط استدعاءات الاسترجاع دون إيقاف الخيط المُستدعي:

import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; public final class TaskProcessor implements AutoCloseable { private final ExecutorService pool; public TaskProcessor(int parallelism) { this.pool = Executors.newFixedThreadPool(parallelism, Thread.ofVirtual().factory()); // خيوط افتراضية (Java 21+) } /** * إرسال جميع العناصر للمعالجة المتزامنة. * كل future يُحلّ بسلسلة النتيجة أو يكتمل باستثناء. */ public List<CompletableFuture<String>> submitAll(List<WorkItem> items) { return items.stream() .map(item -> CompletableFuture .supplyAsync(item::process, pool) .exceptionally(ex -> "Item-%d FAILED: %s".formatted(item.id(), ex.getMessage()))) .collect(Collectors.toList()); } @Override public void close() { pool.shutdown(); try { if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { pool.shutdownNow(); } } catch (InterruptedException e) { pool.shutdownNow(); Thread.currentThread().interrupt(); } } }
الخيوط الافتراضية في Java 21: Thread.ofVirtual().factory() ينشئ مصنع خيوط افتراضية. تمريره إلى newFixedThreadPool يجعل كل مهمة تحصل على خيط افتراضي خفيف الوزن. للأحمال كثيفة الإدخال/الإخراج، يتوسّع هذا أفضل بكثير من خيوط المنصة — يُوقّف JVM الخيوط الافتراضية أثناء الاستدعاءات الحاجبة دون شغل خيط نظام تشغيل. للعمل الكثيف على المعالج، احتفظ بحجم المجمّع عند Runtime.getRuntime().availableProcessors().

الخطوة الثالثة: تجميع النتائج بعداد متزامن

مع اكتمال الـ futures نريد عدًّا حيًّا للنجاحات والإخفاقات. LongAdder مُصمَّم تحديدًا للزيادات عالية التنافس — أسرع من AtomicLong تحت ضغط الكتابة الشديد لأنه يشرّح العداد عبر خلايا:

import java.util.concurrent.atomic.LongAdder; public final class ResultAggregator { private final LongAdder successes = new LongAdder(); private final LongAdder failures = new LongAdder(); public void record(String result) { if (result.contains("FAILED")) { failures.increment(); } else { successes.increment(); } } public void printSummary(int total) { System.out.printf("%n=== الملخص ===%n"); System.out.printf("الإجمالي : %d%n", total); System.out.printf("ناجح : %d%n", successes.sum()); System.out.printf("فاشل : %d%n", failures.sum()); } }

الخطوة الرابعة: تشغيل خط المعالجة

ينشئ المُشغّل عناصر العمل، ويُرسلها، ويربط استدعاء استرجاع بكل عنصر لطباعة التقدم فور اكتمال كل future، ثم ينتظر الدُّفعة كاملة قبل طباعة الملخص:

import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; public final class Main { public static void main(String[] args) throws Exception { int batchSize = 20; int parallelism = 8; List<WorkItem> batch = IntStream.rangeClosed(1, batchSize) .mapToObj(i -> new WorkItem(i, i % 7 == 0 ? "" : "task-" + i)) .toList(); ResultAggregator agg = new ResultAggregator(); try (TaskProcessor processor = new TaskProcessor(parallelism)) { List<CompletableFuture<String>> futures = processor.submitAll(batch); // ربط استدعاء استرجاع غير حاجب بكل future List<CompletableFuture<Void>> reporters = futures.stream() .map(f -> f.thenAccept(result -> { System.out.println(result); agg.record(result); })) .toList(); // الانتظار حتى ينتهي كل reporter — لا يحجب المجمّع CompletableFuture.allOf(reporters.toArray(new CompletableFuture[0])).join(); } agg.printSummary(batchSize); } }
لماذا thenAccept بدلًا من حلقة future.get()؟ استدعاء get() في حلقة يعالج النتائج بترتيب الإرسال — إذا استغرق العنصر 1 مدة 300 مللي ثانية لكن العنصر 2 انتهى في 50 مللي ثانية، تظل منتظرًا مع ذلك. thenAccept يُشغّل الاستدعاء على أي خيط يُكمل الـ future أولًا، فترى المخرجات تصل بترتيب الاكتمال لا الإرسال. ثم allOf(...).join() ينتظر فقط أبطأ عنصر.

المقايضات الرئيسية في التصميم

  • حجم المجمّع مقابل شكل الحمل: للمهام كثيفة الإدخال/الإخراج (شبكة، قرص)، تتفوق الخيوط الافتراضية أو المجمّع الكبير على مجمّع ثابت صغير. للمهام كثيفة المعالج، مجمّع بحجم availableProcessors() يُعظّم الإنتاجية دون الإفراط في الاشتراك.
  • عزل الأخطاء: exceptionally يحوّل الاستثناء إلى سلسلة علامة حتى لا يلغي عنصر معطوب واحد الدُّفعة كلها. بديل ذلك handle() الذي يستقبل النتيجة والاستثناء معًا ويتيح إعادة كائن خطأ أغنى.
  • الضغط الخلفي: إذا أُنتجت المهام بسرعة تفوق معالجتها، تنمو قائمة انتظار المجمّع إلى ما لا نهاية. للأنظمة عالية الإنتاجية فكّر في ThreadPoolExecutor مع ArrayBlockingQueue محدود وCallerRunsPolicy — هذا يُبطّئ المنتِج تلقائيًا.
  • قابلية المراقبة: في الإنتاج تستبدل System.out.println بتسجيل منظّم وتُعرّض عدادات المُجمِّع عبر نقطة نهاية مقاييس (Micrometer، Prometheus).
لا تُشارك المجمّع خارج كتلة try-with-resources. تنفيذ AutoCloseable يُغلق المجمّع عند خروج الكتلة. إذا أفلت منه مرجع، يمكن للمُستدعين إرسال مهام إلى منفّذ مُغلق والحصول على RejectedExecutionException في وقت التشغيل.

توسيع المشروع

بمجرد نجاح الخط الأساسي، فكّر في هذه التمارين لتعميق فهمك:

  1. مهلة لكل مهمة — لُفّ كل supplyAsync بـ .orTimeout(2, TimeUnit.SECONDS). المهام التي تستغرق وقتًا طويلًا تكتمل باستثناء TimeoutException.
  2. قائمة انتظار ذات أولوية — استبدل القائمة بـ PriorityBlockingQueue<WorkItem> مرتّبة بحقل أولوية. أطعم العناصر من القائمة إلى submitAll على دُفعات.
  3. معالجة متعددة المراحل — اربط مرحلة CompletableFuture ثانية بعد الأولى لحفظ النتيجة في قاعدة بيانات. استخدم thenApplyAsync مع منفّذ ثانٍ أصغر لتجنّب حجب مجمّع الحساب بعمليات الإدخال/الإخراج.

الخلاصة

جمع هذا المشروع كل مفهوم من الوحدة: ExecutorService للتنفيذ المُجمَّع، وCompletableFuture للتركيب غير الحاجب، وexceptionally لعزل الأخطاء، وLongAdder للتجميع عالي الأداء، وallOf لمزامنة الحاجز. الرؤية الجوهرية هي أن لكل أداة وظيفة — ادمجها بحسب الدور، واجعل الحالة المشتركة محدودة ومُحكمة الأنواع، ودع خط المعالجة الوظيفي يحمل البيانات عبر المراحل بنظافة.