YouTube 내용 요약
아래는 YouTube 동영상에서 음성을 추출하고 이를 텍스트로 변환한 후 요약하는 기능입니다. LangChain의 MapReduceDocumentsChain을 사용하여 분석과 요약을 수행합니다.
이 코드는 다음과 같은 기능을 포함합니다:
- YouTube 비디오에서 오디오 추출
- Whisper 모델을 사용한 음성-텍스트 변환
- LangChain 기반 텍스트 요약
- Streamlit을 통한 UI 구성
주요 기술
이 시스템은 다음과 같은 주요 기술과 라이브러리를 활용합니다:
- LangChain: LLM을 활용한 문서 요약
- yt-dlp: YouTube 오디오 다운로드
- Whisper: OpenAI의 음성 인식 모델
- Streamlit: 간단한 대화형 UI
- PyTorch: GPU 가속 지원
코드 개요
1. 클래스 초기화 및 환경 설정
import torch
import yt_dlp
import whisper
import re
import streamlit as st
from datetime import datetime
from langchain.document_loaders import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import LLMChain, MapReduceDocumentsChain
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOllama
class MyAssistantYoutube:
def __init__(self):
self.device = 0 if torch.cuda.is_available() else -1
self.start_time = datetime.now()
self.status_container = st.empty()
self.llm = ChatOllama(model="exaone3.5", temperature=0)
이 코드는 시스템을 초기화하고 GPU 사용 여부를 확인합니다.
2. YouTube 링크 처리
def extract_url(self, request_string):
url_pattern = r'(https?://[^\s]+)'
match = re.search(url_pattern, request_string)
return match.group(0) if match else None
def remove_url(self, request_string):
url_pattern = r'https?://[^\s]+'
return re.sub(url_pattern, '', request_string).strip()
이 함수들은 사용자의 질문에서 URL을 추출하고 제거하는 역할을 합니다.
3. YouTube 오디오 다운로드
def download_audio(self, youtube_link):
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': '.downloaded_audio.%(ext)s',
'postprocessors': [{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "320",
}],
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([youtube_link])
except yt_dlp.utils.DownloadError:
return "다운로드 실패"
YouTube에서 오디오를 추출하고 MP3 파일로 저장하는 기능을 담당합니다.
4. 음성-텍스트 변환
def transcribe_audio(self):
model = whisper.load_model("base")
result = model.transcribe(".downloaded_audio.mp3")
return result["text"]
Whisper 모델을 활용하여 음성을 텍스트로 변환하는 과정입니다.
5. 텍스트 요약
def summarize_text(self, text):
text_splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200)
docs = [Document(page_content=x) for x in text_splitter.split_text(text)]
map_reduce_chain = MapReduceDocumentsChain(
llm_chain=LLMChain(llm=self.llm, prompt=ChatPromptTemplate.from_messages([("human", "요약: {docs}")]))
)
return map_reduce_chain.run(docs)
LangChain의 MapReduce 체인을 사용하여 텍스트를 요약합니다.
결론
이 코드는 는 YouTube에서 정보를 추출하고 요약합니다. LangChain, Whisper, yt-dlp 등의 최신 기술을 활용하여 영상 데이터를 효과적으로 처리할 수 있습니다. 향후 개선 사항으로는 더 빠른 요약 속도, 다양한 언어 지원 등이 있습니다.
'''
# reference from https://python.langchain.com/docs/versions/migrating_chains/map_reduce_chain/
'''
import os
from typing import List, Optional, TypedDict
import torch
from langchain_ollama import ChatOllama
from datetime import datetime
import streamlit as st
import re
import yt_dlp
import whisper
from langchain.schema.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.llm import LLMChain
from langchain_core.output_parsers import StrOutputParser
# MapReduceChain 구성
from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain_core.prompts import ChatPromptTemplate
# 그래프 상태 속성 정의
class State(TypedDict):
question: str
generation: str
data: List[str]
code: List[str]
class MyAssistantYoutube:
def __init__(
self,
myassistant_path: Optional[str] = None,
myassistant_description: Optional[str] = None,
) -> None:
# self.myassistant_path = myassistant_path
# self.myassistant_description = myassistant_description
status_container = st.empty()
# GPU 사용 여부 확인
self.device = 0 if torch.cuda.is_available() else -1 # GPU가 사용 가능하면 0, 아니면 -1 (CPU)
self.start_time = datetime.now() # 현재 시간을 시작 시간으로 설정
time_difference = self.time_difference(self.start_time)
print(f"\033[34m소요 시간: {time_difference} (클래스 초기화 및 모델 로딩, device={self.device}, 0:GPU)\033[0m") # 진행 시간을 출력
self.status_container = st.empty()
self.status_container.write(f"유튜브 분석 에이전트가 확인하겠습니다. {time_difference}")
self.llm = ChatOllama(model="exaone3.5", temperature=0)
def youtube_answer(self, state: State):
"""
데이터 검색을 수행합니다.
Args:
state (dict): 현재 그래프 상태
Returns:
state (dict): 검색된 데이터를 포함한 새로운 State
"""
time_difference = self.time_difference(self.start_time)
# self.status_container.write(f"사용자 질문을 분석 중입니다. {time_difference}")
print(f"\033[34m소요 시간: {time_difference} (youtube_answer 시작)\033[0m") # 진행 시간을 출력
print(f"MyAssistantPics/youtube_answer(state): {state}")
# state['question'] 에서 질문과 URL을 분리해서 query4url과 image_file에 저장
youtube_link = self.extract_url(state['question'])
youtube_link = "https://youtu.be/mCyY8pQDpJM?si=sd324t9fTg9EDZBM"
print(f"MyAssistantPics/pics_answer(youtube_link): {youtube_link}")
query_from_question = self.remove_url(state['question'])
print(f"MyAssistantPics/pics_answer(query_from_question): {query_from_question}")
time_difference = self.time_difference(self.start_time)
print(f"\033[34m소요 시간: {time_difference} (YouTube 비디오에서 음성 추출 시작)\033[0m") # 진행 시간을 출력
'''
youtube_link 문자열 변수 내용과 load_youtube_link() 함수에서 리턴한 문자열이 다르면:
리턴한 문자열이 같으면:
'''
saved_link = self.load_youtube_link()
if youtube_link != saved_link: # 새로운 유튜브 컨텐츠에 대한 요청
print("MyAssistantPics/youtube_answer: 새로운 유튜브 컨텐츠 저장")
# YouTube 비디오에서 음성 추출
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': '.downloaded_audio.%(ext)s',
'postprocessors': [{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "320",
}],
}
# YouTube 비디오에서 동영상 추출
# ydl_opts = {
# 'format': 'bestvideo/best'+'bestaudio/best',
# 'outtmpl': '.downloaded_video.%(ext)s',
# 'postprocessors': [{
# "key": "FFmpegVideoConvertor",
# "preferredcodec": "mp4",
# "preferredquality": "320",
# }],
# }
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([youtube_link])
except yt_dlp.utils.DownloadError as e:
print(f"Error downloading audio: {e}")
# 에러 메시지를 로그에 기록하거나, 사용자에게 알리고 싶다면 여기에 추가할 수 있습니다.
print("MyAssistantPics/youtube_answer(return): 해당 유튜브 컨텐츠를 찾을 수 없습니다.")
answer = "해당 유튜브 컨텐츠를 찾을 수 없습니다."
# st.error(f"Error downloading audio: {e}")
return {
"question": None,
"generation": answer,
"code": [],
"data": [],
}
except Exception as e:
print(f"An unexpected error occurred: {e}")
answer = "해당 유튜브 컨텐츠를 읽을 수 없습니다."
return {
"question": None,
"generation": answer,
"code": [],
"data": [],
}
else: # 동일한 유튜브 컨텐츠에 대한 요청이라면,
print("MyAssistantPics/youtube_answer: 동일한 유튜브 컨텐츠는 다운로드하지 않음")
time_difference = self.time_difference(self.start_time)
print(f"\033[34m소요 시간: {time_difference} (YouTube 비디오에서 음성 추출 완료)\033[0m") # 진행 시간을 출력
print(f"\033[34m소요 시간: {time_difference} (음성에서 텍스트 추출 시작)\033[0m") # 진행 시간을 출력
self.status_container.write(f"유튜브 내용에서 텍스트 추출 중입니다. {time_difference}")
'''
youtube_link 문자열 변수 내용과 load_youtube_link() 함수에서 리턴한 문자열이 다르면:
youtube_link 문자열 변수 내용을 ./.cache_youtube_link에 저장
result["text"] 에 저장된 문자열을 ./.cache_youtube_transcription 에 저장
리턴한 문자열이 같으면 ./.cache_youtube_transcription 에서 읽어서 result 문자열 변수에 저장한다.
'''
saved_link = self.load_youtube_link()
if youtube_link != saved_link: # 새로운 유튜브 컨텐츠에 대한 요청
print("MyAssistantPics/youtube_answer: 새로운 유튜브 컨텐츠에 대한 요청")
self.save_youtube_link(youtube_link)
# 음성에서 텍스트 추출
model = whisper.load_model("base") # "tiny", "base", "small", "medium", "large"
result = model.transcribe(".downloaded_audio.mp3")
youtube_transcription_string = result["text"]
self.save_youtube_transcription(youtube_transcription_string)
else: # 동일한 유튜브 컨텐츠에 대한 요청이라면,
print("MyAssistantPics/youtube_answer: 동일한 유튜브 컨텐츠에 대한 요청")
youtube_transcription_string = self.load_youtube_transcription()
# # 추출된 텍스트의 첫 1000자
# print("추출된 텍스트의 첫 1000자: ", result["text"][:1000])
# # 세그먼트 정보 확인
# print("세그먼트 정보: ", result["segments"][:1])
time_difference = self.time_difference(self.start_time)
print(f"\033[34m소요 시간: {time_difference} (음성에서 텍스트 추출 완료)\033[0m") # 진행 시간을 출력
print("추출된 텍스트의 길이: ", len(youtube_transcription_string))
time_difference = self.time_difference(self.start_time)
print(f"\033[34m소요 시간: {time_difference} (음성에서 추출한 텍스트 요약 시작)\033[0m") # 진행 시간을 출력
self.status_container.write(f"유튜브 내용을 요약합니다. {time_difference}")
# 텍스트를 요약
text_splitter = RecursiveCharacterTextSplitter( # RecursiveCharacterTextSplitter 초기화
chunk_size=4000, # 원하는 청크 크기
chunk_overlap=200 # 청크 간의 중첩 크기
)
docs = [Document(page_content=x) for x in text_splitter.split_text(youtube_transcription_string)]
split_docs = text_splitter.split_documents(docs)
print("split_docs 정보: ", split_docs)
current_datetime_string = self.get_current_datetime_string()
# Map
# Map 프롬프트
map_template = """다음은 여러 개의 문서입니다.
{docs}
이 문서 목록을 기반으로 주요 테마를 식별해 주세요.
"""
map_prompt = ChatPromptTemplate([
("human", current_datetime_string),
("human", map_template),
("human", query_from_question),
])
# Map 체인
map_chain = LLMChain(llm=self.llm, prompt=map_prompt)
# Reduce
# Reduce 프롬프트
reduce_template = """당신은 리포터입니다. 물론입니다 등의 불필요한 말은 사용하지 마세요. 다음은 여러 개의 요약입니다:
{doc_summaries}
이 요약들을 바탕으로 주요 테마를 읽기 쉽도록 나누어 자연스러운 문맥으로 이야기 해주세요.
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
# Reduce 체인
reduce_chain = LLMChain(llm=self.llm, prompt=reduce_prompt)
# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
# StuffDocumentsChain는 문서를 결합하는 체인으로, 각 문서의 내용을 일정한 형식으로 결합하여 LLM이 처리할 수 있도록 준비
combine_documents_chain = StuffDocumentsChain(
llm_chain=reduce_chain, document_variable_name="doc_summaries"
)
# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
# This is final chain that is called.
combine_documents_chain=combine_documents_chain,
# If documents exceed context for `StuffDocumentsChain`
collapse_documents_chain=combine_documents_chain,
# The maximum number of tokens to group documents into.
token_max=4000,
)
# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
# Map chain
llm_chain=map_chain,
# Reduce chain
reduce_documents_chain=reduce_documents_chain,
# The variable name in the llm_chain to put the documents in
document_variable_name="docs",
# Return the results of the map steps in the output
return_intermediate_steps=False,
)
sum_result = map_reduce_chain.run(split_docs)
time_difference = self.time_difference(self.start_time)
print(f"\033[34m소요 시간: {time_difference} (음성에서 추출한 텍스트 요약 완료)\033[0m") # 진행 시간을 출력
self.status_container.write("")
# # additionalquestion
# # <additional question> 내용이 없다면 아무 것도 적지 말고 ^^ 만 적어주세요. 추가 설명도 적지 마세요.
# additionalquestion_template = """<contents>{sum_result}</contents>
# <additional question>{query_from_question}</additional question>
# <additional question> tag에 내용이 있다면 <contents> 내용을 바탕으로 추가 질문에 대한 의견만 자연스로운 구어체로 적어주세요
# """
# chat_prompt = ChatPromptTemplate([("human", additionalquestion_template)])
# chain_additionalquestion = chat_prompt | self.llm | StrOutputParser()
# # invoke 메서드에 전달할 인자를 딕셔너리 형태로 수정
# input_data = {
# "sum_result": sum_result,
# "query_from_question": query_from_question,
# }
# answer = sum_result + "\n\n" + chain_additionalquestion.invoke(input_data)
answer = sum_result
print(f"MyAssistantPics/youtube_answer: {answer}")
data = youtube_link
state["data"].append(data)
print(state["data"])
return {
"question": None,
"generation": answer,
"code": [],
"data": state["data"],
}
# 예시
# request4url = "다음 질문의 http://images.cocodataset.org/val2017/000000039769.jpg 에서 URL만 추출해줘."
# url = extract_url(request4url)
# print(url) # 출력: http://images.cocodataset.org/val2017/000000039769.jpg
def extract_url(self, request_string):
# 정규 표현식을 사용하여 URL 추출
url_pattern = r'(https?://[^\s]+)'
match = re.search(url_pattern, request_string)
if match:
return match.group(0)
return None
# # 예시
# request4url = "다음 질문의 http://images.cocodataset.org/val2017/000000039769.jpg 에서 URL만 추출해줘."
# result = remove_url(request4url)
# print(result) # 출력: 다음 질문의 에서 URL만 추출해줘.
def remove_url(self, request_string):
# 정규 표현식을 사용하여 URL을 찾아서 제거
url_pattern = r'https?://[^\s]+'
cleaned_string = re.sub(url_pattern, '', request_string)
return cleaned_string.strip() # 앞뒤 공백 제거
def extract_assistant_content(self, data):
for item in data:
# 'generated_text' 리스트에서 'role'이 'assistant'인 항목을 찾기
for generated in item['generated_text']:
if generated['role'] == 'assistant':
return generated['content']
return None
# # 함수 호출 예시
# result = get_current_datetime_string()
def get_current_datetime_string(self):
# 현재 날짜와 시간을 가져옵니다.
now = datetime.now()
# 날짜와 시간을 포맷팅합니다.
formatted_date = now.strftime("%Y년 %m월 %d일")
formatted_time = now.strftime("%H시 %M분")
# 결과 문자열을 반환합니다.
return f"오늘 날짜는 {formatted_date} 입니다. 현재 시간은 {formatted_time}입니다."
def save_youtube_link(self, youtube_link):
"""
youtube_link 문자열을 ./.cache_youtube_link 파일에 저장합니다.
"""
with open('./.cache_youtube_link', 'w') as f:
f.write(youtube_link)
def load_youtube_link(self):
"""
./.cache_youtube_link 파일에서 문자열을 읽어서 리턴합니다.
파일이 존재하지 않으면 None을 리턴합니다.
"""
try:
with open('./.cache_youtube_link', 'r') as f:
return f.read().strip()
except FileNotFoundError:
return None
def save_youtube_transcription(self, youtube_transcription):
with open('./.cache_youtube_transcription', 'w') as f:
f.write(youtube_transcription)
def load_youtube_transcription(self):
try:
with open('./.cache_youtube_transcription', 'r') as f:
return f.read().strip()
except FileNotFoundError:
return None
# 시간 차이 계산 및 출력
# difference = time_difference(start_time)
# status_container.write(f"내용 분석 중입니다. {difference}")
def time_difference(self, start_time):
# 현재 시간 계산
current_time = datetime.now()
# 시간 차이 계산
difference = current_time - start_time
# 초 단위로 변환
total_seconds = difference.total_seconds()
# 분과 초로 변환
# minutes, seconds = divmod(total_seconds, 60)
# 포맷팅된 문자열 생성
# formatted_time = f"{int(minutes)}분 {int(seconds):02d}초"
formatted_time = f"(Lab time: {int(total_seconds):d}초)"
return formatted_time
0 댓글