mirror of https://github.com/bmuller/kademlia.git
Merge pull request #3 from cpacia/transfer
Bug fixes when transferring key/values
This commit is contained in:
commit
00ecaf5e31
|
@ -32,12 +32,12 @@ class KademliaProtocol(RPCProtocol):
|
||||||
|
|
||||||
def rpc_ping(self, sender, nodeid):
|
def rpc_ping(self, sender, nodeid):
|
||||||
source = Node(nodeid, sender[0], sender[1])
|
source = Node(nodeid, sender[0], sender[1])
|
||||||
self.router.addContact(source)
|
self.welcomeIfNewNode(source)
|
||||||
return self.sourceNode.id
|
return self.sourceNode.id
|
||||||
|
|
||||||
def rpc_store(self, sender, nodeid, key, value):
|
def rpc_store(self, sender, nodeid, key, value):
|
||||||
source = Node(nodeid, sender[0], sender[1])
|
source = Node(nodeid, sender[0], sender[1])
|
||||||
self.router.addContact(source)
|
self.welcomeIfNewNode(source)
|
||||||
self.log.debug("got a store request from %s, storing value" % str(sender))
|
self.log.debug("got a store request from %s, storing value" % str(sender))
|
||||||
self.storage[key] = value
|
self.storage[key] = value
|
||||||
return True
|
return True
|
||||||
|
@ -45,13 +45,13 @@ class KademliaProtocol(RPCProtocol):
|
||||||
def rpc_find_node(self, sender, nodeid, key):
|
def rpc_find_node(self, sender, nodeid, key):
|
||||||
self.log.info("finding neighbors of %i in local table" % long(nodeid.encode('hex'), 16))
|
self.log.info("finding neighbors of %i in local table" % long(nodeid.encode('hex'), 16))
|
||||||
source = Node(nodeid, sender[0], sender[1])
|
source = Node(nodeid, sender[0], sender[1])
|
||||||
self.router.addContact(source)
|
self.welcomeIfNewNode(source)
|
||||||
node = Node(key)
|
node = Node(key)
|
||||||
return map(tuple, self.router.findNeighbors(node, exclude=source))
|
return map(tuple, self.router.findNeighbors(node, exclude=source))
|
||||||
|
|
||||||
def rpc_find_value(self, sender, nodeid, key):
|
def rpc_find_value(self, sender, nodeid, key):
|
||||||
source = Node(nodeid, sender[0], sender[1])
|
source = Node(nodeid, sender[0], sender[1])
|
||||||
self.router.addContact(source)
|
self.welcomeIfNewNode(source)
|
||||||
value = self.storage.get(key, None)
|
value = self.storage.get(key, None)
|
||||||
if value is None:
|
if value is None:
|
||||||
return self.rpc_find_node(sender, nodeid, key)
|
return self.rpc_find_node(sender, nodeid, key)
|
||||||
|
@ -77,9 +77,10 @@ class KademliaProtocol(RPCProtocol):
|
||||||
d = self.store(address, self.sourceNode.id, key, value)
|
d = self.store(address, self.sourceNode.id, key, value)
|
||||||
return d.addCallback(self.handleCallResponse, nodeToAsk)
|
return d.addCallback(self.handleCallResponse, nodeToAsk)
|
||||||
|
|
||||||
def transferKeyValues(self, node):
|
def welcomeIfNewNode(self, node):
|
||||||
"""
|
"""
|
||||||
Given a new node, send it all the keys/values it should be storing.
|
Given a new node, send it all the keys/values it should be storing,
|
||||||
|
then add it to the routing table.
|
||||||
|
|
||||||
@param node: A new node that just joined (or that we just found out
|
@param node: A new node that just joined (or that we just found out
|
||||||
about).
|
about).
|
||||||
|
@ -90,16 +91,18 @@ class KademliaProtocol(RPCProtocol):
|
||||||
is closer than the closest in that list, then store the key/value
|
is closer than the closest in that list, then store the key/value
|
||||||
on the new node (per section 2.5 of the paper)
|
on the new node (per section 2.5 of the paper)
|
||||||
"""
|
"""
|
||||||
ds = []
|
if self.router.isNewNode(node):
|
||||||
for key, value in self.storage.iteritems():
|
ds = []
|
||||||
keynode = Node(digest(key))
|
for key, value in self.storage.iteritems():
|
||||||
neighbors = self.router.findNeighbors(keynode)
|
keynode = Node(digest(key))
|
||||||
if len(neighbors) > 0:
|
neighbors = self.router.findNeighbors(keynode)
|
||||||
newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode)
|
if len(neighbors) > 0:
|
||||||
thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode)
|
newNodeClose = node.distanceTo(keynode) < neighbors[-1].distanceTo(keynode)
|
||||||
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
|
thisNodeClosest = self.sourceNode.distanceTo(keynode) < neighbors[0].distanceTo(keynode)
|
||||||
ds.append(self.callStore(node, key, value))
|
if len(neighbors) == 0 or (newNodeClose and thisNodeClosest):
|
||||||
return defer.gatherResults(ds)
|
ds.append(self.callStore(node, key, value))
|
||||||
|
self.router.addContact(node)
|
||||||
|
return defer.gatherResults(ds)
|
||||||
|
|
||||||
def handleCallResponse(self, result, node):
|
def handleCallResponse(self, result, node):
|
||||||
"""
|
"""
|
||||||
|
@ -108,9 +111,7 @@ class KademliaProtocol(RPCProtocol):
|
||||||
"""
|
"""
|
||||||
if result[0]:
|
if result[0]:
|
||||||
self.log.info("got response from %s, adding to router" % node)
|
self.log.info("got response from %s, adding to router" % node)
|
||||||
self.router.addContact(node)
|
self.welcomeIfNewNode(node)
|
||||||
if self.router.isNewNode(node):
|
|
||||||
self.transferKeyValues(node)
|
|
||||||
else:
|
else:
|
||||||
self.log.debug("no response from %s, removing from router" % node)
|
self.log.debug("no response from %s, removing from router" % node)
|
||||||
self.router.removeContact(node)
|
self.router.removeContact(node)
|
||||||
|
|
Loading…
Reference in New Issue