Rayman: The Complete Guide to Ray - The AI Compute Engine
Ray is more than just a distributed computing framework—it’s a complete ecosystem for building and scaling AI applications. From simple parallel tasks to complex machine learning pipelines, Ray provides the tools and abstractions needed to take your Python code from a single machine to a distributed cluster with minimal changes.
In this comprehensive guide, we’ll explore Ray’s core components, dive into practical examples, and show you how to build production-ready AI applications that scale seamlessly.
Table of Contents
- What is Ray?
- Ray Core: The Foundation
- Ray Data: Scalable Data Processing
- Ray Train: Distributed Model Training
- Ray Serve: Model Serving at Scale
- Anyscale: The Ray Platform
- Real-World Examples
- Best Practices and Performance Tips
- Conclusion
What is Ray?
Ray is an open-source framework designed to simplify the development and deployment of scalable AI and machine learning applications. It provides a unified compute layer that abstracts the complexities of distributed computing, enabling efficient scaling from a single machine to large clusters.
Key Features
- Unified API: Write once, scale anywhere
- Fault Tolerance: Built-in error handling and recovery
- Resource Management: Automatic resource allocation and scheduling
- Multi-language Support: Python, Java, and C++ support
- Cloud Native: Works seamlessly with Kubernetes and cloud providers
Ray Ecosystem
Ray consists of several specialized libraries built on top of Ray Core:
- Ray Core: Distributed computing primitives
- Ray Data: Scalable data processing
- Ray Train: Distributed model training
- Ray Serve: Model serving and deployment
- Ray Tune: Hyperparameter optimization (a short sketch follows this list)
- Ray RLlib: Reinforcement learning
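Ray Tune and RLlib don't get dedicated sections in this guide, so here is a minimal, hedged Ray Tune sketch to show the flavor of its API. The objective function and search space are illustrative toys, not anything prescribed by the library:

from ray import tune

# A toy trainable: Tune calls it once per trial with a sampled config
def objective(config):
    score = (config["x"] - 3) ** 2 + config["y"]
    return {"score": score}  # returning a dict reports the trial's final metrics

# Define a search space and run a small random search
tuner = tune.Tuner(
    objective,
    param_space={
        "x": tune.uniform(-10, 10),
        "y": tune.choice([0, 1, 2]),
    },
    tune_config=tune.TuneConfig(metric="score", mode="min", num_samples=20),
)
results = tuner.fit()
print(results.get_best_result().config)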
Ray Core: The Foundation
Ray Core provides the fundamental building blocks for distributed applications. It offers primitives like tasks, actors, and a distributed object store.
Installation
pip install ray
Basic Ray Core Example
Let’s start with a simple example that demonstrates Ray’s power:
import ray
import time

# Initialize Ray
ray.init()

# Sample data
database = ["Learning", "Ray", "Flexible", "Distributed", "Python", "for", "Machine", "Learning"]

# Serial function
def retrieve(item):
    time.sleep(item / 10.0)
    return item, database[item]

# Parallel function using Ray
@ray.remote
def retrieve_task(item):
    return retrieve(item)

# Compare serial vs parallel execution
def compare_execution():
    # Serial execution
    start = time.time()
    serial_data = [retrieve(item) for item in range(8)]
    serial_time = time.time() - start

    # Parallel execution
    start = time.time()
    object_refs = [retrieve_task.remote(item) for item in range(8)]
    parallel_data = ray.get(object_refs)
    parallel_time = time.time() - start

    print(f"Serial execution time: {serial_time:.2f} seconds")
    print(f"Parallel execution time: {parallel_time:.2f} seconds")
    print(f"Speedup: {serial_time / parallel_time:.2f}x")

compare_execution()

# Cleanup
ray.shutdown()
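ray.get blocks until every result is ready. When completion times vary, ray.wait lets the driver consume results as they finish. A small sketch reusing retrieve_task from above (run it before the ray.shutdown() call, while Ray is still initialized):

# Fetch results in completion order rather than submission order
pending = [retrieve_task.remote(item) for item in range(8)]
while pending:
    done, pending = ray.wait(pending, num_returns=1)
    print("finished:", ray.get(done[0]))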
Ray Actors
Actors are stateful distributed objects that maintain state across method calls:
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

    def get_count(self):
        return self.count

# Create actor instances
counter1 = Counter.remote()
counter2 = Counter.remote()

# Call methods on actors
print(ray.get(counter1.increment.remote()))  # 1
print(ray.get(counter1.increment.remote()))  # 2
print(ray.get(counter2.increment.remote()))  # 1
print(ray.get(counter1.get_count.remote()))  # 2
Ray Object Store
Ray’s object store enables efficient sharing of large objects across tasks:
import ray
import numpy as np

@ray.remote
def create_large_array():
    return np.random.rand(1000, 1000)

@ray.remote
def process_array(array):
    # When an ObjectRef is passed as a task argument, Ray resolves it to the
    # underlying object from the shared object store before the task runs
    return np.sum(array)

# Create a large array (it stays in the object store)
array_ref = create_large_array.remote()

# Process the array without copying it through the driver
result = process_array.remote(array_ref)
print(f"Sum: {ray.get(result)}")
Ray Data: Scalable Data Processing
Ray Data provides scalable, framework-agnostic data loading and transformation for ML workloads. It’s designed to handle large datasets efficiently across distributed systems.
Installation
pip install "ray[data]"
Basic Ray Data Operations
import ray
from ray.data import Dataset

# Initialize Ray
ray.init()

# Create a dataset from a range
dataset = ray.data.range(1000)
print(f"Dataset size: {dataset.count()}")

# Add a new column
dataset = dataset.add_column("squared", lambda x: x["id"] ** 2)

# Filter data
filtered_dataset = dataset.filter(lambda x: x["squared"] > 100)

# Show first few rows
print("First 5 rows:")
for row in filtered_dataset.take(5):
    print(row)
Reading Data from Various Sources
Ray Data supports reading from many data sources:
import ray
# Read from CSV
csv_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
# Read from Parquet
parquet_dataset = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
# Read from JSON
json_dataset = ray.data.read_json("s3://anonymous@ray-example-data/logs.json")
# Read images
image_dataset = ray.data.read_images("s3://anonymous@ray-example-data/images/")
# Read from Hugging Face
from datasets import load_dataset
hf_dataset = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_dataset = ray.data.from_huggingface(hf_dataset["train"])
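Writing results back out follows the same pattern: Ray Data exposes write_* methods that mirror the readers. A short sketch (the output path is just a placeholder):

import ray

dataset = ray.data.range(1000)

# Write the dataset out as Parquet files, one or more per block;
# S3 URIs work the same way as local paths
dataset.write_parquet("/tmp/ray-output/")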
Data Transformations
import ray
import numpy as np

# Create a dataset
dataset = ray.data.range(1000)

# Map function to each row
def add_random_noise(row):
    row["noise"] = np.random.randn()
    return row

dataset = dataset.map(add_random_noise)

# Map batches for better performance
def process_batch(batch):
    batch["processed"] = batch["id"] * 2 + batch["noise"]
    return batch

dataset = dataset.map_batches(process_batch, batch_size=100)

# Iterate over batches
for batch in dataset.iter_batches(batch_size=10):
    print(f"Batch shape: {batch['id'].shape}")
    break
Integration with ML Frameworks
import ray
import torch
from ray.data import Dataset

# Create a PyTorch-compatible dataset
dataset = ray.data.range(1000)

# Convert to PyTorch tensors
for batch in dataset.iter_torch_batches(batch_size=32):
    print(f"Batch type: {type(batch['id'])}")
    print(f"Batch shape: {batch['id'].shape}")
    break
Ray Train: Distributed Model Training
Ray Train provides abstractions for distributing model training across multiple machines and GPUs. It integrates with popular ML frameworks like PyTorch, TensorFlow, and Hugging Face.
Installation
pip install "ray[train]"
PyTorch Distributed Training
import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define a simple model
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)

    def forward(self, x):
        return self.linear(x)

# Training function
def train_func():
    # Create model and optimizer
    model = SimpleModel()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Prepare model for distributed training
    model = ray.train.torch.prepare_model(model)

    # Create dummy data
    X = torch.randn(1000, 10)
    y = torch.randn(1000, 1)
    dataset = TensorDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Prepare dataloader for distributed training
    dataloader = ray.train.torch.prepare_data_loader(dataloader)

    # Training loop
    for epoch in range(10):
        for batch_x, batch_y in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

        # Report metrics once per epoch
        ray.train.report({"loss": loss.item(), "epoch": epoch})

# Configure and run training
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)
result = trainer.fit()
print(f"Training completed: {result}")
Multi-GPU Training with Ray Data
import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
import torch
import torch.nn as nn

# Create dataset
dataset = ray.data.range(1000)

# Training function with Ray Data
def train_func():
    # Get dataset shard for this worker
    train_ds = ray.train.get_dataset_shard("train")

    # Convert to PyTorch batches
    train_dataloader = train_ds.iter_torch_batches(batch_size=32)

    # Create model
    model = nn.Linear(10, 1)
    model = ray.train.torch.prepare_model(model)

    # Training loop
    for batch in train_dataloader:
        # Your training logic here
        pass

# Run training with Ray Data
trainer = TorchTrainer(
    train_func,
    datasets={"train": dataset},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
result = trainer.fit()
Hugging Face Integration
import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import load_dataset

# Load dataset
dataset = load_dataset("imdb")
ray_dataset = ray.data.from_huggingface(dataset["train"])

def train_func():
    # Load model and tokenizer
    model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    # Prepare model for distributed training
    model = ray.train.torch.prepare_model(model)

    # Get dataset shard
    train_ds = ray.train.get_dataset_shard("train")

    # Training loop
    for batch in train_ds.iter_torch_batches(batch_size=16):
        # Tokenize and train
        pass

# Run training
trainer = TorchTrainer(
    train_func,
    datasets={"train": ray_dataset},
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)
result = trainer.fit()
Ray Serve: Model Serving at Scale
Ray Serve is a scalable model serving library that enables you to deploy ML models as web services with automatic scaling, load balancing, and fault tolerance.
Installation
pip install "ray[serve]"
Basic Model Serving
from ray import serve
from fastapi import FastAPI
import torch
import torch.nn as nn

# Define a simple model
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)

    def forward(self, x):
        return self.linear(x)

# Create FastAPI app
app = FastAPI()

@serve.deployment
@serve.ingress(app)
class ModelDeployment:
    def __init__(self):
        self.model = SimpleModel()
        self.model.eval()

    @app.post("/")
    async def predict(self, data: dict):
        # Process input data
        input_tensor = torch.tensor(data["input"], dtype=torch.float32)

        # Make prediction
        with torch.no_grad():
            prediction = self.model(input_tensor)

        return {"prediction": prediction.item()}

# Deploy the model (in Ray Serve 2.x the route prefix is set in serve.run, not on the deployment)
model_deployment = ModelDeployment.bind()
serve.run(model_deployment, route_prefix="/predict")
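Once serve.run returns, the deployment is reachable over HTTP. A quick client-side check, assuming the default local address and the /predict route used above:

import requests

response = requests.post(
    "http://localhost:8000/predict/",
    json={"input": [0.1] * 10},  # ten features, matching the Linear(10, 1) model
)
print(response.json())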
Advanced Model Composition
from ray import serve
from ray.serve.handle import DeploymentHandle
from fastapi import FastAPI
import torch

app = FastAPI()

@serve.deployment
class Preprocessor:
    def __call__(self, data):
        # Preprocess data
        return torch.tensor(data, dtype=torch.float32)

@serve.deployment
class Model:
    def __init__(self):
        self.model = torch.nn.Linear(10, 1)

    def __call__(self, data):
        with torch.no_grad():
            return self.model(data).item()

@serve.deployment
class Postprocessor:
    def __call__(self, prediction):
        # Postprocess prediction
        return {"prediction": prediction, "confidence": 0.95}

@serve.deployment
@serve.ingress(app)
class MLPipeline:
    def __init__(self, preprocessor: DeploymentHandle, model: DeploymentHandle, postprocessor: DeploymentHandle):
        self.preprocessor = preprocessor
        self.model = model
        self.postprocessor = postprocessor

    @app.post("/predict")
    async def predict(self, data: dict):
        # Chain the services
        preprocessed = await self.preprocessor.remote(data["input"])
        prediction = await self.model.remote(preprocessed)
        result = await self.postprocessor.remote(prediction)
        return result

# Deploy the pipeline (route prefix goes to serve.run in Ray Serve 2.x)
preprocessor = Preprocessor.bind()
model = Model.bind()
postprocessor = Postprocessor.bind()
pipeline = MLPipeline.bind(preprocessor, model, postprocessor)
serve.run(pipeline, route_prefix="/")
Auto-scaling and Load Balancing
from ray import serve
from ray.serve.config import AutoscalingConfig

@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1,
        max_replicas=10,
        target_ongoing_requests=5
    )
)
class ScalableModel:
    def __call__(self, data):
        # Your model logic here
        return {"result": "processed"}

# Deploy with auto-scaling
serve.run(ScalableModel.bind())
Model Versioning and A/B Testing
from ray import serve

@serve.deployment
class ModelV1:
    def __call__(self, data):
        return {"model": "v1", "result": data}

@serve.deployment
class ModelV2:
    def __call__(self, data):
        return {"model": "v2", "result": data}

# Deploy both versions as separately named applications with their own routes;
# traffic can then be split between /v1 and /v2 at the client or load-balancer level
serve.run(ModelV1.bind(), name="model_v1", route_prefix="/v1")
serve.run(ModelV2.bind(), name="model_v2", route_prefix="/v2")
Anyscale: The Ray Platform
Anyscale is the company behind Ray, providing a managed platform for running Ray applications in production. It offers additional features like RayTurbo for enhanced performance and simplified deployment.
Key Anyscale Features
- Managed Ray Clusters: Fully managed Ray clusters in the cloud
- RayTurbo: Performance optimizations for training and serving
- Anyscale Workspaces: Interactive development environment
- Anyscale Services: Production deployment platform
- Enterprise Support: Professional support and consulting
RayTurbo for Enhanced Performance
RayTurbo provides performance optimizations for Ray applications:
# RayTurbo features are automatically enabled when running on Anyscale
# No code changes required - just deploy to the Anyscale platform

# Example: Enhanced training with RayTurbo
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# RayTurbo automatically optimizes:
# - Mid-epoch training resumption
# - Elastic training
# - Enhanced monitoring
# - Performance profiling

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
result = trainer.fit()
Deploying to Anyscale Services
# Deploy Ray Serve application to Anyscale
from ray import serve

@serve.deployment
class ProductionModel:
    def __call__(self, data):
        return {"result": "processed"}

# Deploy to Anyscale Services
# This would typically be done via the Anyscale CLI or web interface
serve.run(ProductionModel.bind())
Real-World Examples
1. Large Language Model Fine-tuning
import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

def fine_tune_llm():
    # Load model and tokenizer
    model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
    tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")

    # Prepare for distributed training
    model = ray.train.torch.prepare_model(model)

    # Training loop
    for epoch in range(5):
        # Your fine-tuning logic here
        ray.train.report({"epoch": epoch, "loss": 0.5})

# Run fine-tuning
trainer = TorchTrainer(
    fine_tune_llm,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
result = trainer.fit()
2. Computer Vision Pipeline
import ray
from ray import serve
from ray.data import Dataset
import torch
import torchvision.transforms as transforms
from PIL import Image

# Data processing pipeline
def process_images():
    # Read images
    dataset = ray.data.read_images("s3://my-bucket/images/")

    # Apply transformations
    def transform_batch(batch):
        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        batch["image"] = [transform(Image.fromarray(img)) for img in batch["image"]]
        return batch

    return dataset.map_batches(transform_batch)

# Model serving
@serve.deployment
class ImageClassifier:
    def __init__(self):
        self.model = torch.hub.load('pytorch/vision', 'resnet18', pretrained=True)
        self.model.eval()

    def __call__(self, image):
        with torch.no_grad():
            prediction = self.model(image)
        return {"prediction": prediction.argmax().item()}

# Deploy the pipeline
dataset = process_images()
classifier = ImageClassifier.bind()
serve.run(classifier)
3. Recommendation System
import ray
from ray import serve
from ray.data import Dataset
import numpy as np
from sklearn.ensemble import RandomForestRegressor

# Data processing
def process_recommendation_data():
    # Load user and item data
    user_data = ray.data.read_csv("s3://my-bucket/users.csv")
    item_data = ray.data.read_csv("s3://my-bucket/items.csv")

    # Feature engineering
    def create_features(batch):
        # Create user-item interaction features (pandas batch, so len(batch) is the row count)
        batch["interaction_score"] = np.random.rand(len(batch))
        return batch

    return user_data.map_batches(create_features, batch_format="pandas")

# Recommendation model
@serve.deployment
class RecommendationModel:
    def __init__(self):
        self.model = RandomForestRegressor()
        # Load trained model (fit on random data here as a stand-in)
        self.model.fit(np.random.rand(100, 10), np.random.rand(100))

    def __call__(self, user_features):
        prediction = self.model.predict([user_features])
        return {"recommendation_score": prediction[0]}

# Deploy recommendation system
dataset = process_recommendation_data()
recommender = RecommendationModel.bind()
serve.run(recommender)
Best Practices and Performance Tips
1. Resource Management
import ray

# Configure Ray for optimal performance
ray.init(
    num_cpus=8,
    num_gpus=2,
    object_store_memory=1000000000,  # 1GB
    _system_config={
        "max_direct_call_object_size": 1000000,
        "task_retry_delay_ms": 1000
    }
)
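Cluster-level settings are only half of resource management; individual tasks and actors should declare what they need so the scheduler can place them sensibly. A short sketch of per-task and per-actor resource requests:

import ray

# Request resources per task: this task is scheduled only where 2 CPUs are free
@ray.remote(num_cpus=2)
def cpu_heavy_task(x):
    return x * 2

# Request a GPU (and a fraction of a CPU) for an actor
@ray.remote(num_gpus=1, num_cpus=0.5)
class GpuWorker:
    def infer(self, x):
        return x

# Fractional GPUs let several small models share one device
@ray.remote(num_gpus=0.25)
def small_gpu_task(x):
    return x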
2. Efficient Data Processing
# Use appropriate batch sizes
dataset.map_batches(process_batch, batch_size=1000)
# Use concurrency for I/O bound tasks
dataset.map_batches(process_batch, concurrency=10)
# Use GPU for compute-intensive tasks
dataset.map_batches(gpu_process_batch, num_gpus=1)
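For GPU work it often pays to give map_batches a callable class so the model is loaded once per actor rather than once per batch. A hedged sketch of what a gpu_process_batch-style transform could look like (the Linear model is a stand-in, and a GPU is assumed to be available):

import ray
import torch

class GPUBatchPredictor:
    def __init__(self):
        # Loaded once per actor, then reused for every batch it processes
        self.model = torch.nn.Linear(1, 1).cuda()
        self.model.eval()

    def __call__(self, batch):
        with torch.no_grad():
            inputs = torch.as_tensor(batch["id"], dtype=torch.float32).reshape(-1, 1).cuda()
            batch["prediction"] = self.model(inputs).cpu().numpy().ravel()
        return batch

dataset = ray.data.range(1000)
predictions = dataset.map_batches(
    GPUBatchPredictor,
    batch_size=256,
    num_gpus=1,     # one GPU per actor
    concurrency=2,  # two model replicas
)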
3. Error Handling
import ray
import numpy as np
from ray.exceptions import RayTaskError

@ray.remote
def risky_task():
    if np.random.rand() < 0.1:
        raise ValueError("Random error")
    return "success"

# Handle errors gracefully
try:
    result = ray.get(risky_task.remote())
except RayTaskError as e:
    print(f"Task failed: {e}")
4. Monitoring and Debugging
import ray

# Enable detailed logging
ray.init(log_to_driver=True)

# Use Ray Dashboard for monitoring
# Access at http://localhost:8265

# Profile your application
@ray.remote
def profiled_task():
    import cProfile
    profiler = cProfile.Profile()
    profiler.enable()
    # Your code here
    profiler.disable()
    profiler.dump_stats("profile.stats")
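Beyond per-task cProfile runs, Ray can dump a cluster-wide trace of recent task scheduling and execution that you can open in chrome://tracing or Perfetto. A one-line sketch (the output path is a placeholder):

import ray

ray.init()
# ... run your workload ...

# Write a Chrome-tracing-compatible timeline of recent task activity
ray.timeline(filename="/tmp/ray-timeline.json")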
5. Production Deployment
# Use Ray Serve for production
from ray import serve
from ray.serve.config import HTTPOptions

# Configure HTTP options
http_options = HTTPOptions(
    host="0.0.0.0",
    port=8000,
    location="EveryNode"
)

# Deploy with proper configuration
serve.start(http_options=http_options)

@serve.deployment(
    num_replicas=4,
    max_ongoing_requests=100  # called max_concurrent_queries in older Ray Serve releases
)
class ProductionModel:
    def __call__(self, data):
        return {"result": "processed"}

serve.run(ProductionModel.bind())
Conclusion
Ray represents a paradigm shift in how we build and deploy AI applications. By providing a unified API for distributed computing, Ray eliminates the complexity of managing distributed systems while offering the performance and scalability needed for modern AI workloads.
Key Takeaways
- Ray Core provides the foundation for distributed computing with tasks, actors, and object stores
- Ray Data enables scalable data processing for ML workloads
- Ray Train simplifies distributed model training across multiple frameworks
- Ray Serve provides production-ready model serving with auto-scaling
- Anyscale offers a managed platform for running Ray applications in production
Getting Started
To get started with Ray:
- Install Ray: pip install ray
- Start with simple parallel tasks using @ray.remote
- Explore Ray Data for data processing
- Try Ray Train for distributed training
- Deploy models with Ray Serve
- Consider Anyscale for production deployment
Ray is not just a framework—it’s an ecosystem that empowers developers to build the next generation of AI applications. Whether you’re processing massive datasets, training complex models, or serving predictions at scale, Ray provides the tools and abstractions you need to succeed.
Start your Ray journey today and discover how distributed computing can transform your AI applications!