diff --git a/config.py b/config.py new file mode 100644 index 0000000..e69de29 diff --git a/data/processed/assessment_prediction/company_id/output.csv b/data/processed/assessment_prediction/company_id/output.csv new file mode 100644 index 0000000..36b15fa --- /dev/null +++ b/data/processed/assessment_prediction/company_id/output.csv @@ -0,0 +1,6 @@ +open_items,red_flags,num_employees,duration,assessment_type_biweekly,assessment_type_quarterly,assessment_type_weekly,open_items_assessment_type_weekly_lag_1,open_items_assessment_type_biweekly_lag_1,open_items_assessment_type_quarterly_lag_1,open_items_weekly_ma_3,open_items_biweekly_ma_3,open_items_quarterly_ma_3,percentage_change_open_items +10,2,30,1,0,0,1,0.0,0.0,0.0,10.0,0.0,0.0,0.0 +12,1,25,1,1,0,0,10.0,0.0,0.0,10.0,12.0,0.0,19.999999999999996 +11,3,28,1,0,1,0,0.0,12.0,0.0,10.0,12.0,11.0,-8.333333333333337 +9,1,30,1,0,0,1,0.0,0.0,11.0,9.0,12.0,11.0,-18.181818181818176 +13,4,27,1,1,0,0,9.0,0.0,0.0,9.0,13.0,11.0,44.44444444444444 diff --git a/data/processed/assessment_prediction/testid/output.csv b/data/processed/assessment_prediction/testid/output.csv new file mode 100644 index 0000000..fe2c484 --- /dev/null +++ b/data/processed/assessment_prediction/testid/output.csv @@ -0,0 +1,6 @@ +start_date,end_date,open_items,red_flags,num_employees,assessment_type_biweekly,assessment_type_quarterly,assessment_type_weekly,open_items_assessment_type_weekly_lag_1,open_items_assessment_type_biweekly_lag_1,open_items_assessment_type_quarterly_lag_1,open_items_weekly_ma_3,open_items_biweekly_ma_3,open_items_quarterly_ma_3,time_since_last_event,percentage_change_open_items +2023-01-01,2023-01-02,10,2,30,0,0,1,0.0,0.0,0.0,10.0,0.0,0.0,0.0,0.0 +2023-01-08,2023-01-09,12,1,25,1,0,0,10.0,0.0,0.0,10.0,12.0,0.0,7.0,19.999999999999996 +2023-01-15,2023-01-16,11,3,28,0,1,0,0.0,12.0,0.0,10.0,12.0,11.0,7.0,-8.333333333333337 +2023-01-22,2023-01-23,9,1,30,0,0,1,0.0,0.0,11.0,9.0,12.0,11.0,7.0,-18.181818181818176 +2023-01-29,2023-01-30,13,4,27,1,0,0,9.0,0.0,0.0,9.0,13.0,11.0,7.0,44.44444444444444 diff --git a/data/raw/dummy_assessment_data.csv b/data/raw/dummy_assessment_data.csv new file mode 100644 index 0000000..28b8771 --- /dev/null +++ b/data/raw/dummy_assessment_data.csv @@ -0,0 +1,6 @@ +start_date,end_date,open_items,red_flags,num_employees,assessment_type +2023-01-01,2023-01-02,10,2,30,weekly +2023-01-08,2023-01-09,12,1,25,biweekly +2023-01-15,2023-01-16,11,3,28,quarterly +2023-01-22,2023-01-23,9,1,30,weekly +2023-01-29,2023-01-30,13,4,27,biweekly diff --git a/notebooks/data/processed/assessment_prediction/testid/output.csv b/notebooks/data/processed/assessment_prediction/testid/output.csv new file mode 100644 index 0000000..36b15fa --- /dev/null +++ b/notebooks/data/processed/assessment_prediction/testid/output.csv @@ -0,0 +1,6 @@ +open_items,red_flags,num_employees,duration,assessment_type_biweekly,assessment_type_quarterly,assessment_type_weekly,open_items_assessment_type_weekly_lag_1,open_items_assessment_type_biweekly_lag_1,open_items_assessment_type_quarterly_lag_1,open_items_weekly_ma_3,open_items_biweekly_ma_3,open_items_quarterly_ma_3,percentage_change_open_items +10,2,30,1,0,0,1,0.0,0.0,0.0,10.0,0.0,0.0,0.0 +12,1,25,1,1,0,0,10.0,0.0,0.0,10.0,12.0,0.0,19.999999999999996 +11,3,28,1,0,1,0,0.0,12.0,0.0,10.0,12.0,11.0,-8.333333333333337 +9,1,30,1,0,0,1,0.0,0.0,11.0,9.0,12.0,11.0,-18.181818181818176 +13,4,27,1,1,0,0,9.0,0.0,0.0,9.0,13.0,11.0,44.44444444444444 diff --git a/notebooks/models/assessment_prediction/testid/testid_latest_data.csv b/notebooks/models/assessment_prediction/testid/testid_latest_data.csv new file mode 100644 index 0000000..8068b2a --- /dev/null +++ b/notebooks/models/assessment_prediction/testid/testid_latest_data.csv @@ -0,0 +1,2 @@ +open_items,red_flags,num_employees,duration,assessment_type_biweekly,assessment_type_quarterly,assessment_type_weekly,open_items_assessment_type_weekly_lag_1,open_items_assessment_type_biweekly_lag_1,open_items_assessment_type_quarterly_lag_1,open_items_weekly_ma_3,open_items_biweekly_ma_3,open_items_quarterly_ma_3,percentage_change_open_items +13,4,27,1,1,0,0,9.0,0.0,0.0,9.0,13.0,11.0,44.44444444444444 diff --git a/notebooks/models/assessment_prediction/testid/testid_model.pkl b/notebooks/models/assessment_prediction/testid/testid_model.pkl new file mode 100644 index 0000000..9312980 Binary files /dev/null and b/notebooks/models/assessment_prediction/testid/testid_model.pkl differ diff --git a/requirements.txt b/requirements.txt index 838a4f2..fd72837 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,6 @@ python-dotenv pypdf pypandoc Spire.Doc -plum-dispatch==1.7.4 \ No newline at end of file +plum-dispatch==1.7.4 +pandas +scikit-learn \ No newline at end of file diff --git a/scripts/run_assessment_prediction_trainer.py b/scripts/run_assessment_prediction_trainer.py new file mode 100644 index 0000000..2618b86 --- /dev/null +++ b/scripts/run_assessment_prediction_trainer.py @@ -0,0 +1,45 @@ +import os +import logging +from logging.handlers import RotatingFileHandler +from sklearn.ensemble import RandomForestRegressor +from sklearn.multioutput import MultiOutputRegressor +from src.pipeline.data_preprocessor import DataPreprocessor +from src.pipeline.model_trainer import ModelTrainer + +# Set up logging +handler = RotatingFileHandler('/root/ds_erp_ai/logs/prediction_pipeline.log', maxBytes=100000, backupCount=3) +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +logger.addHandler(handler) + +# Example of DataPreprocessor and ModelTrainer classes from the previous steps +class CompanyModelPipeline: + def __init__(self, company_ids, input_base_path): + self.company_ids = company_ids + self.input_base_path = input_base_path + + def run_pipeline(self): + for company_id in self.company_ids: + try: + # Define paths for the company + input_path = os.path.join(self.input_base_path, f'{company_id}_raw_data.csv') + + logger.info(f"Starting preprocessing for company {company_id}.") + + # Step 1: Preprocess the data + preprocessor = DataPreprocessor(input_path=input_path, company_id=company_id) + processed_data_path = preprocessor.run() + logger.info(f"Data preprocessing completed for company {company_id}. Processed data saved to {processed_data_path}.") + + # Step 2: Train and save the model + model = MultiOutputRegressor(RandomForestRegressor(n_estimators=100, random_state=42)) + trainer = ModelTrainer(preprocessed_data_path=processed_data_path, company_id=company_id, model=model) + model_path, latest_data_path, evaluation_results = trainer.run() + + logger.info(f"Model training and evaluation completed for company {company_id}.") + logger.info(f"Model saved to {model_path} and latest data saved to {latest_data_path}.") + logger.info(f"Evaluation Results for company {company_id}: {evaluation_results}") + + except Exception as e: + logger.error(f"An error occurred while processing company {company_id}: {e}") + diff --git a/src/pipeline/data_preprocessor.py b/src/pipeline/data_preprocessor.py index e69de29..8bfa129 100644 --- a/src/pipeline/data_preprocessor.py +++ b/src/pipeline/data_preprocessor.py @@ -0,0 +1,68 @@ +import pandas as pd +import os + +class DataPreprocessor: + def __init__(self, input_path, company_id): + self.input_path = input_path + self.output_dir = os.path.join('data', 'processed', 'assessment_prediction', company_id) + self.company_id = company_id + self.df = None + + def load_data(self): + self.df = pd.read_csv(self.input_path) + + def preprocess(self): + # Convert 'start_date' and 'end_date' to datetime + self.df['start_date'] = pd.to_datetime(self.df['start_date']) + self.df['end_date'] = pd.to_datetime(self.df['end_date']) + + # Add duration (in days) by subtracting start_date from end_date + self.df['duration'] = (self.df['end_date'] - self.df['start_date']).dt.days + + # Drop the 'start_date' and 'end_date' columns as they are not needed for training + self.df.drop(columns=['start_date', 'end_date'], inplace=True) + + # Convert 'assessment_type' to categorical (one-hot encoding) + self.df = pd.get_dummies(self.df, columns=['assessment_type'], drop_first=False) + + # Convert boolean columns to 1s and 0s + self.df['assessment_type_weekly'] = self.df['assessment_type_weekly'].astype(int) + self.df['assessment_type_biweekly'] = self.df['assessment_type_biweekly'].astype(int) + self.df['assessment_type_quarterly'] = self.df['assessment_type_quarterly'].astype(int) + + # Function to create lagged features based on assessment type + def create_lagged_features(df, col, assessment_col): + lagged_col = f"{col}_{assessment_col}_lag_1" + df[lagged_col] = df[col].where(df[assessment_col] == 1).shift(1) + return df + + # Create lagged features for each assessment type + self.df = create_lagged_features(self.df, 'open_items', 'assessment_type_weekly') + self.df = create_lagged_features(self.df, 'open_items', 'assessment_type_biweekly') + self.df = create_lagged_features(self.df, 'open_items', 'assessment_type_quarterly') + + # Fill NaNs with 0 instead of dropping rows + self.df.fillna(0, inplace=True) + + # Create moving averages for each assessment type + self.df['open_items_weekly_ma_3'] = self.df['open_items'].where(self.df['assessment_type_weekly'] == 1).rolling(window=3, min_periods=1).mean().fillna(0) + self.df['open_items_biweekly_ma_3'] = self.df['open_items'].where(self.df['assessment_type_biweekly'] == 1).rolling(window=3, min_periods=1).mean().fillna(0) + self.df['open_items_quarterly_ma_3'] = self.df['open_items'].where(self.df['assessment_type_quarterly'] == 1).rolling(window=3, min_periods=1).mean().fillna(0) + + # Add percentage change in open items + self.df['percentage_change_open_items'] = self.df['open_items'].pct_change().fillna(0) * 100 + + def save_data(self): + os.makedirs(self.output_dir, exist_ok=True) # Ensure output directory exists + output_path = os.path.join(self.output_dir, 'output.csv') + self.df.to_csv(output_path, index=False) + return output_path + + def run(self): + self.load_data() + self.preprocess() + return self.save_data() + +# Example usage: +# preprocessor = DataPreprocessor(input_path='path_to_raw_data.csv', company_id='company_123') +# processed_data_path = preprocessor.run() diff --git a/src/pipeline/model_trainer.py b/src/pipeline/model_trainer.py index e69de29..dd7b769 100644 --- a/src/pipeline/model_trainer.py +++ b/src/pipeline/model_trainer.py @@ -0,0 +1,88 @@ +import pandas as pd +import os +from sklearn.model_selection import train_test_split +from sklearn.ensemble import RandomForestRegressor +from sklearn.multioutput import MultiOutputRegressor +import joblib +from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score +import logging +from logging.handlers import RotatingFileHandler + + +handler = RotatingFileHandler('/root/ds_erp_ai/logs/prediction_pipeline.log', maxBytes=100000, backupCount=3) +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +logger.addHandler(handler) + +class ModelTrainer: + def __init__(self, preprocessed_data_path, company_id, model): + self.preprocessed_data_path = preprocessed_data_path + self.output_dir = os.path.join('models', 'assessment_prediction', company_id) + self.company_id = company_id + self.df = None + self.model = model # Model passed as an argument + self.X_test = None + self.y_test = None + + def load_data(self): + self.df = pd.read_csv(self.preprocessed_data_path) + + def train_model(self): + # Split data into features (X) and target variables (y) + X = self.df.drop(columns=['open_items', 'red_flags']) + y = self.df[['open_items', 'red_flags']] # Multi-target for open items and red flags + + # Split into training and test sets with 10% as test size + X_train, self.X_test, y_train, self.y_test = train_test_split(X, y, test_size=0.1, random_state=42) + + # Train the model + self.model.fit(X_train, y_train) + + # Save the trained model + os.makedirs(self.output_dir, exist_ok=True) + model_path = os.path.join(self.output_dir, f'{self.company_id}_model.pkl') + joblib.dump(self.model, model_path) + print(f"Model saved to {model_path}") + + # Save the latest row (last assessment data) for inference + latest_data_path = os.path.join(self.output_dir, f'{self.company_id}_latest_data.csv') + self.df.tail(1).to_csv(latest_data_path, index=False) + print(f"Latest assessment data saved to {latest_data_path}") + + # Return the model path and latest data path + return model_path, latest_data_path + + def evaluate_model(self): + # Predict using the test data + y_pred = self.model.predict(self.X_test) + + # Calculate evaluation metrics + mae = mean_absolute_error(self.y_test, y_pred) + mse = mean_squared_error(self.y_test, y_pred) + r2 = r2_score(self.y_test, y_pred) + + print("Model Evaluation Metrics:") + print(f"Mean Absolute Error (MAE): {mae}") + print(f"Mean Squared Error (MSE): {mse}") + print(f"R-squared (R²): {r2}") + + # Return evaluation results + return {'mae': mae, 'mse': mse, 'r2': r2} + + def run(self): + # Load data and train the model + self.load_data() + model_path, latest_data_path = self.train_model() + + # Evaluate the model immediately after training + evaluation_results = self.evaluate_model() + + return model_path, latest_data_path, evaluation_results + +# Example usage +'''model = MultiOutputRegressor(RandomForestRegressor(n_estimators=100, random_state=42)) +trainer = ModelTrainer(preprocessed_data_path=res, company_id='testid', model=model) +model_path, latest_data_path, evaluation_results = trainer.run() +print(f"The model was saved at: {model_path}") +print(f"The latest data was saved at: {latest_data_path}") +print(f"Evaluation Results: {evaluation_results}")''' diff --git a/test.py b/test.py index 4cbb3f2..1e47022 100644 --- a/test.py +++ b/test.py @@ -1,16 +1,7 @@ -# Example usage of the Chatbot class: -from src.services.chatbot import Chatbot -from src.utils.document_loader import load_document -if __name__ == "__main__": - chatbot = Chatbot() +# Example usage +from scripts.run_assessment_prediction_trainer import CompanyModelPipeline +company_ids = ['company_123', 'company_456', 'company_789'] +input_base_path = '/root/ds_erp_ai/data/raw/dummy_assessment_data.csv' # The base path where the raw data for each company is stored - # Example inputs - path = r"C:\Users\User\Desktop\Blessing_AI\MKD\test_erp_ai\erp_ai\test\erp_ai\data\raw\coding_task_completion_document.pdf" - - question = "Have you completed Task X?" - user_input = "Yes" - docs = load_document(path) - - # Validate the worker's answer using the provided document - validation_result = chatbot.validate_worker(question, docs) - print(validation_result) \ No newline at end of file +pipeline = CompanyModelPipeline(company_ids=company_ids, input_base_path=input_base_path) +pipeline.run_pipeline() diff --git a/tests/test_pipeline/__init__.py b/tests/test_pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_pipeline/test_data_preprocessor.py b/tests/test_pipeline/test_data_preprocessor.py new file mode 100644 index 0000000..95abb7a --- /dev/null +++ b/tests/test_pipeline/test_data_preprocessor.py @@ -0,0 +1,18 @@ +import unittest +from src.pipeline.data_preprocessor import DataPreprocessor +import os +class TestDataPreprocessor(unittest.TestCase): + + def setUp(self): + self.dp = DataPreprocessor( + input_path="/root/ds_erp_ai/data/raw/dummy_assessment_data.csv", + company_id="company_id" + ) + + def test_run(self): + res = self.dp.run() + self.assertIsNotNone(res) # Check that the result is not None + self.assertTrue(os.path.exists(res)) # Check that the output file exists + +if __name__ == '__main__': + unittest.main() \ No newline at end of file