Rayman: The Complete Guide to Ray - The AI Compute Engine

A comprehensive exploration of Ray, the distributed computing framework that's revolutionizing AI and machine learning. From Ray Core to Ray Serve, Ray Train, Ray Data, and Anyscale - everything you need to know.

Reading time: ~16 min

Ray is more than just a distributed computing framework—it’s a complete ecosystem for building and scaling AI applications. From simple parallel tasks to complex machine learning pipelines, Ray provides the tools and abstractions needed to take your Python code from a single machine to a distributed cluster with minimal changes.

In this comprehensive guide, we’ll explore Ray’s core components, dive into practical examples, and show you how to build production-ready AI applications that scale seamlessly.

Table of Contents

  1. What is Ray?
  2. Ray Core: The Foundation
  3. Ray Data: Scalable Data Processing
  4. Ray Train: Distributed Model Training
  5. Ray Serve: Model Serving at Scale
  6. Anyscale: The Ray Platform
  7. Real-World Examples
  8. Best Practices and Performance Tips
  9. Conclusion

What is Ray?

Ray is an open-source framework designed to simplify the development and deployment of scalable AI and machine learning applications. It provides a unified compute layer that abstracts the complexities of distributed computing, enabling efficient scaling from a single machine to large clusters.

Key Features

  • Unified API: Write once, scale anywhere
  • Fault Tolerance: Built-in error handling and recovery
  • Resource Management: Automatic resource allocation and scheduling
  • Multi-language Support: APIs for Python, Java, and C++
  • Cloud Native: Works seamlessly with Kubernetes and cloud providers

Ray Ecosystem

Ray consists of several specialized libraries built on top of Ray Core:

  • Ray Core: Distributed computing primitives
  • Ray Data: Scalable data processing
  • Ray Train: Distributed model training
  • Ray Serve: Model serving and deployment
  • Ray Tune: Hyperparameter optimization
  • Ray RLlib: Reinforcement learning

Ray Core: The Foundation

Ray Core provides the fundamental building blocks for distributed applications. It offers primitives like tasks, actors, and a distributed object store.

Installation

pip install ray

Basic Ray Core Example

Let’s start with a simple example that runs the same lookup first serially and then as parallel Ray tasks, and compares the wall-clock time of the two:

import ray
import time

# Initialize Ray
ray.init()

# Sample data
database = ["Learning", "Ray", "Flexible", "Distributed", "Python", "for", "Machine", "Learning"]

# Serial function
def retrieve(item):
    time.sleep(item / 10.0)
    return item, database[item]

# Parallel function using Ray
@ray.remote
def retrieve_task(item):
    return retrieve(item)

# Compare serial vs parallel execution
def compare_execution():
    # Serial execution
    start = time.time()
    serial_data = [retrieve(item) for item in range(8)]
    serial_time = time.time() - start

    # Parallel execution
    start = time.time()
    object_refs = [retrieve_task.remote(item) for item in range(8)]
    parallel_data = ray.get(object_refs)
    parallel_time = time.time() - start

    print(f"Serial execution time: {serial_time:.2f} seconds")
    print(f"Parallel execution time: {parallel_time:.2f} seconds")
    print(f"Speedup: {serial_time / parallel_time:.2f}x")

compare_execution()

# Cleanup
ray.shutdown()
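
Before running the comparison, it helps to estimate what speedup to expect. The sketch below is plain Python rather than Ray code: it assumes each worker runs one task at a time, that tasks are dispatched in submission order, and that scheduling overhead is zero, none of which Ray guarantees. The helper name `estimate_times` is made up for this illustration.

```python
import heapq

def estimate_times(durations, num_workers):
    """Return (serial, parallel) wall-clock estimates in seconds."""
    serial = sum(durations)
    # Greedy list scheduling: each task goes to the worker that frees up first.
    workers = [0.0] * num_workers
    heapq.heapify(workers)
    for duration in durations:
        earliest_free = heapq.heappop(workers)
        heapq.heappush(workers, earliest_free + duration)
    return serial, max(workers)

# Same sleep times as retrieve(): item / 10.0 for items 0..7
durations = [item / 10.0 for item in range(8)]
for num_workers in (8, 4):
    serial, parallel = estimate_times(durations, num_workers)
    print(f"{num_workers} workers: serial {serial:.1f}s, "
          f"parallel {parallel:.1f}s, speedup {serial / parallel:.1f}x")
```

With eight free CPUs the parallel run is bounded by the slowest task (0.7 s), so the ideal speedup is about 4x. Measured numbers will come in lower because of task startup and serialization costs.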

Ray Actors

Actors are stateful workers: each actor runs in its own process and keeps its state between method calls:

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

    def get_count(self):
        return self.count

# Create actor instances
counter1 = Counter.remote()
counter2 = Counter.remote()

# Call methods on actors
print(ray.get(counter1.increment.remote()))  # 1
print(ray.get(counter1.increment.remote()))  # 2
print(ray.get(counter2.increment.remote()))  # 1
print(ray.get(counter1.get_count.remote()))  # 2

Ray Object Store

Ray’s object store enables efficient sharing of large objects across tasks:

import ray
import numpy as np

@ray.remote
def create_large_array():
    return np.random.rand(1000, 1000)

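@ray.remote
def process_array(array):
    # Ray resolves an ObjectRef passed as a top-level argument before the
    # task runs, so `array` is already the NumPy array here. Calling
    # ray.get() on it would raise an error.
    return np.sum(array)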

# Create a large array
array_ref = create_large_array.remote()

# Pass the ObjectRef directly so the array is not copied through the driver
result = process_array.remote(array_ref)
print(f"Sum: {ray.get(result)}")

Ray Data: Scalable Data Processing

Ray Data provides scalable, framework-agnostic data loading and transformation for ML workloads. It’s designed to handle large datasets efficiently across distributed systems.

Installation

pip install "ray[data]"

Basic Ray Data Operations

import ray
from ray.data import Dataset

# Initialize Ray
ray.init()

# Create a dataset from a range
dataset = ray.data.range(1000)
print(f"Dataset size: {dataset.count()}")

# Add a new column; the function receives a batch (a pandas DataFrame by default)
dataset = dataset.add_column("squared", lambda df: df["id"] ** 2)

# Filter data
filtered_dataset = dataset.filter(lambda x: x["squared"] > 100)

# Show first few rows
print("First 5 rows:")
for row in filtered_dataset.take(5):
    print(row)

Reading Data from Various Sources

Ray Data supports reading from many data sources:

import ray

# Read from CSV
csv_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# Read from Parquet
parquet_dataset = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

# Read from JSON
json_dataset = ray.data.read_json("s3://anonymous@ray-example-data/logs.json")

# Read images
image_dataset = ray.data.read_images("s3://anonymous@ray-example-data/images/")

# Read from Hugging Face
from datasets import load_dataset
hf_dataset = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_dataset = ray.data.from_huggingface(hf_dataset["train"])

Data Transformations

import ray
import numpy as np

# Create a dataset
dataset = ray.data.range(1000)

# Map function to each row
def add_random_noise(row):
    row["noise"] = np.random.randn()
    return row

dataset = dataset.map(add_random_noise)

# Map batches for better performance
def process_batch(batch):
    batch["processed"] = batch["id"] * 2 + batch["noise"]
    return batch

dataset = dataset.map_batches(process_batch, batch_size=100)

# Iterate over batches
for batch in dataset.iter_batches(batch_size=10):
    print(f"Batch shape: {batch['id'].shape}")
    break
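
To see what `map_batches` hands to `process_batch`, here is a plain-Python sketch of the row-to-batch conversion. It is a simplification: Ray Data passes columns as NumPy arrays and may split batches at internal block boundaries, whereas this version uses lists and ignores blocks. `to_batches` is a hypothetical helper, not a Ray API.

```python
def to_batches(rows, batch_size):
    """Group row dicts into columnar batches of the form {column: [values]}."""
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield {key: [row[key] for row in chunk] for key in chunk[0]}

def process_batch(batch):
    # Works on whole columns at once instead of one row at a time.
    batch["processed"] = [i * 2 + n for i, n in zip(batch["id"], batch["noise"])]
    return batch

# 1000 rows with a constant noise value so the result is easy to check
rows = [{"id": i, "noise": 0.0} for i in range(1000)]

batches = [process_batch(b) for b in to_batches(rows, batch_size=100)]
print(f"{len(batches)} batches of {len(batches[0]['id'])} rows")

# A batch size that does not divide the row count leaves a short last batch
sizes = [len(b["id"]) for b in to_batches(rows, batch_size=32)]
print(f"{len(sizes)} batches, last one has {sizes[-1]} rows")
```

The function is called once per batch rather than once per row, which is where most of the performance gain over `map` comes from.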

Integration with ML Frameworks

import ray
import torch
from ray.data import Dataset

# Create a PyTorch-compatible dataset
dataset = ray.data.range(1000)

# Convert to PyTorch tensors
for batch in dataset.iter_torch_batches(batch_size=32):
    print(f"Batch type: {type(batch['id'])}")
    print(f"Batch shape: {batch['id'].shape}")
    break

Ray Train: Distributed Model Training

Ray Train provides abstractions for distributing model training across multiple machines and GPUs. It integrates with popular ML frameworks like PyTorch, TensorFlow, and Hugging Face.

Installation

pip install "ray[train]"

PyTorch Distributed Training

import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define a simple model
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)

    def forward(self, x):
        return self.linear(x)

# Training function
def train_func():
    # Create model and optimizer
    model = SimpleModel()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Prepare model for distributed training
    model = ray.train.torch.prepare_model(model)

    # Create dummy data
    X = torch.randn(1000, 10)
    y = torch.randn(1000, 1)
    dataset = TensorDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Prepare dataloader for distributed training
    dataloader = ray.train.torch.prepare_data_loader(dataloader)

    # Training loop
    for epoch in range(10):
        for batch_x, batch_y in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

        # Report metrics
        ray.train.report({"loss": loss.item(), "epoch": epoch})

# Configure and run training
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)

result = trainer.fit()
print(f"Training completed: {result}")
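
When training runs on several workers, `prepare_data_loader` attaches a PyTorch `DistributedSampler` so that each worker iterates over a different slice of the dataset. The plain-Python sketch below mimics that index assignment with shuffling turned off. `shard_indices` is a hypothetical helper written for this illustration and is not part of Ray or PyTorch.

```python
import math

def shard_indices(num_samples, num_workers, rank):
    """Indices one worker sees, mimicking DistributedSampler(shuffle=False)."""
    per_worker = math.ceil(num_samples / num_workers)
    total = per_worker * num_workers
    indices = list(range(num_samples))
    # Pad by repeating the first indices so every worker gets the same count.
    indices += indices[: total - num_samples]
    return indices[rank:total:num_workers]

# Values from the example above: 1000 samples, 2 workers, batch_size=32
num_samples, num_workers, batch_size = 1000, 2, 32
for rank in range(num_workers):
    shard = shard_indices(num_samples, num_workers, rank)
    steps = math.ceil(len(shard) / batch_size)
    print(f"worker {rank}: {len(shard)} samples, {steps} steps per epoch")
```

Each of the two workers handles 500 samples in 16 steps per epoch, compared with 32 steps on a single worker. The `batch_size` passed to the `DataLoader` is the per-worker batch size.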

Multi-GPU Training with Ray Data

import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
import torch
import torch.nn as nn

# Create dataset
dataset = ray.data.range(1000)

# Training function with Ray Data
def train_func():
    # Get dataset shard for this worker
    train_ds = ray.train.get_dataset_shard("train")

    # Convert to PyTorch batches
    train_dataloader = train_ds.iter_torch_batches(batch_size=32)

    # Create model
    model = nn.Linear(10, 1)
    model = ray.train.torch.prepare_model(model)

    # Training loop
    for batch in train_dataloader:
        # Your training logic here
        pass

# Run training with Ray Data
trainer = TorchTrainer(
    train_func,
    datasets={"train": dataset},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)

result = trainer.fit()

Hugging Face Integration

import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import load_dataset

# Load dataset
dataset = load_dataset("imdb")
ray_dataset = ray.data.from_huggingface(dataset["train"])

def train_func():
    # Load model and tokenizer
    model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    # Prepare model for distributed training
    model = ray.train.torch.prepare_model(model)

    # Get dataset shard
    train_ds = ray.train.get_dataset_shard("train")

    # Training loop
    for batch in train_ds.iter_torch_batches(batch_size=16):
        # Tokenize and train
        pass

# Run training
trainer = TorchTrainer(
    train_func,
    datasets={"train": ray_dataset},
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)

result = trainer.fit()

Ray Serve: Model Serving at Scale

Ray Serve is a scalable model serving library that enables you to deploy ML models as web services with automatic scaling, load balancing, and fault tolerance.

Installation

pip install "ray[serve]"

Basic Model Serving

from ray import serve
from fastapi import FastAPI
import torch
import torch.nn as nn

# Define a simple model
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)

    def forward(self, x):
        return self.linear(x)

# Create FastAPI app
app = FastAPI()

@serve.deployment
@serve.ingress(app)
class ModelDeployment:
    def __init__(self):
        self.model = SimpleModel()
        self.model.eval()

    @app.post("/")
    async def predict(self, data: dict):
        # Process input data
        input_tensor = torch.tensor(data["input"], dtype=torch.float32)

        # Make prediction
        with torch.no_grad():
            prediction = self.model(input_tensor)

        return {"prediction": prediction.item()}

# Deploy the model. Recent Ray versions take route_prefix in serve.run()
# rather than in @serve.deployment.
model_deployment = ModelDeployment.bind()
serve.run(model_deployment, route_prefix="/predict")

Advanced Model Composition

from ray import serve
from ray.serve.handle import DeploymentHandle
from fastapi import FastAPI
import torch

app = FastAPI()

@serve.deployment
class Preprocessor:
    def __call__(self, data):
        # Preprocess data
        return torch.tensor(data, dtype=torch.float32)

@serve.deployment
class Model:
    def __init__(self):
        self.model = torch.nn.Linear(10, 1)

    def __call__(self, data):
        with torch.no_grad():
            return self.model(data).item()

@serve.deployment
class Postprocessor:
    def __call__(self, prediction):
        # Postprocess prediction
        return {"prediction": prediction, "confidence": 0.95}

@serve.deployment
@serve.ingress(app)
class MLPipeline:
    def __init__(self, preprocessor: DeploymentHandle, model: DeploymentHandle, postprocessor: DeploymentHandle):
        self.preprocessor = preprocessor
        self.model = model
        self.postprocessor = postprocessor

    @app.post("/predict")
    async def predict(self, data: dict):
        # Chain the services
        preprocessed = await self.preprocessor.remote(data["input"])
        prediction = await self.model.remote(preprocessed)
        result = await self.postprocessor.remote(prediction)
        return result

# Deploy the pipeline
preprocessor = Preprocessor.bind()
model = Model.bind()
postprocessor = Postprocessor.bind()
pipeline = MLPipeline.bind(preprocessor, model, postprocessor)

serve.run(pipeline)

Auto-scaling and Load Balancing

from ray import serve
from ray.serve.config import AutoscalingConfig

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1,
        max_replicas=10,
        target_ongoing_requests=5
    )
)
class ScalableModel:
    def __call__(self, data):
        # Your model logic here
        return {"result": "processed"}

# Deploy with auto-scaling
serve.run(ScalableModel.bind())
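
To build intuition for these settings, the replica count can be estimated from the load. The sketch below is a simplified steady-state model, not Ray Serve's actual autoscaler, which also applies upscale and downscale delays before changing the replica count. `desired_replicas` is a name invented for this example.

```python
import math

def desired_replicas(ongoing_requests, target_ongoing_requests,
                     min_replicas, max_replicas):
    """Simplified estimate: enough replicas to hit the per-replica target."""
    needed = math.ceil(ongoing_requests / target_ongoing_requests)
    # Clamp to the configured bounds.
    return max(min_replicas, min(max_replicas, needed))

# Same settings as the AutoscalingConfig above
for load in (0, 12, 200):
    replicas = desired_replicas(load, target_ongoing_requests=5,
                                min_replicas=1, max_replicas=10)
    print(f"{load} ongoing requests -> {replicas} replicas")
```

With no traffic the deployment stays at `min_replicas`. At 200 concurrent requests the estimate would be 40 replicas, so the deployment is capped at `max_replicas=10` and requests queue up.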

Model Versioning and A/B Testing

from ray import serve

@serve.deployment(version="v1")
class ModelV1:
    def __call__(self, data):
        return {"model": "v1", "result": data}

@serve.deployment(version="v2")
class ModelV2:
    def __call__(self, data):
        return {"model": "v2", "result": data}

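# Deploy both versions as separate applications, each under its own route
serve.run(ModelV1.bind(), name="model_v1", route_prefix="/v1")
serve.run(ModelV2.bind(), name="model_v2", route_prefix="/v2")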
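
Deploying two versions is only half of an A/B test; something still has to decide which version each request goes to. One common approach, sketched here in plain Python, is to hash a stable user ID into a bucket so that the same user always sees the same version. The function name `pick_version`, the user ID format, and the 20% split are assumptions for illustration and not part of Ray Serve.

```python
import hashlib

NUM_BUCKETS = 10_000

def pick_version(user_id: str, v2_fraction: float) -> str:
    """Deterministically assign a user to "v1" or "v2"."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    # Map the hash to a bucket in [0, NUM_BUCKETS)
    bucket = int.from_bytes(digest[:8], "big") % NUM_BUCKETS
    return "v2" if bucket < v2_fraction * NUM_BUCKETS else "v1"

# Send roughly 20% of users to v2
assignments = [pick_version(f"user-{i}", 0.2) for i in range(10_000)]
v2_share = assignments.count("v2") / len(assignments)
print(f"Share of users on v2: {v2_share:.1%}")
```

A router deployment or an upstream gateway could call a function like this and forward the request to the matching model.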

Anyscale: The Ray Platform

Anyscale is the company behind Ray, providing a managed platform for running Ray applications in production. It offers additional features like RayTurbo for enhanced performance and simplified deployment.

Key Anyscale Features

  • Managed Ray Clusters: Fully managed Ray clusters in the cloud
  • RayTurbo: Performance optimizations for training and serving
  • Anyscale Workspaces: Interactive development environment
  • Anyscale Services: Production deployment platform
  • Enterprise Support: Professional support and consulting

RayTurbo for Enhanced Performance

RayTurbo provides performance optimizations for Ray applications:

# RayTurbo features are automatically enabled when running on Anyscale
# No code changes required - just deploy to Anyscale platform

# Example: Enhanced training with RayTurbo
# (train_func is the training function defined in the Ray Train section)
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# RayTurbo automatically optimizes:
# - Mid-epoch training resumption
# - Elastic training
# - Enhanced monitoring
# - Performance profiling

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)

result = trainer.fit()

Deploying to Anyscale Services

# Deploy Ray Serve application to Anyscale
from ray import serve

@serve.deployment
class ProductionModel:
    def __call__(self, data):
        return {"result": "processed"}

# Deploy to Anyscale Services
# This would typically be done via Anyscale CLI or web interface
serve.run(ProductionModel.bind())

Real-World Examples

1. Large Language Model Fine-tuning

import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

def fine_tune_llm():
    # Load model and tokenizer
    model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
    tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")

    # Prepare for distributed training
    model = ray.train.torch.prepare_model(model)

    # Training loop
    for epoch in range(5):
        # Your fine-tuning logic here
        ray.train.report({"epoch": epoch, "loss": 0.5})

# Run fine-tuning
trainer = TorchTrainer(
    fine_tune_llm,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)

result = trainer.fit()

2. Computer Vision Pipeline

import ray
from ray import serve
from ray.data import Dataset
import torch
import torchvision.transforms as transforms
from PIL import Image

# Data processing pipeline
def process_images():
    # Read images
    dataset = ray.data.read_images("s3://my-bucket/images/")

    # Apply transformations
    def transform_batch(batch):
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

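        # Stack into a single NumPy array so Ray Data can store it as a tensor column
        batch["image"] = torch.stack(
            [transform(Image.fromarray(img)) for img in batch["image"]]
        ).numpy()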
        return batch

    return dataset.map_batches(transform_batch)

# Model serving
@serve.deployment
class ImageClassifier:
    def __init__(self):
        # `pretrained=True` is deprecated in recent torchvision; use `weights` instead
        self.model = torch.hub.load('pytorch/vision', 'resnet18', weights="IMAGENET1K_V1")
        self.model.eval()

    def __call__(self, image):
        with torch.no_grad():
            prediction = self.model(image)
        return {"prediction": prediction.argmax().item()}

# Deploy the pipeline
dataset = process_images()
classifier = ImageClassifier.bind()
serve.run(classifier)

3. Recommendation System

import ray
from ray import serve
from ray.data import Dataset
import numpy as np
from sklearn.ensemble import RandomForestRegressor

# Data processing
def process_recommendation_data():
    # Load user and item data
    user_data = ray.data.read_csv("s3://my-bucket/users.csv")
    item_data = ray.data.read_csv("s3://my-bucket/items.csv")

    # Feature engineering
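    def create_features(batch):
        # Create user-item interaction features (random placeholder values).
        # With batch_format="pandas", `batch` is a DataFrame, so len(batch)
        # is the number of rows rather than the number of columns.
        batch["interaction_score"] = np.random.rand(len(batch))
        return batch

    return user_data.map_batches(create_features, batch_format="pandas")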

# Recommendation model
@serve.deployment
class RecommendationModel:
    def __init__(self):
        self.model = RandomForestRegressor()
        # Placeholder: fit on random data; in practice, load a trained model here
        self.model.fit(np.random.rand(100, 10), np.random.rand(100))

    def __call__(self, user_features):
        prediction = self.model.predict([user_features])
        # Convert the NumPy scalar to a plain float so it serializes cleanly
        return {"recommendation_score": float(prediction[0])}

# Deploy recommendation system
dataset = process_recommendation_data()
recommender = RecommendationModel.bind()
serve.run(recommender)

Best Practices and Performance Tips

1. Resource Management

import ray

# Configure Ray for optimal performance
ray.init(
    num_cpus=8,
    num_gpus=2,
    object_store_memory=1000000000,  # 1GB
    # Note: _system_config is an internal, unsupported option; its keys can change between releases
    _system_config={
        "max_direct_call_object_size": 1000000,
        "task_retry_delay_ms": 1000
    }
)
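
To get a feel for the `object_store_memory` value above, a rough capacity estimate helps. The sketch assumes float64 arrays like the 1000 x 1000 one from the object store example and ignores serialization metadata and fragmentation, so treat the result as an upper bound. `array_nbytes` is a helper invented for this illustration.

```python
def array_nbytes(shape, itemsize=8):
    """Bytes needed for a dense array; itemsize=8 corresponds to float64."""
    count = 1
    for dim in shape:
        count *= dim
    return count * itemsize

object_store_memory = 1_000_000_000  # 1GB, as configured above
per_array = array_nbytes((1000, 1000))
max_arrays = object_store_memory // per_array

print(f"One array: {per_array / 1e6:.0f} MB")
print(f"Arrays that fit in the object store: at most {max_arrays}")
```

If a pipeline keeps more objects alive than this, Ray has to spill objects to disk, which is a good signal to raise the limit or release references sooner.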

2. Efficient Data Processing

# Use appropriate batch sizes
dataset.map_batches(process_batch, batch_size=1000)

# Use concurrency for I/O bound tasks
dataset.map_batches(process_batch, concurrency=10)

# Use GPU for compute-intensive tasks
dataset.map_batches(gpu_process_batch, num_gpus=1)

3. Error Handling

import numpy as np
import ray
from ray.exceptions import RayTaskError

@ray.remote
def risky_task():
    if np.random.rand() < 0.1:
        raise ValueError("Random error")
    return "success"

# Handle errors gracefully
try:
    result = ray.get(risky_task.remote())
except RayTaskError as e:
    print(f"Task failed: {e}")
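
Catching the error is often followed by trying again. Ray can retry tasks for you through options on `@ray.remote` (check the documentation for your version), but the idea is easy to see in plain Python. The sketch below retries a callable with exponential backoff; `call_with_retries` and `flaky` are hypothetical names used only for this example.

```python
import time

def call_with_retries(fn, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call fn(); after a failure wait base_delay * 2**n, then try again."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * 2 ** attempt
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.2f}s")
            sleep(delay)

# Stand-in for a task that fails twice before succeeding
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("Random error")
    return "success"

result = call_with_retries(flaky, attempts=5, base_delay=0.01)
print(f"Result after {calls['count']} calls: {result}")
```

With Ray, `fn` could be something like `lambda: ray.get(risky_task.remote())`, so that each retry submits a fresh task.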

4. Monitoring and Debugging

import ray

# Enable detailed logging
ray.init(log_to_driver=True)

# Use Ray Dashboard for monitoring
# Access at http://localhost:8265

# Profile your application
@ray.remote
def profiled_task():
    import cProfile
    profiler = cProfile.Profile()
    profiler.enable()

    # Your code here

    profiler.disable()
    profiler.dump_stats("profile.stats")

5. Production Deployment

# Use Ray Serve for production
from ray import serve
from ray.serve.config import HTTPOptions

# Configure HTTP options
http_options = HTTPOptions(
    host="0.0.0.0",
    port=8000,
    location="EveryNode"
)

# Deploy with proper configuration
serve.start(http_options=http_options)

@serve.deployment(
    num_replicas=4,
    max_ongoing_requests=100  # named max_concurrent_queries in older Ray versions
)
class ProductionModel:
    def __call__(self, data):
        return {"result": "processed"}

serve.run(ProductionModel.bind())

Conclusion

Ray changes how AI applications are built and deployed. Its unified API for distributed computing hides much of the complexity of managing distributed systems while still delivering the performance and scalability that modern AI workloads need.

Key Takeaways

  1. Ray Core provides the foundation for distributed computing with tasks, actors, and object stores
  2. Ray Data enables scalable data processing for ML workloads
  3. Ray Train simplifies distributed model training across multiple frameworks
  4. Ray Serve provides production-ready model serving with auto-scaling
  5. Anyscale offers a managed platform for running Ray applications in production

Getting Started

To get started with Ray:

  1. Install Ray: pip install ray
  2. Start with simple parallel tasks using @ray.remote
  3. Explore Ray Data for data processing
  4. Try Ray Train for distributed training
  5. Deploy models with Ray Serve
  6. Consider Anyscale for production deployment

Ray is not just a framework—it’s an ecosystem that empowers developers to build the next generation of AI applications. Whether you’re processing massive datasets, training complex models, or serving predictions at scale, Ray provides the tools and abstractions you need to succeed.

Start your Ray journey today and discover how distributed computing can transform your AI applications!

~

Mohamed Hasan