使用 Python opensearch-py 库连接和操作 OpenSearch 数据库

发布于:2025-06-20 ⋅ 阅读:(20) ⋅ 点赞:(0)

项目结构

opensearch-demo/
├── docker-compose.yml
├── requirements.txt          # Add this file
├── opensearch_client.py
├── README.md
├── .venv                     # Keep if virtual environment is used
├── .gitignore                # Keep for Git ignore rules
├── .python-version           # Keep if specific Python version is required
├── opensearch_demo.ipynb     # Keep if Jupyter notebook is useful
└── pyproject.toml            # Keep if using Poetry or similar tool

功能特性

  • ✅ Docker Compose 一键启动 OpenSearch 集群
  • ✅ OpenSearch Dashboards 可视化界面
  • ✅ Python 客户端连接和操作
  • ✅ 完整的 CRUD 操作演示
  • ✅ 全文搜索、精确匹配、范围查询
  • ✅ 聚合查询和统计分析

1. Docker Compose 配置

创建 docker-compose.yml 文件:

# Local demo stack: one OpenSearch node plus OpenSearch Dashboards,
# joined by a private network and backed by a named data volume.
services:
  opensearch:
    container_name: opensearch-demo
    image: opensearchproject/opensearch:2.19.1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      # Single-node mode: no cluster discovery of other nodes.
      - discovery.type=single-node
      # Pairs with the unlimited memlock ulimit below.
      - bootstrap.memory_lock=true
      # Fixed 512 MB JVM heap (min == max).
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      # NOTE(review): with the security plugin disabled on the next line this
      # password is presumably never used - confirm before relying on it.
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=StrongPass123!@
      # Security plugin off: the healthcheck and Dashboards below talk plain
      # HTTP to port 9200. Suitable for a local demo only.
      - plugins.security.disabled=true
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      # Named volume (declared at the bottom) mounted at the data directory.
      - opensearch-data:/usr/share/opensearch/data
    ports:
      # 9200 is the HTTP endpoint used by the healthcheck and the Python client.
      - "9200:9200"
      # NOTE(review): 9600 is presumably the Performance Analyzer port - confirm.
      - "9600:9600"
    networks:
      - opensearch-net
    healthcheck:
      # Healthy once an HTTP request to port 9200 inside the container succeeds.
      test: ["CMD-SHELL", "curl -f http://localhost:9200 || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5

  opensearch-dashboards:
    container_name: opensearch-dashboards-demo
    image: opensearchproject/opensearch-dashboards:2.19.1
    ports:
      - "5601:5601"
    # NOTE(review): looks redundant, 5601 is already published under `ports`.
    expose:
      - "5601"
    environment:
      # Reaches the node by its service name over the shared network.
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
    networks:
      - opensearch-net
    depends_on:
      # Start only after the opensearch healthcheck above reports healthy.
      opensearch:
        condition: service_healthy

volumes:
  opensearch-data:

networks:
  opensearch-net:

2. Python 环境准备

# Create the project and a virtual environment with uv.
uv init
uv venv
# Activate the environment (Windows path shown; the directory is "Scripts").
# On Linux/macOS use: source .venv/bin/activate
.venv/Scripts/activate
# Install the client library and the packages used by the notebook.
uv pip install opensearch-py requests pandas

3. opensearch_client 客户端

创建 opensearch_client.py 文件:

import os
from typing import Dict, List, Optional

from opensearchpy import OpenSearch


class OpenSearchClient:
    """Small convenience wrapper around ``opensearchpy.OpenSearch``.

    Each operation method catches any exception raised by the underlying
    client, prints a message, and returns a fallback value (``False``,
    ``{}`` or ``[]``) instead of propagating the error.
    """

    def __init__(self, host: str = 'localhost', port: int = 9200):
        """Create the underlying OpenSearch client.

        Args:
            host: Host name of the OpenSearch node.
            port: HTTP port of the OpenSearch node.

        The password for the ``admin`` user is read from the
        ``OPENSEARCH_PASSWORD`` environment variable and falls back to
        ``'Admin123!'`` when the variable is not set.
        """
        self.host = host
        self.port = port
        self.auth = ('admin', os.getenv('OPENSEARCH_PASSWORD', 'Admin123!'))

        # Plain HTTP connection: TLS and certificate checks are switched off.
        self.client = OpenSearch(
            hosts=[{'host': self.host, 'port': self.port}],
            http_auth=self.auth,
            use_ssl=False,
            verify_certs=False,
            ssl_assert_hostname=False,
            ssl_show_warn=False,
        )

    def test_connection(self) -> bool:
        """Return True if the server answers an info request, else False."""
        try:
            info = self.client.info()
            print(f"连接成功! OpenSearch版本: {info['version']['number']}")
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False

    def create_index(self, index_name: str, mapping: Optional[Dict] = None) -> bool:
        """Create ``index_name`` unless it already exists.

        Args:
            index_name: Name of the index to create.
            mapping: Optional mapping definition, sent as the ``mappings``
                section of the request body.

        Returns:
            True if the index exists after the call (created now or already
            present), False if the request failed.
        """
        try:
            if self.client.indices.exists(index=index_name):
                print(f"索引 '{index_name}' 已存在")
                return True

            body = {}
            if mapping:
                body['mappings'] = mapping

            self.client.indices.create(index=index_name, body=body)
            print(f"索引 '{index_name}' 创建成功")
            return True
        except Exception as e:
            print(f"创建索引失败: {e}")
            return False

    def delete_index(self, index_name: str) -> bool:
        """Delete ``index_name`` if it exists.

        Returns:
            True if the index is absent after the call (deleted now or never
            existed), False if the request failed.
        """
        try:
            if not self.client.indices.exists(index=index_name):
                print(f"索引 '{index_name}' 不存在")
                return True

            self.client.indices.delete(index=index_name)
            print(f"索引 '{index_name}' 删除成功")
            return True
        except Exception as e:
            print(f"删除索引失败: {e}")
            return False

    def index_document(self, index_name: str, doc_id: str, document: Dict) -> bool:
        """Index ``document`` under ``doc_id``; return True on success."""
        try:
            self.client.index(
                index=index_name,
                id=doc_id,
                body=document
            )
            print(f"文档 '{doc_id}' 索引成功")
            return True
        except Exception as e:
            print(f"索引文档失败: {e}")
            return False

    def get_document(self, index_name: str, doc_id: str) -> Dict:
        """Return the ``_source`` of a document, or ``{}`` if the lookup fails."""
        try:
            response = self.client.get(index=index_name, id=doc_id)
            return response['_source']
        except Exception as e:
            print(f"获取文档失败: {e}")
            return {}

    def search_documents(self, index_name: str, query: Dict) -> List[Dict]:
        """Run ``query`` and return the ``_source`` of every hit.

        Returns an empty list when the search fails.
        """
        try:
            response = self.client.search(index=index_name, body=query)
            hits = response['hits']['hits']
            return [hit['_source'] for hit in hits]
        except Exception as e:
            print(f"搜索失败: {e}")
            return []

    def bulk_index(self, index_name: str, documents: List[Dict]) -> bool:
        """Index many documents with a single bulk request.

        Each document's ``'id'`` value is used as its ``_id``; a document
        without one falls back to its position in ``documents``.

        Returns:
            True if every document was indexed (or there was nothing to
            index), False if the request failed or any item reported an error.
        """
        if not documents:
            # An empty bulk body is not a valid request, so skip the call.
            print("没有需要批量索引的文档")
            return True

        try:
            # The low-level bulk API expects alternating lines: an action
            # line naming the target index/id, followed by the document.
            body = []
            for position, doc in enumerate(documents):
                body.append({
                    "index": {
                        "_index": index_name,
                        "_id": doc.get('id', position),
                    }
                })
                body.append(doc)

            response = self.client.bulk(body=body)
            if response['errors']:
                print("批量索引过程中出现错误")
                # Report which items failed so the problem can be diagnosed.
                for item in response.get('items', []):
                    result = item.get('index', {})
                    if 'error' in result:
                        print(f"  文档 '{result.get('_id')}' 失败: {result['error']}")
                return False

            print(f"成功批量索引 {len(documents)} 个文档")
            return True
        except Exception as e:
            print(f"批量索引失败: {e}")
            return False

    def get_cluster_health(self) -> Dict:
        """Return the cluster health response, or ``{}`` if the call fails."""
        try:
            return self.client.cluster.health()
        except Exception as e:
            print(f"获取集群健康状态失败: {e}")
            return {}

    def list_indices(self) -> List[str]:
        """Return the names of all indices, or ``[]`` if the call fails."""
        try:
            response = self.client.cat.indices(format='json')
            return [index['index'] for index in response]
        except Exception as e:
            print(f"获取索引列表失败: {e}")
            return []

4. 快速开始

启动服务

# 启动 OpenSearch 服务
docker-compose up -d

# 查看服务状态
docker-compose ps

# 查看日志
docker-compose logs -f opensearch

在这里插入图片描述

访问 Web 界面

  • OpenSearch API: http://localhost:9200
  • OpenSearch Dashboards: http://localhost:5601

OpenSearch 基本操作演示

这个notebook演示了如何使用Python连接OpenSearch并进行基本操作。

# 导入必要的库
from opensearch_client import OpenSearchClient
import json
import pandas as pd

1. 连接到OpenSearch

# 创建OpenSearch客户端
client = OpenSearchClient()

# 测试连接
client.test_connection()
连接成功! OpenSearch版本: 2.19.1

True

2. 查看集群健康状态

# 获取集群健康状态
health = client.get_cluster_health()
print(json.dumps(health, indent=2))
{
  "cluster_name": "opensearch-cluster",
  "status": "green",
  "timed_out": false,
  "number_of_nodes": 1,
  "number_of_data_nodes": 1,
  "discovered_master": true,
  "discovered_cluster_manager": true,
  "active_primary_shards": 4,
  "active_shards": 4,
  "relocating_shards": 0,
  "initializing_shards": 0,
  "unassigned_shards": 0,
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks": 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 100.0
}

3. 创建索引

# 定义索引映射
mapping = {
    'properties': {
        'title': {'type': 'text'},
        'content': {'type': 'text'},
        'author': {'type': 'keyword'},
        'publish_date': {'type': 'date'},
        'tags': {'type': 'keyword'},
        'views': {'type': 'integer'}
    }
}

# 创建索引
index_name = 'blog_posts'
client.create_index(index_name, mapping)
索引 'blog_posts' 创建成功
True

4. 索引单个文档

# 创建示例文档
document = {
    'title': 'OpenSearch入门指南',
    'content': '这是一篇关于OpenSearch基本使用的文章...',
    'author': '张三',
    'publish_date': '2024-01-15',
    'tags': ['opensearch', '搜索引擎', '教程'],
    'views': 1250
}

# 索引文档
client.index_document(index_name, '1', document)
文档 '1' 索引成功


True

5. 批量索引文档

# 创建多个示例文档
documents = [
    {
        'id': '2',
        'title': 'Elasticsearch vs OpenSearch',
        'content': '比较Elasticsearch和OpenSearch的异同点...',
        'author': '李四',
        'publish_date': '2024-01-20',
        'tags': ['elasticsearch', 'opensearch', '比较'],
        'views': 890
    },
    {
        'id': '3',
        'title': '搜索引擎优化技巧',
        'content': '提高搜索性能的各种技巧和最佳实践...',
        'author': '王五',
        'publish_date': '2024-01-25',
        'tags': ['优化', '性能', '搜索'],
        'views': 2100
    },
    {
        'id': '4',
        'title': 'Python与OpenSearch集成',
        'content': '如何在Python项目中集成OpenSearch...',
        'author': '张三',
        'publish_date': '2024-02-01',
        'tags': ['python', 'opensearch', '集成'],
        'views': 1560
    }
]

# 批量索引
client.bulk_index(index_name, documents)
成功批量索引 3 个文档

True

6. 获取单个文档

# 获取文档
doc = client.get_document(index_name, '1')
print(json.dumps(doc, indent=2, ensure_ascii=False))
{
  "title": "OpenSearch入门指南",
  "content": "这是一篇关于OpenSearch基本使用的文章...",
  "author": "张三",
  "publish_date": "2024-01-15",
  "tags": [
    "opensearch",
    "搜索引擎",
    "教程"
  ],
  "views": 1250
}

7. 搜索文档

# 简单文本搜索
query = {
    'query': {
        'match': {
            'content': 'OpenSearch'
        }
    }
}

results = client.search_documents(index_name, query)
print(f'找到 {len(results)} 个结果:')
for i, result in enumerate(results, 1):
    print(f'{i}. {result["title"]} - 作者: {result["author"]}')
找到 3 个结果:
1. Elasticsearch vs OpenSearch - 作者: 李四
2. Python与OpenSearch集成 - 作者: 张三
3. OpenSearch入门指南 - 作者: 张三
# 复合查询 - 搜索特定作者的文章
query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'author': '张三'}}
            ]
        }
    }
}

results = client.search_documents(index_name, query)
print(f'张三的文章 ({len(results)} 篇):')
for result in results:
    print(f'- {result["title"]} (浏览量: {result["views"]})')
张三的文章 (2 篇):
- OpenSearch入门指南 (浏览量: 1250)
- Python与OpenSearch集成 (浏览量: 1560)
# 范围查询 - 查找浏览量大于1000的文章
query = {
    'query': {
        'range': {
            'views': {
                'gt': 1000
            }
        }
    },
    'sort': [
        {'views': {'order': 'desc'}}
    ]
}

results = client.search_documents(index_name, query)
print('浏览量大于1000的文章 (按浏览量降序):')
for result in results:
    print(f'- {result["title"]} (浏览量: {result["views"]})')
浏览量大于1000的文章 (按浏览量降序):
- 搜索引擎优化技巧 (浏览量: 2100)
- Python与OpenSearch集成 (浏览量: 1560)
- OpenSearch入门指南 (浏览量: 1250)

8. 聚合查询

# 聚合查询 - 按作者统计文章数量
query = {
    'size': 0,
    'aggs': {
        'authors': {
            'terms': {
                'field': 'author'
            }
        },
        'avg_views': {
            'avg': {
                'field': 'views'
            }
        }
    }
}

try:
    response = client.client.search(index=index_name, body=query)
    
    print('作者文章统计:')
    for bucket in response['aggregations']['authors']['buckets']:
        print(f'- {bucket["key"]}: {bucket["doc_count"]} 篇文章')
    
    avg_views = response['aggregations']['avg_views']['value']
    print(f'\n平均浏览量: {avg_views:.2f}')
except Exception as e:
    print(f'聚合查询失败: {e}')
作者文章统计:
- 张三: 2 篇文章
- 李四: 1 篇文章
- 王五: 1 篇文章

平均浏览量: 1450.00

9. 使用Pandas展示结果

# 获取所有文档并转换为DataFrame
query = {'query': {'match_all': {}}}
all_docs = client.search_documents(index_name, query)

df = pd.DataFrame(all_docs)
print('所有文章数据:')
print(df[['title', 'author', 'views', 'publish_date']])
所有文章数据:
                         title author  views publish_date
0               OpenSearch入门指南     张三   1250   2024-01-15
1  Elasticsearch vs OpenSearch     李四    890   2024-01-20
2                     搜索引擎优化技巧     王五   2100   2024-01-25
3          Python与OpenSearch集成     张三   1560   2024-02-01
# 简单的数据分析
print('数据统计:')
print(f'总文章数: {len(df)}')
print(f'总浏览量: {df["views"].sum()}')
print(f'平均浏览量: {df["views"].mean():.2f}')
print(f'最高浏览量: {df["views"].max()}')
print(f'最低浏览量: {df["views"].min()}')

print('按作者分组:')
author_stats = df.groupby('author').agg({
    'title': 'count',
    'views': ['sum', 'mean']
}).round(2)
print(author_stats)
数据统计:
总文章数: 4
总浏览量: 5800
平均浏览量: 1450.00
最高浏览量: 2100
最低浏览量: 890
按作者分组:
       title views        
       count   sum    mean
author                    
张三         2  2810  1405.0
李四         1   890   890.0
王五         1  2100  2100.0

10. 清理资源

# 列出所有索引
indices = client.list_indices()
print('当前索引:', indices)


当前索引: ['.plugins-ml-config', '.opensearch-observability', 'blog_posts', '.kibana_1']

# Report the deletion only when delete_index signals success; it returns
# False (after printing the error) if the request failed.
if client.delete_index(index_name):
    print(f'索引 {index_name} 已删除')
索引 'blog_posts' 删除成功
索引 blog_posts 已删除

网站公告

今日签到

点亮在社区的每一天
去签到