From b996a81a0115f72683fdbb6258b717a3a598ee2d Mon Sep 17 00:00:00 2001
From: LeoMortari
Date: Fri, 5 Dec 2025 17:45:13 -0300
Subject: [PATCH] Adjust scheduler

---
 src/config.py   |   4 --
 src/database.py |  61 ++++++++++++++++++++++
 src/main.py     | 131 +++++++++++++++++++++++++++++++++++++++---------
 3 files changed, 167 insertions(+), 29 deletions(-)

diff --git a/src/config.py b/src/config.py
index 917e1dd..4b1f1d9 100644
--- a/src/config.py
+++ b/src/config.py
@@ -25,10 +25,6 @@ class Settings(BaseSettings):
     scraping_delay: float = Field(default=2.0, alias="SCRAPING_DELAY")
     max_retries: int = Field(default=3, alias="MAX_RETRIES")
 
-    # Scheduling settings
-    schedule_hour_start: int = Field(default=2, alias="SCHEDULE_HOUR_START")
-    schedule_hour_end: int = Field(default=4, alias="SCHEDULE_HOUR_END")
-
     # File paths
     proxies_file: str = Field(default="/app/proxies.txt", alias="PROXIES_FILE")
 
diff --git a/src/database.py b/src/database.py
index 1eb2a22..36ca049 100644
--- a/src/database.py
+++ b/src/database.py
@@ -286,6 +286,67 @@ class DatabaseManager:
             logger.error(f"Error updating proxy status: {e}")
             return False
 
+    def get_all_proxies(self) -> List[Dict[str, Any]]:
+        """
+        Get all proxies from the database.
+
+        Returns:
+            List of proxy dictionaries
+        """
+        query = """
+            SELECT id, ip_address, port, protocol, username, password,
+                   country_code, country_name, city, is_active, is_anonymous,
+                   response_time_ms, last_checked_at, last_successful_at,
+                   success_count, failure_count, source, notes,
+                   created_at, updated_at
+            FROM proxies
+            ORDER BY created_at DESC;
+        """
+
+        try:
+            with self.get_connection() as conn:
+                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                    cursor.execute(query)
+                    return [dict(row) for row in cursor.fetchall()]
+        except Exception as e:
+            logger.error(f"Error getting all proxies: {e}")
+            return []
+
+    def delete_proxy(
+        self, ip_address: str, port: int, protocol: str
+    ) -> bool:
+        """
+        Delete a proxy from the database.
+
+        Args:
+            ip_address: Proxy IP address
+            port: Proxy port
+            protocol: Proxy protocol
+
+        Returns:
+            True if deletion successful, False otherwise
+        """
+        query = """
+            DELETE FROM proxies
+            WHERE ip_address = %s AND port = %s AND protocol = %s
+            RETURNING id;
+        """
+
+        try:
+            with self.get_connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, (ip_address, port, protocol.upper()))
+                    result = cursor.fetchone()
+                    if result:
+                        logger.info(
+                            f"Deleted proxy: {ip_address}:{port} ({protocol})"
+                        )
+                        return True
+                    return False
+        except Exception as e:
+            logger.error(f"Error deleting proxy: {e}")
+            return False
+
     def get_stats(self) -> Dict[str, Any]:
         """
         Get database statistics.
diff --git a/src/main.py b/src/main.py
index e013f7e..b9ee320 100644
--- a/src/main.py
+++ b/src/main.py
@@ -3,8 +3,7 @@ Main application service for proxy scraping, validation, and storage.
 """
 import logging
 import sys
-import random
-from datetime import datetime, time as dt_time
+from datetime import datetime
 from apscheduler.schedulers.blocking import BlockingScheduler
 from apscheduler.triggers.cron import CronTrigger
 import pytz
@@ -119,6 +118,76 @@ class ProxyScrapingService:
         logger.info(f"Total duration: {duration:.2f} seconds")
         logger.info("=" * 80)
 
+    def run_validation_job(self):
+        """Execute validation of existing proxies in database."""
+        job_start = datetime.now()
+        logger.info("=" * 80)
+        logger.info(f"Starting proxy validation job at {job_start}")
+        logger.info("=" * 80)
+
+        try:
+            # Step 1: Get all proxies from database
+            logger.info("Step 1: Fetching all proxies from database...")
+            all_proxies = self.db.get_all_proxies()
+
+            if not all_proxies:
+                logger.warning("No proxies found in database to validate")
+                return
+
+            logger.info(f"Found {len(all_proxies)} proxies to validate")
+
+            # Step 2: Validate each proxy
+            logger.info("Step 2: Validating proxies...")
+            validated_count = 0
+            deleted_count = 0
+
+            for proxy in all_proxies:
+                ip_address = proxy["ip_address"]
+                port = proxy["port"]
+                protocol = proxy["protocol"]
+
+                # Validate the proxy
+                is_active, response_time_ms, is_anonymous = self.validator.validate_proxy(
+                    ip_address, port, protocol
+                )
+
+                if is_active and response_time_ms is not None:
+                    # Proxy is working - update status in database
+                    if self.db.update_proxy_status(
+                        ip_address, port, protocol, True, response_time_ms
+                    ):
+                        validated_count += 1
+                        logger.info(
+                            f"Proxy {ip_address}:{port} validated successfully - {response_time_ms}ms"
+                        )
+                else:
+                    # Proxy failed - delete from database
+                    if self.db.delete_proxy(ip_address, port, protocol):
+                        deleted_count += 1
+                        logger.info(
+                            f"Proxy {ip_address}:{port} failed validation - removed from database"
+                        )
+
+            logger.info(
+                f"Validation complete: {validated_count} proxies validated, "
+                f"{deleted_count} proxies removed"
+            )
+
+            # Step 3: Display statistics
+            logger.info("Step 3: Database statistics...")
+            stats = self.db.get_stats()
+            self._display_stats(stats)
+
+        except Exception as e:
+            logger.error(f"Error during validation job: {e}", exc_info=True)
+        finally:
+            job_end = datetime.now()
+            duration = (job_end - job_start).total_seconds()
+            logger.info("=" * 80)
+            logger.info(f"Validation job completed at {job_end}")
+            logger.info(f"Total duration: {duration:.2f} seconds")
+            logger.info("=" * 80)
+
     def _deduplicate_proxies(self, proxies: list) -> list:
         """
         Remove duplicate proxies based on IP:PORT:PROTOCOL.
@@ -162,28 +231,44 @@ class ProxyScrapingService:
         if avg_response:
             logger.info(f"  Avg Response Time: {avg_response:.2f}ms")
 
-    def schedule_daily_job(self):
-        """Schedule the scraping job to run once daily between configured hours."""
-        # Generate random time between start and end hour
-        random_hour = random.randint(
-            settings.schedule_hour_start, settings.schedule_hour_end - 1
+    def schedule_jobs(self):
+        """Schedule all proxy jobs: validation at 9h and 16h, scraping at 2h."""
+        # Validation job at 9:00 AM UTC
+        logger.info("Scheduling validation job at 09:00 UTC")
+        validation_trigger_9h = CronTrigger(
+            hour=9, minute=0, timezone=pytz.UTC
         )
-        random_minute = random.randint(0, 59)
-
-        logger.info(
-            f"Scheduling daily scraping job at {random_hour:02d}:{random_minute:02d} UTC"
+        self.scheduler.add_job(
+            self.run_validation_job,
+            trigger=validation_trigger_9h,
+            id="validation_9am",
+            name="Proxy Validation Job (9AM)",
+            replace_existing=True,
         )
 
-        # Create cron trigger for daily execution
-        trigger = CronTrigger(
-            hour=random_hour, minute=random_minute, timezone=pytz.UTC
+        # Validation job at 4:00 PM UTC (16:00)
+        logger.info("Scheduling validation job at 16:00 UTC")
+        validation_trigger_16h = CronTrigger(
+            hour=16, minute=0, timezone=pytz.UTC
+        )
+        self.scheduler.add_job(
+            self.run_validation_job,
+            trigger=validation_trigger_16h,
+            id="validation_4pm",
+            name="Proxy Validation Job (4PM)",
+            replace_existing=True,
         )
 
+        # Scraping job at 2:00 AM UTC
+        logger.info("Scheduling scraping job at 02:00 UTC")
+        scraping_trigger = CronTrigger(
+            hour=2, minute=0, timezone=pytz.UTC
+        )
         self.scheduler.add_job(
             self.run_scraping_job,
-            trigger=trigger,
-            id="daily_proxy_scraping",
-            name="Daily Proxy Scraping Job",
+            trigger=scraping_trigger,
+            id="scraping_2am",
+            name="Proxy Scraping Job (2AM)",
             replace_existing=True,
         )
 
@@ -195,15 +280,11 @@ class ProxyScrapingService:
     def start_scheduler(self):
         """Start the scheduler and wait for scheduled jobs."""
         try:
-            self.schedule_daily_job()
+            self.schedule_jobs()
 
             logger.info("Scheduler started. Waiting for scheduled jobs...")
             logger.info("Press Ctrl+C to exit")
 
-            # Also run immediately on startup
-            logger.info("Running initial scraping job on startup...")
-            self.run_scraping_job()
-
             # Start scheduler
             self.scheduler.start()
 
@@ -226,9 +307,9 @@ def main():
     logger.info(f"  PostgreSQL: {settings.postgres_host}:{settings.postgres_port}")
     logger.info(f"  Database: {settings.postgres_db}")
     logger.info(f"  Proxies File: {settings.proxies_file}")
-    logger.info(
-        f"  Schedule: Daily between {settings.schedule_hour_start:02d}:00 - {settings.schedule_hour_end:02d}:00 UTC"
-    )
+    logger.info(f"  Schedules:")
+    logger.info(f"    - Validation: 09:00 UTC and 16:00 UTC")
+    logger.info(f"    - Scraping: 02:00 UTC")
     logger.info(f"  Proxy Timeout: {settings.proxy_timeout}s")
     logger.info(f"  Validation URL: {settings.validation_url}")
    logger.info(f"  Development Mode: {settings.development}")
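
A minimal sketch for sanity-checking the new schedule, assuming APScheduler 3.x and pytz as imported in src/main.py: it rebuilds the three cron triggers registered in schedule_jobs() and prints the next UTC fire time of each, so the 02:00 / 09:00 / 16:00 setup can be verified without starting the blocking scheduler.

    # Rebuild the cron triggers used in schedule_jobs() and print their next
    # fire times (assumes APScheduler 3.x and pytz, as in src/main.py).
    from datetime import datetime

    import pytz
    from apscheduler.triggers.cron import CronTrigger

    now = datetime.now(pytz.UTC)
    triggers = {
        "scraping_2am": CronTrigger(hour=2, minute=0, timezone=pytz.UTC),
        "validation_9am": CronTrigger(hour=9, minute=0, timezone=pytz.UTC),
        "validation_4pm": CronTrigger(hour=16, minute=0, timezone=pytz.UTC),
    }

    for job_id, trigger in triggers.items():
        # previous_fire_time=None asks for the first fire time after `now`
        print(f"{job_id}: next run at {trigger.get_next_fire_time(None, now)}")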