Do not use bind for stun. bind may cause windows firewall popup

This commit is contained in:
Oleksii Shevchuk 2018-09-24 15:59:05 +03:00
parent d7e7f4a2bc
commit 4aff6bbcb9
1 changed files with 24 additions and 29 deletions

View File

@ -27,8 +27,6 @@ STUN_SERVERS = (
DEFAULTS = {
'stun_port': 3478,
'source_ip': '0.0.0.0',
'source_port': 54320
}
# stun attributes
@ -107,7 +105,7 @@ def gen_tran_id():
# return binascii.a2b_hex(a)
return a
def stun_test(sock, host, port, source_ip, source_port, send_data=""):
def stun_test(sock, host, port, send_data=""):
retVal = {'Resp': False, 'ExternalIP': None, 'ExternalPort': None,
'SourceIP': None, 'SourcePort': None, 'ChangedIP': None,
'ChangedPort': None}
@ -116,6 +114,7 @@ def stun_test(sock, host, port, source_ip, source_port, send_data=""):
str_data = ''.join([BindRequestMsg, str_len, tranid, send_data])
data = binascii.a2b_hex(str_data)
recvCorr = False
soure_ip, source_port = None, None
while not recvCorr:
recieved = False
count = 3
@ -123,20 +122,22 @@ def stun_test(sock, host, port, source_ip, source_port, send_data=""):
log.debug("sendto: %s", (host, port))
try:
sock.sendto(data, (host, port))
source_ip, source_port = sock.getsockname()
except socket.gaierror:
retVal['Resp'] = False
return retVal
return soure_ip, source_port, retVal
try:
buf, addr = sock.recvfrom(2048)
log.debug("recvfrom: %s", addr)
recieved = True
except Exception:
except Exception, e:
print e
recieved = False
if count > 0:
count -= 1
else:
retVal['Resp'] = False
return retVal
return soure_ip, source_port, retVal
msgtype = binascii.b2a_hex(buf[0:2])
bind_resp_msg = dictValToMsgType[msgtype] == "BindResponseMsg"
tranid_match = tranid.upper() == binascii.b2a_hex(buf[4:20]).upper()
@ -184,21 +185,21 @@ def stun_test(sock, host, port, source_ip, source_port, send_data=""):
base = base + 4 + attr_len
len_remain = len_remain - (4 + attr_len)
# s.close()
return retVal
return soure_ip, source_port, retVal
def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478, only_ip=False):
def get_nat_type(s, stun_host=None, stun_port=3478, only_ip=False):
_initialize()
port = stun_port
log.debug("Do Test1")
resp = False
if stun_host:
ret = stun_test(s, stun_host, port, source_ip, source_port)
source_ip, source_port, ret = stun_test(s, stun_host, port)
resp = ret['Resp']
else:
for stun_host in STUN_SERVERS:
log.debug('Trying STUN host: %s', stun_host)
ret = stun_test(s, stun_host, port, source_ip, source_port)
source_ip, source_port, ret = stun_test(s, stun_host, port)
resp = ret['Resp']
if resp:
break
@ -221,8 +222,7 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478, only
if ret['ExternalIP'] == source_ip:
changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
ret = stun_test(s, stun_host, port, source_ip, source_port,
changeRequest)
source_ip, source_port, ret = stun_test(s, stun_host, port, changeRequest)
if ret['Resp']:
typ = OpenInternet
else:
@ -231,15 +231,14 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478, only
else:
changeRequest = ''.join([ChangeRequest, '0004', "00000006"])
log.debug("Do Test2")
ret = stun_test(s, stun_host, port, source_ip, source_port,
changeRequest)
source_ip, source_port, ret = stun_test(s, stun_host, port, changeRequest)
log.debug("Result: %s", ret)
if ret['Resp']:
typ = FullCone
else:
log.debug("Do Test1")
ret = stun_test(s, changedIP, changedPort, source_ip, source_port)
source_ip, source_port, ret = stun_test(s, changedIP, changedPort)
log.debug("Result: %s", ret)
if not ret['Resp']:
typ = ChangedAddressError
@ -250,8 +249,7 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478, only
])
log.debug("Do Test3")
ret = stun_test(s, changedIP, port, source_ip, source_port,
changePortRequest)
source_ip, source_port, ret = stun_test(s, changedIP, port, changePortRequest)
log.debug("Result: %s", ret)
if ret['Resp']:
@ -264,35 +262,32 @@ def get_nat_type(s, source_ip, source_port, stun_host=None, stun_port=3478, only
return typ, ret
def get_ip_info(source_ip="0.0.0.0", source_port=54320, stun_host=None,
stun_port=3478):
def get_ip_info(stun_host=None, stun_port=3478):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((source_ip, source_port))
nat_type, nat = get_nat_type(s, source_ip, source_port,
stun_host=stun_host, stun_port=stun_port)
external_ip = nat['ExternalIP']
external_port = nat['ExternalPort']
nat_type, nat = get_nat_type(s, stun_host=stun_host, stun_port=stun_port)
external_ip = None
external_port = None
if nat_type != Blocked:
external_ip = nat['ExternalIP']
external_port = nat['ExternalPort']
return (nat_type, external_ip, external_port)
finally:
s.close()
def get_ip(source_ip="0.0.0.0", source_port=54320, stun_host=None,
stun_port=3478):
def get_ip(stun_host=None, stun_port=3478):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((source_ip, source_port))
return get_nat_type(
s, source_ip, source_port,
stun_host=stun_host, stun_port=stun_port, only_ip=True)
s, stun_host=stun_host, stun_port=stun_port, only_ip=True)
finally:
s.close()