added default logger

This commit is contained in:
Thephaseless 2024-07-24 15:16:46 +00:00
parent b148fcc669
commit d0034e723b
6 changed files with 44 additions and 25 deletions

View File

@ -10,8 +10,8 @@ from fastapi.responses import RedirectResponse
from src.models.requests import LinkRequest, LinkResponse from src.models.requests import LinkRequest, LinkResponse
from src.utils import logger from src.utils import logger
from src.utils.browser import bypass_cloudflare, new_browser
from src.utils.extentions import download_extentions from src.utils.extentions import download_extentions
from src.utils.misc import bypass_cloudflare, new_browser
download_extentions() download_extentions()
app = FastAPI(debug=True) app = FastAPI(debug=True)
@ -35,10 +35,8 @@ async def read_item(request: LinkRequest):
challenged = await asyncio.wait_for( challenged = await asyncio.wait_for(
bypass_cloudflare(page), timeout=request.maxTimeout bypass_cloudflare(page), timeout=request.maxTimeout
) )
if challenged:
logger.info("Challenged") logger.info(f"Got webpage: {request.url}")
else:
logger.info("Not challenged")
response = await LinkResponse.create( response = await LinkResponse.create(
page=page, page=page,

View File

@ -5,6 +5,7 @@ import time
from typing import Any from typing import Any
from nodriver import Tab from nodriver import Tab
from nodriver.cdp import storage
from pydantic import BaseModel from pydantic import BaseModel
@ -55,11 +56,13 @@ class LinkResponse(BaseModel):
# # Convert cookies to json # # Convert cookies to json
# cookies = [cookie.to_json() for cookie in cookies] # cookies = [cookie.to_json() for cookie in cookies]
cookies = [] cookies = storage.get_cookies()
cookies = await page.send(cookies)
solution = Solution( solution = Solution(
url=page.url, url=page.url,
status=200, status=200,
cookies=cookies, cookies=cookies if cookies else [],
userAgent=user_agent, userAgent=user_agent,
headers={}, headers={},
response=await page.get_content(), response=await page.get_content(),
@ -70,3 +73,7 @@ class LinkResponse(BaseModel):
solution=solution, solution=solution,
startTimestamp=start_timestamp, startTimestamp=start_timestamp,
) )
class NoChromeExtentionError(Exception):
"""No chrome extention found."""

View File

@ -2,12 +2,14 @@ import nodriver as webdriver
from nodriver.core.element import Element from nodriver.core.element import Element
from src.utils import logger from src.utils import logger
from src.utils.extentions import downloaded_extentions
from src.utils.utils import CHALLENGE_TITLES from src.utils.utils import CHALLENGE_TITLES
async def new_browser(): async def new_browser():
config: webdriver.Config = webdriver.Config() config: webdriver.Config = webdriver.Config()
config.sandbox = False config.sandbox = False
config.add_argument(f"--load-extension={','.join(downloaded_extentions)}")
return await webdriver.start(config=config) return await webdriver.start(config=config)
@ -15,13 +17,13 @@ async def new_browser():
async def bypass_cloudflare(page: webdriver.Tab): async def bypass_cloudflare(page: webdriver.Tab):
challenged = False challenged = False
while True: while True:
logger.info("Bypassing cloudflare")
await page await page
await page.wait(0.5) await page.wait(0.5)
if page.target.title not in CHALLENGE_TITLES: if page.target.title not in CHALLENGE_TITLES:
return challenged return challenged
challenged = True if not challenged:
logger.info("Found challenge")
challenged = True
elem = await page.query_selector(".cf-turnstile-wrapper") elem = await page.query_selector(".cf-turnstile-wrapper")
if isinstance(elem, Element): if isinstance(elem, Element):
logger.info(f"Clicking element: {elem}")
await elem.mouse_click() await elem.mouse_click()

View File

@ -11,3 +11,5 @@ EXTENTION_REPOSITIORIES = [
] ]
SLEEP_SECONDS = 1 SLEEP_SECONDS = 1
EXTENTIONS_PATH = "./.extentions"

View File

@ -1,12 +1,15 @@
from __future__ import annotations from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
from zipfile import ZipFile from zipfile import ZipFile
import httpx import httpx
from src.models.github import GithubResponse from src.models.github import GithubResponse
from src.utils.consts import EXTENTION_REPOSITIORIES from src.models.requests import NoChromeExtentionError
from src.utils import logger
from src.utils.consts import EXTENTION_REPOSITIORIES, EXTENTIONS_PATH
downloaded_extentions: list[str] = [] # Do not modify, use download_extentions() downloaded_extentions: list[str] = [] # Do not modify, use download_extentions()
@ -15,27 +18,39 @@ def get_latest_github_release(url: str):
"""Get the latest release from GitHub.""" """Get the latest release from GitHub."""
url = "https://api.github.com/repos/" + url url = "https://api.github.com/repos/" + url
url += "/releases/latest" url += "/releases/latest"
response = GithubResponse.model_validate_json(httpx.get(url).text)
response = httpx.get(url)
if response.status_code == httpx.codes.FORBIDDEN:
error = json.loads(response.text)["message"]
logger.error(error)
raise httpx.NetworkError(error)
response = GithubResponse(**response.json())
for asset in response.assets: for asset in response.assets:
if "chrom" in asset.name: if "chrom" in asset.name:
return asset return asset
raise ValueError("Couldn't find chrome verions of the release") raise NoChromeExtentionError
def download_extentions(): def download_extentions():
"""Download the extention.""" """Download the extention."""
for repository in EXTENTION_REPOSITIORIES: for repository in EXTENTION_REPOSITIORIES:
extention = get_latest_github_release(repository)
extention_name = repository.split("/")[-1] extention_name = repository.split("/")[-1]
path = Path(f"{EXTENTIONS_PATH}/{extention_name}")
try:
extention = get_latest_github_release(repository)
except httpx.NetworkError:
if path.is_dir():
downloaded_extentions.append(path.as_posix())
continue
response = httpx.get(extention.browser_download_url) response = httpx.get(extention.browser_download_url)
Path("extentions").mkdir(exist_ok=True) Path("{EXTENTIONS_PATH}").mkdir(exist_ok=True)
if not Path(f"extentions/{extention.name}").exists(): if not Path(f"{EXTENTIONS_PATH}/{extention.name}").is_file():
with Path(f"extentions/{extention.name}").open("wb") as f: with Path(f"{EXTENTIONS_PATH}/{extention.name}").open("wb") as f:
f.write(response.content) f.write(response.content)
with ZipFile(f"extentions/{extention.name}", "r") as zip_obj: with ZipFile(f"{EXTENTIONS_PATH}/{extention.name}", "r") as zip_obj:
zip_obj.extractall(f"extentions/{extention_name}") zip_obj.extractall(f"{EXTENTIONS_PATH}/{extention_name}")
downloaded_extentions.append(f"extentions/{extention_name}") downloaded_extentions.append(path.as_posix())

View File

@ -1,4 +1,3 @@
import logging
from venv import logger from venv import logger
import nodriver as webdriver import nodriver as webdriver
@ -7,10 +6,6 @@ from nodriver.core.element import Element
from src.utils.consts import CHALLENGE_TITLES from src.utils.consts import CHALLENGE_TITLES
def get_logger():
return logging.getLogger("uvicorn.error")
async def new_browser(): async def new_browser():
config: webdriver.Config = webdriver.Config() config: webdriver.Config = webdriver.Config()
config.sandbox = False config.sandbox = False