Automated Data Analysis and Reporting System

Objective:

  • Automate the process of fetching sales data.
  • Analyze the data to extract key performance indicators (KPIs).
  • Store the processed data in a PostgreSQL database.
  • Generate automated reports and send them via email.

Step 1: Setting Up the Environment

First, we install the necessary libraries using pip:

pip install requests pandas sqlalchemy matplotlib psycopg2-binary

These packages handle HTTP requests (requests), data manipulation (pandas), database interaction (sqlalchemy, psycopg2-binary), and data visualization (matplotlib). The smtplib, email, and logging modules used later ship with Python's standard library, so they do not need to be installed.
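
For reproducible installs, you might list the third-party dependencies in a requirements.txt (contents illustrative; the standard-library modules are deliberately omitted):

# requirements.txt -- third-party packages only
requests
pandas
sqlalchemy
matplotlib
psycopg2-binary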

Step 2: Database Configuration

We configure the database using SQLAlchemy, with psycopg2 as the underlying PostgreSQL driver (SQLAlchemy loads the driver itself, so it does not need to be imported directly):

from sqlalchemy import create_engine, Table, Column, Integer, String, Float, MetaData

# Database connection
engine = create_engine('postgresql://user:password@localhost/retail_db')
metadata = MetaData()

# Define tables
raw_sales = Table('raw_sales', metadata,
                  Column('id', Integer, primary_key=True),
                  Column('order_id', String),
                  Column('product_id', String),
                  Column('category', String),
                  Column('quantity', Integer),
                  Column('price', Float),
                  Column('order_date', String))

processed_sales = Table('processed_sales', metadata,
                        Column('id', Integer, primary_key=True),
                        Column('total_sales', Float),
                        Column('average_order_value', Float),
                        Column('sales_by_category', String))

# Create tables
metadata.create_all(engine)
  • We create a SQLAlchemy engine for connecting to the PostgreSQL database.
  • We define two tables: raw_sales for storing raw sales data and processed_sales for storing processed and analyzed data.
  • We call metadata.create_all(engine) to create these tables in the database.
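
To confirm the tables were created, SQLAlchemy's inspector can list them; a quick sanity check against the engine defined above:

from sqlalchemy import inspect

# List the tables now present in the connected database
inspector = inspect(engine)
print(inspector.get_table_names())  # should include 'raw_sales' and 'processed_sales'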

Step 3: Data Fetching

We fetch sales data from an API:

import requests
import logging

# Configure logging
logging.basicConfig(filename='data_analysis.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

def fetch_sales_data(api_url):
    try:
        response = requests.get(api_url)
        response.raise_for_status()
        sales_data = response.json()
        logging.info("Sales data fetched successfully.")
        return sales_data
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching sales data: {e}")
        return None

api_url = "https://api.example.com/sales"
sales_data = fetch_sales_data(api_url)
  • We configure logging to record the process and errors.
  • The fetch_sales_data function makes a GET request to the provided API URL.
  • If successful, it logs the success message and returns the sales data in JSON format.
  • If there is an error, it logs the error message and returns None.
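
In production you would likely want a request timeout and retries for transient failures, since requests does not retry by default. A minimal sketch of a variant using requests' standard adapter machinery (the retry counts and timeout are illustrative):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def fetch_sales_data_with_retries(api_url, retries=3, timeout=10):
    # Retry GETs on common transient status codes, with exponential backoff
    retry = Retry(total=retries, backoff_factor=1,
                  status_forcelist=[429, 500, 502, 503, 504])
    session = requests.Session()
    session.mount('https://', HTTPAdapter(max_retries=retry))
    try:
        response = session.get(api_url, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching sales data: {e}")
        return None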

Step 4: Data Processing and Analysis

We process and analyze the fetched sales data:

import pandas as pd

def process_sales_data(sales_data):
    df = pd.DataFrame(sales_data)
    
    # Revenue per line item (one row per product in an order)
    df['total_sales'] = df['quantity'] * df['price']
    
    # Calculate average order value
    avg_order_value = df.groupby('order_id')['total_sales'].sum().mean()
    
    # Sales by category
    sales_by_category = df.groupby('category')['total_sales'].sum().to_dict()
    
    # Prepare processed data
    processed_data = {
        'total_sales': df['total_sales'].sum(),
        'average_order_value': avg_order_value,
        'sales_by_category': sales_by_category
    }
    
    logging.info("Sales data processed successfully.")
    return df, processed_data

raw_df, processed_data = process_sales_data(sales_data)
  • We use pandas to create a DataFrame from the sales data.
  • We calculate revenue for each line item (row) by multiplying quantity and price.
  • We calculate the average order value by grouping by order_id, summing the total sales for each order, and taking the mean.
  • We calculate sales by category by grouping by category and summing the total sales.
  • We compile these metrics into a dictionary called processed_data.
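
To make the calculations concrete, here is a tiny hand-made payload and the KPIs it yields (the records are hypothetical; the API's actual field names must match those used above):

sample = [
    {'order_id': 'A1', 'product_id': 'P1', 'category': 'Books',
     'quantity': 2, 'price': 10.0, 'order_date': '2024-01-01'},
    {'order_id': 'A1', 'product_id': 'P2', 'category': 'Toys',
     'quantity': 1, 'price': 5.0, 'order_date': '2024-01-01'},
    {'order_id': 'A2', 'product_id': 'P1', 'category': 'Books',
     'quantity': 1, 'price': 10.0, 'order_date': '2024-01-02'},
]
_, kpis = process_sales_data(sample)
# total_sales = 2*10 + 1*5 + 1*10 = 35.0
# average_order_value = (25 + 10) / 2 = 17.5
# sales_by_category = {'Books': 30.0, 'Toys': 5.0}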

Step 5: Data Storage

We store the raw and processed data in the database:

import json

def store_data(raw_df, processed_data, engine):
    # Store raw data (now including the derived total_sales column)
    raw_df.to_sql('raw_sales', engine, if_exists='replace', index=False)
    
    # Serialize the category breakdown: to_sql cannot store a raw dict
    record = dict(processed_data)
    record['sales_by_category'] = json.dumps(record['sales_by_category'])
    processed_df = pd.DataFrame([record])
    processed_df.to_sql('processed_sales', engine, if_exists='replace', index=False)
    
    logging.info("Data stored in database successfully.")

store_data(raw_df, processed_data, engine)
  • We store the raw data in the raw_sales table. Note that if_exists='replace' drops and recreates the table on every run, overwriting the schema from Step 2; use 'append' instead if you want to accumulate history.
  • We serialize the sales_by_category dictionary to a JSON string (to_sql cannot store a raw dict) and write the resulting one-row DataFrame to the processed_sales table.
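
As a quick sanity check (not part of the pipeline itself), the stored rows can be read back with pandas:

# Read the KPI row back to confirm the write succeeded
check = pd.read_sql('SELECT * FROM processed_sales', engine)
print(check[['total_sales', 'average_order_value']])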

Step 6: Report Generation

We generate a report based on the processed data:

import matplotlib.pyplot as plt

def generate_report(processed_data):
    # Generate sales by category pie chart
    categories = list(processed_data['sales_by_category'].keys())
    sales = list(processed_data['sales_by_category'].values())
    
    plt.figure(figsize=(10, 6))
    plt.pie(sales, labels=categories, autopct='%1.1f%%', startangle=140)
    plt.title('Sales by Category')
    plt.savefig('sales_by_category.png')
    plt.close()
    
    # Compile report content
    report_content = f"""
    Total Sales: ${processed_data['total_sales']:.2f}
    Average Order Value: ${processed_data['average_order_value']:.2f}
    
    See the attached sales by category chart for more details.
    """
    
    logging.info("Report generated successfully.")
    return report_content, 'sales_by_category.png'

report_content, report_chart_path = generate_report(processed_data)
  • We generate a pie chart of sales by category using matplotlib.
  • We compile a textual report with the total sales and average order value.
  • We save the pie chart as an image file and return the report content and image file path.
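
One practical note: if this script runs unattended (for example from cron or inside a container), there is no display, so matplotlib should be pointed at a non-interactive backend before pyplot is imported:

import matplotlib
matplotlib.use('Agg')  # file-only rendering; no display needed
import matplotlib.pyplot as plt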

Step 7: Automated Emailing

We send the generated report via email:

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

def send_email_report(report_content, report_chart_path, to_email):
    from_email = "youremail@example.com"
    subject = "Weekly Sales Report"
    body = report_content
    
    msg = MIMEMultipart()
    msg['From'] = from_email
    msg['To'] = to_email
    msg['Subject'] = subject
    
    msg.attach(MIMEText(body, 'plain'))
    
    with open(report_chart_path, "rb") as attachment:
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', f"attachment; filename={report_chart_path}")
    
    msg.attach(part)
    
    try:
        with smtplib.SMTP('smtp.example.com', 587) as server:
            server.starttls()
            server.login(from_email, "yourpassword")
            server.send_message(msg)
        
        logging.info("Email report sent successfully.")
    except Exception as e:
        logging.error(f"Error sending email report: {e}")

to_email = "stakeholder@example.com"
send_email_report(report_content, report_chart_path, to_email)
  • We use the smtplib library to send emails.
  • We create a multipart email message and attach the report content and the pie chart image.
  • We connect to the SMTP server, log in, and send the email.
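
The credentials are hard-coded above for brevity; in practice they belong in environment variables or a secrets manager. A minimal sketch (the variable names are placeholders, not a convention of any library):

import os
import smtplib

def open_smtp_connection():
    # SMTP_HOST, SMTP_USER, and SMTP_PASSWORD are placeholder names;
    # set them in the deployment environment
    host = os.environ.get('SMTP_HOST', 'smtp.example.com')
    server = smtplib.SMTP(host, 587)
    server.starttls()
    server.login(os.environ['SMTP_USER'], os.environ['SMTP_PASSWORD'])
    return server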

Step 8: Error Handling and Logging

Throughout the script, we use try/except blocks to handle errors and log messages to record the status of each operation. This makes the system more robust and allows issues to be identified and addressed quickly.
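
For a long-running scheduled job, a rotating log file keeps disk use bounded. The basicConfig call from Step 3 could be swapped for a rotating handler from the standard library (a sketch; the size limits are illustrative):

import logging
from logging.handlers import RotatingFileHandler

# Rotate at ~1 MB, keeping five old files, instead of one ever-growing log
handler = RotatingFileHandler('data_analysis.log', maxBytes=1_000_000, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(message)s'))
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)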

Full Script Execution

The full script integrates all the steps into a single workflow from data fetching to reporting. The main function ties them together:

def main():
    # Fetch sales data
    sales_data = fetch_sales_data(api_url)
    
    if sales_data:
        # Process sales data
        raw_df, processed_data = process_sales_data(sales_data)
        
        # Store data in database
        store_data(raw_df, processed_data, engine)
        
        # Generate report
        report_content, report_chart_path = generate_report(processed_data)
        
        # Send email report
        send_email_report(report_content, report_chart_path, to_email)
        
    # Close database connection
    engine.dispose()

if __name__ == '__main__':
    main()
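
To run the pipeline on a schedule, a cron entry is the simplest option. For a weekly run on Mondays at 07:00 (the script path is illustrative):

# m h dom mon dow  command
0 7 * * 1  /usr/bin/python3 /path/to/sales_report.py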

Summary

  • Data Fetching: Uses requests to fetch data from an API.
  • Data Processing: Uses pandas to clean and analyze the data, generating key metrics.
  • Data Storage: Uses SQLAlchemy to store raw and processed data in a PostgreSQL database.
  • Report Generation: Uses matplotlib to create visualizations and compile a textual report.
  • Automated Emailing: Uses smtplib to send the report via email.
  • Error Handling and Logging: Implements robust error handling and logging throughout the script.