diff --git a/cmd/crawler/crawler.go b/cmd/crawler/crawler.go
new file mode 100644
index 0000000..d011894
--- /dev/null
+++ b/cmd/crawler/crawler.go
@@ -0,0 +1,512 @@
+package main
+
+import (
+	"context"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+
+	"gemini-grc/common"
+	"gemini-grc/common/blackList"
+	"gemini-grc/common/contextlog"
+	"gemini-grc/common/seedList"
+	"gemini-grc/common/whiteList"
+	"gemini-grc/config"
+	"gemini-grc/contextutil"
+	gemdb "gemini-grc/db"
+	"gemini-grc/robotsMatch"
+	"gemini-grc/util"
+	"git.antanst.com/antanst/logging"
+	"github.com/jmoiron/sqlx"
+)
+
+var jobs chan string
+
+func main() {
+	var err error
+
+	err = initializeApp()
+	if err != nil {
+		handleUnexpectedError(err)
+	}
+
+	err = runApp()
+	if err != nil {
+		handleUnexpectedError(err)
+	}
+
+	err = shutdownApp()
+	if err != nil {
+		handleUnexpectedError(err)
+	}
+}
+
+func handleUnexpectedError(err error) {
+	logging.LogError("Unexpected error: %v", err)
+	_ = shutdownApp()
+	os.Exit(1)
+}
+
+func initializeApp() error {
+	config.CONFIG = *config.Initialize()
+	logging.InitSlogger(config.CONFIG.LogLevel)
+
+	logging.LogInfo("Starting up. Press Ctrl+C to exit")
+	common.SignalsChan = make(chan os.Signal, 1)
+	signal.Notify(common.SignalsChan, syscall.SIGINT, syscall.SIGTERM)
+	common.FatalErrorsChan = make(chan error)
+	jobs = make(chan string, config.CONFIG.NumOfWorkers)
+
+	var err error
+
+	err = blackList.Initialize()
+	if err != nil {
+		return err
+	}
+
+	err = whiteList.Initialize()
+	if err != nil {
+		return err
+	}
+
+	err = seedList.Initialize()
+	if err != nil {
+		return err
+	}
+
+	err = robotsMatch.Initialize()
+	if err != nil {
+		return err
+	}
+
+	ctx := context.Background()
+	err = gemdb.Database.Initialize(ctx)
+	if err != nil {
+		return err
+	}
+
+	if config.CONFIG.SeedUrlPath != "" {
+		err := AddURLsFromFile(ctx, config.CONFIG.SeedUrlPath)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func shutdownApp() error {
+	var err error
+
+	err = blackList.Shutdown()
+	if err != nil {
+		return err
+	}
+
+	err = whiteList.Shutdown()
+	if err != nil {
+		return err
+	}
+
+	err = seedList.Shutdown()
+	if err != nil {
+		return err
+	}
+
+	err = robotsMatch.Shutdown()
+	if err != nil {
+		return err
+	}
+
+	ctx := context.Background()
+	err = gemdb.Database.Shutdown(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func runApp() (err error) {
+	go spawnWorkers(config.CONFIG.NumOfWorkers)
+	go runJobScheduler()
+	for {
+		select {
+		case <-common.SignalsChan:
+			logging.LogWarn("Received SIGINT or SIGTERM signal, exiting")
+			return nil
+		case err := <-common.FatalErrorsChan:
+			return err
+		}
+	}
+}
+
+func spawnWorkers(total int) {
+	for id := 0; id < total; id++ {
+		go func(id int) {
+			for {
+				job := <-jobs
+				common.RunWorkerWithTx(id, job)
+			}
+		}(id)
+	}
+}
+
+// Current Logic Flow:
+//
+// 1. Create transaction
+// 2. Get distinct hosts
+// 3. If no hosts → fetch snapshots from history (adds URLs to queue)
+// 4. Re-query for hosts (should now have some)
+// 5. Get URLs from hosts
+// 6. Commit transaction
+// 7. Queue URLs for workers
+func runJobScheduler() {
+	var tx *sqlx.Tx
+	var err error
+
+	ctx := contextutil.ContextWithComponent(context.Background(), "crawler")
+	tx, err = gemdb.Database.NewTx(ctx)
+	if err != nil {
+		common.FatalErrorsChan <- err
+		return
+	}
+
+	defer func(tx *sqlx.Tx) {
+		if tx != nil {
+			if err := gemdb.SafeRollback(ctx, tx); err != nil {
+				common.FatalErrorsChan <- err
+			}
+		}
+	}(tx)
+
+	// First, check if the URLs table is empty.
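+	// Count only gemini:// URLs unless Gopher crawling is enabled,
+	// so the emptiness check matches the protocols we actually crawl.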
+	var urlCount int
+
+	if config.CONFIG.GopherEnable {
+		err = tx.Get(&urlCount, "SELECT COUNT(*) FROM urls")
+	} else {
+		err = tx.Get(&urlCount, "SELECT COUNT(*) FROM urls WHERE url LIKE 'gemini://%'")
+	}
+	if err != nil {
+		common.FatalErrorsChan <- err
+		return
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		common.FatalErrorsChan <- err
+		return
+	}
+
+	// If no pending URLs, add the ones from the standard crawl set.
+	tx, err = gemdb.Database.NewTx(ctx)
+	if err != nil {
+		common.FatalErrorsChan <- err
+		return
+	}
+
+	if urlCount == 0 {
+		logging.LogInfo("URLs table is empty, enqueueing standard crawl set")
+		err = enqueueSeedURLs(ctx, tx)
+		if err != nil {
+			common.FatalErrorsChan <- err
+			return
+		}
+		// Commit this tx here so the loop sees the changes.
+		err := tx.Commit()
+		if err != nil {
+			common.FatalErrorsChan <- err
+			return
+		}
+	} else {
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Found %d pending URLs to crawl.", urlCount)
+	}
+
+	// Main job loop.
+	// We get URLs from the pending URLs table,
+	// add crawling jobs for those,
+	// and sleep a bit after each run.
+	for {
+		contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Polling DB for jobs")
+
+		// Use fresh context for DB operations to avoid timeouts/cancellation
+		// from the long-lived scheduler context affecting database transactions
+		dbCtx := context.Background()
+		tx, err = gemdb.Database.NewTx(dbCtx)
+		if err != nil {
+			common.FatalErrorsChan <- err
+			return
+		}
+
+		// Get all distinct hosts from pending URLs
+		distinctHosts, err := gemdb.Database.GetUrlHosts(dbCtx, tx)
+		if err != nil {
+			common.FatalErrorsChan <- err
+			return
+		}
+
+		// When out of pending URLs, add some random ones.
+		if len(distinctHosts) == 0 {
+			// Queue random old URLs from history.
+			count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*3, config.CONFIG.SkipIfUpdatedDays)
+			if err != nil {
+				common.FatalErrorsChan <- err
+				return
+			}
+			if count == 0 {
+				contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+				time.Sleep(30 * time.Second)
+				continue
+			}
+			distinctHosts, err = gemdb.Database.GetUrlHosts(dbCtx, tx)
+			if err != nil {
+				common.FatalErrorsChan <- err
+				return
+			}
+		}
+
+		// Get some URLs from each host, up to a limit
+		urls, err := gemdb.Database.GetRandomUrlsFromHosts(dbCtx, distinctHosts, 10, tx)
+		if err != nil {
+			common.FatalErrorsChan <- err
+			return
+		}
+
+		err = tx.Commit()
+		if err != nil {
+			common.FatalErrorsChan <- err
+			return
+		}
+
+		if len(urls) == 0 {
+			contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
+			time.Sleep(30 * time.Second)
+			continue
+		}
+
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Queueing %d distinct hosts -> %d urls to crawl", len(distinctHosts), len(urls))
+		for _, url := range urls {
+			jobs <- url
+		}
+	}
+}
+
+func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
+	// Get seed URLs from seedList module
+	urls := seedList.GetSeedURLs()
+
+	for _, url := range urls {
+		err := gemdb.Database.InsertURL(ctx, tx, url)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+//func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
+//	// Select snapshots from snapshots table for recrawling
+//	// They should be at least days old, random order,
+//	// one snapshot per host if possible.
+//	historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
+//	contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d snapshots at least %d days old to recrawl", num, age)
+//
+//	// Calculate the cutoff date
+//	cutoffDate := time.Now().AddDate(0, 0, -age)
+//
+//	// SQL to select one random URL per host from snapshots older than the cutoff date
+//	// TODO implement when gopher enabled
+//	query := `
+//	WITH hosts AS (
+//		SELECT DISTINCT host
+//		FROM snapshots
+//		WHERE url ~ '^gemini://[^/]+/?$'
+//		AND gemtext IS NOT NULL
+//		AND timestamp < $1
+//		AND error IS NULL
+//	),
+//	ranked_snapshots AS (
+//		SELECT
+//			s.url,
+//			s.host,
+//			s.timestamp,
+//			ROW_NUMBER() OVER (PARTITION BY s.host ORDER BY RANDOM()) as rank
+//		FROM snapshots s
+//		JOIN hosts h ON s.host = h.host
+//		WHERE s.timestamp < $1
+//	)
+//	SELECT url, host
+//	FROM ranked_snapshots
+//	WHERE rank = 1
+//	ORDER BY RANDOM()
+//	LIMIT $2
+//	`
+//
+//	type SnapshotURL struct {
+//		URL  string `db:"url"`
+//		Host string `db:"host"`
+//	}
+//
+//	// Execute the query
+//	var snapshotURLs []SnapshotURL
+//	err := tx.Select(&snapshotURLs, query, cutoffDate, num)
+//	if err != nil {
+//		return 0, err
+//	}
+//
+//	if len(snapshotURLs) == 0 {
+//		historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
+//		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No old snapshots found to recrawl")
+//		return 0, nil
+//	}
+//
+//	// For each selected snapshot, add the URL to the urls table
+//	insertCount := 0
+//	for _, snapshot := range snapshotURLs {
+//		err := gemdb.Database.InsertURL(ctx, tx, snapshot.URL)
+//		if err != nil {
+//			logging.LogError("Error inserting URL %s from old snapshot to queue: %v", snapshot.URL, err)
+//			return 0, err
+//		}
+//		insertCount++
+//	}
+//
+//	// Note: The transaction is committed by the caller (runJobScheduler),
+//	// not here. This function is called as part of a larger transaction.
+//	if insertCount > 0 {
+//		historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
+//		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "Added %d old URLs to recrawl queue", insertCount)
+//	}
+//
+//	return insertCount, nil
+//}
+
+func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
+	// Select snapshots from the snapshots table for recrawling.
+	// Find URLs whose LATEST crawl attempt (via last_crawled) is at least `age` days old.
+	// Works correctly with SkipIdenticalContent=true and fixes the infinite recrawl loop.
+	historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
+	contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d URLs whose latest crawl attempt is at least %d days old to recrawl", num, age)
+
+	// Calculate the cutoff date
+	cutoffDate := time.Now().AddDate(0, 0, -age)
+
+	// SQL to select URLs where the latest crawl attempt is older than the cutoff date.
+	// Uses the last_crawled timestamp to track actual crawl attempts, so it works
+	// correctly with the SkipIdenticalContent setting and avoids the infinite recrawl loop.
+	// One URL per host for diversity, focusing on root domain URLs.
+	query := `
+	WITH latest_attempts AS (
+		SELECT
+			url,
+			host,
+			COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
+		FROM snapshots
+		WHERE url ~ '^gemini://[^/]+/?$'
+		GROUP BY url, host
+	),
+	root_urls_with_content AS (
+		SELECT DISTINCT
+			la.url,
+			la.host,
+			la.latest_attempt
+		FROM latest_attempts la
+		JOIN snapshots s ON s.url = la.url
+		WHERE (s.gemtext IS NOT NULL OR s.data IS NOT NULL)
+		AND s.response_code BETWEEN 20 AND 29
+	),
+	eligible_urls AS (
+		SELECT
+			url,
+			host,
+			latest_attempt
+		FROM root_urls_with_content
+		WHERE latest_attempt < $1
+	),
+	ranked_urls AS (
+		SELECT
+			url,
+			host,
+			latest_attempt,
+			ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) as rank
+		FROM eligible_urls
+	)
+	SELECT url, host
+	FROM ranked_urls
+	WHERE rank = 1
+	ORDER BY RANDOM()
+	LIMIT $2
+	`
+
+	type SnapshotURL struct {
+		URL  string `db:"url"`
+		Host string `db:"host"`
+	}
+
+	// Execute the query
+	var snapshotURLs []SnapshotURL
+	err := tx.Select(&snapshotURLs, query, cutoffDate, num)
+	if err != nil {
+		return 0, err
+	}
+
+	if len(snapshotURLs) == 0 {
+		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No URLs with old latest crawl attempts found to recrawl")
+		return 0, nil
+	}
+
+	// For each selected snapshot, add the URL to the urls table
+	insertCount := 0
+	for _, snapshot := range snapshotURLs {
+		err := gemdb.Database.InsertURL(ctx, tx, snapshot.URL)
+		if err != nil {
+			logging.LogError("Error inserting URL %s from old snapshot to queue: %v", snapshot.URL, err)
+			return 0, err
+		}
+		insertCount++
+	}
+
+	// Note: The transaction is committed by the caller (runJobScheduler),
+	// not here. This function is called as part of a larger transaction.
+	if insertCount > 0 {
+		contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "Added %d old URLs to recrawl queue", insertCount)
+	}
+
+	return insertCount, nil
+}
+
+func AddURLsFromFile(ctx context.Context, filepath string) error {
+	data, err := os.ReadFile(filepath)
+	if err != nil {
+		return err
+	}
+	lines := strings.Split(string(data), "\n")
+	urls := util.Filter(lines, func(url string) bool {
+		return strings.TrimSpace(url) != ""
+	})
+
+	// Create a transaction for the inserts
+	tx, err := gemdb.Database.NewTx(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Insert all the URLs
+	fileCtx := contextutil.ContextWithComponent(context.Background(), "AddURLsFromFile")
+	for _, url := range urls {
+		contextlog.LogInfoWithContext(fileCtx, logging.GetSlogger(), "Adding %s to queue", url)
+		err := gemdb.Database.InsertURL(ctx, tx, url)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/common/snapshot/snapshot.go b/common/snapshot/snapshot.go
index ec3580d..d8f434a 100644
--- a/common/snapshot/snapshot.go
+++ b/common/snapshot/snapshot.go
@@ -20,8 +20,9 @@ type Snapshot struct {
 	Header       null.String                   `db:"header" json:"header,omitempty"` // Response header.
 	Links        null.Value[linkList.LinkList] `db:"links" json:"links,omitempty"`
 	Lang         null.String                   `db:"lang" json:"lang,omitempty"`
-	ResponseCode null.Int                      `db:"response_code" json:"code,omitempty"` // Gemini response Status code.
-	Error        null.String                   `db:"error" json:"error,omitempty"`        // On network errors only
+	ResponseCode null.Int                      `db:"response_code" json:"code,omitempty"`        // Gemini response Status code.
+	Error        null.String                   `db:"error" json:"error,omitempty"`               // On network errors only
+	LastCrawled  null.Time                     `db:"last_crawled" json:"last_crawled,omitempty"` // When URL was last processed (regardless of content changes)
 }
 
 func SnapshotFromURL(u string, normalize bool) (*Snapshot, error) {
diff --git a/common/worker.go b/common/worker.go
index 0ef4a53..4cf95fd 100644
--- a/common/worker.go
+++ b/common/worker.go
@@ -202,7 +202,12 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
 		return err
 	}
 	if skipIdentical {
-		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, skipping")
+		contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, recording crawl attempt")
+		// Record the crawl attempt to track that we processed this URL
+		err = gemdb.Database.RecordCrawlAttempt(ctx, tx, s)
+		if err != nil {
+			return err
+		}
 		return removeURL(ctx, tx, s.URL.String())
 	}
 
diff --git a/db/db.go b/db/db.go
index bcd428b..6cdf6c2 100644
--- a/db/db.go
+++ b/db/db.go
@@ -41,6 +41,7 @@ type DbService interface {
 	// Snapshot methods
 	SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
 	OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
+	RecordCrawlAttempt(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
 	GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error)
 	GetSnapshotAtTimestamp(ctx context.Context, tx *sqlx.Tx, url string, timestamp time.Time) (*snapshot.Snapshot, error)
 	GetAllSnapshotsForURL(ctx context.Context, tx *sqlx.Tx, url string) ([]*snapshot.Snapshot, error)
@@ -387,6 +388,7 @@ func (d *DbServiceImpl) SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapsh
 
 	// Always ensure we have a current timestamp
 	s.Timestamp = null.TimeFrom(time.Now())
+	// last_crawled will be set automatically by database DEFAULT
 
 	// For PostgreSQL, use the global sqlx.NamedQueryContext function
 	// The SQL_INSERT_SNAPSHOT already has a RETURNING id clause
@@ -421,6 +423,31 @@ func (d *DbServiceImpl) OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *s
 	return d.SaveSnapshot(ctx, tx, s)
 }
 
+// RecordCrawlAttempt records a crawl attempt without saving full content (when content is identical)
+func (d *DbServiceImpl) RecordCrawlAttempt(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
+	dbCtx := contextutil.ContextWithComponent(ctx, "database")
+	contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Recording crawl attempt for URL %s", s.URL.String())
+
+	// Check if the context is cancelled before proceeding
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	// Record the crawl attempt with minimal data
+	// timestamp and last_crawled will be set automatically by database DEFAULT
+	_, err := tx.ExecContext(ctx, SQL_RECORD_CRAWL_ATTEMPT,
+		s.URL.String(),
+		s.Host,
+		s.MimeType.String,
+		s.ResponseCode.ValueOrZero(),
+		s.Error.String)
+	if err != nil {
+		return xerrors.NewError(fmt.Errorf("cannot record crawl attempt for URL %s: %w", s.URL.String(), err), 0, "", true)
+	}
+
+	return nil
+}
+
 // GetLatestSnapshot gets the latest snapshot with context
 func (d *DbServiceImpl) GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error) {
 	dbCtx := contextutil.ContextWithComponent(ctx, "database")
diff --git a/db/db_queries.go b/db/db_queries.go
index 12ebb6b..f3d2100 100644
--- a/db/db_queries.go
+++ b/db/db_queries.go
@@ -94,7 +94,8 @@ links = :links,
 lang = :lang,
 response_code = :response_code,
 error = :error,
-header = :header
+header = :header,
+last_crawled = CURRENT_TIMESTAMP
 WHERE id = :id
 RETURNING id
 `
@@ -139,4 +140,9 @@ RETURNING id
 	AND timestamp BETWEEN $2 AND $3
 	ORDER BY timestamp DESC
 `
+	// Query to record a crawl attempt when content is identical (inserts a minimal snapshot row without content)
+	SQL_RECORD_CRAWL_ATTEMPT = `
+	INSERT INTO snapshots (url, host, mimetype, response_code, error)
+	VALUES ($1, $2, $3, $4, $5)
+	`
 )
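
Note: the changes above assume the snapshots table has a last_crawled timestamp column that is filled by a database DEFAULT (both SaveSnapshot and SQL_RECORD_CRAWL_ATTEMPT rely on this) and updated explicitly by the snapshot update query. The schema migration itself is not part of this diff; a minimal sketch of the assumed change, for PostgreSQL, could be:

	ALTER TABLE snapshots ADD COLUMN last_crawled TIMESTAMP;
	ALTER TABLE snapshots ALTER COLUMN last_crawled SET DEFAULT CURRENT_TIMESTAMP;

Adding the column first and setting the DEFAULT afterwards leaves existing rows NULL, which fetchSnapshotsFromHistory already tolerates via COALESCE(MAX(last_crawled), '1970-01-01').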