LLama2: 使用 FastAPI、Celery、Redis 和 Docker 构建可扩展的聊天机器人

本文介绍如何使用 Llama 2 和 FastAPI、Redis 和 Celery 构建基于大模型对话的应用程序,将介绍这些概念以及它们如何协同工作的。

LLama2: 使用 FastAPI、Celery、Redis 和 Docker 构建可扩展的聊天机器人

我们一直在使用 FastAPI 来构建我们的LLM应用,是一个用高性能 Web 框架,其异步功能使其能够同时处理多个请求,这对于实时聊天应用程序至关重要。除了 FastAPI 之外,我们还使用 Celery 作为分布式任务队列来帮助管理从 LLM 生成响应的计算密集型任务。通过将此过程卸载到任务队列,应用程序在处理其他用户请求时保持对新用户请求的响应,从而确保用户不会等待。

由于我们使用的是分布式任务队列,因此我们需要一个消息代理来帮助异步任务处理。我们选择了 Redis 来完成这项工作。它将来自 FastAPI 的任务排队,以便由 Celery 拾取,从而实现高效、解耦的通信。Redis 的内存数据结构存储速度很快,允许实时分析、会话缓存和维护用户会话数据。

最后,我们使用 Docker 将应用程序及其依赖项封装到隔离的容器中,我们可以轻松地将其部署到各种环境中。

在这个架构中,FastAPI 用于创建接收传入请求的 Web 服务器,Celery 用于管理异步任务,Redis 充当 Celery 的代理和后端,存储任务及其结果。

Application

FastAPI 应用程序 (app.py) 用于生成文本和获取任务结果的终结点组成。

/generate/ 端点接受带有Prompt的输入的 POST 请求,并返回task ID。使用 Celery 任务generate_text_task异步启动任务。

/task/{task_id} 端点按任务的 ID 获取任务的状态/结果。

from fastapi import FastAPI
from pydantic import BaseModel
from celery.result import AsyncResult
from typing import Any
from celery_worker import generate_text_task
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()


class Item(BaseModel):
    prompt: str


@app.post("/generate/")
async def generate_text(item: Item) -> Any:
    task = generate_text_task.delay(item.prompt)
    return {"task_id": task.id}


@app.get("/task/{task_id}")
async def get_task(task_id: str) -> Any:
    result = AsyncResult(task_id)
    if result.ready():
        res = result.get()
        return {"result": res[0],
                "time": res[1],
                "memory": res[2]}
    else:
        return {"status": "Task not completed yet"}

Workers

Celery 辅助角色 (celery_worker.py) 文件创建一个 Celery 实例并定义 generate_text_task 函数。此函数接受提示并使用 Llama 2 模型生成文本。此函数在 @celery.task 装饰器中注册为 Celery 任务。

setup_model函数是一个工作线程初始化函数。它在工作进程启动时设置模型加载器。此函数注册为使用 @signals.worker_process_init.connect 装饰器在工作进程初始化事件上调用。

from celery import Celery, signals
from utils import generate_output
from model_loader import ModelLoader


def make_celery(app_name=__name__):
    backend = broker = 'redis://llama2_redis_1:6379/0'
    return Celery(app_name, backend=backend, broker=broker)


celery = make_celery()

model_loader = None
model_path = "meta-llama/Llama-2-7b-chat-hf"


@signals.worker_process_init.connect
def setup_model(signal, sender, **kwargs):
    global model_loader
    model_loader = ModelLoader(model_path)


@celery.task
def generate_text_task(prompt):
    time, memory, outputs = generate_output(
        prompt, model_loader.model, model_loader.tokenizer
    )
    return model_loader.tokenizer.decode(outputs[0]), time, memory

model

model_loader.py 中的 ModelLoader 类负责从给定的模型路径加载 Llama 2 模型。它使用 HuggingFace 的转换器库来加载模型及其标记器。

import os
from transformers import AutoModelForCausalLM, AutoConfig, AutoTokenizer
from dotenv import load_dotenv

load_dotenv()


class ModelLoader:
    def __init__(self, model_path: str):
        self.model_path = model_path
        self.config = AutoConfig.from_pretrained(
            self.model_path,
            trust_remote_code=True,
            use_auth_token=os.getenv("HUGGINGFACE_TOKEN"),
        )
        self.model = self._load_model()
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_path, use_auth_token=os.getenv("HUGGINGFACE_TOKEN")
        )

    def _load_model(self):
        model = AutoModelForCausalLM.from_pretrained(
            self.model_path,
            config=self.config,
            trust_remote_code=True,
            load_in_4bit=True,
            device_map="auto",
            use_auth_token=os.getenv("HUGGINGFACE_TOKEN"),
        )
        return model

Broker

要设置 Redis,我们有两个选择:可以使用 docker 容器,也可以使用 Python 包redis_server。

如果使用 docker 容器(首选解决方案),只需运行以下命令即可。

-p 6379:6379 选项告诉 Docker 将传入主机端口 6379 的流量转发到容器的端口 6379。这样,实际上可以从 docker 容器外部访问 Redis。

docker run --name redis-db -p 6379:6379 -d redis

第二种选择是从 Python 接口执行此操作。redis_server.py脚本处理 Redis 服务器的安装和启动。Redis 既充当 Celery 的消息代理,又充当结果后端。

import subprocess
import redis_server


def install_redis_server(redis_version):
    try:
        subprocess.check_call(["pip", "install", f"redis-server=={redis_version}"])
        print(f"Redis server version {redis_version} installed successfully.")
    except subprocess.CalledProcessError:
        print("Failed to install Redis server.")
        exit(1)


def start_redis_server():
    try:
        redis_server_path = redis_server.REDIS_SERVER_PATH
        subprocess.Popen([redis_server_path])
        print("Redis server started successfully.")
    except Exception as e:
        print("Failed to start Redis server:", str(e))
        exit(1)


def main():
    redis_version = "6.0.9"
    install_redis_server(redis_version)
    start_redis_server()


if __name__ == "__main__":
    main()

Run the application

主执行脚本 (run.py) 是与 FastAPI 应用程序通信的客户端脚本。它向 /generate/ 终结点发送提示,获取任务 ID,并定期轮询 /task/{task_id} 终结点,直到任务完成。

import http.client
import json
import time

API_HOST = "localhost"
API_PORT = 8000


def generate_text(prompt):
    conn = http.client.HTTPConnection(API_HOST, API_PORT)
    headers = {"Content-type": "application/json"}
    data = {"prompt": prompt}
    json_data = json.dumps(data)
    conn.request("POST", "/generate/", json_data, headers)
    response = conn.getresponse()
    result = json.loads(response.read().decode())
    conn.close()
    return result["task_id"]


def get_task_status(task_id):
    conn = http.client.HTTPConnection(API_HOST, API_PORT)
    conn.request("GET", f"/task/{task_id}")
    response = conn.getresponse()
    status = response.read().decode()
    conn.close()
    return status


def main():
    prompt = input("Enter the prompt: ")

    task_id = generate_text(prompt)
    while True:
        status = get_task_status(task_id)
        if "Task not completed yet" not in status:
            print(status)
            break
        time.sleep(2)


if __name__ == "__main__":
    main()

utils 模块 (utils.py) 提供了一个实用程序函数generate_output,用于使用 Llama 2 模型和分词器从提示生成文本。该函数使用 @time_decorator 和 @memory_decorator 进行装饰,以测量执行时间和内存使用情况。

import time
import torch
import functools
from transformers import AutoModelForCausalLM, AutoTokenizer


def time_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        exec_time = end_time - start_time
        return (result, exec_time)
    return wrapper


def memory_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        result, exec_time = func(*args, **kwargs)
        peak_mem = torch.cuda.max_memory_allocated()
        peak_mem_consumption = peak_mem / 1e9
        return peak_mem_consumption, exec_time, result
    return wrapper


@memory_decorator
@time_decorator
def generate_output(prompt: str, model: AutoModelForCausalLM, tokenizer: AutoTokenizer) -> torch.Tensor:
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids
    input_ids = input_ids.to("cuda")
    outputs = model.generate(input_ids, max_length=500)
    return outputs

实质上,当通过 /generate/ 端点收到提示时,它会作为异步任务转发给 Celery 工作线程。工作人员使用 Llama 2 模型生成文本,并将结果存储在 Redis 中。您可以随时使用 /task/{task_id} 端点获取任务状态/结果。

Deployment 

部署应用程序需要执行几个步骤。首先,让我们为我们的应用程序创建一个 Dockerfile:

FROM python:3.9-slim-buster

WORKDIR /app
ADD . /app

RUN pip install --no-cache-dir -r requirements.txt
EXPOSE 80

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"]

接下来,让我们定义 requirements.txt这是必需的,以便我们将所有依赖项安装在 Docker 容器中:

fastapi==0.99.1
uvicorn==0.22.0
pydantic==1.10.10
celery==5.3.1
redis==4.6.0
python-dotenv==1.0.0
transformers==4.30.2
torch==2.0.1
accelerate==0.21.0
bitsandbytes==0.41.0
scipy==1.11.1

创建一个 docker-compose.yml 文件,如下所示:

version: '3'
services:
  web:
    build: .
    command: ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"]
    volumes:
      - .:/app
    ports:
      - 8000:80
    depends_on:
      - redis
  worker:
    build: .
    command: celery -A celery_worker worker --loglevel=info
    volumes:
      - .:/app
    depends_on:
      - redis
  redis:
    image: "redis:alpine"
    ports:
      - 6379:6379

在 Docker Compose 配置中,“web”服务表示使用当前目录中的 Dockerfile 构建的 FastAPI 应用程序。它将主机的端口 8000 映射到容器的端口 80。“辅助角色”服务是 Celery 辅助角色,与 FastAPI 应用程序共享生成上下文。“redis”服务使用官方的Redis Docker镜像。该 depends_on 字段可确保 Redis 在“Web”和“worker”服务之前启动。所有这些服务都可以使用该 docker-compose up 命令启动。

References:

https://github.com/luisroque/large_laguage_models

LLama2: 使用 FastAPI、Celery、Redis 和 Docker 构建可扩展的聊天机器人

原创文章。转载请注明: 作者:JiangYuan 网址: https://www.icnma.com
Like (0)
JiangYuan管理
Previous 05/06/2023 18:05
Next 04/08/2023 11:55

猜你想看