[core][compiled graphs] Rename adag -> cgraph to clean up API #49448

Merged 2 commits on Dec 30, 2024
2 changes: 1 addition & 1 deletion doc/source/ray-core/api/exceptions.rst
@@ -34,7 +34,7 @@ Exceptions
ray.exceptions.ObjectReconstructionFailedLineageEvictedError
ray.exceptions.RayChannelError
ray.exceptions.RayChannelTimeoutError
-ray.exceptions.RayAdagCapacityExceeded
+ray.exceptions.RayCgraphCapacityExceeded
ray.exceptions.RuntimeEnvSetupError
ray.exceptions.CrossLanguageError
ray.exceptions.RaySystemError
10 changes: 5 additions & 5 deletions python/ray/dag/compiled_dag_node.py
@@ -759,7 +759,7 @@ def __init__(
max_inflight_executions: The maximum number of in-flight executions that
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to get the result,
-otherwise, RayAdagCapacityExceeded is raised.
+otherwise, RayCgraphCapacityExceeded is raised.
overlap_gpu_communication: Whether to overlap GPU communication with
computation during DAG execution. If True, the communication
and computation can be overlapped, which can improve the
@@ -968,7 +968,7 @@ def _preprocess(self) -> None:
continue
if (
len(task.downstream_task_idxs) == 0
-and task.dag_node.is_adag_output_node
+and task.dag_node.is_cgraph_output_node
):
assert self.output_task_idx is None, "More than one output node found"
self.output_task_idx = idx
@@ -1143,7 +1143,7 @@ def _preprocess(self) -> None:
continue
if (
len(task.downstream_task_idxs) == 0
-and not task.dag_node.is_adag_output_node
+and not task.dag_node.is_cgraph_output_node
):
leaf_nodes.append(task.dag_node)
# Leaf nodes are not allowed because the exception thrown by the leaf
@@ -1931,13 +1931,13 @@ def raise_if_too_many_inflight_requests(self):
self._execution_index - self._max_finished_execution_index
)
if num_in_flight_requests > self._max_inflight_executions:
-raise ray.exceptions.RayAdagCapacityExceeded(
+raise ray.exceptions.RayCgraphCapacityExceeded(
f"There are {num_in_flight_requests} in-flight requests which "
"is more than specified _max_inflight_executions of the dag: "
f"{self._max_inflight_executions}. Retrieve the output using "
"ray.get before submitting more requests or increase "
"`max_inflight_executions`. "
"`adag.experimental_compile(_max_inflight_executions=...)`"
"`dag.experimental_compile(_max_inflight_executions=...)`"
)

def _has_execution_results(
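For context, a minimal sketch of how the renamed exception surfaces to callers. The actor and DAG here are hypothetical; `experimental_compile(_max_inflight_executions=...)` and `RayCgraphCapacityExceeded` are the names used in the hunks above.

```python
import ray
from ray.dag import InputNode

# Hypothetical actor, for illustration only; not part of this PR.
@ray.remote
class Worker:
    def inc(self, x):
        return x + 1

worker = Worker.remote()
with InputNode() as inp:
    dag = worker.inc.bind(inp)

# Allow at most two in-flight executions (the private arg named in the
# error message above).
cdag = dag.experimental_compile(_max_inflight_executions=2)

refs = [cdag.execute(i) for i in range(2)]
try:
    cdag.execute(2)  # a third in-flight request exceeds the cap
except ray.exceptions.RayCgraphCapacityExceeded:
    # Drain results with ray.get before submitting more work.
    print([ray.get(r) for r in refs])
```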
16 changes: 8 additions & 8 deletions python/ray/dag/dag_node.py
@@ -80,7 +80,7 @@ def __init__(

self._type_hint: ChannelOutputType = ChannelOutputType()
# Whether this node calls `experimental_compile`.
-self.is_adag_output_node = False
+self.is_cgraph_output_node = False

def _collect_upstream_nodes(self) -> List["DAGNode"]:
"""
@@ -236,7 +236,7 @@ def experimental_compile(
_max_buffered_results = ctx.max_buffered_results

# Validate whether this DAG node has already been compiled.
-if self.is_adag_output_node:
+if self.is_cgraph_output_node:
raise ValueError(
"It is not allowed to call `experimental_compile` on the same DAG "
"object multiple times no matter whether `teardown` is called or not. "
@@ -245,7 +245,7 @@
# Whether this node is an output node in the DAG. We cannot determine
# this in the constructor because the output node is determined when
# `experimental_compile` is called.
-self.is_adag_output_node = True
+self.is_cgraph_output_node = True
return build_compiled_dag_from_ray_dag(
self,
_submit_timeout,
@@ -425,24 +425,24 @@ def traverse_and_apply(self, fn: "Callable[[DAGNode], T]"):
"""
visited = set()
queue = [self]
-adag_output_node: Optional[DAGNode] = None
+cgraph_output_node: Optional[DAGNode] = None

while queue:
node = queue.pop(0)
if node._args_contain_nested_dag_node:
self._raise_nested_dag_node_error(node._bound_args)

if node not in visited:
-if node.is_adag_output_node:
+if node.is_cgraph_output_node:
# Validate whether there are multiple nodes that call
# `experimental_compile`.
-if adag_output_node is not None:
+if cgraph_output_node is not None:
raise ValueError(
"The DAG was compiled more than once. The following two "
"nodes call `experimental_compile`: "
f"(1) {adag_output_node}, (2) {node}"
f"(1) {cgraph_output_node}, (2) {node}"
)
-adag_output_node = node
+cgraph_output_node = node
fn(node)
visited.add(node)
"""
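As the guard above shows, `is_cgraph_output_node` also prevents compiling the same DAG object twice. A sketch of the behavior, assuming a fresh `dag` built as in the earlier example:

```python
cdag = dag.experimental_compile()

# A second compile on the same DAG object is rejected by the
# is_cgraph_output_node check, whether or not teardown() was called.
try:
    dag.experimental_compile()
except ValueError as err:
    print(err)  # "It is not allowed to call `experimental_compile` ..."
```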
20 changes: 10 additions & 10 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
@@ -2191,7 +2191,7 @@ def fwd(self, x):
output_refs = []
for i in range(MAX_INFLIGHT_EXECUTIONS):
output_refs.append(dag.execute(i))
-with pytest.raises(ray.exceptions.RayAdagCapacityExceeded):
+with pytest.raises(ray.exceptions.RayCgraphCapacityExceeded):
dag.execute(1)
assert len(output_refs) == MAX_INFLIGHT_EXECUTIONS
for i, ref in enumerate(output_refs):
@@ -2227,7 +2227,7 @@ async def main():
output_refs = []
for i in range(MAX_INFLIGHT_EXECUTIONS):
output_refs.append(await async_dag.execute_async(i))
-with pytest.raises(ray.exceptions.RayAdagCapacityExceeded):
+with pytest.raises(ray.exceptions.RayCgraphCapacityExceeded):
await async_dag.execute_async(1)
assert len(output_refs) == MAX_INFLIGHT_EXECUTIONS
for i, ref in enumerate(output_refs):
@@ -2254,8 +2254,8 @@ def test_event_profiling(ray_start_regular, monkeypatch):
y = b.inc.bind(inp)
z = b.inc.bind(y)
dag = MultiOutputNode([x, z])
-adag = dag.experimental_compile()
-ray.get(adag.execute(1))
+cdag = dag.experimental_compile()
+ray.get(cdag.execute(1))

a_events = ray.get(a.get_events.remote())
b_events = ray.get(b.get_events.remote())
@@ -2297,8 +2297,8 @@ def add_value_to_tensor(self, value: int, tensor: torch.Tensor) -> torch.Tensor:
2. Both the input and output of the graph are the same actor process.

This test suite covers the second case. The second case is useful when we use
-Ray Serve to deploy the ADAG as a backend. In this case, the Ray Serve replica,
-which is an actor, needs to be the input and output of the graph.
+Ray Serve to deploy the Compiled Graph as a backend. In this case, the Ray Serve
+replica, which is an actor, needs to be the input and output of the graph.
"""


@@ -2502,10 +2502,10 @@ def __init__(self):
inp,
).with_type_hint(TorchTensorType()),
)
-self._adag = dag.experimental_compile()
+self._cdag = dag.experimental_compile()

def call(self, value):
-return ray.get(self._adag.execute(value))
+return ray.get(self._cdag.execute(value))

replica = Replica.remote()
ref = replica.call.remote(5)
@@ -2536,8 +2536,8 @@ def f(self, i):
y = b.f.bind(inp)
dag = MultiOutputNode([x, y])

-adag = dag.experimental_compile(enable_asyncio=True)
-refs = await adag.execute_async(1)
+cdag = dag.experimental_compile(enable_asyncio=True)
+refs = await cdag.execute_async(1)
outputs = []
for ref in refs:
outputs.append(await ref)
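The asyncio test above follows the usual driver-side pattern; here is a condensed, hypothetical sketch (the actors mirror the test, and `ray.init()` is assumed to have run):

```python
import asyncio
import ray
from ray.dag import InputNode, MultiOutputNode

@ray.remote
class A:  # hypothetical actor mirroring the test above
    def f(self, i):
        return i

async def main():
    a, b = A.remote(), A.remote()
    with InputNode() as inp:
        dag = MultiOutputNode([a.f.bind(inp), b.f.bind(inp)])
    cdag = dag.experimental_compile(enable_asyncio=True)
    refs = await cdag.execute_async(1)
    return [await ref for ref in refs]  # one output per actor

print(asyncio.run(main()))
```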
12 changes: 6 additions & 6 deletions python/ray/dag/tests/experimental/test_multi_node_dag.py
@@ -97,10 +97,10 @@ def _get_node_id(self) -> "ray.NodeID":
z = c.inc.bind(inp)
dag = MultiOutputNode([x, y, z])

-adag = dag.experimental_compile()
+cdag = dag.experimental_compile()
[Inline review comment]

Contributor: do we want cdag or cgraph?

Contributor (author): external API: cgraph. Internally, cgraph and cdag can be
used interchangeably; here cdag is easier to read since we call
experimental_compile() on dag.

for i in range(1, 10):
-assert ray.get(adag.execute(1)) == [i, i, i]
+assert ray.get(cdag.execute(1)) == [i, i, i]


def test_bunch_readers_on_different_nodes(ray_start_cluster):
@@ -134,10 +134,10 @@ def _get_node_id(self) -> "ray.NodeID":
outputs.append(actor.inc.bind(inp))
dag = MultiOutputNode(outputs)

-adag = dag.experimental_compile()
+cdag = dag.experimental_compile()

for i in range(1, 10):
-assert ray.get(adag.execute(1)) == [
+assert ray.get(cdag.execute(1)) == [
i for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))
]

@@ -322,12 +322,12 @@ def __init__(self):
x,
)

-self._adag = dag.experimental_compile(
+self._cdag = dag.experimental_compile(
_submit_timeout=120,
)

def call(self, prompt: str) -> bytes:
-return ray.get(self._adag.execute(prompt))
+return ray.get(self._cdag.execute(prompt))

parallel = DriverActor.remote()
assert ray.get(parallel.call.remote("abc")) == "abc"
2 changes: 1 addition & 1 deletion python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -1302,7 +1302,7 @@ def test_nccl_all_reduce_with_class_method_output_node(ray_start_regular):

@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 2}], indirect=True)
def test_tensor_writable_warning_suppressed(ray_start_regular):
"""When we move cpu tensor to gpu, aDAG does zero-copy with is_wriatble=False.
"""When we move cpu tensor to gpu, Compiled Graph does zero-copy with is_writable=False.
Torch doesn't like it, so it prints warning. We know that it is safe to do it,
so Ray suppress the warning message. This test verifies the warning is not
printed in this scenario.
8 changes: 4 additions & 4 deletions python/ray/exceptions.py
@@ -889,14 +889,14 @@ class RayChannelError(RaySystemError):

@PublicAPI(stability="alpha")
class RayChannelTimeoutError(RayChannelError, TimeoutError):
"""Raised when the accelerated DAG channel operation times out."""
"""Raised when the Compiled Graph channel operation times out."""

pass


@PublicAPI(stability="alpha")
-class RayAdagCapacityExceeded(RaySystemError):
-"""Raised when the accelerated DAG channel's buffer is at max capacity"""
+class RayCgraphCapacityExceeded(RaySystemError):
+"""Raised when the Compiled Graph channel's buffer is at max capacity"""

pass

@@ -929,5 +929,5 @@ class RayAdagCapacityExceeded(RaySystemError):
RayChannelError,
RayChannelTimeoutError,
OufOfBandObjectRefSerializationException,
-RayAdagCapacityExceeded,
+RayCgraphCapacityExceeded,
]
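Note that the rename does not move the class in the exception hierarchy, so handlers written against the base classes keep working. A quick sanity check, as a sketch:

```python
import ray.exceptions as rex

# Both facts follow directly from the class definitions in the diff above.
assert issubclass(rex.RayCgraphCapacityExceeded, rex.RaySystemError)
assert issubclass(rex.RayChannelTimeoutError, (rex.RayChannelError, TimeoutError))
```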
2 changes: 1 addition & 1 deletion python/ray/experimental/channel/cached_channel.py
@@ -87,7 +87,7 @@ def read(self, timeout: Optional[float] = None) -> Any:
), "Cannot read from the serialization context while inner channel is None."
value = self._inner_channel.read(timeout)
ctx.set_data(self._channel_id, value, self._num_reads)
-# NOTE: Currently we make a contract with aDAG users that the
+# NOTE: Currently we make a contract with Compiled Graph users that the
# channel results should not be mutated by the actor methods.
# When the user needs to modify the channel results, they should
# make a copy of the channel results and modify the copy.
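The contract described in that comment means actor methods should copy a channel result before changing it. A hypothetical method body honoring the contract:

```python
import torch

# Hypothetical actor method, for illustration: copy first, then mutate
# the copy, since the channel buffer may be shared with other readers.
def double(self, tensor: torch.Tensor) -> torch.Tensor:
    out = tensor.clone()
    out *= 2
    return out
```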
8 changes: 4 additions & 4 deletions python/ray/experimental/channel/communicator.py
@@ -18,9 +18,9 @@
@DeveloperAPI
class Communicator(ABC):
"""
-Communicator for a group of aDAG actors on Nvidia GPU.
+Communicator for a group of Compiled Graph actors on Nvidia GPU.
-The aDAG execution leverages this internally to support communication
+The Compiled Graph execution leverages this internally to support communication
between actors in the group.
"""

@@ -29,8 +29,8 @@ def initialize(self, rank: int) -> None:
"""
Initialize the communicator from the actor.
-This is called once by aDAG on each actor to initialize the communicator,
-before any other methods.
+This is called once by Compiled Graph on each actor to initialize the
+communicator, before any other methods.
Args:
rank: The rank of this actor in the group.
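For reference, a skeletal custom communicator against the renamed docstrings. This is a hypothetical sketch: only `initialize` appears in this hunk, and a real subclass must implement the remaining abstract methods of `Communicator` as well.

```python
from ray.experimental.channel.communicator import Communicator

class MyCommunicator(Communicator):  # hypothetical sketch, not from this PR
    def initialize(self, rank: int) -> None:
        # Called once per actor by Compiled Graph before any other method.
        self._rank = rank
```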
2 changes: 1 addition & 1 deletion python/ray/experimental/channel/nccl_group.py
@@ -21,7 +21,7 @@
class _NcclGroup(Communicator):
"""
Represents an actor's NCCL communicator. This is the default NCCL communicator
-to be used in aDAG if a custom communicator is not provided.
+to be used in Compiled Graph if a custom communicator is not provided.
This class is not thread-safe.
"""
2 changes: 1 addition & 1 deletion python/ray/experimental/channel/shared_memory_channel.py
@@ -68,7 +68,7 @@ def _create_channel_ref(
return object_ref


-# aDAG maintains 1 reader object reference (also called buffer) per node.
+# Compiled Graph maintains 1 reader object reference (also called buffer) per node.
# reader_ref: The object reference.
# ref_owner_actor_id: The actor who created the object reference.
# num_readers: The number of reader actors who reads this object reference.
@@ -119,15 +119,15 @@ def exec_ray_dag(
sender,
receiver,
use_nccl=False,
-use_adag=True,
+use_cgraph=True,
static_shape=False,
direct_return=False,
):
# Test torch.Tensor sent between actors.
with InputNode() as inp:
dag = sender.send.bind(SHAPE, DTYPE, inp)

-if use_adag:
+if use_cgraph:
dag = dag.with_type_hint(
TorchTensorType(
_static_shape=static_shape,
@@ -138,7 +138,7 @@

dag = receiver.recv.bind(dag)

-if use_adag:
+if use_cgraph:
dag = dag.experimental_compile()

def _run():
@@ -154,7 +154,7 @@ def _run():

results = timeit(label, _run)

-if use_adag:
+if use_cgraph:
dag.teardown()

# Workaround for Ray bug in reusing GPUs too quickly.
@@ -301,7 +301,7 @@ def exec_ray_core_cpu(sender_hint, receiver_hint):
time.sleep(1)
sender = TorchTensorWorker.options(scheduling_strategy=sender_hint).remote()
receiver = TorchTensorWorker.options(scheduling_strategy=receiver_hint).remote()
return exec_ray_dag("exec_ray_core_cpu", sender, receiver, use_adag=False)
return exec_ray_dag("exec_ray_core_cpu", sender, receiver, use_cgraph=False)


def exec_ray_dag_gpu_ipc_gpu():
@@ -355,7 +355,7 @@ def exec_ray_core_gpu(sender_hint, receiver_hint):
receiver = TorchTensorWorker.options(
num_gpus=1, scheduling_strategy=receiver_hint
).remote()
return exec_ray_dag("exec_ray_core_gpu", sender, receiver, use_adag=False)
return exec_ray_dag("exec_ray_core_gpu", sender, receiver, use_cgraph=False)


def main(distributed):