본문 바로가기

Linux

Spring-Elasticsearch 연동 (Java Socket+Centos7+Python3.7)

개발환경: Centos7, Python3.7, Elasticsearch8.3.3

 


1. 들어가기 전에...

Spring-Elasticsearch 연동을 검색하면 Docker를 사용해 설치하고... Gradle에 필요한 컴파일 요소들 넣고.. 관련 클래스 상속받은 파일에서 코드작성.. 조회.. 인데 까보면 상당히 복잡한 구조다. json, http 등 설정 파일 불러오고 사용하고, jpa관련 소스 작성 등...

 

내가 사용한 방법은 다음과 같다.

1. Elasticsearch가 설치되어있는 서버(Centos7)에 연동용 Python소스(elasticsearchSocket.py) 작성

- Python 내에서 listen으로 특정 포트에 띄워둘수 있도록 한다.

2. .sh 실행파일로 elasticsearchSocket.py실행(특정 포트에 띄워지게됨)

3. Spring안에서 Java Socket으로 서버의 elasticsearchSocket.py에 요청

4. elasticsearchSocket.py 안에서 Elasticsearch에 연결. 원하는 작업 처리 및 결과 리턴

5. Spring에서 결과 확인

 

이렇게 구현하는것도 쉬운 구조는 아닌데 굳이 이렇게 한 이유는

1. Query DSL사용이 가능하다.

2. tensorflow_hub 를 사용해서 데이터 백터값으로 변환 가능

- 이렇게 변환한 데이터로 유사어 검색>일치율 확인다양하게 활용 가능

이 두가지 때문이다.

 

따라서 이 글의 목표는...

1. 사용자가 질문을 입력하면

2. 질문을 python socket을 통해서 백터값으로 변환&elasticsearch로 검색

3. 일치율이 높은 상위3개 답변 리턴

 

그래서 사용자 입장에서 이 기능을 사용했을시 다음과 같은 경험을 얻도록 할 예정이다.

사용자: 가산역은 몇호선인가요? (질문입력)

elasticsearch 검색 결과//

1. 질문: 가산역은 몇호선인가요?

    답변: 1호선입니다.

 

2. 질문: 가산디지털단지역은 몇호선인가요?

    답변: 1호선입니다.

 

3. 질문: 가산디지털단지역은 지하철 몇호선인가요?

    답변: 1호선입니다.

이런식으로 활용할 예정이다.

 

2. elasticsearch, Python3.7/pip3설치

내가 이전의 작성해둔 글을 그대로 따라하면 된다. 현재 목표에서 키바나는 필요없으므로 설치를 생략한다.

만약 직접 조회해서 일단 테스트해보는걸 원하면 키바나 설치도 진행하면 된다.

1. elasticsearch 설치

https://kje1218.tistory.com/14

 

1-1. Kibana 설치(선택)

https://kje1218.tistory.com/15

 

2. Python3.7/pip3설치

https://kje1218.tistory.com/16

 

3. elasticsearchSocket.py 작성

1. 사용할 모듈들을 설치해준다.

※ 설치할때 python3.7 -m pip install (모듈이름) 으로 해야지 python3.7버전에 설치된다.

만약 다른 python버전이 동시에 설치되어있는 상황에서 그냥 pip install (모듈이름)으로 설치한다면, 원하는 버전에 설치안되고 엉뚱한 버전에 설치될 가능성이 있다...

python3.7 -m pip install elasticsearch 
python3.7 -m pip install tensorflow_text

 

2. elasticsearch 설치 경로로 이동해서 elasticsearchSocket.py를 작성한다.

cd /etc/elasticsearch

 

import socket
import threading
import time
from elasticsearch import Elasticsearch
import tensorflow_text
import tensorflow_hub as hub
import logging

logger = logging.getLogger(__name__)

logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - [%(funcName)s:%(lineno)d] - %(message)s")
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

if __name__ == '__main__':
    INDEX_NAME = "kje"
    SEARCH_SIZE = 3
    logger.info('Downloading pre-trained embeddings from tensorflow hub...')
    embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-multilingual/3")

	# elasticsearch가 설치되어있는 주소&port
    client = Elasticsearch('http://192.168.0.33:9200')
   
def search(data):

    question_vector = embed(data['transcript'])

    query = {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": "cosineSimilarity(params.query_vector, 'title_vector') + 1.0",
                "params": {"question_vector": question_vector[0].numpy().tolist()}
            }
        }
    }

    try:
        response = client.search(index=INDEX_NAME, body=query)
    except Exception as e:
        print(e)

    return response

def binder(client_socket, addr):
    logger.info('접속 정보 : %s',addr)
    try:
        chunks = []
        while True:
            data = client_socket.recv(1)
            chunks.append(data)
            if (data == b'\n'):
                s = b''.join(chunks)
                data = s.decode()

                logger.info('넘어온 파라미터 : %s', data)

                #처리 분기
                type = eval(data)['type']
                if(type == "search"):
                    valus = search(eval(data))
                    valus = str(valus)
                    client_socket.sendall(valus.encode())
                    break
    except:
        logger.info('접속이 끊김 : %s', addr)
    finally:
        logger.info('소켓 종료\n')
    client_socket.close()
 
# 소켓을 만든다.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 소켓 레벨과 데이터 형태를 설정한다.
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 서버는 복수 ip를 사용하는 pc의 경우는 ip를 지정하고 그렇지 않으면 None이 아닌 ''로 설정한다.
# 포트는 pc내에서 비어있는 포트를 사용한다. cmd에서 netstat -an | find "LISTEN"으로 확인할 수 있다.
# 현재 서버의 ip주소&port > 소캣이 실행되어 띄워질 주소&포트
server_socket.bind(('192.168.0.33', 9994))
# server 설정이 완료되면 listen를 시작한다.
server_socket.listen()
 
try:
  # 서버는 여러 클라이언트를 상대하기 때문에 무한 루프를 사용한다.
  while True:
    # client로 접속이 발생하면 accept가 발생한다.
    # 그럼 client 소켓과 addr(주소)를 튜플로 받는다.
    client_socket, addr = server_socket.accept()
    th = threading.Thread(target=binder, args = (client_socket,addr))
    th.start()
except:
  print("server")
finally:
  logger.info('소켓 종료')
  server_socket.close()

 

4. start.sh/stop.sh/status.sh 실행파일 작성

1. elasticsearchSocket.py를 실행/상태확인/정지 시킬 실행파일을 작성한다. 경로: cd /etc/elasticsearch

- start.sh

#!/bin/sh
PID=`ps -ef | grep elasticsearchSocket | grep -v "tail" | grep -v "grep"| grep -v "vim"| awk '{print $2}'`
if [ "${PID}" != "" ]
then
    echo "elasticsearchSocket is aleady running ${PID}"
else
    nohup /usr/local/bin/python3.7 ./elasticsearchSocket.py >> ./log/log.log 2>&1 &

    PID=`ps -ef | grep elasticsearchSocket | grep -v "tail" | grep -v "grep"| awk '{print $2}'`
    echo "elasticsearchSocket is running pid = ${PID}"
fi

- status.sh

#!/bin/sh

#PID=`pgrep -f elasticsearchSocket`
PID=`ps -ef | grep "elasticsearchSocket" | grep -v "grep" | grep -v "tail" | awk '{print $2}'`

if [ "${PID}" != "" ]
then
	echo "elasticsearchSocket is running ${PID}"
else
	echo "elasticsearchSocket is not running"
fi

- stop.sh

#!/bin/sh
# stop
#PID=`pgrep -f elasticsearchSocket`
PID=`ps -ef | grep "elasticsearchSocket" | grep -v "grep" | grep -v "tail" | awk '{print $2}'`
if [ "${PID}" != "" ]
then
   echo elasticsearchSocket is stopped.
   kill -9 ${PID}
   exit
else
   echo elasticsearchSocket is NOT running !!!
fi

 

2. 권한 부여를 통해서 실행할수 있는 상태로 변경한다.

chmod 755 ./status.sh
chmod 755 ./start.sh
chmod 755 ./stop.sh

 

3. 실행 로그가 쌓일 파일을 생성한다.

#elasticsearch 설치 경로
cd /etc/elasticsearch 
#log폴더 생성
mkdir log
#log폴더로 이동
cd log
#log.log파일 생성
vim log.log
#esc누른 후 :wq입력해서 저장하고 나가기(파일이 저장됨)
:wq

다 끝났다면 /etc/elasticsearch 경로 파일 목록은 이런 상태가 될 것이다.

 

4. elasticsearchSocket.py를 실행시킨다.

./start.sh

 

elasticsearchSocket.py 실행 로그. tail -f /etc/elasticsearch/log/log.log <명령어 입력하면 실시간으로 갱신되는 로그를 확인할 수 있다.

 

 

5. Spring-Java Socket 사용

파이썬 소캣을 작성하고 실행상태로 뒀다면, 거기로 요청을 보내는 코드 역시 작성해야 활용할수 있을 것이다.

spring 환경에서 java socket을 활용해 요청을 보내보자.

 

- params.get("data") > 가산역은 몇호선인가요? 

데이터가 있다고 보면 된다. 화면에서 입력한 데이터(가산역은 몇호선인가요?)를 java socket을 통해서 요청을 보내는것이다.

package com.slescb.admin.api.elasticsearch;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.google.gson.Gson;

@Service
public class DataSocket {
	
	private static Socket socket = null;
	private static final Logger log = LoggerFactory.getLogger(DataSocket.class);
    // 서버ip주소를 입력한다.
    // elasticsearchSocket.py에서 입력한 소캣이 실행되어 띄워질 주소&포트
	private static final String serverIp = "192.168.0.33";
	
	public String dataTest(Map<String, Object> params) {
		String resultStr = "";
		Map<String, Object> reqMap = new HashMap<String, Object>();
		String receiveStr = "";
		
		BufferedWriter bw = null;
		BufferedReader br = null;
		
		try {
			
			// 소켓 생성
			socket = new Socket(serverIp, 9994);
			
			bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
			br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			
			reqMap.put("type", "search");
			reqMap.put("transcript", params.get("data"));

			String reqMapStr = new JSONObject(reqMap).toString();
			
			log.info(reqMapStr);
			
			bw.write(reqMapStr);
			
			bw.newLine();
			
			bw.flush();
			
			receiveStr = br.readLine();
			
			log.info(receiveStr);
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if(bw != null) {
					bw.close(); 
				} else if (br != null) {
					br.close();
				} else if (socket != null) {
					socket.close();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
		return resultStr;
	}
    
}

 

6. insert.py, test_data1.json, index.json 작성

이제 테스트를 위한 데이터를 넣어놔야 한다. 명령어로 실행시킬 파이썬 파일과 그와 연관된 json파일들을 작성해야한다.

cd /etc/elasticsearch 경로에서 작성해야한다.

 

1. insert.py 작성

기존 모든 index를 삭제하고(데이터 역시 같이 날아감) 새로운 index생성 및 데이터를 bulk를 통해 insert한다.

sql로 비유하자면... 모든 db를 삭제하고 새로운 db와 테이블 생성이라고 보면 된다.

import json
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import tensorflow_text
import tensorflow.compat.v1 as tf
import tensorflow_hub as hub
import numpy as np

def index_data():
    print("Creating the " + INDEX_NAME + " index.")
    client.indices.delete(index=INDEX_NAME, ignore=[404])

    with open(INDEX_FILE,"r",encoding="utf8") as index_file:
        indexObj = json.load(index_file)

        client.indices.create(index=INDEX_NAME, settings=indexObj["settings"], mappings=indexObj["mappings"])

    docs = []
    count = 0

    with open(DATA_FILE,"r",encoding="utf8") as data_file:
        for line in data_file:
            line = line.strip()

            doc = json.loads(line)

            docs.append(doc)
            count += 1

            if count % BATCH_SIZE == 0:
                index_batch(docs)
                docs = []
                print("Indexed {} documents.".format(count))
        
        if docs:
            index_batch(docs)
            print("Indexed {} documents.".format(count))

    client.indices.refresh(index=INDEX_NAME)
    print("Done indexing.")

def index_batch(docs):
    question = [doc["question"] for doc in docs]
    question_vectors = embed(question)

    requests = []
    for i, doc in enumerate(docs):
        request = doc
        request["_op_type"] = "index"
        request["_index"] = INDEX_NAME
        request["question_vector"] = question_vectors[i].numpy().tolist()

        requests.append(request)

    bulk(client, requests)


if __name__ == '__main__':
    INDEX_NAME = "kje"
    INDEX_FILE = "./index.json"

    DATA_FILE = "./test_data1.json"
    BATCH_SIZE = 1000

    SEARCH_SIZE = 3

    print("Downloading pre-trained embeddings from tensorflow hub...")

    embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-multilingual/3")
    text_ph = ""
    
    print("Done.")

	# elasticsearch가 설치되어있는 서버 주소&포트
    client = Elasticsearch('http://127.0.0.1:9200')

    index_data()

 

2. index.json

{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1
  },
   "mappings": {
    "dynamic": "true",
    "_source": {
      "enabled": "true"
    },
    "properties": {
      "category": {
        "type":"keyword"
      },
      "question": {
        "type": "keyword"
      },
      "answer": {
        "type": "keyword"
      },
      "question_vector": {
        "type": "dense_vector",
        "dims": 512
      }
    }
  }
}

 

3. test_data1.json

{"category":"지하철", "question":"가산역은 몇호선인가요?", "answer":"1호선입니다."}
{"category":"지하철", "question":"가산디지털단지역은 몇호선인가요?", "answer":"1호선입니다."}
{"category":"지하철", "question":"가산디지털단지역은 지하철 몇호선인가요?", "answer":"1호선입니다."}
{"category":"문자", "question":"문자로 전송해주세요.", "answer":"해당 내용을 문자로 전송해드렸습니다."}
{"category":"문자", "question":"문자로 보내주세요", "answer":"해당 내용을 문자로 보내드렸습니다."}
{"category":"문자", "question":"문자로 보내 줘", "answer":"해당 내용을 문자로 보내드렸습니다!!"}

 

완료됐다면 아래 명령어를 통해 insert.py를 실행시킨다.

python3.7 ./insert.py

로그가 올라오면서 실행되는 insert.py

 

7. 테스트

'가산역은 몇호선인가요?'를 java socket을 통해 요청을 보내고 응답 받은걸 보면 아래와 같이 나온다.

이제 이 데이터를 목적에 맞게 파싱해서 사용하면 된다.

 

{
	'took':93,
	'timed_out':False,
	'_shards':{
		'total':2,
		'successful':2,
		'skipped':0,
		'failed':0
	},
	'hits':{
		'total':{
			'value':18,
			'relation':'eq'
		},
		'max_score':2.0,
		'hits':[
			{
				'_index':'kje',
				'_id':'uKFMXoYBIr1kIUcyPHsa',
				'_score':2.0,
				'_source':{
					'question':'가산역은 몇호선인가요?',
					'answer':'1호선입니다.',
					'category':'지하철'
				}
			},
			{
				'_index':'kje',
				'_id':'uaFMXoYBIr1kIUcyPHsa',
				'_score':1.8661379,
				'_source':{
					'question':'가산디지털단지역은 몇호선인가요?',
					'answer':'1호선입니다.',
					'category':'지하철'
				}
			},
			{
				'_index':'kje',
				'_id':'uqFMXoYBIr1kIUcyPHsa',
				'_score':1.7480072,
				'_source':{
					'question':'가산디지털단지역은 지하철 몇호선인가요?',
					'answer':'1호선입니다.',
					'category':'지하철'
				}
			}
		]
	}
}

- 일치율이 높은 데이터 3개만 리턴된 형태이다.

- 원본에 가까울수록 _score가 높아지는걸 확인할 수 있다. > 유사어 검색에 활용 가능.

 

 

 

'Linux' 카테고리의 다른 글

[CentOS7] python3.7 설치, pip3 설치  (0) 2023.01.26
[CentOS7] Kibana 설치  (0) 2023.01.18
[CentOS7] Elasticsearch 설치  (0) 2023.01.18