
better parallelism in partitioned ray executor #945

Draft

cyruszhang wants to merge 18 commits into main from
feat/cyrusz/parallel-partition-actor-reuse

Conversation

@cyruszhang
Collaborator

No description provided.

  Partitions now run concurrently as Ray remote tasks instead of
  sequentially, eliminating GPU idle time between partitions.
  max_concurrent_partitions defaults to "auto", which detects the
  number of GPUs in the Ray cluster and sets concurrency accordingly.
  num_of_partitions is automatically raised to match when too low.

  - New: concurrency_scoping.py with scope_op_concurrency() utility
  - New: _process_partitions_concurrent() method using @ray.remote tasks
  - New: _resolve_max_concurrent() for auto GPU detection
  - Sequential path preserved for DAG monitoring and single-GPU setups
  - Tests for scoping, config parsing, auto-inference, and e2e concurrent
  - perf-test.py simplified: no manual GPU tuning flags needed
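The bounded-concurrency shape described above can be sketched without Ray; here `ThreadPoolExecutor` stands in for `@ray.remote` task submission, and `max_concurrent` plays the role of `max_concurrent_partitions` (the function name and body are illustrative, not the PR's actual implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def process_partitions_concurrent(partitions, process_fn, max_concurrent):
    # Submit every partition up front, but let at most `max_concurrent`
    # run at once -- the same bounded-parallelism shape the executor gets
    # from capping in-flight @ray.remote tasks.
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        futures = [pool.submit(process_fn, idx, part)
                   for idx, part in enumerate(partitions)]
        # Collect in submission order so partition results stay aligned.
        return [f.result() for f in futures]
```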
…count

  The @ray.remote partition tasks had no GPU resources assigned, causing
  torch.cuda.is_available() to return False inside the task. This made
  use_ray_actor() fall back to task mode, reloading the model on every
  batch instead of once per actor — explaining the 30+ minute stalls.

  Fix: force ray_execution_mode="actor" on ops with num_gpus > 0 after
  re-creating them in the remote task.
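A minimal sketch of that fix, assuming ops expose `num_gpus` and a writable `ray_execution_mode` attribute as the description above implies (the helper name is hypothetical):

```python
def force_actor_mode(ops):
    # Ops re-created inside a @ray.remote task lose their GPU context
    # (torch.cuda.is_available() is False there), so pin every GPU op to
    # actor mode: the model then loads once per actor, not once per batch.
    for op in ops:
        if getattr(op, "num_gpus", 0) > 0:
            op.ray_execution_mode = "actor"
    return ops
```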

  Also:
  - max_concurrent_partitions defaults to "auto" (detect GPUs from Ray
    cluster), num_of_partitions auto-raised to match
  - perf-test.py auto-detects GPU count for num_proc instead of
    hardcoding 8, benefiting both ray and ray_partitioned modes
  Repartition dataset to exactly num_partitions blocks before split()
  to ensure even row distribution. Without this, some input blocks can
  be empty, causing split() to produce 0-row partitions and leaving
  GPUs idle (observed 6/8 GPUs utilized with 2 empty partitions).

  - Repartition before split: redistributes rows evenly, no data loss
  - Skip empty partitions as safety net in concurrent task submission
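The arithmetic behind the repartition is easy to check; a sketch, assuming Ray's `repartition` spreads rows as evenly as possible across blocks:

```python
def even_partition_sizes(total_rows, num_partitions):
    # After repartitioning to exactly num_partitions blocks, every
    # partition holds at least total_rows // num_partitions rows and the
    # first total_rows % num_partitions hold one extra -- no partition is
    # empty as long as total_rows >= num_partitions.
    base, extra = divmod(total_rows, num_partitions)
    return [base + (1 if i < extra else 0) for i in range(num_partitions)]
```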
…processing

  Set ray_execution_mode = "actor" BEFORE calling scope_op_concurrency()
  so that use_ray_actor() returns True and num_proc gets correctly divided
  by max_concurrent_partitions (e.g. 8 // 8 = 1 GPU per partition instead
  of 8).
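The division `scope_op_concurrency()` performs can be sketched as follows (the function name comes from the PR; this body is an assumption based on the 8 // 8 = 1 example above):

```python
def scope_op_concurrency(num_proc, max_concurrent_partitions):
    # Split an op's global num_proc across the partitions running at once,
    # e.g. 8 GPUs across 8 concurrent partitions -> 1 GPU per partition.
    # Never drop below 1 so every partition can still make progress.
    return max(1, num_proc // max_concurrent_partitions)
```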
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the parallelism capabilities of the PartitionedRayExecutor by enabling concurrent processing of data partitions. It introduces a mechanism to dynamically scope the concurrency of individual operators within each parallel partition, optimizing GPU utilization and preventing resource over-subscription. This change aims to reduce overall data processing time, especially for GPU-intensive workflows, by leveraging available resources more effectively.

Highlights

  • Concurrent Partition Processing: The PartitionedRayExecutor now supports processing multiple data partitions in parallel as Ray remote tasks, improving overall throughput for data processing pipelines.
  • Dynamic Operator Concurrency Scoping: A new utility function, scope_op_concurrency, was introduced to dynamically adjust the num_proc for individual operators within each concurrent partition, ensuring efficient resource allocation and preventing GPU over-subscription.
  • Flexible Concurrent Partition Configuration: Users can now configure max_concurrent_partitions to automatically detect available GPUs in the Ray cluster or specify an explicit integer, with automatic adjustment of num_partitions to ensure efficient resource usage.
  • Improved Dataset Splitting Robustness: The dataset is now explicitly repartitioned before splitting to prevent the creation of empty partitions, ensuring all partitions contain data for processing.
  • New Design Document: A design document, parallel_partition_actor_reuse.md, was added, outlining the problem of sequential GPU operator execution and proposing solutions for improved parallelism and actor reuse.
  • Performance Benchmark Script: A new perf-test.py script was added to benchmark the performance of DataJuicer's Ray executors, including the new partitioned mode, and direct Ray Data parallelism.
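A plausible shape for the "auto" resolution described above, assuming the GPU count is read from the mapping returned by `ray.cluster_resources()` (the PR's actual helper is `_resolve_max_concurrent`; this body is a sketch):

```python
def resolve_max_concurrent(setting, cluster_resources):
    # "auto" -> use the cluster's reported GPU count; fall back to 1 when
    # no GPUs are visible so the executor degrades to sequential runs.
    if setting == "auto":
        return max(1, int(cluster_resources.get("GPU", 0)))
    return int(setting)
```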


Changelog
  • data_juicer/core/data/ray_dataset.py
    • Updated Ray map_batches calls to directly specify operator concurrency (concurrency=op.num_proc) instead of using get_compute_strategy, aiming for better GPU utilization.
  • data_juicer/core/executor/concurrency_scoping.py
    • Added a new utility module defining scope_op_concurrency to dynamically adjust operator concurrency based on the number of concurrently running partitions.
  • data_juicer/core/executor/ray_executor_partitioned.py
    • Introduced max_concurrent_partitions configuration, allowing 'auto' detection of GPUs or explicit integer setting.
    • Implemented a new _process_partitions_concurrent method to execute partitions in parallel as Ray remote tasks.
    • Ensured num_partitions is automatically increased if it's less than max_concurrent_partitions to fully utilize concurrent slots.
    • Added repartitioning logic (dataset.data.repartition(self.num_partitions)) before splitting the dataset to prevent empty partitions.
    • Modified _configure_partitioning to resolve max_concurrent_partitions and adjust num_partitions.
    • Modified _process_with_simple_partitioning to branch to the new concurrent processing logic if max_concurrent_partitions is greater than 1.
  • docs/design/parallel_partition_actor_reuse.md
    • Added a new design document outlining the problem of sequential GPU operator execution and proposing solutions for improved parallelism and actor reuse in the partitioned Ray executor.
  • perf-test.py
    • Added a new benchmark script to evaluate the performance of DataJuicer's Ray executors, including direct Ray Data parallelism and partitioned executor modes.
  • pyproject.toml
    • Added a [tool.uv] section to specify constraint dependencies for the kaleido package.
  • tests/core/executor/test_ray_executor_partitioned.py
    • Expanded unit tests to cover the new concurrency scoping logic in ConcurrencyScopingTest.
    • Added new test cases in ConcurrentPartitionConfigTest to validate configuration handling for max_concurrent_partitions and end-to-end concurrent execution scenarios, including checkpointing and backward compatibility.
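Put together, the configuration step the changelog describes amounts to something like the following (a sketch under the assumptions above, not the actual `_configure_partitioning` body):

```python
def configure_partitioning(num_partitions, max_concurrent_setting,
                           cluster_resources):
    # Resolve "auto" to the cluster GPU count, then raise num_partitions
    # so every concurrent slot has at least one partition to execute.
    if max_concurrent_setting == "auto":
        max_concurrent = max(1, int(cluster_resources.get("GPU", 0)))
    else:
        max_concurrent = int(max_concurrent_setting)
    return max(num_partitions, max_concurrent), max_concurrent
```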
Activity
  • The author, cyruszhang, implemented changes to improve parallelism in the partitioned Ray executor.
  • A new design document was added to explain the motivation and proposed architecture for parallel partition processing and actor reuse.
  • A new benchmark script was introduced to test the performance of the updated executor.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

gemini-code-assist bot left a comment


Code Review

This pull request introduces concurrent partition processing in the partitioned Ray executor to improve parallelism. The changes are mostly in ray_executor_partitioned.py, with supporting utilities and tests. My review found a couple of critical bugs in the new concurrent processing logic that could lead to errors when partitions are skipped. I've also pointed out a minor issue with exception handling and a significant discrepancy between the new design document and the actual implementation. Overall, the direction is good, but the identified bugs need to be addressed.

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
  Repartition(32) was reducing the 96 natural source blocks (from JSONL
  shards) down to 32, losing parallelism. Let split() distribute the
  original blocks round-robin so each partition inherits ~12 blocks,
  enabling better streaming pipelining within each partition.
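The round-robin block distribution that `split()` relies on can be sketched as follows (assuming blocks are assigned to partitions in order; the function name is illustrative):

```python
def split_blocks_round_robin(blocks, num_partitions):
    # Hand out source blocks round-robin: 96 JSONL blocks over 8
    # partitions gives each partition 12 blocks to stream through,
    # instead of collapsing them to 32 blocks via an upfront repartition.
    parts = [[] for _ in range(num_partitions)]
    for idx, block in enumerate(blocks):
        parts[idx % num_partitions].append(block)
    return parts
```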