#!/usr/bin/python2 # J. Welsh, December 2019 from os import getenv, open as os_open, O_RDONLY, O_WRONLY, mkdir, mkfifo, read, write, close, stat from stat import S_ISDIR, S_ISFIFO from sys import argv, stdin, stdout, stderr, exit from socket import socket from threading import Thread, Event from binascii import a2b_hex, b2a_hex from base64 import b64encode from struct import Struct from hashlib import sha256 as _sha256 from decimal import Decimal from inspect import getdoc import errno import signal import string import json import sqlite3 from sqlite3 import IntegrityError # Safety level: scanning stops this many blocks behind tip CONFIRMS = 6 # There's no provision for handling forks/reorgs. In the event of one deeper than CONFIRMS, a heavy workaround would be: # $ sqlite3 ~/.gbw/db # sqlite> DELETE FROM output; # sqlite> DELETE FROM input; # sqlite> DELETE FROM tx; # sqlite> .exit # $ gbw-node reset # $ gbw-node scan gbw_home = getenv('HOME') + '/.gbw' bitcoin_conf_path = getenv('HOME') + '/.bitcoin/bitcoin.conf' # Further knobs in main() for database tuning. 
# Module-level SQLite connection; assigned in main() via `global db`.
db = None

# Byte-reversed hex helpers: b2lx hex-encodes a byte string in reverse order,
# lx2b decodes hex and reverses the bytes. Presumably used for Bitcoin's
# reversed ("little-endian") display of tx/block hashes.
b2lx = lambda b: b2a_hex(b[::-1])
lx2b = lambda x: a2b_hex(x)[::-1]

def format_coin(v):
    '''Format an integer amount of base units (100000000 per coin) as a
    decimal string with exactly 8 fractional digits, e.g. 150000000 ->
    '1.50000000'. Negative amounts get a leading '-'.

    NOTE(review): v must be an integer; None (e.g. an SQL SUM() over zero
    rows) is not handled and raises TypeError.
    '''
    neg = False
    if v < 0:
        # divmod on a negative value would floor toward -inf, so format the
        # magnitude and prepend the sign separately.
        v = -v
        neg = True
    s = '%d.%08d' % divmod(v, 100000000)
    if neg:
        return '-' + s
    return s

# NOTE(review): the following line is CORRUPTED in the copy under review.
# Everything between "Struct('" and "> 8" is missing. Judging by the names used
# below but not defined anywhere visible, the lost text presumably held the
# remaining Struct definitions and helpers such as a2b_base58, sha256d, rpc,
# getblock, load_block, out_script_address, b2a_base58check, require_dir,
# Conflict and BadChecksum. Restore it from version control; do not
# hand-reconstruct it.
u16 = Struct('> 8 data_bytes.extend([0] * leading_zeros) return ''.join(chr(b) for b in reversed(data_bytes))

def a2b_base58check(data):
    '''Decode a base58check string and return its payload (without the
    4-byte checksum). Raises BadChecksum if the trailing 4 bytes do not
    equal the first 4 bytes of sha256d(payload).'''
    data = a2b_base58(data)
    payload = data[:-4]
    check = data[-4:]
    if check != sha256d(payload)[:4]:
        raise BadChecksum
    return payload

class BadAddressLength(ValueError): pass
class BadAddressVersion(ValueError): pass

def parse_address(a):
    '''Parse a base58check address string into its 20-byte body.

    The decoded payload must be 21 bytes with version byte 0x00; otherwise
    BadAddressLength or BadAddressVersion (carrying the version number) is
    raised. Only version-0 addresses are accepted here.
    '''
    b = a2b_base58check(a)
    if len(b) != 21:
        raise BadAddressLength
    if b[0] != '\x00':
        raise BadAddressVersion(ord(b[0]))
    return b[1:]

def format_address(b):
    '''Inverse of parse_address: prefix version byte 0x00 and base58check-encode.'''
    return b2a_base58check('\x00' + b)

##################################################
# Common database operations

def get_address_id(a):
    '''Return address_id for the raw address bytes a, or None if not watched.'''
    # buffer() makes Python 2's sqlite3 bind the bytes as a BLOB.
    r = db.execute('SELECT address_id FROM address WHERE address=?', (buffer(a),)).fetchone()
    return None if r is None else r[0]

def insert_or_get_address_id(a):
    '''Return the address_id for a, inserting a new address row if needed.'''
    i = get_address_id(a)
    if i is not None:
        return i
    return db.execute('INSERT INTO address (address) VALUES (?)', (buffer(a),)).lastrowid

def get_tx_id(hash):
    '''Return tx_id for the given raw tx hash, or None if not recorded.'''
    r = db.execute('SELECT tx_id FROM tx WHERE hash=?', (buffer(hash),)).fetchone()
    return None if r is None else r[0]

def insert_or_get_tx_id(hash, blkhash, height, n, size):
    '''Insert a tx row and return its id; if the insert violates a constraint
    (presumably a UNIQUE hash — confirm against the schema), return the
    existing row's id instead. The other stored columns are not compared.'''
    try:
        return db.execute('INSERT INTO tx (hash, block_hash, block_height, n, size) VALUES (?,?,?,?,?)',
                (buffer(hash), buffer(blkhash), height, n, size)).lastrowid
    except IntegrityError:
        # XXX check equality?
        return get_tx_id(hash)

def insert_output(tx_id, n, addr_id, val):
    '''Record output n of tx_id paying val to addr_id.

    Re-inserting an identical row is a no-op (this makes rescans safe). If a
    row for (tx_id, n) already exists with different address/value, raise
    Conflict.
    '''
    try:
        db.execute('INSERT INTO output (tx_id, n, address_id, value) VALUES (?,?,?,?)',
                (tx_id, n, addr_id, val))
    except IntegrityError:
        r = db.execute('SELECT address_id, value FROM output WHERE tx_id=? AND n=?',
                (tx_id, n)).fetchone()
        if r != (addr_id, val):
            raise Conflict('output differs from previous content', tx_id, n, (addr_id, val), r)

def insert_input(tx_id, n, prevout_id):
    '''Record input n of tx_id and mark output prevout_id as spent by it.

    If the input row already exists, its id is reused, so a rescan simply
    re-applies the same UPDATE.
    '''
    try:
        input_id = db.execute('INSERT INTO input (tx_id, n) VALUES (?,?)', (tx_id, n)).lastrowid
    except IntegrityError:
        input_id = db.execute('SELECT input_id FROM input WHERE tx_id=? AND n=?', (tx_id, n)).fetchone()[0]
    # output.spent holds the spending input_id; NULL means unspent.
    db.execute('UPDATE output SET spent=? WHERE output_id=?', (input_id, prevout_id))

def get_output_id(tx_id, n):
    '''Return output_id of output n of tx_id, or None if it is not recorded.'''
    r = db.execute('SELECT output_id FROM output WHERE tx_id=? AND n=?', (tx_id, n)).fetchone()
    return None if r is None else r[0]

def get_tag_id(name):
    '''Return tag_id for the tag name, or None if no such tag exists.'''
    r = db.execute('SELECT tag_id FROM tag WHERE name=?', (name,)).fetchone()
    return None if r is None else r[0]

def insert_or_get_tag_id(name):
    '''Return the tag_id for name, creating the tag if needed.'''
    i = get_tag_id(name)
    if i is not None:
        return i
    return db.execute('INSERT INTO tag (name) VALUES (?)', (name,)).lastrowid

##################################################
# Command implementations

def scan_block(height, v):
    '''Index one block: record outputs paying watched addresses, then
    inputs spending any previously recorded output. Writes a one-line
    progress summary to stdout. v is the raw block data handed to
    load_block (a memoryview in cmd_scan).'''
    stdout.write('block %s' % height)
    # [perf] computing every tx hash
    # NOTE(review): `size` here is rebound by the loops below (per-tx size);
    # the block-level value is unused.
    (blkhash, prev, time, target, txs), size = load_block(v)
    count_out = 0
    # n_tx is the transaction's index within the block (stored as tx.n).
    n_tx = 0
    for (hash, size, txins, txouts) in txs:
        matched_outs = []
        for n, txout in enumerate(txouts):
            val, script = txout
            a = out_script_address(script)
            if a is not None:
                #print format_address(a)
                addr_id = get_address_id(a)
                if addr_id is not None:
                    matched_outs.append((n, addr_id, val))
        if len(matched_outs) > 0:
            # Only transactions touching watched addresses get a tx row.
            tx_id = insert_or_get_tx_id(hash, blkhash, height, n_tx, size)
            for n, addr_id, val in matched_outs:
                insert_output(tx_id, n, addr_id, val)
            count_out += len(matched_outs)
        n_tx += 1
    stdout.write(' new-outs %s' % count_out)
    # Inputs scanned second in case an output from the same block is spent.
    # Coinbase (input of first tx in block) doesn't reference anything.
    count_in = 0
    n_tx = 1
    for (hash, size, txins, txouts) in txs[1:]:
        matched_ins = []
        for n, txin in enumerate(txins):
            prevout_hash, prevout_n, scriptsig = txin
            # Only outputs already recorded (i.e. paying a watched address) match.
            prevout_tx_id = get_tx_id(prevout_hash)
            if prevout_tx_id is not None:
                prevout_id = get_output_id(prevout_tx_id, prevout_n)
                if prevout_id is not None:
                    matched_ins.append((n, prevout_id))
        if len(matched_ins) > 0:
            tx_id = insert_or_get_tx_id(hash, blkhash, height, n_tx, size)
            for n, prevout_id in matched_ins:
                insert_input(tx_id, n, prevout_id)
            count_in += len(matched_ins)
        n_tx += 1
    stdout.write(' spent-outs %s\n' % count_in)

def die(msg, help=False):
    '''Print "gbw-node: msg" to stderr, optionally the usage text, and exit
    with status -1. Never returns.'''
    stderr.write('gbw-node: %s\n' % msg)
    if help:
        cmd_help([])
    exit(-1)

def require_tag(name):
    '''Return the tag_id for name, or die() if the tag does not exist.'''
    i = get_tag_id(name)
    if i is None:
        die('tag not found: %r' % name)
    return i

# The cmd_* docstrings below are user-facing: cmd_help prints them via getdoc().
def cmd_scan(argv):
    '''
    scan

    Iterate blocks from bitcoind, indexing transaction inputs and outputs affecting watched addresses. May be safely interrupted and resumed.

    NOT PRESENTLY SAFE TO RUN CONCURRENT INSTANCES due to the dumpblock to named pipe kludge.
    '''
    db.execute('PRAGMA synchronous=NORMAL')
    height = db.execute('SELECT scan_height FROM state').fetchone()[0]
    # Stop CONFIRMS blocks short of the tip; max(-1, ...) keeps the target
    # sane on a chain shorter than CONFIRMS.
    blockcount = max(-1, rpc('getblockcount') - CONFIRMS)
    while height < blockcount:
        height += 1
        scan_block(height, memoryview(getblock(height)))
        # Commit per block together with the new scan_height; this is what
        # makes interruption and resumption safe.
        db.execute('UPDATE state SET scan_height = ?', (height,))
        db.commit()

def cmd_reset(argv):
    '''
    reset

    Reset the scan pointer so the next scan will proceed from the genesis block, to find transactions associated with newly watched addresses.
    '''
    # -1 so that cmd_scan's first iteration scans height 0.
    db.execute('UPDATE state SET scan_height = -1')
    db.commit()

def cmd_tags(argv):
    '''
    tags

    List all tag names.
    '''
    for name, in db.execute('SELECT name FROM tag'):
        stdout.write(name + '\n')

def cmd_addresses(argv):
    '''
    addresses [TAG]

    List addresses with the given TAG (or all watched addresses).
    '''
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        r = db.execute('SELECT address FROM address \
                JOIN address_tag ON address.address_id=address_tag.address_id \
                WHERE tag_id=?', (tag_id,))
    else:
        r = db.execute('SELECT address FROM address')
    for a, in r:
        # str() converts the BLOB buffer back to a byte string.
        stdout.write(format_address(str(a)) + '\n')

def cmd_unspent_outs(argv):
    '''
    unspent-outs [TAG]

    Display the unspent outputs table for addresses with the given TAG (or all watched addresses), as required by the offline wallet, ordered by age.
    '''
    # Unspent means output.spent IS NULL. Rows are newest first (block_height DESC).
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        r = db.execute('SELECT address, value, hash, output.n, block_height, tx.n FROM output \
                JOIN address ON output.address_id = address.address_id \
                JOIN tx ON output.tx_id = tx.tx_id \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE spent IS NULL AND tag_id=? \
                ORDER BY block_height DESC', (tag_id,))
    else:
        r = db.execute('SELECT address, value, hash, output.n, block_height, tx.n FROM output \
                JOIN address ON output.address_id = address.address_id \
                JOIN tx ON output.tx_id = tx.tx_id \
                WHERE spent IS NULL \
                ORDER BY block_height DESC')
    for a, v, hash, n_out, height, n_tx in r:
        stdout.write('%s %s %s %s #blk %s tx %s\n' % (format_address(str(a)), format_coin(v), b2lx(hash), n_out, height, n_tx))

def cmd_balance(argv):
    '''
    balance [TAG]

    Display confirmed balance of addresses with the given TAG (or all watched addresses).
    '''
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        r = db.execute('SELECT SUM(value) FROM output \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE spent IS NULL AND tag_id=?', (tag_id,))
    else:
        r = db.execute('SELECT SUM(value) FROM output WHERE spent IS NULL')
    # NOTE(review): SQLite's SUM() returns NULL (None here) when no rows
    # match, and format_coin(None) raises TypeError. Consider
    # COALESCE(SUM(value), 0) so an empty wallet prints 0.00000000.
    bal, = r.fetchone()
    stdout.write('%s\n' % format_coin(bal))

def cmd_register(argv):
    '''
    register [TAG]

    Display a tab-delimited transaction register report for addresses with the given TAG (or all watched addresses). Columns are:
    - confirmation block height
    - number of transaction within block
    - total deposits (new outputs)
    - total withdrawals (spent outputs)
    - running balance
    '''
    # outs: per transaction, the total value it paid to the selected addresses.
    # ins:  per transaction, the total value of selected-address outputs it
    #       spent (input joined to the output whose `spent` points at it).
    # Both are ordered by (block_height, tx.n), as merge_moves requires.
    if len(argv) > 0:
        tag_id = require_tag(argv.pop(0))
        outs = db.execute('SELECT block_height, tx.n, SUM(value) FROM tx \
                JOIN output ON output.tx_id = tx.tx_id \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE tag_id=? \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n', (tag_id,))
        ins = db.execute('SELECT block_height, tx.n, SUM(value) FROM tx \
                JOIN input ON input.tx_id = tx.tx_id \
                JOIN output ON input.input_id = output.spent \
                JOIN address_tag ON output.address_id = address_tag.address_id \
                WHERE tag_id=? \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n', (tag_id,))
    else:
        outs = db.execute('SELECT block_height, tx.n, SUM(value) FROM tx \
                JOIN output ON output.tx_id = tx.tx_id \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n')
        ins = db.execute('SELECT block_height, tx.n, SUM(value) FROM tx \
                JOIN input ON input.tx_id = tx.tx_id \
                JOIN output ON input.input_id = output.spent \
                GROUP BY tx.tx_id \
                ORDER BY block_height, tx.n')
    bal = 0
    for height, n, o_val, i_val in merge_moves(outs.fetchall(), ins.fetchall()):
        bal = bal + o_val - i_val
        # Withdrawals are shown as negative amounts.
        stdout.write('%s\t%s\t%s\t%s\t%s\n' % (height, n, format_coin(o_val), format_coin(-i_val), format_coin(bal)))

# Merge ordered lists of total input and output values per transaction into single table with columns for both.
def merge_moves(outs, ins): i = o = 0 while True: if o == len(outs): for height, n, val in ins[i:]: yield (height, n, 0, val) return o_height, o_n, o_val = outs[o] o_key = (o_height, o_n) if i == len(ins): for height, n, val in outs[o:]: yield (height, n, val, 0) return i_height, i_n, i_val = ins[i] i_key = (i_height, i_n) if o_key < i_key: yield (o_height, o_n, o_val, 0) o += 1 elif i_key < o_key: yield (i_height, i_n, 0, i_val) i += 1 else: yield (o_height, o_n, o_val, i_val) i += 1 o += 1 def cmd_watch(argv): ''' watch [TAG] Import a set of addresses to watch linewise from stdin, optionally named by the given TAG. Addresses can be associated with multiple tags using multiple watch commands. ''' tag_id = None if len(argv) > 0: name = argv.pop(0) if '\n' in name: die('newline not allowed in tag name') tag_id = insert_or_get_tag_id(name) while True: l = stdin.readline() if len(l) == 0: break addr_id = insert_or_get_address_id(parse_address(l.rstrip('\n'))) if tag_id is not None: try: db.execute('INSERT INTO address_tag (address_id, tag_id) VALUES (?,?)', (addr_id, tag_id)) except IntegrityError: pass db.commit() def cmd_push(argv): ''' push Import raw hex transactions linewise from stdin and send to bitcoind. ''' while True: line = stdin.readline() if len(line) == 0: break tx_hex = line.rstrip('\n') stdout.write('txid %s\n' % rpc('sendrawtransaction', tx_hex)) def cmd_help(argv): ''' help [COMMAND] Display help for a given command or list all commands. 
''' if len(argv) > 0: name = argv.pop(0) name, func = get_command(name) doc = getdoc(func) if doc is None: stdout.write('No help for %r\n' % name) else: stdout.write('gbw-node %s\n' % doc) else: stdout.write('''Usage: gbw-node COMMAND [ARGS] Available commands (can be abbreviated when unambiguous): %s ''' % '\n'.join([name for name, f in cmds])) cmds = ( ('help', cmd_help), ('scan', cmd_scan), ('reset', cmd_reset), ('tags', cmd_tags), ('addresses', cmd_addresses), ('unspent-outs', cmd_unspent_outs), ('watch', cmd_watch), ('push', cmd_push), ('balance', cmd_balance), ('register', cmd_register), ) def get_command(name): rows = [r for r in cmds if r[0].startswith(name)] if len(rows) == 0: die('command not found: %s' % name) if len(rows) > 1: die('ambiguous command %s. Completions: %s' % (name, ' '.join([r[0] for r in rows]))) return rows[0] def main(): global db signal.signal(signal.SIGINT, signal.SIG_DFL) require_dir(gbw_home) db = sqlite3.connect(gbw_home + '/db', timeout=600) # in seconds db.execute('PRAGMA foreign_keys=ON') db.execute('PRAGMA cache_size=-8000') # negative means in KiB db.execute('PRAGMA wal_autocheckpoint=10000') # in pages (4k) if len(argv) < 2: die('missing command', help=True) get_command(argv[1])[1](argv[2:]) if __name__ == '__main__': main()