From acbac15c20a02cd7e2e529736243f0c9ab7ed8a4 Mon Sep 17 00:00:00 2001
From: antanst <>
Date: Thu, 19 Jun 2025 09:59:50 +0300
Subject: [PATCH] Improve crawler performance and worker coordination

- Add WaitGroup synchronization for workers to prevent overlapping scheduler runs
- Increase history fetch multiplier and sleep intervals for better resource usage
- Simplify error handling and logging in worker processing
- Update SQL query to exclude error snapshots from history selection
- Fix worker ID variable reference in spawning loop
- Streamline snapshot update logic and error reporting
---
 cmd/crawler/crawler.go | 40 ++++++++++++++---------
 common/shared.go       |  6 +++-
 common/worker.go       | 73 ++++++++++++++++++++----------------------
 db/db_queries.go       |  9 ++----
 4 files changed, 66 insertions(+), 62 deletions(-)

diff --git a/cmd/crawler/crawler.go b/cmd/crawler/crawler.go
index 6df5726..f0dbd97 100644
--- a/cmd/crawler/crawler.go
+++ b/cmd/crawler/crawler.go
@@ -148,7 +148,7 @@ func spawnWorkers(total int) {
 		go func(a int) {
 			for {
 				job := <-jobs
-				common.RunWorkerWithTx(id, job)
+				common.RunWorkerWithTx(a, job)
 			}
 		}(id)
 	}
@@ -251,14 +251,14 @@ func runJobScheduler() {
 		// When out of pending URLs, add some random ones.
 		if len(distinctHosts) == 0 {
 			// Queue random old URLs from history.
-			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*3, config.CONFIG.SkipIfUpdatedDays)
+			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*10, config.CONFIG.SkipIfUpdatedDays)
 			if err != nil {
 				common.FatalErrorsChan <- err
 				return
 			}
 			if count == 0 {
-				contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
-				time.Sleep(30 * time.Second)
+				contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+				time.Sleep(120 * time.Second)
 				continue
 			}
 			distinctHosts, err = gemdb.Database.GetUrlHosts(dbCtx, tx)
@@ -282,28 +282,39 @@
 		}
 
 		if len(urls) == 0 {
-			contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
-			time.Sleep(30 * time.Second)
+			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+			time.Sleep(120 * time.Second)
 			continue
 		}
 
 		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Queueing %d distinct hosts -> %d urls to crawl", len(distinctHosts), len(urls))
+
+		// Add jobs to WaitGroup before queuing
+		common.WorkerWG.Add(len(urls))
+
 		for _, url := range urls {
 			jobs <- url
 		}
+
+		// Wait for all workers to complete their jobs
+		common.WorkerWG.Wait()
+
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "All workers done. New scheduler run starts")
+		logging.LogInfo("")
+		logging.LogInfo("")
 	}
 }
 
 func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
 	// Get seed URLs from seedList module
-	urls := seedList.GetSeedURLs()
-
-	for _, url := range urls {
-		err := gemdb.Database.InsertURL(ctx, tx, url)
-		if err != nil {
-			return err
-		}
-	}
+	//urls := seedList.GetSeedURLs()
+	//
+	//for _, url := range urls {
+	//	err := gemdb.Database.InsertURL(ctx, tx, url)
+	//	if err != nil {
+	//		return err
+	//	}
+	//}
 	return nil
 }
 
@@ -332,7 +343,6 @@ func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age in
 	}
 
 	if len(snapshotURLs) == 0 {
-		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No URLs with old latest crawl attempts found to recrawl")
 		return 0, nil
 	}
 
diff --git a/common/shared.go b/common/shared.go
index d190f49..0eb82ec 100644
--- a/common/shared.go
+++ b/common/shared.go
@@ -1,6 +1,9 @@
 package common
 
-import "os"
+import (
+	"os"
+	"sync"
+)
 
 // FatalErrorsChan accepts errors from workers.
 // In case of fatal error, gracefully
@@ -8,6 +11,7 @@ import "os"
 var (
 	FatalErrorsChan chan error
 	SignalsChan     chan os.Signal
+	WorkerWG        sync.WaitGroup
 )
 
 const VERSION string = "0.0.1"
diff --git a/common/worker.go b/common/worker.go
index 50df35c..78d54ea 100644
--- a/common/worker.go
+++ b/common/worker.go
@@ -27,7 +27,6 @@ import (
 )
 
 func RunWorkerWithTx(workerID int, job string) {
-	// Extract host from URL for the context.
 	parsedURL, err := url2.ParseURL(job, "", true)
 	if err != nil {
 		logging.LogInfo("Failed to parse URL: %s Error: %s", job, err)
@@ -40,7 +39,6 @@ func RunWorkerWithTx(workerID int, job string) {
 	ctx, cancel := contextutil.NewRequestContext(baseCtx, job, host, workerID)
 	ctx = contextutil.ContextWithComponent(ctx, "worker")
 	defer cancel() // Ensure the context is cancelled when we're done
-	// contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "======================================\n\n")
 	contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Starting worker for URL %s", job)
 
 	// Create a new db transaction
@@ -51,6 +49,7 @@ func RunWorkerWithTx(workerID int, job string) {
 	}
 
 	err = runWorker(ctx, tx, []string{job})
+	WorkerWG.Done()
 	if err != nil {
 		// Two cases to handle:
 		// - context cancellation/timeout errors (log and ignore)
@@ -114,17 +113,11 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 
 	s, err := snapshot.SnapshotFromURL(url, true)
 	if err != nil {
-		contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to parse URL: %v", err)
 		return err
 	}
 
 	// We always use the normalized URL
 	if url != s.URL.Full {
-		//err = gemdb.Database.CheckAndUpdateNormalizedURL(ctx, tx, url, s.URL.Full)
-		//if err != nil {
-		//	return err
-		//}
-		//contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Normalized URL: %s → %s", url, s.URL.Full)
 		url = s.URL.Full
 	}
 
@@ -147,7 +140,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 
 	// Only check blacklist if URL is not whitelisted
 	if !isUrlWhitelisted && blackList.IsBlacklisted(s.URL.String()) {
-		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, ignoring %s", url)
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, skipped")
 		s.Error = null.StringFrom(commonErrors.ErrBlacklistMatch.Error())
 		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
@@ -159,7 +152,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	// add it as an error and remove url
 	robotMatch = robotsMatch.RobotMatch(ctx, s.URL.String())
 	if robotMatch {
-		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipping")
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipped")
 		s.Error = null.StringFrom(commonErrors.ErrRobotsMatch.Error())
 		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
@@ -184,7 +177,6 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	}
 
 	if err != nil {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Error visiting URL: %v", err)
 		return err
 	}
 
@@ -223,43 +215,32 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 		}
 	}
 
-	// Save the snapshot and remove the URL from the queue
-	if s.Error.ValueOrZero() != "" {
-		// Only save error if we didn't have any valid
-		// snapshot data from a previous crawl!
-		shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
-		if err != nil {
-			return err
-		}
-		if shouldUpdateSnapshot {
-			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
-			return saveSnapshotAndRemoveURL(ctx, tx, s)
-		} else {
-			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
-			err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
-			if err != nil {
-				return err
-			}
-			return removeURL(ctx, tx, s.URL.String())
-		}
-	} else {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
-		return saveSnapshotAndRemoveURL(ctx, tx, s)
-	}
+	contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
+	return saveSnapshotAndRemoveURL(ctx, tx, s)
 }
 
 func shouldUpdateSnapshotData(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
+	// If we don't have an error, save the new snapshot.
+	if !s.Error.Valid {
+		return true, nil
+	}
 	prevSnapshot, err := gemdb.Database.GetLatestSnapshot(ctx, tx, s.URL.String())
 	if err != nil {
 		return false, err
 	}
+	// If we don't have a previous snapshot, save it anyway.
 	if prevSnapshot == nil {
 		return true, nil
 	}
-	if prevSnapshot.ResponseCode.Valid {
-		return false, nil
+	// If we have a previous snapshot,
+	// and it didn't have an error, save.
+	// This means that we can have a max
+	// of one consecutive snapshot with
+	// an error.
+	if prevSnapshot.Error.ValueOrZero() == "" {
+		return true, nil
 	}
-	return true, nil
+	return false, nil
 }
 
 func isContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
@@ -299,11 +280,25 @@ func removeURL(ctx context.Context, tx *sqlx.Tx, url string) error {
 }
 
 func saveSnapshotAndRemoveURL(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
-	err := gemdb.Database.SaveSnapshot(ctx, tx, s)
+	shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
 	if err != nil {
 		return err
 	}
-	return gemdb.Database.DeleteURL(ctx, tx, s.URL.String())
+	if shouldUpdateSnapshot {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+		err := gemdb.Database.SaveSnapshot(ctx, tx, s)
+		if err != nil {
+			return err
+		}
+		return removeURL(ctx, tx, s.URL.String())
+	} else {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+		err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
+		if err != nil {
+			return err
+		}
+		return removeURL(ctx, tx, s.URL.String())
+	}
 }
 
 // shouldPersistURL returns true given URL is a
diff --git a/db/db_queries.go b/db/db_queries.go
index 2e99b67..a630e96 100644
--- a/db/db_queries.go
+++ b/db/db_queries.go
@@ -115,12 +115,7 @@ LIMIT $1
 	SQL_UPDATE_LAST_CRAWLED = `
 UPDATE snapshots
 SET last_crawled = CURRENT_TIMESTAMP
-WHERE id = (
-    SELECT id FROM snapshots
-    WHERE url = $1
-    ORDER BY timestamp DESC
-    LIMIT 1
-)
+WHERE url = $1
 `
 	// SQL_FETCH_SNAPSHOTS_FROM_HISTORY Fetches URLs from snapshots for re-crawling based on last_crawled timestamp
 	// This query finds root domain URLs that haven't been crawled recently and selects
@@ -137,7 +132,7 @@ LIMIT $1
 		host,
 		COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
 	FROM snapshots
-	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini'
+	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini' AND error IS NULL
 	GROUP BY url, host
 ), root_urls_with_content AS (
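
Note on the coordination change above: the scheduler now pairs common.WorkerWG.Add(len(urls)) before queueing with one WorkerWG.Done() per job in RunWorkerWithTx and a WorkerWG.Wait() before it loops again, so scheduler runs no longer overlap. Below is a minimal, self-contained sketch of that hand-off under illustrative names (workerWG, worker, jobs, batch); it is not the crawler's actual code.

// A minimal sketch of the scheduler/worker hand-off this commit introduces
// via common.WorkerWG. All names here are illustrative placeholders.
package main

import (
	"fmt"
	"sync"
	"time"
)

var workerWG sync.WaitGroup // stands in for common.WorkerWG

func worker(id int, jobs <-chan string) {
	for job := range jobs {
		time.Sleep(10 * time.Millisecond) // simulate crawling one URL
		fmt.Printf("worker %d finished %s\n", id, job)
		// Exactly one Done() per queued job, mirroring the
		// WorkerWG.Done() call right after runWorker.
		workerWG.Done()
	}
}

func main() {
	jobs := make(chan string)
	for i := 0; i < 4; i++ {
		go worker(i, jobs)
	}

	batches := [][]string{
		{"gemini://a.example/", "gemini://b.example/"},
		{"gemini://c.example/"},
	}
	for _, batch := range batches {
		// Add before queueing so Wait() cannot return early.
		workerWG.Add(len(batch))
		for _, url := range batch {
			jobs <- url
		}
		// Block until the whole batch is done; runs never overlap.
		workerWG.Wait()
		fmt.Println("batch complete, next scheduler run starts")
	}
}

Because Done() is called right after runWorker returns, it fires whether or not the job failed, so an erroring URL cannot leave Wait() blocked; the trade-off is that each batch proceeds at the pace of its slowest job.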