第21讲:性能优化与监控

第21讲:性能优化与监控

掌握 Skill 的性能优化和监控技巧,确保 Skill 高效稳定运行。

一、性能瓶颈分析

1.1 常见性能问题

问题类型表现原因
响应慢用户等待时间长同步处理、大文件操作
内存高内存占用持续增长内存泄漏、大对象缓存
CPU 高处理卡顿复杂计算、循环处理
超时请求超时失败外部 API 慢、大文件处理

1.2 性能分析工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
import functools
import tracemalloc
from typing import Callable

class PerformanceProfiler:
"""性能分析器"""

@staticmethod
def timer(func: Callable) -> Callable:
"""计时装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} 耗时: {elapsed:.3f}s")
return result
return wrapper

@staticmethod
def memory_tracker(func: Callable) -> Callable:
"""内存追踪装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
tracemalloc.start()
result = func(*args, **kwargs)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"{func.__name__} 内存使用: {current/1024/1024:.2f}MB, 峰值: {peak/1024/1024:.2f}MB")
return result
return wrapper

二、响应时间优化

2.1 异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor

class AsyncSkillProcessor:
"""异步处理器"""

def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10)

async def process_async(self, tasks: list) -> list:
"""异步处理多个任务"""
loop = asyncio.get_event_loop()

# 将同步任务转为异步
futures = [
loop.run_in_executor(self.executor, task)
for task in tasks
]

results = await asyncio.gather(*futures, return_exceptions=True)
return results

async def fetch_urls(self, urls: list) -> list:
"""异步获取多个 URL"""
async with aiohttp.ClientSession() as session:
tasks = [self._fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)

async def _fetch_one(self, session: aiohttp.ClientSession, url: str):
"""获取单个 URL"""
async with session.get(url, timeout=30) as response:
return await response.text()

2.2 流式处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class StreamingProcessor:
"""流式处理器"""

def process_large_file(self, file_path: str, chunk_size: int = 8192):
"""流式处理大文件"""
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield self.process_chunk(chunk)

def process_chunk(self, chunk: bytes) -> bytes:
"""处理数据块"""
# 实现具体处理逻辑
return chunk

三、缓存策略

3.1 多级缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import functools
import hashlib
import time
from typing import Any, Optional

class CacheManager:
"""缓存管理器"""

def __init__(self):
self.memory_cache = {}
self.cache_stats = {'hits': 0, 'misses': 0}

def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
if key in self.memory_cache:
entry = self.memory_cache[key]
if entry['expiry'] > time.time():
self.cache_stats['hits'] += 1
return entry['value']
else:
del self.memory_cache[key]

self.cache_stats['misses'] += 1
return None

def set(self, key: str, value: Any, ttl: int = 300):
"""设置缓存"""
self.memory_cache[key] = {
'value': value,
'expiry': time.time() + ttl
}

def memoize(self, ttl: int = 300):
"""缓存装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
key = self._generate_key(func.__name__, args, kwargs)

# 尝试获取缓存
cached = self.get(key)
if cached is not None:
return cached

# 执行函数并缓存结果
result = func(*args, **kwargs)
self.set(key, result, ttl)
return result
return wrapper
return decorator

def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""生成缓存键"""
key_data = f"{func_name}:{str(args)}:{str(kwargs)}"
return hashlib.md5(key_data.encode()).hexdigest()

# 使用示例
cache = CacheManager()

@cache.memoize(ttl=600)
def expensive_operation(param: str) -> str:
"""耗时操作"""
time.sleep(2)
return f"Result for {param}"

四、监控告警

4.1 性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import statistics
from dataclasses import dataclass
from typing import List

@dataclass
class Metric:
"""指标数据"""
timestamp: float
name: str
value: float
labels: dict

class PerformanceMonitor:
"""性能监控器"""

def __init__(self, storage):
self.storage = storage
self.metrics_buffer = []

def record(self, name: str, value: float, labels: dict = None):
"""记录指标"""
metric = Metric(
timestamp=time.time(),
name=name,
value=value,
labels=labels or {}
)
self.metrics_buffer.append(metric)

# 批量保存
if len(self.metrics_buffer) >= 100:
self._flush_metrics()

def _flush_metrics(self):
"""刷新指标到存储"""
for metric in self.metrics_buffer:
key = f"metrics:{metric.name}:{int(metric.timestamp)}"
self.storage.save(key, {
'timestamp': metric.timestamp,
'value': metric.value,
'labels': metric.labels
})
self.metrics_buffer.clear()

def get_statistics(self, name: str, hours: int = 24) -> dict:
"""获取统计信息"""
# 查询最近数据
metrics = self._query_metrics(name, hours)

if not metrics:
return {}

values = [m['value'] for m in metrics]

return {
'count': len(values),
'mean': statistics.mean(values),
'median': statistics.median(values),
'min': min(values),
'max': max(values),
'p95': sorted(values)[int(len(values) * 0.95)],
'p99': sorted(values)[int(len(values) * 0.99)]
}

五、实战练习

练习 1:性能分析

为一个 Skill 添加性能分析,找出瓶颈。

练习 2:缓存实现

实现一个缓存系统,提升响应速度。

练习 3:监控告警

搭建监控告警系统,及时发现性能问题。

六、下节预告

下一章我们将进入 综合项目实战,通过完整项目巩固所学知识。


加入学习群

👉 加入AI编程学习交流群

点击加入


本讲是《AI Skills 从入门到实践》系列课程的第21讲。

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!