Bypassing Event Handlers: Using Database Constraints as Last Line of Defence
Many codebases rely on application-layer hooks, such as on_change callbacks, Django signals (pre_save, post_save), or Rails callbacks, to protect updates to sensitive fields like is_internal_admin_user, roles, account locks, and audit records. These checks help guard against privilege escalation, but they operate only at the application or ORM layer. In practice, they can be bypassed in several ways.
This post explains those bypass vectors with concrete examples in Rails and Django, shows how to implement database constraints and triggers to detect them, and demonstrates monitoring and alerting strategies. The conclusion: database constraints and alerts are essential for defense in depth. Callbacks alone leave bulk updates, raw SQL, migrations, and external systems outside your safety net.
The Problem: Callbacks Only Run on the ORM Path
Application callbacks are a useful first layer. They:
- Centralize business rules (e.g. "only admins can set is_internal_admin_user")
- Provide a single place to log or audit changes
- Reject invalid transitions (e.g. unlocking an account without approval)
But they only run when the ORM is used in the normal way. Any code path that writes to the database without going through the model lifecycle will skip them. That leaves sensitive invariants unprotected for a large class of real-world operations.
Organizations often assume privileges are only granted through "controlled" ORM paths. In reality, former employees, customers, or internal accounts may still have sensitive access because it was granted via raw migrations, bulk updates, or one-off scripts. Database constraints and alerts apply regardless of how the data is modified and provide a reliable last line of defense.
Callbacks vs Bypasses vs Database Enforcement (Rails + Django)
Each subsection below shows Rails and Django side by side for the same idea: application-layer protection, then how it's bypassed, then database-level defense.
Protecting a Sensitive Flag with Callbacks
We want only certain admin actions to be able to set is_internal_admin_user. Application callbacks and signals enforce this on the normal save path: as long as a write goes through the ORM's save lifecycle, the validation checks run.
Rails Example
# app/models/user.rb
class User < ApplicationRecord
  attr_accessor :grant_admin_by

  before_save :validate_internal_admin_change

  private

  def validate_internal_admin_change
    return unless is_internal_admin_user_changed?
    return if grant_admin_by.present? && grant_admin_by.is_a?(AdminUser)

    if is_internal_admin_user?
      errors.add(:base, "Only admins can grant internal admin status")
      throw(:abort)
    end
  end
end
Controlled grant: user.grant_admin_by = current_admin then user.update!(is_internal_admin_user: true).
Django Example
# myapp/models.py
from django.db import models

class User(models.Model):
    email = models.EmailField(unique=True)
    is_internal_admin_user = models.BooleanField(default=False)
    role = models.CharField(max_length=32, default='user')

    class Meta:
        db_table = 'users'
# myapp/signals.py
from django.db.models.signals import pre_save
from django.dispatch import receiver
from django.core.exceptions import PermissionDenied

from .models import User

@receiver(pre_save, sender=User)
def validate_internal_admin_change(sender, instance, **kwargs):
    # On pre_save, instance.pk exists for updates; the old record is still in the DB.
    # For new instances, instance.pk is None, so skip validation.
    if not instance.pk:
        return
    try:
        # Fetch the current state from the database before this save
        old = User.objects.get(pk=instance.pk)
        # Check if is_internal_admin_user is being changed
        if old.is_internal_admin_user != instance.is_internal_admin_user:
            # Only allow if granted by an admin
            if not getattr(instance, '_granted_by_admin', False):
                raise PermissionDenied(
                    "Only admins can grant or revoke internal admin status"
                )
    except User.DoesNotExist:
        pass
Controlled grant: user._granted_by_admin = True then user.save().
Bypass #1: Bulk Update (No Callbacks / No Signals)
Bulk update APIs issue a single SQL UPDATE and do not run per-record callbacks or signals, so the validation code is skipped entirely. Any API endpoint or background job that uses a bulk update silently bypasses the admin-level validation check.
Django Bulk Update
# pre_save / post_save signals are NOT sent
User.objects.filter(id__in=user_ids).update(is_internal_admin_user=True)
# Database updated, validation never ran
Rails Bulk Update
# before_save / after_save callbacks are NOT run
User.where(id: user_ids).update_all(is_internal_admin_user: true)
# Database updated, callbacks never fired
Bypass #2: Raw SQL (No ORM Path)
Direct SQL never touches the model lifecycle, so any process or code path that issues raw SQL skips the validation check entirely.
Django Raw SQL
from django.db import connection

with connection.cursor() as cursor:
    cursor.execute(
        "UPDATE users SET is_internal_admin_user = true WHERE id = %s",
        [user_id]
    )
# Validation never runs
Rails Raw SQL
ActiveRecord::Base.connection.execute(
  "UPDATE users SET is_internal_admin_user = true WHERE id = #{user_id}"
)
# Callbacks never fire (and interpolating user_id like this also risks
# SQL injection; prefer bound parameters)
Bypass #3: Migrations (No ORM Callbacks in Practice)
Migrations run with full DB access; they often use raw SQL or bulk updates, so app callbacks/signals don't run.
Rails Migration
# db/migrate/20250217000000_fix_admin_flags.rb
class FixAdminFlags < ActiveRecord::Migration[7.0]
  def up
    execute <<-SQL
      UPDATE users SET is_internal_admin_user = true WHERE email LIKE '%@company.com';
    SQL
  end
end
Django Migration
# myapp/migrations/0002_fix_admin_flags.py
from django.db import migrations

def set_internal_admins(apps, schema_editor):
    User = apps.get_model('myapp', 'User')
    # Bulk update in migration: signals don't fire
    User.objects.using(schema_editor.connection.alias).filter(
        email__endswith='@company.com'
    ).update(is_internal_admin_user=True)

class Migration(migrations.Migration):
    dependencies = [('myapp', '0001_initial')]

    operations = [
        migrations.RunPython(set_internal_admins, migrations.RunPython.noop),
    ]
Bypass #4: Race Conditions (TOCTOU)
Two concurrent requests can both pass validation and create an inconsistent state.
# Django Example: Check-Then-Act Race Condition
from django.db import models

class User(models.Model):
    is_admin = models.BooleanField(default=False)
    is_locked = models.BooleanField(default=False)

    def save(self, *args, **kwargs):
        # Check invariant: can't be both locked and admin
        if self.is_locked and self.is_admin:
            raise ValueError("Locked users cannot be admins")
        super().save(*args, **kwargs)

# Request 1: Grant admin
user = User.objects.get(id=1)  # Fetch: is_locked=False, is_admin=False
user.is_admin = True           # Validation passes (is_locked=False)
user.save()                    # ✓ Saved

# Request 2 (concurrent with Request 1): Lock account
user2 = User.objects.get(id=1)  # Fetch: is_locked=False, is_admin=False (stale)
user2.is_locked = True          # Validation passes (is_admin=False from old data)
user2.save()                    # ✓ Saved

# Result: User is both locked AND admin (invalid state)
# The validation couldn't prevent this because it ran before the concurrent write
With a database constraint, this is impossible:
ALTER TABLE users
ADD CONSTRAINT no_locked_admins
CHECK (NOT (is_locked AND is_admin));
Now Request 2's save will fail at the database level, forcing the application to handle the conflict.
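To see the constraint close the race without any framework involved, here is a minimal, runnable sketch using Python's stdlib sqlite3 (SQLite accepts the same CHECK syntax; the schema here is an illustrative stand-in for the Postgres table above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        is_admin INTEGER NOT NULL DEFAULT 0,
        is_locked INTEGER NOT NULL DEFAULT 0,
        CHECK (NOT (is_locked AND is_admin))  -- no_locked_admins
    )
""")
conn.execute("INSERT INTO users (id) VALUES (1)")

# Request 1 wins the race and grants admin
conn.execute("UPDATE users SET is_admin = 1 WHERE id = 1")

# Request 2 tries to lock the same account; the database refuses
try:
    conn.execute("UPDATE users SET is_locked = 1 WHERE id = 1")
    blocked = False
except sqlite3.IntegrityError:
    blocked = True

print(blocked)  # True: the invalid locked-admin state never reaches the table
```

The application still has to handle the IntegrityError, but the invalid state itself is unrepresentable.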
# Rails Equivalent
class User < ApplicationRecord
  before_save :validate_state

  def validate_state
    if is_locked && is_admin
      raise "Locked users cannot be admins"
    end
  end
end
# Same race condition: two concurrent requests both pass validation
# The database constraint prevents the invalid final state
Database Constraint as Last Line of Defense
We can't express "only an admin may set this" purely in SQL, but we can enforce allowed values (e.g. role enum) and audit every change with triggers. These apply to app, migrations, raw SQL, and external services.
Rails: CHECK + Trigger (PostgreSQL)
# db/migrate/20250217100000_add_sensitive_role_constraints.rb
class AddSensitiveRoleConstraints < ActiveRecord::Migration[7.0]
  def up
    # Enforce valid role values across ALL code paths
    execute <<-SQL
      ALTER TABLE users
        ADD CONSTRAINT chk_users_role
        CHECK (role IN ('user', 'support', 'admin', 'super_admin'));
    SQL

    # Create audit table for sensitive changes
    execute <<-SQL
      CREATE TABLE IF NOT EXISTS sensitive_user_changes_audit (
        id SERIAL PRIMARY KEY,
        user_id INTEGER NOT NULL,
        changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        changed_by_app VARCHAR(255),
        old_admin BOOLEAN,
        new_admin BOOLEAN,
        old_role VARCHAR(32),
        new_role VARCHAR(32)
      );
    SQL

    # Trigger to log all sensitive changes
    execute <<-SQL
      CREATE OR REPLACE FUNCTION log_sensitive_user_changes()
      RETURNS TRIGGER AS $$
      BEGIN
        IF (OLD.is_internal_admin_user IS DISTINCT FROM NEW.is_internal_admin_user)
           OR (OLD.role IS DISTINCT FROM NEW.role) THEN
          INSERT INTO sensitive_user_changes_audit (
            user_id, changed_at, changed_by_app, old_admin, new_admin, old_role, new_role
          ) VALUES (
            NEW.id, NOW(), current_setting('application_name', true),
            OLD.is_internal_admin_user, NEW.is_internal_admin_user, OLD.role, NEW.role
          );
        END IF;
        RETURN NEW;
      END;
      $$ LANGUAGE plpgsql;

      DROP TRIGGER IF EXISTS trg_sensitive_user_changes ON users;
      CREATE TRIGGER trg_sensitive_user_changes
        AFTER UPDATE ON users
        FOR EACH ROW EXECUTE FUNCTION log_sensitive_user_changes();
    SQL
  end

  def down
    execute "ALTER TABLE users DROP CONSTRAINT IF EXISTS chk_users_role;"
    execute "DROP TRIGGER IF EXISTS trg_sensitive_user_changes ON users;"
    execute "DROP FUNCTION IF EXISTS log_sensitive_user_changes();"
    execute "DROP TABLE IF EXISTS sensitive_user_changes_audit;"
  end
end
Django: CHECK Constraint + Trigger (PostgreSQL)
CHECK in a migration:
# myapp/migrations/0003_add_user_constraints.py
from django.db import migrations

class Migration(migrations.Migration):
    dependencies = [('myapp', '0002_fix_admin_flags')]

    operations = [
        # Enforce valid role values across ALL code paths
        migrations.RunSQL(
            sql="""
                ALTER TABLE users
                  ADD CONSTRAINT chk_users_role
                  CHECK (role IN ('user', 'support', 'admin', 'super_admin'));
            """,
            reverse_sql="ALTER TABLE users DROP CONSTRAINT IF EXISTS chk_users_role;",
        ),
    ]
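A quick way to convince yourself that a CHECK fires on every write path is a stdlib sketch (SQLite shown for portability; the PostgreSQL constraint above behaves the same way for bulk updates, raw SQL, and migrations alike):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        role TEXT NOT NULL DEFAULT 'user',
        CHECK (role IN ('user', 'support', 'admin', 'super_admin'))
    )
""")
conn.execute("INSERT INTO users (id) VALUES (1)")

# A "bulk update" is just raw SQL by the time it reaches the database,
# so the constraint still applies even though no ORM validation ran
try:
    conn.execute("UPDATE users SET role = 'owner'")  # not in the allowed set
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print(rejected)  # True: invalid roles are rejected on any code path
```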
Trigger-based audit in a separate migration:
# myapp/migrations/0004_add_sensitive_audit_trigger.py
from django.db import migrations

CREATE_AUDIT_TABLE = """
CREATE TABLE IF NOT EXISTS sensitive_user_changes_audit (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    changed_by_app VARCHAR(255),
    old_admin BOOLEAN,
    new_admin BOOLEAN,
    old_role VARCHAR(32),
    new_role VARCHAR(32)
);
"""

CREATE_TRIGGER_FUNC = """
CREATE OR REPLACE FUNCTION log_sensitive_user_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF (OLD.is_internal_admin_user IS DISTINCT FROM NEW.is_internal_admin_user)
       OR (OLD.role IS DISTINCT FROM NEW.role) THEN
        INSERT INTO sensitive_user_changes_audit
            (user_id, changed_at, changed_by_app, old_admin, new_admin, old_role, new_role)
        VALUES (
            NEW.id, NOW(), current_setting('application_name', true),
            OLD.is_internal_admin_user, NEW.is_internal_admin_user, OLD.role, NEW.role
        );
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""

CREATE_TRIGGER = """
CREATE TRIGGER trg_sensitive_user_changes
AFTER UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION log_sensitive_user_changes();
"""

class Migration(migrations.Migration):
    dependencies = [('myapp', '0003_add_user_constraints')]

    operations = [
        migrations.RunSQL(
            sql=CREATE_AUDIT_TABLE,
            reverse_sql="DROP TABLE IF EXISTS sensitive_user_changes_audit;",
        ),
        migrations.RunSQL(
            sql=CREATE_TRIGGER_FUNC,
            reverse_sql="DROP FUNCTION IF EXISTS log_sensitive_user_changes();",
        ),
        migrations.RunSQL(
            sql=CREATE_TRIGGER,
            reverse_sql="DROP TRIGGER IF EXISTS trg_sensitive_user_changes ON users;",
        ),
    ]
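The whole trigger pattern can be exercised end to end with a tiny stdlib sketch. SQLite's trigger syntax differs from plpgsql, so treat this as an illustrative stand-in for the PostgreSQL trigger above, not the same code:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        is_internal_admin_user INTEGER NOT NULL DEFAULT 0
    );
    CREATE TABLE sensitive_user_changes_audit (
        user_id INTEGER NOT NULL,
        old_admin INTEGER,
        new_admin INTEGER
    );
    CREATE TRIGGER trg_sensitive_user_changes
    AFTER UPDATE ON users
    WHEN OLD.is_internal_admin_user != NEW.is_internal_admin_user
    BEGIN
        INSERT INTO sensitive_user_changes_audit (user_id, old_admin, new_admin)
        VALUES (NEW.id, OLD.is_internal_admin_user, NEW.is_internal_admin_user);
    END;
    INSERT INTO users (id) VALUES (42);
""")

# Raw SQL bypasses every application callback, but not the trigger
conn.execute("UPDATE users SET is_internal_admin_user = 1 WHERE id = 42")

print(conn.execute("SELECT * FROM sensitive_user_changes_audit").fetchall())
# [(42, 0, 1)] -- the grant was captured even though no ORM code ran
```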
Real-Time Monitoring with ClickHouse
Instead of managing background listener processes, stream audit data to ClickHouse and use its native materialized views for real-time analytics and detection of privilege changes.
For a comprehensive guide on integrating PostgreSQL and ClickHouse as a unified data stack, refer to PostgreSQL + ClickHouse as the Open Source unified data stack. This approach uses PeerDB for CDC and pg_clickhouse for transparent query routing, allowing minimal application changes while scaling analytics efficiently.
ClickHouse Architecture
PostgreSQL Trigger
↓
├─→ Write to audit table (local backup/compliance)
└─→ Stream to ClickHouse (real-time analytics)
↓
ClickHouse Materialized View
├─→ Detect privilege grants
├─→ Detect role changes
└─→ Feed alerts pipeline
Setting Up ClickHouse Streaming
Once PostgreSQL data is replicated to ClickHouse via PeerDB (as covered in the open source stack section), the materialized views automatically detect privilege changes. Here's how the data flows:
Data Flow:
- PostgreSQL changes (inserts, updates, deletes)
- PeerDB captures the changes via CDC
- Data replicates to the sensitive_user_changes_audit table in ClickHouse
- The privilege_alerts_mv materialized view detects and categorizes changes
- The privilege_alerts table stores the alerts
- Scheduled queries read the alerts and send them to Slack
The materialized views in the "Alerting on Privilege Changes" section automatically process all incoming data from PostgreSQL without requiring additional triggers or external pipelines. This is the strength of ClickHouse's native architecture: detection happens at scale as the data arrives.
Benefits of ClickHouse Approach
- No background processes to manage – Scheduled queries run natively
- Real-time analytics – Materialized views detect changes instantly
- Scalable – ClickHouse handles millions of events/second
- Observable – Full audit trail queryable from any tool
- Compliant – Natural compliance engine (Materialized views = audit logs)
- Team visibility – Analysts, security, and ops can write their own queries
- Cost-effective – Single database eliminates external infrastructure
Alerting on Privilege Changes
Once privilege changes are detected in ClickHouse materialized views, you can create native Slack alerts directly within ClickHouse using scheduled queries. This eliminates the need for external Python scripts or background workers, keeping your entire alerting pipeline within ClickHouse.
Step 1: Create Alert Detection Table
This table stores the detected privilege alerts:
-- Table to store detected privilege alerts
CREATE TABLE privilege_alerts (
alert_id UUID DEFAULT generateUUIDv4(),
alert_type String,
user_id UInt32,
email String,
severity String,
old_admin Nullable(UInt8),
new_admin Nullable(UInt8),
old_role Nullable(String),
new_role Nullable(String),
changed_by_app String,
changed_at DateTime,
alert_created_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (alert_created_at, user_id);
Step 2: Create Materialized View for Detection
This materialized view automatically categorizes privilege changes as it receives data from PostgreSQL:
-- Materialized view that detects privilege escalations
CREATE MATERIALIZED VIEW privilege_alerts_mv TO privilege_alerts AS
SELECT
generateUUIDv4() as alert_id,
CASE
WHEN (new_admin = 1 AND old_admin = 0) THEN 'admin_grant'
WHEN (new_admin = 0 AND old_admin = 1) THEN 'admin_revoke'
WHEN new_role IN ('admin', 'super_admin') AND new_role != old_role THEN 'critical_role_grant'
ELSE 'privilege_change'
END as alert_type,
user_id,
email,
CASE
WHEN (new_admin = 1 AND old_admin = 0) THEN 'critical'
WHEN new_role IN ('admin', 'super_admin') AND new_role != old_role THEN 'critical'
ELSE 'warning'
END as severity,
old_admin,
new_admin,
old_role,
new_role,
changed_by_app,
changed_at
FROM sensitive_user_changes_audit
WHERE (new_admin != old_admin) OR (new_role != old_role);
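The CASE logic above is easy to unit-test outside ClickHouse by mirroring it in plain code. A hedged sketch (the function name classify_change is mine, not part of the schema), useful for pinning down the classification rules before trusting the materialized view with them:

```python
def classify_change(old_admin, new_admin, old_role, new_role):
    """Mirror of the privilege_alerts_mv CASE expressions, for unit testing."""
    critical_roles = ('admin', 'super_admin')

    if new_admin == 1 and old_admin == 0:
        alert_type = 'admin_grant'
    elif new_admin == 0 and old_admin == 1:
        alert_type = 'admin_revoke'
    elif new_role in critical_roles and new_role != old_role:
        alert_type = 'critical_role_grant'
    else:
        alert_type = 'privilege_change'

    if (new_admin == 1 and old_admin == 0) or (
        new_role in critical_roles and new_role != old_role
    ):
        severity = 'critical'
    else:
        severity = 'warning'

    return alert_type, severity

print(classify_change(0, 1, 'user', 'user'))   # ('admin_grant', 'critical')
print(classify_change(1, 0, 'admin', 'admin')) # ('admin_revoke', 'warning')
print(classify_change(0, 0, 'user', 'admin'))  # ('critical_role_grant', 'critical')
```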
Step 3: Create Alert Tracking Table
This table prevents duplicate Slack messages:
-- Table to track sent alerts and avoid duplicates
CREATE TABLE sent_alerts_log (
alert_id UUID,
sent_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY sent_at;
Step 4: Set Up Slack Webhook
Generate a Slack incoming webhook:
- Go to https://api.slack.com/apps
- Create a new app or select an existing one
- Enable "Incoming Webhooks" from the sidebar
- Click "Add New Webhook to Workspace"
- Select the channel for alerts
- Copy the generated webhook URL
Replace YOUR/SLACK/WEBHOOK/URL in the next step with your actual URL.
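To sanity-check the webhook before wiring it into the pipeline, a short stdlib script can build and post a test message. The helper names below are illustrative, and the payload follows Slack's legacy attachments shape used by the scheduled query:

```python
import json
import urllib.request

def build_slack_payload(alert_type, email, user_id, severity):
    """Build the attachment payload the scheduled query assembles as a string."""
    return {
        "attachments": [{
            "color": "danger" if severity == "critical" else "warning",
            "title": f"🚨 Privilege Change: {alert_type}",
            "text": f"User {email} ({user_id})",
            "fields": [
                {"title": "Alert Type", "value": alert_type, "short": True},
                {"title": "Severity", "value": severity, "short": True},
            ],
        }]
    }

def send_test_alert(webhook_url):
    # Requires a real webhook URL, so this only runs when called explicitly
    body = json.dumps(build_slack_payload("admin_grant", "admin@example.com", 123, "critical"))
    req = urllib.request.Request(
        webhook_url, data=body.encode(), headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status  # Slack returns 200 on success

if __name__ == "__main__":
    print(json.dumps(build_slack_payload("admin_grant", "admin@example.com", 123, "critical"), indent=2))
```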
Step 5: Create Scheduled Query for Slack Alerts
The query below sketches the idea: run every 10 seconds and post unsent alerts to Slack. Treat it as pseudo-SQL. Stock ClickHouse does not ship a CREATE QUERY ... EVERY statement or an http() scalar function, so in practice you would implement this loop with a refreshable materialized view, the url table function, or a small external scheduler.
-- Scheduled query that sends privilege alerts to Slack
CREATE QUERY send_privilege_alerts AS
SELECT
http(
'https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK/URL',
'POST',
'{
"attachments": [{
"color": "' || (severity = 'critical' ? 'danger' : 'warning') || '",
"title": "🚨 Privilege Change: ' || alert_type || '",
"text": "User ' || email || ' (' || toString(user_id) || ')",
"fields": [
{"title": "Alert Type", "value": "' || alert_type || '", "short": true},
{"title": "Severity", "value": "' || severity || '", "short": true},
{"title": "Changed By", "value": "' || changed_by_app || '", "short": true},
{"title": "Old Admin", "value": "' || toString(old_admin) || '", "short": true},
{"title": "New Admin", "value": "' || toString(new_admin) || '", "short": true},
{"title": "Old Role", "value": "' || old_role || '", "short": true},
{"title": "New Role", "value": "' || new_role || '", "short": true},
{"title": "Timestamp", "value": "' || toString(changed_at) || '", "short": false}
]
}]
}'
) AS slack_response,
alert_id
FROM privilege_alerts pa
LEFT JOIN sent_alerts_log sal USING (alert_id)
-- Note: the IS NULL check needs join_use_nulls = 1; by default ClickHouse
-- fills non-matching rows with type defaults rather than NULL
WHERE sal.alert_id IS NULL AND pa.alert_created_at > now() - interval 1 minute
EVERY 10 SECOND;
Step 6: Log Sent Alerts
Automatically log sent alerts to prevent duplicates:
-- Materialized view that logs sent alerts
CREATE MATERIALIZED VIEW log_sent_alerts_mv TO sent_alerts_log AS
SELECT
    alert_id
FROM privilege_alerts pa
LEFT JOIN sent_alerts_log sal USING (alert_id)
WHERE sal.alert_id IS NULL AND pa.alert_created_at > now() - interval 1 minute;
Querying Alerts
Once alerts are being created, query them for monitoring and compliance:
Find all critical alerts from the last 24 hours:
SELECT
alert_id,
alert_type,
email,
user_id,
severity,
old_role,
new_role,
changed_by_app,
changed_at
FROM privilege_alerts
WHERE severity = 'critical'
AND changed_at > now() - interval 24 hour
ORDER BY changed_at DESC;
Find admin grants with their source:
SELECT
alert_type,
email,
user_id,
changed_by_app,
old_admin,
new_admin,
changed_at
FROM privilege_alerts
WHERE alert_type = 'admin_grant'
AND changed_at > now() - interval 7 day
ORDER BY changed_at DESC;
Count alerts by type and hour:
SELECT
toStartOfHour(changed_at) as hour,
alert_type,
severity,
COUNT(*) as alert_count
FROM privilege_alerts
WHERE changed_at > now() - interval 7 day
GROUP BY hour, alert_type, severity
ORDER BY hour DESC, alert_count DESC;
Slack Alert Message Format
The scheduled query sends Slack messages with color-coding by severity:
🚨 Privilege Change: admin_grant
User admin@example.com (123)
Alert Type: admin_grant | Severity: critical
Changed By: web_app | Old Admin: null | New Admin: true
Old Role: user | New Role: admin
Timestamp: 2025-02-17 15:32:45
Advantages of Native ClickHouse Alerting
- No external dependencies – Everything runs within ClickHouse
- Real-time detection – Materialized views process data as it arrives
- Scalable – Handles millions of privilege changes per second
- Built-in functions – Uses ClickHouse's native http() function
- Full audit trail – All alerts stored in ClickHouse tables
- Deduplication – Prevents duplicate Slack messages
- Observable – Query alerts alongside raw audit data
- Cost-effective – Single database for sync, detection, and alerting
Testing That Constraints Actually Work
Write tests to verify constraints are enforced from all code paths and you actually receive alerts when something goes wrong.
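A pattern that works in any test suite: enumerate every write path you know about and assert that each one hits the constraint. A stdlib sketch against SQLite for brevity (in CI, point the same assertions at your real database and schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        role TEXT NOT NULL DEFAULT 'user',
        is_admin INTEGER NOT NULL DEFAULT 0,
        is_locked INTEGER NOT NULL DEFAULT 0,
        CHECK (role IN ('user', 'support', 'admin', 'super_admin')),
        CHECK (NOT (is_locked AND is_admin))
    )
""")
conn.execute("INSERT INTO users (id, is_admin) VALUES (1, 1)")

# Each statement models a different bypass path (bulk update, raw SQL,
# migration). Every one of them must fail at the database layer.
invalid_writes = [
    "UPDATE users SET role = 'owner' WHERE id = 1",     # invalid enum value
    "UPDATE users SET is_locked = 1 WHERE id = 1",      # locked + admin contradiction
    "INSERT INTO users (id, role) VALUES (2, 'root')",  # bad role on insert
]

for sql in invalid_writes:
    try:
        conn.execute(sql)
        raise AssertionError(f"constraint did not fire for: {sql}")
    except sqlite3.IntegrityError:
        pass  # expected: the database rejected the write

print("all invalid writes rejected")
```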
⚠️ Critical Limitation: Database Access Still Bypasses Constraints
Database constraints and triggers detect and audit sensitive changes, but they do not prevent them if someone has raw database access. If an attacker or compromised service can connect directly to the database (via psql, a database IDE, or application code with connection strings), they can:
-- Attacker with direct DB access bypasses constraints like this:
UPDATE users SET is_internal_admin_user = true WHERE id = 123;
-- CHECK constraint can prevent invalid combinations (e.g. locked + admin)
-- But cannot prevent "only admins can set this" rules
What constraints DO prevent:
- Logical contradictions (e.g. is_locked AND is_admin if forbidden)
- Invalid enum values (e.g. role must be in a specific set)
- Orphaned foreign keys
What constraints DON'T prevent:
- Unauthorized value assignments if the attacker has direct DB access
- Privilege escalation if the attacker has raw SQL access
Mitigation:
- Restrict database access – Limit who/what can connect directly to the production database
- Use triggers to audit every change – Log who (app name, service name) changed what and when
- Alert on unexpected changes – Monitor the audit table for suspicious activity
- Regular reconciliation – Compare "who has admin in the DB" against "who should have admin per source of truth"
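The reconciliation step reduces to a set comparison. A hedged sketch (the function name reconcile_admins is illustrative; feed it your own queries and exports):

```python
def reconcile_admins(db_admins, source_of_truth):
    """Compare who HAS admin in the database against who SHOULD have it."""
    db_admins = set(db_admins)
    expected = set(source_of_truth)
    return {
        "unexpected_grants": sorted(db_admins - expected),  # escalations / leftovers
        "missing_grants": sorted(expected - db_admins),     # drift the other way
    }

# e.g. db_admins pulled via SELECT email FROM users WHERE is_internal_admin_user,
# source_of_truth exported from your access-management system
report = reconcile_admins(
    db_admins=["alice@co.com", "mallory@co.com"],
    source_of_truth=["alice@co.com", "bob@co.com"],
)
print(report)
# {'unexpected_grants': ['mallory@co.com'], 'missing_grants': ['bob@co.com']}
```

Run it weekly and alert on any non-empty unexpected_grants list.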
Defense in Depth: Summary
| Layer | What it does | Limitation |
|---|---|---|
| App callbacks / signals | Validate and enforce policy on normal ORM usage | Skipped by bulk updates, raw SQL, migrations, other services; vulnerable to TOCTOU |
| DB CHECK constraints | Enforce allowed values and logical invariants on every write | Cannot encode "who" is allowed to set a value; only prevents invalid combinations |
| DB triggers + audit | Log every sensitive change from any source | Requires schema and operational care; detects but doesn't prevent if attacker has DB access |
| Slack/monitoring alerts | Notify on suspicious privilege changes | Reactive; must be acted on quickly |
| Weekly reconciliation | Detect privilege creep and drift from source of truth | Slower feedback; catches things alerts might miss |
| Access control | Limit who can connect directly to the database | Prevents raw SQL bypass at the source |
Application checks are useful guardrails. Database enforcement, auditing, alerting, and reconciliation are essential for protecting critical invariants and knowing when they're violated, regardless of how the data is modified.
Relying only on on_change callbacks leaves bulk updates, raw SQL, migrations, and external systems outside your safety net. Database constraints, triggers, alerts, and reconciliation close that gap and give you a reliable, observable, defense-in-depth approach to privilege management.
Conclusion
Your application's callbacks are a good start. But they're not enough.
Every organization has paths to the database that bypass the ORM: migrations, bulk updates, background jobs, maintenance scripts, external services. Your callbacks can't protect those paths. Database constraints, triggers, and auditing can.
The strategy:
- Enforce invariants at the database level – CHECK constraints prevent logical contradictions
- Audit every sensitive change – Triggers log who changed what and when
- Alert on grants and escalations – Monitor the audit table and send Slack alerts
- Reconcile regularly – Weekly check that reality matches your source of truth
- Restrict database access – Limit who can connect directly and modify data
Callbacks give you policy. Constraints give you enforcement. Triggers give you visibility. Alerts give you response. Together, they give you confidence that your sensitive data is actually protected.
Please subscribe if you would like to receive more content like this :)