gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit 3cee953814e10b8e8bca10164e7f25e93f4a6d3f
parent 21d7292dbd062ff11194fdc235a3d54830d7ba57
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Wed, 24 Aug 2022 17:27:59 +0200

Integration tests: more bug fixes.

Diffstat:
Msrc/gnunet/crypto/signature.go | 7++++++-
Asrc/gnunet/service/dht/blocks/default.go | 162+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/gnunet/service/dht/blocks/filters.go | 96++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/gnunet/service/dht/blocks/generic.go | 31++++++++++++++++++++++---------
Msrc/gnunet/service/dht/blocks/gns.go | 4++++
Msrc/gnunet/service/dht/blocks/handlers.go | 1+
Msrc/gnunet/service/dht/blocks/hello.go | 96+++++++++----------------------------------------------------------------------
Msrc/gnunet/service/dht/messages.go | 70+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
Msrc/gnunet/service/dht/module.go | 10+++++++---
Msrc/gnunet/service/dht/path/handling.go | 13+++++++++----
Msrc/gnunet/service/dht/resulthandler.go | 22+++++++++++++++++++---
Msrc/gnunet/service/dht/routingtable.go | 26+++++++++++++-------------
Msrc/gnunet/service/namecache/module.go | 3++-
Msrc/gnunet/service/store/store_dht.go | 121+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Msrc/gnunet/service/store/store_dht_test.go | 2+-
Msrc/gnunet/transport/reader_writer.go | 16+++++++++-------
Msrc/gnunet/util/peer.go | 3+++
17 files changed, 466 insertions(+), 217 deletions(-)

diff --git a/src/gnunet/crypto/signature.go b/src/gnunet/crypto/signature.go @@ -29,7 +29,7 @@ type SignaturePurpose struct { Purpose enums.SigPurpose `order:"big"` // Signature purpose } -// Signable interface for objects that can get signed by peer +// Signable interface for objects that can get signed by a Signer type Signable interface { // SignedData returns the byte array to be signed SignedData() []byte @@ -37,3 +37,8 @@ type Signable interface { // SetSignature returns the signature to the signable object SetSignature(*util.PeerSignature) error } + +// Signer instance for creating signatures +type Signer interface { + Sign(Signable) error +} diff --git a/src/gnunet/service/dht/blocks/default.go b/src/gnunet/service/dht/blocks/default.go @@ -0,0 +1,162 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package blocks + +import ( + "fmt" + "gnunet/crypto" + "gnunet/enums" + "gnunet/util" +) + +//---------------------------------------------------------------------- +// TEST block +//---------------------------------------------------------------------- + +// TestBlock (BLOCK_TYPE_TEST) is a block for testing the DHT with non-HELLO +// blocks. Applications using the DHT are encouraged to define custom blocks +// with appropriate internal logic. TestBlocks are just a pile of bits that +// never expire... +type TestBlock struct { + expire util.AbsoluteTime `` // expiry (transient!) + Data []byte `size:"*"` // block data +} + +// NewTestBlock creates a new empty test block +func NewTestBlock() Block { + return &TestBlock{ + expire: util.AbsoluteTimeNever(), + Data: nil, + } +} + +// Prepare a block to be of given type and expiration. +// Use expiration date for test block. +func (t *TestBlock) Prepare(_ enums.BlockType, expire util.AbsoluteTime) { + t.expire = expire +} + +// Return the block type +func (t *TestBlock) Type() enums.BlockType { + return enums.BLOCK_TYPE_TEST +} + +// Bytes returns the raw block data +func (t *TestBlock) Bytes() []byte { + return util.Clone(t.Data) +} + +// Expire returns the block expiration +func (t *TestBlock) Expire() util.AbsoluteTime { + return t.expire +} + +// String returns the human-readable representation of a block +func (t *TestBlock) String() string { + return fmt.Sprintf("TestBlock{%d bytes}", len(t.Data)) +} + +// Verify the integrity of a block (optional). Override in custom query +// types to implement block-specific integrity checks (see GNSBlock for +// example). This verification is usually weaker than the verification +// method from a Query (see GNSBlock.Verify for explanation). +func (t *TestBlock) Verify() (bool, error) { + // no internal verification defined. All good. + return true, nil +} + +//---------------------------------------------------------------------- +// TEST block handler +//---------------------------------------------------------------------- + +// TestBlockHandler methods related to HELLO blocks +type TestBlockHandler struct{} + +// Parse a block instance from binary data +func (bh *TestBlockHandler) ParseBlock(buf []byte) (Block, error) { + return &TestBlock{ + Data: util.Clone(buf), + }, nil +} + +// ValidateHelloBlockQuery validates query parameters for a +// DHT-GET request for HELLO blocks. +func (bh *TestBlockHandler) ValidateBlockQuery(key *crypto.HashCode, xquery []byte) bool { + // no internal logic + return true +} + +// ValidateBlockKey returns true if the block key is the same as the +// query key used to access the block. +func (bh *TestBlockHandler) ValidateBlockKey(b Block, key *crypto.HashCode) bool { + // no internal logic + return true +} + +// DeriveBlockKey is used to synthesize the block key from the block +// payload as part of PutMessage and ResultMessage processing. The special +// return value of 'nil' implies that this block type does not permit +// deriving the key from the block. A Key may be returned for a block that +// is ill-formed. +func (bh *TestBlockHandler) DeriveBlockKey(b Block) *crypto.HashCode { + return nil +} + +// ValidateBlockStoreRequest is used to evaluate a block payload as part of +// PutMessage and ResultMessage processing. +// To validate a block store request is to verify the EdDSA SIGNATURE over +// the hashed ADDRESSES against the public key from the peer ID field. If the +// signature is valid true is returned. +func (bh *TestBlockHandler) ValidateBlockStoreRequest(b Block) bool { + // no internal logic + return true +} + +// SetupResultFilter is used to setup an empty result filter. The arguments +// are the set of results that must be filtered at the initiator, and a +// MUTATOR value which MAY be used to deterministically re-randomize +// probabilistic data structures. +func (bh *TestBlockHandler) SetupResultFilter(filterSize int, mutator uint32) ResultFilter { + return NewGenericResultFilter(filterSize, mutator) +} + +// ParseResultFilter from binary data +func (bh *TestBlockHandler) ParseResultFilter(data []byte) ResultFilter { + return NewGenericResultFilterFromBytes(data) +} + +// FilterResult is used to filter results against specific queries. This +// function does not check the validity of the block itself or that it +// matches the given key, as this must have been checked earlier. Thus, +// locally stored blocks from previously observed ResultMessages and +// PutMessages use this function to perform filtering based on the request +// parameters of a particular GET operation. Possible values for the +// FilterEvaluationResult are defined above. If the main evaluation result +// is RF_MORE, the function also returns and updated result filter where +// the block is added to the set of filtered replies. An implementation is +// not expected to actually differentiate between the RF_DUPLICATE and +// RF_IRRELEVANT return values: in both cases the block is ignored for +// this query. +func (bh *TestBlockHandler) FilterResult(b Block, key *crypto.HashCode, rf ResultFilter, xQuery []byte) int { + if rf.Contains(b) { + return RF_DUPLICATE + } + rf.Add(b) + return RF_LAST +} diff --git a/src/gnunet/service/dht/blocks/filters.go b/src/gnunet/service/dht/blocks/filters.go @@ -125,81 +125,87 @@ type ResultFilter interface { } //---------------------------------------------------------------------- -// Generic result filter: -// Filter duplicate blocks (identical hash value over content) +// Generic result filter //---------------------------------------------------------------------- -// GenericResultFilter is a dummy result filter with no state. +// GenericResultFilter is the default resultfilter implementation for +// DHT blocks. It is used by the two predefined block types (BLOCK_TYPE_TEST +// and BLOCK_TYPE_DHT_URL_HELLO) and can serve custom blocks as well if +// no custom result filter is required. type GenericResultFilter struct { bf *BloomFilter } -// NewGenericResultFilter creates a new empty result bloom filter -func NewGenericResultFilter() *GenericResultFilter { - return &GenericResultFilter{ - bf: NewBloomFilter(128), +// NewGenericResultFilter initializes an empty result filter +func NewGenericResultFilter(filterSize int, mutator uint32) *GenericResultFilter { + // HELLO result filters are BloomFilters with a mutator + rf := new(GenericResultFilter) + rf.bf = NewBloomFilter(filterSize) + rf.bf.SetMutator(mutator) + return rf +} + +// NewGenericResultFilterFromBytes creates a new result filter from a binary +// representation: 'data' is the concatenaion 'mutator|bloomfilter'. +// If 'withMutator' is false, no mutator is used. +func NewGenericResultFilterFromBytes(data []byte) *GenericResultFilter { + //logger.Printf(logger.DBG, "[filter] FromBytes = %d:%s (mutator: %v)",len(data), hex.EncodeToString(data), withMutator) + + // handle mutator input + mSize := 4 + rf := new(GenericResultFilter) + rf.bf = &BloomFilter{ + Bits: util.Clone(data[mSize:]), } + if mSize > 0 { + rf.bf.SetMutator(data[:mSize]) + } + return rf } -// Add a block to the result filter. +// Add a HELLO block to th result filter func (rf *GenericResultFilter) Add(b Block) { - bh := crypto.Hash(b.Bytes()) - rf.bf.Add(bh.Data) + if hb, ok := b.(*HelloBlock); ok { + hAddr := sha512.Sum512(hb.AddrBin) + rf.bf.Add(hAddr[:]) + } } -// Contains returns true if a block is filtered +// Contains checks if a block is contained in the result filter func (rf *GenericResultFilter) Contains(b Block) bool { - bh := crypto.Hash(b.Bytes()) - return rf.bf.Contains(bh.Data) + if hb, ok := b.(*HelloBlock); ok { + hAddr := sha512.Sum512(hb.AddrBin) + return rf.bf.Contains(hAddr[:]) + } + return false } -// ContainsHash returns true if a block hash is filtered +// ContainsHash checks if a block hash is contained in the result filter func (rf *GenericResultFilter) ContainsHash(bh *crypto.HashCode) bool { return rf.bf.Contains(bh.Data) } -// Bytes returns the binary representation of a result filter -func (rf *GenericResultFilter) Bytes() (buf []byte) { +// Bytes returns a binary representation of a HELLO result filter +func (rf *GenericResultFilter) Bytes() []byte { return rf.bf.Bytes() } -// Merge two result filters -func (rf *GenericResultFilter) Merge(t ResultFilter) bool { - // check for correct type +// Compare two HELLO result filters +func (rf *GenericResultFilter) Compare(t ResultFilter) int { trf, ok := t.(*GenericResultFilter) if !ok { - return false - } - // check for identical mutator (if any) - if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) { - return false - } - // check for same size - if len(rf.bf.Bits) != len(trf.bf.Bits) { - return false - } - // merge bloomfilters - for i := range rf.bf.Bits { - rf.bf.Bits[i] ^= trf.bf.Bits[i] + return CMP_DIFFER } - return true + return rf.bf.Compare(trf.bf) } -// Compare two result filters -func (rf *GenericResultFilter) Compare(t ResultFilter) int { +// Merge two HELLO result filters +func (rf *GenericResultFilter) Merge(t ResultFilter) bool { trf, ok := t.(*GenericResultFilter) if !ok { - return CMP_DIFFER - } - // check for identical mutator (if any) - if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) { - return CMP_DIFFER - } - // check for identical bits - if bytes.Equal(rf.bf.Bits, trf.bf.Bits) { - return CMP_SAME + return false } - return CMP_MERGE + return rf.bf.Merge(trf.bf) } //====================================================================== diff --git a/src/gnunet/service/dht/blocks/generic.go b/src/gnunet/service/dht/blocks/generic.go @@ -80,6 +80,10 @@ type Block interface { // String returns the human-readable representation of a block String() string + + // Prepare a block to be of given type and expiration. Block types + // decide if and which information to change/set in the block instance. + Prepare(enums.BlockType, util.AbsoluteTime) } // Unwrap (raw) block to a specific block type @@ -173,6 +177,12 @@ func NewGenericBlock(btype enums.BlockType, expire util.AbsoluteTime, blk []byte } } +// Prepare a block to be of given type and expiration. +func (b *GenericBlock) Prepare(btype enums.BlockType, expire util.AbsoluteTime) { + b.BType = btype + b.Expire_ = expire +} + // Bytes returns the DHT block data (unstructured without type and // expiration information. func (b *GenericBlock) Bytes() []byte { @@ -184,9 +194,9 @@ func (b *GenericBlock) Type() enums.BlockType { return b.BType } -// Expire returns the block expiration (never for custom blocks) +// Expire returns the block expiration func (b *GenericBlock) Expire() util.AbsoluteTime { - return util.AbsoluteTimeNever() + return b.Expire_ } // Verify the integrity of a block (optional). Override in custom query @@ -199,7 +209,7 @@ func (b *GenericBlock) Verify() (bool, error) { // String returns the human-readable representation of a block func (b *GenericBlock) String() string { - return fmt.Sprintf("Block{type=%s,expire=%s,data=[%d]", b.BType, b.Expire_, len(b.Data)) + return fmt.Sprintf("Block{type=%s,expire=%s,data=[%d]}", b.BType, b.Expire_, len(b.Data)) } //---------------------------------------------------------------------- @@ -211,16 +221,19 @@ var ( blkFactory = map[enums.BlockType]func() Block{ enums.BLOCK_TYPE_GNS_NAMERECORD: NewGNSBlock, enums.BLOCK_TYPE_DHT_URL_HELLO: NewHelloBlock, + enums.BLOCK_TYPE_TEST: NewTestBlock, } ) // NewGenericBlock creates a Block from binary data. -func NewBlock(btype enums.BlockType, expires util.AbsoluteTime, blk []byte) (b Block, err error) { - fac, ok := blkFactory[btype] - if !ok { - return NewGenericBlock(btype, expires, blk), nil +func NewBlock(btype enums.BlockType, expire util.AbsoluteTime, blk []byte) (b Block, err error) { + if fac, ok := blkFactory[btype]; ok { + b = fac() + if err = data.Unmarshal(b, blk); err == nil { + b.Prepare(btype, expire) + } + } else { + b, err = NewGenericBlock(btype, expire, blk), nil } - b = fac() - err = data.Unmarshal(b, blk) return } diff --git a/src/gnunet/service/dht/blocks/gns.go b/src/gnunet/service/dht/blocks/gns.go @@ -177,6 +177,10 @@ func NewGNSBlock() Block { } } +// Prepare a block to be of given type and expiration. +// Not required for GNS blocks +func (b *GNSBlock) Prepare(enums.BlockType, util.AbsoluteTime) {} + // Verify the integrity of the block data from a signature. // Only the cryptographic signature is verified; the formal correctness of // the association between the block and a GNS label in a GNS zone can't diff --git a/src/gnunet/service/dht/blocks/handlers.go b/src/gnunet/service/dht/blocks/handlers.go @@ -84,4 +84,5 @@ func init() { // add validation functions BlockHandlers[enums.BLOCK_TYPE_DHT_URL_HELLO] = new(HelloBlockHandler) + BlockHandlers[enums.BLOCK_TYPE_TEST] = new(TestBlockHandler) } diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go @@ -20,7 +20,6 @@ package blocks import ( "bytes" - "crypto/sha512" "errors" "fmt" "gnunet/crypto" @@ -192,6 +191,10 @@ func ParseHelloBlockFromBytes(buf []byte) (h *HelloBlock, err error) { return } +// Prepare a block to be of given type and expiration. +// Not required for HELLO blocks +func (h *HelloBlock) Prepare(enums.BlockType, util.AbsoluteTime) {} + // finalize block data (generate dependent fields) func (h *HelloBlock) finalize() (err error) { if h.addrs == nil { @@ -199,15 +202,19 @@ func (h *HelloBlock) finalize() (err error) { pos := 0 h.addrs = make([]*util.Address, 0) for { + // reconstruct address string var as string as, pos = util.ReadCString(h.AddrBin, pos) if pos == -1 { break } + // convert to target address type var addr *util.Address if addr, err = util.ParseAddress(as); err != nil { return } + addr.Expire = h.Expire_ + // append to list h.addrs = append(h.addrs, addr) } } else if h.AddrBin == nil { @@ -245,7 +252,7 @@ func (h *HelloBlock) Expire() util.AbsoluteTime { // String returns the human-readable representation of a block func (h *HelloBlock) String() string { return fmt.Sprintf("HelloBlock{peer=%s,expires=%s,addrs=[%d]}", - h.PeerID, h.Expire_, len(h.Addresses())) + h.PeerID.Short(), h.Expire_, len(h.Addresses())) } // URL returns the HELLO URL for the data. @@ -405,12 +412,12 @@ func (bh *HelloBlockHandler) ValidateBlockStoreRequest(b Block) bool { // MUTATOR value which MAY be used to deterministically re-randomize // probabilistic data structures. func (bh *HelloBlockHandler) SetupResultFilter(filterSize int, mutator uint32) ResultFilter { - return NewHelloResultFilter(filterSize, mutator) + return NewGenericResultFilter(filterSize, mutator) } // ParseResultFilter from binary data func (bh *HelloBlockHandler) ParseResultFilter(data []byte) ResultFilter { - return NewHelloResultFilterFromBytes(data) + return NewGenericResultFilterFromBytes(data) } // FilterResult is used to filter results against specific queries. This @@ -432,84 +439,3 @@ func (bh *HelloBlockHandler) FilterResult(b Block, key *crypto.HashCode, rf Resu rf.Add(b) return RF_LAST } - -//---------------------------------------------------------------------- -// HELLO result filter -//---------------------------------------------------------------------- - -// HelloResultFilter is a result filter implementation for HELLO blocks -type HelloResultFilter struct { - bf *BloomFilter -} - -// NewHelloResultFilter initializes an empty resut filter -func NewHelloResultFilter(filterSize int, mutator uint32) *HelloResultFilter { - // HELLO result filters are BloomFilters with a mutator - rf := new(HelloResultFilter) - rf.bf = NewBloomFilter(filterSize) - rf.bf.SetMutator(mutator) - return rf -} - -// NewHelloResultFilterFromBytes creates a new result filter from a binary -// representation: 'data' is the concatenaion 'mutator|bloomfilter'. -// If 'withMutator' is false, no mutator is used. -func NewHelloResultFilterFromBytes(data []byte) *HelloResultFilter { - //logger.Printf(logger.DBG, "[filter] FromBytes = %d:%s (mutator: %v)",len(data), hex.EncodeToString(data), withMutator) - - // handle mutator input - mSize := 4 - rf := new(HelloResultFilter) - rf.bf = &BloomFilter{ - Bits: util.Clone(data[mSize:]), - } - if mSize > 0 { - rf.bf.SetMutator(data[:mSize]) - } - return rf -} - -// Add a HELLO block to th result filter -func (rf *HelloResultFilter) Add(b Block) { - if hb, ok := b.(*HelloBlock); ok { - hAddr := sha512.Sum512(hb.AddrBin) - rf.bf.Add(hAddr[:]) - } -} - -// Contains checks if a block is contained in the result filter -func (rf *HelloResultFilter) Contains(b Block) bool { - if hb, ok := b.(*HelloBlock); ok { - hAddr := sha512.Sum512(hb.AddrBin) - return rf.bf.Contains(hAddr[:]) - } - return false -} - -// ContainsHash checks if a block hash is contained in the result filter -func (rf *HelloResultFilter) ContainsHash(bh *crypto.HashCode) bool { - return rf.bf.Contains(bh.Data) -} - -// Bytes returns a binary representation of a HELLO result filter -func (rf *HelloResultFilter) Bytes() []byte { - return rf.bf.Bytes() -} - -// Compare two HELLO result filters -func (rf *HelloResultFilter) Compare(t ResultFilter) int { - trf, ok := t.(*HelloResultFilter) - if !ok { - return CMP_DIFFER - } - return rf.bf.Compare(trf.bf) -} - -// Merge two HELLO result filters -func (rf *HelloResultFilter) Merge(t ResultFilter) bool { - trf, ok := t.(*HelloResultFilter) - if !ok { - return false - } - return rf.bf.Merge(trf.bf) -} diff --git a/src/gnunet/service/dht/messages.go b/src/gnunet/service/dht/messages.go @@ -98,11 +98,12 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } } else { // ... or create a new one + mut := util.RndUInt32() if blockHdlr != nil { - rf = blockHdlr.SetupResultFilter(128, util.RndUInt32()) + rf = blockHdlr.SetupResultFilter(128, mut) } else { logger.Printf(logger.WARN, "[%s] using default result filter", label) - rf = blocks.NewGenericResultFilter() + rf = blocks.NewGenericResultFilter(128, mut) } } // clone peer filter @@ -131,6 +132,10 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { // try to find results in HELLO cache results = m.lookupHelloCache(label, addr, rf, approx) + // DEBUG: + for i, res := range results { + logger.Printf(logger.DBG, "[%s] cache #%d = %s", label, i, res) + } } //-------------------------------------------------------------- @@ -142,8 +147,29 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // get results from local storage lclResults, err := m.getLocalStorage(label, query, rf) if err == nil { - // append local results - results = append(results, lclResults...) + // DEBUG: + for i, res := range lclResults { + logger.Printf(logger.DBG, "[%s] local #%d = %s", label, i, res) + } + // create total result list + if len(results) == 0 { + results = lclResults + } else if len(results)+len(lclResults) <= 10 { + // handle few results directly + results = append(results, lclResults...) + } else { + // compile a new sorted list from results. + list := store.NewSortedDHTResults(10) + for pos, res := range results { + list.Add(res, pos) + } + for _, res := range lclResults { + if pos := list.Accepts(res.Dist); pos != -1 { + list.Add(res, pos) + } + } + results = list.GetResults() + } } } // if we have results, send them as response on the back channel @@ -159,7 +185,11 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m pth = result.Entry.Path.Clone() pth.SplitPos = pth.NumList pe := pth.NewElement(pth.LastHop, local, back.Receiver()) - pth.Add(pe) + if err := m.core.Sign(pe); err != nil { + logger.Printf(logger.ERROR, "[%s] failed to sign path element: %s", label, err.Error()) + } else { + pth.Add(pe) + } } logger.Printf(logger.INFO, "[%s] sending result message to %s", label, rcv) @@ -186,7 +216,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } pf.Add(p.Peer) // create open get-forward result handler - rh := NewResultHandler(msg, rf, back) + rh := NewResultHandler(msg, rf, back, m.core) logger.Printf(logger.INFO, "[%s] result handler task #%d (key %s) started", label, rh.ID(), rh.Key().Short()) m.reshdlrs.Add(rh) @@ -318,7 +348,11 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // yes: add path element pp = entry.Path.Clone() pe := pp.NewElement(sender, local, p.Peer) - pp.Add(pe) + if err := m.core.Sign(pe); err != nil { + logger.Printf(logger.ERROR, "[%s] failed to sign path element: %s", label, err.Error()) + } else { + pp.Add(pe) + } } // build updated PUT message msgOut := msg.Update(pp, pf, msg.HopCount+1) @@ -409,8 +443,20 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m key := msg.Query.String() if list, ok := m.reshdlrs.Get(key); ok { for _, rh := range list { - logger.Printf(logger.DBG, "[%s] Result handler task #%d found", label, rh.ID()) + logger.Printf(logger.DBG, "[%s] Result handler task #%d found (receiver %s)", label, rh.ID(), rh.Receiver().Short()) + // check if the handler can really handle the result + if rh.Type() != btype { + // this is another block type, we don't handle it + logger.Printf(logger.DBG, "[%s] Result handler not suitable (%s != %s) -- skipped", label, rh.Type(), btype) + continue + } + /* + if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE != msg.Flags&enums.DHT_RO_FIND_APPROXIMATE { + logger.Printf(logger.DBG, "[%s] Result handler asked for match, got approx -- ignored", label) + continue + } + */ //-------------------------------------------------------------- // check task list for handler (9.5.2.6) if rh.Flags()&enums.DHT_RO_FIND_APPROXIMATE == 0 && blkKey != nil && !blkKey.Equal(rh.Key()) { @@ -485,6 +531,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } // we need to cache a new(er) HELLO if isNew { + logger.Printf(logger.INFO, "[%s] caching HELLO from %s", label, sender.Short()) m.rtable.CacheHello(&blocks.HelloBlock{ PeerID: sender, Signature: msg.Signature, @@ -552,7 +599,12 @@ func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks. out.Block = blk.Bytes() out.MsgSize += uint16(len(out.Block)) out.SetPath(pth) - + /* + // DEBUG: + if out.BType == enums.BLOCK_TYPE_TEST { + logger.Printf(logger.DBG, "result message = %s", util.Dump(out, "json")) + } + */ // send message return back.Send(ctx, out) } diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -56,7 +56,7 @@ type LocalBlockResponder struct { func NewLocalBlockResponder() *LocalBlockResponder { return &LocalBlockResponder{ ch: make(chan blocks.Block), - rf: blocks.NewGenericResultFilter(), + rf: blocks.NewGenericResultFilter(128, util.RndUInt32()), } } @@ -77,6 +77,10 @@ func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) er lr.ch <- blk } else { logger.Println(logger.WARN, "[local] DHT-RESULT block problem: "+err.Error()) + // DEBUG: + logger.Printf(logger.DBG, "[local] btype=%s, expire=%s", res.BType, res.Expire) + logger.Printf(logger.DBG, "[local] block=%s", hex.EncodeToString(res.Block)) + panic("@@@") } }() default: @@ -161,7 +165,7 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod if !ok { logger.Println(logger.WARN, "[dht-discovery] received invalid block data") logger.Printf(logger.DBG, "[dht-discovery] -> %s", hex.EncodeToString(res.Bytes())) - } else { + } else if !hb.PeerID.Equal(m.core.PeerID()) { // cache HELLO block m.rtable.CacheHello(hb) // add sender to routing table @@ -385,7 +389,7 @@ func (m *Module) getHello(label string) (msg *message.DHTP2PHelloMsg, err error) // save for later use m.lastHello = msg - // DEBUG + // DEBUG: var ok bool if ok, err = msg.Verify(m.core.PeerID()); !ok || err != nil { if !ok { diff --git a/src/gnunet/service/dht/path/handling.go b/src/gnunet/service/dht/path/handling.go @@ -43,7 +43,7 @@ type Path struct { NumList uint16 `order:"big"` // number of list entries SplitPos uint16 `order:"big"` // optional split position List []*Entry `size:"NumList"` // list of path entries - LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature + LastSig *util.PeerSignature `opt:"(IsUsed)"` // last hop signature LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id } @@ -90,16 +90,21 @@ func (p *Path) Size() uint { if p.TruncOrigin != nil { size += p.TruncOrigin.Size() } - size += uint(p.NumList) * p.List[0].Size() + if p.NumList > 0 { + size += uint(p.NumList) * p.List[0].Size() + } if p.LastSig != nil { - size += p.LastSig.Size() + p.LastHop.Size() + size += p.LastSig.Size() } return size } // Bytes returns a binary representation func (p *Path) Bytes() []byte { - buf, _ := data.Marshal(p) + buf, err := data.Marshal(p) + if err != nil { + logger.Println(logger.WARN, "[path] failed serialization: "+err.Error()) + } return buf } diff --git a/src/gnunet/service/dht/resulthandler.go b/src/gnunet/service/dht/resulthandler.go @@ -65,11 +65,12 @@ type ResultHandler struct { started util.AbsoluteTime // Timestamp of session start active bool // is the task active? resp transport.Responder // back-channel to deliver result + signer crypto.Signer // signing instance } // NewResultHandler creates an instance from a DHT-GET message and a // result filter instance. -func NewResultHandler(msg *message.DHTP2PGetMsg, rf blocks.ResultFilter, back transport.Responder) *ResultHandler { +func NewResultHandler(msg *message.DHTP2PGetMsg, rf blocks.ResultFilter, back transport.Responder, signer crypto.Signer) *ResultHandler { return &ResultHandler{ id: util.NextID(), key: msg.Query.Clone(), @@ -80,6 +81,7 @@ func NewResultHandler(msg *message.DHTP2PGetMsg, rf blocks.ResultFilter, back tr started: util.AbsoluteTimeNow(), active: true, resp: back, + signer: signer, } } @@ -93,6 +95,16 @@ func (t *ResultHandler) Key() *crypto.HashCode { return t.key } +// Receiver returns the destination peer +func (t *ResultHandler) Receiver() *util.PeerID { + return t.resp.Receiver() +} + +// Type returns the requested block type +func (t *ResultHandler) Type() enums.BlockType { + return t.btype +} + // Flags returns the query flags func (t *ResultHandler) Flags() uint16 { return t.flags @@ -161,7 +173,11 @@ func (t *ResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg pp = pth.Clone() // yes: add path element pe := pp.NewElement(sender, local, rcv) - pp.Add(pe) + if err := t.signer.Sign(pe); err == nil { + logger.Printf(logger.ERROR, "[dht-task-%d] failed to sign path element: %s", t.id, err.Error()) + } else { + pp.Add(pe) + } } // build updated PUT message msg = msg.Update(pp) @@ -213,7 +229,7 @@ func (t *ResultHandlerList) Add(hdlr *ResultHandler) bool { case RHC_MERGE: // merge the two result handlers oldMod := modified - modified = h.Merge(hdlr) || modified + modified = h.Merge(hdlr) logger.Printf(logger.DBG, "[rhl] resultfilter compare: MERGE (%v -- %v)", oldMod, modified) break loop case RHC_REPLACE: diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go @@ -342,7 +342,7 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { // check if we can/need to drop a peer drop := timeout.Compare(p.lastSeen.Elapsed()) < 0 if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 { - logger.Printf(logger.DBG, "[dht-rt-hb] removing %v: %v, %v", p, p.lastSeen.Elapsed(), p.lastUsed.Elapsed()) + logger.Printf(logger.DBG, "[dht-rt-hb] removing %s: lastSeen %s, lastUsed %v", p.Peer.Short(), p.lastSeen.Elapsed(), p.lastUsed.Elapsed()) rt.Remove(p, "dht-rt-hb", pid) } return nil @@ -367,29 +367,29 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { // LookupHello returns blocks from the HELLO cache for given query. func (rt *RoutingTable) LookupHello(addr *PeerAddress, rf blocks.ResultFilter, approx bool, label string) (results []*store.DHTResult) { // iterate over cached HELLOs to find matches; - // approximate search is limited by distance (max. diff for bucket index is 16) + // approximate search is guided by distance + list := store.NewSortedDHTResults(10) _ = rt.helloCache.ProcessRange(func(key string, hb *blocks.HelloBlock, _ int) error { // check if block is excluded by result filter - var result *store.DHTResult if !rf.Contains(hb) { // no: possible result, compute distance p := NewPeerAddress(hb.PeerID) - dist, idx := addr.Distance(p) - result = &store.DHTResult{ - Entry: &store.DHTEntry{ - Blk: hb, - }, - Dist: dist, - } - // check if we need to add result - if (approx && idx < 16) || idx == 0 { - results = append(results, result) + dist, _ := addr.Distance(p) + if pos := list.Accepts(dist); pos != -1 { + result := &store.DHTResult{ + Entry: &store.DHTEntry{ + Blk: hb, + }, + Dist: dist, + } + list.Add(result, pos) } } else { logger.Println(logger.DBG, "[%s] LookupHello: cached HELLO block is filtered") } return nil }, true) + results = list.GetResults() return } diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go @@ -26,6 +26,7 @@ import ( "gnunet/service" "gnunet/service/dht/blocks" "gnunet/service/store" + "gnunet/util" ) //====================================================================== @@ -71,7 +72,7 @@ func (m *Module) Import(fcm map[string]any) { // Get entry from the cache if available. func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { var e []*store.DHTEntry - rf := blocks.NewGenericResultFilter() + rf := blocks.NewGenericResultFilter(128, util.RndUInt32()) if e, err = m.cache.Get("namecache", query, rf); err != nil { return } diff --git a/src/gnunet/service/store/store_dht.go b/src/gnunet/service/store/store_dht.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "fmt" "gnunet/crypto" + "gnunet/enums" "gnunet/service/dht/blocks" "gnunet/service/dht/path" "gnunet/util" @@ -46,6 +47,11 @@ type DHTEntry struct { Path *path.Path // associated put path } +// String returns a human-readable representation +func (e *DHTEntry) String() string { + return fmt.Sprintf("DHTEntry{%s,path=%s}", e.Blk, e.Path) +} + //------------------------------------------------------------ // DHT result is a single DHT result //------------------------------------------------------------ @@ -56,31 +62,62 @@ type DHTResult struct { Dist *math.Int // distance of entry to query key } -//------------------------------------------------------------ +// String returns a human-readable representation +func (r *DHTResult) String() string { + return fmt.Sprintf("DHTResult{%s,dist=%d}", r.Entry, r.Dist.BitLen()) +} + +//---------------------------------------------------------------------- +// Sorted DHT result list +//---------------------------------------------------------------------- + +// SortedDHTResults is a length-limit result list which only adds entries +// if they are "better" than another listed entry. "better" means "less +// distant" from the search key +type SortedDHTResults struct { + list []*DHTResult +} -type DHTResultSet struct { - list []*DHTResult // list of DHT results - pos int // iterator position +// NewSortedDHTResults creates a new sorted result list +func NewSortedDHTResults(n int) *SortedDHTResults { + return &SortedDHTResults{ + list: make([]*DHTResult, n), + } } -func NewDHTResultSet() *DHTResultSet { - return &DHTResultSet{ - list: make([]*DHTResult, 0), - pos: 0, +// Accepts checks if given distance would be inserted into the list +// at pos. If pos < 0 the distance is rejected. +func (rl *SortedDHTResults) Accepts(dist *math.Int) int { + for pos, entry := range rl.list { + if entry == nil || entry.Dist.Cmp(dist) > 0 { + return pos + } } + return -1 } -func (rs *DHTResultSet) Add(r *DHTResult) { - rs.list = append(rs.list, r) +// Add result at given position with sanity check +func (rl *SortedDHTResults) Add(res *DHTResult, pos int) { + // check index + if pos < 0 || pos > len(rl.list)-1 { + return + } + // check entry + entry := rl.list[pos] + if entry == nil || entry.Dist.Cmp(res.Dist) > 0 { + rl.list[pos] = res + } } -func (rs *DHTResultSet) Next() (result *DHTResult) { - if rs.pos == len(rs.list) { - return nil +// GetResults returns the final result list +func (rl *SortedDHTResults) GetResults() []*DHTResult { + out := make([]*DHTResult, 0) + for _, res := range rl.list { + if res != nil { + out = append(out, res) + } } - result = rs.list[rs.pos] - rs.pos++ - return + return out } //------------------------------------------------------------ @@ -167,8 +204,8 @@ func (s *DHTStore) Put(query blocks.Query, entry *DHTEntry) (err error) { expire := entry.Blk.Expire() blkSize := len(entry.Blk.Bytes()) - logger.Printf(logger.INFO, "[dht-store] storing %d bytes @ %s (path %s)", - blkSize, query.Key().Short(), entry.Path) + logger.Printf(logger.INFO, "[dht-store] storing %d bytes @ %s (path %s), expires %s", + blkSize, query.Key().Short(), entry.Path, expire) // write entry to file for storage if err = s.writeEntry(query.Key().Data, entry); err != nil { @@ -233,41 +270,53 @@ func (s *DHTStore) Get(label string, query blocks.Query, rf blocks.ResultFilter) logger.Printf(logger.ERROR, "[%s] can't flag DHT entry as used: %s", label, err) continue } + logger.Printf(logger.INFO, "[dht-store] retrieving %d bytes @ %s (path %s)", + len(entry.Blk.Bytes()), query.Key().Short(), entry.Path) } return } -// GetApprox returns the best-matching value with given key from storage -// that is not excluded +// GetApprox returns the best-matching values with given key from storage +// that are not excluded func (s *DHTStore) GetApprox(label string, query blocks.Query, rf blocks.ResultFilter) (results []*DHTResult, err error) { + btype := query.Type() + + // List of possible results (size limited) + list := NewSortedDHTResults(10) + // iterate over all keys; process each metadata instance - // (append to results if appropriate) process := func(md *FileMetadata) { + // filter by block type + if btype != enums.BLOCK_TYPE_ANY && btype != md.btype { + // block type not matching + return + } // check for filtered block. if rf.ContainsHash(md.bhash) { // filtered out... return } - // check distance (max. 16 bucktes off) + // check distance in result list dist := util.Distance(md.key.Data, query.Key().Data) - if (512 - dist.BitLen()) > 16 { - return - } - // read entry from storage - var entry *DHTEntry - if entry, err = s.readEntry(md); err != nil { - logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String()) - return - } - // add to result list - result := &DHTResult{ - Entry: entry, - Dist: dist, + if pos := list.Accepts(dist); pos != -1 { + + // read entry from storage + var entry *DHTEntry + if entry, err = s.readEntry(md); err != nil { + logger.Printf(logger.ERROR, "[%s] failed to retrieve block for %s", label, md.key.String()) + return + } + // add to result list + result := &DHTResult{ + Entry: entry, + Dist: dist, + } + list.Add(result, pos) } - results = append(results, result) } // traverse mestadata database err = s.meta.Traverse(process) + results = list.GetResults() return } diff --git a/src/gnunet/service/store/store_dht_test.go b/src/gnunet/service/store/store_dht_test.go @@ -63,7 +63,7 @@ func TestDHTFilesStore(t *testing.T) { // allocate keys keys := make([]blocks.Query, 0, fsNumBlocks) // create result filter - rf := blocks.NewGenericResultFilter() + rf := blocks.NewGenericResultFilter(128, 236742) // First round: save blocks btype := enums.BLOCK_TYPE_TEST diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go @@ -20,7 +20,6 @@ package transport import ( "context" - "errors" "fmt" "gnunet/message" "io" @@ -42,10 +41,13 @@ func WriteMessage(ctx context.Context, wrt io.WriteCloser, msg message.Message) return } /* - // debug for outgoing messages - if msg.Type() == enums.MSG_DHT_P2P_HELLO { - logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf)) - logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json")) + // DEBUG: outgoing messages + if msg.Type() == enums.MSG_DHT_P2P_RESULT { + tmsg, _ := msg.(*message.DHTP2PResultMsg) + if tmsg.BType == enums.BLOCK_TYPE_TEST { + logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf)) + logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json")) + } } */ // check message header size and packet size @@ -54,7 +56,7 @@ func WriteMessage(ctx context.Context, wrt io.WriteCloser, msg message.Message) return } if len(buf) != int(mh.MsgSize) { - return errors.New("WriteMessage: message size mismatch") + return fmt.Errorf("WriteMessage: message size mismatch (%d != %d)", len(buf), mh.MsgSize) } // perform write operation var n int @@ -116,7 +118,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag } err = data.Unmarshal(msg, buf[:mh.MsgSize]) /* - // debug for incoming messages + // DEBUG: incoming messages if mh.MsgType == enums.MSG_DHT_P2P_RESULT { logger.Printf(logger.DBG, "[rw_msg] msg=%s", hex.EncodeToString(buf[:mh.MsgSize])) logger.Printf(logger.DBG, "[rw_msg] msg=%s", util.Dump(msg, "json")) diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go @@ -95,6 +95,9 @@ func (p *PeerID) String() string { // SHort returns a shortened peer id for display func (p *PeerID) Short() string { + if p == nil { + return "local" + } return p.String()[:8] }