""" Main application service for proxy scraping, validation, and storage. """ import logging import sys import random from datetime import datetime, time as dt_time from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger import pytz import colorlog from config import settings from database import DatabaseManager from validator import ProxyValidator from scrapers import scrape_from_file def setup_logging(): """Configure colored logging.""" handler = colorlog.StreamHandler() handler.setFormatter( colorlog.ColoredFormatter( "%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", log_colors={ "DEBUG": "cyan", "INFO": "green", "WARNING": "yellow", "ERROR": "red", "CRITICAL": "red,bg_white", }, ) ) root_logger = logging.getLogger() root_logger.addHandler(handler) root_logger.setLevel(getattr(logging, settings.log_level.upper())) logger = logging.getLogger(__name__) class ProxyScrapingService: """Main service for orchestrating proxy scraping operations.""" def __init__(self): """Initialize the proxy scraping service.""" self.db = DatabaseManager() self.validator = ProxyValidator() self.scheduler = BlockingScheduler(timezone=pytz.UTC) def run_scraping_job(self): """Execute the complete scraping, validation, and storage workflow.""" job_start = datetime.now() logger.info("=" * 80) logger.info(f"Starting proxy scraping job at {job_start}") logger.info("=" * 80) try: # Step 1: Scrape proxies from sources logger.info("Step 1: Scraping proxies from sources...") raw_proxies = scrape_from_file(settings.proxies_file) if not raw_proxies: logger.warning("No proxies scraped from sources") return logger.info(f"Scraped {len(raw_proxies)} proxies from sources") # Step 2: Remove duplicates based on IP:PORT:PROTOCOL logger.info("Step 2: Removing duplicates...") unique_proxies = self._deduplicate_proxies(raw_proxies) logger.info( f"Reduced to {len(unique_proxies)} unique proxies " f"(removed {len(raw_proxies) - len(unique_proxies)} duplicates)" ) # Step 3: Validate proxies logger.info("Step 3: Validating proxies for connectivity and anonymity...") validated_proxies = self.validator.validate_proxies_bulk( unique_proxies, max_workers=20 ) if not validated_proxies: logger.warning("No proxies passed validation") return logger.info( f"{len(validated_proxies)} proxies validated successfully " f"({len(validated_proxies) / len(unique_proxies) * 100:.1f}% success rate)" ) # Step 4: Store in database logger.info("Step 4: Storing validated proxies in database...") inserted_count = 0 for proxy in validated_proxies: if self.db.insert_proxy(proxy): inserted_count += 1 logger.info( f"Inserted {inserted_count} new anonymous proxies into database " f"({len(validated_proxies) - inserted_count} already existed)" ) # Step 5: Display statistics logger.info("Step 5: Database statistics...") stats = self.db.get_stats() self._display_stats(stats) except Exception as e: logger.error(f"Error during scraping job: {e}", exc_info=True) finally: job_end = datetime.now() duration = (job_end - job_start).total_seconds() logger.info("=" * 80) logger.info(f"Scraping job completed at {job_end}") logger.info(f"Total duration: {duration:.2f} seconds") logger.info("=" * 80) def _deduplicate_proxies(self, proxies: list) -> list: """ Remove duplicate proxies based on IP:PORT:PROTOCOL. 
def setup_logging():
    """Configure colored logging."""
    handler = colorlog.StreamHandler()
    handler.setFormatter(
        colorlog.ColoredFormatter(
            "%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
            log_colors={
                "DEBUG": "cyan",
                "INFO": "green",
                "WARNING": "yellow",
                "ERROR": "red",
                "CRITICAL": "red,bg_white",
            },
        )
    )
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(getattr(logging, settings.log_level.upper()))


logger = logging.getLogger(__name__)


class ProxyScrapingService:
    """Main service for orchestrating proxy scraping operations."""

    def __init__(self):
        """Initialize the proxy scraping service."""
        self.db = DatabaseManager()
        self.validator = ProxyValidator()
        self.scheduler = BlockingScheduler(timezone=pytz.UTC)

    def run_scraping_job(self):
        """Execute the complete scraping, validation, and storage workflow."""
        job_start = datetime.now()
        logger.info("=" * 80)
        logger.info(f"Starting proxy scraping job at {job_start}")
        logger.info("=" * 80)

        try:
            # Step 1: Scrape proxies from sources
            logger.info("Step 1: Scraping proxies from sources...")
            raw_proxies = scrape_from_file(settings.proxies_file)

            if not raw_proxies:
                logger.warning("No proxies scraped from sources")
                return

            logger.info(f"Scraped {len(raw_proxies)} proxies from sources")

            # Step 2: Remove duplicates based on IP:PORT:PROTOCOL
            logger.info("Step 2: Removing duplicates...")
            unique_proxies = self._deduplicate_proxies(raw_proxies)
            logger.info(
                f"Reduced to {len(unique_proxies)} unique proxies "
                f"(removed {len(raw_proxies) - len(unique_proxies)} duplicates)"
            )

            # Step 3: Validate proxies for connectivity and anonymity
            logger.info("Step 3: Validating proxies for connectivity and anonymity...")
            validated_proxies = self.validator.validate_proxies_bulk(
                unique_proxies, max_workers=20
            )

            if not validated_proxies:
                logger.warning("No proxies passed validation")
                return

            logger.info(
                f"{len(validated_proxies)} proxies validated successfully "
                f"({len(validated_proxies) / len(unique_proxies) * 100:.1f}% success rate)"
            )

            # Step 4: Store validated proxies in the database
            logger.info("Step 4: Storing validated proxies in database...")
            inserted_count = 0
            for proxy in validated_proxies:
                if self.db.insert_proxy(proxy):
                    inserted_count += 1

            logger.info(
                f"Inserted {inserted_count} new anonymous proxies into database "
                f"({len(validated_proxies) - inserted_count} already existed)"
            )

            # Step 5: Display database statistics
            logger.info("Step 5: Database statistics...")
            stats = self.db.get_stats()
            self._display_stats(stats)

        except Exception as e:
            logger.error(f"Error during scraping job: {e}", exc_info=True)
        finally:
            job_end = datetime.now()
            duration = (job_end - job_start).total_seconds()
            logger.info("=" * 80)
            logger.info(f"Scraping job completed at {job_end}")
            logger.info(f"Total duration: {duration:.2f} seconds")
            logger.info("=" * 80)

    def _deduplicate_proxies(self, proxies: list) -> list:
        """
        Remove duplicate proxies based on IP:PORT:PROTOCOL.

        Args:
            proxies: List of proxy dictionaries

        Returns:
            List of unique proxies
        """
        seen = set()
        unique = []
        for proxy in proxies:
            key = (
                proxy["ip_address"],
                proxy["port"],
                proxy["protocol"],
            )
            if key not in seen:
                seen.add(key)
                unique.append(proxy)
        return unique

    def _display_stats(self, stats: dict):
        """
        Display database statistics.

        Args:
            stats: Statistics dictionary from database
        """
        logger.info("Database Statistics:")
        logger.info(f"  Total Proxies: {stats.get('total_proxies', 0)}")
        logger.info(f"  Active Proxies: {stats.get('active_proxies', 0)}")
        logger.info(f"  Anonymous Proxies: {stats.get('anonymous_proxies', 0)}")
        logger.info(f"  Unique Protocols: {stats.get('unique_protocols', 0)}")
        logger.info(f"  Unique Countries: {stats.get('unique_countries', 0)}")

        avg_response = stats.get("avg_response_time")
        if avg_response:
            logger.info(f"  Avg Response Time: {avg_response:.2f}ms")

    def schedule_daily_job(self):
        """Schedule the scraping job to run once daily between configured hours."""
        # Pick a random time between the configured start and end hour
        random_hour = random.randint(
            settings.schedule_hour_start, settings.schedule_hour_end - 1
        )
        random_minute = random.randint(0, 59)

        logger.info(
            f"Scheduling daily scraping job at {random_hour:02d}:{random_minute:02d} UTC"
        )

        # Create a cron trigger for daily execution at the chosen time
        trigger = CronTrigger(
            hour=random_hour, minute=random_minute, timezone=pytz.UTC
        )

        self.scheduler.add_job(
            self.run_scraping_job,
            trigger=trigger,
            id="daily_proxy_scraping",
            name="Daily Proxy Scraping Job",
            replace_existing=True,
        )

    def run_immediate(self):
        """Run the scraping job immediately (for testing or manual execution)."""
        logger.info("Running immediate scraping job...")
        self.run_scraping_job()

    def start_scheduler(self):
        """Start the scheduler and wait for scheduled jobs."""
        try:
            self.schedule_daily_job()
            logger.info("Scheduler started. Waiting for scheduled jobs...")
            logger.info("Press Ctrl+C to exit")

            # Also run immediately on startup
            logger.info("Running initial scraping job on startup...")
            self.run_scraping_job()

            # Start the (blocking) scheduler
            self.scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            logger.info("Scheduler shutdown requested")
            self.scheduler.shutdown()
            self.db.close()
        except Exception as e:
            logger.error(f"Scheduler error: {e}", exc_info=True)
            self.db.close()
            sys.exit(1)


def main():
    """Main entry point for the application."""
    setup_logging()

    logger.info("Proxy Scraping Service Starting...")
    logger.info("Configuration:")
    logger.info(f"  PostgreSQL: {settings.postgres_host}:{settings.postgres_port}")
    logger.info(f"  Database: {settings.postgres_db}")
    logger.info(f"  Proxies File: {settings.proxies_file}")
    logger.info(
        f"  Schedule: Daily between {settings.schedule_hour_start:02d}:00 - "
        f"{settings.schedule_hour_end:02d}:00 UTC"
    )
    logger.info(f"  Proxy Timeout: {settings.proxy_timeout}s")
    logger.info(f"  Validation URL: {settings.validation_url}")

    service = ProxyScrapingService()

    # Check for command line arguments
    if len(sys.argv) > 1 and sys.argv[1] == "--immediate":
        # Run once immediately and exit
        service.run_immediate()
        service.db.close()
    else:
        # Start the scheduler for recurring jobs
        service.start_scheduler()


if __name__ == "__main__":
    main()
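# Usage (assuming this module is saved as main.py; adjust the name to match your
# project layout):
#
#     python main.py               # run one job immediately, then keep the daily schedule
#     python main.py --immediate   # run a single scraping job and exit
#
# In scheduled mode the job fires once per day at a random time between
# settings.schedule_hour_start and settings.schedule_hour_end (UTC); press
# Ctrl+C to stop the scheduler.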