Add first pass youtube subtitle indexer
This commit is contained in:
parent
7ee9d978b2
commit
343410e62f
1 changed files with 70 additions and 12 deletions
76
src/crawl.py
76
src/crawl.py
|
|
@ -13,6 +13,7 @@ from models import Base, Documents, Document_Tokens, Tokens
|
|||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy import create_engine
|
||||
import datetime
|
||||
import yt_dlp as youtube_dl
|
||||
# TODO- Handle gemini/gopher links
|
||||
|
||||
engine = create_engine(DATABASE_URI)
|
||||
|
|
@ -25,7 +26,64 @@ def get_html(url: str) -> str:
|
|||
return response.content
|
||||
|
||||
|
||||
def parse_html(url: str, html: str, recursion: int = 0, traversed_links = [], robots = {}) -> bool:
|
||||
def parse_youtube(video_url: str) -> bool:
|
||||
# Language preference for subtitles (set to None for auto-generated)
|
||||
# Change this to 'en' for English subtitles, or None for auto-generated
|
||||
subtitle_language = 'en'
|
||||
# Options for youtube_dl
|
||||
ydl_opts = {
|
||||
'writesubtitles': True,
|
||||
'allsubtitles': True,
|
||||
'skip_download': True, # We only want to fetch metadata
|
||||
'subtitleslangs': [subtitle_language] if subtitle_language else None,
|
||||
}
|
||||
|
||||
# Initialize youtube_dl object
|
||||
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
||||
# Download metadata
|
||||
info_dict = ydl.extract_info(video_url, download=False)
|
||||
|
||||
# Extract subtitles
|
||||
subtitles = info_dict.get('subtitles')
|
||||
subtitles_text = ""
|
||||
# Print available subtitles
|
||||
if subtitles:
|
||||
for subs in subtitles.values():
|
||||
for sub in subs:
|
||||
subtitle_url = sub['url']
|
||||
with youtube_dl.YoutubeDL({}) as ydl:
|
||||
subtitle_info = ydl.extract_info(
|
||||
subtitle_url, download=False)
|
||||
for subtitle in subtitle_info['subtitles'][subtitle_language]:
|
||||
if subtitle["ext"] == "srv1":
|
||||
soup = BeautifulSoup(
|
||||
get_html(subtitle["url"]), 'html.parser')
|
||||
subtitles_text = soup.get_text()
|
||||
|
||||
s = Session()
|
||||
existing_website = s.query(
|
||||
Documents).filter_by(url=video_url).first()
|
||||
if existing_website is None:
|
||||
website = Documents(
|
||||
url=video_url,
|
||||
text_content=subtitles_text,
|
||||
html_content=None, # soup.prettify(),
|
||||
first_crawl_date=datetime.datetime.now(),
|
||||
last_crawl_date=datetime.datetime.now(),
|
||||
last_index_date=None
|
||||
)
|
||||
s.add(website)
|
||||
else:
|
||||
existing_website.last_crawl_date = datetime.datetime.now()
|
||||
s.add(existing_website)
|
||||
s.commit()
|
||||
s.close()
|
||||
|
||||
|
||||
def parse_html(url: str, html: str, recursion: int = 0, traversed_links=[], robots={}) -> bool:
|
||||
if "youtube.com" in url:
|
||||
parse_youtube(url)
|
||||
return
|
||||
rp = urllib.robotparser.RobotFileParser()
|
||||
print(url)
|
||||
print(recursion)
|
||||
|
|
@ -49,13 +107,13 @@ def parse_html(url: str, html: str, recursion: int = 0, traversed_links = [], ro
|
|||
existing_website = s.query(Documents).filter_by(url=url).first()
|
||||
if existing_website is None:
|
||||
website = Documents(
|
||||
url=url,
|
||||
text_content=soup.get_text(),
|
||||
html_content=soup.prettify(),
|
||||
first_crawl_date=datetime.datetime.now(),
|
||||
last_crawl_date=datetime.datetime.now(),
|
||||
last_index_date=None
|
||||
)
|
||||
url=url,
|
||||
text_content=soup.get_text(),
|
||||
html_content=soup.prettify(),
|
||||
first_crawl_date=datetime.datetime.now(),
|
||||
last_crawl_date=datetime.datetime.now(),
|
||||
last_index_date=None
|
||||
)
|
||||
s.add(website)
|
||||
else:
|
||||
existing_website.last_crawl_date = datetime.datetime.now()
|
||||
|
|
@ -78,7 +136,7 @@ def parse_html(url: str, html: str, recursion: int = 0, traversed_links = [], ro
|
|||
try:
|
||||
traversed_links.append(link)
|
||||
link_html = get_html(link)
|
||||
r = recursion -1
|
||||
r = recursion - 1
|
||||
sleep(0.5)
|
||||
parse_html(link, link_html, r, traversed_links)
|
||||
except:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue