| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625 | from google.auth.transport.requests import Requestfrom google_auth_oauthlib.flow import InstalledAppFlowfrom googleapiclient.discovery import buildfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartfrom email.mime.base import MIMEBasefrom email import encodersfrom bs4 import BeautifulSoupimport osimport pytzimport base64import picklefrom datetime import datetime, timezoneimport jsonimport ollamafrom pypdf import PdfReaderfrom pathlib import PathSCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.compose']def authenticate_gmail(user_email):    creds = None    token_file = f'token_{user_email}.pickle'  # Unique token file for each user        # Load the user's token if it exists    if os.path.exists(token_file):        with open(token_file, 'rb') as token:            creds = pickle.load(token)        # If no valid credentials, prompt the user to log in    if not creds or not creds.valid:        if creds and creds.expired and creds.refresh_token:            creds.refresh(Request())        else:            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)            creds = flow.run_console()                # Save the new credentials to a user-specific token file        with open(token_file, 'wb') as token:            pickle.dump(creds, token)        # Build the Gmail API service    service = build('gmail', 'v1', credentials=creds)    return servicedef num_of_emails(query=''):    response = service.users().messages().list(        userId='me',         q=query,         maxResults=1).execute()    return response.get('resultSizeEstimate', 0)def list_emails(query='', max_results=100):    emails = []    next_page_token = None    while True:        response = service.users().messages().list(            userId=user_id,            maxResults=max_results,            pageToken=next_page_token,            q=query        ).execute()                if 'messages' in response:            for msg in response['messages']:                sender, subject, received_time = get_email_info(msg['id'])                emails.append(                    {                        "message_id": msg['id'],                        "sender": sender,                        "subject": subject,                        "received_time": received_time                    }                )                next_page_token = response.get('nextPageToken')        if not next_page_token:            break        return emailsdef get_email_detail(detail, which=''):    if detail == 'body':        return get_email_body(which)    elif detail == 'attachment':        return get_email_attachments(which)def get_email_body(message_id):    try:        message = service.users().messages().get(            userId=user_id,             id=message_id,             format='full').execute()        # Recursive function to extract the parts        def extract_parts(payload):            text_body = ""            if 'parts' in payload:                for part in payload['parts']:                    return extract_parts(part)            else:                mime_type = payload.get('mimeType')                body = payload.get('body', {}).get('data')                if mime_type == 'text/html':                    decoded_body = base64.urlsafe_b64decode(body).decode('utf-8')                    soup = BeautifulSoup(decoded_body, 'html.parser')                    text_body = soup.get_text().strip()                elif mime_type == 'text/plain':                    decoded_body = base64.urlsafe_b64decode(body).decode('utf-8')                    text_body = decoded_body                return text_body        return extract_parts(message['payload'])    except Exception as e:        print(f"An error occurred: {e}")        return Nonedef parse_message(message):    payload = message['payload']    headers = payload.get("headers")    subject = None    sender = None    for header in headers:        if header['name'] == 'Subject':            subject = header['value']        elif header['name'] == 'From':            sender = header['value']        internal_date = message.get('internalDate')      utc_time = datetime.fromtimestamp(int(internal_date) / 1000, tz=timezone.utc)        # Convert UTC to the specified timezone    local_timezone = pytz.timezone("America/Los_Angeles")    local_time = utc_time.astimezone(local_timezone)        # Format the local time as a string    received_time = local_time.strftime('%Y-%m-%d %H:%M:%S %Z')    # Check if the email is plain text or multipart    if 'parts' in payload:        # Multipart message - find the text/plain or text/html part        for part in payload['parts']:            if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html':  # You can also look for 'text/html'                data = part['body']['data']                body = base64.urlsafe_b64decode(data).decode('utf-8')                return sender, subject, received_time, body            elif part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']:                return sender, subject, received_time, get_email_body(message.get('id'))    else:        # Single part message        data = payload['body']['data']        body = base64.urlsafe_b64decode(data).decode('utf-8')        return sender, subject, received_time, body    def get_email_info(msg_id):    message = service.users().messages().get(        userId=user_id,         id=msg_id,         format='full').execute()    sender, subject, received_time, body = parse_message(message)        return sender, subject, received_timedef reply_email(message_id, reply_text):    # Fetch the original message    original_message = service.users().messages().get(        userId=user_id,         id=message_id,         format='full').execute()        # Get headers    headers = original_message['payload']['headers']    subject = None    to = None    for header in headers:        if header['name'] == 'Subject':            subject = header['value']        if header['name'] == 'From':            to = header['value']        # Create the reply subject    if not subject.startswith("Re: "):        subject = "Re: " + subject    # Compose the reply message    reply_message = MIMEText(reply_text)    reply_message['to'] = to    reply_message['from'] = user_id    reply_message['subject'] = subject    reply_message['In-Reply-To'] = message_id        # Encode and send the message    raw_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode("utf-8")    body = {'raw': raw_message,             'threadId': original_message['threadId']}    sent_message = service.users().messages().send(        userId=user_id,         body=body).execute()    print("Reply sent. Message ID:", sent_message['id'])def forward_email(message_id, forward_to, email_body=None):    """    Forwards an email, preserving the original MIME type, including multipart/related.    """    # Get the original message in 'full' format    original_message = service.users().messages().get(        userId=user_id,        id=message_id,        format='full').execute()    # Extract the payload and headers    payload = original_message.get('payload', {})    headers = payload.get('headers', [])    parts = payload.get('parts', [])    # Get the Subject    subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject')    # Create a new MIME message for forwarding    mime_message = MIMEMultipart(payload.get('mimeType', 'mixed').split('/')[-1])    mime_message['To'] = forward_to    mime_message['Subject'] = f"Fwd: {subject}"    # Add the optional custom email body    if email_body:        mime_message.attach(MIMEText(email_body, 'plain'))    # Function to fetch attachment data by attachmentId    def fetch_attachment_data(attachment_id, message_id):        attachment = service.users().messages().attachments().get(            userId=user_id, messageId=message_id, id=attachment_id        ).execute()        return base64.urlsafe_b64decode(attachment['data'])    # Rebuild MIME structure    def rebuild_parts(parts):        """        Recursively rebuild MIME parts.        """        if not parts:            return None        for part in parts:            part_mime_type = part.get('mimeType', 'text/plain')            part_body = part.get('body', {})            part_data = part_body.get('data', '')            part_parts = part.get('parts', [])  # Sub-parts for multipart types            filename = part.get('filename')            attachment_id = part_body.get('attachmentId')            if part_mime_type.startswith('multipart/'):                # Rebuild nested multipart                sub_multipart = MIMEMultipart(part_mime_type.split('/')[-1])                sub_parts = rebuild_parts(part_parts)                if sub_parts:                    for sub_part in sub_parts:                        sub_multipart.attach(sub_part)                yield sub_multipart            elif filename and attachment_id:                # Handle attachments                decoded_data = fetch_attachment_data(attachment_id, message_id)                attachment = MIMEBase(*part_mime_type.split('/'))                attachment.set_payload(decoded_data)                encoders.encode_base64(attachment)                attachment.add_header('Content-Disposition', f'attachment; filename="{filename}"')                yield attachment            else:                if part_data:                    # Decode and attach non-multipart parts                    decoded_data = base64.urlsafe_b64decode(part_data)                    if part_mime_type == 'text/plain':                        yield MIMEText(decoded_data.decode('utf-8'), 'plain')                    elif part_mime_type == 'text/html':                        yield MIMEText(decoded_data.decode('utf-8'), 'html')    # Rebuild the main MIME structure    rebuilt_parts = rebuild_parts(parts)    if rebuilt_parts:        for rebuilt_part in rebuilt_parts:            mime_message.attach(rebuilt_part)    # Encode the MIME message to base64    raw = base64.urlsafe_b64encode(mime_message.as_bytes()).decode('utf-8')    # Send the email    forward_body = {'raw': raw}    sent_message = service.users().messages().send(userId=user_id, body=forward_body).execute()    print(f"Message forwarded successfully! Message ID: {sent_message['id']}")def send_email(action, to, subject, body="", email_id=""):    if action == "compose":        message = MIMEText(body)        message['to'] = to        message['from'] = user_id        message['subject'] = subject                # Encode and send the message        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")        body = {'raw': raw_message}        sent_message = service.users().messages().send(            userId=user_id,             body=body).execute()        return sent_message['id']    elif action == "reply": # reply or forward; a message id is needed        reply_email(email_id, body)    elif action == "forward":        forward_email(email_id, to, body)def create_draft(action, to, subject, body="", email_id=""):    if action == "new":        message = MIMEText(body)        message['to'] = to        message['from'] = user_id        message['subject'] = subject                encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()        draft_body = {'message': {'raw': encoded_message}}        draft = service.users().drafts().create(            userId=user_id,             body=draft_body).execute()        print(f"Draft created with ID: {draft['id']}")        return draft['id']    elif action == "reply":        return create_reply_draft(email_id, body)    elif action == "forward":        return create_forward_draft(email_id, to, body)    else:        returndef create_reply_draft(message_id, reply_text):    # Fetch the original message    original_message = service.users().messages().get(        userId=user_id,        id=message_id,        format='full').execute()    # Get headers    headers = original_message['payload']['headers']    subject = None    to = None    for header in headers:        if header['name'] == 'Subject':            subject = header['value']        if header['name'] == 'From':            to = header['value']    # Create the reply subject    if not subject.startswith("Re: "):        subject = "Re: " + subject    # Compose the reply message    reply_message = MIMEText(reply_text)    reply_message['to'] = to    reply_message['from'] = user_id    reply_message['subject'] = subject    reply_message['In-Reply-To'] = message_id    encoded_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode()    draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}}    draft = service.users().drafts().create(userId=user_id, body=draft_body).execute()    return draft['id']def create_forward_draft(message_id, recipient_email, custom_message=None):    # Get the original message    original_message = service.users().messages().get(        userId=user_id,        id=message_id,        format='raw').execute()    # Decode the raw message    raw_message = base64.urlsafe_b64decode(original_message['raw'].encode('utf-8'))    # Prepare the forward header and optional custom message    forward_header = f"----- Forwarded message -----\nFrom: {recipient_email}\n\n"    if custom_message:        forward_header += f"{custom_message}\n\n"    # Combine the forward header with the original message    new_message = forward_header + raw_message.decode('utf-8')    # Encode the combined message into base64 format    encoded_message = base64.urlsafe_b64encode(new_message.encode('utf-8')).decode('utf-8')    draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}}    draft = service.users().drafts().create(userId=user_id, body=draft_body).execute()    print(f"Forward draft created with ID: {draft['id']}")    return draft['id']def send_draft(id):    sent_message = service.users().drafts().send(        userId=user_id,         body={'id': id}        ).execute()    return f"Draft sent with email ID: {sent_message['id']}"    def get_pdf_summary(file_name):    text = pdf2text(file_name)    print("Calling Llama to generate a summary...")    response = llama31(text, "Generate a summary of the input text in 5 sentences.")    return response    def get_email_attachments(message_id, mime_type='application/pdf'):    attachments = []    # Helper function to process email parts    def process_parts(parts):        for part in parts:            if part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']:                # Recursively process nested parts                if 'parts' in part:                    process_parts(part['parts'])            elif 'filename' in part and part['filename']:                if part['mimeType'] == mime_type:  # Check for the desired MIME type                    attachment_id = part['body'].get('attachmentId')                    if attachment_id:                        # Get the attachment data                        attachment = service.users().messages().attachments().get(                            userId=user_id,                             messageId=message_id,                             id=attachment_id                        ).execute()                                                # Decode the attachment content                        file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))                        with open(part['filename'], "wb") as f:                            f.write(file_data)                                                # Save the attachment information                        attachments.append(                            {'filename': part['filename'],                             'data': file_data,                            'size': attachment.get('size', 0)                            })    # Retrieve the email message    message = service.users().messages().get(        userId=user_id,        id=message_id,        format='full').execute()    payload = message['payload']    # Start processing the parts    if 'parts' in payload:        process_parts(payload['parts'])        rslt = ""    for a in attachments:                rslt += f"{a['filename']} - {a['size']} bytes\n"    return rslt #attachmentsdef pdf2text(file):    text = ''    try:        with Path(file).open("rb") as f:            reader = PdfReader(f)            text = "\n\n".join([page.extract_text() for page in reader.pages])    except Exception as e:        raise f"Error reading the PDF file: {str(e)}"    print(f"\nPDF text length: {len(text)}\n")    return textuser_email = Noneservice = Noneuser_id = 'me'def set_email_service(email):    global user_email    global service    user_email = email    service = authenticate_gmail(user_email)class Agent:    def __init__(self, system_prompt=""):        self.system_prompt = system_prompt        self.messages = []        # Gmagent-specific short term memory, used to answer follow up questions AFTER a list of emails is found matching user's query        self.emails = []        self.draft_id = None        if self.system_prompt:            self.messages.append({"role": "system", "content": system_prompt})    def __call__(self, user_prompt_or_tool_result, is_tool_call=False):        # if it's tool call result, use "ipython" instead of "user" for the role        self.messages.append({"role": ("ipython" if is_tool_call else "user"), "content": user_prompt_or_tool_result})        result = self.llama()        print(f"\nLlama returned: {result}.")        if type(result) == dict: # result is a dict only if it's a tool call spec            function_name = result["function_name"]            func = globals()[function_name]            parameters = result["parameters"]            if function_name == "get_email_detail":                # TODO: parse which - valid values are first, second,                # third, fourth, last, from xxx                if 'id' in parameters.keys():                    parameters['which'] = parameters['id']                    del parameters['id'] # per the function spec                elif 'which' in parameters.keys():                    if 'from ' in parameters['which']:                        sender = parameters['which'].split('from ')[-1]                        for email in self.emails:                            if email['sender'].find(sender) != -1:                                parameters['which'] = email['message_id']                                break                    if 'subject ' in parameters['which']:                        subject = parameters['which'].split('subject ')[-1]                        # exact match beats substring                        for email in self.emails:                            if email['subject'].upper() == subject.upper():                                parameters['which'] = email['message_id']                                break                            elif email['subject'].upper().find(subject.upper()) != -1:                                parameters['which'] = email['message_id']                    elif 'id_' in parameters['which']:                        parameters['which'] = parameters['which'].split('id_')[-1]                    else:                        parameters['which'] = self.emails[-1]['message_id']            elif function_name == "send_draft":                parameters['id'] = self.draft_id            print(f"\nCalling tool to access Gmail API: {function_name}, {parameters}...")            result = func(**parameters)            print(f"\nTool calling returned: {result}")            # convert function calling result to concise summary, offering interactive follow ups,            # for smooth and user friendly experience            if function_name == 'list_emails':                self.emails = result                num = len(result)                if num == 0:                    output = "I couldn't find any such emails. What else would you like to do?"                elif num <= 5:                    output = f"I found {num} email{'s' if num > 1 else ''} matching your query:\n"                    for i, email in enumerate(result, start=1):                        output += f"{i}. From: {email['sender']}, Subject: {email['subject']}, Received on: {email['received_time']}\n"                else:                    output = f"I found {num} emails matching your query. Here are the first 5 emails:\n"                    for i in range(1, 6):                        output += f"{i}. From: {result[i-1]['sender']}, Subject: {result[i-1]['subject']}, Received on: {result[i-1]['received_time']}\n"            elif function_name == "get_email_detail":                output = result            elif function_name == "get_pdf_summary":                output = result            elif function_name == "send_email":                output = "Email sent."            elif function_name == "create_draft":                output = "Draft created."                self.draft_id = result            elif function_name == "send_draft":                output = result            print(f"\n-------------------------\n\nGmagent: {output}\n")        else:            output = result # direct text, not JSON, response by Llama        # adding this may cause Llama to hallucinate when answering        # follow up questions. e.g. "do i have emails with attachments        # larger than 20mb" got right tool calling response, then        # follow up "larger than 10mb" got hallucinated response.        # self.messages.append({"role": "assistant", "content": output})        # this mitigates the hallucination        self.messages.append({"role": "assistant", "content": str(result)})        return output    def llama(self):        response = ollama.chat(model='llama3.1',            messages = self.messages,            options = {                "temperature": 0.0            }        )        result = response['message']['content']        try:          res = json.loads(result.split("<|python_tag|>")[-1])          function_name = res['name']          parameters = res['parameters']          return {"function_name": function_name,                  "parameters": parameters}        except:          return resultdef llama31(user_prompt: str, system_prompt = ""):    response = ollama.chat(model='llama3.1',        messages=[            {"role": "system", "content": system_prompt},            {"role": "user", "content": user_prompt},        ],    )    return response['message']['content']
 |