03-Feature Store

Overview

A Feature Store is a core component of machine learning infrastructure: it solves the reuse, consistency, and serving problems of feature engineering. This article takes a deep look at Feature Store architecture, compares the mainstream options, and walks through production practice.

Why You Need a Feature Store

The Challenges of Feature Engineering

┌─────────────────────────────────────────────────────────────────────┐
│                Problems without a Feature Store                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │               Duplicated feature computation                │   │
│  │                                                             │   │
│  │  Team A: user_age = today - birth_date                     │   │
│  │  Team B: age = (now() - dob).days / 365                    │   │
│  │  Team C: customer_age = DATEDIFF(current, birthday)        │   │
│  │                                                             │   │
│  │  ► One feature, many implementations, inconsistent results │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   Training-serving skew                     │   │
│  │                                                             │   │
│  │  Training: features built in SQL + Spark batch jobs         │   │
│  │  Serving:  features built in Python + Redis at runtime      │   │
│  │                                                             │   │
│  │  ► Divergent code paths make training and serving disagree  │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │               Features are hard to discover                 │   │
│  │                                                             │   │
│  │  Q: Is there an existing user-profile feature?              │   │
│  │  A: Maybe... in some notebook... or some SQL file...        │   │
│  │                                                             │   │
│  │  ► No feature catalog, so existing work is hard to reuse    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
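
The first panel is easy to reproduce concretely. A self-contained sketch (the dates are made up) showing how Team A's and Team B's "age" logic can disagree for the same person:

# age_skew_demo.py -- two of the "age" definitions above, side by side
from datetime import date

birth = date(1990, 3, 1)
today = date(2024, 2, 28)

# Team A: calendar age, corrected for whether the birthday has passed
age_a = today.year - birth.year - ((today.month, today.day) < (birth.month, birth.day))

# Team B: day count divided by 365, which drifts across leap years
age_b = (today - birth).days // 365

print(age_a, age_b)  # 33 vs 34 -- same person, two answers
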

The Core Value of a Feature Store

┌─────────────────────────────────────────────────────────────────────┐
│                     Feature Store Architecture                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────────────┐     ┌──────────────────────┐            │
│  │  Feature producers   │     │  Feature consumers   │            │
│  │                      │     │                      │            │
│  │  • Data engineers    │     │  • Training pipelines│            │
│  │  • ML engineers      │     │  • Inference services│            │
│  │  • Business analysts │     │  • Data analytics    │            │
│  └──────────┬───────────┘     └──────────┬───────────┘            │
│             │                            │                         │
│             ▼                            ▼                         │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                     Feature Store                            │   │
│  │                                                             │   │
│  │  ┌──────────────────────────────────────────────────────┐  │   │
│  │  │                   Feature Catalog                    │  │   │
│  │  │  • Definitions & docs      • Lineage tracking        │  │   │
│  │  │  • Data types & schema     • Access control          │  │   │
│  │  └──────────────────────────────────────────────────────┘  │   │
│  │                                                             │   │
│  │  ┌────────────────────┐  ┌────────────────────┐           │   │
│  │  │   Offline Store    │  │   Online Store     │           │   │
│  │  │  (batch features)  │  │ (realtime features)│           │   │
│  │  │                    │  │                    │           │   │
│  │  │  • Historical data │  │  • Low-latency read│           │   │
│  │  │  • Point-in-time   │  │  • Latest values   │           │   │
│  │  │  • Large volumes   │  │  • Highly available│           │   │
│  │  └────────────────────┘  └────────────────────┘           │   │
│  │                                                             │   │
│  │  ┌──────────────────────────────────────────────────────┐  │   │
│  │  │            Feature Transformation Engine             │  │   │
│  │  │  • Batch      • Streaming      • On-demand           │  │   │
│  │  └──────────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
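
The split into an offline and an online store corresponds to two read paths. A sketch of both, using the Feast API introduced below (assumes a configured repo; the feature names are the ones defined later in this article):

# read_paths.py -- the two access paths from the diagram, sketched with Feast
from datetime import datetime
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Training path: point-in-time correct retrieval from the offline store
labels_df = pd.DataFrame({
    "user_id": ["user_1", "user_2"],
    "event_timestamp": [datetime(2024, 1, 10), datetime(2024, 1, 12)],
})
train_df = store.get_historical_features(
    entity_df=labels_df,
    features=["user_behavior:total_purchases_30d"],
).to_df()

# Serving path: millisecond lookup of the latest values in the online store
online = store.get_online_features(
    features=["user_behavior:total_purchases_30d"],
    entity_rows=[{"user_id": "user_1"}],
).to_dict()
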

Comparing Mainstream Feature Stores

Capability          Feast     Tecton   Databricks   Hopsworks   SageMaker
Open source         ✓         ✗        ✗            ✓           ✗
Offline store       ✓         ✓        ✓            ✓           ✓
Online store        ✓         ✓        ✓            ✓           ✓
Streaming features  Limited   ✓        ✓            ✓           ✓
Transformations     Basic     ✓        ✓            ✓           Basic
Lineage tracking    Basic     ✓        ✓            ✓           ✓
K8s native          ✓         ✓        ✗            ✓           ✗
Cost                Low       High     Medium       Medium      Pay-per-use

Feast in Practice

Feast Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                         Feast Architecture                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      Feast SDK                               │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │   │
│  │  │ Python   │  │ Go       │  │ Java     │  │ CLI      │    │   │
│  │  │ Client   │  │ Client   │  │ Client   │  │          │    │   │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                              │                                      │
│                              ▼                                      │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                  Feast Server (optional)                    │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │   │
│  │  │ REST API │  │ gRPC API │  │ Metrics  │                   │   │
│  │  └──────────┘  └──────────┘  └──────────┘                   │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                              │                                      │
│                              ▼                                      │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                   Registry (metadata)                       │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │   │
│  │  │ File     │  │ SQL      │  │ S3/GCS   │                   │   │
│  │  └──────────┘  └──────────┘  └──────────┘                   │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                              │                                      │
│         ┌────────────────────┴────────────────────┐                │
│         ▼                                         ▼                │
│  ┌────────────────────┐              ┌────────────────────┐       │
│  │   Offline Store    │              │   Online Store     │       │
│  │                    │              │                    │       │
│  │  • BigQuery        │              │  • Redis           │       │
│  │  • Snowflake       │              │  • DynamoDB        │       │
│  │  • Redshift        │              │  • Bigtable        │       │
│  │  • Spark/Parquet   │              │  • SQLite          │       │
│  └────────────────────┘              └────────────────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
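
The SDK discovers these stores through the repo's feature_store.yaml. A minimal sketch of such a config (the store types and connection values below are illustrative assumptions, not a canonical setup):

# feature_store.yaml -- minimal sketch; values are illustrative
project: fraud_platform
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
entity_key_serialization_version: 2
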

Feast Configuration and Definitions

# feature_store.py
"""
Feast feature store configuration and feature definitions
"""

from datetime import timedelta
from feast import (
    Aggregation,
    Entity,
    FeatureService,
    FeatureView,
    Field,
    FileSource,
    PushSource,
    RequestSource,
    StreamFeatureView,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64, String
from feast.value_type import ValueType
import pandas as pd


# ==================== Data sources ====================

# Batch data sources
user_features_source = FileSource(
    name="user_features_source",
    path="s3://feature-store/user_features/*.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

transaction_source = FileSource(
    name="transaction_source",
    path="s3://feature-store/transactions/*.parquet",
    timestamp_field="transaction_time",
)

# Streaming (push) data source
user_activity_push_source = PushSource(
    name="user_activity_push_source",
    batch_source=FileSource(
        name="user_activity_batch",
        path="s3://feature-store/user_activity/*.parquet",
        timestamp_field="event_timestamp",
    ),
)

# Request-time data (provided at inference time)
request_source = RequestSource(
    name="request_source",
    schema=[
        Field(name="current_latitude", dtype=Float64),
        Field(name="current_longitude", dtype=Float64),
    ]
)


# ==================== Entities ====================

user_entity = Entity(
    name="user_id",
    description="User identifier",
    join_keys=["user_id"],
    value_type=ValueType.STRING,
)

item_entity = Entity(
    name="item_id",
    description="Item identifier",
    join_keys=["item_id"],
    value_type=ValueType.STRING,
)

merchant_entity = Entity(
    name="merchant_id",
    description="Merchant identifier",
    join_keys=["merchant_id"],
    value_type=ValueType.STRING,
)


# ==================== Feature views ====================

# Basic user profile features
user_profile_fv = FeatureView(
    name="user_profile",
    entities=[user_entity],
    ttl=timedelta(days=365),
    schema=[
        Field(name="age", dtype=Int64),
        Field(name="gender", dtype=String),
        Field(name="city", dtype=String),
        Field(name="registration_days", dtype=Int64),
        Field(name="is_premium", dtype=Int64),
    ],
    source=user_features_source,
    online=True,
    tags={"team": "user-platform", "data_quality": "high"},
)

# Aggregated user behavior features
user_behavior_fv = FeatureView(
    name="user_behavior",
    entities=[user_entity],
    ttl=timedelta(days=30),
    schema=[
        Field(name="total_purchases_30d", dtype=Int64),
        Field(name="total_spend_30d", dtype=Float32),
        Field(name="avg_order_value_30d", dtype=Float32),
        Field(name="purchase_frequency_30d", dtype=Float32),
        Field(name="last_purchase_days_ago", dtype=Int64),
        Field(name="favorite_category", dtype=String),
        Field(name="category_diversity", dtype=Float32),
    ],
    source=user_features_source,
    online=True,
    tags={"team": "growth", "refresh": "daily"},
)

# Transaction features
transaction_features_fv = FeatureView(
    name="transaction_features",
    entities=[user_entity],
    ttl=timedelta(days=7),
    schema=[
        Field(name="transaction_count_1h", dtype=Int64),
        Field(name="transaction_count_24h", dtype=Int64),
        Field(name="avg_transaction_amount_24h", dtype=Float32),
        Field(name="max_transaction_amount_24h", dtype=Float32),
        Field(name="unique_merchants_24h", dtype=Int64),
        Field(name="unique_countries_24h", dtype=Int64),
    ],
    source=transaction_source,
    online=True,
    tags={"team": "risk", "latency": "low"},
)

# Merchant features
merchant_features_fv = FeatureView(
    name="merchant_features",
    entities=[merchant_entity],
    ttl=timedelta(days=30),
    schema=[
        Field(name="merchant_category", dtype=String),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="transaction_volume_30d", dtype=Int64),
        Field(name="chargeback_rate", dtype=Float32),
        Field(name="risk_score", dtype=Float32),
    ],
    source=user_features_source,  # NOTE: stands in for a merchant-level source
    online=True,
    tags={"team": "merchant-platform"},
)


# ==================== On-demand features ====================

@on_demand_feature_view(
    sources=[user_profile_fv, request_source],
    schema=[
        Field(name="distance_to_home", dtype=Float64),
        Field(name="age_bucket", dtype=String),
    ],
)
def user_context_features(inputs: pd.DataFrame) -> pd.DataFrame:
    """Context features computed at request time."""
    import numpy as np

    df = pd.DataFrame()

    # Distance calculation (simplified; a real implementation
    # would use the user's stored home location)
    df["distance_to_home"] = np.sqrt(
        inputs["current_latitude"] ** 2 +
        inputs["current_longitude"] ** 2
    )

    # Age bucketing
    df["age_bucket"] = pd.cut(
        inputs["age"],
        bins=[0, 18, 25, 35, 50, 65, 100],
        labels=["<18", "18-25", "26-35", "36-50", "51-65", "65+"]
    ).astype(str)

    return df


# ==================== Stream feature views ====================

user_activity_stream_fv = StreamFeatureView(
    name="user_activity_stream",
    entities=[user_entity],
    ttl=timedelta(hours=1),
    schema=[
        Field(name="clicks_last_5min", dtype=Int64),
        Field(name="page_views_last_5min", dtype=Int64),
        Field(name="session_duration_seconds", dtype=Int64),
    ],
    source=user_activity_push_source,
    aggregations=[
        Aggregation(column="click_event", function="count", time_window=timedelta(minutes=5)),
        Aggregation(column="page_view", function="count", time_window=timedelta(minutes=5)),
        Aggregation(column="session_duration", function="sum", time_window=timedelta(minutes=30)),
    ],
    online=True,
    tags={"team": "realtime", "latency": "sub-second"},
)


# ==================== Feature Service ====================

fraud_detection_service = FeatureService(
    name="fraud_detection",
    features=[
        user_profile_fv[["age", "registration_days", "is_premium"]],
        user_behavior_fv[["total_purchases_30d", "avg_order_value_30d"]],
        transaction_features_fv,
        merchant_features_fv[["merchant_category", "risk_score"]],
    ],
    tags={"model": "fraud_v3", "owner": "risk-team"},
)

recommendation_service = FeatureService(
    name="recommendation",
    features=[
        user_profile_fv,
        user_behavior_fv,
    ],
    tags={"model": "rec_v2", "owner": "growth-team"},
)

Using the Feast SDK

# feast_client.py
"""
A thin wrapper around the Feast client.
Supports feature registration, materialization, and retrieval.
"""

from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
import pandas as pd
from feast import FeatureStore


@dataclass
class FeatureStoreConfig:
    """Feature store connection settings."""
    repo_path: str
    online_store_type: str = "redis"
    offline_store_type: str = "file"
    registry_type: str = "file"


class FeastFeatureStore:
    """Feast feature store client."""

    def __init__(self, config: FeatureStoreConfig):
        self.config = config
        self.store = FeatureStore(repo_path=config.repo_path)

    # ==================== Materialization ====================

    def materialize(
        self,
        start_date: datetime,
        end_date: datetime,
        feature_views: Optional[List[str]] = None
    ):
        """Materialize features into the online store."""
        self.store.materialize(
            start_date=start_date,
            end_date=end_date,
            feature_views=feature_views
        )

    def materialize_incremental(
        self,
        end_date: datetime,
        feature_views: Optional[List[str]] = None
    ):
        """Incremental materialization since the last run."""
        self.store.materialize_incremental(
            end_date=end_date,
            feature_views=feature_views
        )

    # ==================== Offline retrieval ====================

    def get_historical_features(
        self,
        entity_df: pd.DataFrame,
        features: List[str],
        full_feature_names: bool = True
    ) -> pd.DataFrame:
        """Fetch historical features via a point-in-time join."""
        job = self.store.get_historical_features(
            entity_df=entity_df,
            features=features,
            full_feature_names=full_feature_names
        )
        return job.to_df()

    def create_training_dataset(
        self,
        entity_df: pd.DataFrame,
        feature_service: str,
        output_path: str
    ) -> str:
        """Create a training dataset."""
        # Resolve the feature service
        fs = self.store.get_feature_service(feature_service)

        # Fetch historical features
        training_df = self.store.get_historical_features(
            entity_df=entity_df,
            features=fs
        ).to_df()

        # Persist the dataset
        training_df.to_parquet(output_path, index=False)

        return output_path

    # ==================== Online retrieval ====================

    def get_online_features(
        self,
        entity_rows: List[Dict[str, Any]],
        features: Union[List[str], str]
    ) -> Dict[str, List[Any]]:
        """Fetch online features."""
        if isinstance(features, str):
            # A feature-service name was passed
            feature_service = self.store.get_feature_service(features)
            response = self.store.get_online_features(
                features=feature_service,
                entity_rows=entity_rows
            )
        else:
            response = self.store.get_online_features(
                features=features,
                entity_rows=entity_rows
            )

        return response.to_dict()

    def get_online_features_batch(
        self,
        entity_rows: List[Dict[str, Any]],
        features: List[str],
        batch_size: int = 100
    ) -> Dict[str, List[Any]]:
        """Fetch online features in batches."""
        all_results = {feat.split(":")[-1]: [] for feat in features}

        for i in range(0, len(entity_rows), batch_size):
            batch = entity_rows[i:i + batch_size]
            response = self.store.get_online_features(
                features=features,
                entity_rows=batch
            ).to_dict()

            for key, values in response.items():
                if key in all_results:
                    all_results[key].extend(values)

        return all_results

    # ==================== Feature push (streaming) ====================

    def push_features(
        self,
        push_source_name: str,
        df: pd.DataFrame
    ):
        """Push streaming features into the online store."""
        self.store.push(
            push_source_name=push_source_name,
            df=df
        )

    # ==================== Feature metadata ====================

    def list_feature_views(self) -> List[Dict[str, Any]]:
        """List all feature views."""
        feature_views = self.store.list_feature_views()
        return [
            {
                "name": fv.name,
                "entities": [e.name for e in fv.entities],
                "features": [f.name for f in fv.features],
                "ttl": str(fv.ttl),
                "online": fv.online,
                "tags": fv.tags
            }
            for fv in feature_views
        ]

    def list_feature_services(self) -> List[Dict[str, Any]]:
        """List all feature services."""
        services = self.store.list_feature_services()
        return [
            {
                "name": svc.name,
                "features": [str(f) for f in svc.features],
                "tags": svc.tags
            }
            for svc in services
        ]

    def get_feature_view_schema(self, name: str) -> Dict[str, Any]:
        """Get the schema of a feature view."""
        fv = self.store.get_feature_view(name)
        return {
            "name": fv.name,
            "features": [
                {
                    "name": f.name,
                    "dtype": str(f.dtype)
                }
                for f in fv.features
            ],
            "entities": [e.name for e in fv.entities],
            "source": str(fv.batch_source)
        }


class FeatureComputeEngine:
    """Feature computation engine."""

    def __init__(self, feast_store: FeastFeatureStore):
        self.store = feast_store

    def compute_aggregation_features(
        self,
        df: pd.DataFrame,
        entity_column: str,
        timestamp_column: str,
        value_column: str,
        windows: List[timedelta],
        functions: List[str]
    ) -> pd.DataFrame:
        """Compute windowed aggregation features."""
        result_df = df[[entity_column, timestamp_column]].copy()

        for window in windows:
            window_name = f"{window.days}d" if window.days else f"{window.seconds // 3600}h"

            for func in functions:
                feature_name = f"{value_column}_{func}_{window_name}"

                if func == "count":
                    agg_values = self._rolling_count(
                        df, entity_column, timestamp_column, value_column, window
                    )
                elif func == "sum":
                    agg_values = self._rolling_sum(
                        df, entity_column, timestamp_column, value_column, window
                    )
                elif func == "mean":
                    agg_values = self._rolling_mean(
                        df, entity_column, timestamp_column, value_column, window
                    )
                elif func == "max":
                    agg_values = self._rolling_max(
                        df, entity_column, timestamp_column, value_column, window
                    )
                elif func == "min":
                    agg_values = self._rolling_min(
                        df, entity_column, timestamp_column, value_column, window
                    )
                else:
                    continue

                result_df[feature_name] = agg_values

        return result_df

    def _rolling_count(
        self,
        df: pd.DataFrame,
        entity_col: str,
        ts_col: str,
        value_col: str,
        window: timedelta
    ) -> pd.Series:
        """Rolling count over a time window (naive O(n^2) implementation)."""
        df = df.sort_values([entity_col, ts_col])
        df["_window_start"] = df[ts_col] - window

        counts = []
        for idx, row in df.iterrows():
            entity = row[entity_col]
            ts = row[ts_col]
            window_start = row["_window_start"]

            mask = (
                (df[entity_col] == entity) &
                (df[ts_col] >= window_start) &
                (df[ts_col] <= ts)
            )
            counts.append(mask.sum())

        return pd.Series(counts, index=df.index)

    def _rolling_sum(self, df, entity_col, ts_col, value_col, window):
        """Rolling sum over a time window."""
        df = df.sort_values([entity_col, ts_col])
        # Select the value column *after* .rolling() so that `on=ts_col`
        # can still see the timestamp column in the frame.
        result = df.groupby(entity_col).rolling(
            window=window, on=ts_col, min_periods=1
        )[value_col].sum().reset_index(level=0, drop=True)
        return result

    def _rolling_mean(self, df, entity_col, ts_col, value_col, window):
        """Rolling mean over a time window."""
        df = df.sort_values([entity_col, ts_col])
        result = df.groupby(entity_col).rolling(
            window=window, on=ts_col, min_periods=1
        )[value_col].mean().reset_index(level=0, drop=True)
        return result

    def _rolling_max(self, df, entity_col, ts_col, value_col, window):
        """Rolling max over a time window."""
        df = df.sort_values([entity_col, ts_col])
        result = df.groupby(entity_col).rolling(
            window=window, on=ts_col, min_periods=1
        )[value_col].max().reset_index(level=0, drop=True)
        return result

    def _rolling_min(self, df, entity_col, ts_col, value_col, window):
        """Rolling min over a time window."""
        df = df.sort_values([entity_col, ts_col])
        result = df.groupby(entity_col).rolling(
            window=window, on=ts_col, min_periods=1
        )[value_col].min().reset_index(level=0, drop=True)
        return result


# ==================== Feature serving API ====================

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any

app = FastAPI(title="Feature Store API")

# Global feature store instance
feature_store: Optional[FeastFeatureStore] = None


class FeatureRequest(BaseModel):
    entity_rows: List[Dict[str, Any]]
    feature_service: str


class FeatureResponse(BaseModel):
    features: Dict[str, List[Any]]
    latency_ms: float


@app.on_event("startup")
async def startup():
    global feature_store
    config = FeatureStoreConfig(repo_path="./feature_repo")
    feature_store = FeastFeatureStore(config)


@app.post("/features", response_model=FeatureResponse)
async def get_features(request: FeatureRequest):
    """Fetch online features for the request's entities."""
    import time

    start_time = time.time()

    try:
        features = feature_store.get_online_features(
            entity_rows=request.entity_rows,
            features=request.feature_service
        )

        latency_ms = (time.time() - start_time) * 1000

        return FeatureResponse(
            features=features,
            latency_ms=latency_ms
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/feature-views")
async def list_feature_views():
    """List feature views."""
    return feature_store.list_feature_views()


@app.get("/feature-services")
async def list_feature_services():
    """List feature services."""
    return feature_store.list_feature_services()


# Usage example
if __name__ == "__main__":
    # Initialize
    config = FeatureStoreConfig(repo_path="./feature_repo")
    fs = FeastFeatureStore(config)

    # Materialize features into the online store
    fs.materialize_incremental(end_date=datetime.now())

    # Build a training dataset with a point-in-time join
    entity_df = pd.DataFrame({
        "user_id": ["user_1", "user_2", "user_3"],
        "event_timestamp": [
            datetime.now() - timedelta(days=1),
            datetime.now() - timedelta(days=2),
            datetime.now() - timedelta(days=3)
        ]
    })

    training_df = fs.get_historical_features(
        entity_df=entity_df,
        features=[
            "user_profile:age",
            "user_profile:gender",
            "user_behavior:total_purchases_30d",
            "user_behavior:avg_order_value_30d"
        ]
    )
    print(f"Training data shape: {training_df.shape}")

    # Fetch online features through a feature service
    online_features = fs.get_online_features(
        entity_rows=[{"user_id": "user_1"}],
        features="fraud_detection"  # Feature Service
    )
    print(f"Online features: {online_features}")
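
The serving API above can be exercised with any HTTP client. A quick sketch, assuming the app is run with uvicorn on localhost:8000:

# query_features.py -- calling the /features endpoint defined above
import requests

resp = requests.post(
    "http://localhost:8000/features",
    json={
        "entity_rows": [{"user_id": "user_1"}],
        "feature_service": "fraud_detection",
    },
)
resp.raise_for_status()
body = resp.json()
print(body["latency_ms"], body["features"])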

Building Your Own Feature Store

A Simplified Feature Store Implementation

# custom_feature_store.py
"""
A self-built feature store.
A lightweight implementation suited to small and mid-sized workloads.
"""

import json
import pickle
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import pandas as pd
import redis
from sqlalchemy import create_engine, Column, String, DateTime, Text, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import JSONB
import hashlib


Base = declarative_base()


# ==================== Data models ====================

class FeatureDefinition(Base):
    """Feature definition table (the registry)."""
    __tablename__ = "feature_definitions"

    id = Column(String(64), primary_key=True)
    name = Column(String(255), nullable=False, unique=True)
    entity = Column(String(255), nullable=False)
    dtype = Column(String(50), nullable=False)
    description = Column(Text)
    transformation = Column(Text)  # serialized feature computation logic
    source_table = Column(String(255))
    ttl_seconds = Column(Float, default=86400)
    tags = Column(JSONB, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


class FeatureValue(Base):
    """Feature value table (the offline store)."""
    __tablename__ = "feature_values"

    id = Column(String(64), primary_key=True)
    feature_id = Column(String(64), nullable=False)
    entity_id = Column(String(255), nullable=False)
    value = Column(Text)
    event_timestamp = Column(DateTime, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)


# ==================== Storage interfaces ====================

class OnlineStore(ABC):
    """Online store abstraction."""

    @abstractmethod
    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        pass

    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        pass

    @abstractmethod
    def mget(self, keys: List[str]) -> List[Optional[Any]]:
        pass

    @abstractmethod
    def delete(self, key: str):
        pass


class RedisOnlineStore(OnlineStore):
    """Redis-backed online store."""

    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.client = redis.Redis(host=host, port=port, db=db)
        self.prefix = "fs:"

    def _make_key(self, key: str) -> str:
        return f"{self.prefix}{key}"

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        serialized = pickle.dumps(value)
        if ttl:
            self.client.setex(self._make_key(key), ttl, serialized)
        else:
            self.client.set(self._make_key(key), serialized)

    def get(self, key: str) -> Optional[Any]:
        data = self.client.get(self._make_key(key))
        if data:
            return pickle.loads(data)
        return None

    def mget(self, keys: List[str]) -> List[Optional[Any]]:
        full_keys = [self._make_key(k) for k in keys]
        values = self.client.mget(full_keys)
        return [pickle.loads(v) if v else None for v in values]

    def delete(self, key: str):
        self.client.delete(self._make_key(key))


class OfflineStore(ABC):
    """Offline store abstraction."""

    @abstractmethod
    def write(self, feature_id: str, df: pd.DataFrame, entity_column: str, timestamp_column: str):
        pass

    @abstractmethod
    def read(
        self,
        feature_id: str,
        entity_ids: List[str],
        start_time: datetime,
        end_time: datetime
    ) -> pd.DataFrame:
        pass

    @abstractmethod
    def point_in_time_join(
        self,
        entity_df: pd.DataFrame,
        feature_ids: List[str],
        entity_column: str,
        timestamp_column: str
    ) -> pd.DataFrame:
        pass


class PostgresOfflineStore(OfflineStore):
    """PostgreSQL-backed offline store."""

    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def write(
        self,
        feature_id: str,
        df: pd.DataFrame,
        entity_column: str,
        timestamp_column: str
    ):
        session = self.Session()
        try:
            for _, row in df.iterrows():
                fv = FeatureValue(
                    id=hashlib.sha256(
                        f"{feature_id}:{row[entity_column]}:{row[timestamp_column]}".encode()
                    ).hexdigest()[:64],
                    feature_id=feature_id,
                    entity_id=str(row[entity_column]),
                    value=json.dumps(row.drop([entity_column, timestamp_column]).to_dict()),
                    event_timestamp=row[timestamp_column]
                )
                session.merge(fv)  # Upsert

            session.commit()
        finally:
            session.close()

    def read(
        self,
        feature_id: str,
        entity_ids: List[str],
        start_time: datetime,
        end_time: datetime
    ) -> pd.DataFrame:
        session = self.Session()
        try:
            query = session.query(FeatureValue).filter(
                FeatureValue.feature_id == feature_id,
                FeatureValue.entity_id.in_(entity_ids),
                FeatureValue.event_timestamp >= start_time,
                FeatureValue.event_timestamp <= end_time
            )

            records = []
            for fv in query.all():
                record = json.loads(fv.value)
                record["entity_id"] = fv.entity_id
                record["event_timestamp"] = fv.event_timestamp
                records.append(record)

            return pd.DataFrame(records)
        finally:
            session.close()

    def point_in_time_join(
        self,
        entity_df: pd.DataFrame,
        feature_ids: List[str],
        entity_column: str,
        timestamp_column: str
    ) -> pd.DataFrame:
        """Point-in-time join implementation."""
        result_df = entity_df.copy()

        for feature_id in feature_ids:
            feature_values = self._get_latest_before(
                feature_id,
                entity_df[entity_column].tolist(),
                entity_df[timestamp_column].tolist()
            )

            # Merge the feature columns
            for col, values in feature_values.items():
                result_df[f"{feature_id}_{col}"] = values

        return result_df

    def _get_latest_before(
        self,
        feature_id: str,
        entity_ids: List[str],
        timestamps: List[datetime]
    ) -> Dict[str, List[Any]]:
        """Get the latest feature values at or before each timestamp."""
        session = self.Session()
        try:
            # Collect one row (or None) per entity first, then build the
            # column lists, so a missing early row cannot misalign them.
            rows: List[Optional[Dict[str, Any]]] = []
            for entity_id, ts in zip(entity_ids, timestamps):
                record = session.query(FeatureValue).filter(
                    FeatureValue.feature_id == feature_id,
                    FeatureValue.entity_id == entity_id,
                    FeatureValue.event_timestamp <= ts
                ).order_by(FeatureValue.event_timestamp.desc()).first()
                rows.append(json.loads(record.value) if record else None)

            columns = {col for row in rows if row for col in row}
            return {
                col: [row.get(col) if row else None for row in rows]
                for col in columns
            }
        finally:
            session.close()


# ==================== Feature store core ====================

@dataclass
class FeatureSpec:
    """Feature specification."""
    name: str
    entity: str
    dtype: str
    description: str = ""
    transformation: Optional[Callable] = None
    source_table: str = ""
    ttl_seconds: int = 86400
    tags: Dict[str, str] = field(default_factory=dict)


class SimpleFeatureStore:
    """Simplified feature store."""

    def __init__(
        self,
        online_store: OnlineStore,
        offline_store: OfflineStore,
        metadata_db_url: str
    ):
        self.online_store = online_store
        self.offline_store = offline_store
        self.metadata_engine = create_engine(metadata_db_url)
        Base.metadata.create_all(self.metadata_engine)
        self.MetaSession = sessionmaker(bind=self.metadata_engine)

    # ==================== Feature registration ====================

    def register_feature(self, spec: FeatureSpec) -> str:
        """Register a feature."""
        session = self.MetaSession()
        try:
            feature_id = hashlib.sha256(spec.name.encode()).hexdigest()[:64]

            definition = FeatureDefinition(
                id=feature_id,
                name=spec.name,
                entity=spec.entity,
                dtype=spec.dtype,
                description=spec.description,
                transformation=pickle.dumps(spec.transformation).hex() if spec.transformation else None,
                source_table=spec.source_table,
                ttl_seconds=spec.ttl_seconds,
                tags=spec.tags
            )
            session.merge(definition)
            session.commit()
            return feature_id
        finally:
            session.close()

    def get_feature_definition(self, name: str) -> Optional[FeatureSpec]:
        """Get a feature definition."""
        session = self.MetaSession()
        try:
            definition = session.query(FeatureDefinition).filter_by(name=name).first()
            if definition:
                return FeatureSpec(
                    name=definition.name,
                    entity=definition.entity,
                    dtype=definition.dtype,
                    description=definition.description,
                    transformation=pickle.loads(bytes.fromhex(definition.transformation)) if definition.transformation else None,
                    source_table=definition.source_table,
                    ttl_seconds=int(definition.ttl_seconds),
                    tags=definition.tags
                )
            return None
        finally:
            session.close()

    def list_features(self, entity: Optional[str] = None) -> List[Dict[str, Any]]:
        """List registered features."""
        session = self.MetaSession()
        try:
            query = session.query(FeatureDefinition)
            if entity:
                query = query.filter_by(entity=entity)

            return [
                {
                    "id": f.id,
                    "name": f.name,
                    "entity": f.entity,
                    "dtype": f.dtype,
                    "description": f.description,
                    "tags": f.tags
                }
                for f in query.all()
            ]
        finally:
            session.close()

    # ==================== Feature ingestion ====================

    def ingest_features(
        self,
        feature_name: str,
        df: pd.DataFrame,
        entity_column: str,
        timestamp_column: str,
        materialize_online: bool = True
    ):
        """Ingest features into offline (and optionally online) storage."""
        spec = self.get_feature_definition(feature_name)
        if not spec:
            raise ValueError(f"Feature {feature_name} not registered")

        feature_id = hashlib.sha256(feature_name.encode()).hexdigest()[:64]

        # Write to the offline store
        self.offline_store.write(
            feature_id=feature_id,
            df=df,
            entity_column=entity_column,
            timestamp_column=timestamp_column
        )

        # Materialize to the online store
        if materialize_online:
            self._materialize_to_online(
                feature_name=feature_name,
                df=df,
                entity_column=entity_column,
                timestamp_column=timestamp_column,
                ttl=spec.ttl_seconds
            )

    def _materialize_to_online(
        self,
        feature_name: str,
        df: pd.DataFrame,
        entity_column: str,
        timestamp_column: str,
        ttl: int
    ):
        """Materialize the latest value per entity into the online store."""
        # Take the most recent row for each entity
        latest = df.sort_values(timestamp_column).groupby(entity_column).last()

        for entity_id, row in latest.iterrows():
            key = f"{feature_name}:{entity_id}"
            value = row.drop([timestamp_column], errors="ignore").to_dict()
            self.online_store.set(key, value, ttl=ttl)

    # ==================== Feature retrieval ====================

    def get_online_features(
        self,
        feature_names: List[str],
        entity_ids: List[str]
    ) -> Dict[str, Dict[str, Any]]:
        """Fetch online features."""
        results = {}

        for entity_id in entity_ids:
            results[entity_id] = {}

            for feature_name in feature_names:
                key = f"{feature_name}:{entity_id}"
                value = self.online_store.get(key)
                if value:
                    results[entity_id].update(value)

        return results

    def get_historical_features(
        self,
        entity_df: pd.DataFrame,
        feature_names: List[str],
        entity_column: str = "entity_id",
        timestamp_column: str = "event_timestamp"
    ) -> pd.DataFrame:
        """Fetch historical features (point-in-time join)."""
        feature_ids = [
            hashlib.sha256(name.encode()).hexdigest()[:64]
            for name in feature_names
        ]

        return self.offline_store.point_in_time_join(
            entity_df=entity_df,
            feature_ids=feature_ids,
            entity_column=entity_column,
            timestamp_column=timestamp_column
        )

    # ==================== Feature computation ====================

    def compute_feature(
        self,
        feature_name: str,
        source_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Compute a feature by applying its registered transformation."""
        spec = self.get_feature_definition(feature_name)
        if not spec or not spec.transformation:
            raise ValueError(f"Feature {feature_name} has no transformation")

        return spec.transformation(source_df)


# Usage example
if __name__ == "__main__":
    # Initialize the stores
    online_store = RedisOnlineStore(host="localhost", port=6379)
    offline_store = PostgresOfflineStore("postgresql://user:pass@localhost/features")

    # Create the feature store
    fs = SimpleFeatureStore(
        online_store=online_store,
        offline_store=offline_store,
        metadata_db_url="postgresql://user:pass@localhost/features"
    )

    # Register a feature
    fs.register_feature(FeatureSpec(
        name="user_purchase_count_30d",
        entity="user_id",
        dtype="int64",
        description="User purchase count in last 30 days",
        ttl_seconds=86400,
        tags={"team": "growth", "refresh": "daily"}
    ))

    # Ingest features
    feature_df = pd.DataFrame({
        "user_id": ["u1", "u2", "u3"],
        "purchase_count": [10, 5, 20],
        "event_timestamp": [datetime.now()] * 3
    })

    fs.ingest_features(
        feature_name="user_purchase_count_30d",
        df=feature_df,
        entity_column="user_id",
        timestamp_column="event_timestamp"
    )

    # Fetch online features
    online_features = fs.get_online_features(
        feature_names=["user_purchase_count_30d"],
        entity_ids=["u1", "u2"]
    )
    print(f"Online features: {online_features}")

    # Fetch historical features
    entity_df = pd.DataFrame({
        "entity_id": ["u1", "u2"],
        "event_timestamp": [datetime.now() - timedelta(days=1), datetime.now()]
    })

    historical_features = fs.get_historical_features(
        entity_df=entity_df,
        feature_names=["user_purchase_count_30d"]
    )
    print(f"Historical features:\n{historical_features}")
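
Note that `_get_latest_before` issues one query per entity row, which keeps the code readable but scales poorly. For batch retrieval, the same point-in-time semantics can be vectorized with pandas.merge_asof; a sketch on illustrative data:

# pit_join_pandas.py -- vectorized point-in-time join sketch
import pandas as pd

entity_df = pd.DataFrame({
    "entity_id": ["u1", "u2"],
    "event_timestamp": pd.to_datetime(["2024-01-10", "2024-01-12"]),
})
feature_df = pd.DataFrame({
    "entity_id": ["u1", "u1", "u2"],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-09", "2024-01-11"]),
    "purchase_count": [3, 10, 5],
})

# Both frames must be sorted on the time key; `by` restricts matches to the
# same entity, and direction="backward" picks the latest feature row at or
# before each label timestamp -- exactly point-in-time semantics.
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    feature_df.sort_values("event_timestamp"),
    on="event_timestamp",
    by="entity_id",
    direction="backward",
)
print(joined)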

Summary

A feature store is a core component of ML infrastructure:

  1. Core value

    • Feature reuse: avoid duplicated computation
    • Consistency: identical features in training and serving
    • Feature discovery: a unified catalog with documentation
  2. Architecture essentials

    • Offline store: historical features with point-in-time correctness
    • Online store: low-latency lookups
    • Registry: feature metadata
  3. Selection guidance

    • Small scale: a simplified self-built store
    • Mid scale: Feast
    • Large scale: Tecton / Databricks
  4. Best practices

    • Standardized feature naming (e.g. user_total_spend_sum_30d)
    • Thorough documentation and lineage
    • Feature quality monitoring

The next chapter explores the design and implementation of a model evaluation system.
