diff --git a/.gitignore b/.gitignore index 7318b3a..7ef970b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ .idea **/.#* **/*~ -/run.sh +/cmd +/db/initdb.sql +/run*.sh /gemini-grc /snaps diff --git a/README.md b/README.md index 1282424..397613b 100644 --- a/README.md +++ b/README.md @@ -2,21 +2,29 @@ A Gemini crawler. +## Done +- [x] Concurrent downloading with workers +- [x] Concurrent connection limit per host +- [x] URL Blacklist +- [x] Save image/* and text/* files +- [x] Configuration via environment variables +- [x] Storing snapshots in PostgreSQL +- [x] Proper response header & body UTF-8 and format validation + ## TODO -- [ ] Save image/* and text/* files -- [ ] Wide events logging -- [ ] Handle URLs that need presentation of a TLS cert? Like astrobotany - + [ ] Probably have a common "grc" cert for all +- [ ] Follow robots.txt gemini://geminiprotocol.net/docs/companion/ + - [ ] Test with gemini://alexey.shpakovsky.ru/maze +- [ ] Proper handling of all response codes + - [ ] Handle 3X redirects properly +- [ ] Handle URLs that need presentation of a TLS cert, like astrobotany + + [ ] Probably have a common "grc" cert for all? - [ ] Proper input and response validations: + [ ] When making a request, the URI MUST NOT exceed 1024 bytes - + [ ] Response headers MUST be UTF-8 encoded text and MUST NOT begin with the Byte Order Mark U+FEFF. -- [ ] Proper handling of all response codes -- [ ] Proper validation (or logging) of invalid/expired TLS certs? -- [ ] Subscribe to gemini pages? gemini://geminiprotocol.net/docs/companion/ -- [ ] Follow robots.txt gemini://geminiprotocol.net/docs/companion/ +- [ ] Subscriptions to gemini pages? gemini://geminiprotocol.net/docs/companion/ -## TODO later +## TODO for later - [ ] Add other protocols + + [ ] Gopher + [ ] Scroll gemini://auragem.letz.dev/devlog/20240316.gmi + [ ] Spartan + [ ] Nex diff --git a/blacklist.txt b/blacklist.txt new file mode 100644 index 0000000..9004dac --- /dev/null +++ b/blacklist.txt @@ -0,0 +1,5 @@ +gemini://gemi.dev/cgi-bin/waffle.cgi +gemini://alexey.shpakovsky.ru/maze +gemini://kennedy.gemi.dev +gemini://musicbrainz.uploadedlobster.com +gemini://gemini.bunburya.eu/remini diff --git a/config.go b/config/config.go similarity index 67% rename from config.go rename to config/config.go index 3051bfa..2e37f5d 100644 --- a/config.go +++ b/config/config.go @@ -1,4 +1,4 @@ -package main +package config import ( "fmt" @@ -9,19 +9,23 @@ import ( ) type Config struct { - logLevel zerolog.Level + LogLevel zerolog.Level rootPath string - numOfWorkers int - maxResponseSize int - responseTimeout int + MaxResponseSize int + NumOfWorkers int + ResponseTimeout int + WorkerBatchSize int } -func getConfig() *Config { +var CONFIG Config + +func GetConfig() *Config { var config Config for _, envVar := range []string{ "LOG_LEVEL", "ROOT_PATH", "NUM_OF_WORKERS", + "WORKER_BATCH_SIZE", "MAX_RESPONSE_SIZE", "RESPONSE_TIMEOUT", } { @@ -37,7 +41,7 @@ func getConfig() *Config { fmt.Fprintf(os.Stderr, "Invalid LOG_LEVEL value\n") os.Exit(1) } - config.logLevel = logLevel + config.LogLevel = logLevel } case "ROOT_PATH": { @@ -49,7 +53,16 @@ func getConfig() *Config { fmt.Fprintf(os.Stderr, "Invalid NUM_OF_WORKERS value\n") os.Exit(1) } else { - config.numOfWorkers = numOfWorkers + config.NumOfWorkers = numOfWorkers + } + } + case "WORKER_BATCH_SIZE": + { + if workerBatchSize, err := strconv.Atoi(env); err != nil { + fmt.Fprintf(os.Stderr, "Invalid WORKER_BATCH_SIZE value\n") + os.Exit(1) + } else { + config.WorkerBatchSize = workerBatchSize 
} } case "MAX_RESPONSE_SIZE": @@ -58,7 +71,7 @@ func getConfig() *Config { fmt.Fprintf(os.Stderr, "Invalid MAX_RESPONSE_SIZE value\n") os.Exit(1) } else { - config.maxResponseSize = maxResponseSize + config.MaxResponseSize = maxResponseSize } } case "RESPONSE_TIMEOUT": @@ -67,7 +80,7 @@ func getConfig() *Config { fmt.Fprintf(os.Stderr, "Invalid RESPONSE_TIMEOUT value\n") os.Exit(1) } else { - config.responseTimeout = val + config.ResponseTimeout = val } } } diff --git a/db/backup-table.sql b/db/backup-table.sql new file mode 100644 index 0000000..b69250f --- /dev/null +++ b/db/backup-table.sql @@ -0,0 +1,16 @@ +BEGIN; + +-- Increase statement timeout +SET statement_timeout = '10min'; + +-- Step 1: Create a new table with the same schema +CREATE TABLE backup (LIKE snapshots INCLUDING ALL); + +-- Step 2: Copy data from the old table to the new one +INSERT INTO backup SELECT * FROM snapshots; + +-- (Optional) Step 3: Truncate the original table if you are moving the data +-- TRUNCATE TABLE snapshots; + +-- Commit the transaction if everything went well +COMMIT; diff --git a/db/delete-dups.sql b/db/delete-dups.sql new file mode 100644 index 0000000..2393c27 --- /dev/null +++ b/db/delete-dups.sql @@ -0,0 +1,26 @@ +-- Explanation: + +-- WITH DuplicateSnapshots AS: +-- This is a Common Table Expression (CTE) that selects all rows from the snapshots table. +-- ROW_NUMBER() OVER (PARTITION BY url ORDER BY id): This assigns a unique row number to each row with the same url. The PARTITION BY url groups the rows by url, and ORDER BY id ensures that the row with the smallest id is given row_num = 1. +-- DELETE FROM snapshots WHERE id IN: +-- The DELETE statement deletes rows from the snapshots table where the id is in the result of the subquery. +-- WHERE row_num > 1: +-- In the subquery, we select only rows where row_num > 1, which means only the duplicate rows (since row_num = 1 is the one row we want to keep). + +-- Result: + +-- This query will delete all duplicate rows from the snapshots table, keeping only the row with the smallest id for each url. +-- If multiple rows share the same url, only the first one (based on id) will be retained. 
+ +WITH DuplicateSnapshots AS ( + SELECT id, + ROW_NUMBER() OVER (PARTITION BY url ORDER BY id) AS row_num + FROM snapshots +) +DELETE FROM snapshots +WHERE id IN ( + SELECT id + FROM DuplicateSnapshots + WHERE row_num > 1 +); diff --git a/db/host_stats.sql b/db/host_stats.sql new file mode 100644 index 0000000..0d16692 --- /dev/null +++ b/db/host_stats.sql @@ -0,0 +1,5 @@ +SELECT host, COUNT(*) AS row_count +FROM snapshots +GROUP BY host +ORDER BY row_count DESC +LIMIT 10; diff --git a/db/initdb.sql b/db/initdb.sql new file mode 100644 index 0000000..424fd51 --- /dev/null +++ b/db/initdb.sql @@ -0,0 +1,45 @@ +-- DB creation and users +CREATE USER gemini; +ALTER USER gemini WITH PASSWORD 'gemini'; +CREATE DATABASE gemini; +GRANT ALL PRIVILEGES ON DATABASE gemini TO gemini; +ALTER DATABASE gemini OWNER TO gemini; +GRANT ALL PRIVILEGES ON SCHEMA public TO gemini; +GRANT ALL PRIVILEGES ON gemini TO gemini; +GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO gemini; + +-- Extensions +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + + +\c gemini + +-- Tables +DROP TABLE IF EXISTS snapshots; + +CREATE TABLE snapshots ( + id SERIAL PRIMARY KEY, + uid TEXT NOT NULL UNIQUE, + url TEXT NOT NULL, + host TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + mimetype TEXT, + data BYTEA, + gemtext TEXT, + links JSONB, + lang TEXT, + response_code INTEGER, + error TEXT +); + +ALTER TABLE snapshots OWNER TO gemini; + +CREATE INDEX idx_uid ON snapshots (uid); +CREATE INDEX idx_url ON snapshots (url); +CREATE INDEX idx_timestamp ON snapshots (timestamp); +CREATE INDEX idx_mimetype ON snapshots (mimetype); +CREATE INDEX idx_lang ON snapshots (lang); +CREATE INDEX idx_response_code ON snapshots (response_code); +CREATE INDEX idx_error ON snapshots (error); +CREATE INDEX idx_host ON snapshots (host); +CREATE INDEX idx_response_code_error_nulls ON snapshots (response_code, error) WHERE response_code IS NULL AND error IS NULL; diff --git a/db/migrate1_host.go b/db/migrate1_host.go new file mode 100644 index 0000000..ebc43ae --- /dev/null +++ b/db/migrate1_host.go @@ -0,0 +1,99 @@ +package main + +import ( + "fmt" + "gemini-grc/gemini" + "os" + + _ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL + "github.com/jmoiron/sqlx" +) + +func checkIfDone() bool { return true } + +// Populates the `host` field +func main() { + db := connectToDB() + + if checkIfDone() { + fmt.Println("Migration already applied") + return + } + + count := 0 + for { + // Start the transaction + tx, err := db.Beginx() + if err != nil { + fmt.Println(err) + return + } + + query := ` + SELECT * FROM snapshots + WHERE host IS NULL + LIMIT 5000 + ` + var snapshots []gemini.Snapshot + err = tx.Select(&snapshots, query) + if len(snapshots) == 0 { + fmt.Println("Done!") + return + } + if err != nil { + fmt.Println(err) + err := tx.Rollback() + if err != nil { + panic(err) + } + } + for _, s := range snapshots { + s.Host = s.URL.Hostname + fmt.Println(count, s.UID, s.URL.Hostname) + err := gemini.SaveSnapshotToDB(tx, &s) + if err != nil { + fmt.Println(err) + err := tx.Rollback() + if err != nil { + panic(err) + } + } + count += 1 + } + + err = tx.Commit() + if err != nil { + fmt.Println(err) + err := tx.Rollback() + if err != nil { + panic(err) + } + } + + } + +} + +func connectToDB() *sqlx.DB { + connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s", + os.Getenv("PG_USER"), + os.Getenv("PG_PASSWORD"), + os.Getenv("PG_HOST"), + os.Getenv("PG_PORT"), + os.Getenv("PG_DATABASE"), + ) + + // Create a 
connection pool + db, err := sqlx.Open("pgx", connStr) + if err != nil { + panic(fmt.Sprintf("Unable to connect to database with URL %s: %v\n", connStr, err)) + } + db.SetMaxOpenConns(20) + err = db.Ping() + if err != nil { + panic(fmt.Sprintf("Unable to ping database: %v\n", err)) + } + + fmt.Println("Connected to database") + return db +} diff --git a/db/migrate1_host.sh b/db/migrate1_host.sh new file mode 100755 index 0000000..8463b89 --- /dev/null +++ b/db/migrate1_host.sh @@ -0,0 +1,14 @@ +#!/bin/sh +set -eu + +MAX_RESPONSE_SIZE=10485760 \ +LOG_LEVEL=info \ +ROOT_PATH=./snaps \ +RESPONSE_TIMEOUT=10 \ +NUM_OF_WORKERS=5 \ +PG_DATABASE=gemini \ +PG_HOST=127.0.0.1 \ +PG_PORT=5433 \ +PG_USER=gemini \ +PG_PASSWORD=gemini \ +go run ./migrate1_host.go diff --git a/db/pg_stats.sql b/db/pg_stats.sql new file mode 100644 index 0000000..f9a423d --- /dev/null +++ b/db/pg_stats.sql @@ -0,0 +1,12 @@ +SELECT + query, + total_exec_time AS total_time, -- total time spent on the query execution + calls, -- number of times the query has been called + mean_exec_time AS mean_time -- average time per execution +-- max_exec_time AS max_time -- maximum time taken for any single execution +FROM + pg_stat_statements +ORDER BY + total_exec_time DESC -- order by total execution time +LIMIT 5; + diff --git a/db/pg_stats_reset.sql b/db/pg_stats_reset.sql new file mode 100644 index 0000000..98b04e7 --- /dev/null +++ b/db/pg_stats_reset.sql @@ -0,0 +1 @@ +SELECT pg_stat_statements_reset(); diff --git a/db/populateDB.go b/db/populateDB.go new file mode 100644 index 0000000..a9de324 --- /dev/null +++ b/db/populateDB.go @@ -0,0 +1,20 @@ +// func PopulateDB(db *sqlx.DB) { +// // Delete all rows in the snapshots table +// db.MustExec("TRUNCATE snapshots;") + +// // Prepare the query for inserting a snapshot with uid, url, and timestamp +// query := `INSERT INTO snapshots(uid, url, timestamp) +// VALUES ($1, $2, $3)` + +// // Calculate the timestamp for 2 days ago +// timestamp := time.Now().Add(-48 * time.Hour) + +// db.MustExec(query, uid.UID(), "gemini://geminiprotocol.net/", timestamp) +// db.MustExec(query, uid.UID(), "gemini://warmedal.se/~antenna", timestamp) +// db.MustExec(query, uid.UID(), "gemini://skyjake.fi/~Cosmos/", timestamp) +// db.MustExec(query, uid.UID(), "gemini://gemini.circumlunar.space/capcom/", timestamp) +// db.MustExec(query, uid.UID(), "gemini://auragem.letz.dev/", timestamp) +// db.MustExec(query, uid.UID(), "gemini://gemplex.space/", timestamp) +// db.MustExec(query, uid.UID(), "gemini://kennedy.gemi.dev/", timestamp) +// db.MustExec(query, uid.UID(), "gemini://tlgs.one/", timestamp) +// } diff --git a/db/restore-table.sql b/db/restore-table.sql new file mode 100644 index 0000000..4d82aba --- /dev/null +++ b/db/restore-table.sql @@ -0,0 +1,9 @@ +BEGIN; + +SET statement_timeout = '10min'; + +TRUNCATE TABLE snapshots; + +INSERT INTO snapshots SELECT * FROM backup; + +COMMIT; diff --git a/db/show-dups.sql b/db/show-dups.sql new file mode 100644 index 0000000..65b2a6b --- /dev/null +++ b/db/show-dups.sql @@ -0,0 +1,9 @@ +WITH DuplicateSnapshots AS ( + SELECT id, + url, + ROW_NUMBER() OVER (PARTITION BY url ORDER BY id) AS row_num + FROM snapshots +) +SELECT * +FROM DuplicateSnapshots +WHERE row_num > 1; diff --git a/db/stats.sql b/db/stats.sql new file mode 100644 index 0000000..afbba51 --- /dev/null +++ b/db/stats.sql @@ -0,0 +1,5 @@ +SELECT + COUNT(CASE WHEN response_code IS NOT NULL AND error IS NULL THEN 1 END) AS "Visited", + COUNT(CASE WHEN response_code IS NULL THEN 1 END) AS "Pending", + 
COUNT(CASE WHEN error IS NOT NULL THEN 1 END) AS "Errors" +FROM snapshots; diff --git a/error_urls.txt b/error_urls.txt new file mode 100644 index 0000000..a817dd7 --- /dev/null +++ b/error_urls.txt @@ -0,0 +1,8 @@ +// body with null byte +gemini://kennedy.gemi.dev/archive/cached?url=gemini://spam.works/mirrors/textfiles/fun/consult.how&t=638427244900000000&raw=False + +// has invalid url +gemini://tlgs.one/known-hosts + +// Needs SNI TLS info (our bug) +gemini://hanzbrix.pollux.casa/gemlog/20241002.gmi diff --git a/gemini/blacklist.go b/gemini/blacklist.go new file mode 100644 index 0000000..0b56e8f --- /dev/null +++ b/gemini/blacklist.go @@ -0,0 +1,18 @@ +package gemini + +import "strings" + +var Blacklist *[]string + +func InBlacklist(s *Snapshot) bool { + if Blacklist == nil { + data := ReadLines("blacklist.txt") + Blacklist = &data + } + for _, l := range *Blacklist { + if strings.HasPrefix(s.URL.String(), l) { + return true + } + } + return false +} diff --git a/gemini/connectionPool.go b/gemini/connectionPool.go new file mode 100644 index 0000000..a1eab2f --- /dev/null +++ b/gemini/connectionPool.go @@ -0,0 +1,32 @@ +package gemini + +import ( + "gemini-grc/logging" +) + +var IpPool IpAddressPool = IpAddressPool{IPs: make(map[string]int)} + +func AddIPsToPool(IPs []string) { + IpPool.Lock.Lock() + for _, ip := range IPs { + logging.LogDebug("Adding %s to pool", ip) + IpPool.IPs[ip]++ + } + IpPool.Lock.Unlock() +} + +func RemoveIPsFromPool(IPs []string) { + IpPool.Lock.Lock() + for _, ip := range IPs { + _, ok := IpPool.IPs[ip] + if ok { + logging.LogDebug("Removing %s from pool", ip) + if IpPool.IPs[ip] == 1 { + delete(IpPool.IPs, ip) + } else { + IpPool.IPs[ip]-- + } + } + } + IpPool.Lock.Unlock() +} diff --git a/fs.go b/gemini/files.go similarity index 72% rename from fs.go rename to gemini/files.go index 24c2ea4..362737b 100644 --- a/fs.go +++ b/gemini/files.go @@ -1,7 +1,8 @@ -package main +package gemini import ( "fmt" + "gemini-grc/logging" "net/url" "os" "path" @@ -61,7 +62,7 @@ func calcFilePath(rootPath, urlPath string) (string, error) { return finalPath, nil } -func SaveSnapshot(rootPath string, s *Snapshot) { +func SaveToFile(rootPath string, s *Snapshot, done chan struct{}) { + defer close(done) parentPath := path.Join(rootPath, s.URL.Hostname) urlPath := s.URL.Path // If path is empty, add `index.gmi` as the file to save @@ -76,17 +77,37 @@ finalPath, err := calcFilePath(parentPath, urlPath) if err != nil { - LogError("Error saving %s: %w", s.URL, err) + logging.LogError("Error saving %s: %w", s.URL, err) return } // Ensure the directory exists dir := filepath.Dir(finalPath) if err := os.MkdirAll(dir, os.ModePerm); err != nil { - LogError("Failed to create directory: %w", err) + logging.LogError("Failed to create directory: %w", err) return } - err = os.WriteFile(finalPath, []byte((*s).Data), 0666) - if err != nil { - LogError("Error saving %s: %w", s.URL.Full, err) + if s.MimeType.Valid && s.MimeType.String == "text/gemini" { + err = os.WriteFile(finalPath, []byte((*s).GemText.String), 0666) + } else { + err = os.WriteFile(finalPath, (*s).Data.V, 0666) } + if err != nil { + logging.LogError("Error saving %s: %w", s.URL.Full, err) + } +} + +func ReadLines(path string) []string { + data, err := os.ReadFile(path) + if err != nil { + panic(fmt.Sprintf("Failed to read blacklist file: %s", err)) + } + lines := strings.Split(string(data), "\n") + // Remove last line if empty + // (happens when file ends with '\n') + if 
lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + logging.LogInfo("Loaded %d blacklist URLs", len(lines)) + return lines } diff --git a/gemini.go b/gemini/gemini.go similarity index 78% rename from gemini.go rename to gemini/gemini.go index ed1b52d..f7341f6 100644 --- a/gemini.go +++ b/gemini/gemini.go @@ -1,13 +1,36 @@ -package main +package gemini import ( "errors" "fmt" + "gemini-grc/logging" "net/url" + go_url "net/url" "regexp" "strconv" + "strings" ) +func isGeminiURL(url string) bool { + _, err := go_url.Parse(url) + if err != nil { + logging.LogWarn("[%s] Invalid URL: %v", url, err) + return false + } + return strings.HasPrefix(url, "gemini://") +} + +func parseLinks(s Snapshot, queue chan string) { + for _, link := range *s.Links { + if strings.HasPrefix(link.Full, "gemini://") { + go func(link GeminiUrl) { + // fmt.Printf("LINK: %s\n", link) + queue <- link.Full + }(link) + } + } +} + func checkGeminiStatusCode(code int) error { switch { case code == 20: @@ -29,21 +52,26 @@ func checkGeminiStatusCode(code int) error { func ProcessGemini(snapshot *Snapshot) *Snapshot { // Grab link lines - linkLines := ExtractLinkLines(snapshot.GemText) - LogDebug("[%s] Found %d links", snapshot.URL.String(), len(linkLines)) + linkLines := ExtractLinkLines(snapshot.GemText.String) + logging.LogDebug("[%s] Found %d links", snapshot.URL.String(), len(linkLines)) // Normalize URLs in links, and store them in snapshot for _, line := range linkLines { normalizedLink, descr, error := NormalizeLink(line, snapshot.URL.String()) if error != nil { - LogError("[%s] Invalid link URL %w", snapshot.URL.String(), error) + logging.LogWarn("Cannot normalize URL in line '%s': %v", line, error) continue } geminiUrl, error := ParseUrl(normalizedLink, descr) if error != nil { - LogError("[%s] Unparseable gemini link %w", snapshot.URL.String(), error) + logging.LogWarn("Cannot parse URL in link '%s': %v", line, error) + continue + } + if snapshot.Links == nil { + snapshot.Links = &LinkList{*geminiUrl} + } else { + *snapshot.Links = append(*snapshot.Links, *geminiUrl) } - snapshot.Links = append(snapshot.Links, *geminiUrl) } return snapshot } @@ -85,7 +113,7 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin // Parse the current URL baseURL, err := url.Parse(currentURL) if err != nil { - return "", "", fmt.Errorf("invalid current URL: %v", err) + return "", "", fmt.Errorf("Invalid current URL: %v", err) } // Regular expression to extract the URL part from a link line @@ -99,7 +127,7 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin } originalURLStr := matches[1] - decodedURLStr, err := url.QueryUnescape(originalURLStr) + _, err = url.QueryUnescape(originalURLStr) if err != nil { return "", "", fmt.Errorf("Error decoding URL: %w", err) } @@ -110,10 +138,10 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin } // Parse the URL from the link line - parsedURL, err := url.Parse(decodedURLStr) + parsedURL, err := url.Parse(originalURLStr) if err != nil { // If URL parsing fails, return an error - return "", "", fmt.Errorf("Invalid URL in link line '%s': %v", decodedURLStr, err) + return "", "", fmt.Errorf("Invalid URL '%s': %v", originalURLStr, err) } // Resolve relative URLs against the base URL diff --git a/gemini_url.go b/gemini/gemini_url.go similarity index 57% rename from gemini_url.go rename to gemini/gemini_url.go index 41f9a1b..1c29302 100644 --- a/gemini_url.go +++ b/gemini/gemini_url.go @@ -1,7 +1,9 @@ 
-package main +package gemini import ( "encoding/json" + "fmt" + "gemini-grc/logging" ) type GeminiUrl struct { @@ -13,6 +15,24 @@ type GeminiUrl struct { Full string `json:"full,omitempty"` } +func (g *GeminiUrl) Scan(value interface{}) error { + if value == nil { + // Clear the fields in the current GeminiUrl object (not the pointer itself) + *g = GeminiUrl{} + return nil + } + b, ok := value.(string) + if !ok { + return fmt.Errorf("failed to scan GeminiUrl: expected string, got %T", value) + } + parsedUrl, err := ParseUrl(b, "") + if err != nil { + return err + } + *g = *parsedUrl + return nil +} + func (u GeminiUrl) String() string { return u.Full // return fmt.Sprintf("%s://%s:%d%s", u.Protocol, u.Hostname, u.Port, u.Path) @@ -22,7 +42,7 @@ func GeminiUrltoJSON(g GeminiUrl) string { // Serialize the Person struct to JSON jsonData, err := json.Marshal(g) if err != nil { - LogError("Error serializing to JSON: %w", err) + logging.LogError("Error serializing to JSON: %w", err) } return string(jsonData) } @@ -31,7 +51,7 @@ func GeminiUrlFromJSON(input string) GeminiUrl { var geminiUrl GeminiUrl err := json.Unmarshal([]byte(input), &geminiUrl) if err != nil { - LogError("Error deserializing from JSON: %w", err) + logging.LogError("Error deserializing from JSON: %w", err) } return geminiUrl } diff --git a/gemini/ip-address-pool.go b/gemini/ip-address-pool.go new file mode 100644 index 0000000..b87bec6 --- /dev/null +++ b/gemini/ip-address-pool.go @@ -0,0 +1,54 @@ +package gemini + +import "sync" + +// Used to limit requests per +// IP address. Maps IP address +// to number of active connections. +type IpAddressPool struct { + IPs map[string]int + Lock sync.RWMutex +} + +func (p *IpAddressPool) Set(key string, value int) { + p.Lock.Lock() // Lock for writing + defer p.Lock.Unlock() // Ensure mutex is unlocked after the write + p.IPs[key] = value +} + +func (p *IpAddressPool) Get(key string) int { + p.Lock.RLock() // Lock for reading + defer p.Lock.RUnlock() // Ensure mutex is unlocked after reading + if value, ok := p.IPs[key]; !ok { + return 0 + } else { + return value + } +} + +func (p *IpAddressPool) Delete(key string) { + p.Lock.Lock() + defer p.Lock.Unlock() + delete(p.IPs, key) +} + +func (p *IpAddressPool) Incr(key string) { + p.Lock.Lock() + defer p.Lock.Unlock() + if _, ok := p.IPs[key]; !ok { + p.IPs[key] = 1 + } else { + p.IPs[key] = p.IPs[key] + 1 + } +} + +func (p *IpAddressPool) Decr(key string) { + p.Lock.Lock() + defer p.Lock.Unlock() + if val, ok := p.IPs[key]; ok { + p.IPs[key] = val - 1 + if p.IPs[key] == 0 { + delete(p.IPs, key) + } + } +} diff --git a/gemini/network.go b/gemini/network.go new file mode 100644 index 0000000..b0acea8 --- /dev/null +++ b/gemini/network.go @@ -0,0 +1,192 @@ +package gemini + +import ( + "crypto/tls" + "fmt" + "gemini-grc/config" + "io" + "net" + "regexp" + "slices" + "strconv" + "time" + + "github.com/guregu/null/v5" +) + +// Resolve the URL hostname and +// check if we already have an open +// connection to this host. +// If we can connect, return a list +// of the resolved IPs. +func getHostIPAddresses(hostname string) ([]string, error) { + addrs, err := net.LookupHost(hostname) + if err != nil { + return nil, err + } + IpPool.Lock.RLock() + defer func() { + IpPool.Lock.RUnlock() + }() + return addrs, nil +} + +// Connect to given URL, using the Gemini protocol. +// Return a Snapshot with the data or the error. +// Any errors are stored within the snapshot. +func Visit(s *Snapshot) { + // Establish the underlying TCP connection. 
+ host := fmt.Sprintf("%s:%d", s.Host, s.URL.Port) + dialer := &net.Dialer{ + Timeout: time.Duration(config.CONFIG.ResponseTimeout) * time.Second, // Set the overall connection timeout + KeepAlive: 30 * time.Second, + } + conn, err := dialer.Dial("tcp", host) + if err != nil { + s.Error = null.StringFrom(fmt.Sprintf("TCP connection failed: %v", err)) + return + } + // Make sure we always close the connection. + defer func() { + err := conn.Close() + if err != nil { + s.Error = null.StringFrom(fmt.Sprintf("Error closing connection: %s", err)) + } + }() + + // Set read and write timeouts on the TCP connection. + err = conn.SetReadDeadline(time.Now().Add(time.Duration(config.CONFIG.ResponseTimeout) * time.Second)) + if err != nil { + s.Error = null.StringFrom(fmt.Sprintf("Error setting connection deadline: %s", err)) + return + } + err = conn.SetWriteDeadline(time.Now().Add(time.Duration(config.CONFIG.ResponseTimeout) * time.Second)) + if err != nil { + s.Error = null.StringFrom(fmt.Sprintf("Error setting connection deadline: %s", err)) + return + } + + // Perform the TLS handshake + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, // Accept all TLS certs, even if insecure. + ServerName: s.URL.Hostname, // SNI + // MinVersion: tls.VersionTLS12, // Use a minimum TLS version. Warning breaks a lot of sites. + } + tlsConn := tls.Client(conn, tlsConfig) + if err := tlsConn.Handshake(); err != nil { + s.Error = null.StringFrom(fmt.Sprintf("TLS handshake error: %v", err)) + return + } + + // We read `buf`-sized chunks and add data to `data`. + buf := make([]byte, 4096) + var data []byte + + // Send Gemini request to trigger server response. + _, err = tlsConn.Write([]byte(fmt.Sprintf("%s\r\n", s.URL.String()))) + if err != nil { + s.Error = null.StringFrom(fmt.Sprintf("Error sending network request: %s", err)) + return + } + // Read response bytes in len(buf) byte chunks + for { + n, err := tlsConn.Read(buf) + if n > 0 { + data = append(data, buf[:n]...) + } + if len(data) > config.CONFIG.MaxResponseSize { + data = []byte{} + s.Error = null.StringFrom(fmt.Sprintf("Response size exceeded maximum of %d bytes", config.CONFIG.MaxResponseSize)) + return + } + if err != nil { + if err == io.EOF { + break + } else { + s.Error = null.StringFrom(fmt.Sprintf("Network error: %s", err)) + return + } + } + } + // Great, response data received. + err = processResponse(s, data) + if err != nil { + s.Error = null.StringFrom(err.Error()) + } + return +} + +// Update given snapshot with the +// Gemini header data: response code, +// mime type and lang (optional) +func processResponse(snapshot *Snapshot, data []byte) error { + headers, body, err := getHeadersAndData(data) + if err != nil { + return err + } + code, mimeType, lang := getMimeTypeAndLang(headers) + geminiError := checkGeminiStatusCode(code) + if geminiError != nil { + return geminiError + } + snapshot.ResponseCode = null.IntFrom(int64(code)) + snapshot.MimeType = null.StringFrom(mimeType) + snapshot.Lang = null.StringFrom(lang) + // If we've got a Gemini document, populate + // `GemText` field, otherwise raw data goes to `Data`. + if mimeType == "text/gemini" { + validBody, err := EnsureValidUTF8(body) + if err != nil { + return fmt.Errorf("UTF-8 error: %w", err) + } + snapshot.GemText = null.StringFrom(validBody) + } else { + snapshot.Data = null.ValueFrom(body) + } + return nil +} + +// Checks for a Gemini header, which is +// basically the first line of the response +// and should contain the response code, +// mimeType and language.
+func getHeadersAndData(data []byte) (firstLine string, rest []byte, err error) { + firstLineEnds := slices.Index(data, '\n') + if firstLineEnds == -1 { + return "", nil, fmt.Errorf("Could not parse response header") + } + firstLine = string(data[:firstLineEnds]) + rest = data[firstLineEnds+1:] + return string(firstLine), rest, nil +} + +// Parses code, mime type and language +// from a Gemini header. +// Examples: +// `20 text/gemini lang=en` (code, mimetype, lang) +// `20 text/gemini` (code, mimetype) +// `31 gemini://redirected.to/other/site` (code) +func getMimeTypeAndLang(headers string) (code int, mimeType string, lang string) { + // Regex that parses code, mimetype & lang + re := regexp.MustCompile(`^(\d+)\s+([a-zA-Z0-9/\-+]+)(?:[;\s]+(lang=([a-zA-Z0-9-]+)))?\s*$`) + matches := re.FindStringSubmatch(headers) + if matches == nil || len(matches) <= 1 { + // Try to get code at least. + re := regexp.MustCompile(`^(\d+)\s+`) + matches := re.FindStringSubmatch(headers) + if matches == nil || len(matches) <= 1 { + return 0, "", "" + } + code, err := strconv.Atoi(matches[1]) + if err != nil { + return 0, "", "" + } + return code, "", "" + } + code, err := strconv.Atoi(matches[1]) + if err != nil { + return 0, "", "" + } + mimeType = matches[2] + lang = matches[4] + return code, mimeType, lang +} diff --git a/gemini/network_test.go b/gemini/network_test.go new file mode 100644 index 0000000..4d72b13 --- /dev/null +++ b/gemini/network_test.go @@ -0,0 +1,48 @@ +package gemini + +import ( + "testing" +) + +// Test for input: `20 text/gemini` +func TestGetMimeTypeAndLang1(t *testing.T) { + code, mimeType, lang := getMimeTypeAndLang("20 text/gemini") + if code != 20 || mimeType != "text/gemini" || lang != "" { + t.Errorf("Expected (20, 'text/gemini', ''), got (%d, '%s', '%s')", code, mimeType, lang) + } +} + +func TestGetMimeTypeAndLang11(t *testing.T) { + code, mimeType, lang := getMimeTypeAndLang("20 text/gemini\n") + if code != 20 || mimeType != "text/gemini" || lang != "" { + t.Errorf("Expected (20, 'text/gemini', ''), got (%d, '%s', '%s')", code, mimeType, lang) + } +} + +func TestGetTypeAndLang2(t *testing.T) { + code, mimeType, lang := getMimeTypeAndLang("20 text/gemini lang=en") + if code != 20 || mimeType != "text/gemini" || lang != "en" { + t.Errorf("Expected (20, 'text/gemini', 'en'), got (%d, '%s', '%s')", code, mimeType, lang) + } +} + +func TestGetMimeTypeAndLang3(t *testing.T) { + code, mimeType, lang := getMimeTypeAndLang("31 gemini://redirect.to/page") + if code != 31 || mimeType != "" || lang != "" { + t.Errorf("Expected (20, '', ''), got (%d, '%s', '%s')", code, mimeType, lang) + } +} + +func TestGetMimeTypeAndLang4(t *testing.T) { + code, mimeType, lang := getMimeTypeAndLang("aaafdasdasd") + if code != 0 || mimeType != "" || lang != "" { + t.Errorf("Expected (0, '', ''), got (%d, '%s', '%s')", code, mimeType, lang) + } +} + +func TestGetMimeTypeAndLang5(t *testing.T) { + code, mimeType, lang := getMimeTypeAndLang("") + if code != 0 || mimeType != "" || lang != "" { + t.Errorf("Expected (0, '', ''), got (%d, '%s', '%s')", code, mimeType, lang) + } +} diff --git a/gemini/persistence.go b/gemini/persistence.go new file mode 100644 index 0000000..fea48ce --- /dev/null +++ b/gemini/persistence.go @@ -0,0 +1,72 @@ +package gemini + +import ( + "fmt" + "gemini-grc/logging" + "os" + + _ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL + "github.com/jmoiron/sqlx" +) + +func ConnectToDB() *sqlx.DB { + connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s", + 
os.Getenv("PG_USER"), + os.Getenv("PG_PASSWORD"), + os.Getenv("PG_HOST"), + os.Getenv("PG_PORT"), + os.Getenv("PG_DATABASE"), + ) + + // Create a connection pool + db, err := sqlx.Open("pgx", connStr) + if err != nil { + panic(fmt.Sprintf("Unable to connect to database with URL %s: %v\n", connStr, err)) + } + db.SetMaxOpenConns(20) + err = db.Ping() + if err != nil { + panic(fmt.Sprintf("Unable to ping database: %v\n", err)) + } + + logging.LogDebug("Connected to database") + return db +} + +func SaveSnapshotToDB(tx *sqlx.Tx, s *Snapshot) error { + query := ` + INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error) + VALUES (:uid, :url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error) + ON CONFLICT (uid) DO UPDATE SET + url = EXCLUDED.url, + host = EXCLUDED.host, + timestamp = EXCLUDED.timestamp, + mimetype = EXCLUDED.mimetype, + data = EXCLUDED.data, + gemtext = EXCLUDED.gemtext, + links = EXCLUDED.links, + lang = EXCLUDED.lang, + response_code = EXCLUDED.response_code, + error = EXCLUDED.error + ` + _, err := tx.NamedExec(query, s) + if err != nil { + logging.LogError("[%s] [%s] Error upserting snapshot: %w", s.URL, s.MimeType.String, err) + return fmt.Errorf("DB error: %w", err) // Return the error instead of panicking + } + return nil +} + +func SaveLinkToDB(tx *sqlx.Tx, s *Snapshot) error { + query := ` + INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error) + VALUES (:uid, :url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error) + ON CONFLICT (uid) DO NOTHING + ` + _, err := tx.NamedExec(query, s) + if err != nil { + logging.LogError("[%s] [%s] Error upserting snapshot: %w", s.URL, s.MimeType.String, err) + return fmt.Errorf("DB error: %w", err) // Return the error instead of panicking + } + return nil +} diff --git a/gemini/processing.go b/gemini/processing.go new file mode 100644 index 0000000..aa15fd4 --- /dev/null +++ b/gemini/processing.go @@ -0,0 +1,33 @@ +package gemini + +import ( + "bytes" + "fmt" + "io" + "unicode/utf8" + + "golang.org/x/text/encoding/charmap" + "golang.org/x/text/transform" +) + +func EnsureValidUTF8(input []byte) (string, error) { + // Remove NULL byte 0x00 + inputNoNull := bytes.ReplaceAll(input, []byte{0}, nil) + isValidUTF8 := utf8.Valid(inputNoNull) + if !isValidUTF8 { + encodings := []transform.Transformer{ + charmap.ISO8859_1.NewDecoder(), // First try ISO8859-1 + charmap.Windows1252.NewDecoder(), // Then try Windows-1252, etc + // TODO: Try more encodings? 
+ } + for _, encoding := range encodings { + reader := transform.NewReader(bytes.NewReader(inputNoNull), encoding) + result, err := io.ReadAll(reader) + if err != nil { + // This decoder failed, try the next one. + continue + } + return string(result), nil + } + // None of the fallback decoders produced usable output. + return "", fmt.Errorf("unable to decode input as valid UTF-8") + } + return string(inputNoNull), nil +} diff --git a/gemini/processing_test.go b/gemini/processing_test.go new file mode 100644 index 0000000..d47209f --- /dev/null +++ b/gemini/processing_test.go @@ -0,0 +1,13 @@ +package gemini + +import "testing" + +// Make sure NULL bytes are removed +func TestEnsureValidUTF8(t *testing.T) { + // Create a string with a null byte + strWithNull := "Hello" + string('\x00') + "world" + result, _ := EnsureValidUTF8([]byte(strWithNull)) + if result != "Helloworld" { + t.Errorf("Expected string without NULL byte, got %s", result) + } +} diff --git a/gemini/snapshot.go b/gemini/snapshot.go new file mode 100644 index 0000000..d2c3ed7 --- /dev/null +++ b/gemini/snapshot.go @@ -0,0 +1,74 @@ +package gemini + +import ( + "database/sql/driver" + "encoding/json" + "fmt" + "gemini-grc/logging" + "strings" + + "github.com/guregu/null/v5" +) + +type LinkList []GeminiUrl + +func (l LinkList) Value() (driver.Value, error) { + return json.Marshal(l) +} + +func (l *LinkList) Scan(value interface{}) error { + if value == nil { + *l = nil + return nil + } + b, ok := value.([]byte) // Type assertion! Converts to []byte + if !ok { + return fmt.Errorf("failed to scan LinkList: expected []byte, got %T", value) + } + return json.Unmarshal(b, l) +} + +type Snapshot struct { + ID int `db:"id" json:"id,omitempty"` + UID string `db:"uid" json:"uid,omitempty"` + URL GeminiUrl `db:"url" json:"url,omitempty"` + Host string `db:"host" json:"host,omitempty"` + Timestamp null.Time `db:"timestamp" json:"timestamp,omitempty"` + MimeType null.String `db:"mimetype" json:"mimetype,omitempty"` + Data null.Value[[]byte] `db:"data" json:"data,omitempty"` // For non text/gemini files. + GemText null.String `db:"gemtext" json:"gemtext,omitempty"` // For text/gemini files. + Links *LinkList `db:"links" json:"links,omitempty"` + Lang null.String `db:"lang" json:"lang,omitempty"` + ResponseCode null.Int `db:"response_code" json:"code,omitempty"` // Gemini response status code.
+ Error null.String `db:"error" json:"error,omitempty"` // On network errors only +} + +func SnapshotToJSON(g Snapshot) string { + // Serialize the Person struct to JSON + jsonData, err := json.MarshalIndent(g, "", "\t") + if err != nil { + logging.LogError("Error serializing to JSON: %w", err) + } + return string(jsonData) +} + +func SnapshotFromJSON(input string) Snapshot { + var snapshot Snapshot + err := json.Unmarshal([]byte(input), &snapshot) + if err != nil { + logging.LogError("Error deserializing from JSON: %w", err) + } + return snapshot +} + +func ShouldPersistSnapshot(result *Snapshot) bool { + if !result.MimeType.Valid { + return false + } + if result.MimeType.String == "text/gemini" || + strings.HasPrefix(result.MimeType.String, "image/") || + strings.HasPrefix(result.MimeType.String, "text/") { + return true + } + return false +} diff --git a/gemini/worker.go b/gemini/worker.go new file mode 100644 index 0000000..5480182 --- /dev/null +++ b/gemini/worker.go @@ -0,0 +1,218 @@ +package gemini + +import ( + "database/sql" + "fmt" + "gemini-grc/config" + "gemini-grc/logging" + "gemini-grc/uid" + "runtime/debug" + "strings" + "time" + + "github.com/guregu/null/v5" + "github.com/jmoiron/sqlx" +) + +func SpawnWorkers(numOfWorkers int, db *sqlx.DB) { + logging.LogInfo("Spawning %d workers", numOfWorkers) + for i := 0; i < numOfWorkers; i++ { + go func(i int) { + for { + runWorker(i, db) + } + }(i) + } +} + +func printPoolIPs() { + fmt.Printf("%v", IpPool.IPs) +} + +func workOnSnapshot(id int, tx *sqlx.Tx, s *Snapshot) (err error) { + // Wrap errors with more info. + defer func() { + if err != nil { + err = fmt.Errorf("[%d] Worker Error: %w", id, err) + } + }() + + IPs, err := getHostIPAddresses(s.Host) + if err != nil { + s.Error = null.StringFrom("DNS Resolve error") + err = SaveSnapshotToDB(tx, s) + if err != nil { + return fmt.Errorf("[%d] DB Error: %w", id, err) + } + return nil + } + + // If the host's ip is in the pool, stop + // and add the url in the queue later. 
+ IpPool.Lock.RLock() + logging.LogDebug("[%d] [%s] Checking pool for IP", id, s.URL) + for _, ip := range IPs { + _, ok := IpPool.IPs[ip] + if ok { + logging.LogDebug("[%d] Another worker is visiting this host: %s", id, s.URL) + IpPool.Lock.RUnlock() + time.Sleep(1 * time.Second) // Avoid flood-retrying when few URLs remain + return nil + } + } + IpPool.Lock.RUnlock() + + AddIPsToPool(IPs) + + url := s.URL.String() + logging.LogDebug("[%d] Dialing %s", id, url) + Visit(s) + logging.LogDebug("[%d] Finished dialing.", id) + + go func() { + time.Sleep(5 * time.Second) + RemoveIPsFromPool(IPs) + }() + + if s.MimeType.Valid && s.MimeType.String == "text/gemini" { + logging.LogDebug("[%d] [%s] Processing", id, url) + s = ProcessGemini(s) + } + logging.LogDebug("[%d] Saving", id) + err = SaveSnapshotToDB(tx, s) + if err != nil { + return fmt.Errorf("[%d] DB Error: %w", id, err) + } + + // Store links + if s.Links != nil { + for _, link := range *s.Links { + newSnapshot := Snapshot{UID: uid.UID(), URL: link, Host: link.Hostname, Timestamp: null.TimeFrom(time.Now())} + if shouldPersistURL(tx, link) { + logging.LogDebug("[%d] Saving link %s", id, link) + err = SaveLinkToDB(tx, &newSnapshot) + if err != nil { + return fmt.Errorf("[%d] DB Error: %w", id, err) + } + } + } + } + return nil +} + +func shouldPersistURL(tx *sqlx.Tx, u GeminiUrl) bool { + if !strings.HasPrefix(u.String(), "gemini://") { + return false + } + query := `SELECT EXISTS(SELECT 1 FROM snapshots WHERE URL=$1)` + var exists bool + err := tx.Get(&exists, query, u.String()) + if err != nil { + fmt.Println("Error executing query:", err) + return false + } + return !exists +} + +// Select a random snapshot. +func GetRandomSnapshot(tx *sqlx.Tx) (*Snapshot, error) { + query := `SELECT * FROM snapshots + WHERE response_code IS NULL + AND error IS NULL + ORDER BY RANDOM() + LIMIT 1 + FOR UPDATE SKIP LOCKED` + // AND (timestamp < NOW() - INTERVAL '1 day' OR timestamp IS NULL) + var snapshot Snapshot + err := tx.Get(&snapshot, query) + if err != nil { + if err == sql.ErrNoRows { + // Handle the case where no rows were found + return nil, nil + } + // Handle other potential errors + return nil, err + } + return &snapshot, nil +} + +func GetRandomSnapshots(tx *sqlx.Tx) ([]Snapshot, error) { + query := ` + SELECT * FROM snapshots + WHERE response_code IS NULL + AND error IS NULL + ORDER BY RANDOM() + LIMIT $1 + FOR UPDATE SKIP LOCKED + ` + var snapshots []Snapshot + err := tx.Select(&snapshots, query, config.CONFIG.WorkerBatchSize) + if err != nil { + return nil, err + } + return snapshots, nil +} + +func GetRandomSnapshotsDistinctHosts(tx *sqlx.Tx) ([]Snapshot, error) { + query := ` + SELECT DISTINCT ON (host) * + FROM snapshots + WHERE response_code IS NULL + AND error IS NULL + ORDER BY host, RANDOM() + LIMIT $1 + ` + var snapshots []Snapshot + err := tx.Select(&snapshots, query, config.CONFIG.WorkerBatchSize) + if err != nil { + return nil, err + } + return snapshots, nil +} + +func runWorker(id int, db *sqlx.DB) { + // Start the transaction + tx, err := db.Beginx() + if err != nil { + logging.LogError("Failed to begin transaction: %w", err) + } + + defer func() { + err = tx.Commit() + if err != nil { + logging.LogError("[%d] Failed to commit transaction: %w", id, err) + tx.Rollback() + } + }() + + snapshots, err := GetRandomSnapshotsDistinctHosts(tx) + + if err != nil { + logging.LogError("[%d] Error retrieving snapshot: %w", id, err) + time.Sleep(10 * time.Second) + return + } else if len(snapshots) == 0 { + logging.LogInfo("[%d] No 
remaining snapshots to visit.", id) + time.Sleep(1 * time.Minute) + return + } + total := len(snapshots) + for i, s := range snapshots { + if InBlacklist(&s) { + logging.LogWarn("[%d] Ignoring %d/%d blacklisted URL %s", id, i+1, total, s.URL) + continue + } + logging.LogInfo("[%d] Starting %d/%d %s", id, i+1, total, s.URL) + err = workOnSnapshot(id, tx, &s) + if err != nil { + logging.LogError("[%d] [%s] Error %w", id, s.URL, err) + // TODO Remove panic and gracefully handle/log error + fmt.Printf("Error %s Stack trace:\n%s", err, debug.Stack()) + panic("ERROR encountered") + } + if s.Error.Valid { + logging.LogWarn("[%d] [%s] Error: %s", id, s.URL, s.Error.String) + } + logging.LogDebug("[%d] Done %d/%d.", id, i+1, total) + } + logging.LogInfo("[%d] Worker done.", id) +} diff --git a/go.mod b/go.mod index 467f629..1f53aad 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,18 @@ require ( ) require ( + github.com/guregu/null/v5 v5.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.7.1 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jmoiron/sqlx v1.4.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + golang.org/x/crypto v0.27.0 // indirect golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6 // indirect golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect ) diff --git a/go.sum b/go.sum index f5cf7b2..9f7f26a 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,54 @@ +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4= github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/guregu/null/v5 v5.0.0 h1:PRxjqyOekS11W+w/7Vfz6jgJE/BCwELWtgvOJzddimw= +github.com/guregu/null/v5 v5.0.0/go.mod h1:SjupzNy+sCPtwQTKWhUCqjhVCO69hpsl2QsZrWHjlwU= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jaevor/go-nanoid v1.4.0 h1:mPz0oi3CrQyEtRxeRq927HHtZCJAAtZ7zdy7vOkrvWs= github.com/jaevor/go-nanoid v1.4.0/go.mod h1:GIpPtsvl3eSBsjjIEFQdzzgpi50+Bo1Luk+aYlbJzlc= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod
h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6 h1:1wqE9dj9NpSm04INVsJhhEUzhuDVjbcyKH91sVyPATw= golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logging.go b/logging/logging.go similarity index 96% rename from logging.go rename to logging/logging.go index 0a54e83..3b8ec62 100644 --- a/logging.go +++ b/logging/logging.go @@ -1,4 +1,4 @@ -package main +package logging import ( "fmt" diff --git a/main.go b/main.go index 0692efa..ab2aab4 100644 --- a/main.go +++ b/main.go @@ -1,137 +1,56 @@ package main import ( + "gemini-grc/config" + "gemini-grc/gemini" + "gemini-grc/logging" "os" - "strings" + "os/signal" "sync" - "time" + "syscall" + + "github.com/jmoiron/sqlx" "github.com/rs/zerolog" zlog "github.com/rs/zerolog/log" ) -var CONFIG Config - var wg sync.WaitGroup 
func main() { wg.Add(1) - CONFIG = *getConfig() + config.CONFIG = *config.GetConfig() zerolog.TimeFieldFormat = zerolog.TimeFormatUnix - zerolog.SetGlobalLevel(CONFIG.logLevel) + zerolog.SetGlobalLevel(config.CONFIG.LogLevel) zlog.Logger = zlog.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "[2006-01-02 15:04:05]"}) if err := runApp(); err != nil { - LogError("Application error: %w", err) + logging.LogError("Application error: %w", err) os.Exit(1) } } func runApp() error { - // urls := []string{"gemini://smol.gr"} - // urls := []string{"gemini://gemini.circumlunar.space/users/solderpunk/gemlog/orphans-of-netscape.gmi"} // Test 31 redirect - // urls := []string{"gemini://zaibatsu.circumlunar.space/~solderpunk/gemlog/orphans-of-netscape.gmi"} - // urls := []string{"gemini://farcaster.net/berlin/dared.jpg"} - // urls := []string{"gemini://smol.gr/media/amstrad_cpc_6128.jpg", "https://go.dev/blog/go-brand/Go-Logo/PNG/Go-Logo_Blue.png"} - urls := []string{"gemini://tlgs.one/", "gemini://gmi.noulin.net/", "gemini://warmedal.se/~antenna/"} + logging.LogInfo("Starting up. Press Ctrl+C to exit") + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - queue := make(chan string, 1000) - results := make(chan Snapshot, 100) - done := make(chan struct{}) + db := gemini.ConnectToDB() - go spawnStatsReport(queue, results) - go resultsHandler(queue, results) - spawnWorkers(CONFIG.numOfWorkers, queue, results) + // !!! DANGER !!! + // Removes all rows and adds some seed URLs. + // populateDB(db) - for _, url := range urls { - queue <- url - } - <-done + defer func(db *sqlx.DB) { + err := db.Close() + if err != nil { + // TODO properly log & hangle error + panic(err) + } + }(db) + + go gemini.SpawnWorkers(config.CONFIG.NumOfWorkers, db) + + <-sigs + logging.LogInfo("Received SIGINT or SIGTERM signal, exiting") return nil } - -func spawnStatsReport(queue chan string, results chan Snapshot) { - ticker := time.NewTicker(time.Duration(time.Second * 10)) - defer ticker.Stop() - for range ticker.C { - LogInfo("Queue length: %d", len(queue)) - LogInfo("Results length: %d", len(results)) - } -} - -func spawnWorkers(numOfWorkers int, queue <-chan string, results chan Snapshot) { - LogInfo("Spawning %d workers", numOfWorkers) - for i := 0; i < numOfWorkers; i++ { - go func(i int) { - worker(i, queue, results) - }(i) - } -} - -func resultsHandler(queue chan string, results <-chan Snapshot) { - for result := range results { - if result.Error != nil { - LogError("[%s] %w", result.URL, result.Error) - } else { - LogDebug("[%s] Done", result.URL) - for _, link := range result.Links { - if strings.HasPrefix(link.Full, "gemini://") { - go func(link GeminiUrl) { - queue <- link.Full - // fmt.Printf("Sent %s to queue\n", link.Full) - }(link) - } - } - // if result.MimeType == "text/gemini" { - // result.Data = "" - // fmt.Printf(SnapshotToJSON(result)) - // } - } - } -} - -func worker(id int, queue <-chan string, results chan Snapshot) { - for url := range queue { - if !shouldVisit(url) { - LogInfo("Skipping %s", url) - continue - } - LogInfo("Worker %d visiting %s", id, url) - result, err := Visit(url) - if err != nil { - LogError("[%s] %w", url, err) - continue - } - // If we encountered an error when - // visiting, skip processing - if result.Error != nil { - results <- *result - continue - } - LogDebug("Worker %d processing %s", id, url) - if result.MimeType == "text/gemini" { - result = ProcessGemini(result) - } - if shouldPersist(result) { - LogDebug("Worker %d saving %s", id, 
url) - SaveSnapshot(CONFIG.rootPath, result) - } - results <- *result - // time.Sleep(time.Duration(rand.IntN(5)) * time.Second) - } -} - -func shouldVisit(url string) bool { - if !strings.HasPrefix(url, "gemini://") { - return false - } - return true -} - -func shouldPersist(result *Snapshot) bool { - if result.MimeType == "text/gemini" || - strings.HasPrefix(result.MimeType, "image/") || - strings.HasPrefix(result.MimeType, "text/") { - return true - } - return false -} diff --git a/network.go b/network.go deleted file mode 100644 index 9274667..0000000 --- a/network.go +++ /dev/null @@ -1,117 +0,0 @@ -package main - -import ( - "crypto/tls" - "fmt" - "io" - "regexp" - "slices" - "strconv" - "time" -) - -func Visit(url string) (snapshot *Snapshot, err error) { - snapshot = &Snapshot{Timestamp: time.Now(), UID: UID()} - - geminiUrl, err := ParseUrl(url, "") - if err != nil { - snapshot.Error = fmt.Errorf("[%s] %w", url, err) - return snapshot, nil - } - snapshot.URL = *geminiUrl - - LogDebug("[%s] Connecting", geminiUrl) - - // Establish a TLS connection - tlsConfig := &tls.Config{ - InsecureSkipVerify: true, - } - conn, err := tls.Dial("tcp", fmt.Sprintf("%s:%d", geminiUrl.Hostname, geminiUrl.Port), tlsConfig) - if err != nil { - snapshot.Error = err - return snapshot, nil - } - // Defer properly: Also handle possible - // error of conn.Close() - defer func() { - err := conn.Close() - if err != nil { - snapshot.Error = fmt.Errorf("[%s] Closing connection error, ignoring: %w", snapshot.URL.String(), err) - } - }() - - // Read data from the connection - conn.SetReadDeadline(time.Now().Add(time.Duration(CONFIG.responseTimeout) * time.Second)) - buf := make([]byte, 4096) - var data []byte - - // Write Gemini request to get response. - // paths := []string{"/", ".", ""} - // if slices.Contains(paths, geminiUrl.Path) || strings.HasSuffix(geminiUrl.Path, "gmi") { - conn.Write([]byte(fmt.Sprintf("%s\r\n", geminiUrl.String()))) - // } - - // Read response bytes in len(buf) byte chunks - for { - n, err := conn.Read(buf) - if n > 0 { - data = append(data, buf[:n]...) 
- } - if len(data) > CONFIG.maxResponseSize { - snapshot.Error = fmt.Errorf("[%s] Response size exceeded maximum of %d bytes", url, CONFIG.maxResponseSize) - return snapshot, nil - } - if err != nil { - if err == io.EOF { - break - } else { - snapshot.Error = fmt.Errorf("[%s] %w", url, err) - return snapshot, nil - } - } - } - LogDebug("[%s] Received %d bytes", geminiUrl.String(), len(data)) - err = processResponse(snapshot, data) - if err != nil { - snapshot.Error = fmt.Errorf("%w", err) - } - return snapshot, nil -} - -func processResponse(snapshot *Snapshot, data []byte) error { - headers, body, err := getHeadersAndData(data) - if err != nil { - return err - } - code, mimeType, lang := getMimeTypeAndLang(headers) - snapshot.ResponseCode, snapshot.MimeType, snapshot.Lang, snapshot.Data = code, mimeType, lang, body - if mimeType == "text/gemini" { - snapshot.GemText = string(body) - } - return nil -} - -func getHeadersAndData(data []byte) (string, []byte, error) { - firstLineEnds := slices.Index(data, '\n') - if firstLineEnds == -1 { - return "", nil, fmt.Errorf("Could not parse response header") - } - firstLine := data[:firstLineEnds] - rest := data[firstLineEnds+1:] - return string(firstLine), rest, nil -} - -func getMimeTypeAndLang(headers string) (int, string, string) { - re := regexp.MustCompile(`^(\d+)\s+([a-zA-Z0-9/\-+]+)[;\s]+(lang=([a-zA-Z0-9-]+))?`) - matches := re.FindStringSubmatch(headers) - if matches == nil || len(matches) <= 1 { - return 0, "", "" - } - code, err := strconv.Atoi(matches[1]) - if err != nil { - return 0, "", "" - } - mimeType := matches[2] - lang := matches[4] - return code, mimeType, lang -} diff --git a/snapshot.go b/snapshot.go deleted file mode 100644 index ad94368..0000000 --- a/snapshot.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "time" -) - -type Snapshot struct { - UID string `json:"uid,omitempty"` - URL GeminiUrl `json:"url,omitempty"` - Timestamp time.Time `json:"timestamp,omitempty"` - MimeType string `json:"mimetype,omitempty"` - Data []byte `json:"data,omitempty"` - GemText string `json:"gemtext,omitempty"` - Links []GeminiUrl `json:"links,omitempty"` - Lang string `json:"lang,omitempty"` - // Gemini status code - ResponseCode int `json:"code,omitempty"` - // On network errors, for Gemini server errors - // we have ResponseCode above. 
- Error error `json:"error,omitempty"` -} - -func (u Snapshot) String() string { - return fmt.Sprintf( - "[%s] %s %s %s %d %s %s %s", - u.UID, u.URL, u.Timestamp, u.Links, u.ResponseCode, u.MimeType, u.Lang, u.Error, - ) -} - -func SnapshotToJSON(g Snapshot) string { - // Serialize the Person struct to JSON - jsonData, err := json.Marshal(g) - if err != nil { - LogError("Error serializing to JSON: %w", err) - } - return string(jsonData) -} - -func SnapshotFromJSON(input string) Snapshot { - var snapshot Snapshot - err := json.Unmarshal([]byte(input), &snapshot) - if err != nil { - LogError("Error deserializing from JSON: %w", err) - } - return snapshot -} diff --git a/uid.go b/uid/uid.go similarity index 81% rename from uid.go rename to uid/uid.go index 0df6238..d0af93f 100644 --- a/uid.go +++ b/uid/uid.go @@ -1,4 +1,4 @@ -package main +package uid import ( nanoid "github.com/jaevor/go-nanoid" @@ -6,7 +6,7 @@ import ( func UID() string { // Missing o,O and l - uid, err := nanoid.CustomASCII("abcdefghijkmnpqrstuvwxyzABCDEFGHIJKLMNPQRSTUVWXYZ0123456789", 18) + uid, err := nanoid.CustomASCII("abcdefghijkmnpqrstuvwxyzABCDEFGHIJKLMNPQRSTUVWXYZ0123456789", 20) if err != nil { panic(err) }