本文介绍如何使用 Llama 2 和 FastAPI、Redis 和 Celery 构建基于大模型对话的应用程序,将介绍这些概念以及它们如何协同工作的。
我们一直在使用 FastAPI 来构建我们的LLM应用,是一个用高性能 Web 框架,其异步功能使其能够同时处理多个请求,这对于实时聊天应用程序至关重要。除了 FastAPI 之外,我们还使用 Celery 作为分布式任务队列来帮助管理从 LLM 生成响应的计算密集型任务。通过将此过程卸载到任务队列,应用程序在处理其他用户请求时保持对新用户请求的响应,从而确保用户不会等待。
由于我们使用的是分布式任务队列,因此我们需要一个消息代理来帮助异步任务处理。我们选择了 Redis 来完成这项工作。它将来自 FastAPI 的任务排队,以便由 Celery 拾取,从而实现高效、解耦的通信。Redis 的内存数据结构存储速度很快,允许实时分析、会话缓存和维护用户会话数据。
最后,我们使用 Docker 将应用程序及其依赖项封装到隔离的容器中,我们可以轻松地将其部署到各种环境中。
在这个架构中,FastAPI 用于创建接收传入请求的 Web 服务器,Celery 用于管理异步任务,Redis 充当 Celery 的代理和后端,存储任务及其结果。
Application
FastAPI 应用程序 (app.py) 用于生成文本和获取任务结果的终结点组成。
/generate/ 端点接受带有Prompt的输入的 POST 请求,并返回task ID。使用 Celery 任务generate_text_task异步启动任务。
/task/{task_id} 端点按任务的 ID 获取任务的状态/结果。
from fastapi import FastAPI
from pydantic import BaseModel
from celery.result import AsyncResult
from typing import Any
from celery_worker import generate_text_task
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
class Item(BaseModel):
prompt: str
@app.post("/generate/")
async def generate_text(item: Item) -> Any:
task = generate_text_task.delay(item.prompt)
return {"task_id": task.id}
@app.get("/task/{task_id}")
async def get_task(task_id: str) -> Any:
result = AsyncResult(task_id)
if result.ready():
res = result.get()
return {"result": res[0],
"time": res[1],
"memory": res[2]}
else:
return {"status": "Task not completed yet"}
Workers
Celery 辅助角色 (celery_worker.py) 文件创建一个 Celery 实例并定义 generate_text_task 函数。此函数接受提示并使用 Llama 2 模型生成文本。此函数在 @celery.task 装饰器中注册为 Celery 任务。
setup_model函数是一个工作线程初始化函数。它在工作进程启动时设置模型加载器。此函数注册为使用 @signals.worker_process_init.connect 装饰器在工作进程初始化事件上调用。
from celery import Celery, signals
from utils import generate_output
from model_loader import ModelLoader
def make_celery(app_name=__name__):
backend = broker = 'redis://llama2_redis_1:6379/0'
return Celery(app_name, backend=backend, broker=broker)
celery = make_celery()
model_loader = None
model_path = "meta-llama/Llama-2-7b-chat-hf"
@signals.worker_process_init.connect
def setup_model(signal, sender, **kwargs):
global model_loader
model_loader = ModelLoader(model_path)
@celery.task
def generate_text_task(prompt):
time, memory, outputs = generate_output(
prompt, model_loader.model, model_loader.tokenizer
)
return model_loader.tokenizer.decode(outputs[0]), time, memory
model
model_loader.py 中的 ModelLoader 类负责从给定的模型路径加载 Llama 2 模型。它使用 HuggingFace 的转换器库来加载模型及其标记器。
import os
from transformers import AutoModelForCausalLM, AutoConfig, AutoTokenizer
from dotenv import load_dotenv
load_dotenv()
class ModelLoader:
def __init__(self, model_path: str):
self.model_path = model_path
self.config = AutoConfig.from_pretrained(
self.model_path,
trust_remote_code=True,
use_auth_token=os.getenv("HUGGINGFACE_TOKEN"),
)
self.model = self._load_model()
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_path, use_auth_token=os.getenv("HUGGINGFACE_TOKEN")
)
def _load_model(self):
model = AutoModelForCausalLM.from_pretrained(
self.model_path,
config=self.config,
trust_remote_code=True,
load_in_4bit=True,
device_map="auto",
use_auth_token=os.getenv("HUGGINGFACE_TOKEN"),
)
return model
Broker
要设置 Redis,我们有两个选择:可以使用 docker 容器,也可以使用 Python 包redis_server。
如果使用 docker 容器(首选解决方案),只需运行以下命令即可。
-p 6379:6379 选项告诉 Docker 将传入主机端口 6379 的流量转发到容器的端口 6379。这样,实际上可以从 docker 容器外部访问 Redis。
docker run --name redis-db -p 6379:6379 -d redis
第二种选择是从 Python 接口执行此操作。redis_server.py脚本处理 Redis 服务器的安装和启动。Redis 既充当 Celery 的消息代理,又充当结果后端。
import subprocess
import redis_server
def install_redis_server(redis_version):
try:
subprocess.check_call(["pip", "install", f"redis-server=={redis_version}"])
print(f"Redis server version {redis_version} installed successfully.")
except subprocess.CalledProcessError:
print("Failed to install Redis server.")
exit(1)
def start_redis_server():
try:
redis_server_path = redis_server.REDIS_SERVER_PATH
subprocess.Popen([redis_server_path])
print("Redis server started successfully.")
except Exception as e:
print("Failed to start Redis server:", str(e))
exit(1)
def main():
redis_version = "6.0.9"
install_redis_server(redis_version)
start_redis_server()
if __name__ == "__main__":
main()
Run the application
主执行脚本 (run.py) 是与 FastAPI 应用程序通信的客户端脚本。它向 /generate/ 终结点发送提示,获取任务 ID,并定期轮询 /task/{task_id} 终结点,直到任务完成。
import http.client
import json
import time
API_HOST = "localhost"
API_PORT = 8000
def generate_text(prompt):
conn = http.client.HTTPConnection(API_HOST, API_PORT)
headers = {"Content-type": "application/json"}
data = {"prompt": prompt}
json_data = json.dumps(data)
conn.request("POST", "/generate/", json_data, headers)
response = conn.getresponse()
result = json.loads(response.read().decode())
conn.close()
return result["task_id"]
def get_task_status(task_id):
conn = http.client.HTTPConnection(API_HOST, API_PORT)
conn.request("GET", f"/task/{task_id}")
response = conn.getresponse()
status = response.read().decode()
conn.close()
return status
def main():
prompt = input("Enter the prompt: ")
task_id = generate_text(prompt)
while True:
status = get_task_status(task_id)
if "Task not completed yet" not in status:
print(status)
break
time.sleep(2)
if __name__ == "__main__":
main()
utils 模块 (utils.py) 提供了一个实用程序函数generate_output,用于使用 Llama 2 模型和分词器从提示生成文本。该函数使用 @time_decorator 和 @memory_decorator 进行装饰,以测量执行时间和内存使用情况。
import time
import torch
import functools
from transformers import AutoModelForCausalLM, AutoTokenizer
def time_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
exec_time = end_time - start_time
return (result, exec_time)
return wrapper
def memory_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
result, exec_time = func(*args, **kwargs)
peak_mem = torch.cuda.max_memory_allocated()
peak_mem_consumption = peak_mem / 1e9
return peak_mem_consumption, exec_time, result
return wrapper
@memory_decorator
@time_decorator
def generate_output(prompt: str, model: AutoModelForCausalLM, tokenizer: AutoTokenizer) -> torch.Tensor:
input_ids = tokenizer(prompt, return_tensors="pt").input_ids
input_ids = input_ids.to("cuda")
outputs = model.generate(input_ids, max_length=500)
return outputs
实质上,当通过 /generate/ 端点收到提示时,它会作为异步任务转发给 Celery 工作线程。工作人员使用 Llama 2 模型生成文本,并将结果存储在 Redis 中。您可以随时使用 /task/{task_id} 端点获取任务状态/结果。
Deployment
部署应用程序需要执行几个步骤。首先,让我们为我们的应用程序创建一个 Dockerfile:
FROM python:3.9-slim-buster
WORKDIR /app
ADD . /app
RUN pip install --no-cache-dir -r requirements.txt
EXPOSE 80
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"]
接下来,让我们定义 requirements.txt这是必需的,以便我们将所有依赖项安装在 Docker 容器中:
fastapi==0.99.1
uvicorn==0.22.0
pydantic==1.10.10
celery==5.3.1
redis==4.6.0
python-dotenv==1.0.0
transformers==4.30.2
torch==2.0.1
accelerate==0.21.0
bitsandbytes==0.41.0
scipy==1.11.1
创建一个 docker-compose.yml 文件,如下所示:
version: '3'
services:
web:
build: .
command: ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"]
volumes:
- .:/app
ports:
- 8000:80
depends_on:
- redis
worker:
build: .
command: celery -A celery_worker worker --loglevel=info
volumes:
- .:/app
depends_on:
- redis
redis:
image: "redis:alpine"
ports:
- 6379:6379
在 Docker Compose 配置中,“web”服务表示使用当前目录中的 Dockerfile 构建的 FastAPI 应用程序。它将主机的端口 8000 映射到容器的端口 80。“辅助角色”服务是 Celery 辅助角色,与 FastAPI 应用程序共享生成上下文。“redis”服务使用官方的Redis Docker镜像。该 depends_on
字段可确保 Redis 在“Web”和“worker”服务之前启动。所有这些服务都可以使用该 docker-compose up
命令启动。
References:
https://github.com/luisroque/large_laguage_models