Improve crawler performance and worker coordination

- Add WaitGroup synchronization for workers to prevent overlapping scheduler runs
- Increase history fetch multiplier and sleep intervals for better resource usage
- Simplify error handling and logging in worker processing
- Update SQL query to exclude error snapshots from history selection
- Fix worker ID variable reference in spawning loop
- Streamline snapshot update logic and error reporting

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
antanst
2025-06-19 09:59:50 +03:00
parent a74f29d7b0
commit 3bdff0e22e
4 changed files with 66 additions and 62 deletions

View File

@@ -27,7 +27,6 @@ import (
)
func RunWorkerWithTx(workerID int, job string) {
// Extract host from URL for the context.
parsedURL, err := url2.ParseURL(job, "", true)
if err != nil {
logging.LogInfo("Failed to parse URL: %s Error: %s", job, err)
@@ -40,7 +39,6 @@ func RunWorkerWithTx(workerID int, job string) {
ctx, cancel := contextutil.NewRequestContext(baseCtx, job, host, workerID)
ctx = contextutil.ContextWithComponent(ctx, "worker")
defer cancel() // Ensure the context is cancelled when we're done
// contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "======================================\n\n")
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Starting worker for URL %s", job)
// Create a new db transaction
@@ -51,6 +49,7 @@ func RunWorkerWithTx(workerID int, job string) {
}
err = runWorker(ctx, tx, []string{job})
WorkerWG.Done()
if err != nil {
// Two cases to handle:
// - context cancellation/timeout errors (log and ignore)
@@ -114,17 +113,11 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
s, err := snapshot.SnapshotFromURL(url, true)
if err != nil {
contextlog.LogErrorWithContext(ctx, logging.GetSlogger(), "Failed to parse URL: %v", err)
return err
}
// We always use the normalized URL
if url != s.URL.Full {
//err = gemdb.Database.CheckAndUpdateNormalizedURL(ctx, tx, url, s.URL.Full)
//if err != nil {
// return err
//}
//contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Normalized URL: %s → %s", url, s.URL.Full)
url = s.URL.Full
}
@@ -147,7 +140,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
// Only check blacklist if URL is not whitelisted
if !isUrlWhitelisted && blackList.IsBlacklisted(s.URL.String()) {
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, ignoring %s", url)
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches blacklist, skipped")
s.Error = null.StringFrom(commonErrors.ErrBlacklistMatch.Error())
return saveSnapshotAndRemoveURL(ctx, tx, s)
}
@@ -159,7 +152,7 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
// add it as an error and remove url
robotMatch = robotsMatch.RobotMatch(ctx, s.URL.String())
if robotMatch {
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipping")
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "URL matches robots.txt, skipped")
s.Error = null.StringFrom(commonErrors.ErrRobotsMatch.Error())
return saveSnapshotAndRemoveURL(ctx, tx, s)
}
@@ -184,7 +177,6 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
}
if err != nil {
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Error visiting URL: %v", err)
return err
}
@@ -223,43 +215,32 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
}
}
// Save the snapshot and remove the URL from the queue
if s.Error.ValueOrZero() != "" {
// Only save error if we didn't have any valid
// snapshot data from a previous crawl!
shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
if err != nil {
return err
}
if shouldUpdateSnapshot {
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
return saveSnapshotAndRemoveURL(ctx, tx, s)
} else {
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
if err != nil {
return err
}
return removeURL(ctx, tx, s.URL.String())
}
} else {
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
return saveSnapshotAndRemoveURL(ctx, tx, s)
}
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d", s.ResponseCode.ValueOrZero())
return saveSnapshotAndRemoveURL(ctx, tx, s)
}
func shouldUpdateSnapshotData(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
// If we don't have an error, save the new snapshot.
if !s.Error.Valid {
return true, nil
}
prevSnapshot, err := gemdb.Database.GetLatestSnapshot(ctx, tx, s.URL.String())
if err != nil {
return false, err
}
// If we don't have a previous snapshot, save it anyway.
if prevSnapshot == nil {
return true, nil
}
if prevSnapshot.ResponseCode.Valid {
return false, nil
// If we have a previous snapshot,
// and it didn't have an error, save.
// This means that we can have a max
// of one consecutive snapshot with
// an error.
if prevSnapshot.Error.ValueOrZero() == "" {
return true, nil
}
return true, nil
return false, nil
}
func isContentIdentical(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) (bool, error) {
@@ -299,11 +280,25 @@ func removeURL(ctx context.Context, tx *sqlx.Tx, url string) error {
}
func saveSnapshotAndRemoveURL(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
err := gemdb.Database.SaveSnapshot(ctx, tx, s)
shouldUpdateSnapshot, err := shouldUpdateSnapshotData(ctx, tx, s)
if err != nil {
return err
}
return gemdb.Database.DeleteURL(ctx, tx, s.URL.String())
if shouldUpdateSnapshot {
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
err := gemdb.Database.SaveSnapshot(ctx, tx, s)
if err != nil {
return err
}
return removeURL(ctx, tx, s.URL.String())
} else {
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "%2d %s (but old content exists, updating crawl date)", s.ResponseCode.ValueOrZero(), s.Error.ValueOrZero())
err = gemdb.Database.UpdateLastCrawled(ctx, tx, s.URL.String())
if err != nil {
return err
}
return removeURL(ctx, tx, s.URL.String())
}
}
// shouldPersistURL returns true given URL is a