Improve crawler performance and worker coordination
- Add WaitGroup synchronization for workers to prevent overlapping scheduler runs
- Increase history fetch multiplier and sleep intervals for better resource usage
- Simplify error handling and logging in worker processing
- Update SQL query to exclude error snapshots from history selection
- Fix worker ID variable reference in spawning loop
- Streamline snapshot update logic and error reporting
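The heart of the change is the scheduler/worker handshake around `common.WorkerWG`. Below is a minimal runnable sketch of that pattern; `jobs`, `workerWG`, and the URLs are illustrative stand-ins, not the crawler's real API:

```go
package main

import (
	"fmt"
	"sync"
)

var workerWG sync.WaitGroup // stands in for common.WorkerWG

func main() {
	jobs := make(chan string)

	// Workers run forever, signalling Done() after each job,
	// mirroring the WorkerWG.Done() call added to RunWorkerWithTx.
	for id := 0; id < 4; id++ {
		go func(a int) {
			for job := range jobs {
				fmt.Printf("worker %d crawled %s\n", a, job)
				workerWG.Done()
			}
		}(id)
	}

	// Scheduler loop body: Add() the batch size before queueing, then
	// Wait() so no new batch starts until every job has finished.
	urls := []string{"gemini://a/", "gemini://b/", "gemini://c/"}
	workerWG.Add(len(urls))
	for _, u := range urls {
		jobs <- u
	}
	workerWG.Wait()
	fmt.Println("All workers done. New scheduler run starts")
}
```

Note the ordering: `Add(len(urls))` must happen before any job is queued. If a worker could call `Done()` while the scheduler was still adding, the counter could go negative and panic.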
@@ -148,7 +148,7 @@ func spawnWorkers(total int) {
 		go func(a int) {
 			for {
 				job := <-jobs
-				common.RunWorkerWithTx(id, job)
+				common.RunWorkerWithTx(a, job)
 			}
 		}(id)
 	}
@@ -251,14 +251,14 @@ func runJobScheduler() {
 		// When out of pending URLs, add some random ones.
 		if len(distinctHosts) == 0 {
 			// Queue random old URLs from history.
-			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*3, config.CONFIG.SkipIfUpdatedDays)
+			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*10, config.CONFIG.SkipIfUpdatedDays)
 			if err != nil {
 				common.FatalErrorsChan <- err
 				return
 			}
 			if count == 0 {
-				contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
-				time.Sleep(30 * time.Second)
+				contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+				time.Sleep(120 * time.Second)
 				continue
 			}
 			distinctHosts, err = gemdb.Database.GetUrlHosts(dbCtx, tx)
@@ -282,28 +282,39 @@ func runJobScheduler() {
 		}

 		if len(urls) == 0 {
-			contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
-			time.Sleep(30 * time.Second)
+			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+			time.Sleep(120 * time.Second)
 			continue
 		}

 		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Queueing %d distinct hosts -> %d urls to crawl", len(distinctHosts), len(urls))

+		// Add jobs to WaitGroup before queuing
+		common.WorkerWG.Add(len(urls))
+
 		for _, url := range urls {
 			jobs <- url
 		}

+		// Wait for all workers to complete their jobs
+		common.WorkerWG.Wait()
+
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "All workers done. New scheduler run starts")
 		logging.LogInfo("")
 		logging.LogInfo("")
 	}
 }

 func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
-	// Get seed URLs from seedList module
-	urls := seedList.GetSeedURLs()
-
-	for _, url := range urls {
-		err := gemdb.Database.InsertURL(ctx, tx, url)
-		if err != nil {
-			return err
-		}
-	}
+	//urls := seedList.GetSeedURLs()
+	//
+	//for _, url := range urls {
+	//	err := gemdb.Database.InsertURL(ctx, tx, url)
+	//	if err != nil {
+	//		return err
+	//	}
+	//}
 	return nil
 }
@@ -332,7 +343,6 @@ func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age in
 	}

 	if len(snapshotURLs) == 0 {
 		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No URLs with old latest crawl attempts found to recrawl")
 		return 0, nil
 	}
-
@@ -1,6 +1,9 @@
 package common

-import "os"
+import (
+	"os"
+	"sync"
+)

 // FatalErrorsChan accepts errors from workers.
 // In case of fatal error, gracefully
@@ -8,6 +11,7 @@ import "os"
 var (
 	FatalErrorsChan chan error
 	SignalsChan     chan os.Signal
+	WorkerWG        sync.WaitGroup
 )

 const VERSION string = "0.0.1"
@@ -27,7 +27,6 @@ import (
 )

 func RunWorkerWithTx(workerID int, job string) {
-	// Extract host from URL for the context.
 	parsedURL, err := url2.ParseURL(job, "", true)
 	if err != nil {
 		logging.LogInfo("Failed to parse URL: %s Error: %s", job, err)
@@ -40,7 +39,6 @@ func RunWorkerWithTx(workerID int, job string) {
 	ctx, cancel := contextutil.NewRequestContext(baseCtx, job, host, workerID)
 	ctx = contextutil.ContextWithComponent(ctx, "worker")
 	defer cancel() // Ensure the context is cancelled when we're done
-	// contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "======================================\n\n")
 	contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Starting worker for URL %s", job)

 	// Create a new db transaction
@@ -51,6 +49,7 @@ func RunWorkerWithTx(workerID int, job string) {
 	}

 	err = runWorker(ctx, tx, []string{job})
+	WorkerWG.Done()
 	if err != nil {
 		// Two cases to handle:
 		// - context cancellation/timeout errors (log and ignore)
@@ -114,17 +113,11 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {

 	s, err := snapshot.SnapshotFromURL(url, true)
 	if err != nil {
-		contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to parse URL: %v", err)
 		return err
 	}

 	// We always use the normalized URL
 	if url != s.URL.Full {
-		//err = gemdb.Database.CheckAndUpdateNormalizedURL(ctx, tx, url, s.URL.Full)
-		//if err != nil {
-		//	return err
-		//}
-		//contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Normalized URL: %s → %s", url, s.URL.Full)
 		url = s.URL.Full
 	}

@@ -147,7 +140,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {

 	// Only check blacklist if URL is not whitelisted
 	if !isUrlWhitelisted && blackList.IsBlacklisted(s.URL.String()) {
-		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, ignoring %s", url)
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, skipped")
 		s.Error = null.StringFrom(commonErrors.ErrBlacklistMatch.Error())
 		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
@@ -159,7 +152,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	// add it as an error and remove url
 	robotMatch = robotsMatch.RobotMatch(ctx, s.URL.String())
 	if robotMatch {
-		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipping")
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipped")
 		s.Error = null.StringFrom(commonErrors.ErrRobotsMatch.Error())
 		return saveSnapshotAndRemoveURL(ctx, tx, s)
 	}
@@ -184,7 +177,6 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 	}

 	if err != nil {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Error visiting URL: %v", err)
 		return err
 	}

@@ -223,43 +215,32 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 		}
 	}

 	// Save the snapshot and remove the URL from the queue
-	if s.Error.ValueOrZero() != "" {
-		// Only save error if we didn't have any valid
-		// snapshot data from a previous crawl!
-		shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
-		if err != nil {
-			return err
-		}
-		if shouldUpdateSnapshot {
-			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
-			return saveSnapshotAndRemoveURL(ctx, tx, s)
-		} else {
-			contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
-			err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
-			if err != nil {
-				return err
-			}
-			return removeURL(ctx, tx, s.URL.String())
-		}
-	} else {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
-		return saveSnapshotAndRemoveURL(ctx, tx, s)
-	}
+	return saveSnapshotAndRemoveURL(ctx, tx, s)
 }

 func shouldUpdateSnapshotData(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
 	// If we don't have an error, save the new snapshot.
 	if !s.Error.Valid {
 		return true, nil
 	}
 	prevSnapshot, err := gemdb.Database.GetLatestSnapshot(ctx, tx, s.URL.String())
 	if err != nil {
 		return false, err
 	}
 	// If we don't have a previous snapshot, save it anyway.
 	if prevSnapshot == nil {
 		return true, nil
 	}
+	if prevSnapshot.ResponseCode.Valid {
+		return false, nil
+	}
 	// If we have a previous snapshot,
 	// and it didn't have an error, save.
 	// This means that we can have a max
 	// of one consecutive snapshot with
 	// an error.
 	if prevSnapshot.Error.ValueOrZero() == "" {
 		return true, nil
 	}
 	return false, nil
 }

 func isContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
@@ -299,11 +280,25 @@ func removeURL(ctx context.Context, tx *sqlx.Tx, url string) error {
 }

 func saveSnapshotAndRemoveURL(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
+	shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
+	if err != nil {
+		return err
+	}
+	if shouldUpdateSnapshot {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
 		err := gemdb.Database.SaveSnapshot(ctx, tx, s)
 		if err != nil {
 			return err
 		}
-		return gemdb.Database.DeleteURL(ctx, tx, s.URL.String())
+		return removeURL(ctx, tx, s.URL.String())
+	} else {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
+		err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
+		if err != nil {
+			return err
+		}
+		return removeURL(ctx, tx, s.URL.String())
+	}
 }

 // shouldPersistURL returns true given URL is a
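The save-or-skip decision now lives entirely behind saveSnapshotAndRemoveURL. Restated as a pure function for illustration (a sketch; the boolean parameter names are mine, and the real code reads nullable columns via gemdb.Database.GetLatestSnapshot):

```go
package main

import "fmt"

// shouldUpdate mirrors shouldUpdateSnapshotData: newHasErr is s.Error.Valid,
// hadPrev is prevSnapshot != nil, prevHadCode is prevSnapshot.ResponseCode.Valid,
// and prevHadErr is prevSnapshot.Error.ValueOrZero() != "".
func shouldUpdate(newHasErr, hadPrev, prevHadCode, prevHadErr bool) bool {
	if !newHasErr {
		return true // no error: always save the fresh snapshot
	}
	if !hadPrev {
		return true // first snapshot for this URL, keep the error
	}
	if prevHadCode {
		return false // real content exists; only bump last_crawled
	}
	// Save the error only if the previous snapshot was error-free,
	// so a URL never accumulates consecutive error snapshots.
	return !prevHadErr
}

func main() {
	fmt.Println(shouldUpdate(true, true, true, false))  // false: keep old content
	fmt.Println(shouldUpdate(true, true, false, true))  // false: already one error
	fmt.Println(shouldUpdate(false, true, true, false)) // true: fresh success
}
```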
@@ -115,12 +115,7 @@ LIMIT $1
 	SQL_UPDATE_LAST_CRAWLED = `
 UPDATE snapshots
 SET last_crawled = CURRENT_TIMESTAMP
-WHERE id = (
-	SELECT id FROM snapshots
-	WHERE url = $1
-	ORDER BY timestamp DESC
-	LIMIT 1
-)
+WHERE url = $1
 `
 // SQL_FETCH_SNAPSHOTS_FROM_HISTORY Fetches URLs from snapshots for re-crawling based on last_crawled timestamp
 // This query finds root domain URLs that haven't been crawled recently and selects
@@ -137,7 +132,7 @@ LIMIT $1
 		host,
 		COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
 	FROM snapshots
-	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini'
+	WHERE url ~ '^gemini://[^/]+/?$' AND mimetype = 'text/gemini' AND error IS NULL
 	GROUP BY url, host
 ),
 root_urls_with_content AS (
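The history query restricts re-crawl candidates to root URLs via the `~` regex match, now with `AND error IS NULL` so failed snapshots no longer feed the re-crawl queue. Since `~` is a POSIX-style regex in Postgres and this pattern uses no engine-specific syntax, its behavior can be sanity-checked in Go (a standalone snippet; the URLs are made up):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as the SQL query's url ~ '...' condition.
	rootURL := regexp.MustCompile(`^gemini://[^/]+/?$`)
	for _, u := range []string{
		"gemini://example.org",      // root without slash: matches
		"gemini://example.org/",     // root with slash: matches
		"gemini://example.org/page", // deep link: no match
	} {
		fmt.Printf("%-28s %v\n", u, rootURL.MatchString(u))
	}
}
```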