
Deploying with Docker

This document explains how to build a self-contained Youtu Embedding Docker image that bundles all model weights and dependencies.

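Prerequisites:

  • Docker installed on the system
  • Git and Git LFS (used below to download the model weights)
  • Sufficient disk space (the image is about 8 GB)
  • A CUDA-capable NVIDIA GPU (optional, for GPU acceleration)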
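
The disk-space requirement can be checked before starting. The following is a minimal sketch using only the Python standard library; the 8 GB threshold comes from the figure above, and the path to check is an assumption (Docker stores images under its own data root, which on many Linux hosts differs from the build directory, and the downloaded weights need additional space).

```python
import shutil

REQUIRED_GB = 8  # approximate image size stated in the prerequisites


def free_space_gb(path: str = ".") -> float:
    """Return the free space, in GiB, on the filesystem that contains `path`."""
    return shutil.disk_usage(path).free / 1024**3


def has_enough_space(path: str = ".", required_gb: float = REQUIRED_GB) -> bool:
    """Return True if the filesystem holding `path` has at least `required_gb` free."""
    return free_space_gb(path) >= required_gb


print(f"Free space here: {free_space_gb('.'):.1f} GiB")
print(f"Enough for the image: {has_enough_space('.')}")
```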

Setting Up the Build Directory

First, create a directory for building the Docker image and download the model weights:

mkdir youtu-embedding-docker && cd youtu-embedding-docker

# Download the model weights
git lfs install
git clone https://huggingface.co/tencent/Youtu-Embedding

# To use the BF16 weights and reduce memory usage, clone this branch instead:
# git clone -b bfloat16 --single-branch https://huggingface.co/tencent/Youtu-Embedding

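Creating the Embedding Server Script

Create a file named embedding_server.py containing the server implementation. You can copy it from the local deployment guide or create it with the following content: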

import fastapi
from fastapi.responses import JSONResponse
import uvicorn
from argparse import ArgumentParser
from transformers import AutoModel, AutoTokenizer
from typing import List
from pydantic import BaseModel
import torch
import base64
import numpy as np

class LLMEmbeddingModel():

    def __init__(self, 
                model_name_or_path, 
                batch_size=128, 
                max_length=1024, 
                gpu_id=0):
        """Local embedding model with automatic device selection"""
        self.model = AutoModel.from_pretrained(model_name_or_path, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, padding_side="right", trust_remote_code=True)

        # Device selection: CUDA -> MPS -> CPU
        if torch.cuda.is_available():
            self.device = torch.device(f"cuda:{gpu_id}")
        elif torch.backends.mps.is_available():
            self.device = torch.device("mps")
        else:
            self.device = torch.device("cpu")
        
        self.model.to(self.device).eval()

        self.max_length = max_length
        self.batch_size = batch_size

        query_instruction = "Given a search query, retrieve passages that answer the question"
        if query_instruction:
            self.query_instruction = f"Instruction: {query_instruction} \nQuery:"
        else:
            self.query_instruction = "Query:"

        self.doc_instruction = ""
        print(f"Model loaded: {model_name_or_path}")
        print(f"Device: {self.device}")

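    def mean_pooling(self, hidden_state, attention_mask):
        """Average the token vectors whose attention-mask entry is 1."""
        s = torch.sum(hidden_state * attention_mask.unsqueeze(-1).float(), dim=1)
        # Clamp the token count so a fully masked row cannot divide by zero
        d = torch.clamp(attention_mask.sum(dim=1, keepdim=True).float(), min=1e-9)
        embedding = s / d
        return embedding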

    @torch.no_grad()
    def encode(self, sentences_batch, instruction):
        inputs = self.tokenizer(
            sentences_batch,
            padding=True,
            truncation=True,
            return_tensors="pt",
            max_length=self.max_length,
            add_special_tokens=True,
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = self.model(**inputs)
            last_hidden_state = outputs[0]

            instruction_tokens = self.tokenizer(
                instruction,
                padding=False,
                truncation=True,
                max_length=self.max_length,
                add_special_tokens=True,
            )["input_ids"]
            if len(np.shape(np.array(instruction_tokens))) == 1:
                if len(instruction_tokens) > 0:
                    inputs["attention_mask"][:, :len(instruction_tokens)] = 0
            else:
                instruction_length = [len(item) for item in instruction_tokens]
                for idx in range(len(instruction_length)):
                    inputs["attention_mask"][idx, :instruction_length[idx]] = 0

            embeddings = self.mean_pooling(last_hidden_state, inputs["attention_mask"])
            embeddings = torch.nn.functional.normalize(embeddings, dim=-1)
        return embeddings

    def encode_queries(self, queries):
        queries = queries if isinstance(queries, list) else [queries]
        queries = [f"{self.query_instruction}{query}" for query in queries]
        return self.encode(queries, self.query_instruction)

    def encode_passages(self, passages):
        passages = passages if isinstance(passages, list) else [passages]
        passages = [f"{self.doc_instruction}{passage}" for passage in passages]
        return self.encode(passages, self.doc_instruction)


def parse_args():
    parser = ArgumentParser()
    parser.add_argument("--checkpoint", default="/app/Youtu-Embedding")
    parser.add_argument("--max_length", default=1024, type=int)
    parser.add_argument("--port", default=8501, type=int)
    parser.add_argument("--host", default="0.0.0.0")
    args = parser.parse_args()
    return args

args = parse_args()
print(args)

model_wrapper = LLMEmbeddingModel(
    model_name_or_path=args.checkpoint,
    max_length=args.max_length
)

app = fastapi.FastAPI()

class Query(BaseModel):
    query: str

class Doc(BaseModel):
    docs: List[str]

class InputText(BaseModel):
    texts: List[str]
    instruction: str = ""

@app.post("/embed_query")
def embed_query(query: Query):
    text = " " if query.query == "" else query.query
    embedding_tensor = model_wrapper.encode_queries([text])
    embedding = embedding_tensor.cpu().numpy()
    
    rsp = {
        "query": query.query,
        "embedding": base64.b64encode(embedding.tobytes()).decode("ascii"),
        "shape": embedding.shape,
    }
    return JSONResponse(rsp)

@app.post("/embed_docs")
def embed_doc(docs: Doc):
    if len(docs.docs) > 100:
        return fastapi.responses.PlainTextResponse(
            "number of docs too large", status_code=501
        )

    texts = [" " if text == "" else text for text in docs.docs]
    embedding_tensor = model_wrapper.encode_passages(texts)
    embedding = embedding_tensor.cpu().numpy()

    rsp = dict(
        docs=docs.docs,
        embedding=base64.b64encode(embedding.tobytes()).decode("ascii"),
        shape=embedding.shape,
    )
    return JSONResponse(rsp)

@app.post("/embed")
def embed(docs: Doc):
    if len(docs.docs) > 100:
        return fastapi.responses.PlainTextResponse(
            "number of texts too large", status_code=501
        )
    
    texts = [" " if text == "" else text for text in docs.docs]
    embedding_tensor = model_wrapper.encode_passages(texts)
    embedding = embedding_tensor.cpu().numpy()

    rsp = dict(
        docs=docs.docs,
        embedding=base64.b64encode(embedding.tobytes()).decode("ascii"),
        shape=embedding.shape,
    )
    return JSONResponse(rsp)

@app.post("/embed_texts")
def embed_texts(inputs: InputText):
    if len(inputs.texts) > 100:
        return fastapi.responses.PlainTextResponse(
            "number of texts too large", status_code=501
        )

    texts = [" " if text == "" else text for text in inputs.texts]
    instruction = inputs.instruction
    full_texts = [f"{instruction}{text}" for text in texts]
    
    embedding_tensor = model_wrapper.encode(full_texts, instruction)
    embedding = embedding_tensor.cpu().numpy()

    rsp = dict(
        texts=inputs.texts,
        embedding=base64.b64encode(embedding.tobytes()).decode("ascii"),
        shape=embedding.shape,
    )
    return JSONResponse(rsp)

@app.get("/model_id")
def model_id():
    return args.checkpoint

@app.get("/health")
def health():
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host=args.host, port=args.port)
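
In the script above, encode() sets the attention mask to zero over the instruction tokens before mean pooling, so the instruction influences the hidden states but is excluded from the averaged embedding. The following pure-Python sketch reproduces that pooling step on toy values for a single sequence. The function name, the numbers, and the guards against empty masks are illustrative additions, not part of the server.

```python
import math


def masked_mean_pool(hidden, attention_mask, instruction_len):
    """Average token vectors, skipping padding and the instruction prefix.

    hidden: token vectors for one sequence (seq_len x dim, nested lists)
    attention_mask: one 0/1 flag per token (0 marks padding)
    instruction_len: number of leading instruction tokens to exclude
    """
    # Zero the mask over the instruction prefix, as encode() does.
    mask = [0 if i < instruction_len else m for i, m in enumerate(attention_mask)]
    dim = len(hidden[0])
    summed = [sum(vec[j] * m for vec, m in zip(hidden, mask)) for j in range(dim)]
    count = max(sum(mask), 1e-9)  # guard added for this sketch
    pooled = [s / count for s in summed]
    # L2-normalize, matching torch.nn.functional.normalize in the server.
    norm = max(math.sqrt(sum(x * x for x in pooled)), 1e-12)
    return [x / norm for x in pooled]


# Token 0 is the "instruction", token 3 is padding; only tokens 1 and 2 count.
hidden = [[9.0, 9.0], [3.0, 0.0], [0.0, 4.0], [7.0, 7.0]]
attention_mask = [1, 1, 1, 0]
embedding = masked_mean_pool(hidden, attention_mask, instruction_len=1)
print(embedding)  # approximately [0.6, 0.8]
```

Because the result is L2-normalized, the dot product of two such embeddings equals their cosine similarity.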

Creating the Dockerfile

Create a file named Dockerfile with the following content:

# Use official PyTorch image with CUDA support
FROM pytorch/pytorch:2.5.1-cuda12.4-cudnn9-runtime

# Set working directory
WORKDIR /app

# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y --no-install-recommends \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
RUN pip install --no-cache-dir \
    transformers==4.51.3 \
    numpy \
    scipy \
    scikit-learn \
    huggingface_hub \
    fastapi \
    uvicorn

# Copy the embedding server script
COPY embedding_server.py /app/embedding_server.py

# Copy the model weights into the container
COPY Youtu-Embedding/ /app/Youtu-Embedding/

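# Set environment variables (informational only: embedding_server.py reads
# its settings from command-line arguments, not from these variables)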
ENV CHECKPOINT_PATH=/app/Youtu-Embedding
ENV PORT=8501

# Expose the server port
EXPOSE 8501

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8501/health || exit 1

# Set the entrypoint
ENTRYPOINT ["python", "/app/embedding_server.py"]

# Default command arguments
CMD ["--checkpoint", "/app/Youtu-Embedding", "--port", "8501", "--host", "0.0.0.0"]

Building the Docker Image

Build the Docker image with the following command:

docker build -t youtu-embedding:latest .

This step may take several minutes, because it downloads the base image and copies the model weights into the image.

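Running the Docker Container

Run the container with GPU support (the --gpus flag requires the NVIDIA Container Toolkit on the host):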

docker run --gpus all -p 8501:8501 youtu-embedding:latest

CPU-only mode:

docker run -p 8501:8501 youtu-embedding:latest

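You can customize the server parameters. Arguments given after the image name replace the default CMD arguments from the Dockerfile: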

docker run --gpus all -p 8501:8501 youtu-embedding:latest \
    --checkpoint /app/Youtu-Embedding \
    --port 8501 \
    --host 0.0.0.0 \
    --max_length 2048
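
Loading the model takes some time after the container starts, so clients may want to wait for the /health endpoint before sending requests. The following is a minimal polling sketch using only the Python standard library; the function names, attempt count, and delay are illustrative choices, and the expected response body is taken from the health() handler in embedding_server.py.

```python
import json
import time
import urllib.error
import urllib.request


def check_health(base_url: str, timeout: float = 5.0) -> bool:
    """Return True if GET {base_url}/health answers {"status": "healthy"}."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
            return json.load(resp).get("status") == "healthy"
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, or a non-JSON body: not ready yet.
        return False


def wait_until_healthy(base_url, attempts=30, delay=2.0, probe=check_health):
    """Call `probe` up to `attempts` times, sleeping `delay` seconds between tries."""
    for _ in range(attempts):
        if probe(base_url):
            return True
        time.sleep(delay)
    return False
```

With the port mapping shown above, wait_until_healthy("http://localhost:8501") returns True once the server responds, or False after all attempts fail.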

Pushing to a Container Registry

To deploy on a remote machine, push the image to a container registry:

# Tag the image for your registry
docker tag youtu-embedding:latest your-registry.com/youtu-embedding:latest

# Push to the registry
docker push your-registry.com/youtu-embedding:latest

Running on a Remote Machine

On the remote machine, pull and run the image:

# Pull the image
docker pull your-registry.com/youtu-embedding:latest

# Run the container with GPU support
docker run --gpus all -p 8501:8501 your-registry.com/youtu-embedding:latest

The Youtu Embedding service will then be available at http://<remote-machine-ip>:8501.
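
Requests to the service are plain JSON POSTs whose bodies follow the Query, Doc, and InputText models defined in embedding_server.py. The sketch below builds such requests with the Python standard library. BASE_URL is a placeholder to replace with the actual host, and build_request and post_json are hypothetical helper names introduced for illustration.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8501"  # placeholder: use your host or remote IP


def build_request(base_url: str, endpoint: str, payload: dict) -> urllib.request.Request:
    """Create a JSON POST request for one of the embedding endpoints."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}{endpoint}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(req: urllib.request.Request, timeout: float = 30.0) -> dict:
    """Send the request and parse the JSON response (needs a running server)."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)


# Request bodies mirror the pydantic models in embedding_server.py.
query_req = build_request(BASE_URL, "/embed_query", {"query": "What is RAG?"})
docs_req = build_request(BASE_URL, "/embed_docs", {"docs": ["first passage", "second passage"]})
texts_req = build_request(BASE_URL, "/embed_texts", {"texts": ["some text"], "instruction": ""})
```

Calling post_json(query_req) against a running container returns a dictionary with the query, embedding, and shape fields.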

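API Endpoints

Once the server is running, the following endpoints are available:

Endpoint       Method   Description
/embed_query   POST     Embed a single query
/embed_docs    POST     Embed multiple documents
/embed         POST     General-purpose embedding endpoint (same behavior as /embed_docs)
/embed_texts   POST     Embed texts with a custom instruction
/model_id      GET      Return the model checkpoint path
/health        GET      Health check endpoint

The /embed_docs, /embed, and /embed_texts endpoints accept at most 100 texts per request.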
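
The embedding endpoints return the vectors as a base64 string in the embedding field, produced by calling tobytes() on a NumPy array, together with a shape field. The sketch below decodes such a payload with the Python standard library. It assumes float32 values in little-endian byte order, which matches a model loaded with default settings on common x86 and ARM hosts but is not stated by the server; the sample payload is constructed locally as a stand-in for a real response.

```python
import base64
import struct


def decode_embedding(payload: dict) -> list:
    """Turn the base64 `embedding` field into a list of row vectors."""
    rows, dim = payload["shape"]
    raw = base64.b64decode(payload["embedding"])
    # Assumption: float32 values, little-endian byte order.
    flat = struct.unpack(f"<{rows * dim}f", raw)
    return [list(flat[r * dim:(r + 1) * dim]) for r in range(rows)]


def dot(a, b):
    """Dot product; equals cosine similarity for L2-normalized vectors."""
    return sum(x * y for x, y in zip(a, b))


# Stand-in for a server response holding two 2-dimensional unit vectors.
sample = {
    "shape": [2, 2],
    "embedding": base64.b64encode(
        struct.pack("<4f", 0.6, 0.8, 1.0, 0.0)
    ).decode("ascii"),
}
vectors = decode_embedding(sample)
print(dot(vectors[0], vectors[1]))  # approximately 0.6
```

With NumPy installed, the same decoding can be written as np.frombuffer(raw, dtype=np.float32).reshape(shape).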

Final Directory Structure

Before building, your youtu-embedding-docker directory should have the following structure:

youtu-embedding-docker/
├── Dockerfile
├── embedding_server.py
└── Youtu-Embedding/
    ├── config.json
    ├── config_sentence_transformers.json
    ├── configuration_youtu.py
    ├── modeling_youtu.py
    ├── model-00001-of-00002.safetensors
    ├── model-00002-of-00002.safetensors
    ├── model.safetensors.index.json
    ├── modules.json
    ├── sentence_bert_config.json
    ├── special_tokens_map.json
    ├── tokenizer_config.json
    ├── tokenizer.json
    └── 1_Pooling/
        └── config.json
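
The layout above can be verified before running docker build. The following sketch checks for a few of the listed files and also flags weight files that are still Git LFS pointer stubs, which happens when the repository is cloned without Git LFS. The choice of which files to treat as required and the helper names are assumptions for illustration.

```python
from pathlib import Path

# A subset of the files from the directory listing above.
REQUIRED = [
    "Dockerfile",
    "embedding_server.py",
    "Youtu-Embedding/config.json",
    "Youtu-Embedding/model.safetensors.index.json",
    "Youtu-Embedding/tokenizer.json",
]

# Git LFS pointer files are small text files that start with this line.
LFS_MAGIC = b"version https://git-lfs.github.com/spec/v1"


def missing_files(build_dir, required=REQUIRED):
    """Return the required paths that do not exist under `build_dir`."""
    root = Path(build_dir)
    return [rel for rel in required if not (root / rel).is_file()]


def unfetched_lfs_files(build_dir):
    """Return .safetensors files that are LFS pointers rather than real weights."""
    root = Path(build_dir)
    pointers = []
    for path in sorted(root.rglob("*.safetensors")):
        with path.open("rb") as fh:
            if fh.read(len(LFS_MAGIC)) == LFS_MAGIC:
                pointers.append(path.relative_to(root).as_posix())
    return pointers
```

Running missing_files(".") and unfetched_lfs_files(".") from inside youtu-embedding-docker should both return empty lists when the build context is complete.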
