Real-Time Fraud Detection System

Objective:

  • Automate the process of fetching transaction data.
  • Preprocess and transform the data for model prediction.
  • Apply a pre-trained machine learning model to detect fraud.
  • Log and store suspicious transactions.
  • Generate and send alerts for detected fraudulent transactions.

Step-by-Step Solution

  1. Setup Environment:
    • Install necessary libraries: requests, pandas, scikit-learn, joblib, smtplib, email, logging, kafka-python.
  2. Data Streaming:
    • Define functions to fetch transaction data from a Kafka stream.
  3. Data Preprocessing:
    • Use pandas to preprocess the transaction data for fraud detection.
  4. Model Prediction:
    • Load a pre-trained model using joblib and apply it to detect fraudulent transactions.
  5. Logging and Storing:
    • Log suspicious transactions and store them in a database.
  6. Alert Generation:
    • Generate and send email alerts for detected fraudulent transactions.
  7. Error Handling and Logging:
    • Implement robust error handling and logging throughout the process.

Code Implementation

1. Setting Up the Environment

pip install requests pandas scikit-learn joblib smtplib email logging kafka-python

2. Data Streaming

We fetch transaction data in real-time from a Kafka stream:

from kafka import KafkaConsumer
import json
import logging

# Configure logging
logging.basicConfig(filename='fraud_detection.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

def fetch_transaction_data(topic, bootstrap_servers):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    logging.info("Connected to Kafka topic and waiting for messages.")
    return consumer

topic = 'transactions'
bootstrap_servers = ['localhost:9092']
consumer = fetch_transaction_data(topic, bootstrap_servers)
  • Kafka Consumer: We use kafka-python to create a Kafka consumer that connects to a specified topic and fetches transaction data in real-time.
  • Logging Configuration: We configure logging to record the process and errors.

3. Data Preprocessing

We preprocess the transaction data:

import pandas as pd

def preprocess_transaction_data(transaction):
    df = pd.DataFrame([transaction])
    
    # Example preprocessing steps
    df['transaction_amount'] = df['transaction_amount'].astype(float)
    df['transaction_date'] = pd.to_datetime(df['transaction_date'])
    
    # Additional feature engineering can be done here
    
    logging.info("Transaction data preprocessed successfully.")
    return df

# Example transaction data received
transaction_data = {
    'transaction_id': '12345',
    'account_id': '67890',
    'transaction_amount': '100.0',
    'transaction_date': '2024-06-06T12:00:00',
    'merchant': 'ABC Store'
}

preprocessed_data = preprocess_transaction_data(transaction_data)
  • DataFrame Creation: We create a DataFrame from the incoming transaction data.
  • Data Transformation: We convert the transaction amount to a float and the transaction date to a datetime format.
  • Additional Feature Engineering: We can perform additional feature engineering as needed.

4. Model Prediction

We load a pre-trained model and apply it to detect fraudulent transactions:

import joblib

# Load pre-trained model
model = joblib.load('fraud_detection_model.pkl')

def detect_fraud(transaction_df):
    prediction = model.predict(transaction_df)
    fraud_probability = model.predict_proba(transaction_df)[:, 1]
    
    is_fraud = prediction[0]
    probability = fraud_probability[0]
    
    logging.info(f"Fraud detection completed. Is fraud: {is_fraud}, Probability: {probability}")
    return is_fraud, probability

is_fraud, fraud_probability = detect_fraud(preprocessed_data)
  • Model Loading: We load a pre-trained fraud detection model using joblib.
  • Fraud Detection: We apply the model to the preprocessed transaction data to detect fraud and obtain the probability.

5. Logging and Storing

We log and store suspicious transactions:

import sqlite3

# Database connection
conn = sqlite3.connect('suspicious_transactions.db')
cursor = conn.cursor()

# Create table if it doesn't exist
cursor.execute('''
    CREATE TABLE IF NOT EXISTS suspicious_transactions (
        transaction_id TEXT PRIMARY KEY,
        account_id TEXT,
        transaction_amount REAL,
        transaction_date TEXT,
        merchant TEXT,
        fraud_probability REAL
    )
''')
conn.commit()

def log_and_store_suspicious_transaction(transaction, fraud_probability):
    transaction['fraud_probability'] = fraud_probability
    cursor.execute('''
        INSERT INTO suspicious_transactions (
            transaction_id, account_id, transaction_amount, transaction_date, merchant, fraud_probability
        ) VALUES (?, ?, ?, ?, ?, ?)
    ''', (
        transaction['transaction_id'],
        transaction['account_id'],
        transaction['transaction_amount'],
        transaction['transaction_date'],
        transaction['merchant'],
        transaction['fraud_probability']
    ))
    conn.commit()
    logging.info(f"Suspicious transaction logged and stored: {transaction['transaction_id']}")

if is_fraud:
    log_and_store_suspicious_transaction(transaction_data, fraud_probability)
  • Database Connection: We connect to an SQLite database and create a table for storing suspicious transactions.
  • Logging and Storing: We log suspicious transactions and store them in the database along with the fraud probability.

6. Alert Generation

We generate and send email alerts for detected fraudulent transactions:

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def send_alert(transaction, fraud_probability, to_email):
    from_email = "youremail@example.com"
    subject = "Fraudulent Transaction Alert"
    body = f"""
    Fraudulent Transaction Detected:
    
    Transaction ID: {transaction['transaction_id']}
    Account ID: {transaction['account_id']}
    Transaction Amount: {transaction['transaction_amount']}
    Transaction Date: {transaction['transaction_date']}
    Merchant: {transaction['merchant']}
    Fraud Probability: {fraud_probability}
    """
    
    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['To'] = to_email
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    
    try:
        server = smtplib.SMTP('smtp.example.com', 587)
        server.starttls()
        server.login(from_email, "yourpassword")
        server.send_message(msg)
        server.quit()
        logging.info("Alert email sent successfully.")
    except Exception as e:
        logging.error(f"Error sending alert email: {e}")

to_email = "fraud_team@example.com"
if is_fraud:
    send_alert(transaction_data, fraud_probability, to_email)
  • Email Composition: We create a multipart email message and attach the alert content.
  • SMTP Connection: We connect to the SMTP server, log in, and send the email alert.

Step 7: Full Script Execution

The full script integrates all the steps and ensures a seamless workflow from data streaming to alert generation. The main function can be wrapped up as follows:

def main():
    consumer = fetch_transaction_data(topic, bootstrap_servers)
    
    for message in consumer:
        transaction_data = message.value
        preprocessed_data = preprocess_transaction_data(transaction_data)
        is_fraud, fraud_probability = detect_fraud(preprocessed_data)
        
        if is_fraud:
            log_and_store_suspicious_transaction(transaction_data, fraud_probability)
            send_alert(transaction_data, fraud_probability, to_email)

if __name__ == '__main__':
    main()

Conclusion

In this advanced real-world Python scenario, we developed a robust and automated real-time fraud detection system for a financial institution. The system integrates several components to ensure the efficient and effective detection of fraudulent transactions.

  1. Data Streaming: Using kafka-python, the system connects to a Kafka stream to fetch transaction data in real-time, ensuring timely processing of each transaction.
  2. Data Preprocessing: Leveraging pandas, the system preprocesses the transaction data to make it suitable for machine learning predictions, including data transformation and feature engineering.
  3. Model Prediction: By loading a pre-trained machine learning model, the system applies advanced algorithms to predict the likelihood of fraud, providing both a binary classification and a probability score.
  4. Logging and Storing: The system logs suspicious transactions and stores them in an SQLite database, maintaining a record of all potentially fraudulent activities for further investigation.
  5. Alert Generation: Using smtplib, the system generates and sends real-time email alerts to the relevant stakeholders, ensuring immediate action can be taken on detected fraudulent transactions.
  6. Error Handling and Logging: Comprehensive error handling and logging throughout the process ensure that the system is robust, resilient, and capable of recovering from and reporting any issues that arise.

This solution demonstrates the power of integrating data streaming, preprocessing, machine learning, and alerting mechanisms into a seamless pipeline. By automating the detection and alerting process, the system helps financial institutions mitigate the risk of fraud, protect their customers, and maintain the integrity of their financial operations.