#!/usr/bin/python2
# J. Welsh, December 2019

from os import getenv, open as os_open, O_RDONLY, O_WRONLY, mkdir, mkfifo, read, write, close, stat
from stat import S_ISDIR, S_ISFIFO
from sys import argv, stdin, stdout, stderr, exit
from socket import socket
from threading import Thread, Event
from binascii import a2b_hex, b2a_hex
from base64 import b64encode
from struct import Struct
from hashlib import sha256 as _sha256
from decimal import Decimal
from inspect import getdoc
from getpass import getpass
import errno
import signal
import string
import json
import sqlite3
from sqlite3 import IntegrityError

# Safety level: scanning stops this many blocks behind tip
CONFIRMS = 6
# There's no provision for handling forks/reorgs. In the event of one deeper
# than CONFIRMS, a heavy workaround would be:
#   $ sqlite3 ~/.gbw/db
#   sqlite> DELETE FROM output;
#   sqlite> DELETE FROM input;
#   sqlite> DELETE FROM tx;
#   sqlite> .exit
#   $ gbw-node reset
#   $ gbw-node scan

# Per-user state directory (main() opens the database at gbw_home + '/db').
gbw_home = getenv('HOME') + '/.gbw'
# Path of the bitcoind configuration file under the same home directory.
bitcoin_conf_path = getenv('HOME') + '/.bitcoin/bitcoin.conf'
# Further knobs in main() for database tuning.
# Module-wide sqlite3 connection; assigned by main() before any command runs.
db = None

# Byte string <-> hex text, with the byte order reversed on the way through.
b2lx = lambda b: b2a_hex(b[::-1])
lx2b = lambda x: a2b_hex(x)[::-1]

# Number of base units per coin; format_coin prints 8 decimal places to match.
COIN = 10**8

# Render an integer amount of base units as a fixed-point decimal string with
# exactly eight fractional digits, e.g. 123456789 -> '1.23456789'.
def format_coin(v):
    neg = False
    if v < 0:
        # Format the magnitude and re-attach the sign afterwards, so that
        # divmod's floor behaviour on negatives doesn't distort the digits.
        v = -v
        neg = True
    s = '%d.%08d' % divmod(v, COIN)
    if neg:
        return '-' + s
    return s

# NOTE(review): the source text is corrupted at this point. Everything between
# "u16 = Struct('" and "> 8 data_bytes.extend(" is missing (it looks as though
# a span beginning at a '<' character was stripped during extraction -- TODO
# confirm and restore from a pristine copy). The surviving residue, verbatim:
#
#   u16 = Struct('> 8 data_bytes.extend([0] * leading_zeros) return ''.join(chr(b) for b in reversed(data_bytes))
#
# The following names are used below but have no definition in the surviving
# text, so they presumably lived in the lost span: Conflict, BadChecksum,
# sha256d, a2b_base58, b2a_base58check, out_script_address, load_block, rpc,
# getblock, require_dir.

# Decode a base58check string: the last four bytes of the decoded data must
# equal the first four bytes of sha256d(payload). Returns the payload without
# the checksum; raises BadChecksum on mismatch.
def a2b_base58check(data):
    data = a2b_base58(data)
    payload = data[:-4]
    check = data[-4:]
    if check != sha256d(payload)[:4]:
        raise BadChecksum
    return payload

class BadAddressLength(ValueError):
    pass

class BadAddressVersion(ValueError):
    pass

# Decode a textual address to its 20-byte body. The decoded payload must be
# 21 bytes with a leading version byte of 0x00; otherwise BadAddressLength or
# BadAddressVersion (carrying the offending version number) is raised.
def parse_address(a):
    b = a2b_base58check(a)
    if len(b) != 21:
        raise BadAddressLength
    if b[0] != '\x00':
        raise BadAddressVersion(ord(b[0]))
    return b[1:]

# Inverse of parse_address: prepend version byte 0x00 and base58check-encode.
def format_address(b):
    return b2a_base58check('\x00' + b)

##################################################
# Common database operations

# Return the address_id for a raw address, or None if it is not in the table.
def get_address_id(a):
    r = db.execute('SELECT address_id FROM address WHERE address=?', (buffer(a),)).fetchone()
    return None if r is None else r[0]

# Insert a raw address and return its new row id; if the insert is rejected
# with IntegrityError (presumably a uniqueness constraint -- schema not
# visible here), return the id of the existing row instead.
def insert_or_get_address_id(a):
    try:
        return db.execute('INSERT INTO address (address) VALUES (?)', (buffer(a),)).lastrowid
    except IntegrityError:
        return get_address_id(a)

# Return the tx_id for a transaction hash, or None if it is not in the table.
def get_tx_id(hash):
    r = db.execute('SELECT tx_id FROM tx WHERE hash=?', (buffer(hash),)).fetchone()
    return None if r is None else r[0]

# Insert a transaction row and return its id. If the insert is rejected, the
# existing row with the same hash is returned provided its other columns match
# exactly; otherwise Conflict is raised.
def insert_or_get_tx_id(hash, blkhash, height, n, size):
    try:
        return db.execute('INSERT INTO tx (hash, block_hash, block_height, n, size) VALUES (?,?,?,?,?)',
                (buffer(hash), buffer(blkhash), height, n, size)).lastrowid
    except IntegrityError:
        r = db.execute('SELECT tx_id, block_hash, block_height, n, size FROM tx WHERE hash=?',
                (buffer(hash),)).fetchone()
        if r is None:
            # No row has this hash, so the rejection came from some other
            # constraint; per the message, a clash on (height, index).
            raise Conflict('differing transactions with same (height, index)', (height, n), hash)
        if r[1:] != (buffer(blkhash), height, n, size):
            raise Conflict('differing transactions with same hash', hash, (blkhash, height, n, size), r[1:])
        return r[0]

# Insert an output row. Re-inserting identical content is a no-op; differing
# content for the same (tx_id, n) raises Conflict.
def insert_output(tx_id, n, addr_id, val):
    try:
        db.execute('INSERT INTO output (tx_id, n, address_id, value) VALUES (?,?,?,?)', (tx_id, n, addr_id, val))
    except IntegrityError:
        r = db.execute('SELECT address_id, value FROM output WHERE tx_id=? AND n=?', (tx_id, n)).fetchone()
        if r != (addr_id, val):
            raise Conflict('output differs from previous content', tx_id, n, (addr_id, val), r)

# Record input n of transaction tx_id (reusing the existing row if the insert
# is rejected) and mark the referenced output as spent by that input.
def insert_input(tx_id, n, prevout_id):
    try:
        input_id = db.execute('INSERT INTO input (tx_id, n) VALUES (?,?)', (tx_id, n)).lastrowid
    except IntegrityError:
        input_id = db.execute('SELECT input_id FROM input WHERE tx_id=? AND n=?', (tx_id, n)).fetchone()[0]
    db.execute('UPDATE output SET spent=? WHERE output_id=?', (input_id, prevout_id))

# Return the output_id for output n of transaction tx_id, or None if absent.
def get_output_id(tx_id, n):
    r = db.execute('SELECT output_id FROM output WHERE tx_id=? AND n=?', (tx_id, n)).fetchone()
    return None if r is None else r[0]

# Return the tag_id for a tag name, or None if there is no such tag.
def get_tag_id(name):
    r = db.execute('SELECT tag_id FROM tag WHERE name=?', (name,)).fetchone()
    return None if r is None else r[0]

# Insert a tag and return its new row id, or the existing id if the insert is
# rejected with IntegrityError.
def insert_or_get_tag_id(name):
    try:
        return db.execute('INSERT INTO tag (name) VALUES (?)', (name,)).lastrowid
    except IntegrityError:
        return get_tag_id(name)

##################################################
# Command implementations

# Index one block: record every output paying a watched address, then every
# input spending a recorded output. Progress is written to stdout as a single
# line ('block H new-outs N spent-outs M'). Nothing is committed here.
def scan_block(height, v):
    stdout.write('block %s' % height)
    (blkhash, prev, time, target, txs), size = load_block(v)

    count_out = 0
    n_tx = 0
    for (hash, size, txins, txouts) in txs:
        matched_outs = []
        for n, txout in enumerate(txouts):
            val, script = txout
            a = out_script_address(script)
            if a is not None:
                #print format_address(a)
                addr_id = get_address_id(a)
                if addr_id is not None:
                    matched_outs.append((n, addr_id, val))
        # Only transactions touching a watched address get a tx row.
        if len(matched_outs) > 0:
            tx_id = insert_or_get_tx_id(hash, blkhash, height, n_tx, size)
            for n, addr_id, val in matched_outs:
                insert_output(tx_id, n, addr_id, val)
        count_out += len(matched_outs)
        n_tx += 1
    stdout.write(' new-outs %s' % count_out)

    # Inputs scanned second in case an output from the same block is spent.
    # Coinbase (input of first tx in block) doesn't reference anything.
    count_in = 0
    n_tx = 1
    for (hash, size, txins, txouts) in txs[1:]:
        matched_ins = []
        for n, txin in enumerate(txins):
            prevout_hash, prevout_n, scriptsig = txin
            prevout_tx_id = get_tx_id(prevout_hash)
            if prevout_tx_id is not None:
                prevout_id = get_output_id(prevout_tx_id, prevout_n)
                if prevout_id is not None:
                    matched_ins.append((n, prevout_id))
        if len(matched_ins) > 0:
            tx_id = insert_or_get_tx_id(hash, blkhash, height, n_tx, size)
            for n, prevout_id in matched_ins:
                insert_input(tx_id, n, prevout_id)
        count_in += len(matched_ins)
        n_tx += 1
    stdout.write(' spent-outs %s\n' % count_in)

# Print an error to stderr (optionally followed by the command list) and exit
# with status -1.
def die(msg, help=False):
    stderr.write('gbw-node: %s\n' % msg)
    if help:
        cmd_help([])
    exit(-1)

# Return the id of an existing tag, or exit with an error if it is unknown.
def require_tag(name):
    i = get_tag_id(name)
    if i is None:
        die('tag not found: %r' % name)
    return i

# NOTE: the docstrings of the cmd_* functions are user-facing help text: they
# are read with getdoc() and printed by cmd_help. Keep the first line in the
# form "name [ARGS]".

def cmd_scan(argv):
    '''
    scan

    Iterate blocks from bitcoind, indexing transaction inputs and outputs affecting watched addresses. May be safely interrupted and resumed.

    NOT PRESENTLY SAFE TO RUN CONCURRENT INSTANCES due to the dumpblock to named pipe kludge.
    '''
    db.execute('PRAGMA synchronous=NORMAL')
    height = db.execute('SELECT scan_height FROM state').fetchone()[0]
    # RPC calls can have high latency, so refresh the goal height upon reaching the previous rather than for each block scanned.
    last_blockcount = -1
    while True:
        blockcount = max(-1, rpc('getblockcount') - CONFIRMS)
        if blockcount == last_blockcount:
            break
        last_blockcount = blockcount
        while height < blockcount:
            height += 1
            scan_block(height, memoryview(getblock(height)))
            # Advance the scan pointer and commit once per block.
            db.execute('UPDATE state SET scan_height = ?', (height,))
            db.commit()

def cmd_reset(argv):
    '''
    reset

    Reset the scan pointer so the next scan will proceed from the genesis block, to find transactions associated with newly watched addresses.
    '''
    db.execute('UPDATE state SET scan_height = -1')
    db.commit()

def cmd_tags(argv):
    '''
    tags [ADDRESS]

    List tags for the given ADDRESS (or all tags).
    '''
    if len(argv) > 0:
        address = argv.pop(0)
        addr_id = get_address_id(parse_address(address))
        if addr_id is None:
            # Report the address the user supplied. (This message previously
            # referenced 'name', which is only bound by the loop below, so the
            # lookup failure surfaced as UnboundLocalError instead.)
            die('address not found: %r' % address)
        r = db.execute('SELECT name FROM tag \
                JOIN address_tag ON tag.tag_id=address_tag.tag_id \
                WHERE address_id=? ORDER BY name', (addr_id,))
    else:
        r = db.execute('SELECT name FROM tag ORDER BY name')
    for name, in r:
        stdout.write(name + '\n')

def cmd_addresses(argv):
    '''
    addresses [TAG]

    List addresses with the given TAG (or all watched addresses).
    '''
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        r = db.execute('SELECT address FROM address \
                JOIN address_tag ON address.address_id=address_tag.address_id \
                WHERE tag_id=? ORDER BY address', (tag_id,))
    else:
        r = db.execute('SELECT address FROM address ORDER BY address')
    for a, in r:
        stdout.write(format_address(str(a)) + '\n')

def cmd_unspent_outs(argv):
    '''
    unspent-outs [TAG]

    Display the unspent outputs table for addresses with the given TAG (or all watched addresses), as required by the offline wallet, ordered by age.
    '''
    # Both queries sort by block_height DESC, i.e. highest block first.
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        r = db.execute('SELECT address, value, hash, output.n, block_height, tx.n FROM output \
                JOIN address ON output.address_id = address.address_id \
                JOIN tx ON output.tx_id = tx.tx_id \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE spent IS NULL AND tag_id=? \
                ORDER BY block_height DESC', (tag_id,))
    else:
        r = db.execute('SELECT address, value, hash, output.n, block_height, tx.n FROM output \
                JOIN address ON output.address_id = address.address_id \
                JOIN tx ON output.tx_id = tx.tx_id \
                WHERE spent IS NULL \
                ORDER BY block_height DESC')
    for a, v, hash, n_out, height, n_tx in r:
        stdout.write('%s %s %s %s #blk %s tx %s\n' % (
                format_address(str(a)), format_coin(v), b2lx(hash), n_out, height, n_tx))

def cmd_balance(argv):
    '''
    balance [TAG]

    Display confirmed balance of addresses with the given TAG (or all watched addresses).
    '''
    # COALESCE makes the sum 0 rather than NULL when no rows match.
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        r = db.execute('SELECT COALESCE(SUM(value),0) FROM output \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE spent IS NULL AND tag_id=?', (tag_id,))
    else:
        r = db.execute('SELECT COALESCE(SUM(value),0) FROM output WHERE spent IS NULL')
    bal, = r.fetchone()
    stdout.write('%s\n' % format_coin(bal))

def cmd_register(argv):
    '''
    register [TAG]

    Display a tab-delimited transaction register report for addresses with the given TAG (or all watched addresses). Columns are:

    - confirmation block height
    - number of transaction within block
    - total deposits (new outputs)
    - total withdrawals (spent outputs)
    - running balance
    '''
    # 'outs' totals the value created per transaction; 'ins' totals the value
    # of recorded outputs each transaction spends. Both are sorted by
    # (block_height, tx.n), which merge_moves relies on.
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        outs = db.execute('SELECT block_height, tx.n, COALESCE(SUM(value),0) FROM tx \
                JOIN output ON output.tx_id = tx.tx_id \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE tag_id=? \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n', (tag_id,))
        ins = db.execute('SELECT block_height, tx.n, COALESCE(SUM(value),0) FROM tx \
                JOIN input ON input.tx_id = tx.tx_id \
                JOIN output ON input.input_id = output.spent \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE tag_id=? \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n', (tag_id,))
    else:
        outs = db.execute('SELECT block_height, tx.n, COALESCE(SUM(value),0) FROM tx \
                JOIN output ON output.tx_id = tx.tx_id \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n')
        ins = db.execute('SELECT block_height, tx.n, COALESCE(SUM(value),0) FROM tx \
                JOIN input ON input.tx_id = tx.tx_id \
                JOIN output ON input.input_id = output.spent \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n')
    bal = 0
    for height, n, o_val, i_val in merge_moves(outs.fetchall(), ins.fetchall()):
        bal = bal + o_val - i_val
        stdout.write('%s\t%s\t%s\t%s\t%s\n' % (height, n, format_coin(o_val), format_coin(-i_val), format_coin(bal)))

# Merge ordered lists of total input and output values per transaction into single table with columns for both.
#
# outs and ins are sequences of (height, n, value) sorted by (height, n).
# Yields (height, n, out_value, in_value) in the same order, with 0 filled in
# for whichever side has no entry for that transaction.
def merge_moves(outs, ins):
    i = o = 0
    while True:
        if o == len(outs):
            # Outputs exhausted: the rest are input-only rows.
            for height, n, val in ins[i:]:
                yield (height, n, 0, val)
            return
        o_height, o_n, o_val = outs[o]
        o_key = (o_height, o_n)
        if i == len(ins):
            # Inputs exhausted: the rest are output-only rows.
            for height, n, val in outs[o:]:
                yield (height, n, val, 0)
            return
        i_height, i_n, i_val = ins[i]
        i_key = (i_height, i_n)
        if o_key < i_key:
            yield (o_height, o_n, o_val, 0)
            o += 1
        elif i_key < o_key:
            yield (i_height, i_n, 0, i_val)
            i += 1
        else:
            # Same transaction on both sides: emit one combined row.
            yield (o_height, o_n, o_val, i_val)
            i += 1
            o += 1

def cmd_watch(argv):
    '''
    watch [TAG]

    Import a set of addresses to watch linewise from stdin, optionally named by the given TAG. Addresses can be associated with multiple tags using multiple watch commands.
    '''
    tag_id = None
    if len(argv) > 0:
        name = argv.pop(0)
        # Tag names are listed one per line by 'tags', so forbid newlines.
        if '\n' in name:
            die('newline not allowed in tag name')
        tag_id = insert_or_get_tag_id(name)
    while True:
        l = stdin.readline()
        if len(l) == 0:
            break
        addr_id = insert_or_get_address_id(parse_address(l.rstrip('\n')))
        if tag_id is not None:
            try:
                db.execute('INSERT INTO address_tag (address_id, tag_id) VALUES (?,?)', (addr_id, tag_id))
            except IntegrityError:
                # Deliberately ignored so that re-importing an address under
                # the same tag is harmless.
                pass
    db.commit()

def cmd_push(argv):
    '''
    push

    Import raw hex transactions linewise from stdin and send to bitcoind.
    '''
    while True:
        line = stdin.readline()
        if len(line) == 0:
            break
        tx_hex = line.rstrip('\n')
        stdout.write('txid %s\n' % rpc('sendrawtransaction', tx_hex))

def cmd_unlock_wallet(argv):
    '''
    unlock-wallet [TIMEOUT]

    Read encryption passphrase from the terminal and unlock the bitcoind internal wallet for TIMEOUT seconds (default 60).
    '''
    timeout = int(argv.pop(0)) if len(argv) > 0 else 60
    r = rpc('walletpassphrase', getpass('Passphrase: '), timeout)
    if r is not None:
        stdout.write('%r\n' % r)
    else:
        stdout.write('Cached for %s seconds\n' % timeout)

def cmd_help(argv):
    '''
    help [COMMAND]

    Display help for a given command or list all commands.
    '''
    if len(argv) > 0:
        name = argv.pop(0)
        # get_command resolves abbreviations, so report the full name.
        name, func = get_command(name)
        doc = getdoc(func)
        if doc is None:
            stdout.write('No help for %r\n' % name)
        else:
            stdout.write('gbw-node %s\n' % doc)
    else:
        stdout.write('''Usage: gbw-node COMMAND [ARGS]

Available commands (can be abbreviated when unambiguous):
%s
''' % '\n'.join([name for name, f in cmds]))

# Command table: (name, implementation) pairs, in the order 'help' lists them.
cmds = (
    ('help', cmd_help),
    ('scan', cmd_scan),
    ('reset', cmd_reset),
    ('tags', cmd_tags),
    ('addresses', cmd_addresses),
    ('unspent-outs', cmd_unspent_outs),
    ('watch', cmd_watch),
    ('push', cmd_push),
    ('balance', cmd_balance),
    ('register', cmd_register),
    ('unlock-wallet', cmd_unlock_wallet),
)

# Resolve a possibly abbreviated command name to its (name, function) entry.
# Exits via die() when the prefix matches no command or more than one.
def get_command(name):
    rows = [r for r in cmds if r[0].startswith(name)]
    if len(rows) == 0:
        die('command not found: %s' % name)
    if len(rows) > 1:
        die('ambiguous command %s. Completions: %s' % (name, ' '.join([r[0] for r in rows])))
    return rows[0]

# Entry point: open the database under gbw_home, apply tuning pragmas, then
# dispatch argv[1] (abbreviations allowed) with the remaining arguments.
def main():
    global db
    # Restore the default SIGINT action so Ctrl-C terminates the process
    # directly rather than raising KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    require_dir(gbw_home)
    db = sqlite3.connect(gbw_home + '/db', timeout=600) # in seconds
    db.execute('PRAGMA foreign_keys=ON')
    db.execute('PRAGMA cache_size=-8000') # negative means in KiB
    db.execute('PRAGMA wal_autocheckpoint=10000') # in pages (4k)
    if len(argv) < 2:
        die('missing command', help=True)
    get_command(argv[1])[1](argv[2:])

if __name__ == '__main__':
    main()