From 7fb20c2ea9cfd2e126068db00b56ca44db646513 Mon Sep 17 00:00:00 2001
From: Elias Kohout
Date: Mon, 20 Jan 2025 21:22:45 +0100
Subject: [PATCH] make capitalization uniform

---
 src/internal/app/article.go              |  42 ++++
 src/internal/app/index.go                |  53 ++++
 src/internal/app/upsearch.go             |  42 ++++
 src/internal/crawler/crawlerfacade.go    |  69 ++++++
 src/internal/crawler/resource.go         |   6 +
 src/internal/crawler/spiegelconverter.go |  96 +++++++
 src/internal/crawler/webfeed.go          |  67 +++++
 src/internal/crawler/zeitconverter.go    | 100 ++++++++
 .../model/database/articlerepository.go  | 234 ++++++++++++++++++
 .../model/database/documentrepository.go | 109 ++++++++
 src/internal/util/distributer.go         |  37 +++
 src/internal/util/iclone.go              |   5 +
 12 files changed, 860 insertions(+)
 create mode 100644 src/internal/app/article.go
 create mode 100644 src/internal/app/index.go
 create mode 100644 src/internal/app/upsearch.go
 create mode 100644 src/internal/crawler/crawlerfacade.go
 create mode 100644 src/internal/crawler/resource.go
 create mode 100644 src/internal/crawler/spiegelconverter.go
 create mode 100644 src/internal/crawler/webfeed.go
 create mode 100644 src/internal/crawler/zeitconverter.go
 create mode 100644 src/internal/model/database/articlerepository.go
 create mode 100644 src/internal/model/database/documentrepository.go
 create mode 100644 src/internal/util/distributer.go
 create mode 100644 src/internal/util/iclone.go

diff --git a/src/internal/app/article.go b/src/internal/app/article.go
new file mode 100644
index 0000000..028c666
--- /dev/null
+++ b/src/internal/app/article.go
@@ -0,0 +1,42 @@
+package app
+
+import (
+	"html/template"
+	"net/http"
+	"strconv"
+)
+
+// Endpoint that renders a single article identified by the id path value.
+// Uses the articlePage and layout templates.
+func (app *App) Article(w http.ResponseWriter, req *http.Request) {
+	// get id
+	id, err := strconv.ParseUint(req.PathValue("id"), 10, 64)
+	if err != nil {
+		http.NotFound(w, req)
+		return
+	}
+
+	// get article
+	article, err := app.articles.ById(int(id))
+	if err != nil {
+		// lookup failed (missing row or database error)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// render template
+	t := template.Must(template.ParseFiles(
+		"assets/templates/articlePage.html",
+		"assets/templates/layout.html",
+	))
+
+	data := map[string]interface{}{
+		"SelectedNavItemArticle": false,
+		"ArticlePageVM":          article.PageViewModel(),
+	}
+	err = t.ExecuteTemplate(w, "base", data)
+	if err != nil {
+		http.Error(w, "Failed to render template", http.StatusInternalServerError)
+		return
+	}
+}
diff --git a/src/internal/app/index.go b/src/internal/app/index.go
new file mode 100644
index 0000000..777a074
--- /dev/null
+++ b/src/internal/app/index.go
@@ -0,0 +1,53 @@
+package app
+
+import (
+	"crowsnest/internal/model"
+	"html/template"
+	"net/http"
+	"strconv"
+)
+
+// Endpoint that lists the latest articles, paginated, using the base template.
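+//
+// A hypothetical wiring of this handler onto the standard library mux, for
+// illustration only (the mux variable and the route patterns are assumptions,
+// not part of this patch):
+//
+//	mux.HandleFunc("GET /{$}", app.Index)
+//	mux.HandleFunc("GET /page/{id}", app.Index)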
+func (app *App) Index(w http.ResponseWriter, req *http.Request) {
+	const pageSize = 15
+	var limit, offset, pageId uint64 = pageSize, 0, 0
+	var err error
+
+	// get page number
+	if pageId, err = strconv.ParseUint(req.PathValue("id"), 10, 32); err == nil {
+		pageId--
+		offset = pageId * pageSize
+	}
+
+	// get articles
+	articleVMs, err := app.articles.AllArticleViewModels(int(limit), int(offset))
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// get count of total articles
+	totalCount, err := app.articles.CountAll()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// derive the number of pages (ceiling division)
+	totalPages := (totalCount + pageSize - 1) / pageSize
+	if totalPages == 0 {
+		totalPages = 1
+	}
+
+	// render template
+	t := template.Must(template.ParseFiles(
+		"assets/templates/article.html",
+		"assets/templates/layout.html",
+		"assets/templates/components/pagination.html"))
+
+	data := map[string]interface{}{
+		"SelectedNavItemArticle": true,
+		"ArticleVMs":             &articleVMs,
+		"Paginations":            model.NewPaginationViewModel(uint(pageId+1), totalPages),
+	}
+	err = t.ExecuteTemplate(w, "base", data)
+	if err != nil {
+		http.Error(w, "Failed to render template", http.StatusInternalServerError)
+		return
+	}
+}
diff --git a/src/internal/app/upsearch.go b/src/internal/app/upsearch.go
new file mode 100644
index 0000000..d6a21ed
--- /dev/null
+++ b/src/internal/app/upsearch.go
@@ -0,0 +1,42 @@
+package app
+
+import (
+	"html/template"
+	"net/http"
+)
+
+// Endpoint that returns a list of articles given search terms in the post
+// request of a search form. Uses the article template.
+func (app *App) UpSearch(w http.ResponseWriter, req *http.Request) {
+	// construct search query
+	searchTerms := req.FormValue("search")
+	if searchTerms == "" {
+		app.Index(w, req)
+		return
+	}
+
+	// get articles
+	articleVMs, err := app.articles.SearchArticleViewModel(searchTerms)
+	if err != nil {
+		// search failed; report as server error
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// render template
+	t := template.Must(template.ParseFiles(
+		"assets/templates/article.html",
+		"assets/templates/layout.html",
+		"assets/templates/components/pagination.html"))
+
+	data := map[string]interface{}{
+		"SelectedNavItemArticle": true,
+		"ArticleVMs":             &articleVMs,
+		"Paginations":            nil,
+	}
+	err = t.ExecuteTemplate(w, "base", data)
+	if err != nil {
+		http.Error(w, "Failed to render template", http.StatusInternalServerError)
+		return
+	}
+}
diff --git a/src/internal/crawler/crawlerfacade.go b/src/internal/crawler/crawlerfacade.go
new file mode 100644
index 0000000..17585a7
--- /dev/null
+++ b/src/internal/crawler/crawlerfacade.go
@@ -0,0 +1,69 @@
+package crawler
+
+import (
+	"crowsnest/internal/model"
+	"crowsnest/internal/util"
+
+	"github.com/gocolly/colly/v2"
+)
+
+type CrawlerFacade struct {
+	spiegelFeedDistributer *util.Distributer[*model.Article]
+	zeitFeedDistributer    *util.Distributer[*model.Article]
+}
+
+func (cf *CrawlerFacade) Init() {
+	// init distributers
+	cf.spiegelFeedDistributer = &util.Distributer[*model.Article]{}
+	cf.spiegelFeedDistributer.Init()
+	cf.zeitFeedDistributer = &util.Distributer[*model.Article]{}
+	cf.zeitFeedDistributer.Init()
+
+	// run spiegel feed
+	sf := &WebFeed{}
+	sf.Init(
+		"https://www.spiegel.de/",
+		colly.AllowedDomains("www.spiegel.de", "spiegel.de"),
+		colly.CacheDir("./persistence/spiegel_cache"),
+		colly.MaxDepth(1),
+	)
+	sfFeed := sf.Feed()
+	sfConverter := SpiegelConverter{}
+	sfConverter.Init()
+
+	go func() {
+		for val := range sfFeed {
+			article, err := sfConverter.Convert(val)
+			if err != nil {
+				continue
+			}
+			cf.spiegelFeedDistributer.Publish(article)
+		}
+	}()
+
+	// run zeit feed
+	zf := &WebFeed{}
+	zf.Init(
+		"https://www.zeit.de/index",
+		colly.AllowedDomains("www.zeit.de", "zeit.de"),
+		colly.CacheDir("./persistence/zeit_cache"),
+		colly.MaxDepth(1),
+	)
+	zfFeed := zf.Feed()
+	zfConverter := ZeitConverter{}
+	zfConverter.Init()
+
+	go func() {
+		for val := range zfFeed {
+			article, err := zfConverter.Convert(val)
+			if err != nil {
+				continue
+			}
+			cf.zeitFeedDistributer.Publish(article)
+		}
+	}()
+}
+
+func (cf *CrawlerFacade) SubscribeToSpiegelFeed(hook func(*model.Article)) {
+	cf.spiegelFeedDistributer.Subscribe(hook)
+}
+
+func (cf *CrawlerFacade) SubscribeToZeitFeed(hook func(*model.Article)) {
+	cf.zeitFeedDistributer.Subscribe(hook)
+}
diff --git a/src/internal/crawler/resource.go b/src/internal/crawler/resource.go
new file mode 100644
index 0000000..b2a4239
--- /dev/null
+++ b/src/internal/crawler/resource.go
@@ -0,0 +1,6 @@
+package crawler
+
+type Resource struct {
+	Url  string
+	Body string
+}
diff --git a/src/internal/crawler/spiegelconverter.go b/src/internal/crawler/spiegelconverter.go
new file mode 100644
index 0000000..0d8240b
--- /dev/null
+++ b/src/internal/crawler/spiegelconverter.go
@@ -0,0 +1,96 @@
+package crawler
+
+import (
+	"crowsnest/internal/model"
+	"errors"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/PuerkitoBio/goquery"
+)
+
+type SpiegelConverter struct {
+	patternPaywall    *regexp.Regexp
+	patternUrl        *regexp.Regexp
+	patternWhitespace *regexp.Regexp
+}
+
+func (c *SpiegelConverter) Init() {
+	c.patternPaywall = regexp.MustCompile(`"paywall":{"attributes":{"is_active":true`)
+	c.patternUrl = regexp.MustCompile(`^https://(www\.)?spiegel\.de.*`)
+	c.patternWhitespace = regexp.MustCompile(`\s+`)
+}
+
+func (c *SpiegelConverter) Convert(res *Resource) (*model.Article, error) {
+	// check url pattern
+	if !c.patternUrl.MatchString(res.Url) {
+		return nil, errors.New("invalid url pattern")
+	}
+
+	// check for paywall
+	if c.patternPaywall.MatchString(res.Body) {
+		return nil, errors.New("unable to extract article due to paywall")
+	}
+
+	// construct goquery doc
+	doc, err := goquery.NewDocumentFromReader(strings.NewReader(res.Body))
+	if err != nil {
+		return nil, err
+	}
+
+	// check for article type
+	tag := doc.Find("meta[property='og:type']")
+	pagetype, exists := tag.Attr("content")
+	if !exists || pagetype != "article" {
+		return nil, errors.New("unable to extract article, not of type article")
+	}
+
+	// get title
+	tag = doc.Find("meta[property='og:title']")
+	title, exists := tag.Attr("content")
+	if !exists {
+		return nil, errors.New("unable to extract article, no title tag")
+	}
+
+	// use the description as the lead of the article content
+	tag = doc.Find("meta[name='description']")
+	content, exists := tag.Attr("content")
+	if !exists {
+		return nil, errors.New("unable to extract article, no description tag")
+	}
+	content += " "
+
+	// get publishing date
+	tag = doc.Find("meta[name='date']")
+	datestr, exists := tag.Attr("content")
+	if !exists {
+		return nil, errors.New("unable to extract article, no date tag")
+	}
+
+	date, err := time.Parse("2006-01-02T15:04:05-07:00", datestr)
+	if err != nil {
+		return nil, err
+	}
+
+	// get content
+	tag = doc.Find("main[id='Inhalt'] div > p")
+
+	tag.Each(func(index int, p *goquery.Selection) {
+		content += " " + p.Text()
+	})
+
+	// clean up content string
+	content = c.patternWhitespace.ReplaceAllString(content, " ")
+	content = strings.ReplaceAll(content, "»", "\"")
+	content = strings.ReplaceAll(content, "«", "\"")
+
+	// create new article
+	return &model.Article{
+		SourceUrl:   res.Url,
+		PublishDate: date,
+		FetchDate:   time.Now(),
+		Title:       title,
+		Content:     content,
+	}, nil
+}
diff --git a/src/internal/crawler/webfeed.go b/src/internal/crawler/webfeed.go
new file mode 100644
index 0000000..7363d3e
--- /dev/null
+++ b/src/internal/crawler/webfeed.go
@@ -0,0 +1,67 @@
+package crawler
+
+import (
+	"crowsnest/internal/util"
+	"log"
+	"strings"
+	"time"
+
+	"github.com/gocolly/colly/v2"
+)
+
+type WebFeed struct {
+	feed      chan *Resource
+	collector *colly.Collector
+}
+
+// Init the WebFeed, starting the process of collecting Resources.
+func (wf *WebFeed) Init(indexUrl string, options ...colly.CollectorOption) {
+	// create feed
+	wf.feed = make(chan *Resource, 100)
+
+	// apply the collector options (cache dir, allowed domains, max depth)
+	wf.collector = colly.NewCollector(options...)
+
+	// push every fetched page into the feed as a Resource
+	wf.collector.OnResponse(func(r *colly.Response) {
+		url := r.Request.URL.String()
+		body := string(r.Body)
+		wf.feed <- &Resource{Url: url, Body: body}
+	})
+
+	// cascade to linked pages
+	wf.collector.OnHTML("a[href]", func(e *colly.HTMLElement) {
+		url := e.Attr("href")
+		if !strings.HasPrefix(url, "http") {
+			return
+		}
+		e.Request.Visit(url)
+	})
+
+	// start runner
+	go wf.runner(indexUrl)
+}
+
+// Get the channel into which the collected Resources will be written.
+func (wf *WebFeed) Feed() <-chan *Resource {
+	return wf.feed
+}
+
+func (wf *WebFeed) runner(indexUrl string) {
+	for {
+		// sleep for five minutes between crawls
+		time.Sleep(5 * time.Minute)
+
+		// collect index
+		urls, err := util.GetAllURLs(indexUrl)
+		if err != nil {
+			log.Println("error in WebFeed runner: ", err.Error())
+			continue
+		}
+
+		// visit urls
+		for _, url := range urls {
+			wf.collector.Visit(url)
+		}
+	}
+}
diff --git a/src/internal/crawler/zeitconverter.go b/src/internal/crawler/zeitconverter.go
new file mode 100644
index 0000000..3fe7845
--- /dev/null
+++ b/src/internal/crawler/zeitconverter.go
@@ -0,0 +1,100 @@
+package crawler
+
+import (
+	"crowsnest/internal/model"
+	"errors"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/PuerkitoBio/goquery"
+)
+
+type ZeitConverter struct {
+	patternUrl        *regexp.Regexp
+	patternWhitespace *regexp.Regexp
+}
+
+func (c *ZeitConverter) Init() {
+	c.patternUrl = regexp.MustCompile(`^https://(www\.)?zeit\.de[^#]*$`)
+	c.patternWhitespace = regexp.MustCompile(`\s+`)
+}
+
+func (c *ZeitConverter) Convert(res *Resource) (*model.Article, error) {
+	// check url pattern
+	if !c.patternUrl.MatchString(res.Url) {
+		return nil, errors.New("invalid url pattern")
+	}
+
+	// construct goquery doc
+	doc, err := goquery.NewDocumentFromReader(strings.NewReader(res.Body))
+	if err != nil {
+		return nil, err
+	}
+
+	// check for article type
+	tag := doc.Find("meta[property='og:type']")
+	pagetype, exists := tag.Attr("content")
+	if !exists || pagetype != "article" {
+		return nil, errors.New("unable to extract article, not of type article")
+	}
+
+	// check for paywall
+	tag = doc.Find("meta[property='article:content_tier']")
+	pagetype, exists = tag.Attr("content")
+	if !exists || pagetype != "free" {
+		return nil, errors.New("unable to extract article due to paywall")
+	}
+
+	// get title
+	tag = doc.Find("meta[property='og:title']")
+	title, exists := tag.Attr("content")
+	if !exists {
+		return nil, errors.New("unable to extract article, no title tag")
+	}
+
+	// use the description as the lead of the article content
doc.Find("meta[name='description']") + content, exists := tag.Attr("content") + content += " " + if !exists { + return nil, errors.New("unable to extract article, no description tag") + } + + if strings.Contains(content, "Das Liveblog") { + return nil, errors.New("unable to extract article, no support for liveblog") + } + + // get publishing date + tag = doc.Find("meta[name='date']") + datestr, exists := tag.Attr("content") + if !exists { + return nil, errors.New("unable to extract article, no date tag") + } + + date, err := time.Parse("2006-01-02T15:04:05-07:00", datestr) + if err != nil { + return nil, err + } + + // get content + tag = doc.Find("main > article > div.article-body p.article__item") + + tag.Each(func(index int, p *goquery.Selection) { + content += " " + p.Text() + }) + + // clean up content string + content = string(c.pattern_whitespace.ReplaceAll([]byte(content), []byte(" "))) + content = strings.ReplaceAll(content, "»", "\"") + content = strings.ReplaceAll(content, "«", "\"") + + // create new article + return &model.Article{ + SourceUrl: res.Url, + PublishDate: date, + FetchDate: time.Now(), + Title: title, + Content: content, + }, nil +} diff --git a/src/internal/model/database/articlerepository.go b/src/internal/model/database/articlerepository.go new file mode 100644 index 0000000..c2ee7a2 --- /dev/null +++ b/src/internal/model/database/articlerepository.go @@ -0,0 +1,234 @@ +package database + +import ( + "crowsnest/internal/model" + "database/sql" + "net/url" + "strings" +) + +type ArticleRepository struct { + DB *sql.DB +} + +// Gets all the article objects from the database. This may throw an error if +// the connection to the database fails. +func (m *ArticleRepository) All(limit int, offset int) ([]model.Article, error) { + stmt := ` + SELECT id, title, sourceUrl, content, publishDate, fetchDate + FROM articles + ORDER BY publishDate DESC + LIMIT $1 OFFSET $2 + ` + rows, err := m.DB.Query(stmt, limit, offset) + if err != nil { + return nil, err + } + + articles := []model.Article{} + for rows.Next() { + a := model.Article{} + err := rows.Scan(&a.Id, &a.Title, &a.SourceUrl, &a.Content, &a.PublishDate, &a.FetchDate) + if err != nil { + return nil, err + } + + articles = append(articles, a) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return articles, nil +} + +func (m *ArticleRepository) AllArticleViewModels(limit int, offset int) ([]*model.ArticleViewModel, error) { + stmt := ` + SELECT a.id, a.title, a.sourceUrl, a.publishDate, d.summary + FROM articles a JOIN documents d ON a.document_id = d.id + ORDER BY a.publishDate DESC + LIMIT $1 OFFSET $2 + ` + rows, err := m.DB.Query(stmt, limit, offset) + if err != nil { + return nil, err + } + + articleVMs := []*model.ArticleViewModel{} + var sourceUrl string + for rows.Next() { + a := model.ArticleViewModel{} + + err := rows.Scan(&a.Id, &a.Title, &sourceUrl, &a.PublishDate, &a.Summary) + if err != nil { + return nil, err + } + + // summary + if a.Summary == "" { + a.Summary = "N/A" + } + + // short url + parsedURL, err := url.Parse(sourceUrl) + if err == nil { + a.ShortSource = parsedURL.Hostname() + } else { + a.ShortSource = "" + } + + // ai summary always false + a.AiSummarized = false + + articleVMs = append(articleVMs, &a) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return articleVMs, nil + +} + +// Counts all articles in the database. This may throw an error if the +// connection to the database fails. 
+func (m *ArticleRepository) CountAll() (uint, error) {
+	stmt := `SELECT count(id) FROM articles`
+
+	row := m.DB.QueryRow(stmt)
+
+	count := uint(0)
+	if err := row.Scan(&count); err != nil {
+		return 0, err
+	}
+
+	return count, nil
+}
+
+// Will use the full-text search features of the underlying database to search
+// articles for a given search query, returning the ten best matches as view
+// models. Returns an error if the database query fails.
+func (m *ArticleRepository) SearchArticleViewModel(query string) ([]*model.ArticleViewModel, error) {
+	stmt := `
+	SELECT a.id, a.title, a.sourceUrl, a.publishDate, d.summary
+	FROM articles a JOIN documents d ON a.document_id = d.id
+	WHERE to_tsvector('german', d.content) @@ to_tsquery('german', $1)
+	ORDER BY ts_rank(to_tsvector('german', d.content), to_tsquery('german', $1)) DESC
+	LIMIT 10
+	`
+
+	// join the search terms with the OR operator of to_tsquery
+	query = strings.Join(strings.Split(strings.TrimSpace(query), " "), " | ")
+	rows, err := m.DB.Query(stmt, query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	articleVMs := []*model.ArticleViewModel{}
+	for rows.Next() {
+		a := &model.ArticleViewModel{}
+		var sourceUrl string
+		err := rows.Scan(&a.Id, &a.Title, &sourceUrl, &a.PublishDate, &a.Summary)
+		if err != nil {
+			return nil, err
+		}
+
+		// fall back to a placeholder summary
+		if a.Summary == "" {
+			a.Summary = "N/A"
+		}
+
+		// shorten the source url to its hostname
+		parsedURL, err := url.Parse(sourceUrl)
+		if err == nil {
+			a.ShortSource = parsedURL.Hostname()
+		} else {
+			a.ShortSource = ""
+		}
+
+		// ai summaries are not generated yet
+		a.AiSummarized = false
+
+		articleVMs = append(articleVMs, a)
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+	return articleVMs, nil
+}
+
+// Will use the full-text search features of the underlying database to search
+// articles for a given search query, returning the ten best matches. Returns
+// an error if the database query fails.
+func (m *ArticleRepository) Search(query string) ([]model.Article, error) {
+	stmt := `
+	SELECT a.id, a.title, a.sourceUrl, a.content, a.publishDate, a.fetchDate
+	FROM articles a JOIN documents d ON a.document_id = d.id
+	WHERE to_tsvector('german', d.content) @@ to_tsquery('german', $1)
+	ORDER BY ts_rank(to_tsvector('german', d.content), to_tsquery('german', $1)) DESC
+	LIMIT 10
+	`
+
+	// join the search terms with the OR operator of to_tsquery
+	query = strings.Join(strings.Split(strings.TrimSpace(query), " "), " | ")
+	rows, err := m.DB.Query(stmt, query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	articles := []model.Article{}
+	for rows.Next() {
+		a := model.Article{}
+		err := rows.Scan(&a.Id, &a.Title, &a.SourceUrl, &a.Content, &a.PublishDate, &a.FetchDate)
+		if err != nil {
+			return nil, err
+		}
+
+		articles = append(articles, a)
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+	return articles, nil
+}
+
+// Will return an article given an id. Returns an error if the database query
+// fails or there is no article with the given id.
+func (m *ArticleRepository) ById(id int) (*model.Article, error) {
+	stmt := `
+	SELECT a.id, a.title, a.sourceUrl, a.content, a.publishDate, a.fetchDate
+	FROM articles a
+	WHERE a.id = $1
+	`
+
+	row := m.DB.QueryRow(stmt, id)
+
+	a := &model.Article{}
+	if err := row.Scan(&a.Id, &a.Title, &a.SourceUrl, &a.Content, &a.PublishDate, &a.FetchDate); err != nil {
+		return nil, err
+	}
+
+	return a, nil
+}
+
+// Inserts a new article into the database. The id attribute of the given
+// article will be ignored. Returns an error if the database query fails.
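+//
+// A minimal usage sketch (field values are illustrative, not part of this
+// patch):
+//
+//	err := repo.Insert(&model.Article{
+//		Title:       "Example title",
+//		SourceUrl:   "https://www.example.org/article",
+//		Content:     "Example content",
+//		PublishDate: time.Now(),
+//		FetchDate:   time.Now(),
+//	})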
+func (m *ArticleRepository) Insert(a *model.Article) error {
+	// insert article
+	stmt := `INSERT INTO articles (title, sourceUrl, content, publishDate, fetchDate)
+	VALUES ($1, $2, $3, $4, $5)
+	`
+	_, err := m.DB.Exec(stmt, a.Title, a.SourceUrl, a.Content, a.PublishDate, a.FetchDate)
+	return err
+}
+
+// Updates the article with the id of the given article. Returns an error if
+// the database query fails.
+func (m *ArticleRepository) Update(a *model.Article) error {
+	stmt := `UPDATE articles
+	SET title = $1, sourceUrl = $2, content = $3, publishDate = $4, fetchDate = $5
+	WHERE id = $6
+	`
+	_, err := m.DB.Exec(stmt, a.Title, a.SourceUrl, a.Content, a.PublishDate, a.FetchDate, a.Id)
+	return err
+}
diff --git a/src/internal/model/database/documentrepository.go b/src/internal/model/database/documentrepository.go
new file mode 100644
index 0000000..944148d
--- /dev/null
+++ b/src/internal/model/database/documentrepository.go
@@ -0,0 +1,109 @@
+package database
+
+import (
+	"crowsnest/internal/model"
+	"database/sql"
+)
+
+type DocumentRepository struct {
+	DB *sql.DB
+}
+
+// Gets all the document objects from the database. Returns an error if the
+// database query fails.
+func (d *DocumentRepository) All(limit int, offset int) ([]*model.Document, error) {
+	stmt := `
+	SELECT id, content, summary
+	FROM documents
+	LIMIT $1 OFFSET $2
+	`
+	rows, err := d.DB.Query(stmt, limit, offset)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	docs := []*model.Document{}
+	for rows.Next() {
+		doc := model.Document{}
+		err := rows.Scan(&doc.Id, &doc.Content, &doc.Summary)
+		if err != nil {
+			return nil, err
+		}
+
+		docs = append(docs, &doc)
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+
+	return docs, nil
+}
+
+// Will return a document given an id. Returns an error if the database query
+// fails or there is no document with the given id.
+func (d *DocumentRepository) ById(id int) (*model.Document, error) {
+	stmt := `
+	SELECT id, content, summary
+	FROM documents
+	WHERE id = $1
+	`
+
+	row := d.DB.QueryRow(stmt, id)
+
+	doc := &model.Document{}
+	if err := row.Scan(&doc.Id, &doc.Content, &doc.Summary); err != nil {
+		return nil, err
+	}
+
+	return doc, nil
+}
+
+// Counts all documents in the database. Returns an error if the database
+// query fails.
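+//
+// Together with All, this supports paging over every document in batches,
+// as Map below does (illustrative sketch, not part of this patch):
+//
+//	total, _ := repo.CountAll()
+//	for i := 0; i < int(total); i += 10 {
+//		batch, _ := repo.All(10, i)
+//		// process batch
+//	}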
+func (d *DocumentRepository) CountAll() (uint, error) {
+	stmt := `SELECT count(id) FROM documents`
+
+	row := d.DB.QueryRow(stmt)
+
+	count := uint(0)
+	if err := row.Scan(&count); err != nil {
+		return 0, err
+	}
+
+	return count, nil
+}
+
+// Updates the document with the id of the given document. Returns an error
+// if the database query fails.
+func (d *DocumentRepository) Update(doc *model.Document) error {
+	stmt := `UPDATE documents
+	SET content = $1, summary = $2
+	WHERE id = $3
+	`
+	_, err := d.DB.Exec(stmt, doc.Content, doc.Summary, doc.Id)
+	return err
+}
+
+// Applies the transform function to every document in batches of ten and
+// persists the results. Returns the number of documents processed and an
+// error if any database query fails.
+func (d *DocumentRepository) Map(transform func(*model.Document) *model.Document) (int, error) {
+	processed := 0
+
+	count, err := d.CountAll()
+	if err != nil {
+		return processed, err
+	}
+
+	for i := 0; i < int(count); i += 10 {
+		docs, err := d.All(10, i)
+		if err != nil {
+			return processed, err
+		}
+
+		for _, doc := range docs {
+			newDoc := transform(doc)
+			err = d.Update(newDoc)
+			if err != nil {
+				return processed, err
+			}
+			processed++
+		}
+	}
+
+	return processed, nil
+}
diff --git a/src/internal/util/distributer.go b/src/internal/util/distributer.go
new file mode 100644
index 0000000..46f5a01
--- /dev/null
+++ b/src/internal/util/distributer.go
@@ -0,0 +1,37 @@
+package util
+
+// Distributer fans published items out to all subscribed hooks.
+type Distributer[T IClone[T]] struct {
+	queue chan T
+	hooks []func(T)
+}
+
+func (d *Distributer[T]) Init() {
+	d.queue = make(chan T, 100)
+	d.hooks = make([]func(T), 0)
+}
+
+// Publish an item; a copy of it will be distributed to every hook that has
+// subscribed to this Distributer.
+func (d *Distributer[T]) Publish(item T) {
+	d.queue <- item
+}
+
+// Add a new hook to the Distributer. The hook will be called asynchronously
+// whenever a new item is published.
+func (d *Distributer[T]) Subscribe(hook func(T)) {
+	d.hooks = append(d.hooks, hook)
+	if len(d.hooks) == 1 {
+		go d.runner()
+	}
+}
+
+// Started asynchronously when Subscribe is first called. Whenever Publish is
+// called, the runner distributes a clone of the new item to every hook.
+func (d *Distributer[T]) runner() {
+	for val := range d.queue {
+		for _, f := range d.hooks {
+			go f(val.Clone())
+		}
+	}
+}
diff --git a/src/internal/util/iclone.go b/src/internal/util/iclone.go
new file mode 100644
index 0000000..e31801c
--- /dev/null
+++ b/src/internal/util/iclone.go
@@ -0,0 +1,5 @@
+package util
+
+// IClone is implemented by types that can produce an independent copy of
+// themselves.
+type IClone[T any] interface {
+	Clone() T
+}
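+
+// An illustrative implementation for a struct type (sketch only;
+// model.Article's actual Clone method is not part of this patch):
+//
+//	func (a *Article) Clone() *Article {
+//		clone := *a
+//		return &clone
+//	}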