Skip to content

evrmore-rpc: Python Client for Evrmore Blockchain

PyPI version License: MIT Python Versions

A high-performance, fully featured Python client for the Evrmore blockchain. Designed for both synchronous and asynchronous environments, it includes full RPC and ZMQ support, automatic decoding of blocks and transactions, intelligent asset detection, and robust configuration options.


๐Ÿš€ Features

  • ๐Ÿ”„ Context-Aware: Automatically switches between sync and async modes
  • โš™๏ธ Flexible Configuration: Load settings from evrmore.conf, env vars, or manual args
  • ๐Ÿ’ก Smart RPC Handling: Full method coverage with type hints and structured responses
  • โšก Fast + Efficient: Connection pooling for low-latency concurrent RPC calls
  • ๐Ÿง  Asset Intelligence: Auto-parses asset transactions with enhanced metadata
  • ๐Ÿ“ก ZMQ Notifications: Real-time support for BLOCK, TX, HASH_, RAW_, and asset events
  • ๐Ÿงฐ Fully Tested Utilities: Includes stress test, coverage audit, pooling demo, and more

๐Ÿ“ฆ Installation

pip install evrmore-rpc

๐Ÿงช Quick Start

from evrmore_rpc import EvrmoreClient

client = EvrmoreClient()
info = client.getblockchaininfo()
print("Height:", info['blocks'])
print("Difficulty:", info['difficulty'])

๐Ÿ” Asynchronous Usage

import asyncio
from evrmore_rpc import EvrmoreClient

async def main():
    client = EvrmoreClient()
    info = await client.getblockchaininfo()
    print("Height:", info['blocks'])
    await client.close()

asyncio.run(main())

๐Ÿงฉ Configuration Options

# Default (evrmore.conf)
client = EvrmoreClient()

# Env vars (EVR_RPC_*)
client = EvrmoreClient()

# Manual args
client = EvrmoreClient(url="http://localhost:8819", rpcuser="user", rpcpassword="pass")

# Testnet toggle
client = EvrmoreClient(testnet=True)

Supports cookie authentication and auto-parsing of .cookie file.


๐Ÿ’ฐ Asset Support

# Get asset info
info = client.getassetdata("MYTOKEN")
print(info['amount'], info['reissuable'])

# Transfer asset
txid = client.transfer("MYTOKEN", 100, "EVRAddress")

๐Ÿ“ก ZMQ Notifications (Real-Time)

ZMQ Notifications Guide

This guide covers how to use ZeroMQ (ZMQ) notifications with the evrmore-rpc package to receive real-time blockchain updates.

Overview

The EvrmoreZMQClient provides a seamless interface for receiving real-time blockchain notifications via ZeroMQ. It supports:

  • Automatic context detection (sync/async)
  • Enhanced asset metadata in decoded transactions
  • Automatic reconnection on connection loss
  • Clean shutdown and resource management
  • Typed notification data with structured fields

Configuration

Required evrmore.conf Settings

Add these lines to your evrmore.conf:

zmqpubhashblock=tcp://127.0.0.1:28332
zmqpubhashtx=tcp://127.0.0.1:28332
zmqpubrawblock=tcp://127.0.0.1:28332
zmqpubrawtx=tcp://127.0.0.1:28332

Client Setup

from evrmore_rpc import EvrmoreClient
from evrmore_rpc.zmq import EvrmoreZMQClient, ZMQTopic

# Create RPC client for auto-decoding
rpc = EvrmoreClient()

# Create ZMQ client
zmq = EvrmoreZMQClient(
    zmq_host="127.0.0.1",
    zmq_port=28332,
    topics=[ZMQTopic.BLOCK, ZMQTopic.TX],
    rpc_client=rpc,
    auto_decode=True
)

Available Topics

Topic Description Requires RPC
HASH_BLOCK Block hash notifications No
HASH_TX Transaction hash notifications No
RAW_BLOCK Raw serialized block data No
RAW_TX Raw serialized transaction data No
BLOCK Auto-decoded block data Yes
TX Auto-decoded transaction data Yes

Event Handlers

Block Notifications

@zmq.on(ZMQTopic.BLOCK)
def handle_block(notification):
    print(f"Block #{notification.height}")
    print(f"Hash: {notification.hex}")
    print(f"Transactions: {len(notification.block['tx'])}")

    # Access block data
    block = notification.block
    print(f"Size: {block.get('size', 'N/A')} bytes")
    print(f"Time: {block.get('time', 'N/A')}")
    print(f"Difficulty: {block.get('difficulty', 'N/A')}")

Transaction Notifications

@zmq.on(ZMQTopic.TX)
def handle_tx(notification):
    print(f"TX {notification.tx['txid']}")
    print(f"Size: {notification.tx.get('size', 'N/A')} bytes")
    print(f"Version: {notification.tx.get('version', 'N/A')}")

    # Check for asset data
    if notification.has_assets:
        print("\nAsset Information:")
        for asset in notification.asset_info:
            print(f"Asset: {asset.get('name', 'N/A')}")
            print(f"Amount: {asset.get('amount', 'N/A')}")
            print(f"Type: {asset.get('type', 'N/A')}")

Usage Examples

Synchronous Usage

import signal
import sys
from evrmore_rpc.zmq import EvrmoreZMQClient, ZMQTopic

# Initialize client
zmq = EvrmoreZMQClient()
zmq.set_lingering(0)  # For fast shutdown

# Handle Ctrl+C
def signal_handler(sig, frame):
    print("\nShutting down...")
    zmq.stop()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

# Register handlers
@zmq.on(ZMQTopic.BLOCK)
def handle_block(notification):
    print(f"Block #{notification.height}")

@zmq.on(ZMQTopic.TX)
def handle_tx(notification):
    print(f"TX {notification.tx['txid']}")

# Start the client
zmq.start()

# Keep running
while True:
    import time
    time.sleep(1)

Asynchronous Usage

import asyncio
from evrmore_rpc.zmq import EvrmoreZMQClient, ZMQTopic

async def main():
    # Initialize client
    zmq = EvrmoreZMQClient()
    zmq.set_lingering(0)

    # Register handlers
    @zmq.on(ZMQTopic.BLOCK)
    async def handle_block(notification):
        print(f"Block #{notification.height}")

    @zmq.on(ZMQTopic.TX)
    async def handle_tx(notification):
        print(f"TX {notification.tx['txid']}")

    # Start the client
    await zmq.start()

    try:
        # Keep running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        await zmq.stop()

asyncio.run(main())

Advanced Features

Custom Message Processing

class BlockchainMonitor:
    def __init__(self):
        self.zmq_client = EvrmoreZMQClient()
        self.rpc_client = EvrmoreClient()
        self.blocks_processed = 0
        self.transactions_processed = 0
        self.asset_transfers = []

    async def start(self):
        # Register handlers
        self.zmq_client.on_block(self.handle_block)
        self.zmq_client.on_transaction(self.handle_transaction)

        # Start the ZMQ client
        await self.zmq_client.start()
        print("Blockchain monitor started")

    async def stop(self):
        await self.zmq_client.stop()
        print("Blockchain monitor stopped")

    async def handle_block(self, notification):
        block_hash = notification.hex
        block = self.rpc_client.getblock(block_hash)

        self.blocks_processed += 1
        print(f"New block: {block.hash} (height: {block.height})")
        print(f"Block contains {len(block.tx)} transactions")

        # Analyze block data
        if len(block.tx) > 100:
            print(f"Large block detected: {len(block.tx)} transactions")

    async def handle_transaction(self, notification):
        txid = notification.hex

        try:
            tx = self.rpc_client.getrawtransaction(txid, True)
            self.transactions_processed += 1

            # Check for asset transfers
            for vout in tx.vout:
                if "asset" in vout.get("scriptPubKey", {}).get("asset", {}):
                    asset = vout["scriptPubKey"]["asset"]
                    print(f"Asset transfer detected: {asset['name']} ({asset['amount']})")

                    self.asset_transfers.append({
                        "txid": txid,
                        "asset": asset["name"],
                        "amount": asset["amount"],
                        "time": tx.time if hasattr(tx, "time") else None
                    })
        except Exception as e:
            print(f"Error processing transaction {txid}: {e}")

Error Handling

from evrmore_rpc.zmq import EvrmoreZMQError

try:
    zmq.start()
except EvrmoreZMQError as e:
    print(f"ZMQ error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Performance Considerations

  • Use set_lingering(0) for fast shutdown
  • Set appropriate cleanup timeouts
  • Consider using async mode for better performance
  • Monitor memory usage with large transaction volumes

See Also


๐Ÿ“Š Stress Test Results

Real benchmark results from the stress_test.py utility:

Mode Time RPS Avg (ms) Median Min Max
Local Async 0.01 s 10442.42 0.59 0.50 0.39 1.84
Local Sync 0.06 s 1861.26 1.52 1.42 0.43 3.40
Remote Async 1.75 s 57.31 167.77 155.93 111 324
Remote Sync 1.86 s 53.83 160.39 163.26 112 310

Tested with getblockcount, 100 requests, 10 concurrent workers


๐Ÿงฌ Utilities & Examples

python3 -m evrmore_rpc.stress_test --sync --remote

Examples: - readme_test.py โ€” basic usage demo - stress_test.py โ€” concurrency performance test - connection_pooling.py โ€” show pooled reuse efficiency - flexible_config.py โ€” env var / conf / manual setup demo - rpc_coverage.py โ€” validates full RPC method mapping - zmq_notifications.py โ€” real-time block/tx decode stream


๐Ÿงช Requirements

  • Python 3.8+
  • Evrmore daemon running with RPC and optional ZMQ endpoints

๐Ÿชช License

MIT License โ€” See LICENSE


๐Ÿค Contributing

PRs welcome! Please lint, document, and include examples.

git clone https://github.com/manticoretechnologies/evrmore-rpc-dev
cd evrmore-rpc
python3 -m venv .venv && source .venv/bin/activate
pip install -e .[dev]

Run tests with:

python3 -m evrmore_rpc.stress_test

๐Ÿงญ Summary

evrmore-rpc is more than a wrapper โ€” it's a full dev toolkit for Evrmore. Built for production-grade systems needing precision, flexibility, and real-time awareness. Perfect for explorers, exchanges, indexers, and power users.