"""Elasticsearch vector store for 全文索引---- for 全文检索."""from __future__ importannotationsimportjsonimportloggingimportosfromtypingimportAny, Iterable, List, Optionalfromdbgpt._private.pydanticimportFieldfromdbgpt.coreimportChunk, Embeddingsfromdbgpt.core.awel.flowimportParameter, ResourceCategory, register_resourcefromdbgpt.storage.vector_store.baseimport (
_COMMON_PARAMETERS,
VectorStoreBase,
VectorStoreConfig,
)
fromdbgpt.storage.vector_store.filtersimportFilterOperator, MetadataFiltersfromdbgpt.utilimportstring_utilsfromdbgpt.util.i18n_utilsimport_logger=logging.getLogger(__name__)
try:
    import jieba
    import jieba.analyse
    from langchain.schema import Document
    from langchain.vectorstores.elasticsearch import ElasticsearchStore
    from elasticsearch import Elasticsearch
except ImportError:
    raise ValueError(
        "Could not import elasticsearch python package. "
        "Please install it with `pip install elasticsearch`."
    )
@register_resource(
    _("ElasticSearch Vector Store"),
    "elasticsearch_vector_store",
    category=ResourceCategory.VECTOR_STORE,
    parameters=[
        *_COMMON_PARAMETERS,
        Parameter.build_from(
            _("Uri"),
            "uri",
            str,
            description=_(
                "The uri of elasticsearch store, if not set, will use the default uri."
            ),
            optional=True,
            default="localhost",
        ),
        Parameter.build_from(
            _("Port"),
            "port",
            str,
            description=_(
                "The port of elasticsearch store, if not set, will use the default port."
            ),
            optional=True,
            default="9200",
        ),
        Parameter.build_from(
            _("Alias"),
            "alias",
            str,
            description=_(
                "The alias of elasticsearch store, if not set, will use the default alias."
            ),
            optional=True,
            default="default",
        ),
        Parameter.build_from(
            _("Index Name"),
            "index_name",
            str,
            description=_(
                "The index name of elasticsearch store, if not set, will use the "
                "default index name."
            ),
            optional=True,
            default="index_name_test",
        ),
        Parameter.build_from(
            _("Text Field"),
            "text_field",
            str,
            description=_(
                "The text field of elasticsearch store, if not set, will use the "
                "default text field."
            ),
            optional=True,
            default="content",
        ),
        Parameter.build_from(
            _("Embedding Field"),
            "embedding_field",
            str,
            description=_(
                "The embedding field of elasticsearch store, if not set, will use "
                "the default embedding field."
            ),
            optional=True,
            default="vector",
        ),
    ],
    description=_("Elasticsearch vector store."),
)
class ElasticsearchVectorConfig(VectorStoreConfig):
    """Elasticsearch vector store config."""

    class Config:
        """Config for BaseModel."""

        arbitrary_types_allowed = True

    uri: str = Field(
        default="localhost",
        description="The uri of elasticsearch store, if not set, will use the default uri.",
    )
    port: str = Field(
        default="9200",
        description="The port of elasticsearch store, if not set, will use the default port.",
    )
    alias: str = Field(
        default="default",
        description="The alias of elasticsearch store, if not set, will use the default alias.",
    )
    index_name: str = Field(
        default="index_name_test",
        description="The index name of elasticsearch store, if not set, will use "
        "the default index name.",
    )
    text_field: str = Field(
        default="content",
        description="The text field of elasticsearch store, if not set, will use "
        "the default text field.",
    )
    embedding_field: str = Field(
        default="vector",
        description="The embedding field of elasticsearch store, if not set, will "
        "use the default embedding field.",
    )
    metadata_field: str = Field(
        default="metadata",
        description="The metadata field of elasticsearch store, if not set, will "
        "use the default metadata field.",
    )
    secure: str = Field(
        default="",
        description="The secure setting of elasticsearch store, if not set, will "
        "use the default value.",
    )
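# Usage note (illustrative sketch, not part of the original patch): the config
# is meant to be constructed with a dbgpt Embeddings instance and handed to the
# ElasticStore class defined below. `my_embeddings` is a hypothetical
# placeholder for any Embeddings implementation; `name` and `embedding_fn`
# come from the base VectorStoreConfig, `uri` and `port` from this class.
#
#     config = ElasticsearchVectorConfig(
#         name="doc_space",
#         uri="127.0.0.1",
#         port="9200",
#         embedding_fn=my_embeddings,
#     )
#     store = ElasticStore(config)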
class ElasticStore(VectorStoreBase):
    """Elasticsearch vector store."""

    def __init__(self, vector_store_config: ElasticsearchVectorConfig) -> None:
        """Create an ElasticsearchStore instance.

        Args:
            vector_store_config (ElasticsearchVectorConfig): ElasticsearchStore config.
        """
        connect_kwargs = {}
        elasticsearch_vector_config = vector_store_config.dict()
        self.uri = elasticsearch_vector_config.get("uri") or os.getenv(
            "ElasticSearch_URL", "localhost"
        )
        self.port = elasticsearch_vector_config.get("port") or os.getenv(
            "ElasticSearch_PORT", "9200"
        )
        self.username = elasticsearch_vector_config.get("username") or os.getenv(
            "ElasticSearch_USERNAME"
        )
        self.password = elasticsearch_vector_config.get("password") or os.getenv(
            "ElasticSearch_PASSWORD"
        )
        self.collection_name = (
            elasticsearch_vector_config.get("name") or vector_store_config.name
        )
        if string_utils.is_all_chinese(self.collection_name):
            bytes_str = self.collection_name.encode("utf-8")
            hex_str = bytes_str.hex()
            self.collection_name = hex_str
        if vector_store_config.embedding_fn is None:
            # Perform runtime checks on self.embedding to
            # ensure it has been correctly set and loaded.
            raise ValueError("embedding_fn is required for ElasticSearchStore")
        self.index_name = self.collection_name.lower()
        self.embedding: Embeddings = vector_store_config.embedding_fn
        self.fields: List = []
        if (self.username is None) != (self.password is None):
            raise ValueError(
                "Both username and password must be set to use authentication for "
                "ElasticSearch"
            )
        if self.username:
            connect_kwargs["username"] = self.username
            connect_kwargs["password"] = self.password
        # Index settings for a single-node cluster.
        self.index_settings = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,  # no replicas on a single node
            }
        }
        # Low-level Elasticsearch python client (connection only), used for
        # keyword search and index management.
        try:
            if self.username != "" and self.password != "":
                self.es_client_python = Elasticsearch(
                    f"http://{self.uri}:{self.port}",
                    basic_auth=(self.username, self.password),
                )
                # Do not create the index here, otherwise an error is raised:
                # if not self.vector_name_exists():
                #     self.es_client_python.indices.create(
                #         index=self.index_name, body=self.index_settings
                #     )
            else:
                logger.warning("Elasticsearch username/password not configured")
                self.es_client_python = Elasticsearch(f"http://{self.uri}:{self.port}")
                # if not self.vector_name_exists():
                #     self.es_client_python.indices.create(
                #         index=self.index_name, body=self.index_settings
                #     )
        except ConnectionError:
            logger.error("Failed to connect to Elasticsearch!")
        except Exception as e:
            logger.error(f"Elasticsearch python client connection error: {e}")

        # langchain ElasticsearchStore: connection and index creation for
        # dense-vector retrieval.
        try:
            if self.username != "" and self.password != "":
                self.db_init = ElasticsearchStore(
                    es_url=f"http://{self.uri}:{self.port}",
                    index_name=self.index_name,
                    query_field="context",
                    vector_query_field="dense_vector",
                    embedding=self.embedding,
                    es_user=self.username,
                    es_password=self.password,
                )
            else:
                logger.warning("Elasticsearch username/password not configured")
                self.db_init = ElasticsearchStore(
                    es_url=f"http://{self.uri}:{self.port}",
                    index_name=self.index_name,
                    query_field="context",
                    vector_query_field="dense_vector",
                    embedding=self.embedding,
                )
        except ConnectionError:
            print("### Failed to connect to Elasticsearch!")
            logger.error("### Failed to connect to Elasticsearch!")
        except Exception as e:
            logger.error(f"langchain ElasticsearchStore connection/index creation error: {e}")
    def load_document(
        self,
        chunks: List[Chunk],
    ) -> List[str]:
        """Add text data into ElasticSearch (write the chunks into ES)."""
        logger.info("ElasticStore load document")
        try:
            # Connect and write the documents in one call.
            texts = [chunk.content for chunk in chunks]
            metadatas = [chunk.metadata for chunk in chunks]
            ids = [chunk.chunk_id for chunk in chunks]
            if self.username != "" and self.password != "":
                logger.info(
                    f"ElasticsearchStore.from_texts: metadatas[0]={metadatas[0]}, "
                    f"len={len(metadatas)}"
                )
                self.db = ElasticsearchStore.from_texts(
                    texts=texts,
                    embedding=self.embedding,
                    metadatas=metadatas,
                    ids=ids,
                    es_url=f"http://{self.uri}:{self.port}",
                    index_name=self.index_name,
                    distance_strategy="COSINE",  # one of COSINE, EUCLIDEAN_DISTANCE, DOT_PRODUCT
                    query_field="context",  # name of the field to store the texts in
                    vector_query_field="dense_vector",  # field to store the embedding vectors in
                    es_user=self.username,
                    es_password=self.password,
                )
                logger.info("Add embeddings success.")
            else:
                self.db = ElasticsearchStore.from_texts(
                    texts=texts,
                    embedding=self.embedding,
                    metadatas=metadatas,
                    ids=ids,
                    es_url=f"http://{self.uri}:{self.port}",
                    index_name=self.index_name,
                    distance_strategy="COSINE",
                    query_field="context",
                    vector_query_field="dense_vector",
                    # verify_certs=False,
                )
            return ids
        except ConnectionError as ce:
            print(ce)
            print("Failed to connect to Elasticsearch!")
            logger.error("Failed to connect to Elasticsearch!")
        except Exception as e:
            logger.error(f"load_document error: {e}")
            print(e)
    def delete_by_ids(self, ids):
        """Delete vectors by ids (a comma-separated string of chunk ids)."""
        logger.info(f"begin delete elasticsearch, len ids: {len(ids)}")
        logger.info(f"begin delete elasticsearch, type ids: {type(ids)}")
        ids = ids.split(",")
        logger.info(f"after split, len ids: {len(ids)}")
        logger.info(f"after split, type ids: {type(ids)}")
        # es_client = self.db_init.connect_to_elasticsearch(
        #     es_url=f"http://{self.uri}:{self.port}",
        #     es_user=self.username,
        #     es_password=self.password,
        # )
        try:
            self.db_init.delete(ids=ids)
            self.es_client_python.indices.refresh(index=self.index_name)
        except Exception as e:
            logger.error(f"delete_by_ids error: {e}")
    def similar_search(
        self,
        text: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Perform a keyword search on a query string and return results.

        TODO: the tokenizer used for query analysis may be made configurable later.
        """
        query = text
        print(f"similar_search query: {query}")
        query_list = jieba.analyse.textrank(query, topK=20, withWeight=False)
        if len(query_list) == 0:
            query_list = [query]
        body = {"query": {"match": {"context": " ".join(query_list)}}}
        search_results = self.es_client_python.search(
            index=self.index_name, body=body, size=topk
        )
        search_results = search_results["hits"]["hits"]

        # Return early if the search result is empty.
        if not search_results:
            return []

        info_docs = []
        byte_count = 0
        # Maximum prompt length in bytes per vector store type; longer results
        # are truncated. This may be moved to .env later.
        VS_TYPE_PROMPT_TOTAL_BYTE_SIZE = 3000
        for result in search_results:
            index_name = result["_index"]
            # vector_doc = source.get("dense_vector")  # dense-vector representation (unused)
            doc_id = result["_id"]
            source = result["_source"]
            context = source["context"]
            metadata = source["metadata"]
            score = result["_score"]
            # If the next context would exceed the total byte limit, truncate it.
            if (byte_count + len(context)) > VS_TYPE_PROMPT_TOTAL_BYTE_SIZE:
                context = context[: VS_TYPE_PROMPT_TOTAL_BYTE_SIZE - byte_count]
            doc_with_score = [
                Document(page_content=context, metadata=metadata),
                score,
                doc_id,
            ]
            info_docs.append(doc_with_score)
            byte_count += len(context)
            # Stop once the byte limit is reached.
            if byte_count >= VS_TYPE_PROMPT_TOTAL_BYTE_SIZE:
                break
        print(f"ES returned {len(info_docs)} results:")

        # Write the results to a file (debug output).
        result_file = open("es_search_results.txt", "w", encoding="utf-8")
        result_file.write(f"query: {query}\n")
        result_file.write(f"ES returned {len(info_docs)} results:\n")
        for item in info_docs:
            doc = item[0]
            result_file.write(doc.page_content + "\n")
            result_file.write("*" * 20)
            result_file.write("\n")
            result_file.flush()
            print(doc.page_content + "\n")
            print("*" * 20)
            print("\n")
        result_file.close()
        return [
            Chunk(
                metadata=json.loads(doc.metadata.get("metadata", "{}")),
                content=doc.page_content,
            )
            for doc, score, id in info_docs
        ]
    def similar_search_with_scores(
        self,
        text,
        topk,
        score_threshold,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Perform a search on a query string and return results with scores.

        For more information about the search parameters, take a look at the
        pyElasticSearch documentation found here:
        https://ElasticSearch.io/api-reference/pyElasticSearch/v2.2.6/Collection/search().md

        Args:
            text (str): The query text.
            topk (int): The number of similar documents to return.
            score_threshold (float): Optional, a floating point value between 0 to 1.
            filters (Optional[MetadataFilters]): Optional, metadata filters.

        Returns:
            List[Tuple[Document, float]]: Result doc and score.
        """
        query = text
        print(f"similar_search_with_scores query: {query}")
        query_list = jieba.analyse.textrank(query, topK=20, withWeight=False)
        if len(query_list) == 0:
            query_list = [query]
        body = {"query": {"match": {"context": " ".join(query_list)}}}
        search_results = self.es_client_python.search(
            index=self.index_name, body=body, size=topk
        )
        search_results = search_results["hits"]["hits"]

        # Return early if the search result is empty.
        if not search_results:
            return []

        info_docs = []
        byte_count = 0
        # Maximum prompt length in bytes per vector store type; longer results
        # are truncated. This may be moved to .env later.
        VS_TYPE_PROMPT_TOTAL_BYTE_SIZE = 3000
        for result in search_results:
            # logger.info(f"query result === {result}")  # full hit, for debugging
            index_name = result["_index"]
            # vector_doc = result["dense_vector"]  # dense-vector representation (unused)
            doc_id = result["_id"]
            source = result["_source"]  # hit source
            context = source["context"]  # text content
            metadata = source["metadata"]  # source path of the text
            score = result["_score"] / 100  # rough normalization of the score
            # If the next context would exceed the total byte limit, truncate it.
            if (byte_count + len(context)) > VS_TYPE_PROMPT_TOTAL_BYTE_SIZE:
                context = context[: VS_TYPE_PROMPT_TOTAL_BYTE_SIZE - byte_count]
            doc_with_score = [
                Document(page_content=context, metadata=metadata),
                score,
                doc_id,
            ]
            info_docs.append(doc_with_score)
            byte_count += len(context)
            # Stop once the byte limit is reached.
            if byte_count >= VS_TYPE_PROMPT_TOTAL_BYTE_SIZE:
                break
        print(f"ES returned {len(info_docs)} results:")
        logger.info(f"ES returned {len(info_docs)} results:")

        # Write the results to a file (debug output).
        result_file = open("es_search_results.txt", "w", encoding="utf-8")
        result_file.write(f"query: {query}\n")
        result_file.write(f"ES returned {len(info_docs)} results:\n")
        for item in info_docs:
            doc = item[0]
            result_file.write(doc.page_content + "\n")
            result_file.write("*" * 50)
            result_file.write("\n")
            result_file.flush()
            print(doc.page_content + "\n")
            print("*" * 50)
            print("\n\n")
        result_file.close()

        if any(score < 0.0 or score > 1.0 for _, score, _ in info_docs):
            logger.warning(
                "similarity score need between"
                f" 0 and 1, got {info_docs}"
            )
        logger.info(f"score_threshold: {score_threshold}")
        docs_and_scores = [
            Chunk(
                metadata=doc.metadata,
                content=doc.page_content,
                score=score,
                chunk_id=id,
            )
            for doc, score, id in info_docs
            if score_threshold is None or score >= score_threshold
        ]
        if len(docs_and_scores) == 0:
            logger.warning(
                "No relevant docs were retrieved using the relevance score"
                f" threshold {score_threshold}"
            )
        return docs_and_scores

    def vector_name_exists(self):
        """Whether the vector store (index) name exists."""
        return self.es_client_python.indices.exists(index=self.index_name)

    def delete_vector_name(self, vector_name: str):
        """Delete the index (the lowercase knowledge-base name) and all its vectors."""
        if self.es_client_python.indices.exists(index=self.index_name):
            self.es_client_python.indices.delete(index=self.index_name)
        # self.es_client_python.indices.delete(index=self.kb_name)
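To make the intended call sequence concrete, here is a minimal usage sketch of the class above. It is illustrative only: the chunk contents are made up, and `store` is the hypothetical object from the config sketch near the top of the file.

# Illustrative call sequence, reusing the hypothetical `store` from the config sketch.
from dbgpt.core import Chunk

chunks = [
    Chunk(
        content="Elasticsearch is a distributed search engine.",
        metadata={"source": "notes.md"},
    ),
]
ids = store.load_document(chunks)                 # write texts + dense vectors into the index
hits = store.similar_search("distributed search engine", topk=5, score_threshold=0.0)
store.delete_by_ids(",".join(ids))                # delete_by_ids expects a comma-separated id string
store.delete_vector_name(store.index_name)        # drop the whole index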
Features verified so far:
1) A new knowledge space can be created (English names only; Chinese is not supported).
2) Documents can be uploaded and embedded.
3) Each uploaded document can be deleted individually.
4) Search-based chat works.
The files involved in this change are as follows:
1) .env — add the following:
VECTOR_STORE_TYPE=ElasticSearch
ElasticSearch_URL=127.0.0.1
ElasticSearch_PORT=9200
ElasticSearch_USERNAME=elastic
ElasticSearch_PASSWORD=i=+iLw9y0Jduq86XTi6W
2) dbgpt/_private/config.py — add the corresponding settings (a hedged sketch follows this list).
3) dbgpt/app/knowledge/service.py — modify def delete_document() accordingly.
4) dbgpt/storage/vector_store/__init__.py — register the new store type.
5) dbgpt/storage/vector_store/elastic_store.py — new file (the full listing above).
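For item 2, the exact diff is not included here. The snippet below is only a guess at its shape, inferred from the environment variables that elastic_store.py reads; the attribute names are hypothetical, and in dbgpt/_private/config.py these assignments would live inside the Config class rather than at module level.

# Hypothetical sketch of the config.py additions (attribute names are assumptions).
import os

ELASTICSEARCH_URL = os.getenv("ElasticSearch_URL", "127.0.0.1")
ELASTICSEARCH_PORT = os.getenv("ElasticSearch_PORT", "9200")
ELASTICSEARCH_USERNAME = os.getenv("ElasticSearch_USERNAME", None)
ELASTICSEARCH_PASSWORD = os.getenv("ElasticSearch_PASSWORD", None)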