import re
import numpy as np
import tensorflow_hub as hub
import openai
import os
import tensorflow_text
from sklearn.neighbors import NearestNeighbors
import gradio as gr
import requests
import json
import fitz
# Fill in the credentials needed to call the OpenAI API (Azure endpoint) here
openai.api_key = '9481961416fa4c8e883047c5679cf971'
openai.api_base = 'https://demopro-oai-we2.openai.azure.com/'
openai.api_type = 'azure'
openai.api_version = '2022-12-01'
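# Note: with api_type='azure', the `engine` arguments used below are Azure
# deployment names; this script assumes the deployments are named after the
# underlying models (text-embedding-ada-002, text-davinci-003).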
# Flatten a nested (two-level) list into a single flat list
def flatten(_2d_list):
    flat_list = []
    for element in _2d_list:
        if isinstance(element, list):
            for item in element:
                flat_list.append(item)
        else:
            flat_list.append(element)
    return flat_list
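# Usage sketch (hypothetical input): flatten([[1, 2], 3, [4]]) -> [1, 2, 3, 4]
# Only one level of nesting is removed; this helper is not called elsewhere in the script.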
def preprocess(text):
    text = text.replace('\n', ' ')
    text = re.sub(r'\s+', ' ', text)
    return text
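# Example (hypothetical string): preprocess('a\nb   c') -> 'a b c'
# (newlines become spaces and runs of whitespace collapse to a single space)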
# Extract the text of the PDF document page by page
# def pdf_to_text(path):
#     doc = pdfplumber.open(path)
#     pages = doc.pages
#     text_list = []
#     for page, d in enumerate(pages):
#         d = d.extract_text()
#         d = preprocess(d)
#         text_list.append(d)
#     doc.close()
#     return text_list
def pdf_to_text(path, start_page=1, end_page=None):
    doc = fitz.open(path)
    total_pages = doc.page_count
    if end_page is None:
        end_page = total_pages
    text_list = []
    for i in range(start_page - 1, end_page):
        text = doc.load_page(i).get_text("text")
        text = preprocess(text)
        text_list.append(text)
    doc.close()
    return text_list
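# Usage sketch (hypothetical file): each list element is the cleaned text of one page,
# so pdf_to_text('book.pdf', start_page=1, end_page=3) returns a list of 3 strings.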
def text_to_chunks(texts, word_length=150, start_page=1):
    text_toks = [t.split(' ') for t in texts]
    page_nums = []
    chunks = []
    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk = words[i : i + word_length]
            if (
                (i + word_length) > len(words)
                and (len(chunk) < word_length)
                and (len(text_toks) != (idx + 1))
            ):
                # Short trailing piece of a page: prepend it to the next page's
                # words instead of emitting an undersized chunk.
                text_toks[idx + 1] = chunk + text_toks[idx + 1]
                continue
            chunk = ' '.join(chunk).strip()
            chunk = f'[Page no. {idx+start_page}]' + ' ' + '"' + chunk + '"'
            chunks.append(chunk)
    return chunks
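# Each chunk is roughly `word_length` words, prefixed with the page it came from,
# e.g. (illustrative): [Page no. 3] "the text of this ~150-word chunk ..."
# The tag counts extracted pages offset by text_to_chunks' start_page, which is not
# necessarily the PDF's printed page number.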
# Load the course textbook and split it into ~150-word chunks tagged with page numbers
history = pdf_to_text('The Elements of Statisitcal Learning.pdf', start_page=20)
history = text_to_chunks(history, start_page=1)
def encoder(text):
    embed = openai.Embedding.create(input=text, engine="text-embedding-ada-002")
    return embed.get('data')[0].get('embedding')
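# Illustrative call (hypothetical query). This helper is not used elsewhere in the
# script; the SemanticSearch class below embeds with the Universal Sentence Encoder instead.
#   vec = encoder('ridge regression')  # -> list of 1536 floats from ada-002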
# Semantic search class: embeds the chunks and retrieves the ones closest to a query
class SemanticSearch:
    def __init__(self):
        # Load Google's multilingual Universal Sentence Encoder from TF Hub;
        # the first run can take ten minutes or more to download the model.
        self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder-multilingual/3')
        self.fitted = False

    def get_text_embedding(self, texts, batch=1000):
        embeddings = []
        for i in range(0, len(texts), batch):
            text_batch = texts[i : (i + batch)]
            emb_batch = self.use(text_batch)
            embeddings.append(emb_batch)
        embeddings = np.vstack(embeddings)
        return embeddings

    # k-nearest neighbours: index the chunk embeddings so we can find the k chunks
    # most similar to a question, where k is n_neighbors (5 by default here).
    def fit(self, data, batch=1000, n_neighbors=5):
        self.data = data
        self.embeddings = self.get_text_embedding(data, batch=batch)
        n_neighbors = min(n_neighbors, len(self.embeddings))
        self.nn = NearestNeighbors(n_neighbors=n_neighbors)
        self.nn.fit(self.embeddings)
        self.fitted = True

    # With __call__ defined, the instance can be used like a function; `text` is the
    # user's question and inp_emb is its embedding vector.
    def __call__(self, text, return_data=True):
        inp_emb = self.use([text])
        neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0]
        if return_data:
            return [self.data[i] for i in neighbors]
        else:
            return neighbors
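# Usage sketch (hypothetical chunks), separate from the index built further below:
#   search = SemanticSearch()
#   search.fit(['[Page no. 1] "alpha ..."', '[Page no. 2] "beta ..."'], n_neighbors=1)
#   search('alpha')  # -> list containing the single most similar chunk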
# Wrapper around the OpenAI completion API; `engine` is the language model (deployment) we use and `prompt` is the prompt text
def generate_text(prompt, engine="text-davinci-003"):
    completions = openai.Completion.create(
        engine=engine,
        prompt=prompt,
        max_tokens=512,
        n=1,
        stop=None,
        temperature=0.7,
    )
    message = completions.choices[0].text
    return message
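# Illustrative call (hypothetical prompt): reply = generate_text('Say hello in one word.')
# With temperature=0.7, repeated calls may return different completions.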
def generate_answer(question):
    # Retrieve the chunks most similar to the question (n_neighbors=5 by default, see fit above)
    topn_chunks = recommender(question)
    prompt = ""
    prompt += 'search results:\n\n'
    # Append the retrieved chunks to the prompt
    for c in topn_chunks:
        prompt += c + '\n\n'
    # Instructions for the model
    prompt += '''
    Instructions: 如果搜索结果中找不到相关信息,只需要回答'未在该文档中找到相关信息'。
    如果找到了相关信息,请使用中文回答,回答尽量精确简洁。并在句子的末尾使用[Page no. 页码]符号引用每个参考文献(每个结果的开头都有这个编号)。
    如果不确定答案是否正确,就仅给出相似段落的来源,不要回复错误的答案。
    '''
    prompt += f"Query: {question}\nAnswer:"
    answer = generate_text(prompt, "text-davinci-003")
    return answer
# Build the retrieval index over the textbook chunks (embedding every chunk can take a while)
recommender = SemanticSearch()
recommender.fit(history)
# Web client: running this block launches the client interface
def ask_api(question):
    if question.strip() == '':
        return '[ERROR]: 未输入问题'
    return generate_answer(question)
title = 'Chat With Statistical Learning'
description = """ 该机器人将以Trevor Hastie等人所著的The Elements of Statistical Learning: Data Mining, Inference, and Prediction
(即我们上课所用的课本)为主题回答你的问题,如果所问问题与书的内容无关,将会返回"未在该文档中找到相关信息"
"""
with gr.Blocks() as demo:
    gr.Markdown(f'<center><h1>{title}</h1></center>')
    gr.Markdown(description)
    with gr.Row():
        with gr.Group():
            question = gr.Textbox(label='请输入你的问题')
            btn = gr.Button(value='提交')
            btn.style(full_width=True)
        with gr.Group():
            answer = gr.Textbox(label='回答:')
    btn.click(
        ask_api,
        inputs=[question],
        outputs=[answer],
    )
# Passing share=True to launch() creates a public URL that others can use to reach the app, as long as this script keeps running (your machine acts as the server)
demo.launch()