How to build a scalable crawler with Prefect v3 (PokeAPI Example)

This blog post is an in-depth tutorial on integrating a new data source crawler, specifically for PokeAPI V2, into our existing Prefect-powered data aggregation framework. Crawling PokeAPI is not the primary goal. Instead, this integration is a chance to explore and implement a wide range of Prefect V3 features. (I originally wrote it as LLM context for myself and am sharing it with you.)
This guide assumes you already have the base project structure (core/, crawlers/, flows.py, tasks.py, project_blocks.py, etc.) and want to learn how to use Prefect to its fullest for this well-known API. We'll focus on how and why Prefect's tools make complex, resilient, and observable data pipelines more manageable and powerful.
The PokeAPI (V2) Target:
We've chosen PokeAPI V2 because it's a well-documented, public REST API that requires no authentication, returns JSON, and has clear pagination. This makes it ideal for demonstrating various crawling and data processing patterns.
- Base URL: https://pokeapi.co/api/v2/
- Key Endpoints for this tutorial:
  - List Pokémon: /pokemon/ (paginated with limit and offset)
  - Specific Pokémon: /pokemon/{id_or_name}/
- Data Changes: We'll assume Pokémon data might change (e.g., new forms or game appearances are added), which calls for a daily refresh of details. New Pokémon might also be added, which calls for more frequent checks (e.g., hourly discovery).
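The list endpoint pages by limit and offset, so you can plan a full crawl from the count field of the first response. Here is a minimal sketch of that arithmetic. The helper name page_offsets is hypothetical, and the count used below is illustrative, not PokeAPI's real total. The crawler in section 2.4 doesn't precompute offsets. It loops until the offset reaches count, which covers the same pages.

```python
import math
from typing import List


def page_offsets(total_count: int, page_size: int) -> List[int]:
    """Return the 0-indexed offsets needed to cover total_count items in pages of page_size.

    Hypothetical helper for illustration; the values map onto ?limit=...&offset=... queries.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    pages = math.ceil(total_count / page_size)  # a partial last page still needs a request
    return [page * page_size for page in range(pages)]


# Illustrative numbers only: 250 items at 100 per page need three requests.
for offset in page_offsets(250, 100):
    print(f"https://pokeapi.co/api/v2/pokemon/?limit=100&offset={offset}")
```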
Table of Contents
- The Grand Plan: Showcasing All of Prefect V3
- Laying the Groundwork: The PokeApiCrawler Class
  - 2.1. Creating crawlers/pokeapi_crawler.py
  - 2.2. Initializing with Prefect SourceConfigBlock
  - 2.3. Handling API Access (No Key for PokeAPI)
  - 2.4. Discovering All Pokémon: Pagination with AsyncIterator
  - 2.5. Fetching Pokémon Details
  - 2.6. Essential Helper Methods
- Integrating PokeApiCrawler into the Prefect Ecosystem
  - 3.1. Registering in crawlers/__init__.py
  - 3.2. Crafting the pokeapi-source-config Prefect Block
  - 3.3. Setting up the pokeapi-public-calls Global Concurrency Limit
- Building the "Ultimate PokeAPI Showcase" Prefect Flow
  - 4.1. Flow Definition and Core Concepts
  - 4.2. Parameters and Advanced Type Validation
  - 4.3. Nested Flows & Task Runners
  - 4.4. Task Features: .submit(), .map(), Custom Cache Keys, Results, States
  - 4.5. Advanced Caching Strategies
  - 4.6. Ensuring Idempotency with Prefect Transactions
  - 4.7. Interactive Workflows: Pausing and Receiving Input
  - 4.8. Creating Rich Artifacts for Observability
  - 4.9. Using Prefect Variables and Blocks within the Flow
  - 4.10. Orchestration Control: State Hooks, Final State, Retries, Timeouts
  - 4.11. Logging, Settings & Profiles in Action
  - 4.12. Interacting with the Prefect API via PrefectClient
- Deploying the PokeAPI Showcase Flow: Covering All Deployment Angles
  - 5.1. Option A: Quick Local Serving with flow.serve()
  - 5.2. Option B: Robust Python SDK Deployment with flow.deploy() & Docker
  - 5.3. Option C: Declarative Deployments with prefect.yaml
  - 5.4. Understanding Code Storage: Bake vs. Pull
  - 5.5. Work Pools & Workers: The Dynamic Infrastructure Duo
  - 5.6. Schedules: Cron, Interval, RRule, and Parameters
  - 5.7. Deployment Versioning (Automatic with Prefect Cloud)
  - 5.8. Cancelling and Pausing/Suspending Deployed Runs
- Event-Driven Triggers & Automations with Prefect
- Testing Your Prefect Workflows
- Exploring with the Prefect CLI
- Prefect Server/Cloud and Final Thoughts
1. The Grand Plan: Showcasing All of Prefect V3
Our goal is ambitious. We'll create a new flow, the "Ultimate PokeAPI Showcase," which calls on the PokeApiCrawler and other tasks to explicitly demonstrate:
- Core Constructs: Flows, Tasks, Parameters.
- Execution Control: Sync/Async tasks, various Task Runners, .submit(), .map().
- Orchestration: Results, States, Retries, Timeouts, Logging, Schedules.
- Deployment: flow.serve(), flow.deploy(), prefect.yaml, Work Pools (Docker), Workers, Code Storage.
- Advanced Features: Caching (diverse policies, cache_key_fn), Transactions, Interactive Input (wait_for_input, send_input), Artifacts (all types), Variables, Blocks (Custom & System), Secrets.
- Observability & Control: State Hooks, Final State Determination, Cancellation, GCLs, Prefect Client, CLI interactions.
- Ecosystem: Prefect Server/Cloud settings.
This will be a very rich flow, designed less for optimal PokeAPI crawling and more for illustrating the breadth of Prefect's capabilities. We'll also integrate the PokeApiCrawler into the existing hourly discovery and daily refresh flows for a more production-like setup.
2. Laying the Groundwork: The PokeApiCrawler Class
We need a new class in the crawlers/ directory that implements the SourceCrawler protocol (defined in crawlers/protocols.py) specifically for PokeAPI.
2.1. Creating crawlers/pokeapi_crawler.py
Create a new file: crawlers/pokeapi_crawler.py.
# crawlers/pokeapi_crawler.py
import httpx
import pathlib
import asyncio
import logging  # Standard logging; Prefect run loggers are passed in by the calling tasks/flows
from typing import List, Dict, Any, AsyncIterator, cast
import math  # For ceiling division in pagination

# Project-specific imports
from config import DATA_ROOT_DIR  # Assumes this is still relevant for local data copies if desired
from core.http_client import make_api_request
from project_blocks import SourceConfigBlock
from prefect.blocks.system import Secret
from .protocols import SourceCrawler  # Import the protocol


class PokeApiCrawler(SourceCrawler):  # Implement the protocol
    """
    A Prefect-integrated crawler for PokeAPI V2.

    This class encapsulates the logic to interact with PokeAPI,
    configured via a SourceConfigBlock and utilizing Prefect features
    when called from within Prefect tasks and flows.
    """

    # Attributes `source_name` and `config_block` will be set based on the Protocol
    # ... (Implementation details will follow in next steps) ...
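The article never shows crawlers/protocols.py. Based only on the methods PokeApiCrawler implements in this section, here is a minimal sketch of what the SourceCrawler protocol could look like. This is an assumption, not the project's actual file. It uses typing.Protocol with runtime_checkable, so isinstance() can check that a crawler exposes every required member.

```python
import logging
import pathlib
from typing import Any, AsyncIterator, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class SourceCrawler(Protocol):
    """Hypothetical reconstruction of crawlers/protocols.py, inferred from PokeApiCrawler's methods."""

    source_name: str
    config_block: Any  # a SourceConfigBlock instance in the real project

    async def get_api_key(self) -> str: ...

    # Async generators are declared with plain `def` in a Protocol: calling one
    # returns an AsyncIterator directly rather than a coroutine.
    def discover_item_ids(self, logger: logging.Logger, client: Any = None) -> AsyncIterator[List[str]]: ...

    async def fetch_item_details(self, item_id: str, logger: logging.Logger, client: Any = None) -> Dict[str, Any]: ...

    def get_item_unique_id_from_summary_data(self, item_summary_data: Dict[str, Any]) -> str: ...

    def get_data_path_for_item(self, item_id: str) -> pathlib.Path: ...

    def get_refresh_interval_hours(self) -> int: ...

    def get_gcl_name(self) -> str: ...

    def get_state_file_path(self) -> pathlib.Path: ...

    def get_item_list_cache_path(self) -> pathlib.Path: ...
```

Because PokeApiCrawler subclasses SourceCrawler explicitly, a type checker will also flag a missing method at the class definition, not only at call sites.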
2.2. Initializing with Prefect SourceConfigBlock
The __init__ method loads a Prefect SourceConfigBlock instance. This block, which we'll create later, stores all API-specific configuration for PokeAPI.
# crawlers/pokeapi_crawler.py (inside PokeApiCrawler class)
    def __init__(self, source_name: str = "pokeapi"):
        """
        Initializes the PokeApiCrawler.

        The `source_name` is used to dynamically load a Prefect `SourceConfigBlock`
        instance named '{source_name}-source-config'. This block contains all
        API-specific settings (base URL, endpoints, pagination details, GCL names, etc.).

        PREFECT FEATURE: Dynamic Configuration with Custom Blocks.
        By loading configuration from a Prefect Block, we separate connection
        details and operational parameters from the crawler's code. This makes the
        system highly adaptable: API changes or new source variants can often be
        handled by updating the Block in the Prefect UI/API, without code redeployment.
        """
        self.source_name = source_name
        block_load_name = f"{self.source_name}-source-config"
        try:
            # PREFECT FEATURE: `Block.load("block-name")`
            # Called synchronously here. Block.load is sync/async-compatible, so when it
            # runs inside an already-running event loop (e.g., an async task) it may return
            # an awaitable instead of the block, depending on your Prefect version.
            # If the crawler is created inside async tasks/flows, prefer an async factory
            # or an async setup method. The SourceConfigBlock schema is defined in `project_blocks.py`.
            self.config_block: SourceConfigBlock = cast(SourceConfigBlock, SourceConfigBlock.load(block_load_name))
            logging.info(f"[{self.source_name}] Successfully loaded SourceConfigBlock: {block_load_name}")
        except ValueError as e:  # Prefect typically raises ValueError if block not found
            logging.error(
                f"[{self.source_name}] FATAL: Could not load SourceConfigBlock named '{block_load_name}'. "
                f"Ensure this block is created in your Prefect workspace. Original error: {e}"
            )
            raise ValueError(f"Failed to load SourceConfigBlock '{block_load_name}': {e}") from e

        # Example of accessing a config value from the block:
        logging.debug(f"[{self.source_name}] API Base URL from block: {self.config_block.api_base_url}")
Prefect Insight: We're already seeing Prefect's power. SourceConfigBlock.load() fetches a structured configuration object managed by Prefect. If API details change, you update the Block, not the code. The cast is only there for type-hinting clarity.
2.3. Handling API Access (No Key for PokeAPI)
The SourceCrawler protocol includes get_api_key(). PokeAPI doesn't require a key.
# crawlers/pokeapi_crawler.py (inside PokeApiCrawler class)
    async def get_api_key(self) -> str:
        """
        Retrieves the API key for PokeAPI. PokeAPI does not require one.
        This method demonstrates adapting a generic protocol for a specific API.

        The `SourceConfigBlock` has an `api_key_block_name` field. If it's configured
        (even with a dummy value) for PokeAPI, we could attempt to load it.
        More robustly, for APIs like PokeAPI that need NO key, we can simply return
        an empty string or handle the absence of `api_key_block_name` in the config.

        PREFECT FEATURE: Secure Secret Handling with `prefect.blocks.system.Secret` (Conceptual).
        If PokeAPI *did* require a key, `self.config_block.api_key_block_name` would
        point to a `Secret` block instance. We would then use `await Secret.load(...)`
        and `.get()` to securely retrieve it. This keeps the actual key out of code
        and Prefect manages its encryption and access.
        """
        logger = logging.getLogger(f"prefect.pokeapi_crawler.{self.source_name}")  # Get a logger
        if self.config_block.api_key_block_name and self.config_block.api_key_block_name.lower() != "none":
            logger.warning(
                f"[{self.source_name}] 'api_key_block_name' ('{self.config_block.api_key_block_name}') "
                f"is configured for PokeAPI, but PokeAPI does not require an API key. "
                f"Attempting to load it anyway, but it will not be used."
            )
            # This block demonstrates loading a Secret even if not used, for completeness
            try:
                # PREFECT FEATURE: Asynchronously loading a Secret block.
                api_key_secret = await Secret.load(self.config_block.api_key_block_name)
                # PREFECT FEATURE: Securely retrieving the secret value.
                api_key_secret.get()  # Retrieved only to demonstrate the pattern; the value is discarded
                logger.info(f"[{self.source_name}] Loaded dummy API key from block '{self.config_block.api_key_block_name}'.")
            except ValueError:
                logger.warning(f"[{self.source_name}] Secret block '{self.config_block.api_key_block_name}' not found, as expected for keyless API.")
            # Return an empty string in both cases so make_api_request sends no auth header,
            # matching the warning above that the key "will not be used".
            return ""
        else:
            logger.info(f"[{self.source_name}] PokeAPI does not require an API key. Proceeding without one.")
            return ""  # Explicitly return an empty string for clarity.
Prefect Insight: This shows how to fit into a generic pattern (api_key_block_name in SourceConfigBlock) even when a specific API doesn't need part of it. It also shows how Secret blocks would be used.
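The crawler relies on make_api_request ignoring an empty api_key, but core/http_client.py isn't shown in this article. Here is a minimal sketch of that header logic. It assumes a hypothetical build_headers helper and the Bearer-token scheme mentioned in the crawler's comments. The real make_api_request may differ.

```python
from typing import Dict, Optional


def build_headers(api_key: Optional[str]) -> Dict[str, str]:
    """Hypothetical sketch of make_api_request's header logic.

    An Authorization header is added only when a non-empty key is supplied,
    so keyless APIs such as PokeAPI receive plain JSON requests.
    """
    headers = {"Accept": "application/json"}
    if api_key:  # None and "" are both treated as "no key"
        headers["Authorization"] = f"Bearer {api_key}"
    return headers


print(build_headers(""))              # PokeAPI case: no Authorization header
print(build_headers("s3cr3t-token"))  # keyed-API case: Bearer header added
```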
2.4. Discovering All Pokémon: Pagination with AsyncIterator
This method lists all Pokémon and handles PokeAPI's limit and offset pagination.
# crawlers/pokeapi_crawler.py (inside PokeApiCrawler class)
    def _get_nested_value(self, data: Dict, path: List[str], default: Any = None) -> Any:
        # This helper from SmitheryCrawler is useful, let's keep it.
        current = data
        for key in path:
            if not isinstance(current, dict) or key not in current:
                return default
            current = current[key]
        return current

    async def discover_item_ids(self, logger: logging.Logger, client: httpx.AsyncClient = None) -> AsyncIterator[List[str]]:
        """
        Asynchronously discovers all Pokémon names (our 'item_ids' for this source)
        from the PokeAPI /pokemon endpoint, handling pagination.

        This method will be called by a Prefect @task, which provides the `logger`
        (a Prefect run logger) and can provide the `client` (an httpx.AsyncClient
        managed by the parent flow).

        PREFECT FEATURE: Integration with Asynchronous Python (`async def`, `AsyncIterator`).
        Prefect seamlessly orchestrates async functions, making I/O-bound tasks
        like API calls highly efficient.
        """
        logger.info(f"[{self.source_name}] Starting Pokémon ID discovery...")

        # Configuration from the 'pokeapi-source-config' Block instance
        base_url = self.config_block.api_base_url
        list_endpoint = self.config_block.list_endpoint  # Should be "pokemon/"
        page_size_param = self.config_block.page_size_param  # "limit"
        page_num_param = self.config_block.page_number_param  # "offset"
        page_size = self.config_block.default_page_size  # e.g., 100

        # API key not typically used for GET params in REST, usually a header.
        # Our make_api_request handles Bearer tokens if api_key is returned by get_api_key().
        # For PokeAPI, api_key will be empty and thus ignored by make_api_request's header logic.
        api_key_to_pass = await self.get_api_key()  # Will be empty for PokeAPI

        current_offset = 0
        total_items_count = -1  # We'll get this from the first API call

        # Handles client creation if one isn't passed by the calling Prefect task/flow
        async def _perform_discovery(actual_client: httpx.AsyncClient):
            nonlocal current_offset, total_items_count  # Allow modification of outer scope variables
            items_yielded_this_run = 0
            while True:
                params = {
                    page_size_param: page_size,
                    page_num_param: current_offset,
                }
                logger.debug(f"[{self.source_name}] Requesting Pokémon list: offset={current_offset}, limit={page_size}")

                # PREFECT FEATURE Context: make_api_request receives the Prefect `logger`.
                # Any logs within make_api_request will be associated with the calling task/flow run.
                page_data = await make_api_request(
                    client=actual_client,
                    method="GET",
                    base_url=str(base_url),  # Ensure HttpUrl is converted to str if make_api_request expects str
                    endpoint=list_endpoint,
                    logger=logger,
                    api_key=api_key_to_pass,  # Pass along the (empty) key
                    params=params,
                )

                if not page_data:
                    logger.warning(f"[{self.source_name}] No data returned from Pokémon list API at offset {current_offset}.")
                    break

                if total_items_count == -1:  # First call
                    # Path to count: self.config_block.pagination_total_pages_path_in_response -> ["count"] for PokeAPI
                    count_path = self.config_block.pagination_total_pages_path_in_response
                    total_items_count = self._get_nested_value(page_data, count_path, 0)
                    if not isinstance(total_items_count, int):
                        total_items_count = 0  # Safety
                    logger.info(f"[{self.source_name}] PokeAPI reports {total_items_count} total Pokémon.")

                # Path to results list: self.config_block.item_list_path_in_response -> ["results"] for PokeAPI
                results_path = self.config_block.item_list_path_in_response
                pokemon_summaries_on_page = self._get_nested_value(page_data, results_path, [])

                pokemon_names_on_page: List[str] = []
                for summary_data in pokemon_summaries_on_page:
                    # `get_item_unique_id_from_summary_data` will extract "name" based on config
                    pokemon_names_on_page.append(self.get_item_unique_id_from_summary_data(summary_data))

                if pokemon_names_on_page:
                    logger.info(f"[{self.source_name}] Discovered {len(pokemon_names_on_page)} Pokémon names on this page (offset {current_offset}).")
                    items_yielded_this_run += len(pokemon_names_on_page)
                    yield pokemon_names_on_page  # Yield a batch of names
                else:
                    logger.info(f"[{self.source_name}] No Pokémon found on page with offset {current_offset}. End of results likely.")
                    break  # No more results on this page, or API changed

                current_offset += page_size
                if items_yielded_this_run >= total_items_count or current_offset >= total_items_count:
                    # PokeAPI 'next' field is also a good check, but 'count' is robust for total.
                    # Using items_yielded_this_run or current_offset check against total_items_count is safer
                    # if `total_items_count` accurately represents *all* items and not just on a page.
                    # For PokeAPI, `count` is the total, so this is fine.
                    logger.info(f"[{self.source_name}] Reached reported total of {total_items_count} Pokémon or end of pages.")
                    break

                # Polite delay between paged requests
                # This can also be managed by a Prefect Global Concurrency Limit (GCL)
                # if applied to the calling Prefect Task.
                await asyncio.sleep(0.1)  # Be very gentle with public APIs

        # Logic to use provided client or create a new one
        if client:
            async for batch_of_names in _perform_discovery(client):
                yield batch_of_names
        else:
            logger.warning(f"[{self.source_name}] No httpx.AsyncClient passed to discover_item_ids. Creating a temporary one.")
            async with httpx.AsyncClient() as new_internal_client:
                async for batch_of_names in _perform_discovery(new_internal_client):
                    yield batch_of_names

        logger.info(f"[{self.source_name}] Pokémon ID discovery process finished.")
Prefect Insight:
- The logger and client passed in are assumed to come from a Prefect task/flow context. Logging is therefore centralized, and the flow can manage the HTTP client lifecycle (like connection pooling) for efficiency.
- The parameters for API interaction (list_endpoint, page_size_param, etc.) are all read from self.config_block, demonstrating configuration-driven behavior.
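The sketch below isolates the limit/offset loop and the count-based stopping rule so you can run them without network access. It uses the same async-generator pattern, driven by a fake page fetcher. The names paginate_names and fake_fetch_page are hypothetical. The real method goes through make_api_request and reads its paths and parameter names from the config block.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

# (limit, offset) -> parsed JSON page shaped like PokeAPI's /pokemon/ response
FetchPage = Callable[[int, int], Awaitable[Dict[str, Any]]]


async def paginate_names(fetch_page: FetchPage, page_size: int) -> AsyncIterator[List[str]]:
    """Yield batches of lowercase names; stop on an empty page or once offset reaches 'count'."""
    offset = 0
    total = None
    while True:
        page = await fetch_page(page_size, offset)
        if total is None:
            total = int(page.get("count", 0))  # read once, from the first page
        names = [item["name"].lower() for item in page.get("results", [])]
        if not names:
            break
        yield names
        offset += page_size
        if offset >= total:
            break


FAKE_POKEMON = [f"Pokemon-{i}" for i in range(250)]  # illustrative data, not real PokeAPI names


async def fake_fetch_page(limit: int, offset: int) -> Dict[str, Any]:
    chunk = FAKE_POKEMON[offset:offset + limit]
    return {"count": len(FAKE_POKEMON), "results": [{"name": n, "url": f"https://example.invalid/{n}"} for n in chunk]}


async def collect_batches() -> List[List[str]]:
    # Consumers iterate with `async for`, as a Prefect task would over discover_item_ids().
    return [batch async for batch in paginate_names(fake_fetch_page, page_size=100)]


batches = asyncio.run(collect_batches())
print([len(b) for b in batches])
```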
2.5. Fetching Pokémon Details
This method fetches detailed data for a single Pokémon by its name or ID.
# crawlers/pokeapi_crawler.py (inside PokeApiCrawler class)
    async def fetch_item_details(self, item_id: str, logger: logging.Logger, client: httpx.AsyncClient = None) -> Dict[str, Any]:
        """
        Fetches detailed information for a single Pokémon by its name (or ID).

        This method will be called by a Prefect @task, often mapped over many IDs.
        It leverages the shared `httpx.AsyncClient` and Prefect `logger`.
        The calling Prefect @task will be configured with retries and caching.

        PREFECT FEATURE Context: Resilience via Task Retries.
        If this API call fails (e.g., temporary network issue), the calling Prefect task
        (fetch_and_save_item_details_for_source_task) can automatically retry.

        PREFECT FEATURE Context: Performance via Task Caching.
        If this Pokémon's details were recently fetched and haven't "expired"
        according to the cache key function, the calling Prefect task can return
        a cached result, avoiding this API call.
        """
        logger.info(f"[{self.source_name}] Fetching details for Pokémon: '{item_id}'")

        # Configuration from the 'pokeapi-source-config' Block
        base_url = self.config_block.api_base_url
        # `detail_endpoint_template` from block should be "pokemon/{}"
        detail_endpoint = self.config_block.detail_endpoint_template.format(item_id)
        api_key_to_pass = await self.get_api_key()

        async def _perform_fetch(actual_client: httpx.AsyncClient) -> Dict[str, Any]:
            return await make_api_request(
                client=actual_client,
                method="GET",
                base_url=str(base_url),
                endpoint=detail_endpoint,
                logger=logger,
                api_key=api_key_to_pass,
                # No specific params for this detail endpoint typically, ID is in path
            )

        if client:
            return await _perform_fetch(client)
        else:
            logger.warning(f"[{self.source_name}] No httpx.AsyncClient passed to fetch_item_details for '{item_id}'. Creating temporary one.")
            async with httpx.AsyncClient() as new_internal_client:
                return await _perform_fetch(new_internal_client)
Prefect Insight: This method is lean because the actual HTTP logic lives in make_api_request, and the calling Prefect task handles resilience and caching. This separation of concerns is key.
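Task-level caching like this depends on a cache key that changes once per refresh window. The item_detail_cache_key_fn in tasks.py isn't shown in this section, so here is a minimal sketch of one way to build such a key. It uses a hypothetical function name and time bucketing aligned to UTC. In Prefect, a task's cache_key_fn receives the task run context and a dict of the call's parameters and returns a string, so a thin wrapper could call this helper with values taken from those parameters.

```python
from datetime import datetime, timezone


def time_bucketed_cache_key(source_name: str, item_id: str, refresh_interval_hours: int, now: datetime) -> str:
    """Return a key that stays constant within each refresh window and changes at its boundary.

    Hypothetical sketch: windows are aligned to the Unix epoch (UTC midnight for 24h windows).
    """
    if refresh_interval_hours <= 0:
        raise ValueError("refresh_interval_hours must be positive")
    window_seconds = refresh_interval_hours * 3600
    bucket = int(now.timestamp()) // window_seconds  # index of the current window
    return f"{source_name}:{item_id.lower()}:{bucket}"


t1 = datetime(2024, 1, 1, 1, 0, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 1, 23, 0, tzinfo=timezone.utc)  # same UTC day -> same 24h window
t3 = datetime(2024, 1, 2, 0, 30, tzinfo=timezone.utc)  # next UTC day -> new window
print(time_bucketed_cache_key("pokeapi", "Pikachu", 24, t1))
```

With refresh_interval_hours=24 from the block, every fetch of the same Pokémon within one UTC day maps to one key, so later calls that day can reuse the cached result.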
2.6. Essential Helper Methods
These helpers derive information from the config_block or provide utility functions.
# crawlers/pokeapi_crawler.py (inside PokeApiCrawler class)
    def get_item_unique_id_from_summary_data(self, item_summary_data: Dict[str, Any]) -> str:
        """
        Extracts the Pokémon's name from the summary data returned by the
        PokeAPI /pokemon list endpoint. For PokeAPI, item['name'] is the ID.
        The field name ('name') is specified in the SourceConfigBlock's
        `item_id_field_in_summary` attribute.
        """
        # From config_block: self.config_block.item_id_field_in_summary (should be "name" for PokeAPI)
        id_field = self.config_block.item_id_field_in_summary
        pokemon_name = item_summary_data.get(id_field)
        if not pokemon_name or not isinstance(pokemon_name, str):
            # Log this situation using a standard logger, as this might be called outside a Prefect logger context sometimes
            # Or, ensure a logger is always passed even to this method if it's critical to capture in Prefect.
            # For now, standard logging is fine for this utility.
            logging.warning(f"[{self.source_name}] Could not extract Pokémon name using field '{id_field}' from summary: {item_summary_data}")
            raise ValueError(f"Pokémon name not found or not a string in summary using field '{id_field}'.")
        return pokemon_name.lower()  # Standardize to lowercase, as PokeAPI is case-insensitive for names in path

    def get_data_path_for_item(self, item_id: str) -> pathlib.Path:
        """
        Returns the local file path for storing the JSON data of a specific Pokémon.
        Used by `save_item_data_locally_task`. This specific method for local saving
        might be superseded if all results are persisted via Prefect's S3/MinIO storage.
        It can still be useful for creating local copies for quick debugging or archives.
        """
        # Ensure item_id is filesystem-safe (e.g. for names with special characters if any)
        safe_item_id = str(item_id).replace("/", "_").replace(":", "_").replace("\\", "_")
        # DATA_ROOT_DIR is from `config.py`
        path = DATA_ROOT_DIR / self.source_name / "pokemon_details" / f"{safe_item_id}.json"
        return path

    # The following methods directly expose configured values from the SourceConfigBlock.
    # This adheres to the SourceCrawler protocol and makes these values easily accessible.
    def get_refresh_interval_hours(self) -> int:
        """Returns the configured data refresh interval in hours for this source."""
        # PREFECT FEATURE Context: This value, from SourceConfigBlock, is used by
        # the `item_detail_cache_key_fn` in `tasks.py` to determine cache TTL.
        return self.config_block.refresh_interval_hours

    def get_gcl_name(self) -> str:
        """Returns the configured Prefect Global Concurrency Limit name for this source."""
        # PREFECT FEATURE Context: This GCL name (e.g., "pokeapi-public-calls") is used
        # by tasks to limit concurrent API calls, enforced by the Prefect server/Cloud.
        return self.config_block.gcl_name

    # These protocol methods might not be strictly needed by PokeAPI, but are part of the protocol.
    # Implement them minimally or adapt if there's a conceptual equivalent for PokeAPI.
    def get_state_file_path(self) -> pathlib.Path:
        """Returns path for a source-specific state file (e.g., last successful run timestamp)."""
        return DATA_ROOT_DIR / self.source_name / f"{self.source_name}_crawler_state.json"

    def get_item_list_cache_path(self) -> pathlib.Path:
        """Path for a local cache of discovered item IDs (if local caching is used)."""
        # Note: Prefect's built-in caching (`persist_result` and `cache_key_fn` in tasks)
        # is generally preferred over manual file-based caching for discover_item_ids task results.
        # This method is here for protocol adherence or if an alternative local cache was desired.
        return DATA_ROOT_DIR / self.source_name / f"{self.source_name}_discovered_items_local_cache.json"
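get_data_path_for_item() only computes a path. The save_item_data_locally_task that uses it isn't shown here. Below is a minimal sketch of a helper such a task might call, under a hypothetical name, save_json_atomically. It writes to a temporary file in the target directory and then renames it into place with os.replace(). This avoids leaving half-written JSON behind if a run is cancelled or crashes mid-write.

```python
import json
import os
import pathlib
import tempfile
from typing import Any, Dict


def save_json_atomically(path: pathlib.Path, data: Dict[str, Any]) -> pathlib.Path:
    """Write data as JSON via a temp file in the same directory, then rename it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)  # e.g. DATA_ROOT_DIR/pokeapi/pokemon_details/
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp_file:
            json.dump(data, tmp_file, ensure_ascii=False, indent=2)
        os.replace(tmp_name, path)  # atomic on POSIX when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.remove(tmp_name)  # don't leave stray temp files behind
        raise
    return path


with tempfile.TemporaryDirectory() as root:
    target = pathlib.Path(root) / "pokeapi" / "pokemon_details" / "pikachu.json"
    save_json_atomically(target, {"name": "pikachu", "id": 25})
    print(json.loads(target.read_text(encoding="utf-8")))
```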
3. Integrating PokeApiCrawler into the Prefect Ecosystem
Now that the PokeApiCrawler class is defined, we need to make the Prefect system aware of it and configure its operational parameters using Prefect Blocks.
3.1. Registering in crawlers/__init__.py
This step makes get_crawler("pokeapi") work.
- Open crawlers/__init__.py.
- Import PokeApiCrawler: from .pokeapi_crawler import PokeApiCrawler
- Add it to _CRAWLER_CLASSES:

# crawlers/__init__.py
# ... other imports
from .pokeapi_crawler import PokeApiCrawler  # <<< IMPORT

_CRAWLER_CLASSES: Dict[str, Type[SourceCrawler]] = {
    "smithery": SmitheryCrawler,
    "pokeapi": PokeApiCrawler,  # <<< REGISTER
    # "glama": GlamaCrawler,  # Example from previous
}
# ... (rest of the file)
3.2. Crafting the pokeapi-source-config Prefect Block
This is the central configuration piece for your new crawler: an instance of SourceConfigBlock specific to PokeAPI.
- Create a new Python script in your project root, e.g., create_pokeapi_block.py.
- Populate it with PokeAPI V2 specifics:
# create_pokeapi_block.py
from project_blocks import SourceConfigBlock  # Your custom block definition
from pydantic import HttpUrl  # For type validation of URLs


def main() -> None:
    print("Attempting to create/update 'pokeapi-source-config' Prefect Block...")

    # Define the configuration for the PokeAPI data source
    pokeapi_config = SourceConfigBlock(
        # ----- General API Configuration -----
        api_base_url=HttpUrl("https://pokeapi.co/api/v2/"),
        # PokeAPI does not require an API key. We'll configure the crawler to handle this.
        # For consistency in the block structure, we can set a placeholder or specific "None" value.
        # The `PokeApiCrawler.get_api_key()` will know not to actually use it.
        api_key_block_name="pokeapi-no-key-dummy",  # We'll create a dummy Secret block with this name.
        # Name of the Prefect Global Concurrency Limit to use for this API.
        gcl_name="pokeapi-public-calls",  # We'll create this GCL later.

        # ----- Item List/Discovery Endpoint Configuration -----
        list_endpoint="pokemon/",  # Endpoint to list all Pokémon
        page_size_param="limit",  # Query parameter name for page size
        page_number_param="offset",  # Query parameter name for page offset (0-indexed)
        default_page_size=100,  # Fetch 100 Pokémon per page by default
        # JSON path to the actual list of items within the API response
        # For /pokemon/, response is {"count": N, "next": ..., "previous": ..., "results": [...]}
        item_list_path_in_response=["results"],
        # Field name within each summary item that contains the unique ID
        # For /pokemon/ results, each item is {"name": "bulbasaur", "url": "..."} -> "name" is our ID
        item_id_field_in_summary="name",
        # JSON path to the total count of items (not total pages) in the paginated response.
        # For /pokemon/, this is the top-level "count" field; the crawler compares offsets against it.
        pagination_total_pages_path_in_response=["count"],

        # ----- Item Detail Endpoint Configuration -----
        # Endpoint template for fetching a single item. "{}" will be replaced with the item_id (Pokémon name).
        detail_endpoint_template="pokemon/{}",

        # ----- Caching & Refresh Configuration -----
        # How often to consider fetched Pokémon details "stale" and worth refreshing (e.g., daily).
        refresh_interval_hours=24,  # Used by item_detail_cache_key_fn
        # How long the list of *all* discovered Pokémon names should be cached.
        # Shorter TTL for discovery means we check for *new* Pokémon more often.
        item_list_cache_ttl_seconds=3600,  # 1 hour. Used by discover_cache_key_fn
    )

    # Save the block document to Prefect. This also registers the block type if not already present.
    # The name "pokeapi-source-config" is critical as `PokeApiCrawler` will load it using this name.
    try:
        pokeapi_config.save(name="pokeapi-source-config", overwrite=True)
        print("SUCCESS: Prefect Block 'pokeapi-source-config' created/updated successfully.")
        print("Next steps:")
        print("1. Create a dummy Secret Block named 'pokeapi-no-key-dummy' (value can be anything like 'not_applicable').")
        print("2. Create a Global Concurrency Limit (GCL) named 'pokeapi-public-calls' (e.g., with a limit of 5-10).")
        print("   Example CLI: `prefect gcl create pokeapi-public-calls --limit 10`")
    except Exception as e:
        print(f"ERROR: Could not save 'pokeapi-source-config' block: {e}")
        print("Ensure you are logged into a Prefect backend (Cloud or local server).")


if __name__ == "__main__":
    # Run the script directly: `python create_pokeapi_block.py`
    main()
-
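The block stores PokeAPI's top-level `count` (a total number of Pokémon, not pages) under pagination_total_pages_path_in_response, so the crawler has to derive the limit/offset pages itself. A minimal sketch, assuming a hypothetical helper named `page_offsets` that is not part of the original crawler:

```python
import math
from typing import List


def page_offsets(total_count: int, page_size: int) -> List[int]:
    """Return every `offset` value needed to walk a limit/offset endpoint.

    `total_count` is the item count reported by the API (PokeAPI's `count` field),
    not a number of pages, so the page count comes from a ceiling division.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    total_pages = math.ceil(total_count / page_size)
    return [page * page_size for page in range(total_pages)]


# With default_page_size=100, a count of 250 needs three requests:
# ?limit=100&offset=0, ?limit=100&offset=100, ?limit=100&offset=200
print(page_offsets(250, 100))  # [0, 100, 200]
```

Following the `next` URL included in each list response is an equally valid alternative that avoids the arithmetic altogether.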
- Run this script from your terminal:
python create_pokeapi_block.py
  - This connects to your active Prefect backend and saves this configuration.
  - PREFECT FEATURE: Custom Block definition (SourceConfigBlock) and block.save(). Saving makes the configuration available to any flow or task run in this Prefect environment; re-running the script with overwrite=True replaces the stored values.
- Create the dummy Secret block: since SourceConfigBlock has an api_key_block_name field and PokeAPI doesn't use a key, we still need a block that satisfies this field.
  - Go to your Prefect UI -> Blocks -> +.
  - Select the Secret type.
  - Name: pokeapi-no-key-dummy (must match the value in pokeapi-source-config).
  - Value: "not_applicable" (or any placeholder).
  - Save.
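The dummy secret only exists to satisfy the block's schema; the crawler still has to avoid sending it. A minimal sketch of such a guard, assuming a hypothetical `build_auth_headers` helper, a hypothetical set of placeholder values, and a Bearer-style header for sources that do need a key (the real PokeApiCrawler.get_api_key() may handle this differently):

```python
from typing import Dict, Optional

# Hypothetical placeholder values meaning "this source needs no API key".
NO_KEY_PLACEHOLDERS = {"", "not_applicable"}


def build_auth_headers(api_key: Optional[str]) -> Dict[str, str]:
    """Return auth headers for a request, or an empty dict when no real key exists."""
    if api_key is None or api_key.strip() in NO_KEY_PLACEHOLDERS:
        return {}  # Keyless API such as PokeAPI: send no Authorization header at all
    return {"Authorization": f"Bearer {api_key}"}  # Assumed header scheme for keyed APIs


print(build_auth_headers("not_applicable"))  # {}
```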
3.3. Setting up the pokeapi-public-calls Global Concurrency Limit
To interact politely with the public PokeAPI and to demonstrate GCLs:
prefect gcl create pokeapi-public-calls --limit 10
# PokeAPI is quite resilient, so 10 concurrent calls might be acceptable.
# Adjust if experiencing issues or for stricter politeness.
# This GCL is referenced by name in your `pokeapi-source-config` block.
PREFECT FEATURE: Global Concurrency Limits. A GCL is defined once server-side and respected by any task that asks for a slot, e.g. with async with concurrency("pokeapi-public-calls") from prefect.concurrency.asyncio.
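Conceptually, a GCL behaves like a semaphore held by the Prefect server and shared by every process that requests a slot. Within a single process, asyncio.Semaphore produces the same effect, which is a handy way to reason about what --limit 10 means. This is an analogy only, not Prefect's implementation; `peak_concurrency` and `fake_api_call` are hypothetical names.

```python
import asyncio


async def peak_concurrency(limit: int, jobs: int) -> int:
    """Run `jobs` fake API calls behind a semaphore and report the peak parallelism."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_api_call() -> None:
        nonlocal active, peak
        # Plays the role of `async with concurrency("pokeapi-public-calls")`
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Stand-in for network latency
            active -= 1

    await asyncio.gather(*(fake_api_call() for _ in range(jobs)))
    return peak


print(asyncio.run(peak_concurrency(limit=3, jobs=10)))  # 3
```

The difference is scope: the semaphore only protects one event loop, whereas the GCL caps calls across all flow runs and workers talking to PokeAPI at the same time.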
4. Building the "Ultimate PokeAPI Showcase" Prefect Flow
Now, for the centerpiece! This flow will demonstrate a multitude of Prefect V3 features using our new PokeApiCrawler. It will live in a new file, e.g., flows/pokeapi_showcase_flow.py.
4.1. Flow Definition and Core Concepts
# flows/pokeapi_showcase_flow.py
import asyncio
import random  # For simulating choices or failures
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import httpx
from prefect import flow, get_run_logger, serve, task, unmapped
from prefect.client.orchestration import get_client  # PREFECT FEATURE: PrefectClient
from prefect.client.schemas.objects import FlowRun  # For type hints with client
from prefect.context import get_run_context
from prefect.exceptions import FlowPauseTimeout
from prefect.states import Completed, Failed  # PREFECT FEATURE: Manual State Returns
# PREFECT FEATURE: Pausing and suspending flow runs
from prefect.flow_runs import pause_flow_run, suspend_flow_run
# PREFECT FEATURE: Flow Run Input with Pydantic models
from prefect.input import RunInput
from prefect.input.run_input import receive_input
# PREFECT FEATURE: Task Runners
from prefect.task_runners import ThreadPoolTaskRunner
# DaskTaskRunner lives in the prefect-dask integration (pip install prefect-dask):
# from prefect_dask import DaskTaskRunner
# PREFECT FEATURE: Caching Policies & utilities
from prefect.tasks import task_input_hash  # Helper for custom cache_key_fn functions
from prefect.cache_policies import INPUTS, TASK_SOURCE, NO_CACHE, CachePolicy
# PREFECT FEATURE: Transactions
from prefect.transactions import transaction
# PREFECT FEATURE: Event System
from prefect.events import emit_event
# PREFECT FEATURE: Artifacts
from prefect.artifacts import (
    create_markdown_artifact,
    create_table_artifact,
    create_link_artifact,
    create_progress_artifact,
    update_progress_artifact,
    create_image_artifact,  # Needs a publicly accessible image URL
)
# PREFECT FEATURE: Variables
from prefect.variables import Variable

# Project-specific imports
from crawlers import get_crawler  # To get our PokeApiCrawler instance
from tasks import (  # We'll reuse our robust tasks, passing the shared HTTP client & config
    discover_all_item_ids_for_source_task,
    fetch_and_save_item_details_for_source_task,
    # We might define some new tasks specific to this showcase below
)
# `make_api_request` (used by the showcase tasks below) is the project's shared HTTP helper.
# Import it from wherever it lives in your project, e.g. (hypothetical path):
# from core.http_client import make_api_request

# Load the SourceConfigBlock type, primarily for Pydantic validation in flow params.
from project_blocks import SourceConfigBlock


# Pydantic model for flow parameters (advanced validation demonstration)
class PokeApiShowcaseParams(RunInput):  # PREFECT FEATURE: RunInput is a Pydantic model, so it validates flow params too
    source_name: str = "pokeapi"
    pokemon_to_spotlight: str = "pikachu"
    max_pokemon_to_discover_in_showcase: int = 25  # A smaller number for demo speed
    enable_interactive_pause: bool = True
    force_detail_refresh: bool = False
    minio_results_block_name: str = "minio-crawler-results"  # Default block for S3 results

    # Example of a more complex Pydantic v2 validation (needs `from pydantic import field_validator`):
    # @field_validator("pokemon_to_spotlight")
    # @classmethod
    # def name_must_be_alpha(cls, v: str) -> str:
    #     if not v.isalpha():
    #         raise ValueError("Pokemon name must be alphabetic for spotlight")
    #     return v.lower()


# Input model for pause_flow_run(wait_for_input=...) in Section 5.
class ApprovalInput(RunInput):
    action: str = "approve"
    reason: str = ""


# --- Showcase Specific Tasks ---
@task(name="Fetch Pokémon Species Data", retries=1)
async def fetch_species_data_task(pokemon_name: str, client: httpx.AsyncClient, config_block: SourceConfigBlock) -> Dict[str, Any]:
    logger = get_run_logger()
    logger.info(f"Fetching SPECIES data for {pokemon_name} using specific task.")
    # Species data lives at a separate endpoint:
    # https://pokeapi.co/api/v2/pokemon-species/{pokemon_name_or_id}/
    # In a real scenario you'd follow the species URL from the Pokémon detail response;
    # for simplicity we assume the Pokémon name is also a valid species name.
    species_endpoint = f"pokemon-species/{pokemon_name}"
    species_data = await make_api_request(
        client, "GET", str(config_block.api_base_url), species_endpoint, logger, api_key=""  # PokeAPI uses no key
    )
    return species_data if species_data else {}


# This task demonstrates an advanced cache policy combination: cache on the task's
# source code AND its inputs, excluding the unhashable `client` from the input hash.
advanced_cache_policy = TASK_SOURCE + (INPUTS - "client")


@task(
    name="Analyze Evolution Chain (Advanced Cache)",
    cache_policy=advanced_cache_policy,
    cache_expiration=timedelta(hours=6),
    persist_result=True,
)
async def analyze_evolution_chain_task(species_data: Dict[str, Any], client: httpx.AsyncClient) -> List[str]:
    # Get the logger inside the task instead of passing one in:
    # loggers are neither serializable nor hashable, so they don't belong in task inputs.
    logger = get_run_logger()
    logger.info(f"Analyzing evolution chain for species related to: {species_data.get('name', 'Unknown')}")
    evolution_chain_info = species_data.get("evolution_chain") or {}
    chain_url = evolution_chain_info.get("url")
    if not chain_url:
        logger.warning("No evolution chain URL found in species data.")
        return []
    # make_api_request needs base_url and endpoint separately, but we only have a full URL here.
    # A more robust HTTP helper would accept full URLs; for now, split it:
    parsed_url = httpx.URL(chain_url)
    base_url = f"{parsed_url.scheme}://{parsed_url.host}"
    endpoint = parsed_url.path  # Includes the /api/v2 prefix
    logger.info(f"Fetching evolution chain from: {chain_url}")
    chain_data = await make_api_request(client, "GET", base_url, endpoint, logger, api_key="")
    evolutions = []
    if chain_data and "chain" in chain_data:
        current_link = chain_data["chain"]
        while current_link:
            evolutions.append(current_link["species"]["name"])
            if current_link["evolves_to"]:
                current_link = current_link["evolves_to"][0]  # Simplification: follows only the first evolution branch
            else:
                current_link = None
    logger.info(f"Evolution path: {' -> '.join(evolutions) if evolutions else 'N/A'}")
    return evolutions


# --- THE MAIN SHOWCASE FLOW ---
@flow(
    name="Ultimate PokeAPI Prefect Showcase",
    description="A flow demonstrating a wide array of Prefect V3 features using PokeAPI.",
    # PREFECT FEATURE: Flow Version (can also be set dynamically, e.g. from a git commit)
    version="1.0.0-showcase",
    # PREFECT FEATURE: Task Runner specified on the Flow
    # Tasks called with .submit() or .map() use this runner.
    # ThreadPoolTaskRunner suits I/O-bound API calls; DaskTaskRunner/RayTaskRunner
    # (from the prefect-dask / prefect-ray integrations) target parallel, CPU-bound work.
    task_runner=ThreadPoolTaskRunner(max_workers=5),  # Up to 5 concurrent submitted tasks
    # PREFECT FEATURE: Retries/Timeouts on the Flow itself
    retries=1,  # Retry the whole flow run once if it fails
    retry_delay_seconds=30,
    timeout_seconds=3600,  # Max 1 hour for the whole showcase
    # PREFECT FEATURE: Result Storage and Persistence for the flow's own return value
    # result_storage=S3Bucket.load("minio-crawler-results"),  # Needs `from prefect_aws import S3Bucket` and an existing block
    # persist_result=True,  # To save the flow's return value
    # PREFECT FEATURE: log_prints=True captures print() statements as logs
    log_prints=True,
)
async def ultimate_pokeapi_showcase_flow(params: PokeApiShowcaseParams):
    """
    This flow will:
    1. Load configurations using Prefect Blocks.
    2. Discover a small batch of Pokémon using a reusable, cached task.
    3. Fetch details for these Pokémon concurrently using .map().
    4. Spotlight a specific Pokémon, fetching its species and evolution chain within a Prefect Transaction.
    5. Demonstrate interactive pausing with user input.
    6. Create various types of Prefect Artifacts.
    7. Emit a Prefect Event.
    8. Use Prefect Variables.
    9. Point to Dask/Ray task runners as alternatives (see the task_runner comments).
    10. Use PrefectClient for API interaction.
    11. Showcase logging, settings, state hooks, and more.
    """
    # PREFECT FEATURE: `get_run_logger()` for contextualized logging.
    logger = get_run_logger()
    logger.info(f"Starting Ultimate PokeAPI Showcase with params: {params.model_dump()}")
    # PREFECT FEATURE: Current flow run context (names, ids, etc.)
    flow_run_context = get_run_context().flow_run
    if flow_run_context:
        logger.info(f"Flow Run Name: {flow_run_context.name}, ID: {flow_run_context.id}")
    # PREFECT FEATURE: Dynamic flow run names
    # The usual approach is a template on the decorator, rendered at run time from the
    # flow's parameters, e.g. @flow(flow_run_name="poke-showcase-{params.pokemon_to_spotlight}").
    # For complex naming logic you can also rename the run from inside it (less common):
    # async with get_client() as prefect_client:
    #     await prefect_client.update_flow_run(flow_run_id=flow_run_context.id, name=f"poke-run-{params.pokemon_to_spotlight}-{datetime.now(timezone.utc):%Y%m%d%H%M}")

    # --- 1. Configuration Loading (using Prefect Blocks) ---
    logger.info("--- Section 1: Configuration with Prefect Blocks ---")
    # PREFECT FEATURE: Loading a custom SourceConfigBlock
    # The crawler instance loads its own block; we read some parameters from it.
    crawler = get_crawler(params.source_name)  # "pokeapi"
    config_block = crawler.config_block
    print(f"Loaded config for '{params.source_name}' from block '{config_block._block_document_name}'. API Base: {config_block.api_base_url}")
    # PREFECT FEATURE: Prefect Variables
    # For non-sensitive, dynamic settings managed via the Prefect UI/API.
    # Create this variable: `prefect variable set POKEAPI_SHOWCASE_GREETING "Hello from Prefect Variables!"`
    # Variable.get returns the stored value itself; `default` is returned if the variable doesn't exist.
    try:
        showcase_greeting = await Variable.get("POKEAPI_SHOWCASE_GREETING", default="Hello from Fallback Greeting!")
    except Exception as e:  # e.g. the Prefect API is unreachable
        logger.warning(f"Could not load POKEAPI_SHOWCASE_GREETING variable: {e}. Using fallback.")
        showcase_greeting = "Hello from Fallback Greeting!"
    print(f"Variable POKEAPI_SHOWCASE_GREETING: {showcase_greeting}")
    # PREFECT FEATURE: An HTTP client managed by the flow
    # Passing one client to tasks shares its connection pool and configuration.
    # Caveat: ThreadPoolTaskRunner may run submitted async tasks in worker threads with their own
    # event loops; if httpx complains about a different or closed event loop, create the client inside each task.
    async with httpx.AsyncClient(timeout=20.0) as client:  # Shared client for tasks
        logger.info("Initialized shared httpx.AsyncClient for tasks.")

        # --- 2. Discovering Pokémon (Reusable Task, Caching) ---
        logger.info(f"--- Section 2: Discovering First {params.max_pokemon_to_discover_in_showcase} Pokémon ---")
        # PREFECT FEATURE: Calling a task directly; results are persisted if the task sets persist_result=True.
        # `discover_all_item_ids_for_source_task` has a `cache_key_fn` and `persist_result=True`, and takes
        # `item_list_cache_ttl_seconds` (driven by `config_block`) plus the shared `client`.
        # It yields batches of IDs, so we collect them here.
        discovered_pokemon_names: List[str] = []
        async for batch in discover_all_item_ids_for_source_task(  # Called directly (no .submit()), in the flow's own event loop
            source_name=params.source_name,
            item_list_cache_ttl_seconds=config_block.item_list_cache_ttl_seconds,
            client=client,
        ):
            discovered_pokemon_names.extend(batch)
            if len(discovered_pokemon_names) >= params.max_pokemon_to_discover_in_showcase:
                break  # Limit for the showcase
        # Trim to the exact number for the showcase demo
        discovered_pokemon_names = discovered_pokemon_names[: params.max_pokemon_to_discover_in_showcase]
        print(f"Discovered {len(discovered_pokemon_names)} Pokémon: {discovered_pokemon_names[:5]}...")

        # --- 3. Fetching Details Concurrently (Task.map, GCLs) ---
        logger.info(f"--- Section 3: Fetching Details Concurrently for {len(discovered_pokemon_names)} Pokémon ---")
        if discovered_pokemon_names:
            # PREFECT FEATURE: `Task.map()` for parallel execution over inputs, using the flow's `task_runner`.
            # `fetch_and_save_item_details_for_source_task` uses the GCL "pokeapi-public-calls" and has
            # its own `cache_key_fn`, `persist_result=True`, `retries` and `task_run_name`.
            detail_futures = fetch_and_save_item_details_for_source_task.with_options(
                name="map-fetch-pokemon-details",  # Task name used for every mapped run
                # PREFECT FEATURE: Forcing a cache refresh on mapped tasks if needed
                refresh_cache=params.force_detail_refresh,
            ).map(
                source_name=unmapped(params.source_name),
                item_id=discovered_pokemon_names,
                refresh_interval_hours=unmapped(config_block.refresh_interval_hours),
                client=unmapped(client),  # Pass the shared client
            )
            # PREFECT FEATURE: Waiting for and retrieving results from futures
            # (Passing futures to other tasks or returning them resolves them implicitly,
            # but an explicit .result() or .wait() is sometimes needed.)
            pokemon_details_list = []
            print(f"Waiting for {len(detail_futures)} detail fetch tasks...")
            # Progress artifacts track a percentage (0-100): start at 0 and update after each future.
            detail_progress_id = await create_progress_artifact(
                progress=0.0,
                description="Fetching Pokémon Details via Map",
            )
            for i, future in enumerate(detail_futures):
                try:
                    # PREFECT FEATURE: future.result() raises on failure by default;
                    # `raise_on_failure=False` returns the exception object instead.
                    # PrefectFuture.result() is synchronous in Prefect 3, so it is not awaited.
                    result = future.result(timeout=60)  # Timeout for this individual future
                    if result:
                        pokemon_details_list.append(result)
                except Exception as e:
                    logger.error(f"Failed to get result for Pokémon '{discovered_pokemon_names[i]}': {e}")
                await update_progress_artifact(detail_progress_id, progress=(i + 1) / len(detail_futures) * 100)
            print(f"Fetched details for {len(pokemon_details_list)} Pokémon.")
        else:
            logger.warning("No Pokémon discovered, skipping detail fetching.")
            pokemon_details_list = []

        # --- 4. Spotlight Pokémon: Transactions, Nested Task, Advanced Caching ---
        logger.info(f"--- Section 4: Spotlight on {params.pokemon_to_spotlight} (Transactions, Advanced Cache) ---")
        spotlight_pokemon_name = params.pokemon_to_spotlight.lower()  # PokeAPI names and artifact keys are lowercase
        # PREFECT FEATURE: Transactions group related task runs.
        # If an exception escapes the block, the transaction rolls back and any
        # @<task>.on_rollback hooks of tasks that ran inside it are triggered.
        # `transaction()` is a regular context manager; an optional `key=` records commits for idempotency.
        with transaction():
            logger.info(f"Transaction started for {spotlight_pokemon_name}.")
            # Fetch base details first (might hit the cache from the map above or from a previous run)
            spotlight_details_future = fetch_and_save_item_details_for_source_task.submit(  # PREFECT FEATURE: Task.submit()
                source_name=params.source_name,
                item_id=spotlight_pokemon_name,
                refresh_interval_hours=config_block.refresh_interval_hours,
                client=client,  # Pass the shared client
            )
            # To give just this submission its own run name, chain
            # .with_options(task_run_name=f"fetch-spotlight-{spotlight_pokemon_name}") before .submit();
            # it overrides the decorator's `task_run_name` for this call only.
            spotlight_details = spotlight_details_future.result()  # Wait for this one specifically
            if spotlight_details:
                # PREFECT FEATURE: Chaining tasks (data dependencies form the DAG)
                species_data_future = fetch_species_data_task.submit(
                    pokemon_name=spotlight_pokemon_name, client=client, config_block=config_block
                )
                species_data = species_data_future.result()
                if species_data:
                    # PREFECT FEATURE: Composite cache policy (TASK_SOURCE + INPUTS)
                    # `analyze_evolution_chain_task` reuses its cache if its inputs AND its code are unchanged.
                    evolution_future = analyze_evolution_chain_task.submit(species_data, client)
                    evolution_path = evolution_future.result()
                    print(f"Evolution path for {spotlight_pokemon_name}: {' -> '.join(evolution_path)}")
                else:
                    logger.warning(f"No species data for {spotlight_pokemon_name}, skipping evolution chain analysis.")
            else:
                logger.error(f"Could not fetch details for spotlight Pokémon {spotlight_pokemon_name}. Raise here if this should roll back the transaction.")
                # PREFECT FEATURE: Raising an exception here would roll back the transaction
                # raise ValueError("Spotlight details fetch failed, aborting transaction.")
            logger.info(f"Transaction for {spotlight_pokemon_name} complete.")

        # --- 5. Interactive Workflow: Pause for User Input ---
        logger.info("--- Section 5: Interactive Workflow (Pause/Suspend, Receive Input) ---")
        if params.enable_interactive_pause:
            print(f"Simulating a scenario requiring user approval for {spotlight_pokemon_name}...")
            # PREFECT FEATURE: `pause_flow_run` with `wait_for_input`
            # The flow pauses here while its infrastructure (e.g. the container) keeps running.
            # A user resumes it from the Prefect UI and provides the input.
            try:
                approval_decision_model = ApprovalInput.with_initial_data(
                    description=(
                        "## Pokémon Processing Approval Needed!\n\n"
                        f"Should we proceed with *extra* processing for **{spotlight_pokemon_name}**?"
                        f"\n(Run parameters: {params.model_dump()})"
                    ),
                    action="approve",  # Pre-filled for the user
                )
                # PREFECT FEATURE: A Pydantic RunInput model describes the expected input.
                user_input: ApprovalInput = await pause_flow_run(
                    wait_for_input=approval_decision_model,
                    timeout=300,  # 5 minutes for the user to respond
                )
                print(f"Resumed from pause! User decision: {user_input.action}, Reason: '{user_input.reason}'")
                if user_input.action.lower() != "approve":
                    print("User denied extra processing. Skipping.")
            except FlowPauseTimeout:
                # Raised when nobody resumes the run before `timeout`
                print("Pause timed out. No user input received for extra processing.")
        else:
            print("Interactive pause disabled by parameters.")
        # PREFECT FEATURE: `suspend_flow_run` (conceptual, for longer pauses)
        # For very long pauses (e.g. hours/days of manual data validation),
        # `suspend_flow_run()` stops the flow AND releases its infrastructure.
        # On resume, the flow *restarts from the beginning* (relying on task caching),
        # so it requires a run created from a deployment.
        # Example:
        # await suspend_flow_run(timeout=int(timedelta(days=1).total_seconds()))
        # print("Resumed after suspension!")  # Only reached after a full flow restart
        # PREFECT FEATURE: `receive_input` / `send_input` for dynamic commands
        print("Listening for an external 'go_faster_please' command via receive_input for 15s...")
        try:
            # raise_timeout_error=True makes the timeout raise TimeoutError; by default the loop just ends.
            async for command in receive_input(str, timeout=15, poll_interval=3, raise_timeout_error=True):
                logger.info(f"Flow run received command: '{command}'")
                if command.lower() == "go_faster_please":
                    logger.info("Acknowledged 'go_faster_please' command! (Simulation)")
                    # In a real flow, this could adjust GCLs, batch sizes, etc.
                    break  # Exit after one command for the demo
        except TimeoutError:
            logger.info("No external command received via receive_input within timeout.")
        # To send input from another script/flow:
        # `from prefect.input.run_input import send_input; await send_input("go_faster_please", flow_run_id="")`

        # --- 6. Rich Observability with Prefect Artifacts ---
        logger.info("--- Section 6: Creating Prefect Artifacts ---")
        # PREFECT FEATURE: Markdown Artifact
        report_md = f"# PokeAPI Showcase Report: {spotlight_pokemon_name}\n\n"
        report_md += f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n"
        if spotlight_details:
            report_md += f"## {spotlight_pokemon_name.capitalize()} Details\n"
            report_md += f"- ID: {spotlight_details.get('id', 'N/A')}\n"
            report_md += f"- Height: {spotlight_details.get('height', 'N/A')}\n"
            report_md += f"- Weight: {spotlight_details.get('weight', 'N/A')}\n"
            primary_type = (spotlight_details.get("types") or [{}])[0].get("type", {}).get("name", "N/A")
            report_md += f"- Primary Type: {primary_type}\n"
        else:
            report_md += "Spotlight Pokémon details could not be fetched.\n"
        await create_markdown_artifact(
            key=f"pokemon-spotlight-report-{spotlight_pokemon_name}",  # Reusing a key versions the artifact
            markdown=report_md,
            description=f"Summary report for {spotlight_pokemon_name}.",
        )
        print(f"Created Markdown artifact for {spotlight_pokemon_name}.")
        # PREFECT FEATURE: Table Artifact
        if pokemon_details_list:
            sample_table_data = [
                {
                    "Name": p.get("name", "N/A"),
                    "ID": p.get("id", "N/A"),
                    "Height": p.get("height", "N/A"),
                    "Weight": p.get("weight", "N/A"),
                    "Primary Type": (p.get("types") or [{}])[0].get("type", {}).get("name", "N/A"),
                }
                for p in pokemon_details_list[:3]  # Sample of the first 3
            ]
            await create_table_artifact(
                key="pokemon-showcase-sample-table",
                table=sample_table_data,
                description="A sample of Pokémon details fetched during this showcase run.",
            )
            print("Created Table artifact with sample Pokémon data.")
        # PREFECT FEATURE: Link Artifact
        await create_link_artifact(
            link=f"https://pokeapi.co/api/v2/pokemon/{spotlight_pokemon_name}",
            link_text=f"PokeAPI Page for {spotlight_pokemon_name.capitalize()}",
            description="Direct link to the PokeAPI resource for the spotlight Pokémon.",
            key="spotlight-pokemon-api-link",
        )
        print("Created Link artifact to PokeAPI.")
        # PREFECT FEATURE: Image Artifact (using the sprite URL from the Pokémon details)
        if spotlight_details and spotlight_details.get("sprites", {}).get("front_default"):
            sprite_url = spotlight_details["sprites"]["front_default"]
            await create_image_artifact(
                image_url=sprite_url,  # Must be a publicly accessible URL
                description=f"Default sprite for {spotlight_pokemon_name.capitalize()}.",
                key=f"pokemon-sprite-{spotlight_pokemon_name}",
            )
            print(f"Created Image artifact for {spotlight_pokemon_name}'s sprite.")
        else:
            # Fall back to a generic image if there is no sprite
            await create_image_artifact(
                image_url="https://raw.githubusercontent.com/PrefectHQ/prefect/main/docs/img/prefect-logo-mark-solid-white-500.png",
                description="Generic Prefect Logo (Spotlight sprite not available).",
                key="generic-placeholder-image",
            )
            print(f"Sprite for {spotlight_pokemon_name} not found, created generic image artifact.")

        # --- 7. Event Emission ---
        logger.info("--- Section 7: Emitting a Prefect Event ---")
        # PREFECT FEATURE: `emit_event`
        # Emits an event that Automations can react to, or that external systems can observe.
        # emit_event is a regular (synchronous) function, so it is not awaited.
        if spotlight_details:
            emit_event(
                event="showcase.pokemon.processed",
                resource={"prefect.resource.id": f"pokeapi.pokemon.{spotlight_pokemon_name}", "name": spotlight_pokemon_name},
                payload={"id": spotlight_details.get("id"), "types": spotlight_details.get("types", [])},
            )
            print(f"Emitted 'showcase.pokemon.processed' event for {spotlight_pokemon_name}.")

        # --- 8. Final Flow State Determination and Client Interaction ---
        logger.info("--- Section 8: Finalizing Flow & Prefect Client Demo ---")
        # PREFECT FEATURE: PrefectClient for direct API interaction
        try:
            async with get_client() as prefect_client:  # `prefect_client`, not `client`, to avoid clashing with the httpx client
                logger.info("Successfully obtained PrefectClient.")
                if flow_run_context:
                    # Re-read the current flow run from the API
                    updated_flow_run = await prefect_client.read_flow_run(flow_run_context.id)
                    logger.info(f"Current flow run state (via client post-operations): {updated_flow_run.state.name if updated_flow_run.state else 'N/A'}")
                # Example: read deployments using client filters
                # (needs `from prefect.client.schemas.filters import DeploymentFilter`)
                # deployments = await prefect_client.read_deployments(
                #     deployment_filter=DeploymentFilter(name=dict(like_="%pokeapi%"))
                # )
                # logger.info(f"Found {len(deployments)} deployments with 'pokeapi' in name.")
        except Exception as e:
            logger.error(f"Error during PrefectClient usage: {e}")
        # PREFECT FEATURE: Explicitly returning a State object
        # This sets the flow run's final state and message; `data` carries the return value.
        final_message = f"Ultimate PokeAPI Showcase for {params.pokemon_to_spotlight} completed successfully!"
        print(final_message)
        return Completed(
            message=final_message,
            data={"pokemon_spotlighted": spotlight_pokemon_name, "discovered_count": len(discovered_pokemon_names)},
        )
        # Example of returning a Failed state instead:
        # return Failed(message="Something went deliberately wrong in the showcase!")


# PREFECT FEATURE: State Change Hooks on the Flow
# Registered with decorator-style methods on the flow object (available in recent Prefect 3.x releases);
# alternatively, pass on_completion=[...] / on_failure=[...] to @flow.
@ultimate_pokeapi_showcase_flow.on_completion
async def showcase_flow_completed_hook(flow: "Flow", flow_run: FlowRun, state: "State"):
    # Quoted type hints avoid importing Flow/State just for annotations.
    print(f"COMPLETION HOOK: Flow '{flow.name}' / Run '{flow_run.name}' finished with state: {state.name}")
    # Here you could send a final notification, update an external system, etc.


@ultimate_pokeapi_showcase_flow.on_failure
async def showcase_flow_failed_hook(flow: "Flow", flow_run: FlowRun, state: "State"):
    print(f"FAILURE HOOK: Flow '{flow.name}' / Run '{flow_run.name}' FAILED. State: {state.name}, Message: {state.message}")
    # Detailed error reporting or specific cleanup, e.g. send_critical_alert_email_via_block() (hypothetical helper)


# --- For local execution and `flow.serve()` or `flow.deploy()` ---
if __name__ == "__main__":
    # PREFECT FEATURE: Default parameter values for a local run / serving.
    # These become the deployment's default parameters below.
    default_params = PokeApiShowcaseParams(
        pokemon_to_spotlight="charizard",
        max_pokemon_to_discover_in_showcase=15,
        enable_interactive_pause=True,  # Set to False for fully unattended runs of this script
    )
    # To run once locally instead of serving:
    # asyncio.run(ultimate_pokeapi_showcase_flow(params=default_params))
    # PREFECT FEATURE: `flow.serve()` creates a deployment and starts serving it,
    # making the flow runnable via API, UI, or schedule.
    # The `name` passed to `serve` is the deployment name.
    ultimate_pokeapi_showcase_flow.serve(
        name="showcase-deployment-script-served",  # Full name: "Ultimate PokeAPI Prefect Showcase/showcase-deployment-script-served"
        # The flow's only argument is `params`, so the model's fields are nested under that key.
        parameters={"params": default_params.model_dump()},
        # PREFECT FEATURE: Schedule for flow.serve()
        cron="0 5 * * *",  # Example: run daily at 05:00 UTC
    )
    # PREFECT FEATURE: Settings such as PREFECT_API_URL, PREFECT_API_KEY and PREFECT_LOGGING_LEVEL
    # apply to this `serve` process, typically from the active profile or environment variables.
    # A `prefect.toml` or `pyproject.toml` in the project root can also define them.
    # PREFECT FEATURE: Prefect CLI commands this `serve` enables:
    # - `prefect deployment ls` (to see this deployment)
    # - `prefect deployment run "Ultimate PokeAPI Prefect Showcase/showcase-deployment-script-served"`
    # - `prefect flow-run logs `
    # - `prefect work-pool create ...` / `prefect worker start ...` (if deploying to a worker instead of serving)
    # PREFECT FEATURE: `prefect shell watch` / `prefect shell serve`
    # These CLI tools run arbitrary shell commands as Prefect flows.
    # Example to try in a terminal:
    # `prefect shell watch "curl -s https://pokeapi.co/api/v2/pokemon/snorlax | jq '.base_experience'" --flow-name "SnorlaxXP"`
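analyze_evolution_chain_task follows only the first entry of evolves_to, so branching families (Eevee, for example) report a single path. A minimal sketch of a depth-first walk that returns every root-to-leaf path instead; `all_evolution_paths` is a hypothetical helper, and the sample is a hand-trimmed dict shaped like the `chain` object, not real API output:

```python
from typing import Any, Dict, List


def all_evolution_paths(link: Dict[str, Any]) -> List[List[str]]:
    """Depth-first walk of an evolution-chain node, returning every root-to-leaf path."""
    name = link["species"]["name"]
    children = link.get("evolves_to") or []
    if not children:
        return [[name]]  # Leaf: the path ends here
    paths: List[List[str]] = []
    for child in children:
        for tail in all_evolution_paths(child):
            paths.append([name] + tail)
    return paths


# Hand-trimmed sample with the same shape as chain_data["chain"]
sample_chain = {
    "species": {"name": "eevee"},
    "evolves_to": [
        {"species": {"name": "vaporeon"}, "evolves_to": []},
        {"species": {"name": "jolteon"}, "evolves_to": []},
    ],
}
for path in all_evolution_paths(sample_chain):
    print(" -> ".join(path))
# eevee -> vaporeon
# eevee -> jolteon
```

Inside the task you would call all_evolution_paths(chain_data["chain"]); note that this changes the task's return type from a flat list to a list of paths.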
Explanation for the Developer about this "Ultimate Showcase" Flow:
- Kitchen Sink Approach: This flow is intentionally dense with features. In a real-world production flow, you'd likely use a subset of these, focusing on what's most impactful for that specific workflow.
- Comments as Education: The # PREFECT FEATURE: comments are crucial. They explain which Prefect feature is being used and why it's beneficial in that context.
- Pydantic Parameters: Using a Pydantic model (PokeApiShowcaseParams) for flow parameters demonstrates type validation and structured inputs.
- Task Runners: Explicitly setting ThreadPoolTaskRunner showcases this configuration point. The comments mention Dask/Ray for further exploration.
- Client Usage: The async with get_client() as prefect_client: pattern is shown for direct API interactions if needed.
- Comprehensive Artifacts: Five artifact types (markdown, table, link, progress, and image) are created to show rich UI output.
- Interactive Elements: pause_flow_run and receive_input showcase human-in-the-loop and dynamic control capabilities.
- Transaction: The spotlight Pokémon processing steps are wrapped in a transaction to demonstrate the atomicity concept.
- Events: emit_event demonstrates Prefect's capability to integrate into larger event-driven systems.
- State Management & Hooks: Explicit state returns and flow-level on_completion/on_failure hooks are demonstrated.
- Logging & Settings: These are implicitly active and mentioned for context.
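The row-building logic behind the table artifact is plain Python, so it can be pulled into a helper and tested without a Prefect backend. A minimal sketch, assuming hypothetical `summary_rows` and `primary_type` helpers; unlike indexing `types[0]` directly, it also tolerates an empty `types` list:

```python
from typing import Any, Dict, List


def primary_type(details: Dict[str, Any]) -> str:
    """Name of the first listed type, or "N/A" when types are missing or empty."""
    types = details.get("types") or []
    if not types:
        return "N/A"
    return types[0].get("type", {}).get("name", "N/A")


def summary_rows(details_list: List[Dict[str, Any]], limit: int = 3) -> List[Dict[str, Any]]:
    """Build a list of row dicts suitable for create_table_artifact(table=...)."""
    return [
        {
            "Name": p.get("name", "N/A"),
            "ID": p.get("id", "N/A"),
            "Height": p.get("height", "N/A"),
            "Weight": p.get("weight", "N/A"),
            "Primary Type": primary_type(p),
        }
        for p in details_list[:limit]  # Only a small sample goes into the artifact
    ]
```

In the flow, sample_table_data would then simply be summary_rows(pokemon_details_list).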
5. Deploying the PokeAPI Showcase Flow: Covering All Deployment Angles
Prefect V3 offers flexible deployment options. This section shows how to deploy the ultimate_pokeapi_showcase_flow.
5.1. Option A: Quick Local Serving with flow.serve()
This is what the if __name__ == "__main__": block in flows/pokeapi_showcase_flow.py already does.
- How:
# flows/pokeapi_showcase_flow.py (in __main__)
ultimate_pokeapi_showcase_flow.serve(
    name="showcase-local-serve-deployment",
    # The flow's only argument is `params`, so the model's fields are nested under that key.
    parameters={"params": default_params.model_dump()},  # From the Pydantic model
    cron="0 10 * * *",  # Example schedule
)
- To Run:
python flows/pokeapi_showcase_flow.py
- What Happens:
  - A Prefect Deployment object is created/updated in your Prefect backend.
  - A long-running Python process starts on your local machine.
  - This process polls the Prefect API for scheduled runs of this deployment (or ad-hoc runs triggered via UI/CLI).
  - When a run is due, it executes the flow in a subprocess on the same machine.
- PREFECT FEATURE: flow.serve(), Deployment (server-side object), Scheduling. Why Prefect? It's an extremely simple way to get a flow scheduled and managed by Prefect for local development or simple, single-machine setups.
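Deployment parameters are keyed by the flow function's argument names. ultimate_pokeapi_showcase_flow takes a single argument, params, so the model's fields must sit under a "params" key, the same shape the prefect.yaml example uses. A stdlib-only sketch; `ShowcaseParamsStandIn` is a hypothetical dataclass standing in for the Pydantic PokeApiShowcaseParams (with Pydantic v2 you'd call model_dump() instead of asdict()):

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class ShowcaseParamsStandIn:
    """Hypothetical stand-in for PokeApiShowcaseParams (same field names and defaults)."""
    source_name: str = "pokeapi"
    pokemon_to_spotlight: str = "pikachu"
    max_pokemon_to_discover_in_showcase: int = 25
    enable_interactive_pause: bool = True
    force_detail_refresh: bool = False
    minio_results_block_name: str = "minio-crawler-results"


def deployment_parameters(model: ShowcaseParamsStandIn, arg_name: str = "params") -> Dict[str, Any]:
    """Nest the model's fields under the flow argument's name."""
    return {arg_name: asdict(model)}


parameters = deployment_parameters(ShowcaseParamsStandIn(pokemon_to_spotlight="charizard"))
print(parameters["params"]["pokemon_to_spotlight"])  # charizard
```

Passing the flat dict instead would hand the deployment keys such as pokemon_to_spotlight, which don't match the flow's signature.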
5.2. Option B: Robust Python SDK Deployment with flow.deploy() & Docker
This approach is for more production-like scenarios, especially when using containerized environments managed by workers.
- Create a deployment script, e.g., deployment_scripts/deploy_pokeapi_showcase.py:
# deployment_scripts/deploy_pokeapi_showcase.py
from flows.pokeapi_showcase_flow import ultimate_pokeapi_showcase_flow, PokeApiShowcaseParams
from prefect.docker import DockerImage  # PREFECT FEATURE: DockerImage for defining the image build
# from prefect_aws.s3 import S3Bucket  # If using S3 for remote code storage

if __name__ == "__main__":
    default_params = PokeApiShowcaseParams(
        pokemon_to_spotlight="mewtwo",
        enable_interactive_pause=False,  # Often disabled for automated production runs
    )
    # PREFECT FEATURE: `flow.deploy()` - programmatic deployment creation
    ultimate_pokeapi_showcase_flow.deploy(
        name="poke-showcase-docker-py-deploy",  # Deployment name
        work_pool_name="my-docker-pool",  # Assumes a Docker work pool exists
        # PREFECT FEATURE: Building a Docker image as part of the deployment
        image=DockerImage(
            name="your-registry/pokeapi-showcase-app",  # Replace with your Docker Hub/ECR/GCR repository
            tag="latest",
            dockerfile="Dockerfile",  # Assumes a Dockerfile in the project root
            # buildargs={"SOME_BUILD_ARG": "value"},  # If the Dockerfile needs build args
            # platform="linux/amd64",  # If building for a specific platform
        ),
        push=False,  # Set to True to push to `your-registry`
        # PREFECT FEATURE: Parameters defined with the deployment, nested under the flow's `params` argument
        parameters={"params": default_params.model_dump()},
        # PREFECT FEATURE: Schedule on the deployment
        cron="0 12 * * 1-5",  # Weekdays at 12:00 UTC
        # PREFECT FEATURE: Job Variables for work pool customization
        job_variables={
            "env": {"SHOWCASE_RUN_MODE": "docker_deployed", "PREFECT_LOGGING_LEVEL": "DEBUG"},
            # Example for a Docker work pool; "Always" only works if the image was pushed (push=True)
            "image_pull_policy": "Always",
        },
        tags=["pokeapi", "showcase", "docker", "python-sdk"],
        version="showcase-v1.py",
    )
    print("Deployment 'poke-showcase-docker-py-deploy' created/updated.")
    print("Ensure 'my-docker-pool' work pool exists and a worker is started:")
    print("  `prefect work-pool create my-docker-pool --type docker`")
    print("  `prefect worker start -p my-docker-pool`")
Run this script:
python deployment_scripts/deploy_pokeapi_showcase.py
PREFECT FEATURE: flow.deploy(), DockerImage, work_pool_name, job_variables.
- What happens:
  - Prefect builds the Docker image described by image. Pass build=False to use a pre-built image as-is.
  - If push=True, it pushes the image to the registry.
  - It creates or updates a Deployment in the Prefect backend, linking it to the work pool and recording the image to use for flow runs.
  - A Prefect Worker polling my-docker-pool picks up scheduled or triggered runs and executes each one by launching a new Docker container from this image.
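The flow receives its configuration through a single params argument. The deployment's default parameters therefore have to be keyed by that argument name, not spread out at the top level.

Below is a minimal sketch of building that dictionary. It uses a plain dataclass as a hypothetical stand-in for the Pydantic PokeApiShowcaseParams model. The field names follow the prefect.yaml example, and the defaults are made up.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any


# Hypothetical stand-in for the Pydantic PokeApiShowcaseParams model.
# Field names mirror the prefect.yaml example; defaults are illustrative only.
@dataclass
class ShowcaseParamsStandIn:
    source_name: str = "pokeapi"
    pokemon_to_spotlight: str = "pikachu"
    max_pokemon_to_discover_in_showcase: int = 20
    enable_interactive_pause: bool = True
    force_detail_refresh: bool = False


def build_deployment_parameters(params: ShowcaseParamsStandIn, arg_name: str = "params") -> dict[str, Any]:
    """Nest the model's fields under the flow's argument name."""
    payload = {arg_name: asdict(params)}
    # Deployment parameters are stored as JSON, so fail early on anything not serializable.
    json.dumps(payload)
    return payload


if __name__ == "__main__":
    defaults = ShowcaseParamsStandIn(pokemon_to_spotlight="mewtwo", enable_interactive_pause=False)
    print(build_deployment_parameters(defaults))
```

With the real Pydantic model, {"params": default_params.model_dump(mode="json")} plays the same role.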
5.3. Option C: Declarative Deployments with prefect.yaml
This is for managing deployments as version-controlled configuration files, often integrated with CI/CD.
- Ensure your project root has a prefect.yaml. If not, run prefect init (you can select 'none' or a generic recipe).
- Add/Modify the deployments section in prefect.yaml:
# prefect.yaml (add this to the existing file or a new one)
# ... (build, push, pull sections can be global or per-deployment)

# Global build example (if not already present from previous exercises):
# build:
#   - prefect_docker.deployments.steps.build_docker_image:
#       id: build-showcase-image
#       requires: prefect-docker
#       image_name: your-registry/pokeapi-showcase-app-yaml  # Use a distinct name
#       tag: "latest-yaml"
#       dockerfile: "Dockerfile"  # Assumes Dockerfile at root

# Global push example (pushing is a separate step from building):
# push:
#   - prefect_docker.deployments.steps.push_docker_image:
#       requires: prefect-docker
#       image_name: "{{ build-showcase-image.image_name }}"
#       tag: "{{ build-showcase-image.tag }}"

deployments:
  - name: "poke-showcase-yaml-declarative"  # Deployment name
    # PREFECT FEATURE: `entrypoint` in prefect.yaml
    entrypoint: "flows/pokeapi_showcase_flow.py:ultimate_pokeapi_showcase_flow"
    description: "PokeAPI Showcase deployed declaratively via prefect.yaml."
    version: "showcase-v1.yaml"
    tags: ["pokeapi", "showcase", "docker", "yaml"]
    # PREFECT FEATURE: `parameters` in prefect.yaml (JSON/YAML format)
    parameters:
      params:  # Name of the Pydantic model parameter in the flow function
        source_name: "pokeapi"
        pokemon_to_spotlight: "ditto"
        max_pokemon_to_discover_in_showcase: 10
        enable_interactive_pause: false
        force_detail_refresh: true
    # PREFECT FEATURE: `schedules` in prefect.yaml (a list; several schedule types can be combined)
    schedules:
      - cron: "30 10 * * MON-FRI"  # Weekdays at 10:30 New York time
        timezone: "America/New_York"  # PREFECT FEATURE: Timezone-aware schedules
        active: true  # PREFECT FEATURE: Active/inactive schedules
        # PREFECT FEATURE: Schedule with parameters (overrides deployment params)
        # parameters:
        #   params:
        #     pokemon_to_spotlight: "snorlax"  # This schedule runs with snorlax
      - interval: 7200  # Every 2 hours (seconds)
        anchor_date: "2024-01-01T08:00:00Z"
        # PREFECT FEATURE: `slug` for named schedules, for easier management (optional)
        slug: "poke-showcase-interval-every-2h"
      # Example of RRule (daily at 3 PM UTC until end of 2025):
      # - rrule: "FREQ=DAILY;BYHOUR=15;BYMINUTE=0;UNTIL=20251231T235959Z"
    # PREFECT FEATURE: `work_pool` configuration in prefect.yaml
    work_pool:
      name: "my-docker-pool"  # Matches a pre-existing Docker work pool
      # work_queue_name: "high-priority"  # Optional: if using queues in the work pool
      # PREFECT FEATURE: `job_variables` in prefect.yaml (infrastructure overrides)
      job_variables:
        # {{ }} templating substitutes step outputs; it is not Jinja and has no if/else.
        # This reference requires the build-showcase-image step above. Without that step,
        # hardcode the image, e.g. "your-registry/pokeapi-showcase-app-yaml:latest-yaml".
        image: "{{ build-showcase-image.image }}"
        env:
          PREFECT_LOGGING_LEVEL: "INFO"
          SHOWCASE_SOURCE_FILE: "prefect.yaml"
    # PREFECT FEATURE: Deployment-specific build/push/pull overrides
    # (if this deployment needed a different Docker image or pull step than the global ones)
    # build:
    #   - prefect_docker.deployments.steps.build_docker_image:
    #       image_name: "your-registry/pokeapi-showcase-special"
    #       # ... other build params
    # pull:
    #   - prefect.deployments.steps.git_clone:
    #       repository: "https://github.com/my_org/special_poke_code.git"
    #       branch: "feature-branch"
- Apply the deployment:
prefect deploy --name poke-showcase-yaml-declarative
# prefect deploy runs any build and push steps defined in prefect.yaml before creating/updating the deployment.
PREFECT FEATURE: prefect.yaml, the prefect deploy CLI, declarative deployment definitions, templating ({{ }}), and multiple schedule types (cron, interval, rrule).
- What happens:
  - Executes the build steps, if any (e.g., builds the Docker image).
  - Executes the push steps, if any (e.g., pushes the image).
  - Creates/updates the Deployment object in the Prefect backend based on the YAML. This includes schedule(s), work pool linkage, and job variable overrides (like the Docker image name).
  - Stores the YAML's pull steps on the deployment; they run each time a flow run starts.
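The templating used in job_variables is substitution, not a template language. That is why the image entry must point at an existing step output.

The sketch below is a simplified model written for this tutorial, not Prefect's resolver. Prefect's version also understands references to Prefect variables and blocks. In this model, each {{ step_id.key }} placeholder is swapped for the matching output of an earlier step, and all other text is left alone.

```python
import re
from typing import Any

# Matches "{{ step_id.key }}" with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([\w-]+)\.([\w-]+)\s*\}\}")


def resolve_step_outputs(value: Any, step_outputs: dict[str, dict[str, str]]) -> Any:
    """Recursively replace {{ step_id.key }} placeholders with earlier step outputs.

    Simplified model: plain string substitution, no expressions or conditionals.
    A reference to an unknown step or key raises KeyError.
    """
    if isinstance(value, dict):
        return {k: resolve_step_outputs(v, step_outputs) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_step_outputs(v, step_outputs) for v in value]
    if not isinstance(value, str):
        return value  # numbers, booleans, None pass through unchanged

    def substitute(match: re.Match) -> str:
        step_id, key = match.group(1), match.group(2)
        return str(step_outputs[step_id][key])

    return PLACEHOLDER.sub(substitute, value)


if __name__ == "__main__":
    # Hypothetical outputs of the build-showcase-image step
    outputs = {
        "build-showcase-image": {
            "image_name": "your-registry/pokeapi-showcase-app-yaml",
            "tag": "latest-yaml",
            "image": "your-registry/pokeapi-showcase-app-yaml:latest-yaml",
        }
    }
    job_variables = {"image": "{{ build-showcase-image.image }}", "env": {"PREFECT_LOGGING_LEVEL": "INFO"}}
    print(resolve_step_outputs(job_variables, outputs))
```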
5.4. Understanding Code Storage: Bake vs. Pull
- Baked-in (default for flow.deploy with DockerImage, and for prefect.yaml with a Docker build step):
  - The flow code (e.g., pokeapi_showcase_flow.py, tasks.py, crawlers/) is COPIED into the Docker image during the build. This happens via, for example, COPY . . in the Dockerfile or an auto-generated Dockerfile.
  - PREFECT FEATURE: When the worker starts a container for a flow run, the code is already inside. No pull step is strictly necessary in prefect.yaml for the code itself if it's baked in, though pull steps can still be used for other setup.
- Pulled at Runtime (Remote Storage):
  - You can store your code in Git, S3, GCS, etc.
  - The Docker image used by the worker can be more generic (e.g., just prefecthq/prefect:3-python3.11).
  - The Deployment object in Prefect will have pull_steps defined, from either the global prefect.yaml pull section or a deployment-specific pull section.
  - PREFECT FEATURE: pull_steps (e.g., prefect.deployments.steps.git_clone or prefect_aws.deployments.steps.pull_from_s3). These steps run inside the container, before the flow starts, to fetch the latest code.
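  - To use this from Python: flow.deploy() has no pull_steps argument. Instead, load the flow with flow.from_source(), which records the matching pull step (e.g., a git clone) on the deployment:
# flow.from_source(source="https://github.com/my_org/pokeapi-project.git",  # hypothetical repository
#                  entrypoint="flows/pokeapi_showcase_flow.py:ultimate_pokeapi_showcase_flow",
# ).deploy(name="poke-showcase-git-pull", work_pool_name="my-docker-pool",
#          image="prefecthq/prefect:3-python3.11", build=False)  # generic image; code is cloned at runtime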
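However they are produced, a deployment's pull steps are just data. They form a list of single-key dicts, each mapping a step's import path to its keyword arguments, which is the same shape as the pull section of prefect.yaml.

The sketch below contrasts the two storage styles. The set_working_directory and git_clone step paths are Prefect's. The helper name, working directory, and repository URL are hypothetical.

```python
import json


def pull_steps_for(storage: str, *, workdir: str = "/opt/prefect/pokeapi",
                   repository: str = "", branch: str = "main") -> list[dict[str, dict[str, str]]]:
    """Build pull steps for baked-in vs. remote (git) code storage.

    Each step is a single-key dict: the step's import path -> its keyword arguments,
    mirroring the `pull:` section of prefect.yaml.
    """
    if storage == "baked":
        # Code already lives in the image; only change into the directory it was copied to.
        return [{"prefect.deployments.steps.set_working_directory": {"directory": workdir}}]
    if storage == "git":
        if not repository:
            raise ValueError("git storage needs a repository URL")
        # Clone the latest code inside the container before the flow starts.
        return [{"prefect.deployments.steps.git_clone": {"repository": repository, "branch": branch}}]
    raise ValueError(f"unknown storage kind: {storage!r}")


if __name__ == "__main__":
    print(json.dumps(pull_steps_for("baked"), indent=2))
    print(json.dumps(pull_steps_for("git", repository="https://github.com/my_org/pokeapi-project.git"), indent=2))
```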
5.5. Work Pools & Workers: The Dynamic Infrastructure Duo
- Work Pools (e.g., my-docker-pool of type docker):
  - Created via the UI or prefect work-pool create my-docker-pool --type docker.
  - Define the type of infrastructure (Docker, K8s, ECS) and a base job template.
  - PREFECT FEATURE: Work Pools act as a bridge. They define what kind of job to run and offer default configurations. job_variables on a deployment override these defaults.
- Workers (e.g., prefect worker start -p my-docker-pool):
  - Lightweight polling services. A Docker worker knows how to talk to the Docker daemon to start containers.
  - They poll a specific work pool for Scheduled flow runs.
  - When a run is found, the worker:
    - Uses the deployment's job info (e.g., Docker image, command, environment variables).
    - Submits the job to the infrastructure (e.g., docker run ...). Inside the container, the deployment's pull_steps (if any) run before the flow starts.
    - Monitors the job and reports status back to the Prefect API.
  - PREFECT FEATURE: Workers decouple orchestration (Prefect Server/Cloud) from execution (your infrastructure). You can run many workers against one pool for high availability and scaling.
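The worker's polling cycle can be pictured with a toy, in-memory model. Everything here is hypothetical: the FlowRun record, the pool defaults, and the shallow merge of job variables over those defaults.

A real worker talks to the Prefect API. How each field of a base job template combines with job variables depends on that template.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class FlowRun:  # toy stand-in for a flow run record
    name: str
    state: str
    scheduled_time: datetime
    job_variables: dict = field(default_factory=dict)


# Toy defaults for a Docker-like work pool (illustrative values only).
POOL_DEFAULTS = {"image": "prefecthq/prefect:3-python3.11", "image_pull_policy": "IfNotPresent", "env": {}}


def resolve_job_config(defaults: dict, job_variables: dict) -> dict:
    """Deployment job variables override pool defaults key by key (shallow merge)."""
    return {**defaults, **job_variables}


def poll_once(runs: list[FlowRun], now: datetime) -> list[dict]:
    """One polling cycle: pick due Scheduled runs, mark them Pending, return jobs to submit."""
    submitted = []
    for run in runs:
        if run.state == "Scheduled" and run.scheduled_time <= now:
            run.state = "Pending"  # a real worker proposes this state via the API
            submitted.append({"run": run.name, **resolve_job_config(POOL_DEFAULTS, run.job_variables)})
    return submitted


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    runs = [
        FlowRun("due", "Scheduled", now - timedelta(minutes=1), {"image": "your-registry/pokeapi-showcase-app:latest"}),
        FlowRun("later", "Scheduled", now + timedelta(hours=1)),
    ]
    print(poll_once(runs, now))  # only "due" is submitted
```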
5.6. Schedules: Cron, Interval, RRule, and Parameters
As shown in the prefect.yaml example:
- Multiple Schedules: A single deployment can have multiple active schedules of different types.
- Timezones: Schedules are timezone-aware (timezone: "America/New_York").
- Parameterized Schedules (Advanced): You can define different default parameters for flow runs generated by different schedules on the same deployment. (Shown commented out in prefect.yaml.)
- PREFECT FEATURE: Flexible scheduling primitives. CronSchedule, IntervalSchedule, and RRuleSchedule objects are created under the hood, or configured via simple strings/dicts in YAML/Python deployment methods.
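An interval schedule is anchored: its run times fall on anchor_date + k * interval.

Below is a stdlib sketch of that arithmetic. It does not reproduce the scheduler's other rules, such as how far ahead runs are created.

```python
from datetime import datetime, timedelta, timezone


def next_interval_runs(anchor: datetime, interval: timedelta, after: datetime, count: int) -> list[datetime]:
    """Return the next `count` times of the form anchor + k * interval strictly after `after`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    # Whole intervals elapsed between anchor and `after`. timedelta // timedelta floors,
    # which also handles `after` earlier than the anchor.
    k = (after - anchor) // interval + 1
    return [anchor + (k + i) * interval for i in range(count)]


if __name__ == "__main__":
    anchor = datetime(2024, 1, 1, 8, tzinfo=timezone.utc)  # anchor_date from prefect.yaml
    now = datetime.now(timezone.utc)
    for t in next_interval_runs(anchor, timedelta(seconds=7200), now, 3):
        print(t.isoformat())
```

With the prefect.yaml interval above (7200 seconds anchored at 2024-01-01T08:00:00Z), runs land on even UTC hours.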
5.7. Deployment Versioning (Automatic with Prefect Cloud)
- Whenever you update a deployment (e.g., re-run flow.deploy() or prefect deploy), Prefect Cloud keeps track of versions. This helps in auditing changes and potentially rolling back.
- For self-hosted, versioning is primarily what you manage via the version string in your deployment definitions and your Git history.
- PREFECT FEATURE: Prefect tracks flow_id and deployment_id. Changes to flow code create new flow versions implicitly. Deployment objects link to a specific flow_id.
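Unless you pass version= to the flow decorator, Prefect derives a flow's default version from a hash of its source file. That is why editing the code yields a new flow version.

Below is a sketch of the idea with hashlib. SHA-256 is an arbitrary choice here; Prefect's own hash function is an implementation detail.

```python
import hashlib
import tempfile
from pathlib import Path


def source_version(path: Path) -> str:
    """Content hash of a source file: unchanged code keeps its version, edited code gets a new one."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        flow_file = Path(tmp) / "pokeapi_showcase_flow.py"
        flow_file.write_text("def showcase():\n    return 'pikachu'\n")
        v1 = source_version(flow_file)
        flow_file.write_text("def showcase():\n    return 'mewtwo'\n")
        v2 = source_version(flow_file)
        print(v1[:12], v2[:12], v1 != v2)
```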
5.8. Cancelling and Pausing/Suspending Deployed Runs
- Cancellation:
  - Once a flow run from a deployment is Running (or Scheduled, Pending), you can cancel it:
    - UI: Go to the Flow Run page -> "Cancel" button.
    - CLI: prefect flow-run cancel <FLOW_RUN_ID>
    - PREFECT FEATURE: The worker polling the work pool detects the "Cancelling" state and attempts to terminate the underlying infrastructure job (e.g., docker stop).
- Pause/Suspend (from within the flow, as shown in Sec 4.7):
  - pause_flow_run: Infrastructure keeps running (e.g., the Docker container stays alive, polling). Good for short interactive steps.
  - suspend_flow_run: Infrastructure is released, and the flow restarts from the beginning on resume (relying on task caching for efficiency). Only runs created from a deployment can be suspended, because Prefect has to relaunch them. Good for long human-in-the-loop waits.
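A resumed suspended run starts over. What keeps the restart cheap is that completed tasks are looked up instead of recomputed.

Below is a toy, stdlib-only model of that effect. The fetch_pokemon_details task name and the in-memory dict are hypothetical stand-ins for the showcase tasks and for Prefect's persisted results and cache policies.

```python
import hashlib
import json
from typing import Callable


def cache_key(task_name: str, inputs: dict) -> str:
    """Key derived from the task name plus its inputs (input-based caching in spirit)."""
    payload = json.dumps({"task": task_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_flow_from_start(names: list[str], cache: dict, fetch: Callable[[str], dict]) -> list[dict]:
    """Toy flow body that is re-executed from the top after a suspend/resume cycle.

    Tasks whose key is already cached return the stored result instead of refetching.
    """
    results = []
    for name in names:
        key = cache_key("fetch_pokemon_details", {"name": name})
        if key not in cache:
            cache[key] = fetch(name)
        results.append(cache[key])
    return results


if __name__ == "__main__":
    cache: dict = {}
    run_flow_from_start(["bulbasaur", "ditto"], cache, lambda n: {"name": n})  # before suspend
    run_flow_from_start(["bulbasaur", "ditto"], cache, lambda n: {"name": n})  # after resume: cache hits
    print(len(cache))  # 2 entries, each fetched once
```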
6. Event-Driven Triggers & Automations with Prefect
- As shown in ultimate_pokeapi_showcase_flow (Section 4.1), emit_event(...) can be called from within a flow.
- Automations (Prefect Cloud and self-hosted Prefect Server):
  - Trigger: On a specific event name/resource (e.g., showcase.pokemon.processed).
  - Action: Run a deployment (e.g., a reporting flow, or another stage of processing), or send a notification.
  - In Prefect 3, the open-source server also includes the events system and automations. Your flows can react to events emitted by other flows or external systems without you building a custom listening mechanism.
- PREFECT FEATURE: Events & Automations provide a powerful way to build reactive, decoupled systems.
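The trigger side of an automation comes down to matching incoming events against expected names and resource labels.

Below is a toy matcher, for intuition only. The event layout (an event name plus a resource dict with prefect.resource.id) mirrors Prefect's event shape, and expect/match echo trigger field names. Prefect's real triggers, however, are evaluated server-side and support more options, such as thresholds and time windows.

```python
from fnmatch import fnmatchcase
from typing import Callable, Iterable


def trigger_matches(event: dict, expect: set[str], match: dict[str, str]) -> bool:
    """Toy check: the event name matches one `expect` pattern, and every `match`
    label matches the event's resource labels (glob-style wildcards)."""
    if not any(fnmatchcase(event["event"], pattern) for pattern in expect):
        return False
    resource = event.get("resource", {})
    return all(label in resource and fnmatchcase(resource[label], pattern) for label, pattern in match.items())


def run_automation(events: Iterable[dict], expect: set[str], match: dict[str, str],
                   action: Callable[[dict], None]) -> int:
    """Call `action` for each matching event and return how many times it fired."""
    fired = 0
    for event in events:
        if trigger_matches(event, expect, match):
            action(event)
            fired += 1
    return fired


if __name__ == "__main__":
    events = [
        {"event": "showcase.pokemon.processed", "resource": {"prefect.resource.id": "pokemon.ditto"}},
        {"event": "showcase.pokemon.processed", "resource": {"prefect.resource.id": "berry.cheri"}},
    ]
    n = run_automation(events, {"showcase.pokemon.processed"}, {"prefect.resource.id": "pokemon.*"},
                       lambda e: print("would run reporting deployment for", e["resource"]["prefect.resource.id"]))
    print(n)
```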
7. Testing Your Prefect Workflows
- The prefect_test_harness utility is key for unit testing.
# test_pokeapi_flows.py (example snippet)
import asyncio

from prefect.testing.utilities import prefect_test_harness
from flows.pokeapi_showcase_flow import ultimate_pokeapi_showcase_flow, PokeApiShowcaseParams


def test_ultimate_showcase_basic_run():
    with prefect_test_harness():  # Sets up a temporary DB and an isolated testing environment
        # For flows with complex inputs or many side effects,
        # you'd mock API calls (e.g., using pytest-httpx) and Block loads.
        test_params = PokeApiShowcaseParams(
            pokemon_to_spotlight="bulbasaur",
            max_pokemon_to_discover_in_showcase=3,
            enable_interactive_pause=False,  # Crucial for non-interactive tests
        )
        # The showcase flow is async: calling it returns a coroutine, so drive it with
        # asyncio.run from this sync test (or write an async test with pytest-asyncio).
        # return_state=True returns the final State instead of the flow's return value.
        state = asyncio.run(ultimate_pokeapi_showcase_flow(params=test_params, return_state=True))
        assert state.is_completed()
        # If the flow returns a dict with a "pokemon_spotlighted" entry, you can also check:
        # assert "bulbasaur" in state.result()["pokemon_spotlighted"]
PREFECT FEATURE: prefect_test_harness provides an isolated Prefect runtime for testing flows and tasks as if they were running against a real backend, but using a temporary database.
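Mocking the PokeAPI calls is easiest when the code under test receives its HTTP function instead of creating one.

Below is a sketch of that pattern. It assumes a hypothetical fetch_pokemon_summary task body that takes an async get_json callable; your PokeApiCrawler's actual interface may differ. The fake serves canned JSON shaped like PokeAPI's /pokemon/{id_or_name}/ response and never touches the network.

```python
import asyncio
from typing import Any, Awaitable, Callable

POKEAPI_BASE = "https://pokeapi.co/api/v2"
GetJson = Callable[[str], Awaitable[dict[str, Any]]]


async def fetch_pokemon_summary(get_json: GetJson, id_or_name: str) -> dict[str, Any]:
    """Hypothetical task body: fetch /pokemon/{id_or_name}/ and keep a few fields."""
    data = await get_json(f"{POKEAPI_BASE}/pokemon/{id_or_name}/")
    return {"id": data["id"], "name": data["name"], "types": [t["type"]["name"] for t in data["types"]]}


class FakePokeApi:
    """Test double: serves canned JSON by URL and records every requested URL."""

    def __init__(self, responses: dict[str, dict[str, Any]]):
        self.responses = responses
        self.requested: list[str] = []

    async def get_json(self, url: str) -> dict[str, Any]:
        self.requested.append(url)
        return self.responses[url]


if __name__ == "__main__":
    fake = FakePokeApi({f"{POKEAPI_BASE}/pokemon/ditto/": {"id": 132, "name": "ditto", "types": [{"slot": 1, "type": {"name": "normal"}}]}})
    print(asyncio.run(fetch_pokemon_summary(fake.get_json, "ditto")))
```

Inside prefect_test_harness, the same injection point lets a flow-level test run against the fake instead of the live API.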
8. Exploring with the Prefect CLI
The Prefect CLI is indispensable for managing and interacting with your Prefect setup. Many commands have been mentioned already; here's a consolidated list relevant to this PokeAPI integration:
- Login/Server: prefect cloud login, prefect server start.
- Configuration: prefect config set ..., prefect config view, prefect profile ....
- Blocks: prefect block ls, prefect block type ls, prefect block register -f project_blocks.py, prefect block inspect secret/pokeapi-no-key-dummy.
- GCLs: prefect gcl create ..., prefect gcl ls, prefect gcl inspect ....
- Work Pools: prefect work-pool create ..., prefect work-pool ls, prefect work-pool inspect ....
- Workers: prefect worker start -p my-docker-pool.
- Deployments (created by scripts or YAML): prefect deployment ls, prefect deployment inspect "Flow Name/Deployment Name", prefect deployment run "Flow Name/Deployment Name" --param 'params={"pokemon_to_spotlight": "charmander"}'.
  - Applying YAML: prefect deploy --prefect-file prefect.yaml --name poke-showcase-yaml-declarative
- Flow Runs: prefect flow-run ls, prefect flow-run logs <FLOW_RUN_ID>, prefect flow-run inspect <FLOW_RUN_ID>, prefect flow-run cancel <FLOW_RUN_ID>.
- Shell Integration: prefect shell watch "curl -s https://pokeapi.co/api/v2/pokemon/mew | jq .id" (gets Mew's ID), and prefect shell serve ... (quickly deploys a shell command as a flow).
- Variables: prefect variable set MY_VAR "value", prefect variable get MY_VAR.
- Artifacts: prefect artifact ls, prefect artifact inspect <ARTIFACT_KEY>.
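Hand-quoting JSON for --param is error-prone. The CLI reads --param as key=value and parses the value as JSON, so a small helper can serialize the params dict and shell-quote the whole command.

Below is a sketch. The helper name is hypothetical, and "Flow Name/Deployment Name" is a placeholder, as in the list above.

```python
import json
import shlex


def deployment_run_command(deployment: str, params: dict) -> str:
    """Build a shell-safe `prefect deployment run` command for a flow whose single argument is `params`.

    The value after `params=` is JSON, so the dict is serialized with json.dumps
    and the full argument list is quoted with shlex.join.
    """
    param_arg = "params=" + json.dumps(params)
    return shlex.join(["prefect", "deployment", "run", deployment, "--param", param_arg])


if __name__ == "__main__":
    print(deployment_run_command("Flow Name/Deployment Name", {"pokemon_to_spotlight": "charmander"}))
```

Run parameters override the deployment's defaults per top-level key. Passing params this way should therefore replace the stored params object for that run, with omitted fields falling back to the model's own defaults.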
9. Prefect Server/Cloud and Final Thoughts
- Prefect Server (Self-Hosted): Runs the API, UI, and scheduler. Requires a database (SQLite for dev, PostgreSQL for production). You manage its uptime and resources. The prefect server start CLI command is your entry point, and settings like PREFECT_API_DATABASE_CONNECTION_URL are crucial.
- Prefect Cloud: A fully managed version of Prefect server with additional enterprise features (RBAC, SSO, audit logs, workspaces, managed execution options). You connect via PREFECT_API_URL and PREFECT_API_KEY.
- Workspaces (Prefect Cloud): Allow you to isolate environments (dev, staging, prod) or teams within a single Prefect Cloud account. Each workspace has its own set of flows, deployments, blocks, etc.
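Which backend a client talks to comes down to PREFECT_API_URL, plus PREFECT_API_KEY for Cloud.

Below is a small sketch that classifies a given environment. It assumes the Cloud URL form https://api.prefect.cloud/api/accounts/<account>/workspaces/<workspace>. It only inspects environment variables, whereas real settings can also come from the active profile.

```python
import os
from typing import Mapping

CLOUD_API_PREFIX = "https://api.prefect.cloud/api/accounts/"


def describe_prefect_target(env: Mapping[str, str]) -> str:
    """Classify where API calls would go, based only on PREFECT_API_URL / PREFECT_API_KEY."""
    url = env.get("PREFECT_API_URL", "").rstrip("/")
    if not url:
        return "unset"  # no API URL configured in the environment
    if url.startswith(CLOUD_API_PREFIX) and "/workspaces/" in url:
        if not env.get("PREFECT_API_KEY"):
            return "cloud-missing-key"  # Cloud requires an API key
        workspace_id = url.split("/workspaces/", 1)[1]
        return f"cloud workspace {workspace_id}"
    return f"self-hosted {url}"


if __name__ == "__main__":
    print(describe_prefect_target(os.environ))
```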
By leveraging this comprehensive suite of Prefect V3 features as demonstrated with the PokeAPI example, you can build incredibly powerful, dynamic, resilient, and observable data applications. The key is understanding how these features interlock and how to apply them to your specific use case. This tutorial aims to provide that deep understanding for the PokeAPI integration, which can then be generalized to other data sources and complex workflows.