反反爬浏览器自动化patchright

import asyncio import json import re import logging from enum import Enum from datetime import datetime from typing import Optional from traceback import format_exc # 用patchright替换playwright from patchright.async_api import async_playwright, Frame from patchright.async_api import Error as PlaywrightError logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('GMGN Holders Tag') class ChallengePlatform(Enum): """Cloudflare challenge platform types.""" JAVASCRIPT = "non-interactive" MANAGED = "managed" INTERACTIVE = "interactive" class PumpRanks: spider_name = 'gmgn_tags' def __init__(self): self._timeout = 30 async def on_response(self, response): """ 拦截响应 数据结构 gmgn.json """ if not response.ok: return if '/v1/rank/sol/pump_ranks/1h' in response.url: oridata = await response.body() format_data = json.loads(oridata) data = format_data['data'] completeds = data['completeds'] for c in completeds: print(c) def _get_turnstile_frame(self, page) -> Optional[Frame]: """ Get the Cloudflare turnstile frame. Returns ------- Optional[Frame] The Cloudflare turnstile frame. """ frame = page.frame( url=re.compile( "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile" ), ) return frame async def cookies(self, page) -> Optional[str]: """The cookies from the current page.""" cookies = await page.context.cookies() if not cookies: return None for cookie in cookies: if cookie["name"] == "cf_clearance": return cookie["value"] return None async def detect_challenge(self, page) -> Optional[str]: """ Detect the Cloudflare challenge platform on the current page. Returns ------- Optional[ChallengePlatform] The Cloudflare challenge platform. 
""" html = await page.content() for platform in ChallengePlatform: if f"cType: '{platform.value}'" in html: return platform.value return None async def solve_challenge(self, page) -> None: """Solve the Cloudflare challenge on the current page.""" verify_button_pattern = re.compile( "Verify (I am|you are) (not a bot|(a )?human)" ) verify_button = page.get_by_role("button", name=verify_button_pattern) challenge_spinner = page.locator("#challenge-spinner") challenge_stage = page.locator("#challenge-stage") start_timestamp = datetime.now() cookies = await self.cookies(page) challenge_type = await self.detect_challenge(page) while ( cookies is None and challenge_type is not None and (datetime.now() - start_timestamp).seconds

Apr 3, 2025 - 02:47
 0
反反爬浏览器自动化patchright
import asyncio
import json
import re
import logging
from enum import Enum
from datetime import datetime
from typing import Optional
from traceback import format_exc
# 用patchright替换playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError

# Configure logging once at import time: INFO level, with timestamp, logger
# name, level and message in every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Named logger for this module.
logger = logging.getLogger('GMGN Holders Tag')

class ChallengePlatform(Enum):
    """
    Cloudflare challenge platform types.

    Each member's value is the text that ``PumpRanks.detect_challenge`` looks
    for in the page HTML, in the form ``cType: '<value>'``.
    """

    JAVASCRIPT = "non-interactive"
    MANAGED = "managed"
    INTERACTIVE = "interactive"

class PumpRanks:
    spider_name = 'gmgn_tags'
    def __init__(self):
        self._timeout = 30

    async def on_response(self, response):
        """
        拦截响应
        数据结构 gmgn.json
        """
        if not response.ok:
            return
        if '/v1/rank/sol/pump_ranks/1h' in response.url:
            oridata = await response.body()
            format_data = json.loads(oridata)
            data = format_data['data']
            completeds = data['completeds']
            for c in completeds:
                print(c)

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
            """
            Get the Cloudflare turnstile frame.

            Returns
            -------
            Optional[Frame]
                The Cloudflare turnstile frame.
            """
            frame = page.frame(
                url=re.compile(
                    "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
                ),
            )
            return frame

    async def cookies(self, page) -> Optional[str]:
        """The cookies from the current page."""
        cookies = await page.context.cookies()
        if not cookies:
            return None
        for cookie in cookies:
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect which Cloudflare challenge, if any, the current page shows.

        The page HTML is searched for ``cType: '<value>'`` for each
        ``ChallengePlatform`` member, in declaration order.

        Returns
        -------
        Optional[str]
            The ``value`` string of the first matching ``ChallengePlatform``
            member (not the member itself), or ``None`` when none matches.
        """
        page_source = await page.content()
        detected = [
            kind.value
            for kind in ChallengePlatform
            if f"cType: '{kind.value}'" in page_source
        ]
        return detected[0] if detected else None

    async def solve_challenge(self, page) -> None:
        """Solve the Cloudflare challenge on the current page."""
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )

        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()

        cookies = await self.cookies(page)
        challenge_type = await self.detect_challenge(page)
        while (
            cookies is None
            and challenge_type is not None
            and (datetime.now() - start_timestamp).seconds < self._timeout
        ):
            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")

            turnstile_frame = self._get_turnstile_frame(page)

            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")

            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        破解CloudFlare
        """
        clearance_cookie = await self.cookies(page)
        if clearance_cookie is None:
            challenge_platform = await self.detect_challenge(page)

            if challenge_platform is None:
                logging.error("No Cloudflare challenge detected.")
                return
            logging.info(f"Solving Cloudflare challenge [{challenge_platform}]...")

            try:
                await self.solve_challenge(page)
            except PlaywrightError as err:
                logging.error(err)

    async def run_local(self, proxy=None):
        """
        Launch a headed Chromium, open the target page and listen for data.

        The page's ``response`` events are routed to ``on_response``. After
        navigation and a fixed 10-second pause, ``detect`` is run to deal with
        a Cloudflare challenge. The coroutine then waits on a page condition
        with no timeout, which keeps the browser open.

        Parameters
        ----------
        proxy
            Value passed as the ``proxy`` launch option. ``None`` (the
            default) is passed through unchanged.
        """
        async with async_playwright() as p:
            # A headed browser is required; headless does not get past Cloudflare.
            launch_data = {
                "headless": False,
                "proxy": proxy,
            }

            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            browser = await p.chromium.launch(**launch_data)
            try:
                context = await browser.new_context(user_agent=user_agent)
                # Reuse the instance timeout (seconds) rather than a second
                # hard-coded 30; the context expects milliseconds.
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                # Listen to the response stream.
                page.on('response', self.on_response)

                url = 'https://gmgn.ai/meme/LXtw30LM?chain=sol'
                # Visit the target address.
                await page.goto(url)
                # Fixed pause before checking for a challenge; presumably to
                # let the page finish loading -- TODO confirm.
                await asyncio.sleep(10)
                await self.detect(page)
                # timeout=0 disables the timeout, so this waits for as long
                # as the condition stays false and keeps the browser open.
                await page.wait_for_function("() => window.x > 0", timeout=0)
            finally:
                # Close the browser even when navigation or detection raises.
                await browser.close()

    async def run_aws(self):
        """
        Run the spider on an AWS server.

        ``run_local`` is executed inside a ``pyvirtualdisplay.Display``
        context. Any ``Exception`` it raises is logged with its traceback and
        not re-raised. ``KeyboardInterrupt``, ``SystemExit`` and task
        cancellation are not caught and propagate.
        """
        # Imported here so pyvirtualdisplay is only needed on this code path.
        from pyvirtualdisplay import Display
        with Display():
            try:
                await self.run_local()
            except Exception:
                # A bare ``except:`` would also swallow cancellation and
                # interpreter-exit signals.
                logger.error('浏览器异常:%s', format_exc())


    def task(self):
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    def run(self):
        """Public entry point; delegates to ``task``."""
        self.task()

if __name__ == '__main__':
    # Module-level switch read by PumpRanks.task: 'local' selects run_local,
    # any other value selects run_aws.
    env = 'prod'
    PumpRanks().run()