Files
veola/internal/scheduler/scheduler.go
prosolis edb732ee1f Auction end times, visual flair, and pre-launch cleanup
Auction handling:
- Capture itemEndDate from eBay Browse API and ending_date from ZenMarket
  (Yahoo JP); plumb through results.ends_at column. Permissive ZenMarket
  parser (multiple layouts, JST when offset missing).
- Per-row "Ends" countdown column + "Ending soon" banner on results pages,
  live-ticked by flair.js with urgent/critical tinting under 1h/5m.
- Backfill ends_at for known auctions when their URL reappears in a poll
  (dedup hit no longer drops the new end time).
- Hide ended auctions from result listings by default via
  ResultsQuery.ExcludeEnded; rows stay in the DB.

Visual flair:
- Glassy backdrop-blur v-cards with gradient-mask borders and hover-lift.
- htmx swap fade-in via transient .v-just-swapped class.
- Count-up animation on dashboard stats. All animations gated behind
  prefers-reduced-motion.

eBay condition + region filters (auctions-style scoping):
- items.condition and items.region columns; threaded through item form,
  CreateItem/UpdateItem, scheduler eBay plan input, and previewKey so
  cache invalidates when these change.
- ebay.SearchParams gains conditionIds and itemLocationCountry filters.

Run Now reload + countdown engine:
- Run Now now sets HX-Refresh: true (non-htmx fallback: 303 redirect) so
  the entire results view — best price, chart, badge, last polled —
  reflects the new poll, instead of swapping just one partial.

Pre-launch hardening (P1 set):
- auth.EqualizeLoginTiming on no-such-user branch.
- (*App).serverError centralizes 500s; replaces err.Error() leaks across
  results/settings/items/users/dashboard handlers.
- main.go server: ReadTimeout 30s / WriteTimeout 60s / IdleTimeout 120s
  alongside the existing ReadHeaderTimeout.
- noListFS wrapper blocks static directory listings.
- Credential fields in settings no longer render value=; blank submission
  preserves the saved value, with per-field "Saved in settings / Set in
  config.toml / Not set" status indicator.

Misc:
- -debug flag wires slog to LevelDebug; raw ZenMarket items logged for
  format diagnosis.
- /healthz public endpoint for reverse-proxy probes.
- deploy/veola.service systemd unit template (hardening flags, single
  ReadWritePaths=/var/lib/veola).
- handlers_test.go covers /healthz, setup-gate redirect, auth gate, and
  /login render with httptest + in-memory sqlite.
- best_price_currency on items; templates pick the right symbol per row.
- .gitignore now excludes *.log / veola-debug.log.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 17:47:09 -07:00

713 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduler
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
"github.com/robfig/cron/v3"
"veola/internal/apify"
"veola/internal/config"
"veola/internal/db"
"veola/internal/ebay"
"veola/internal/models"
"veola/internal/ntfy"
)
// Provider labels distinguish how a plan is executed: through an Apify actor
// run, or through eBay's official Browse API.
const (
	providerApify = "apify" // plan runs an Apify actor via apify.Client.Run
	providerEbay = "ebay" // plan calls eBay's Browse API via ebay.Client.Search
)
// Scheduler owns the cron runner that polls each active item on its own
// interval. Entry bookkeeping is guarded by mu; rootCtx parents every poll
// so Stop can cancel in-flight work.
type Scheduler struct {
	cfg *config.Config
	store *db.Store
	apify *apify.Client
	ebay *ebay.Client
	ntfy *ntfy.Client
	cron *cron.Cron
	mu sync.Mutex // guards entries
	entries map[int64]cron.EntryID // item ID -> registered cron entry
	rootCtx context.Context // parent for all poll contexts; cancelled by Stop
	cancel context.CancelFunc // cancels rootCtx
}
// New builds a Scheduler over the given config, store, and clients. The eBay
// client is constructed here from config credentials; the root context created
// here bounds the lifetime of every poll the scheduler will ever spawn.
func New(cfg *config.Config, store *db.Store, ap *apify.Client, nt *ntfy.Client) *Scheduler {
	ctx, stop := context.WithCancel(context.Background())
	s := &Scheduler{
		cfg:     cfg,
		store:   store,
		apify:   ap,
		ebay:    ebay.New(cfg.Ebay.ClientID, cfg.Ebay.ClientSecret, cfg.Ebay.Environment),
		ntfy:    nt,
		cron:    cron.New(),
		entries: map[int64]cron.EntryID{},
		rootCtx: ctx,
		cancel:  stop,
	}
	return s
}
// Start registers a cron entry for every currently active item and then
// starts the cron loop. Returns any store error from listing active items.
func (s *Scheduler) Start(ctx context.Context) error {
	active, err := s.store.ListActiveItems(ctx)
	if err != nil {
		return err
	}
	for i := range active {
		s.register(active[i])
	}
	s.cron.Start()
	slog.Info("scheduler started", "items", len(active))
	return nil
}
// Stop blocks until running jobs complete.
func (s *Scheduler) Stop() {
	// Cancel rootCtx first so in-flight polls abort promptly.
	s.cancel()
	// cron.Stop returns a context that is done once scheduled jobs drain.
	done := s.cron.Stop().Done()
	<-done
	slog.Info("scheduler stopped")
}
// SyncItem registers, re-registers, or removes the cron job for an item based
// on its current Active flag. Call after create/update/toggle/delete.
func (s *Scheduler) SyncItem(it models.Item) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Always drop any stale entry first; re-registration below picks up a
	// possibly changed poll interval.
	if entryID, ok := s.entries[it.ID]; ok {
		s.cron.Remove(entryID)
		delete(s.entries, it.ID)
	}
	if it.Active {
		s.registerLocked(it)
	}
}
// RemoveItem unschedules the cron entry for an item, if one exists.
func (s *Scheduler) RemoveItem(id int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	entry, ok := s.entries[id]
	if !ok {
		return
	}
	s.cron.Remove(entry)
	delete(s.entries, id)
}
// register is the lock-acquiring wrapper around registerLocked.
func (s *Scheduler) register(it models.Item) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.registerLocked(it)
}
// registerLocked schedules a recurring poll for the item; the caller must
// hold mu. The interval falls back from the item's own setting to the global
// config value, then to 60 minutes. The cron callback re-reads the item from
// the store on every tick so edits between ticks are honored, and it skips
// items that were deactivated or deleted in the meantime.
func (s *Scheduler) registerLocked(it models.Item) {
	interval := it.PollIntervalMinutes
	if interval <= 0 {
		interval = s.cfg.Scheduler.GlobalPollIntervalMinutes
	}
	if interval <= 0 {
		interval = 60
	}
	itemID := it.ID
	entryID, err := s.cron.AddFunc(fmt.Sprintf("@every %dm", interval), func() {
		// Each tick derives a bounded context from rootCtx so Stop cancels
		// in-flight polls.
		ctx, cancel := context.WithTimeout(s.rootCtx, 10*time.Minute)
		defer cancel()
		fresh, err := s.store.GetItem(ctx, itemID)
		if err != nil || fresh == nil || !fresh.Active {
			return
		}
		s.RunPoll(ctx, *fresh)
	})
	if err != nil {
		slog.Error("schedule failed", "item_id", it.ID, "err", err)
		return
	}
	s.entries[it.ID] = entryID
}
// RunPoll executes one poll cycle for an item. Public so handlers can trigger
// "Run Now" without going through cron. Iterates over each (alias × marketplace)
// pair; a single failing combo does not poison the others.
func (s *Scheduler) RunPoll(ctx context.Context, it models.Item) {
	plans := s.buildAllInputs(it)
	if len(plans) == 0 {
		s.recordError(ctx, it.ID, "no marketplaces configured for this item")
		return
	}
	// Built per poll so an API key changed in settings takes effect
	// without a restart.
	apifyClient := s.apifyClient(ctx)
	var results []apify.UnifiedResult
	var errs []string
	successes := 0
	for _, p := range plans {
		decoded, err := s.ExecutePlan(ctx, p)
		if err != nil {
			label := p.marketplace
			if p.query != "" {
				label = fmt.Sprintf("query %q on %s", p.query, p.marketplace)
			}
			errs = append(errs, fmt.Sprintf("%s: %s", label, err.Error()))
			slog.Error("plan failed", "item_id", it.ID, "provider", p.provider, "marketplace", p.marketplace, "query", p.query, "err", err)
			continue
		}
		results = append(results, decoded...)
		successes++
	}
	// Only a total wipeout fails the poll; partial errors are recorded later.
	if successes == 0 {
		s.recordError(ctx, it.ID, strings.Join(errs, "; "))
		return
	}
	if it.UsePriceComparison {
		// Item-level actor override wins over the config default.
		pcID := it.ActorPriceCompare
		if pcID == "" {
			pcID = s.cfg.Apify.Actors.PriceComparison
		}
		if pcID != "" {
			pcQueries := it.SearchQueries()
			if len(pcQueries) == 0 && it.URL != "" {
				// URL-only item: run the comparison once with an empty query.
				pcQueries = []string{""}
			}
			for _, q := range pcQueries {
				pcRaw, err := apifyClient.Run(ctx, pcID, apify.PriceComparisonInput{
					Query: q, URL: it.URL,
					ProxyConfiguration: s.proxyConfig(),
				})
				if err == nil {
					pc, _ := apify.Decode(pcRaw, apify.SourcePriceCompare)
					for i := range pc {
						pc[i].MatchedQuery = q
					}
					results = append(results, pc...)
				} else {
					// Price comparison is best-effort; its failure never
					// fails the poll.
					slog.Warn("price comparison failed", "item_id", it.ID, "query", q, "err", err)
				}
			}
		}
	}
	// Dedup, then confidence/stock filtering, then item-level price/keyword
	// filters; counts at each stage are logged for diagnosis.
	beforeDedup := len(results)
	results = DedupByURL(results)
	threshold := s.cfg.Scheduler.MatchConfidenceThreshold
	beforeFilter := len(results)
	results = FilterResults(results, threshold, it.IncludeOutOfStock)
	results = ApplyItemFilters(results, it.MinPrice, it.ExcludeKeywordsList())
	slog.Info("filter applied",
		"item_id", it.ID,
		"before_dedup", beforeDedup,
		"before_filter", beforeFilter,
		"after", len(results),
		"min_confidence", threshold,
		"min_price", it.MinPrice,
		"exclude_count", len(it.ExcludeKeywordsList()),
		"include_out_of_stock", it.IncludeOutOfStock,
	)
	bestIdx := PickBest(results)
	alertsSent := 0
	for _, r := range results {
		exists, err := s.store.ResultExists(ctx, it.ID, r.URL)
		if err != nil {
			slog.Error("dedup check failed", "err", err)
			continue
		}
		if exists {
			// Row already stored — but if this poll surfaced an end time we
			// didn't have before (or the row predates the ends_at column),
			// backfill it so countdowns light up for known auctions.
			if r.EndsAt != nil {
				if err := s.store.BackfillResultEndsAt(ctx, it.ID, r.URL, *r.EndsAt); err != nil {
					slog.Error("backfill ends_at failed", "err", err)
				}
			}
			continue
		}
		// Notify before inserting so the row records whether an alert was sent.
		alerted := false
		if ShouldAlert(it.TargetPrice, r.Price) {
			if err := s.sendNotification(ctx, it, r); err != nil {
				slog.Error("ntfy send failed", "err", err)
			} else {
				alerted = true
				alertsSent++
			}
		}
		// Copy the price so the stored pointer gets its own variable rather
		// than aliasing the loop variable.
		price := r.Price
		_, err = s.store.InsertResult(ctx, &models.Result{
			ItemID: it.ID,
			Title: r.Title,
			Price: &price,
			Currency: r.Currency,
			URL: r.URL,
			Source: r.Source,
			ImageURL: r.ImageURL,
			MatchedQuery: r.MatchedQuery,
			Alerted: alerted,
			EndsAt: r.EndsAt,
		})
		if err != nil {
			slog.Error("insert result failed", "err", err)
		}
	}
	// Partial plan failures still get recorded on the item even though the
	// poll as a whole succeeded.
	errMsg := ""
	if len(errs) > 0 {
		errMsg = strings.Join(errs, "; ")
	}
	if bestIdx >= 0 {
		best := results[bestIdx]
		bp := best.Price
		_ = s.store.UpdateItemPollResult(ctx, it.ID, &models.Item{
			BestPrice: &bp,
			BestPriceCurrency: best.Currency,
			BestPriceStore: best.Store,
			BestPriceURL: best.URL,
			BestPriceImageURL: best.ImageURL,
			BestPriceTitle: best.Title,
		}, errMsg)
		_ = s.store.InsertPricePoint(ctx, &models.PricePoint{
			ItemID: it.ID,
			Price: bp,
			Store: best.Store,
		})
	} else {
		// No usable result this cycle: only update poll status/error.
		_ = s.store.UpdateItemPollResult(ctx, it.ID, nil, errMsg)
	}
	slog.Info("poll completed",
		"item_id", it.ID,
		"item_name", it.Name,
		"marketplaces", len(plans),
		"successes", successes,
		"results", len(results),
		"alerts_sent", alertsSent,
	)
}
// recordError stores msg as the item's last poll error. A failure to write
// the error itself is only logged.
func (s *Scheduler) recordError(ctx context.Context, id int64, msg string) {
	err := s.store.UpdateItemPollResult(ctx, id, nil, msg)
	if err != nil {
		slog.Error("record error failed", "err", err)
	}
}
// apifyClient returns an apify.Client whose API key reflects the latest
// value from settings, falling back to config.toml.
func (s *Scheduler) apifyClient(ctx context.Context) *apify.Client {
	if v, _ := s.store.GetSetting(ctx, "apify_api_key"); v != "" {
		return apify.New(v)
	}
	return apify.New(s.cfg.Apify.APIKey)
}
// ebayClient returns the shared eBay client with credentials refreshed from
// settings (falling back to config.toml). The client caches its OAuth token
// in memory, so the same instance is reused across polls; credentials are
// only re-applied when they actually change.
func (s *Scheduler) ebayClient(ctx context.Context) *ebay.Client {
	clientID := s.cfg.Ebay.ClientID
	if v, _ := s.store.GetSetting(ctx, "ebay_client_id"); v != "" {
		clientID = v
	}
	clientSecret := s.cfg.Ebay.ClientSecret
	if v, _ := s.store.GetSetting(ctx, "ebay_client_secret"); v != "" {
		clientSecret = v
	}
	s.ebay.EnsureCredentials(clientID, clientSecret)
	return s.ebay
}
// EbayUsage returns the number of eBay Browse API calls made so far today and
// the configured daily limit. A limit <= 0 means uncapped. Settings override
// config.toml for the limit, mirroring how credentials are resolved.
func (s *Scheduler) EbayUsage(ctx context.Context) (used, limit int) {
	used, _ = s.store.EbayUsageToday(ctx)
	limit = s.cfg.Ebay.DailyCallLimit
	v, _ := s.store.GetSetting(ctx, "ebay_daily_call_limit")
	if v != "" {
		// Ignore unparseable overrides and keep the config value.
		n, err := strconv.Atoi(strings.TrimSpace(v))
		if err == nil {
			limit = n
		}
	}
	return used, limit
}
// ExecutePlan runs one plan and returns decoded, provider-agnostic results
// with MatchedQuery already stamped. eBay plans go through the official
// Browse API; all other plans run an Apify actor. Callers handle per-plan
// errors without poisoning sibling plans.
func (s *Scheduler) ExecutePlan(ctx context.Context, p actorPlan) ([]apify.UnifiedResult, error) {
	var decoded []apify.UnifiedResult
	switch p.provider {
	case providerEbay:
		sp, ok := p.input.(ebay.SearchParams)
		if !ok {
			// Programming error in plan construction, not a transient failure.
			return nil, fmt.Errorf("ebay plan has wrong input type %T", p.input)
		}
		// Enforce the daily allowance before touching the API at all.
		used, limit := s.EbayUsage(ctx)
		if limit > 0 && used >= limit {
			return nil, fmt.Errorf("ebay daily API call limit reached (%d/%d); polling halted until the next reset (midnight US Pacific)", used, limit)
		}
		listings, err := s.ebayClient(ctx).Search(ctx, sp)
		// The call hit eBay (or at least was attempted against it) whether
		// or not it succeeded, so it counts against the daily allowance.
		if n, incErr := s.store.IncrementEbayUsage(ctx); incErr != nil {
			slog.Error("ebay usage increment failed", "err", incErr)
		} else if limit > 0 && n >= limit {
			slog.Warn("ebay daily API call limit reached", "used", n, "limit", limit)
		}
		if err != nil {
			return nil, err
		}
		// Map Browse API listings onto the shared UnifiedResult shape.
		decoded = make([]apify.UnifiedResult, 0, len(listings))
		for _, l := range listings {
			decoded = append(decoded, apify.UnifiedResult{
				Title: l.Title,
				Price: l.Price,
				Currency: l.Currency,
				URL: l.URL,
				Store: l.Store,
				ImageURL: l.ImageURL,
				Source: apify.SourceActiveEbay,
				EndsAt: l.EndsAt,
			})
		}
	default:
		if p.actorID == "" {
			return nil, fmt.Errorf("no actor configured for %s", p.marketplace)
		}
		raw, err := s.apifyClient(ctx).Run(ctx, p.actorID, p.input)
		if err != nil {
			return nil, err
		}
		// Decode errors are tolerated: keep whatever parsed cleanly.
		decoded, _ = apify.Decode(raw, p.source)
	}
	// usable counts results carrying both a URL and a positive price — for
	// the log line only; nothing is filtered here.
	usable := 0
	for i := range decoded {
		decoded[i].MatchedQuery = p.query
		if decoded[i].URL != "" && decoded[i].Price > 0 {
			usable++
		}
	}
	slog.Info("plan executed",
		"provider", p.provider,
		"marketplace", p.marketplace,
		"query", p.query,
		"decoded", len(decoded),
		"usable", usable,
	)
	return decoded, nil
}
// sendNotification pushes a price alert for result r via ntfy. Topic, base
// URL, and token are resolved per send — item override first, then stored
// settings, then config.toml — so settings changes apply without a restart.
// Tags switch to celebratory ones when the price is at or below target.
func (s *Scheduler) sendNotification(ctx context.Context, it models.Item, r apify.UnifiedResult) error {
	tags := []string{"mag"}
	if it.TargetPrice != nil && r.Price <= *it.TargetPrice {
		tags = []string{"shopping_cart", "tada"}
	}
	priority := it.NtfyPriority
	if priority == "" {
		priority = "default"
	}
	// Topic: item override > stored setting > config default.
	topic := it.NtfyTopic
	if topic == "" {
		if v, _ := s.store.GetSetting(ctx, "ntfy_default_topic"); v != "" {
			topic = v
		} else {
			topic = s.cfg.Ntfy.DefaultTopic
		}
	}
	// Message: "Store $12.34 (target: $10.00)\nTitle".
	msg := fmt.Sprintf("%s %s%.2f", r.Store, currencyPrefix(r.Currency), r.Price)
	if it.TargetPrice != nil {
		msg += fmt.Sprintf(" (target: %s%.2f)", currencyPrefix(r.Currency), *it.TargetPrice)
	}
	if r.Title != "" {
		msg += "\n" + r.Title
	}
	baseURL := s.cfg.Ntfy.BaseURL
	if v, _ := s.store.GetSetting(ctx, "ntfy_base_url"); v != "" {
		baseURL = v
	}
	token, _ := s.store.GetSetting(ctx, "ntfy_token")
	// Fresh client per send so base URL / token changes take effect immediately.
	client := ntfy.NewWithToken(baseURL, token)
	return client.Send(ctx, ntfy.Notification{
		Topic: topic,
		Title: fmt.Sprintf("Veola Alert: %s", it.Name),
		Message: msg,
		Priority: priority,
		Tags: tags,
		Click: r.URL,
	})
}
// currencyPrefix maps an ISO currency code to its display symbol. An empty
// code is treated as USD; unknown codes fall back to "CODE " (the code plus
// a trailing space) so the amount still reads cleanly.
func currencyPrefix(c string) string {
	switch c {
	case "GBP":
		return "£"
	case "EUR":
		return "€"
	case "JPY":
		return "¥"
	case "USD", "":
		return "$"
	default:
		return c + " "
	}
}
// BuildPreviewInputs returns one actor plan per alias for the first marketplace
// on the item. Preview deliberately uses only one marketplace to limit actor
// runs, but exercises every alias so the operator sees the full result set.
//
// Items with no marketplaces fall back to "ebay.com", matching buildAllInputs:
// a real poll of such an item would hit ebay.com, so the preview should show
// the same thing instead of silently returning no plans.
func (s *Scheduler) BuildPreviewInputs(it models.Item) []actorPlan {
	queries := it.SearchQueries()
	if len(queries) == 0 {
		queries = []string{""}
	}
	markets := it.Marketplaces
	if len(markets) == 0 {
		// Consistency fix: mirror buildAllInputs' default so preview never
		// comes back empty for a URL-only item.
		markets = []string{"ebay.com"}
	}
	// Only the first marketplace is previewed.
	markets = markets[:1]
	var out []actorPlan
	for _, q := range queries {
		out = append(out, s.buildInputsForQuery(it, q, markets)...)
	}
	return out
}
// actorPlan is one unit of polling work: a single (query, marketplace) pair
// plus everything needed to execute it, either as an Apify actor run or an
// eBay Browse API call.
type actorPlan struct {
	marketplace string // marketplace label as configured on the item (e.g. "ebay.com")
	source string // result-source label; selects the decoder in apify.Decode
	provider string // providerApify or providerEbay
	actorID string // Apify actor to run; unused for eBay-provider plans
	query string // alias being searched; empty for URL-only items
	input any // provider-specific input payload
}
// Marketplace returns the marketplace for this plan.
func (p actorPlan) Marketplace() string { return p.marketplace }
// Source returns the result-source label (used to pick a decoder).
func (p actorPlan) Source() string { return p.source }
// ActorID returns the Apify actor ID this plan will invoke.
func (p actorPlan) ActorID() string { return p.actorID }
// Query returns the alias string this plan searches for. Empty for URL-only items.
func (p actorPlan) Query() string { return p.query }
// Input returns the actor input payload as expected by apify.Client.Run.
func (p actorPlan) Input() any { return p.input }
// Provider returns "apify" or "ebay" — how this plan is executed.
func (p actorPlan) Provider() string { return p.provider }
// buildAllInputs returns one actor plan per (alias × marketplace) for the item.
// For URL-only items (no aliases), produces one plan per marketplace with an
// empty query string. Items with no marketplaces default to ebay.com.
func (s *Scheduler) buildAllInputs(it models.Item) []actorPlan {
	aliases := it.SearchQueries()
	if len(aliases) == 0 {
		aliases = []string{""}
	}
	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}
	plans := make([]actorPlan, 0, len(aliases)*len(markets))
	for _, alias := range aliases {
		plans = append(plans, s.buildInputsForQuery(it, alias, markets)...)
	}
	return plans
}
// buildInputsForQuery returns one actor plan per marketplace, all using the
// same query string. Used by both the scheduler and the preview path.
//
// NOTE(review): the Yahoo and Mercari cases also match on the ITEM URL, so an
// item whose URL contains "yahoo.co.jp" or "mercari" maps every listed
// marketplace to that provider regardless of m — confirm this is intended
// for items with multiple marketplaces.
func (s *Scheduler) buildInputsForQuery(it models.Item, query string, markets []string) []actorPlan {
	url := strings.ToLower(it.URL)
	plans := make([]actorPlan, 0, len(markets))
	for _, m := range markets {
		mk := strings.ToLower(m)
		switch {
		case strings.Contains(mk, "yahoo") || strings.Contains(url, "yahoo.co.jp"):
			// Yahoo Auctions JP runs through its dedicated Apify actor;
			// item-level actor override wins over the config default.
			actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.YahooAuctionsJP)
			plans = append(plans, actorPlan{
				marketplace: m, source: apify.SourceYahooJP, provider: providerApify,
				actorID: actorID, query: query,
				input: apify.YahooAuctionsJPInput{SearchTerm: query, MaxPages: 1},
			})
		case strings.Contains(mk, "mercari") || strings.Contains(url, "mercari"):
			actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.MercariJP)
			plans = append(plans, actorPlan{
				marketplace: m, source: apify.SourceMercariJP, provider: providerApify,
				actorID: actorID, query: query,
				input: apify.MercariJPInput{
					SearchKeywords: []string{query},
					Status: "on_sale",
					MaxResults: 30,
				},
			})
		case ebay.IsEbayMarketplace(mk):
			// eBay marketplaces are polled through eBay's official Browse
			// API, not an Apify scraper actor.
			plans = append(plans, actorPlan{
				marketplace: m, source: apify.SourceActiveEbay, provider: providerEbay,
				query: query,
				input: ebay.SearchParams{
					MarketplaceID: ebay.MarketplaceID(mk),
					Query: query,
					ListingType: it.ListingType,
					Condition: it.Condition,
					Region: it.Region,
					Limit: 30,
				},
			})
		default:
			// Non-eBay custom marketplaces still fall back to the Apify
			// active-listings actor.
			actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.ActiveListings)
			plans = append(plans, actorPlan{
				marketplace: m, source: apify.SourceActiveEbay, provider: providerApify,
				actorID: actorID, query: query,
				input: apify.ActiveListingInput{
					SearchQueries: []string{query},
					MaxProductsPerSearch: 30,
					MaxSearchPages: 1,
					Sort: "best_match",
					ListingType: mapListingType(it.ListingType),
					ProxyConfiguration: s.proxyConfig(),
				},
			})
		}
	}
	return plans
}
// DedupByURL collapses duplicates within a single result set. When the same
// listing matches multiple aliases the first occurrence wins, including its
// MatchedQuery tag. The dedup key is (source, URL), so identical URLs from
// different sources are both kept; results without a URL always pass through.
func DedupByURL(in []apify.UnifiedResult) []apify.UnifiedResult {
	kept := make([]apify.UnifiedResult, 0, len(in))
	seen := make(map[string]bool, len(in))
	for _, r := range in {
		if r.URL != "" {
			key := r.Source + "|" + r.URL
			if seen[key] {
				continue
			}
			seen[key] = true
		}
		kept = append(kept, r)
	}
	return kept
}
// proxyConfig returns the apify proxyConfiguration block built from
// config.toml. Returns nil — meaning omit the field from actor input
// entirely — if use_apify_proxy is false. Group and country are ignored when
// use_apify_proxy is false to prevent contradictory input.
func (s *Scheduler) proxyConfig() *apify.ProxyConfiguration {
	proxy := s.cfg.Apify.Proxy
	if !proxy.UseApifyProxy {
		return nil
	}
	cfg := apify.ProxyConfiguration{
		UseApifyProxy:     true,
		ApifyProxyGroups:  proxy.Groups,
		ApifyProxyCountry: proxy.Country,
	}
	return &cfg
}
// mapListingType translates Veola's listing-type vocabulary ("all", "BIN",
// "auction") into the automation-lab/ebay-scraper input vocabulary
// ("all", "buy_it_now", "auction"). Unrecognized values fall through as-is
// in case the user pasted a value the actor accepts but we don't.
func mapListingType(s string) string {
	lt := strings.ToLower(s)
	if lt == "" || lt == "all" {
		return "all"
	}
	if lt == "bin" || lt == "buy_it_now" {
		return "buy_it_now"
	}
	if lt == "auction" {
		return "auction"
	}
	// Unknown vocabulary passes through untouched (original casing kept).
	return s
}
// firstNonEmpty returns the first argument that is not the empty string,
// or "" when every argument is empty (or none are given).
func firstNonEmpty(vs ...string) string {
	for i := range vs {
		if vs[i] != "" {
			return vs[i]
		}
	}
	return ""
}
// SeedSoldHistory runs the sold-listings actor and writes price_history rows
// for an item just added. Errors are logged and swallowed: a missing baseline
// is not fatal. Items without any alias are skipped entirely; items without
// marketplaces default to ebay.com.
func (s *Scheduler) SeedSoldHistory(ctx context.Context, it models.Item) {
	aliases := it.SearchQueries()
	if len(aliases) == 0 {
		return
	}
	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}
	for _, alias := range aliases {
		for _, market := range markets {
			s.seedSoldHistoryFor(ctx, it, alias, market)
		}
	}
}
// seedSoldHistoryFor runs the sold-listings actor for one (query, marketplace)
// pair and records each decoded sold price as a price-history point. Yahoo
// marketplaces use the Yahoo-specific sold actor and source label; everything
// else uses the generic sold-listings actor. All failures are logged and
// swallowed — seeding is best-effort.
func (s *Scheduler) seedSoldHistoryFor(ctx context.Context, it models.Item, query, marketplace string) {
	// Item-level actor override wins over the config default.
	actorID := firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.SoldListings)
	source := apify.SourceSoldEbay
	if strings.Contains(strings.ToLower(marketplace), "yahoo") {
		actorID = firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.YahooAuctionsJPSold)
		source = apify.SourceSoldYahooJP
	}
	if actorID == "" {
		// No actor configured anywhere: nothing to seed.
		return
	}
	raw, err := s.apifyClient(ctx).Run(ctx, actorID, apify.SoldListingInput{
		Query: query, Marketplace: marketplace, MaxResults: 50, DaysBack: 30,
		ProxyConfiguration: s.proxyConfig(),
	})
	if err != nil {
		slog.Warn("sold history seed failed", "item_id", it.ID, "marketplace", marketplace, "query", query, "err", err)
		return
	}
	for _, r := range raw {
		var sold apify.SoldListingResult
		// Skip rows that fail to decode or report a non-positive price.
		if err := jsonUnmarshal(r, &sold); err != nil || sold.SoldPrice <= 0 {
			continue
		}
		// Unparseable/missing timestamps fall back to "now" rather than
		// dropping the data point.
		t, _ := time.Parse(time.RFC3339, sold.SoldAt)
		if t.IsZero() {
			t = time.Now()
		}
		_ = s.store.InsertPricePoint(ctx, &models.PricePoint{
			ItemID: it.ID,
			Price: sold.SoldPrice,
			Store: sourceLabelToStore(source),
			PolledAt: t,
		})
	}
}
// sourceLabelToStore maps a sold-listing source label to the store name
// recorded on seeded price points. Everything except Yahoo JP sold results
// is attributed to "ebay-sold".
func sourceLabelToStore(src string) string {
	if src == apify.SourceSoldYahooJP {
		return "yahoo-auctions-jp-sold"
	}
	return "ebay-sold"
}