Example of Multi Agent AI in healthcare
import threading import time import random Base Agent Class class Agent: def init(self, name): self.name = name def perform_task(self): raise NotImplementedError("Subclasses must implement this method.") Monitoring Agent class MonitoringAgent(Agent): def init(self, name, patient_id): super().init(name) self.patient_id = patient_id def perform_task(self): # Simulate real-time monitoring of vitals vitals = { "heart_rate": random.randint(60, 100), "blood_pressure": f"{random.randint(110, 140)}/{random.randint(70, 90)}", "temperature": round(random.uniform(36.5, 37.5), 1) } print(f"MonitoringAgent-{self.patient_id}: Vitals -> {vitals}") return vitals Diagnostic Agent class DiagnosticAgent(Agent): def init(self, name, patient_data): super().init(name) self.patient_data = patient_data def perform_task(self): # Analyze vitals and suggest potential diagnoses heart_rate = self.patient_data["heart_rate"] blood_pressure = self.patient_data["blood_pressure"] diagnosis = "Normal" if heart_rate > 90 or "140" in blood_pressure: diagnosis = "Possible Hypertension" print(f"DiagnosticAgent: Diagnosis -> {diagnosis}") return diagnosis Treatment Agent class TreatmentAgent(Agent): def init(self, name, diagnosis): super().init(name) self.diagnosis = diagnosis def perform_task(self): # Suggest treatment based on diagnosis treatment_plan = "Lifestyle changes" if self.diagnosis == "Possible Hypertension": treatment_plan = "Prescribe antihypertensive medication" print(f"TreatmentAgent: Recommended Treatment -> {treatment_plan}") return treatment_plan Multi-Agent System Coordinator class MultiAgentSystemCoordinator: def init(self, patient_id): self.patient_id = patient_id def run(self): # Step 1: Spawn Monitoring Agent monitor_agent = MonitoringAgent("Monitor", self.patient_id) vitals = monitor_agent.perform_task() # Step 2: Spawn Diagnostic Agent diagnostic_agent = DiagnosticAgent("Diagnose", vitals) diagnosis = diagnostic_agent.perform_task() # Step 3: Spawn Treatment Agent treatment_agent = TreatmentAgent("Treat", diagnosis) treatment_plan = treatment_agent.perform_task() Simulate MAS in Healthcare def simulate_healthcare_system(): patients = [1, 2] # Example patient IDs threads = [] for patient_id in patients: coordinator = MultiAgentSystemCoordinator(patient_id) # Run each patient's MAS in a separate thread for scalability thread = threading.Thread(target=coordinator.run) threads.append(thread) thread.start() for thread in threads: thread.join() if name == "main": simulate_healthcare_system()

import threading
import time
import random
Base Agent Class
class Agent:
def init(self, name):
self.name = name
def perform_task(self):
raise NotImplementedError("Subclasses must implement this method.")
Monitoring Agent
class MonitoringAgent(Agent):
def init(self, name, patient_id):
super().init(name)
self.patient_id = patient_id
def perform_task(self):
# Simulate real-time monitoring of vitals
vitals = {
"heart_rate": random.randint(60, 100),
"blood_pressure": f"{random.randint(110, 140)}/{random.randint(70, 90)}",
"temperature": round(random.uniform(36.5, 37.5), 1)
}
print(f"MonitoringAgent-{self.patient_id}: Vitals -> {vitals}")
return vitals
Diagnostic Agent
class DiagnosticAgent(Agent):
def init(self, name, patient_data):
super().init(name)
self.patient_data = patient_data
def perform_task(self):
# Analyze vitals and suggest potential diagnoses
heart_rate = self.patient_data["heart_rate"]
blood_pressure = self.patient_data["blood_pressure"]
diagnosis = "Normal"
if heart_rate > 90 or "140" in blood_pressure:
diagnosis = "Possible Hypertension"
print(f"DiagnosticAgent: Diagnosis -> {diagnosis}")
return diagnosis
Treatment Agent
class TreatmentAgent(Agent):
def init(self, name, diagnosis):
super().init(name)
self.diagnosis = diagnosis
def perform_task(self):
# Suggest treatment based on diagnosis
treatment_plan = "Lifestyle changes"
if self.diagnosis == "Possible Hypertension":
treatment_plan = "Prescribe antihypertensive medication"
print(f"TreatmentAgent: Recommended Treatment -> {treatment_plan}")
return treatment_plan
Multi-Agent System Coordinator
class MultiAgentSystemCoordinator:
def init(self, patient_id):
self.patient_id = patient_id
def run(self):
# Step 1: Spawn Monitoring Agent
monitor_agent = MonitoringAgent("Monitor", self.patient_id)
vitals = monitor_agent.perform_task()
# Step 2: Spawn Diagnostic Agent
diagnostic_agent = DiagnosticAgent("Diagnose", vitals)
diagnosis = diagnostic_agent.perform_task()
# Step 3: Spawn Treatment Agent
treatment_agent = TreatmentAgent("Treat", diagnosis)
treatment_plan = treatment_agent.perform_task()
Simulate MAS in Healthcare
def simulate_healthcare_system():
patients = [1, 2] # Example patient IDs
threads = []
for patient_id in patients:
coordinator = MultiAgentSystemCoordinator(patient_id)
# Run each patient's MAS in a separate thread for scalability
thread = threading.Thread(target=coordinator.run)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if name == "main":
simulate_healthcare_system()