ES04-批量写入

发布于:2025-09-09 ⋅ 阅读:(15) ⋅ 点赞:(0)

ES04-批量写入


1-Docker部署ES-有kibana

下面给出一份「开箱即用」的 Elasticsearch 8.x Docker 部署脚本合集,全部命令在 Ubuntu 22.04/24.04 + Docker 20.10+ 验证通过,直接复制即可运行。
如想一键完成,可直接用 Elastic 官方脚本(方式 1);如需自定义目录、内存、集群名等,可用手动 docker-compose 脚本(方式 2)。


方式 1:官方「一键本地体验」脚本(最快)

# 1. 安装 Docker(已安装可跳过)
curl -fsSL https://get.docker.com | sudo sh

# 2. 一键拉起 ES+Kibana(默认最新 8.x)
curl -fsSL https://elastic.co/start-local | sh

执行完后会在当前目录生成 elastic-start-local/ 文件夹,里面自带

  • start.sh – 启动
  • stop.sh – 停止
  • docker-compose.yml – 如要改端口、内存、开启远程访问可直接编辑。

默认只能本机访问,需要外网访问时把 docker-compose.yml 里的 127.0.0.1:9200127.0.0.1:5601 中的 127.0.0.1: 删掉即可,再执行 ./stop.sh && ./start.sh


方式 2:自定义 docker-compose 脚本(推荐生产/开发)

  1. 创建目录
mkdir -p /data/es/{data,logs,plugins,config,certs} \
         /data/kibana/{data,logs,config} \
         /data/es_docker
cd /data/es_docker
  1. 自定义环境变量 .env
cat > .env <<'EOF'
ELASTIC_VERSION=8.18.0
ES_JAVA_OPTS="-Xms1g -Xmx1g"
ELASTIC_PASSWORD=Elastic@123
KIBANA_PASSWORD=Kibana@123
CLUSTER_NAME=demo-es
COMPOSE_PROJECT_NAME=es
EOF
  1. 编写 docker-compose.yml
version: "3.9"
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=${CLUSTER_NAME}
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=${ES_JAVA_OPTS}"
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false   # 如要https可改为true
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - /data/es/data:/usr/share/elasticsearch/data
      - /data/es/logs:/usr/share/elasticsearch/logs
      - /data/es/plugins:/usr/share/elasticsearch/plugins
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elastic
    healthcheck:
      test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 | grep -q cluster_uuid"]
      interval: 30s
      timeout: 10s
      retries: 5

  kibana:
    image: docker.elastic.co/kibana/kibana:${ELASTIC_VERSION}
    container_name: kib01
    depends_on:
      - elasticsearch
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
      - I18N_LOCALE=zh-CN
    volumes:
      - /data/kibana/data:/usr/share/kibana/data
    ports:
      - "5601:5601"
    networks:
      - elastic

networks:
  elastic:
    driver: bridge
  1. 赋权 & 启动
sudo chown -R 1000:1000 /data/es /data/kibana
docker compose up -d
  1. 查看状态
docker compose ps
curl -u elastic:Elastic@123 http://localhost:9200

浏览器访问 http://<服务器IP>:5601 即可打开 Kibana,用户名 elastic,密码即 .env 里设置的 ELASTIC_PASSWORD


常用附属命令

  • 关闭/重启
docker compose stop      # 停止
docker compose start     # 再次启动
docker compose down -v   # 删除容器+网络(数据卷保留)
  • 重置 elastic/kibana_system 密码
docker exec -it es01 bin/elasticsearch-reset-password -u elastic -i
docker exec -it es01 bin/elasticsearch-reset-password -u kibana_system -i
  • 查看日志
docker logs -f es01
docker logs -f kib01

端口 & 防火墙

# 如启 firewalld 需放行
sudo firewall-cmd --permanent --add-port=9200/tcp --add-port=5601/tcp
sudo firewall-cmd --reload

云服务器还需在控制台安全组开放对应端口。

至此,Elasticsearch 与 Kibana 的 Docker 部署执行脚本已全部给出,可直接落地使用。


2-Docker部署ES-无kibana

下面给出「只跑 Elasticsearch,不装 Kibana」的最小可运行方案,依旧基于 Docker / docker-compose,复制即可用。
目录、内存、端口、集群名全部可自定义,单节点开发/测试/小规格生产 都能直接落地。


1. 建目录 & 赋权

sudo mkdir -p /data/es/{data,logs,plugins,config,certs}
# 容器内 elasticsearch 进程默认 uid:gid = 1000:1000
sudo chown -R 1000:1000 /data/es

2. 环境变量文件 .env

cat >/data/es/.env <<'EOF'
ELASTIC_VERSION=8.18.0
CLUSTER_NAME=es-single
NODE_NAME=es01
ES_JAVA_OPTS="-Xms1g -Xmx1g"
ELASTIC_PASSWORD=Elastic@123
# 如要外网访问,把下面改成 0.0.0.0
NETWORK_HOST=0.0.0.0
EOF

3. 纯 ES 的 docker-compose.yml

version: "3.9"
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION}
    container_name: ${NODE_NAME}
    restart: unless-stopped
    environment:
      - node.name=${NODE_NAME}
      - cluster.name=${CLUSTER_NAME}
      - discovery.type=single-node        # 单节点必加
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=${ES_JAVA_OPTS}"
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false   # 关闭 https,省证书
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - network.host=${NETWORK_HOST}
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - /data/es/data:/usr/share/elasticsearch/data
      - /data/es/logs:/usr/share/elasticsearch/logs
      - /data/es/plugins:/usr/share/elasticsearch/plugins
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - esnet
    healthcheck:
      test: ["CMD-SHELL", "curl -s -u elastic:${ELASTIC_PASSWORD} http://localhost:9200 | grep -q cluster_uuid"]
      interval: 30s
      timeout: 10s
      retries: 5

networks:
  esnet:
    driver: bridge

4. 启动

cd /data/es
docker compose up -d          # 拉镜像 & 后台运行
docker compose ps             # 看状态
docker compose logs -f        # 实时日志

5. 验证

curl -u elastic:Elastic@123 http://localhost:9200
# 返回集群信息即成功

6. 常用运维

# 重启/停止/删除
docker compose restart
docker compose stop
docker compose down           # 容器+网络删除,数据卷保留

# 改密码
docker exec -it es01 bin/elasticsearch-reset-password -u elastic -i

# 升级小版本
# 改 .env 里的 ELASTIC_VERSION → docker compose pull → docker compose up -d

7. 注意事项

  1. 生产多节点请去掉 discovery.type=single-node,改用
    discovery.seed_hosts / cluster.initial_master_nodes
  2. 如果 内存不足 1 G,把 .env 里的 ES_JAVA_OPTS 改成 -Xms512m -Xmx512m,但官方最低建议 1 GB。
  3. 需要 HTTPS 时把
    xpack.security.http.ssl.enabled=true 并挂证书目录即可。
  4. 云服务器记得 安全组/防火墙 放通 9200。

至此,纯 Elasticsearch 的 Docker 部署脚本 已就绪,复制即可用,Kibana 完全不装。


3-ES从CSV进行数据导入

注意:CSV需要有文件头(对应的是index的key)

下面我将为你创建一个完整的Python项目,用于:

  1. 生成本地CSV文件(模拟数据)
  2. 将CSV数据转换为Elasticsearch(ES)批量插入格式(bulk API)
  3. 使用带用户名密码认证的ES客户端批量插入数据

✅ 项目结构

csv_to_es/
│
├── data/
│   └── sample_data.csv          # 自动生成的CSV文件
├── scripts/
│   └── bulk_insert.py           # 主脚本:CSV → ES
├── requirements.txt              # 依赖
├── .env                          # 环境变量(ES连接信息)
└── README.md                     # 使用说明

✅ 1. requirements.txt

elasticsearch==8.13.0
python-dotenv==1.0.0
pandas==2.2.2

✅ 2. .env不要提交到Git

ES_HOST=https://localhost:9200
ES_USER=elastic
ES_PASSWORD=your_password
ES_INDEX=my_index
ES_CA_CERT=/path/to/http_ca.crt  # 如果是自签名证书

✅ 3. scripts/bulk_insert.py

import os
import csv
import pandas as pd
from elasticsearch import Elasticsearch
from dotenv import load_dotenv

load_dotenv()

# 配置
CSV_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'sample_data.csv')
ES_HOST = os.getenv("ES_HOST")
ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_INDEX = os.getenv("ES_INDEX")
ES_CA_CERT = os.getenv("ES_CA_CERT")

def create_sample_csv():
    os.makedirs(os.path.dirname(CSV_PATH), exist_ok=True)
    data = [
        {"id": 1, "name": "Alice", "age": 30},
        {"id": 2, "name": "Bob", "age": 25},
        {"id": 3, "name": "Charlie", "age": 35},
    ]
    df = pd.DataFrame(data)
    df.to_csv(CSV_PATH, index=False)
    print(f"[INFO] CSV文件已生成:{CSV_PATH}")

def csv_to_bulk_actions(csv_file, index_name):
    actions = []
    with open(csv_file, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            action = {
                "_index": index_name,
                "_source": row
            }
            actions.append(action)
    return actions

def bulk_insert_to_es(actions):
    es = Elasticsearch(
        [ES_HOST],
        basic_auth=(ES_USER, ES_PASSWORD),
        ca_certs=ES_CA_CERT if ES_CA_CERT else None,
        verify_certs=bool(ES_CA_CERT)
    )

    if not es.ping():
        raise ConnectionError("无法连接到Elasticsearch")

    from elasticsearch.helpers import bulk
    success, _ = bulk(es, actions)
    print(f"[INFO] 成功插入 {success} 条记录到索引 '{ES_INDEX}'")

def main():
    create_sample_csv()
    actions = csv_to_bulk_actions(CSV_PATH, ES_INDEX)
    bulk_insert_to_es(actions)

if __name__ == "__main__":
    main()

✅ 4. README.md

# CSV to Elasticsearch Bulk Insert

## 使用步骤

1. 克隆项目
```bash
git clone <your-repo>
cd csv_to_es
  1. 安装依赖
pip install -r requirements.txt
  1. 配置环境变量
    复制 .env.example.env 并填写你的ES连接信息。

  2. 运行脚本

python scripts/bulk_insert.py

注意事项

  • 确保ES服务可访问
  • 如果使用自签名证书,请填写 ES_CA_CERT 路径
  • 默认索引为 my_index,可在 .env 中修改

---

### ✅ 5. 快速运行(本地测试)

```bash
# 创建虚拟环境(可选)
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 安装依赖
pip install -r requirements.txt

# 配置 .env 文件(填写你的ES信息)

# 运行
python scripts/bulk_insert.py

✅ 6. 输出示例

[INFO] CSV文件已生成:.../csv_to_es/data/sample_data.csv
[INFO] 成功插入 3 条记录到索引 'my_index'

如需支持:

  • 自动创建索引并设置mapping
  • 增量更新(按id去重)
  • 定时任务(cron/airflow)

可以继续扩展,我可以帮你加上。需要吗?


4-ES从CSV进行数据导入-分批插入数据

from elasticsearch import Elasticsearch, helpers
import pandas as pd

# 1. 连接写法(8.x 推荐用 Elasticsearch(hosts=…))
es = Elasticsearch(
    hosts=[{'scheme': 'http', 'host': '127.0.0.1', 'port': 9200}],
    basic_auth=('username', 'password')
)

# 2. 必须给 index 一个名字
INDEX_NAME = 'your_index_name'

# 3. 批处理函数
def process_chunk(chunk: pd.DataFrame):
    actions = []
    for _, row in chunk.iterrows():          # iterrows() 是方法,要加括号
        row_dict = row.astype(str).to_dict()
        # 将usrId转化为字符串,并确保长度为6,不足在前面添加0
        row_dict['usrId'] = str(row_dict['usrId']).zfill(6)

        actions.append({
            "_index": INDEX_NAME,
            "_source": row_dict
        })

    # 4. 真正写入 ES
    if actions:
        print(f'开始添加 {len(actions)} 条记录到 {INDEX_NAME}')
        try:
            helpers.bulk(es, actions, raise_on_error=True)
            print(f'成功写入 {len(actions)} 条记录')
        except Exception as e:               # 注意大小写
            print(f'索引过程失败: {e}')

# 5. 主流程
def main():
    csv_file = './xxx/xxx.csv'
    chunk_size = 10_000

    for chunk in pd.read_csv(csv_file, chunksize=chunk_size, encoding='gbk'):
        process_chunk(chunk)

if __name__ == '__main__':
    main()

网站公告

今日签到

点亮在社区的每一天
去签到