Validating Webhook Signatures
Important: Always validate webhook signatures before processing the payload. This ensures the request came from Lynkwell and hasn't been tampered with.
Signature Format
The X-Webhook-Signature header contains a timestamp and signature:
t=1705315800,v1=5257a869e7ecebeda32affa62cdca3fa51cad7e77a0e56ff536d0ce8e108d8bd
- t - Unix timestamp (in seconds) when the signature was generated
- v1 - HMAC-SHA256 signature, hex-encoded
Validation Algorithm
- Extract the timestamp (t) and signature (v1) from the header
- Construct the signed payload: {timestamp}.{raw_request_body}
- Compute HMAC-SHA256 of the signed payload using your webhook secret
- Compare your computed signature with the provided signature
- Optionally, verify the timestamp is within an acceptable window (e.g., 5 minutes)
Code Examples
Node.js / TypeScript
import { createHmac, timingSafeEqual } from 'crypto';
/** Outcome of validating an `X-Webhook-Signature` header against a payload. */
interface WebhookValidationResult {
  /** True only when the header parsed, the timestamp is in tolerance, and the HMAC matches. */
  valid: boolean;
  /** Failure reason; set only when `valid` is false. */
  error?: string;
}

/**
 * Validates a webhook signature header of the form `t=<unix seconds>,v1=<hex HMAC-SHA256>`.
 *
 * The expected signature is the HMAC-SHA256 of `{timestamp}.{payload}` keyed with `secret`.
 *
 * @param payload - Raw request body, exactly as received (not re-serialized JSON).
 * @param signatureHeader - Value of the `X-Webhook-Signature` header.
 * @param secret - Webhook signing secret.
 * @param toleranceSeconds - Maximum allowed distance, in seconds, between the header
 *   timestamp and the current time (in either direction). Defaults to 300.
 * @returns `{ valid: true }` on success, otherwise `{ valid: false, error }`.
 */
function validateWebhookSignature(
  payload: string,
  signatureHeader: string,
  secret: string,
  toleranceSeconds: number = 300
): WebhookValidationResult {
  // Parse the signature header
  const parts = signatureHeader.split(',');
  const timestamp = parts.find((p) => p.startsWith('t='))?.slice(2);
  const signature = parts.find((p) => p.startsWith('v1='))?.slice(3);
  if (!timestamp || !signature) {
    return { valid: false, error: 'Invalid signature header format' };
  }

  // Require a plain decimal timestamp. parseInt() would turn garbage into NaN, and
  // `Math.abs(NaN) > tolerance` is false, which would silently skip the replay check.
  if (!/^\d+$/.test(timestamp)) {
    return { valid: false, error: 'Invalid timestamp' };
  }

  // Check timestamp tolerance (prevent replay attacks)
  const timestampAge = Math.floor(Date.now() / 1000) - Number(timestamp);
  if (Math.abs(timestampAge) > toleranceSeconds) {
    return { valid: false, error: 'Timestamp outside tolerance window' };
  }

  // Compute expected signature
  const signedPayload = `${timestamp}.${payload}`;
  const expectedSignature = createHmac('sha256', secret).update(signedPayload).digest('hex');

  // Buffer.from(x, 'hex') stops decoding at the first non-hex (or odd trailing) character,
  // so "<valid hex>junk" would decode to the same bytes as the valid signature. Insist on
  // strict hex of exactly the expected length before decoding.
  if (signature.length !== expectedSignature.length || !/^[0-9a-fA-F]+$/.test(signature)) {
    return { valid: false, error: 'Invalid signature' };
  }

  // Use timing-safe comparison to prevent timing attacks. Both buffers have equal length
  // here, which timingSafeEqual requires.
  const signatureBuffer = Buffer.from(signature, 'hex');
  const expectedBuffer = Buffer.from(expectedSignature, 'hex');
  if (!timingSafeEqual(signatureBuffer, expectedBuffer)) {
    return { valid: false, error: 'Invalid signature' };
  }

  return { valid: true };
}
// Express.js example
// express.raw() is used so req.body is the unparsed request body; the signature must be
// verified against the exact bytes that were sent, not re-serialized JSON.
app.post('/webhooks/lynkwell', express.raw({ type: 'application/json' }), (req, res) => {
  const secret = process.env.WEBHOOK_SECRET;
  if (!secret) {
    // Fail loudly on misconfiguration instead of throwing inside the HMAC computation.
    console.error('Webhook validation failed:', 'WEBHOOK_SECRET is not set');
    return res.status(500).json({ error: 'Webhook secret not configured' });
  }

  // The header may be absent (undefined) or repeated (string[]); only a single string
  // value can be a well-formed signature header.
  const signature = req.headers['x-webhook-signature'];
  if (typeof signature !== 'string') {
    console.error('Webhook validation failed:', 'Missing signature header');
    return res.status(401).json({ error: 'Invalid signature' });
  }

  const payload = req.body.toString();
  const result = validateWebhookSignature(payload, signature, secret);
  if (!result.valid) {
    // Log the specific reason server-side; return a generic message to the caller.
    console.error('Webhook validation failed:', result.error);
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // Only parse the body after the signature has been verified.
  const event = JSON.parse(payload);
  // Process the webhook
  console.log('Received event:', event.type, event.id);
  // Always return 200 quickly to acknowledge receipt
  res.status(200).json({ received: true });
});
Python
import hmac
import hashlib
import time
from typing import Tuple
def validate_webhook_signature(
payload: str,
signature_header: str,
secret: str,
tolerance_seconds: int = 300
) -> Tuple[bool, str | None]:
"""
Validate a Lynkwell webhook signature.
Returns:
Tuple of (is_valid, error_message)
"""
# Parse the signature header
parts = dict(p.split('=', 1) for p in signature_header.split(',') if '=' in p)
timestamp = parts.get('t')
signature = parts.get('v1')
if not timestamp or not signature:
return False, 'Invalid signature header format'
# Check timestamp tolerance
timestamp_age = abs(int(time.time()) - int(timestamp))
if timestamp_age > tolerance_seconds:
return False, 'Timestamp outside tolerance window'
# Compute expected signature
signed_payload = f'{timestamp}.{payload}'
expected_signature = hmac.new(
secret.encode('utf-8'),
signed_payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
# Use constant-time comparison
if not hmac.compare_digest(signature, expected_signature):
return False, 'Invalid signature'
return True, None
# Flask example
import os  # needed for os.environ below

from flask import Flask, request, jsonify

app = Flask(__name__)


@app.route('/webhooks/lynkwell', methods=['POST'])
def handle_webhook():
    """Verify the webhook signature, then acknowledge the event with a 200."""
    # Default to '' so a missing header is reported as an invalid header format
    # by the validator rather than failing here.
    signature = request.headers.get('X-Webhook-Signature', '')
    # Read the body as text before any JSON parsing; the signature is computed
    # over the request body as sent.
    payload = request.get_data(as_text=True)

    is_valid, error = validate_webhook_signature(
        payload,
        signature,
        os.environ['WEBHOOK_SECRET']
    )

    if not is_valid:
        return jsonify({'error': error}), 401

    # Only parse the body once the signature has been verified.
    event = request.get_json()

    # Process the webhook
    print(f"Received event: {event['type']} {event['id']}")

    return jsonify({'received': True}), 200
Go
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"strings"
"time"
)
// ValidateWebhookSignature verifies a signature header of the form
// "t=<unix seconds>,v1=<hex HMAC-SHA256>" against payload.
//
// The expected signature is the hex-encoded HMAC-SHA256 of "<t>.<payload>"
// keyed with secret. The header timestamp must be within toleranceSeconds of
// the current time in either direction. It returns (true, nil) on success and
// (false, err) describing the first failed check otherwise.
func ValidateWebhookSignature(payload, signatureHeader, secret string, toleranceSeconds int64) (bool, error) {
	// Pull the t= and v1= fields out of the comma-separated header.
	var tsField, sigField string
	for _, field := range strings.Split(signatureHeader, ",") {
		switch {
		case strings.HasPrefix(field, "t="):
			tsField = field[len("t="):]
		case strings.HasPrefix(field, "v1="):
			sigField = field[len("v1="):]
		}
	}
	if tsField == "" || sigField == "" {
		return false, fmt.Errorf("invalid signature header format")
	}

	// Reject timestamps too far from now, in the past or the future.
	issuedAt, parseErr := strconv.ParseInt(tsField, 10, 64)
	if parseErr != nil {
		return false, fmt.Errorf("invalid timestamp")
	}
	skew := time.Now().Unix() - issuedAt
	if skew < 0 {
		skew = -skew
	}
	if skew > toleranceSeconds {
		return false, fmt.Errorf("timestamp outside tolerance window")
	}

	// Recompute the signature over "<timestamp>.<payload>".
	digest := hmac.New(sha256.New, []byte(secret))
	digest.Write([]byte(tsField + "." + payload))
	want := hex.EncodeToString(digest.Sum(nil))

	// hmac.Equal compares without leaking timing information.
	if hmac.Equal([]byte(sigField), []byte(want)) {
		return true, nil
	}
	return false, fmt.Errorf("invalid signature")
}
