gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit b62071330cd7e0445e89660d07b7aed098f80285
parent 835c8e8b45487c1276426034b5afb915853b6eb1
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Mon, 15 Aug 2022 14:02:05 +0200

Milestone 2+3 (NLnet funding)

Diffstat:
Msrc/gnunet/cmd/gnunet-service-dht-go/main.go | 2+-
Msrc/gnunet/config/config.go | 9+++++----
Msrc/gnunet/config/gnunet-config.json | 5+++--
Msrc/gnunet/core/hello_test.go | 4++--
Msrc/gnunet/core/peer_test.go | 2+-
Msrc/gnunet/crypto/gns.go | 4+++-
Msrc/gnunet/crypto/hash.go | 2+-
Msrc/gnunet/go.mod | 4++--
Msrc/gnunet/go.sum | 4++--
Msrc/gnunet/message/msg_dht_p2p.go | 256++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Msrc/gnunet/service/connection.go | 4++--
Msrc/gnunet/service/dht/blocks/filters.go | 16+++++++++++++---
Msrc/gnunet/service/dht/blocks/handlers.go | 13++++++++++---
Msrc/gnunet/service/dht/blocks/hello.go | 70+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
Asrc/gnunet/service/dht/blocks/hello_test.go | 164+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/gnunet/service/dht/local.go | 58++++++++++++++++++----------------------------------------
Msrc/gnunet/service/dht/messages.go | 191+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Msrc/gnunet/service/dht/module.go | 128+++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------
Msrc/gnunet/service/dht/path/elements.go | 2+-
Msrc/gnunet/service/dht/path/handling.go | 9++++++---
Msrc/gnunet/service/dht/resulthandler.go | 45++++++++++++++++++++++++++++++++++-----------
Msrc/gnunet/service/dht/routingtable.go | 38+++++++++++++++++++++++---------------
Msrc/gnunet/service/dht/routingtable_test.go | 10+++++-----
Msrc/gnunet/service/gns/module.go | 1+
Msrc/gnunet/service/namecache/module.go | 16+++++++++++-----
Dsrc/gnunet/service/store/dhtstore_test.go | 109-------------------------------------------------------------------------------
Dsrc/gnunet/service/store/store.go | 285-------------------------------------------------------------------------------
Asrc/gnunet/service/store/store_dht.go | 380+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/gnunet/service/store/store_dht_meta.go | 213+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/gnunet/service/store/store_dht_meta.sql | 30++++++++++++++++++++++++++++++
Asrc/gnunet/service/store/store_dht_test.go | 118+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dsrc/gnunet/service/store/store_fs.go | 328-------------------------------------------------------------------------------
Dsrc/gnunet/service/store/store_fs_meta.go | 177-------------------------------------------------------------------------------
Dsrc/gnunet/service/store/store_fs_meta.sql | 29-----------------------------
Asrc/gnunet/service/store/store_kv.go | 229+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/gnunet/transport/responder.go | 9+++++----
Msrc/gnunet/util/map.go | 2+-
Msrc/gnunet/util/misc.go | 11+++++++++++
Msrc/gnunet/util/peer.go | 6+++---
39 files changed, 1774 insertions(+), 1209 deletions(-)

diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go @@ -137,7 +137,7 @@ func main() { // check for HELLO URL if strings.HasPrefix(bs, "gnunet://hello/") { var hb *blocks.HelloBlock - if hb, err = blocks.ParseHelloURL(bs, true); err != nil { + if hb, err = blocks.ParseHelloBlockFromURL(bs, true); err != nil { logger.Printf(logger.ERROR, "[dht] failed bootstrap HELLO URL %s: %s", bs, err.Error()) continue } diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go @@ -90,9 +90,9 @@ type ServiceConfig struct { // GNSConfig contains parameters for the GNU Name System service type GNSConfig struct { - Service *ServiceConfig `json:"service"` // socket for GNS service - DHTReplLevel int `json:"dhtReplLevel"` // DHT replication level - MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution + Service *ServiceConfig `json:"service"` // socket for GNS service + ReplLevel int `json:"replLevel"` // DHT replication level + MaxDepth int `json:"maxDepth"` // maximum recursion depth in resolution } //---------------------------------------------------------------------- @@ -109,7 +109,8 @@ type DHTConfig struct { // RoutingConfig holds parameters for routing tables type RoutingConfig struct { - PeerTTL int `json:"peerTTL"` // time-out for peers in table + PeerTTL int `json:"peerTTL"` // time-out for peers in table + ReplLevel int `json:"replLevel"` // replication level } //---------------------------------------------------------------------- diff --git a/src/gnunet/config/gnunet-config.json b/src/gnunet/config/gnunet-config.json @@ -37,7 +37,8 @@ "maxGB": 10 }, "routing": { - "peerTTL": 10800 + "peerTTL": 10800, + "replLevel": 5 }, "heartbeat": 900 }, @@ -48,7 +49,7 @@ "perm": "0770" } }, - "dhtReplLevel": 10, + "replLevel": 10, "maxDepth": 250 }, "namecache": { diff --git a/src/gnunet/core/hello_test.go b/src/gnunet/core/hello_test.go @@ -64,7 +64,7 @@ var ( func TestHelloURLDirect(t *testing.T) { for _, hu := range helloURL { - if _, err := blocks.ParseHelloURL(hu, false); err != nil { + if _, err := blocks.ParseHelloBlockFromURL(hu, false); err != nil { t.Fatal(err) } } @@ -93,7 +93,7 @@ func TestHelloURL(t *testing.T) { // convert to and from HELLO URL url1 := hd.URL() - hd2, err := blocks.ParseHelloURL(url1, true) + hd2, err := blocks.ParseHelloBlockFromURL(url1, true) if err != nil { t.Fatal(err) } diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go @@ -67,7 +67,7 @@ func TestPeerHello(t *testing.T) { // convert to URL and back u := h.URL() t.Log(u) - h2, err := blocks.ParseHelloURL(u, true) + h2, err := blocks.ParseHelloBlockFromURL(u, true) if err != nil { t.Fatal(err) } diff --git a/src/gnunet/crypto/gns.go b/src/gnunet/crypto/gns.go @@ -417,7 +417,9 @@ func NewZoneSignature(d []byte) (sig *ZoneSignature, err error) { } // set signature implementation zs := impl.NewSignature() - err = zs.Init(sig.Signature) + if err = zs.Init(sig.Signature); err != nil { + return + } sig.impl = zs // set public key implementation zk := impl.NewPublic() diff --git a/src/gnunet/crypto/hash.go b/src/gnunet/crypto/hash.go @@ -58,7 +58,7 @@ func NewHashCode(data []byte) *HashCode { hc := new(HashCode) size := hc.Size() v := make([]byte, size) - if data != nil && len(data) > 0 { + if len(data) > 0 { if uint(len(data)) < size { util.CopyAlignedBlock(v, data) } else { diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod @@ -3,7 +3,7 @@ module gnunet go 1.18 require ( - github.com/bfix/gospel v1.2.17 + github.com/bfix/gospel v1.2.18 github.com/go-redis/redis/v8 v8.11.5 github.com/go-sql-driver/mysql v1.6.0 github.com/gorilla/mux v1.8.0 @@ -24,4 +24,4 @@ require ( golang.org/x/tools v0.1.11 // indirect ) -// replace github.com/bfix/gospel v1.2.17 => ../gospel +//replace github.com/bfix/gospel v1.2.18 => ../gospel diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum @@ -1,5 +1,5 @@ -github.com/bfix/gospel v1.2.17 h1:Stvm+OiCA2GIWIhI/HKc6uaLDMtrJNxXgw/g+v9witw= -github.com/bfix/gospel v1.2.17/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= +github.com/bfix/gospel v1.2.18 h1:X9hYudt5dvjYTGGmKC4T7qcLdb7ORblVD4kAC/ZYXdU= +github.com/bfix/gospel v1.2.18/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= diff --git a/src/gnunet/message/msg_dht_p2p.go b/src/gnunet/message/msg_dht_p2p.go @@ -113,20 +113,20 @@ func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf blocks.ResultFilter, hop // DHTP2PPutMsg wire layout type DHTP2PPutMsg struct { - MsgSize uint16 `order:"big"` // total size of message - MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) - BType uint32 `order:"big"` // block type - Flags uint16 `order:"big"` // processing flags - HopCount uint16 `order:"big"` // message hops - ReplLvl uint16 `order:"big"` // replication level - PathL uint16 `order:"big"` // path length - Expiration util.AbsoluteTime `` // expiration date - PeerFilter *blocks.PeerFilter `` // peer bloomfilter - Key *crypto.HashCode `` // query key to block - TruncOrigin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) - PutPath []*path.Entry `size:"PathL"` // PUT path - LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) - Block []byte `size:"*"` // block data + MsgSize uint16 `order:"big"` // total size of message + MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) + BType uint32 `order:"big"` // block type + Flags uint16 `order:"big"` // processing flags + HopCount uint16 `order:"big"` // message hops + ReplLvl uint16 `order:"big"` // replication level + PathL uint16 `order:"big"` // path length + Expiration util.AbsoluteTime `` // expiration date + PeerFilter *blocks.PeerFilter `` // peer bloomfilter + Key *crypto.HashCode `` // query key to block + TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (if TRUNCATED flag set) + PutPath []*path.Entry `size:"PathL"` // PUT path + LastSig *util.PeerSignature `opt:"(IsUsed)"` // signature of last hop (if RECORD_ROUTE flag is set) + Block []byte `size:"*"` // block data } // NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg @@ -149,19 +149,15 @@ func NewDHTP2PPutMsg() *DHTP2PPutMsg { } } -// PESize calculates field sizes based on flags and attributes -func (m *DHTP2PPutMsg) PESize(field string) uint { +// IsUsed returns true if an optional field is used +func (m *DHTP2PPutMsg) IsUsed(field string) bool { switch field { case "Origin": - if m.Flags&enums.DHT_RO_TRUNCATED != 0 { - return util.NewPeerID(nil).Size() - } + return m.Flags&enums.DHT_RO_TRUNCATED != 0 case "LastSig": - if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { - return util.NewPeerSignature(nil).Size() - } + return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 } - return 0 + return false } //---------------------------------------------------------------------- @@ -171,13 +167,13 @@ func (m *DHTP2PPutMsg) Update(p *path.Path, pf *blocks.PeerFilter, hop uint16) * msg := NewDHTP2PPutMsg() msg.Flags = m.Flags msg.HopCount = hop - msg.PathL = m.PathL + msg.PathL = p.NumList msg.Expiration = m.Expiration msg.PeerFilter = pf msg.Key = m.Key.Clone() - msg.TruncOrigin = m.TruncOrigin - msg.PutPath = util.Clone(m.PutPath) - msg.LastSig = m.LastSig + msg.TruncOrigin = p.TruncOrigin + msg.PutPath = util.Clone(p.List) + msg.LastSig = p.LastSig msg.Block = util.Clone(m.Block) msg.SetPath(p) return msg @@ -199,11 +195,11 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path { // handle truncate origin if m.Flags&enums.DHT_RO_TRUNCATED == 1 { - if m.TruncOrigin == nil || len(m.TruncOrigin) == 0 { + if m.TruncOrigin == nil { logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") m.Flags &^= enums.DHT_RO_TRUNCATED } else { - pth.TruncOrigin = util.NewPeerID(m.TruncOrigin) + pth.TruncOrigin = m.TruncOrigin pth.Flags |= path.PathTruncated } } @@ -213,12 +209,12 @@ func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path { pth.NumList = uint16(len(pth.List)) // handle last hop signature - if m.LastSig == nil || len(m.LastSig) == 0 { + if m.LastSig == nil { logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") return path.NewPath(crypto.Hash(m.Block), m.Expiration) } pth.Flags |= path.PathLastHop - pth.LastSig = util.NewPeerSignature(m.LastSig) + pth.LastSig = m.LastSig pth.LastHop = sender return pth } @@ -235,7 +231,13 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) { if len(m.PutPath) > 0 { pes = m.PutPath[0].Size() } - oldSize := uint(len(m.PutPath))*pes + m.PESize("Origin") + m.PESize("LastSig") + oldSize := uint(len(m.PutPath)) * pes + if m.TruncOrigin != nil { + oldSize += m.TruncOrigin.Size() + } + if m.LastSig != nil { + oldSize += m.LastSig.Size() + } // if no new path is defined,... if p == nil { // ... remove existing path @@ -254,12 +256,12 @@ func (m *DHTP2PPutMsg) SetPath(p *path.Path) { if p.TruncOrigin != nil { // truncated path m.Flags |= enums.DHT_RO_TRUNCATED - m.TruncOrigin = p.TruncOrigin.Bytes() + m.TruncOrigin = p.TruncOrigin } m.PutPath = util.Clone(p.List) m.PathL = uint16(len(m.PutPath)) if p.LastSig != nil { - m.LastSig = p.LastSig.Bytes() + m.LastSig = p.LastSig } } @@ -283,52 +285,174 @@ func (m *DHTP2PPutMsg) Header() *Header { // DHTP2PResultMsg wire layout type DHTP2PResultMsg struct { - MsgSize uint16 `order:"big"` // total size of message - MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) - BType uint32 `order:"big"` // Block type of result - Reserved uint32 `order:"big"` // Reserved for further use - PutPathL uint16 `order:"big"` // size of PUTPATH field - GetPathL uint16 `order:"big"` // size of GETPATH field - Expires util.AbsoluteTime `` // expiration date - Query *crypto.HashCode `` // Query key for block - Origin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) - PutPath []*path.Entry `size:"PutPathL"` // PUTPATH - GetPath []*path.Entry `size:"GetPathL"` // GETPATH - LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) - Block []byte `size:"*"` // block data + MsgSize uint16 `order:"big"` // total size of message + MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) + BType uint32 `order:"big"` // Block type of result + Flags uint32 `order:"big"` // Message flags + PutPathL uint16 `order:"big"` // size of PUTPATH field + GetPathL uint16 `order:"big"` // size of GETPATH field + Expires util.AbsoluteTime `` // expiration date + Query *crypto.HashCode `` // Query key for block + TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (if TRUNCATED flag set) + PathList []*path.Entry `size:"(NumPath)"` // PATH + LastSig *util.PeerSignature `size:"(IsUsed)"` // signature of last hop (if RECORD_ROUTE flag is set) + Block []byte `size:"*"` // block data } // NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg func NewDHTP2PResultMsg() *DHTP2PResultMsg { return &DHTP2PResultMsg{ - MsgSize: 88, // size of empty message - MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148) - BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned block - Origin: nil, // no truncated origin - PutPathL: 0, // empty putpath - PutPath: nil, // -"- - GetPathL: 0, // empty getpath - GetPath: nil, // -"- - LastSig: nil, // no recorded route - Block: nil, // empty block + MsgSize: 88, // size of empty message + MsgType: DHT_P2P_RESULT, // DHT_P2P_RESULT (148) + BType: uint32(enums.BLOCK_TYPE_ANY), // type of returned block + TruncOrigin: nil, // no truncated origin + PutPathL: 0, // empty putpath + GetPathL: 0, // empty getpath + PathList: nil, // empty path list (put+get) + LastSig: nil, // no recorded route + Block: nil, // empty block } } -// PESize calculates field sizes based on flags and attributes -func (m *DHTP2PResultMsg) PESize(field string) uint { +// IsUsed returns if an optional field is present +func (m *DHTP2PResultMsg) IsUsed(field string) bool { switch field { case "Origin": - //if m.Flags&enums.DHT_RO_TRUNCATED != 0 { - return 32 - //} + return m.Flags&enums.DHT_RO_TRUNCATED != 0 case "LastSig": - //if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { - return 64 - //} + return m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 + } + return false +} + +// NumPath returns the total number of entries in path +func (m *DHTP2PResultMsg) NumPath(field string) uint { + return uint(m.GetPathL + m.PutPathL) +} + +//---------------------------------------------------------------------- +// Path handling (get/set path in message) +//---------------------------------------------------------------------- + +// Path returns the current path from message +func (m *DHTP2PResultMsg) Path(sender *util.PeerID) *path.Path { + // create a "real" path list from message data + pth := path.NewPath(crypto.Hash(m.Block), m.Expires) + + // return empty path if recording is switched off + if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 { + return pth + } + // handle truncate origin + if m.Flags&enums.DHT_RO_TRUNCATED == 1 { + if m.TruncOrigin == nil { + logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") + m.Flags &^= enums.DHT_RO_TRUNCATED + } else { + pth.TruncOrigin = m.TruncOrigin + pth.Flags |= path.PathTruncated + } + } + // copy path elements + pth.List = util.Clone(m.PathList) + pth.NumList = uint16(len(pth.List)) + + // check consistent length values; adjust if mismatched + if m.GetPathL+m.PutPathL != pth.NumList { + logger.Printf(logger.WARN, "[path] Inconsistent PATH length -- adjusting...") + if sp := pth.NumList - m.PutPathL; sp > 0 { + pth.SplitPos = sp + } else { + pth.SplitPos = 0 + } + } else { + pth.SplitPos = pth.NumList - m.PutPathL + } + // handle last hop signature + if m.LastSig == nil { + logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") + return path.NewPath(crypto.Hash(m.Block), m.Expires) + } + pth.Flags |= path.PathLastHop + pth.LastSig = m.LastSig + pth.LastHop = sender + return pth +} + +// Set path in message; corrects the message size accordingly +func (m *DHTP2PResultMsg) SetPath(p *path.Path) { + + // return if recording is switched off (don't touch path) + if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 { + return + } + // compute old path size + var pes uint + if len(m.PathList) > 0 { + pes = m.PathList[0].Size() + } + oldSize := uint(len(m.PathList)) * pes + if m.TruncOrigin != nil { + oldSize += m.TruncOrigin.Size() + } + if m.LastSig != nil { + oldSize += m.LastSig.Size() + } + // if no new path is defined,... + if p == nil { + // ... remove existing path + m.TruncOrigin = nil + m.PathList = make([]*path.Entry, 0) + m.LastSig = nil + m.GetPathL = 0 + m.PutPathL = 0 + m.Flags &^= enums.DHT_RO_TRUNCATED + m.MsgSize -= uint16(oldSize) + return + } + // adjust message size + m.MsgSize += uint16(p.Size() - oldSize) + + // transfer path data + if p.TruncOrigin != nil { + // truncated path + m.Flags |= enums.DHT_RO_TRUNCATED + m.TruncOrigin = p.TruncOrigin + } + m.PathList = util.Clone(p.List) + m.PutPathL = p.SplitPos + m.GetPathL = p.NumList - p.SplitPos + if p.LastSig != nil { + m.LastSig = p.LastSig + } +} + +//---------------------------------------------------------------------- + +// Update message (forwarding) +func (m *DHTP2PResultMsg) Update(pth *path.Path) *DHTP2PResultMsg { + // clone old message + msg := &DHTP2PResultMsg{ + MsgSize: m.MsgSize, + MsgType: m.MsgType, + BType: m.BType, + Flags: m.Flags, + PutPathL: m.PutPathL, + GetPathL: m.GetPathL, + Expires: m.Expires, + Query: m.Query.Clone(), + TruncOrigin: m.TruncOrigin, + PathList: util.Clone(m.PathList), + LastSig: m.LastSig, + Block: util.Clone(m.Block), } - return 0 + // set new path + msg.SetPath(pth) + return msg } +//---------------------------------------------------------------------- + // String returns a human-readable representation of the message. func (m *DHTP2PResultMsg) String() string { return fmt.Sprintf("DHTP2PResultMsg{btype=%s,putl=%d,getl=%d}", diff --git a/src/gnunet/service/connection.go b/src/gnunet/service/connection.go @@ -138,8 +138,8 @@ func (s *Connection) Receive(ctx context.Context) (message.Message, error) { } // Receiver returns the receiving client (string representation) -func (s *Connection) Receiver() string { - return fmt.Sprintf("uds:%d", s.id) +func (s *Connection) Receiver() *util.PeerID { + return nil } //---------------------------------------------------------------------- diff --git a/src/gnunet/service/dht/blocks/filters.go b/src/gnunet/service/dht/blocks/filters.go @@ -22,6 +22,7 @@ import ( "bytes" "crypto/sha512" "encoding/binary" + "gnunet/crypto" "gnunet/util" "github.com/bfix/gospel/logger" @@ -105,9 +106,12 @@ type ResultFilter interface { // Add entry to filter Add(Block) - // Contains returns true if entry is filtered + // Contains returns true if block is filtered Contains(Block) bool + // ContainsHash returns true if block hash is filtered + ContainsHash(bh *crypto.HashCode) bool + // Bytes returns the binary representation of a result filter Bytes() []byte @@ -140,9 +144,15 @@ func (rf *GenericResultFilter) Add(b Block) { rf.bf.Add(b.Bytes()) } -// Contains returns true if entry (binary representation) is filtered +// Contains returns true if a block is filtered func (rf *GenericResultFilter) Contains(b Block) bool { - return rf.bf.Contains(b.Bytes()) + bh := crypto.Hash(b.Bytes()) + return rf.bf.Contains(bh.Bits) +} + +// ContainsHash returns true if a block hash is filtered +func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool { + return rf.bf.Contains(bh.Bits) } // Bytes returns the binary representation of a result filter diff --git a/src/gnunet/service/dht/blocks/handlers.go b/src/gnunet/service/dht/blocks/handlers.go @@ -29,15 +29,22 @@ type BlockHandler interface { // Parse a block instance from binary data ParseBlock(buf []byte) (Block, error) - // ValidateBlockQuery is used to evaluate the request for a block as part of - // DHT-P2P-GET processing. Here, the block payload is unknown, but if possible - // the XQuery and Key SHOULD be verified. + // ValidateBlockQuery is used to evaluate the request for a block as part + // of DHT-P2P-GET processing. Here, the block payload is unknown, but if + // possible the XQuery and Key SHOULD be verified. ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool // ValidateBlockKey returns true if the block key is the same as the // query key used to access the block. ValidateBlockKey(b Block, key *crypto.HashCode) bool + // DeriveBlockKey is used to synthesize the block key from the block + // payload as part of PutMessage and ResultMessage processing. The special + // return value of 'nil' implies that this block type does not permit + // deriving the key from the block. A Key may be returned for a block that + // is ill-formed. + DeriveBlockKey(b Block) *crypto.HashCode + // ValidateBlockStoreRequest is used to evaluate a block payload as part of // PutMessage and ResultMessage processing. ValidateBlockStoreRequest(b Block) bool diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go @@ -30,6 +30,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/bfix/gospel/crypto/ed25519" "github.com/bfix/gospel/data" @@ -63,8 +64,21 @@ type HelloBlock struct { addrs []*util.Address // cooked address data } +// NewHelloBlock initializes a new HELLO block (unsigned) +func NewHelloBlock(peer *util.PeerID, addrs []*util.Address, ttl time.Duration) *HelloBlock { + hb := new(HelloBlock) + hb.PeerID = peer + // limit expiration to second precision (HELLO-URL compatibility) + hb.Expires = util.NewAbsoluteTimeEpoch(uint64(time.Now().Add(ttl).Unix())) + hb.SetAddresses(addrs) + return hb +} + // SetAddresses adds a bulk of addresses for this HELLO block. func (h *HelloBlock) SetAddresses(a []*util.Address) { + if len(a) == 0 { + return + } h.addrs = util.Clone(a) if err := h.finalize(); err != nil { logger.Printf(logger.ERROR, "[HelloBlock.SetAddresses] failed: %s", err.Error()) @@ -76,10 +90,10 @@ func (h *HelloBlock) Addresses() []*util.Address { return util.Clone(h.addrs) } -// ParseHelloURL parses a HELLO URL of the following form: +// ParseHelloBlockFromURL parses a HELLO URL of the following form: // gnunet://hello/<PeerID>/<signature>/<expire>?<addrs> // The addresses are encoded. -func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) { +func ParseHelloBlockFromURL(u string, checkExpiry bool) (h *HelloBlock, err error) { // check and trim prefix if !strings.HasPrefix(u, helloPrefix) { err = fmt.Errorf("invalid HELLO-URL prefix: '%s'", u) @@ -158,8 +172,8 @@ func ParseHelloURL(u string, checkExpiry bool) (h *HelloBlock, err error) { return } -// ParseHelloFromBytes converts a byte array into a HelloBlock instance. -func ParseHelloFromBytes(buf []byte) (h *HelloBlock, err error) { +// ParseHelloBlockFromBytes converts a byte array into a HelloBlock instance. +func ParseHelloBlockFromBytes(buf []byte) (h *HelloBlock, err error) { h = new(HelloBlock) if err = data.Unmarshal(h, buf); err == nil { err = h.finalize() @@ -289,7 +303,7 @@ func (h *HelloBlock) SignedData() []byte { err := binary.Write(buf, binary.BigEndian, size) if err == nil { if err = binary.Write(buf, binary.BigEndian, purpose); err == nil { - if err = binary.Write(buf, binary.BigEndian, h.Expires.Epoch()*1000000); err == nil { + if err = binary.Write(buf, binary.BigEndian, h.Expires /*.Epoch()*1000000*/); err == nil { if n, err = buf.Write(hAddr[:]); err == nil { if n != len(hAddr[:]) { err = errors.New("signed data size mismatch") @@ -313,7 +327,7 @@ type HelloBlockHandler struct{} // Parse a block instance from binary data func (bh *HelloBlockHandler) ParseBlock(buf []byte) (Block, error) { - return ParseHelloFromBytes(buf) + return ParseHelloBlockFromBytes(buf) } // ValidateHelloBlockQuery validates query parameters for a @@ -326,20 +340,49 @@ func (bh *HelloBlockHandler) ValidateBlockQuery(key *crypto.HashCode, xquery []b // ValidateBlockKey returns true if the block key is the same as the // query key used to access the block. func (bh *HelloBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool { + // check for matching keys + bkey := bh.DeriveBlockKey(b) + if bkey == nil { + logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockKey: not a HELLO block") + return false + } + return key.Equals(bkey) +} + +// DeriveBlockKey is used to synthesize the block key from the block +// payload as part of PutMessage and ResultMessage processing. The special +// return value of 'nil' implies that this block type does not permit +// deriving the key from the block. A Key may be returned for a block that +// is ill-formed. +func (bh *HelloBlockHandler) DeriveBlockKey(b Block) *crypto.HashCode { + // check for correct type hb, ok := b.(*HelloBlock) if !ok { - return false + logger.Println(logger.WARN, "[HelloHdlr] DeriveBlockKey: not a HELLO block") + return nil } // key must be the hash of the peer id - bkey := crypto.Hash(hb.PeerID.Bytes()) - return key.Equals(bkey) + return crypto.Hash(hb.PeerID.Bytes()) } // ValidateBlockStoreRequest is used to evaluate a block payload as part of // PutMessage and ResultMessage processing. +// To validate a block store request is to verify the EdDSA SIGNATURE over +// the hashed ADDRESSES against the public key from the peer ID field. If the +// signature is valid true is returned. func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool { - // TODO: verify block payload - return true + // check for correct type + hb, ok := b.(*HelloBlock) + if !ok { + logger.Println(logger.WARN, "[HelloHdlr] ValidateBlockStoreRequest: not a HELLO block") + return false + } + // verify signature + ok, err := hb.Verify() + if err != nil { + ok = false + } + return ok } // SetupResultFilter is used to setup an empty result filter. The arguments @@ -428,6 +471,11 @@ func (rf *HelloResultFilter) Contains(b Block) bool { return false } +// ContainsHash checks if a block hash is contained in the result filter +func (rf *HelloResultFilter) ContainsHash(bh *crypto.HashCode) bool { + return rf.bf.Contains(bh.Bits) +} + // Bytes returns a binary representation of a HELLO result filter func (rf *HelloResultFilter) Bytes() []byte { return rf.bf.Bytes() diff --git a/src/gnunet/service/dht/blocks/hello_test.go b/src/gnunet/service/dht/blocks/hello_test.go @@ -0,0 +1,164 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package blocks + +import ( + "bytes" + "encoding/base64" + "encoding/hex" + "gnunet/util" + "strings" + "testing" + "time" + + "github.com/bfix/gospel/crypto/ed25519" + "github.com/bfix/gospel/data" +) + +var ( + block *HelloBlock + sk *ed25519.PrivateKey +) + +func setup(t *testing.T) { + t.Helper() + + // check for initialized values + if block != nil { + return + } + // generate keys + var pk *ed25519.PublicKey + pk, sk = ed25519.NewKeypair() + peer := util.NewPeerID(pk.Bytes()) + + // set addresses + addrs := []string{ + "ip+udp://172.17.0.6:2086", + "ip+udp://245.23.42.67:2086", + } + addrList := make([]*util.Address, 0) + for _, addr := range addrs { + frag := strings.Split(addr, "://") + e := util.NewAddress(frag[0], frag[1]) + if e == nil { + t.Fatal("invalid address: " + addr) + } + addrList = append(addrList, e) + } + + // create new HELLO block + block = NewHelloBlock(peer, addrList, time.Hour) + + // sign block. + sig, err := sk.EdSign(block.SignedData()) + if err != nil { + t.Fatal(err) + } + block.Signature = util.NewPeerSignature(sig.Bytes()) +} + +func TestHelloVerify(t *testing.T) { + setup(t) + + // verify signature + ok, err := block.Verify() + if err != nil { + t.Fatal(err) + } + if !ok { + t.Fatal("HELLO verify failed") + } +} + +func TestHelloURL(t *testing.T) { + setup(t) + + // create HELLO URL + url := block.URL() + t.Log(url) + + // read back + tblk, err := ParseHelloBlockFromURL(url, true) + if err != nil { + t.Fatal(err) + } + // verify identical blocks + if !bytes.Equal(tblk.Bytes(), block.Bytes()) { + t.Log(hex.EncodeToString(tblk.Bytes())) + t.Log(hex.EncodeToString(block.Bytes())) + t.Fatal("URL readback failed") + } +} + +func TestHelloBytes(t *testing.T) { + setup(t) + + buf := block.Bytes() + tblk, err := ParseHelloBlockFromBytes(buf) + if err != nil { + t.Fatal(err) + } + // verify identical blocks + if !bytes.Equal(tblk.Bytes(), block.Bytes()) { + t.Log(hex.EncodeToString(tblk.Bytes())) + t.Log(hex.EncodeToString(block.Bytes())) + t.Fatal("Bytes readback failed") + } +} + +func TestHelloDebug(t *testing.T) { + blkData := "QKObXJUbnnghRh9McDDjHaB9IIL6MhhEiQHc8VfO3QMABeZZJJhsA" + + "GlwK3VkcDovLzEyNy4wLjAuMToxMDAwMQBpcCt1ZHA6Ly8xNzIuMT" + + "cuMC40OjEwMDAxAGlwK3VkcDovL1s6OmZmZmY6MTcyLjE3LjAuNF06MTAwMDEA" + buf, err := base64.RawStdEncoding.DecodeString(blkData) + if err != nil { + t.Fatal(err) + } + hb, err := ParseHelloBlockFromBytes(buf) + if err != nil { + t.Fatal(err) + } + ok, err := hb.Verify() + if err != nil { + t.Fatal(err) + } + if !ok { + // trace problem + t.Log("Block: " + hex.EncodeToString(buf)) + t.Log("PeerID: " + hb.PeerID.String()) + t.Log(" -> " + hex.EncodeToString(hb.PeerID.Bytes())) + t.Logf("Expire: %d", hb.Expires.Val) + t.Logf(" -> " + hb.Expires.String()) + var exp util.AbsoluteTime + if err = data.Unmarshal(&exp, buf[32:40]); err != nil { + t.Fatal(err) + } + t.Logf(" -> " + exp.String()) + t.Log("AddrBin: " + hex.EncodeToString(hb.AddrBin)) + sd := hb.SignedData() + t.Log("SignedData: " + hex.EncodeToString(sd)) + t.Log("Addresses:") + for _, addr := range hb.Addresses() { + t.Logf("* " + addr.URI()) + } + t.Log("Signature: " + hex.EncodeToString(hb.Signature.Bytes())) + t.Fatal("debug HELLO verify failed") + } +} diff --git a/src/gnunet/service/dht/local.go b/src/gnunet/service/dht/local.go @@ -19,7 +19,6 @@ package dht import ( - "errors" "gnunet/enums" "gnunet/service/dht/blocks" "gnunet/service/store" @@ -28,58 +27,37 @@ import ( "github.com/bfix/gospel/math" ) -// getHelloCache tries to find the requested HELLO block in the HELLO cache -func (m *Module) getHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int) { +// lookupHelloCache tries to find the requested HELLO block in the HELLO cache +func (m *Module) lookupHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter, approx bool) (results []*store.DHTResult) { logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label) // find best cached HELLO - var block blocks.Block - block, dist = m.rtable.BestHello(addr, rf) - - // if block is filtered, skip it - if block != nil { - if !rf.Contains(block) { - entry = &store.DHTEntry{Blk: block} - } else { - logger.Printf(logger.DBG, "[%s] GET message for HELLO: matching DHT block is filtered", label) - entry = nil - dist = nil - } - } - return + return m.rtable.LookupHello(addr, rf, approx) } // getLocalStorage tries to find the requested block in local storage -func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int, err error) { +func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (results []*store.DHTResult, err error) { - // query DHT store for exact match (9.4.3.3c) - if entry, err = m.store.Get(query); err != nil { + // query DHT store for exact matches (9.4.3.3c) + var entries []*store.DHTEntry + if entries, err = m.store.Get(label, query, rf); err != nil { logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error()) return } - if entry != nil { - dist = math.ZERO - // check if we are filtered out - if rf.Contains(entry.Blk) { - logger.Printf(logger.DBG, "[%s] matching DHT block is filtered", label) - entry = nil - dist = nil + for _, entry := range entries { + // add entry to result list + result := &store.DHTResult{ + Entry: entry, + Dist: math.ZERO, } + results = append(results, result) + // add to result filter + rf.Add(entry.Blk) } // if we have no exact match, find approximate block if requested - if entry == nil || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 { + if len(results) == 0 || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 { // no exact match: find approximate (9.4.3.3b) - match := func(e *store.DHTEntry) bool { - return rf.Contains(e.Blk) - } - var d any - entry, d, err = m.store.GetApprox(query, match) - var ok bool - dist, ok = d.(*math.Int) - if !ok { - err = errors.New("no approx distance") - } - if err != nil { - logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT block from storage: %s", label, err.Error()) + if results, err = m.store.GetApprox(label, query, rf); err != nil { + logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT blocks from storage: %s", label, err.Error()) } } return diff --git a/src/gnunet/service/dht/messages.go b/src/gnunet/service/dht/messages.go @@ -20,6 +20,7 @@ package dht import ( "context" + "gnunet/crypto" "gnunet/enums" "gnunet/message" "gnunet/service/dht/blocks" @@ -29,7 +30,6 @@ import ( "gnunet/util" "github.com/bfix/gospel/logger" - "github.com/bfix/gospel/math" ) //---------------------------------------------------------------------- @@ -38,6 +38,7 @@ import ( // HandleMessage handles a DHT request/response message. Responses are sent // to the specified responder. +//nolint:gocyclo // life sometimes is complex... func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn message.Message, back transport.Responder) bool { // assemble log label label := "dht" @@ -58,15 +59,18 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // process message switch msg := msgIn.(type) { + //================================================================== + // DHT-P2P-GET + //================================================================== case *message.DHTP2PGetMsg: //-------------------------------------------------------------- // DHT-P2P GET //-------------------------------------------------------------- logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message", label) - query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags) - var entry *store.DHTEntry - var dist *math.Int + // assemble query and initialize (cache) results + query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags) + var results []*store.DHTResult //-------------------------------------------------------------- // validate query (based on block type requested) (9.4.3.1) @@ -113,7 +117,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0) demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0 approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0 - // actions + + // enforced actions doResult := closest || (demux && approx) doForward := !closest || (demux && !approx) logger.Printf(logger.DBG, "[%s] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v", @@ -122,53 +127,43 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m //------------------------------------------------------ // query for a HELLO? (9.4.3.3a) if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { - // try to find result in HELLO cache - entry, dist = m.getHelloCache(label, addr, rf) + // try to find results in HELLO cache + results = m.lookupHelloCache(label, addr, rf, approx) } + //-------------------------------------------------------------- - // find the closest block that has that is not filtered by the result - // filter (in case we did not find an appropriate block in cache). + // query flags demand a result if doResult { - // save best-match values from cache - entryCache := entry - distCache := dist - dist = nil - - // if we don't have an exact match, try storage lookup - if entryCache == nil || (distCache != nil && !distCache.Equals(math.ZERO)) { - // get entry from local storage - var err error - if entry, dist, err = m.getLocalStorage(label, query, rf); err != nil { - entry = nil - dist = nil - } - // if we have a block from cache, check if it is better than the - // block found in the DHT - if entryCache != nil && dist != nil && distCache.Cmp(dist) < 0 { - entry = entryCache - dist = distCache + // if we don't have a result from cache or are in approx mode, + // try storage lookup + if len(results) == 0 || approx { + // get results from local storage + lclResults, err := m.getLocalStorage(label, query, rf) + if err == nil { + // append local results + results = append(results, lclResults...) } } - // if we have a block, send it as response - if entry != nil { + // if we have results, send them as response + for _, result := range results { + var pth *path.Path + // check if record the route + if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 && result.Entry.Path != nil { + // update get path + pth = result.Entry.Path.Clone() + pth.SplitPos = pth.NumList + pe := pth.NewElement(pth.LastHop, local, back.Receiver()) + pth.Add(pe) + } + logger.Printf(logger.INFO, "[%s] sending DHT result message to caller", label) - if err := m.sendResult(ctx, query, entry.Blk, back); err != nil { + if err := m.sendResult(ctx, query, result.Entry.Blk, pth, back); err != nil { logger.Printf(logger.ERROR, "[%s] Failed to send DHT result message: %s", label, err.Error()) } } } - // check if we need to forward message based on filter result - if entry != nil && blockHdlr != nil { - switch blockHdlr.FilterResult(entry.Blk, query.Key(), rf, msg.XQuery) { - case blocks.RF_LAST: - // no need for further results - case blocks.RF_MORE: - // possibly more results - doForward = true - case blocks.RF_DUPLICATE, blocks.RF_IRRELEVANT: - // do not forward - } - } + //-------------------------------------------------------------- + // query flags demand a result if doForward { // build updated GET message pf.Add(local) @@ -195,6 +190,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message done", label) + //================================================================== + // DHT-P2P-PUT + //================================================================== case *message.DHTP2PPutMsg: //---------------------------------------------------------- // DHT-P2P PUT @@ -267,29 +265,29 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m logger.Printf(logger.ERROR, "[%s] failed to store DHT entry: %s", label, err.Error()) } } - //-------------------------------------------------------------- // if the put is for a HELLO block, add the sender to the // routing table (9.3.2.9) if btype == enums.BLOCK_TYPE_DHT_HELLO { // get addresses from HELLO block - hello, err := blocks.ParseHelloFromBytes(msg.Block) + hello, err := blocks.ParseHelloBlockFromBytes(msg.Block) if err != nil { logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) } else { // check state of bucket for given address - if m.rtable.Check(NewPeerAddress(sender)) == 0 { + if m.rtable.Check(NewPeerAddress(hello.PeerID)) == 0 { // we could add the sender to the routing table for _, addr := range hello.Addresses() { if transport.CanHandleAddress(addr) { // try to connect to peer (triggers EV_CONNECTED on success) - m.core.TryConnect(sender, addr) + if err := m.core.TryConnect(sender, addr); err != nil { + logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label, addr.URI(), err.Error()) + } } } } } } - //-------------------------------------------------------------- // check if we need to forward if !closest || demux { @@ -325,24 +323,100 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message done", label) + //================================================================== + // DHT-P2P-RESULT + //================================================================== case *message.DHTP2PResultMsg: //---------------------------------------------------------- // DHT-P2P RESULT //---------------------------------------------------------- - logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message", label) + logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message for type %s", + label, enums.BlockType(msg.BType).String()) - // check task list for handler + //-------------------------------------------------------------- + // check if request is expired (9.5.2.1) + if msg.Expires.Expired() { + logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT message expired (%s)", + label, msg.Expires.String()) + return false + } + //-------------------------------------------------------------- + btype := enums.BlockType(msg.BType) + var blkKey *crypto.HashCode + blockHdlr, ok := blocks.BlockHandlers[btype] + if ok { + // reconstruct block instance + if block, err := blockHdlr.ParseBlock(msg.Block); err == nil { + // validate block (9.5.2.2) + if !blockHdlr.ValidateBlockStoreRequest(block) { + logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT invalid block -- discarded", label) + return false + } + // Compute block key (9.5.2.4) + blkKey = blockHdlr.DeriveBlockKey(block) + } + } else { + logger.Printf(logger.INFO, "[%s] No validator defined for block type %s", label, btype.String()) + blockHdlr = nil + } + //-------------------------------------------------------------- + // verify path (9.5.2.3) + var pth *path.Path + if msg.GetPathL+msg.PutPathL > 0 { + pth = msg.Path(sender) + pth.Verify(local) + } + //-------------------------------------------------------------- + // if the put is for a HELLO block, add the originator to the + // routing table (9.5.2.5) + if btype == enums.BLOCK_TYPE_DHT_HELLO { + // get addresses from HELLO block + hello, err := blocks.ParseHelloBlockFromBytes(msg.Block) + if err != nil { + logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) + } else { + // check state of bucket for given address + if m.rtable.Check(NewPeerAddress(hello.PeerID)) == 0 { + // we could add the originator to the routing table + for _, addr := range hello.Addresses() { + if transport.CanHandleAddress(addr) { + // try to connect to peer (triggers EV_CONNECTED on success) + if err := m.core.TryConnect(sender, addr); err != nil { + logger.Printf(logger.ERROR, "[%s] try-connection to %s failed: %s", label, addr.URI(), err.Error()) + } + } + } + } + } + } + // message forwarding to responder key := msg.Query.String() logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s", label, key) handled := false if list, ok := m.reshdlrs.Get(key); ok { for _, rh := range list { logger.Printf(logger.DBG, "[%s] Task #%d for DHT-P2P-RESULT found", label, rh.ID()) - // handle the message - go rh.Handle(ctx, msg) + + //-------------------------------------------------------------- + // check task list for handler (9.5.2.6) + if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE == 0 && blkKey != nil && !blkKey.Equals(rh.Key()) { + // (9.5.2.6.a) derived key mismatch + logger.Printf(logger.ERROR, "[%s] derived block key / query key mismatch:", label) + logger.Printf(logger.ERROR, "[%s] --> %s != %s", label, blkKey.String(), rh.Key().String()) + return false + } + // (9.5.2.6.b+c) check block against query + /* + if blockHdlr != nil { + blockHdlr.FilterBlockResult(block, rh.Key()) + } + */ + + //-------------------------------------------------------------- + // handle the message (forwarding) + go rh.Handle(ctx, msg, pth, sender, local) handled = true } - return true } if !handled { logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not processed (no handler)", label) @@ -350,6 +424,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message done", label) return handled + //================================================================== + // DHT-P2P-HELLO + //================================================================== case *message.DHTP2PHelloMsg: //---------------------------------------------------------- // DHT-P2P HELLO @@ -379,8 +456,8 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m logger.Printf(logger.INFO, "[%s] Sending HELLO to %s: %s", label, sender, msgOut) err = m.core.Send(ctx, sender, msgOut) // no error if the message might have been sent - if err == transport.ErrEndpMaybeSent { - err = nil + if err != nil && err != transport.ErrEndpMaybeSent { + logger.Printf(logger.ERROR, "[%s] Failed to send HELLO message: %s", label, err.Error()) } } @@ -401,9 +478,9 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m }) } - //-------------------------------------------------------------- + //================================================================== // Legacy message types (not implemented) - //-------------------------------------------------------------- + //================================================================== case *message.DHTClientPutMsg: //---------------------------------------------------------- @@ -446,7 +523,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } // send a result back to caller -func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.Block, back transport.Responder) error { +func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks.Block, pth *path.Path, back transport.Responder) error { // assemble result message out := message.NewDHTP2PResultMsg() out.BType = uint32(query.Type()) diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -24,6 +24,7 @@ import ( "gnunet/config" "gnunet/core" "gnunet/crypto" + "gnunet/enums" "gnunet/message" "gnunet/service" "gnunet/service/dht/blocks" @@ -33,7 +34,6 @@ import ( "time" "github.com/bfix/gospel/logger" - "github.com/bfix/gospel/math" ) //====================================================================== @@ -41,22 +41,51 @@ import ( //====================================================================== //---------------------------------------------------------------------- -// Responder for local message handling +// Responder for local message handling (API, not message-based) //---------------------------------------------------------------------- -type LocalResponder struct { - ch chan blocks.Block // out-going channel for incoming blocks +// LocalBlockResponder is a message handler used to handle results for +// locally initiated GET calls +type LocalBlockResponder struct { + ch chan blocks.Block // out-going channel for incoming block results + rf blocks.ResultFilter // filter out duplicates } -func (lr *LocalResponder) Send(ctx context.Context, msg message.Message) error { +// NewLocalBlockResponder returns a new instance +func NewLocalBlockResponder() *LocalBlockResponder { + return &LocalBlockResponder{ + ch: make(chan blocks.Block), + rf: blocks.NewGenericResultFilter(), + } +} + +// C returns the back-channel +func (lr *LocalBlockResponder) C() <-chan blocks.Block { + return lr.ch +} + +// Send interface method: dissect message and relay block if appropriate +func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) error { + // check if incoming message is a DHT-RESULT + switch res := msg.(type) { + case *message.DHTP2PResultMsg: + // deliver incoming blocks + go func() { + lr.ch <- blocks.NewGenericBlock(res.Block) + }() + default: + logger.Println(logger.WARN, "[local] not a DHT-RESULT -- skipped") + } return nil } -func (lr *LocalResponder) Receiver() string { - return "@" +// Receiver is nil for local responders. +func (lr *LocalBlockResponder) Receiver() *util.PeerID { + return nil } -func (lr *LocalResponder) Close() { +// Close back-channel +func (lr *LocalBlockResponder) Close() { close(lr.ch) } @@ -68,8 +97,9 @@ func (lr *LocalResponder) Close() { type Module struct { service.ModuleImpl - store store.DHTStore // reference to the block storage mechanism - core *core.Core // reference to core services + cfg *config.DHTConfig // configuraion parameters + store *store.DHTStore // reference to the block storage mechanism + core *core.Core // reference to core services rtable *RoutingTable // routing table lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired @@ -80,7 +110,7 @@ type Module struct { // mechanism for persistence. func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) { // create permanent storage handler - var storage store.DHTStore + var storage *store.DHTStore if storage, err = store.NewDHTStore(cfg.Storage); err != nil { return } @@ -90,6 +120,7 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod // return module instance m = &Module{ ModuleImpl: *service.NewModuleImpl(), + cfg: cfg, store: storage, core: c, rtable: rt, @@ -99,18 +130,58 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod pulse := time.Duration(cfg.Heartbeat) * time.Second listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat) c.Register("dht", listener) + + // run periodic tasks (8.2. peer discovery) + ticker := time.NewTicker(5 * time.Minute) + key := crypto.Hash(m.core.PeerID().Data) + flags := uint16(enums.DHT_RO_FIND_APPROXIMATE | enums.DHT_RO_DEMULTIPLEX_EVERYWHERE) + var resCh <-chan blocks.Block + go func() { + for { + select { + // initiate peer discovery + case <-ticker.C: + // query DHT for our own HELLO block + query := blocks.NewGenericQuery(key, enums.BLOCK_TYPE_DHT_HELLO, flags) + resCh = m.Get(ctx, query) + + // handle peer discover results + case res := <-resCh: + // check for correct type + btype := res.Type() + if btype == enums.BLOCK_TYPE_DHT_HELLO { + hb, ok := res.(*blocks.HelloBlock) + if !ok { + logger.Printf(logger.WARN, "[dht] peer discovery received invalid block data") + } else { + // cache HELLO block + m.rtable.CacheHello(hb) + // add sender to routing table + m.rtable.Add(NewPeerAddress(hb.PeerID), "dht") + } + } else { + logger.Printf(logger.WARN, "[dht] peer discovery received invalid block type %s", btype.String()) + } + + // termination + case <-ctx.Done(): + ticker.Stop() + return + } + } + }() return } //---------------------------------------------------------------------- -// DHT methods for local use +// DHT methods for local use (API) //---------------------------------------------------------------------- // Get blocks from the DHT ["dht:get"] // Locally request blocks for a given query. The res channel will deliver the // returned results to the caller; the channel is closed if no further blocks // are expected or the query times out. -func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.Block) { +func (m *Module) Get(ctx context.Context, query blocks.Query) <-chan blocks.Block { // get the block handler for given block type to construct an empty // result filter. If no handler is defined, a default PassResultFilter // is created. @@ -123,14 +194,14 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped") } // get additional query parameters - xquery, ok := util.GetParam[[]byte](query.Params(), "xquery") + xquery, _ := util.GetParam[[]byte](query.Params(), "xquery") // assemble a new GET message msg := message.NewDHTP2PGetMsg() msg.BType = uint32(query.Type()) msg.Flags = query.Flags() msg.HopCount = 0 - msg.ReplLevel = 10 + msg.ReplLevel = uint16(m.cfg.Routing.ReplLevel) msg.PeerFilter = blocks.NewPeerFilter() msg.ResFilter = rf.Bytes() msg.RfSize = uint16(len(msg.ResFilter)) @@ -138,15 +209,13 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B msg.MsgSize += msg.RfSize + uint16(len(xquery)) // compose a response channel and handler - res = make(chan blocks.Block) - hdlr := &LocalResponder{ - ch: res, - } + hdlr := NewLocalBlockResponder() + // time-out handling ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout") if !ok { // defaults to 10 minutes - ttl = 600 * time.Second + ttl = 10 * time.Minute } lctx, cancel := context.WithTimeout(ctx, ttl) @@ -159,23 +228,12 @@ func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.B hdlr.Close() cancel() }() - return res + return hdlr.C() } // GetApprox returns the first block not excluded ["dht:getapprox"] -func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl func(*store.DHTEntry) bool) (entry *store.DHTEntry, dist *math.Int, err error) { - var val any - if entry, val, err = m.store.GetApprox(query, excl); err != nil { - return - } - hc, ok := val.(*crypto.HashCode) - if !ok { - err = errors.New("no approx result") - } - asked := NewQueryAddress(query.Key()) - found := NewQueryAddress(hc) - dist, _ = found.Distance(asked) - return +func (m *Module) GetApprox(ctx context.Context, query blocks.Query, rf blocks.ResultFilter) (results []*store.DHTResult, err error) { + return m.store.GetApprox("dht", query, rf) } // Put a block into the DHT ["dht:put"] @@ -191,7 +249,7 @@ func (m *Module) Put(ctx context.Context, query blocks.Query, block blocks.Block msg.Flags = query.Flags() msg.HopCount = 0 msg.PeerFilter = blocks.NewPeerFilter() - msg.ReplLvl = 10 + msg.ReplLvl = uint16(m.cfg.Routing.ReplLevel) msg.Expiration = expire msg.Block = block.Bytes() msg.Key = query.Key().Clone() diff --git a/src/gnunet/service/dht/path/elements.go b/src/gnunet/service/dht/path/elements.go @@ -38,8 +38,8 @@ var ( //---------------------------------------------------------------------- // Entry is an element of the path list type Entry struct { - Signer *util.PeerID // path element signer Signature *util.PeerSignature // path element signature + Signer *util.PeerID // path element signer } // Size returns the size of a path element in wire format diff --git a/src/gnunet/service/dht/path/handling.go b/src/gnunet/service/dht/path/handling.go @@ -35,8 +35,8 @@ import ( // path flags const ( - PathTruncated = iota - PathLastHop + PathTruncated = 1 + PathLastHop = 2 ) // Path is the complete list of verified hops a message travelled. @@ -48,6 +48,7 @@ type Path struct { Expire util.AbsoluteTime `` // expiration time TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (optional) NumList uint16 `order:"big"` // number of list entries + SplitPos uint16 `order:"big"` // optional split position List []*Entry `size:"NumList"` // list of path entries LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id @@ -72,6 +73,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path { Expire: expire, TruncOrigin: nil, NumList: 0, + SplitPos: 0, List: make([]*Entry, 0), LastSig: nil, LastHop: nil, @@ -81,7 +83,7 @@ func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path { // NewPathFromBytes reconstructs a path instance from binary data. The layout // of the data must match with the layout used in Path.Bytes(). func NewPathFromBytes(buf []byte) (path *Path, err error) { - if buf == nil || len(buf) == 0 { + if len(buf) == 0 { return } path = new(Path) @@ -116,6 +118,7 @@ func (p *Path) Clone() *Path { Expire: p.Expire, TruncOrigin: p.TruncOrigin, NumList: p.NumList, + SplitPos: p.SplitPos, List: util.Clone(p.List), LastSig: p.LastSig, LastHop: p.LastHop, diff --git a/src/gnunet/service/dht/resulthandler.go b/src/gnunet/service/dht/resulthandler.go @@ -22,8 +22,10 @@ import ( "bytes" "context" "gnunet/crypto" + "gnunet/enums" "gnunet/message" "gnunet/service/dht/blocks" + "gnunet/service/dht/path" "gnunet/transport" "gnunet/util" "time" @@ -52,7 +54,10 @@ type ResultHandler interface { Done() bool // Key returns the query/store key as string - Key() string + Key() *crypto.HashCode + + // Flags returns the query flags + Flags() uint16 // Compare two result handlers Compare(ResultHandler) int @@ -61,7 +66,7 @@ type ResultHandler interface { Merge(ResultHandler) bool // Handle result message - Handle(context.Context, *message.DHTP2PResultMsg) bool + Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool } // Compare return values @@ -108,8 +113,13 @@ func (t *GenericResultHandler) ID() int { } // Key returns the key string -func (t *GenericResultHandler) Key() string { - return t.key.String() +func (t *GenericResultHandler) Key() *crypto.HashCode { + return t.key +} + +// Flags returns the query flags +func (t *GenericResultHandler) Flags() uint16 { + return t.flags } // Done returns true if the result handler is no longer active. @@ -176,15 +186,28 @@ func NewForwardResultHandler(msgIn message.Message, rf blocks.ResultFilter, back } // Handle incoming DHT-P2P-RESULT message -func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { +func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool { // don't send result if it is filtered out if !t.Proceed(ctx, msg) { logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) return false } + // extend path if route is recorded + pp := pth.Clone() + if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { + // yes: add path element for remote receivers + if rcv := t.resp.Receiver(); rcv != nil { + pe := pp.NewElement(sender, local, rcv) + pp.Add(pe) + } + } + + // build updated PUT message + msgOut := msg.Update(pp) + // send result message back to originator (result forwarding). logger.Printf(logger.INFO, "[dht-task-%d] sending result back to originator", t.id) - if err := t.resp.Send(ctx, msg); err != nil && err != transport.ErrEndpMaybeSent { + if err := t.resp.Send(ctx, msgOut); err != nil && err != transport.ErrEndpMaybeSent { logger.Printf(logger.ERROR, "[dht-task-%d] sending result back to originator failed: %s", t.id, err.Error()) return false } @@ -200,7 +223,7 @@ func (t *ForwardResultHandler) Compare(h ResultHandler) int { return RHC_DIFFER } // check for same recipient - if ht.resp.Receiver() != t.resp.Receiver() { + if ht.resp.Receiver().Equals(t.resp.Receiver()) { logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s", ht.resp.Receiver(), t.resp.Receiver()) return RHC_DIFFER } @@ -237,7 +260,7 @@ func (t *ForwardResultHandler) Merge(h ResultHandler) bool { //---------------------------------------------------------------------- // ResultHandlerFcn is the function prototype for custom handlers: -type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, chan<- any) bool +type ResultHandlerFcn func(context.Context, *message.DHTP2PResultMsg, *path.Path, chan<- any) bool // DirectResultHandler for local DHT-P2P-GET requests type DirectResultHandler struct { @@ -262,7 +285,7 @@ func NewDirectResultHandler(msgIn message.Message, rf blocks.ResultFilter, hdlr } // Handle incoming DHT-P2P-RESULT message -func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { +func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg, pth *path.Path, sender, local *util.PeerID) bool { // don't send result if it is filtered out if !t.Proceed(ctx, msg) { logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) @@ -271,7 +294,7 @@ func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PRes // check for correct message type and handler function if t.hdlr != nil { logger.Printf(logger.INFO, "[dht-task-%d] handling result message", t.id) - return t.hdlr(ctx, msg, t.rc) + return t.hdlr(ctx, msg, pth, t.rc) } return false } @@ -318,7 +341,7 @@ func NewResultHandlerList() *ResultHandlerList { // Add handler to list func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { // get current list of handlers for key - key := hdlr.Key() + key := hdlr.Key().String() list, ok := t.list.Get(key, 0) modified := false if !ok { diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go @@ -25,6 +25,7 @@ import ( "gnunet/config" "gnunet/crypto" "gnunet/service/dht/blocks" + "gnunet/service/store" "gnunet/util" "sync" "time" @@ -84,11 +85,7 @@ func (addr *PeerAddress) Equals(p *PeerAddress) bool { // Distance between two addresses: returns a distance value and a // bucket index (smaller index = less distant). func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) { - d := make([]byte, 64) - for i := range d { - d[i] = addr.Key.Bits[i] ^ p.Key.Bits[i] - } - r := math.NewIntFromBytes(d) + r := util.Distance(addr.Key.Bits, p.Key.Bits) return r, 512 - r.BitLen() } @@ -371,18 +368,29 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { //---------------------------------------------------------------------- -func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter) (hb *blocks.HelloBlock, dist *math.Int) { - // iterate over cached HELLOs to find (best) match first - _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock, _ int) error { +// LookupHello returns blocks from the HELLO cache for given query. +func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter, approx bool) (results []*store.DHTResult) { + // iterate over cached HELLOs to find matches; + // approximate search is limited by distance (max. diff for bucket index is 16) + _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock, _ int) error { // check if block is excluded by result filter - if !rf.Contains(val) { - // check for better match - p := NewPeerAddress(val.PeerID) - d, _ := addr.Distance(p) - if hb == nil || d.Cmp(dist) < 0 { - hb = val - dist = d + var result *store.DHTResult + if !rf.Contains(hb) { + // no: possible result, compute distance + p := NewPeerAddress(hb.PeerID) + dist, idx := addr.Distance(p) + result = &store.DHTResult{ + Entry: &store.DHTEntry{ + Blk: hb, + }, + Dist: dist, + } + // check if we need to add result + if (approx && idx < 16) || idx == 0 { + results = append(results, result) } + } else { + logger.Printf(logger.DBG, "[RT] GET-HELLO: cache block is filtered") } return nil }, true) diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go @@ -70,7 +70,7 @@ func TestRT(t *testing.T) { // helper functions genRemotePeer := func() *PeerAddress { d := make([]byte, 32) - _, _ = rand.Read(d) + _, _ = rand.Read(d) //nolint:gosec // good enough for testing return NewPeerAddress(util.NewPeerID(d)) } @@ -86,10 +86,10 @@ func TestRT(t *testing.T) { for i := range tasks { tasks[i] = new(Entry) tasks[i].addr = genRemotePeer() - tasks[i].born = rand.Int63n(EPOCHS) - tasks[i].ttl = 1000 + rand.Int63n(7000) - tasks[i].drop = 2000 + rand.Int63n(3000) - tasks[i].revive = rand.Int63n(2000) + tasks[i].born = rand.Int63n(EPOCHS) //nolint:gosec // good enough for testing + tasks[i].ttl = 1000 + rand.Int63n(7000) //nolint:gosec // good enough for testing + tasks[i].drop = 2000 + rand.Int63n(3000) //nolint:gosec // good enough for testing + tasks[i].revive = rand.Int63n(2000) //nolint:gosec // good enough for testing tasks[i].online = false } diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go @@ -200,6 +200,7 @@ func (m *Module) ResolveAbsolute( // ResolveRelative resolves a relative path (to a given zone) recursively by // processing simple (PKEY,Label) lookups in sequence and handle intermediate // GNS record types +//nolint:gocyclo // life sometimes is complex... func (m *Module) ResolveRelative( ctx context.Context, labels []string, diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go @@ -20,6 +20,7 @@ package namecache import ( "context" + "errors" "gnunet/config" "gnunet/core" "gnunet/service" @@ -39,7 +40,7 @@ import ( type Module struct { service.ModuleImpl - cache store.DHTStore // transient block cache + cache *store.DHTStore // transient block cache } // NewModule creates a new module instance. @@ -67,13 +68,18 @@ func (m *Module) Import(fcm map[string]any) { //---------------------------------------------------------------------- -// Get an entry from the cache if available. +// Get entry from the cache if available. func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { - var e *store.DHTEntry - if e, err = m.cache.Get(query); err != nil { + var e []*store.DHTEntry + rf := blocks.NewGenericResultFilter() + if e, err = m.cache.Get("namecache", query, rf); err != nil { return } - err = blocks.Unwrap(e.Blk, block) + if len(e) != 1 { + err = errors.New("only one DHT entry exppected") + } else { + err = blocks.Unwrap(e[0].Blk, block) + } return } diff --git a/src/gnunet/service/store/dhtstore_test.go b/src/gnunet/service/store/dhtstore_test.go @@ -1,109 +0,0 @@ -// This file is part of gnunet-go, a GNUnet-implementation in Golang. -// Copyright (C) 2019-2022 Bernd Fix >Y< -// -// gnunet-go is free software: you can redistribute it and/or modify it -// under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// gnunet-go is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// -// SPDX-License-Identifier: AGPL3.0-or-later - -package store - -import ( - "encoding/hex" - "gnunet/crypto" - "gnunet/enums" - "gnunet/service/dht/blocks" - "gnunet/util" - "math/rand" - "os" - "testing" -) - -// test constants -const ( - fsNumBlocks = 10 -) - -// TestDHTFileStore generates 'fsNumBlocks' fully-random blocks -// and stores them under their SHA512 key. It than retrieves -// each block from storage and checks for matching hash. -func TestDHTFilesStore(t *testing.T) { - // test configuration - path := "/tmp/dht-store" - defer func() { - os.RemoveAll(path) - }() - - cfg := make(util.ParameterSet) - cfg["mode"] = "file" - cfg["cache"] = false - cfg["path"] = path - cfg["maxGB"] = 10 - - // create file store - if _, err := os.Stat(path); err != nil { - os.MkdirAll(path, 0755) - } - fs, err := NewFileStore(cfg) - if err != nil { - t.Fatal(err) - } - // allocate keys - keys := make([]blocks.Query, 0, fsNumBlocks) - - // First round: save blocks - for i := 0; i < fsNumBlocks; i++ { - // generate random block - size := 1024 + rand.Intn(62000) - buf := make([]byte, size) - rand.Read(buf) - blk := blocks.NewGenericBlock(buf) - // generate associated key - k := crypto.Hash(buf) - key := blocks.NewGenericQuery(k, enums.BLOCK_TYPE_ANY, 0) - - // store entry - val := &DHTEntry{ - Blk: blk, - } - if err := fs.Put(key, val); err != nil { - t.Fatalf("[%d] %s", i, err) - } - // remember key - keys = append(keys, key) - } - - // Second round: retrieve blocks and check - for i, key := range keys { - // get block - val, err := fs.Get(key) - if err != nil { - t.Fatalf("[%d] %s", i, err) - } - buf := val.Blk.Bytes() - - // re-create key - k := crypto.Hash(buf) - - // do the keys match? - if !k.Equals(key.Key()) { - t.Log(hex.EncodeToString(k.Bits)) - t.Log(hex.EncodeToString(key.Key().Bits)) - t.Fatal("key/value mismatch") - } - } -} - -func TestDHTEntryStore(t *testing.T) { - // pth, sender, local := path.GenerateTestPath(10) -} diff --git a/src/gnunet/service/store/store.go b/src/gnunet/service/store/store.go @@ -1,285 +0,0 @@ -// This file is part of gnunet-go, a GNUnet-implementation in Golang. -// Copyright (C) 2019-2022 Bernd Fix >Y< -// -// gnunet-go is free software: you can redistribute it and/or modify it -// under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// gnunet-go is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// -// SPDX-License-Identifier: AGPL3.0-or-later - -package store - -import ( - "context" - "database/sql" - _ "embed" // use embedded filesystem - "errors" - "fmt" - "gnunet/service/dht/blocks" - "gnunet/service/dht/path" - "gnunet/util" - - redis "github.com/go-redis/redis/v8" -) - -// Error messages related to the key/value-store implementations -var ( - ErrStoreInvalidSpec = fmt.Errorf("invalid Store specification") - ErrStoreUnknown = fmt.Errorf("unknown Store type") - ErrStoreNotAvailable = fmt.Errorf("store not available") - ErrStoreNoApprox = fmt.Errorf("no approx search for store defined") - ErrStoreNoList = fmt.Errorf("no key listing for store defined") -) - -//------------------------------------------------------------ -// Generic storage interface. Can be used for persistent or -// transient (caching) storage of key/value data. -//------------------------------------------------------------ - -// Store is a key/value storage where the type of the key is either -// a SHA512 hash value or a string and the value is either a DHT -// block or a string. It is possiblle to mix any key/value types, -// but not used in this implementation. -type Store[K, V any] interface { - // Put value into storage under given key - Put(key K, val V) error - - // Get value with given key from storage - Get(key K) (V, error) - - // GetApprox returns the best-matching value with given key from storage - // that is not excluded. - GetApprox(key K, excl func(V) bool) (V, any, error) - - // List all store keys - List() ([]K, error) - - // Close store - Close() error -} - -//------------------------------------------------------------ -// Types for custom store requirements -//------------------------------------------------------------ - -// DHTEntry to be stored/retrieved -type DHTEntry struct { - Blk blocks.Block - Path *path.Path -} - -// DHTStore for DHT queries and blocks -type DHTStore Store[blocks.Query, *DHTEntry] - -// KVStore for key/value string pairs -type KVStore Store[string, string] - -//------------------------------------------------------------ -// NewDHTStore creates a new storage handler with given spec -// for use with DHT queries and blocks -func NewDHTStore(spec util.ParameterSet) (DHTStore, error) { - // get the mode parameter - mode, ok := util.GetParam[string](spec, "mode") - if !ok { - return nil, ErrStoreInvalidSpec - } - switch mode { - //------------------------------------------------------------------ - // File-base storage - //------------------------------------------------------------------ - case "file": - return NewFileStore(spec) - } - return nil, ErrStoreUnknown -} - -//------------------------------------------------------------ -// NewKVStore creates a new storage handler with given spec -// for use with key/value string pairs. -func NewKVStore(spec util.ParameterSet) (KVStore, error) { - // get the mode parameter - mode, ok := util.GetParam[string](spec, "mode") - if !ok { - return nil, ErrStoreInvalidSpec - } - switch mode { - //-------------------------------------------------------------- - // Redis service - //-------------------------------------------------------------- - case "redis": - return NewRedisStore(spec) - - //-------------------------------------------------------------- - // SQL database service - //-------------------------------------------------------------- - case "sql": - return NewSQLStore(spec) - } - return nil, errors.New("unknown storage mechanism") -} - -//------------------------------------------------------------ -// Redis: only use for caching purposes on key/value strings -//------------------------------------------------------------ - -// RedisStore uses a (local) Redis server for key/value storage -type RedisStore struct { - client *redis.Client // client connection - db int // index to database -} - -// NewRedisStore creates a Redis service client instance. -func NewRedisStore(spec util.ParameterSet) (s KVStore, err error) { - // get connection parameters - addr, ok := util.GetParam[string](spec, "addr") - if !ok { - return nil, ErrStoreInvalidSpec - } - passwd, ok := util.GetParam[string](spec, "passwd") - if !ok { - passwd = "" - } - db, ok := util.GetParam[int](spec, "db") - if !ok { - return nil, ErrStoreInvalidSpec - } - - // create new Redis store - kvs := new(RedisStore) - kvs.db = db - kvs.client = redis.NewClient(&redis.Options{ - Addr: addr, - Password: passwd, - DB: db, - }) - if kvs.client == nil { - err = ErrStoreNotAvailable - } - s = kvs - return -} - -// Put value into storage under given key -func (s *RedisStore) Put(key string, value string) (err error) { - return s.client.Set(context.TODO(), key, value, 0).Err() -} - -// Get value with given key from storage -func (s *RedisStore) Get(key string) (value string, err error) { - return s.client.Get(context.TODO(), key).Result() -} - -// GetApprox returns the best-matching value for given key from storage -func (s *RedisStore) GetApprox(key string, crit func(string) bool) (value string, vkey any, err error) { - return "", "", ErrStoreNoApprox -} - -// List all keys in store -func (s *RedisStore) List() (keys []string, err error) { - var ( - crs uint64 - segm []string - ctx = context.TODO() - ) - keys = make([]string, 0) - for { - segm, crs, err = s.client.Scan(ctx, crs, "*", 10).Result() - if err != nil { - return - } - if crs == 0 { - break - } - keys = append(keys, segm...) - } - return -} - -// Close redis connection -func (s *RedisStore) Close() error { - return s.client.Close() -} - -//------------------------------------------------------------ -// SQL-based key-value-store -//------------------------------------------------------------ - -// SQLStore for generic SQL database handling -type SQLStore struct { - db *DBConn -} - -// NewSQLStore creates a new SQL-based key/value store. -func NewSQLStore(spec util.ParameterSet) (s KVStore, err error) { - // get connection parameters - connect, ok := util.GetParam[string](spec, "connect") - if !ok { - return nil, ErrStoreInvalidSpec - } - // create SQL store - kvs := new(SQLStore) - - // connect to SQL database - kvs.db, err = DBPool.Connect(connect) - if err != nil { - return nil, err - } - // get number of key/value pairs (as a check for existing table) - row := kvs.db.QueryRow("select count(*) from store") - var num int - if row.Scan(&num) != nil { - return nil, ErrStoreNotAvailable - } - return kvs, nil -} - -// Put a key/value pair into the store -func (s *SQLStore) Put(key string, value string) error { - _, err := s.db.Exec("insert into store(key,value) values(?,?)", key, value) - return err -} - -// Get a value for a given key from store -func (s *SQLStore) Get(key string) (value string, err error) { - row := s.db.QueryRow("select value from store where key=?", key) - err = row.Scan(&value) - return -} - -// GetApprox returns the best-matching value for given key from storage -func (s *SQLStore) GetApprox(key string, crit func(string) bool) (value string, vkey any, err error) { - return "", "", ErrStoreNoApprox -} - -// List all keys in store -func (s *SQLStore) List() (keys []string, err error) { - var ( - rows *sql.Rows - key string - ) - keys = make([]string, 0) - rows, err = s.db.Query("select key from store") - if err == nil { - for rows.Next() { - if err = rows.Scan(&key); err != nil { - break - } - keys = append(keys, key) - } - } - return -} - -// Close redis connection -func (s *SQLStore) Close() error { - return s.db.Close() -} diff --git a/src/gnunet/service/store/store_dht.go b/src/gnunet/service/store/store_dht.go @@ -0,0 +1,380 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package store + +import ( + "encoding/hex" + "fmt" + "gnunet/crypto" + "gnunet/service/dht/blocks" + "gnunet/service/dht/path" + "gnunet/util" + "os" + + "github.com/bfix/gospel/data" + "github.com/bfix/gospel/logger" + "github.com/bfix/gospel/math" +) + +//============================================================ +// Filesystem-based storage +//============================================================ + +//------------------------------------------------------------ +// DHT entry is an entity stored in the DHT +//------------------------------------------------------------ + +// DHTEntry to be stored to/retrieved from local storage +type DHTEntry struct { + Blk blocks.Block // reference to DHT block + Path *path.Path // associated put path +} + +//------------------------------------------------------------ +// DHT result is a single DHT result +//------------------------------------------------------------ + +// Result as returned by local DHT queries +type DHTResult struct { + Entry *DHTEntry // reference to DHT entry + Dist *math.Int // distance of entry to query key +} + +//------------------------------------------------------------ + +type DHTResultSet struct { + list []*DHTResult // list of DHT results + pos int // iterator position +} + +func NewDHTResultSet() *DHTResultSet { + return &DHTResultSet{ + list: make([]*DHTResult, 0), + pos: 0, + } +} + +func (rs *DHTResultSet) Add(r *DHTResult) { + rs.list = append(rs.list, r) +} + +func (rs *DHTResultSet) Next() (result *DHTResult) { + if rs.pos == len(rs.list) { + return nil + } + result = rs.list[rs.pos] + rs.pos++ + return +} + +//------------------------------------------------------------ +// DHT store +//------------------------------------------------------------ + +// DHTStore implements a filesystem-based storage mechanism for +// DHT queries and blocks. +type DHTStore struct { + path string // storage path + cache bool // storage works as cache + args util.ParameterSet // arguments / settings + totalSize uint64 // total storage size (logical, not physical) + + // storage-mode metadata + meta *FileMetaDB // database for metadata + maxSpace int // max. storage space in GB + + // cache-mode metadata + cacheMeta []*FileMetadata // cached metadata + wrPos int // write position in cyclic list + size int // size of cache (number of entries) +} + +// NewDHTStore instantiates a new file storage handler. +func NewDHTStore(spec util.ParameterSet) (*DHTStore, error) { + // create file store handler + fs := new(DHTStore) + fs.args = spec + + // get parameter + var ok bool + if fs.path, ok = util.GetParam[string](spec, "path"); !ok { + return nil, ErrStoreInvalidSpec + } + if fs.cache, ok = util.GetParam[bool](spec, "cache"); !ok { + fs.cache = false + } + + // setup file store depending on mode (storage/cache) + if fs.cache { + // remove old cache content + os.RemoveAll(fs.path) + // get number of cache entries + if fs.size, ok = util.GetParam[int](spec, "num"); !ok { + // defaults to 1000 entries + fs.size = 1000 + } + fs.cacheMeta = make([]*FileMetadata, fs.size) + } else { + // connect to metadata database + var err error + if fs.meta, err = OpenMetaDB(fs.path); err != nil { + return nil, err + } + // normal storage is limited by quota (default: 10GB) + if fs.maxSpace, ok = util.GetParam[int](spec, "maxGB"); !ok { + fs.maxSpace = 10 + } + } + return fs, nil +} + +// Close file storage. +func (s *DHTStore) Close() (err error) { + if !s.cache { + // close database connection + err = s.meta.Close() + } + return +} + +// Put block into storage under given key +func (s *DHTStore) Put(query blocks.Query, entry *DHTEntry) (err error) { + // check for free space + if !s.cache { + if int(s.totalSize>>30) > s.maxSpace { + // drop a significant number of blocks + s.prune(20) + } + } + // get parameters + btype := query.Type() + expire := entry.Blk.Expire() + blkSize := len(entry.Blk.Bytes()) + + // write entry to file for storage + if err = s.writeEntry(query.Key().Bits, entry); err != nil { + return + } + // compile metadata + now := util.AbsoluteTimeNow() + meta := &FileMetadata{ + key: query.Key(), + size: uint64(blkSize), + btype: btype, + bhash: crypto.Hash(entry.Blk.Bytes()), + expires: expire, + stored: now, + lastUsed: now, + usedCount: 1, + } + if s.cache { + // store in cyclic list + s.cacheMeta[s.wrPos] = meta + s.wrPos = (s.wrPos + 1) % s.size + } else { + // store metadata in database + if err = s.meta.Store(meta); err != nil { + return + } + // add to total storage size + s.totalSize += meta.size + } + return +} + +// Get block with given key from storage +func (s *DHTStore) Get(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTEntry, err error) { + // check if we have metadata for the query + var mds []*FileMetadata + if mds, err = s.meta.Get(query); err != nil || len(mds) == 0 { + return + } + // traverse list of results + for _, md := range mds { + // check for expired entry + if md.expires.Expired() { + if err = s.dropFile(md); err != nil { + logger.Printf(logger.ERROR, "[%s] can't drop DHT file: %s", label, err) + } + continue + } + // check for filtered block + if rf.ContainsHash(md.bhash) { + continue + } + // read entry from storage + var entry *DHTEntry + if entry, err = s.readEntry(md.key.Bits); err != nil { + logger.Printf(logger.ERROR, "[%s] can't read DHT entry: %s", label, err) + continue + } + results = append(results, entry) + // mark the block as newly used + if err = s.meta.Used(md.key.Bits, md.btype); err != nil { + logger.Printf(logger.ERROR, "[%s] can't flag DHT entry as used: %s", label, err) + continue + } + } + return +} + +// GetApprox returns the best-matching value with given key from storage +// that is not excluded +func (s *DHTStore) GetApprox(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTResult, err error) { + // iterate over all keys; process each metadata instance + // (append to results if appropriate) + process := func(md *FileMetadata) { + // check for filtered block. + if rf.ContainsHash(md.bhash) { + // filtered out... + return + } + // check distance (max. 16 bucktes off) + dist := util.Distance(md.key.Bits, query.Key().Bits) + if (512 - dist.BitLen()) > 16 { + return + } + // read entry from storage + var entry *DHTEntry + if entry, err = s.readEntry(md.key.Bits); err != nil { + logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String()) + return + } + // add to result list + result := &DHTResult{ + Entry: entry, + Dist: dist, + } + results = append(results, result) + } + // traverse mestadata database + err = s.meta.Traverse(process) + return +} + +//---------------------------------------------------------------------- + +type entryLayout struct { + SizeBlk uint16 `order:"big"` // size of block data + SizePth uint16 `order:"big"` // size of path data + Block []byte `size:"SizeBlk"` // block data + Path []byte `size:"SizePth"` // path data +} + +// read entry from storage for given key +func (s *DHTStore) readEntry(key []byte) (entry *DHTEntry, err error) { + // get path and filename from key + folder, fname := s.expandPath(key) + + // open file for reading + var file *os.File + if file, err = os.Open(folder + "/" + fname); err != nil { + return + } + defer file.Close() + + // get file size + fi, _ := file.Stat() + size := int(fi.Size()) + + // read data + val := new(entryLayout) + if err = data.UnmarshalStream(file, val, size); err != nil { + return + } + // assemble entry + entry = new(DHTEntry) + entry.Blk = blocks.NewGenericBlock(val.Block) + entry.Path, err = path.NewPathFromBytes(val.Path) + return +} + +// write entry to storage for given key +func (s *DHTStore) writeEntry(key []byte, entry *DHTEntry) (err error) { + // get folder and filename from key + folder, fname := s.expandPath(key) + // make sure the folder exists + if err = os.MkdirAll(folder, 0755); err != nil { + return + } + // write to file content (block data) + var file *os.File + if file, err = os.Create(folder + "/" + fname); err != nil { + return + } + defer file.Close() + + // assemble and write entry + val := new(entryLayout) + val.Block = entry.Blk.Bytes() + val.SizeBlk = uint16(len(val.Block)) + if entry.Path != nil { + val.Path = entry.Path.Bytes() + val.SizePth = uint16(len(val.Path)) + } else { + val.Path = nil + val.SizePth = 0 + } + err = data.MarshalStream(file, val) + return +} + +//---------------------------------------------------------------------- + +// expandPath returns the full path to the file for given key. +func (s *DHTStore) expandPath(key []byte) (string, string) { + h := hex.EncodeToString(key) + return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] +} + +// Prune list of file headers so we drop at least n entries. +// returns number of removed entries. +func (s *DHTStore) prune(n int) (del int) { + // collect obsolete records + obsolete, err := s.meta.Obsolete(n) + if err != nil { + logger.Println(logger.ERROR, "[FileStore] failed to collect obsolete records: "+err.Error()) + return + } + for _, md := range obsolete { + if err := s.dropFile(md); err != nil { + return + } + del++ + } + return +} + +// drop file removes a file from metadatabase and the physical storage. +func (s *DHTStore) dropFile(md *FileMetadata) (err error) { + // adjust total size + s.totalSize -= md.size + // remove from database + if err = s.meta.Drop(md.key.Bits, md.btype); err != nil { + logger.Printf(logger.ERROR, "[store] can't remove metadata (%s,%d): %s", md.key, md.btype, err.Error()) + return + } + // remove from filesystem + h := hex.EncodeToString(md.key.Bits) + path := fmt.Sprintf("%s/%s/%s/%s", s.path, h[:2], h[2:4], h[4:]) + if err = os.Remove(path); err != nil { + logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) + } + return +} diff --git a/src/gnunet/service/store/store_dht_meta.go b/src/gnunet/service/store/store_dht_meta.go @@ -0,0 +1,213 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package store + +import ( + "database/sql" + _ "embed" + "gnunet/crypto" + "gnunet/enums" + "gnunet/service/dht/blocks" + "gnunet/util" + "os" +) + +//============================================================ +// Metadata handling for file storage +//============================================================ + +// FileMetadata holds information about a file (raw block data) +// and is stored in a SQL database for faster access. +type FileMetadata struct { + key *crypto.HashCode // storage key + size uint64 // size of file + btype enums.BlockType // block type + bhash *crypto.HashCode // block hash + stored util.AbsoluteTime // time added to store + expires util.AbsoluteTime // expiration time + lastUsed util.AbsoluteTime // time last used + usedCount uint64 // usage count +} + +// NewFileMetadata creates a new file metadata instance +func NewFileMetadata() *FileMetadata { + return &FileMetadata{ + key: crypto.NewHashCode(nil), + bhash: crypto.NewHashCode(nil), + } +} + +//------------------------------------------------------------ +// Metadata database: A SQLite3 database to hold metadata about +// blocks in file storage +//------------------------------------------------------------ + +//go:embed store_dht_meta.sql +var initScript []byte + +// FileMetaDB is a SQLite3 database for block metadata +type FileMetaDB struct { + conn *DBConn // database connection +} + +// OpenMetaDB opens a metadata database in the given path. The name of the +// database is "access.db". +func OpenMetaDB(path string) (db *FileMetaDB, err error) { + // connect to database + dbFile := path + "/acccess.db" + if _, err = os.Stat(path + "/acccess.db"); err != nil { + var file *os.File + if file, err = os.Create(dbFile); err != nil { + return + } + file.Close() + } + db = new(FileMetaDB) + if db.conn, err = DBPool.Connect("sqlite3:" + dbFile); err != nil { + return + } + // check for initialized database + res := db.conn.QueryRow("select name from sqlite_master where type='table' and name='meta'") + var s string + if res.Scan(&s) != nil { + // initialize database + if _, err = db.conn.Exec(string(initScript)); err != nil { + return + } + } + return +} + +// Store metadata in database: creates or updates a record for the metadata +// in the database; primary key is the query key +func (db *FileMetaDB) Store(md *FileMetadata) (err error) { + sql := "replace into meta(qkey,btype,bhash,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?,?)" + _, err = db.conn.Exec(sql, + md.key.Bits, md.btype, md.bhash.Bits, md.size, md.stored.Epoch(), + md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount) + return +} + +// Get block metadata from database +func (db *FileMetaDB) Get(query blocks.Query) (mds []*FileMetadata, err error) { + // select rows in database matching the query + stmt := "select size,bhash,stored,expires,lastUsed,usedCount from meta where qkey=?" + btype := query.Type() + var rows *sql.Rows + if btype == enums.BLOCK_TYPE_ANY { + rows, err = db.conn.Query(stmt, query.Key().Bits) + } else { + rows, err = db.conn.Query(stmt+" and btype=?", query.Key().Bits, btype) + } + if err != nil { + return + } + // process results + for rows.Next() { + md := NewFileMetadata() + md.key = query.Key() + md.btype = btype + var st, exp, lu uint64 + if err = rows.Scan(&md.size, &md.bhash.Bits, &st, &exp, &lu, &md.usedCount); err != nil { + if err == sql.ErrNoRows { + md = nil + err = nil + } + return + } + md.stored.Val = st * 1000000 + md.expires.Val = exp * 1000000 + md.lastUsed.Val = lu * 1000000 + mds = append(mds, md) + } + return +} + +// Drop metadata for block from database +func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) (err error) { + if btype != enums.BLOCK_TYPE_ANY { + _, err = db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype) + } else { + _, err = db.conn.Exec("delete from meta where qkey=?", key) + } + return +} + +// Used a block from store: increment usage count and lastUsed time. +func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) (err error) { + stmt := "update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=?" + if btype != enums.BLOCK_TYPE_ANY { + _, err = db.conn.Exec(stmt+" and btype=?", key, btype) + } else { + _, err = db.conn.Exec(stmt, key) + } + return +} + +// Obsolete collects records from the meta database that are considered +// "removable". Entries are rated by the value of "(lifetime * size) / usedCount" +func (db *FileMetaDB) Obsolete(n int) (removable []*FileMetadata, err error) { + // get obsolete records from database + rate := "(unixepoch()-unixepoch(stored))*size/usedCount" + stmt := "select qkey,btype from meta order by " + rate + " limit ?" + var rows *sql.Rows + if rows, err = db.conn.Query(stmt, n); err != nil { + return + } + var md *FileMetadata + for rows.Next() { + var st, exp, lu uint64 + if err = rows.Scan(&md.key, &md.btype, &md.size, &st, &exp, &lu, &md.usedCount); err != nil { + return + } + md.stored.Val = st * 1000000 + md.expires.Val = exp * 1000000 + md.lastUsed.Val = lu * 1000000 + removable = append(removable, md) + } + return +} + +// Traverse metadata records and call function on each record +func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error { + sql := "select qkey,btype,bhash,size,stored,expires,lastUsed,usedCount from meta" + rows, err := db.conn.Query(sql) + if err != nil { + return err + } + md := NewFileMetadata() + for rows.Next() { + var st, exp, lu uint64 + err = rows.Scan(&md.key.Bits, &md.btype, &md.bhash.Bits, &md.size, &st, &exp, &lu, &md.usedCount) + if err != nil { + return err + } + md.stored.Val = st * 1000000 + md.expires.Val = exp * 1000000 + md.lastUsed.Val = lu * 1000000 + // call process function + f(md) + } + return nil +} + +// Close metadata database +func (db *FileMetaDB) Close() error { + return db.conn.Close() +} diff --git a/src/gnunet/service/store/store_dht_meta.sql b/src/gnunet/service/store/store_dht_meta.sql @@ -0,0 +1,30 @@ +-- This file is part of gnunet-go, a GNUnet-implementation in Golang. +-- Copyright (C) 2019-2022 Bernd Fix >Y< +-- +-- gnunet-go is free software: you can redistribute it and/or modify it +-- under the terms of the GNU Affero General Public License as published +-- by the Free Software Foundation, either version 3 of the License, +-- or (at your option) any later version. +-- +-- gnunet-go is distributed in the hope that it will be useful, but +-- WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +-- Affero General Public License for more details. +-- +-- You should have received a copy of the GNU Affero General Public License +-- along with this program. If not, see <http://www.gnu.org/licenses/>. +-- +-- SPDX-License-Identifier: AGPL3.0-or-later + +create table meta ( + qkey blob, -- key (SHA512 hash) + btype integer, -- block type + bhash blob, -- block hash + size integer, -- size of file + stored integer, -- time added to store + expires integer, -- expiration time + lastUsed integer, -- time last used + usedCount integer, -- usage count + + unique(qkey,btype) -- unique key in database +); diff --git a/src/gnunet/service/store/store_dht_test.go b/src/gnunet/service/store/store_dht_test.go @@ -0,0 +1,118 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package store + +import ( + "encoding/hex" + "gnunet/crypto" + "gnunet/enums" + "gnunet/service/dht/blocks" + "gnunet/util" + "math/rand" + "os" + "testing" +) + +// test constants +const ( + fsNumBlocks = 10 +) + +// TestDHTFileStore generates 'fsNumBlocks' fully-random blocks +// and stores them under their SHA512 key. It than retrieves +// each block from storage and checks for matching hash. +func TestDHTFilesStore(t *testing.T) { + // test configuration + path := "/tmp/dht-store" + defer func() { + os.RemoveAll(path) + }() + + cfg := make(util.ParameterSet) + cfg["mode"] = "file" + cfg["cache"] = false + cfg["path"] = path + cfg["maxGB"] = 10 + + // create file store + if _, err := os.Stat(path); err != nil { + if err = os.MkdirAll(path, 0755); err != nil { + t.Fatal(err) + } + } + fs, err := NewDHTStore(cfg) + if err != nil { + t.Fatal(err) + } + // allocate keys + keys := make([]blocks.Query, 0, fsNumBlocks) + // create result filter + rf := blocks.NewGenericResultFilter() + + // First round: save blocks + for i := 0; i < fsNumBlocks; i++ { + // generate random block + size := 1024 + rand.Intn(62000) //nolint:gosec // good enough for testing + buf := make([]byte, size) + if _, err = rand.Read(buf); err != nil { //nolint:gosec // good enough for testing + t.Fatal(err) + } + blk := blocks.NewGenericBlock(buf) + // generate associated key + k := crypto.Hash(buf) + key := blocks.NewGenericQuery(k, enums.BLOCK_TYPE_ANY, 0) + + // store entry + val := &DHTEntry{ + Blk: blk, + } + if err := fs.Put(key, val); err != nil { + t.Fatalf("[%d] %s", i, err) + } + // remember key + keys = append(keys, key) + } + + // Second round: retrieve blocks and check + for i, key := range keys { + // get block + vals, err := fs.Get("test", key, rf) + if err != nil { + t.Fatalf("[%d] %s", i, err) + } + if len(vals) != 1 { + t.Fatalf("[%d] only one result expected", i) + } + buf := vals[0].Blk.Bytes() + + // re-create key + k := crypto.Hash(buf) + + // do the keys match? + if !k.Equals(key.Key()) { + t.Log(hex.EncodeToString(k.Bits)) + t.Log(hex.EncodeToString(key.Key().Bits)) + t.Fatal("key/value mismatch") + } + } +} + +func TestDHTEntryStore(t *testing.T) { + // pth, sender, local := path.GenerateTestPath(10) +} diff --git a/src/gnunet/service/store/store_fs.go b/src/gnunet/service/store/store_fs.go @@ -1,328 +0,0 @@ -// This file is part of gnunet-go, a GNUnet-implementation in Golang. -// Copyright (C) 2019-2022 Bernd Fix >Y< -// -// gnunet-go is free software: you can redistribute it and/or modify it -// under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// gnunet-go is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// -// SPDX-License-Identifier: AGPL3.0-or-later - -package store - -import ( - "encoding/hex" - "fmt" - "gnunet/service/dht/blocks" - "gnunet/service/dht/path" - "gnunet/util" - "os" - - "github.com/bfix/gospel/data" - "github.com/bfix/gospel/logger" - "github.com/bfix/gospel/math" -) - -//============================================================ -// Filesystem-based storage -//============================================================ - -// FileStore implements a filesystem-based storage mechanism for -// DHT queries and blocks. -type FileStore struct { - path string // storage path - cache bool // storage works as cache - args util.ParameterSet // arguments / settings - totalSize uint64 // total storage size (logical, not physical) - - // storage-mode metadata - meta *FileMetaDB // database for metadata - maxSpace int // max. storage space in GB - - // cache-mode metadata - cacheMeta []*FileMetadata // cached metadata - wrPos int // write position in cyclic list - size int // size of cache (number of entries) -} - -// NewFileStore instantiates a new file storage. -func NewFileStore(spec util.ParameterSet) (DHTStore, error) { - // create file store handler - fs := new(FileStore) - fs.args = spec - - // get parameter - var ok bool - if fs.path, ok = util.GetParam[string](spec, "path"); !ok { - return nil, ErrStoreInvalidSpec - } - if fs.cache, ok = util.GetParam[bool](spec, "cache"); !ok { - fs.cache = false - } - - // setup file store depending on mode (storage/cache) - if fs.cache { - // remove old cache content - os.RemoveAll(fs.path) - // get number of cache entries - if fs.size, ok = util.GetParam[int](spec, "num"); !ok { - // defaults to 1000 entries - fs.size = 1000 - } - fs.cacheMeta = make([]*FileMetadata, fs.size) - } else { - // connect to metadata database - var err error - if fs.meta, err = OpenMetaDB(fs.path); err != nil { - return nil, err - } - // normal storage is limited by quota (default: 10GB) - if fs.maxSpace, ok = util.GetParam[int](spec, "maxGB"); !ok { - fs.maxSpace = 10 - } - } - return fs, nil -} - -// Close file storage. -func (s *FileStore) Close() (err error) { - if !s.cache { - // close database connection - err = s.meta.Close() - } - return -} - -// Put block into storage under given key -func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) { - // check for free space - if !s.cache { - if int(s.totalSize>>30) > s.maxSpace { - // drop a significant number of blocks - s.prune(20) - } - } - // get parameters - btype := query.Type() - expire := entry.Blk.Expire() - blkSize := len(entry.Blk.Bytes()) - - // write entry to file for storage - if err = s.writeEntry(query.Key().Bits, entry); err != nil { - return - } - // compile metadata - now := util.AbsoluteTimeNow() - meta := &FileMetadata{ - key: query.Key().Bits, - size: uint64(blkSize), - btype: btype, - expires: expire, - stored: now, - lastUsed: now, - usedCount: 1, - } - if s.cache { - // store in cyclic list - s.cacheMeta[s.wrPos] = meta - s.wrPos = (s.wrPos + 1) % s.size - } else { - // store metadata in database - if err = s.meta.Store(meta); err != nil { - return - } - // add to total storage size - s.totalSize += meta.size - } - return -} - -// Get block with given key from storage -func (s *FileStore) Get(query blocks.Query) (entry *DHTEntry, err error) { - // check if we have metadata for the query - key := query.Key().Bits - btype := query.Type() - var md *FileMetadata - if md, err = s.meta.Get(key, btype); err != nil || md == nil { - return - } - // check for expired entry - if md.expires.Expired() { - err = s.dropFile(md) - return - } - // mark the block as newly used - if err = s.meta.Used(key, btype); err != nil { - return - } - return s.readEntry(key) -} - -// GetApprox returns the best-matching value with given key from storage -// that is not excluded -func (s *FileStore) GetApprox(query blocks.Query, excl func(*DHTEntry) bool) (entry *DHTEntry, key any, err error) { - var bestKey []byte - var bestEntry *DHTEntry - var bestDist *math.Int - // distance function - dist := func(a, b []byte) *math.Int { - c := make([]byte, len(a)) - for i := range a { - c[i] = a[i] ^ b[i] - } - return math.NewIntFromBytes(c) - } - // iterate over all keys - check := func(md *FileMetadata) { - // check for better match - d := dist(md.key, query.Key().Bits) - var entry *DHTEntry - if bestKey == nil || d.Cmp(bestDist) < 0 { - // we might have a match. check block for exclusion - if entry, err = s.readEntry(md.key); err != nil { - logger.Printf(logger.ERROR, "[dhtstore] failed to retrieve block for %s", hex.EncodeToString(md.key)) - return - } - if excl(entry) { - return - } - // remember best match - bestKey = md.key - bestEntry = entry - bestDist = d - } - } - if err = s.meta.Traverse(check); err != nil { - return - } - if bestEntry != nil { - // mark the block as newly used - if err = s.meta.Used(bestKey, bestEntry.Blk.Type()); err != nil { - return - } - } - return bestEntry, bestDist, nil -} - -// Get a list of all stored block keys (generic query). -func (s *FileStore) List() ([]blocks.Query, error) { - return nil, ErrStoreNoList -} - -//---------------------------------------------------------------------- - -type entryLayout struct { - SizeBlk uint16 `order:"big"` // size of block data - SizePth uint16 `order:"big"` // size of path data - Block []byte `size:"SizeBlk"` // block data - Path []byte `size:"SizePth"` // path data -} - -// read entry from storage for given key -func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) { - // get path and filename from key - folder, fname := s.expandPath(key) - - // open file for reading - var file *os.File - if file, err = os.Open(folder + "/" + fname); err != nil { - return - } - defer file.Close() - - // get file size - fi, _ := file.Stat() - size := int(fi.Size()) - - // read data - val := new(entryLayout) - if err = data.UnmarshalStream(file, val, size); err != nil { - return - } - // assemble entry - entry = new(DHTEntry) - entry.Blk = blocks.NewGenericBlock(val.Block) - entry.Path, err = path.NewPathFromBytes(val.Path) - return -} - -// write entry to storage for given key -func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) { - // get folder and filename from key - folder, fname := s.expandPath(key) - // make sure the folder exists - if err = os.MkdirAll(folder, 0755); err != nil { - return - } - // write to file content (block data) - var file *os.File - if file, err = os.Create(folder + "/" + fname); err != nil { - return - } - defer file.Close() - - // assemble and write entry - val := new(entryLayout) - val.Block = entry.Blk.Bytes() - val.SizeBlk = uint16(len(val.Block)) - if entry.Path != nil { - val.Path = entry.Path.Bytes() - val.SizePth = uint16(len(val.Path)) - } else { - val.Path = nil - val.SizePth = 0 - } - err = data.MarshalStream(file, val) - return -} - -//---------------------------------------------------------------------- - -// expandPath returns the full path to the file for given key. -func (s *FileStore) expandPath(key []byte) (string, string) { - h := hex.EncodeToString(key) - return fmt.Sprintf("%s/%s/%s", s.path, h[:2], h[2:4]), h[4:] -} - -// Prune list of file headers so we drop at least n entries. -// returns number of removed entries. -func (s *FileStore) prune(n int) (del int) { - // collect obsolete records - obsolete, err := s.meta.Obsolete(n) - if err != nil { - logger.Println(logger.ERROR, "[FileStore] failed to collect obsolete records: "+err.Error()) - return - } - for _, md := range obsolete { - if err := s.dropFile(md); err != nil { - return - } - del++ - } - return -} - -// drop file removes a file from metadatabase and the physical storage. -func (s *FileStore) dropFile(md *FileMetadata) (err error) { - // adjust total size - s.totalSize -= md.size - // remove from database - if err = s.meta.Drop(md.key, md.btype); err != nil { - logger.Printf(logger.ERROR, "[store] can't remove metadata (%s,%d): %s", md.key, md.btype, err.Error()) - return - } - // remove from filesystem - path := fmt.Sprintf("%s/%s/%s/%s", s.path, md.key[:2], md.key[2:4], md.key[4:]) - if err = os.Remove(path); err != nil { - logger.Printf(logger.ERROR, "[store] can't remove file %s: %s", path, err.Error()) - } - return -} diff --git a/src/gnunet/service/store/store_fs_meta.go b/src/gnunet/service/store/store_fs_meta.go @@ -1,177 +0,0 @@ -// This file is part of gnunet-go, a GNUnet-implementation in Golang. -// Copyright (C) 2019-2022 Bernd Fix >Y< -// -// gnunet-go is free software: you can redistribute it and/or modify it -// under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// gnunet-go is distributed in the hope that it will be useful, but -// WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -// Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// -// SPDX-License-Identifier: AGPL3.0-or-later - -package store - -import ( - "database/sql" - _ "embed" - "gnunet/enums" - "gnunet/util" - "os" -) - -//============================================================ -// Metadata handling for file storage -//============================================================ - -// FileMetadata holds information about a file (raw block data) -// and is stored in a SQL database for faster access. -type FileMetadata struct { - key []byte // storage key - size uint64 // size of file - btype enums.BlockType // block type - stored util.AbsoluteTime // time added to store - expires util.AbsoluteTime // expiration time - lastUsed util.AbsoluteTime // time last used - usedCount uint64 // usage count -} - -//------------------------------------------------------------ -// Metadata database: A SQLite3 database to hold metadata about -// blocks in file storage -//------------------------------------------------------------ - -//go:embed store_fs_meta.sql -var initScript []byte - -// FileMetaDB is a SQLite3 database for block metadata -type FileMetaDB struct { - conn *DBConn // database connection -} - -// OpenMetaDB opens a metadata database in the given path. The name of the -// database is "access.db". -func OpenMetaDB(path string) (db *FileMetaDB, err error) { - // connect to database - dbFile := path + "/acccess.db" - if _, err = os.Stat(path + "/acccess.db"); err != nil { - var file *os.File - if file, err = os.Create(dbFile); err != nil { - return - } - file.Close() - } - db = new(FileMetaDB) - if db.conn, err = DBPool.Connect("sqlite3:" + dbFile); err != nil { - return - } - // check for initialized database - res := db.conn.QueryRow("select name from sqlite_master where type='table' and name='meta'") - var s string - if res.Scan(&s) != nil { - // initialize database - if _, err = db.conn.Exec(string(initScript)); err != nil { - return - } - } - return -} - -// Store metadata in database: creates or updates a record for the metadata -// in the database; primary key is the query key -func (db *FileMetaDB) Store(md *FileMetadata) (err error) { - sql := "replace into meta(qkey,btype,size,stored,expires,lastUsed,usedCount) values(?,?,?,?,?,?,?)" - _, err = db.conn.Exec(sql, md.key, md.btype, md.size, md.stored.Epoch(), md.expires.Epoch(), md.lastUsed.Epoch(), md.usedCount) - return -} - -// Get block metadata from database -func (db *FileMetaDB) Get(key []byte, btype enums.BlockType) (md *FileMetadata, err error) { - md = new(FileMetadata) - md.key = util.Clone(key) - md.btype = btype - stmt := "select size,stored,expires,lastUsed,usedCount from meta where qkey=? and btype=?" - row := db.conn.QueryRow(stmt, key, btype) - var st, exp, lu uint64 - if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err != nil { - if err == sql.ErrNoRows { - md = nil - err = nil - } - } else { - md.stored.Val = st * 1000000 - md.expires.Val = exp * 1000000 - md.lastUsed.Val = lu * 1000000 - } - return -} - -// Drop metadata for block from database -func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) error { - _, err := db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype) - return err -} - -// Used a block from store: increment usage count and lastUsed time. -func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) error { - _, err := db.conn.Exec("update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=? and btype=?", key, btype) - return err -} - -// Obsolete collects records from the meta database that are considered -// "removable". Entries are rated by the value of "(lifetime * size) / usedCount" -func (db *FileMetaDB) Obsolete(n int) (removable []*FileMetadata, err error) { - // get obsolete records from database - rate := "(unixepoch()-unixepoch(stored))*size/usedCount" - stmt := "select qkey,btype from meta order by " + rate + " limit ?" - var rows *sql.Rows - if rows, err = db.conn.Query(stmt, n); err != nil { - return - } - var md *FileMetadata - for rows.Next() { - var st, exp, lu uint64 - if err = rows.Scan(&md.key, &md.btype, &md.size, &st, &exp, &lu, &md.usedCount); err != nil { - return - } - md.stored.Val = st * 1000000 - md.expires.Val = exp * 1000000 - md.lastUsed.Val = lu * 1000000 - removable = append(removable, md) - } - return -} - -// Traverse metadata records and call function on each record -func (db *FileMetaDB) Traverse(f func(*FileMetadata)) error { - sql := "select qkey,btype,size,stored,expires,lastUsed,usedCount from meta" - rows, err := db.conn.Query(sql) - if err != nil { - return err - } - md := new(FileMetadata) - for rows.Next() { - var st, exp, lu uint64 - err = rows.Scan(&md.key, &md.btype, &md.size, &st, &exp, &lu, &md.usedCount) - if err != nil { - return err - } - md.stored.Val = st * 1000000 - md.expires.Val = exp * 1000000 - md.lastUsed.Val = lu * 1000000 - // call process function - f(md) - } - return nil -} - -// Close metadata database -func (db *FileMetaDB) Close() error { - return db.conn.Close() -} diff --git a/src/gnunet/service/store/store_fs_meta.sql b/src/gnunet/service/store/store_fs_meta.sql @@ -1,29 +0,0 @@ --- This file is part of gnunet-go, a GNUnet-implementation in Golang. --- Copyright (C) 2019-2022 Bernd Fix >Y< --- --- gnunet-go is free software: you can redistribute it and/or modify it --- under the terms of the GNU Affero General Public License as published --- by the Free Software Foundation, either version 3 of the License, --- or (at your option) any later version. --- --- gnunet-go is distributed in the hope that it will be useful, but --- WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU --- Affero General Public License for more details. --- --- You should have received a copy of the GNU Affero General Public License --- along with this program. If not, see <http://www.gnu.org/licenses/>. --- --- SPDX-License-Identifier: AGPL3.0-or-later - -create table meta ( - qkey blob, -- key (SHA512 hash) - btype integer, -- block type - size integer, -- size of file - stored integer, -- time added to store - expires integer, -- expiration time - lastUsed integer, -- time last used - usedCount integer, -- usage count - - unique(qkey,btype) -- unique key in database -); diff --git a/src/gnunet/service/store/store_kv.go b/src/gnunet/service/store/store_kv.go @@ -0,0 +1,229 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package store + +import ( + "context" + "database/sql" + _ "embed" // use embedded filesystem + "errors" + "fmt" + "gnunet/util" + + redis "github.com/go-redis/redis/v8" +) + +// Error messages related to the key/value-store implementations +var ( + ErrStoreInvalidSpec = fmt.Errorf("invalid Store specification") + ErrStoreUnknown = fmt.Errorf("unknown Store type") + ErrStoreNotAvailable = fmt.Errorf("store not available") + ErrStoreNoApprox = fmt.Errorf("no approx search for store defined") + ErrStoreNoList = fmt.Errorf("no key listing for store defined") +) + +//------------------------------------------------------------ + +// KVStore us a eneric key/value storage interface. Can be used for +// persistent or transient (caching) storage of stringed key/value data. +type KVStore interface { + // Put value into storage under given key + Put(key string, val string) error + + // Get value with given key from storage + Get(key string) (string, error) + + // List all store keys + List() ([]string, error) + + // Close store + Close() error +} + +//------------------------------------------------------------ +// NewKVStore creates a new storage handler with given spec +// for use with key/value string pairs. +func NewKVStore(spec util.ParameterSet) (KVStore, error) { + // get the mode parameter + mode, ok := util.GetParam[string](spec, "mode") + if !ok { + return nil, ErrStoreInvalidSpec + } + switch mode { + //-------------------------------------------------------------- + // Redis service + //-------------------------------------------------------------- + case "redis": + return NewRedisStore(spec) + + //-------------------------------------------------------------- + // SQL database service + //-------------------------------------------------------------- + case "sql": + return NewSQLStore(spec) + } + return nil, errors.New("unknown storage mechanism") +} + +//------------------------------------------------------------ +// Redis: only use for caching purposes on key/value strings +//------------------------------------------------------------ + +// RedisStore uses a (local) Redis server for key/value storage +type RedisStore struct { + client *redis.Client // client connection + db int // index to database +} + +// NewRedisStore creates a Redis service client instance. +func NewRedisStore(spec util.ParameterSet) (s KVStore, err error) { + // get connection parameters + addr, ok := util.GetParam[string](spec, "addr") + if !ok { + return nil, ErrStoreInvalidSpec + } + passwd, ok := util.GetParam[string](spec, "passwd") + if !ok { + passwd = "" + } + db, ok := util.GetParam[int](spec, "db") + if !ok { + return nil, ErrStoreInvalidSpec + } + + // create new Redis store + kvs := new(RedisStore) + kvs.db = db + kvs.client = redis.NewClient(&redis.Options{ + Addr: addr, + Password: passwd, + DB: db, + }) + if kvs.client == nil { + err = ErrStoreNotAvailable + } + s = kvs + return +} + +// Put value into storage under given key +func (s *RedisStore) Put(key string, value string) (err error) { + return s.client.Set(context.TODO(), key, value, 0).Err() +} + +// Get value with given key from storage +func (s *RedisStore) Get(key string) (value string, err error) { + return s.client.Get(context.TODO(), key).Result() +} + +// List all keys in store +func (s *RedisStore) List() (keys []string, err error) { + var ( + crs uint64 + segm []string + ctx = context.TODO() + ) + keys = make([]string, 0) + for { + segm, crs, err = s.client.Scan(ctx, crs, "*", 10).Result() + if err != nil { + return + } + if crs == 0 { + break + } + keys = append(keys, segm...) + } + return +} + +// Close redis connection +func (s *RedisStore) Close() error { + return s.client.Close() +} + +//------------------------------------------------------------ +// SQL-based key-value-store +//------------------------------------------------------------ + +// SQLStore for generic SQL database handling +type SQLStore struct { + db *DBConn +} + +// NewSQLStore creates a new SQL-based key/value store. +func NewSQLStore(spec util.ParameterSet) (s KVStore, err error) { + // get connection parameters + connect, ok := util.GetParam[string](spec, "connect") + if !ok { + return nil, ErrStoreInvalidSpec + } + // create SQL store + kvs := new(SQLStore) + + // connect to SQL database + kvs.db, err = DBPool.Connect(connect) + if err != nil { + return nil, err + } + // get number of key/value pairs (as a check for existing table) + row := kvs.db.QueryRow("select count(*) from store") + var num int + if row.Scan(&num) != nil { + return nil, ErrStoreNotAvailable + } + return kvs, nil +} + +// Put a key/value pair into the store +func (s *SQLStore) Put(key string, value string) error { + _, err := s.db.Exec("insert into store(key,value) values(?,?)", key, value) + return err +} + +// Get a value for a given key from store +func (s *SQLStore) Get(key string) (value string, err error) { + row := s.db.QueryRow("select value from store where key=?", key) + err = row.Scan(&value) + return +} + +// List all keys in store +func (s *SQLStore) List() (keys []string, err error) { + var ( + rows *sql.Rows + key string + ) + keys = make([]string, 0) + rows, err = s.db.Query("select key from store") + if err == nil { + for rows.Next() { + if err = rows.Scan(&key); err != nil { + break + } + keys = append(keys, key) + } + } + return +} + +// Close redis connection +func (s *SQLStore) Close() error { + return s.db.Close() +} diff --git a/src/gnunet/transport/responder.go b/src/gnunet/transport/responder.go @@ -33,8 +33,9 @@ type Responder interface { // Handle outgoing message Send(ctx context.Context, msg message.Message) error - // Receiver returns the receiving peer (string representation) - Receiver() string + // Receiver returns the receiving peer. Returns nil if + // this is a local responder (service.Connection) + Receiver() *util.PeerID } //---------------------------------------------------------------------- @@ -55,6 +56,6 @@ func (r *TransportResponder) Send(ctx context.Context, msg message.Message) erro } // Receiver returns the receiving peer id -func (r *TransportResponder) Receiver() string { - return r.Peer.String() +func (r *TransportResponder) Receiver() *util.PeerID { + return r.Peer } diff --git a/src/gnunet/util/map.go b/src/gnunet/util/map.go @@ -152,7 +152,7 @@ func (m *Map[K, V]) GetRandom(pid int) (key K, value V, ok bool) { ok = false if size := m.Size(); size > 0 { - idx := rand.Intn(size) + idx := rand.Intn(size) //nolint:gosec // good enough for selection for key, value = range m.list { if idx == 0 { ok = true diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go @@ -24,6 +24,7 @@ import ( "strings" "github.com/bfix/gospel/data" + "github.com/bfix/gospel/math" ) //---------------------------------------------------------------------- @@ -76,6 +77,16 @@ func GetParam[V any](params ParameterSet, key string) (i V, ok bool) { // additional helpers //---------------------------------------------------------------------- +// Distance returns the XOR distance between to byte arrays +func Distance(a, b []byte) *math.Int { + size := len(a) + d := make([]byte, size) + for i := range d { + d[i] = a[i] ^ b[i] + } + return math.NewIntFromBytes(d) +} + // StripPathRight returns a dot-separated path without // its last (right-most) element. func StripPathRight(s string) string { diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go @@ -38,7 +38,7 @@ func NewPeerPublicKey(data []byte) *PeerPublicKey { pk := new(PeerPublicKey) size := pk.Size() v := make([]byte, size) - if data != nil && len(data) > 0 { + if len(data) > 0 { if uint(len(data)) < size { CopyAlignedBlock(v, data) } else { @@ -68,7 +68,7 @@ func (pk *PeerPublicKey) Verify(data []byte, sig *PeerSignature) (bool, error) { // Peer identifier: //---------------------------------------------------------------------- -// PeerID is a wrpped PeerPublicKey +// PeerID is a wrapped PeerPublicKey type PeerID struct { PeerPublicKey } @@ -109,7 +109,7 @@ func NewPeerSignature(data []byte) *PeerSignature { s := new(PeerSignature) size := s.Size() v := make([]byte, size) - if data != nil && len(data) > 0 { + if len(data) > 0 { if uint(len(data)) < size { CopyAlignedBlock(v, data) } else {