OpenCode技能系统基础:技能注册表详细方案
OpenCode技能注册表设计方案摘要(149字): 本文提出了OpenCode技能注册表的核心设计方案,通过UML建模详细阐述了系统架构。类图展示了SkillRegistry、Skill、SkillMetadata等核心组件及其关系,其中注册表负责管理技能发现、注册和检索功能。序列图描述了从技能发现到注册的完整流程,状态图则明确了技能生命周期状态转换。技术实现部分包含Python代码示例,重点介绍了技能发现、索引构建、文件监控热重载以及注册表持久化的实现。
·
正在设计OpenCode的技能注册表,这是一个关键组件,负责技能的发现、索引和分类。以下是详细的方案,包括UML建模和具体实现。
一、UML建模
1. 核心类图
2. 序列图:技能发现与注册
3. 状态图:技能生命周期
二、详细设计方案
1. 技能注册表核心实现
# skill_registry.py
import os
import logging
from pathlib import Path
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass, field
import hashlib
import json
from datetime import datetime
import fnmatch
from .skill_parser import SkillDSLParser, Skill, SkillMetadata
logger = logging.getLogger(__name__)
@dataclass
class SkillIndexEntry:
    """Flat index record describing one skill (technical index entry).

    NOTE(review): nothing in the visible code constructs this class; the
    registries below index ``Skill`` objects directly. Confirm the intended
    consumer before relying on it.
    """
    # presumably the ID produced by SkillRegistry._generate_skill_id — TODO confirm
    skill_id: str
    name: str
    description: str
    categories: List[str]
    tags: List[str]
    # free-form extra metadata
    metadata: Dict[str, Any]
    # path of the skill definition file
    skill_path: str
    last_updated: datetime
class SearchEngine:
    """Minimal in-memory inverted index for full-text skill search.

    ``index`` maps a normalized term to the set of skill *names* whose
    metadata contains it. This is a deliberately simple stand-in; a real
    deployment could use Whoosh, Elasticsearch, etc.

    Text is normalized identically on the indexing and the query side:
    lower-cased, split on whitespace, surrounding punctuation stripped, and
    stop words / terms shorter than ``MIN_TERM_LENGTH`` dropped. Without the
    punctuation stripping, "release," in a description would never match the
    query "release".
    """

    # Words that carry no search value.
    STOP_WORDS = frozenset({'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'})
    # Terms shorter than this are ignored on both sides.
    MIN_TERM_LENGTH = 3

    def __init__(self):
        self.index: Dict[str, Set[str]] = {}

    def index_skill(self, skill: "Skill", skill_path: str):
        """Add every searchable term of *skill* to the index.

        Args:
            skill: Parsed skill; its metadata name, description, tags and
                categories are indexed under ``skill.metadata.name``.
            skill_path: Source path of the skill. Unused here; kept so
                subclasses share the same signature.
        """
        name = skill.metadata.name
        for term in self._extract_terms(skill):
            self.index.setdefault(term, set()).add(name)

    def remove_skill(self, skill_name: str):
        """Remove *skill_name* from every posting list.

        Terms whose posting list becomes empty are deleted, so the index does
        not grow without bound as skills come and go.
        """
        # Collect first: deleting keys while iterating the dict would raise.
        emptied = []
        for term, names in self.index.items():
            names.discard(skill_name)
            if not names:
                emptied.append(term)
        for term in emptied:
            del self.index[term]

    def search(self, query: str) -> List[str]:
        """Return names of skills matching ANY term of *query* (OR semantics).

        The order of the returned list is unspecified.
        """
        results: Set[str] = set()
        for term in self._extract_terms_from_query(query):
            results.update(self.index.get(term, ()))
        return list(results)

    def _extract_terms(self, skill: "Skill") -> List[str]:
        """Return the unique indexable terms of *skill*'s metadata.

        Name and description are tokenized; each tag and category is kept as
        a single lower-cased term.
        """
        meta = skill.metadata
        candidates = self._tokenize(meta.name) + self._tokenize(meta.description or '')
        candidates += [tag.lower() for tag in meta.tags]
        candidates += [category.lower() for category in meta.categories]
        return list({term for term in candidates if self._is_indexable(term)})

    def _extract_terms_from_query(self, query: str) -> List[str]:
        """Normalize *query* into search terms exactly like indexed text."""
        return [term for term in self._tokenize(query) if self._is_indexable(term)]

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Lower-case *text*, split on whitespace and strip edge punctuation."""
        import string
        return [token.strip(string.punctuation) for token in text.lower().split()]

    def _is_indexable(self, term: str) -> bool:
        """True when *term* is long enough and not a stop word."""
        return len(term) >= self.MIN_TERM_LENGTH and term not in self.STOP_WORDS
class SkillDiscovery:
    """Find skill definition files under a set of search roots and parse them."""

    # File *names* (not paths) recognized as skill definitions.
    SKILL_FILE_PATTERNS = ('*.skill.yaml', '*.skill.yml', 'skill.yaml', 'skill.yml')

    def __init__(self, parser: "SkillDSLParser"):
        self.parser = parser
        self.search_paths: List[str] = []

    def add_search_path(self, path: str):
        """Register a search root.

        ``~`` is expanded (config files use paths like ``~/.opencode/skills``)
        and a root that is already registered is ignored, so repeated
        discovery calls do not rescan the same tree several times.
        """
        expanded = os.path.expanduser(path)
        if expanded not in self.search_paths:
            self.search_paths.append(expanded)

    def discover(self) -> List["Skill"]:
        """Scan every registered search root and return all parsed skills.

        Missing roots are logged and skipped. Files that fail to parse are
        logged and skipped (see :meth:`scan_directory`).
        """
        skills = []
        for search_path in self.search_paths:
            # Expand again in case search_paths was populated directly.
            root = os.path.expanduser(search_path)
            if not os.path.exists(root):
                logger.warning(f"Search path does not exist: {search_path}")
                continue
            skills.extend(self.scan_directory(root))
        return skills

    def scan_directory(self, directory: str) -> List["Skill"]:
        """Recursively collect skills below *directory*, skipping hidden dirs.

        Directories and files are visited in sorted order so discovery is
        deterministic across platforms and runs.
        """
        skills = []
        for root, dirs, files in os.walk(directory):
            # Prune in place so os.walk does not descend into hidden dirs.
            dirs[:] = sorted(d for d in dirs if not d.startswith('.'))
            for file in sorted(files):
                if not self._is_skill_file(file):
                    continue
                skill_path = os.path.join(root, file)
                try:
                    skills.append(self.load_skill(skill_path))
                except Exception as e:  # one bad file must not abort the scan
                    logger.error(f"Failed to load skill from {skill_path}: {e}")
        return skills

    def load_skill(self, skill_path: str) -> "Skill":
        """Parse *skill_path* and record the path on the resulting skill."""
        skill = self.parser.parse_file(skill_path)
        # The registry derives skill IDs from this attribute.
        skill.skill_path = skill_path
        return skill

    def _is_skill_file(self, filename: str) -> bool:
        """True when *filename* matches one of SKILL_FILE_PATTERNS."""
        return any(fnmatch.fnmatch(filename, pattern) for pattern in self.SKILL_FILE_PATTERNS)
class SkillRegistry:
    """In-memory skill registry with category, tag and full-text indexes.

    Skills are keyed by an ID derived from name, version and source path
    (see :meth:`_generate_skill_id`), so the same skill name can be
    registered more than once, e.g. two versions or two locations.
    """

    def __init__(self):
        self.skills: Dict[str, "Skill"] = {}
        self.category_index: Dict[str, Set[str]] = {}  # category -> skill IDs
        self.tag_index: Dict[str, Set[str]] = {}  # tag -> skill IDs
        self.search_engine = SearchEngine()
        self.discovery = SkillDiscovery(SkillDSLParser())

    def discover_skills(self, paths: List[str]):
        """Add *paths* as discovery roots, then scan ALL known roots and register every skill found.

        Roots added by earlier calls are scanned again as well.
        """
        for path in paths:
            # Do not accumulate duplicate roots across repeated calls.
            if path not in self.discovery.search_paths:
                self.discovery.add_search_path(path)
        skills = self.discovery.discover()
        for skill in skills:
            self.register_skill(skill)
        logger.info(f"Discovered and registered {len(skills)} skills")

    def register_skill(self, skill: "Skill"):
        """Register *skill*, replacing any entry with the same ID."""
        skill_id = self._generate_skill_id(skill)
        if skill_id in self.skills:
            logger.warning(f"Skill {skill.metadata.name} already registered, updating...")
            self.unregister_skill(skill_id)
        self.skills[skill_id] = skill
        for category in skill.metadata.categories:
            self.category_index.setdefault(category, set()).add(skill_id)
        for tag in skill.metadata.tags:
            self.tag_index.setdefault(tag, set()).add(skill_id)
        self.search_engine.index_skill(skill, skill.skill_path)
        logger.info(f"Registered skill: {skill.metadata.name} (id: {skill_id})")

    def unregister_skill(self, skill_id: str):
        """Remove the skill with *skill_id* from all indexes; unknown IDs are ignored."""
        if skill_id not in self.skills:
            return
        skill = self.skills.pop(skill_id)
        self._discard_from_index(self.category_index, skill.metadata.categories, skill_id)
        self._discard_from_index(self.tag_index, skill.metadata.tags, skill_id)
        name = skill.metadata.name
        self.search_engine.remove_skill(name)
        # The search index is keyed by skill *name*, so the call above also
        # de-indexed every other registered skill sharing this name (e.g.
        # another version). Re-index the survivors so they stay searchable.
        for other in self.skills.values():
            if other.metadata.name == name:
                self.search_engine.index_skill(other, other.skill_path)
        logger.info(f"Unregistered skill: {skill.metadata.name}")

    @staticmethod
    def _discard_from_index(index: Dict[str, Set[str]], keys, skill_id: str):
        """Remove *skill_id* from ``index[key]`` for each key, dropping emptied keys."""
        for key in keys:
            ids = index.get(key)
            if ids is None:
                continue
            ids.discard(skill_id)
            if not ids:
                del index[key]

    def find_skill(self, skill_id: str) -> Optional["Skill"]:
        """Return the skill registered under *skill_id*, or None."""
        return self.skills.get(skill_id)

    def find_skill_by_name(self, name: str) -> Optional["Skill"]:
        """Return the first registered skill named *name*, or None."""
        for skill in self.skills.values():
            if skill.metadata.name == name:
                return skill
        return None

    def search_skills(self, query: str) -> List["Skill"]:
        """Full-text search; for each matching name, return its first registered skill."""
        # Build the name lookup once instead of a linear scan per hit.
        first_by_name: Dict[str, "Skill"] = {}
        for skill in self.skills.values():
            first_by_name.setdefault(skill.metadata.name, skill)
        return [first_by_name[name] for name in self.search_engine.search(query)
                if name in first_by_name]

    def get_skills_by_category(self, category: str) -> List["Skill"]:
        """Return skills tagged with *category* (empty list if unknown)."""
        skill_ids = self.category_index.get(category, set())
        return [self.skills[skill_id] for skill_id in skill_ids if skill_id in self.skills]

    def get_skills_by_tag(self, tag: str) -> List["Skill"]:
        """Return skills carrying *tag* (empty list if unknown)."""
        skill_ids = self.tag_index.get(tag, set())
        return [self.skills[skill_id] for skill_id in skill_ids if skill_id in self.skills]

    def get_all_categories(self) -> Set[str]:
        """Return a copy of all category names currently indexed."""
        return set(self.category_index.keys())

    def get_all_tags(self) -> Set[str]:
        """Return a copy of all tag names currently indexed."""
        return set(self.tag_index.keys())

    def get_skill_count(self) -> int:
        """Return the number of registered skills."""
        return len(self.skills)

    def _generate_skill_id(self, skill: "Skill") -> str:
        """Derive a 16-hex-char ID from name, version and source path."""
        content = f"{skill.metadata.name}:{skill.metadata.version}:{skill.skill_path}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]
class SkillRegistryManager:
    """Higher-level operations over a SkillRegistry: export, import, reporting."""

    def __init__(self, registry: "SkillRegistry"):
        self.registry = registry

    def export_registry(self, export_path: str):
        """Write registry metadata (not skill bodies) to *export_path* as UTF-8 JSON."""
        export_data = {
            'exported_at': datetime.now().isoformat(),
            'skill_count': self.registry.get_skill_count(),
            'skills': []
        }
        for skill_id, skill in self.registry.skills.items():
            meta = skill.metadata
            export_data['skills'].append({
                'id': skill_id,
                'name': meta.name,
                'version': meta.version,
                'description': meta.description,
                'categories': meta.categories,
                'tags': meta.tags,
                'skill_path': skill.skill_path,
                'metadata': {
                    'author': meta.author,
                    'created_at': meta.created_at,
                    'updated_at': meta.updated_at,
                    'dependencies': meta.dependencies,
                    'required_permissions': meta.required_permissions
                }
            })
        # Explicit encoding: descriptions may contain non-ASCII text and the
        # platform default encoding is not always UTF-8.
        with open(export_path, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)
        logger.info(f"Exported registry to {export_path}")

    def import_registry(self, import_path: str) -> int:
        """Re-register the skills listed in an export made by :meth:`export_registry`.

        Exports hold metadata only. Each entry's ``skill_path`` is therefore
        re-parsed from disk through the registry's discovery loader. Entries
        whose file is gone or fails to parse are logged and skipped.

        Returns:
            Number of skills registered; 0 if the file is missing or unreadable.
        """
        if not os.path.exists(import_path):
            logger.error(f"Import file not found: {import_path}")
            return 0
        try:
            with open(import_path, 'r', encoding='utf-8') as f:
                import_data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logger.error(f"Failed to read registry export {import_path}: {e}")
            return 0
        imported = 0
        for entry in import_data.get('skills', []):
            skill_path = entry.get('skill_path')
            if not skill_path or not os.path.exists(skill_path):
                logger.warning(f"Skipping skill {entry.get('name')!r}: source file not found ({skill_path})")
                continue
            try:
                skill = self.registry.discovery.load_skill(skill_path)
            except Exception as e:  # parser errors are project-specific
                logger.error(f"Failed to load skill from {skill_path}: {e}")
                continue
            self.registry.register_skill(skill)
            imported += 1
        logger.info(f"Imported registry from {import_path}")
        return imported

    def generate_report(self) -> Dict[str, Any]:
        """Summarize skill counts overall, per category, per tag and per author."""
        return {
            'total_skills': self.registry.get_skill_count(),
            'categories': {
                category: len(skill_ids)
                for category, skill_ids in self.registry.category_index.items()
            },
            'tags': {
                tag: len(skill_ids)
                for tag, skill_ids in self.registry.tag_index.items()
            },
            'skills_by_author': self._group_skills_by_author()
        }

    def _group_skills_by_author(self) -> Dict[str, int]:
        """Count skills per author; a missing author is reported as 'Unknown'."""
        authors: Dict[str, int] = {}
        for skill in self.registry.skills.values():
            author = skill.metadata.author or 'Unknown'
            authors[author] = authors.get(author, 0) + 1
        return authors
# Lazily created, process-wide registry shared through get_registry().
_registry_instance = None
def get_registry() -> SkillRegistry:
    """Return the shared SkillRegistry, constructing it on first call."""
    global _registry_instance
    if _registry_instance is not None:
        return _registry_instance
    _registry_instance = SkillRegistry()
    return _registry_instance
2. 技能模型扩展
我们需要扩展之前定义的技能元数据模型(SkillMetadata),以包含分类和标签信息:
# skill_models.py
from typing import List, Dict, Optional
from pydantic import BaseModel, Field
class SkillMetadata(BaseModel):
    """Skill metadata extended with classification and requirement fields."""
    name: str
    version: str
    description: str
    author: Optional[str] = None
    categories: List[str] = Field(default_factory=list)
    tags: List[str] = Field(default_factory=list)
    created_at: Optional[str] = None
    updated_at: Optional[str] = None
    # Tool name -> version constraint, e.g. {"git": ">=2.25"} (see example below).
    dependencies: Dict[str, str] = Field(default_factory=dict)
    required_permissions: List[str] = Field(default_factory=list)

    # Pydantic v2 configuration. `json_schema_extra` is a v2 key, and v2
    # deprecates the nested `class Config` in favour of `model_config`.
    # ConfigDict is a TypedDict, so a plain dict is equivalent at runtime.
    model_config = {
        "json_schema_extra": {
            "example": {
                "name": "git-release",
                "version": "1.0.0",
                "description": "Automates semantic versioning and release process",
                "author": "OpenCode Team",
                "categories": ["git", "automation"],
                "tags": ["release", "versioning", "ci-cd"],
                "dependencies": {
                    "git": ">=2.25",
                    "gh": ">=2.0"
                },
                "required_permissions": ["file.read", "file.write", "network"]
            }
        }
    }
3. 配置文件支持
我们可以通过配置文件来指定技能搜索路径和其他设置:
# opencode_skills_config.yaml
skill_registry:
  # Skill search paths.
  # NOTE(review): "~" is not expanded by the filesystem; the loader must call
  # os.path.expanduser() on these entries. Relative paths resolve against the
  # process working directory.
  search_paths:
    - ~/.opencode/skills
    - ~/.claude/skills
    - ./skills
    - ./opencode/skills
  # Auto-discovery interval in seconds (0 disables auto-discovery).
  # NOTE(review): only search_paths is read by the usage example; the keys
  # below are not consumed by any visible code — TODO wire them up.
  auto_discovery_interval: 300
  # Index settings
  index:
    enable_fulltext_search: true
    max_index_size: 10000
  # Category system
  categories:
    predefined:
      - git
      - docker
      - kubernetes
      - aws
      - azure
      - gcp
      - testing
      - deployment
      - monitoring
    allow_custom: true
  # Tag system
  tags:
    max_per_skill: 10
    blacklist: [ "deprecated", "experimental" ]
4. 自动发现与热重载
为了实现技能的自动发现和热重载,我们可以创建一个监控服务:
# skill_watcher.py
import time
import threading
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class SkillFileEventHandler(FileSystemEventHandler):
    """watchdog handler that keeps a SkillRegistry in sync with skill files on disk."""

    # File-name patterns recognized as skill definitions (matched from the right).
    SKILL_FILE_PATTERNS = ('*.skill.yaml', '*.skill.yml', 'skill.yaml', 'skill.yml')

    def __init__(self, registry: "SkillRegistry"):
        super().__init__()
        self.registry = registry
        self.parser = SkillDSLParser()

    def on_created(self, event):
        if self._is_relevant(event):
            self._load_and_register_skill(event.src_path)

    def on_modified(self, event):
        if self._is_relevant(event):
            self._load_and_register_skill(event.src_path)

    def on_deleted(self, event):
        if self._is_relevant(event):
            self._unregister_skill_by_path(event.src_path)

    def on_moved(self, event):
        """Handle renames.

        Many editors save by writing a temp file and renaming it over the
        target. That arrives only as a move event, so without this handler
        such edits would never reach the registry.
        """
        if event.is_directory:
            return
        if self._is_skill_file(event.src_path):
            self._unregister_skill_by_path(event.src_path)
        dest_path = getattr(event, 'dest_path', None)
        if dest_path and self._is_skill_file(dest_path):
            self._load_and_register_skill(dest_path)

    def _is_relevant(self, event) -> bool:
        """True for file (not directory) events on a skill definition file."""
        return not event.is_directory and self._is_skill_file(event.src_path)

    def _is_skill_file(self, path: str) -> bool:
        """Return True when *path* names a skill definition file."""
        path_obj = Path(path)
        return any(path_obj.match(pattern) for pattern in self.SKILL_FILE_PATTERNS)

    def _load_and_register_skill(self, skill_path: str):
        """Parse *skill_path* and (re)register it, replacing what this file registered before."""
        try:
            skill = self.parser.parse_file(skill_path)
            # Mirror SkillDiscovery.load_skill: the registry derives the skill
            # ID from skill_path, and path-based unregistration relies on it.
            skill.skill_path = skill_path
            # Drop entries previously loaded from this file. If the name or
            # version changed, the new ID differs and the old entry would
            # otherwise linger.
            self._unregister_skill_by_path(skill_path)
            self.registry.register_skill(skill)
            logger.info(f"Skill updated from file: {skill_path}")
        except Exception as e:
            logger.error(f"Failed to load skill from {skill_path}: {e}")

    def _unregister_skill_by_path(self, skill_path: str):
        """Unregister every skill whose recorded source path is *skill_path*."""
        # Normalize both sides: watchdog reports absolute paths, while
        # discovery may have recorded relative ones such as ./skills/x.skill.yaml.
        target = os.path.abspath(skill_path)
        stale_ids = [
            skill_id for skill_id, skill in self.registry.skills.items()
            if getattr(skill, 'skill_path', None)
            and os.path.abspath(skill.skill_path) == target
        ]
        # Collected first because unregister_skill mutates registry.skills.
        for skill_id in stale_ids:
            self.registry.unregister_skill(skill_id)
class SkillWatcher:
    """Run a watchdog Observer over skill directories; usable as a context manager."""

    def __init__(self, registry: "SkillRegistry", watch_paths: List[str]):
        self.registry = registry
        self.watch_paths = watch_paths
        self.observer = Observer()
        self.event_handler = SkillFileEventHandler(registry)
        self.is_running = False

    def start(self):
        """Schedule every existing watch path (recursively) and start observing.

        A no-op if already running. Paths that do not exist are logged and skipped.
        """
        if self.is_running:
            return
        for path in self.watch_paths:
            # Config paths such as ~/.opencode/skills need expansion.
            expanded = os.path.expanduser(path)
            if os.path.exists(expanded):
                self.observer.schedule(self.event_handler, expanded, recursive=True)
            else:
                logger.warning(f"Watch path does not exist, skipping: {path}")
        self.observer.start()
        self.is_running = True
        logger.info("Skill watcher started")

    def stop(self):
        """Stop observing and wait for the observer thread to exit."""
        if not self.is_running:
            return
        self.observer.stop()
        self.observer.join()
        # The Observer is a thread and cannot be started twice; prepare a
        # fresh one so a later start() works.
        self.observer = Observer()
        self.is_running = False
        logger.info("Skill watcher stopped")

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None lets any exception from the with-body propagate.
        self.stop()
5. 技能索引持久化
为了提高性能,我们可以将技能索引持久化到磁盘:
# skill_index_persistence.py
import pickle
import json
from datetime import datetime
from pathlib import Path
class PersistentSkillRegistry(SkillRegistry):
    """SkillRegistry that saves and restores which skill files were registered.

    Only metadata and source paths are persisted. :meth:`load` re-parses the
    skill files rather than trusting the saved copy.
    """

    STATE_FILENAME = 'registry_state.json'

    def __init__(self, persistence_path: str = "~/.opencode/registry"):
        super().__init__()
        self.persistence_path = Path(persistence_path).expanduser()
        self.persistence_path.mkdir(parents=True, exist_ok=True)

    def save(self):
        """Write the registry state to ``<persistence_path>/registry_state.json``."""
        state = {
            'skills': {},
            # The indexes hold sets, which json cannot encode; store sorted
            # lists so the file is valid JSON and stable across runs.
            'category_index': {key: sorted(ids) for key, ids in self.category_index.items()},
            'tag_index': {key: sorted(ids) for key, ids in self.tag_index.items()},
            'saved_at': datetime.now().isoformat()
        }
        # Save metadata only, not the full skill objects.
        for skill_id, skill in self.skills.items():
            meta = skill.metadata
            state['skills'][skill_id] = {
                'name': meta.name,
                'version': meta.version,
                'description': meta.description,
                'categories': meta.categories,
                'tags': meta.tags,
                'skill_path': skill.skill_path,
                'metadata': {
                    'author': meta.author,
                    'created_at': meta.created_at,
                    'updated_at': meta.updated_at
                }
            }
        state_file = self.persistence_path / self.STATE_FILENAME
        tmp_file = state_file.with_name(state_file.name + '.tmp')
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        # Rename into place so a crash mid-write never leaves a truncated state file.
        tmp_file.replace(state_file)
        logger.info(f"Registry state saved to {state_file}")

    def load(self):
        """Re-register every skill whose saved source file still exists.

        A missing state file is a no-op. An unreadable or corrupt one is
        logged and ignored.
        """
        state_file = self.persistence_path / self.STATE_FILENAME
        if not state_file.exists():
            logger.info("No saved registry state found")
            return
        try:
            with open(state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logger.error(f"Failed to read registry state {state_file}: {e}")
            return
        # Use only the saved source paths; skills are re-parsed from disk.
        skill_paths = {data.get('skill_path') for data in state.get('skills', {}).values()}
        for skill_path in sorted(p for p in skill_paths if p and os.path.exists(p)):
            try:
                skill = self.discovery.load_skill(skill_path)
                self.register_skill(skill)
            except Exception as e:
                logger.error(f"Failed to load skill from {skill_path}: {e}")
        logger.info(f"Registry state loaded from {state_file}")
三、使用示例
1. 初始化注册表
# Initialize the registry
registry = SkillRegistry()
# Read the search paths from the config file.
# NOTE(review): load_config is not defined anywhere in this document; it is
# assumed to parse the YAML config shown earlier into a dict (e.g. via
# yaml.safe_load) — TODO provide it.
config = load_config('opencode_skills_config.yaml')
search_paths = config['skill_registry']['search_paths']
# Discover and register skills
registry.discover_skills(search_paths)
# Get a registry manager
manager = SkillRegistryManager(registry)
# Generate a report
report = manager.generate_report()
print(f"Total skills: {report['total_skills']}")
print(f"Categories: {report['categories']}")
2. 搜索和使用技能
# Look up skills by category
git_skills = registry.get_skills_by_category('git')
for skill in git_skills:
    print(f"Git skill: {skill.metadata.name} - {skill.metadata.description}")
# Look up skills by tag
release_skills = registry.get_skills_by_tag('release')
for skill in release_skills:
    print(f"Release skill: {skill.metadata.name}")
# Full-text search (OR over the query terms)
search_results = registry.search_skills('automated release')
for skill in search_results:
    print(f"Found: {skill.metadata.name}")
# Execute a skill. find_skill_by_name returns the first match only.
# NOTE(review): ExecutionContext and Skill.execute are not defined in this
# document; their signatures are assumed — TODO confirm against skill_parser.
skill = registry.find_skill_by_name('git-release')
if skill:
    context = ExecutionContext(project_path='./my-project')
    result = skill.execute(context)
    print(f"Execution result: {result}")
3. 监控技能文件变化
# Start watching skill files; the context manager stops the watcher on exit.
# NOTE(review): relies on `time` being imported (skill_watcher.py imports it).
with SkillWatcher(registry, search_paths):
    # While watching, any change to a skill file updates the registry automatically
    time.sleep(3600)  # run for one hour
4. 持久化注册表状态
# Use the persistent registry
persistent_registry = PersistentSkillRegistry()
# Restore previously saved state (re-parses the saved skill files)
persistent_registry.load()
# Discover new skills
persistent_registry.discover_skills(search_paths)
# Save the current state
persistent_registry.save()
四、性能优化
1. 索引优化
class OptimizedSearchEngine(SearchEngine):
    """SearchEngine variant that ranks results by TF-IDF.

    ``skill_vectors`` maps skill name -> {term: normalized term frequency}.
    Its size is the document count N used for IDF. ``tfidf_index`` maps
    term -> {'df': document frequency, 'skills': {skill name: tf}}. The simple
    inverted index of the base class is maintained alongside for
    ``use_tfidf=False`` lookups.
    """

    def __init__(self):
        super().__init__()
        self.skill_vectors: Dict[str, Dict[str, float]] = {}
        self.tfidf_index: Dict[str, Dict[str, Any]] = {}

    def index_skill(self, skill: "Skill", skill_path: str):
        """Index *skill* for TF-IDF ranking and in the base inverted index."""
        name = skill.metadata.name
        # Re-indexing the same skill must not double-count document frequencies.
        self._remove_tfidf(name)
        term_freqs = self._extract_terms_with_frequency(skill)
        self.skill_vectors[name] = term_freqs
        for term, tf in term_freqs.items():
            entry = self.tfidf_index.setdefault(term, {'df': 0, 'skills': {}})
            entry['skills'][name] = tf
            entry['df'] = len(entry['skills'])
        super().index_skill(skill, skill_path)

    def remove_skill(self, skill_name: str):
        """Remove *skill_name* from both the TF-IDF and the base index."""
        self._remove_tfidf(skill_name)
        super().remove_skill(skill_name)

    def search(self, query: str, use_tfidf: bool = True) -> List[str]:
        """Return matching skill names, best TF-IDF score first.

        Ties are broken by name for deterministic output. With
        ``use_tfidf=False``, falls back to the unranked base search.
        """
        if not use_tfidf:
            return super().search(query)
        import math
        total_skills = len(self.skill_vectors)
        if total_skills == 0:
            return []
        skill_scores: Dict[str, float] = {}
        for term in self._extract_terms_from_query(query):
            entry = self.tfidf_index.get(term)
            if not entry:
                continue
            # Smoothed IDF (always > 0): unlike log(N / (1 + df)) it never
            # hits log(0) or goes negative when a term is in most documents.
            idf = math.log((1 + total_skills) / (1 + entry['df'])) + 1
            for skill_name, tf in entry['skills'].items():
                skill_scores[skill_name] = skill_scores.get(skill_name, 0.0) + tf * idf
        ranked = sorted(skill_scores.items(), key=lambda item: (-item[1], item[0]))
        return [skill_name for skill_name, _ in ranked]

    def _extract_terms_with_frequency(self, skill: "Skill") -> Dict[str, float]:
        """Return {term: count / total terms} over name, description, tags and categories.

        Text is normalized with the same routine used for queries, so every
        indexed term is reachable by an identical query term.
        """
        from collections import Counter
        meta = skill.metadata
        parts = [meta.name, meta.description or '', *meta.tags, *meta.categories]
        counts = Counter(term for part in parts for term in self._extract_terms_from_query(part))
        total = sum(counts.values())
        return {term: count / total for term, count in counts.items()} if total else {}

    def _remove_tfidf(self, skill_name: str):
        """Drop *skill_name*'s contribution from the TF-IDF structures."""
        old_vector = self.skill_vectors.pop(skill_name, None)
        if not old_vector:
            return
        for term in old_vector:
            entry = self.tfidf_index.get(term)
            if entry is None:
                continue
            entry['skills'].pop(skill_name, None)
            if entry['skills']:
                entry['df'] = len(entry['skills'])
            else:
                del self.tfidf_index[term]
2. 缓存优化
class CachedSkillRegistry(SkillRegistry):
    """SkillRegistry with a bounded LRU cache in front of :meth:`find_skill`.

    Entries are evicted when their skill is unregistered. That includes
    re-registration, because register_skill routes updates through
    unregister_skill, so the cache never serves a removed or outdated skill.
    """

    def __init__(self, cache_size: int = 1000):
        from collections import OrderedDict
        super().__init__()
        # Insertion order doubles as recency order: most recently used last.
        self.cache: "OrderedDict[str, Skill]" = OrderedDict()
        self.cache_size = cache_size

    def find_skill(self, skill_id: str) -> Optional["Skill"]:
        """Find a skill by ID, serving and refreshing the LRU cache."""
        if skill_id in self.cache:
            self.cache.move_to_end(skill_id)
            return self.cache[skill_id]
        skill = super().find_skill(skill_id)
        if skill:
            self._add_to_cache(skill_id, skill)
        return skill

    def unregister_skill(self, skill_id: str):
        """Evict *skill_id* from the cache, then unregister it."""
        self.cache.pop(skill_id, None)
        super().unregister_skill(skill_id)

    def _add_to_cache(self, skill_id: str, skill: "Skill"):
        """Insert *skill*, evicting least-recently-used entries beyond cache_size.

        A non-positive cache_size disables caching instead of failing on an
        empty eviction.
        """
        if self.cache_size <= 0:
            return
        while len(self.cache) >= self.cache_size:
            self.cache.popitem(last=False)  # O(1) LRU eviction
        self.cache[skill_id] = skill
五、测试方案
1. 单元测试
# test_skill_registry.py
import pytest
import tempfile
import shutil
from pathlib import Path
from skill_registry import SkillRegistry, SkillDiscovery
class TestSkillRegistry:
    """Integration tests for discovery, category indexing and search.

    NOTE(review): the YAML fixtures below appear to have lost their nesting
    (fields sit at top level instead of under `metadata:`); verify them
    against the schema SkillDSLParser expects.
    """
    def setup_method(self):
        # Fresh registry and scratch directory for every test.
        self.registry = SkillRegistry()
        self.temp_dir = tempfile.mkdtemp()
    def teardown_method(self):
        shutil.rmtree(self.temp_dir)
    def test_register_and_find_skill(self):
        """A discovered skill file can be found by name."""
        # Create a test skill file
        skill_content = """---
metadata:
name: test-skill
version: 1.0.0
description: A test skill
categories: [test]
tags: [unit-test]
steps:
- name: step1
type: command
parameters:
command: echo
args: ["Hello"]
---
"""
        skill_file = Path(self.temp_dir) / "test.skill.yaml"
        skill_file.write_text(skill_content)
        # Discover and register skills
        self.registry.discover_skills([str(self.temp_dir)])
        # Look the skill up
        skill = self.registry.find_skill_by_name('test-skill')
        assert skill is not None
        assert skill.metadata.name == 'test-skill'
        assert 'test' in skill.metadata.categories
    def test_category_index(self):
        """Skills are grouped by every category they declare."""
        # Create several test skills; i % 2 splits them into category-0 (i=0,2) and category-1 (i=1)
        for i in range(3):
            skill_content = f"""---
metadata:
name: test-skill-{i}
version: 1.0.0
description: Test skill {i}
categories: [test, category-{i % 2}]
steps: []
---
"""
            skill_file = Path(self.temp_dir) / f"test{i}.skill.yaml"
            skill_file.write_text(skill_content)
        # Discover and register skills
        self.registry.discover_skills([str(self.temp_dir)])
        # Check the category index
        test_skills = self.registry.get_skills_by_category('test')
        assert len(test_skills) == 3
        category0_skills = self.registry.get_skills_by_category('category-0')
        assert len(category0_skills) == 2
    def test_search_skills(self):
        """Full-text search matches words from the description."""
        skill_content = """---
metadata:
name: git-release-advanced
version: 2.0.0
description: Advanced git release automation with changelog generation
categories: [git, automation]
tags: [release, versioning, ci-cd]
steps: []
---
"""
        skill_file = Path(self.temp_dir) / "git.skill.yaml"
        skill_file.write_text(skill_content)
        self.registry.discover_skills([str(self.temp_dir)])
        # Search check
        results = self.registry.search_skills('release automation')
        assert len(results) == 1
        assert results[0].metadata.name == 'git-release-advanced'
六、部署与配置
1. Docker容器化
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the code
COPY skill_registry/ ./skill_registry/
COPY skill_parser/ ./skill_parser/
# Create the skill directory
RUN mkdir -p /skills
# Environment variables
# NOTE(review): these only take effect if start_registry.py (not shown) reads them.
ENV SKILL_PATHS="/skills"
ENV REGISTRY_PERSISTENCE_PATH="/data/registry"
# Startup script
COPY start_registry.py .
CMD ["python", "start_registry.py"]
2. Kubernetes部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opencode-skill-registry
spec:
  # NOTE(review): all replicas mount the same registry-data PVC. If each one
  # persists registry_state.json there, their writes can race. A
  # ReadWriteOnce claim also cannot attach to pods on different nodes —
  # confirm the access mode.
  replicas: 3
  selector:
    matchLabels:
      app: skill-registry
  template:
    metadata:
      labels:
        app: skill-registry
    spec:
      containers:
        - name: registry
          # NOTE(review): pin a version tag instead of :latest for reproducible rollouts.
          image: opencode/skill-registry:latest
          ports:
            - containerPort: 8080
          env:
            # Colon-separated list; presumably split on ":" by start_registry.py — TODO confirm.
            - name: SKILL_PATHS
              value: "/skills:/shared-skills"
            - name: REGISTRY_PERSISTENCE_PATH
              value: "/data/registry"
          volumeMounts:
            - name: skill-storage
              mountPath: /skills
              readOnly: true
            - name: shared-skill-storage
              mountPath: /shared-skills
              readOnly: true
            - name: registry-data
              mountPath: /data/registry
      volumes:
        # ConfigMap keys are projected as flat files, so built-in skills must use
        # key names matching *.skill.yaml / skill.yaml to be discovered.
        - name: skill-storage
          configMap:
            name: builtin-skills
        - name: shared-skill-storage
          persistentVolumeClaim:
            claimName: shared-skills-pvc
        - name: registry-data
          persistentVolumeClaim:
            claimName: registry-data-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: skill-registry-service
spec:
  selector:
    app: skill-registry
  ports:
    - port: 8080
      targetPort: 8080
  # LoadBalancer exposes the service outside the cluster; use ClusterIP if only in-cluster clients need it.
  type: LoadBalancer
概述
这个技能注册表方案提供了完整的技能发现、索引和分类功能,具有以下特点:
- 灵活的发现机制:支持多个搜索路径,自动扫描技能文件
- 强大的索引系统:包括分类索引、标签索引和全文搜索
- 实时更新:通过文件监控实现技能热重载
- 高性能:支持缓存和持久化,TF-IDF搜索优化
- 可扩展性:模块化设计,易于添加新的索引策略或存储后端
- 生产就绪:支持容器化部署,配置灵活
这个方案可以满足OpenCode技能系统的基础需求,并为未来的扩展奠定了基础。
下面给出第二版、更完整的设计方案,在上述基础上增加了线程安全、异步发现以及带过滤和分页的搜索功能:
OpenCode技能系统基础:技能注册表详细方案
我将为你提供一个完整的技能注册表设计方案,涵盖技能发现、索引构建和分类标签系统。
一、系统架构设计:UML建模
1. 核心类图设计
2. 技能发现与注册序列图
3. 搜索查询序列图
二、核心实现方案
1. 技能注册表核心实现
# registry/core.py
import json
import hashlib
import threading
import asyncio
from datetime import datetime
from typing import Dict, List, Set, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
class SkillStatus(Enum):
    """Lifecycle status of a registered skill."""
    DISABLED = "disabled"
    # default status of a newly created RegisteredSkill
    ENABLED = "enabled"
    DEPRECATED = "deprecated"
    # NOTE(review): no visible code assigns this status
    EXPERIMENTAL = "experimental"
class SkillSource(Enum):
    """Where a registered skill came from."""
    # default source_type used by SkillRegistry.register
    LOCAL = "local"
    GIT = "git"
    REGISTRY = "registry"
    USER = "user"
@dataclass
class RegisteredSkill:
    """A skill together with the bookkeeping the registry keeps for it."""
    id: str
    skill: 'Skill'  # imported from skill_models
    # Effective metadata; the categories/tags below are derived from it.
    metadata: 'SkillMetadata'
    source_path: str
    source_type: SkillSource
    registered_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)
    status: SkillStatus = SkillStatus.ENABLED
    usage_stats: Dict[str, Any] = field(default_factory=dict)
    # Index fields
    categories: Set[str] = field(default_factory=set)
    tags: Set[str] = field(default_factory=set)

    def enable(self):
        """Mark the skill as enabled."""
        self.status = SkillStatus.ENABLED

    def disable(self):
        """Mark the skill as disabled."""
        self.status = SkillStatus.DISABLED

    def mark_deprecated(self):
        """Mark the skill as deprecated."""
        self.status = SkillStatus.DEPRECATED

    def update(self, skill: 'Skill', metadata: Optional['SkillMetadata'] = None):
        """Replace the skill and resynchronize the derived index fields.

        As in SkillRegistry.register, explicit *metadata* wins; otherwise the
        new skill's own metadata becomes effective. categories/tags are
        rebuilt from it, so they never keep describing the previous version.
        """
        self.skill = skill
        self.metadata = metadata if metadata is not None else skill.metadata
        self.categories = set(self.metadata.categories)
        self.tags = set(self.metadata.tags)
        self.last_updated = datetime.now()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dict (datetimes as ISO strings, enums as values)."""
        return {
            'id': self.id,
            'name': self.skill.metadata.name,
            'version': self.skill.metadata.version,
            'description': self.skill.metadata.description,
            'source_path': self.source_path,
            'source_type': self.source_type.value,
            'registered_at': self.registered_at.isoformat(),
            'last_updated': self.last_updated.isoformat(),
            'status': self.status.value,
            'usage_stats': self.usage_stats,
            'categories': list(self.categories),
            'tags': list(self.tags)
        }
class SkillRegistry:
"""技能注册表"""
def __init__(self,
             storage_path: Optional[Path] = None,
             max_workers: int = 4):
    """
    Initialize the skill registry.

    Args:
        storage_path: Directory for persisted registry data. A ``str`` is
            also accepted and ``~`` is expanded. Defaults to
            ``~/.opencode/registry``. The directory is created if missing.
        max_workers: Size of the registry's thread pool.
    """
    # Coerce to Path so callers passing a plain string (e.g. from an
    # environment variable) do not fail on .mkdir().
    self.storage_path = (Path(storage_path).expanduser() if storage_path
                         else Path.home() / '.opencode' / 'registry')
    self.storage_path.mkdir(parents=True, exist_ok=True)
    # Core storage
    self._skills: Dict[str, RegisteredSkill] = {}
    self._name_index: Dict[str, Set[str]] = {}  # name -> set of skill IDs
    # Re-entrant: public methods such as refresh() call other locked methods.
    self._lock = threading.RLock()
    # Index system
    self.category_index = CategoryIndex()
    self.tag_index = TagIndex()
    self.fulltext_index = FullTextIndex()
    # Discovery and loading
    self.skill_loader = SkillLoader()
    self.discovery_engine = DiscoveryEngine()
    # Configuration
    self.max_workers = max_workers
    self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
    # Search-result cache: key -> (result, timestamp)
    self._cache = {}
    self._cache_lock = threading.Lock()
    # Statistics
    self.stats = {
        'total_registered': 0,
        'last_discovery': None,
        'discovery_errors': 0
    }
    # Load persisted data
    self._load_persistent_data()
def register(self,
             skill: 'Skill',
             source_path: str,
             source_type: SkillSource = SkillSource.LOCAL,
             metadata: Optional['SkillMetadata'] = None) -> RegisteredSkill:
    """
    Register a skill, or update it if the same ID is already registered.

    Args:
        skill: Skill object to register.
        source_path: Path the skill was loaded from; part of the skill ID.
        source_type: Where the skill came from.
        metadata: Optional metadata that overrides ``skill.metadata`` for
            category/tag indexing.

    Returns:
        The RegisteredSkill stored in the registry.
    """
    with self._lock:
        skill_id = self._generate_skill_id(skill, source_path)
        if skill_id in self._skills:
            logger.info(f"Skill {skill.metadata.name} already registered, updating...")
            registered_skill = self._update_existing_skill(skill_id, skill, metadata)
        else:
            # Explicit metadata wins; otherwise index the skill's own metadata.
            skill_metadata = metadata or skill.metadata
            registered_skill = RegisteredSkill(
                id=skill_id,
                skill=skill,
                metadata=skill_metadata,
                source_path=source_path,
                source_type=source_type
            )
            registered_skill.categories = set(skill_metadata.categories)
            registered_skill.tags = set(skill_metadata.tags)
            self._skills[skill_id] = registered_skill
            # The name index always uses the skill's own name.
            self._update_name_index(skill_id, skill.metadata.name)
            self.category_index.add_skill(registered_skill)
            self.tag_index.add_skill(registered_skill)
            self.fulltext_index.index_skill(registered_skill)
            self.stats['total_registered'] += 1
            logger.info(f"Registered skill: {skill.metadata.name} v{skill.metadata.version} (id: {skill_id})")
            # NOTE: persists on every registration; discover() also saves once at the end.
            self._save_persistent_data()
    # Cached search results were computed without this skill (or with its
    # old data). Drop them so the change is visible immediately instead of
    # after the cache TTL.
    with self._cache_lock:
        self._cache.clear()
    return registered_skill
def unregister(self, skill_id: str) -> bool:
    """
    Unregister a skill and remove it from every index.

    Args:
        skill_id: ID of the skill to remove.

    Returns:
        True if the skill was registered and has been removed, else False.
    """
    with self._lock:
        if skill_id not in self._skills:
            return False
        skill = self._skills[skill_id]
        self._remove_from_name_index(skill_id, skill.skill.metadata.name)
        self.category_index.remove_skill(skill_id)
        self.tag_index.remove_skill(skill_id)
        self.fulltext_index.remove_skill(skill_id)
        del self._skills[skill_id]
        self.stats['total_registered'] = max(0, self.stats['total_registered'] - 1)
        logger.info(f"Unregistered skill: {skill.skill.metadata.name}")
        self._save_persistent_data()
    # Cached search results hold RegisteredSkill objects directly. Clear them
    # so the removed skill is not returned until the cache entry expires.
    with self._cache_lock:
        self._cache.clear()
    return True
def find_by_id(self, skill_id: str) -> Optional[RegisteredSkill]:
    """Return the registered skill with *skill_id*, or None if it is unknown."""
    with self._lock:
        found = self._skills.get(skill_id)
    return found
def find_by_name(self, name: str) -> List[RegisteredSkill]:
    """Return every registered skill indexed under *name* (looked up lower-cased)."""
    key = name.lower()
    with self._lock:
        ids = self._name_index.get(key, set())
        matches = [self._skills[sid] for sid in ids if sid in self._skills]
    return matches
def find_by_name_version(self, name: str, version: str) -> Optional[RegisteredSkill]:
    """Return the first skill named *name* whose version equals *version*, else None."""
    return next(
        (candidate for candidate in self.find_by_name(name)
         if candidate.skill.metadata.version == version),
        None,
    )
async def discover(self,
                   paths: List[str],
                   recursive: bool = True,
                   async_load: bool = True) -> 'DiscoveryResult':
    """
    Discover skill files under *paths* and register each one.

    Args:
        paths: Roots to scan.
        recursive: Whether to descend into subdirectories.
        async_load: Load items concurrently (True) or one after another.

    Returns:
        A DiscoveryResult with found/succeeded/failed counts and per-item
        errors. The registry state is persisted once at the end.
    """
    outcome = DiscoveryResult()
    self.stats['last_discovery'] = datetime.now()
    found_items = await self.discovery_engine.discover(paths, recursive=recursive)
    outcome.total_found = len(found_items)
    if not async_load:
        for found in found_items:
            await self._load_and_register_item(found, outcome)
    else:
        # Fan out one task per item; failures are recorded on `outcome`
        # by the helper, and return_exceptions keeps gather from aborting.
        pending = [asyncio.create_task(self._load_and_register_item(found, outcome))
                   for found in found_items]
        await asyncio.gather(*pending, return_exceptions=True)
    self._save_persistent_data()
    return outcome
async def _load_and_register_item(self,
                                  item: 'DiscoveryItem',
                                  result: 'DiscoveryResult') -> None:
    """Load one discovered item, register it, and record the outcome on *result*.

    Any exception is logged and counted as a failure rather than propagated.
    """
    location = item.path
    try:
        loaded = await self.skill_loader.load_async(location)
        entry = self.register(
            skill=loaded,
            source_path=location,
            source_type=self._determine_source_type(location)
        )
        result.registered.append(entry.id)
        result.successful += 1
    except Exception as exc:
        logger.error(f"Failed to load skill from {location}: {exc}")
        result.errors.append({'path': location, 'error': str(exc)})
        result.failed += 1
def search(self,
           query: str,
           categories: Optional[List[str]] = None,
           tags: Optional[List[str]] = None,
           status: Optional[SkillStatus] = None,
           limit: int = 20,
           offset: int = 0) -> 'SearchResult':
    """
    Search skills, serving recent identical queries from a 5-minute cache.

    Args:
        query: Full-text query; empty means "all skills".
        categories: Keep skills in ANY of these categories.
        tags: Keep skills carrying ANY of these tags.
        status: Keep only skills with this status.
        limit: Maximum number of hits in the page.
        offset: Index of the first hit in the page.

    Returns:
        The SearchResult for the requested page.
    """
    cache_ttl_seconds = 300
    cache_key = self._generate_cache_key({
        'query': query,
        'categories': categories,
        'tags': tags,
        'status': status,
        'limit': limit,
        'offset': offset
    })
    with self._cache_lock:
        if cache_key in self._cache:
            cached_result, timestamp = self._cache[cache_key]
            # total_seconds(), not .seconds: timedelta.seconds is only the
            # seconds *component* and ignores whole days, so an entry from
            # yesterday could otherwise look fresh.
            if (datetime.now() - timestamp).total_seconds() < cache_ttl_seconds:
                return cached_result
    search_result = self._execute_search(
        query=query,
        categories=categories,
        tags=tags,
        status=status,
        limit=limit,
        offset=offset
    )
    with self._cache_lock:
        self._cache[cache_key] = (search_result, datetime.now())
        self._clean_expired_cache()
    return search_result
def _execute_search(self, **kwargs) -> 'SearchResult':
    """Run an uncached search; keyword arguments are those of :meth:`search`.

    Pipeline:
      1. Full-text candidates, or all skills when there is no query.
      2. Status, category and tag filters.
      3. Ranking (relevance, or by name when there is no query).
      4. Pagination.
      5. Build the result with aggregations and suggestions.
    """
    query = kwargs['query']
    status = kwargs['status']
    wanted_categories = kwargs['categories']
    wanted_tags = kwargs['tags']
    offset = kwargs['offset']
    limit = kwargs['limit']

    # Snapshot under the lock: discovery or a file watcher may register or
    # unregister skills concurrently while we iterate.
    with self._lock:
        skills = dict(self._skills)

    if query:
        # Over-fetch so filtering still leaves enough hits for the page. The
        # page ends at offset + limit, so size the fetch from that; sizing it
        # from limit alone made every page after the first come back empty.
        text_hits = self.fulltext_index.search(query=query, limit=(offset + limit) * 2)
        candidate_ids = {hit.skill_id for hit in text_hits}
    else:
        candidate_ids = set(skills)

    filtered_skills = []
    for skill_id in candidate_ids:
        skill = skills.get(skill_id)
        if not skill:
            continue
        if status and skill.status != status:
            continue
        if wanted_categories and not any(cat in skill.categories for cat in wanted_categories):
            continue
        if wanted_tags and not any(tag in skill.tags for tag in wanted_tags):
            continue
        filtered_skills.append(skill)

    if query:
        # Rank by relevance. This is a simple score; a real engine would use BM25 or similar.
        filtered_skills.sort(key=lambda s: self._calculate_relevance_score(s, query), reverse=True)
    else:
        filtered_skills.sort(key=lambda s: s.skill.metadata.name.lower())

    total = len(filtered_skills)
    paged_skills = filtered_skills[offset:offset + limit]
    search_hits = [
        SearchHit(
            skill_id=skill.id,
            skill=skill,
            # Rank-based score over the *whole* result list, so scores keep
            # decreasing across pages instead of restarting at 1.0 per page.
            score=(total - (offset + i)) / total
        )
        for i, skill in enumerate(paged_skills)
    ]
    return SearchResult(
        hits=search_hits,
        total_hits=total,
        aggregations=self._build_aggregations(filtered_skills),
        suggestions=self._generate_suggestions(query, filtered_skills)
    )
def get_by_category(self, category: str) -> List[RegisteredSkill]:
    """Return registered skills indexed under *category*; stale IDs are skipped."""
    registered = self._skills
    member_ids = self.category_index.get_skills(category)
    return [registered[sid] for sid in member_ids if sid in registered]
def get_by_tag(self, tag: str) -> List[RegisteredSkill]:
    """Return registered skills indexed under *tag*; stale IDs are skipped."""
    registered = self._skills
    member_ids = self.tag_index.get_skills(tag)
    return [registered[sid] for sid in member_ids if sid in registered]
def get_all_categories(self) -> Set[str]:
    """Return every category name reported by the category index."""
    index = self.category_index
    return index.get_categories()
def get_all_tags(self) -> Set[str]:
    """Return every tag name reported by the tag index."""
    index = self.tag_index
    return index.get_tags()
def get_stats(self) -> Dict[str, Any]:
    """Return ``self.stats`` merged with live registry counters.

    The live counters (``total_skills``, ``categories_count``,
    ``tags_count``, ``cache_size``) override same-named keys in
    ``self.stats``.
    """
    snapshot = dict(self.stats)
    snapshot.update({
        'total_skills': len(self._skills),
        'categories_count': len(self.get_all_categories()),
        'tags_count': len(self.get_all_tags()),
        'cache_size': len(self._cache),
    })
    return snapshot
def refresh(self) -> Dict[str, Any]:
    """Reload every registered skill from its source and rebuild its indexes.

    A skill whose ``source_path`` no longer exists is unregistered and
    counted once under ``removed``. Previously it was first reloaded (and
    counted as an error), then removed. Every other skill is reloaded via
    ``self.skill_loader`` and re-indexed through ``_update_existing_skill``.
    That helper also keeps the name index in sync when a skill's name
    changed on disk, which the previous inline re-indexing did not do.

    Returns:
        Dict with the counters ``reindexed``, ``removed`` and ``errors``.
    """
    result = {
        'reindexed': 0,
        'removed': 0,
        'errors': 0
    }
    with self._lock:
        # Iterate over a snapshot: unregister() mutates self._skills.
        for skill_id, skill in list(self._skills.items()):
            # NOTE(review): non-local sources (git/registry URLs) never exist on
            # disk and are therefore unregistered here, as before; confirm intended.
            if not Path(skill.source_path).exists():
                # NOTE(review): unregister() is called while holding self._lock;
                # this requires the lock to be reentrant if unregister locks too.
                self.unregister(skill_id)
                result['removed'] += 1
                continue
            try:
                loaded_skill = self.skill_loader.load(skill.source_path)
                self._update_existing_skill(skill_id, loaded_skill, loaded_skill.metadata)
                result['reindexed'] += 1
            except Exception as e:
                logger.error(f"Failed to refresh skill {skill_id}: {e}")
                result['errors'] += 1
    return result
def export(self,
           include_skills: bool = False,
           format: str = 'json') -> 'RegistryExport':
    """Snapshot the registry into a ``RegistryExport``.

    Args:
        include_skills: When True, each entry also carries the serialized
            skill body under ``skill_content`` (needed to re-register on import).
        format: Recorded on the returned ``RegistryExport``.

    Returns:
        A ``RegistryExport`` whose data holds ``exported_at``, ``version``,
        ``stats`` and one ``to_dict()`` record per registered skill.
    """
    def build_record(registered):
        record = registered.to_dict()
        if include_skills:
            record['skill_content'] = self._serialize_skill(registered.skill)
        return record

    payload = {
        'exported_at': datetime.now().isoformat(),
        'version': '1.0.0',
        'stats': self.get_stats(),
        'skills': [build_record(registered) for registered in self._skills.values()],
    }
    return RegistryExport(data=payload, format=format)
def import_export(self, export: 'RegistryExport') -> Dict[str, Any]:
    """Register skills from a previously exported registry snapshot.

    Only entries carrying ``skill_content`` (exports made with
    ``include_skills=True``) can be re-registered. Metadata-only entries are
    counted as ``skipped``; they were previously counted as ``imported``
    although nothing was registered. Entries whose name/version is already
    registered are skipped as well.

    Returns:
        Dict with the counters ``imported``, ``skipped`` and ``errors``.
    """
    result = {
        'imported': 0,
        'skipped': 0,
        'errors': 0
    }
    # A snapshot without a 'skills' list simply imports nothing.
    for skill_data in export.data.get('skills', []):
        try:
            existing = self.find_by_name_version(
                skill_data['name'],
                skill_data.get('version', '1.0.0')
            )
            if existing or 'skill_content' not in skill_data:
                result['skipped'] += 1
                continue
            skill = self._deserialize_skill(skill_data['skill_content'])
            self.register(
                skill=skill,
                source_path=skill_data['source_path'],
                source_type=SkillSource(skill_data['source_type'])
            )
            result['imported'] += 1
        except Exception as e:
            logger.error(f"Failed to import skill {skill_data.get('name')}: {e}")
            result['errors'] += 1
    return result
# 私有方法
def _generate_skill_id(self, skill: 'Skill', source_path: str) -> str:
"""生成技能ID"""
content = f"{skill.metadata.name}:{skill.metadata.version}:{source_path}"
return f"skill_{hashlib.sha256(content.encode()).hexdigest()[:16]}"
def _update_existing_skill(self,
skill_id: str,
skill: 'Skill',
metadata: Optional['SkillMetadata']) -> RegisteredSkill:
"""更新现有技能"""
existing = self._skills[skill_id]
# 更新名称索引
if existing.skill.metadata.name != skill.metadata.name:
self._remove_from_name_index(skill_id, existing.skill.metadata.name)
self._update_name_index(skill_id, skill.metadata.name)
# 更新技能
existing.update(skill, metadata)
# 重新索引
self.category_index.remove_skill(skill_id)
self.category_index.add_skill(existing)
self.tag_index.remove_skill(skill_id)
self.tag_index.add_skill(existing)
self.fulltext_index.update_skill(existing)
return existing
def _update_name_index(self, skill_id: str, name: str):
"""更新名称索引"""
name_key = name.lower()
if name_key not in self._name_index:
self._name_index[name_key] = set()
self._name_index[name_key].add(skill_id)
def _remove_from_name_index(self, skill_id: str, name: str):
"""从名称索引中移除"""
name_key = name.lower()
if name_key in self._name_index:
self._name_index[name_key].discard(skill_id)
if not self._name_index[name_key]:
del self._name_index[name_key]
def _determine_source_type(self, path: str) -> SkillSource:
    """Classify a skill location as GIT, REGISTRY or LOCAL.

    - ``git@host:...``, ``git://...`` and ``ssh://...`` locations -> GIT
    - http(s) URLs whose *host* is github.com / gitlab.com (or a subdomain),
      or whose URL path ends in ``.git`` -> GIT
    - any other http(s) URL -> REGISTRY
    - anything else is treated as a local file-system path -> LOCAL

    The host is parsed instead of substring-searched. Previously a URL that
    merely contained "github.com" anywhere (e.g. in its path) was classified
    as GIT.
    """
    from urllib.parse import urlparse

    if path.startswith(('git@', 'git://', 'ssh://')):
        return SkillSource.GIT
    if path.startswith(('http://', 'https://')):
        parsed = urlparse(path)
        host = (parsed.hostname or '').lower()
        on_git_host = any(host == known or host.endswith('.' + known)
                          for known in ('github.com', 'gitlab.com'))
        if on_git_host or parsed.path.rstrip('/').endswith('.git'):
            return SkillSource.GIT
        return SkillSource.REGISTRY
    return SkillSource.LOCAL
def _generate_cache_key(self, params: Dict) -> str:
"""生成缓存键"""
import json
key_data = json.dumps(params, sort_keys=True)
return hashlib.md5(key_data.encode()).hexdigest()
def _clean_expired_cache(self, max_age: int = 300):
"""清理过期缓存"""
now = datetime.now()
expired_keys = []
for key, (_, timestamp) in self._cache.items():
if (now - timestamp).seconds > max_age:
expired_keys.append(key)
for key in expired_keys:
del self._cache[key]
def _calculate_relevance_score(self, skill: RegisteredSkill, query: str) -> float:
"""计算相关性得分"""
score = 0.0
# 名称匹配
if query.lower() in skill.skill.metadata.name.lower():
score += 10.0
# 描述匹配
if query.lower() in skill.skill.metadata.description.lower():
score += 5.0
# 标签匹配
for tag in skill.tags:
if query.lower() in tag.lower():
score += 3.0
# 分类匹配
for category in skill.categories:
if query.lower() in category.lower():
score += 2.0
return score
def _build_aggregations(self, skills: List[RegisteredSkill]) -> Dict[str, Any]:
"""构建聚合数据"""
aggregations = {
'categories': {},
'tags': {},
'status': {}
}
for skill in skills:
# 分类聚合
for category in skill.categories:
aggregations['categories'][category] = aggregations['categories'].get(category, 0) + 1
# 标签聚合
for tag in skill.tags:
aggregations['tags'][tag] = aggregations['tags'].get(tag, 0) + 1
# 状态聚合
status = skill.status.value
aggregations['status'][status] = aggregations['status'].get(status, 0) + 1
return aggregations
def _generate_suggestions(self, query: str, skills: List[RegisteredSkill]) -> List['Suggestion']:
"""生成搜索建议"""
suggestions = []
if not query or len(query) < 2:
return suggestions
query_lower = query.lower()
# 从技能中提取建议
seen_suggestions = set()
for skill in skills:
# 名称建议
if query_lower in skill.skill.metadata.name.lower():
suggestion = skill.skill.metadata.name
if suggestion not in seen_suggestions:
suggestions.append(Suggestion(
text=suggestion,
type='skill_name'
))
seen_suggestions.add(suggestion)
# 标签建议
for tag in skill.tags:
if query_lower in tag.lower():
if tag not in seen_suggestions:
suggestions.append(Suggestion(
text=tag,
type='tag'
))
seen_suggestions.add(tag)
# 分类建议
for category in skill.categories:
if query_lower in category.lower():
if category not in seen_suggestions:
suggestions.append(Suggestion(
text=category,
type='category'
))
seen_suggestions.add(category)
# 限制数量
return suggestions[:10]
def _save_persistent_data(self):
    """Persist registry metadata to ``<storage_path>/registry.json``.

    Writes skill metadata (``to_dict()`` of each registered skill), the name
    index, stats and a save timestamp; full skill objects are not saved.
    Name-index buckets are sets in memory and are written as sorted lists.
    With ``default=str`` they previously became strings like
    ``"{'skill_x'}"``, which cannot be turned back into sets on load.

    The JSON goes to a temporary file in the same directory that is then
    moved over ``registry.json``, so a crash mid-write cannot leave a
    truncated file. The storage directory is created if missing. Failures
    are logged, not raised (best-effort persistence).
    """
    import tempfile

    try:
        data = {
            'skills': {skill_id: skill.to_dict() for skill_id, skill in self._skills.items()},
            'name_index': {name: sorted(ids) for name, ids in self._name_index.items()},
            'stats': self.stats,
            'saved_at': datetime.now().isoformat()
        }
        storage_dir = Path(self.storage_path)
        storage_dir.mkdir(parents=True, exist_ok=True)
        file_path = storage_dir / 'registry.json'
        fd, tmp_name = tempfile.mkstemp(dir=str(storage_dir), prefix='.registry.', suffix='.tmp')
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, default=str)
            os.replace(tmp_name, file_path)
        except BaseException:
            # Do not leave a half-written temp file behind.
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise
        logger.debug(f"Registry data saved to {file_path}")
    except Exception as e:
        logger.error(f"Failed to save registry data: {e}")
def _load_persistent_data(self):
    """Restore the name index and stats from ``<storage_path>/registry.json``.

    Name-index buckets are rebuilt as sets, because the rest of the registry
    calls ``.add``/``.discard`` on them. JSON yields lists; older files hold
    the ``str()`` of a set (e.g. ``"{'skill_x'}"``), which is parsed with
    ``ast.literal_eval`` (literals only, safe on file contents). Unparseable
    buckets are skipped. A missing file is logged and ignored; other
    failures are logged, not raised.
    """
    import ast

    file_path = self.storage_path / 'registry.json'
    if not file_path.exists():
        logger.info("No persistent registry data found")
        return
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        name_index = {}
        for name, ids in (data.get('name_index') or {}).items():
            if isinstance(ids, str):
                # Legacy format written via default=str.
                try:
                    ids = ast.literal_eval(ids)
                except (ValueError, SyntaxError):
                    continue
            if isinstance(ids, (list, tuple, set, frozenset)):
                name_index[name] = set(ids)
        self._name_index = name_index
        self.stats.update(data.get('stats', {}))
        logger.info(f"Loaded registry data from {file_path}")
    except Exception as e:
        logger.error(f"Failed to load registry data: {e}")
def _serialize_skill(self, skill: 'Skill') -> str:
    """Render ``skill`` as YAML text using ``SkillDSLParser.serialize``.

    NOTE(review): imports ``parsers.skill_parser`` by absolute path, while the
    earlier skill_registry.py listing imports ``.skill_parser``; confirm the
    package layout.
    """
    from parsers.skill_parser import SkillDSLParser
    return SkillDSLParser().serialize(skill, format='yaml')
def _deserialize_skill(self, content: str) -> 'Skill':
    """Parse skill text (as produced by ``_serialize_skill``) into a Skill."""
    from parsers.skill_parser import SkillDSLParser
    return SkillDSLParser().parse(content)
2. 分类索引系统实现
# registry/indexes/category_index.py
import re
from typing import Dict, Set, List, Optional
from collections import defaultdict
import unicodedata
class CategoryHierarchy:
    """Parent/child links between categories (a forest of category trees).

    All traversals are cycle-safe, and links that would create a cycle are
    refused. Previously a cycle made ``get_ancestors``/``normalize_path``
    loop forever and ``get_descendants`` recurse until RecursionError.
    """

    def __init__(self):
        # child category -> parent category (None for a top-level category)
        self.parents: Dict[str, Optional[str]] = {}
        # parent category -> set of direct child categories
        self.children: Dict[str, Set[str]] = defaultdict(set)

    def add_category(self, category: str, parent: Optional[str] = None):
        """Register ``category``, optionally as a child of ``parent``.

        Re-parenting detaches ``category`` from its previous parent. Calling
        without ``parent`` never demotes an existing child to top level. A
        link that would make ``category`` its own ancestor is ignored.
        """
        if parent:
            if parent == category or category in self.get_ancestors(parent):
                return
            old_parent = self.parents.get(category)
            if old_parent and old_parent != parent:
                siblings = self.children.get(old_parent)
                if siblings is not None:
                    siblings.discard(category)
                    if not siblings:
                        del self.children[old_parent]
            self.parents[category] = parent
            self.children[parent].add(category)
        elif category not in self.parents:
            # Top-level category.
            self.parents[category] = None

    def get_parent(self, category: str) -> Optional[str]:
        """Return the direct parent of ``category`` (None if top-level/unknown)."""
        return self.parents.get(category)

    def get_children(self, category: str) -> Set[str]:
        """Return a copy of the direct children of ``category``."""
        return set(self.children.get(category, ()))

    def get_ancestors(self, category: str) -> List[str]:
        """Return ancestors of ``category``, nearest first."""
        ancestors = []
        seen = {category}
        current = self.parents.get(category)
        while current and current not in seen:
            ancestors.append(current)
            seen.add(current)
            current = self.parents.get(current)
        return ancestors

    def get_descendants(self, category: str) -> Set[str]:
        """Return every category below ``category`` (iterative DFS)."""
        descendants = set()
        stack = [category]
        while stack:
            for child in self.children.get(stack.pop(), ()):
                if child != category and child not in descendants:
                    descendants.add(child)
                    stack.append(child)
        return descendants

    def normalize_path(self, category: str) -> str:
        """Return the root-to-``category`` path joined with ``'::'``."""
        parts = []
        seen = set()
        current = category
        while current and current not in seen:
            seen.add(current)
            parts.append(current)
            current = self.get_parent(current)
        return '::'.join(reversed(parts))
class CategoryIndex:
    """Inverted index from (hierarchical) category names to skill IDs.

    Names are normalized before indexing, and ``'::'`` separates hierarchy
    levels (``'devops::ci'``). A skill filed under a nested category is also
    indexed under every ancestor, so looking up ``'devops'`` finds skills
    filed under ``'devops::ci'``.
    """

    def __init__(self):
        # normalized category -> IDs of skills filed under it (or a descendant)
        self.index: Dict[str, Set[str]] = defaultdict(set)
        self.hierarchy = CategoryHierarchy()
        # normalized category -> original spelling (only when they differ)
        self._normalized_names: Dict[str, str] = {}

    def add_skill(self, skill: 'RegisteredSkill'):
        """Index ``skill.id`` under each of ``skill.categories`` and their ancestors."""
        skill_id = skill.id
        for category in skill.categories:
            normalized = self._normalize_category_name(category)
            if not normalized:
                # Nothing indexable left; never file skills under ''.
                continue
            # Remember display spellings of real category names only, so
            # lookups cannot overwrite them with query spellings.
            if normalized != category:
                self._normalized_names[normalized] = category
            parts = normalized.split('::')
            for depth in range(1, len(parts)):
                self.hierarchy.add_category('::'.join(parts[:depth + 1]),
                                            '::'.join(parts[:depth]))
            self.index[normalized].add(skill_id)
            for ancestor in self.hierarchy.get_ancestors(normalized):
                self.index[ancestor].add(skill_id)

    def remove_skill(self, skill_id: str):
        """Remove ``skill_id`` from every category; drop categories left empty."""
        emptied = []
        for category, skill_ids in self.index.items():
            if skill_id in skill_ids:
                skill_ids.discard(skill_id)
                if not skill_ids:
                    emptied.append(category)
        for category in emptied:
            del self.index[category]

    def get_skills(self, category: str) -> Set[str]:
        """Return IDs of skills under ``category`` or any of its subcategories."""
        normalized = self._normalize_category_name(category)
        found = set(self.index.get(normalized, ()))
        for descendant in self.hierarchy.get_descendants(normalized):
            found.update(self.index.get(descendant, ()))
        return found

    def get_categories(self) -> Set[str]:
        """Return all indexed categories, in their original spelling when known."""
        return {self._denormalize_category_name(name) for name in self.index}

    def get_subcategories(self, category: str) -> Set[str]:
        """Return the direct subcategories of ``category`` (original spelling)."""
        normalized = self._normalize_category_name(category)
        return {self._denormalize_category_name(child)
                for child in self.hierarchy.get_children(normalized)}

    def suggest_categories(self, query: str, limit: int = 10) -> List[str]:
        """Suggest up to ``limit`` indexed categories containing ``query``.

        All indexed categories are considered, in sorted order. The previous
        version only scanned ``_normalized_names`` and so never suggested
        categories whose spelling was already normalized (e.g. ``'git'``).
        """
        if limit <= 0:
            return []
        needle = query.lower()
        suggestions = []
        for normalized in sorted(self.index):
            original = self._denormalize_category_name(normalized)
            if needle in normalized or needle in original.lower():
                suggestions.append(original)
                if len(suggestions) >= limit:
                    break
        return suggestions

    def get_category_stats(self) -> Dict[str, int]:
        """Return skill counts per category (original spelling)."""
        return {self._denormalize_category_name(category): len(skill_ids)
                for category, skill_ids in self.index.items()}

    def _normalize_category_name(self, category: str) -> str:
        """Return the canonical index key for ``category`` (pure, no side effects).

        Applies NFKC and lowercasing. Keeps letters and digits of any script
        plus spaces, ``'-'`` and ``':'`` (underscores are dropped as before),
        collapses whitespace, and tidies ``'::'`` separators. The previous
        ASCII-only filter turned non-Latin names such as ``'数据库'`` into
        ``''`` and merged ``'ci-cd'`` with ``'cicd'``.
        """
        normalized = unicodedata.normalize('NFKC', category).strip().lower()
        normalized = re.sub(r'[^\w\s:-]|_', '', normalized)
        normalized = re.sub(r'\s+', ' ', normalized).strip()
        if '::' in normalized:
            normalized = '::'.join(part.strip() for part in normalized.split('::') if part.strip())
        return normalized

    def _denormalize_category_name(self, normalized: str) -> str:
        """Map a normalized name back to its recorded original spelling."""
        return self._normalized_names.get(normalized, normalized)

    def build_hierarchy_tree(self) -> Dict[str, dict]:
        """Build a nested ``{top_category: node}`` tree of indexed categories.

        Each node is ``{'name', 'skill_count', 'children': [node, ...]}``.
        (The return annotation previously used ``Any``, which this module does
        not import, so the class failed to define with NameError.)
        """
        tree = {}
        top_level = set()
        for category in self.index.keys():
            if '::' not in category:
                top_level.add(category)
            else:
                parent = category.rsplit('::', 1)[0]
                if parent not in self.index:
                    top_level.add(parent)

        def build_node(category: str) -> dict:
            node = {
                'name': self._denormalize_category_name(category),
                'skill_count': len(self.index.get(category, set())),
                'children': []
            }
            for child in self.hierarchy.get_children(category):
                node['children'].append(build_node(child))
            return node

        for top_category in sorted(top_level):
            tree[top_category] = build_node(top_category)
        return tree
3. 标签索引系统实现
# registry/indexes/tag_index.py
import heapq
import re
import unicodedata
from collections import defaultdict, Counter
from typing import Any, Dict, Set, List, Tuple
class TagStatistics:
    """Frequency, co-occurrence and timeline counters for tags."""

    def __init__(self):
        # tag -> number of times record_tag was called for it
        self.tag_freq: Dict[str, int] = defaultdict(int)
        # (tag_a, tag_b) sorted pair -> times the two were recorded together
        self.cooccurrence: Dict[Tuple[str, str], int] = defaultdict(int)
        # tag -> timestamps passed to record_tag, in call order
        self.tag_timeline: Dict[str, List[int]] = defaultdict(list)

    def record_tag(self, tag: str, timestamp: int = None):
        """Count one use of ``tag``; append ``timestamp`` to its timeline if given.

        ``timestamp`` is compared against ``None`` so that ``0`` is recorded
        too; the previous truthiness test silently dropped it.
        """
        self.tag_freq[tag] += 1
        if timestamp is not None:
            self.tag_timeline[tag].append(timestamp)

    def record_cooccurrence(self, tag1: str, tag2: str):
        """Count one joint occurrence of two distinct tags (order-insensitive)."""
        if tag1 != tag2:
            key = tuple(sorted([tag1, tag2]))
            self.cooccurrence[key] += 1

    def get_popular_tags(self, limit: int = 20) -> List[Tuple[str, int]]:
        """Return up to ``limit`` ``(tag, count)`` pairs, most frequent first."""
        return heapq.nlargest(limit, self.tag_freq.items(), key=lambda x: x[1])

    def get_related_tags(self, tag: str, limit: int = 10) -> List[Tuple[str, int]]:
        """Return up to ``limit`` ``(other_tag, count)`` pairs co-occurring with ``tag``."""
        related = []
        for (tag1, tag2), count in self.cooccurrence.items():
            if tag1 == tag:
                related.append((tag2, count))
            elif tag2 == tag:
                related.append((tag1, count))
        related.sort(key=lambda x: x[1], reverse=True)
        return related[:limit]

    def get_tag_trend(self, tag: str, time_window: str = 'week') -> Dict[str, int]:
        """Return the current count for ``tag`` (``time_window`` is not used yet)."""
        return {'current': self.tag_freq.get(tag, 0)}
class TagIndex:
    """Inverted index from normalized tags to skill IDs, plus tag statistics.

    Tags are normalized (see ``_normalize_tag``) before indexing, so ``'Git'``
    and ``'git'`` share one entry; the original spelling is kept for display.
    ``statistics`` tracks what is currently indexed: removing a skill
    retracts the frequency and co-occurrence counts it contributed.
    Previously each refresh (remove + add) counted every tag again.
    """

    def __init__(self):
        # normalized tag -> IDs of skills carrying it
        self.index: Dict[str, Set[str]] = defaultdict(set)
        # skill ID -> tags exactly as the skill declared them
        self.skill_tags: Dict[str, Set[str]] = defaultdict(set)
        self.statistics = TagStatistics()
        # normalized tag -> original spelling (only when they differ)
        self._normalized_tags: Dict[str, str] = {}

    def add_skill(self, skill: 'RegisteredSkill'):
        """Index ``skill.id`` under each of ``skill.tags`` and update statistics.

        Re-adding an already indexed skill first removes its old entry, so
        its tags are not counted twice.
        """
        skill_id = skill.id
        if skill_id in self.skill_tags:
            self.remove_skill(skill_id)
        tags = set(skill.tags)
        self.skill_tags[skill_id] = tags
        for tag in tags:
            normalized = self._normalize_tag(tag)
            # Record display spellings only for real tags, never for queries.
            if normalized and normalized != tag:
                self._normalized_tags[normalized] = tag
        normalized_tags = self._unique_normalized(tags)
        for normalized in normalized_tags:
            self.index[normalized].add(skill_id)
            self.statistics.record_tag(normalized)
        for first, second in self._tag_pairs(normalized_tags):
            self.statistics.record_cooccurrence(first, second)

    def remove_skill(self, skill_id: str):
        """Remove ``skill_id`` from the index and retract its statistics."""
        tags = self.skill_tags.pop(skill_id, None)
        if tags is None:
            return
        normalized_tags = self._unique_normalized(tags)
        for normalized in normalized_tags:
            bucket = self.index.get(normalized)
            if bucket is not None:
                bucket.discard(skill_id)
                if not bucket:
                    del self.index[normalized]
            self._decrement(self.statistics.tag_freq, normalized)
        for pair in self._tag_pairs(normalized_tags):
            self._decrement(self.statistics.cooccurrence, pair)

    def get_skills(self, tag: str) -> Set[str]:
        """Return a copy of the skill IDs indexed under ``tag``."""
        normalized = self._normalize_tag(tag)
        return self.index.get(normalized, set()).copy()

    def get_tags(self) -> Set[str]:
        """Return all indexed tags, in their original spelling when known."""
        return {self._denormalize_tag(normalized) for normalized in self.index}

    def get_skill_tags(self, skill_id: str) -> Set[str]:
        """Return a copy of the tags declared by ``skill_id``."""
        return self.skill_tags.get(skill_id, set()).copy()

    def get_related_tags(self, tag: str) -> Set[str]:
        """Return tags that co-occur with ``tag`` (original spelling)."""
        normalized = self._normalize_tag(tag)
        related = self.statistics.get_related_tags(normalized)
        return {self._denormalize_tag(related_tag) for related_tag, _ in related}

    def get_popular_tags(self, limit: int = 20) -> List[Tuple[str, int]]:
        """Return up to ``limit`` ``(tag, count)`` pairs, most frequent first."""
        return [(self._denormalize_tag(tag), count)
                for tag, count in self.statistics.get_popular_tags(limit)]

    def search_by_tags(self,
                       include_tags: Set[str],
                       exclude_tags: Set[str] = None) -> Set[str]:
        """
        Find skills by tags.

        Args:
            include_tags: Tags a skill must all carry (empty -> empty result).
            exclude_tags: Tags a skill must not carry.
        Returns:
            IDs of the matching skills.
        """
        if not include_tags:
            return set()
        first_tag = next(iter(include_tags))
        result = self.get_skills(first_tag)
        for tag in include_tags:
            if tag == first_tag:
                continue
            result &= self.get_skills(tag)
        if exclude_tags:
            for tag in exclude_tags:
                result -= self.get_skills(tag)
        return result

    def suggest_tags(self,
                     partial_tag: str,
                     context_tags: Set[str] = None,
                     limit: int = 10) -> List[str]:
        """Suggest up to ``limit`` tags containing ``partial_tag``.

        All indexed tags are considered, in sorted order. The previous
        version only scanned ``_normalized_tags`` and so never suggested
        already-normalized tags such as ``'docker'``. Tags related to
        ``context_tags`` that also match are appended afterwards.
        """
        needle = partial_tag.lower()
        suggestions = []
        for normalized in sorted(self.index):
            if needle in normalized:
                suggestions.append(self._denormalize_tag(normalized))
        if context_tags:
            for tag in context_tags:
                for related_tag in self.get_related_tags(tag):
                    if needle in related_tag.lower() and related_tag not in suggestions:
                        suggestions.append(related_tag)
        seen = set()
        unique_suggestions = []
        for suggestion in suggestions:
            if suggestion not in seen and len(unique_suggestions) < limit:
                seen.add(suggestion)
                unique_suggestions.append(suggestion)
        return unique_suggestions

    def get_tag_cloud_data(self,
                           min_font: int = 12,
                           max_font: int = 48) -> List[Dict[str, Any]]:
        """Return tag-cloud entries with a font size linear in skill count."""
        tags_with_freq = []
        for normalized, skill_ids in self.index.items():
            freq = len(skill_ids)
            if freq > 0:
                tags_with_freq.append({
                    'tag': self._denormalize_tag(normalized),
                    'frequency': freq,
                    'skills_count': freq
                })
        if not tags_with_freq:
            return []
        min_freq = min(tag['frequency'] for tag in tags_with_freq)
        max_freq = max(tag['frequency'] for tag in tags_with_freq)
        for tag in tags_with_freq:
            if max_freq == min_freq:
                tag['font_size'] = (min_font + max_font) // 2
            else:
                normalized_freq = (tag['frequency'] - min_freq) / (max_freq - min_freq)
                tag['font_size'] = min_font + int(normalized_freq * (max_font - min_font))
        return tags_with_freq

    def _normalize_tag(self, tag: str) -> str:
        """Return the canonical index key for ``tag`` (pure, no side effects).

        Applies NFKC and lowercasing, and keeps letters and digits of any
        script plus ``'-'``; spaces, punctuation and underscores are dropped.
        The previous ASCII-only filter erased non-Latin tags entirely.
        """
        normalized = unicodedata.normalize('NFKC', tag).strip().lower()
        return re.sub(r'[^\w-]|_', '', normalized)

    def _denormalize_tag(self, normalized: str) -> str:
        """Map a normalized tag back to its recorded original spelling."""
        return self._normalized_tags.get(normalized, normalized)

    def _unique_normalized(self, tags) -> List[str]:
        """Sorted, de-duplicated, non-empty normalized forms of ``tags``."""
        return sorted({normalized for normalized in map(self._normalize_tag, tags) if normalized})

    @staticmethod
    def _tag_pairs(tags: List[str]):
        """Yield each unordered pair once; sorted input gives sorted tuples."""
        for position, first in enumerate(tags):
            for second in tags[position + 1:]:
                yield first, second

    @staticmethod
    def _decrement(counter, key):
        """Decrease ``counter[key]`` by one, deleting the key when it hits zero."""
        remaining = counter.get(key, 0) - 1
        if remaining > 0:
            counter[key] = remaining
        else:
            counter.pop(key, None)
4. 发现引擎实现
# registry/discovery/engine.py
import asyncio
import os
import fnmatch
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor
logger = logging.getLogger(__name__)
@dataclass
class DiscoveryItem:
    """A single candidate skill location found by a discovery strategy."""
    path: str
    type: str  # one of 'file', 'directory', 'git', 'remote'
    discovered_at: datetime
    metadata: Dict[str, Any] = None  # strategy-specific details; None becomes {}
    priority: int = 0  # copied from the strategy; higher wins when deduplicating

    def __post_init__(self):
        # Give every instance its own dict instead of sharing a default.
        self.metadata = {} if self.metadata is None else self.metadata
class DiscoveryStrategy:
    """Base class for pluggable skill-discovery strategies.

    Subclasses implement ``can_handle`` and ``discover``. ``name`` is the
    concrete class name; ``priority`` orders strategies inside
    ``DiscoveryEngine`` (higher first).
    """

    def __init__(self, priority: int = 0):
        self.priority = priority
        self.name = type(self).__name__

    async def can_handle(self, path: str) -> bool:
        """Return True if this strategy can scan ``path``; must be overridden."""
        raise NotImplementedError

    async def discover(self, path: str) -> List['DiscoveryItem']:
        """Return the skill locations found at ``path``; must be overridden."""
        raise NotImplementedError

    def get_priority(self) -> int:
        """Return this strategy's priority."""
        return self.priority
class FileSystemDiscovery(DiscoveryStrategy):
    """Discover skill definition files on the local file system.

    A file qualifies when its path or bare file name matches one of
    ``patterns`` (``fnmatch`` syntax) and it matches no ``ignored_patterns``
    entry. Directory scans run in the default executor so the blocking
    ``os.walk``/``iterdir`` calls do not stall the event loop while
    ``DiscoveryEngine`` scans several paths concurrently.
    """

    def __init__(self,
                 patterns: Optional[List[str]] = None,
                 recursive: bool = True,
                 priority: int = 10):
        super().__init__(priority)
        self.patterns = patterns or [
            '*.skill.yaml',
            '*.skill.yml',
            'skill.yaml',
            'skill.yml',
            '**/.opencode/skills/*.yaml',
            '**/.opencode/skills/*.yml',
            '**/.claude/skills/*.yaml',
            '**/.claude/skills/*.yml'
        ]
        self.recursive = recursive
        self.ignored_patterns = [
            '**/node_modules/**',
            '**/.git/**',
            '**/.svn/**',
            '**/__pycache__/**',
            '**/*.pyc',
            '**/.DS_Store'
        ]

    async def can_handle(self, path: str) -> bool:
        """Return True if ``path`` is an existing file or directory."""
        path_obj = Path(path)
        return path_obj.exists() and (path_obj.is_file() or path_obj.is_dir())

    async def discover(self, path: str) -> List[DiscoveryItem]:
        """Return discovery items for a single file or a directory at ``path``."""
        path_obj = Path(path)
        if path_obj.is_file():
            return [self._create_item(path_obj)] if self._matches_patterns(path_obj) else []
        if path_obj.is_dir():
            if self.recursive:
                return await self._scan_directory_recursive(path_obj)
            return await self._scan_directory(path_obj)
        return []

    async def _scan_directory_recursive(self, directory: Path) -> List[DiscoveryItem]:
        """Recursively scan ``directory`` in a worker thread."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._walk_tree, directory)

    async def _scan_directory(self, directory: Path) -> List[DiscoveryItem]:
        """Scan only the top level of ``directory`` in a worker thread."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._list_directory, directory)

    def _walk_tree(self, directory: Path) -> List[DiscoveryItem]:
        """Blocking recursive scan; errors are logged and yield partial results."""
        items = []
        try:
            for root, dirs, files in os.walk(directory):
                root_path = Path(root)
                # Prune ignored directories in place so os.walk never enters them.
                dirs[:] = [d for d in dirs if not self._should_ignore(root_path / d, is_dir=True)]
                for name in files:
                    file_path = root_path / name
                    if self._matches_patterns(file_path) and not self._should_ignore(file_path):
                        items.append(self._create_item(file_path))
        except Exception as e:
            logger.error(f"Error scanning directory {directory}: {e}")
        return items

    def _list_directory(self, directory: Path) -> List[DiscoveryItem]:
        """Blocking non-recursive scan; errors are logged and yield partial results."""
        items = []
        try:
            for entry in directory.iterdir():
                if entry.is_file() and self._matches_patterns(entry) and not self._should_ignore(entry):
                    items.append(self._create_item(entry))
        except Exception as e:
            logger.error(f"Error scanning directory {directory}: {e}")
        return items

    @staticmethod
    def _candidates(path: Path, is_dir: bool = False) -> List[str]:
        """Strings a pattern is tried against: POSIX path, '/'-anchored form, dir form."""
        posix = path.as_posix()
        candidates = [posix]
        if not posix.startswith('/'):
            # Lets '**/x/...' patterns also match paths that start at 'x/'.
            candidates.append('/' + posix)
        if is_dir:
            # A trailing '/' lets '**/node_modules/**' match the directory itself.
            candidates.extend([c + '/' for c in list(candidates)])
        return candidates

    def _matches_patterns(self, path: Path) -> bool:
        """Return True if ``path`` (or its bare file name) matches ``patterns``.

        Bare-name patterns such as ``'skill.yaml'`` could never match a full
        path before, so those files were silently skipped.
        """
        candidates = self._candidates(path) + [path.name]
        return any(fnmatch.fnmatch(candidate, pattern)
                   for pattern in self.patterns for candidate in candidates)

    def _should_ignore(self, path: Path, is_dir: bool = False) -> bool:
        """Return True if ``path`` matches one of ``ignored_patterns``."""
        candidates = self._candidates(path, is_dir=is_dir)
        return any(fnmatch.fnmatch(candidate, pattern)
                   for pattern in self.ignored_patterns for candidate in candidates)

    def _create_item(self, path: Path) -> DiscoveryItem:
        """Build a 'file' DiscoveryItem; stat failures (e.g. races) give size/mtime 0."""
        try:
            stat_result = path.stat()
            size, modified = stat_result.st_size, stat_result.st_mtime
        except OSError:
            size, modified = 0, 0
        return DiscoveryItem(
            path=str(path),
            type='file',
            discovered_at=datetime.now(),
            metadata={
                'size': size,
                'modified': modified,
                'is_file': path.is_file()
            },
            priority=self.priority
        )
class GitRepositoryDiscovery(DiscoveryStrategy):
    """Discover skills in local Git checkouts or remote repositories.

    Remote repositories are shallow-cloned (``--depth 1``) into
    ``clone_temp_dir`` and scanned with ``FileSystemDiscovery``.
    NOTE(review): clones are never removed here; the clone directory must be
    cleaned up elsewhere.
    """

    def __init__(self,
                 clone_temp_dir: Optional[str] = None,
                 priority: int = 20):
        import tempfile
        super().__init__(priority)
        # Use the platform temp dir instead of a hard-coded '/tmp'.
        self.clone_temp_dir = clone_temp_dir or os.path.join(tempfile.gettempdir(), 'opencode_git_clones')
        Path(self.clone_temp_dir).mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _is_local_repo(path: str) -> bool:
        """Return True if ``path`` contains a ``.git`` entry."""
        try:
            return (Path(path) / '.git').exists()
        except OSError:
            return False

    async def can_handle(self, path: str) -> bool:
        """Return True for http(s)/``git@`` URLs and local Git checkouts.

        The previous check called ``iterdir()`` on any existing path, which
        raised NotADirectoryError for a single skill file and aborted
        discovery of that file.
        """
        if path.startswith(('http://', 'https://', 'git@')):
            return True
        return self._is_local_repo(path)

    async def discover(self, path: str) -> List[DiscoveryItem]:
        """Scan a local checkout, or clone a remote repository and scan the clone."""
        items = []
        try:
            if self._is_local_repo(path):
                items.extend(await self._discover_local_repo(path))
            else:
                cloned_path = await self._clone_repository(path)
                if cloned_path:
                    items.extend(await self._discover_local_repo(cloned_path))
        except Exception as e:
            logger.error(f"Error discovering Git repository {path}: {e}")
        return items

    async def _discover_local_repo(self, repo_path: str) -> List[DiscoveryItem]:
        """Recursively scan ``repo_path`` and tag items with Git metadata."""
        fs_discovery = FileSystemDiscovery(recursive=True)
        items = await fs_discovery.discover(repo_path)
        for item in items:
            item.metadata.update({
                'source_type': 'git',
                'repository': repo_path
            })
        return items

    async def _clone_repository(self, repo_url: str) -> Optional[str]:
        """Shallow-clone ``repo_url``; return the clone path, or None on failure."""
        import uuid

        # Last path component (or after ':' for 'git@host:repo'), minus a '.git' suffix.
        repo_name = repo_url.rstrip('/').rsplit('/', 1)[-1].rsplit(':', 1)[-1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        repo_name = repo_name or 'repo'
        clone_dir = Path(self.clone_temp_dir) / f"{repo_name}_{uuid.uuid4().hex[:8]}"
        try:
            # '--' ends option parsing, so a URL starting with '-' cannot be
            # interpreted as a git option (argument injection).
            cmd = ['git', 'clone', '--depth', '1', '--', repo_url, str(clone_dir)]
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            _, stderr = await process.communicate()
            if process.returncode == 0:
                logger.info(f"Cloned repository {repo_url} to {clone_dir}")
                return str(clone_dir)
            logger.error(f"Failed to clone repository {repo_url}: {stderr.decode(errors='replace')}")
            return None
        except Exception as e:
            logger.error(f"Error cloning repository {repo_url}: {e}")
            return None
class RemoteRegistryDiscovery(DiscoveryStrategy):
    """Discover skills published in a remote skill registry over HTTP."""

    def __init__(self,
                 registry_url: str = "https://registry.opencode.ai",
                 priority: int = 30,
                 request_timeout: float = 30.0):
        super().__init__(priority)
        self.registry_url = registry_url.rstrip('/')
        self.api_client = None
        # Total seconds allowed per registry HTTP request.
        self.request_timeout = request_timeout

    async def can_handle(self, path: str) -> bool:
        """Return True for ``'registry'`` or URLs on this registry.

        A plain prefix test also accepted foreign hosts such as
        ``https://registry.opencode.ai.example.com``; the match now stops at
        a path boundary.
        """
        return (path == 'registry'
                or path == self.registry_url
                or path.startswith(self.registry_url + '/'))

    async def discover(self, path: str) -> List[DiscoveryItem]:
        """List registry skills as 'remote' items; entries without an id are skipped."""
        items = []
        try:
            if path == 'registry':
                skills = await self._fetch_all_skills()
            else:
                skills = await self._fetch_from_url(path)
            for skill_data in skills:
                skill_id = skill_data.get('id') if isinstance(skill_data, dict) else None
                if skill_id is None:
                    # One malformed entry must not abort the whole listing.
                    logger.warning(f"Skipping registry entry without 'id' from {path}")
                    continue
                items.append(DiscoveryItem(
                    path=f"{self.registry_url}/skills/{skill_id}",
                    type='remote',
                    discovered_at=datetime.now(),
                    metadata=skill_data,
                    priority=self.priority
                ))
        except Exception as e:
            logger.error(f"Error discovering from remote registry {path}: {e}")
        return items

    async def _fetch_all_skills(self) -> List[Dict[str, Any]]:
        """GET ``/api/v1/skills``; return its ``skills`` list, or [] on any failure."""
        import aiohttp

        url = f"{self.registry_url}/api/v1/skills"
        try:
            timeout = aiohttp.ClientTimeout(total=self.request_timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        logger.warning(f"Registry {url} returned HTTP {response.status}")
                        return []
                    data = await response.json()
                    return data.get('skills', []) if isinstance(data, dict) else []
        except Exception as e:
            logger.error(f"Error fetching skills from registry: {e}")
        return []

    async def _fetch_from_url(self, url: str) -> List[Dict[str, Any]]:
        """Fetch skills from a specific registry URL (not implemented; returns [])."""
        return []
class DiscoveryEngine:
    """Runs prioritized discovery strategies over a set of paths."""

    # Short names accepted in batch_discover's 'strategies' config, mapped to
    # the lower-cased class names of the built-in strategies.
    STRATEGY_ALIASES = {
        'filesystem': 'filesystemdiscovery',
        'git': 'gitrepositorydiscovery',
        'remote': 'remoteregistrydiscovery',
    }

    def __init__(self, max_concurrent: int = 5):
        self.strategies: List['DiscoveryStrategy'] = []
        self.max_concurrent = max_concurrent
        # Bounds how many strategy calls run at once across all paths.
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._register_default_strategies()

    def _register_default_strategies(self):
        """Register the file-system, Git and remote-registry strategies."""
        self.register_strategy(FileSystemDiscovery())
        self.register_strategy(GitRepositoryDiscovery())
        self.register_strategy(RemoteRegistryDiscovery())

    def register_strategy(self, strategy: 'DiscoveryStrategy'):
        """Add ``strategy``, keeping strategies sorted by priority (highest first)."""
        self.strategies.append(strategy)
        self.strategies.sort(key=lambda s: s.get_priority(), reverse=True)

    def unregister_strategy(self, strategy_name: str):
        """Remove all strategies whose ``name`` equals ``strategy_name``."""
        self.strategies = [s for s in self.strategies if s.name != strategy_name]

    async def discover(self,
                       paths: List[str],
                       recursive: bool = True) -> List['DiscoveryItem']:
        """
        Discover skills at ``paths`` using all registered strategies.

        Args:
            paths: Paths/URLs to scan.
            recursive: Forwarded per path. NOTE(review): the built-in
                strategies use their own ``recursive`` setting and ignore this.
        Returns:
            Discovery items, de-duplicated by path (highest priority wins).
        """
        return await self._discover_with(paths, recursive, self.strategies)

    async def _discover_with(self,
                             paths: List[str],
                             recursive: bool,
                             strategies: List['DiscoveryStrategy']) -> List['DiscoveryItem']:
        """Discover ``paths`` concurrently with an explicit strategy list."""
        tasks = [asyncio.create_task(self._discover_path(path, recursive, strategies))
                 for path in paths]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        all_items = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Discovery error: {result}")
            elif isinstance(result, list):
                all_items.extend(result)
        unique_items = {}
        for item in all_items:
            if item.path not in unique_items or item.priority > unique_items[item.path].priority:
                unique_items[item.path] = item
        return list(unique_items.values())

    async def _discover_path(self,
                             path: str,
                             recursive: bool,
                             strategies: Optional[List['DiscoveryStrategy']] = None) -> List['DiscoveryItem']:
        """Scan ``path`` with the first strategy that accepts it and succeeds.

        A strategy whose ``can_handle`` or ``discover`` raises is logged and
        the next one is tried. Previously an exception from ``can_handle``
        aborted the path entirely.
        """
        candidates = list(self.strategies if strategies is None else strategies)
        for strategy in candidates:
            async with self._semaphore:
                try:
                    if not await strategy.can_handle(path):
                        continue
                    discovered = await strategy.discover(path)
                except Exception as e:
                    logger.error(f"Strategy {strategy.name} failed for {path}: {e}")
                    continue
                return list(discovered)
        return []

    async def watch(self,
                    path: str,
                    callback: callable,
                    poll_interval: int = 5) -> 'WatchHandle':
        """
        Watch ``path`` for changes.

        Args:
            path: Path to watch.
            callback: Invoked by the watcher on changes.
            poll_interval: Polling interval in seconds.
        Returns:
            A handle whose ``stop()`` stops the watcher.
        """
        from .watcher import DirectoryWatcher
        watcher = DirectoryWatcher(
            path=path,
            callback=callback,
            poll_interval=poll_interval
        )
        await watcher.start()
        return WatchHandle(watcher)

    async def batch_discover(self,
                             config: Dict[str, Any]) -> Dict[str, List['DiscoveryItem']]:
        """
        Discover according to a config dict and group the results by item type.

        Args:
            config: ``paths`` (list), optional ``strategies`` (short names such
                as ``'filesystem'``/``'git'``/``'remote'`` or class names;
                default all three) and optional ``recursive`` (default True).
        Returns:
            Mapping of item type to the items of that type.

        Short names are resolved via ``STRATEGY_ALIASES``. They were
        previously compared to class names directly, so the default config
        selected no strategy. The selection is passed down explicitly instead
        of temporarily replacing ``self.strategies``, so concurrent calls
        cannot see each other's selection.
        """
        paths = config.get('paths', [])
        requested = {str(name).lower() for name in config.get('strategies', ['filesystem', 'git', 'remote'])}
        wanted = requested | {self.STRATEGY_ALIASES.get(name, name) for name in requested}
        selected = [s for s in self.strategies if s.name.lower() in wanted]
        items = await self._discover_with(paths, config.get('recursive', True), selected)
        results = {}
        for item in items:
            results.setdefault(item.type, []).append(item)
        return results
@dataclass
class WatchHandle:
    """Handle returned by ``DiscoveryEngine.watch``; wraps the active watcher."""
    watcher: Any

    async def stop(self):
        """Stop the wrapped watcher."""
        active = self.watcher
        await active.stop()
5. 数据模型定义
# registry/models.py
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
@dataclass
class SearchHit:
    """One ranked match in a registry search result."""
    skill_id: str
    skill: 'RegisteredSkill'
    score: float
    # presumably field name -> highlighted snippets; confirm with the producer
    highlights: Dict[str, List[str]] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict; ``skill`` is None when no skill is attached."""
        payload = {
            'skill_id': self.skill_id,
            'score': self.score,
            'highlights': self.highlights,
        }
        payload['skill'] = self.skill.to_dict() if self.skill else None
        return payload
@dataclass
class SearchResult:
    """A page of search hits plus aggregations and suggestions.

    ``max_score`` defaults to the highest ``score`` among ``hits`` (0.0 when
    there are none); an explicitly passed value is kept. It used to default
    to 0.0 and the registry's search never filled it in, so serialized
    results always reported 0.
    """
    hits: List['SearchHit']
    total_hits: int
    aggregations: Dict[str, Any] = field(default_factory=dict)
    suggestions: List['Suggestion'] = field(default_factory=list)
    max_score: Optional[float] = None

    def __post_init__(self):
        if self.max_score is None:
            self.max_score = max((hit.score for hit in self.hits), default=0.0)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict of the result."""
        return {
            'hits': [hit.to_dict() for hit in self.hits],
            'total_hits': self.total_hits,
            'max_score': self.max_score,
            'aggregations': self.aggregations,
            'suggestions': [s.to_dict() for s in self.suggestions]
        }
@dataclass
class Suggestion:
    """An autocomplete suggestion produced by a search."""
    text: str
    type: str  # one of 'skill_name', 'category', 'tag', 'description'
    score: float = 1.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the suggestion as a plain dict."""
        return {name: getattr(self, name) for name in ('text', 'type', 'score')}
@dataclass
class DiscoveryResult:
    """Summary of a discover-and-register run."""
    registered: List[str] = field(default_factory=list)  # IDs of registered skills
    successful: int = 0
    failed: int = 0
    errors: List[Dict[str, Any]] = field(default_factory=list)
    total_found: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the summary as a dict (lists are shared, not copied)."""
        keys = ('registered', 'successful', 'failed', 'errors', 'total_found')
        return {key: getattr(self, key) for key in keys}
@dataclass
class RegistryExport:
    """Serializable registry snapshot that can be saved to / loaded from JSON.

    ``format`` is stored in the envelope, but ``save`` always writes JSON.
    """
    data: Dict[str, Any]
    format: str = 'json'
    exported_at: datetime = field(default_factory=datetime.now)

    def save(self, path: str):
        """Write the export as an indented JSON envelope to ``path``.

        Parent directories are created as needed; values JSON cannot encode
        are written via ``str()``.
        """
        import json
        from pathlib import Path

        envelope = {
            'data': self.data,
            'format': self.format,
            'exported_at': self.exported_at.isoformat(),
            'version': '1.0.0'
        }
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(envelope, indent=2, default=str), encoding='utf-8')

    @classmethod
    def load(cls, path: str) -> 'RegistryExport':
        """Read an export written by ``save``.

        Raises:
            FileNotFoundError: if ``path`` does not exist.
        """
        import json
        from pathlib import Path

        source = Path(path)
        if not source.exists():
            raise FileNotFoundError(f"Export file not found: {path}")
        envelope = json.loads(source.read_text(encoding='utf-8'))
        return cls(
            data=envelope['data'],
            format=envelope.get('format', 'json'),
            exported_at=datetime.fromisoformat(envelope['exported_at'])
        )
三、配置与使用
1. 配置文件示例
# config/registry_config.yaml
# Nesting matters: the usage example reads config['registry']['discovery']['paths'].
registry:
  # Storage settings
  storage:
    path: "~/.opencode/registry"
    backup_count: 5
    auto_save: true
    save_interval: 300  # seconds

  # Discovery settings
  discovery:
    paths:
      - "~/.opencode/skills"
      - "~/.claude/skills"
      - "./skills"
      - "./.opencode/skills"
      - "https://github.com/opencode-skills/official-skills"
    strategies:
      - filesystem
      - git
      - remote
    recursive: true
    watch_for_changes: true
    watch_interval: 30

  # Indexing settings
  indexing:
    fulltext_enabled: true
    category_hierarchy_enabled: true
    tag_analysis_enabled: true
    # Full-text index configuration
    fulltext:
      analyzer: "standard"
      stopwords: ["a", "an", "the", "and", "or", "but"]
      boost_fields:
        name: 2.0
        description: 1.5
        tags: 1.2
        categories: 1.0

  # Cache settings
  cache:
    enabled: true
    ttl: 300  # seconds
    max_size: 1000
    strategy: "lru"

  # Search settings
  search:
    default_limit: 20
    max_limit: 100
    suggest_enabled: true
    highlight_enabled: true
    # Field weights.
    # NOTE(review): overlaps indexing.fulltext.boost_fields with different
    # values; confirm which table the search implementation actually reads.
    weights:
      name: 10.0
      description: 5.0
      tags: 3.0
      categories: 2.0
      content: 1.0

  # Category system
  categories:
    predefined:
      - name: "git"
        description: "Git operations and workflows"
        parent: null
      - name: "docker"
        description: "Docker container operations"
        parent: null
      - name: "kubernetes"
        description: "Kubernetes cluster operations"
        parent: null
      - name: "ci-cd"
        description: "Continuous integration and deployment"
        parent: null
        children:
          - "jenkins"
          - "github-actions"
          - "gitlab-ci"
    allow_custom: true
    max_custom_categories: 10

  # Tag system
  tags:
    max_per_skill: 15
    blacklist:
      - "deprecated"
      - "broken"
      - "test-only"
    suggestions:
      enabled: true
      min_length: 2
      max_suggestions: 10

  # Statistics and monitoring
  monitoring:
    enabled: true
    track_usage: true
    report_interval: 3600  # seconds
    metrics:
      - "registrations"
      - "searches"
      - "executions"
      - "errors"
2. 使用示例
# examples/registry_usage.py
import asyncio
from pathlib import Path
import yaml
from registry.core import SkillRegistry, SkillSource
from registry.discovery.engine import DiscoveryEngine
from parsers.skill_parser import SkillDSLParser
async def main():
    """Walk through the main SkillRegistry features end to end.

    Discovery paths come from ``config/registry_config.yaml`` when that file
    exists and defines ``registry.discovery.paths``; otherwise two built-in
    defaults are used. A leading ``~`` in configured paths is expanded to the
    user's home directory; other entries (e.g. URLs) pass through unchanged.
    """
    import os

    # 1. Create the registry
    registry = SkillRegistry(
        storage_path=Path.home() / '.opencode' / 'registry_v2'
    )

    # 2. Load configuration (fall back to defaults if the file or key is missing)
    discovery_paths = [
        str(Path.home() / '.opencode' / 'skills'),
        './skills'
    ]
    config_path = Path('config/registry_config.yaml')
    if config_path.exists():
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f) or {}
        registry_cfg = config.get('registry') or {}
        discovery_cfg = registry_cfg.get('discovery') or {}
        configured_paths = discovery_cfg.get('paths')
        if configured_paths:
            # Entries like "~/.opencode/skills" are not expanded by Path/open
            # on their own, so expand them here.
            discovery_paths = [os.path.expanduser(str(p)) for p in configured_paths]

    # 3. Discover and register skills
    print("Discovering skills...")
    result = await registry.discover(discovery_paths)
    print("Discovery complete:")
    print(f" Found: {result.total_found}")
    print(f" Registered: {result.successful}")
    print(f" Failed: {result.failed}")
    if result.errors:
        print(f" Errors: {len(result.errors)}")
        for error in result.errors[:3]:  # show the first 3 errors
            # NOTE(review): error dict keys assumed to be 'path'/'error'.
            print(f" - {error.get('path', '?')}: {error.get('error', error)}")

    # 4. Statistics
    stats = registry.get_stats()
    print("\nRegistry statistics:")
    print(f" Total skills: {stats['total_skills']}")
    print(f" Categories: {stats['categories_count']}")
    print(f" Tags: {stats['tags_count']}")

    # 5. Search
    print("\nSearching for 'git' skills...")
    search_result = registry.search("git", limit=5)
    print(f"Found {search_result.total_hits} skills:")
    for i, hit in enumerate(search_result.hits, 1):
        entry = hit.skill
        print(f" {i}. {entry.skill.metadata.name} v{entry.skill.metadata.version}")
        print(f" {entry.skill.metadata.description}")
        if entry.tags:
            print(f" Tags: {', '.join(sorted(entry.tags)[:5])}")
        print()

    # 6. Browse by category
    print("Categories available:")
    categories = registry.get_all_categories()
    for category in sorted(categories)[:10]:  # show the first 10 categories
        skills = registry.get_by_category(category)
        print(f" - {category}: {len(skills)} skills")

    # 7. Browse by tag
    print("\nPopular tags:")
    popular_tags = registry.tag_index.get_popular_tags(limit=10)
    for tag, count in popular_tags:
        print(f" - #{tag}: {count} skills")

    # 8. Export the registry
    print("\nExporting registry...")
    export = registry.export(include_skills=True)
    export.save('registry_export.json')
    print("Exported to registry_export.json")

    # 9. Manual registration
    print("\nManual registration example:")
    parser = SkillDSLParser()
    # The skill document is YAML front matter; its nesting is significant.
    skill_content = """---
metadata:
  name: manual-test-skill
  version: 1.0.0
  description: A manually registered test skill
  categories: [test, example]
  tags: [manual, test, example]
  author: Test User
  created_at: "2024-01-01T00:00:00Z"
steps:
  - name: test_step
    type: command
    description: Test step
    parameters:
      command: echo
      args: ["Hello from manual skill!"]
---
"""
    skill = parser.parse(skill_content)
    registered = registry.register(
        skill=skill,
        source_path="/manual/test.skill.yaml",
        source_type=SkillSource.USER
    )
    print(f"Manually registered: {registered.skill.metadata.name}")

    # 10. Refresh (re-index) the registry
    print("\nRefreshing registry...")
    refresh_result = registry.refresh()
    print(f"Reindexed: {refresh_result['reindexed']}")
    print(f"Removed: {refresh_result['removed']}")
    print(f"Errors: {refresh_result['errors']}")


if __name__ == "__main__":
    asyncio.run(main())
3. REST API 接口
# api/registry_api.py
import os
import threading
from pathlib import Path
from typing import List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.middleware.cors import CORSMiddleware

from registry.core import SkillRegistry, SkillStatus
from registry.models import SearchResult, RegistryExport
app = FastAPI(
    title="OpenCode Skill Registry API",
    description="API for managing and searching OpenCode skills",
    version="1.0.0"
)

# CORS configuration.
# With allow_origins=["*"] and allow_credentials=True, Starlette answers
# credentialed requests by echoing the caller's Origin. That would let any
# website make cookie-bearing requests to this API. None of the endpoints
# here use cookies or other browser credentials, so credentials are disabled.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Process-wide registry instance, created lazily on first request.
_registry = None
# FastAPI runs sync dependencies such as get_registry in a worker threadpool,
# so the lazy initialisation is guarded against concurrent first requests.
_registry_lock = threading.Lock()


def get_registry():
    """Return the shared SkillRegistry, creating it on first use.

    If the ``REGISTRY_STORAGE_PATH`` environment variable is set (the Docker
    and Kubernetes manifests set it to ``/data/registry``), it is passed as
    the registry's storage path. Otherwise SkillRegistry's own default is used.
    """
    global _registry
    if _registry is None:
        with _registry_lock:
            if _registry is None:
                storage_path = os.environ.get("REGISTRY_STORAGE_PATH")
                if storage_path:
                    _registry = SkillRegistry(storage_path=Path(storage_path))
                else:
                    _registry = SkillRegistry()
    return _registry
@app.get("/")
async def root():
    """Describe the service: its name, version and main endpoints."""
    advertised = ["/skills", "/skills/search", "/categories", "/tags", "/stats"]
    info = {"service": "OpenCode Skill Registry", "version": "1.0.0"}
    info["endpoints"] = advertised
    return info
@app.get("/skills")
async def get_skills(
    registry: SkillRegistry = Depends(get_registry),
    category: Optional[str] = None,
    tag: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0)
):
    """List registered skills with optional filtering and pagination.

    Filters combine: when both ``category`` and ``tag`` are given, only
    skills matching both are returned. ``status`` must be a valid
    ``SkillStatus`` value, otherwise the response is 400. Results are
    sorted case-insensitively by skill name.
    """
    # Validate status up front so a bad request fails before any lookup work.
    status_enum = None
    if status:
        try:
            status_enum = SkillStatus(status)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid status: {status}") from None

    if category:
        skills = list(registry.get_by_category(category))
        if tag:
            tagged_ids = {s.id for s in registry.get_by_tag(tag)}
            skills = [s for s in skills if s.id in tagged_ids]
    elif tag:
        skills = list(registry.get_by_tag(tag))
    else:
        # NOTE(review): reads a private attribute; prefer a public accessor if one exists.
        skills = list(registry._skills.values())

    if status_enum is not None:
        skills = [s for s in skills if s.status == status_enum]

    # sorted() builds a new list, so a list owned by an index is never
    # reordered in place.
    skills = sorted(skills, key=lambda s: s.skill.metadata.name.lower())

    end = offset + limit
    paged_skills = skills[offset:end]
    return {
        "skills": [skill.to_dict() for skill in paged_skills],
        "pagination": {
            "total": len(skills),
            "limit": limit,
            "offset": offset,
            "has_more": end < len(skills)
        }
    }
# "/skills/search" must be declared before "/skills/{skill_id}". Routes are
# matched in declaration order, so with the dynamic route first,
# GET /skills/search would be captured as skill_id="search" and return 404.
@app.get("/skills/search")
async def search_skills(
    query: str = Query(..., description="搜索查询"),
    categories: Optional[List[str]] = Query(None),
    tags: Optional[List[str]] = Query(None),
    limit: int = Query(20, ge=1, le=100),
    offset: int = Query(0, ge=0),
    registry: SkillRegistry = Depends(get_registry)
):
    """Full-text search over skills, optionally narrowed by categories/tags."""
    result = registry.search(
        query=query,
        categories=categories,
        tags=tags,
        limit=limit,
        offset=offset
    )
    return result.to_dict()


@app.get("/skills/{skill_id}")
async def get_skill(
    skill_id: str,
    registry: SkillRegistry = Depends(get_registry)
):
    """Return one skill by ID, or 404 if it is not registered."""
    skill = registry.find_by_id(skill_id)
    if not skill:
        raise HTTPException(status_code=404, detail="Skill not found")
    return skill.to_dict()
@app.get("/categories")
async def get_categories(
    registry: SkillRegistry = Depends(get_registry)
):
    """Return all category names (sorted), per-category stats and the hierarchy tree."""
    names = registry.get_all_categories()
    index = registry.category_index
    stats = index.get_category_stats()
    tree = index.build_hierarchy_tree()
    return {"categories": sorted(names), "statistics": stats, "hierarchy": tree}
@app.get("/categories/{category}/skills")
async def get_skills_by_category(
    category: str,
    registry: SkillRegistry = Depends(get_registry)
):
    """List the skills filed under *category*, together with their count."""
    members = registry.get_by_category(category)
    payload = {"category": category}
    payload["skills"] = [member.to_dict() for member in members]
    payload["count"] = len(members)
    return payload
@app.get("/tags")
async def get_tags(
    registry: SkillRegistry = Depends(get_registry)
):
    """Return all tags (sorted), tag-cloud data and the 50 most popular tags."""
    all_tags = registry.get_all_tags()
    index = registry.tag_index
    cloud = index.get_tag_cloud_data()
    popular = index.get_popular_tags(limit=50)
    return {"tags": sorted(all_tags), "tag_cloud": cloud, "popular_tags": popular}
@app.get("/tags/{tag}/skills")
async def get_skills_by_tag(
    tag: str,
    registry: SkillRegistry = Depends(get_registry)
):
    """List the skills carrying *tag*, their count, and tags related to it."""
    members = registry.get_by_tag(tag)
    related = registry.tag_index.get_related_tags(tag)
    serialized = [member.to_dict() for member in members]
    return {
        "tag": tag,
        "skills": serialized,
        "count": len(members),
        "related_tags": list(related),
    }
@app.get("/stats")
async def get_stats(
    registry: SkillRegistry = Depends(get_registry)
):
    """Return the registry stats extended with category and top-20 tag distributions."""
    base = registry.get_stats()
    category_distribution = registry.category_index.get_category_stats()
    tag_distribution = dict(registry.tag_index.get_popular_tags(limit=20))
    merged = {**base}
    merged["category_distribution"] = category_distribution
    merged["tag_distribution"] = tag_distribution
    return merged
@app.post("/discover")
async def discover_skills(
    paths: List[str],
    recursive: bool = True,
    registry: SkillRegistry = Depends(get_registry)
):
    """Run skill discovery over *paths* and report the outcome.

    ``paths`` is read from the JSON request body; ``recursive`` is a query
    parameter.
    """
    # SECURITY NOTE(review): callers choose arbitrary server-side paths and the
    # endpoint has no authentication. Consider restricting paths to the
    # configured skill roots (e.g. SKILL_PATHS) before exposing this publicly.
    outcome = await registry.discover(paths, recursive=recursive)
    return dict(action="discover", paths=paths, result=outcome.to_dict())
@app.post("/refresh")
async def refresh_registry(
    registry: SkillRegistry = Depends(get_registry)
):
    """Re-index the registry and return whatever ``registry.refresh()`` reports."""
    outcome = registry.refresh()
    return dict(action="refresh", result=outcome)
@app.get("/export")
async def export_registry(
    include_skills: bool = False,
    registry: SkillRegistry = Depends(get_registry)
):
    """Export the registry; only the export payload (``data``) is returned."""
    snapshot = registry.export(include_skills=include_skills)
    return snapshot.data
@app.post("/import")
async def import_registry(
    export_data: dict,
    registry: SkillRegistry = Depends(get_registry)
):
    """Wrap a payload produced by ``/export`` and import it into the registry."""
    wrapped = RegistryExport(data=export_data)
    outcome = registry.import_export(wrapped)
    return dict(action="import", result=outcome)
@app.get("/health")
async def health_check():
    """Health probe target; always reports healthy."""
    payload = {"status": "healthy"}
    return payload
if __name__ == "__main__":
    # Honour the LOG_LEVEL variable set by docker-compose / Kubernetes
    # (e.g. "INFO"); uvicorn expects lower-case level names.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level=os.environ.get("LOG_LEVEL", "info").lower()
    )
四、测试方案
1. 单元测试
# tests/test_registry.py
import pytest
import tempfile
import shutil
from pathlib import Path
import asyncio
from registry.core import SkillRegistry, SkillSource
from parsers.skill_parser import SkillDSLParser
class TestSkillRegistry:
    """Tests for SkillRegistry: registration, lookup, indexes, search,
    discovery, export/import, statistics and refresh.

    Every registry used here stores its data under a per-test temporary
    directory, never in the user's real registry location.
    """

    @pytest.fixture
    def temp_dir(self):
        """Create a temporary directory and remove it after the test."""
        dir_path = tempfile.mkdtemp()
        yield Path(dir_path)
        shutil.rmtree(dir_path)

    @pytest.fixture
    def skill_parser(self):
        """Return a fresh skill DSL parser."""
        return SkillDSLParser()

    @pytest.fixture
    def sample_skill_content(self):
        """A sample skill document; metadata fields are nested under ``metadata``."""
        return """---
metadata:
  name: test-skill
  version: 1.0.0
  description: A test skill for unit testing
  categories: [test, unit]
  tags: [testing, unit-test]
  author: Test User
steps:
  - name: test_step
    type: command
    description: Test step
    parameters:
      command: echo
      args: ["Hello, World!"]
---
"""

    @pytest.fixture
    def registry(self, temp_dir):
        """Registry whose storage lives inside the temporary directory."""
        storage_path = temp_dir / "registry"
        return SkillRegistry(storage_path=storage_path)

    @staticmethod
    def _register(registry, skill_parser, content, source_path="/test/path.skill.yaml"):
        """Parse *content* and register it as a LOCAL skill; return the registration."""
        skill = skill_parser.parse(content)
        return registry.register(
            skill=skill,
            source_path=source_path,
            source_type=SkillSource.LOCAL
        )

    def test_register_skill(self, registry, skill_parser, sample_skill_content):
        """Registering a skill assigns an ID and makes it findable by ID."""
        registered = self._register(registry, skill_parser, sample_skill_content)
        assert registered.id is not None
        assert registered.skill.metadata.name == "test-skill"
        assert registered.source_type == SkillSource.LOCAL
        found = registry.find_by_id(registered.id)
        assert found is not None
        assert found.skill.metadata.name == "test-skill"

    def test_find_by_name(self, registry, skill_parser, sample_skill_content):
        """Lookup by name works and is case-insensitive."""
        registered = self._register(registry, skill_parser, sample_skill_content)
        results = registry.find_by_name("test-skill")
        assert len(results) == 1
        assert results[0].id == registered.id
        results = registry.find_by_name("TEST-SKILL")
        assert len(results) == 1

    def test_unregister_skill(self, registry, skill_parser, sample_skill_content):
        """Unregistering removes the skill from the ID and name indexes."""
        registered = self._register(registry, skill_parser, sample_skill_content)
        success = registry.unregister(registered.id)
        assert success is True
        assert registry.find_by_id(registered.id) is None
        assert len(registry.find_by_name("test-skill")) == 0

    def test_category_index(self, registry, skill_parser, sample_skill_content):
        """Categories from the skill metadata are indexed."""
        registered = self._register(registry, skill_parser, sample_skill_content)
        categories = registry.get_all_categories()
        assert "test" in categories
        assert "unit" in categories
        test_skills = registry.get_by_category("test")
        assert len(test_skills) == 1
        assert test_skills[0].id == registered.id

    def test_tag_index(self, registry, skill_parser, sample_skill_content):
        """Tags from the skill metadata are indexed."""
        registered = self._register(registry, skill_parser, sample_skill_content)
        tags = registry.get_all_tags()
        assert "testing" in tags
        assert "unit-test" in tags
        testing_skills = registry.get_by_tag("testing")
        assert len(testing_skills) == 1
        assert testing_skills[0].id == registered.id

    def test_search(self, registry, skill_parser, sample_skill_content):
        """A text search finds the registered skill."""
        self._register(registry, skill_parser, sample_skill_content)
        result = registry.search("test", limit=10)
        assert result.total_hits >= 1
        assert len(result.hits) >= 1
        assert result.hits[0].skill.skill.metadata.name == "test-skill"

    @pytest.mark.asyncio
    async def test_discovery(self, registry, temp_dir, sample_skill_content):
        """Skill files on disk are discovered and registered."""
        skills_dir = temp_dir / "skills"
        skills_dir.mkdir()
        skill_file = skills_dir / "test.skill.yaml"
        skill_file.write_text(sample_skill_content, encoding="utf-8")
        result = await registry.discover([str(skills_dir)])
        assert result.successful >= 1
        assert result.total_found >= 1
        stats = registry.get_stats()
        assert stats['total_skills'] >= 1

    def test_export_import(self, registry, temp_dir, skill_parser, sample_skill_content):
        """An export can be imported into a separate, fresh registry."""
        self._register(registry, skill_parser, sample_skill_content)
        export = registry.export(include_skills=True)
        # Use an isolated storage path so the test never touches (or is
        # influenced by) the default on-disk registry.
        new_registry = SkillRegistry(storage_path=temp_dir / "imported_registry")
        import_result = new_registry.import_export(export)
        assert import_result['imported'] >= 1
        imported = new_registry.find_by_name("test-skill")
        assert len(imported) >= 1

    def test_stats(self, registry, skill_parser, sample_skill_content):
        """total_skills reflects registrations."""
        initial_stats = registry.get_stats()
        assert initial_stats['total_skills'] == 0
        self._register(registry, skill_parser, sample_skill_content)
        updated_stats = registry.get_stats()
        assert updated_stats['total_skills'] == 1

    def test_refresh(self, registry, skill_parser, sample_skill_content, temp_dir):
        """refresh() re-reads modified skill files."""
        skill_file = temp_dir / "test.skill.yaml"
        skill_file.write_text(sample_skill_content, encoding="utf-8")
        registered = self._register(
            registry, skill_parser, sample_skill_content, source_path=str(skill_file)
        )
        modified_content = sample_skill_content.replace(
            "A test skill for unit testing",
            "A modified test skill"
        )
        skill_file.write_text(modified_content, encoding="utf-8")
        refresh_result = registry.refresh()
        assert refresh_result['reindexed'] >= 1
        updated = registry.find_by_id(registered.id)
        assert updated is not None
        assert updated.skill.metadata.description == "A modified test skill"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
五、部署方案
1. Docker容器化
# Dockerfile
FROM python:3.11-slim

# Unbuffered stdout/stderr so output reaches `docker logs` immediately;
# don't write .pyc files into the image.
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

WORKDIR /app

# System dependencies: curl is used by the compose healthcheck; git presumably
# backs the "git" discovery strategy. Skip recommended extras to keep the image small.
RUN apt-get update && apt-get install -y --no-install-recommends \
        git \
        curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying code so this layer stays cached.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY registry/ ./registry/
COPY parsers/ ./parsers/
COPY api/ ./api/

# Data directories
RUN mkdir -p /data/registry /data/skills

# Environment
ENV REGISTRY_STORAGE_PATH=/data/registry \
    SKILL_PATHS=/data/skills \
    PYTHONPATH=/app

EXPOSE 8000

CMD ["python", "-m", "api.registry_api"]
2. Docker Compose配置
# docker-compose.yml
version: '3.8'

services:
  registry-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REGISTRY_STORAGE_PATH=/data/registry
      - SKILL_PATHS=/data/skills:/shared-skills
      - LOG_LEVEL=INFO
    volumes:
      - registry-data:/data/registry
      - shared-skills:/shared-skills
      - ./local-skills:/data/skills:ro
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  registry-worker:
    build: .
    command: python -m registry.worker
    environment:
      - REGISTRY_STORAGE_PATH=/data/registry
      - SKILL_PATHS=/data/skills:/shared-skills
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - registry-data:/data/registry
      - shared-skills:/shared-skills
      # SKILL_PATHS includes /data/skills, so mount it here as for the API.
      - ./local-skills:/data/skills:ro
    depends_on:
      - registry-api
      - redis
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    # Redis runs without a password; other services reach it over the compose
    # network as redis:6379, so publish it on the host loopback only.
    ports:
      - "127.0.0.1:6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - registry-api
    restart: unless-stopped

volumes:
  registry-data:
  shared-skills:
  redis-data:
3. Kubernetes部署
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opencode-registry
  namespace: opencode
  labels:
    app: opencode-registry
spec:
  # No spec.replicas: the HorizontalPodAutoscaler below owns the replica count.
  # A fixed value here would reset the scale on every `kubectl apply`.
  selector:
    matchLabels:
      app: opencode-registry
  template:
    metadata:
      labels:
        app: opencode-registry
      annotations:
        # NOTE(review): the API in this document exposes no /metrics endpoint;
        # scraping will fail until one is added.
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
    spec:
      containers:
        - name: registry-api
          image: opencode/registry:latest
          ports:
            - containerPort: 8000
          env:
            - name: REGISTRY_STORAGE_PATH
              value: /data/registry
            - name: SKILL_PATHS
              value: /data/skills:/shared-skills
            - name: LOG_LEVEL
              value: INFO
            - name: REDIS_URL
              value: redis://redis.opencode.svc.cluster.local:6379/0
          volumeMounts:
            - name: registry-data
              mountPath: /data/registry
            - name: shared-skills
              mountPath: /shared-skills
            - name: local-skills
              mountPath: /data/skills
              readOnly: true
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
      volumes:
        # NOTE(review): every replica mounts these claims; they must support
        # ReadWriteMany (or all pods must land on one node) — confirm the PVCs.
        - name: registry-data
          persistentVolumeClaim:
            claimName: registry-data-pvc
        - name: shared-skills
          persistentVolumeClaim:
            claimName: shared-skills-pvc
        - name: local-skills
          configMap:
            name: builtin-skills
---
apiVersion: v1
kind: Service
metadata:
  name: registry-service
  namespace: opencode
spec:
  selector:
    app: opencode-registry
  ports:
    - port: 80
      targetPort: 8000
      protocol: TCP
      name: http
    # NOTE(review): the container serves plain HTTP on 8000. This port only
    # works if the load balancer terminates TLS (provider-specific annotations);
    # otherwise prefer an Ingress with TLS.
    - port: 443
      targetPort: 8000
      protocol: TCP
      name: https
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: registry-hpa
  namespace: opencode
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: opencode-registry
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
六、性能优化建议
1. 缓存策略优化
# registry/cache/optimized_cache.py
import asyncio
import time
from typing import Any, Optional
from collections import OrderedDict
import threading
class LRUCache:
    """Thread-safe LRU cache whose entries also expire a fixed time after ``set``.

    Args:
        max_size: maximum number of entries; when full, the least recently
            used entry is evicted. A value <= 0 disables storage entirely.
        ttl: lifetime of an entry in seconds, measured from the last ``set``
            of that key. Reading an entry refreshes its LRU position but not
            its expiry, so frequently read entries still go stale on schedule.
    """

    def __init__(self, max_size: int = 1000, ttl: int = 300):
        self.max_size = max_size
        self.ttl = ttl  # seconds
        self.cache = OrderedDict()  # key -> value, least recently used first
        # key -> time.monotonic() of the last set(). A monotonic clock is
        # immune to wall-clock jumps (NTP, manual changes).
        self.timestamps = {}
        self.lock = threading.RLock()

    def get(self, key: str) -> Optional[Any]:
        """Return the value for *key*, or None if it is absent or expired."""
        with self.lock:
            if key not in self.cache:
                return None
            if self._is_expired(key):
                self._remove(key)
                return None
            # Mark as most recently used (expiry time is left unchanged).
            self.cache.move_to_end(key)
            return self.cache[key]

    def set(self, key: str, value: Any):
        """Store *value* under *key*, evicting LRU entries if the cache is full."""
        with self.lock:
            self._remove(key)
            if self.max_size <= 0:
                # Nothing can be stored; also avoids evicting from an empty cache.
                return
            while len(self.cache) >= self.max_size:
                self._remove(next(iter(self.cache)))
            self.cache[key] = value
            self.timestamps[key] = time.monotonic()

    def _remove(self, key: str):
        """Drop *key* from both maps if present (caller holds the lock)."""
        self.cache.pop(key, None)
        self.timestamps.pop(key, None)

    def _is_expired(self, key: str) -> bool:
        """True if *key* has no timestamp or is older than ``ttl`` seconds."""
        stamped_at = self.timestamps.get(key)
        if stamped_at is None:
            return True
        return time.monotonic() - stamped_at > self.ttl

    def clear(self):
        """Remove every entry."""
        with self.lock:
            self.cache.clear()
            self.timestamps.clear()

    def cleanup(self):
        """Remove all expired entries."""
        with self.lock:
            expired_keys = [key for key in self.cache if self._is_expired(key)]
            for key in expired_keys:
                self._remove(key)
2. 异步批处理
# registry/async/batch_processor.py
import asyncio
from typing import List, Any, Callable, Dict
from concurrent.futures import ThreadPoolExecutor
import time
class BatchProcessor:
    """Buffer items and hand them to ``_process_batch`` in batches on a thread pool.

    A batch is flushed in four cases:
    - the buffer reaches ``batch_size`` items;
    - every ``flush_interval`` seconds while running;
    - on an explicit ``flush()``;
    - on ``stop()``.

    Batches are processed one at a time: the buffer lock is held while a
    batch runs in the executor. Subclasses override ``_process_batch``, which
    runs in a worker thread.
    """

    def __init__(self,
                 batch_size: int = 100,
                 max_workers: int = 4,
                 flush_interval: float = 1.0):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.flush_interval = flush_interval
        self.buffer: List[Any] = []
        self.buffer_lock = asyncio.Lock()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.flush_task = None
        self.is_running = False
        self._executor_closed = False  # set by stop(); start() then makes a new executor

    async def start(self):
        """Start the periodic flush loop (no-op if already running)."""
        if self.is_running:
            return
        # stop() shuts the executor down; recreate it so the processor can be restarted.
        if self._executor_closed:
            self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
            self._executor_closed = False
        self.is_running = True
        self.flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        """Stop the flush loop, flush remaining items and shut the executor down."""
        if not self.is_running:
            return
        self.is_running = False
        if self.flush_task:
            self.flush_task.cancel()
            try:
                await self.flush_task
            except asyncio.CancelledError:
                pass
            self.flush_task = None
        await self.flush()
        self.executor.shutdown(wait=True)
        self._executor_closed = True

    async def add(self, item: Any):
        """Buffer *item*; flush immediately once ``batch_size`` items are buffered."""
        async with self.buffer_lock:
            self.buffer.append(item)
            if len(self.buffer) >= self.batch_size:
                # buffer_lock is already held and asyncio.Lock is not
                # reentrant, so calling flush() here would deadlock.
                await self._flush_locked()

    async def flush(self):
        """Process everything currently buffered."""
        async with self.buffer_lock:
            await self._flush_locked()

    async def _flush_locked(self):
        """Take the buffer and process it in the executor; caller holds ``buffer_lock``."""
        if not self.buffer:
            return
        batch = self.buffer.copy()
        self.buffer.clear()
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self.executor, self._process_batch, batch)

    def _process_batch(self, batch: List[Any]):
        """Process one batch (runs in a worker thread); override in subclasses.

        Intended for work such as bulk indexing or bulk storage.
        """
        pass

    async def _flush_loop(self):
        """Flush every ``flush_interval`` seconds until stopped."""
        import logging
        while self.is_running:
            await asyncio.sleep(self.flush_interval)
            try:
                await self.flush()
            except Exception:
                # Keep the periodic loop alive. The failed batch is logged,
                # not retried.
                logging.getLogger(__name__).exception("Batch flush failed")
总结
这个技能注册表方案提供了完整的技能发现、索引和分类系统,具有以下特点:
核心功能:
- 灵活的发现机制:支持文件系统、Git仓库、远程注册表多种发现方式
- 强大的索引系统:分类索引、标签索引、全文搜索三位一体
- 智能分类和标签:支持层次分类、标签云、相关标签推荐
- 高性能搜索:支持全文搜索、过滤、聚合、建议
- 实时更新:支持文件监视和自动刷新
高级特性:
- 持久化存储:技能注册表状态可持久化保存和恢复
- 导入导出:支持完整的注册表导入导出
- 统计监控:详细的统计信息和监控指标
- REST API:完整的HTTP API接口
- 可扩展架构:模块化设计,易于扩展新的发现策略和索引类型
生产就绪:
- 线程安全:缓存等共享结构(如LRUCache)通过锁进行同步;其余组件在并发场景下的安全性需结合实际部署方式单独验证
- 错误处理:完善的错误处理和日志记录
- 性能优化:缓存、批处理、异步处理
- 容器化部署:支持Docker和Kubernetes部署
- 配置驱动:灵活的配置文件支持
这个方案可以满足OpenCode技能系统的基础需求,并为未来的扩展提供了坚实的基础。
更多推荐
所有评论(0)