Adjust scheduler

LeoMortari
2025-12-05 17:45:13 -03:00
parent d5eebb5cc4
commit b996a81a01
3 changed files with 167 additions and 29 deletions

View File

@@ -25,10 +25,6 @@ class Settings(BaseSettings):
     scraping_delay: float = Field(default=2.0, alias="SCRAPING_DELAY")
     max_retries: int = Field(default=3, alias="MAX_RETRIES")
 
-    # Scheduling settings
-    schedule_hour_start: int = Field(default=2, alias="SCHEDULE_HOUR_START")
-    schedule_hour_end: int = Field(default=4, alias="SCHEDULE_HOUR_END")
-
     # File paths
     proxies_file: str = Field(default="/app/proxies.txt", alias="PROXIES_FILE")
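
For context on the settings above: assuming the project uses pydantic-settings v2, each Field alias names the environment variable the value is read from, falling back to the default when the variable is unset. A minimal sketch (the override value is illustrative):

import os
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    scraping_delay: float = Field(default=2.0, alias="SCRAPING_DELAY")
    max_retries: int = Field(default=3, alias="MAX_RETRIES")

os.environ["MAX_RETRIES"] = "5"   # must be set before instantiation
settings = Settings()
print(settings.max_retries)       # 5, read via the MAX_RETRIES alias
print(settings.scraping_delay)    # 2.0, default since SCRAPING_DELAY is unset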

View File

@@ -286,6 +286,67 @@ class DatabaseManager:
             logger.error(f"Error updating proxy status: {e}")
             return False
 
+    def get_all_proxies(self) -> List[Dict[str, Any]]:
+        """
+        Get all proxies from the database.
+
+        Returns:
+            List of proxy dictionaries
+        """
+        query = """
+            SELECT id, ip_address, port, protocol, username, password,
+                   country_code, country_name, city, is_active, is_anonymous,
+                   response_time_ms, last_checked_at, last_successful_at,
+                   success_count, failure_count, source, notes,
+                   created_at, updated_at
+            FROM proxies
+            ORDER BY created_at DESC;
+        """
+        try:
+            with self.get_connection() as conn:
+                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                    cursor.execute(query)
+                    return [dict(row) for row in cursor.fetchall()]
+        except Exception as e:
+            logger.error(f"Error getting all proxies: {e}")
+            return []
+
+    def delete_proxy(
+        self, ip_address: str, port: int, protocol: str
+    ) -> bool:
+        """
+        Delete a proxy from the database.
+
+        Args:
+            ip_address: Proxy IP address
+            port: Proxy port
+            protocol: Proxy protocol
+
+        Returns:
+            True if deletion successful, False otherwise
+        """
+        query = """
+            DELETE FROM proxies
+            WHERE ip_address = %s AND port = %s AND protocol = %s
+            RETURNING id;
+        """
+        try:
+            with self.get_connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, (ip_address, port, protocol.upper()))
+                    result = cursor.fetchone()
+                    if result:
+                        logger.info(
+                            f"Deleted proxy: {ip_address}:{port} ({protocol})"
+                        )
+                        return True
+                    return False
+        except Exception as e:
+            logger.error(f"Error deleting proxy: {e}")
+            return False
+
     def get_stats(self) -> Dict[str, Any]:
         """
         Get database statistics.
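
Both new methods rely on a DatabaseManager.get_connection() context manager that is not part of this diff. A minimal sketch of the contract they assume, using psycopg2 (connection parameters are illustrative; the real implementation may differ or use a pool):

from contextlib import contextmanager
import psycopg2
from psycopg2.extras import RealDictCursor  # needed by get_all_proxies

@contextmanager
def get_connection(self):
    """Yield a connection, committing on success and rolling back on error."""
    conn = psycopg2.connect(
        host="localhost", port=5432,
        dbname="proxies", user="app", password="secret",
    )
    try:
        yield conn
        conn.commit()  # delete_proxy never commits itself; without this the DELETE rolls back
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

The commit-on-exit matters here: delete_proxy() runs DELETE ... RETURNING but issues no commit of its own, so it depends on the connection manager (or autocommit) to persist the change.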

View File

@@ -3,8 +3,7 @@ Main application service for proxy scraping, validation, and storage.
 """
 import logging
 import sys
-import random
-from datetime import datetime, time as dt_time
+from datetime import datetime
 from apscheduler.schedulers.blocking import BlockingScheduler
 from apscheduler.triggers.cron import CronTrigger
 import pytz
@@ -119,6 +118,76 @@ class ProxyScrapingService:
             logger.info(f"Total duration: {duration:.2f} seconds")
             logger.info("=" * 80)
 
+    def run_validation_job(self):
+        """Execute validation of existing proxies in database."""
+        job_start = datetime.now()
+        logger.info("=" * 80)
+        logger.info(f"Starting proxy validation job at {job_start}")
+        logger.info("=" * 80)
+
+        try:
+            # Step 1: Get all proxies from database
+            logger.info("Step 1: Fetching all proxies from database...")
+            all_proxies = self.db.get_all_proxies()
+
+            if not all_proxies:
+                logger.warning("No proxies found in database to validate")
+                return
+
+            logger.info(f"Found {len(all_proxies)} proxies to validate")
+
+            # Step 2: Validate each proxy
+            logger.info("Step 2: Validating proxies...")
+            validated_count = 0
+            deleted_count = 0
+
+            for proxy in all_proxies:
+                ip_address = proxy["ip_address"]
+                port = proxy["port"]
+                protocol = proxy["protocol"]
+
+                # Validate the proxy
+                is_active, response_time_ms, is_anonymous = self.validator.validate_proxy(
+                    ip_address, port, protocol
+                )
+
+                if is_active and response_time_ms is not None:
+                    # Proxy is working - update status in database
+                    if self.db.update_proxy_status(
+                        ip_address, port, protocol, True, response_time_ms
+                    ):
+                        validated_count += 1
+                        logger.info(
+                            f"Proxy {ip_address}:{port} validated successfully - {response_time_ms}ms"
+                        )
+                else:
+                    # Proxy failed - delete from database
+                    if self.db.delete_proxy(ip_address, port, protocol):
+                        deleted_count += 1
+                        logger.info(
+                            f"Proxy {ip_address}:{port} failed validation - removed from database"
+                        )
+
+            logger.info(
+                f"Validation complete: {validated_count} proxies validated, "
+                f"{deleted_count} proxies removed"
+            )
+
+            # Step 3: Display statistics
+            logger.info("Step 3: Database statistics...")
+            stats = self.db.get_stats()
+            self._display_stats(stats)
+
+        except Exception as e:
+            logger.error(f"Error during validation job: {e}", exc_info=True)
+        finally:
+            job_end = datetime.now()
+            duration = (job_end - job_start).total_seconds()
+            logger.info("=" * 80)
+            logger.info(f"Validation job completed at {job_end}")
+            logger.info(f"Total duration: {duration:.2f} seconds")
+            logger.info("=" * 80)
+
     def _deduplicate_proxies(self, proxies: list) -> list:
         """
         Remove duplicate proxies based on IP:PORT:PROTOCOL.
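
run_validation_job() expects validator.validate_proxy() to return the tuple (is_active, response_time_ms, is_anonymous). A minimal sketch of such a validator using requests — the check URL and the anonymity heuristic are assumptions, not taken from this commit:

import time
import requests

def validate_proxy(ip_address: str, port: int, protocol: str,
                   timeout: float = 10.0):
    """Return (is_active, response_time_ms, is_anonymous)."""
    proxy_url = f"{protocol.lower()}://{ip_address}:{port}"
    proxies = {"http": proxy_url, "https": proxy_url}
    start = time.monotonic()
    try:
        resp = requests.get("https://httpbin.org/ip",
                            proxies=proxies, timeout=timeout)
        resp.raise_for_status()
        elapsed_ms = int((time.monotonic() - start) * 1000)
        # Treat the proxy as anonymous if the echoed origin is the proxy's IP
        is_anonymous = ip_address in resp.json().get("origin", "")
        return True, elapsed_ms, is_anonymous
    except requests.RequestException:
        return False, None, False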
@@ -162,28 +231,44 @@ class ProxyScrapingService:
         if avg_response:
             logger.info(f"  Avg Response Time: {avg_response:.2f}ms")
 
-    def schedule_daily_job(self):
-        """Schedule the scraping job to run once daily between configured hours."""
-        # Generate random time between start and end hour
-        random_hour = random.randint(
-            settings.schedule_hour_start, settings.schedule_hour_end - 1
-        )
-        random_minute = random.randint(0, 59)
-        logger.info(
-            f"Scheduling daily scraping job at {random_hour:02d}:{random_minute:02d} UTC"
-        )
-        # Create cron trigger for daily execution
-        trigger = CronTrigger(
-            hour=random_hour, minute=random_minute, timezone=pytz.UTC
-        )
+    def schedule_jobs(self):
+        """Schedule all proxy jobs: validation at 9h and 16h, scraping at 2h."""
+        # Validation job at 9:00 AM UTC
+        logger.info("Scheduling validation job at 09:00 UTC")
+        validation_trigger_9h = CronTrigger(
+            hour=9, minute=0, timezone=pytz.UTC
+        )
+        self.scheduler.add_job(
+            self.run_validation_job,
+            trigger=validation_trigger_9h,
+            id="validation_9am",
+            name="Proxy Validation Job (9AM)",
+            replace_existing=True,
+        )
+        # Validation job at 4:00 PM UTC (16:00)
+        logger.info("Scheduling validation job at 16:00 UTC")
+        validation_trigger_16h = CronTrigger(
+            hour=16, minute=0, timezone=pytz.UTC
+        )
+        self.scheduler.add_job(
+            self.run_validation_job,
+            trigger=validation_trigger_16h,
+            id="validation_4pm",
+            name="Proxy Validation Job (4PM)",
+            replace_existing=True,
+        )
+        # Scraping job at 2:00 AM UTC
+        logger.info("Scheduling scraping job at 02:00 UTC")
+        scraping_trigger = CronTrigger(
+            hour=2, minute=0, timezone=pytz.UTC
+        )
         self.scheduler.add_job(
             self.run_scraping_job,
-            trigger=trigger,
-            id="daily_proxy_scraping",
-            name="Daily Proxy Scraping Job",
+            trigger=scraping_trigger,
+            id="scraping_2am",
+            name="Proxy Scraping Job (2AM)",
             replace_existing=True,
         )
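
A quick way to sanity-check the three triggers without waiting for them to fire: in APScheduler 3.x a trigger can be asked for its next fire time directly (sketch):

from datetime import datetime
import pytz
from apscheduler.triggers.cron import CronTrigger

now = datetime.now(pytz.UTC)
for hour in (9, 16, 2):
    trigger = CronTrigger(hour=hour, minute=0, timezone=pytz.UTC)
    print(f"{hour:02d}:00 UTC job next fires at",
          trigger.get_next_fire_time(None, now))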
@@ -195,15 +280,11 @@ class ProxyScrapingService:
     def start_scheduler(self):
         """Start the scheduler and wait for scheduled jobs."""
         try:
-            self.schedule_daily_job()
+            self.schedule_jobs()
             logger.info("Scheduler started. Waiting for scheduled jobs...")
             logger.info("Press Ctrl+C to exit")
 
-            # Also run immediately on startup
-            logger.info("Running initial scraping job on startup...")
-            self.run_scraping_job()
-
             # Start scheduler
             self.scheduler.start()
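
Note that BlockingScheduler.start() blocks the main thread until the process is interrupted, which is why nothing needs to follow it. The usual pattern around it (a generic sketch, not code from this commit):

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
# ... add_job() calls go here ...
try:
    scheduler.start()  # blocks until the process is interrupted
except (KeyboardInterrupt, SystemExit):
    pass  # Ctrl+C lands here; scheduled jobs stop with the process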
@@ -226,9 +307,9 @@ def main():
     logger.info(f"  PostgreSQL: {settings.postgres_host}:{settings.postgres_port}")
     logger.info(f"  Database: {settings.postgres_db}")
     logger.info(f"  Proxies File: {settings.proxies_file}")
-    logger.info(
-        f"  Schedule: Daily between {settings.schedule_hour_start:02d}:00 - {settings.schedule_hour_end:02d}:00 UTC"
-    )
+    logger.info(f"  Schedules:")
+    logger.info(f"    - Validation: 09:00 UTC and 16:00 UTC")
+    logger.info(f"    - Scraping: 02:00 UTC")
     logger.info(f"  Proxy Timeout: {settings.proxy_timeout}s")
     logger.info(f"  Validation URL: {settings.validation_url}")
     logger.info(f"  Development Mode: {settings.development}")