Files
veola/internal/scheduler/scheduler.go
prosolis 1ae2c50b9a Add eBay Browse API integration with daily call quota
eBay marketplaces are now polled through eBay's official Buy > Browse API (client-credentials OAuth2) instead of an Apify scraper actor; Apify still handles Yahoo JP and Mercari. Browse API calls are tracked per day in a new ebay_api_usage table and capped (default 5000, configurable) on eBay's Pacific-time reset clock, so polling halts before the limit is hit. Credentials live in config.toml [ebay] and are overridable via /settings, which also surfaces the day's running call count.

Also carries the server.secure_cookies config plumbing (field, accessor, example) consumed by the following commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 12:10:39 -07:00

700 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduler
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
"github.com/robfig/cron/v3"
"veola/internal/apify"
"veola/internal/config"
"veola/internal/db"
"veola/internal/ebay"
"veola/internal/models"
"veola/internal/ntfy"
)
// Provider labels record which backend executes an actorPlan.
const (
	// providerApify plans run an Apify actor job (Yahoo JP, Mercari, and any
	// custom marketplace that is not recognised as eBay).
	providerApify = "apify"
	// providerEbay plans call eBay's official Browse API directly.
	providerEbay = "ebay"
)
// Scheduler owns one cron job per active item and executes poll cycles for
// them, routing each (alias × marketplace) plan either to an Apify actor or
// to eBay's Browse API and sending ntfy alerts for newly stored results.
type Scheduler struct {
	cfg   *config.Config
	store *db.Store
	// apify is the client handed to New. NOTE(review): this file never reads
	// it; apifyClient builds a fresh client per call so that an API key
	// changed on the settings page takes effect.
	apify *apify.Client
	// ebay is long-lived so its in-memory OAuth token is reused across
	// polls; ebayClient re-applies credentials before each use.
	ebay *ebay.Client
	// ntfy is the client handed to New. NOTE(review): this file never reads
	// it; sendNotification builds its own client from settings/config.
	ntfy *ntfy.Client
	cron *cron.Cron
	// mu guards entries.
	mu sync.Mutex
	// entries maps an item ID to the cron entry that polls it.
	entries map[int64]cron.EntryID
	// rootCtx parents every scheduled poll's context; cancel (invoked by
	// Stop) cancels it.
	rootCtx context.Context
	cancel  context.CancelFunc
}
// New wires a Scheduler around the given config, store, Apify client and ntfy
// client. The eBay Browse API client is created here from the config.toml
// [ebay] credentials; settings-page overrides are applied later, per poll, by
// ebayClient. Nothing is scheduled until Start is called.
func New(cfg *config.Config, store *db.Store, ap *apify.Client, nt *ntfy.Client) *Scheduler {
	eb := ebay.New(cfg.Ebay.ClientID, cfg.Ebay.ClientSecret, cfg.Ebay.Environment)
	root, stop := context.WithCancel(context.Background())
	s := &Scheduler{
		cfg:     cfg,
		store:   store,
		apify:   ap,
		ebay:    eb,
		ntfy:    nt,
		cron:    cron.New(),
		entries: map[int64]cron.EntryID{},
	}
	s.rootCtx, s.cancel = root, stop
	return s
}
// Start loads every active item, registers a cron job for each one, and then
// starts the cron runner. If the store cannot list items, that error is
// returned and nothing is scheduled.
func (s *Scheduler) Start(ctx context.Context) error {
	active, err := s.store.ListActiveItems(ctx)
	if err != nil {
		return err
	}
	for i := range active {
		s.register(active[i])
	}
	s.cron.Start()
	slog.Info("scheduler started", "items", len(active))
	return nil
}
// Stop shuts the scheduler down. It first cancels the root context that
// parents every job's context, then stops the cron runner and blocks until
// any job that was already running has returned. Because cancellation comes
// first, in-flight polls observe a cancelled ctx and should wind down quickly.
func (s *Scheduler) Stop() {
	s.cancel()
	<-s.cron.Stop().Done()
	slog.Info("scheduler stopped")
}
// SyncItem makes the cron schedule match the item's current state. Any job
// already registered for the item is dropped, and a fresh one is added only
// if the item is Active. Call it after an item is created, updated, toggled,
// or deleted.
func (s *Scheduler) SyncItem(it models.Item) {
	s.mu.Lock()
	defer s.mu.Unlock()
	entry, scheduled := s.entries[it.ID]
	if scheduled {
		s.cron.Remove(entry)
		delete(s.entries, it.ID)
	}
	if it.Active {
		s.registerLocked(it)
	}
}
// RemoveItem unschedules the cron job for the item with the given ID. It is a
// no-op when no job is registered for that ID.
func (s *Scheduler) RemoveItem(id int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	entry, scheduled := s.entries[id]
	if !scheduled {
		return
	}
	s.cron.Remove(entry)
	delete(s.entries, id)
}
// register schedules item for callers that do not already hold s.mu; it takes
// the lock and delegates to registerLocked.
func (s *Scheduler) register(item models.Item) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.registerLocked(item)
}
// registerLocked adds a cron job that polls the item periodically and records
// its entry ID. The caller must hold s.mu.
//
// The interval is the item's PollIntervalMinutes. It falls back to the global
// scheduler interval, and then to 60 minutes, whenever the value is <= 0.
// Each tick runs under a 10-minute timeout derived from the scheduler's root
// context, so Stop cancels in-flight polls. Each tick also re-reads the item
// from the store, so edits made since registration take effect. The poll is
// skipped when the item has been deleted or deactivated.
func (s *Scheduler) registerLocked(it models.Item) {
	mins := it.PollIntervalMinutes
	if mins <= 0 {
		mins = s.cfg.Scheduler.GlobalPollIntervalMinutes
	}
	if mins <= 0 {
		mins = 60
	}
	spec := fmt.Sprintf("@every %dm", mins)
	id := it.ID
	entryID, err := s.cron.AddFunc(spec, func() {
		ctx, cancel := context.WithTimeout(s.rootCtx, 10*time.Minute)
		defer cancel()
		fresh, err := s.store.GetItem(ctx, id)
		if err != nil {
			// Log rather than drop: a failing store would otherwise stop
			// this item's polls without a trace. Stay quiet once the
			// context is done (shutdown or timeout), when errors are expected.
			if ctx.Err() == nil {
				slog.Error("scheduled poll: load item failed", "item_id", id, "err", err)
			}
			return
		}
		if fresh == nil || !fresh.Active {
			return
		}
		s.RunPoll(ctx, *fresh)
	})
	if err != nil {
		slog.Error("schedule failed", "item_id", it.ID, "err", err)
		return
	}
	s.entries[it.ID] = entryID
}
// RunPoll executes one poll cycle for an item. It is public so handlers can
// trigger "Run Now" without going through cron.
//
// A cycle:
//  1. runs every (alias × marketplace) plan. A failing plan is recorded but
//     does not poison its siblings; the cycle aborts only if every plan fails.
//  2. optionally appends price-comparison actor results.
//  3. dedups by URL, then filters by match confidence, stock, min price and
//     excluded keywords.
//  4. stores every result not already recorded for the item, sending an
//     ntfy alert first when ShouldAlert approves its price.
//  5. updates the item's poll summary (best price, joined per-plan errors)
//     and appends a price point for the best result.
//
// Store failures are logged with the item ID; they never abort the cycle.
func (s *Scheduler) RunPoll(ctx context.Context, it models.Item) {
	plans := s.buildAllInputs(it)
	if len(plans) == 0 {
		s.recordError(ctx, it.ID, "no marketplaces configured for this item")
		return
	}

	var results []apify.UnifiedResult
	var errs []string
	successes := 0
	for _, p := range plans {
		decoded, err := s.ExecutePlan(ctx, p)
		if err != nil {
			label := p.marketplace
			if p.query != "" {
				label = fmt.Sprintf("query %q on %s", p.query, p.marketplace)
			}
			errs = append(errs, fmt.Sprintf("%s: %s", label, err.Error()))
			slog.Error("plan failed", "item_id", it.ID, "provider", p.provider, "marketplace", p.marketplace, "query", p.query, "err", err)
			continue
		}
		results = append(results, decoded...)
		successes++
	}
	// Joined per-plan failures ("" when every plan succeeded). It is saved
	// even on partial success so the item shows which combinations failed.
	errMsg := strings.Join(errs, "; ")
	if successes == 0 {
		s.recordError(ctx, it.ID, errMsg)
		return
	}

	if it.UsePriceComparison {
		pcID := firstNonEmpty(it.ActorPriceCompare, s.cfg.Apify.Actors.PriceComparison)
		if pcID != "" {
			// Resolved only here: building the client costs a settings lookup
			// and nothing else in this cycle needs it.
			apifyClient := s.apifyClient(ctx)
			pcQueries := it.SearchQueries()
			if len(pcQueries) == 0 && it.URL != "" {
				pcQueries = []string{""}
			}
			for _, q := range pcQueries {
				pcRaw, err := apifyClient.Run(ctx, pcID, apify.PriceComparisonInput{
					Query: q, URL: it.URL,
					ProxyConfiguration: s.proxyConfig(),
				})
				if err != nil {
					slog.Warn("price comparison failed", "item_id", it.ID, "query", q, "err", err)
					continue
				}
				pc, decErr := apify.Decode(pcRaw, apify.SourcePriceCompare)
				if decErr != nil {
					// Whatever did decode is still used; surface the error for
					// debugging only.
					slog.Debug("price comparison decode reported errors", "item_id", it.ID, "query", q, "err", decErr)
				}
				for i := range pc {
					pc[i].MatchedQuery = q
				}
				results = append(results, pc...)
			}
		}
	}

	beforeDedup := len(results)
	results = DedupByURL(results)
	threshold := s.cfg.Scheduler.MatchConfidenceThreshold
	beforeFilter := len(results)
	results = FilterResults(results, threshold, it.IncludeOutOfStock)
	excludes := it.ExcludeKeywordsList()
	results = ApplyItemFilters(results, it.MinPrice, excludes)
	slog.Info("filter applied",
		"item_id", it.ID,
		"before_dedup", beforeDedup,
		"before_filter", beforeFilter,
		"after", len(results),
		"min_confidence", threshold,
		"min_price", it.MinPrice,
		"exclude_count", len(excludes),
		"include_out_of_stock", it.IncludeOutOfStock,
	)

	bestIdx := PickBest(results)
	alertsSent := 0
	for _, r := range results {
		exists, err := s.store.ResultExists(ctx, it.ID, r.URL)
		if err != nil {
			slog.Error("dedup check failed", "item_id", it.ID, "url", r.URL, "err", err)
			continue
		}
		if exists {
			continue
		}
		alerted := false
		if ShouldAlert(it.TargetPrice, r.Price) {
			if err := s.sendNotification(ctx, it, r); err != nil {
				slog.Error("ntfy send failed", "item_id", it.ID, "err", err)
			} else {
				alerted = true
				alertsSent++
			}
		}
		price := r.Price
		if _, err := s.store.InsertResult(ctx, &models.Result{
			ItemID:       it.ID,
			Title:        r.Title,
			Price:        &price,
			Currency:     r.Currency,
			URL:          r.URL,
			Source:       r.Source,
			ImageURL:     r.ImageURL,
			MatchedQuery: r.MatchedQuery,
			Alerted:      alerted,
		}); err != nil {
			slog.Error("insert result failed", "item_id", it.ID, "url", r.URL, "err", err)
		}
	}

	if bestIdx >= 0 {
		best := results[bestIdx]
		bp := best.Price
		if err := s.store.UpdateItemPollResult(ctx, it.ID, &models.Item{
			BestPrice:         &bp,
			BestPriceStore:    best.Store,
			BestPriceURL:      best.URL,
			BestPriceImageURL: best.ImageURL,
			BestPriceTitle:    best.Title,
		}, errMsg); err != nil {
			slog.Error("update poll result failed", "item_id", it.ID, "err", err)
		}
		if err := s.store.InsertPricePoint(ctx, &models.PricePoint{
			ItemID: it.ID,
			Price:  bp,
			Store:  best.Store,
		}); err != nil {
			slog.Error("insert price point failed", "item_id", it.ID, "err", err)
		}
	} else if err := s.store.UpdateItemPollResult(ctx, it.ID, nil, errMsg); err != nil {
		slog.Error("update poll result failed", "item_id", it.ID, "err", err)
	}

	slog.Info("poll completed",
		"item_id", it.ID,
		"item_name", it.Name,
		"marketplaces", len(plans),
		"successes", successes,
		"results", len(results),
		"alerts_sent", alertsSent,
	)
}
// recordError saves msg as the item's poll error through
// UpdateItemPollResult, passing no best-price data (nil). A store failure is
// logged rather than returned.
func (s *Scheduler) recordError(ctx context.Context, id int64, msg string) {
	err := s.store.UpdateItemPollResult(ctx, id, nil, msg)
	if err == nil {
		return
	}
	slog.Error("record error failed", "err", err)
}
// apifyClient builds a new Apify client on every call. The key saved on the
// settings page ("apify_api_key") takes precedence. config.toml's key is used
// when that setting is empty; lookup errors are treated the same as "not set".
func (s *Scheduler) apifyClient(ctx context.Context) *apify.Client {
	override, _ := s.store.GetSetting(ctx, "apify_api_key")
	if override == "" {
		return apify.New(s.cfg.Apify.APIKey)
	}
	return apify.New(override)
}
// ebayClient hands back the scheduler's single eBay client after pushing the
// current credentials into it. The settings "ebay_client_id" and
// "ebay_client_secret" win when non-empty (lookup errors count as unset);
// otherwise config.toml's values are used. The one instance is reused so that
// its in-memory OAuth token survives across polls. EnsureCredentials is
// expected to only reset state when the credentials actually change.
func (s *Scheduler) ebayClient(ctx context.Context) *ebay.Client {
	resolve := func(key, fallback string) string {
		if v, _ := s.store.GetSetting(ctx, key); v != "" {
			return v
		}
		return fallback
	}
	s.ebay.EnsureCredentials(
		resolve("ebay_client_id", s.cfg.Ebay.ClientID),
		resolve("ebay_client_secret", s.cfg.Ebay.ClientSecret),
	)
	return s.ebay
}
// EbayUsage returns the number of eBay Browse API calls made so far today and
// the configured daily limit. A limit <= 0 means uncapped. Settings override
// config.toml for the limit, mirroring how credentials are resolved.
func (s *Scheduler) EbayUsage(ctx context.Context) (used, limit int) {
used, _ = s.store.EbayUsageToday(ctx)
limit = s.cfg.Ebay.DailyCallLimit
if v, _ := s.store.GetSetting(ctx, "ebay_daily_call_limit"); v != "" {
if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
limit = n
}
}
return used, limit
}
// ExecutePlan runs one plan and returns decoded, provider-agnostic results
// with MatchedQuery already stamped. Callers handle per-plan errors without
// poisoning sibling plans.
//
// eBay plans go through the official Browse API. They are refused once
// today's recorded usage has reached the daily limit. Every attempted call is
// counted, on a context that outlives ctx's cancellation (see below). All
// other plans run their Apify actor; a plan with no actor ID is an error.
func (s *Scheduler) ExecutePlan(ctx context.Context, p actorPlan) ([]apify.UnifiedResult, error) {
	var decoded []apify.UnifiedResult
	switch p.provider {
	case providerEbay:
		sp, ok := p.input.(ebay.SearchParams)
		if !ok {
			return nil, fmt.Errorf("ebay plan has wrong input type %T", p.input)
		}
		// A poll that is already cancelled or timed out must not spend quota:
		// the usage increment below deliberately ignores cancellation.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		used, limit := s.EbayUsage(ctx)
		if limit > 0 && used >= limit {
			return nil, fmt.Errorf("ebay daily API call limit reached (%d/%d); polling halted until the next reset (midnight US Pacific)", used, limit)
		}
		listings, err := s.ebayClient(ctx).Search(ctx, sp)
		// The call hit eBay (or at least was attempted against it) whether
		// or not it succeeded, so it counts against the daily allowance.
		// Record it on a context detached from ctx's cancellation. If the
		// search ran into the poll timeout or a scheduler Stop, the request
		// may still have reached eBay. An increment bound to ctx would then
		// fail and under-count. The short timeout keeps the write bounded.
		incCtx, incCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
		n, incErr := s.store.IncrementEbayUsage(incCtx)
		incCancel()
		switch {
		case incErr != nil:
			slog.Error("ebay usage increment failed", "err", incErr)
		case limit > 0 && n >= limit:
			slog.Warn("ebay daily API call limit reached", "used", n, "limit", limit)
		}
		if err != nil {
			return nil, err
		}
		decoded = make([]apify.UnifiedResult, 0, len(listings))
		for _, l := range listings {
			decoded = append(decoded, apify.UnifiedResult{
				Title:    l.Title,
				Price:    l.Price,
				Currency: l.Currency,
				URL:      l.URL,
				Store:    l.Store,
				ImageURL: l.ImageURL,
				Source:   apify.SourceActiveEbay,
			})
		}
	default:
		if p.actorID == "" {
			return nil, fmt.Errorf("no actor configured for %s", p.marketplace)
		}
		raw, err := s.apifyClient(ctx).Run(ctx, p.actorID, p.input)
		if err != nil {
			return nil, err
		}
		var decErr error
		decoded, decErr = apify.Decode(raw, p.source)
		if decErr != nil {
			// Whatever did decode is still returned; surface the error for
			// debugging only.
			slog.Debug("actor output decode reported errors", "marketplace", p.marketplace, "query", p.query, "err", decErr)
		}
	}

	// "usable" counts results with a URL and a positive price; it is only
	// reported in the log line below.
	usable := 0
	for i := range decoded {
		decoded[i].MatchedQuery = p.query
		if decoded[i].URL != "" && decoded[i].Price > 0 {
			usable++
		}
	}
	slog.Info("plan executed",
		"provider", p.provider,
		"marketplace", p.marketplace,
		"query", p.query,
		"decoded", len(decoded),
		"usable", usable,
	)
	return decoded, nil
}
func (s *Scheduler) sendNotification(ctx context.Context, it models.Item, r apify.UnifiedResult) error {
tags := []string{"mag"}
if it.TargetPrice != nil && r.Price <= *it.TargetPrice {
tags = []string{"shopping_cart", "tada"}
}
priority := it.NtfyPriority
if priority == "" {
priority = "default"
}
topic := it.NtfyTopic
if topic == "" {
if v, _ := s.store.GetSetting(ctx, "ntfy_default_topic"); v != "" {
topic = v
} else {
topic = s.cfg.Ntfy.DefaultTopic
}
}
msg := fmt.Sprintf("%s %s%.2f", r.Store, currencyPrefix(r.Currency), r.Price)
if it.TargetPrice != nil {
msg += fmt.Sprintf(" (target: %s%.2f)", currencyPrefix(r.Currency), *it.TargetPrice)
}
if r.Title != "" {
msg += "\n" + r.Title
}
baseURL := s.cfg.Ntfy.BaseURL
if v, _ := s.store.GetSetting(ctx, "ntfy_base_url"); v != "" {
baseURL = v
}
token, _ := s.store.GetSetting(ctx, "ntfy_token")
client := ntfy.NewWithToken(baseURL, token)
return client.Send(ctx, ntfy.Notification{
Topic: topic,
Title: fmt.Sprintf("Veola Alert: %s", it.Name),
Message: msg,
Priority: priority,
Tags: tags,
Click: r.URL,
})
}
// currencyPrefix returns the text placed before a price in alerts. USD (and
// an empty code) map to "$", and GBP, EUR and JPY map to their symbols. Any
// other code is echoed back followed by a space, e.g. "CAD " for CAD. Codes
// are matched case-sensitively.
func currencyPrefix(c string) string {
	symbols := map[string]string{
		"":    "$",
		"USD": "$",
		"GBP": "£",
		"EUR": "€",
		"JPY": "¥",
	}
	if sym, ok := symbols[c]; ok {
		return sym
	}
	return c + " "
}
// BuildPreviewInputs returns one plan per alias against only the item's first
// marketplace. A URL-only item with no aliases gets a single empty-query plan.
// Preview deliberately limits itself to one marketplace to keep actor runs
// down, but it exercises every alias so the operator sees the full result set.
//
// An item with no marketplaces is previewed on ebay.com, matching
// buildAllInputs, which polls such items on ebay.com. This keeps the preview
// consistent with what the scheduler will actually run.
func (s *Scheduler) BuildPreviewInputs(it models.Item) []actorPlan {
	queries := it.SearchQueries()
	if len(queries) == 0 {
		queries = []string{""}
	}
	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}
	// Full slice expression so the preview's view cannot be appended into
	// the item's own Marketplaces backing array.
	markets = markets[:1:1]
	var out []actorPlan
	for _, q := range queries {
		out = append(out, s.buildInputsForQuery(it, q, markets)...)
	}
	return out
}
// actorPlan is one unit of poll work: a single query against a single
// marketplace, together with everything needed to execute it.
type actorPlan struct {
	// marketplace is the marketplace string as configured on the item.
	marketplace string
	// source is the result-source label used to pick a decoder.
	source string
	// provider is providerApify or providerEbay.
	provider string
	// actorID is the Apify actor to run; empty for eBay Browse API plans.
	actorID string
	// query is the alias searched for; empty for URL-only items.
	query string
	// input is the provider-specific request: an Apify actor input struct,
	// or ebay.SearchParams for eBay plans.
	input any
}

// Marketplace reports the marketplace this plan targets.
func (p actorPlan) Marketplace() string {
	return p.marketplace
}

// Source reports the result-source label, which selects the decoder.
func (p actorPlan) Source() string {
	return p.source
}

// ActorID reports the Apify actor this plan invokes (empty for eBay plans).
func (p actorPlan) ActorID() string {
	return p.actorID
}

// Query reports the alias this plan searches for; it is empty for URL-only
// items.
func (p actorPlan) Query() string {
	return p.query
}

// Input reports the request payload: an Apify actor input for apify.Client.Run,
// or ebay.SearchParams for eBay plans.
func (p actorPlan) Input() any {
	return p.input
}

// Provider reports how the plan is executed: "apify" or "ebay".
func (p actorPlan) Provider() string {
	return p.provider
}
// buildAllInputs expands an item into its complete poll workload: one plan
// per (alias, marketplace) pair, with aliases as the outer loop. An item with
// no aliases (URL-only) contributes a single empty query, and an item with no
// marketplaces is polled on ebay.com. Given that buildInputsForQuery emits one
// plan per marketplace, the result is never empty.
func (s *Scheduler) buildAllInputs(it models.Item) []actorPlan {
	markets := it.Marketplaces
	if len(markets) == 0 {
		markets = []string{"ebay.com"}
	}
	queries := it.SearchQueries()
	if len(queries) == 0 {
		queries = []string{""}
	}
	var plans []actorPlan
	for i := range queries {
		plans = append(plans, s.buildInputsForQuery(it, queries[i], markets)...)
	}
	return plans
}
// buildInputsForQuery returns one actor plan per marketplace, all using the
// same query string. Used by both the scheduler and the preview path.
func (s *Scheduler) buildInputsForQuery(it models.Item, query string, markets []string) []actorPlan {
url := strings.ToLower(it.URL)
plans := make([]actorPlan, 0, len(markets))
for _, m := range markets {
mk := strings.ToLower(m)
switch {
case strings.Contains(mk, "yahoo") || strings.Contains(url, "yahoo.co.jp"):
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.YahooAuctionsJP)
plans = append(plans, actorPlan{
marketplace: m, source: apify.SourceYahooJP, provider: providerApify,
actorID: actorID, query: query,
input: apify.YahooAuctionsJPInput{SearchTerm: query, MaxPages: 1},
})
case strings.Contains(mk, "mercari") || strings.Contains(url, "mercari"):
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.MercariJP)
plans = append(plans, actorPlan{
marketplace: m, source: apify.SourceMercariJP, provider: providerApify,
actorID: actorID, query: query,
input: apify.MercariJPInput{
SearchKeywords: []string{query},
Status: "on_sale",
MaxResults: 30,
},
})
case ebay.IsEbayMarketplace(mk):
// eBay marketplaces are polled through eBay's official Browse
// API, not an Apify scraper actor.
plans = append(plans, actorPlan{
marketplace: m, source: apify.SourceActiveEbay, provider: providerEbay,
query: query,
input: ebay.SearchParams{
MarketplaceID: ebay.MarketplaceID(mk),
Query: query,
ListingType: it.ListingType,
Limit: 30,
},
})
default:
// Non-eBay custom marketplaces still fall back to the Apify
// active-listings actor.
actorID := firstNonEmpty(it.ActorActive, s.cfg.Apify.Actors.ActiveListings)
plans = append(plans, actorPlan{
marketplace: m, source: apify.SourceActiveEbay, provider: providerApify,
actorID: actorID, query: query,
input: apify.ActiveListingInput{
SearchQueries: []string{query},
MaxProductsPerSearch: 30,
MaxSearchPages: 1,
Sort: "best_match",
ListingType: mapListingType(it.ListingType),
ProxyConfiguration: s.proxyConfig(),
},
})
}
}
return plans
}
// DedupByURL drops repeated listings from a single result set, preserving
// input order. Listings are keyed by Source plus URL, so the same URL reported
// by two different sources is kept twice. When one listing matched several
// aliases, the first occurrence wins, including its MatchedQuery tag.
// Results without a URL cannot be compared and are always kept.
func DedupByURL(in []apify.UnifiedResult) []apify.UnifiedResult {
	out := make([]apify.UnifiedResult, 0, len(in))
	kept := make(map[string]struct{}, len(in))
	for _, r := range in {
		if r.URL != "" {
			key := r.Source + "|" + r.URL
			if _, dup := kept[key]; dup {
				continue
			}
			kept[key] = struct{}{}
		}
		out = append(out, r)
	}
	return out
}
// proxyConfig builds the Apify proxyConfiguration block from config.toml's
// proxy settings. With use_apify_proxy off it returns nil, meaning the field
// is omitted from actor input entirely. Groups and country are only sent
// alongside an enabled proxy, so actors never receive a contradictory
// configuration.
func (s *Scheduler) proxyConfig() *apify.ProxyConfiguration {
	if proxy := s.cfg.Apify.Proxy; proxy.UseApifyProxy {
		return &apify.ProxyConfiguration{
			UseApifyProxy:     true,
			ApifyProxyGroups:  proxy.Groups,
			ApifyProxyCountry: proxy.Country,
		}
	}
	return nil
}
// mapListingType converts Veola's listing-type vocabulary into the
// automation-lab/ebay-scraper actor's vocabulary ("all", "buy_it_now",
// "auction"). Matching is case-insensitive:
//   - "" and "all" become "all";
//   - "BIN" and "buy_it_now" become "buy_it_now";
//   - "auction" stays "auction".
//
// Any other value is returned unchanged, in its original casing, in case it
// is something the actor accepts that Veola does not know about.
func mapListingType(s string) string {
	lowered := strings.ToLower(s)
	if lowered == "" || lowered == "all" {
		return "all"
	}
	if lowered == "bin" || lowered == "buy_it_now" {
		return "buy_it_now"
	}
	if lowered == "auction" {
		return "auction"
	}
	return s
}
// firstNonEmpty returns the first argument that is not the empty string. It
// returns "" when every argument is empty or none are given. Typical use is a
// per-item override first and the config default second.
func firstNonEmpty(vs ...string) string {
	for len(vs) > 0 && vs[0] == "" {
		vs = vs[1:]
	}
	if len(vs) == 0 {
		return ""
	}
	return vs[0]
}
// SeedSoldHistory gives a newly added item a price baseline. It runs the
// sold-listings actor for every (alias, marketplace) pair and writes the sold
// prices as price points. Items without aliases are skipped, and an item with
// no marketplaces is seeded from ebay.com. Actor failures are logged and
// swallowed, because a missing baseline is not fatal.
func (s *Scheduler) SeedSoldHistory(ctx context.Context, it models.Item) {
	queries := it.SearchQueries()
	if len(queries) == 0 {
		return
	}
	markets := []string{"ebay.com"}
	if len(it.Marketplaces) > 0 {
		markets = it.Marketplaces
	}
	for _, q := range queries {
		for _, m := range markets {
			s.seedSoldHistoryFor(ctx, it, q, m)
		}
	}
}
// seedSoldHistoryFor runs the sold-listings actor for one query on one
// marketplace and stores each sold price as a price point for the item.
//
// Marketplaces whose name contains "yahoo" use the Yahoo Auctions JP sold
// actor; every other marketplace uses the eBay sold-listings actor. In both
// cases the item's ActorSold overrides the configured actor, and nothing runs
// when no actor is configured. Rows that fail to decode or have no positive
// sold price are skipped. A sold date that does not parse as RFC 3339 is
// recorded as "now". Actor failures are logged and swallowed. Store failures
// are counted and reported in a single warning instead of being dropped
// silently.
func (s *Scheduler) seedSoldHistoryFor(ctx context.Context, it models.Item, query, marketplace string) {
	actorID := firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.SoldListings)
	source := apify.SourceSoldEbay
	if strings.Contains(strings.ToLower(marketplace), "yahoo") {
		actorID = firstNonEmpty(it.ActorSold, s.cfg.Apify.Actors.YahooAuctionsJPSold)
		source = apify.SourceSoldYahooJP
	}
	if actorID == "" {
		return
	}
	raw, err := s.apifyClient(ctx).Run(ctx, actorID, apify.SoldListingInput{
		Query: query, Marketplace: marketplace, MaxResults: 50, DaysBack: 30,
		ProxyConfiguration: s.proxyConfig(),
	})
	if err != nil {
		slog.Warn("sold history seed failed", "item_id", it.ID, "marketplace", marketplace, "query", query, "err", err)
		return
	}

	storeLabel := sourceLabelToStore(source) // loop-invariant
	stored, failed := 0, 0
	var firstErr error
	for _, r := range raw {
		var sold apify.SoldListingResult
		if err := jsonUnmarshal(r, &sold); err != nil || sold.SoldPrice <= 0 {
			continue
		}
		// A parse error leaves t at the zero time, which is replaced by
		// "now" below, so the error itself carries no extra information.
		t, _ := time.Parse(time.RFC3339, sold.SoldAt)
		if t.IsZero() {
			t = time.Now()
		}
		if err := s.store.InsertPricePoint(ctx, &models.PricePoint{
			ItemID:   it.ID,
			Price:    sold.SoldPrice,
			Store:    storeLabel,
			PolledAt: t,
		}); err != nil {
			failed++
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		stored++
	}
	if failed > 0 {
		slog.Warn("sold history price points not stored",
			"item_id", it.ID,
			"marketplace", marketplace,
			"query", query,
			"failed", failed,
			"stored", stored,
			"err", firstErr,
		)
	}
}
// sourceLabelToStore maps a sold-listing source label to the store name
// written on seeded price points. Yahoo Auctions JP sold data becomes
// "yahoo-auctions-jp-sold"; every other source becomes "ebay-sold".
func sourceLabelToStore(src string) string {
	if src == apify.SourceSoldYahooJP {
		return "yahoo-auctions-jp-sold"
	}
	return "ebay-sold"
}