mirror of https://github.com/perkeep/perkeep.git
server/camnetdns: persist records in datastore
Store records in Google Datastore in production (on GCE), with an LRU cache in front. Keep dev mode that stores in sorted mem key value though. Change-Id: I17b088b56a68a019f4e253b60c6bd42395a64984
This commit is contained in:
parent
885389a10f
commit
9a32570a48
|
@ -21,10 +21,15 @@ package main
|
|||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"camlistore.org/pkg/gpgchallenge"
|
||||
"camlistore.org/pkg/lru"
|
||||
"camlistore.org/pkg/sorted"
|
||||
|
||||
"cloud.google.com/go/compute/metadata"
|
||||
|
@ -32,21 +37,19 @@ import (
|
|||
"cloud.google.com/go/logging"
|
||||
"github.com/miekg/dns"
|
||||
"go4.org/cloud/cloudlaunch"
|
||||
"golang.org/x/net/context"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
var flagServerIP = flag.String("server_ip", "104.154.231.160", "The IP address of the authoritative name server for camlistore.net, i.e. the address where this program will run.")
|
||||
|
||||
// TODO(mpl): pass the server ip to the launchConfig, so we create the instance
|
||||
// with this specific IP. Which means, we'll have to book it as a static address in
|
||||
// Google Cloud I suppose?
|
||||
// Or, we hope we're lucky and we never have to destroy the camnet-dns VM (and lose
|
||||
// its current IP)?
|
||||
var (
|
||||
addr = flag.String("addr", defaultListenAddr(), "specify address for server to listen on")
|
||||
flagServerIP = flag.String("server_ip", "104.154.231.160", "The IP address of the authoritative name server for camlistore.net, i.e. the address where this program will run.")
|
||||
)
|
||||
|
||||
var launchConfig = &cloudlaunch.Config{
|
||||
Name: "camnetdns",
|
||||
BinaryBucket: "camlistore-dnsserver-resource",
|
||||
GCEProjectID: "camlistore-website",
|
||||
GCEProjectID: GCEProjectID,
|
||||
Scopes: []string{
|
||||
compute.ComputeScope,
|
||||
logging.Scope,
|
||||
|
@ -54,10 +57,17 @@ var launchConfig = &cloudlaunch.Config{
|
|||
},
|
||||
}
|
||||
|
||||
// DefaultResponseTTL is the record TTL in seconds
|
||||
const DefaultResponseTTL = 300
|
||||
const (
|
||||
GCEProjectID = "camlistore-website"
|
||||
// DefaultResponseTTL is the record TTL in seconds
|
||||
DefaultResponseTTL = 300
|
||||
// even if record already existed in store, we overwrite it if it is older than 30 days, for analytics.
|
||||
staleRecord = 30 * 24 * time.Hour
|
||||
// max number of records in the lru cache
|
||||
cacheSize = 1e6
|
||||
)
|
||||
|
||||
var ErrRecordNotFound = errors.New("record not found")
|
||||
var errRecordNotFound = errors.New("record not found")
|
||||
|
||||
func defaultListenAddr() string {
|
||||
if metadata.OnGCE() {
|
||||
|
@ -66,13 +76,121 @@ func defaultListenAddr() string {
|
|||
return ":5300"
|
||||
}
|
||||
|
||||
// DNSServer implements the dns.Handler interface to serve A and AAAA
|
||||
// records using a sorted.KeyValue for the lookups.
|
||||
type DNSServer struct {
|
||||
dataSource sorted.KeyValue
|
||||
type keyValue interface {
|
||||
// Get fetches the value for key. It returns errRecordNotFound when
|
||||
// there is no such record.
|
||||
Get(key string) (string, error)
|
||||
Set(key, value string) error
|
||||
}
|
||||
|
||||
func NewDNSServer(src sorted.KeyValue) *DNSServer {
|
||||
// cachedStore is a keyValue implementation that stores in Google's datastore
|
||||
// with dsClient. It automatically stores to cache as well on writes, and always
|
||||
// tries to read from cache first.
|
||||
type cachedStore struct {
|
||||
// datastore client to store the records. It should not be nil.
|
||||
dsClient *datastore.Client
|
||||
// cache stores the most recent records. It should not be nil.
|
||||
cache *lru.Cache
|
||||
}
|
||||
|
||||
// dsValue is the value type written to the datastore
|
||||
type dsValue struct {
|
||||
// Record is the RHS of an A or AAAA DNS record, i.e. an IPV4 or IPV6
|
||||
// address.
|
||||
Record string
|
||||
// Updated is the last time this key value pair was inserted. Values
|
||||
// older than 30 days are rewritten on writes.
|
||||
Updated time.Time
|
||||
}
|
||||
|
||||
func (cs cachedStore) Get(key string) (string, error) {
|
||||
val, ok := cs.cache.Get(key)
|
||||
if ok {
|
||||
return val.(string), nil
|
||||
}
|
||||
// Cache Miss. hit the datastore.
|
||||
ctx := context.Background()
|
||||
dk := datastore.NewKey(ctx, "camnetdns", key, 0, nil)
|
||||
var value dsValue
|
||||
if err := cs.dsClient.Get(ctx, dk, &value); err != nil {
|
||||
if err != datastore.ErrNoSuchEntity {
|
||||
return "", fmt.Errorf("error getting value for %q from datastore: %v", key, err)
|
||||
}
|
||||
return "", errRecordNotFound
|
||||
}
|
||||
// And cache it.
|
||||
cs.cache.Add(key, value.Record)
|
||||
return value.Record, nil
|
||||
}
|
||||
|
||||
func (cs cachedStore) put(ctx context.Context, key, value string) error {
|
||||
dk := datastore.NewKey(ctx, "camnetdns", key, 0, nil)
|
||||
val := &dsValue{
|
||||
Record: value,
|
||||
Updated: time.Now(),
|
||||
}
|
||||
if _, err := cs.dsClient.Put(ctx, dk, val); err != nil {
|
||||
return fmt.Errorf("error writing (%q : %q) record to datastore: %v", key, value, err)
|
||||
}
|
||||
// and cache it.
|
||||
cs.cache.Add(key, value)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set writes the key, value pair to cs. But it does not actually write if the
|
||||
// value already exists, is up to date, and is more recent than 30 days.
|
||||
func (cs cachedStore) Set(key, value string) error {
|
||||
// check if record already exists
|
||||
ctx := context.Background()
|
||||
dk := datastore.NewKey(ctx, "camnetdns", key, 0, nil)
|
||||
var oldValue dsValue
|
||||
if err := cs.dsClient.Get(ctx, dk, &oldValue); err != nil {
|
||||
if err != datastore.ErrNoSuchEntity {
|
||||
return fmt.Errorf("error checking if record exists for %q from datastore: %v", key, err)
|
||||
}
|
||||
// record does not exist, write it.
|
||||
return cs.put(ctx, key, value)
|
||||
}
|
||||
// record already exists
|
||||
if oldValue.Record != value {
|
||||
// new record is different, overwrite old one.
|
||||
return cs.put(ctx, key, value)
|
||||
}
|
||||
// record is the same as before
|
||||
if oldValue.Updated.Add(staleRecord).After(time.Now()) {
|
||||
// record is still fresh, nothing to do.
|
||||
return nil
|
||||
}
|
||||
// record is older than 30 days, so we rewrite it, for analytics.
|
||||
return cs.put(ctx, key, value)
|
||||
}
|
||||
|
||||
type memkv struct {
|
||||
skv sorted.KeyValue
|
||||
}
|
||||
|
||||
func (kv memkv) Get(key string) (string, error) {
|
||||
val, err := kv.skv.Get(key)
|
||||
if err != nil {
|
||||
if err != sorted.ErrNotFound {
|
||||
return "", err
|
||||
}
|
||||
return "", errRecordNotFound
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func (kv memkv) Set(key, value string) error {
|
||||
return kv.skv.Set(key, value)
|
||||
}
|
||||
|
||||
// DNSServer implements the dns.Handler interface to serve A and AAAA
|
||||
// records, using a KeyValue store for the lookups.
|
||||
type DNSServer struct {
|
||||
dataSource keyValue
|
||||
}
|
||||
|
||||
func newDNSServer(src keyValue) *DNSServer {
|
||||
return &DNSServer{
|
||||
dataSource: src,
|
||||
}
|
||||
|
@ -87,7 +205,7 @@ const (
|
|||
domain = "camlistore.net."
|
||||
authorityNS = "camnetdns.camlistore.org."
|
||||
// Increment after every change with format YYYYMMDDnn.
|
||||
soaSerial = 2016102101
|
||||
soaSerial = 2016121401
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -160,7 +278,7 @@ func (ds *DNSServer) ServeDNS(rw dns.ResponseWriter, mes *dns.Msg) {
|
|||
log.Printf("DNS request from %s: %s", rw.RemoteAddr(), &q)
|
||||
|
||||
answer, err := ds.HandleLookup(q.Name)
|
||||
if err == sorted.ErrNotFound {
|
||||
if err == errRecordNotFound {
|
||||
resp.SetRcode(mes, dns.RcodeNameError)
|
||||
if err := rw.WriteMsg(resp); err != nil {
|
||||
log.Printf("error responding to DNS query: %s", err)
|
||||
|
@ -251,38 +369,63 @@ func (ds *DNSServer) ServeDNS(rw dns.ResponseWriter, mes *dns.Msg) {
|
|||
|
||||
func main() {
|
||||
launchConfig.MaybeDeploy()
|
||||
addr := flag.String("addr", defaultListenAddr(), "specify address for server to listen on")
|
||||
flag.Parse()
|
||||
|
||||
memkv := sorted.NewMemoryKeyValue()
|
||||
if err := memkv.Set("6401800c.camlistore.net.", "159.203.246.79"); err != nil {
|
||||
panic(err)
|
||||
var kv keyValue
|
||||
var httpsListenAddr string
|
||||
if metadata.OnGCE() {
|
||||
httpsListenAddr = ":443"
|
||||
dsClient, err := datastore.NewClient(context.Background(), GCEProjectID)
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating datastore client for records: %v", err)
|
||||
}
|
||||
kv = cachedStore{
|
||||
dsClient: dsClient,
|
||||
cache: lru.New(cacheSize),
|
||||
}
|
||||
} else {
|
||||
httpsListenAddr = ":4430"
|
||||
kv = memkv{skv: sorted.NewMemoryKeyValue()}
|
||||
}
|
||||
if err := memkv.Set(domain, *flagServerIP); err != nil {
|
||||
panic(err)
|
||||
if err := kv.Set("6401800c.camlistore.net.", "159.203.246.79"); err != nil {
|
||||
log.Fatalf("Error adding %v:%v record", "6401800c.camlistore.net.", "159.203.246.79")
|
||||
}
|
||||
if err := memkv.Set("www.camlistore.net.", *flagServerIP); err != nil {
|
||||
panic(err)
|
||||
if err := kv.Set(domain, *flagServerIP); err != nil {
|
||||
log.Fatalf("Error adding %v:%v record", domain, *flagServerIP)
|
||||
}
|
||||
if err := memkv.Set("wip.camlistore.net.", "104.199.42.193"); err != nil {
|
||||
panic(err)
|
||||
if err := kv.Set("www.camlistore.net.", *flagServerIP); err != nil {
|
||||
log.Fatalf("Error adding %v:%v record", "www.camlistore.net.", *flagServerIP)
|
||||
}
|
||||
|
||||
ds := NewDNSServer(memkv)
|
||||
ds := newDNSServer(kv)
|
||||
cs := &gpgchallenge.Server{
|
||||
OnSuccess: func(identity string, address string) error {
|
||||
log.Printf("Adding %v.camlistore.net. as %v", identity, address)
|
||||
return ds.dataSource.Set(strings.ToLower(identity+".camlistore.net."), address)
|
||||
},
|
||||
}
|
||||
|
||||
log.Printf("serving DNS on %s\n", *addr)
|
||||
tcperr := make(chan error, 1)
|
||||
udperr := make(chan error, 1)
|
||||
httperr := make(chan error, 1)
|
||||
log.Printf("serving DNS on %s\n", *addr)
|
||||
go func() {
|
||||
tcperr <- dns.ListenAndServe(*addr, "tcp", ds)
|
||||
}()
|
||||
go func() {
|
||||
udperr <- dns.ListenAndServe(*addr, "udp", ds)
|
||||
}()
|
||||
go func() {
|
||||
// TODO(mpl): get a cert for camnetdns.camlistore.org from Let's
|
||||
// Encrypt. Then we can switch to ListenAndServeTLS here. Next CL.
|
||||
httperr <- http.ListenAndServe(httpsListenAddr, cs)
|
||||
}()
|
||||
select {
|
||||
case err := <-tcperr:
|
||||
log.Fatalf("DNS over TCP error: %v", err)
|
||||
case err := <-udperr:
|
||||
log.Fatalf("DNS error: %v", err)
|
||||
case err := <-httperr:
|
||||
log.Fatalf("HTTP server error: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue