package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"gemini-grc/common"
	"gemini-grc/common/blackList"
	"gemini-grc/common/contextlog"
	"gemini-grc/common/seedList"
	"gemini-grc/common/whiteList"
	"gemini-grc/config"
	"gemini-grc/contextutil"
	gemdb "gemini-grc/db"
	"gemini-grc/robotsMatch"
	"gemini-grc/util"

	"git.antanst.com/antanst/logging"
	"github.com/jmoiron/sqlx"
)

const (
	// idlePollInterval is how long the scheduler sleeps when a poll finds no work.
	idlePollInterval = 30 * time.Second
	// urlsPerHost is the per-host limit passed to GetRandomUrlsFromHosts on each poll.
	urlsPerHost = 10
	// historyURLsPerWorker scales how many stale URLs are requeued from the
	// snapshot history when the queue is empty (NumOfWorkers * historyURLsPerWorker).
	historyURLsPerWorker = 3
)

// jobs carries URLs from the scheduler to the crawl workers. It is created in
// initializeApp with a buffer of config.CONFIG.NumOfWorkers and is never closed.
var jobs chan string

func main() {
	if err := initializeApp(); err != nil {
		handleUnexpectedError(err)
	}
	if err := runApp(); err != nil {
		handleUnexpectedError(err)
	}
	if err := shutdownApp(); err != nil {
		// Do not route through handleUnexpectedError: it would run
		// shutdownApp a second time on components that already failed to stop.
		logging.LogError("Unexpected error: %v", err)
		os.Exit(1)
	}
}

// handleUnexpectedError logs err, makes a best-effort shutdown (its own error
// is deliberately ignored, since we are already failing) and exits with status 1.
func handleUnexpectedError(err error) {
	logging.LogError("Unexpected error: %v", err)
	_ = shutdownApp()
	os.Exit(1)
}

// initializeApp loads the configuration, sets up logging and signal handling,
// creates the shared channels, initializes every component and the database,
// and finally imports seed URLs from config.CONFIG.SeedUrlPath when it is set.
// It returns the first initialization error encountered.
func initializeApp() error {
	config.CONFIG = *config.Initialize()
	logging.InitSlogger(config.CONFIG.LogLevel)
	logging.LogInfo("Starting up. Press Ctrl+C to exit")

	common.SignalsChan = make(chan os.Signal, 1)
	signal.Notify(common.SignalsChan, syscall.SIGINT, syscall.SIGTERM)
	common.FatalErrorsChan = make(chan error)
	jobs = make(chan string, config.CONFIG.NumOfWorkers)

	if err := blackList.Initialize(); err != nil {
		return err
	}
	if err := whiteList.Initialize(); err != nil {
		return err
	}
	if err := seedList.Initialize(); err != nil {
		return err
	}
	if err := robotsMatch.Initialize(); err != nil {
		return err
	}

	ctx := context.Background()
	if err := gemdb.Database.Initialize(ctx); err != nil {
		return err
	}

	if config.CONFIG.SeedUrlPath != "" {
		if err := AddURLsFromFile(ctx, config.CONFIG.SeedUrlPath); err != nil {
			return err
		}
	}
	return nil
}

// shutdownApp shuts down every component and the database. Each shutdown is
// attempted even if an earlier one fails, so one misbehaving component cannot
// keep the database from being closed. All failures are returned joined
// (nil if every shutdown succeeded).
func shutdownApp() error {
	// Arguments are evaluated left to right, preserving the original order.
	return errors.Join(
		blackList.Shutdown(),
		whiteList.Shutdown(),
		seedList.Shutdown(),
		robotsMatch.Shutdown(),
		gemdb.Database.Shutdown(context.Background()),
	)
}

// runApp starts the workers and the job scheduler, then blocks until either a
// SIGINT/SIGTERM arrives (returns nil) or a fatal error is reported on
// common.FatalErrorsChan (returns that error).
func runApp() error {
	go spawnWorkers(config.CONFIG.NumOfWorkers)
	go runJobScheduler()

	select {
	case <-common.SignalsChan:
		logging.LogWarn("Received SIGINT or SIGTERM signal, exiting")
		return nil
	case err := <-common.FatalErrorsChan:
		return err
	}
}

// spawnWorkers starts total worker goroutines, numbered 0..total-1. Each one
// receives URLs from jobs and hands them to common.RunWorkerWithTx together
// with its own id. jobs is never closed, so the workers run until the process exits.
func spawnWorkers(total int) {
	for id := 0; id < total; id++ {
		// Pass the id explicitly: before Go 1.22 a closure over the loop
		// variable would give every worker the final value of id.
		go func(workerID int) {
			for job := range jobs {
				common.RunWorkerWithTx(workerID, job)
			}
		}(id)
	}
}

// withTx runs fn inside a new database transaction. The transaction is
// committed when fn returns nil and rolled back otherwise, so no path leaves
// it open.
func withTx(ctx context.Context, fn func(tx *sqlx.Tx) error) error {
	tx, err := gemdb.Database.NewTx(ctx)
	if err != nil {
		return err
	}
	if err := fn(tx); err != nil {
		return rollbackOnError(ctx, tx, err)
	}
	return tx.Commit()
}

// rollbackOnError rolls tx back after a failure and returns cause. If the
// rollback also fails, its error is joined to cause.
func rollbackOnError(ctx context.Context, tx *sqlx.Tx, cause error) error {
	if rbErr := gemdb.SafeRollback(ctx, tx); rbErr != nil {
		return errors.Join(cause, rbErr)
	}
	return cause
}

// runJobScheduler is the scheduler goroutine.
//
//  1. If the urls table is empty (only gemini:// URLs count unless
//     GopherEnable is set), it is filled with the seed list.
//  2. It then loops forever. Each pass (see scheduleBatch) takes a few pending
//     URLs per host from the database and queues them on jobs. When nothing is
//     pending, it first requeues stale URLs from the snapshot history. When even
//     that yields nothing, it sleeps for idlePollInterval.
//
// Any error is sent on common.FatalErrorsChan, and the scheduler returns.
func runJobScheduler() {
	ctx := contextutil.ContextWithComponent(context.Background(), "crawler")

	if err := seedIfEmpty(ctx); err != nil {
		common.FatalErrorsChan <- err
		return
	}

	for {
		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Polling DB for jobs")
		queued, err := scheduleBatch(ctx)
		if err != nil {
			common.FatalErrorsChan <- err
			return
		}
		if !queued {
			contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
			time.Sleep(idlePollInterval)
		}
	}
}

// seedIfEmpty counts the rows of the urls table (gemini:// only unless
// GopherEnable is set). If there are none, it inserts the seed list, in the
// same transaction.
func seedIfEmpty(ctx context.Context) error {
	countQuery := "SELECT COUNT(*) FROM urls WHERE url LIKE 'gemini://%'"
	if config.CONFIG.GopherEnable {
		countQuery = "SELECT COUNT(*) FROM urls"
	}

	return withTx(ctx, func(tx *sqlx.Tx) error {
		var urlCount int
		if err := tx.Get(&urlCount, countQuery); err != nil {
			return err
		}
		if urlCount != 0 {
			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Found %d pending URLs to crawl.", urlCount)
			return nil
		}
		logging.LogInfo("URLs table is empty, enqueueing standard crawl set")
		return enqueueSeedURLs(ctx, tx)
	})
}

// scheduleBatch performs one scheduler poll. It reports whether any URLs were
// queued on jobs. Every transaction it opens is committed or rolled back
// before it returns.
func scheduleBatch(ctx context.Context) (bool, error) {
	// Use a fresh context for DB operations so that timeouts/cancellation of
	// the long-lived scheduler context cannot affect database transactions.
	dbCtx := context.Background()
	tx, err := gemdb.Database.NewTx(dbCtx)
	if err != nil {
		return false, err
	}

	distinctHosts, err := gemdb.Database.GetUrlHosts(dbCtx, tx)
	if err != nil {
		return false, rollbackOnError(dbCtx, tx, err)
	}

	// Out of pending URLs: requeue stale ones from the crawl history.
	if len(distinctHosts) == 0 {
		count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*historyURLsPerWorker, config.CONFIG.SkipIfUpdatedDays)
		if err != nil {
			return false, rollbackOnError(dbCtx, tx, err)
		}
		if count == 0 {
			// Nothing was written. Release the transaction instead of
			// leaving it (and its connection) open while the caller idles.
			return false, gemdb.SafeRollback(dbCtx, tx)
		}
		if distinctHosts, err = gemdb.Database.GetUrlHosts(dbCtx, tx); err != nil {
			return false, rollbackOnError(dbCtx, tx, err)
		}
	}

	urls, err := gemdb.Database.GetRandomUrlsFromHosts(dbCtx, distinctHosts, urlsPerHost, tx)
	if err != nil {
		return false, rollbackOnError(dbCtx, tx, err)
	}
	// Commit before queueing: sends on jobs block once its buffer is full,
	// and the transaction should not stay open meanwhile.
	if err := tx.Commit(); err != nil {
		return false, err
	}
	if len(urls) == 0 {
		return false, nil
	}

	contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Queueing %d distinct hosts -> %d urls to crawl", len(distinctHosts), len(urls))
	for _, url := range urls {
		jobs <- url
	}
	return true, nil
}

// enqueueSeedURLs inserts every URL from seedList.GetSeedURLs into the urls
// table using tx. It stops at the first insert error. The caller commits or
// rolls back tx.
func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
	for _, url := range seedList.GetSeedURLs() {
		if err := gemdb.Database.InsertURL(ctx, tx, url); err != nil {
			return err
		}
	}
	return nil
}

// staleRootURLsQuery selects recrawl candidates.
//   - Candidates are gemini:// root URLs ($1 is the cutoff time, $2 the limit);
//     the pattern does not match gopher URLs.
//   - A candidate needs at least one snapshot with content and a 2x response code.
//   - Its latest last_crawled must be older than $1. URLs never crawled count as 1970.
//   - The query picks one random URL per host, then returns up to $2 hosts in
//     random order.
//
// Using the latest crawl attempt, rather than snapshot timestamps, keeps
// recently attempted URLs from being requeued again even when no new snapshot
// was stored for them.
const staleRootURLsQuery = `
WITH latest_attempts AS (
    SELECT
        url,
        host,
        COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) AS latest_attempt
    FROM snapshots
    WHERE url ~ '^gemini://[^/]+/?$'
    GROUP BY url, host
),
root_urls_with_content AS (
    SELECT DISTINCT
        la.url,
        la.host,
        la.latest_attempt
    FROM latest_attempts la
    JOIN snapshots s ON s.url = la.url
    WHERE (s.gemtext IS NOT NULL OR s.data IS NOT NULL)
      AND s.response_code BETWEEN 20 AND 29
),
eligible_urls AS (
    SELECT url, host, latest_attempt
    FROM root_urls_with_content
    WHERE latest_attempt < $1
),
ranked_urls AS (
    SELECT
        url,
        host,
        latest_attempt,
        ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) AS rank
    FROM eligible_urls
)
SELECT url, host
FROM ranked_urls
WHERE rank = 1
ORDER BY RANDOM()
LIMIT $2
`

// fetchSnapshotsFromHistory requeues up to num previously crawled root URLs.
// Only URLs whose latest crawl attempt is more than age days old are eligible;
// the full selection rules are described on staleRootURLsQuery. Each chosen URL
// is inserted into the urls table through tx.
//
// It returns the number of URLs inserted. The caller owns tx and is responsible
// for committing or rolling it back.
func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
	historyCtx := contextutil.ContextWithComponent(ctx, "fetchSnapshotsFromHistory")
	contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d URLs whose latest crawl attempt is at least %d days old to recrawl", num, age)

	cutoffDate := time.Now().AddDate(0, 0, -age)

	// host is not used below, but it must have a destination field because
	// the query selects it.
	type snapshotURL struct {
		URL  string `db:"url"`
		Host string `db:"host"`
	}

	var candidates []snapshotURL
	if err := tx.Select(&candidates, staleRootURLsQuery, cutoffDate, num); err != nil {
		return 0, err
	}

	if len(candidates) == 0 {
		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No URLs with old latest crawl attempts found to recrawl")
		return 0, nil
	}

	insertCount := 0
	for _, candidate := range candidates {
		if err := gemdb.Database.InsertURL(ctx, tx, candidate.URL); err != nil {
			return 0, fmt.Errorf("inserting URL %s from old snapshot to queue: %w", candidate.URL, err)
		}
		insertCount++
	}

	contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "Added %d old URLs to recrawl queue", insertCount)
	return insertCount, nil
}

// AddURLsFromFile reads filepath and inserts each non-blank line into the urls
// table, all in one transaction. Lines are trimmed of surrounding whitespace,
// including the "\r" of CRLF files, before insertion. On any insert error the
// transaction is rolled back and nothing from the file is kept.
func AddURLsFromFile(ctx context.Context, filepath string) error {
	data, err := os.ReadFile(filepath)
	if err != nil {
		return fmt.Errorf("reading seed URL file %q: %w", filepath, err)
	}

	lines := strings.Split(string(data), "\n")
	for i, line := range lines {
		lines[i] = strings.TrimSpace(line)
	}
	urls := util.Filter(lines, func(url string) bool {
		return url != ""
	})

	fileCtx := contextutil.ContextWithComponent(ctx, "AddURLsFromFile")
	return withTx(ctx, func(tx *sqlx.Tx) error {
		for _, url := range urls {
			contextlog.LogInfoWithContext(fileCtx, logging.GetSlogger(), "Adding %s to queue", url)
			if err := gemdb.Database.InsertURL(ctx, tx, url); err != nil {
				return err
			}
		}
		return nil
	})
}