VLM을 이용해서 이미지를 요약하는 방법

VLM을 이용해서 이미지를 요약하는 방법

1. 개요

최근 Vision-Language Model(VLM)을 활용한 이미지 분석 기술이 발전하면서, 이미지에서 의미 있는 정보를 추출하고 요약하는 것이 가능해졌습니다. 본 포스트에서는 HuggingFaceTB/SmolVLM-Instruct 모델을 활용하여 이미지를 분석하고 요약하는 방법을 설명합니다.

2. 환경 설정

필수 라이브러리 설치

import os
import torch
import requests
import re
from typing import List, Optional, TypedDict
from datetime import datetime
from PIL import Image
from transformers import AutoProcessor, LlavaOnevisionForConditionalGeneration, pipeline, AutoTokenizer, AutoModelForVision2Seq
from langchain_ollama import ChatOllama
import streamlit as st

3. 모델 설정 및 초기화

class MyAssistantPics:
    def __init__(self, myassistant_path: Optional[str] = None, myassistant_description: Optional[str] = None) -> None:
        self.myassistant_path = myassistant_path
        self.myassistant_description = myassistant_description
        self.device = 0 if torch.cuda.is_available() else -1  # GPU 사용 여부 확인
        self.start_time = datetime.now()
        self.status_container = st.empty()
        self.status_container.write("이미지 분석 에이전트가 확인하겠습니다.")

        model_id = "HuggingFaceTB/SmolVLM-Instruct"
        self.model = AutoModelForVision2Seq.from_pretrained(
            model_id, torch_dtype=torch.bfloat16,
            _attn_implementation="flash_attention_2" if self.device == 0 else "eager"
        ).to('cuda' if self.device == 0 else 'cpu')

        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.processor = AutoProcessor.from_pretrained(model_id)
        
        adapter_path = "/home/cloud/stevenyk.kim/model_huggingface/SmolVLM-Instruct-trl-sft-K-Food"
        self.model.load_adapter(adapter_path)
        
        self.pipe = pipeline("image-text-to-text", model=self.model, tokenizer=self.tokenizer, processor=self.processor, device=self.device)

4. 이미지 분석 함수 구현

def pics_answer(self, state: State):
    image_file = self.extract_url(state['question'])
    query4url = self.remove_url(state['question'])

    chat_model = ChatOllama(model="exaone3.5", temperature=0)
    question_eng = chat_model.invoke(query4url + " 을 영어로 번역해줘.")

    messages = [{
        "role": "user",
        "content": [
            {"type": "image", "url": image_file},
            {"type": "text", "text": question_eng.content},
        ],
    }]

    if image_file is not None:
        out = self.pipe(text=messages, max_new_tokens=1000)
        assistant_content = self.extract_assistant_content(out)
    else:
        assistant_content = query4url

    response = chat_model.invoke("<내용>" + assistant_content + "</내용> <내용>을 한국어로 적어줘.")
    self.answer = response.content
    state["data"].append(image_file)
    
    return {"question": None, "generation": self.answer, "code": [], "data": state["data"]}

5. 보조 함수 구현

URL 추출 및 제거

def extract_url(self, request_string):
    url_pattern = r'(https?://[^\s]+)'
    match = re.search(url_pattern, request_string)
    return match.group(0) if match else None

def remove_url(self, request_string):
    url_pattern = r'https?://[^\s]+'
    return re.sub(url_pattern, '', request_string).strip()
def time_difference(self, start_time):
    current_time = datetime.now()
    total_seconds = (current_time - start_time).total_seconds()
    return f"(Lab time: {int(total_seconds)}초)"

6. 결론

본 포스트에서는 HuggingFaceTB/SmolVLM-Instruct 모델을 활용하여 이미지를 요약하는 방법을 설명하였습니다. 본 기술을 활용하면 다양한 분야에서 이미지의 의미를 자동으로 추출하고 요약하는 데 활용할 수 있습니다.

''' 

# reference from https://huggingface.co/llava-hf/llava-onevision-qwen2-7b-ov-chat-hf

https://media-cdn.tripadvisor.com/media/attractions-splice-spp-720x480/07/be/71/01.jpg 그림 속 건물에 대해서 자세히 설명하고, 이 건물이 갖는 역사적인 의미에 대해서 설명해줘

'''

import os

from typing import List, Optional, TypedDict

import requests

from PIL import Image


import torch

from transformers import AutoProcessor, LlavaOnevisionForConditionalGeneration, pipeline

from langchain_ollama import ChatOllama

from datetime import datetime

import streamlit as st


import re



# 그래프 상태 속성 정의 

class State(TypedDict):

    question: str

    generation: str

    data: List[str]

    code: List[str]



class MyAssistantPics:

    def __init__(

        self, 

        myassistant_path: Optional[str] = None,

        myassistant_description: Optional[str] = None,

    ) -> None:

        self.myassistant_path = myassistant_path

        self.myassistant_description = myassistant_description


        # GPU 사용 여부 확인

        self.device = 0 if torch.cuda.is_available() else -1  # GPU가 사용 가능하면 0, 아니면 -1 (CPU)


        self.start_time = datetime.now()  # 현재 시간을 시작 시간으로 설정

        time_difference = self.time_difference(self.start_time)

        self.status_container = st.empty()

        self.status_container.write(f"이미지 분석 에이전트가 확인하겠습니다. {time_difference}")

        print(f"\033[34m소요 시간: {time_difference}초 (클래스 초기화 및 모델 로딩, device={self.device}, 0:GPU)\033[0m") # 진행 시간을 출력


        # self.pipe = pipeline("image-text-to-text", model="llava-hf/llava-onevision-qwen2-0.5b-ov-hf", device=self.device)

        # print("model = llava-hf/llava-onevision-qwen2-0.5b-ov-hf")


        # self.pipe = pipeline("image-text-to-text", model="Qwen/Qwen2-VL-7B-Instruct", device=self.device)

        # print("model = Qwen/Qwen2-VL-7B-Instruct") # Huggingface 모델 로딩에 25초 걸린다.


        self.pipe = pipeline("image-text-to-text", model="HuggingFaceTB/SmolVLM-Instruct", device=self.device)

        print("model = HuggingFaceTB/SmolVLM-Instruct") # Huggingface 모델 로딩에 10초 걸린다.


        # trainable params: 11,269,248 || all params: 2,257,542,128 || trainable%: 0.4992

        # QLoRA Training 10,000 rows

        from transformers import pipeline, AutoTokenizer, AutoModelForVision2Seq

        DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

        model_id = "HuggingFaceTB/SmolVLM-Instruct"

        model = AutoModelForVision2Seq.from_pretrained(

            model_id,

            torch_dtype=torch.bfloat16,

            _attn_implementation="flash_attention_2" if DEVICE == "cuda" else "eager",

        )

        model.to('cuda') # 모델을 GPU로 이동합니다.

        tokenizer = AutoTokenizer.from_pretrained(model_id)

        processor = AutoProcessor.from_pretrained(model_id)

        adapter_path = "/home/cloud/stevenyk.kim/model_huggingface/SmolVLM-Instruct-trl-sft-K-Food"

        model.load_adapter(adapter_path)


        # 파이프라인을 생성합니다.

        self.pipe = pipeline("image-text-to-text", model=model, tokenizer=tokenizer, processor=processor, device=self.device)

        print("model = SmolVLM-Instruct-trl-sft-K-Food")

        '''

        '''

        trainable params: 11,269,248 || all params: 2,257,542,128 || trainable%: 0.4992

        QLoRA Training 150,600 rows

        '''

        # # trainable params: 11,269,248 || all params: 2,257,542,128 || trainable%: 0.4992

        # # QLoRA Training 100,000 rows

        # from transformers import pipeline, AutoTokenizer, AutoModelForVision2Seq

        # DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

        # model_id = "HuggingFaceTB/SmolVLM-Instruct"

        # model = AutoModelForVision2Seq.from_pretrained(

        #     model_id,

        #     torch_dtype=torch.bfloat16,

        #     _attn_implementation="flash_attention_2" if DEVICE == "cuda" else "eager",

        # )

        # model.to('cuda') # 모델을 GPU로 이동합니다.

        # tokenizer = AutoTokenizer.from_pretrained(model_id)

        # processor = AutoProcessor.from_pretrained(model_id)

        # adapter_path = "/home/cloud/stevenyk.kim/model_huggingface/SmolVLM-Instruct-trl-sft-K-Food-100k"

        # model.load_adapter(adapter_path)


        # self.pipe = pipeline("image-text-to-text", model=model, tokenizer=tokenizer, processor=processor, device=self.device)

        # print("model = SmolVLM-Instruct-trl-sft-K-Food-100k")


    def pics_answer(self, state: State):

        """

        데이터 검색을 수행합니다.

        Args:

            state (dict): 현재 그래프 상태

        Returns:

            state (dict): 검색된 데이터를 포함한 새로운 State

        """

        time_difference = self.time_difference(self.start_time)

        print(f"MyAssistantPics/pics_answer(state): {state}")

        self.status_container.write(f"사용자 질문을 분석 중입니다. {time_difference}")

        print(f"\033[34m소요 시간: {time_difference}초 (pics_answer 시작)\033[0m") # 진행 시간을 출력


        # request4url = "<system> 내용을 참고해서 대답하세요. <system> 이 사진의 내용을 자세히 설명하고 각 객체를 묘사하고 어떤 감성을 표현하는지 알려주세요. " \

        #        "그 후에 이미지가 수식이라면 공식을 명확하고 이해하기 쉽게 설명해 주세요. 이 공식을 어떻게 계산하는지, 각 구성 요소가 무엇을 의미하는지, 그리고 어떻게 결론을 내는지 설명하고, " \

        #        "이미지가 다이어그램이라면 각 구성 요소의 의미와 연관 관계를 설명하고, " \

        #        "이미지가 음식이라면 음식 이름, 재료, 1인분 최저 칼로리를 알려주세요.</system>\n"

        #        "<question>"

        systemprompt4url = "Please respond based on the information provided in <system> tag. <system>Please provide a detailed description of the content of this image, describing each object and the emotions it conveys. " \

        "Please don't ask an additional information."

        "</system>\n<question>"

        # "If the image is a formula, please explain the formula in a clear and understandable way. I would like to know how it is calculated, what each component of the formula represents, and why it is used for evaluating the answer., " \

        # "and if the image is a diagram, explain the meaning and relationships of each component, " \

        # "and if the image is food, please provide the food name, ingredients, and the minimum calories for one serving."

        

        # state['question'] 에서 질문과 URL을 분리해서 query4url과 image_file에 저장

        image_file = self.extract_url(state['question'])

        print(f"MyAssistantPics/pics_answer(URL): {image_file}")


        query4url = self.remove_url(state['question'])

        print(f"MyAssistantPics/pics_answer(query4url): {query4url}")


        time_difference = self.time_difference(self.start_time)

        print(f"\033[34m소요 시간: {time_difference}초 (영어 번역 시작)\033[0m") # 진행 시간을 출력


        # question_eng = ChatOllama(model="exaone3.5",temperature=0,).invoke(query4url+" 을 영어로")

        chat_model = ChatOllama(model="exaone3.5",temperature=0,)

        # chat_model.to('cuda:0')  # ChatOllama 모델을 GPU로 이동

        # question_eng = chat_model.invoke(query4url+" 을 영어로 번역하되 그냥 직역하고 추가 설명은 하지 말아줘.")

        question_eng = chat_model.invoke(query4url+" 을 영어로 번역해줘.")

        print(f"MyAssistantPics/pics_answer(question_eng): {question_eng}")


        time_difference = self.time_difference(self.start_time)

        print(f"\033[34m소요 시간: {time_difference}초 (영어 번역 종료)\033[0m") # 진행 시간을 출력


        query4url = systemprompt4url + question_eng.content

        print(f"MyAssistantPics/pics_answer(systemprompt4url + query4url): {query4url}")

        ###


        messages = [

            {

            "role": "user",

            "content": [

                {"type": "image", "url": image_file},

                {"type": "text", "text": query4url},

                ],

            },

        ]

        print(f"MyAssistantPics/messages: {messages}")


        if image_file is not None:

            time_difference = self.time_difference(self.start_time)

            print(f"\033[34m소요 시간: {time_difference}초 (PIPELINE 시작)\033[0m") # 진행 시간을 출력

            self.status_container.write(f"이미지 내용을 확인 중입니다. {time_difference}")


            out = self.pipe(text=messages, max_new_tokens=1000)

            assistant_content = self.extract_assistant_content(out)

            print(f"MyAssistantPics/assistant_content: {assistant_content}")


            time_difference = self.time_difference(self.start_time)

            print(f"\033[34m소요 시간: {time_difference}초 (PIPELINE 종료)\033[0m") # 진행 시간을 출력

        else:

            assistant_content = query4url

            image_file = "./plot.png"


        time_difference = self.time_difference(self.start_time)

        self.status_container.write(f"확인한 내용을 정리하고 있습니다. {time_difference}")

        print(f"\033[34m소요 시간: {time_difference}초 (한글 번역 시작)\033[0m") # 진행 시간을 출력


        # response = ChatOllama(model="exaone3.5",temperature=0,).invoke("<내용>"+assistant_content+"</내용> <내용>을 한국어로 적어줘.")

        response = chat_model.invoke("<내용>"+assistant_content+"</내용> <내용>을 한국어로 적어줘.")

        print(f"MyAssistantPics/response: {response}")


        time_difference = self.time_difference(self.start_time)

        print(f"\033[34m소요 시간: {time_difference}초 (한글 번역 종료)\033[0m") # 진행 시간을 출력


        self.answer = response.content

        print(f"MyAssistantPics/self.answer: {self.answer}")


        data = image_file

        state["data"].append(data)

        print(f"MyAssistantPics/messages: state[data]")

        print(state["data"])


        return {

            "question": None,

            "generation": self.answer,

            "code": [],

            "data": state["data"],

        }


    # 예시 사용

    # request4url = "다음 질문의 http://images.cocodataset.org/val2017/000000039769.jpg 에서 URL만 추출해줘."

    # url = extract_url(request4url)

    # print(url)  # 출력: http://images.cocodataset.org/val2017/000000039769.jpg

    def extract_url(self, request_string):

        # 정규 표현식을 사용하여 URL 추출

        url_pattern = r'(https?://[^\s]+)'

        match = re.search(url_pattern, request_string)

        if match:

            return match.group(0)

        return None


    # # 사용 예시

    # request4url = "다음 질문의 http://images.cocodataset.org/val2017/000000039769.jpg 에서 URL만 추출해줘."

    # result = remove_url(request4url)

    # print(result)  # 출력: 다음 질문의 에서 URL만 추출해줘.

    def remove_url(self, request_string):

        # 정규 표현식을 사용하여 URL을 찾아서 제거

        url_pattern = r'https?://[^\s]+'

        cleaned_string = re.sub(url_pattern, '', request_string)

        return cleaned_string.strip()  # 앞뒤 공백 제거


    def extract_assistant_content(self, data):

        for item in data:

            # 'generated_text' 리스트에서 'role'이 'assistant'인 항목을 찾기

            for generated in item['generated_text']:

                if generated['role'] == 'assistant':

                    return generated['content']

        return None



    # 시간 차이 계산 및 출력

    # difference = time_difference(start_time)

    # self.status_container.write(f"내용 분석 중입니다. {difference}")

    def time_difference(self, start_time):

        # 현재 시간 계산

        current_time = datetime.now()


        # 시간 차이 계산

        difference = current_time - start_time


        # 초 단위로 변환

        total_seconds = difference.total_seconds()


        # 분과 초로 변환

        # minutes, seconds = divmod(total_seconds, 60)


        # 포맷팅된 문자열 생성

        # formatted_time = f"{int(minutes)}분 {int(seconds):02d}초"

        formatted_time = f"(Lab time: {int(total_seconds):d}초)"


        return formatted_time




댓글 쓰기

0 댓글