Advanced Usage Guide¶
This guide covers advanced features and production patterns for the evrmore-rpc package.
Connection Management¶
Connection reuse¶
EvrmoreClient maintains internal HTTP sessions (requests.Session for sync, aiohttp.ClientSession for async), so repeated calls reuse a single connection rather than reconnecting for every request.
from evrmore_rpc import EvrmoreClient
import asyncio
def run_sync():
client = EvrmoreClient()
try:
for _ in range(3):
info = client.getblockchaininfo()
print("Height:", info["blocks"])
finally:
client.close_sync()
async def run_async():
async with EvrmoreClient() as client:
for _ in range(3):
info = await client.getblockchaininfo()
print("Height:", info["blocks"])
if __name__ == "__main__":
run_sync()
asyncio.run(run_async())
Custom context manager¶
You can encapsulate client creation and teardown in your own context manager:
from evrmore_rpc import EvrmoreClient
class EvrmoreContext:
def __init__(
self,
url: str = "http://localhost:8819",
rpcuser: str | None = None,
rpcpassword: str | None = None,
):
self.client = EvrmoreClient(url=url, rpcuser=rpcuser, rpcpassword=rpcpassword)
def __enter__(self):
return self.client
def __exit__(self, exc_type, exc_val, exc_tb):
self.client.close_sync()
# Usage
with EvrmoreContext() as client:
result = client.getblockchaininfo()
Performance Optimization¶
Batch Processing¶
def process_blocks(start_height, end_height, batch_size=100):
for i in range(start_height, end_height + 1, batch_size):
batch_end = min(i + batch_size, end_height + 1)
blocks = []
for height in range(i, batch_end):
block = client.getblock(height)
blocks.append(block)
# Process batch
process_block_batch(blocks)
def process_block_batch(blocks):
for block in blocks:
# Process block data
pass
Caching¶
from functools import lru_cache
import time
class CachedEvrmoreClient:
def __init__(self, client):
self.client = client
self.block_cache = {}
self.tx_cache = {}
@lru_cache(maxsize=1000)
def get_block(self, height):
return self.client.getblock(height)
@lru_cache(maxsize=10000)
def get_transaction(self, txid):
return self.client.getrawtransaction(txid, True)
def clear_cache(self):
self.get_block.cache_clear()
self.get_transaction.cache_clear()
Error Handling¶
Custom Error Classes¶
from evrmore_rpc import EvrmoreRPCError
class EvrmoreClientError(Exception):
"""Base class for Evrmore client errors"""
pass
class BlockNotFoundError(EvrmoreClientError):
"""Raised when a block is not found"""
pass
class TransactionNotFoundError(EvrmoreClientError):
"""Raised when a transaction is not found"""
pass
def get_block_safe(height):
try:
return client.getblock(height)
except EvrmoreRPCError as e:
if "Block not found" in str(e):
raise BlockNotFoundError(f"Block at height {height} not found")
raise
Retry Logic¶
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def get_block_with_retry(height):
return client.getblock(height)
Real-Time Processing¶
ZMQ Integration¶
from evrmore_rpc.zmq import EvrmoreZMQClient, ZMQTopic
class BlockchainProcessor:
def __init__(self):
self.zmq = EvrmoreZMQClient()
self.rpc = EvrmoreClient()
self.blocks_processed = 0
self.transactions_processed = 0
def start(self):
# Register handlers
self.zmq.on_block(self.handle_block)
self.zmq.on_transaction(self.handle_transaction)
# Start ZMQ client
self.zmq.start()
def handle_block(self, notification):
block = self.rpc.getblock(notification.hex)
self.blocks_processed += 1
# Process block
self.process_block(block)
def handle_transaction(self, notification):
tx = self.rpc.getrawtransaction(notification.hex, True)
self.transactions_processed += 1
# Process transaction
self.process_transaction(tx)
WebSocket Integration (architecture)¶
evrmore-rpc ships ZMQ support in this version. A WebSocket layer can be built on top of ZMQ notifications by running a separate WebSocket server that subscribes to ZMQ and broadcasts decoded events to connected clients.
High-level pattern:
- Use
EvrmoreZMQClientto subscribe toBLOCK/TXtopics. - In each handler, serialize the notification (or associated RPC data) to JSON.
- Broadcast those JSON messages to connected WebSocket clients.
This pattern is used in higher-level stacks (for example, explorer frontends) but is not part of the core evrmore-rpc package.
Asset Management¶
Asset Tracking¶
class AssetTracker:
def __init__(self):
self.client = EvrmoreClient()
self.assets = {}
def track_asset(self, asset_name):
# Get asset info
asset = self.client.getassetinfo(asset_name)
self.assets[asset_name] = {
"name": asset.name,
"amount": asset.amount,
"units": asset.units,
"reissuable": asset.reissuable,
"transfers": []
}
def get_asset_transfers(self, asset_name, start_block=0):
if asset_name not in self.assets:
self.track_asset(asset_name)
current_height = self.client.getblockcount()
for height in range(start_block, current_height + 1):
block = self.client.getblock(height)
for txid in block.tx:
tx = self.client.getrawtransaction(txid, True)
for vout in tx.vout:
if "asset" in vout.get("scriptPubKey", {}).get("asset", {}):
asset = vout["scriptPubKey"]["asset"]
if asset["name"] == asset_name:
self.assets[asset_name]["transfers"].append({
"txid": txid,
"amount": asset["amount"],
"block": height,
"time": block.time
})
Asset Issuance¶
class AssetIssuer:
def __init__(self):
self.client = EvrmoreClient()
def issue_asset(self, name, amount, units=0, reissuable=True, ipfs_hash=None):
# Issue new asset
txid = self.client.issueasset(
name=name,
amount=amount,
units=units,
reissuable=reissuable,
ipfs_hash=ipfs_hash
)
# Wait for confirmation
self.wait_for_confirmation(txid)
return txid
def reissue_asset(self, name, amount, ipfs_hash=None):
# Reissue existing asset
txid = self.client.reissueasset(
name=name,
amount=amount,
ipfs_hash=ipfs_hash
)
# Wait for confirmation
self.wait_for_confirmation(txid)
return txid
def wait_for_confirmation(self, txid, confirmations=1):
while True:
tx = self.client.getrawtransaction(txid, True)
if tx.confirmations >= confirmations:
break
time.sleep(1)
Production Patterns¶
Logging¶
import logging
class EvrmoreLogger:
def __init__(self):
self.logger = logging.getLogger("evrmore")
self.logger.setLevel(logging.INFO)
# Add handlers
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_block(self, block):
self.logger.info(f"Processing block {block.height}")
def log_transaction(self, tx):
self.logger.info(f"Processing transaction {tx.txid}")
def log_error(self, error):
self.logger.error(f"Error: {error}")
Metrics Collection¶
from prometheus_client import Counter, Gauge, start_http_server
class EvrmoreMetrics:
def __init__(self):
# Counters
self.blocks_processed = Counter(
'evrmore_blocks_processed_total',
'Total number of blocks processed'
)
self.transactions_processed = Counter(
'evrmore_transactions_processed_total',
'Total number of transactions processed'
)
# Gauges
self.current_height = Gauge(
'evrmore_current_height',
'Current blockchain height'
)
self.connection_status = Gauge(
'evrmore_connection_status',
'Connection status (1=connected, 0=disconnected)'
)
def start_server(self, port=8000):
start_http_server(port)
def update_metrics(self, client):
info = client.getblockchaininfo()
self.current_height.set(info.blocks)
self.connection_status.set(1)
Health Checks¶
class EvrmoreHealth:
def __init__(self):
self.client = EvrmoreClient()
self.last_check = 0
self.check_interval = 60 # seconds
def is_healthy(self):
current_time = time.time()
if current_time - self.last_check < self.check_interval:
return True
try:
# Check connection
self.client.getblockchaininfo()
self.last_check = current_time
return True
except Exception as e:
return False
def get_status(self):
try:
info = self.client.getblockchaininfo()
return {
"status": "healthy",
"blocks": info.blocks,
"headers": info.headers,
"verification_progress": info.verificationprogress
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e)
}
See Also¶
- Getting Started for basic usage
- API Reference for detailed API docs
- Examples for code samples
- ZMQ Guide for real-time notifications
- WebSocket Guide for WebSocket integration