Fix infinite recrawl loop with skip-identical-content
Add last_crawled timestamp tracking to fix fetchSnapshotsFromHistory() infinite loop when SkipIdenticalContent=true. Now tracks actual crawl attempts separately from content changes via database DEFAULT timestamps.
This commit is contained in:
512
cmd/crawler/crawler.go
Normal file
512
cmd/crawler/crawler.go
Normal file
@@ -0,0 +1,512 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gemini-grc/common"
|
||||||
|
"gemini-grc/common/blackList"
|
||||||
|
"gemini-grc/common/contextlog"
|
||||||
|
"gemini-grc/common/seedList"
|
||||||
|
"gemini-grc/common/whiteList"
|
||||||
|
"gemini-grc/config"
|
||||||
|
"gemini-grc/contextutil"
|
||||||
|
gemdb "gemini-grc/db"
|
||||||
|
"gemini-grc/robotsMatch"
|
||||||
|
"gemini-grc/util"
|
||||||
|
"git.antanst.com/antanst/logging"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
)
|
||||||
|
|
||||||
|
var jobs chan string
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
err = initializeApp()
|
||||||
|
if err != nil {
|
||||||
|
handleUnexpectedError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = runApp()
|
||||||
|
if err != nil {
|
||||||
|
handleUnexpectedError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = shutdownApp()
|
||||||
|
if err != nil {
|
||||||
|
handleUnexpectedError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleUnexpectedError(err error) {
|
||||||
|
logging.LogError("Unexpected error: %v", err)
|
||||||
|
_ = shutdownApp()
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initializeApp() error {
|
||||||
|
config.CONFIG = *config.Initialize()
|
||||||
|
logging.InitSlogger(config.CONFIG.LogLevel)
|
||||||
|
|
||||||
|
logging.LogInfo("Starting up. Press Ctrl+C to exit")
|
||||||
|
common.SignalsChan = make(chan os.Signal, 1)
|
||||||
|
signal.Notify(common.SignalsChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
common.FatalErrorsChan = make(chan error)
|
||||||
|
jobs = make(chan string, config.CONFIG.NumOfWorkers)
|
||||||
|
|
||||||
|
var err error
|
||||||
|
|
||||||
|
err = blackList.Initialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = whiteList.Initialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = seedList.Initialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = robotsMatch.Initialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
err = gemdb.Database.Initialize(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.CONFIG.SeedUrlPath != "" {
|
||||||
|
err := AddURLsFromFile(ctx, config.CONFIG.SeedUrlPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func shutdownApp() error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
err = blackList.Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = whiteList.Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = seedList.Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = robotsMatch.Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
err = gemdb.Database.Shutdown(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runApp() (err error) {
|
||||||
|
go spawnWorkers(config.CONFIG.NumOfWorkers)
|
||||||
|
go runJobScheduler()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-common.SignalsChan:
|
||||||
|
logging.LogWarn("Received SIGINT or SIGTERM signal, exiting")
|
||||||
|
return nil
|
||||||
|
case err := <-common.FatalErrorsChan:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func spawnWorkers(total int) {
|
||||||
|
for id := 0; id < total; id++ {
|
||||||
|
go func(a int) {
|
||||||
|
for {
|
||||||
|
job := <-jobs
|
||||||
|
common.RunWorkerWithTx(id, job)
|
||||||
|
}
|
||||||
|
}(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Current Logic Flow:
|
||||||
|
//
|
||||||
|
// 1. Create transaction
|
||||||
|
// 2. Get distinct hosts
|
||||||
|
// 3. If no hosts → fetch snapshots from history (adds URLs to queue)
|
||||||
|
// 4. Re-query for hosts (should now have some)
|
||||||
|
// 5. Get URLs from hosts
|
||||||
|
// 6. Commit transaction
|
||||||
|
// 7. Queue URLs for workers
|
||||||
|
func runJobScheduler() {
|
||||||
|
var tx *sqlx.Tx
|
||||||
|
var err error
|
||||||
|
|
||||||
|
ctx := contextutil.ContextWithComponent(context.Background(), "crawler")
|
||||||
|
tx, err = gemdb.Database.NewTx(ctx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func(tx *sqlx.Tx) {
|
||||||
|
if tx != nil {
|
||||||
|
if err := gemdb.SafeRollback(ctx, tx); err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(tx)
|
||||||
|
|
||||||
|
// First, check if the URLs table is empty.
|
||||||
|
var urlCount int
|
||||||
|
|
||||||
|
if config.CONFIG.GopherEnable {
|
||||||
|
err = tx.Get(&urlCount, "SELECT COUNT(*) FROM urls")
|
||||||
|
} else {
|
||||||
|
err = tx.Get(&urlCount, "SELECT COUNT(*) FROM urls WHERE url LIKE 'gemini://%'")
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no pending URLs, add the ones from the standard crawl set.
|
||||||
|
tx, err = gemdb.Database.NewTx(ctx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if urlCount == 0 {
|
||||||
|
logging.LogInfo("URLs table is empty, enqueueing standard crawl set")
|
||||||
|
err = enqueueSeedURLs(ctx, tx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Commit this tx here so the loop sees the changes.
|
||||||
|
err := tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Found %d pending URLs to crawl.", urlCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Main job loop.
|
||||||
|
// We get URLs from the pending URLs table,
|
||||||
|
// add crawling jobs for those,
|
||||||
|
// and sleep a bit after each run.
|
||||||
|
for {
|
||||||
|
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "Polling DB for jobs")
|
||||||
|
|
||||||
|
// Use fresh context for DB operations to avoid timeouts/cancellation
|
||||||
|
// from the long-lived scheduler context affecting database transactions
|
||||||
|
dbCtx := context.Background()
|
||||||
|
tx, err = gemdb.Database.NewTx(dbCtx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all distinct hosts from pending URLs
|
||||||
|
distinctHosts, err := gemdb.Database.GetUrlHosts(dbCtx, tx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// When out of pending URLs, add some random ones.
|
||||||
|
if len(distinctHosts) == 0 {
|
||||||
|
// Queue random old URLs from history.
|
||||||
|
count, err := fetchSnapshotsFromHistory(dbCtx, tx, config.CONFIG.NumOfWorkers*3, config.CONFIG.SkipIfUpdatedDays)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
distinctHosts, err = gemdb.Database.GetUrlHosts(dbCtx, tx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get some URLs from each host, up to a limit
|
||||||
|
urls, err := gemdb.Database.GetRandomUrlsFromHosts(dbCtx, distinctHosts, 10, tx)
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
common.FatalErrorsChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(urls) == 0 {
|
||||||
|
contextlog.LogDebugWithContext(ctx, logging.GetSlogger(), "No work, waiting to poll DB...")
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Queueing %d distinct hosts -> %d urls to crawl", len(distinctHosts), len(urls))
|
||||||
|
for _, url := range urls {
|
||||||
|
jobs <- url
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func enqueueSeedURLs(ctx context.Context, tx *sqlx.Tx) error {
|
||||||
|
// Get seed URLs from seedList module
|
||||||
|
urls := seedList.GetSeedURLs()
|
||||||
|
|
||||||
|
for _, url := range urls {
|
||||||
|
err := gemdb.Database.InsertURL(ctx, tx, url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
|
||||||
|
// // Select <num> snapshots from snapshots table for recrawling
|
||||||
|
// // They should be at least <age> days old, random order,
|
||||||
|
// // one snapshot per host if possible.
|
||||||
|
// historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
|
||||||
|
// contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d snapshots at least %d days old to recrawl", num, age)
|
||||||
|
//
|
||||||
|
// // Calculate the cutoff date
|
||||||
|
// cutoffDate := time.Now().AddDate(0, 0, -age)
|
||||||
|
//
|
||||||
|
// // SQL to select one random URL per host from snapshots older than the cutoff date
|
||||||
|
// // TODO implement when gopher enabled
|
||||||
|
// query := `
|
||||||
|
// WITH hosts AS (
|
||||||
|
// SELECT DISTINCT host
|
||||||
|
// FROM snapshots
|
||||||
|
// WHERE url ~ '^gemini://[^/]+/?$'
|
||||||
|
// AND gemtext IS NOT NULL
|
||||||
|
// AND timestamp < $1
|
||||||
|
// AND error IS NULL
|
||||||
|
// ),
|
||||||
|
// ranked_snapshots AS (
|
||||||
|
// SELECT
|
||||||
|
// s.url,
|
||||||
|
// s.host,
|
||||||
|
// s.timestamp,
|
||||||
|
// ROW_NUMBER() OVER (PARTITION BY s.host ORDER BY RANDOM()) as rank
|
||||||
|
// FROM snapshots s
|
||||||
|
// JOIN hosts h ON s.host = h.host
|
||||||
|
// WHERE s.timestamp < $1
|
||||||
|
// )
|
||||||
|
// SELECT url, host
|
||||||
|
// FROM ranked_snapshots
|
||||||
|
// WHERE rank = 1
|
||||||
|
// ORDER BY RANDOM()
|
||||||
|
// LIMIT $2
|
||||||
|
// `
|
||||||
|
//
|
||||||
|
// type SnapshotURL struct {
|
||||||
|
// URL string `db:"url"`
|
||||||
|
// Host string `db:"host"`
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Execute the query
|
||||||
|
// var snapshotURLs []SnapshotURL
|
||||||
|
// err := tx.Select(&snapshotURLs, query, cutoffDate, num)
|
||||||
|
// if err != nil {
|
||||||
|
// return 0, err
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if len(snapshotURLs) == 0 {
|
||||||
|
// historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
|
||||||
|
// contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No old snapshots found to recrawl")
|
||||||
|
// return 0, nil
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // For each selected snapshot, add the URL to the urls table
|
||||||
|
// insertCount := 0
|
||||||
|
// for _, snapshot := range snapshotURLs {
|
||||||
|
// err := gemdb.Database.InsertURL(ctx, tx, snapshot.URL)
|
||||||
|
// if err != nil {
|
||||||
|
// logging.LogError("Error inserting URL %s from old snapshot to queue: %v", snapshot.URL, err)
|
||||||
|
// return 0, err
|
||||||
|
// }
|
||||||
|
// insertCount++
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Note: The transaction is committed by the caller (runJobScheduler),
|
||||||
|
// // not here. This function is called as part of a larger transaction.
|
||||||
|
// if insertCount > 0 {
|
||||||
|
// historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
|
||||||
|
// contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "Added %d old URLs to recrawl queue", insertCount)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return insertCount, nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
func fetchSnapshotsFromHistory(ctx context.Context, tx *sqlx.Tx, num int, age int) (int, error) {
|
||||||
|
// Select <num> snapshots from snapshots table for recrawling
|
||||||
|
// Find URLs where the LATEST crawl attempt (via last_crawled) is at least <age> days old
|
||||||
|
// Now works correctly with SkipIdenticalContent=true - fixes infinite recrawl loop
|
||||||
|
historyCtx := contextutil.ContextWithComponent(context.Background(), "fetchSnapshotsFromHistory")
|
||||||
|
contextlog.LogDebugWithContext(historyCtx, logging.GetSlogger(), "Looking for %d URLs whose latest crawl attempt is at least %d days old to recrawl", num, age)
|
||||||
|
|
||||||
|
// Calculate the cutoff date
|
||||||
|
cutoffDate := time.Now().AddDate(0, 0, -age)
|
||||||
|
|
||||||
|
// SQL to select URLs where the latest crawl attempt is older than the cutoff date
|
||||||
|
// Now uses last_crawled timestamp to track actual crawl attempts
|
||||||
|
// Works correctly with SkipIdenticalContent setting - avoids infinite recrawl loop
|
||||||
|
// One URL per host for diversity, focusing on root domain URLs
|
||||||
|
query := `
|
||||||
|
WITH latest_attempts AS (
|
||||||
|
SELECT
|
||||||
|
url,
|
||||||
|
host,
|
||||||
|
COALESCE(MAX(last_crawled), '1970-01-01'::timestamp) as latest_attempt
|
||||||
|
FROM snapshots
|
||||||
|
WHERE url ~ '^gemini://[^/]+/?$'
|
||||||
|
GROUP BY url, host
|
||||||
|
),
|
||||||
|
root_urls_with_content AS (
|
||||||
|
SELECT DISTINCT
|
||||||
|
la.url,
|
||||||
|
la.host,
|
||||||
|
la.latest_attempt
|
||||||
|
FROM latest_attempts la
|
||||||
|
JOIN snapshots s ON s.url = la.url
|
||||||
|
WHERE (s.gemtext IS NOT NULL OR s.data IS NOT NULL)
|
||||||
|
AND s.response_code BETWEEN 20 AND 29
|
||||||
|
),
|
||||||
|
eligible_urls AS (
|
||||||
|
SELECT
|
||||||
|
url,
|
||||||
|
host,
|
||||||
|
latest_attempt
|
||||||
|
FROM root_urls_with_content
|
||||||
|
WHERE latest_attempt < $1
|
||||||
|
),
|
||||||
|
ranked_urls AS (
|
||||||
|
SELECT
|
||||||
|
url,
|
||||||
|
host,
|
||||||
|
latest_attempt,
|
||||||
|
ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) as rank
|
||||||
|
FROM eligible_urls
|
||||||
|
)
|
||||||
|
SELECT url, host
|
||||||
|
FROM ranked_urls
|
||||||
|
WHERE rank = 1
|
||||||
|
ORDER BY RANDOM()
|
||||||
|
LIMIT $2
|
||||||
|
`
|
||||||
|
|
||||||
|
type SnapshotURL struct {
|
||||||
|
URL string `db:"url"`
|
||||||
|
Host string `db:"host"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the query
|
||||||
|
var snapshotURLs []SnapshotURL
|
||||||
|
err := tx.Select(&snapshotURLs, query, cutoffDate, num)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(snapshotURLs) == 0 {
|
||||||
|
contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "No URLs with old latest crawl attempts found to recrawl")
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each selected snapshot, add the URL to the urls table
|
||||||
|
insertCount := 0
|
||||||
|
for _, snapshot := range snapshotURLs {
|
||||||
|
err := gemdb.Database.InsertURL(ctx, tx, snapshot.URL)
|
||||||
|
if err != nil {
|
||||||
|
logging.LogError("Error inserting URL %s from old snapshot to queue: %v", snapshot.URL, err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
insertCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: The transaction is committed by the caller (runJobScheduler),
|
||||||
|
// not here. This function is called as part of a larger transaction.
|
||||||
|
if insertCount > 0 {
|
||||||
|
contextlog.LogInfoWithContext(historyCtx, logging.GetSlogger(), "Added %d old URLs to recrawl queue", insertCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
return insertCount, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddURLsFromFile(ctx context.Context, filepath string) error {
|
||||||
|
data, err := os.ReadFile(filepath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
lines := strings.Split(string(data), "\n")
|
||||||
|
urls := util.Filter(lines, func(url string) bool {
|
||||||
|
return strings.TrimSpace(url) != ""
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create a context for database operations
|
||||||
|
tx, err := gemdb.Database.NewTx(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert all the URLs
|
||||||
|
for _, url := range urls {
|
||||||
|
fileCtx := contextutil.ContextWithComponent(context.Background(), "AddURLsFromFile")
|
||||||
|
contextlog.LogInfoWithContext(fileCtx, logging.GetSlogger(), "Adding %s to queue", url)
|
||||||
|
err := gemdb.Database.InsertURL(ctx, tx, url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -20,8 +20,9 @@ type Snapshot struct {
|
|||||||
Header null.String `db:"header" json:"header,omitempty"` // Response header.
|
Header null.String `db:"header" json:"header,omitempty"` // Response header.
|
||||||
Links null.Value[linkList.LinkList] `db:"links" json:"links,omitempty"`
|
Links null.Value[linkList.LinkList] `db:"links" json:"links,omitempty"`
|
||||||
Lang null.String `db:"lang" json:"lang,omitempty"`
|
Lang null.String `db:"lang" json:"lang,omitempty"`
|
||||||
ResponseCode null.Int `db:"response_code" json:"code,omitempty"` // Gemini response Status code.
|
ResponseCode null.Int `db:"response_code" json:"code,omitempty"` // Gemini response Status code.
|
||||||
Error null.String `db:"error" json:"error,omitempty"` // On network errors only
|
Error null.String `db:"error" json:"error,omitempty"` // On network errors only
|
||||||
|
LastCrawled null.Time `db:"last_crawled" json:"last_crawled,omitempty"` // When URL was last processed (regardless of content changes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SnapshotFromURL(u string, normalize bool) (*Snapshot, error) {
|
func SnapshotFromURL(u string, normalize bool) (*Snapshot, error) {
|
||||||
|
|||||||
@@ -202,7 +202,12 @@ func WorkOnUrl(ctx context.Context, tx *sqlx.Tx, url string) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if skipIdentical {
|
if skipIdentical {
|
||||||
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, skipping")
|
contextlog.LogInfoWithContext(ctx, logging.GetSlogger(), "Content identical to existing snapshot, recording crawl attempt")
|
||||||
|
// Record the crawl attempt to track that we processed this URL
|
||||||
|
err = gemdb.Database.RecordCrawlAttempt(ctx, tx, s)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return removeURL(ctx, tx, s.URL.String())
|
return removeURL(ctx, tx, s.URL.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
27
db/db.go
27
db/db.go
@@ -41,6 +41,7 @@ type DbService interface {
|
|||||||
// Snapshot methods
|
// Snapshot methods
|
||||||
SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
|
SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
|
||||||
OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
|
OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
|
||||||
|
RecordCrawlAttempt(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error
|
||||||
GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error)
|
GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error)
|
||||||
GetSnapshotAtTimestamp(ctx context.Context, tx *sqlx.Tx, url string, timestamp time.Time) (*snapshot.Snapshot, error)
|
GetSnapshotAtTimestamp(ctx context.Context, tx *sqlx.Tx, url string, timestamp time.Time) (*snapshot.Snapshot, error)
|
||||||
GetAllSnapshotsForURL(ctx context.Context, tx *sqlx.Tx, url string) ([]*snapshot.Snapshot, error)
|
GetAllSnapshotsForURL(ctx context.Context, tx *sqlx.Tx, url string) ([]*snapshot.Snapshot, error)
|
||||||
@@ -387,6 +388,7 @@ func (d *DbServiceImpl) SaveSnapshot(ctx context.Context, tx *sqlx.Tx, s *snapsh
|
|||||||
|
|
||||||
// Always ensure we have a current timestamp
|
// Always ensure we have a current timestamp
|
||||||
s.Timestamp = null.TimeFrom(time.Now())
|
s.Timestamp = null.TimeFrom(time.Now())
|
||||||
|
// last_crawled will be set automatically by database DEFAULT
|
||||||
|
|
||||||
// For PostgreSQL, use the global sqlx.NamedQueryContext function
|
// For PostgreSQL, use the global sqlx.NamedQueryContext function
|
||||||
// The SQL_INSERT_SNAPSHOT already has a RETURNING id clause
|
// The SQL_INSERT_SNAPSHOT already has a RETURNING id clause
|
||||||
@@ -421,6 +423,31 @@ func (d *DbServiceImpl) OverwriteSnapshot(ctx context.Context, tx *sqlx.Tx, s *s
|
|||||||
return d.SaveSnapshot(ctx, tx, s)
|
return d.SaveSnapshot(ctx, tx, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecordCrawlAttempt records a crawl attempt without saving full content (when content is identical)
|
||||||
|
func (d *DbServiceImpl) RecordCrawlAttempt(ctx context.Context, tx *sqlx.Tx, s *snapshot.Snapshot) error {
|
||||||
|
dbCtx := contextutil.ContextWithComponent(ctx, "database")
|
||||||
|
contextlog.LogDebugWithContext(dbCtx, logging.GetSlogger(), "Recording crawl attempt for URL %s", s.URL.String())
|
||||||
|
|
||||||
|
// Check if the context is cancelled before proceeding
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record the crawl attempt with minimal data
|
||||||
|
// timestamp and last_crawled will be set automatically by database DEFAULT
|
||||||
|
_, err := tx.ExecContext(ctx, SQL_RECORD_CRAWL_ATTEMPT,
|
||||||
|
s.URL.String(),
|
||||||
|
s.Host,
|
||||||
|
s.MimeType.String,
|
||||||
|
s.ResponseCode.ValueOrZero(),
|
||||||
|
s.Error.String)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.NewError(fmt.Errorf("cannot record crawl attempt for URL %s: %w", s.URL.String(), err), 0, "", true)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetLatestSnapshot gets the latest snapshot with context
|
// GetLatestSnapshot gets the latest snapshot with context
|
||||||
func (d *DbServiceImpl) GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error) {
|
func (d *DbServiceImpl) GetLatestSnapshot(ctx context.Context, tx *sqlx.Tx, url string) (*snapshot.Snapshot, error) {
|
||||||
dbCtx := contextutil.ContextWithComponent(ctx, "database")
|
dbCtx := contextutil.ContextWithComponent(ctx, "database")
|
||||||
|
|||||||
@@ -94,7 +94,8 @@ links = :links,
|
|||||||
lang = :lang,
|
lang = :lang,
|
||||||
response_code = :response_code,
|
response_code = :response_code,
|
||||||
error = :error,
|
error = :error,
|
||||||
header = :header
|
header = :header,
|
||||||
|
last_crawled = CURRENT_TIMESTAMP
|
||||||
WHERE id = :id
|
WHERE id = :id
|
||||||
RETURNING id
|
RETURNING id
|
||||||
`
|
`
|
||||||
@@ -139,4 +140,9 @@ RETURNING id
|
|||||||
AND timestamp BETWEEN $2 AND $3
|
AND timestamp BETWEEN $2 AND $3
|
||||||
ORDER BY timestamp DESC
|
ORDER BY timestamp DESC
|
||||||
`
|
`
|
||||||
|
// New query to record crawl attempt when content is identical (no new snapshot needed)
|
||||||
|
SQL_RECORD_CRAWL_ATTEMPT = `
|
||||||
|
INSERT INTO snapshots (url, host, mimetype, response_code, error)
|
||||||
|
VALUES ($1, $2, $3, $4, $5)
|
||||||
|
`
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user