Files
email_alerts/gmail_client.py
T

122 lines
4.8 KiB
Python
Raw Normal View History

2025-08-05 22:29:54 +01:00
import os
import pickle
from typing import List, Dict, Any
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from dotenv import load_dotenv
# Load variables from a local .env file (if present) into the environment
# so the os.getenv() lookups further down in this module can see them.
load_dotenv()
class GmailClient:
    """Small wrapper around the Gmail API for listing messages and threads.

    The constructor authenticates immediately; afterwards ``self.service``
    holds the resource returned by ``build('gmail', 'v1', ...)``.
    """

    def __init__(self):
        self.service = None
        self._authenticate()

    def _authenticate(self):
        """Authenticate with Gmail API using OAuth2.

        Credentials are cached in ``token.pickle`` (relative to the current
        working directory). Valid cached credentials are reused; expired ones
        with a refresh token are refreshed; otherwise an interactive local
        server flow is started on port 8080 using the ``GOOGLE_CLIENT_ID``,
        ``GOOGLE_CLIENT_SECRET`` and ``GOOGLE_REDIRECT_URI`` environment
        variables. Scopes come from the whitespace-separated ``GMAIL_SCOPES``
        variable and default to read-only Gmail access.
        """
        creds = None
        token_path = 'token.pickle'
        if os.path.exists(token_path):
            # NOTE(security): unpickling runs arbitrary code embedded in the
            # file. Only safe while token.pickle is written exclusively by
            # this process; consider a JSON-based credential cache instead.
            with open(token_path, 'rb') as token:
                creds = pickle.load(token)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_config(
                    {
                        "installed": {
                            "client_id": os.getenv("GOOGLE_CLIENT_ID"),
                            "client_secret": os.getenv("GOOGLE_CLIENT_SECRET"),
                            "redirect_uris": [os.getenv("GOOGLE_REDIRECT_URI")],
                            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                            "token_uri": "https://oauth2.googleapis.com/token"
                        }
                    },
                    os.getenv("GMAIL_SCOPES", "https://www.googleapis.com/auth/gmail.readonly").split()
                )
                creds = flow.run_local_server(port=8080)
            # Persist refreshed or newly obtained credentials for the next run.
            with open(token_path, 'wb') as token:
                pickle.dump(creds, token)
        self.service = build('gmail', 'v1', credentials=creds)

    @staticmethod
    def _header_map(message: Dict[str, Any]) -> Dict[str, str]:
        """Return ``{lower-cased header name: value}`` for a message resource.

        Header field names are case-insensitive and senders vary the casing
        (``Message-ID`` vs ``Message-Id``), so names are normalised before
        lookup. When a header appears more than once the first occurrence
        wins. A message without ``payload`` or ``headers`` yields ``{}``.
        """
        headers = (message.get('payload') or {}).get('headers') or []
        lookup: Dict[str, str] = {}
        for header in headers:
            lookup.setdefault(header['name'].lower(), header['value'])
        return lookup

    @classmethod
    def _summarize_message(cls, message: Dict[str, Any], message_id: str,
                           thread_id: str) -> Dict[str, Any]:
        """Build the flat summary dict shared by the public fetch methods.

        Args:
            message: A Gmail message resource (dict) as returned by the API.
            message_id: Value to store under ``'id'``.
            thread_id: Value to store under ``'threadId'``.

        Returns:
            Dict with keys ``id``, ``threadId``, ``from``, ``subject``,
            ``date``, ``messageId`` and ``snippet``; absent headers and a
            missing snippet become empty strings.
        """
        headers = cls._header_map(message)
        return {
            'id': message_id,
            'threadId': thread_id,
            'from': headers.get('from', ''),
            'subject': headers.get('subject', ''),
            'date': headers.get('date', ''),
            'messageId': headers.get('message-id', ''),
            'snippet': message.get('snippet', ''),
        }

    def fetch_emails(self, query: str = None, max_results: int = None) -> List[Dict[str, Any]]:
        """Fetch emails from Gmail with optional query filter.

        Args:
            query: Gmail search expression passed as ``q``; ``None`` for no
                filter.
            max_results: Upper bound passed as ``maxResults``. When falsy,
                the ``MAX_RESULTS`` environment variable is used (default
                100).

        Returns:
            One summary dict per listed message (see
            ``_get_message_details``). Only a single ``list`` request is
            issued, so later result pages are not followed. On any error the
            problem is printed and ``[]`` is returned.
        """
        try:
            request = self.service.users().messages().list(
                userId='me',
                labelIds=[os.getenv("INBOX_LABEL", "INBOX")],
                q=query,
                maxResults=max_results or int(os.getenv("MAX_RESULTS", 100))
            )
            response = request.execute()
            messages = response.get('messages', [])
            return [self._get_message_details(msg['id']) for msg in messages]
        except Exception as e:
            print(f"Error fetching emails: {e}")
            return []

    def _get_message_details(self, message_id: str) -> Dict[str, Any]:
        """Get detailed message information.

        Requests the message in ``metadata`` format limited to the From,
        Subject, Date and Message-ID headers.

        Returns:
            The summary dict for the message, or ``{'id': ..., 'error': ...}``
            if the request or parsing fails (the error is also printed).
        """
        try:
            message = self.service.users().messages().get(
                userId='me',
                id=message_id,
                format='metadata',
                metadataHeaders=['From', 'Subject', 'Date', 'Message-ID']
            ).execute()
            return self._summarize_message(message, message_id, message['threadId'])
        except Exception as e:
            print(f"Error getting message details: {e}")
            return {'id': message_id, 'error': str(e)}

    def get_thread_messages(self, thread_id: str) -> List[Dict[str, Any]]:
        """Get all messages in a thread.

        Returns:
            One summary dict per message in the thread, in the order the API
            returned them. A message lacking headers gets empty header fields
            rather than invalidating the whole thread. On any error the
            problem is printed and ``[]`` is returned.
        """
        try:
            thread = self.service.users().threads().get(
                userId='me',
                id=thread_id
            ).execute()
            return [
                self._summarize_message(msg, msg['id'], thread_id)
                for msg in thread['messages']
            ]
        except Exception as e:
            print(f"Error getting thread messages: {e}")
            return []
if __name__ == "__main__":
    # Manual smoke test: authenticate, list messages using the default
    # settings, and report how many came back.
    client = GmailClient()
    emails = client.fetch_emails()
    fetched_count = len(emails)
    print(f"Fetched {fetched_count} emails")