Improve crawler performance and worker coordination

- Add WaitGroup synchronization for workers to prevent overlapping scheduler runs
- Increase history fetch multiplier (3x → 10x workers) and idle sleep intervals (30s → 120s) to reduce database polling
- Simplify error handling and logging in worker processing
- Update SQL query to exclude error snapshots from history selection
- Fix worker ID variable reference in spawning loop
- Streamline snapshot update logic and error reporting
antanst
2025-06-19 09:59:50 +03:00
parent 59893efc3d
commit af42383513
4 changed files with 66 additions and 62 deletions
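The heart of the change is a small coordination pattern: the scheduler counts a batch of URLs into a shared WaitGroup before queueing them, each worker signals completion, and the next scheduler run starts only once the batch has drained. Below is a minimal runnable sketch of that pattern; common.WorkerWG, the jobs channel, and RunWorkerWithTx are names taken from the diffs, while workerWG, worker, and the sample URLs here are illustrative.

package main

import (
	"fmt"
	"sync"
)

// Shared between scheduler and workers, like common.WorkerWG in the commit.
var workerWG sync.WaitGroup

func worker(id int, jobs <-chan string) {
	for job := range jobs {
		fmt.Printf("worker %d crawled %s\n", id, job)
		workerWG.Done() // one Done per queued job
	}
}

func main() {
	jobs := make(chan string)
	for id := 0; id < 4; id++ {
		go worker(id, jobs)
	}
	urls := []string{"gemini://a/", "gemini://b/", "gemini://c/"}
	workerWG.Add(len(urls)) // count the whole batch before queueing
	for _, u := range urls {
		jobs <- u
	}
	workerWG.Wait() // a new scheduler run starts only after the batch drains
	fmt.Println("All workers done. New scheduler run starts")
}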

[File 1 of 4: crawler scheduler]

@@ -148,7 +148,7 @@ func spawnWorkers(total int) {
 		go func(a int) {
 			for {
 				job := <-jobs
-				common.RunWorkerWithTx(id, job)
+				common.RunWorkerWithTx(a, job)
 			}
 		}(id)
 	}
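This one-line fix addresses a classic Go closure pitfall: the goroutine already received the loop counter as the parameter a, but its body still read the outer loop variable id, which (before Go 1.22's per-iteration loop variables) is shared by all iterations, so every worker could observe the same ID. A runnable illustration of the two shapes; the fmt printing and the sleep are illustrative, not from this repo.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Buggy shape: the body ignores the parameter and reads the outer id.
	// Before Go 1.22 all three goroutines share one id variable, so they
	// may all print the final value 3.
	for id := 0; id < 3; id++ {
		go func(a int) {
			fmt.Println("buggy:", id)
		}(id)
	}
	// Fixed shape (what the commit switches to): use the parameter a,
	// which copied id's value at spawn time.
	for id := 0; id < 3; id++ {
		go func(a int) {
			fmt.Println("fixed:", a)
		}(id)
	}
	time.Sleep(100 * time.Millisecond) // crude wait so the output appears
}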
@@ -251,14 +251,14 @@ func runJobScheduler() {
 		// When out of pending URLs, add some random ones.
 		if len(distinctHosts) == 0 {
 			// Queue random old URLs from history.
-			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*3, config.CONFIG.SkipIfUpdatedDays)
+			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*10, config.CONFIG.SkipIfUpdatedDays)
 			if err != nil {
 				common.FatalErrorsChan <- err
 				return
 			}
 			if count == 0 {
-				contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
-				time.Sleep(30 * time.Second)
+				contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+				time.Sleep(120 * time.Second)
 				continue
 			}
 			distinctHosts, err = gemdb.Database.GetUrlHosts(dbCtx, tx)
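Raising the refill batch from NumOfWorkers*3 to NumOfWorkers*10 while quadrupling the idle sleep trades polling frequency for batch size: the scheduler queries the database less often but fetches more candidate URLs per round trip. Promoting the idle message from Debug to Info also makes the wait state visible under default log levels.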
@@ -282,28 +282,39 @@ func runJobScheduler() {
 		}
 		if len(urls) == 0 {
-			contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
-			time.Sleep(30 * time.Second)
+			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+			time.Sleep(120 * time.Second)
 			continue
 		}
 		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Queueing %d distinct hosts -> %d urls to crawl", len(distinctHosts), len(urls))
+		// Add jobs to WaitGroup before queuing
+		common.WorkerWG.Add(len(urls))
 		for _, url := range urls {
 			jobs <- url
 		}
+		// Wait for all workers to complete their jobs
+		common.WorkerWG.Wait()
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "All workers done. New scheduler run starts")
+		logging.LogInfo("")
+		logging.LogInfo("")
 	}
 }

 func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
 	// Get seed URLs from seedList module
-	urls := seedList.GetSeedURLs()
-	for _, url := range urls {
-		err := gemdb.Database.InsertURL(ctx, tx, url)
-		if err != nil {
-			return err
-		}
-	}
+	//urls := seedList.GetSeedURLs()
+	//
+	//for _, url := range urls {
+	//	err := gemdb.Database.InsertURL(ctx, tx, url)
+	//	if err != nil {
+	//		return err
+	//	}
+	//}
 	return nil
 }
@@ -332,7 +343,6 @@ func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age in
 	}
 	if len(snapshotURLs) == 0 {
 		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No URLs with old latest crawl attempts found to recrawl")
 		return 0, nil
 	}

[File 2 of 4: common package globals]

@@ -1,6 +1,9 @@
 package common

-import "os"
+import (
+	"os"
+	"sync"
+)

 // FatalErrorsChan accepts errors from workers.
 // In case of fatal error, gracefully
@@ -8,6 +11,7 @@ import "os"
 var (
 	FatalErrorsChan chan error
 	SignalsChan     chan os.Signal
+	WorkerWG        sync.WaitGroup
 )

 const VERSION string = "0.0.1"

[File 3 of 4: worker processing]

@@ -27,7 +27,6 @@ import (
 )

 func RunWorkerWithTx(workerID int, job string) {
-	// Extract host from URL for the context.
 	parsedURL, err := url2.ParseURL(job, "", true)
 	if err != nil {
 		logging.LogInfo("Failed to parse URL: %s Error: %s", job, err)
@@ -40,7 +39,6 @@ func RunWorkerWithTx(workerID int, job string) {
 	ctx, cancel := contextutil.NewRequestContext(baseCtx, job, host, workerID)
 	ctx = contextutil.ContextWithComponent(ctx, "worker")
 	defer cancel() // Ensure the context is cancelled when we're done
-	// contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "======================================\n\n")
 	contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Starting worker for URL %s", job)

 	// Create a new db transaction
@@ -51,6 +49,7 @@ func RunWorkerWithTx(workerID int, job string) {
 	}
 	err = runWorker(ctx, tx, []string{job})
+	WorkerWG.Done()
 	if err != nil {
 		// Two cases to handle:
 		// - context cancellation/timeout errors (log and ignore)
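Note where Done() lands: it runs only after runWorker returns, and judging from the visible hunks the earlier parse and transaction failures return before reaching it, which would leave the scheduler's Wait() blocked on those paths. A common defensive variant pairs every Add with a deferred Done; a minimal runnable sketch of that alternative follows (this is not what the commit does; workerWG and the empty-job guard are illustrative).

package main

import "sync"

var workerWG sync.WaitGroup

// Defer-based variant: Done runs on every exit path, including early
// returns and panics. The commit instead calls Done explicitly right
// after runWorker.
func runWorkerWithTx(workerID int, job string) {
	defer workerWG.Done()
	if job == "" {
		return // an early return still releases the scheduler's Wait
	}
	// ... parse the URL, open a transaction, crawl ...
}

func main() {
	workerWG.Add(1)
	go runWorkerWithTx(1, "gemini://example.org/")
	workerWG.Wait()
}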
@@ -114,17 +113,11 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	s, err := snapshot.SnapshotFromURL(url, true)
 	if err != nil {
 		contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to parse URL: %v", err)
 		return err
 	}

 	// We always use the normalized URL
 	if url != s.URL.Full {
-		//err = gemdb.Database.CheckAndUpdateNormalizedURL(ctx, tx, url, s.URL.Full)
-		//if err != nil {
-		//	return err
-		//}
-		//contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Normalized URL: %s → %s", url, s.URL.Full)
 		url = s.URL.Full
 	}
@@ -147,7 +140,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	// Only check blacklist if URL is not whitelisted
 	if !isUrlWhitelisted && blackList.IsBlacklisted(s.URL.String()) {
-		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, ignoring %s", url)
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, skipped")
 		s.Error = null.StringFrom(commonErrors.ErrBlacklistMatch.Error())
 		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
@@ -159,7 +152,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	// add it as an error and remove url
 	robotMatch = robotsMatch.RobotMatch(ctx, s.URL.String())
 	if robotMatch {
-		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipping")
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipped")
 		s.Error = null.StringFrom(commonErrors.ErrRobotsMatch.Error())
 		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
@@ -184,7 +177,6 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	}
 	if err != nil {
 		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Error visiting URL: %v", err)
 		return err
 	}
@@ -223,43 +215,32 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 		}
 	}

 	// Save the snapshot and remove the URL from the queue
-	if s.Error.ValueOrZero() != "" {
-		// Only save error if we didn't have any valid
-		// snapshot data from a previous crawl!
-		shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
-		if err != nil {
-			return err
-		}
-		if shouldUpdateSnapshot {
-			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
-			return saveSnapshotAndRemoveURL(ctx, tx, s)
-		} else {
-			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
-			err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
-			if err != nil {
-				return err
-			}
-			return removeURL(ctx, tx, s.URL.String())
-		}
-	} else {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
-		return saveSnapshotAndRemoveURL(ctx, tx, s)
-	}
+	return saveSnapshotAndRemoveURL(ctx, tx, s)
 }

 func shouldUpdateSnapshotData(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
 	// If we don't have an error, save the new snapshot.
 	if !s.Error.Valid {
 		return true, nil
 	}
 	prevSnapshot, err := gemdb.Database.GetLatestSnapshot(ctx, tx, s.URL.String())
 	if err != nil {
 		return false, err
 	}
 	// If we don't have a previous snapshot, save it anyway.
 	if prevSnapshot == nil {
 		return true, nil
 	}
+	if prevSnapshot.ResponseCode.Valid {
+		return false, nil
+	}
+	// If we have a previous snapshot,
+	// and it didn't have an error, save.
+	// This means that we can have a max
+	// of one consecutive snapshot with
+	// an error.
 	if prevSnapshot.Error.ValueOrZero() == "" {
 		return true, nil
 	}
 	return false, nil
 }

 func isContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
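Read shouldUpdateSnapshotData as a decision table: a crawl without an error is always saved; an error result is saved only when there is no previous snapshot, or when the previous snapshot has neither a valid response code nor an error of its own. For example, if the latest stored snapshot holds real content (a valid response code), a new timeout error does not overwrite it; only the crawl date is bumped. The new ResponseCode.Valid check is what enforces the comment's invariant of at most one consecutive error snapshot per URL.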
@@ -299,11 +280,25 @@ func removeURL(ctx context.Context, tx *sqlx.Tx, url string) error {
 }

 func saveSnapshotAndRemoveURL(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
+	shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
+	if err != nil {
+		return err
+	}
+	if shouldUpdateSnapshot {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
 		err := gemdb.Database.SaveSnapshot(ctx, tx, s)
 		if err != nil {
 			return err
 		}
-		return gemdb.Database.DeleteURL(ctx, tx, s.URL.String())
+		return removeURL(ctx, tx, s.URL.String())
+	} else {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+		err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
+		if err != nil {
+			return err
+		}
+		return removeURL(ctx, tx, s.URL.String())
+	}
 }

 // shouldPersistURL returns true given URL is a
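After this refactor the save-or-update decision lives in exactly one place: WorkOnUrl's blacklist, robots.txt, and normal-completion paths all return through saveSnapshotAndRemoveURL, which now consults shouldUpdateSnapshotData itself instead of relying on each caller to do so, and the direct DeleteURL call gives way to the shared removeURL helper.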

[File 4 of 4: SQL queries]

@@ -115,12 +115,7 @@ LIMIT $1
 	SQL_UPDATE_LAST_CRAWLED = `
 UPDATE snapshots
 SET last_crawled = CURRENT_TIMESTAMP
-WHERE id = (
-	SELECT id FROM snapshots
-	WHERE url = $1
-	ORDER BY timestamp DESC
-	LIMIT 1
-)
 `
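The replacement WHERE clause is not visible in this capture; since UpdateLastCrawled passes the URL as $1, the simplified statement presumably matches rows by url directly rather than selecting the single most recent snapshot id, so every snapshot of the URL gets its last_crawled refreshed instead of only the latest one.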
 	// SQL_FETCH_SNAPSHOTS_FROM_HISTORY Fetches URLs from snapshots for re-crawling based on last_crawled timestamp
 	// This query finds root domain URLs that haven't been crawled recently and selects
@@ -137,7 +132,7 @@ LIMIT $1
 		host,
 		COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
 	FROM snapshots
-	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini'
+	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini' AND error IS NULL
 	GROUP BY url, host
 ),
 root_urls_with_content AS (
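The added error IS NULL predicate keeps errored snapshots out of the MAX(last_crawled) aggregation, matching the commit message's point about excluding error snapshots from history selection: a recent failed attempt no longer counts as the latest crawl, so a root URL whose last clean crawl is old becomes a recrawl candidate again, and URLs that have only ever produced errors drop out of this candidate set entirely.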