Rework ngram generation. Greatly improve performance of indexer. Commit horrendous sql sins

rmgr 2024-05-04 21:10:46 +09:30
parent 9f0e7e6b29
commit bdb4064acc
5 changed files with 155 additions and 57 deletions

View file

@@ -20,6 +20,9 @@ engine = create_engine(DATABASE_URI)
 Base.metadata.create_all(engine)
 Session = sessionmaker(bind=engine)
+excluded_domains = ['amazon.', 'news.ycombinator.',
+                    'facebook.com', 'amzn', 'fb.com']
 def get_html(url: str) -> str:
     response = requests.get(url)
@@ -36,6 +39,7 @@ def parse_youtube(video_url: str) -> bool:
         'allsubtitles': True,
         'skip_download': True,  # We only want to fetch metadata
         'subtitleslangs': [subtitle_language] if subtitle_language else None,
+        'extractor-args': {'youtube': {'player_client': 'ios,web'}},
     }
     # Initialize youtube_dl object
@@ -132,6 +136,8 @@ def parse_html(url: str, html: str, recursion: int = 0, traversed_links=[], robo
             continue
         if "http" not in link:
             link = urljoin(url, link)
+        link = link.split('?')[0]
+        link = link.split('#')[0]
         if (recursion > 0 and link not in traversed_links):
             try:
                 traversed_links.append(link)
@@ -156,8 +162,10 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("url", help="URL of the webpage to be crawled")
     parser.add_argument('-f', "--followlinks", action="store_true")
-    max_recursion = 4
+    parser.add_argument('-r', "--max-recursion", help="", type=int, default=1)
     args = parser.parse_args()
+    max_recursion = int(args.max_recursion)
     if args.url == "links":
         with open('data/links.txt', 'r+') as linksfile:
             while line := linksfile.readline():
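
The parse_html change above now strips the query string and fragment from every discovered link before recursing, so the same page reached via different tracking parameters is only queued once. A minimal sketch of that normalization (normalize_link is a hypothetical helper name, not part of the commit):

```python
from urllib.parse import urljoin

def normalize_link(base_url: str, link: str) -> str:
    # Resolve relative links against the page URL, as parse_html does.
    if "http" not in link:
        link = urljoin(base_url, link)
    # Drop the query string and fragment so duplicate URLs collapse.
    link = link.split('?')[0]
    link = link.split('#')[0]
    return link

# normalize_link("https://example.com/a/", "b?utm_source=x#top")
# -> "https://example.com/a/b"
```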

View file

@@ -1,51 +1,55 @@
 #!/usr/bin/python3
 import argparse
-from sqlalchemy import create_engine, or_
+from sqlalchemy import create_engine, or_, text
+from sqlalchemy import Table, Column, String, Integer
 from config import DATABASE_URI
+from sqlalchemy.dialects.postgresql import UUID
 from models import Base, Documents, Document_Tokens, Tokens, NGrams, Document_NGrams
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import SQLAlchemyError
 import uuid
 import datetime
+import time
 import re
+import random
 from multiprocessing import Pool
 engine = create_engine(DATABASE_URI)
 Base.metadata.create_all(engine)
 Session = sessionmaker(bind=engine)
+# https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html
+def contains_latin(text):
+    latin_pattern = r'[a-zA-ZÀ-ÖØ-öø-ÿ]'
+    return bool(re.search(latin_pattern, text))
 def build_index_chunk(document_chunk):
     session = Session()
     print(len(document_chunk))
+    start_time = time.time_ns()
     for document in document_chunk:
         print(document.url)
-        content = re.sub(r'[^\w\s]', '', str(document.text_content))
+        content = re.sub(r'[.,?!]', ' ', str(document.text_content))
+        content = re.sub(r'[^\w\s]', '', str(content))
         content_words = content.split()
-        build_ngrams(2, content_words, session, document.id)
-        build_ngrams(3, content_words, session, document.id)
-        build_ngrams(4, content_words, session, document.id)
-        build_ngrams(5, content_words, session, document.id)
-        for word in content_words:
-            word = word.lower()
-            if len(word) > 50:
-                continue
-            token = session.query(Tokens).filter_by(token=word).first()
-            if token is None:
-                token = Tokens(token=word, id=uuid.uuid4())
-                session.add(token)
-            document_token = Document_Tokens(
-                document_id=document.id, token_id=token.id)
-            session.add(document_token)
+        build_ngrams(1, content_words, document.id)
+        build_ngrams(2, content_words, document.id)
+        build_ngrams(3, content_words, document.id)
+        build_ngrams(4, content_words, document.id)
+        build_ngrams(5, content_words, document.id)
         document.last_index_date = datetime.datetime.now()
-        session.add(document)
+        session.merge(document)
     session.commit()
     session.close()
 def build_index():
-    session = Session()
     while True:
+        session = Session()
         documents_query = session.query(Documents).filter(or_(Documents.last_index_date.is_(
             None), Documents.last_index_date < Documents.last_crawl_date)).limit(100)
         session.close()
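
build_index_chunk now replaces sentence punctuation with spaces before stripping the remaining non-word characters, so words fused across a missing space ("sentence.Start") split cleanly; the resulting word list is then indexed as 1- through 5-grams. A small sketch of just the cleanup step (clean_words is a hypothetical name):

```python
import re

def clean_words(text_content) -> list[str]:
    # First pass: sentence punctuation becomes a word boundary.
    content = re.sub(r'[.,?!]', ' ', str(text_content))
    # Second pass: strip any other non-word characters.
    content = re.sub(r'[^\w\s]', '', content)
    return content.split()

# clean_words("End of one sentence.Start of the next!")
# -> ['End', 'of', 'one', 'sentence', 'Start', 'of', 'the', 'next']
```
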
@@ -54,16 +58,62 @@ def build_index():
         documents = list(documents_query)
         if len(documents) == 0:
             return
+        build_index_chunk(documents)
+        continue
         chunk_size = 10
         document_chunks = [documents[i:i+chunk_size]
                            for i in range(0, len(documents), chunk_size)]
         with Pool() as pool:
             pool.map(build_index_chunk, document_chunks)
-def build_ngrams(size: int, corpus: str, session: sessionmaker, document_id: str):
+def zip_ngrams(size: int, corpus, document_id):
+    size = int(size)
+    connection = engine.connect()
+    temptbl_name = 'temp_del_{}'.format(random.randint(100000, 9999999))
+    temptbl = Table(temptbl_name, Base.metadata, Column('id', UUID(as_uuid=True), index=True), Column(
+        'gram', String, index=True), Column('size', Integer, index=True), extend_existing=True)
+    try:
+        # Start transaction
+        with connection.begin():
+            temptbl.create(engine)
+            insert_grams = []
+            grams = zip(*[corpus[i:] for i in range(size)])
+            for gram in grams:
+                gram = ' '.join(gram).lower()
+                insert_grams.append(
+                    {"id": uuid.uuid4(), "gram": gram, "size": size})
+            connection.execute(temptbl.insert().values(insert_grams))
+            connection.execute(text("UPDATE " + temptbl_name +
+                                    " SET id = ngrams.id FROM ngrams WHERE ngrams.gram = "
+                                    + temptbl_name + ".gram;"))
+            connection.execute(text("INSERT INTO ngrams (id, gram, size) SELECT " +
+                                    " distinct t.id, t.gram as gram, t.size FROM " +
+                                    temptbl_name + " t LEFT JOIN ngrams on ngrams.gram = " +
+                                    "t.gram WHERE ngrams.id is null and t.size is not null " +
+                                    " ON CONFLICT DO NOTHING;"))
+            connection.execute(text("INSERT INTO document_ngrams(id, document_id, ngram_id) SELECT DISTINCT " +
+                                    "uuid_generate_v4() , '" + str(document_id) + "'::UUID, t.id FROM " + temptbl_name + " t;"))
+    except SQLAlchemyError as e:
+        # Handle exceptions
+        print("An error occurred:", e)
+        # Rollback transaction
+        connection.rollback()
+    else:
+        # Commit transaction if no exceptions occurred
+        connection.commit()
+    finally:
+        connection.close()
+    # Drop table outside the transaction block
+    temptbl.drop(engine)
+def build_ngrams(size: int, corpus: str, document_id: str):
+    session = Session()
+    zip_ngrams(size, corpus, document_id)
+    return
     i = 0
+    grams = []
     while i < len(corpus):
         if i + size >= len(corpus):
             i = len(corpus)
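
The new zip_ngrams helper replaces the old word-by-word loop with a single pass: zipping the word list against itself shifted by 1..size-1 yields every contiguous window of size words, which is then bulk-inserted into a throwaway table. A standalone illustration of the windowing trick (make_grams is a hypothetical name):

```python
def make_grams(size: int, words: list[str]) -> list[str]:
    # zip(words, words[1:], ..., words[size-1:]) stops at the shortest
    # slice, so every tuple is a full window of `size` consecutive words.
    return [' '.join(gram).lower()
            for gram in zip(*[words[i:] for i in range(size)])]

# make_grams(2, ["Commit", "horrendous", "sql", "sins"])
# -> ['commit horrendous', 'horrendous sql', 'sql sins']
```
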
@@ -73,18 +123,23 @@ def build_ngrams(size: int, corpus: str, session: sessionmaker, document_id: str
                 break
             gram += corpus[i+n] + ' '
         gram = gram.strip().lower()
-        if len(gram) > 4000:
+        if len(gram) > 1000 or gram in grams or not contains_latin(gram):
             i += 1
             continue
-        ngram = session.query(NGrams).filter_by(gram=gram).first()
+        grams.append(gram)
+        if (len(gram) > 1):
+            ngram = session.query(NGrams).filter_by(
+                gram=gram).filter_by(size=size).first()
         if ngram is None:
             ngram = NGrams(id=uuid.uuid4(), size=size, gram=gram)
             session.add(ngram)
         document_ngram = Document_NGrams(
             document_id=document_id, ngram_id=ngram.id)
         session.add(document_ngram)
-        # session.commit()
+        session.commit()
         i += 1
+    # print(str((time.time_ns() - start_time)//1_000_000))
+    session.close()
 if __name__ == "__main__":
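
The three raw statements in zip_ngrams are the "horrendous sql sins" of the commit message: both the temp-table name and the document id are concatenated straight into the SQL strings. A sketch of the same flow with the document id passed as a bound parameter instead (the table name still has to be formatted in, since identifiers cannot be bound); link_grams is a hypothetical name and this is an illustrative variant, not the committed code:

```python
from sqlalchemy import text

def link_grams(connection, temptbl_name: str, document_id) -> None:
    # Reuse the ids of grams that already exist in ngrams.
    connection.execute(text(
        f"UPDATE {temptbl_name} SET id = ngrams.id "
        f"FROM ngrams WHERE ngrams.gram = {temptbl_name}.gram"))
    # Insert the grams that are still unknown.
    connection.execute(text(
        f"INSERT INTO ngrams (id, gram, size) "
        f"SELECT DISTINCT t.id, t.gram, t.size FROM {temptbl_name} t "
        f"LEFT JOIN ngrams ON ngrams.gram = t.gram "
        f"WHERE ngrams.id IS NULL AND t.size IS NOT NULL "
        f"ON CONFLICT DO NOTHING"))
    # Link every gram in the temp table to the document, binding the id.
    connection.execute(
        text(f"INSERT INTO document_ngrams (id, document_id, ngram_id) "
             f"SELECT DISTINCT uuid_generate_v4(), CAST(:doc_id AS uuid), t.id "
             f"FROM {temptbl_name} t"),
        {"doc_id": str(document_id)})
```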

View file

@@ -32,6 +32,12 @@ class Document_Tokens(Base):
     document = relationship(
         "Documents", back_populates="document_tokens", uselist=False)
     token = relationship("Tokens", back_populates="document_tokens")
+    __table_args__ = (
+        Index('idx_document_tokens_document_id_token_id', 'document_id',
+              'token_id', unique=True, postgresql_using='hash'),
+        Index('idx_document_tokens_clustered', 'document_id',
+              'token_id', postgresql_using='hash'),
+    )
 class Tokens(Base):
@@ -53,9 +59,14 @@ class Document_NGrams(Base):
     __tablename__ = 'document_ngrams'
     id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     document_id = mapped_column(ForeignKey("documents.id"))
-    # Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     ngram_id = mapped_column(ForeignKey("ngrams.id"))
-    # Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     document = relationship(
         "Documents", back_populates="document_ngrams", uselist=False)
     ngram = relationship("NGrams", back_populates="document_ngrams")
+    __table_args__ = (
+        Index('idx_document_ngrams_document_id_ngram_id', 'document_id',
+              'ngram_id', unique=True, postgresql_using='hash'),
+        Index('idx_document_ngrams_clustered', 'document_id',
+              'ngram_id', postgresql_using='hash'),
+    )
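
One caveat with the pair indexes declared above: PostgreSQL hash indexes are single-column and cannot back a UNIQUE constraint, so if create_all rejects them, the (document_id, ngram_id) pair can instead be expressed with an ordinary btree-backed unique constraint. A hedged alternative sketch, not part of this commit:

```python
from sqlalchemy import Index, UniqueConstraint

# Alternative __table_args__ for Document_NGrams (illustrative only):
# a btree unique constraint on the pair, plus a single-column hash index
# for equality lookups by document_id.
__table_args__ = (
    UniqueConstraint('document_id', 'ngram_id',
                     name='uq_document_ngrams_document_ngram'),
    Index('idx_document_ngrams_document_id', 'document_id',
          postgresql_using='hash'),
)
```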

View file

@@ -1,7 +1,7 @@
 #!/usr/bin/python3
 from sqlalchemy import create_engine, func
 from config import DATABASE_URI
-from models import Base, Tokens, Documents, Document_Tokens, NGrams
+from models import Base, Tokens, Documents, Document_Tokens, NGrams, Document_NGrams
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.sql.expression import distinct
 import time
@@ -37,9 +37,9 @@ def split_query(query):
                 n += 1
             result['ngrams'].append(
                 quoted_query[1:len(quoted_query)-2].rstrip())
-            i += n
+            i += n + 1
             continue
-        result['words'].append(query_words[i])
+        result['ngrams'].append(query_words[i])
         i += 1
     return result
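
With the split_query change, quoted phrases and bare words both land in the ngrams bucket (bare words are effectively 1-grams, matching the new build_ngrams(1, ...) pass in the indexer), and the cursor appears to skip past the closing-quote word as well. An assumed illustration of the new result shape (the exact quote handling is outside this hunk):

```python
# Assumed behaviour after the change (illustration only):
split_query('postgres "hash index"')
# -> {'ands': [], 'words': [], 'ngrams': ['postgres', 'hash index']}
```
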
@@ -50,6 +50,7 @@ def search(query):
     session = Session()
     results = {}
     query_words = split_query(unquote(query))
+    print(query_words)
     if len(query_words['ands']) > 0:
         for a in query_words['ands']:
             query = session.query(Documents.url, func.count(1)). \
@@ -68,35 +69,55 @@ def search(query):
         print('entering ngrams: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
-        q = session.query(NGrams)
+        q = session.query(Documents.url, func.count(1)) \
+            .join(Document_NGrams, Documents.id == Document_NGrams.document_id) \
+            .join(NGrams, Document_NGrams.ngram_id == NGrams.id) \
+            .group_by(Documents.url)
         for ngram in query_words['ngrams']:
             q = q.filter_by(size=len(ngram.split(' '))).filter_by(gram=ngram)
         print('query built: ' + str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        print(q)
         x = q.all()
-        for y in x:
-            for document_ngram in y.document_ngrams:
-                if document_ngram.document.url in results.keys():
-                    results[document_ngram.document.url] += 1
-                else:
-                    results[document_ngram.document.url] = 1
+        print('query executed: ' +
+              str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        print(x)
+        for result in x:
+            if result[0] in results.keys():
+                results[result[0]] += result[1]
+            else:
+                results[result[0]] = result[1]
+        # for y in x:
+        #     print(y)
+        #     for document_ngram in y.document_ngrams:
+        #         if document_ngram.document.url in results.keys():
+        #             results[document_ngram.document.url] += 1
+        #         else:
+        #             results[document_ngram.document.url] = 1
         print('exiting ngrams: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
     if len(query_words['words']) > 0:
         print('entering words: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
-        x = session.query(Tokens).filter(
-            Tokens.token.in_(query_words['words'])).limit(1000)
-        for y in x:
-            for document_token in y.document_tokens:
-                if document_token.document.url in results.keys():
-                    results[document_token.document.url] += 1
-                else:
-                    results[document_token.document.url] = 1
+        q = session.query(Documents.url, func.count(1)) \
+            .join(Document_Tokens, Documents.id == Document_Tokens.document_id) \
+            .join(Tokens, Document_Tokens.token_id == Tokens.id) \
+            .group_by(Documents.url).filter(Tokens.token.in_(query_words['words']))
+        print('query built: ' + str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        print(q)
+        x = q.all()
+        print('query executed: ' +
+              str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        for result in x:
+            if result[0] in results.keys():
+                results[result[0]] += result[1]
+            else:
+                results[result[0]] = result[1]
         print('exiting words: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
     print(str((time.time_ns() - start_time) // 1_000_000) + "ms")
+    session.close()
     return sorted(results.items(), key=lambda x: x[1], reverse=True)[:10]
 # @app.route("/search/<query>")
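
Both search branches now push the counting into PostgreSQL: join documents to the link table and the gram or token table, group by URL, and take count(1), instead of walking ORM relationships row by row in Python. For the n-gram branch the query corresponds roughly to SQL of this shape (illustrative; :gram and :size stand for one phrase from the query):

```python
from sqlalchemy import text

ngram_counts = text("""
    SELECT documents.url, count(1)
    FROM documents
    JOIN document_ngrams ON document_ngrams.document_id = documents.id
    JOIN ngrams ON ngrams.id = document_ngrams.ngram_id
    WHERE ngrams.size = :size AND ngrams.gram = :gram
    GROUP BY documents.url
""")
```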

todo
View file

@@ -1,6 +1,9 @@
-[ ] Refactor website table to generic document table (maybe using URN instead of URL?)
-[ ] Define tokens table FKed to document table
-[ ] Refactor index.py to tokenize input into tokens table
-[ ] Define N-Grams table
-[ ] Add N-Gram generation to index.py
+[x] Refactor website table to generic document table (maybe using URN instead of URL?)
+[x] Define tokens table FKed to document table
+[x] Refactor index.py to tokenize input into tokens table
+[x] Define N-Grams table
+[x] Add N-Gram generation to index.py
+[x] Add clustered index to document_ngrams table model
+[x] Add clustered index to document_tokens table model
+[ ] Add ddl command to create partition tables