Example of Multi Agent AI in healthcare

import threading import time import random Base Agent Class class Agent: def init(self, name): self.name = name def perform_task(self): raise NotImplementedError("Subclasses must implement this method.") Monitoring Agent class MonitoringAgent(Agent): def init(self, name, patient_id): super().init(name) self.patient_id = patient_id def perform_task(self): # Simulate real-time monitoring of vitals vitals = { "heart_rate": random.randint(60, 100), "blood_pressure": f"{random.randint(110, 140)}/{random.randint(70, 90)}", "temperature": round(random.uniform(36.5, 37.5), 1) } print(f"MonitoringAgent-{self.patient_id}: Vitals -> {vitals}") return vitals Diagnostic Agent class DiagnosticAgent(Agent): def init(self, name, patient_data): super().init(name) self.patient_data = patient_data def perform_task(self): # Analyze vitals and suggest potential diagnoses heart_rate = self.patient_data["heart_rate"] blood_pressure = self.patient_data["blood_pressure"] diagnosis = "Normal" if heart_rate > 90 or "140" in blood_pressure: diagnosis = "Possible Hypertension" print(f"DiagnosticAgent: Diagnosis -> {diagnosis}") return diagnosis Treatment Agent class TreatmentAgent(Agent): def init(self, name, diagnosis): super().init(name) self.diagnosis = diagnosis def perform_task(self): # Suggest treatment based on diagnosis treatment_plan = "Lifestyle changes" if self.diagnosis == "Possible Hypertension": treatment_plan = "Prescribe antihypertensive medication" print(f"TreatmentAgent: Recommended Treatment -> {treatment_plan}") return treatment_plan Multi-Agent System Coordinator class MultiAgentSystemCoordinator: def init(self, patient_id): self.patient_id = patient_id def run(self): # Step 1: Spawn Monitoring Agent monitor_agent = MonitoringAgent("Monitor", self.patient_id) vitals = monitor_agent.perform_task() # Step 2: Spawn Diagnostic Agent diagnostic_agent = DiagnosticAgent("Diagnose", vitals) diagnosis = diagnostic_agent.perform_task() # Step 3: Spawn Treatment Agent treatment_agent = TreatmentAgent("Treat", diagnosis) treatment_plan = treatment_agent.perform_task() Simulate MAS in Healthcare def simulate_healthcare_system(): patients = [1, 2] # Example patient IDs threads = [] for patient_id in patients: coordinator = MultiAgentSystemCoordinator(patient_id) # Run each patient's MAS in a separate thread for scalability thread = threading.Thread(target=coordinator.run) threads.append(thread) thread.start() for thread in threads: thread.join() if name == "main": simulate_healthcare_system()

Mar 10, 2025 - 18:25
 0
Example of Multi Agent AI in healthcare

import threading
import time
import random

Base Agent Class

class Agent:
def init(self, name):
self.name = name

def perform_task(self):
    raise NotImplementedError("Subclasses must implement this method.")

Monitoring Agent

class MonitoringAgent(Agent):
def init(self, name, patient_id):
super().init(name)
self.patient_id = patient_id

def perform_task(self):
    # Simulate real-time monitoring of vitals
    vitals = {
        "heart_rate": random.randint(60, 100),
        "blood_pressure": f"{random.randint(110, 140)}/{random.randint(70, 90)}",
        "temperature": round(random.uniform(36.5, 37.5), 1)
    }
    print(f"MonitoringAgent-{self.patient_id}: Vitals -> {vitals}")
    return vitals

Diagnostic Agent

class DiagnosticAgent(Agent):
def init(self, name, patient_data):
super().init(name)
self.patient_data = patient_data

def perform_task(self):
    # Analyze vitals and suggest potential diagnoses
    heart_rate = self.patient_data["heart_rate"]
    blood_pressure = self.patient_data["blood_pressure"]

    diagnosis = "Normal"
    if heart_rate > 90 or "140" in blood_pressure:
        diagnosis = "Possible Hypertension"

    print(f"DiagnosticAgent: Diagnosis -> {diagnosis}")
    return diagnosis

Treatment Agent

class TreatmentAgent(Agent):
def init(self, name, diagnosis):
super().init(name)
self.diagnosis = diagnosis

def perform_task(self):
    # Suggest treatment based on diagnosis
    treatment_plan = "Lifestyle changes"
    if self.diagnosis == "Possible Hypertension":
        treatment_plan = "Prescribe antihypertensive medication"

    print(f"TreatmentAgent: Recommended Treatment -> {treatment_plan}")
    return treatment_plan

Multi-Agent System Coordinator

class MultiAgentSystemCoordinator:
def init(self, patient_id):
self.patient_id = patient_id

def run(self):
    # Step 1: Spawn Monitoring Agent
    monitor_agent = MonitoringAgent("Monitor", self.patient_id)
    vitals = monitor_agent.perform_task()

    # Step 2: Spawn Diagnostic Agent
    diagnostic_agent = DiagnosticAgent("Diagnose", vitals)
    diagnosis = diagnostic_agent.perform_task()

    # Step 3: Spawn Treatment Agent
    treatment_agent = TreatmentAgent("Treat", diagnosis)
    treatment_plan = treatment_agent.perform_task()

Simulate MAS in Healthcare

def simulate_healthcare_system():
patients = [1, 2] # Example patient IDs
threads = []

for patient_id in patients:
    coordinator = MultiAgentSystemCoordinator(patient_id)

    # Run each patient's MAS in a separate thread for scalability
    thread = threading.Thread(target=coordinator.run)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

if name == "main":
simulate_healthcare_system()