import ray import random import time from typing import List @ray.remote class ProgressActor: def __init__(self, total_tasks: int): self.total_tasks = total_tasks self.completed_tasks = 0 def update(self, num_completed: int): self.completed_tasks += num_completed print(f"Progress: {self.completed_tasks}/{self.total_tasks} tasks completed") return self.completed_tasks def get_progress(self): return self.completed_tasks @ray.remote def estimate_pi_chunk(num_samples: int, progress_actor) -> float: """Estimate pi using Monte Carlo method for a chunk of samples""" print(f"Worker starting computation with {num_samples} samples") inside_circle = 0 for _ in range(num_samples): x, y = random.random(), random.random() if x*x + y*y <= 1: inside_circle += 1 # Update progress progress_actor.update.remote(1) pi_estimate = 4 * inside_circle / num_samples print(f"Worker completed: pi estimate = {pi_estimate}") return pi_estimate, inside_circle, num_samples def main(): print("Starting distributed Pi calculation demo") print(f"Ray cluster resources: {ray.cluster_resources()}") # Configuration total_samples = 10_000_000 num_workers = 2 samples_per_worker = total_samples // num_workers print(f"Total samples: {total_samples}") print(f"Number of workers: {num_workers}") print(f"Samples per worker: {samples_per_worker}") # Create progress tracker progress_actor = ProgressActor.remote(num_workers) # Submit tasks to workers print("\nSubmitting tasks to workers...") start_time = time.time() tasks = [ estimate_pi_chunk.remote(samples_per_worker, progress_actor) for _ in range(num_workers) ] # Wait for results print("Waiting for results...") results = ray.get(tasks) end_time = time.time() # Calculate final pi estimate total_inside = sum(inside for _, inside, _ in results) total_samples_actual = sum(samples for _, _, samples in results) final_pi_estimate = 4 * total_inside / total_samples_actual # Display results print(f"\n{'='*50}") print("DISTRIBUTED PI CALCULATION RESULTS") print(f"{'='*50}") print(f"Total samples processed: {total_samples_actual:,}") print(f"Total points inside circle: {total_inside:,}") print(f"Final Pi estimate: {final_pi_estimate}") print(f"Actual Pi value: {3.14159265359}") print(f"Error: {abs(final_pi_estimate - 3.14159265359):.6f}") print(f"Execution time: {end_time - start_time:.2f} seconds") print(f"Samples per second: {total_samples_actual / (end_time - start_time):,.0f}") # Show individual worker results print(f"\nIndividual worker results:") for i, (pi_est, inside, samples) in enumerate(results): print(f"Worker {i+1}: π ≈ {pi_est:.6f} ({inside}/{samples})") return final_pi_estimate if __name__ == "__main__": ray.init() try: result = main() print(f"\nJob completed successfully! Final result: π ≈ {result}") finally: ray.shutdown()