正在设计OpenCode的技能注册表,这是一个关键组件,负责技能的发现、索引和分类。以下是详细的方案,包括UML建模和具体实现。

一、UML建模

1. 核心类图

manages

uses

uses

has

creates

SkillRegistry

-Map<String, Skill> skills

-Map~String, Set<String~> categoryIndex

-Map~String, Set<String~> tagIndex

-SearchEngine searchEngine

+discoverSkills(paths: List<String>) : : void

+registerSkill(skill: Skill) : : void

+unregisterSkill(skillId: String) : : void

+findSkill(skillId: String) : : Skill

+searchSkills(query: String) : : List<Skill>

+getSkillsByCategory(category: String) : : List<Skill>

+getSkillsByTag(tag: String) : : List<Skill>

+getAllCategories() : : Set<String>

+getAllTags() : : Set<String>

Skill

-String id

-String name

-String version

-String description

-List<String> categories

-List<String> tags

-SkillMetadata metadata

-String skillPath

+execute(context: ExecutionContext) : : ExecutionResult

SkillMetadata

-String author

-String createdAt

-String updatedAt

-Map<String, String> dependencies

-List<String> requiredPermissions

SearchEngine

-Index index

+indexSkill(skill: Skill) : : void

+removeSkill(skillId: String) : : void

+search(query: String) : : List<String> // returns skill IDs

SkillDiscovery

-List<String> searchPaths

+discover() : : List<Skill>

+scanDirectory(path: String) : : List<Skill>

+loadSkill(skillPath: String) : : Skill

2. 序列图:技能发现与注册

SearchEngine SkillDiscovery SkillRegistry User SearchEngine SkillDiscovery SkillRegistry User loop [for each skillPath] discoverSkills(paths) discover() scanDirectory(path) skillPaths loadSkill(skillPath) skill registerSkill(skill) indexSkill(skill) indexed skills registered

3. 状态图:技能生命周期

register

index

unregister

unregister

discard

Discovered

Registered

Indexed

二、详细设计方案

1. 技能注册表核心实现

# skill_registry.py
import os
import logging
from pathlib import Path
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass, field
import hashlib
import json
from datetime import datetime
import fnmatch

from .skill_parser import SkillDSLParser, Skill, SkillMetadata

logger = logging.getLogger(__name__)

@dataclass
class SkillIndexEntry:
    """技能索引条目"""
    skill_id: str
    name: str
    description: str
    categories: List[str]
    tags: List[str]
    metadata: Dict[str, Any]
    skill_path: str
    last_updated: datetime

class SearchEngine:
    """搜索引擎,用于技能全文检索"""
    
    def __init__(self):
        self.index = {}  # 简化实现,实际可以使用Whoosh、Elasticsearch等
        
    def index_skill(self, skill: Skill, skill_path: str):
        """索引技能"""
        # 这里实现简单的倒排索引
        terms = self._extract_terms(skill)
        for term in terms:
            if term not in self.index:
                self.index[term] = set()
            self.index[term].add(skill.metadata.name)
            
    def remove_skill(self, skill_name: str):
        """从索引中移除技能"""
        # 遍历索引,移除包含该技能的项
        for term, skills in self.index.items():
            if skill_name in skills:
                skills.remove(skill_name)
                
    def search(self, query: str) -> List[str]:
        """搜索技能,返回技能名称列表"""
        query_terms = self._extract_terms_from_query(query)
        results = set()
        
        for term in query_terms:
            if term in self.index:
                results.update(self.index[term])
                
        return list(results)
    
    def _extract_terms(self, skill: Skill) -> List[str]:
        """从技能中提取索引项"""
        terms = []
        # 从名称中提取
        terms.extend(skill.metadata.name.lower().split())
        # 从描述中提取
        terms.extend(skill.metadata.description.lower().split())
        # 从标签中提取
        terms.extend(tag.lower() for tag in skill.metadata.tags)
        # 从分类中提取
        terms.extend(category.lower() for category in skill.metadata.categories)
        
        # 去重并过滤停用词
        terms = set(terms)
        stop_words = {'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
        terms = [term for term in terms if term not in stop_words and len(term) > 2]
        
        return terms
    
    def _extract_terms_from_query(self, query: str) -> List[str]:
        """从查询中提取搜索项"""
        return [term.lower() for term in query.split() if len(term) > 2]

class SkillDiscovery:
    """技能发现机制"""
    
    def __init__(self, parser: SkillDSLParser):
        self.parser = parser
        self.search_paths = []
        
    def add_search_path(self, path: str):
        """添加搜索路径"""
        self.search_paths.append(path)
        
    def discover(self) -> List[Skill]:
        """发现所有技能"""
        skills = []
        
        for search_path in self.search_paths:
            if not os.path.exists(search_path):
                logger.warning(f"Search path does not exist: {search_path}")
                continue
                
            skills.extend(self.scan_directory(search_path))
            
        return skills
    
    def scan_directory(self, directory: str) -> List[Skill]:
        """扫描目录中的技能"""
        skills = []
        
        for root, dirs, files in os.walk(directory):
            # 跳过隐藏目录
            dirs[:] = [d for d in dirs if not d.startswith('.')]
            
            for file in files:
                if self._is_skill_file(file):
                    skill_path = os.path.join(root, file)
                    try:
                        skill = self.load_skill(skill_path)
                        skills.append(skill)
                    except Exception as e:
                        logger.error(f"Failed to load skill from {skill_path}: {e}")
                        
        return skills
    
    def load_skill(self, skill_path: str) -> Skill:
        """加载单个技能"""
        # 使用解析器解析技能文件
        skill = self.parser.parse_file(skill_path)
        
        # 设置技能路径
        skill.skill_path = skill_path
        
        return skill
    
    def _is_skill_file(self, filename: str) -> bool:
        """判断是否为技能文件"""
        skill_patterns = ['*.skill.yaml', '*.skill.yml', 'skill.yaml', 'skill.yml']
        return any(fnmatch.fnmatch(filename, pattern) for pattern in skill_patterns)

class SkillRegistry:
    """技能注册表"""
    
    def __init__(self):
        self.skills: Dict[str, Skill] = {}
        self.category_index: Dict[str, Set[str]] = {}
        self.tag_index: Dict[str, Set[str]] = {}
        self.search_engine = SearchEngine()
        self.discovery = SkillDiscovery(SkillDSLParser())
        
    def discover_skills(self, paths: List[str]):
        """发现并注册技能"""
        for path in paths:
            self.discovery.add_search_path(path)
            
        skills = self.discovery.discover()
        
        for skill in skills:
            self.register_skill(skill)
            
        logger.info(f"Discovered and registered {len(skills)} skills")
    
    def register_skill(self, skill: Skill):
        """注册单个技能"""
        skill_id = self._generate_skill_id(skill)
        
        if skill_id in self.skills:
            logger.warning(f"Skill {skill.metadata.name} already registered, updating...")
            self.unregister_skill(skill_id)
            
        # 注册到主字典
        self.skills[skill_id] = skill
        
        # 更新分类索引
        for category in skill.metadata.categories:
            if category not in self.category_index:
                self.category_index[category] = set()
            self.category_index[category].add(skill_id)
            
        # 更新标签索引
        for tag in skill.metadata.tags:
            if tag not in self.tag_index:
                self.tag_index[tag] = set()
            self.tag_index[tag].add(skill_id)
            
        # 更新搜索索引
        self.search_engine.index_skill(skill, skill.skill_path)
        
        logger.info(f"Registered skill: {skill.metadata.name} (id: {skill_id})")
    
    def unregister_skill(self, skill_id: str):
        """取消注册技能"""
        if skill_id not in self.skills:
            return
            
        skill = self.skills[skill_id]
        
        # 从分类索引中移除
        for category in skill.metadata.categories:
            if category in self.category_index and skill_id in self.category_index[category]:
                self.category_index[category].remove(skill_id)
                if not self.category_index[category]:
                    del self.category_index[category]
                    
        # 从标签索引中移除
        for tag in skill.metadata.tags:
            if tag in self.tag_index and skill_id in self.tag_index[tag]:
                self.tag_index[tag].remove(skill_id)
                if not self.tag_index[tag]:
                    del self.tag_index[tag]
                    
        # 从搜索索引中移除
        self.search_engine.remove_skill(skill.metadata.name)
        
        # 从主字典中移除
        del self.skills[skill_id]
        
        logger.info(f"Unregistered skill: {skill.metadata.name}")
    
    def find_skill(self, skill_id: str) -> Optional[Skill]:
        """根据ID查找技能"""
        return self.skills.get(skill_id)
    
    def find_skill_by_name(self, name: str) -> Optional[Skill]:
        """根据名称查找技能"""
        for skill in self.skills.values():
            if skill.metadata.name == name:
                return skill
        return None
    
    def search_skills(self, query: str) -> List[Skill]:
        """搜索技能"""
        skill_names = self.search_engine.search(query)
        skills = []
        
        for skill_name in skill_names:
            skill = self.find_skill_by_name(skill_name)
            if skill:
                skills.append(skill)
                
        return skills
    
    def get_skills_by_category(self, category: str) -> List[Skill]:
        """根据分类获取技能"""
        skill_ids = self.category_index.get(category, set())
        return [self.skills[skill_id] for skill_id in skill_ids if skill_id in self.skills]
    
    def get_skills_by_tag(self, tag: str) -> List[Skill]:
        """根据标签获取技能"""
        skill_ids = self.tag_index.get(tag, set())
        return [self.skills[skill_id] for skill_id in skill_ids if skill_id in self.skills]
    
    def get_all_categories(self) -> Set[str]:
        """获取所有分类"""
        return set(self.category_index.keys())
    
    def get_all_tags(self) -> Set[str]:
        """获取所有标签"""
        return set(self.tag_index.keys())
    
    def get_skill_count(self) -> int:
        """获取技能总数"""
        return len(self.skills)
    
    def _generate_skill_id(self, skill: Skill) -> str:
        """生成技能ID"""
        # 使用名称、版本和路径的哈希作为ID
        content = f"{skill.metadata.name}:{skill.metadata.version}:{skill.skill_path}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]

class SkillRegistryManager:
    """技能注册表管理器,提供高级功能"""
    
    def __init__(self, registry: SkillRegistry):
        self.registry = registry
        
    def export_registry(self, export_path: str):
        """导出注册表信息"""
        export_data = {
            'exported_at': datetime.now().isoformat(),
            'skill_count': self.registry.get_skill_count(),
            'skills': []
        }
        
        for skill_id, skill in self.registry.skills.items():
            skill_data = {
                'id': skill_id,
                'name': skill.metadata.name,
                'version': skill.metadata.version,
                'description': skill.metadata.description,
                'categories': skill.metadata.categories,
                'tags': skill.metadata.tags,
                'skill_path': skill.skill_path,
                'metadata': {
                    'author': skill.metadata.author,
                    'created_at': skill.metadata.created_at,
                    'updated_at': skill.metadata.updated_at,
                    'dependencies': skill.metadata.dependencies,
                    'required_permissions': skill.metadata.required_permissions
                }
            }
            export_data['skills'].append(skill_data)
            
        with open(export_path, 'w') as f:
            json.dump(export_data, f, indent=2)
            
        logger.info(f"Exported registry to {export_path}")
    
    def import_registry(self, import_path: str):
        """导入注册表信息"""
        if not os.path.exists(import_path):
            logger.error(f"Import file not found: {import_path}")
            return
            
        with open(import_path, 'r') as f:
            import_data = json.load(f)
            
        # 这里可以根据导入数据重新注册技能
        # 注意:这不会重新发现技能,只是从导出文件中恢复索引
        logger.info(f"Imported registry from {import_path}")
    
    def generate_report(self) -> Dict[str, Any]:
        """生成注册表报告"""
        return {
            'total_skills': self.registry.get_skill_count(),
            'categories': {
                category: len(skill_ids)
                for category, skill_ids in self.registry.category_index.items()
            },
            'tags': {
                tag: len(skill_ids)
                for tag, skill_ids in self.registry.tag_index.items()
            },
            'skills_by_author': self._group_skills_by_author()
        }
    
    def _group_skills_by_author(self) -> Dict[str, int]:
        """按作者分组技能"""
        authors = {}
        
        for skill in self.registry.skills.values():
            author = skill.metadata.author or 'Unknown'
            authors[author] = authors.get(author, 0) + 1
            
        return authors

# 单例模式,全局注册表实例
_registry_instance = None

def get_registry() -> SkillRegistry:
    """获取全局技能注册表实例"""
    global _registry_instance
    if _registry_instance is None:
        _registry_instance = SkillRegistry()
    return _registry_instance

2. 技能模型扩展

我们需要扩展之前定义的Skill模型,以包含分类和标签信息:

# skill_models.py
from typing import List, Dict, Optional
from pydantic import BaseModel, Field

class SkillMetadata(BaseModel):
    """技能元数据扩展"""
    name: str
    version: str
    description: str
    author: Optional[str] = None
    categories: List[str] = Field(default_factory=list)
    tags: List[str] = Field(default_factory=list)
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    dependencies: Dict[str, str] = Field(default_factory=dict)
    required_permissions: List[str] = Field(default_factory=list)
    
    class Config:
        json_schema_extra = {
            "example": {
                "name": "git-release",
                "version": "1.0.0",
                "description": "Automates semantic versioning and release process",
                "author": "OpenCode Team",
                "categories": ["git", "automation"],
                "tags": ["release", "versioning", "ci-cd"],
                "dependencies": {
                    "git": ">=2.25",
                    "gh": ">=2.0"
                },
                "required_permissions": ["file.read", "file.write", "network"]
            }
        }

3. 配置文件支持

我们可以通过配置文件来指定技能搜索路径和其他设置:

# opencode_skills_config.yaml
skill_registry:
  # 技能搜索路径
  search_paths:
    - ~/.opencode/skills
    - ~/.claude/skills
    - ./skills
    - ./opencode/skills
  
  # 自动发现间隔(秒,0表示禁用自动发现)
  auto_discovery_interval: 300
  
  # 索引设置
  index:
    enable_fulltext_search: true
    max_index_size: 10000
    
  # 分类系统
  categories:
    predefined:
      - git
      - docker
      - kubernetes
      - aws
      - azure
      - gcp
      - testing
      - deployment
      - monitoring
    allow_custom: true
    
  # 标签系统
  tags:
    max_per_skill: 10
    blacklist: [ "deprecated", "experimental" ]

4. 自动发现与热重载

为了实现技能的自动发现和热重载,我们可以创建一个监控服务:

# skill_watcher.py
import time
import threading
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class SkillFileEventHandler(FileSystemEventHandler):
    """处理技能文件变化的事件处理器"""
    
    def __init__(self, registry: SkillRegistry):
        self.registry = registry
        self.parser = SkillDSLParser()
        
    def on_created(self, event):
        if self._is_skill_file(event.src_path):
            self._load_and_register_skill(event.src_path)
            
    def on_modified(self, event):
        if self._is_skill_file(event.src_path):
            self._load_and_register_skill(event.src_path)
            
    def on_deleted(self, event):
        if self._is_skill_file(event.src_path):
            self._unregister_skill_by_path(event.src_path)
    
    def _is_skill_file(self, path: str) -> bool:
        """判断是否为技能文件"""
        skill_patterns = ['*.skill.yaml', '*.skill.yml', 'skill.yaml', 'skill.yml']
        path_obj = Path(path)
        return any(path_obj.match(pattern) for pattern in skill_patterns)
    
    def _load_and_register_skill(self, skill_path: str):
        """加载并注册技能"""
        try:
            skill = self.parser.parse_file(skill_path)
            self.registry.register_skill(skill)
            logger.info(f"Skill updated from file: {skill_path}")
        except Exception as e:
            logger.error(f"Failed to load skill from {skill_path}: {e}")
    
    def _unregister_skill_by_path(self, skill_path: str):
        """根据路径取消注册技能"""
        # 需要根据路径找到对应的技能ID
        for skill_id, skill in self.registry.skills.items():
            if hasattr(skill, 'skill_path') and skill.skill_path == skill_path:
                self.registry.unregister_skill(skill_id)
                break

class SkillWatcher:
    """技能文件监控器"""
    
    def __init__(self, registry: SkillRegistry, watch_paths: List[str]):
        self.registry = registry
        self.watch_paths = watch_paths
        self.observer = Observer()
        self.event_handler = SkillFileEventHandler(registry)
        self.is_running = False
        
    def start(self):
        """启动监控"""
        if self.is_running:
            return
            
        for path in self.watch_paths:
            if os.path.exists(path):
                self.observer.schedule(self.event_handler, path, recursive=True)
                
        self.observer.start()
        self.is_running = True
        logger.info("Skill watcher started")
        
    def stop(self):
        """停止监控"""
        if self.is_running:
            self.observer.stop()
            self.observer.join()
            self.is_running = False
            logger.info("Skill watcher stopped")
            
    def __enter__(self):
        self.start()
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

5. 技能索引持久化

为了提高性能,我们可以将技能索引持久化到磁盘:

# skill_index_persistence.py
import pickle
import json
from datetime import datetime
from pathlib import Path

class PersistentSkillRegistry(SkillRegistry):
    """支持持久化的技能注册表"""
    
    def __init__(self, persistence_path: str = "~/.opencode/registry"):
        super().__init__()
        self.persistence_path = Path(persistence_path).expanduser()
        self.persistence_path.mkdir(parents=True, exist_ok=True)
        
    def save(self):
        """保存注册表状态"""
        state = {
            'skills': {},
            'category_index': self.category_index,
            'tag_index': self.tag_index,
            'saved_at': datetime.now().isoformat()
        }
        
        # 只保存技能元数据,不保存完整的技能对象
        for skill_id, skill in self.skills.items():
            state['skills'][skill_id] = {
                'name': skill.metadata.name,
                'version': skill.metadata.version,
                'description': skill.metadata.description,
                'categories': skill.metadata.categories,
                'tags': skill.metadata.tags,
                'skill_path': skill.skill_path,
                'metadata': {
                    'author': skill.metadata.author,
                    'created_at': skill.metadata.created_at,
                    'updated_at': skill.metadata.updated_at
                }
            }
        
        # 保存到文件
        state_file = self.persistence_path / 'registry_state.json'
        with open(state_file, 'w') as f:
            json.dump(state, f, indent=2)
            
        logger.info(f"Registry state saved to {state_file}")
    
    def load(self):
        """加载注册表状态"""
        state_file = self.persistence_path / 'registry_state.json'
        
        if not state_file.exists():
            logger.info("No saved registry state found")
            return
            
        with open(state_file, 'r') as f:
            state = json.load(f)
            
        # 重新发现技能并注册
        # 注意:这里我们只从保存的状态中获取技能路径,然后重新加载技能
        skill_paths = set()
        for skill_id, skill_data in state['skills'].items():
            skill_path = skill_data.get('skill_path')
            if skill_path and os.path.exists(skill_path):
                skill_paths.add(skill_path)
        
        # 发现并注册技能
        for skill_path in skill_paths:
            try:
                skill = self.discovery.load_skill(skill_path)
                self.register_skill(skill)
            except Exception as e:
                logger.error(f"Failed to load skill from {skill_path}: {e}")
                
        logger.info(f"Registry state loaded from {state_file}")

三、使用示例

1. 初始化注册表

# 初始化注册表
registry = SkillRegistry()

# 从配置文件中读取搜索路径
config = load_config('opencode_skills_config.yaml')
search_paths = config['skill_registry']['search_paths']

# 发现并注册技能
registry.discover_skills(search_paths)

# 获取注册表管理器
manager = SkillRegistryManager(registry)

# 生成报告
report = manager.generate_report()
print(f"Total skills: {report['total_skills']}")
print(f"Categories: {report['categories']}")

2. 搜索和使用技能

# 根据分类查找技能
git_skills = registry.get_skills_by_category('git')
for skill in git_skills:
    print(f"Git skill: {skill.metadata.name} - {skill.metadata.description}")

# 根据标签查找技能
release_skills = registry.get_skills_by_tag('release')
for skill in release_skills:
    print(f"Release skill: {skill.metadata.name}")

# 全文搜索
search_results = registry.search_skills('automated release')
for skill in search_results:
    print(f"Found: {skill.metadata.name}")

# 执行技能
skill = registry.find_skill_by_name('git-release')
if skill:
    context = ExecutionContext(project_path='./my-project')
    result = skill.execute(context)
    print(f"Execution result: {result}")

3. 监控技能文件变化

# 启动文件监控
with SkillWatcher(registry, search_paths):
    # 在监控期间,技能文件的任何变化都会自动更新注册表
    time.sleep(3600)  # 运行1小时

4. 持久化注册表状态


# 使用持久化注册表
persistent_registry = PersistentSkillRegistry()

# 加载之前保存的状态
persistent_registry.load()

# 发现新技能
persistent_registry.discover_skills(search_paths)

# 保存当前状态
persistent_registry.save()

四、性能优化

1. 索引优化


class OptimizedSearchEngine(SearchEngine):
    """优化的搜索引擎"""
    
    def __init__(self):
        super().__init__()
        self.skill_vectors = {}  # 技能向量(用于相似性搜索)
        self.tfidf_index = {}    # TF-IDF索引
        
    def index_skill(self, skill: Skill, skill_path: str):
        """索引技能,使用TF-IDF加权"""
        # 计算TF-IDF权重
        terms = self._extract_terms_with_frequency(skill)
        
        # 更新文档频率
        for term in terms:
            if term not in self.tfidf_index:
                self.tfidf_index[term] = {'df': 0, 'skills': {}}
            self.tfidf_index[term]['skills'][skill.metadata.name] = terms[term]
            
        # 更新文档频率计数
        for term in set(terms.keys()):
            self.tfidf_index[term]['df'] += 1
            
        # 同时维护简单索引(用于快速查找)
        super().index_skill(skill, skill_path)
        
    def search(self, query: str, use_tfidf: bool = True) -> List[str]:
        """搜索技能,支持TF-IDF排序"""
        if not use_tfidf:
            return super().search(query)
            
        query_terms = self._extract_terms_from_query(query)
        
        # 计算技能得分
        skill_scores = {}
        total_skills = len(self.skill_vectors)
        
        for term in query_terms:
            if term in self.tfidf_index:
                df = self.tfidf_index[term]['df']
                idf = math.log(total_skills / (1 + df))
                
                for skill_name, tf in self.tfidf_index[term]['skills'].items():
                    score = tf * idf
                    skill_scores[skill_name] = skill_scores.get(skill_name, 0) + score
                    
        # 按得分排序
        sorted_skills = sorted(skill_scores.items(), key=lambda x: x[1], reverse=True)
        return [skill_name for skill_name, score in sorted_skills]
    
    def _extract_terms_with_frequency(self, skill: Skill) -> Dict[str, float]:
        """提取术语及其频率"""
        # 实现术语频率计算
        pass

2. 缓存优化


class CachedSkillRegistry(SkillRegistry):
    """带缓存的技能注册表"""
    
    def __init__(self, cache_size: int = 1000):
        super().__init__()
        self.cache = {}
        self.cache_size = cache_size
        self.access_times = {}
        
    def find_skill(self, skill_id: str) -> Optional[Skill]:
        """查找技能,带缓存"""
        # 检查缓存
        if skill_id in self.cache:
            self.access_times[skill_id] = time.time()
            return self.cache[skill_id]
            
        # 从主存储获取
        skill = super().find_skill(skill_id)
        if skill:
            self._add_to_cache(skill_id, skill)
            
        return skill
        
    def _add_to_cache(self, skill_id: str, skill: Skill):
        """添加到缓存"""
        if len(self.cache) >= self.cache_size:
            # 移除最近最少使用的项
            lru_id = min(self.access_times.items(), key=lambda x: x[1])[0]
            del self.cache[lru_id]
            del self.access_times[lru_id]
            
        self.cache[skill_id] = skill
        self.access_times[skill_id] = time.time()

五、测试方案

1. 单元测试

# test_skill_registry.py
import pytest
import tempfile
import shutil
from pathlib import Path

from skill_registry import SkillRegistry, SkillDiscovery

class TestSkillRegistry:
    
    def setup_method(self):
        self.registry = SkillRegistry()
        self.temp_dir = tempfile.mkdtemp()
        
    def teardown_method(self):
        shutil.rmtree(self.temp_dir)
        
    def test_register_and_find_skill(self):
        """测试注册和查找技能"""
        # 创建测试技能文件
        skill_content = """---
metadata:
  name: test-skill
  version: 1.0.0
  description: A test skill
  categories: [test]
  tags: [unit-test]
steps:
  - name: step1
    type: command
    parameters:
      command: echo
      args: ["Hello"]
---
"""
        skill_file = Path(self.temp_dir) / "test.skill.yaml"
        skill_file.write_text(skill_content)
        
        # 发现并注册技能
        self.registry.discover_skills([str(self.temp_dir)])
        
        # 查找技能
        skill = self.registry.find_skill_by_name('test-skill')
        assert skill is not None
        assert skill.metadata.name == 'test-skill'
        assert 'test' in skill.metadata.categories
        
    def test_category_index(self):
        """测试分类索引"""
        # 创建多个测试技能
        for i in range(3):
            skill_content = f"""---
metadata:
  name: test-skill-{i}
  version: 1.0.0
  description: Test skill {i}
  categories: [test, category-{i % 2}]
steps: []
---
"""
            skill_file = Path(self.temp_dir) / f"test{i}.skill.yaml"
            skill_file.write_text(skill_content)
            
        # 发现并注册技能
        self.registry.discover_skills([str(self.temp_dir)])
        
        # 检查分类索引
        test_skills = self.registry.get_skills_by_category('test')
        assert len(test_skills) == 3
        
        category0_skills = self.registry.get_skills_by_category('category-0')
        assert len(category0_skills) == 2
        
    def test_search_skills(self):
        """测试技能搜索"""
        skill_content = """---
metadata:
  name: git-release-advanced
  version: 2.0.0
  description: Advanced git release automation with changelog generation
  categories: [git, automation]
  tags: [release, versioning, ci-cd]
steps: []
---
"""
        skill_file = Path(self.temp_dir) / "git.skill.yaml"
        skill_file.write_text(skill_content)
        
        self.registry.discover_skills([str(self.temp_dir)])
        
        # 搜索测试
        results = self.registry.search_skills('release automation')
        assert len(results) == 1
        assert results[0].metadata.name == 'git-release-advanced'

六、部署与配置

1. Docker容器化


FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY skill_registry/ ./skill_registry/
COPY skill_parser/ ./skill_parser/

# 创建技能目录
RUN mkdir -p /skills

# 环境变量
ENV SKILL_PATHS="/skills"
ENV REGISTRY_PERSISTENCE_PATH="/data/registry"

# 启动脚本
COPY start_registry.py .
CMD ["python", "start_registry.py"]

2. Kubernetes部署



apiVersion: apps/v1
kind: Deployment
metadata:
  name: opencode-skill-registry
spec:
  replicas: 3
  selector:
    matchLabels:
      app: skill-registry
  template:
    metadata:
      labels:
        app: skill-registry
    spec:
      containers:
      - name: registry
        image: opencode/skill-registry:latest
        ports:
        - containerPort: 8080
        env:
        - name: SKILL_PATHS
          value: "/skills:/shared-skills"
        - name: REGISTRY_PERSISTENCE_PATH
          value: "/data/registry"
        volumeMounts:
        - name: skill-storage
          mountPath: /skills
          readOnly: true
        - name: shared-skill-storage
          mountPath: /shared-skills
          readOnly: true
        - name: registry-data
          mountPath: /data/registry
      volumes:
      - name: skill-storage
        configMap:
          name: builtin-skills
      - name: shared-skill-storage
        persistentVolumeClaim:
          claimName: shared-skills-pvc
      - name: registry-data
        persistentVolumeClaim:
          claimName: registry-data-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: skill-registry-service
spec:
  selector:
    app: skill-registry
  ports:
  - port: 8080
    targetPort: 8080
  type: LoadBalancer

概述

这个技能注册表方案提供了完整的技能发现、索引和分类功能,具有以下特点:

  1. 灵活的发现机制:支持多个搜索路径,自动扫描技能文件

  2. 强大的索引系统:包括分类索引、标签索引和全文搜索

  3. 实时更新:通过文件监控实现技能热重载

  4. 高性能:支持缓存和持久化,TF-IDF搜索优化

  5. 可扩展性:模块化设计,易于添加新的索引策略或存储后端

  6. 生产就绪:支持容器化部署,配置灵活

这个方案可以满足OpenCode技能系统的基础需求,并为未来的扩展奠定了基础。

下面将详细说明:

OpenCode技能系统基础:技能注册表详细方案

我将为你提供一个完整的技能注册表设计方案,涵盖技能发现、索引构建和分类标签系统。

一、系统架构设计:UML建模

1. 核心类图设计

contains

uses

uses

uses

uses

uses

uses

wraps

uses

SkillRegistry

-Map<String, RegisteredSkill> skills

-CategoryIndex categoryIndex

-TagIndex tagIndex

-FullTextIndex fullTextIndex

-SkillLoader skillLoader

-DiscoveryEngine discoveryEngine

+register(skill: Skill) : RegisteredSkill

+unregister(skillId: String)

+findById(skillId: String) : RegisteredSkill

+findByName(name: String) : List<RegisteredSkill>

+search(query: String) : SearchResult

+getByCategory(category: String) : List<RegisteredSkill>

+getByTag(tag: String) : List<RegisteredSkill>

+discover(paths: List<String>) : DiscoveryResult

+refresh() : void

+export() : RegistryExport

+import(export: RegistryExport) : void

RegisteredSkill

+String id

+Skill skill

+SkillMetadata metadata

+String sourcePath

+DateTime registeredAt

+DateTime lastUpdated

+Map<String, Object> usageStats

+boolean isEnabled

+enable()

+disable()

+update(skill: Skill)

SkillLoader

-SkillParser parser

-List<ValidationRule> validationRules

+load(path: String) : Skill

+validate(skill: Skill) : ValidationResult

+parseContent(content: String) : Skill

DiscoveryEngine

-List<DiscoveryStrategy> strategies

-FileSystemWatcher watcher

+discover(paths: List<String>) : List<DiscoveryResult>

+addStrategy(strategy: DiscoveryStrategy)

+removeStrategy(strategyId: String)

+watch(path: String) : WatchHandle

+unwatch(handle: WatchHandle)

CategoryIndex

-Map~String, Set<String~> index

-CategoryHierarchy hierarchy

+addSkill(skill: RegisteredSkill)

+removeSkill(skillId: String)

+getSkills(category: String) : Set<String>

+getCategories() : Set<String>

+getSubcategories(parent: String) : Set<String>

+suggestCategories(query: String) : List<String>

TagIndex

-Map~String, Set<String~> index

-TagStatistics statistics

+addSkill(skill: RegisteredSkill)

+removeSkill(skillId: String)

+getSkills(tag: String) : Set<String>

+getTags() : Set<String>

+getRelatedTags(tag: String) : Set<String>

+getPopularTags(limit: int) : List<TagStats>

FullTextIndex

-SearchEngine engine

-DocumentStore documentStore

+indexSkill(skill: RegisteredSkill)

+removeSkill(skillId: String)

+updateSkill(skill: RegisteredSkill)

+search(query: String) : List<SearchHit>

+suggest(query: String) : List<Suggestion>

«interface»

DiscoveryStrategy

+canHandle(path: String) : boolean

+discover(path: String) : List<DiscoveryItem>

+getPriority() : int

FileSystemDiscovery

-List<String> patterns

+discover(path: String) : List<DiscoveryItem>

+addPattern(pattern: String)

GitRepositoryDiscovery

-String branch

+discover(repoUrl: String) : List<DiscoveryItem>

+cloneIfNeeded(repoUrl: String) : String

RemoteRegistryDiscovery

-String registryUrl

-HttpClient client

+discover() : List<DiscoveryItem>

+search(query: String) : List<DiscoveryItem>

DiscoveryItem

+String path

+String type

+DateTime discoveredAt

+Map<String, Object> metadata

+Skill load() : Skill

SearchResult

+List<SearchHit> hits

+int totalHits

+double maxScore

+Map<String, Aggregation> aggregations

+List<Suggestion> suggestions

SearchHit

+String skillId

+float score

+Map~String, List<String~> highlights

+RegisteredSkill skill

Skill

SkillParser

2. 技能发现与注册序列图

FullTextIndex TagIndex CategoryIndex FileSystemDiscovery SkillLoader DiscoveryEngine SkillRegistry User FullTextIndex TagIndex CategoryIndex FileSystemDiscovery SkillLoader DiscoveryEngine SkillRegistry User loop [For each discovery item] discover("/skills") discover(paths) discover("/skills") DiscoveryItem load(item.path) Skill object register(skill) create RegisteredSkill addSkill(registeredSkill) category mapping addSkill(registeredSkill) tag mapping indexSkill(registeredSkill) indexing complete discovery result registration summary

3. 搜索查询序列图

ResultCache TagIndex CategoryIndex FullTextIndex SkillRegistry User ResultCache TagIndex CategoryIndex FullTextIndex SkillRegistry User loop [For each hit] alt [Cache hit] [Cache miss] search("git release automation") getCached(query) cached result (if exists) cached result search(query) search hits getCategories(skillId) categories getTags(skillId) tags buildSearchResult(hits) cacheResult(query, result) search result

二、核心实现方案

1. 技能注册表核心实现

# registry/core.py
import json
import hashlib
import threading
import asyncio
from datetime import datetime
from typing import Dict, List, Set, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

class SkillStatus(Enum):
    """技能状态"""
    DISABLED = "disabled"
    ENABLED = "enabled"
    DEPRECATED = "deprecated"
    EXPERIMENTAL = "experimental"

class SkillSource(Enum):
    """技能来源"""
    LOCAL = "local"
    GIT = "git"
    REGISTRY = "registry"
    USER = "user"

@dataclass
class RegisteredSkill:
    """已注册的技能"""
    id: str
    skill: 'Skill'  # 从skill_models导入
    metadata: 'SkillMetadata'
    source_path: str
    source_type: SkillSource
    registered_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)
    status: SkillStatus = SkillStatus.ENABLED
    usage_stats: Dict[str, Any] = field(default_factory=dict)
    
    # 索引字段
    categories: Set[str] = field(default_factory=set)
    tags: Set[str] = field(default_factory=set)
    
    def enable(self):
        """启用技能"""
        self.status = SkillStatus.ENABLED
        
    def disable(self):
        """禁用技能"""
        self.status = SkillStatus.DISABLED
        
    def mark_deprecated(self):
        """标记为已弃用"""
        self.status = SkillStatus.DEPRECATED
        
    def update(self, skill: 'Skill', metadata: 'SkillMetadata' = None):
        """更新技能"""
        self.skill = skill
        if metadata:
            self.metadata = metadata
        self.last_updated = datetime.now()
        
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            'id': self.id,
            'name': self.skill.metadata.name,
            'version': self.skill.metadata.version,
            'description': self.skill.metadata.description,
            'source_path': self.source_path,
            'source_type': self.source_type.value,
            'registered_at': self.registered_at.isoformat(),
            'last_updated': self.last_updated.isoformat(),
            'status': self.status.value,
            'usage_stats': self.usage_stats,
            'categories': list(self.categories),
            'tags': list(self.tags)
        }

class SkillRegistry:
    """技能注册表"""
    
    def __init__(self, 
                 storage_path: Optional[Path] = None,
                 max_workers: int = 4):
        """
        初始化技能注册表
        
        Args:
            storage_path: 持久化存储路径
            max_workers: 最大工作线程数
        """
        self.storage_path = storage_path or Path.home() / '.opencode' / 'registry'
        self.storage_path.mkdir(parents=True, exist_ok=True)
        
        # 核心存储
        self._skills: Dict[str, RegisteredSkill] = {}
        self._name_index: Dict[str, Set[str]] = {}  # 名称 -> 技能ID集合
        self._lock = threading.RLock()
        
        # 索引系统
        self.category_index = CategoryIndex()
        self.tag_index = TagIndex()
        self.fulltext_index = FullTextIndex()
        
        # 发现和加载系统
        self.skill_loader = SkillLoader()
        self.discovery_engine = DiscoveryEngine()
        
        # 配置
        self.max_workers = max_workers
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        
        # 缓存
        self._cache = {}
        self._cache_lock = threading.Lock()
        
        # 统计数据
        self.stats = {
            'total_registered': 0,
            'last_discovery': None,
            'discovery_errors': 0
        }
        
        # 加载持久化数据
        self._load_persistent_data()
    
    def register(self, 
                skill: 'Skill', 
                source_path: str,
                source_type: SkillSource = SkillSource.LOCAL,
                metadata: Optional['SkillMetadata'] = None) -> RegisteredSkill:
        """
        注册技能
        
        Args:
            skill: 技能对象
            source_path: 源路径
            source_type: 来源类型
            metadata: 元数据(可选)
            
        Returns:
            已注册的技能对象
        """
        with self._lock:
            # 生成技能ID
            skill_id = self._generate_skill_id(skill, source_path)
            
            # 检查是否已注册
            if skill_id in self._skills:
                logger.info(f"Skill {skill.metadata.name} already registered, updating...")
                return self._update_existing_skill(skill_id, skill, metadata)
            
            # 使用提供的元数据或从技能中提取
            skill_metadata = metadata or skill.metadata
            
            # 创建已注册的技能对象
            registered_skill = RegisteredSkill(
                id=skill_id,
                skill=skill,
                metadata=skill_metadata,
                source_path=source_path,
                source_type=source_type
            )
            
            # 提取分类和标签
            registered_skill.categories = set(skill_metadata.categories)
            registered_skill.tags = set(skill_metadata.tags)
            
            # 存储到主字典
            self._skills[skill_id] = registered_skill
            
            # 更新名称索引
            self._update_name_index(skill_id, skill.metadata.name)
            
            # 更新分类索引
            self.category_index.add_skill(registered_skill)
            
            # 更新标签索引
            self.tag_index.add_skill(registered_skill)
            
            # 更新全文索引
            self.fulltext_index.index_skill(registered_skill)
            
            # 更新统计
            self.stats['total_registered'] += 1
            
            logger.info(f"Registered skill: {skill.metadata.name} v{skill.metadata.version} (id: {skill_id})")
            
            # 持久化
            self._save_persistent_data()
            
            return registered_skill
    
    def unregister(self, skill_id: str) -> bool:
        """
        取消注册技能
        
        Args:
            skill_id: 技能ID
            
        Returns:
            是否成功取消注册
        """
        with self._lock:
            if skill_id not in self._skills:
                return False
            
            skill = self._skills[skill_id]
            
            # 从名称索引中移除
            self._remove_from_name_index(skill_id, skill.skill.metadata.name)
            
            # 从分类索引中移除
            self.category_index.remove_skill(skill_id)
            
            # 从标签索引中移除
            self.tag_index.remove_skill(skill_id)
            
            # 从全文索引中移除
            self.fulltext_index.remove_skill(skill_id)
            
            # 从主字典中移除
            del self._skills[skill_id]
            
            # 更新统计
            self.stats['total_registered'] = max(0, self.stats['total_registered'] - 1)
            
            logger.info(f"Unregistered skill: {skill.skill.metadata.name}")
            
            # 持久化
            self._save_persistent_data()
            
            return True
    
    def find_by_id(self, skill_id: str) -> Optional[RegisteredSkill]:
        """根据ID查找技能"""
        with self._lock:
            return self._skills.get(skill_id)
    
    def find_by_name(self, name: str) -> List[RegisteredSkill]:
        """根据名称查找技能"""
        with self._lock:
            skill_ids = self._name_index.get(name.lower(), set())
            return [self._skills[skill_id] for skill_id in skill_ids 
                   if skill_id in self._skills]
    
    def find_by_name_version(self, name: str, version: str) -> Optional[RegisteredSkill]:
        """根据名称和版本查找技能"""
        candidates = self.find_by_name(name)
        for skill in candidates:
            if skill.skill.metadata.version == version:
                return skill
        return None
    
    async def discover(self, 
                      paths: List[str],
                      recursive: bool = True,
                      async_load: bool = True) -> 'DiscoveryResult':
        """
        发现并注册技能
        
        Args:
            paths: 要扫描的路径列表
            recursive: 是否递归扫描
            async_load: 是否异步加载
            
        Returns:
            发现结果
        """
        discovery_result = DiscoveryResult()
        self.stats['last_discovery'] = datetime.now()
        
        # 使用发现引擎发现技能
        discovery_items = await self.discovery_engine.discover(
            paths, 
            recursive=recursive
        )
        
        discovery_result.total_found = len(discovery_items)
        
        # 加载和注册技能
        if async_load:
            # 异步加载
            tasks = []
            for item in discovery_items:
                task = asyncio.create_task(self._load_and_register_item(item, discovery_result))
                tasks.append(task)
            
            await asyncio.gather(*tasks, return_exceptions=True)
        else:
            # 同步加载
            for item in discovery_items:
                await self._load_and_register_item(item, discovery_result)
        
        # 持久化
        self._save_persistent_data()
        
        return discovery_result
    
    async def _load_and_register_item(self, 
                                     item: 'DiscoveryItem',
                                     result: 'DiscoveryResult') -> None:
        """加载并注册发现项"""
        try:
            # 加载技能
            skill = await self.skill_loader.load_async(item.path)
            
            # 确定来源类型
            source_type = self._determine_source_type(item.path)
            
            # 注册技能
            registered_skill = self.register(
                skill=skill,
                source_path=item.path,
                source_type=source_type
            )
            
            result.registered.append(registered_skill.id)
            result.successful += 1
            
        except Exception as e:
            logger.error(f"Failed to load skill from {item.path}: {e}")
            result.errors.append({
                'path': item.path,
                'error': str(e)
            })
            result.failed += 1
    
    def search(self, 
               query: str,
               categories: Optional[List[str]] = None,
               tags: Optional[List[str]] = None,
               status: Optional[SkillStatus] = None,
               limit: int = 20,
               offset: int = 0) -> 'SearchResult':
        """
        搜索技能
        
        Args:
            query: 搜索查询
            categories: 过滤分类
            tags: 过滤标签
            status: 过滤状态
            limit: 结果数量限制
            offset: 偏移量
            
        Returns:
            搜索结果
        """
        # 生成缓存键
        cache_key = self._generate_cache_key({
            'query': query,
            'categories': categories,
            'tags': tags,
            'status': status,
            'limit': limit,
            'offset': offset
        })
        
        # 检查缓存
        with self._cache_lock:
            if cache_key in self._cache:
                cached_result, timestamp = self._cache[cache_key]
                # 检查缓存是否过期(5分钟)
                if (datetime.now() - timestamp).seconds < 300:
                    return cached_result
        
        # 执行搜索
        search_result = self._execute_search(
            query=query,
            categories=categories,
            tags=tags,
            status=status,
            limit=limit,
            offset=offset
        )
        
        # 缓存结果
        with self._cache_lock:
            self._cache[cache_key] = (search_result, datetime.now())
            # 清理过期缓存
            self._clean_expired_cache()
        
        return search_result
    
    def _execute_search(self, **kwargs) -> 'SearchResult':
        """执行搜索"""
        # 首先通过全文索引获取候选
        if kwargs['query']:
            text_hits = self.fulltext_index.search(
                query=kwargs['query'],
                limit=kwargs['limit'] * 2  # 获取更多结果以便过滤
            )
            candidate_ids = {hit.skill_id for hit in text_hits}
        else:
            candidate_ids = set(self._skills.keys())
        
        # 应用过滤器
        filtered_skills = []
        for skill_id in candidate_ids:
            skill = self._skills.get(skill_id)
            if not skill:
                continue
            
            # 状态过滤
            if kwargs['status'] and skill.status != kwargs['status']:
                continue
            
            # 分类过滤
            if kwargs['categories']:
                if not any(cat in skill.categories for cat in kwargs['categories']):
                    continue
            
            # 标签过滤
            if kwargs['tags']:
                if not any(tag in skill.tags for tag in kwargs['tags']):
                    continue
            
            filtered_skills.append(skill)
        
        # 排序
        if kwargs['query']:
            # 如果有查询,按相关性排序
            scored_skills = []
            for skill in filtered_skills:
                # 计算简单得分(实际应该使用更复杂的算法)
                score = self._calculate_relevance_score(skill, kwargs['query'])
                scored_skills.append((score, skill))
            
            scored_skills.sort(key=lambda x: x[0], reverse=True)
            filtered_skills = [skill for _, skill in scored_skills]
        else:
            # 否则按名称排序
            filtered_skills.sort(key=lambda x: x.skill.metadata.name.lower())
        
        # 分页
        start = kwargs['offset']
        end = start + kwargs['limit']
        paged_skills = filtered_skills[start:end]
        
        # 构建搜索结果
        search_hits = []
        for i, skill in enumerate(paged_skills):
            hit = SearchHit(
                skill_id=skill.id,
                skill=skill,
                score=(len(filtered_skills) - i) / len(filtered_skills) if filtered_skills else 0
            )
            search_hits.append(hit)
        
        # 构建聚合
        aggregations = self._build_aggregations(filtered_skills)
        
        # 构建建议
        suggestions = self._generate_suggestions(kwargs['query'], filtered_skills)
        
        return SearchResult(
            hits=search_hits,
            total_hits=len(filtered_skills),
            aggregations=aggregations,
            suggestions=suggestions
        )
    
    def get_by_category(self, category: str) -> List[RegisteredSkill]:
        """根据分类获取技能"""
        skill_ids = self.category_index.get_skills(category)
        return [self._skills[skill_id] for skill_id in skill_ids 
               if skill_id in self._skills]
    
    def get_by_tag(self, tag: str) -> List[RegisteredSkill]:
        """根据标签获取技能"""
        skill_ids = self.tag_index.get_skills(tag)
        return [self._skills[skill_id] for skill_id in skill_ids 
               if skill_id in self._skills]
    
    def get_all_categories(self) -> Set[str]:
        """获取所有分类"""
        return self.category_index.get_categories()
    
    def get_all_tags(self) -> Set[str]:
        """获取所有标签"""
        return self.tag_index.get_tags()
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            **self.stats,
            'total_skills': len(self._skills),
            'categories_count': len(self.get_all_categories()),
            'tags_count': len(self.get_all_tags()),
            'cache_size': len(self._cache)
        }
    
    def refresh(self) -> Dict[str, Any]:
        """刷新注册表(重新发现和重新索引)"""
        result = {
            'reindexed': 0,
            'removed': 0,
            'errors': 0
        }
        
        with self._lock:
            # 重新索引所有技能
            for skill_id, skill in list(self._skills.items()):
                try:
                    # 重新加载技能
                    loaded_skill = self.skill_loader.load(skill.source_path)
                    
                    # 更新技能
                    skill.update(loaded_skill, loaded_skill.metadata)
                    
                    # 重新索引
                    self.category_index.remove_skill(skill_id)
                    self.category_index.add_skill(skill)
                    
                    self.tag_index.remove_skill(skill_id)
                    self.tag_index.add_skill(skill)
                    
                    self.fulltext_index.update_skill(skill)
                    
                    result['reindexed'] += 1
                    
                except Exception as e:
                    logger.error(f"Failed to refresh skill {skill_id}: {e}")
                    result['errors'] += 1
            
            # 清理不存在的技能
            for skill_id in list(self._skills.keys()):
                skill = self._skills[skill_id]
                if not Path(skill.source_path).exists():
                    self.unregister(skill_id)
                    result['removed'] += 1
        
        return result
    
    def export(self, 
               include_skills: bool = False,
               format: str = 'json') -> 'RegistryExport':
        """导出注册表"""
        export_data = {
            'exported_at': datetime.now().isoformat(),
            'version': '1.0.0',
            'stats': self.get_stats(),
            'skills': []
        }
        
        for skill in self._skills.values():
            skill_data = skill.to_dict()
            
            if include_skills:
                # 包含技能内容
                skill_data['skill_content'] = self._serialize_skill(skill.skill)
            
            export_data['skills'].append(skill_data)
        
        return RegistryExport(data=export_data, format=format)
    
    def import_export(self, export: 'RegistryExport') -> Dict[str, Any]:
        """导入注册表"""
        result = {
            'imported': 0,
            'skipped': 0,
            'errors': 0
        }
        
        data = export.data
        
        for skill_data in data['skills']:
            try:
                # 检查是否已存在
                existing = self.find_by_name_version(
                    skill_data['name'],
                    skill_data.get('version', '1.0.0')
                )
                
                if existing:
                    result['skipped'] += 1
                    continue
                
                # 如果是完整导出,重新注册技能
                if 'skill_content' in skill_data:
                    skill = self._deserialize_skill(skill_data['skill_content'])
                    self.register(
                        skill=skill,
                        source_path=skill_data['source_path'],
                        source_type=SkillSource(skill_data['source_type'])
                    )
                else:
                    # 只注册元数据
                    pass
                
                result['imported'] += 1
                
            except Exception as e:
                logger.error(f"Failed to import skill {skill_data.get('name')}: {e}")
                result['errors'] += 1
        
        return result
    
    # 私有方法
    def _generate_skill_id(self, skill: 'Skill', source_path: str) -> str:
        """生成技能ID"""
        content = f"{skill.metadata.name}:{skill.metadata.version}:{source_path}"
        return f"skill_{hashlib.sha256(content.encode()).hexdigest()[:16]}"
    
    def _update_existing_skill(self, 
                              skill_id: str, 
                              skill: 'Skill',
                              metadata: Optional['SkillMetadata']) -> RegisteredSkill:
        """更新现有技能"""
        existing = self._skills[skill_id]
        
        # 更新名称索引
        if existing.skill.metadata.name != skill.metadata.name:
            self._remove_from_name_index(skill_id, existing.skill.metadata.name)
            self._update_name_index(skill_id, skill.metadata.name)
        
        # 更新技能
        existing.update(skill, metadata)
        
        # 重新索引
        self.category_index.remove_skill(skill_id)
        self.category_index.add_skill(existing)
        
        self.tag_index.remove_skill(skill_id)
        self.tag_index.add_skill(existing)
        
        self.fulltext_index.update_skill(existing)
        
        return existing
    
    def _update_name_index(self, skill_id: str, name: str):
        """更新名称索引"""
        name_key = name.lower()
        if name_key not in self._name_index:
            self._name_index[name_key] = set()
        self._name_index[name_key].add(skill_id)
    
    def _remove_from_name_index(self, skill_id: str, name: str):
        """从名称索引中移除"""
        name_key = name.lower()
        if name_key in self._name_index:
            self._name_index[name_key].discard(skill_id)
            if not self._name_index[name_key]:
                del self._name_index[name_key]
    
    def _determine_source_type(self, path: str) -> SkillSource:
        """确定来源类型"""
        if path.startswith('http://') or path.startswith('https://'):
            if 'github.com' in path or 'gitlab.com' in path:
                return SkillSource.GIT
            else:
                return SkillSource.REGISTRY
        elif path.startswith('git@'):
            return SkillSource.GIT
        else:
            return SkillSource.LOCAL
    
    def _generate_cache_key(self, params: Dict) -> str:
        """生成缓存键"""
        import json
        key_data = json.dumps(params, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()
    
    def _clean_expired_cache(self, max_age: int = 300):
        """清理过期缓存"""
        now = datetime.now()
        expired_keys = []
        
        for key, (_, timestamp) in self._cache.items():
            if (now - timestamp).seconds > max_age:
                expired_keys.append(key)
        
        for key in expired_keys:
            del self._cache[key]
    
    def _calculate_relevance_score(self, skill: RegisteredSkill, query: str) -> float:
        """计算相关性得分"""
        score = 0.0
        
        # 名称匹配
        if query.lower() in skill.skill.metadata.name.lower():
            score += 10.0
        
        # 描述匹配
        if query.lower() in skill.skill.metadata.description.lower():
            score += 5.0
        
        # 标签匹配
        for tag in skill.tags:
            if query.lower() in tag.lower():
                score += 3.0
        
        # 分类匹配
        for category in skill.categories:
            if query.lower() in category.lower():
                score += 2.0
        
        return score
    
    def _build_aggregations(self, skills: List[RegisteredSkill]) -> Dict[str, Any]:
        """构建聚合数据"""
        aggregations = {
            'categories': {},
            'tags': {},
            'status': {}
        }
        
        for skill in skills:
            # 分类聚合
            for category in skill.categories:
                aggregations['categories'][category] = aggregations['categories'].get(category, 0) + 1
            
            # 标签聚合
            for tag in skill.tags:
                aggregations['tags'][tag] = aggregations['tags'].get(tag, 0) + 1
            
            # 状态聚合
            status = skill.status.value
            aggregations['status'][status] = aggregations['status'].get(status, 0) + 1
        
        return aggregations
    
    def _generate_suggestions(self, query: str, skills: List[RegisteredSkill]) -> List['Suggestion']:
        """生成搜索建议"""
        suggestions = []
        
        if not query or len(query) < 2:
            return suggestions
        
        query_lower = query.lower()
        
        # 从技能中提取建议
        seen_suggestions = set()
        
        for skill in skills:
            # 名称建议
            if query_lower in skill.skill.metadata.name.lower():
                suggestion = skill.skill.metadata.name
                if suggestion not in seen_suggestions:
                    suggestions.append(Suggestion(
                        text=suggestion,
                        type='skill_name'
                    ))
                    seen_suggestions.add(suggestion)
            
            # 标签建议
            for tag in skill.tags:
                if query_lower in tag.lower():
                    if tag not in seen_suggestions:
                        suggestions.append(Suggestion(
                            text=tag,
                            type='tag'
                        ))
                        seen_suggestions.add(tag)
            
            # 分类建议
            for category in skill.categories:
                if query_lower in category.lower():
                    if category not in seen_suggestions:
                        suggestions.append(Suggestion(
                            text=category,
                            type='category'
                        ))
                        seen_suggestions.add(category)
        
        # 限制数量
        return suggestions[:10]
    
    def _save_persistent_data(self):
        """保存持久化数据"""
        try:
            data = {
                'skills': {},
                'name_index': self._name_index,
                'stats': self.stats,
                'saved_at': datetime.now().isoformat()
            }
            
            # 只保存技能元数据,不保存完整技能对象
            for skill_id, skill in self._skills.items():
                data['skills'][skill_id] = skill.to_dict()
            
            file_path = self.storage_path / 'registry.json'
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, default=str)
            
            logger.debug(f"Registry data saved to {file_path}")
            
        except Exception as e:
            logger.error(f"Failed to save registry data: {e}")
    
    def _load_persistent_data(self):
        """加载持久化数据"""
        file_path = self.storage_path / 'registry.json'
        
        if not file_path.exists():
            logger.info("No persistent registry data found")
            return
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            # 加载名称索引和统计
            self._name_index = data.get('name_index', {})
            self.stats.update(data.get('stats', {}))
            
            logger.info(f"Loaded registry data from {file_path}")
            
        except Exception as e:
            logger.error(f"Failed to load registry data: {e}")
    
    def _serialize_skill(self, skill: 'Skill') -> str:
        """序列化技能"""
        # 使用之前的技能解析器的序列化功能
        from parsers.skill_parser import SkillDSLParser
        parser = SkillDSLParser()
        return parser.serialize(skill, format='yaml')
    
    def _deserialize_skill(self, content: str) -> 'Skill':
        """反序列化技能"""
        from parsers.skill_parser import SkillDSLParser
        parser = SkillDSLParser()
        return parser.parse(content)

2. 分类索引系统实现

# registry/indexes/category_index.py
import re
from typing import Dict, Set, List, Optional
from collections import defaultdict
import unicodedata

class CategoryHierarchy:
    """分类层次结构"""
    
    def __init__(self):
        self.parents: Dict[str, str] = {}  # 子分类 -> 父分类
        self.children: Dict[str, Set[str]] = defaultdict(set)  # 父分类 -> 子分类集合
    
    def add_category(self, category: str, parent: Optional[str] = None):
        """添加分类"""
        if parent:
            self.parents[category] = parent
            self.children[parent].add(category)
        elif category not in self.parents:
            # 顶级分类
            self.parents[category] = None
    
    def get_parent(self, category: str) -> Optional[str]:
        """获取父分类"""
        return self.parents.get(category)
    
    def get_children(self, category: str) -> Set[str]:
        """获取子分类"""
        return self.children.get(category, set())
    
    def get_ancestors(self, category: str) -> List[str]:
        """获取所有祖先分类"""
        ancestors = []
        current = category
        
        while current in self.parents and self.parents[current]:
            current = self.parents[current]
            ancestors.append(current)
        
        return ancestors
    
    def get_descendants(self, category: str) -> Set[str]:
        """获取所有后代分类"""
        descendants = set()
        
        def collect_children(cat: str):
            for child in self.get_children(cat):
                descendants.add(child)
                collect_children(child)
        
        collect_children(category)
        return descendants
    
    def normalize_path(self, category: str) -> str:
        """规范化分类路径"""
        parts = []
        current = category
        
        while current:
            parts.insert(0, current)
            current = self.get_parent(current)
        
        return '::'.join(parts)

class CategoryIndex:
    """分类索引"""
    
    def __init__(self):
        self.index: Dict[str, Set[str]] = defaultdict(set)  # 分类 -> 技能ID集合
        self.hierarchy = CategoryHierarchy()
        self._normalized_names: Dict[str, str] = {}  # 规范化名称 -> 原始名称
    
    def add_skill(self, skill: 'RegisteredSkill'):
        """添加技能到分类索引"""
        skill_id = skill.id
        
        for category in skill.categories:
            # 规范化分类名称
            normalized = self._normalize_category_name(category)
            
            # 检查是否有父分类
            if '::' in normalized:
                parts = normalized.split('::')
                for i in range(1, len(parts)):
                    parent = '::'.join(parts[:i])
                    child = '::'.join(parts[:i+1])
                    self.hierarchy.add_category(child, parent)
            
            # 添加到索引
            self.index[normalized].add(skill_id)
            
            # 同时添加到所有祖先分类
            ancestors = self.hierarchy.get_ancestors(normalized)
            for ancestor in ancestors:
                self.index[ancestor].add(skill_id)
    
    def remove_skill(self, skill_id: str):
        """从分类索引中移除技能"""
        categories_to_remove = []
        
        for category, skill_ids in self.index.items():
            if skill_id in skill_ids:
                skill_ids.discard(skill_id)
                if not skill_ids:
                    categories_to_remove.append(category)
        
        # 清理空分类
        for category in categories_to_remove:
            del self.index[category]
    
    def get_skills(self, category: str) -> Set[str]:
        """获取分类下的技能ID"""
        normalized = self._normalize_category_name(category)
        
        # 获取直接分类下的技能
        direct_skills = self.index.get(normalized, set()).copy()
        
        # 获取子分类下的技能
        descendants = self.hierarchy.get_descendants(normalized)
        for descendant in descendants:
            direct_skills.update(self.index.get(descendant, set()))
        
        return direct_skills
    
    def get_categories(self) -> Set[str]:
        """获取所有分类"""
        # 返回原始分类名称
        original_names = set()
        for normalized in self.index.keys():
            original = self._normalized_names.get(normalized, normalized)
            original_names.add(original)
        
        return original_names
    
    def get_subcategories(self, category: str) -> Set[str]:
        """获取子分类"""
        normalized = self._normalize_category_name(category)
        children = self.hierarchy.get_children(normalized)
        
        # 转换回原始名称
        original_children = set()
        for child in children:
            original = self._denormalize_category_name(child)
            original_children.add(original)
        
        return original_children
    
    def suggest_categories(self, query: str, limit: int = 10) -> List[str]:
        """建议分类"""
        suggestions = []
        query_lower = query.lower()
        
        for normalized, original in self._normalized_names.items():
            # 检查是否匹配
            if (query_lower in normalized.lower() or 
                query_lower in original.lower()):
                suggestions.append(original)
            
            if len(suggestions) >= limit:
                break
        
        return suggestions
    
    def get_category_stats(self) -> Dict[str, int]:
        """获取分类统计"""
        stats = {}
        
        for category, skill_ids in self.index.items():
            original = self._denormalize_category_name(category)
            stats[original] = len(skill_ids)
        
        return stats
    
    def _normalize_category_name(self, category: str) -> str:
        """规范化分类名称"""
        # 转换为小写,移除多余空格
        normalized = category.strip().lower()
        
        # 移除特殊字符,只保留字母、数字、空格和::
        normalized = re.sub(r'[^a-z0-9\s:]', '', normalized)
        
        # 替换多个空格为单个空格
        normalized = re.sub(r'\s+', ' ', normalized)
        
        # 分割层级
        if '::' in normalized:
            parts = normalized.split('::')
            parts = [part.strip() for part in parts]
            normalized = '::'.join(parts)
        
        # 存储映射
        if normalized != category:
            self._normalized_names[normalized] = category
        
        return normalized
    
    def _denormalize_category_name(self, normalized: str) -> str:
        """反规范化分类名称"""
        return self._normalized_names.get(normalized, normalized)
    
    def build_hierarchy_tree(self) -> Dict[str, Any]:
        """构建分类层次树"""
        tree = {}
        
        # 找到所有顶级分类
        top_level = set()
        for category in self.index.keys():
            if '::' not in category:
                top_level.add(category)
            else:
                parent = category.rsplit('::', 1)[0]
                if parent not in self.index:
                    top_level.add(parent)
        
        # 递归构建树
        def build_node(category: str) -> Dict[str, Any]:
            node = {
                'name': self._denormalize_category_name(category),
                'skill_count': len(self.index.get(category, set())),
                'children': []
            }
            
            children = self.hierarchy.get_children(category)
            for child in children:
                child_node = build_node(child)
                node['children'].append(child_node)
            
            return node
        
        for top_category in sorted(top_level):
            tree[top_category] = build_node(top_category)
        
        return tree

3. 标签索引系统实现

# registry/indexes/tag_index.py
from typing import Dict, Set, List, Tuple
from collections import defaultdict, Counter
import heapq

class TagStatistics:
    """标签统计"""
    
    def __init__(self):
        self.tag_freq: Dict[str, int] = defaultdict(int)  # 标签频率
        self.cooccurrence: Dict[Tuple[str, str], int] = defaultdict(int)  # 共现统计
        self.tag_timeline: Dict[str, List[int]] = defaultdict(list)  # 时间线统计
    
    def record_tag(self, tag: str, timestamp: int = None):
        """记录标签使用"""
        self.tag_freq[tag] += 1
        
        if timestamp:
            self.tag_timeline[tag].append(timestamp)
    
    def record_cooccurrence(self, tag1: str, tag2: str):
        """记录标签共现"""
        if tag1 != tag2:
            key = tuple(sorted([tag1, tag2]))
            self.cooccurrence[key] += 1
    
    def get_popular_tags(self, limit: int = 20) -> List[Tuple[str, int]]:
        """获取热门标签"""
        return heapq.nlargest(limit, self.tag_freq.items(), key=lambda x: x[1])
    
    def get_related_tags(self, tag: str, limit: int = 10) -> List[Tuple[str, int]]:
        """获取相关标签"""
        related = []
        
        for (tag1, tag2), count in self.cooccurrence.items():
            if tag1 == tag:
                related.append((tag2, count))
            elif tag2 == tag:
                related.append((tag1, count))
        
        # 按共现次数排序
        related.sort(key=lambda x: x[1], reverse=True)
        return related[:limit]
    
    def get_tag_trend(self, tag: str, time_window: str = 'week') -> Dict[str, int]:
        """获取标签趋势"""
        # 这里可以实现时间窗口分析
        return {'current': self.tag_freq.get(tag, 0)}

class TagIndex:
    """标签索引"""
    
    def __init__(self):
        self.index: Dict[str, Set[str]] = defaultdict(set)  # 标签 -> 技能ID集合
        self.skill_tags: Dict[str, Set[str]] = defaultdict(set)  # 技能ID -> 标签集合
        self.statistics = TagStatistics()
        self._normalized_tags: Dict[str, str] = {}  # 规范化标签 -> 原始标签
    
    def add_skill(self, skill: 'RegisteredSkill'):
        """添加技能到标签索引"""
        skill_id = skill.id
        tags = skill.tags
        
        # 记录技能标签
        self.skill_tags[skill_id] = tags.copy()
        
        # 添加到索引
        for tag in tags:
            normalized = self._normalize_tag(tag)
            self.index[normalized].add(skill_id)
            
            # 更新统计
            self.statistics.record_tag(normalized)
        
        # 记录共现
        tag_list = list(tags)
        for i in range(len(tag_list)):
            for j in range(i + 1, len(tag_list)):
                self.statistics.record_cooccurrence(
                    self._normalize_tag(tag_list[i]),
                    self._normalize_tag(tag_list[j])
                )
    
    def remove_skill(self, skill_id: str):
        """从标签索引中移除技能"""
        if skill_id not in self.skill_tags:
            return
        
        # 从索引中移除
        tags = self.skill_tags[skill_id]
        for tag in tags:
            normalized = self._normalize_tag(tag)
            if normalized in self.index:
                self.index[normalized].discard(skill_id)
                if not self.index[normalized]:
                    del self.index[normalized]
        
        # 从技能标签映射中移除
        del self.skill_tags[skill_id]
    
    def get_skills(self, tag: str) -> Set[str]:
        """获取标签下的技能ID"""
        normalized = self._normalize_tag(tag)
        return self.index.get(normalized, set()).copy()
    
    def get_tags(self) -> Set[str]:
        """获取所有标签"""
        # 返回原始标签名称
        original_tags = set()
        for normalized in self.index.keys():
            original = self._normalized_tags.get(normalized, normalized)
            original_tags.add(original)
        
        return original_tags
    
    def get_skill_tags(self, skill_id: str) -> Set[str]:
        """获取技能的标签"""
        return self.skill_tags.get(skill_id, set()).copy()
    
    def get_related_tags(self, tag: str) -> Set[str]:
        """获取相关标签"""
        normalized = self._normalize_tag(tag)
        related = self.statistics.get_related_tags(normalized)
        
        # 转换回原始标签名称
        original_related = set()
        for related_tag, _ in related:
            original = self._denormalize_tag(related_tag)
            original_related.add(original)
        
        return original_related
    
    def get_popular_tags(self, limit: int = 20) -> List[Tuple[str, int]]:
        """获取热门标签"""
        popular = self.statistics.get_popular_tags(limit)
        
        # 转换回原始标签名称
        result = []
        for tag, count in popular:
            original = self._denormalize_tag(tag)
            result.append((original, count))
        
        return result
    
    def search_by_tags(self, 
                      include_tags: Set[str],
                      exclude_tags: Set[str] = None) -> Set[str]:
        """
        根据标签搜索技能
        
        Args:
            include_tags: 必须包含的标签
            exclude_tags: 必须排除的标签
            
        Returns:
            符合条件的技能ID集合
        """
        if not include_tags:
            return set()
        
        # 初始化为第一个标签的技能集合
        first_tag = next(iter(include_tags))
        result = self.get_skills(first_tag)
        
        # 应用其他包含标签
        for tag in include_tags:
            if tag == first_tag:
                continue
            result &= self.get_skills(tag)
        
        # 应用排除标签
        if exclude_tags:
            for tag in exclude_tags:
                result -= self.get_skills(tag)
        
        return result
    
    def suggest_tags(self, 
                    partial_tag: str, 
                    context_tags: Set[str] = None,
                    limit: int = 10) -> List[str]:
        """建议标签"""
        suggestions = []
        partial_lower = partial_tag.lower()
        
        # 基于部分匹配的建议
        for normalized, original in self._normalized_tags.items():
            if partial_lower in normalized.lower():
                suggestions.append(original)
        
        # 基于上下文标签的相关建议
        if context_tags:
            for tag in context_tags:
                related = self.get_related_tags(tag)
                for related_tag in related:
                    if partial_lower in related_tag.lower():
                        if related_tag not in suggestions:
                            suggestions.append(related_tag)
        
        # 去重并限制数量
        seen = set()
        unique_suggestions = []
        
        for suggestion in suggestions:
            if suggestion not in seen and len(unique_suggestions) < limit:
                seen.add(suggestion)
                unique_suggestions.append(suggestion)
        
        return unique_suggestions
    
    def get_tag_cloud_data(self, 
                          min_font: int = 12,
                          max_font: int = 48) -> List[Dict[str, Any]]:
        """获取标签云数据"""
        tags_with_freq = []
        
        for normalized, skill_ids in self.index.items():
            freq = len(skill_ids)
            if freq > 0:
                original = self._denormalize_tag(normalized)
                tags_with_freq.append({
                    'tag': original,
                    'frequency': freq,
                    'skills_count': freq
                })
        
        if not tags_with_freq:
            return []
        
        # 计算字体大小
        min_freq = min(tag['frequency'] for tag in tags_with_freq)
        max_freq = max(tag['frequency'] for tag in tags_with_freq)
        
        for tag in tags_with_freq:
            if max_freq == min_freq:
                tag['font_size'] = (min_font + max_font) // 2
            else:
                # 线性映射频率到字体大小
                normalized_freq = (tag['frequency'] - min_freq) / (max_freq - min_freq)
                tag['font_size'] = min_font + int(normalized_freq * (max_font - min_font))
        
        return tags_with_freq
    
    def _normalize_tag(self, tag: str) -> str:
        """规范化标签"""
        # 转换为小写,移除多余空格
        normalized = tag.strip().lower()
        
        # 移除特殊字符,只保留字母、数字和连字符
        normalized = re.sub(r'[^a-z0-9\-]', '', normalized)
        
        # 存储映射
        if normalized != tag:
            self._normalized_tags[normalized] = tag
        
        return normalized
    
    def _denormalize_tag(self, normalized: str) -> str:
        """反规范化标签"""
        return self._normalized_tags.get(normalized, normalized)

4. 发现引擎实现

# registry/discovery/engine.py
import asyncio
import os
import fnmatch
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

@dataclass
class DiscoveryItem:
    """发现项"""
    path: str
    type: str  # 'file', 'directory', 'git', 'remote'
    discovered_at: datetime
    metadata: Dict[str, Any] = None
    priority: int = 0
    
    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

class DiscoveryStrategy:
    """发现策略基类"""
    
    def __init__(self, priority: int = 0):
        self.priority = priority
        self.name = self.__class__.__name__
    
    async def can_handle(self, path: str) -> bool:
        """检查是否能处理该路径"""
        raise NotImplementedError
    
    async def discover(self, path: str) -> List[DiscoveryItem]:
        """发现技能"""
        raise NotImplementedError
    
    def get_priority(self) -> int:
        """获取优先级"""
        return self.priority

class FileSystemDiscovery(DiscoveryStrategy):
    """文件系统发现策略"""
    
    def __init__(self, 
                 patterns: Optional[List[str]] = None,
                 recursive: bool = True,
                 priority: int = 10):
        super().__init__(priority)
        self.patterns = patterns or [
            '*.skill.yaml',
            '*.skill.yml',
            'skill.yaml',
            'skill.yml',
            '**/.opencode/skills/*.yaml',
            '**/.opencode/skills/*.yml',
            '**/.claude/skills/*.yaml',
            '**/.claude/skills/*.yml'
        ]
        self.recursive = recursive
        self.ignored_patterns = [
            '**/node_modules/**',
            '**/.git/**',
            '**/.svn/**',
            '**/__pycache__/**',
            '**/*.pyc',
            '**/.DS_Store'
        ]
    
    async def can_handle(self, path: str) -> bool:
        """检查是否能处理该路径"""
        path_obj = Path(path)
        return path_obj.exists() and (path_obj.is_file() or path_obj.is_dir())
    
    async def discover(self, path: str) -> List[DiscoveryItem]:
        """发现文件系统中的技能"""
        items = []
        path_obj = Path(path)
        
        if path_obj.is_file():
            # 单个文件
            if self._matches_patterns(path_obj):
                items.append(self._create_item(path_obj))
        elif path_obj.is_dir():
            # 目录
            if self.recursive:
                items.extend(await self._scan_directory_recursive(path_obj))
            else:
                items.extend(await self._scan_directory(path_obj))
        
        return items
    
    async def _scan_directory_recursive(self, directory: Path) -> List[DiscoveryItem]:
        """递归扫描目录"""
        items = []
        
        try:
            for root, dirs, files in os.walk(directory):
                # 跳过忽略的目录
                dirs[:] = [d for d in dirs if not self._should_ignore(Path(root) / d)]
                
                # 扫描文件
                for file in files:
                    file_path = Path(root) / file
                    if self._matches_patterns(file_path) and not self._should_ignore(file_path):
                        items.append(self._create_item(file_path))
        
        except Exception as e:
            logger.error(f"Error scanning directory {directory}: {e}")
        
        return items
    
    async def _scan_directory(self, directory: Path) -> List[DiscoveryItem]:
        """非递归扫描目录"""
        items = []
        
        try:
            for item in directory.iterdir():
                if item.is_file() and self._matches_patterns(item) and not self._should_ignore(item):
                    items.append(self._create_item(item))
        
        except Exception as e:
            logger.error(f"Error scanning directory {directory}: {e}")
        
        return items
    
    def _matches_patterns(self, path: Path) -> bool:
        """检查路径是否匹配模式"""
        path_str = str(path)
        
        for pattern in self.patterns:
            if fnmatch.fnmatch(path_str, pattern):
                return True
        
        return False
    
    def _should_ignore(self, path: Path) -> bool:
        """检查是否应该忽略路径"""
        path_str = str(path)
        
        for pattern in self.ignored_patterns:
            if fnmatch.fnmatch(path_str, pattern):
                return True
        
        return False
    
    def _create_item(self, path: Path) -> DiscoveryItem:
        """创建发现项"""
        return DiscoveryItem(
            path=str(path),
            type='file',
            discovered_at=datetime.now(),
            metadata={
                'size': path.stat().st_size if path.exists() else 0,
                'modified': path.stat().st_mtime if path.exists() else 0,
                'is_file': path.is_file()
            },
            priority=self.priority
        )

class GitRepositoryDiscovery(DiscoveryStrategy):
    """Git仓库发现策略"""
    
    def __init__(self, 
                 clone_temp_dir: Optional[str] = None,
                 priority: int = 20):
        super().__init__(priority)
        self.clone_temp_dir = clone_temp_dir or '/tmp/opencode_git_clones'
        Path(self.clone_temp_dir).mkdir(parents=True, exist_ok=True)
    
    async def can_handle(self, path: str) -> bool:
        """检查是否能处理该路径"""
        return (
            path.startswith('http://') or 
            path.startswith('https://') or 
            path.startswith('git@') or
            (Path(path).exists() and Path(path) / '.git' in Path(path).iterdir())
        )
    
    async def discover(self, path: str) -> List[DiscoveryItem]:
        """发现Git仓库中的技能"""
        items = []
        
        try:
            # 检查是否是本地Git仓库
            if Path(path).exists() and (Path(path) / '.git').exists():
                # 本地仓库
                items.extend(await self._discover_local_repo(path))
            else:
                # 远程仓库,需要克隆
                cloned_path = await self._clone_repository(path)
                if cloned_path:
                    items.extend(await self._discover_local_repo(cloned_path))
        
        except Exception as e:
            logger.error(f"Error discovering Git repository {path}: {e}")
        
        return items
    
    async def _discover_local_repo(self, repo_path: str) -> List[DiscoveryItem]:
        """发现本地Git仓库中的技能"""
        items = []
        
        # 使用文件系统发现器扫描仓库
        fs_discovery = FileSystemDiscovery(recursive=True)
        
        # 扫描技能文件
        skill_items = await fs_discovery.discover(repo_path)
        
        for item in skill_items:
            # 添加Git特定元数据
            item.metadata.update({
                'source_type': 'git',
                'repository': repo_path
            })
            items.append(item)
        
        return items
    
    async def _clone_repository(self, repo_url: str) -> Optional[str]:
        """克隆Git仓库"""
        import subprocess
        import uuid
        
        # 生成唯一的目录名
        repo_name = repo_url.split('/')[-1].replace('.git', '')
        clone_dir = Path(self.clone_temp_dir) / f"{repo_name}_{uuid.uuid4().hex[:8]}"
        
        try:
            # 克隆仓库
            cmd = ['git', 'clone', '--depth', '1', repo_url, str(clone_dir)]
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            stdout, stderr = await process.communicate()
            
            if process.returncode == 0:
                logger.info(f"Cloned repository {repo_url} to {clone_dir}")
                return str(clone_dir)
            else:
                logger.error(f"Failed to clone repository {repo_url}: {stderr.decode()}")
                return None
        
        except Exception as e:
            logger.error(f"Error cloning repository {repo_url}: {e}")
            return None

class RemoteRegistryDiscovery(DiscoveryStrategy):
    """远程注册表发现策略"""
    
    def __init__(self, 
                 registry_url: str = "https://registry.opencode.ai",
                 priority: int = 30):
        super().__init__(priority)
        self.registry_url = registry_url.rstrip('/')
        self.api_client = None
    
    async def can_handle(self, path: str) -> bool:
        """检查是否能处理该路径"""
        return path.startswith(self.registry_url) or path == 'registry'
    
    async def discover(self, path: str) -> List[DiscoveryItem]:
        """发现远程注册表中的技能"""
        items = []
        
        try:
            if path == 'registry':
                # 发现所有技能
                skills = await self._fetch_all_skills()
            else:
                # 特定注册表URL
                skills = await self._fetch_from_url(path)
            
            for skill_data in skills:
                item = DiscoveryItem(
                    path=f"{self.registry_url}/skills/{skill_data['id']}",
                    type='remote',
                    discovered_at=datetime.now(),
                    metadata=skill_data,
                    priority=self.priority
                )
                items.append(item)
        
        except Exception as e:
            logger.error(f"Error discovering from remote registry {path}: {e}")
        
        return items
    
    async def _fetch_all_skills(self) -> List[Dict[str, Any]]:
        """获取所有技能"""
        # 这里实现API调用
        # 实际实现应该使用HTTP客户端
        import aiohttp
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"{self.registry_url}/api/v1/skills") as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get('skills', [])
        
        except Exception as e:
            logger.error(f"Error fetching skills from registry: {e}")
        
        return []
    
    async def _fetch_from_url(self, url: str) -> List[Dict[str, Any]]:
        """从特定URL获取技能"""
        # 实现特定URL的获取逻辑
        return []

class DiscoveryEngine:
    """发现引擎"""
    
    def __init__(self, max_concurrent: int = 5):
        self.strategies: List[DiscoveryStrategy] = []
        self.max_concurrent = max_concurrent
        self._semaphore = asyncio.Semaphore(max_concurrent)
        
        # 注册默认策略
        self._register_default_strategies()
    
    def _register_default_strategies(self):
        """注册默认策略"""
        self.register_strategy(FileSystemDiscovery())
        self.register_strategy(GitRepositoryDiscovery())
        self.register_strategy(RemoteRegistryDiscovery())
    
    def register_strategy(self, strategy: DiscoveryStrategy):
        """注册发现策略"""
        self.strategies.append(strategy)
        # 按优先级排序
        self.strategies.sort(key=lambda s: s.get_priority(), reverse=True)
    
    def unregister_strategy(self, strategy_name: str):
        """取消注册发现策略"""
        self.strategies = [s for s in self.strategies if s.name != strategy_name]
    
    async def discover(self, 
                      paths: List[str],
                      recursive: bool = True) -> List[DiscoveryItem]:
        """
        发现技能
        
        Args:
            paths: 要扫描的路径列表
            recursive: 是否递归扫描
            
        Returns:
            发现项列表
        """
        all_items = []
        
        # 为每个路径创建发现任务
        tasks = []
        for path in paths:
            task = asyncio.create_task(
                self._discover_path(path, recursive)
            )
            tasks.append(task)
        
        # 等待所有任务完成
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 收集结果
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Discovery error: {result}")
            elif isinstance(result, list):
                all_items.extend(result)
        
        # 去重(基于路径)
        unique_items = {}
        for item in all_items:
            if item.path not in unique_items or item.priority > unique_items[item.path].priority:
                unique_items[item.path] = item
        
        return list(unique_items.values())
    
    async def _discover_path(self, path: str, recursive: bool) -> List[DiscoveryItem]:
        """发现单个路径"""
        items = []
        
        # 查找能够处理该路径的策略
        for strategy in self.strategies:
            async with self._semaphore:
                if await strategy.can_handle(path):
                    try:
                        discovered = await strategy.discover(path)
                        items.extend(discovered)
                        break  # 使用第一个能处理的策略
                    except Exception as e:
                        logger.error(f"Strategy {strategy.name} failed for {path}: {e}")
        
        return items
    
    async def watch(self, 
                   path: str,
                   callback: callable,
                   poll_interval: int = 5) -> 'WatchHandle':
        """
        监视路径变化
        
        Args:
            path: 要监视的路径
            callback: 变化时的回调函数
            poll_interval: 轮询间隔(秒)
            
        Returns:
            监视句柄
        """
        from .watcher import DirectoryWatcher
        
        watcher = DirectoryWatcher(
            path=path,
            callback=callback,
            poll_interval=poll_interval
        )
        
        # 启动监视
        await watcher.start()
        
        return WatchHandle(watcher)
    
    async def batch_discover(self, 
                           config: Dict[str, Any]) -> Dict[str, List[DiscoveryItem]]:
        """
        批量发现
        
        Args:
            config: 配置字典
            
        Returns:
            按类型分组的结果
        """
        results = {}
        
        # 解析配置
        paths = config.get('paths', [])
        strategies = config.get('strategies', ['filesystem', 'git', 'remote'])
        
        # 过滤策略
        filtered_strategies = [
            s for s in self.strategies 
            if s.name.lower() in [strat.lower() for strat in strategies]
        ]
        
        # 使用过滤后的策略进行发现
        original_strategies = self.strategies
        self.strategies = filtered_strategies
        
        try:
            # 执行发现
            items = await self.discover(paths)
            
            # 按类型分组
            for item in items:
                if item.type not in results:
                    results[item.type] = []
                results[item.type].append(item)
        
        finally:
            # 恢复原始策略
            self.strategies = original_strategies
        
        return results

@dataclass
class WatchHandle:
    """监视句柄"""
    watcher: Any
    
    async def stop(self):
        """停止监视"""
        await self.watcher.stop()

5. 数据模型定义

# registry/models.py
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum

@dataclass
class SearchHit:
    """搜索结果命中"""
    skill_id: str
    skill: 'RegisteredSkill'
    score: float
    highlights: Dict[str, List[str]] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'skill_id': self.skill_id,
            'score': self.score,
            'highlights': self.highlights,
            'skill': self.skill.to_dict() if self.skill else None
        }

@dataclass
class SearchResult:
    """搜索结果"""
    hits: List[SearchHit]
    total_hits: int
    aggregations: Dict[str, Any] = field(default_factory=dict)
    suggestions: List['Suggestion'] = field(default_factory=list)
    max_score: float = 0.0
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'hits': [hit.to_dict() for hit in self.hits],
            'total_hits': self.total_hits,
            'max_score': self.max_score,
            'aggregations': self.aggregations,
            'suggestions': [s.to_dict() for s in self.suggestions]
        }

@dataclass
class Suggestion:
    """搜索建议"""
    text: str
    type: str  # 'skill_name', 'category', 'tag', 'description'
    score: float = 1.0
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'text': self.text,
            'type': self.type,
            'score': self.score
        }

@dataclass
class DiscoveryResult:
    """发现结果"""
    registered: List[str] = field(default_factory=list)  # 已注册的技能ID
    successful: int = 0
    failed: int = 0
    errors: List[Dict[str, Any]] = field(default_factory=list)
    total_found: int = 0
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'registered': self.registered,
            'successful': self.successful,
            'failed': self.failed,
            'errors': self.errors,
            'total_found': self.total_found
        }

@dataclass
class RegistryExport:
    """注册表导出"""
    data: Dict[str, Any]
    format: str = 'json'
    exported_at: datetime = field(default_factory=datetime.now)
    
    def save(self, path: str):
        """保存到文件"""
        import json
        from pathlib import Path
        
        export_data = {
            'data': self.data,
            'format': self.format,
            'exported_at': self.exported_at.isoformat(),
            'version': '1.0.0'
        }
        
        path_obj = Path(path)
        path_obj.parent.mkdir(parents=True, exist_ok=True)
        
        with open(path_obj, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, default=str)
    
    @classmethod
    def load(cls, path: str) -> 'RegistryExport':
        """从文件加载"""
        import json
        from pathlib import Path
        
        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"Export file not found: {path}")
        
        with open(path_obj, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        return cls(
            data=data['data'],
            format=data.get('format', 'json'),
            exported_at=datetime.fromisoformat(data['exported_at'])
        )

三、配置与使用

1. 配置文件示例

# config/registry_config.yaml
registry:
  # 存储设置
  storage:
    path: "~/.opencode/registry"
    backup_count: 5
    auto_save: true
    save_interval: 300  # 秒
  
  # 发现设置
  discovery:
    paths:
      - "~/.opencode/skills"
      - "~/.claude/skills"
      - "./skills"
      - "./.opencode/skills"
      - "https://github.com/opencode-skills/official-skills"
    
    strategies:
      - filesystem
      - git
      - remote
    
    recursive: true
    watch_for_changes: true
    watch_interval: 30
  
  # 索引设置
  indexing:
    fulltext_enabled: true
    category_hierarchy_enabled: true
    tag_analysis_enabled: true
    
    # 全文索引配置
    fulltext:
      analyzer: "standard"
      stopwords: ["a", "an", "the", "and", "or", "but"]
      boost_fields:
        name: 2.0
        description: 1.5
        tags: 1.2
        categories: 1.0
  
  # 缓存设置
  cache:
    enabled: true
    ttl: 300  # 秒
    max_size: 1000
    strategy: "lru"
  
  # 搜索设置
  search:
    default_limit: 20
    max_limit: 100
    suggest_enabled: true
    highlight_enabled: true
    
    # 权重设置
    weights:
      name: 10.0
      description: 5.0
      tags: 3.0
      categories: 2.0
      content: 1.0
  
  # 分类系统
  categories:
    predefined:
      - name: "git"
        description: "Git operations and workflows"
        parent: null
        
      - name: "docker"
        description: "Docker container operations"
        parent: null
        
      - name: "kubernetes"
        description: "Kubernetes cluster operations"
        parent: null
        
      - name: "ci-cd"
        description: "Continuous integration and deployment"
        parent: null
        children:
          - "jenkins"
          - "github-actions"
          - "gitlab-ci"
    
    allow_custom: true
    max_custom_categories: 10
  
  # 标签系统
  tags:
    max_per_skill: 15
    blacklist:
      - "deprecated"
      - "broken"
      - "test-only"
    
    suggestions:
      enabled: true
      min_length: 2
      max_suggestions: 10
  
  # 统计和监控
  monitoring:
    enabled: true
    track_usage: true
    report_interval: 3600  # 秒
    
    metrics:
      - "registrations"
      - "searches"
      - "executions"
      - "errors"

2. 使用示例

# examples/registry_usage.py
import asyncio
from pathlib import Path
import yaml

from registry.core import SkillRegistry, SkillSource
from registry.discovery.engine import DiscoveryEngine
from parsers.skill_parser import SkillDSLParser

async def main():
    """使用示例"""
    
    # 1. 创建注册表
    registry = SkillRegistry(
        storage_path=Path.home() / '.opencode' / 'registry_v2'
    )
    
    # 2. 加载配置
    config_path = Path('config/registry_config.yaml')
    if config_path.exists():
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        
        # 配置发现路径
        discovery_paths = config['registry']['discovery']['paths']
    else:
        discovery_paths = [
            str(Path.home() / '.opencode' / 'skills'),
            './skills'
        ]
    
    # 3. 发现并注册技能
    print("Discovering skills...")
    result = await registry.discover(discovery_paths)
    
    print(f"Discovery complete:")
    print(f"  Found: {result.total_found}")
    print(f"  Registered: {result.successful}")
    print(f"  Failed: {result.failed}")
    
    if result.errors:
        print(f"  Errors: {len(result.errors)}")
        for error in result.errors[:3]:  # 显示前3个错误
            print(f"    - {error['path']}: {error['error']}")
    
    # 4. 查看统计
    stats = registry.get_stats()
    print(f"\nRegistry statistics:")
    print(f"  Total skills: {stats['total_skills']}")
    print(f"  Categories: {stats['categories_count']}")
    print(f"  Tags: {stats['tags_count']}")
    
    # 5. 搜索示例
    print("\nSearching for 'git' skills...")
    search_result = registry.search("git", limit=5)
    
    print(f"Found {search_result.total_hits} skills:")
    for i, hit in enumerate(search_result.hits, 1):
        skill = hit.skill
        print(f"  {i}. {skill.skill.metadata.name} v{skill.skill.metadata.version}")
        print(f"     {skill.skill.metadata.description}")
        if skill.tags:
            print(f"     Tags: {', '.join(sorted(skill.tags)[:5])}")
        print()
    
    # 6. 按分类浏览
    print("Categories available:")
    categories = registry.get_all_categories()
    for category in sorted(categories)[:10]:  # 显示前10个分类
        skills = registry.get_by_category(category)
        print(f"  - {category}: {len(skills)} skills")
    
    # 7. 按标签浏览
    print("\nPopular tags:")
    popular_tags = registry.tag_index.get_popular_tags(limit=10)
    for tag, count in popular_tags:
        print(f"  - #{tag}: {count} skills")
    
    # 8. 导出注册表
    print("\nExporting registry...")
    export = registry.export(include_skills=True)
    export.save('registry_export.json')
    print(f"Exported to registry_export.json")
    
    # 9. 手动注册技能示例
    print("\nManual registration example:")
    
    # 创建技能解析器
    parser = SkillDSLParser()
    
    # 解析技能文件
    skill_content = """---
metadata:
  name: manual-test-skill
  version: 1.0.0
  description: A manually registered test skill
  categories: [test, example]
  tags: [manual, test, example]
  author: Test User
  created_at: "2024-01-01T00:00:00Z"
  
steps:
  - name: test_step
    type: command
    description: Test step
    parameters:
      command: echo
      args: ["Hello from manual skill!"]
---
"""
    
    skill = parser.parse(skill_content)
    
    # 注册技能
    registered = registry.register(
        skill=skill,
        source_path="/manual/test.skill.yaml",
        source_type=SkillSource.USER
    )
    
    print(f"Manually registered: {registered.skill.metadata.name}")
    
    # 10. 刷新注册表(重新索引)
    print("\nRefreshing registry...")
    refresh_result = registry.refresh()
    print(f"Reindexed: {refresh_result['reindexed']}")
    print(f"Removed: {refresh_result['removed']}")
    print(f"Errors: {refresh_result['errors']}")

if __name__ == "__main__":
    asyncio.run(main())

3. REST API 接口

# api/registry_api.py
from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Optional
import uvicorn

from registry.core import SkillRegistry, SkillStatus
from registry.models import SearchResult, RegistryExport

app = FastAPI(
    title="OpenCode Skill Registry API",
    description="API for managing and searching OpenCode skills",
    version="1.0.0"
)

# CORS配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 全局注册表实例
_registry = None

def get_registry():
    """获取注册表实例"""
    global _registry
    if _registry is None:
        _registry = SkillRegistry()
    return _registry

@app.get("/")
async def root():
    """API根路径"""
    return {
        "service": "OpenCode Skill Registry",
        "version": "1.0.0",
        "endpoints": [
            "/skills",
            "/skills/search",
            "/categories",
            "/tags",
            "/stats"
        ]
    }

@app.get("/skills")
async def get_skills(
    registry: SkillRegistry = Depends(get_registry),
    category: Optional[str] = None,
    tag: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0)
):
    """获取技能列表"""
    skills = []
    
    # 应用过滤器
    if category:
        skills = registry.get_by_category(category)
    elif tag:
        skills = registry.get_by_tag(tag)
    else:
        # 获取所有技能
        skills = list(registry._skills.values())
    
    # 状态过滤
    if status:
        try:
            status_enum = SkillStatus(status)
            skills = [s for s in skills if s.status == status_enum]
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid status: {status}")
    
    # 排序
    skills.sort(key=lambda s: s.skill.metadata.name.lower())
    
    # 分页
    start = offset
    end = start + limit
    paged_skills = skills[start:end]
    
    return {
        "skills": [skill.to_dict() for skill in paged_skills],
        "pagination": {
            "total": len(skills),
            "limit": limit,
            "offset": offset,
            "has_more": end < len(skills)
        }
    }

@app.get("/skills/{skill_id}")
async def get_skill(
    skill_id: str,
    registry: SkillRegistry = Depends(get_registry)
):
    """获取单个技能"""
    skill = registry.find_by_id(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    
    return skill.to_dict()

@app.get("/skills/search")
async def search_skills(
    query: str = Query(..., description="搜索查询"),
    categories: Optional[List[str]] = Query(None),
    tags: Optional[List[str]] = Query(None),
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    registry: SkillRegistry = Depends(get_registry)
):
    """搜索技能"""
    result = registry.search(
        query=query,
        categories=categories,
        tags=tags,
        limit=limit,
        offset=offset
    )
    
    return result.to_dict()

@app.get("/categories")
async def get_categories(
    registry: SkillRegistry = Depends(get_registry)
):
    """获取所有分类"""
    categories = registry.get_all_categories()
    
    # 获取分类统计
    category_stats = registry.category_index.get_category_stats()
    
    # 构建层次树
    hierarchy = registry.category_index.build_hierarchy_tree()
    
    return {
        "categories": sorted(categories),
        "statistics": category_stats,
        "hierarchy": hierarchy
    }

@app.get("/categories/{category}/skills")
async def get_skills_by_category(
    category: str,
    registry: SkillRegistry = Depends(get_registry)
):
    """获取分类下的技能"""
    skills = registry.get_by_category(category)
    
    return {
        "category": category,
        "skills": [skill.to_dict() for skill in skills],
        "count": len(skills)
    }

@app.get("/tags")
async def get_tags(
    registry: SkillRegistry = Depends(get_registry)
):
    """获取所有标签"""
    tags = registry.get_all_tags()
    
    # 获取标签云数据
    tag_cloud = registry.tag_index.get_tag_cloud_data()
    
    # 获取热门标签
    popular_tags = registry.tag_index.get_popular_tags(limit=50)
    
    return {
        "tags": sorted(tags),
        "tag_cloud": tag_cloud,
        "popular_tags": popular_tags
    }

@app.get("/tags/{tag}/skills")
async def get_skills_by_tag(
    tag: str,
    registry: SkillRegistry = Depends(get_registry)
):
    """获取标签下的技能"""
    skills = registry.get_by_tag(tag)
    
    # 获取相关标签
    related_tags = registry.tag_index.get_related_tags(tag)
    
    return {
        "tag": tag,
        "skills": [skill.to_dict() for skill in skills],
        "count": len(skills),
        "related_tags": list(related_tags)
    }

@app.get("/stats")
async def get_stats(
    registry: SkillRegistry = Depends(get_registry)
):
    """获取统计信息"""
    stats = registry.get_stats()
    
    # 添加详细统计
    detailed_stats = {
        **stats,
        "category_distribution": registry.category_index.get_category_stats(),
        "tag_distribution": dict(registry.tag_index.get_popular_tags(limit=20))
    }
    
    return detailed_stats

@app.post("/discover")
async def discover_skills(
    paths: List[str],
    recursive: bool = True,
    registry: SkillRegistry = Depends(get_registry)
):
    """发现技能"""
    result = await registry.discover(paths, recursive=recursive)
    
    return {
        "action": "discover",
        "paths": paths,
        "result": result.to_dict()
    }

@app.post("/refresh")
async def refresh_registry(
    registry: SkillRegistry = Depends(get_registry)
):
    """刷新注册表"""
    result = registry.refresh()
    
    return {
        "action": "refresh",
        "result": result
    }

@app.get("/export")
async def export_registry(
    include_skills: bool = False,
    registry: SkillRegistry = Depends(get_registry)
):
    """导出注册表"""
    export = registry.export(include_skills=include_skills)
    
    return export.data

@app.post("/import")
async def import_registry(
    export_data: dict,
    registry: SkillRegistry = Depends(get_registry)
):
    """导入注册表"""
    export = RegistryExport(data=export_data)
    result = registry.import_export(export)
    
    return {
        "action": "import",
        "result": result
    }

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )

四、测试方案

1. 单元测试

# tests/test_registry.py
import pytest
import tempfile
import shutil
from pathlib import Path
import asyncio

from registry.core import SkillRegistry, SkillSource
from parsers.skill_parser import SkillDSLParser

class TestSkillRegistry:
    
    @pytest.fixture
    def temp_dir(self):
        """创建临时目录"""
        dir_path = tempfile.mkdtemp()
        yield Path(dir_path)
        shutil.rmtree(dir_path)
    
    @pytest.fixture
    def skill_parser(self):
        """创建技能解析器"""
        return SkillDSLParser()
    
    @pytest.fixture
    def sample_skill_content(self):
        """示例技能内容"""
        return """---
metadata:
  name: test-skill
  version: 1.0.0
  description: A test skill for unit testing
  categories: [test, unit]
  tags: [testing, unit-test]
  author: Test User

steps:
  - name: test_step
    type: command
    description: Test step
    parameters:
      command: echo
      args: ["Hello, World!"]
---
"""
    
    @pytest.fixture
    def registry(self, temp_dir):
        """创建注册表"""
        storage_path = temp_dir / "registry"
        return SkillRegistry(storage_path=storage_path)
    
    @pytest.mark.asyncio
    async def test_register_skill(self, registry, skill_parser, sample_skill_content):
        """测试注册技能"""
        # 解析技能
        skill = skill_parser.parse(sample_skill_content)
        
        # 注册技能
        registered = registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 验证注册
        assert registered.id is not None
        assert registered.skill.metadata.name == "test-skill"
        assert registered.source_type == SkillSource.LOCAL
        
        # 验证查找
        found = registry.find_by_id(registered.id)
        assert found is not None
        assert found.skill.metadata.name == "test-skill"
    
    @pytest.mark.asyncio
    async def test_find_by_name(self, registry, skill_parser, sample_skill_content):
        """测试按名称查找"""
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registered = registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 按名称查找
        results = registry.find_by_name("test-skill")
        assert len(results) == 1
        assert results[0].id == registered.id
        
        # 大小写不敏感
        results = registry.find_by_name("TEST-SKILL")
        assert len(results) == 1
    
    @pytest.mark.asyncio
    async def test_unregister_skill(self, registry, skill_parser, sample_skill_content):
        """测试取消注册技能"""
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registered = registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 取消注册
        success = registry.unregister(registered.id)
        assert success is True
        
        # 验证已取消注册
        found = registry.find_by_id(registered.id)
        assert found is None
        
        # 验证名称索引
        results = registry.find_by_name("test-skill")
        assert len(results) == 0
    
    @pytest.mark.asyncio
    async def test_category_index(self, registry, skill_parser, sample_skill_content):
        """测试分类索引"""
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registered = registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 验证分类
        categories = registry.get_all_categories()
        assert "test" in categories
        assert "unit" in categories
        
        # 验证按分类获取技能
        test_skills = registry.get_by_category("test")
        assert len(test_skills) == 1
        assert test_skills[0].id == registered.id
    
    @pytest.mark.asyncio
    async def test_tag_index(self, registry, skill_parser, sample_skill_content):
        """测试标签索引"""
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registered = registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 验证标签
        tags = registry.get_all_tags()
        assert "testing" in tags
        assert "unit-test" in tags
        
        # 验证按标签获取技能
        testing_skills = registry.get_by_tag("testing")
        assert len(testing_skills) == 1
        assert testing_skills[0].id == registered.id
    
    @pytest.mark.asyncio
    async def test_search(self, registry, skill_parser, sample_skill_content):
        """测试搜索"""
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 搜索
        result = registry.search("test", limit=10)
        assert result.total_hits >= 1
        
        # 验证搜索结果
        assert len(result.hits) >= 1
        assert result.hits[0].skill.skill.metadata.name == "test-skill"
    
    @pytest.mark.asyncio
    async def test_discovery(self, registry, temp_dir, skill_parser, sample_skill_content):
        """测试技能发现"""
        # 创建技能文件
        skills_dir = temp_dir / "skills"
        skills_dir.mkdir()
        
        skill_file = skills_dir / "test.skill.yaml"
        skill_file.write_text(sample_skill_content)
        
        # 发现技能
        result = await registry.discover([str(skills_dir)])
        
        # 验证发现结果
        assert result.successful >= 1
        assert result.total_found >= 1
        
        # 验证已注册
        stats = registry.get_stats()
        assert stats['total_skills'] >= 1
    
    @pytest.mark.asyncio
    async def test_export_import(self, registry, skill_parser, sample_skill_content):
        """测试导出和导入"""
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registered = registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 导出
        export = registry.export(include_skills=True)
        
        # 创建新注册表并导入
        new_registry = SkillRegistry()
        import_result = new_registry.import_export(export)
        
        # 验证导入结果
        assert import_result['imported'] >= 1
        
        # 验证技能已导入
        imported = new_registry.find_by_name("test-skill")
        assert len(imported) >= 1
    
    def test_stats(self, registry, skill_parser, sample_skill_content):
        """测试统计"""
        # 初始统计
        initial_stats = registry.get_stats()
        assert initial_stats['total_skills'] == 0
        
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registry.register(
            skill=skill,
            source_path="/test/path.skill.yaml",
            source_type=SkillSource.LOCAL
        )
        
        # 验证更新后的统计
        updated_stats = registry.get_stats()
        assert updated_stats['total_skills'] == 1
    
    @pytest.mark.asyncio
    async def test_refresh(self, registry, skill_parser, sample_skill_content, temp_dir):
        """测试刷新"""
        # 创建技能文件
        skill_file = temp_dir / "test.skill.yaml"
        skill_file.write_text(sample_skill_content)
        
        # 注册技能
        skill = skill_parser.parse(sample_skill_content)
        registered = registry.register(
            skill=skill,
            source_path=str(skill_file),
            source_type=SkillSource.LOCAL
        )
        
        # 修改技能文件
        modified_content = sample_skill_content.replace(
            "A test skill for unit testing",
            "A modified test skill"
        )
        skill_file.write_text(modified_content)
        
        # 刷新注册表
        refresh_result = registry.refresh()
        
        # 验证刷新结果
        assert refresh_result['reindexed'] >= 1
        
        # 验证技能已更新
        updated = registry.find_by_id(registered.id)
        assert updated.skill.metadata.description == "A modified test skill"

if __name__ == "__main__":
    pytest.main([__file__, "-v"])

五、部署方案

1. Docker容器化

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY registry/ ./registry/
COPY parsers/ ./parsers/
COPY api/ ./api/

# 创建数据目录
RUN mkdir -p /data/registry
RUN mkdir -p /data/skills

# 环境变量
ENV REGISTRY_STORAGE_PATH=/data/registry
ENV SKILL_PATHS=/data/skills
ENV PYTHONPATH=/app

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "-m", "api.registry_api"]

2. Docker Compose配置

# docker-compose.yml
version: '3.8'

services:
  registry-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REGISTRY_STORAGE_PATH=/data/registry
      - SKILL_PATHS=/data/skills:/shared-skills
      - LOG_LEVEL=INFO
    volumes:
      - registry-data:/data/registry
      - shared-skills:/shared-skills
      - ./local-skills:/data/skills:ro
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  registry-worker:
    build: .
    command: python -m registry.worker
    environment:
      - REGISTRY_STORAGE_PATH=/data/registry
      - SKILL_PATHS=/data/skills:/shared-skills
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - registry-data:/data/registry
      - shared-skills:/shared-skills
    depends_on:
      - registry-api
      - redis
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - registry-api
    restart: unless-stopped

volumes:
  registry-data:
  shared-skills:
  redis-data:

3. Kubernetes部署

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opencode-registry
  namespace: opencode
  labels:
    app: opencode-registry
spec:
  replicas: 3
  selector:
    matchLabels:
      app: opencode-registry
  template:
    metadata:
      labels:
        app: opencode-registry
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
    spec:
      containers:
      - name: registry-api
        image: opencode/registry:latest
        ports:
        - containerPort: 8000
        env:
        - name: REGISTRY_STORAGE_PATH
          value: /data/registry
        - name: SKILL_PATHS
          value: /data/skills:/shared-skills
        - name: LOG_LEVEL
          value: INFO
        - name: REDIS_URL
          value: redis://redis.opencode.svc.cluster.local:6379/0
        volumeMounts:
        - name: registry-data
          mountPath: /data/registry
        - name: shared-skills
          mountPath: /shared-skills
        - name: local-skills
          mountPath: /data/skills
          readOnly: true
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: registry-data
        persistentVolumeClaim:
          claimName: registry-data-pvc
      - name: shared-skills
        persistentVolumeClaim:
          claimName: shared-skills-pvc
      - name: local-skills
        configMap:
          name: builtin-skills

---
apiVersion: v1
kind: Service
metadata:
  name: registry-service
  namespace: opencode
spec:
  selector:
    app: opencode-registry
  ports:
  - port: 80
    targetPort: 8000
    protocol: TCP
    name: http
  - port: 443
    targetPort: 8000
    protocol: TCP
    name: https
  type: LoadBalancer

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: registry-hpa
  namespace: opencode
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: opencode-registry
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

六、性能优化建议

1. 缓存策略优化

# registry/cache/optimized_cache.py
import asyncio
import time
from typing import Any, Optional
from collections import OrderedDict
import threading

class LRUCache:
    """LRU缓存实现"""
    
    def __init__(self, max_size: int = 1000, ttl: int = 300):
        self.max_size = max_size
        self.ttl = ttl  # 秒
        self.cache = OrderedDict()
        self.timestamps = {}
        self.lock = threading.RLock()
    
    def get(self, key: str) -> Optional[Any]:
        """获取缓存项"""
        with self.lock:
            if key not in self.cache:
                return None
            
            # 检查是否过期
            if self._is_expired(key):
                self._remove(key)
                return None
            
            # 移动到最近使用
            value = self.cache.pop(key)
            self.cache[key] = value
            self.timestamps[key] = time.time()
            
            return value
    
    def set(self, key: str, value: Any):
        """设置缓存项"""
        with self.lock:
            # 如果已存在,先移除
            if key in self.cache:
                self._remove(key)
            
            # 检查是否达到最大大小
            if len(self.cache) >= self.max_size:
                # 移除最旧的项
                oldest_key = next(iter(self.cache))
                self._remove(oldest_key)
            
            # 添加新项
            self.cache[key] = value
            self.timestamps[key] = time.time()
    
    def _remove(self, key: str):
        """移除缓存项"""
        if key in self.cache:
            del self.cache[key]
        if key in self.timestamps:
            del self.timestamps[key]
    
    def _is_expired(self, key: str) -> bool:
        """检查是否过期"""
        if key not in self.timestamps:
            return True
        
        elapsed = time.time() - self.timestamps[key]
        return elapsed > self.ttl
    
    def clear(self):
        """清空缓存"""
        with self.lock:
            self.cache.clear()
            self.timestamps.clear()
    
    def cleanup(self):
        """清理过期项"""
        with self.lock:
            expired_keys = []
            for key in list(self.cache.keys()):
                if self._is_expired(key):
                    expired_keys.append(key)
            
            for key in expired_keys:
                self._remove(key)

2. 异步批处理

# registry/async/batch_processor.py
import asyncio
from typing import List, Any, Callable, Dict
from concurrent.futures import ThreadPoolExecutor
import time

class BatchProcessor:
    """批处理器"""
    
    def __init__(self, 
                 batch_size: int = 100,
                 max_workers: int = 4,
                 flush_interval: float = 1.0):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.flush_interval = flush_interval
        
        self.buffer: List[Any] = []
        self.buffer_lock = asyncio.Lock()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.flush_task = None
        self.is_running = False
    
    async def start(self):
        """启动批处理器"""
        if self.is_running:
            return
        
        self.is_running = True
        self.flush_task = asyncio.create_task(self._flush_loop())
    
    async def stop(self):
        """停止批处理器"""
        if not self.is_running:
            return
        
        self.is_running = False
        if self.flush_task:
            self.flush_task.cancel()
            try:
                await self.flush_task
            except asyncio.CancelledError:
                pass
        
        # 刷新剩余数据
        await self.flush()
        
        # 关闭执行器
        self.executor.shutdown(wait=True)
    
    async def add(self, item: Any):
        """添加项目到缓冲区"""
        async with self.buffer_lock:
            self.buffer.append(item)
            
            # 如果达到批处理大小,立即刷新
            if len(self.buffer) >= self.batch_size:
                await self.flush()
    
    async def flush(self):
        """刷新缓冲区"""
        async with self.buffer_lock:
            if not self.buffer:
                return
            
            # 获取当前缓冲区内容
            batch = self.buffer.copy()
            self.buffer.clear()
        
        # 在单独的线程中处理批处理
        await asyncio.get_event_loop().run_in_executor(
            self.executor,
            self._process_batch,
            batch
        )
    
    def _process_batch(self, batch: List[Any]):
        """处理批处理(在单独线程中运行)"""
        # 这里可以添加实际的批处理逻辑
        # 例如:批量索引、批量存储等
        pass
    
    async def _flush_loop(self):
        """定期刷新循环"""
        while self.is_running:
            await asyncio.sleep(self.flush_interval)
            await self.flush()

总结

这个技能注册表方案提供了完整的技能发现、索引和分类系统,具有以下特点:

核心功能:

  1. 灵活的发现机制:支持文件系统、Git仓库、远程注册表多种发现方式
  2. 强大的索引系统:分类索引、标签索引、全文搜索三位一体
  3. 智能分类和标签:支持层次分类、标签云、相关标签推荐
  4. 高性能搜索:支持全文搜索、过滤、聚合、建议
  5. 实时更新:支持文件监视和自动刷新

高级特性:

  1. 持久化存储:技能注册表状态可持久化保存和恢复
  2. 导入导出:支持完整的注册表导入导出
  3. 统计监控:详细的统计信息和监控指标
  4. REST API:完整的HTTP API接口
  5. 可扩展架构:模块化设计,易于扩展新的发现策略和索引类型

生产就绪:

  1. 线程安全:所有操作都经过适当的同步
  2. 错误处理:完善的错误处理和日志记录
  3. 性能优化:缓存、批处理、异步处理
  4. 容器化部署:支持Docker和Kubernetes部署
  5. 配置驱动:灵活的配置文件支持

这个方案可以满足OpenCode技能系统的基础需求,并为未来的扩展提供了坚实的基础。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐