Add robots.txt checking
Still needs periodic cache refresh
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,6 +3,7 @@
|
|||||||
**/*~
|
**/*~
|
||||||
/cmd
|
/cmd
|
||||||
/db/initdb.sql
|
/db/initdb.sql
|
||||||
|
/db/*sh
|
||||||
/run*.sh
|
/run*.sh
|
||||||
/gemini-grc
|
/gemini-grc
|
||||||
/snaps
|
/snaps
|
||||||
|
|||||||
@@ -10,10 +10,10 @@ A Gemini crawler.
|
|||||||
- [x] Configuration via environment variables
|
- [x] Configuration via environment variables
|
||||||
- [x] Storing snapshots in PostgreSQL
|
- [x] Storing snapshots in PostgreSQL
|
||||||
- [x] Proper response header & body UTF-8 and format validation
|
- [x] Proper response header & body UTF-8 and format validation
|
||||||
|
- [x] Follow robots.txt
|
||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
- [ ] Follow robots.txt gemini://geminiprotocol.net/docs/companion/
|
- [ ] Take into account gemini://geminiprotocol.net/docs/companion/robots.gmi
|
||||||
- [ ] Test with gemini://alexey.shpakovsky.ru/maze
|
|
||||||
- [ ] Proper handling of all response codes
|
- [ ] Proper handling of all response codes
|
||||||
- [ ] Handle 3X redirects properly
|
- [ ] Handle 3X redirects properly
|
||||||
- [ ] Handle URLs that need presentation of a TLS cert, like astrobotany
|
- [ ] Handle URLs that need presentation of a TLS cert, like astrobotany
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
gemi.dev
|
|
||||||
kennedy.gemi.dev
|
|
||||||
alexey.shpakovsky.ru
|
|
||||||
musicbrainz.uploadedlobster.com
|
|
||||||
gemini.bunburya.eu
|
|
||||||
@@ -42,4 +42,7 @@ CREATE INDEX idx_lang ON snapshots (lang);
|
|||||||
CREATE INDEX idx_response_code ON snapshots (response_code);
|
CREATE INDEX idx_response_code ON snapshots (response_code);
|
||||||
CREATE INDEX idx_error ON snapshots (error);
|
CREATE INDEX idx_error ON snapshots (error);
|
||||||
CREATE INDEX idx_host ON snapshots (host);
|
CREATE INDEX idx_host ON snapshots (host);
|
||||||
|
CREATE INDEX idx_snapshots_unprocessed_no_data ON snapshots (host)
|
||||||
|
WHERE response_code IS NULL AND error IS NULL
|
||||||
|
INCLUDE (id, uid, url, timestamp, mimetype, gemtext, links, lang);
|
||||||
CREATE INDEX idx_response_code_error_nulls ON snapshots (response_code, error) WHERE response_code IS NULL AND error IS NULL;
|
CREATE INDEX idx_response_code_error_nulls ON snapshots (response_code, error) WHERE response_code IS NULL AND error IS NULL;
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
SELECT
|
SELECT
|
||||||
COUNT(CASE WHEN response_code IS NOT NULL AND error IS NULL THEN 1 END) AS "Visited",
|
COUNT(CASE WHEN response_code IS NOT NULL AND error IS NULL THEN 1 END) AS "Visited",
|
||||||
COUNT(CASE WHEN response_code IS NULL THEN 1 END) AS "Pending",
|
COUNT(CASE WHEN response_code IS NULL AND error IS NULL THEN 1 END) AS "Pending",
|
||||||
COUNT(CASE WHEN error IS NOT NULL THEN 1 END) AS "Errors"
|
COUNT(CASE WHEN error IS NOT NULL THEN 1 END) AS "Errors"
|
||||||
FROM snapshots;
|
FROM snapshots;
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
package gemini
|
|
||||||
|
|
||||||
import "gemini-grc/logging"
|
|
||||||
|
|
||||||
var Blacklist *[]string
|
|
||||||
|
|
||||||
func InBlacklist(s *Snapshot) bool {
|
|
||||||
if Blacklist == nil {
|
|
||||||
data := ReadLines("blacklists/domains.txt")
|
|
||||||
Blacklist = &data
|
|
||||||
logging.LogInfo("Loaded %d blacklisted domains", len(*Blacklist))
|
|
||||||
}
|
|
||||||
for _, l := range *Blacklist {
|
|
||||||
if s.Host == l {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
// if strings.HasPrefix(s.URL.String(), l) {
|
|
||||||
// return true
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
@@ -5,14 +5,14 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"gemini-grc/logging"
|
"gemini-grc/logging"
|
||||||
"net/url"
|
"net/url"
|
||||||
go_url "net/url"
|
gourl "net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func isGeminiURL(url string) bool {
|
func isGeminiURL(url string) bool {
|
||||||
_, err := go_url.Parse(url)
|
_, err := gourl.Parse(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logging.LogWarn("[%s] Invalid URL: %v", url, err)
|
logging.LogWarn("[%s] Invalid URL: %v", url, err)
|
||||||
return false
|
return false
|
||||||
@@ -36,17 +36,17 @@ func checkGeminiStatusCode(code int) error {
|
|||||||
case code == 20:
|
case code == 20:
|
||||||
return nil
|
return nil
|
||||||
case code >= 10 && code < 20:
|
case code >= 10 && code < 20:
|
||||||
return fmt.Errorf("Gemini response %d needs data input", code)
|
return fmt.Errorf("gemini response %d needs data input", code)
|
||||||
case code >= 30 && code < 40:
|
case code >= 30 && code < 40:
|
||||||
return fmt.Errorf("Gemini response %d redirect", code)
|
return fmt.Errorf("gemini response %d redirect", code)
|
||||||
case code >= 40 && code < 50:
|
case code >= 40 && code < 50:
|
||||||
return fmt.Errorf("Gemini response %d server error", code)
|
return fmt.Errorf("gemini response %d server error", code)
|
||||||
case code >= 50 && code < 60:
|
case code >= 50 && code < 60:
|
||||||
return fmt.Errorf("Gemini response %d server permanent error", code)
|
return fmt.Errorf("gemini response %d server permanent error", code)
|
||||||
case code >= 60 && code < 70:
|
case code >= 60 && code < 70:
|
||||||
return fmt.Errorf("Gemini response %d certificate error", code)
|
return fmt.Errorf("gemini response %d certificate error", code)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unexpected/unhandled Gemini response %d", code)
|
return fmt.Errorf("unexpected/unhandled Gemini response %d", code)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,14 +57,14 @@ func ProcessGemini(snapshot *Snapshot) *Snapshot {
|
|||||||
|
|
||||||
// Normalize URLs in links, and store them in snapshot
|
// Normalize URLs in links, and store them in snapshot
|
||||||
for _, line := range linkLines {
|
for _, line := range linkLines {
|
||||||
normalizedLink, descr, error := NormalizeLink(line, snapshot.URL.String())
|
normalizedLink, descr, err := NormalizeLink(line, snapshot.URL.String())
|
||||||
if error != nil {
|
if err != nil {
|
||||||
logging.LogWarn("Cannot normalize URL in line '%s': %v", line, error)
|
logging.LogDebug("Cannot normalize URL in line '%s': %v", line, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
geminiUrl, error := ParseUrl(normalizedLink, descr)
|
geminiUrl, err := ParseUrl(normalizedLink, descr)
|
||||||
if error != nil {
|
if err != nil {
|
||||||
logging.LogWarn("Cannot parse URL in link '%s': %v", line, error)
|
logging.LogDebug("Cannot parse URL in link '%s': %v", line, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if snapshot.Links == nil {
|
if snapshot.Links == nil {
|
||||||
@@ -79,18 +79,18 @@ func ProcessGemini(snapshot *Snapshot) *Snapshot {
|
|||||||
func ParseUrl(input string, descr string) (*GeminiUrl, error) {
|
func ParseUrl(input string, descr string) (*GeminiUrl, error) {
|
||||||
u, err := url.Parse(input)
|
u, err := url.Parse(input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Error parsing URL %s: %w", input, err)
|
return nil, fmt.Errorf("error parsing URL %s: %w", input, err)
|
||||||
}
|
}
|
||||||
protocol := u.Scheme
|
protocol := u.Scheme
|
||||||
hostname := u.Hostname()
|
hostname := u.Hostname()
|
||||||
str_port := u.Port()
|
strPort := u.Port()
|
||||||
path := u.Path
|
path := u.Path
|
||||||
if str_port == "" {
|
if strPort == "" {
|
||||||
str_port = "1965"
|
strPort = "1965"
|
||||||
}
|
}
|
||||||
port, err := strconv.Atoi(str_port)
|
port, err := strconv.Atoi(strPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Error parsing URL %s: %w", input, err)
|
return nil, fmt.Errorf("error parsing URL %s: %w", input, err)
|
||||||
}
|
}
|
||||||
return &GeminiUrl{Protocol: protocol, Hostname: hostname, Port: port, Path: path, Descr: descr, Full: u.String()}, nil
|
return &GeminiUrl{Protocol: protocol, Hostname: hostname, Port: port, Path: path, Descr: descr, Full: u.String()}, nil
|
||||||
}
|
}
|
||||||
@@ -106,14 +106,14 @@ func ExtractLinkLines(gemtext string) []string {
|
|||||||
return matches
|
return matches
|
||||||
}
|
}
|
||||||
|
|
||||||
// Take a single link line and the current URL,
|
// NormalizeLink takes a single link line and the current URL,
|
||||||
// return the URL converted to an absolute URL
|
// return the URL converted to an absolute URL
|
||||||
// and its description.
|
// and its description.
|
||||||
func NormalizeLink(linkLine string, currentURL string) (link string, descr string, err error) {
|
func NormalizeLink(linkLine string, currentURL string) (link string, descr string, err error) {
|
||||||
// Parse the current URL
|
// Parse the current URL
|
||||||
baseURL, err := url.Parse(currentURL)
|
baseURL, err := url.Parse(currentURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("Invalid current URL: %v", err)
|
return "", "", fmt.Errorf("invalid current URL: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Regular expression to extract the URL part from a link line
|
// Regular expression to extract the URL part from a link line
|
||||||
@@ -123,13 +123,13 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin
|
|||||||
matches := re.FindStringSubmatch(linkLine)
|
matches := re.FindStringSubmatch(linkLine)
|
||||||
if len(matches) == 0 {
|
if len(matches) == 0 {
|
||||||
// If the line doesn't match the expected format, return it unchanged
|
// If the line doesn't match the expected format, return it unchanged
|
||||||
return "", "", fmt.Errorf("Not a link line: %v", linkLine)
|
return "", "", fmt.Errorf("not a link line: %v", linkLine)
|
||||||
}
|
}
|
||||||
|
|
||||||
originalURLStr := matches[1]
|
originalURLStr := matches[1]
|
||||||
_, err = url.QueryUnescape(originalURLStr)
|
_, err = url.QueryUnescape(originalURLStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("Error decoding URL: %w", err)
|
return "", "", fmt.Errorf("error decoding URL: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
restOfLine := ""
|
restOfLine := ""
|
||||||
@@ -141,7 +141,7 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin
|
|||||||
parsedURL, err := url.Parse(originalURLStr)
|
parsedURL, err := url.Parse(originalURLStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If URL parsing fails, return an error
|
// If URL parsing fails, return an error
|
||||||
return "", "", fmt.Errorf("Invalid URL '%s': %v", originalURLStr, err)
|
return "", "", fmt.Errorf("invalid URL '%s': %v", originalURLStr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve relative URLs against the base URL
|
// Resolve relative URLs against the base URL
|
||||||
|
|||||||
@@ -57,6 +57,34 @@ func SaveSnapshotToDB(tx *sqlx.Tx, s *Snapshot) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SaveLinksToDBinBatches(tx *sqlx.Tx, snapshots []*Snapshot) error {
|
||||||
|
// Approximately 5,957 rows maximum (65535/11 parameters), use 5000 to be safe
|
||||||
|
const batchSize = 5000
|
||||||
|
|
||||||
|
query := `
|
||||||
|
INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error)
|
||||||
|
VALUES (:uid, :url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error)
|
||||||
|
ON CONFLICT (uid) DO NOTHING
|
||||||
|
`
|
||||||
|
|
||||||
|
for i := 0; i < len(snapshots); i += batchSize {
|
||||||
|
end := i + batchSize
|
||||||
|
if end > len(snapshots) {
|
||||||
|
end = len(snapshots)
|
||||||
|
}
|
||||||
|
|
||||||
|
batch := snapshots[i:end]
|
||||||
|
|
||||||
|
_, err := tx.NamedExec(query, batch)
|
||||||
|
if err != nil {
|
||||||
|
logging.LogError("Error batch inserting snapshots: %w", err)
|
||||||
|
return fmt.Errorf("DB error: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func SaveLinksToDB(tx *sqlx.Tx, snapshots []*Snapshot) error {
|
func SaveLinksToDB(tx *sqlx.Tx, snapshots []*Snapshot) error {
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error)
|
INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error)
|
||||||
|
|||||||
83
gemini/robotmatch.go
Normal file
83
gemini/robotmatch.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package gemini
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"gemini-grc/logging"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// key: "host:port" (string)
|
||||||
|
// value:
|
||||||
|
// empty []string if no robots data, or
|
||||||
|
// list of URL prefixes ([]string) in robots
|
||||||
|
var RobotsCache sync.Map
|
||||||
|
|
||||||
|
func populateBlacklist(key string) (entries []string) {
|
||||||
|
// We either store an empty list when
|
||||||
|
// no rules, or a list of disallowed URLs.
|
||||||
|
// This applies even if we have an error
|
||||||
|
// finding/downloading robots.txt
|
||||||
|
defer func() {
|
||||||
|
RobotsCache.Store(key, entries)
|
||||||
|
}()
|
||||||
|
url := fmt.Sprintf("gemini://%s/robots.txt", key)
|
||||||
|
robotsContent, err := ConnectAndGetData(url)
|
||||||
|
if err != nil {
|
||||||
|
logging.LogDebug("robots.txt error %s", err)
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
robotsData, err := processData(robotsContent)
|
||||||
|
if err != nil {
|
||||||
|
logging.LogDebug("robots.txt error %s", err)
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
if robotsData.ResponseCode != 20 {
|
||||||
|
logging.LogDebug("robots.txt error code %d, ignoring", robotsData.ResponseCode)
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
// Some return text/plain, others text/gemini.
|
||||||
|
// According to spec, the first is correct,
|
||||||
|
// however let's be lenient
|
||||||
|
var data string
|
||||||
|
if robotsData.MimeType == "text/plain" {
|
||||||
|
data = string(robotsData.Data)
|
||||||
|
} else if robotsData.MimeType == "text/gemini" {
|
||||||
|
data = robotsData.GemText
|
||||||
|
} else {
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
entries = ParseRobotsTxt(string(data), key)
|
||||||
|
return entries
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the snapshot URL matches
|
||||||
|
// a robots.txt allow rule.
|
||||||
|
func RobotMatch(s *Snapshot) bool {
|
||||||
|
logging.LogDebug("Checking robots.txt cache for %s", s.URL.String())
|
||||||
|
key := fmt.Sprintf("%s:%d", s.Host, s.URL.Port)
|
||||||
|
v, ok := RobotsCache.Load(key)
|
||||||
|
if ok == false {
|
||||||
|
// First time check, populate robot cache
|
||||||
|
logging.LogDebug("No robots.txt entry, populating cache for %s", s.URL.String())
|
||||||
|
disallowedURLs := populateBlacklist(key)
|
||||||
|
for _, url := range disallowedURLs {
|
||||||
|
if strings.HasPrefix(s.URL.String(), url) {
|
||||||
|
logging.LogDebug("robots.txt match: %s %s", s.URL.String(), url)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if len(v.([]string)) == 0 {
|
||||||
|
logging.LogDebug("No robots.txt or no rules, allowed")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, url := range v.([]string) {
|
||||||
|
if strings.HasPrefix(s.URL.String(), url) {
|
||||||
|
logging.LogDebug("robots.txt match: %s %s", s.URL.String(), url)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
package gemini
|
package gemini
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestParseRobotsTxt(t *testing.T) {
|
func TestParseRobotsTxt(t *testing.T) {
|
||||||
@@ -15,6 +15,7 @@ Disallow: /admin/`
|
|||||||
expected := []string{
|
expected := []string{
|
||||||
"gemini://example.com/cgi-bin/wp.cgi/view",
|
"gemini://example.com/cgi-bin/wp.cgi/view",
|
||||||
"gemini://example.com/cgi-bin/wp.cgi/media",
|
"gemini://example.com/cgi-bin/wp.cgi/media",
|
||||||
|
"gemini://example.com/admin/",
|
||||||
}
|
}
|
||||||
|
|
||||||
result := ParseRobotsTxt(input, "example.com")
|
result := ParseRobotsTxt(input, "example.com")
|
||||||
@@ -23,3 +24,13 @@ Disallow: /admin/`
|
|||||||
t.Errorf("ParseRobotsTxt() = %v, want %v", result, expected)
|
t.Errorf("ParseRobotsTxt() = %v, want %v", result, expected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseRobotsTxtEmpty(t *testing.T) {
|
||||||
|
input := ``
|
||||||
|
|
||||||
|
result := ParseRobotsTxt(input, "example.com")
|
||||||
|
|
||||||
|
if len(result) != 0 {
|
||||||
|
t.Errorf("ParseRobotsTxt() = %v, want empty []string", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
159
gemini/worker.go
159
gemini/worker.go
@@ -1,12 +1,11 @@
|
|||||||
package gemini
|
package gemini
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"gemini-grc/config"
|
"gemini-grc/config"
|
||||||
"gemini-grc/logging"
|
"gemini-grc/logging"
|
||||||
"gemini-grc/uid"
|
"gemini-grc/uid"
|
||||||
"runtime/debug"
|
"gemini-grc/util"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -25,17 +24,63 @@ func SpawnWorkers(numOfWorkers int, db *sqlx.DB) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func printPoolIPs() {
|
func runWorker(id int, db *sqlx.DB) {
|
||||||
fmt.Printf("%v", IpPool.IPs)
|
// Start the DB transaction
|
||||||
|
tx, err := db.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
logging.LogError("Failed to begin transaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
logging.LogError("[%d] Failed to commit transaction: %w", id, err)
|
||||||
|
err := tx.Rollback()
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("[%d] Failed to roll back transaction: %v", id, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
snapshots, err := GetRandomSnapshotsDistinctHosts(tx)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logging.LogError("[%d] Error retrieving snapshot: %w", id, err)
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
return
|
||||||
|
} else if len(snapshots) == 0 {
|
||||||
|
logging.LogInfo("[%d] No remaining snapshots to visit.", id)
|
||||||
|
time.Sleep(1 * time.Minute)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
total := len(snapshots)
|
||||||
|
for i, s := range snapshots {
|
||||||
|
logging.LogInfo("[%d] Starting %d/%d %s", id, i+1, total, s.URL)
|
||||||
|
err = workOnSnapshot(id, tx, &s)
|
||||||
|
if err != nil {
|
||||||
|
logging.LogError("[%d] [%s] Error %w", id, s.URL, err)
|
||||||
|
util.PrintStackAndPanic(err)
|
||||||
|
}
|
||||||
|
if s.Error.Valid {
|
||||||
|
logging.LogWarn("[%d] [%s] Error: %v", id, s.URL, fmt.Errorf(s.Error.String))
|
||||||
|
}
|
||||||
|
logging.LogDebug("[%d] Done %d/%d.", id, i, total)
|
||||||
|
}
|
||||||
|
logging.LogInfo("[%d] Worker done.", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func workOnSnapshot(id int, tx *sqlx.Tx, s *Snapshot) (err error) {
|
func workOnSnapshot(id int, tx *sqlx.Tx, s *Snapshot) (err error) {
|
||||||
// Wrap errors with more info.
|
// If URL matches a robots.txt disallow line,
|
||||||
defer func() {
|
// add it as an error so next time it won't be
|
||||||
|
// crawled.
|
||||||
|
if RobotMatch(s) {
|
||||||
|
s.Error = null.StringFrom("robots.txt disallow match")
|
||||||
|
err = SaveSnapshotToDB(tx, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("[%d] Worker Error: %w", id, err)
|
return fmt.Errorf("[%d] DB Error: %w", id, err)
|
||||||
}
|
}
|
||||||
}()
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
IPs, err := getHostIPAddresses(s.Host)
|
IPs, err := getHostIPAddresses(s.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -88,22 +133,22 @@ func workOnSnapshot(id int, tx *sqlx.Tx, s *Snapshot) (err error) {
|
|||||||
if s.Links != nil {
|
if s.Links != nil {
|
||||||
var batchSnapshots []*Snapshot
|
var batchSnapshots []*Snapshot
|
||||||
timestamp := null.TimeFrom(time.Now())
|
timestamp := null.TimeFrom(time.Now())
|
||||||
|
|
||||||
for _, link := range *s.Links {
|
for _, link := range *s.Links {
|
||||||
if shouldPersistURL(tx, link) {
|
if shouldPersistURL(tx, link) {
|
||||||
newSnapshot := &Snapshot{
|
newSnapshot := &Snapshot{
|
||||||
UID: uid.UID(),
|
UID: uid.UID(),
|
||||||
URL: link,
|
URL: link,
|
||||||
Host: link.Hostname,
|
Host: link.Hostname,
|
||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
}
|
}
|
||||||
batchSnapshots = append(batchSnapshots, newSnapshot)
|
batchSnapshots = append(batchSnapshots, newSnapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(batchSnapshots) > 0 {
|
if len(batchSnapshots) > 0 {
|
||||||
logging.LogDebug("[%d] Batch saving %d links", id, len(batchSnapshots))
|
logging.LogDebug("[%d] Batch saving %d links", id, len(batchSnapshots))
|
||||||
err = SaveLinksToDB(tx, batchSnapshots)
|
err = SaveLinksToDBinBatches(tx, batchSnapshots)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("[%d] DB Error: %w", id, err)
|
return fmt.Errorf("[%d] DB Error: %w", id, err)
|
||||||
}
|
}
|
||||||
@@ -127,45 +172,6 @@ func shouldPersistURL(tx *sqlx.Tx, u GeminiUrl) bool {
|
|||||||
return !exists
|
return !exists
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select a random snapshot.
|
|
||||||
func GetRandomSnapshot(tx *sqlx.Tx) (*Snapshot, error) {
|
|
||||||
query := `SELECT * FROM snapshots
|
|
||||||
WHERE response_code IS NULL
|
|
||||||
AND error IS NULL
|
|
||||||
ORDER BY RANDOM()
|
|
||||||
LIMIT 1
|
|
||||||
FOR UPDATE SKIP LOCKED`
|
|
||||||
// AND (timestamp < NOW() - INTERVAL '1 day' OR timestamp IS NULL)
|
|
||||||
var snapshot Snapshot
|
|
||||||
err := tx.Get(&snapshot, query)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
// Handle the case where no rows were found
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
// Handle other potential errors
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &snapshot, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetRandomSnapshots(tx *sqlx.Tx) ([]Snapshot, error) {
|
|
||||||
query := `
|
|
||||||
SELECT * FROM snapshots
|
|
||||||
WHERE response_code IS NULL
|
|
||||||
AND error IS NULL
|
|
||||||
ORDER BY RANDOM()
|
|
||||||
LIMIT $1
|
|
||||||
FOR UPDATE SKIP LOCKED
|
|
||||||
`
|
|
||||||
var snapshots []Snapshot
|
|
||||||
err := tx.Select(&snapshots, query, config.CONFIG.WorkerBatchSize)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return snapshots, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetRandomSnapshotsDistinctHosts(tx *sqlx.Tx) ([]Snapshot, error) {
|
func GetRandomSnapshotsDistinctHosts(tx *sqlx.Tx) ([]Snapshot, error) {
|
||||||
// Old, unoptimized query
|
// Old, unoptimized query
|
||||||
//
|
//
|
||||||
@@ -199,50 +205,3 @@ func GetRandomSnapshotsDistinctHosts(tx *sqlx.Tx) ([]Snapshot, error) {
|
|||||||
}
|
}
|
||||||
return snapshots, nil
|
return snapshots, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func runWorker(id int, db *sqlx.DB) {
|
|
||||||
// Start the transaction
|
|
||||||
tx, err := db.Beginx()
|
|
||||||
if err != nil {
|
|
||||||
logging.LogError("Failed to begin transaction: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
err = tx.Commit()
|
|
||||||
if err != nil {
|
|
||||||
logging.LogError("[%d] Failed to commit transaction: %w", id, err)
|
|
||||||
tx.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
snapshots, err := GetRandomSnapshotsDistinctHosts(tx)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logging.LogError("[%d] Error retrieving snapshot: %w", id, err)
|
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
return
|
|
||||||
} else if len(snapshots) == 0 {
|
|
||||||
logging.LogInfo("[%d] No remaining snapshots to visit.", id)
|
|
||||||
time.Sleep(1 * time.Minute)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
total := len(snapshots)
|
|
||||||
for i, s := range snapshots {
|
|
||||||
if InBlacklist(&s) {
|
|
||||||
logging.LogDebug("[%d] Ignoring %d/%d blacklisted URL %s", id, i+1, total, s.URL)
|
|
||||||
}
|
|
||||||
logging.LogInfo("[%d] Starting %d/%d %s", id, i+1, total, s.URL)
|
|
||||||
err = workOnSnapshot(id, tx, &s)
|
|
||||||
if err != nil {
|
|
||||||
logging.LogError("[%d] [%s] Error %w", id, s.URL, err)
|
|
||||||
// TODO Remove panic and gracefully handle/log error
|
|
||||||
fmt.Printf("Error %s Stack trace:\n%s", err, debug.Stack())
|
|
||||||
panic("ERROR encountered")
|
|
||||||
}
|
|
||||||
if s.Error.Valid {
|
|
||||||
logging.LogWarn("[%d] [%s] Error: %v", id, s.URL, fmt.Errorf(s.Error.String))
|
|
||||||
}
|
|
||||||
logging.LogDebug("[%d] Done %d/%d.", id, i, total)
|
|
||||||
}
|
|
||||||
logging.LogInfo("[%d] Worker done.", id)
|
|
||||||
}
|
|
||||||
|
|||||||
11
util/util.go
Normal file
11
util/util.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"runtime/debug"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PrintStackAndPanic prints err and the calling goroutine's stack trace to
// standard output and then panics; it never returns normally.
//
// The panic value is a string of the form "PANIC: <err>", so the cause is
// still visible in the runtime's crash report and to any recover handler.
func PrintStackAndPanic(err error) {
	// %v instead of %s: a nil err then prints as "<nil>" rather than the
	// fmt error marker "%!s(<nil>)".
	fmt.Printf("Error %v Stack trace:\n%s", err, debug.Stack())
	panic(fmt.Sprintf("PANIC: %v", err))
}
|
||||||
Reference in New Issue
Block a user