1. 들어가기 전에...
2. elasticsearch, Python3.7/pip3설치
3. elasticsearchSocket.py 작성
4. start.sh/stop.sh/status.sh 실행파일 작성
5. Spring-Java Socket 사용
6. insert.py, test_data1.json, index.json 작성
7. 테스트
Spring-Elasticsearch 연동을 검색하면 Docker를 사용해 설치하고... Gradle에 필요한 컴파일 요소들 넣고.. 관련 클래스 상속받은 파일에서 코드작성.. 조회.. 인데 까보면 상당히 복잡한 구조다. json, http 등 설정 파일 불러오고 사용하고, jpa관련 소스 작성 등...
내가 사용한 방법은 다음과 같다.
1. Elasticsearch가 설치되어있는 서버(Centos7)에 연동용 Python소스(elasticsearchSocket.py) 작성
- Python 내에서 listen으로 특정 포트에 띄워둘수 있도록 한다.
2. .sh 실행파일로 elasticsearchSocket.py실행(특정 포트에 띄워지게됨)
3. Spring안에서 Java Socket으로 서버의 elasticsearchSocket.py에 요청
4. elasticsearchSocket.py 안에서 Elasticsearch에 연결. 원하는 작업 처리 및 결과 리턴
5. Spring에서 결과 확인
이렇게 구현하는것도 쉬운 구조는 아닌데 굳이 이렇게 한 이유는
1. Query DSL사용이 가능하다.
2. tensorflow_hub 를 사용해서 데이터 백터값으로 변환 가능
- 이렇게 변환한 데이터로 유사어 검색>일치율 확인 등 다양하게 활용 가능
이 두가지 때문이다.
따라서 이 글의 목표는...
1. 사용자가 질문을 입력하면
2. 질문을 python socket을 통해서 백터값으로 변환&elasticsearch로 검색
3. 일치율이 높은 상위3개 답변 리턴
그래서 사용자 입장에서 이 기능을 사용했을시 다음과 같은 경험을 얻도록 할 예정이다.
사용자: 가산역은 몇호선인가요? (질문입력)
elasticsearch 검색 결과//
1. 질문: 가산역은 몇호선인가요?
답변: 1호선입니다.
2. 질문: 가산디지털단지역은 몇호선인가요?
답변: 1호선입니다.
3. 질문: 가산디지털단지역은 지하철 몇호선인가요?
답변: 1호선입니다.
내가 이전의 작성해둔 글을 그대로 따라하면 된다. 현재 목표에서 키바나는 필요없으므로 설치를 생략한다.
만약 직접 조회해서 일단 테스트해보는걸 원하면 키바나 설치도 진행하면 된다.
1. elasticsearch 설치
https://kje1218.tistory.com/14
1-1. Kibana 설치(선택)
https://kje1218.tistory.com/15
2. Python3.7/pip3설치
https://kje1218.tistory.com/16
1. 사용할 모듈들을 설치해준다.
※ 설치할때 python3.7 -m pip install (모듈이름) 으로 해야지 python3.7버전에 설치된다.
만약 다른 python버전이 동시에 설치되어있는 상황에서 그냥 pip install (모듈이름)으로 설치한다면, 원하는 버전에 설치안되고 엉뚱한 버전에 설치될 가능성이 있다...
python3.7 -m pip install elasticsearch
python3.7 -m pip install tensorflow_text
2. elasticsearch 설치 경로로 이동해서 elasticsearchSocket.py를 작성한다.
cd /etc/elasticsearch
import socket
import threading
import time
from elasticsearch import Elasticsearch
import tensorflow_text
import tensorflow_hub as hub
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - [%(funcName)s:%(lineno)d] - %(message)s")
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
if __name__ == '__main__':
INDEX_NAME = "kje"
SEARCH_SIZE = 3
logger.info('Downloading pre-trained embeddings from tensorflow hub...')
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-multilingual/3")
# elasticsearch가 설치되어있는 주소&port
client = Elasticsearch('http://192.168.0.33:9200')
def search(data):
question_vector = embed(data['transcript'])
query = {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'title_vector') + 1.0",
"params": {"question_vector": question_vector[0].numpy().tolist()}
}
}
}
try:
response = client.search(index=INDEX_NAME, body=query)
except Exception as e:
print(e)
return response
def binder(client_socket, addr):
logger.info('접속 정보 : %s',addr)
try:
chunks = []
while True:
data = client_socket.recv(1)
chunks.append(data)
if (data == b'\n'):
s = b''.join(chunks)
data = s.decode()
logger.info('넘어온 파라미터 : %s', data)
#처리 분기
type = eval(data)['type']
if(type == "search"):
valus = search(eval(data))
valus = str(valus)
client_socket.sendall(valus.encode())
break
except:
logger.info('접속이 끊김 : %s', addr)
finally:
logger.info('소켓 종료\n')
client_socket.close()
# 소켓을 만든다.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 소켓 레벨과 데이터 형태를 설정한다.
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 서버는 복수 ip를 사용하는 pc의 경우는 ip를 지정하고 그렇지 않으면 None이 아닌 ''로 설정한다.
# 포트는 pc내에서 비어있는 포트를 사용한다. cmd에서 netstat -an | find "LISTEN"으로 확인할 수 있다.
# 현재 서버의 ip주소&port > 소캣이 실행되어 띄워질 주소&포트
server_socket.bind(('192.168.0.33', 9994))
# server 설정이 완료되면 listen를 시작한다.
server_socket.listen()
try:
# 서버는 여러 클라이언트를 상대하기 때문에 무한 루프를 사용한다.
while True:
# client로 접속이 발생하면 accept가 발생한다.
# 그럼 client 소켓과 addr(주소)를 튜플로 받는다.
client_socket, addr = server_socket.accept()
th = threading.Thread(target=binder, args = (client_socket,addr))
th.start()
except:
print("server")
finally:
logger.info('소켓 종료')
server_socket.close()
1. elasticsearchSocket.py를 실행/상태확인/정지 시킬 실행파일을 작성한다. 경로: cd /etc/elasticsearch
- start.sh
#!/bin/sh
PID=`ps -ef | grep elasticsearchSocket | grep -v "tail" | grep -v "grep"| grep -v "vim"| awk '{print $2}'`
if [ "${PID}" != "" ]
then
echo "elasticsearchSocket is aleady running ${PID}"
else
nohup /usr/local/bin/python3.7 ./elasticsearchSocket.py >> ./log/log.log 2>&1 &
PID=`ps -ef | grep elasticsearchSocket | grep -v "tail" | grep -v "grep"| awk '{print $2}'`
echo "elasticsearchSocket is running pid = ${PID}"
fi
- status.sh
#!/bin/sh
#PID=`pgrep -f elasticsearchSocket`
PID=`ps -ef | grep "elasticsearchSocket" | grep -v "grep" | grep -v "tail" | awk '{print $2}'`
if [ "${PID}" != "" ]
then
echo "elasticsearchSocket is running ${PID}"
else
echo "elasticsearchSocket is not running"
fi
- stop.sh
#!/bin/sh
# stop
#PID=`pgrep -f elasticsearchSocket`
PID=`ps -ef | grep "elasticsearchSocket" | grep -v "grep" | grep -v "tail" | awk '{print $2}'`
if [ "${PID}" != "" ]
then
echo elasticsearchSocket is stopped.
kill -9 ${PID}
exit
else
echo elasticsearchSocket is NOT running !!!
fi
2. 권한 부여를 통해서 실행할수 있는 상태로 변경한다.
chmod 755 ./status.sh
chmod 755 ./start.sh
chmod 755 ./stop.sh
3. 실행 로그가 쌓일 파일을 생성한다.
#elasticsearch 설치 경로
cd /etc/elasticsearch
#log폴더 생성
mkdir log
#log폴더로 이동
cd log
#log.log파일 생성
vim log.log
#esc누른 후 :wq입력해서 저장하고 나가기(파일이 저장됨)
:wq
4. elasticsearchSocket.py를 실행시킨다.
./start.sh
파이썬 소캣을 작성하고 실행상태로 뒀다면, 거기로 요청을 보내는 코드 역시 작성해야 활용할수 있을 것이다.
spring 환경에서 java socket을 활용해 요청을 보내보자.
- params.get("data") > 가산역은 몇호선인가요?
데이터가 있다고 보면 된다. 화면에서 입력한 데이터(가산역은 몇호선인가요?)를 java socket을 통해서 요청을 보내는것이다.
package com.slescb.admin.api.elasticsearch;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.google.gson.Gson;
@Service
public class DataSocket {
private static Socket socket = null;
private static final Logger log = LoggerFactory.getLogger(DataSocket.class);
// 서버ip주소를 입력한다.
// elasticsearchSocket.py에서 입력한 소캣이 실행되어 띄워질 주소&포트
private static final String serverIp = "192.168.0.33";
public String dataTest(Map<String, Object> params) {
String resultStr = "";
Map<String, Object> reqMap = new HashMap<String, Object>();
String receiveStr = "";
BufferedWriter bw = null;
BufferedReader br = null;
try {
// 소켓 생성
socket = new Socket(serverIp, 9994);
bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
reqMap.put("type", "search");
reqMap.put("transcript", params.get("data"));
String reqMapStr = new JSONObject(reqMap).toString();
log.info(reqMapStr);
bw.write(reqMapStr);
bw.newLine();
bw.flush();
receiveStr = br.readLine();
log.info(receiveStr);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(bw != null) {
bw.close();
} else if (br != null) {
br.close();
} else if (socket != null) {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
return resultStr;
}
}
이제 테스트를 위한 데이터를 넣어놔야 한다. 명령어로 실행시킬 파이썬 파일과 그와 연관된 json파일들을 작성해야한다.
cd /etc/elasticsearch 경로에서 작성해야한다.
1. insert.py 작성
기존 모든 index를 삭제하고(데이터 역시 같이 날아감) 새로운 index생성 및 데이터를 bulk를 통해 insert한다.
sql로 비유하자면... 모든 db를 삭제하고 새로운 db와 테이블 생성이라고 보면 된다.
import json
import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import tensorflow_text
import tensorflow.compat.v1 as tf
import tensorflow_hub as hub
import numpy as np
def index_data():
print("Creating the " + INDEX_NAME + " index.")
client.indices.delete(index=INDEX_NAME, ignore=[404])
with open(INDEX_FILE,"r",encoding="utf8") as index_file:
indexObj = json.load(index_file)
client.indices.create(index=INDEX_NAME, settings=indexObj["settings"], mappings=indexObj["mappings"])
docs = []
count = 0
with open(DATA_FILE,"r",encoding="utf8") as data_file:
for line in data_file:
line = line.strip()
doc = json.loads(line)
docs.append(doc)
count += 1
if count % BATCH_SIZE == 0:
index_batch(docs)
docs = []
print("Indexed {} documents.".format(count))
if docs:
index_batch(docs)
print("Indexed {} documents.".format(count))
client.indices.refresh(index=INDEX_NAME)
print("Done indexing.")
def index_batch(docs):
question = [doc["question"] for doc in docs]
question_vectors = embed(question)
requests = []
for i, doc in enumerate(docs):
request = doc
request["_op_type"] = "index"
request["_index"] = INDEX_NAME
request["question_vector"] = question_vectors[i].numpy().tolist()
requests.append(request)
bulk(client, requests)
if __name__ == '__main__':
INDEX_NAME = "kje"
INDEX_FILE = "./index.json"
DATA_FILE = "./test_data1.json"
BATCH_SIZE = 1000
SEARCH_SIZE = 3
print("Downloading pre-trained embeddings from tensorflow hub...")
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-multilingual/3")
text_ph = ""
print("Done.")
# elasticsearch가 설치되어있는 서버 주소&포트
client = Elasticsearch('http://127.0.0.1:9200')
index_data()
2. index.json
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"dynamic": "true",
"_source": {
"enabled": "true"
},
"properties": {
"category": {
"type":"keyword"
},
"question": {
"type": "keyword"
},
"answer": {
"type": "keyword"
},
"question_vector": {
"type": "dense_vector",
"dims": 512
}
}
}
}
3. test_data1.json
{"category":"지하철", "question":"가산역은 몇호선인가요?", "answer":"1호선입니다."}
{"category":"지하철", "question":"가산디지털단지역은 몇호선인가요?", "answer":"1호선입니다."}
{"category":"지하철", "question":"가산디지털단지역은 지하철 몇호선인가요?", "answer":"1호선입니다."}
{"category":"문자", "question":"문자로 전송해주세요.", "answer":"해당 내용을 문자로 전송해드렸습니다."}
{"category":"문자", "question":"문자로 보내주세요", "answer":"해당 내용을 문자로 보내드렸습니다."}
{"category":"문자", "question":"문자로 보내 줘", "answer":"해당 내용을 문자로 보내드렸습니다!!"}
완료됐다면 아래 명령어를 통해 insert.py를 실행시킨다.
python3.7 ./insert.py
'가산역은 몇호선인가요?'를 java socket을 통해 요청을 보내고 응답 받은걸 보면 아래와 같이 나온다.
이제 이 데이터를 목적에 맞게 파싱해서 사용하면 된다.
{
'took':93,
'timed_out':False,
'_shards':{
'total':2,
'successful':2,
'skipped':0,
'failed':0
},
'hits':{
'total':{
'value':18,
'relation':'eq'
},
'max_score':2.0,
'hits':[
{
'_index':'kje',
'_id':'uKFMXoYBIr1kIUcyPHsa',
'_score':2.0,
'_source':{
'question':'가산역은 몇호선인가요?',
'answer':'1호선입니다.',
'category':'지하철'
}
},
{
'_index':'kje',
'_id':'uaFMXoYBIr1kIUcyPHsa',
'_score':1.8661379,
'_source':{
'question':'가산디지털단지역은 몇호선인가요?',
'answer':'1호선입니다.',
'category':'지하철'
}
},
{
'_index':'kje',
'_id':'uqFMXoYBIr1kIUcyPHsa',
'_score':1.7480072,
'_source':{
'question':'가산디지털단지역은 지하철 몇호선인가요?',
'answer':'1호선입니다.',
'category':'지하철'
}
}
]
}
}
- 일치율이 높은 데이터 3개만 리턴된 형태이다.
- 원본에 가까울수록 _score가 높아지는걸 확인할 수 있다. > 유사어 검색에 활용 가능.
'Linux' 카테고리의 다른 글
[CentOS7] python3.7 설치, pip3 설치 (0) | 2023.01.26 |
---|---|
[CentOS7] Kibana 설치 (0) | 2023.01.18 |
[CentOS7] Elasticsearch 설치 (0) | 2023.01.18 |