diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index 7262c85c9f491e..277ecc480ffc9f 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -21,6 +21,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" } bevy_ecs_macros = { path = "macros", version = "0.8.0-dev" } async-channel = "1.4" +thread_local = "1.1.4" fixedbitset = "0.4" fxhash = "0.2" downcast-rs = "1.2" diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index 5f35dffd5b9e1d..56ac5db9acd94c 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -39,7 +39,7 @@ pub mod prelude { }, system::{ Commands, In, IntoChainSystem, IntoExclusiveSystem, IntoSystem, Local, NonSend, - NonSendMut, ParamSet, Query, RemovedComponents, Res, ResMut, System, + NonSendMut, ParallelCommands, ParamSet, Query, RemovedComponents, Res, ResMut, System, SystemParamFunction, }, world::{FromWorld, Mut, World}, diff --git a/crates/bevy_ecs/src/system/commands/mod.rs b/crates/bevy_ecs/src/system/commands/mod.rs index 564f1ab3ebe5fa..bf40e13fb98b18 100644 --- a/crates/bevy_ecs/src/system/commands/mod.rs +++ b/crates/bevy_ecs/src/system/commands/mod.rs @@ -1,4 +1,5 @@ mod command_queue; +mod parallel_scope; use crate::{ bundle::Bundle, @@ -8,6 +9,7 @@ use crate::{ }; use bevy_utils::tracing::{error, warn}; pub use command_queue::CommandQueue; +pub use parallel_scope::*; use std::marker::PhantomData; use super::Resource; diff --git a/crates/bevy_ecs/src/system/commands/parallel_scope.rs b/crates/bevy_ecs/src/system/commands/parallel_scope.rs new file mode 100644 index 00000000000000..41dc9b72891928 --- /dev/null +++ b/crates/bevy_ecs/src/system/commands/parallel_scope.rs @@ -0,0 +1,98 @@ +use std::cell::Cell; + +use thread_local::ThreadLocal; + +use crate::{ + entity::Entities, + prelude::World, + system::{SystemParam, SystemParamFetch, SystemParamState}, +}; + +use super::{CommandQueue, Commands}; + +#[doc(hidden)] +#[derive(Default)] +/// The internal [`SystemParamState`] of the [`ParallelCommands`] type +pub struct ParallelCommandsState { + thread_local_storage: ThreadLocal>, +} + +/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_for_each`](crate::system::Query::par_for_each) +/// +/// Note: Because command application order will depend on how many threads are ran, non-commutative commands may result in non-deterministic results. +/// +/// Example: +/// ``` +/// # use bevy_ecs::prelude::*; +/// # use bevy_tasks::ComputeTaskPool; +/// # +/// # #[derive(Component)] +/// # struct Velocity; +/// # impl Velocity { fn magnitude(&self) -> f32 { 42.0 } } +/// fn parallel_command_system( +/// mut query: Query<(Entity, &Velocity)>, +/// par_commands: ParallelCommands +/// ) { +/// query.par_for_each(32, |(entity, velocity)| { +/// if velocity.magnitude() > 10.0 { +/// par_commands.command_scope(|mut commands| { +/// commands.entity(entity).despawn(); +/// }); +/// } +/// }); +/// } +/// # bevy_ecs::system::assert_is_system(parallel_command_system); +///``` +pub struct ParallelCommands<'w, 's> { + state: &'s mut ParallelCommandsState, + entities: &'w Entities, +} + +impl SystemParam for ParallelCommands<'_, '_> { + type Fetch = ParallelCommandsState; +} + +impl<'w, 's> SystemParamFetch<'w, 's> for ParallelCommandsState { + type Item = ParallelCommands<'w, 's>; + + unsafe fn get_param( + state: &'s mut Self, + _: &crate::system::SystemMeta, + world: &'w World, + _: u32, + ) -> Self::Item { + ParallelCommands { + state, + entities: world.entities(), + } + } +} + +// SAFE: no component or resource access to report +unsafe impl SystemParamState for ParallelCommandsState { + fn init(_: &mut World, _: &mut crate::system::SystemMeta) -> Self { + Self::default() + } + + fn apply(&mut self, world: &mut World) { + for cq in self.thread_local_storage.iter_mut() { + cq.get_mut().apply(world); + } + } +} + +impl<'w, 's> ParallelCommands<'w, 's> { + pub fn command_scope(&self, f: impl FnOnce(Commands) -> R) -> R { + let store = &self.state.thread_local_storage; + let command_queue_cell = store.get_or_default(); + let mut command_queue = command_queue_cell.take(); + + let r = f(Commands::new_from_entities( + &mut command_queue, + self.entities, + )); + + command_queue_cell.set(command_queue); + r + } +}