Browse Source

Adds discord notifier, docker-compose file, changes to list of events

No longer sends a queue event for each event sent from fitbit, instead just forwards exactly what we get from fitbit to the queue for processing.
master
Noah Pederson 2 years ago
parent
commit
e473de1486
6 changed files with 163 additions and 7 deletions
  1. +39
    -0
      Dockerfile.discordworker
  2. +60
    -0
      cmd/discordworker/main.go
  3. +2
    -2
      cmd/worker/main.go
  4. +54
    -0
      docker-compose.yml
  5. +5
    -0
      pkg/listener/queue.go
  6. +3
    -5
      pkg/listener/web.go

+ 39
- 0
Dockerfile.discordworker View File

@ -0,0 +1,39 @@
FROM golang:1.11-alpine3.8 AS builder
MAINTAINER Noah Pederson
EXPOSE 8008
# Install some dependencies needed to build the project
RUN apk add bash ca-certificates git gcc g++ libc-dev
FROM builder AS build
RUN mkdir /build
COPY . /build
WORKDIR /build/cmd/discordworker
ENV GO111MODULES on
RUN go mod download
RUN go build
FROM alpine:3.8 AS runner
RUN mkdir /app
RUN apk add ca-certificates
RUN addgroup -g 1000 app && \
adduser -D -h /app -u 1000 -G app app
RUN chown app:app /app
USER app
WORKDIR /app
COPY --from=build /build/cmd/discordworker/discordworker /app/discordworker
ENTRYPOINT [ "./discordworker" ]

+ 60
- 0
cmd/discordworker/main.go View File

@ -0,0 +1,60 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"git.packetlostandfound.us/chiefnoah/fitbit-processor/pkg/common"
nats "github.com/nats-io/go-nats"
)
func main() {
natsHost := os.Getenv("NATS_HOST")
discordWebhook := os.Getenv("DISCORD_WEBHOOK_URL")
discordUsername := os.Getenv("DISCORD_WEBHOOK_USERNAME")
log.Printf("Discord webhook URL set to %s", discordWebhook)
log.Printf("Connecting to queue at %s", natsHost)
nc, err := nats.Connect(natsHost)
if err != nil {
log.Fatalf("Unable to connect to queue: %s", err.Error())
}
q, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatalf("Unable to create encoded queue connection: %s", err.Error())
}
_, err = q.Subscribe(common.NatsQueueName, func(n *[]common.FitBitNotification) {
log.Printf("Received message: %+v", n)
content, _ := json.MarshalIndent(n, "", " ")
message := map[string]string{"content": fmt.Sprintf("Received fitbit update notification:\n```json\n%s\n```",
content), "username": discordUsername}
m, _ := json.Marshal(message)
resp, err := http.Post(discordWebhook, "application/json", bytes.NewBuffer(m))
if err != nil {
log.Printf("Error sending discord webhook: %s", err.Error())
return
}
log.Printf("Sent discord webhook. Response code: %s", resp.Status)
})
if err != nil {
log.Fatalf("Unable to subscrbie to %s as queue group %s: %s", common.NatsQueueName, common.NatsQueueGroup, err.Error())
}
var gracefulStop = make(chan os.Signal)
signal.Notify(gracefulStop, syscall.SIGTERM)
signal.Notify(gracefulStop, syscall.SIGINT)
// block here until we get told to shutdown
sig := <-gracefulStop
fmt.Printf("caught sig: %+v\n", sig)
fmt.Println("closing queue connection...")
q.Close()
os.Exit(0)
}

+ 2
- 2
cmd/worker/main.go View File

@ -24,8 +24,8 @@ func main() {
log.Fatalf("Unable to create encoded queue connection: %s", err.Error())
}
_, err = q.QueueSubscribe(common.NatsQueueName, common.NatsQueueGroup, func(n *common.FitBitNotification) {
log.Printf("Received message: %+v", n)
_, err = q.QueueSubscribe(common.NatsQueueName, common.NatsQueueGroup, func(n *[]common.FitBitNotification) {
log.Printf("Received message: %+v", *n)
})
if err != nil {
log.Fatalf("Unable to subscrbie to %s as queue group %s: %s", common.NatsQueueName, common.NatsQueueGroup, err.Error())


+ 54
- 0
docker-compose.yml View File

@ -0,0 +1,54 @@
---
version: "3.7"
services:
fitbit-listener:
image: "fitbit-listener:latest"
build:
context: .
dockerfile: Dockerfile.listener
expose:
- "8008"
ports:
- "8008:8008"
restart: always
environment:
- NATS_HOST=nats-main
- SUBSCRIBER_VERIFICATION_CODE=valid
depends_on:
- nats-main
networks:
- fitbit
fitbit-worker:
image: "fitbit-worker:latest"
build:
context: .
dockerfile: Dockerfile.worker
restart: always
depends_on:
- nats-main
environment:
- NATS_HOST=nats-main
networks:
- fitbit
fitbit-discordworker:
image: "fitbit-discordworker:latest"
build:
context: .
dockerfile: Dockerfile.discordworker
restart: always
depends_on:
- nats-main
environment:
- NATS_HOST=nats-main
- "DISCORD_WEBHOOK_URL=https://discordapp.com/api/webhooks/526498412334415874/_j21Nj8RMDAHLvpMvKvwpdzagmPZl5hEV5v4xIpyNM2SXKF3Rx0EvbURS1xJNlFLIRjT"
networks:
- fitbit
nats-main:
image: "nats:1.3.0-linux"
restart: always
networks:
- fitbit
networks:
fitbit:
external: false

+ 5
- 0
pkg/listener/queue.go View File

@ -33,3 +33,8 @@ func (s *Service) Cleanup() {
func (s *Service) PublishNotification(n *common.FitBitNotification) error {
return s.q.Publish(common.NatsQueueName, n)
}
//PublishNotifications sends a fitbit notification to the queue for processing
func (s *Service) PublishNotifications(n *[]common.FitBitNotification) error {
return s.q.Publish(common.NatsQueueName, n)
}

+ 3
- 5
pkg/listener/web.go View File

@ -42,11 +42,9 @@ func (s *Service) HandleFitbitNotification(w http.ResponseWriter, r *http.Reques
http.Error(w, "Invalid JSON request", http.StatusBadRequest)
return
}
for _, noticiation := range notifications {
err = s.PublishNotification(&noticiation)
if err != nil {
log.Printf("Error queuing notification: %s", err.Error())
}
err = s.PublishNotifications(&notifications)
if err != nil {
log.Printf("Error queuing notification: %s", err.Error())
}
w.WriteHeader(http.StatusNoContent)
}

Loading…
Cancel
Save