import os
import pickle
from typing import Any, Dict, List, Optional

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from dotenv import load_dotenv

# Populate os.environ from a local .env file (if present) before any
# os.getenv() lookup below is evaluated.
load_dotenv()


class GmailClient:
    """Small wrapper around the Gmail v1 API for listing and summarising mail.

    Configuration is read from environment variables:

    * ``GOOGLE_CLIENT_ID`` / ``GOOGLE_CLIENT_SECRET`` / ``GOOGLE_REDIRECT_URI``
      -- OAuth client settings used when no cached token is usable.
    * ``GMAIL_SCOPES`` -- whitespace-separated OAuth scopes
      (default: ``https://www.googleapis.com/auth/gmail.readonly``).
    * ``INBOX_LABEL`` -- label to list messages from (default: ``INBOX``).
    * ``MAX_RESULTS`` -- default ``maxResults`` for listing (default: 100).

    Constructing an instance authenticates immediately, which may open a
    local OAuth flow on port 8080 and reads/writes ``token.pickle`` in the
    current working directory.
    """

    def __init__(self):
        # Set by _authenticate(); declared first so the attribute always exists.
        self.service = None
        self._authenticate()

    def _authenticate(self):
        """Authenticate with the Gmail API using OAuth2 and build the service.

        Cached credentials are loaded from ``token.pickle`` when the file
        exists. If they are missing or invalid they are refreshed (when
        expired and a refresh token is available) or re-obtained through a
        local-server OAuth flow, and the resulting credentials are written
        back to ``token.pickle``.
        """
        creds = None
        token_path = 'token.pickle'

        if os.path.exists(token_path):
            # NOTE(security): pickle.load can execute arbitrary code embedded
            # in the file. This is only acceptable while token.pickle is
            # written exclusively by this program and is not writable by
            # untrusted parties.
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                client_config = {
                    "installed": {
                        "client_id": os.getenv("GOOGLE_CLIENT_ID"),
                        "client_secret": os.getenv("GOOGLE_CLIENT_SECRET"),
                        "redirect_uris": [os.getenv("GOOGLE_REDIRECT_URI")],
                        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                        "token_uri": "https://oauth2.googleapis.com/token",
                    }
                }
                scopes = os.getenv(
                    "GMAIL_SCOPES",
                    "https://www.googleapis.com/auth/gmail.readonly",
                ).split()
                flow = InstalledAppFlow.from_client_config(client_config, scopes)
                creds = flow.run_local_server(port=8080)

            # Persist both refreshed and newly obtained credentials so the
            # next run can reuse them.
            with open(token_path, 'wb') as token:
                pickle.dump(creds, token)

        self.service = build('gmail', 'v1', credentials=creds)

    @staticmethod
    def _index_headers(message: Dict[str, Any]) -> Dict[str, str]:
        """Map lower-cased header names to values for one message resource.

        Header field names are case-insensitive (a message may carry
        ``Message-Id`` rather than ``Message-ID``), so names are normalised
        to lower case. When a header occurs more than once the first
        occurrence wins. A message whose payload has no header list yields
        an empty mapping instead of raising.
        """
        headers = (message.get('payload') or {}).get('headers') or []
        indexed: Dict[str, str] = {}
        for header in headers:
            indexed.setdefault(header['name'].lower(), header['value'])
        return indexed

    @classmethod
    def _summarize(
        cls, message: Dict[str, Any], message_id: str, thread_id: str
    ) -> Dict[str, Any]:
        """Build the summary dict shared by the message and thread accessors."""
        headers = cls._index_headers(message)
        return {
            'id': message_id,
            'threadId': thread_id,
            'from': headers.get('from', ''),
            'subject': headers.get('subject', ''),
            'date': headers.get('date', ''),
            'messageId': headers.get('message-id', ''),
            'snippet': message.get('snippet', ''),
        }

    def fetch_emails(
        self, query: Optional[str] = None, max_results: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Fetch emails from Gmail with an optional query filter.

        Args:
            query: Gmail search query passed as ``q``; ``None`` applies no
                filter.
            max_results: Upper bound on listed messages. When falsy, the
                ``MAX_RESULTS`` environment variable (default 100) is used.

        Returns:
            One summary dict per listed message (see
            ``_get_message_details``). Only the first page of results is
            read. On any error the error is printed and an empty list is
            returned.
        """
        try:
            request = self.service.users().messages().list(
                userId='me',
                labelIds=[os.getenv("INBOX_LABEL", "INBOX")],
                q=query,
                maxResults=max_results or int(os.getenv("MAX_RESULTS", 100)),
            )
            response = request.execute()
            messages = response.get('messages', [])
            return [self._get_message_details(msg['id']) for msg in messages]
        except Exception as e:
            # Deliberate best-effort: callers receive an empty list rather
            # than an exception.
            print(f"Error fetching emails: {e}")
            return []

    def _get_message_details(self, message_id: str) -> Dict[str, Any]:
        """Get summary information for a single message.

        Args:
            message_id: Gmail message id.

        Returns:
            A dict with keys ``id``, ``threadId``, ``from``, ``subject``,
            ``date``, ``messageId`` and ``snippet``; headers that are absent
            map to ``''``. On any error the error is printed and
            ``{'id': message_id, 'error': <text>}`` is returned instead.
        """
        try:
            message = self.service.users().messages().get(
                userId='me',
                id=message_id,
                format='metadata',
                metadataHeaders=['From', 'Subject', 'Date', 'Message-ID'],
            ).execute()
            return self._summarize(message, message_id, message['threadId'])
        except Exception as e:
            print(f"Error getting message details: {e}")
            return {'id': message_id, 'error': str(e)}

    def get_thread_messages(self, thread_id: str) -> List[Dict[str, Any]]:
        """Get summaries of all messages in a thread.

        Args:
            thread_id: Gmail thread id.

        Returns:
            One summary dict per message, in the order the API returns them,
            with the same keys as ``_get_message_details``. On any error the
            error is printed and an empty list is returned.
        """
        try:
            thread = self.service.users().threads().get(
                userId='me',
                id=thread_id,
            ).execute()
            return [
                self._summarize(msg, msg['id'], thread_id)
                for msg in thread['messages']
            ]
        except Exception as e:
            print(f"Error getting thread messages: {e}")
            return []


if __name__ == "__main__":
    client = GmailClient()
    emails = client.fetch_emails()
    print(f"Fetched {len(emails)} emails")