"""Sample LangChain OpenAI-tools agent with a fire-alarm e-mail tool.

The script defines three tools (word length, light switch stub, e-mail
sender), binds them to a chat model, wraps everything in an
``AgentExecutor`` and runs one sample query at import time.
"""

import asyncio
import base64
import os
import smtplib
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr

import httpx
from dotenv import load_dotenv
from langchain.agents import AgentExecutor, tool
from langchain.agents.format_scratchpad.openai_tools import (
    format_to_openai_tool_messages,
)
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from sqlalchemy import Boolean  # kept from the original file; no longer used as an annotation


def _format_addr(s: str) -> str:
    """Return ``s`` ("Name <addr>") with the display name RFC 2047-encoded.

    The display name is encoded as UTF-8 so non-ASCII names survive in the
    mail header; the address part is passed through unchanged.
    """
    name, addr = parseaddr(s)
    return formataddr((Header(name, "utf-8").encode(), addr))


def _send_email(content: str) -> None:
    """Send ``content`` as a plain-text alarm mail through Gmail over SSL.

    The SMTP password is read from the ``SMTP_PASSWORD`` environment variable
    (empty string when unset, which matches the previous hard-coded value).

    Raises:
        smtplib.SMTPException: if login or delivery fails.
        OSError: if the connection to the SMTP server cannot be established.
    """
    # NOTE(review): both addresses look like redacted placeholders — replace
    # them with real mailboxes before expecting delivery to work.
    from_addr = "[email protected]"
    password = os.getenv("SMTP_PASSWORD", "")
    to_addr = "[email protected]"
    smtp_server = "smtp.gmail.com"

    msg = MIMEText(content, "plain", "utf-8")
    msg["From"] = _format_addr(f"FIRE ALARMER <{from_addr}>")
    msg["To"] = _format_addr(f"ADMIN <{to_addr}>")
    msg["Subject"] = Header("You got one fire alarm!", "utf-8").encode()

    print("---- before login 11-----")
    # The context manager issues QUIT and closes the socket even when login
    # or sendmail raises, so the connection is never leaked.
    with smtplib.SMTP_SSL(smtp_server, 465) as server:
        print("---- before login 1122-----")
        server.set_debuglevel(1)
        print("---- before login -----")
        server.login(from_addr, password)
        print("---- after login -----")
        server.sendmail(from_addr, [to_addr], msg.as_string())


load_dotenv()

# llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# The docstrings of the @tool functions below are passed to the model as the
# tool descriptions, so their wording is intentionally left as it was.
@tool
def get_word_length(word: str) -> int:
    """Returns the length of a word."""
    return len(word)


# print(get_word_length.invoke("abc"))


@tool
def turn_on_light() -> bool:
    """
    turn on light
    Return the result of turning on light.
    """
    return True


@tool
def send_one_email(content: str) -> bool:
    """
    send one email to admin.
    Return the result of sending one email.
    """
    _send_email(content)
    return True


tools = [get_word_length, turn_on_light, send_one_email]

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are very powerful assistant, but don't know current events",
        ),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# Alternative multimodal prompt that also feeds a base64 image to the model:
#
# prompt = ChatPromptTemplate.from_messages(
#     [
#         (
#             "system",
#             "You are very powerful assistant, but don't know current events",
#         ),
#         (
#             "user",
#             [
#                 {
#                     "type": "text",
#                     "text": "{input}"
#                 },
#                 {
#                     "type": "image_url",
#                     "image_url": {"url": "data:image/jpeg;base64,{image_data}"},
#                 }
#             ],
#         ),
#         MessagesPlaceholder(variable_name="agent_scratchpad"),
#     ]
# )

llm_with_tools = llm.bind_tools(tools)

agent = (
    {
        "input": lambda x: x["input"],
        # Only the multimodal prompt consumes image_data; default to "" so
        # text-only calls that omit the key do not fail with KeyError.
        "image_data": lambda x: x.get("image_data", ""),
        "agent_scratchpad": lambda x: format_to_openai_tool_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIToolsAgentOutputParser()
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
image_url = "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQhSUzTow_Xta3T3VF8op28XCTCM4D3boxhaA3ZrNyS5xkQXEpTNvsrUhmvfkwKOb5Z7jY&usqp=CAU"

# NOTE(review): this download runs on every start although only the
# commented-out image example below uses the result — consider fetching lazily.
image_data = base64.b64encode(httpx.get(image_url).content).decode("utf-8")

# async def main():
#     resp = agent_executor.stream({"input": "How many letters in the word eudca"})
#     async for one in resp:
#         print(one)
#
# asyncio.run(main())

print(agent_executor.invoke({"input": "How many letters in the word eudca", "image_data": ""}))

# list(agent_executor.stream({"input": "How many letters in the word eudca"}))

# list(agent_executor.stream({"input": "Please turn on light in this room."}))

# list(agent_executor.stream({"input": '''
# Please send one email with the following text as content:
# The fire is too heavy, please take action immediately.
# '''}))

# list(agent_executor.stream({"input": "please understand this image as of fire alarm, if any fire risk then send email with that fire description, otherwise do not send alarm email.", "image_data": image_data}))

# _send_email("The fire is too heavy, please take action immediately.")
# Tags: code, addr, image, agent, sample, import, tools, email
# From: https://www.cnblogs.com/lightsong/p/18534108