Validating Webhook Signatures

Important: Always validate webhook signatures before processing the payload. This ensures the request came from Lynkwell and hasn't been tampered with.

Signature Format

The X-Webhook-Signature header contains a timestamp and signature:

t=1705315800,v1=5257a869e7ecebeda32affa62cdca3fa51cad7e77a0e56ff536d0ce8e108d8bd
  • t - Unix timestamp when the signature was generated
  • v1 - HMAC-SHA256 signature

Validation Algorithm

  1. Extract the timestamp (t) and signature (v1) from the header
  2. Construct the signed payload: {timestamp}.{raw_request_body}
  3. Compute HMAC-SHA256 of the signed payload using your webhook secret
  4. Compare your computed signature with the provided signature
  5. Optionally, verify the timestamp is within an acceptable window (e.g., 5 minutes)

Code Examples

Node.js / TypeScript

import { createHmac, timingSafeEqual } from 'crypto';

interface WebhookValidationResult {
  valid: boolean;
  error?: string;
}

function validateWebhookSignature(
  payload: string,
  signatureHeader: string,
  secret: string,
  toleranceSeconds: number = 300
): WebhookValidationResult {
  // Parse the signature header
  const parts = signatureHeader.split(',');
  const timestamp = parts.find((p) => p.startsWith('t='))?.slice(2);
  const signature = parts.find((p) => p.startsWith('v1='))?.slice(3);

  if (!timestamp || !signature) {
    return { valid: false, error: 'Invalid signature header format' };
  }

  // Check timestamp tolerance (prevent replay attacks)
  const timestampAge = Math.floor(Date.now() / 1000) - parseInt(timestamp, 10);
  if (Math.abs(timestampAge) > toleranceSeconds) {
    return { valid: false, error: 'Timestamp outside tolerance window' };
  }

  // Compute expected signature
  const signedPayload = `${timestamp}.${payload}`;
  const expectedSignature = createHmac('sha256', secret).update(signedPayload).digest('hex');

  // Use timing-safe comparison to prevent timing attacks
  const signatureBuffer = Buffer.from(signature, 'hex');
  const expectedBuffer = Buffer.from(expectedSignature, 'hex');

  if (signatureBuffer.length !== expectedBuffer.length) {
    return { valid: false, error: 'Invalid signature' };
  }

  if (!timingSafeEqual(signatureBuffer, expectedBuffer)) {
    return { valid: false, error: 'Invalid signature' };
  }

  return { valid: true };
}

// Express.js example
app.post('/webhooks/lynkwell', express.raw({ type: 'application/json' }), (req, res) => {
  const signature = req.headers['x-webhook-signature'] as string;
  const payload = req.body.toString();

  const result = validateWebhookSignature(payload, signature, process.env.WEBHOOK_SECRET!);

  if (!result.valid) {
    console.error('Webhook validation failed:', result.error);
    return res.status(401).json({ error: 'Invalid signature' });
  }

  const event = JSON.parse(payload);

  // Process the webhook
  console.log('Received event:', event.type, event.id);

  // Always return 200 quickly to acknowledge receipt
  res.status(200).json({ received: true });
});

Python

import hmac
import hashlib
import time
from typing import Tuple

def validate_webhook_signature(
    payload: str,
    signature_header: str,
    secret: str,
    tolerance_seconds: int = 300
) -> Tuple[bool, str | None]:
    """
    Validate a Lynkwell webhook signature.

    Returns:
        Tuple of (is_valid, error_message)
    """
    # Parse the signature header
    parts = dict(p.split('=', 1) for p in signature_header.split(',') if '=' in p)
    timestamp = parts.get('t')
    signature = parts.get('v1')

    if not timestamp or not signature:
        return False, 'Invalid signature header format'

    # Check timestamp tolerance
    timestamp_age = abs(int(time.time()) - int(timestamp))
    if timestamp_age > tolerance_seconds:
        return False, 'Timestamp outside tolerance window'

    # Compute expected signature
    signed_payload = f'{timestamp}.{payload}'
    expected_signature = hmac.new(
        secret.encode('utf-8'),
        signed_payload.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    # Use constant-time comparison
    if not hmac.compare_digest(signature, expected_signature):
        return False, 'Invalid signature'

    return True, None


# Flask example
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhooks/lynkwell', methods=['POST'])
def handle_webhook():
    signature = request.headers.get('X-Webhook-Signature', '')
    payload = request.get_data(as_text=True)

    is_valid, error = validate_webhook_signature(
        payload,
        signature,
        os.environ['WEBHOOK_SECRET']
    )

    if not is_valid:
        return jsonify({'error': error}), 401

    event = request.get_json()

    # Process the webhook
    print(f"Received event: {event['type']} {event['id']}")

    return jsonify({'received': True}), 200

Go

package main

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "strconv"
    "strings"
    "time"
)

func ValidateWebhookSignature(payload, signatureHeader, secret string, toleranceSeconds int64) (bool, error) {
    // Parse the signature header
    var timestamp, signature string
    for _, part := range strings.Split(signatureHeader, ",") {
        if strings.HasPrefix(part, "t=") {
            timestamp = strings.TrimPrefix(part, "t=")
        } else if strings.HasPrefix(part, "v1=") {
            signature = strings.TrimPrefix(part, "v1=")
        }
    }

    if timestamp == "" || signature == "" {
        return false, fmt.Errorf("invalid signature header format")
    }

    // Check timestamp tolerance
    ts, err := strconv.ParseInt(timestamp, 10, 64)
    if err != nil {
        return false, fmt.Errorf("invalid timestamp")
    }

    age := time.Now().Unix() - ts
    if age < 0 {
        age = -age
    }
    if age > toleranceSeconds {
        return false, fmt.Errorf("timestamp outside tolerance window")
    }

    // Compute expected signature
    signedPayload := fmt.Sprintf("%s.%s", timestamp, payload)
    mac := hmac.New(sha256.New, []byte(secret))
    mac.Write([]byte(signedPayload))
    expectedSignature := hex.EncodeToString(mac.Sum(nil))

    // Use constant-time comparison
    if !hmac.Equal([]byte(signature), []byte(expectedSignature)) {
        return false, fmt.Errorf("invalid signature")
    }

    return true, nil
}

Was this page helpful?