Lots of features, first version that reliably crawls Geminispace.
- [x] Concurrent downloading with workers - [x] Concurrent connection limit per host - [x] URL Blacklist - [x] Save image/* and text/* files - [x] Configuration via environment variables - [x] Storing snapshots in PostgreSQL - [x] Proper response header & body UTF-8 and format validation . . .
This commit is contained in:
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1,6 +1,8 @@
|
||||
.idea
|
||||
**/.#*
|
||||
**/*~
|
||||
/run.sh
|
||||
/cmd
|
||||
/db/initdb.sql
|
||||
/run*.sh
|
||||
/gemini-grc
|
||||
/snaps
|
||||
|
||||
28
README.md
28
README.md
@@ -2,21 +2,29 @@
|
||||
|
||||
A Gemini crawler.
|
||||
|
||||
## Done
|
||||
- [x] Concurrent downloading with workers
|
||||
- [x] Concurrent connection limit per host
|
||||
- [x] URL Blacklist
|
||||
- [x] Save image/* and text/* files
|
||||
- [x] Configuration via environment variables
|
||||
- [x] Storing snapshots in PostgreSQL
|
||||
- [x] Proper response header & body UTF-8 and format validation
|
||||
|
||||
## TODO
|
||||
- [ ] Save image/* and text/* files
|
||||
- [ ] Wide events logging
|
||||
- [ ] Handle URLs that need presentation of a TLS cert? Like astrobotany
|
||||
+ [ ] Probably have a common "grc" cert for all
|
||||
- [ ] Follow robots.txt gemini://geminiprotocol.net/docs/companion/
|
||||
- [ ] Test with gemini://alexey.shpakovsky.ru/maze
|
||||
- [ ] Proper handling of all response codes
|
||||
- [ ] Handle 3X redirects properly
|
||||
- [ ] Handle URLs that need presentation of a TLS cert, like astrobotany
|
||||
+ [ ] Probably have a common "grc" cert for all?
|
||||
- [ ] Proper input and response validations:
|
||||
+ [ ] When making a request, the URI MUST NOT exceed 1024 bytes
|
||||
+ [ ] Response headers MUST be UTF-8 encoded text and MUST NOT begin with the Byte Order Mark U+FEFF.
|
||||
- [ ] Proper handling of all response codes
|
||||
- [ ] Proper validation (or logging) of invalid/expired TLS certs?
|
||||
- [ ] Subscribe to gemini pages? gemini://geminiprotocol.net/docs/companion/
|
||||
- [ ] Follow robots.txt gemini://geminiprotocol.net/docs/companion/
|
||||
- [ ] Subscriptions to gemini pages? gemini://geminiprotocol.net/docs/companion/
|
||||
|
||||
## TODO later
|
||||
## TODO for later
|
||||
- [ ] Add other protocols
|
||||
+ [ ] Gopher
|
||||
+ [ ] Scroll gemini://auragem.letz.dev/devlog/20240316.gmi
|
||||
+ [ ] Spartan
|
||||
+ [ ] Nex
|
||||
|
||||
5
blacklist.txt
Normal file
5
blacklist.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
gemini://gemi.dev/cgi-bin/waffle.cgi
|
||||
gemini://alexey.shpakovsky.ru/maze
|
||||
gemini://kennedy.gemi.dev
|
||||
gemini://musicbrainz.uploadedlobster.com
|
||||
gemini://gemini.bunburya.eu/remini
|
||||
@@ -1,4 +1,4 @@
|
||||
package main
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -9,19 +9,23 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
logLevel zerolog.Level
|
||||
LogLevel zerolog.Level
|
||||
rootPath string
|
||||
numOfWorkers int
|
||||
maxResponseSize int
|
||||
responseTimeout int
|
||||
MaxResponseSize int
|
||||
NumOfWorkers int
|
||||
ResponseTimeout int
|
||||
WorkerBatchSize int
|
||||
}
|
||||
|
||||
func getConfig() *Config {
|
||||
var CONFIG Config
|
||||
|
||||
func GetConfig() *Config {
|
||||
var config Config
|
||||
for _, envVar := range []string{
|
||||
"LOG_LEVEL",
|
||||
"ROOT_PATH",
|
||||
"NUM_OF_WORKERS",
|
||||
"WORKER_BATCH_SIZE",
|
||||
"MAX_RESPONSE_SIZE",
|
||||
"RESPONSE_TIMEOUT",
|
||||
} {
|
||||
@@ -37,7 +41,7 @@ func getConfig() *Config {
|
||||
fmt.Fprintf(os.Stderr, "Invalid LOG_LEVEL value\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
config.logLevel = logLevel
|
||||
config.LogLevel = logLevel
|
||||
}
|
||||
case "ROOT_PATH":
|
||||
{
|
||||
@@ -49,7 +53,16 @@ func getConfig() *Config {
|
||||
fmt.Fprintf(os.Stderr, "Invalid NUM_OF_WORKERS value\n")
|
||||
os.Exit(1)
|
||||
} else {
|
||||
config.numOfWorkers = numOfWorkers
|
||||
config.NumOfWorkers = numOfWorkers
|
||||
}
|
||||
}
|
||||
case "WORKER_BATCH_SIZE":
|
||||
{
|
||||
if workerBatchSize, err := strconv.Atoi(env); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Invalid WORKER_BATCH_SIZE value\n")
|
||||
os.Exit(1)
|
||||
} else {
|
||||
config.WorkerBatchSize = workerBatchSize
|
||||
}
|
||||
}
|
||||
case "MAX_RESPONSE_SIZE":
|
||||
@@ -58,7 +71,7 @@ func getConfig() *Config {
|
||||
fmt.Fprintf(os.Stderr, "Invalid MAX_RESPONSE_SIZE value\n")
|
||||
os.Exit(1)
|
||||
} else {
|
||||
config.maxResponseSize = maxResponseSize
|
||||
config.MaxResponseSize = maxResponseSize
|
||||
}
|
||||
}
|
||||
case "RESPONSE_TIMEOUT":
|
||||
@@ -67,7 +80,7 @@ func getConfig() *Config {
|
||||
fmt.Fprintf(os.Stderr, "Invalid RESPONSE_TIMEOUT value\n")
|
||||
os.Exit(1)
|
||||
} else {
|
||||
config.responseTimeout = val
|
||||
config.ResponseTimeout = val
|
||||
}
|
||||
}
|
||||
}
|
||||
16
db/backup-table.sql
Normal file
16
db/backup-table.sql
Normal file
@@ -0,0 +1,16 @@
|
||||
BEGIN;
|
||||
|
||||
-- Increase statement timeout
|
||||
SET statement_timeout = '10min';
|
||||
|
||||
-- Step 1: Create a new table with the same schema
|
||||
CREATE TABLE backup (LIKE snapshots INCLUDING ALL);
|
||||
|
||||
-- Step 2: Copy data from the old table to the new one
|
||||
INSERT INTO backup SELECT * FROM snapshots;
|
||||
|
||||
-- (Optional) Step 3: Truncate the original table if you are moving the data
|
||||
-- TRUNCATE TABLE snapshots;
|
||||
|
||||
-- Commit the transaction if everything went well
|
||||
COMMIT;
|
||||
26
db/delete-dups.sql
Normal file
26
db/delete-dups.sql
Normal file
@@ -0,0 +1,26 @@
|
||||
-- Explanation:
|
||||
|
||||
-- WITH DuplicateSnapshots AS:
|
||||
-- This is a Common Table Expression (CTE) that selects all rows from the snapshots table.
|
||||
-- ROW_NUMBER() OVER (PARTITION BY url ORDER BY id): This assigns a unique row number to each row with the same url. The PARTITION BY url groups the rows by url, and ORDER BY id ensures that the row with the smallest id is given row_num = 1.
|
||||
-- DELETE FROM snapshots WHERE id IN:
|
||||
-- The DELETE statement deletes rows from the snapshots table where the id is in the result of the subquery.
|
||||
-- WHERE row_num > 1:
|
||||
-- In the subquery, we select only rows where row_num > 1, which means only the duplicate rows (since row_num = 1 is the one row we want to keep).
|
||||
|
||||
-- Result:
|
||||
|
||||
-- This query will delete all duplicate rows from the snapshots table, keeping only the row with the smallest id for each url.
|
||||
-- If multiple rows share the same url, only the first one (based on id) will be retained.
|
||||
|
||||
WITH DuplicateSnapshots AS (
|
||||
SELECT id,
|
||||
ROW_NUMBER() OVER (PARTITION BY url ORDER BY id) AS row_num
|
||||
FROM snapshots
|
||||
)
|
||||
DELETE FROM snapshots
|
||||
WHERE id IN (
|
||||
SELECT id
|
||||
FROM DuplicateSnapshots
|
||||
WHERE row_num > 1
|
||||
);
|
||||
5
db/host_stats.sql
Normal file
5
db/host_stats.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
SELECT host, COUNT(*) AS row_count
|
||||
FROM snapshots
|
||||
GROUP BY host
|
||||
ORDER BY row_count DESC
|
||||
LIMIT 10;
|
||||
45
db/initdb.sql
Normal file
45
db/initdb.sql
Normal file
@@ -0,0 +1,45 @@
|
||||
-- DB creation and users
|
||||
CREATE USER gemini;
|
||||
ALTER USER gemini WITH PASSWORD 'gemini';
|
||||
CREATE DATABASE gemini;
|
||||
GRANT ALL PRIVILEGES ON DATABASE gemini TO gemini;
|
||||
ALTER DATABASE gemini OWNER TO gemini;
|
||||
GRANT ALL PRIVILEGES ON SCHEMA public TO gemini;
|
||||
GRANT ALL PRIVILEGES ON gemini TO gemini;
|
||||
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO gemini;
|
||||
|
||||
-- Extensions
|
||||
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
|
||||
|
||||
\c gemini
|
||||
|
||||
-- Tables
|
||||
DROP TABLE IF EXISTS snapshots;
|
||||
|
||||
CREATE TABLE snapshots (
|
||||
id SERIAL PRIMARY KEY,
|
||||
uid TEXT NOT NULL UNIQUE,
|
||||
url TEXT NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
mimetype TEXT,
|
||||
data BYTEA,
|
||||
gemtext TEXT,
|
||||
links JSONB,
|
||||
lang TEXT,
|
||||
response_code INTEGER,
|
||||
error TEXT
|
||||
);
|
||||
|
||||
ALTER TABLE snapshots OWNER TO gemini;
|
||||
|
||||
CREATE INDEX idx_uid ON snapshots (uid);
|
||||
CREATE INDEX idx_url ON snapshots (url);
|
||||
CREATE INDEX idx_timestamp ON snapshots (timestamp);
|
||||
CREATE INDEX idx_mimetype ON snapshots (mimetype);
|
||||
CREATE INDEX idx_lang ON snapshots (lang);
|
||||
CREATE INDEX idx_response_code ON snapshots (response_code);
|
||||
CREATE INDEX idx_error ON snapshots (error);
|
||||
CREATE INDEX idx_host ON snapshots (host);
|
||||
CREATE INDEX idx_response_code_error_nulls ON snapshots (response_code, error) WHERE response_code IS NULL AND error IS NULL;
|
||||
99
db/migrate1_host.go
Normal file
99
db/migrate1_host.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gemini-grc/gemini"
|
||||
"os"
|
||||
|
||||
_ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
func checkIfDone() bool { return true }
|
||||
|
||||
// Populates the `host` field
|
||||
func main() {
|
||||
db := connectToDB()
|
||||
|
||||
if checkIfDone() {
|
||||
fmt.Println("Migration already applied")
|
||||
return
|
||||
}
|
||||
|
||||
count := 0
|
||||
for {
|
||||
// Start the transaction
|
||||
tx, err := db.Beginx()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT * FROM snapshots
|
||||
WHERE host IS NULL
|
||||
LIMIT 5000
|
||||
`
|
||||
var snapshots []gemini.Snapshot
|
||||
err = tx.Select(&snapshots, query)
|
||||
if len(snapshots) == 0 {
|
||||
fmt.Println("Done!")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
err := tx.Rollback()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
for _, s := range snapshots {
|
||||
s.Host = s.URL.Hostname
|
||||
fmt.Println(count, s.UID, s.URL.Hostname)
|
||||
err := gemini.SaveSnapshotToDB(tx, &s)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
err := tx.Rollback()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
count += 1
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
err := tx.Rollback()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func connectToDB() *sqlx.DB {
|
||||
connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s",
|
||||
os.Getenv("PG_USER"),
|
||||
os.Getenv("PG_PASSWORD"),
|
||||
os.Getenv("PG_HOST"),
|
||||
os.Getenv("PG_PORT"),
|
||||
os.Getenv("PG_DATABASE"),
|
||||
)
|
||||
|
||||
// Create a connection pool
|
||||
db, err := sqlx.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to connect to database with URL %s: %v\n", connStr, err))
|
||||
}
|
||||
db.SetMaxOpenConns(20)
|
||||
err = db.Ping()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to ping database: %v\n", err))
|
||||
}
|
||||
|
||||
fmt.Println("Connected to database")
|
||||
return db
|
||||
}
|
||||
14
db/migrate1_host.sh
Executable file
14
db/migrate1_host.sh
Executable file
@@ -0,0 +1,14 @@
|
||||
#!/bin/sh
|
||||
set -eu
|
||||
|
||||
MAX_RESPONSE_SIZE=10485760 \
|
||||
LOG_LEVEL=info \
|
||||
ROOT_PATH=./snaps \
|
||||
RESPONSE_TIMEOUT=10 \
|
||||
NUM_OF_WORKERS=5 \
|
||||
PG_DATABASE=gemini \
|
||||
PG_HOST=127.0.0.1 \
|
||||
PG_PORT=5433 \
|
||||
PG_USER=gemini \
|
||||
PG_PASSWORD=gemini \
|
||||
go run ./migrate1_host.go
|
||||
12
db/pg_stats.sql
Normal file
12
db/pg_stats.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
SELECT
|
||||
query,
|
||||
total_exec_time AS total_time, -- total time spent on the query execution
|
||||
calls, -- number of times the query has been called
|
||||
mean_exec_time AS mean_time -- average time per execution
|
||||
-- max_exec_time AS max_time -- maximum time taken for any single execution
|
||||
FROM
|
||||
pg_stat_statements
|
||||
ORDER BY
|
||||
total_exec_time DESC -- order by total execution time
|
||||
LIMIT 5;
|
||||
|
||||
1
db/pg_stats_reset.sql
Normal file
1
db/pg_stats_reset.sql
Normal file
@@ -0,0 +1 @@
|
||||
SELECT pg_stat_statements_reset();
|
||||
20
db/populateDB.go
Normal file
20
db/populateDB.go
Normal file
@@ -0,0 +1,20 @@
|
||||
// func PopulateDB(db *sqlx.DB) {
|
||||
// // Delete all rows in the snapshots table
|
||||
// db.MustExec("TRUNCATE snapshots;")
|
||||
|
||||
// // Prepare the query for inserting a snapshot with uid, url, and timestamp
|
||||
// query := `INSERT INTO snapshots(uid, url, timestamp)
|
||||
// VALUES ($1, $2, $3)`
|
||||
|
||||
// // Calculate the timestamp for 2 days ago
|
||||
// timestamp := time.Now().Add(-48 * time.Hour)
|
||||
|
||||
// db.MustExec(query, uid.UID(), "gemini://geminiprotocol.net/", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://warmedal.se/~antenna", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://skyjake.fi/~Cosmos/", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://gemini.circumlunar.space/capcom/", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://auragem.letz.dev/", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://gemplex.space/", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://kennedy.gemi.dev/", timestamp)
|
||||
// db.MustExec(query, uid.UID(), "gemini://tlgs.one/", timestamp)
|
||||
// }
|
||||
9
db/restore-table.sql
Normal file
9
db/restore-table.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
BEGIN;
|
||||
|
||||
SET statement_timeout = '10min';
|
||||
|
||||
TRUNCATE TABLE snapshots;
|
||||
|
||||
INSERT INTO snapshots SELECT * FROM backup;
|
||||
|
||||
COMMIT;
|
||||
9
db/show-dups.sql
Normal file
9
db/show-dups.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
WITH DuplicateSnapshots AS (
|
||||
SELECT id,
|
||||
url,
|
||||
ROW_NUMBER() OVER (PARTITION BY url ORDER BY id) AS row_num
|
||||
FROM snapshots
|
||||
)
|
||||
SELECT *
|
||||
FROM DuplicateSnapshots
|
||||
WHERE row_num > 1;
|
||||
5
db/stats.sql
Normal file
5
db/stats.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
SELECT
|
||||
COUNT(CASE WHEN response_code IS NOT NULL AND error IS NULL THEN 1 END) AS "Visited",
|
||||
COUNT(CASE WHEN response_code IS NULL THEN 1 END) AS "Pending",
|
||||
COUNT(CASE WHEN error IS NOT NULL THEN 1 END) AS "Errors"
|
||||
FROM snapshots;
|
||||
8
error_urls.txt
Normal file
8
error_urls.txt
Normal file
@@ -0,0 +1,8 @@
|
||||
// body with null byte
|
||||
gemini://kennedy.gemi.dev/archive/cached?url=gemini://spam.works/mirrors/textfiles/fun/consult.how&t=638427244900000000&raw=False
|
||||
|
||||
// has invalid url
|
||||
gemini://tlgs.one/known-hosts
|
||||
|
||||
// Needs SNI TLS info (our bug)
|
||||
gemini://hanzbrix.pollux.casa/gemlog/20241002.gmi
|
||||
18
gemini/blacklist.go
Normal file
18
gemini/blacklist.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package gemini
|
||||
|
||||
import "strings"
|
||||
|
||||
var Blacklist *[]string
|
||||
|
||||
func InBlacklist(s *Snapshot) bool {
|
||||
if Blacklist == nil {
|
||||
data := ReadLines("blacklist.txt")
|
||||
Blacklist = &data
|
||||
}
|
||||
for _, l := range *Blacklist {
|
||||
if strings.HasPrefix(s.URL.String(), l) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
32
gemini/connectionPool.go
Normal file
32
gemini/connectionPool.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"gemini-grc/logging"
|
||||
)
|
||||
|
||||
var IpPool IpAddressPool = IpAddressPool{IPs: make(map[string]int)}
|
||||
|
||||
func AddIPsToPool(IPs []string) {
|
||||
IpPool.Lock.Lock()
|
||||
for _, ip := range IPs {
|
||||
logging.LogDebug("Adding %s to pool", ip)
|
||||
IpPool.IPs[ip]++
|
||||
}
|
||||
IpPool.Lock.Unlock()
|
||||
}
|
||||
|
||||
func RemoveIPsFromPool(IPs []string) {
|
||||
IpPool.Lock.Lock()
|
||||
for _, ip := range IPs {
|
||||
_, ok := IpPool.IPs[ip]
|
||||
if ok {
|
||||
logging.LogDebug("Removing %s from pool", ip)
|
||||
if IpPool.IPs[ip] == 1 {
|
||||
delete(IpPool.IPs, ip)
|
||||
} else {
|
||||
IpPool.IPs[ip]--
|
||||
}
|
||||
}
|
||||
}
|
||||
IpPool.Lock.Unlock()
|
||||
}
|
||||
@@ -1,7 +1,8 @@
|
||||
package main
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gemini-grc/logging"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
@@ -61,7 +62,7 @@ func calcFilePath(rootPath, urlPath string) (string, error) {
|
||||
return finalPath, nil
|
||||
}
|
||||
|
||||
func SaveSnapshot(rootPath string, s *Snapshot) {
|
||||
func SaveToFile(rootPath string, s *Snapshot, done chan struct{}) {
|
||||
parentPath := path.Join(rootPath, s.URL.Hostname)
|
||||
urlPath := s.URL.Path
|
||||
// If path is empty, add `index.gmi` as the file to save
|
||||
@@ -76,17 +77,37 @@ func SaveSnapshot(rootPath string, s *Snapshot) {
|
||||
|
||||
finalPath, err := calcFilePath(parentPath, urlPath)
|
||||
if err != nil {
|
||||
LogError("Error saving %s: %w", s.URL, err)
|
||||
logging.LogError("Error saving %s: %w", s.URL, err)
|
||||
return
|
||||
}
|
||||
// Ensure the directory exists
|
||||
dir := filepath.Dir(finalPath)
|
||||
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
|
||||
LogError("Failed to create directory: %w", err)
|
||||
logging.LogError("Failed to create directory: %w", err)
|
||||
return
|
||||
}
|
||||
err = os.WriteFile(finalPath, []byte((*s).Data), 0666)
|
||||
if err != nil {
|
||||
LogError("Error saving %s: %w", s.URL.Full, err)
|
||||
if s.MimeType.Valid && s.MimeType.String == "text/gemini" {
|
||||
err = os.WriteFile(finalPath, (*s).Data.V, 0666)
|
||||
} else {
|
||||
err = os.WriteFile(finalPath, []byte((*s).GemText.String), 0666)
|
||||
}
|
||||
if err != nil {
|
||||
logging.LogError("Error saving %s: %w", s.URL.Full, err)
|
||||
}
|
||||
close(done)
|
||||
}
|
||||
|
||||
func ReadLines(path string) []string {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to read blacklist file: %s", err))
|
||||
}
|
||||
lines := strings.Split(string(data), "\n")
|
||||
// Remove last line if empty
|
||||
// (happens when file ends with '\n')
|
||||
if lines[len(lines)-1] == "" {
|
||||
lines = lines[:len(lines)-1]
|
||||
}
|
||||
logging.LogInfo("Loaded %d blacklist URLs", len(lines))
|
||||
return lines
|
||||
}
|
||||
@@ -1,13 +1,36 @@
|
||||
package main
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"gemini-grc/logging"
|
||||
"net/url"
|
||||
go_url "net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func isGeminiURL(url string) bool {
|
||||
_, err := go_url.Parse(url)
|
||||
if err != nil {
|
||||
logging.LogWarn("[%s] Invalid URL: %v", url, err)
|
||||
return false
|
||||
}
|
||||
return strings.HasPrefix(url, "gemini://")
|
||||
}
|
||||
|
||||
func parseLinks(s Snapshot, queue chan string) {
|
||||
for _, link := range *s.Links {
|
||||
if strings.HasPrefix(link.Full, "gemini://") {
|
||||
go func(link GeminiUrl) {
|
||||
// fmt.Printf("LINK: %s\n", link)
|
||||
queue <- link.Full
|
||||
}(link)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkGeminiStatusCode(code int) error {
|
||||
switch {
|
||||
case code == 20:
|
||||
@@ -29,21 +52,26 @@ func checkGeminiStatusCode(code int) error {
|
||||
|
||||
func ProcessGemini(snapshot *Snapshot) *Snapshot {
|
||||
// Grab link lines
|
||||
linkLines := ExtractLinkLines(snapshot.GemText)
|
||||
LogDebug("[%s] Found %d links", snapshot.URL.String(), len(linkLines))
|
||||
linkLines := ExtractLinkLines(snapshot.GemText.String)
|
||||
logging.LogDebug("[%s] Found %d links", snapshot.URL.String(), len(linkLines))
|
||||
|
||||
// Normalize URLs in links, and store them in snapshot
|
||||
for _, line := range linkLines {
|
||||
normalizedLink, descr, error := NormalizeLink(line, snapshot.URL.String())
|
||||
if error != nil {
|
||||
LogError("[%s] Invalid link URL %w", snapshot.URL.String(), error)
|
||||
logging.LogWarn("Cannot normalize URL in line '%s': %v", line, error)
|
||||
continue
|
||||
}
|
||||
geminiUrl, error := ParseUrl(normalizedLink, descr)
|
||||
if error != nil {
|
||||
LogError("[%s] Unparseable gemini link %w", snapshot.URL.String(), error)
|
||||
logging.LogWarn("Cannot parse URL in link '%s': %v", line, error)
|
||||
continue
|
||||
}
|
||||
if snapshot.Links == nil {
|
||||
snapshot.Links = &LinkList{*geminiUrl}
|
||||
} else {
|
||||
*snapshot.Links = append(*snapshot.Links, *geminiUrl)
|
||||
}
|
||||
snapshot.Links = append(snapshot.Links, *geminiUrl)
|
||||
}
|
||||
return snapshot
|
||||
}
|
||||
@@ -85,7 +113,7 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin
|
||||
// Parse the current URL
|
||||
baseURL, err := url.Parse(currentURL)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("invalid current URL: %v", err)
|
||||
return "", "", fmt.Errorf("Invalid current URL: %v", err)
|
||||
}
|
||||
|
||||
// Regular expression to extract the URL part from a link line
|
||||
@@ -99,7 +127,7 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin
|
||||
}
|
||||
|
||||
originalURLStr := matches[1]
|
||||
decodedURLStr, err := url.QueryUnescape(originalURLStr)
|
||||
_, err = url.QueryUnescape(originalURLStr)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("Error decoding URL: %w", err)
|
||||
}
|
||||
@@ -110,10 +138,10 @@ func NormalizeLink(linkLine string, currentURL string) (link string, descr strin
|
||||
}
|
||||
|
||||
// Parse the URL from the link line
|
||||
parsedURL, err := url.Parse(decodedURLStr)
|
||||
parsedURL, err := url.Parse(originalURLStr)
|
||||
if err != nil {
|
||||
// If URL parsing fails, return an error
|
||||
return "", "", fmt.Errorf("Invalid URL in link line '%s': %v", decodedURLStr, err)
|
||||
return "", "", fmt.Errorf("Invalid URL '%s': %v", originalURLStr, err)
|
||||
}
|
||||
|
||||
// Resolve relative URLs against the base URL
|
||||
@@ -1,7 +1,9 @@
|
||||
package main
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gemini-grc/logging"
|
||||
)
|
||||
|
||||
type GeminiUrl struct {
|
||||
@@ -13,6 +15,24 @@ type GeminiUrl struct {
|
||||
Full string `json:"full,omitempty"`
|
||||
}
|
||||
|
||||
func (g *GeminiUrl) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
// Clear the fields in the current GeminiUrl object (not the pointer itself)
|
||||
*g = GeminiUrl{}
|
||||
return nil
|
||||
}
|
||||
b, ok := value.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to scan GeminiUrl: expected string, got %T", value)
|
||||
}
|
||||
parsedUrl, err := ParseUrl(b, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*g = *parsedUrl
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u GeminiUrl) String() string {
|
||||
return u.Full
|
||||
// return fmt.Sprintf("%s://%s:%d%s", u.Protocol, u.Hostname, u.Port, u.Path)
|
||||
@@ -22,7 +42,7 @@ func GeminiUrltoJSON(g GeminiUrl) string {
|
||||
// Serialize the Person struct to JSON
|
||||
jsonData, err := json.Marshal(g)
|
||||
if err != nil {
|
||||
LogError("Error serializing to JSON: %w", err)
|
||||
logging.LogError("Error serializing to JSON: %w", err)
|
||||
}
|
||||
return string(jsonData)
|
||||
}
|
||||
@@ -31,7 +51,7 @@ func GeminiUrlFromJSON(input string) GeminiUrl {
|
||||
var geminiUrl GeminiUrl
|
||||
err := json.Unmarshal([]byte(input), &geminiUrl)
|
||||
if err != nil {
|
||||
LogError("Error deserializing from JSON: %w", err)
|
||||
logging.LogError("Error deserializing from JSON: %w", err)
|
||||
}
|
||||
return geminiUrl
|
||||
}
|
||||
54
gemini/ip-address-pool.go
Normal file
54
gemini/ip-address-pool.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package gemini
|
||||
|
||||
import "sync"
|
||||
|
||||
// Used to limit requests per
|
||||
// IP address. Maps IP address
|
||||
// to number of active connections.
|
||||
type IpAddressPool struct {
|
||||
IPs map[string]int
|
||||
Lock sync.RWMutex
|
||||
}
|
||||
|
||||
func (p *IpAddressPool) Set(key string, value int) {
|
||||
p.Lock.Lock() // Lock for writing
|
||||
defer p.Lock.Unlock() // Ensure mutex is unlocked after the write
|
||||
p.IPs[key] = value
|
||||
}
|
||||
|
||||
func (p *IpAddressPool) Get(key string) int {
|
||||
p.Lock.RLock() // Lock for reading
|
||||
defer p.Lock.RUnlock() // Ensure mutex is unlocked after reading
|
||||
if value, ok := p.IPs[key]; !ok {
|
||||
return 0
|
||||
} else {
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
func (p *IpAddressPool) Delete(key string) {
|
||||
p.Lock.Lock()
|
||||
defer p.Lock.Unlock()
|
||||
delete(p.IPs, key)
|
||||
}
|
||||
|
||||
func (p *IpAddressPool) Incr(key string) {
|
||||
p.Lock.Lock()
|
||||
defer p.Lock.Unlock()
|
||||
if _, ok := p.IPs[key]; !ok {
|
||||
p.IPs[key] = 1
|
||||
} else {
|
||||
p.IPs[key] = p.IPs[key] + 1
|
||||
}
|
||||
}
|
||||
|
||||
func (p *IpAddressPool) Decr(key string) {
|
||||
p.Lock.Lock()
|
||||
defer p.Lock.Unlock()
|
||||
if val, ok := p.IPs[key]; ok {
|
||||
p.IPs[key] = val - 1
|
||||
if p.IPs[key] == 0 {
|
||||
delete(p.IPs, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
192
gemini/network.go
Normal file
192
gemini/network.go
Normal file
@@ -0,0 +1,192 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"gemini-grc/config"
|
||||
"io"
|
||||
"net"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/guregu/null/v5"
|
||||
)
|
||||
|
||||
// Resolve the URL hostname and
|
||||
// check if we already have an open
|
||||
// connection to this host.
|
||||
// If we can connect, return a list
|
||||
// of the resolved IPs.
|
||||
func getHostIPAddresses(hostname string) ([]string, error) {
|
||||
addrs, err := net.LookupHost(hostname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
IpPool.Lock.RLock()
|
||||
defer func() {
|
||||
IpPool.Lock.RUnlock()
|
||||
}()
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
// Connect to given URL, using the Gemini protocol.
|
||||
// Return a Snapshot with the data or the error.
|
||||
// Any errors are stored within the snapshot.
|
||||
func Visit(s *Snapshot) {
|
||||
// Establish the underlying TCP connection.
|
||||
host := fmt.Sprintf("%s:%d", s.Host, s.URL.Port)
|
||||
dialer := &net.Dialer{
|
||||
Timeout: time.Duration(config.CONFIG.ResponseTimeout) * time.Second, // Set the overall connection timeout
|
||||
KeepAlive: 30 * time.Second,
|
||||
}
|
||||
conn, err := dialer.Dial("tcp", host)
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("TCP connection failed: %v", err))
|
||||
return
|
||||
}
|
||||
// Make sure we always close the connection.
|
||||
defer func() {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("Error closing connection: %s", err))
|
||||
}
|
||||
}()
|
||||
|
||||
// Set read and write timeouts on the TCP connection.
|
||||
err = conn.SetReadDeadline(time.Now().Add(time.Duration(config.CONFIG.ResponseTimeout) * time.Second))
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("Error setting connection deadline: %s", err))
|
||||
return
|
||||
}
|
||||
err = conn.SetWriteDeadline(time.Now().Add(time.Duration(config.CONFIG.ResponseTimeout) * time.Second))
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("Error setting connection deadline: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Perform the TLS handshake
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: true, // Accept all TLS certs, even if insecure.
|
||||
ServerName: s.URL.Hostname, // SNI
|
||||
// MinVersion: tls.VersionTLS12, // Use a minimum TLS version. Warning breaks a lot of sites.
|
||||
}
|
||||
tlsConn := tls.Client(conn, tlsConfig)
|
||||
if err := tlsConn.Handshake(); err != nil {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("TLS handshake error: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// We read `buf`-sized chunks and add data to `data`.
|
||||
buf := make([]byte, 4096)
|
||||
var data []byte
|
||||
|
||||
// Send Gemini request to trigger server response.
|
||||
_, err = tlsConn.Write([]byte(fmt.Sprintf("%s\r\n", s.URL.String())))
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("Error sending network request: %s", err))
|
||||
return
|
||||
}
|
||||
// Read response bytes in len(buf) byte chunks
|
||||
for {
|
||||
n, err := tlsConn.Read(buf)
|
||||
if n > 0 {
|
||||
data = append(data, buf[:n]...)
|
||||
}
|
||||
if len(data) > config.CONFIG.MaxResponseSize {
|
||||
data = []byte{}
|
||||
s.Error = null.StringFrom(fmt.Sprintf("Response size exceeded maximum of %d bytes", config.CONFIG.MaxResponseSize))
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else {
|
||||
s.Error = null.StringFrom(fmt.Sprintf("Network error: %s", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
// Great, response data received.
|
||||
err = processResponse(s, data)
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom(err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Update given snapshot with the
|
||||
// Gemini header data: response code,
|
||||
// mime type and lang (optional)
|
||||
func processResponse(snapshot *Snapshot, data []byte) error {
|
||||
headers, body, err := getHeadersAndData(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
code, mimeType, lang := getMimeTypeAndLang(headers)
|
||||
geminiError := checkGeminiStatusCode(code)
|
||||
if geminiError != nil {
|
||||
return geminiError
|
||||
}
|
||||
snapshot.ResponseCode = null.IntFrom(int64(code))
|
||||
snapshot.MimeType = null.StringFrom(mimeType)
|
||||
snapshot.Lang = null.StringFrom(lang)
|
||||
// If we've got a Gemini document, populate
|
||||
// `GemText` field, otherwise raw data goes to `Data`.
|
||||
if mimeType == "text/gemini" {
|
||||
validBody, err := EnsureValidUTF8(body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("UTF-8 error: %w", err)
|
||||
}
|
||||
snapshot.GemText = null.StringFrom(string(validBody))
|
||||
} else {
|
||||
snapshot.Data = null.ValueFrom(body)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checks for a Gemini header, which is
|
||||
// basically the first line of the response
|
||||
// and should contain the response code,
|
||||
// mimeType and language.
|
||||
func getHeadersAndData(data []byte) (firstLine string, rest []byte, err error) {
|
||||
firstLineEnds := slices.Index(data, '\n')
|
||||
if firstLineEnds == -1 {
|
||||
return "", nil, fmt.Errorf("Could not parse response header")
|
||||
}
|
||||
firstLine = string(data[:firstLineEnds])
|
||||
rest = data[firstLineEnds+1:]
|
||||
return string(firstLine), rest, nil
|
||||
}
|
||||
|
||||
// Parses code, mime type and language
|
||||
// from a Gemini header.
|
||||
// Examples:
|
||||
// `20 text/gemini lang=en` (code, mimetype, lang)
|
||||
// `20 text/gemini` (code, mimetype)
|
||||
// `31 gemini://redirected.to/other/site` (code)
|
||||
func getMimeTypeAndLang(headers string) (code int, mimeType string, lang string) {
|
||||
// Regex that parses code, mimetype & lang
|
||||
re := regexp.MustCompile(`^(\d+)\s+([a-zA-Z0-9/\-+]+)(?:[;\s]+(lang=([a-zA-Z0-9-]+)))?\s*$`)
|
||||
matches := re.FindStringSubmatch(headers)
|
||||
if matches == nil || len(matches) <= 1 {
|
||||
// Try to get code at least.
|
||||
re := regexp.MustCompile(`^(\d+)\s+`)
|
||||
matches := re.FindStringSubmatch(headers)
|
||||
if matches == nil || len(matches) <= 1 {
|
||||
return 0, "", ""
|
||||
}
|
||||
code, err := strconv.Atoi(matches[1])
|
||||
if err != nil {
|
||||
return 0, "", ""
|
||||
}
|
||||
return code, "", ""
|
||||
}
|
||||
code, err := strconv.Atoi(matches[1])
|
||||
if err != nil {
|
||||
return 0, "", ""
|
||||
}
|
||||
mimeType = matches[2]
|
||||
lang = matches[4]
|
||||
return code, mimeType, lang
|
||||
}
|
||||
48
gemini/network_test.go
Normal file
48
gemini/network_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Test for input: `20 text/gemini`
|
||||
func TestGetMimeTypeAndLang1(t *testing.T) {
|
||||
code, mimeType, lang := getMimeTypeAndLang("20 text/gemini")
|
||||
if code != 20 || mimeType != "text/gemini" || lang != "" {
|
||||
t.Errorf("Expected (20, 'text/gemini', ''), got (%d, '%s', '%s')", code, mimeType, lang)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMimeTypeAndLang11(t *testing.T) {
|
||||
code, mimeType, lang := getMimeTypeAndLang("20 text/gemini\n")
|
||||
if code != 20 || mimeType != "text/gemini" || lang != "" {
|
||||
t.Errorf("Expected (20, 'text/gemini', ''), got (%d, '%s', '%s')", code, mimeType, lang)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTypeAndLang2(t *testing.T) {
|
||||
code, mimeType, lang := getMimeTypeAndLang("20 text/gemini lang=en")
|
||||
if code != 20 || mimeType != "text/gemini" || lang != "en" {
|
||||
t.Errorf("Expected (20, 'text/gemini', 'en'), got (%d, '%s', '%s')", code, mimeType, lang)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMimeTypeAndLang3(t *testing.T) {
|
||||
code, mimeType, lang := getMimeTypeAndLang("31 gemini://redirect.to/page")
|
||||
if code != 31 || mimeType != "" || lang != "" {
|
||||
t.Errorf("Expected (20, '', ''), got (%d, '%s', '%s')", code, mimeType, lang)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMimeTypeAndLang4(t *testing.T) {
|
||||
code, mimeType, lang := getMimeTypeAndLang("aaafdasdasd")
|
||||
if code != 0 || mimeType != "" || lang != "" {
|
||||
t.Errorf("Expected (0, '', ''), got (%d, '%s', '%s')", code, mimeType, lang)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMimeTypeAndLang5(t *testing.T) {
|
||||
code, mimeType, lang := getMimeTypeAndLang("")
|
||||
if code != 0 || mimeType != "" || lang != "" {
|
||||
t.Errorf("Expected (0, '', ''), got (%d, '%s', '%s')", code, mimeType, lang)
|
||||
}
|
||||
}
|
||||
72
gemini/persistence.go
Normal file
72
gemini/persistence.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gemini-grc/logging"
|
||||
"os"
|
||||
|
||||
_ "github.com/jackc/pgx/v5/stdlib" // PGX driver for PostgreSQL
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
func ConnectToDB() *sqlx.DB {
|
||||
connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s",
|
||||
os.Getenv("PG_USER"),
|
||||
os.Getenv("PG_PASSWORD"),
|
||||
os.Getenv("PG_HOST"),
|
||||
os.Getenv("PG_PORT"),
|
||||
os.Getenv("PG_DATABASE"),
|
||||
)
|
||||
|
||||
// Create a connection pool
|
||||
db, err := sqlx.Open("pgx", connStr)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to connect to database with URL %s: %v\n", connStr, err))
|
||||
}
|
||||
db.SetMaxOpenConns(20)
|
||||
err = db.Ping()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to ping database: %v\n", err))
|
||||
}
|
||||
|
||||
logging.LogDebug("Connected to database")
|
||||
return db
|
||||
}
|
||||
|
||||
func SaveSnapshotToDB(tx *sqlx.Tx, s *Snapshot) error {
|
||||
query := `
|
||||
INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error)
|
||||
VALUES (:uid, :url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error)
|
||||
ON CONFLICT (uid) DO UPDATE SET
|
||||
url = EXCLUDED.url,
|
||||
host = EXCLUDED.host,
|
||||
timestamp = EXCLUDED.timestamp,
|
||||
mimetype = EXCLUDED.mimetype,
|
||||
data = EXCLUDED.data,
|
||||
gemtext = EXCLUDED.gemtext,
|
||||
links = EXCLUDED.links,
|
||||
lang = EXCLUDED.lang,
|
||||
response_code = EXCLUDED.response_code,
|
||||
error = EXCLUDED.error
|
||||
`
|
||||
_, err := tx.NamedExec(query, s)
|
||||
if err != nil {
|
||||
logging.LogError("[%s] [%s] Error upserting snapshot: %w", s.URL, s.MimeType.String, err)
|
||||
return fmt.Errorf("DB error: %w", err) // Return the error instead of panicking
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SaveLinkToDB(tx *sqlx.Tx, s *Snapshot) error {
|
||||
query := `
|
||||
INSERT INTO snapshots (uid, url, host, timestamp, mimetype, data, gemtext, links, lang, response_code, error)
|
||||
VALUES (:uid, :url, :host, :timestamp, :mimetype, :data, :gemtext, :links, :lang, :response_code, :error)
|
||||
ON CONFLICT (uid) DO NOTHING
|
||||
`
|
||||
_, err := tx.NamedExec(query, s)
|
||||
if err != nil {
|
||||
logging.LogError("[%s] [%s] Error upserting snapshot: %w", s.URL, s.MimeType.String, err)
|
||||
return fmt.Errorf("DB error: %w", err) // Return the error instead of panicking
|
||||
}
|
||||
return nil
|
||||
}
|
||||
33
gemini/processing.go
Normal file
33
gemini/processing.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"unicode/utf8"
|
||||
|
||||
"golang.org/x/text/encoding/charmap"
|
||||
"golang.org/x/text/transform"
|
||||
)
|
||||
|
||||
func EnsureValidUTF8(input []byte) (string, error) {
|
||||
// Remove NULL byte 0x00
|
||||
inputNoNull := bytes.ReplaceAll(input, []byte{0}, nil)
|
||||
isValidUTF8 := utf8.Valid(inputNoNull)
|
||||
if !isValidUTF8 {
|
||||
encodings := []transform.Transformer{
|
||||
charmap.ISO8859_1.NewDecoder(), // First try ISO8859-1
|
||||
charmap.Windows1252.NewDecoder(), // Then try Windows-1252, etc
|
||||
// TODO: Try more encodings?
|
||||
}
|
||||
for _, encoding := range encodings {
|
||||
reader := transform.NewReader(bytes.NewReader(inputNoNull), encoding)
|
||||
result, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("UTF-8 error: %w", err)
|
||||
}
|
||||
return string(result), nil
|
||||
}
|
||||
}
|
||||
return string(inputNoNull), nil
|
||||
}
|
||||
13
gemini/processing_test.go
Normal file
13
gemini/processing_test.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package gemini
|
||||
|
||||
import "testing"
|
||||
|
||||
// Make sure NULL bytes are removed
|
||||
func TestEnsureValidUTF8(t *testing.T) {
|
||||
// Create a string with a null byte
|
||||
strWithNull := "Hello" + string('\x00') + "world"
|
||||
result, _ := EnsureValidUTF8([]byte(strWithNull))
|
||||
if result != "Helloworld" {
|
||||
t.Errorf("Expected string without NULL byte, got %s", result)
|
||||
}
|
||||
}
|
||||
74
gemini/snapshot.go
Normal file
74
gemini/snapshot.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gemini-grc/logging"
|
||||
"strings"
|
||||
|
||||
"github.com/guregu/null/v5"
|
||||
)
|
||||
|
||||
type LinkList []GeminiUrl
|
||||
|
||||
func (l LinkList) Value() (driver.Value, error) {
|
||||
return json.Marshal(l)
|
||||
}
|
||||
|
||||
func (l *LinkList) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
*l = nil
|
||||
return nil
|
||||
}
|
||||
b, ok := value.([]byte) // Type assertion! Converts to []byte
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to scan LinkList: expected []byte, got %T", value)
|
||||
}
|
||||
return json.Unmarshal(b, l)
|
||||
}
|
||||
|
||||
type Snapshot struct {
|
||||
ID int `db:"id" json:"id,omitempty"`
|
||||
UID string `db:"uid" json:"uid,omitempty"`
|
||||
URL GeminiUrl `db:"url" json:"url,omitempty"`
|
||||
Host string `db:"host" json:"host,omitempty"`
|
||||
Timestamp null.Time `db:"timestamp" json:"timestamp,omitempty"`
|
||||
MimeType null.String `db:"mimetype" json:"mimetype,omitempty"`
|
||||
Data null.Value[[]byte] `db:"data" json:"data,omitempty"` // For non text/gemini files.
|
||||
GemText null.String `db:"gemtext" json:"gemtext,omitempty"` // For text/gemini files.
|
||||
Links *LinkList `db:"links" json:"links,omitempty"`
|
||||
Lang null.String `db:"lang" json:"lang,omitempty"`
|
||||
ResponseCode null.Int `db:"response_code" json:"code,omitempty"` // Gemini response status code.
|
||||
Error null.String `db:"error" json:"error,omitempty"` // On network errors only
|
||||
}
|
||||
|
||||
func SnapshotToJSON(g Snapshot) string {
|
||||
// Serialize the Person struct to JSON
|
||||
jsonData, err := json.MarshalIndent(g, "", "\t")
|
||||
if err != nil {
|
||||
logging.LogError("Error serializing to JSON: %w", err)
|
||||
}
|
||||
return string(jsonData)
|
||||
}
|
||||
|
||||
func SnapshotFromJSON(input string) Snapshot {
|
||||
var snapshot Snapshot
|
||||
err := json.Unmarshal([]byte(input), &snapshot)
|
||||
if err != nil {
|
||||
logging.LogError("Error deserializing from JSON: %w", err)
|
||||
}
|
||||
return snapshot
|
||||
}
|
||||
|
||||
func ShouldPersistSnapshot(result *Snapshot) bool {
|
||||
if !result.MimeType.Valid {
|
||||
return false
|
||||
}
|
||||
if result.MimeType.String == "text/gemini" ||
|
||||
strings.HasPrefix(result.MimeType.String, "image/") ||
|
||||
strings.HasPrefix(result.MimeType.String, "text/") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
218
gemini/worker.go
Normal file
218
gemini/worker.go
Normal file
@@ -0,0 +1,218 @@
|
||||
package gemini
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/logging"
|
||||
"gemini-grc/uid"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/guregu/null/v5"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
func SpawnWorkers(numOfWorkers int, db *sqlx.DB) {
|
||||
logging.LogInfo("Spawning %d workers", numOfWorkers)
|
||||
for i := 0; i < numOfWorkers; i++ {
|
||||
go func(i int) {
|
||||
for {
|
||||
runWorker(i, db)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
func printPoolIPs() {
|
||||
fmt.Printf("%v", IpPool.IPs)
|
||||
}
|
||||
|
||||
func workOnSnapshot(id int, tx *sqlx.Tx, s *Snapshot) (err error) {
|
||||
// Wrap errors with more info.
|
||||
defer func() {
|
||||
if err != nil {
|
||||
err = fmt.Errorf("[%d] Worker Error: %w", id, err)
|
||||
}
|
||||
}()
|
||||
|
||||
IPs, err := getHostIPAddresses(s.Host)
|
||||
if err != nil {
|
||||
s.Error = null.StringFrom("DNS Resolve error")
|
||||
err = SaveSnapshotToDB(tx, s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] DB Error: %w", id, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the host's ip is in the pool, stop
|
||||
// and add the url in the queue later.
|
||||
IpPool.Lock.RLock()
|
||||
logging.LogDebug("[%d] [%s] Checking pool for IP", id, s.URL)
|
||||
for _, ip := range IPs {
|
||||
_, ok := IpPool.IPs[ip]
|
||||
if ok {
|
||||
logging.LogDebug("[%d] Another worker is visiting this host: %s", id, s.URL)
|
||||
IpPool.Lock.RUnlock()
|
||||
time.Sleep(1 * time.Second) // Avoid flood-retrying when few URLs remain
|
||||
return nil
|
||||
}
|
||||
}
|
||||
IpPool.Lock.RUnlock()
|
||||
|
||||
AddIPsToPool(IPs)
|
||||
|
||||
url := s.URL.String()
|
||||
logging.LogDebug("[%d] Dialing %s", id, url)
|
||||
Visit(s)
|
||||
logging.LogDebug("[%d] Finished dialing.", id)
|
||||
|
||||
go func() {
|
||||
time.Sleep(5 * time.Second)
|
||||
RemoveIPsFromPool(IPs)
|
||||
}()
|
||||
|
||||
if s.MimeType.Valid && s.MimeType.String == "text/gemini" {
|
||||
logging.LogDebug("[%d] [%s] Processing", id, url)
|
||||
s = ProcessGemini(s)
|
||||
}
|
||||
logging.LogDebug("[%d] Saving", id)
|
||||
err = SaveSnapshotToDB(tx, s)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] DB Error: %w", id, err)
|
||||
}
|
||||
|
||||
// Store links
|
||||
if s.Links != nil {
|
||||
for _, link := range *s.Links {
|
||||
newSnapshot := Snapshot{UID: uid.UID(), URL: link, Host: link.Hostname, Timestamp: null.TimeFrom(time.Now())}
|
||||
if shouldPersistURL(tx, link) {
|
||||
logging.LogDebug("[%d] Saving link %s", id, link)
|
||||
err = SaveLinkToDB(tx, &newSnapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] DB Error: %w", id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func shouldPersistURL(tx *sqlx.Tx, u GeminiUrl) bool {
|
||||
if !strings.HasPrefix(u.String(), "gemini://") {
|
||||
return false
|
||||
}
|
||||
query := `SELECT EXISTS(SELECT 1 FROM snapshots WHERE URL=$1)`
|
||||
var exists bool
|
||||
err := tx.Get(&exists, query, u.String())
|
||||
if err != nil {
|
||||
fmt.Println("Error executing query:", err)
|
||||
return false
|
||||
}
|
||||
return !exists
|
||||
}
|
||||
|
||||
// Select a random snapshot.
|
||||
func GetRandomSnapshot(tx *sqlx.Tx) (*Snapshot, error) {
|
||||
query := `SELECT * FROM snapshots
|
||||
WHERE response_code IS NULL
|
||||
AND error IS NULL
|
||||
ORDER BY RANDOM()
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED`
|
||||
// AND (timestamp < NOW() - INTERVAL '1 day' OR timestamp IS NULL)
|
||||
var snapshot Snapshot
|
||||
err := tx.Get(&snapshot, query)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
// Handle the case where no rows were found
|
||||
return nil, nil
|
||||
}
|
||||
// Handle other potential errors
|
||||
return nil, err
|
||||
}
|
||||
return &snapshot, nil
|
||||
}
|
||||
|
||||
func GetRandomSnapshots(tx *sqlx.Tx) ([]Snapshot, error) {
|
||||
query := `
|
||||
SELECT * FROM snapshots
|
||||
WHERE response_code IS NULL
|
||||
AND error IS NULL
|
||||
ORDER BY RANDOM()
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
`
|
||||
var snapshots []Snapshot
|
||||
err := tx.Select(&snapshots, query, config.CONFIG.WorkerBatchSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
func GetRandomSnapshotsDistinctHosts(tx *sqlx.Tx) ([]Snapshot, error) {
|
||||
query := `
|
||||
SELECT DISTINCT ON (host) *
|
||||
FROM snapshots
|
||||
WHERE response_code IS NULL
|
||||
AND error IS NULL
|
||||
ORDER BY host, RANDOM()
|
||||
LIMIT $1
|
||||
`
|
||||
var snapshots []Snapshot
|
||||
err := tx.Select(&snapshots, query, config.CONFIG.WorkerBatchSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
func runWorker(id int, db *sqlx.DB) {
|
||||
// Start the transaction
|
||||
tx, err := db.Beginx()
|
||||
if err != nil {
|
||||
logging.LogError("Failed to begin transaction: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
logging.LogError("[%d] Failed to commit transaction: %w", id, err)
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
snapshots, err := GetRandomSnapshotsDistinctHosts(tx)
|
||||
|
||||
if err != nil {
|
||||
logging.LogError("[%d] Error retrieving snapshot: %w", id, err)
|
||||
time.Sleep(10 * time.Second)
|
||||
return
|
||||
} else if len(snapshots) == 0 {
|
||||
logging.LogInfo("[%d] No remaining snapshots to visit.", id)
|
||||
time.Sleep(1 * time.Minute)
|
||||
return
|
||||
}
|
||||
total := len(snapshots)
|
||||
for i, s := range snapshots {
|
||||
if InBlacklist(&s) {
|
||||
logging.LogWarn("[%d] Ignoring %d/%d blacklisted URL %s", id, i+1, total, s.URL)
|
||||
}
|
||||
logging.LogInfo("[%d] Starting %d/%d %s", id, i+1, total, s.URL)
|
||||
err = workOnSnapshot(id, tx, &s)
|
||||
if err != nil {
|
||||
logging.LogError("[%d] [%s] Error %w", id, s.URL, err)
|
||||
// TODO Remove panic and gracefully handle/log error
|
||||
fmt.Printf("Error %s Stack trace:\n%s", err, debug.Stack())
|
||||
panic("ERROR encountered")
|
||||
}
|
||||
if s.Error.Valid {
|
||||
logging.LogWarn("[%d] [%s] Error: %v", id, s.URL, fmt.Errorf(s.Error.String))
|
||||
}
|
||||
logging.LogDebug("[%d] Done %d/%d.", id, i, total)
|
||||
}
|
||||
logging.LogInfo("[%d] Worker done.", id)
|
||||
}
|
||||
9
go.mod
9
go.mod
@@ -8,9 +8,18 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/guregu/null/v5 v5.0.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/pgx/v5 v5.7.1 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/jmoiron/sqlx v1.4.0 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
golang.org/x/crypto v0.27.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6 // indirect
|
||||
golang.org/x/net v0.27.0 // indirect
|
||||
golang.org/x/sync v0.8.0 // indirect
|
||||
golang.org/x/sys v0.25.0 // indirect
|
||||
golang.org/x/text v0.18.0 // indirect
|
||||
)
|
||||
|
||||
29
go.sum
29
go.sum
@@ -1,25 +1,54 @@
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4=
|
||||
github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4=
|
||||
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/guregu/null/v5 v5.0.0 h1:PRxjqyOekS11W+w/7Vfz6jgJE/BCwELWtgvOJzddimw=
|
||||
github.com/guregu/null/v5 v5.0.0/go.mod h1:SjupzNy+sCPtwQTKWhUCqjhVCO69hpsl2QsZrWHjlwU=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||
github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs=
|
||||
github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/jaevor/go-nanoid v1.4.0 h1:mPz0oi3CrQyEtRxeRq927HHtZCJAAtZ7zdy7vOkrvWs=
|
||||
github.com/jaevor/go-nanoid v1.4.0/go.mod h1:GIpPtsvl3eSBsjjIEFQdzzgpi50+Bo1Luk+aYlbJzlc=
|
||||
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
|
||||
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
|
||||
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
|
||||
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
|
||||
golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6 h1:1wqE9dj9NpSm04INVsJhhEUzhuDVjbcyKH91sVyPATw=
|
||||
golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
|
||||
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
|
||||
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
|
||||
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
|
||||
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
|
||||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
|
||||
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package main
|
||||
package logging
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
141
main.go
141
main.go
@@ -1,137 +1,52 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gemini-grc/config"
|
||||
"gemini-grc/gemini"
|
||||
"gemini-grc/logging"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
zlog "github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var CONFIG Config
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
func main() {
|
||||
wg.Add(1)
|
||||
CONFIG = *getConfig()
|
||||
config.CONFIG = *config.GetConfig()
|
||||
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
|
||||
zerolog.SetGlobalLevel(CONFIG.logLevel)
|
||||
zerolog.SetGlobalLevel(config.CONFIG.LogLevel)
|
||||
zlog.Logger = zlog.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "[2006-01-02 15:04:05]"})
|
||||
if err := runApp(); err != nil {
|
||||
LogError("Application error: %w", err)
|
||||
logging.LogError("Application error: %w", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func runApp() error {
|
||||
// urls := []string{"gemini://smol.gr"}
|
||||
// urls := []string{"gemini://gemini.circumlunar.space/users/solderpunk/gemlog/orphans-of-netscape.gmi"} // Test 31 redirect
|
||||
// urls := []string{"gemini://zaibatsu.circumlunar.space/~solderpunk/gemlog/orphans-of-netscape.gmi"}
|
||||
// urls := []string{"gemini://farcaster.net/berlin/dared.jpg"}
|
||||
// urls := []string{"gemini://smol.gr/media/amstrad_cpc_6128.jpg", "https://go.dev/blog/go-brand/Go-Logo/PNG/Go-Logo_Blue.png"}
|
||||
urls := []string{"gemini://tlgs.one/", "gemini://gmi.noulin.net/", "gemini://warmedal.se/~antenna/"}
|
||||
logging.LogInfo("Starting up. Press Ctrl+C to exit")
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
queue := make(chan string, 1000)
|
||||
results := make(chan Snapshot, 100)
|
||||
done := make(chan struct{})
|
||||
db := gemini.ConnectToDB()
|
||||
|
||||
go spawnStatsReport(queue, results)
|
||||
go resultsHandler(queue, results)
|
||||
spawnWorkers(CONFIG.numOfWorkers, queue, results)
|
||||
// !!! DANGER !!!
|
||||
// Removes all rows and adds some seed URLs.
|
||||
// populateDB(db)
|
||||
|
||||
for _, url := range urls {
|
||||
queue <- url
|
||||
defer func(db *sqlx.DB) {
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
// TODO properly log & hangle error
|
||||
panic(err)
|
||||
}
|
||||
<-done
|
||||
}(db)
|
||||
|
||||
go gemini.SpawnWorkers(config.CONFIG.NumOfWorkers, db)
|
||||
|
||||
<-sigs
|
||||
logging.LogInfo("Received SIGINT or SIGTERM signal, exiting")
|
||||
return nil
|
||||
}
|
||||
|
||||
func spawnStatsReport(queue chan string, results chan Snapshot) {
|
||||
ticker := time.NewTicker(time.Duration(time.Second * 10))
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
LogInfo("Queue length: %d", len(queue))
|
||||
LogInfo("Results length: %d", len(results))
|
||||
}
|
||||
}
|
||||
|
||||
func spawnWorkers(numOfWorkers int, queue <-chan string, results chan Snapshot) {
|
||||
LogInfo("Spawning %d workers", numOfWorkers)
|
||||
for i := 0; i < numOfWorkers; i++ {
|
||||
go func(i int) {
|
||||
worker(i, queue, results)
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
func resultsHandler(queue chan string, results <-chan Snapshot) {
|
||||
for result := range results {
|
||||
if result.Error != nil {
|
||||
LogError("[%s] %w", result.URL, result.Error)
|
||||
} else {
|
||||
LogDebug("[%s] Done", result.URL)
|
||||
for _, link := range result.Links {
|
||||
if strings.HasPrefix(link.Full, "gemini://") {
|
||||
go func(link GeminiUrl) {
|
||||
queue <- link.Full
|
||||
// fmt.Printf("Sent %s to queue\n", link.Full)
|
||||
}(link)
|
||||
}
|
||||
}
|
||||
// if result.MimeType == "text/gemini" {
|
||||
// result.Data = ""
|
||||
// fmt.Printf(SnapshotToJSON(result))
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func worker(id int, queue <-chan string, results chan Snapshot) {
|
||||
for url := range queue {
|
||||
if !shouldVisit(url) {
|
||||
LogInfo("Skipping %s", url)
|
||||
continue
|
||||
}
|
||||
LogInfo("Worker %d visiting %s", id, url)
|
||||
result, err := Visit(url)
|
||||
if err != nil {
|
||||
LogError("[%s] %w", url, err)
|
||||
continue
|
||||
}
|
||||
// If we encountered an error when
|
||||
// visiting, skip processing
|
||||
if result.Error != nil {
|
||||
results <- *result
|
||||
continue
|
||||
}
|
||||
LogDebug("Worker %d processing %s", id, url)
|
||||
if result.MimeType == "text/gemini" {
|
||||
result = ProcessGemini(result)
|
||||
}
|
||||
if shouldPersist(result) {
|
||||
LogDebug("Worker %d saving %s", id, url)
|
||||
SaveSnapshot(CONFIG.rootPath, result)
|
||||
}
|
||||
results <- *result
|
||||
// time.Sleep(time.Duration(rand.IntN(5)) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func shouldVisit(url string) bool {
|
||||
if !strings.HasPrefix(url, "gemini://") {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func shouldPersist(result *Snapshot) bool {
|
||||
if result.MimeType == "text/gemini" ||
|
||||
strings.HasPrefix(result.MimeType, "image/") ||
|
||||
strings.HasPrefix(result.MimeType, "text/") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
117
network.go
117
network.go
@@ -1,117 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Visit(url string) (snapshot *Snapshot, err error) {
|
||||
snapshot = &Snapshot{Timestamp: time.Now(), UID: UID()}
|
||||
|
||||
geminiUrl, err := ParseUrl(url, "")
|
||||
if err != nil {
|
||||
snapshot.Error = fmt.Errorf("[%s] %w", url, err)
|
||||
return snapshot, nil
|
||||
}
|
||||
snapshot.URL = *geminiUrl
|
||||
|
||||
LogDebug("[%s] Connecting", geminiUrl)
|
||||
|
||||
// Establish a TLS connection
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
conn, err := tls.Dial("tcp", fmt.Sprintf("%s:%d", geminiUrl.Hostname, geminiUrl.Port), tlsConfig)
|
||||
if err != nil {
|
||||
snapshot.Error = err
|
||||
return snapshot, nil
|
||||
}
|
||||
// Defer properly: Also handle possible
|
||||
// error of conn.Close()
|
||||
defer func() {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
snapshot.Error = fmt.Errorf("[%s] Closing connection error, ignoring: %w", snapshot.URL.String(), err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Read data from the connection
|
||||
conn.SetReadDeadline(time.Now().Add(time.Duration(CONFIG.responseTimeout) * time.Second))
|
||||
buf := make([]byte, 4096)
|
||||
var data []byte
|
||||
|
||||
// Write Gemini request to get response.
|
||||
// paths := []string{"/", ".", ""}
|
||||
// if slices.Contains(paths, geminiUrl.Path) || strings.HasSuffix(geminiUrl.Path, "gmi") {
|
||||
conn.Write([]byte(fmt.Sprintf("%s\r\n", geminiUrl.String())))
|
||||
// }
|
||||
|
||||
// Read response bytes in len(buf) byte chunks
|
||||
for {
|
||||
n, err := conn.Read(buf)
|
||||
if n > 0 {
|
||||
data = append(data, buf[:n]...)
|
||||
}
|
||||
if len(data) > CONFIG.maxResponseSize {
|
||||
snapshot.Error = fmt.Errorf("[%s] Response size exceeded maximum of %d bytes", url, CONFIG.maxResponseSize)
|
||||
return snapshot, nil
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else {
|
||||
snapshot.Error = fmt.Errorf("[%s] %w", url, err)
|
||||
return snapshot, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
LogDebug("[%s] Received %d bytes", geminiUrl.String(), len(data))
|
||||
err = processResponse(snapshot, data)
|
||||
if err != nil {
|
||||
snapshot.Error = fmt.Errorf("%w", err)
|
||||
}
|
||||
return snapshot, nil
|
||||
}
|
||||
|
||||
func processResponse(snapshot *Snapshot, data []byte) error {
|
||||
headers, body, err := getHeadersAndData(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
code, mimeType, lang := getMimeTypeAndLang(headers)
|
||||
snapshot.ResponseCode, snapshot.MimeType, snapshot.Lang, snapshot.Data = code, mimeType, lang, body
|
||||
if mimeType == "text/gemini" {
|
||||
snapshot.GemText = string(body)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getHeadersAndData(data []byte) (string, []byte, error) {
|
||||
firstLineEnds := slices.Index(data, '\n')
|
||||
if firstLineEnds == -1 {
|
||||
return "", nil, fmt.Errorf("Could not parse response header")
|
||||
}
|
||||
firstLine := data[:firstLineEnds]
|
||||
rest := data[firstLineEnds+1:]
|
||||
return string(firstLine), rest, nil
|
||||
}
|
||||
|
||||
func getMimeTypeAndLang(headers string) (int, string, string) {
|
||||
re := regexp.MustCompile(`^(\d+)\s+([a-zA-Z0-9/\-+]+)[;\s]+(lang=([a-zA-Z0-9-]+))?`)
|
||||
matches := re.FindStringSubmatch(headers)
|
||||
if matches == nil || len(matches) <= 1 {
|
||||
return 0, "", ""
|
||||
}
|
||||
code, err := strconv.Atoi(matches[1])
|
||||
if err != nil {
|
||||
return 0, "", ""
|
||||
}
|
||||
mimeType := matches[2]
|
||||
lang := matches[4]
|
||||
return code, mimeType, lang
|
||||
}
|
||||
48
snapshot.go
48
snapshot.go
@@ -1,48 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Snapshot struct {
|
||||
UID string `json:"uid,omitempty"`
|
||||
URL GeminiUrl `json:"url,omitempty"`
|
||||
Timestamp time.Time `json:"timestamp,omitempty"`
|
||||
MimeType string `json:"mimetype,omitempty"`
|
||||
Data []byte `json:"data,omitempty"`
|
||||
GemText string `json:"gemtext,omitempty"`
|
||||
Links []GeminiUrl `json:"links,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
// Gemini status code
|
||||
ResponseCode int `json:"code,omitempty"`
|
||||
// On network errors, for Gemini server errors
|
||||
// we have ResponseCode above.
|
||||
Error error `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (u Snapshot) String() string {
|
||||
return fmt.Sprintf(
|
||||
"[%s] %s %s %s %d %s %s %s",
|
||||
u.UID, u.URL, u.Timestamp, u.Links, u.ResponseCode, u.MimeType, u.Lang, u.Error,
|
||||
)
|
||||
}
|
||||
|
||||
func SnapshotToJSON(g Snapshot) string {
|
||||
// Serialize the Person struct to JSON
|
||||
jsonData, err := json.Marshal(g)
|
||||
if err != nil {
|
||||
LogError("Error serializing to JSON: %w", err)
|
||||
}
|
||||
return string(jsonData)
|
||||
}
|
||||
|
||||
func SnapshotFromJSON(input string) Snapshot {
|
||||
var snapshot Snapshot
|
||||
err := json.Unmarshal([]byte(input), &snapshot)
|
||||
if err != nil {
|
||||
LogError("Error deserializing from JSON: %w", err)
|
||||
}
|
||||
return snapshot
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package main
|
||||
package uid
|
||||
|
||||
import (
|
||||
nanoid "github.com/jaevor/go-nanoid"
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
|
||||
func UID() string {
|
||||
// Missing o,O and l
|
||||
uid, err := nanoid.CustomASCII("abcdefghijkmnpqrstuvwxyzABCDEFGHIJKLMNPQRSTUVWXYZ0123456789", 18)
|
||||
uid, err := nanoid.CustomASCII("abcdefghijkmnpqrstuvwxyzABCDEFGHIJKLMNPQRSTUVWXYZ0123456789", 20)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
Reference in New Issue
Block a user