'''
Read the label library provided by channel fulfillment and import it into the
knowledge base.
'''
import json

import chardet
import pandas as pd
import requests


class CommodityKnowledge:
    api_key = "dataset-tkRjxM0CmA4Vot3Y4K79xxx"
    dataset_url = "http://172.xxx/v1/datasets/"
    dataset_name = "商品标签库"
    dataset_id = "7d99fecf-39cb-48f4-885b-8b1eb74xxx"

    def create_by_text(self, name, text):
        # Create a new document in the dataset from plain text; returns the document id.
        url = f"{self.dataset_url}{self.dataset_id}/document/create_by_text"
        payload = json.dumps({
            "name": name,
            "text": text,
            "indexing_technique": "high_quality",
            "process_rule": {
                "mode": "custom",
                "rules": {
                    "pre_processing_rules": [
                        {"id": "remove_extra_spaces", "enabled": True}
                    ],
                    "segmentation": {
                        "separator": "\n",
                        "max_tokens": 1000
                    }
                }
            }
        })
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Content-Type': 'application/json'
        }
        response = requests.post(url, headers=headers, data=payload)
        response.raise_for_status()
        return response.json()["document"]["id"]

    def add_segments(self, text, document_id):
        # Append one more segment to an existing document.
        url = f"{self.dataset_url}{self.dataset_id}/documents/{document_id}/segments"
        payload = json.dumps({
            "segments": [
                {"content": text, "keywords": []}
            ]
        })
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
            'Content-Type': 'application/json'
        }
        response = requests.post(url, headers=headers, data=payload)
        response.raise_for_status()
        print(response.text)

    def detect_encoding(self, file_path):
        # Helper for non-UTF-8 source files; not used in the main flow.
        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
        return result['encoding']

    def read_label_file(self, file_path):
        # Walk the Excel file row by row. The first keyword chunk of a row creates
        # a new document; the remaining chunks are appended as extra segments.
        df = pd.read_excel(file_path, engine='openpyxl')
        for index, row in df.iterrows():
            name = f"{row['类别']}-{row['标签']}"
            document_id = ""
            label_name = row['标签']
            label_category = row['类别']
            label_description = row['标签描述']
            label_keywords = self.analyze_keywords(row['关键词'])
            label_type = row['标签分类']
            for keyword in label_keywords:
                row_data = self.build_document_block(
                    label_name, label_category, label_description, keyword, label_type)
                if document_id == "":
                    document_id = self.create_by_text(name, row_data)
                else:
                    self.add_segments(row_data, document_id)

    def build_document_block(self, label_name, label_category, label_description,
                             label_keywords, label_type):
        # Serialize one label record as a JSON string, so the "text"/"content"
        # fields sent to the API are strings rather than nested objects.
        block = {
            "label_name": str(label_name),
            "label_category": str(label_category),
            "label_description": str(label_description),
            "label_keywords": str(label_keywords),
            "label_type": str(label_type)
        }
        print(block)
        return json.dumps(block, ensure_ascii=False)

    def analyze_keywords(self, label_keywords):
        # Normalize the raw keyword cell into a list of chunks of at most 500
        # characters. Empty/NaN cells become ["无"]; numeric cells become strings.
        if pd.isna(label_keywords) or label_keywords == '':
            return ["无"]
        if isinstance(label_keywords, (int, float)):
            return [str(label_keywords)]
        new_keywords_list = []
        if len(label_keywords) > 500:
            for i in range(0, len(label_keywords), 500):
                # Overlap 20 characters with the previous chunk so a keyword that
                # straddles a chunk boundary is not lost.
                start = i if i == 0 else i - 20
                new_keywords_list.append(label_keywords[start:i + 500])
        else:
            new_keywords_list.append(label_keywords)
        return new_keywords_list


if __name__ == '__main__':
    commodity_knowledge = CommodityKnowledge()
    file_path = "C:\\Users\\xxx.xxx\\Desktop\\商品评价分类及其关键词.xlsx"
    commodity_knowledge.read_label_file(file_path)
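The importer expects the spreadsheet to carry five columns: 类别, 标签, 标签描述, 关键词 and 标签分类. For a local dry run before pointing the script at the real export, a one-row fixture can be generated as below; the file name sample_labels.xlsx and all cell values are hypothetical, made up purely for illustration.

# Hypothetical fixture for a dry run; column names mirror those read in read_label_file.
import pandas as pd

sample = pd.DataFrame([{
    '类别': '物流',                       # category (illustrative value)
    '标签': '发货速度',                   # label (illustrative value)
    '标签描述': '用户对发货时效的评价',   # label description (illustrative value)
    '关键词': '发货快, 配送及时, 到货慢', # keywords (illustrative value)
    '标签分类': '服务类',                 # label type (illustrative value)
}])
sample.to_excel('sample_labels.xlsx', index=False, engine='openpyxl')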
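To confirm the import went through, the documents in the dataset can be listed back. This is a minimal sketch assuming the service follows the standard Dify datasets interface used above; the GET documents endpoint, its paging parameters, and the "data" field in the response are assumptions, not something the original script exercises.

# Assumed endpoint: GET {dataset_url}{dataset_id}/documents (Dify-style listing).
import requests

ck = CommodityKnowledge()
url = f"{ck.dataset_url}{ck.dataset_id}/documents"
response = requests.get(
    url,
    headers={'Authorization': f'Bearer {ck.api_key}'},
    params={'page': 1, 'limit': 20},  # paging parameters assumed
)
response.raise_for_status()
for doc in response.json().get("data", []):  # "data" key assumed from the Dify API
    print(doc.get("id"), doc.get("name"))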