Source code for bitcoincash.electrum.client

#
# Client connect to an Electrum server.
#

# Runtime check for optional modules
from importlib import util as importutil
import json, warnings, asyncio, ssl
from .protocol import StratumProtocol
from .svr_info import ServerInfo

# Check if aiosocks is present, and load it if it is.
if importutil.find_spec("aiosocks") is not None:
    import aiosocks
    have_aiosocks = True
else:
    have_aiosocks = False

from collections import defaultdict
from .exc import ElectrumErrorResponse
import logging

logger = logging.getLogger(__name__)

[docs]class StratumClient: def __init__(self, loop=None): ''' Setup state needed to handle req/resp from a single Stratum server. Requires a transport (TransportABC) object to do the communication. ''' self.protocol = None self.next_id = 1 self.inflight = {} self.subscriptions = defaultdict(list) self.actual_connection = {} self.ka_task = None self.loop = loop or asyncio.get_event_loop() self.reconnect = None # call connect() first # next step: call connect() def _connection_lost(self, protocol): # Ignore connection_lost for old connections if protocol is not self.protocol: return self.protocol = None logger.warn("Electrum server connection lost") # cleanup keep alive task if self.ka_task: self.ka_task.cancel() self.ka_task = None
[docs] async def close(self): if self.protocol: self.protocol.close() # give the transport a moment to close await asyncio.sleep(0.1) self.protocol = None if self.ka_task: self.ka_task.cancel() try: await self.ka_task except asyncio.CancelledError: pass self.ka_task = None
[docs] async def connect(self, server_info=None, proto_code=None, *, use_tor=False, disable_cert_verify=False, proxy=None, short_term=False): ''' Start connection process. Destination must be specified in a ServerInfo() record (first arg). ''' if server_info is None: server_info = ServerInfo.from_default() self.server_info = server_info if not proto_code: proto_code,*_ = server_info.protocols self.proto_code = proto_code logger.debug("Connecting to: %r" % server_info) if proto_code == 'g': # websocket # to do this, we'll need a websockets implementation that # operates more like a asyncio.Transport # maybe: `asyncws` or `aiohttp` raise NotImplementedError('sorry no WebSocket transport yet') hostname, port, use_ssl = server_info.get_port(proto_code) if use_tor: if have_aiosocks: # Connect via Tor proxy proxy, assumed to be on localhost:9050 # unless a tuple is given with another host/port combo. try: socks_host, socks_port = use_tor except TypeError: socks_host, socks_port = 'localhost', 9050 # basically no-one has .onion SSL certificates, and # pointless anyway. disable_cert_verify = True assert not proxy, "Sorry not yet supporting proxy->tor->dest" logger.debug(" .. using TOR") proxy = aiosocks.Socks5Addr(socks_host, int(socks_port)) else: logger.debug("Error: want to use tor, but no aiosocks module.") if use_ssl == True and disable_cert_verify: # Create a more liberal SSL context that won't # object to self-signed certicates. This is # very bad on public Internet, but probably ok # over Tor use_ssl = ssl.create_default_context() use_ssl.check_hostname = False use_ssl.verify_mode = ssl.CERT_NONE logger.debug(" .. SSL cert check disabled") async def _reconnect(): if self.protocol: return # race/duplicate work if proxy: if have_aiosocks: transport, protocol = await aiosocks.create_connection( StratumProtocol, proxy=proxy, proxy_auth=None, remote_resolve=True, ssl=use_ssl, dst=(hostname, port)) else: logger.debug("Error: want to use proxy, but no aiosocks module.") else: transport, protocol = await self.loop.create_connection( StratumProtocol, host=hostname, port=port, ssl=use_ssl) self.protocol = protocol protocol.client = self # capture actual values used self.actual_connection = dict(hostname=hostname, port=int(port), ssl=bool(use_ssl), tor=bool(proxy)) self.actual_connection['ip_addr'] = transport.get_extra_info('peername', default=['unknown'])[0] if not short_term: self.ka_task = self.loop.create_task(self._keepalive()) logger.debug("Connected to: %r" % server_info) # close whatever we had if self.protocol: self.protocol.close() self.protocol = None self.reconnect = _reconnect await self.reconnect()
async def _keepalive(self): ''' Keep our connect to server alive forever, with some pointless traffic. ''' while self.protocol: vers = await self.RPC('server.version') logger.debug("Server version: " + repr(vers)) await asyncio.sleep(100) def _send_request(self, method, params=[], is_subscribe = False): ''' Send a new request to the server. Serialized the JSON and tracks id numbers and optional callbacks. ''' # pick a new ID self.next_id += 1 req_id = self.next_id # serialize as JSON msg = {'id': req_id, 'method': method, 'params': params} # subscriptions are a Q, normal requests are a future if is_subscribe: waitQ = asyncio.Queue() self.subscriptions[method].append(waitQ) fut = asyncio.Future(loop=self.loop) self.inflight[req_id] = (msg, fut) # send it via the transport, which serializes it if not self.protocol: logger.debug("Need to reconnect to server") async def connect_first(): await self.reconnect() self.protocol.send_data(msg) self.loop.create_task(connect_first()) else: # typical case, send request immediatedly, response is a future self.protocol.send_data(msg) return fut if not is_subscribe else (fut, waitQ) def _got_response(self, msg): ''' Decode and dispatch responses from the server. Has already been unframed and deserialized into an object. ''' #logger.debug("MSG: %r" % msg) resp_id = msg.get('id', None) if resp_id is None: # subscription traffic comes with method set, but no req id. method = msg.get('method', None) if not method: logger.error("Incoming server message had no ID nor method in it", msg) return # not obvious, but result is on params, not result, for subscriptions result = msg.get('params', None) logger.debug("Traffic on subscription: %s" % method) subs = self.subscriptions.get(method) for q in subs: self.loop.create_task(q.put(result)) return assert 'method' not in msg result = msg.get('result') # fetch and forget about the request inf = self.inflight.pop(resp_id) if not inf: logger.error("Incoming server message had unknown ID in it: %s" % resp_id) return # it's a future which is done now req, rv = inf if 'error' in msg: err = msg['error'] logger.info("Error response: '%s'" % err) rv.set_exception(ElectrumErrorResponse(err, req)) else: rv.set_result(result)
[docs] def RPC(self, method, *params): ''' Perform a remote command. Expects a method name, which look like: blockchain.address.get_balance .. and sometimes take arguments, all of which are positional. Returns a future which will you should await for the result from the server. Failures are returned as exceptions. ''' assert '.' in method #assert not method.endswith('subscribe') return self._send_request(method, params)
[docs] def subscribe(self, method, *params): ''' Perform a remote command which will stream events/data to us. Expects a method name, which look like: server.peers.subscribe .. and sometimes take arguments, all of which are positional. Returns a tuple: (Future, asyncio.Queue). The future will have the result of the initial call, and the queue will receive additional responses as they happen. ''' assert '.' in method assert method.endswith('subscribe') return self._send_request(method, params, is_subscribe=True)
if __name__ == '__main__': #from transport import SocketTransport from svr_info import KnownServers, ServerInfo logging.basicConfig(format="%(asctime)-11s %(message)s", datefmt="[%d/%m/%Y-%H:%M:%S]") loop = asyncio.get_event_loop() loop.set_debug(True) c = StratumClient(loop=loop) loop.run_until_complete(c.connect()) rv = loop.run_until_complete(c.RPC('server.peers.subscribe')) print("DONE!: this server has %d peers" % len(rv)) loop.run_until_complete(c.close()) loop.close()