package gemini

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"gemini-grc/config"
	"gemini-grc/logging"
	"gemini-grc/util"

	"github.com/guregu/null/v5"
	"github.com/jmoiron/sqlx"
)

func SpawnWorkers(numOfWorkers int, db *sqlx.DB) {
	logging.LogInfo("Spawning %d workers", numOfWorkers)
	for i := range numOfWorkers {
		go func(i int) {
			for {
				RunWorker(i, db, nil)
			}
		}(i)
	}
}

func RunWorker(id int, db *sqlx.DB, url *string) {
	// Each worker runs within a DB transaction.
	tx, err := db.Beginx()
	if err != nil {
		logging.LogError("Failed to begin transaction: %v", err)
		return
	}
	// Commit at the end; roll back if the commit fails.
	defer func() {
		err = tx.Commit()
		if err != nil {
			logging.LogError("[%d] Failed to commit transaction: %v", id, err)
			err := tx.Rollback()
			if err != nil {
				panic(fmt.Sprintf("[%d] Failed to roll back transaction: %v", id, err))
			}
		}
	}()
	var snapshots []Snapshot
	// If not given a specific URL,
	// get some random ones to visit from the DB.
	if url == nil {
		snapshots, err = GetRandomSnapshotsDistinctHosts(tx)
		if err != nil {
			logging.LogError("[%d] GeminiError retrieving snapshot: %v", id, err)
			panic("This should never happen")
		} else if len(snapshots) == 0 {
			logging.LogInfo("[%d] No snapshots to visit.", id)
			time.Sleep(1 * time.Minute)
			return
		}
	} else {
		snapshotURL, err := ParseURL(*url, "")
		if err != nil {
			logging.LogError("Invalid URL given: %s", *url)
			return
		}
		snapshots = []Snapshot{{
			// UID: uid.UID(),
			URL:       *snapshotURL,
			Host:      snapshotURL.Hostname,
			Timestamp: null.TimeFrom(time.Now()),
		}}
	}
	// Start visiting URLs.
	total := len(snapshots)
	for i, s := range snapshots {
		logging.LogInfo("[%d] Starting %d/%d %s", id, i+1, total, s.URL.String())
		// We differentiate between errors:
		// Unexpected errors are the ones returned from workOnSnapshot.
		// If an error is unexpected (which should never happen), we panic.
		// Expected errors are stored as strings within the snapshot,
		// so that they can also be stored in the DB.
		err = workOnSnapshot(id, tx, &s)
		if err != nil {
			logging.LogError("[%d] [%s] Unexpected GeminiError %v", id, s.URL.String(), err)
			util.PrintStackAndPanic(err)
		}
		if s.Error.Valid {
			logging.LogWarn("[%d] Error: %v", id, s.Error.String)
		}
		logging.LogDebug("[%d] Done %d/%d.", id, i+1, total)
	}
	logging.LogInfo("[%d] Worker done.", id)
}
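// CrawlSingleURL is a minimal sketch, not part of the original package API,
// showing how RunWorker's url parameter can be used to crawl one specific
// URL instead of a random batch pulled from the DB. The function name and
// the worker id 0 are illustrative assumptions.
func CrawlSingleURL(db *sqlx.DB, rawURL string) {
	// Passing a non-nil url makes RunWorker skip the random-snapshot query
	// and build a single in-memory snapshot for this URL instead.
	RunWorker(0, db, &rawURL)
}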
// workOnSnapshot visits a URL and stores the result.
// Errors should be returned only if they are unexpected.
func workOnSnapshot(id int, tx *sqlx.Tx, s *Snapshot) (err error) {
	if IsBlacklisted(s.URL) {
		logging.LogInfo("[%d] URL matches blacklist, ignoring %s", id, s.URL.String())
		return nil
	}
	// If the URL matches a robots.txt disallow line,
	// store that as an error so it won't be crawled next time.
	if RobotMatch(s.URL) {
		s.Error = null.StringFrom(ErrGeminiRobotsDisallowed.Error())
		err = UpsertSnapshot(id, tx, s)
		if err != nil {
			return fmt.Errorf("[%d] %w", id, err)
		}
		return nil
	}
	// Resolve IP addresses via DNS.
	IPs, err := getHostIPAddresses(s.Host)
	if err != nil {
		s.Error = null.StringFrom(err.Error())
		err = UpsertSnapshot(id, tx, s)
		if err != nil {
			return fmt.Errorf("[%d] %w", id, err)
		}
		return nil
	}
	// If any of the host's IPs is already in the connection pool,
	// another worker is visiting this host, so back off.
	// (The assumed shape of the pool is sketched in a comment after storeLinks below.)
	IpPool.Lock.RLock()
	logging.LogDebug("[%d] [%s] Checking pool for IP", id, s.URL.String())
	for _, ip := range IPs {
		_, ok := IpPool.IPs[ip]
		if ok {
			logging.LogDebug("[%d] Another worker is visiting this host: %s", id, s.URL.String())
			IpPool.Lock.RUnlock()
			time.Sleep(1 * time.Second) // Avoid flood-retrying
			return nil
		}
	}
	IpPool.Lock.RUnlock()
	AddIPsToPool(IPs)
	// After finishing, remove the host's IPs from
	// the connection pool, with a small delay
	// to avoid hitting the same IP again too quickly.
	defer func() {
		time.Sleep(5 * time.Second)
		RemoveIPsFromPool(IPs)
	}()
	url := s.URL.String()
	logging.LogDebug("[%d] Dialing %s", id, url)
	err = Visit(s)
	if err != nil {
		if !IsKnownError(err) {
			logging.LogError("[%d] Unknown error visiting %s: %v", id, url, err)
			return err
		}
		// Expected error: store it with the snapshot.
		s.Error = null.StringFrom(err.Error())
		// If the error is a redirection, handle it.
		var geminiErr *GeminiError
		if errors.As(err, &geminiErr) && geminiErr.Msg == "redirect" {
			err = handleRedirection(id, tx, s)
			if err != nil {
				return err
			}
		}
	}
	logging.LogInfo("[%d] Done, response code %d.", id, s.ResponseCode.ValueOrZero())
	// If this is a Gemini page, parse the links inside it.
	if !s.Error.Valid && s.MimeType.Valid && s.MimeType.String == "text/gemini" {
		links := GetPageLinks(s.URL, s.GemText.String)
		logging.LogDebug("[%d] Found %d links", id, len(links))
		if len(links) > 0 {
			s.Links = null.ValueFrom(links)
		}
	} else {
		logging.LogDebug("[%d] Not looking for page links", id)
	}
	err = UpsertSnapshot(id, tx, s)
	if err != nil {
		return err
	}
	err = storeLinks(tx, s)
	if err != nil {
		return err
	}
	return nil
}

func storeLinks(tx *sqlx.Tx, s *Snapshot) error {
	if !s.Links.Valid {
		return nil
	}
	var batchSnapshots []*Snapshot
	for _, link := range s.Links.ValueOrZero() {
		if shouldPersistURL(link) {
			newSnapshot := &Snapshot{
				// UID: uid.UID(),
				URL:       link,
				Host:      link.Hostname,
				Timestamp: null.TimeFrom(time.Now()),
			}
			batchSnapshots = append(batchSnapshots, newSnapshot)
		}
	}
	if len(batchSnapshots) > 0 {
		err := SaveLinksToDBinBatches(tx, batchSnapshots)
		if err != nil {
			return err
		}
	}
	return nil
}
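// NOTE: the IP-pool accounting in workOnSnapshot above assumes a package-level
// IpPool plus AddIPsToPool/RemoveIPsFromPool helpers defined elsewhere in this
// package. A minimal sketch of the assumed shape, inferred only from the call
// sites (the field and value types are assumptions, not the actual definition):
//
//	var IpPool = struct {
//		Lock sync.RWMutex
//		IPs  map[string]struct{} // keyed by IP address string
//	}{IPs: make(map[string]struct{})}
//
//	func AddIPsToPool(ips []string) {
//		IpPool.Lock.Lock()
//		defer IpPool.Lock.Unlock()
//		for _, ip := range ips {
//			IpPool.IPs[ip] = struct{}{}
//		}
//	}
//
//	func RemoveIPsFromPool(ips []string) {
//		IpPool.Lock.Lock()
//		defer IpPool.Lock.Unlock()
//		for _, ip := range ips {
//			delete(IpPool.IPs, ip)
//		}
//	}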
// shouldPersistURL returns true if we
// should save the URL in the DB.
// Only gemini:// URLs are saved.
func shouldPersistURL(u URL) bool {
	return strings.HasPrefix(u.String(), "gemini://")
}

func handleRedirection(id int, tx *sqlx.Tx, s *Snapshot) error {
	newURL, err := extractRedirectTarget(s.URL, s.Error.ValueOrZero())
	if err != nil {
		return err
	}
	logging.LogDebug("[%d] Page redirects to %s", id, newURL)
	// Insert a fresh snapshot with the new URL.
	snapshot := &Snapshot{
		// UID: uid.UID(),
		URL:       *newURL,
		Host:      newURL.Hostname,
		Timestamp: null.TimeFrom(time.Now()),
	}
	logging.LogDebug("[%d] Saving empty snapshot for %s", id, snapshot.URL.String())
	err = SaveSnapshotIfNew(tx, snapshot)
	if err != nil {
		return err
	}
	return nil
}

func GetRandomSnapshotsDistinctHosts(tx *sqlx.Tx) ([]Snapshot, error) {
	// Old, unoptimized query:
	// query := `
	//   SELECT DISTINCT ON (host) *
	//   FROM snapshots
	//   WHERE response_code IS NULL
	//   AND error IS NULL
	//   ORDER BY host, RANDOM()
	//   LIMIT $1
	// `
	//
	// Note: unlike the old query above, the active query below does not
	// enforce distinct hosts; it simply picks random unvisited snapshots.
	query := `
		SELECT * FROM snapshots
		WHERE response_code IS NULL
		AND error IS NULL
		ORDER BY RANDOM()
		LIMIT $1
	`
	// Alternative using a window function:
	// query := `
	//   WITH RankedSnapshots AS (
	//     SELECT id, url, host, timestamp, mimetype, data, gemtext,
	//            links, lang, response_code, error,
	//            ROW_NUMBER() OVER (PARTITION BY host ORDER BY RANDOM()) AS rn
	//     FROM snapshots
	//     WHERE response_code IS NULL
	//     AND error IS NULL
	//   )
	//   SELECT id, url, host, timestamp, mimetype, data, gemtext,
	//          links, lang, response_code, error
	//   FROM RankedSnapshots
	//   WHERE rn = 1
	//   LIMIT $1
	// `
	var snapshots []Snapshot
	err := tx.Select(&snapshots, query, config.CONFIG.WorkerBatchSize)
	if err != nil {
		return nil, err
	}
	return snapshots, nil
}

func GetSnapshotFromURL(tx *sqlx.Tx, url string) ([]Snapshot, error) {
	query := `
		SELECT * FROM snapshots
		WHERE url=$1
		LIMIT 1
	`
	var snapshots []Snapshot
	err := tx.Select(&snapshots, query, url)
	if err != nil {
		return nil, err
	}
	return snapshots, nil
}
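// ExampleStartCrawler is illustrative glue code, not part of the original
// package: it sketches one way a caller might connect to Postgres and start
// the crawler. The DSN, the "pgx" driver name (whose blank import is assumed
// to happen in the caller's main package), and the worker count of 4 are
// assumptions, not values taken from this project's configuration.
func ExampleStartCrawler() {
	// Assumed connection string; replace with the application's real config.
	db, err := sqlx.Connect("pgx", "postgres://user:pass@localhost/gemini")
	if err != nil {
		logging.LogError("Failed to connect to DB: %v", err)
		return
	}
	// Each worker loops forever, picking random unvisited snapshots from the DB.
	SpawnWorkers(4, db)
	// Block the caller: the workers run in their own goroutines.
	select {}
}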