#
# Store information about servers. Filter and select based on their protocol support, etc.
#
# Runtime check for optional modules
from importlib import util as importutil
# Check if bottom is present. find_spec() returns None when the module
# cannot be located, so this probes for it without importing it.
have_bottom = importutil.find_spec("bottom") is not None
import time, random, json
from .constants import DEFAULT_PORTS, BOOTSTRAP_SERVERS, DEFAULT_SERVER
class ServerInfo(dict):
    '''
    Information to be stored on a server. Originally based on IRC data published to a channel.

    This is a plain dict with some helpers on top. The regular constructor
    stores the keys 'nickname', 'hostname', 'ip_addr', 'ports', 'version'
    and 'pruning_limit'. Entries of 'ports' are strings made of a single
    letter protocol code, optionally followed by a port number
    (for example 't' or 's50002').
    '''

    FIELDS = ['nickname', 'hostname', 'ports', 'version', 'pruning_limit']

    def __init__(self, nickname_or_dict, hostname=None, ports=None,
                 version=None, pruning_limit=None, ip_addr=None):
        '''
        Build a record from explicit values, or promote an existing mapping.

        nickname_or_dict: nickname of the server; or, when neither hostname
            nor ports is given, a dict (or similar) to copy as-is.
        hostname: host name of the server.
        ports: one of
            - a number (int), assumed to be a TCP port, OR
            - a list of codes, OR
            - a string of codes to be split apart on whitespace.
            Entries starting with 'v' (version) or 'p' (pruning limit) are
            pulled out of the list and stored under their own keys.
        version: version string; overridden by a 'v...' entry in ports.
        pruning_limit: pruning limit; overridden by a valid 'p...' entry.
        ip_addr: IP address of the server, if known.

        Raises AssertionError if no port/protocol entry remains.
        '''
        if not hostname and not ports:
            # promote a dict, or similar
            super().__init__(nickname_or_dict)
            return

        super().__init__()

        self['nickname'] = nickname_or_dict or None
        self['hostname'] = hostname
        self['ip_addr'] = ip_addr or None

        if isinstance(ports, int):
            ports = ['t%d' % ports]
        elif isinstance(ports, str):
            ports = ports.split()
        elif ports is None:
            ports = []

        # Build a fresh list rather than editing the argument in place, so
        # the caller's list is left untouched and any iterable is accepted.
        kept = []
        for p in ports:
            if not p:
                # ignore junk: an empty entry carries no protocol code
                continue
            code = p[0]
            if code == 'v':
                version = p[1:]
            elif code == 'p':
                try:
                    pruning_limit = int(p[1:])
                except ValueError:
                    # ignore junk
                    pass
            else:
                kept.append(p)

        # Raised explicitly (instead of a bare assert) so that the check is
        # not stripped when Python runs with -O.
        if not kept:
            raise AssertionError("Must have at least one port/protocol")

        self['ports'] = kept
        self['version'] = version
        self['pruning_limit'] = int(pruning_limit or 0)

    @classmethod
    def from_response(cls, response_list):
        '''
        Create a list of servers based on data from response from Stratum.
        Give this the response to: "server.peers.subscribe"

            [...
                "91.63.237.12",
                "electrum3.hachre.de",
                [ "v1.0", "p10000", "t", "s" ]
            ...]

        Each item must unpack into (ip_addr, hostname, ports). The resulting
        entries have no nickname.
        '''
        rv = []
        for ip_addr, hostname, ports in response_list:
            if ip_addr == hostname:
                # the first field just repeats the hostname; not a real IP
                ip_addr = None
            rv.append(cls(None, hostname, ports, ip_addr=ip_addr))
        return rv

    @classmethod
    def from_dict(cls, d):
        '''
        Build an instance from a dict holding 'hostname' and 'ports' (and
        optionally 'nickname'); any other keys are copied over afterwards.

        Raises KeyError if 'hostname' or 'ports' is missing.
        '''
        # work on a copy so the caller's dict is not modified
        d = dict(d)
        n = d.pop('nickname', None)
        h = d.pop('hostname')
        p = d.pop('ports')

        rv = cls(n, h, p)
        rv.update(d)
        return rv

    @classmethod
    def from_default(cls):
        '''
        Build the entry for DEFAULT_SERVER, taking its ports from
        BOOTSTRAP_SERVERS. The server name doubles as the nickname.
        '''
        return cls(
            DEFAULT_SERVER,
            DEFAULT_SERVER,
            BOOTSTRAP_SERVERS[DEFAULT_SERVER])

    @property
    def protocols(self):
        '''Set of single-letter protocol codes found in 'ports'.'''
        rv = {i[0] for i in self['ports']}
        assert 'p' not in rv, 'pruning limit got in there'
        assert 'v' not in rv, 'version got in there'
        return rv

    @property
    def pruning_limit(self):
        '''Stored pruning limit, or 100 when the key is absent.'''
        return self.get('pruning_limit', 100)

    @property
    def hostname(self):
        '''Stored hostname, or None when the key is absent.'''
        return self.get('hostname')

    def get_port(self, for_protocol):
        '''
        Return (hostname, port number, ssl) triple for the protocol.
        Assuming only one port per host.

        for_protocol: single letter protocol code. Codes 's' and 'g' are
            reported with ssl=True.

        A 'port' key on this record, if present, wins over the 'ports' list.
        Otherwise the first matching entry of 'ports' is used; when it has
        no usable number, DEFAULT_PORTS[for_protocol] is returned instead.

        Raises StopIteration if no entry in 'ports' matches the protocol.
        '''
        assert len(for_protocol) == 1, "expect single letter code"
        use_ssl = for_protocol in ('s', 'g')

        if 'port' in self:
            return self['hostname'], int(self['port']), use_ssl

        rv = next(i for i in self['ports'] if i[0] == for_protocol)

        port = None
        if len(rv) >= 2:
            try:
                port = int(rv[1:])
            except (TypeError, ValueError):
                # unparsable port number; fall back to the default below
                pass
        port = port or DEFAULT_PORTS[for_protocol]

        return self['hostname'], port, use_ssl

    @property
    def is_onion(self):
        '''True when the hostname ends with ".onion" (case-insensitive).'''
        return self['hostname'].lower().endswith('.onion')

    def select(self, protocol='s', is_onion=None, min_prune=0):
        '''
        Predicate function for selection based on features/properties.

        True when this server offers the protocol, matches the requested
        is_onion value (None means "don't care") and has a pruning limit
        of at least min_prune.
        '''
        return ((protocol in self.protocols)
                and (self.is_onion == is_onion if is_onion is not None else True)
                and (self.pruning_limit >= min_prune))

    def __repr__(self):
        # Use .get() so that a promoted dict lacking some of the fields
        # still has a printable repr instead of raising KeyError.
        return '<ServerInfo {hostname} nick={nickname} ports="{ports}" v={version} prune={pruning_limit}>'\
            .format(**{name: self.get(name) for name in self.FIELDS})

    def __str__(self):
        # used as a dict key in a few places.
        return self['hostname'].lower()

    def __hash__(self):
        # this one-line allows use as a set member, which is really handy!
        # (dict itself is unhashable; equality is still plain dict equality)
        return hash(self['hostname'].lower())
class KnownServers(dict):
    '''
    Store a list of known servers and their port numbers, etc.

    - can add single entries
    - can read from (and write to) a JSON file for seeding/bootstrap
    - can merge in a peer list as returned by a server

    We are a dictionary, with key being the hostname (in lowercase) of the server.
    '''

    def from_json(self, fname):
        '''
        Read contents of a JSON file containing a list of servers.

        The file must hold a list of objects, each accepted by
        ServerInfo.from_dict(). Entries are added to (or replace entries
        in) this dictionary.
        '''
        with open(fname, 'rt', encoding='utf-8') as fp:
            for row in json.load(fp):
                nn = ServerInfo.from_dict(row)
                self[str(nn)] = nn

    def add_single(self, hostname, ports, nickname=None, **kws):
        '''
        Explicitly add a single entry.

        Hostname is a FQDN and ports is either a single int (assumed to be TCP port)
        or Electrum protocol/port number specification with spaces in between.
        The nickname defaults to the hostname; extra keyword arguments are
        handed on to ServerInfo.
        '''
        nickname = nickname or hostname

        self[hostname.lower()] = ServerInfo(nickname, hostname, ports, **kws)

    def add_peer_response(self, response_list):
        '''
        Update with response from Stratum (lacks the nickname value tho):

            "91.63.237.12",
            "electrum3.hachre.de",
            [ "v1.0", "p10000", "t", "s" ]

        An existing entry for the same hostname is replaced, keeping only
        its nickname. Returns the set of keys (lowercase hostnames) that
        were not known before.
        '''
        additions = set()
        for ip_addr, hostname, ports in response_list:
            if ip_addr == hostname:
                # the first field just repeats the hostname; not a real IP
                ip_addr = None

            g = self.get(hostname.lower())
            # .get(): an older entry is not guaranteed to carry a nickname
            nickname = g.get('nickname') if g else None

            here = ServerInfo(nickname, hostname, ports, ip_addr=ip_addr)
            self[str(here)] = here

            if not g:
                additions.add(str(here))

        return additions

    def save_json(self, fname='servers.json'):
        '''
        Write out to a JSON file, as a list of entries sorted by key.
        '''
        with open(fname, 'wt', encoding='utf-8') as fp:
            json.dump([self[k] for k in sorted(self)], fp, indent=1)

    def dump(self):
        '''Return the repr of every entry, one per line.'''
        return '\n'.join(repr(i) for i in self.values())

    def select(self, *args, **kws):
        '''
        Find all servers with indicated protocol support. Shuffled.

        Filter by TOR support, and pruning level. Arguments are passed
        straight to the select() method of each entry, so they can be
        given positionally or by keyword.
        '''
        lst = [i for i in self.values() if i.select(*args, **kws)]

        random.shuffle(lst)

        return lst
if __name__ == '__main__':
    # Small demo: count the known servers per protocol, with and without TOR.
    ks = KnownServers()

    #ks.from_json('servers.json')
    #print (ks.dump())

    # Relative import, matching the other import from .constants at the top
    # of this module (which only works when run as part of the package).
    from .constants import PROTOCOL_CODES

    print("%3d: servers in total" % len(ks))

    for tor in [False, True]:
        for pp in PROTOCOL_CODES.keys():
            # protocol must go by keyword: KnownServers.select() forwards
            # keyword arguments to each entry's select()
            ll = ks.select(protocol=pp, is_onion=tor)
            print("%3d: %s" % (len(ll), PROTOCOL_CODES[pp] + (' [TOR]' if tor else '')))
# EOF