Browse Source

Adds dockerfiles for worker and listener, more implementation

master
Noah Pederson 2 years ago
parent
commit
dcf24697a0
10 changed files with 212 additions and 5 deletions
  1. +37
    -0
      Dockerfile.listener
  2. +37
    -0
      Dockerfile.worker
  3. +9
    -2
      cmd/listener/main.go
  4. +39
    -0
      cmd/worker/main.go
  5. +7
    -0
      go.mod
  6. +10
    -0
      go.sum
  7. +7
    -0
      pkg/common/const.go
  8. +7
    -0
      pkg/common/models.go
  9. +35
    -0
      pkg/listener/queue.go
  10. +24
    -3
      pkg/listener/web.go

+ 37
- 0
Dockerfile.listener View File

@ -0,0 +1,37 @@
FROM golang:1.11-alpine3.8 AS builder
MAINTAINER Noah Pederson
EXPOSE 8008
# Install some dependencies needed to build the project
RUN apk add bash ca-certificates git gcc g++ libc-dev
FROM builder AS build
RUN mkdir /build
COPY . /build
WORKDIR /build/cmd/listener
ENV GO111MODULES on
RUN go mod download
RUN go build
FROM alpine:3.8 AS runner
RUN mkdir /app
RUN addgroup -g 1000 app && \
adduser -D -h /app -u 1000 -G app app
RUN chown app:app /app
USER app
WORKDIR /app
COPY --from=build /build/cmd/listener/listener /app/listener
ENTRYPOINT [ "./listener" ]

+ 37
- 0
Dockerfile.worker View File

@ -0,0 +1,37 @@
FROM golang:1.11-alpine3.8 AS builder
MAINTAINER Noah Pederson
EXPOSE 8008
# Install some dependencies needed to build the project
RUN apk add bash ca-certificates git gcc g++ libc-dev
FROM builder AS build
RUN mkdir /build
COPY . /build
WORKDIR /build/cmd/worker
ENV GO111MODULES on
RUN go mod download
RUN go build
FROM alpine:3.8 AS runner
RUN mkdir /app
RUN addgroup -g 1000 app && \
adduser -D -h /app -u 1000 -G app app
RUN chown app:app /app
USER app
WORKDIR /app
COPY --from=build /build/cmd/worker/worker /app/worker
ENTRYPOINT [ "./worker" ]

+ 9
- 2
cmd/listener/main.go View File

@ -11,12 +11,19 @@ import (
func main() {
port := os.Getenv("HTTP_PORT")
natsHost := os.Getenv("NATS_HOST")
if port == "" {
port = "8008"
}
listerService := listener.Service{
listenerService := listener.Service{
SubscriberVerificationCode: os.Getenv("SUBSCRIBER_VERIFICATION_CODE"),
}
http.HandleFunc("/webhook", listerService.HandleFitbitNotification)
err := listenerService.InitQueue(natsHost)
if err != nil {
log.Fatalf("Unable to connect to queue: %s", err.Error())
}
defer listenerService.Cleanup()
http.HandleFunc("/webhook", listenerService.HandleFitbitNotification)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}

+ 39
- 0
cmd/worker/main.go View File

@ -1,5 +1,44 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"git.packetlostandfound.us/chiefnoah/fitbit-processor/pkg/common"
nats "github.com/nats-io/go-nats"
)
func main() {
natsHost := os.Getenv("NATS_HOST")
log.Printf("Connecting to queue at %s", natsHost)
nc, err := nats.Connect(natsHost)
if err != nil {
log.Fatalf("Unable to connect to queue: %s", err.Error())
}
q, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
log.Fatalf("Unable to create encoded queue connection: %s", err.Error())
}
_, err = q.QueueSubscribe(common.NatsQueueName, common.NatsQueueGroup, func(n *common.FitBitNotification) {
log.Printf("Received message: %+v", n)
})
if err != nil {
log.Fatalf("Unable to subscrbie to %s as queue group %s: %s", common.NatsQueueName, common.NatsQueueGroup, err.Error())
}
var gracefulStop = make(chan os.Signal)
signal.Notify(gracefulStop, syscall.SIGTERM)
signal.Notify(gracefulStop, syscall.SIGINT)
// block here until we get told to shutdown
sig := <-gracefulStop
fmt.Printf("caught sig: %+v\n", sig)
fmt.Println("closing queue connection...")
q.Close()
os.Exit(0)
}

+ 7
- 0
go.mod View File

@ -1 +1,8 @@
module git.packetlostandfound.us/chiefnoah/fitbit-processor
require (
github.com/lib/pq v1.0.0
github.com/nats-io/go-nats v1.7.0
github.com/nats-io/nkeys v0.0.2 // indirect
github.com/nats-io/nuid v1.0.0 // indirect
)

+ 10
- 0
go.sum View File

@ -0,0 +1,10 @@
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/nats-io/go-nats v1.7.0 h1:oQOfHcLr8hb43QG8yeVyY2jtarIaTjOv41CGdF3tTvQ=
github.com/nats-io/go-nats v1.7.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/nkeys v0.0.2 h1:+qM7QpgXnvDDixitZtQUBDY9w/s9mu1ghS+JIbsrx6M=
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs=
github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

+ 7
- 0
pkg/common/const.go View File

@ -0,0 +1,7 @@
package common
//NatsQueueName is the name of the queue used by the publisher and subscriber to transmit messages
const NatsQueueName = "fitbit.notifications"
//NatsQueueGroup defines the queue group for fitbit notifications
const NatsQueueGroup = "fitbit_group"

+ 7
- 0
pkg/common/models.go View File

@ -1,3 +1,10 @@
package common
// Models
type FitBitNotification struct {
CollectionType string `json:"collectionType"`
Date string `json:"date"`
OwnerID string `json:"ownerId"`
OwnerType string `json:"ownerType"`
SubscriptionID string `json:"subscriptionId"`
}

+ 35
- 0
pkg/listener/queue.go View File

@ -0,0 +1,35 @@
package listener
import (
"log"
"git.packetlostandfound.us/chiefnoah/fitbit-processor/pkg/common"
nats "github.com/nats-io/go-nats"
)
//InitQueue establishes a connection to a queue server
func (s *Service) InitQueue(host string) error {
log.Printf("Connecting to queue at %s", host)
nc, err := nats.Connect(host)
if err != nil {
return err
}
q, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
return err
}
s.q = q
return nil
}
//Cleanup cleans up connections set up for a Service to run
func (s *Service) Cleanup() {
log.Print("Closing connection to queue")
s.q.Close()
}
//PublishNotification sends a fitbit notification to the queue for processing
func (s *Service) PublishNotification(n *common.FitBitNotification) error {
return s.q.Publish(common.NatsQueueName, n)
}

+ 24
- 3
pkg/listener/web.go View File

@ -1,18 +1,24 @@
package listener
import (
"encoding/json"
"log"
"net/http"
"git.packetlostandfound.us/chiefnoah/fitbit-processor/pkg/common"
nats "github.com/nats-io/go-nats"
)
// FitbitListenerService defines an instance of a listner service for receving data events from Fitbit
// Service defines an instance of a listner service for receving data events from Fitbit
type Service struct {
SubscriberVerificationCode string
q *nats.EncodedConn
}
//HandleFitbitNotification handles a notification event from Fitbit
func (s *Service) HandleFitbitNotification(w http.ResponseWriter, r *http.Request) {
log.Printf("Handling request from fitbit: %+f", r)
log.Printf("Handling request from fitbit: %s", r.URL.Path)
verify, ok := r.URL.Query()["verify"]
if ok {
if verify[0] == s.SubscriberVerificationCode {
@ -27,5 +33,20 @@ func (s *Service) HandleFitbitNotification(w http.ResponseWriter, r *http.Reques
return
}
}
w.WriteHeader(http.StatusAccepted)
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var notifications []common.FitBitNotification
err := decoder.Decode(&notifications)
if err != nil {
log.Printf("Called: %s - Invalid JSON payload: %s", r.URL.Path, err)
http.Error(w, "Invalid JSON request", http.StatusBadRequest)
return
}
for _, noticiation := range notifications {
err = s.PublishNotification(&noticiation)
if err != nil {
log.Printf("Error queuing notification: %s", err.Error())
}
}
w.WriteHeader(http.StatusNoContent)
}

Loading…
Cancel
Save