1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| from typing import Dict
class FinanceAssistantSkill: """财务智能助手 Skill""" def __init__(self): self.db = Database() self.ocr_service = OCRService() self.invoice_service = InvoiceService(self.ocr_service, self.db) self.report_service = ReportService(self.db, './templates') def handle_message(self, message: str, context: Dict) -> str: """处理用户消息""" intent = self._classify_intent(message) if intent == 'recognize_invoice': return "请上传发票图片,我将为您识别发票信息。" elif intent == 'generate_report': return self._handle_report_request(message) elif intent == 'query_invoice': return self._handle_invoice_query(message) elif intent == 'statistics': return self._handle_statistics(message) else: return self._handle_qa(message) def handle_file(self, file_path: str, context: Dict) -> str: """处理文件上传""" result = self.invoice_service.recognize_invoice(file_path) if result['validation']['is_valid']: return f""" 发票识别成功! - 发票代码:{result.get('invoice_code')} - 发票号码:{result.get('invoice_number')} - 开票日期:{result.get('date')} - 销售方:{result.get('seller_name')} - 金额:¥{result.get('amount')} - 税额:¥{result.get('tax')} - 价税合计:¥{result.get('total')} """.strip() else: errors = '\n'.join(result['validation']['errors']) return f"发票识别完成,但存在以下问题:\n{errors}" def _classify_intent(self, message: str) -> str: """意图分类""" keywords = { 'recognize_invoice': ['识别发票', '录入发票', '发票识别'], 'generate_report': ['生成报表', '报表', '资产负债表', '利润表'], 'query_invoice': ['查询发票', '查找发票', '发票查询'], 'statistics': ['统计', '汇总', '分析'] } for intent, words in keywords.items(): if any(word in message for word in words): return intent return 'qa' def _handle_report_request(self, message: str) -> str: """处理报表请求""" import re year_match = re.search(r'(\d{4})年', message) month_match = re.search(r'(\d{1,2})月', message) year = int(year_match.group(1)) if year_match else 2024 month = int(month_match.group(1)) if month_match else 1 if '资产负债' in message: report_path = self.report_service.generate_balance_sheet(year, month) return f"资产负债表已生成:{report_path}" elif '利润' in message: report_path = self.report_service.generate_income_statement(year, month) return f"利润表已生成:{report_path}" else: return "请指定要生成的报表类型(资产负债表/利润表)" def _handle_invoice_query(self, message: str) -> str: """处理发票查询""" invoices = self.invoice_service.query_invoices(limit=10) if not invoices: return "未找到发票记录" result = "最近的发票记录:\n" for inv in invoices: result += f"- {inv.get('date')} {inv.get('seller_name')} ¥{inv.get('amount')}\n" return result def _handle_statistics(self, message: str) -> str: """处理统计请求""" import re month_match = re.search(r'(\d{4})年(\d{1,2})月', message) if month_match: year = int(month_match.group(1)) month = int(month_match.group(2)) month_str = f"{year}-{month:02d}" else: month_str = datetime.now().strftime('%Y-%m') stats = self.invoice_service.get_statistics(month_str) return f""" {month_str} 发票统计: - 发票数量:{stats.get('count', 0)} 张 - 总金额:¥{stats.get('total_amount', 0):,.2f} - 总税额:¥{stats.get('total_tax', 0):,.2f} - 价税合计:¥{stats.get('total_with_tax', 0):,.2f} """.strip() def _handle_qa(self, message: str) -> str: """处理问答""" faq = { '怎么使用': '您可以上传发票图片让我识别,或者让我生成财务报表。', '支持哪些功能': '目前支持发票识别、报表生成、发票查询和统计分析。', '数据安全吗': '您的数据仅保存在本地,不会上传到云端。' } for key, value in faq.items(): if key in message: return value return "抱歉,我不太理解您的问题。您可以问"怎么使用"或"支持哪些功能"。"
|