Objective:
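Libraries used: requests, pandas, scikit-learn, joblib, smtplib, email, logging, kafka-python.
Use pandas to preprocess the transaction data for fraud detection.
Load a pre-trained model with joblib and apply it to detect fraudulent transactions.
Install the third-party packages (smtplib, email, and logging ship with the Python standard library and do not need to be installed):
pip install requests pandas scikit-learn joblib kafka-python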
We fetch transaction data in real time from a Kafka stream:
from kafka import KafkaConsumer
import json
import logging

# Configure logging
logging.basicConfig(filename='fraud_detection.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

def fetch_transaction_data(topic, bootstrap_servers):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    logging.info("Connected to Kafka topic and waiting for messages.")
    return consumer

topic = 'transactions'
bootstrap_servers = ['localhost:9092']
consumer = fetch_transaction_data(topic, bootstrap_servers)
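The lambda passed as value_deserializer raises an exception on any payload that is not valid UTF-8 JSON. One way to keep a single malformed message from stopping the stream is a more tolerant deserializer. The following is a minimal sketch, assuming malformed messages should be logged and skipped; safe_json_deserializer is a hypothetical helper name, not part of kafka-python.

```python
import json
import logging
from typing import Any, Optional

def safe_json_deserializer(raw: bytes) -> Optional[Any]:
    """Decode a message value as UTF-8 JSON; return None if it is malformed."""
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Log the problem and signal "no usable payload" to the caller
        logging.warning("Skipping malformed message: %s", exc)
        return None
```

It could be passed as value_deserializer=safe_json_deserializer, with the consuming loop skipping any message whose value is None.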
We use kafka-python to create a Kafka consumer that connects to a specified topic and fetches transaction data in real time.
We preprocess the transaction data:
import pandas as pd

def preprocess_transaction_data(transaction):
    df = pd.DataFrame([transaction])
    # Example preprocessing steps
    df['transaction_amount'] = df['transaction_amount'].astype(float)
    df['transaction_date'] = pd.to_datetime(df['transaction_date'])
    # Additional feature engineering can be done here
    logging.info("Transaction data preprocessed successfully.")
    return df

# Example transaction data received
transaction_data = {
    'transaction_id': '12345',
    'account_id': '67890',
    'transaction_amount': '100.0',
    'transaction_date': '2024-06-06T12:00:00',
    'merchant': 'ABC Store'
}
preprocessed_data = preprocess_transaction_data(transaction_data)
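The "additional feature engineering" mentioned in the comment above might, for example, derive time-of-day features from the timestamp. The following is a minimal sketch using only the standard library; extract_time_features and the feature names hour, day_of_week, and is_weekend are illustrative assumptions, and whether they help depends on how the model was trained.

```python
from datetime import datetime

def extract_time_features(transaction: dict) -> dict:
    """Derive simple numeric features from one raw transaction record."""
    timestamp = datetime.fromisoformat(transaction['transaction_date'])
    return {
        'transaction_amount': float(transaction['transaction_amount']),
        'hour': timestamp.hour,
        'day_of_week': timestamp.weekday(),  # Monday is 0, Sunday is 6
        'is_weekend': int(timestamp.weekday() >= 5),
    }

features = extract_time_features({
    'transaction_amount': '100.0',
    'transaction_date': '2024-06-06T12:00:00',
})
```

The resulting dictionary can be turned into a single-row frame with pd.DataFrame([features]) before prediction.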
We load a pre-trained model and apply it to detect fraudulent transactions:
import joblib

# Load pre-trained model
model = joblib.load('fraud_detection_model.pkl')

def detect_fraud(transaction_df):
    prediction = model.predict(transaction_df)
    fraud_probability = model.predict_proba(transaction_df)[:, 1]
    is_fraud = prediction[0]
    probability = fraud_probability[0]
    logging.info(f"Fraud detection completed. Is fraud: {is_fraud}, Probability: {probability}")
    return is_fraud, probability

is_fraud, fraud_probability = detect_fraud(preprocessed_data)
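model.predict applies the classifier's built-in decision rule, which may not match the institution's tolerance for false alarms. Since predict_proba already yields a fraud probability, one option is to apply explicit cut-offs to it. A minimal sketch follows, in which triage_transaction and both threshold values are hypothetical and would need tuning on validation data:

```python
def triage_transaction(probability: float,
                       review_threshold: float = 0.5,
                       block_threshold: float = 0.9) -> str:
    """Map a fraud probability to an action using two illustrative cut-offs."""
    if not 0.0 <= probability <= 1.0:
        raise ValueError(f"probability must be in [0, 1], got {probability}")
    if probability >= block_threshold:
        return 'block'
    if probability >= review_threshold:
        return 'review'
    return 'allow'
```

With these example thresholds, a probability of 0.6 would be routed to manual review rather than treated as confirmed fraud.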
We load the pre-trained model with joblib.
We log and store suspicious transactions:
import sqlite3

# Database connection
conn = sqlite3.connect('suspicious_transactions.db')
cursor = conn.cursor()

# Create table if it doesn't exist
cursor.execute('''
    CREATE TABLE IF NOT EXISTS suspicious_transactions (
        transaction_id TEXT PRIMARY KEY,
        account_id TEXT,
        transaction_amount REAL,
        transaction_date TEXT,
        merchant TEXT,
        fraud_probability REAL
    )
''')
conn.commit()

def log_and_store_suspicious_transaction(transaction, fraud_probability):
    transaction['fraud_probability'] = fraud_probability
    cursor.execute('''
        INSERT INTO suspicious_transactions (
            transaction_id, account_id, transaction_amount, transaction_date, merchant, fraud_probability
        ) VALUES (?, ?, ?, ?, ?, ?)
    ''', (
        transaction['transaction_id'],
        transaction['account_id'],
        transaction['transaction_amount'],
        transaction['transaction_date'],
        transaction['merchant'],
        transaction['fraud_probability']
    ))
    conn.commit()
    logging.info(f"Suspicious transaction logged and stored: {transaction['transaction_id']}")

if is_fraud:
    log_and_store_suspicious_transaction(transaction_data, fraud_probability)
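Because transaction_id is the primary key, inserting the same transaction twice (for instance, if a message is delivered again after a restart) raises sqlite3.IntegrityError. The following sketch shows an idempotent variant using SQLite's INSERT OR IGNORE, run against an in-memory database so that it is self-contained; store_once and demo_conn are hypothetical names.

```python
import sqlite3

demo_conn = sqlite3.connect(':memory:')  # in-memory database, for illustration only
demo_conn.execute('''
    CREATE TABLE IF NOT EXISTS suspicious_transactions (
        transaction_id TEXT PRIMARY KEY,
        account_id TEXT,
        transaction_amount REAL,
        transaction_date TEXT,
        merchant TEXT,
        fraud_probability REAL
    )
''')

def store_once(conn, transaction, fraud_probability):
    """Insert the transaction; return True only if a new row was written."""
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            'INSERT OR IGNORE INTO suspicious_transactions VALUES (?, ?, ?, ?, ?, ?)',
            (
                transaction['transaction_id'],
                transaction['account_id'],
                float(transaction['transaction_amount']),
                transaction['transaction_date'],
                transaction['merchant'],
                float(fraud_probability),
            ),
        )
    # rowcount is 0 when the row already existed and the insert was ignored
    return cursor.rowcount == 1
```

The boolean return value also gives the caller a simple way to avoid sending a second alert for a transaction that was already stored.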
We generate and send email alerts for detected fraudulent transactions:
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def send_alert(transaction, fraud_probability, to_email):
    from_email = "youremail@example.com"
    subject = "Fraudulent Transaction Alert"
    body = f"""
    Fraudulent Transaction Detected:
    Transaction ID: {transaction['transaction_id']}
    Account ID: {transaction['account_id']}
    Transaction Amount: {transaction['transaction_amount']}
    Transaction Date: {transaction['transaction_date']}
    Merchant: {transaction['merchant']}
    Fraud Probability: {fraud_probability}
    """
    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['To'] = to_email
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    try:
        server = smtplib.SMTP('smtp.example.com', 587)
        server.starttls()
        server.login(from_email, "yourpassword")
        server.send_message(msg)
        server.quit()
        logging.info("Alert email sent successfully.")
    except Exception as e:
        logging.error(f"Error sending alert email: {e}")

to_email = "fraud_team@example.com"
if is_fraud:
    send_alert(transaction_data, fraud_probability, to_email)
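The function above hard-codes the sender address and password. The sketch below shows one alternative: it reads them from environment variables and builds the message with email.message.EmailMessage from the standard library. The variable names ALERT_FROM_EMAIL and ALERT_SMTP_PASSWORD and both helper names are assumptions for illustration, not an established convention.

```python
import os
from email.message import EmailMessage

def load_smtp_credentials():
    """Read sender address and password from (hypothetical) environment variables."""
    from_email = os.environ.get('ALERT_FROM_EMAIL')
    password = os.environ.get('ALERT_SMTP_PASSWORD')
    if not from_email or not password:
        raise RuntimeError("SMTP credentials are not set in the environment")
    return from_email, password

def build_alert_message(transaction, fraud_probability, from_email, to_email):
    """Build the alert email without sending it, so it can be inspected or tested."""
    msg = EmailMessage()
    msg['From'] = from_email
    msg['To'] = to_email
    msg['Subject'] = "Fraudulent Transaction Alert"
    msg.set_content(
        "Fraudulent Transaction Detected:\n"
        f"Transaction ID: {transaction['transaction_id']}\n"
        f"Account ID: {transaction['account_id']}\n"
        f"Fraud Probability: {fraud_probability:.2f}\n"
    )
    return msg
```

The returned message can be passed to server.send_message(msg) in the same way as in send_alert.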
The full script integrates all the steps and ensures a seamless workflow from data streaming to alert generation. The main function can be wrapped up as follows:
def main():
    consumer = fetch_transaction_data(topic, bootstrap_servers)
    for message in consumer:
        transaction_data = message.value
        preprocessed_data = preprocess_transaction_data(transaction_data)
        is_fraud, fraud_probability = detect_fraud(preprocessed_data)
        if is_fraud:
            log_and_store_suspicious_transaction(transaction_data, fraud_probability)
            send_alert(transaction_data, fraud_probability, to_email)

if __name__ == '__main__':
    main()
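As written, an exception raised while handling any single message (a missing field, a database error) ends the for loop and stops the consumer. The following is a minimal sketch of isolating failures per message; process_stream and handle_transaction are hypothetical names, and any iterable of messages stands in for the Kafka consumer.

```python
import logging

def process_stream(messages, handle_transaction):
    """Apply handle_transaction to each message; log failures and keep going."""
    processed, failed = 0, 0
    for message in messages:
        try:
            handle_transaction(message)
            processed += 1
        except Exception:
            # logging.exception records the traceback along with the message
            logging.exception("Failed to process message: %r", message)
            failed += 1
    return processed, failed
```

In main, messages would be the consumer, and handle_transaction would be a function that wraps the preprocess, detect, store, and alert steps for message.value.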
In this advanced real-world Python scenario, we developed a robust and automated real-time fraud detection system for a financial institution. The system integrates several components to ensure the efficient and effective detection of fraudulent transactions.
Using kafka-python, the system connects to a Kafka stream to fetch transaction data in real time, so each transaction is processed as it arrives.
Using pandas, the system preprocesses the transaction data to make it suitable for machine learning predictions, including data transformation and feature engineering.
Using smtplib, the system generates and sends real-time email alerts to the relevant stakeholders, so that immediate action can be taken on detected fraudulent transactions.
This solution demonstrates how data streaming, preprocessing, machine learning, and alerting can be combined into a single pipeline. By automating the detection and alerting process, the system helps financial institutions mitigate the risk of fraud, protect their customers, and maintain the integrity of their financial operations.