Event Hooks¶
The Event Hooks system in Evrmore Authentication allows you to add custom behavior at key points in the authentication flow. This enables you to extend functionality, add logging, integrate with other systems, or implement custom business logic.
Overview¶
Event hooks are implemented as callback functions that are executed at specific points in the authentication process. You can register hooks for various events and they will be called with relevant context information when those events occur.
Available Hook Points¶
The following hook points are available:
| Hook Point | Description | Triggered When |
|---|---|---|
| pre_challenge | Before generating a challenge | generate_challenge() is called |
| post_challenge | After generating a challenge | A challenge has been created |
| pre_auth | Before authenticating a user | authenticate() is called |
| post_auth | After successful authentication | Authentication succeeds |
| pre_verify | Before verifying a signature | Signature verification begins |
| post_verify | After verifying a signature | Signature verification completes |
| pre_token_validate | Before validating a token | validate_token() is called |
| post_token_validate | After validating a token | Token validation completes |
| pre_token_invalidate | Before invalidating a token | invalidate_token() is called |
| post_token_invalidate | After invalidating a token | Token invalidation completes |
Registering Hooks¶
Hooks can be registered either by calling add_hook directly or by using it as a decorator:
Using the Method¶
from evrmore_authentication import EvrmoreAuth

auth = EvrmoreAuth()

def my_pre_challenge_hook(evrmore_address, expire_minutes):
    print(f"Generating challenge for {evrmore_address}")
    return {"custom_data": "value"}  # Optional return value

# Register the hook
auth.add_hook('pre_challenge', my_pre_challenge_hook)
Using the Decorator¶
from evrmore_authentication import EvrmoreAuth

auth = EvrmoreAuth()

@auth.add_hook('post_auth')
def post_auth_hook(user, session):
    print(f"User {user.evrmore_address} authenticated successfully")
    # Perform custom actions after authentication
    notify_login(user.id)
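The library's internals are not shown on this page, so the following is only a sketch of how a single add_hook method could support both registration styles. MiniHookRegistry and run_hooks are hypothetical names used for illustration; they are not part of Evrmore Authentication.

```python
from collections import defaultdict


class MiniHookRegistry:
    """Illustrative stand-in, NOT the library's implementation."""

    def __init__(self):
        # Maps an event name to its callbacks, in registration order.
        self._hooks = defaultdict(list)

    def add_hook(self, event, func=None):
        # Method style: add_hook('event', callback)
        if func is not None:
            self._hooks[event].append(func)
            return func

        # Decorator style: @add_hook('event')
        def decorator(callback):
            self._hooks[event].append(callback)
            return callback

        return decorator

    def run_hooks(self, event, **context):
        # Call each registered callback and collect the return values.
        return [hook(**context) for hook in self._hooks[event]]


registry = MiniHookRegistry()
calls = []


def log_challenge(evrmore_address, expire_minutes):
    calls.append(("method", evrmore_address))


registry.add_hook("pre_challenge", log_challenge)


@registry.add_hook("pre_challenge")
def audit_challenge(evrmore_address, expire_minutes):
    calls.append(("decorator", evrmore_address))


registry.run_hooks("pre_challenge", evrmore_address="EXaMPLe", expire_minutes=10)
```

In this sketch the decorator returns the original function unchanged, so a decorated hook can still be called directly, which is convenient in tests. Whether the real add_hook does the same is an assumption.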
Hook Parameters¶
Each hook receives specific parameters relevant to its context:
pre_challenge¶
@auth.add_hook('pre_challenge')
def pre_challenge_hook(evrmore_address, expire_minutes):
    """
    evrmore_address: The address for which the challenge is being generated
    expire_minutes: The expiration time in minutes
    Return value: Can return a dict to modify parameters
    """
    # Can modify expire_minutes
    return {"expire_minutes": 30}
post_challenge¶
@auth.add_hook('post_challenge')
def post_challenge_hook(evrmore_address, challenge, challenge_record):
    """
    evrmore_address: The address for which the challenge was generated
    challenge: The challenge text
    challenge_record: The Challenge object created
    """
    # No return value expected
pre_auth¶
@auth.add_hook('pre_auth')
def pre_auth_hook(evrmore_address, challenge, signature, skip_ownership_check):
    """
    evrmore_address: The address being authenticated
    challenge: The challenge text
    signature: The signature to verify
    skip_ownership_check: Whether to skip challenge ownership verification
    Return value: Can return a dict to modify parameters
    """
    return {"skip_ownership_check": True}
post_auth¶
@auth.add_hook('post_auth')
def post_auth_hook(user, session):
    """
    user: The User object for the authenticated user
    session: The Session object created
    """
    # No return value expected
pre_verify¶
@auth.add_hook('pre_verify')
def pre_verify_hook(address, message, signature):
    """
    address: The Evrmore address
    message: The message (challenge) to verify
    signature: The signature to verify
    Return value: Can return a dict to modify parameters
    """
    # Can modify verification parameters.
    # Stripping whitespace is only an illustrative transformation.
    modified_message = message.strip()
    return {"message": modified_message}
post_verify¶
@auth.add_hook('post_verify')
def post_verify_hook(address, message, signature, is_valid):
    """
    address: The Evrmore address
    message: The message that was verified
    signature: The signature that was verified
    is_valid: Boolean indicating if the signature was valid
    """
    # No return value expected
pre_token_validate¶
@auth.add_hook('pre_token_validate')
def pre_token_validate_hook(token):
    """
    token: The token to validate
    Return value: Can return a dict to modify parameters
    """
    # No modifications typically needed
    return {}
post_token_validate¶
@auth.add_hook('post_token_validate')
def post_token_validate_hook(token, token_data, is_valid):
    """
    token: The token that was validated
    token_data: The decoded token data if valid, None otherwise
    is_valid: Boolean indicating if the token was valid
    """
    # No return value expected
pre_token_invalidate¶
@auth.add_hook('pre_token_invalidate')
def pre_token_invalidate_hook(token):
    """
    token: The token to invalidate
    Return value: Can return a dict to modify parameters
    """
    # No modifications typically needed
    return {}
post_token_invalidate¶
@auth.add_hook('post_token_invalidate')
def post_token_invalidate_hook(token, session):
    """
    token: The token that was invalidated
    session: The Session object that was invalidated
    """
    # No return value expected
Multiple Hooks¶
You can register multiple hooks for the same event. They will be executed in the order they were registered:
@auth.add_hook('post_auth')
def log_auth(user, session):
    print(f"User {user.evrmore_address} authenticated")

@auth.add_hook('post_auth')
def notify_auth(user, session):
    send_notification(user.id, "New login detected")
Modifying Parameters¶
Hooks that run before an operation (pre_* hooks) can modify the parameters passed to the operation by returning a dictionary with the modified values:
@auth.add_hook('pre_challenge')
def extend_challenge_expiry(evrmore_address, expire_minutes):
    if is_premium_user(evrmore_address):
        # Premium users get longer-lived challenges
        return {"expire_minutes": 60}
    return {}  # Return empty dict to keep default parameters
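To make the merging step concrete, here is a minimal sketch of how returned dictionaries could be folded back into the operation's parameters. The helper apply_pre_hooks is hypothetical, and its handling of None results and unknown keys is an assumption; the library may treat those cases differently.

```python
def apply_pre_hooks(hooks, params):
    """Run pre-style hooks in order and merge their returned overrides.

    Hypothetical helper for illustration only.
    """
    current = dict(params)  # copy so the caller's dict is not mutated
    for hook in hooks:
        result = hook(**current)
        if not result:
            # None or {} means "keep the current parameters".
            continue
        for key, value in result.items():
            # Assumption: keys the operation does not accept are ignored.
            if key in current:
                current[key] = value
    return current


PREMIUM_ADDRESSES = {"EPremiumAddr"}  # made-up data for the example


def extend_expiry(evrmore_address, expire_minutes):
    if evrmore_address in PREMIUM_ADDRESSES:
        return {"expire_minutes": 60}
    return {}


def observe_only(evrmore_address, expire_minutes):
    return None  # a hook that only observes returns nothing


premium = apply_pre_hooks(
    [extend_expiry, observe_only],
    {"evrmore_address": "EPremiumAddr", "expire_minutes": 10},
)
regular = apply_pre_hooks(
    [extend_expiry, observe_only],
    {"evrmore_address": "EOtherAddr", "expire_minutes": 10},
)
```

Because each hook receives the parameters as already modified by earlier hooks, registration order matters when several hooks touch the same key.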
Practical Examples¶
Logging Authentication Attempts¶
@auth.add_hook('pre_auth')
def log_auth_attempt(evrmore_address, challenge, signature, skip_ownership_check):
    print(f"Authentication attempt by {evrmore_address}")
    return {}

@auth.add_hook('post_auth')
def log_auth_success(user, session):
    print(f"Successful authentication by {user.evrmore_address}")

@auth.add_hook('post_verify')
def log_verification_result(address, message, signature, is_valid):
    if not is_valid:
        print(f"Failed signature verification for {address}")
Custom User Management¶
import datetime

# db is assumed to be your application's own database helper
@auth.add_hook('post_auth')
def update_user_data(user, session):
    # Update last login time in a custom database
    db.execute(
        "UPDATE user_metadata SET last_login = ? WHERE user_id = ?",
        (datetime.datetime.utcnow().isoformat(), user.id)
    )

    # Count successful logins (post_auth only runs when authentication succeeds)
    login_count = db.fetchone(
        "SELECT login_count FROM user_stats WHERE user_id = ?",
        (user.id,)
    )

    if login_count:
        db.execute(
            "UPDATE user_stats SET login_count = login_count + 1 WHERE user_id = ?",
            (user.id,)
        )
    else:
        db.execute(
            "INSERT INTO user_stats (user_id, login_count) VALUES (?, ?)",
            (user.id, 1)
        )
Security Enhancements¶
import datetime

# db, get_request_ip, get_location_from_ip and send_notification are assumed
# to be your application's own helpers
@auth.add_hook('pre_challenge')
def rate_limit_challenges(evrmore_address, expire_minutes):
    # Check if too many challenges have been generated recently
    count = db.fetchone(
        """SELECT COUNT(*) as count FROM challenges
        WHERE evrmore_address = ? AND created_at > ?""",
        (evrmore_address, (datetime.datetime.utcnow() - datetime.timedelta(minutes=5)).isoformat())
    )

    if count and count['count'] > 5:
        # Hook errors are only logged by default, so this blocks the request
        # only when hook errors are propagated (see Error Handling)
        raise Exception("Rate limit exceeded for challenge generation")

    return {}

@auth.add_hook('post_auth')
def check_suspicious_activity(user, session):
    # Check if the user has logged in from a new location
    # Implementation depends on how you track locations
    current_ip = get_request_ip()
    known_ips = db.fetchall(
        "SELECT ip_address FROM user_locations WHERE user_id = ?",
        (user.id,)
    )

    if current_ip not in [row['ip_address'] for row in known_ips]:
        # New location, store it and notify user
        db.execute(
            "INSERT INTO user_locations (user_id, ip_address, first_seen) VALUES (?, ?, ?)",
            (user.id, current_ip, datetime.datetime.utcnow().isoformat())
        )
        send_notification(user.id, f"New login from {get_location_from_ip(current_ip)}")
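When a single process handles all requests, the database query in the rate-limiting hook above could be replaced by in-memory counting. The following is a sketch under that assumption; ChallengeRateLimiter is a hypothetical class, not something the library provides, and it caps each address at max_requests challenges per window.

```python
import time
from collections import defaultdict, deque


class ChallengeRateLimiter:
    """Hypothetical in-memory sliding-window limiter (single process only)."""

    def __init__(self, max_requests=5, window_seconds=300, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._requests = defaultdict(deque)

    def check(self, evrmore_address):
        now = self._clock()
        timestamps = self._requests[evrmore_address]
        # Drop timestamps that have fallen out of the window.
        while timestamps and now - timestamps[0] >= self.window_seconds:
            timestamps.popleft()
        if len(timestamps) >= self.max_requests:
            raise RuntimeError("Rate limit exceeded for challenge generation")
        timestamps.append(now)


# A fake clock keeps the example deterministic.
fake_now = [0.0]
limiter = ChallengeRateLimiter(clock=lambda: fake_now[0])

for _ in range(5):
    limiter.check("EXaMPLe")  # the first five requests pass

try:
    limiter.check("EXaMPLe")  # the sixth request inside the window fails
    blocked = False
except RuntimeError:
    blocked = True

fake_now[0] = 301.0
limiter.check("EXaMPLe")  # allowed again once the window has passed
```

A pre_challenge hook would then call limiter.check(evrmore_address) and return an empty dict. As with the database version, the raised error only stops challenge generation if hook errors are propagated.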
Integration with External Systems¶
import datetime

# crm_api and analytics are assumed to be your own client objects
@auth.add_hook('post_auth')
def sync_with_crm(user, session):
    # Update user in CRM system
    crm_api.update_user(
        user_id=user.id,
        evrmore_address=user.evrmore_address,
        last_login=datetime.datetime.utcnow().isoformat()
    )

@auth.add_hook('post_token_invalidate')
def update_activity_tracking(token, session):
    # Log user logout in analytics system
    analytics.track_event(
        session.user_id,
        'user_logout',
        {
            'session_duration': (datetime.datetime.utcnow() - session.created_at).total_seconds(),
            'session_id': session.id
        }
    )
Error Handling¶
Errors in hooks are logged but do not interrupt the main authentication flow by default. To change this behavior, set the propagate_hook_errors parameter when initializing EvrmoreAuth, for example EvrmoreAuth(propagate_hook_errors=True) if the parameter is a boolean flag.
With this setting, any exception raised in a hook is propagated to the caller.
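The difference between the two modes can be sketched with a small stand-in dispatcher. The run_hooks function below is hypothetical and only mirrors the behavior described here; the library's actual dispatch code and log format may differ.

```python
import logging

logger = logging.getLogger("hook_sketch")


def run_hooks(hooks, propagate_hook_errors=False, **context):
    """Hypothetical dispatcher illustrating the two error modes."""
    for hook in hooks:
        try:
            hook(**context)
        except Exception:
            if propagate_hook_errors:
                raise  # the caller sees the original exception
            # Default mode: record the failure and move on to the next hook.
            logger.exception("Hook %s failed", getattr(hook, "__name__", hook))


def failing_hook(user_id):
    raise RuntimeError("CRM unavailable")


seen = []


def recording_hook(user_id):
    seen.append(user_id)


# Default: the failure is logged and the next hook still runs.
run_hooks([failing_hook, recording_hook], user_id="123")

# Propagating: the exception reaches the caller and later hooks are skipped.
try:
    run_hooks([failing_hook, recording_hook], propagate_hook_errors=True, user_id="123")
    propagated = False
except RuntimeError:
    propagated = True
```

Propagation is the mode to choose when a hook enforces a policy, such as a rate limit, because a logged-only error would let the operation continue.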
Testing Hooks¶
To test a hook in isolation, call it with mock user and session objects:
from unittest.mock import MagicMock
from evrmore_authentication import EvrmoreAuth

def test_post_auth_hook():
    auth = EvrmoreAuth()

    # Create a spy to check if the hook was called
    spy = MagicMock()

    def my_hook(user, session):
        spy(user.id, session.id)

    # Register with the method form so my_hook stays directly callable
    auth.add_hook('post_auth', my_hook)

    # Create mock user and session
    mock_user = MagicMock()
    mock_user.id = "123"
    mock_user.evrmore_address = "EXaMPLeEvRMoReAddResS"

    mock_session = MagicMock()
    mock_session.id = "456"
    mock_session.user_id = "123"

    # Call the hook directly. Patching auth.authenticate itself would replace
    # the code that runs the hooks, so the spy would never be called.
    my_hook(mock_user, mock_session)

    # Verify hook was called with correct parameters
    spy.assert_called_once_with("123", "456")