From bdb4064acce0d3a204d73acd4d96e8ce39bfae32 Mon Sep 17 00:00:00 2001
From: rmgr
Date: Sat, 4 May 2024 21:10:46 +0930
Subject: [PATCH] Rework ngram generation. Greatly improve performance of
 indexer. Commit horrendous sql sins

---
 src/crawl.py  |  10 ++++-
 src/index.py  | 115 +++++++++++++++++++++++++++++++++++++-------------
 src/models.py |  15 ++++++-
 src/search.py |  59 +++++++++++++++++---------
 todo          |  13 +++---
 5 files changed, 155 insertions(+), 57 deletions(-)

diff --git a/src/crawl.py b/src/crawl.py
index c62f4a9..467b434 100755
--- a/src/crawl.py
+++ b/src/crawl.py
@@ -20,6 +20,9 @@ engine = create_engine(DATABASE_URI)
 Base.metadata.create_all(engine)
 Session = sessionmaker(bind=engine)
 
+excluded_domains = ['amazon.', 'news.ycombinator.',
+                    'facebook.com', 'amzn', 'fb.com']
+
 
 def get_html(url: str) -> str:
     response = requests.get(url)
@@ -36,6 +39,7 @@ def parse_youtube(video_url: str) -> bool:
         'allsubtitles': True,
         'skip_download': True,  # We only want to fetch metadata
         'subtitleslangs': [subtitle_language] if subtitle_language else None,
+        'extractor-args': {'youtube': {'player_client': 'ios,web'}},
     }
 
     # Initialize youtube_dl object
@@ -132,6 +136,8 @@ def parse_html(url: str, html: str, recursion: int = 0, traversed_links=[], robo
                 continue
             if "http" not in link:
                 link = urljoin(url, link)
+            link = link.split('?')[0]
+            link = link.split('#')[0]
             if (recursion > 0 and link not in traversed_links):
                 try:
                     traversed_links.append(link)
@@ -156,8 +162,10 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("url", help="URL of the webpage to be crawled")
     parser.add_argument('-f', "--followlinks", action="store_true")
-    max_recursion = 4
+    parser.add_argument('-r', "--max-recursion", help="", type=int, default=1)
+
     args = parser.parse_args()
+    max_recursion = int(args.max_recursion)
     if args.url == "links":
         with open('data/links.txt', 'r+') as linksfile:
             while line := linksfile.readline():
diff --git a/src/index.py b/src/index.py
index 4629c75..542424c 100644
--- a/src/index.py
+++ b/src/index.py
@@ -1,51 +1,55 @@
 #!/usr/bin/python3
 import argparse
-from sqlalchemy import create_engine, or_
+from sqlalchemy import create_engine, or_, text
+from sqlalchemy import Table, Column, String, Integer
 from config import DATABASE_URI
+from sqlalchemy.dialects.postgresql import UUID
 from models import Base, Documents, Document_Tokens, Tokens, NGrams, Document_NGrams
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.exc import SQLAlchemyError
 import uuid
 import datetime
+import time
 import re
+import random
 from multiprocessing import Pool
 
 engine = create_engine(DATABASE_URI)
 Base.metadata.create_all(engine)
 Session = sessionmaker(bind=engine)
+# https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html
+
+
+def contains_latin(text):
+    latin_pattern = r'[a-zA-ZÀ-ÖØ-öø-ÿ]'
+    return bool(re.search(latin_pattern, text))
 
 
 def build_index_chunk(document_chunk):
     session = Session()
     print(len(document_chunk))
 
+    start_time = time.time_ns()
     for document in document_chunk:
         print(document.url)
-        content = re.sub(r'[^\w\s]', '', str(document.text_content))
+        content = re.sub(r'[.,?!]', ' ', str(document.text_content))
+        content = re.sub(r'[^\w\s]', '', str(content))
         content_words = content.split()
-        build_ngrams(2, content_words, session, document.id)
-        build_ngrams(3, content_words, session, document.id)
-        build_ngrams(4, content_words, session, document.id)
-        build_ngrams(5, content_words, session, document.id)
-        for word in content_words:
-            word = word.lower()
-            if len(word) > 50:
-                continue
-            token = session.query(Tokens).filter_by(token=word).first()
-            if token is None:
-                token = Tokens(token=word, id=uuid.uuid4())
-                session.add(token)
-            document_token = Document_Tokens(
-                document_id=document.id, token_id=token.id)
-            session.add(document_token)
+        build_ngrams(1, content_words, document.id)
+        build_ngrams(2, content_words, document.id)
+        build_ngrams(3, content_words, document.id)
+        build_ngrams(4, content_words, document.id)
+        build_ngrams(5, content_words, document.id)
+
         document.last_index_date = datetime.datetime.now()
-        session.add(document)
+        session.merge(document)
         session.commit()
     session.close()
 
 
 def build_index():
-    session = Session()
     while True:
+        session = Session()
         documents_query = session.query(Documents).filter(or_(Documents.last_index_date.is_(
             None), Documents.last_index_date < Documents.last_crawl_date)).limit(100)
         session.close()
@@ -54,16 +58,62 @@
         documents = list(documents_query)
         if len(documents) == 0:
             return
+        build_index_chunk(documents)
+        continue
         chunk_size = 10
         document_chunks = [documents[i:i+chunk_size]
                            for i in range(0, len(documents), chunk_size)]
-
         with Pool() as pool:
             pool.map(build_index_chunk, document_chunks)
 
 
-def build_ngrams(size: int, corpus: str, session: sessionmaker, document_id: str):
+def zip_ngrams(size: int, corpus, document_id):
+    size = int(size)
+    connection = engine.connect()
+    temptbl_name = 'temp_del_{}'.format(random.randint(100000, 9999999))
+    temptbl = Table(temptbl_name, Base.metadata, Column('id', UUID(as_uuid=True), index=True), Column(
+        'gram', String, index=True), Column('size', Integer, index=True), extend_existing=True)
+
+    try:
+        # Start transaction
+        with connection.begin():
+            temptbl.create(engine)
+            insert_grams = []
+            grams = zip(*[corpus[i:] for i in range(size)])
+            for gram in grams:
+                gram = ' '.join(gram).lower()
+                insert_grams.append(
+                    {"id": uuid.uuid4(), "gram": gram, "size": size})
+            connection.execute(temptbl.insert().values(insert_grams))
+            connection.execute(text("UPDATE " + temptbl_name +
+                                    " SET id = ngrams.id FROM ngrams WHERE ngrams.gram = " +
+                                    temptbl_name + ".gram;"))
+            connection.execute(text("INSERT INTO ngrams (id, gram, size) SELECT " +
+                                    " distinct t.id, t.gram as gram, t.size FROM " +
+                                    temptbl_name + " t LEFT JOIN ngrams on ngrams.gram = " +
+                                    "t.gram WHERE ngrams.id is null and t.size is not null " + " ON CONFLICT DO NOTHING;"))
+            connection.execute(text("INSERT INTO document_ngrams(id, document_id, ngram_id) SELECT DISTINCT " +
+                                    "uuid_generate_v4() , '" + str(document_id) + "'::UUID, t.id FROM " + temptbl_name + " t;"))
+    except SQLAlchemyError as e:
+        # Handle exceptions
+        print("An error occurred:", e)
+        # Rollback transaction
+        connection.rollback()
+    else:
+        # Commit transaction if no exceptions occurred
+        connection.commit()
+    finally:
+        connection.close()
+        # Drop table outside the transaction block
+        temptbl.drop(engine)
+
+
+def build_ngrams(size: int, corpus: str, document_id: str):
+    session = Session()
+    zip_ngrams(size, corpus, document_id)
+    return
     i = 0
+    grams = []
     while i < len(corpus):
         if i + size >= len(corpus):
             i = len(corpus)
@@ -73,18 +123,23 @@
                 break
             gram += corpus[i+n] + ' '
         gram = gram.strip().lower()
-        if len(gram) > 4000:
+        if len(gram) > 1000 or gram in grams or not contains_latin(gram):
            i += 1
            continue
-        ngram = session.query(NGrams).filter_by(gram=gram).first()
-        if ngram is None:
-            ngram = NGrams(id=uuid.uuid4(), size=size, gram=gram)
-            session.add(ngram)
-        document_ngram = Document_NGrams(
-            document_id=document_id, ngram_id=ngram.id)
-        session.add(document_ngram)
-        # session.commit()
+        grams.append(gram)
+        if (len(gram) > 1):
+            ngram = session.query(NGrams).filter_by(
+                gram=gram).filter_by(size=size).first()
+            if ngram is None:
+                ngram = NGrams(id=uuid.uuid4(), size=size, gram=gram)
+                session.add(ngram)
+            document_ngram = Document_NGrams(
+                document_id=document_id, ngram_id=ngram.id)
+            session.add(document_ngram)
+            session.commit()
        i += 1
+# print(str((time.time_ns() - start_time)//1_000_000))
+    session.close()
 
 
 if __name__ == "__main__":
diff --git a/src/models.py b/src/models.py
index c73ea7d..50010b6 100644
--- a/src/models.py
+++ b/src/models.py
@@ -32,6 +32,12 @@ class Document_Tokens(Base):
     document = relationship(
         "Documents", back_populates="document_tokens", uselist=False)
     token = relationship("Tokens", back_populates="document_tokens")
+    __table_args__ = (
+        Index('idx_document_tokens_document_id_token_id', 'document_id',
+              'token_id', unique=True, postgresql_using='hash'),
+        Index('idx_document_tokens_clustered', 'document_id',
+              'token_id', postgresql_using='hash'),
+    )
 
 
 class Tokens(Base):
@@ -53,9 +59,14 @@ class Document_NGrams(Base):
     __tablename__ = 'document_ngrams'
     id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     document_id = mapped_column(ForeignKey("documents.id"))
-    # Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     ngram_id = mapped_column(ForeignKey("ngrams.id"))
-    # Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     document = relationship(
         "Documents", back_populates="document_ngrams", uselist=False)
     ngram = relationship("NGrams", back_populates="document_ngrams")
+
+    __table_args__ = (
+        Index('idx_document_ngrams_document_id_ngram_id', 'document_id',
+              'ngram_id', unique=True, postgresql_using='hash'),
+        Index('idx_document_ngrams_clustered', 'document_id',
+              'ngram_id', postgresql_using='hash'),
+    )
diff --git a/src/search.py b/src/search.py
index 0dedf77..6033e60 100755
--- a/src/search.py
+++ b/src/search.py
@@ -1,7 +1,7 @@
 #!/usr/bin/python3
 from sqlalchemy import create_engine, func
 from config import DATABASE_URI
-from models import Base, Tokens, Documents, Document_Tokens, NGrams
+from models import Base, Tokens, Documents, Document_Tokens, NGrams, Document_NGrams
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.sql.expression import distinct
 import time
@@ -37,9 +37,9 @@ def split_query(query):
             n += 1
             result['ngrams'].append(
                 quoted_query[1:len(quoted_query)-2].rstrip())
-            i += n
+            i += n + 1
             continue
-        result['words'].append(query_words[i])
+        result['ngrams'].append(query_words[i])
         i += 1
     return result
@@ -50,6 +50,7 @@ def search(query):
     session = Session()
     results = {}
     query_words = split_query(unquote(query))
+    print(query_words)
     if len(query_words['ands']) > 0:
         for a in query_words['ands']:
             query = session.query(Documents.url, func.count(1)). \
@@ -68,35 +69,55 @@
         print('entering ngrams: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
-        q = session.query(NGrams)
+        q = session.query(Documents.url, func.count(1)) \
+            .join(Document_NGrams, Documents.id == Document_NGrams.document_id) \
+            .join(NGrams, Document_NGrams.ngram_id == NGrams.id) \
+            .group_by(Documents.url)
         for ngram in query_words['ngrams']:
             q = q.filter_by(size=len(ngram.split(' '))).filter_by(gram=ngram)
         print('query built: ' + str((time.time_ns() -
               start_time) // 1_000_000) + "ms")
-
+        print(q)
         x = q.all()
-        for y in x:
-            for document_ngram in y.document_ngrams:
-                if document_ngram.document.url in results.keys():
-                    results[document_ngram.document.url] += 1
-                else:
-                    results[document_ngram.document.url] = 1
+        print('query executed: ' +
+              str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        print(x)
+        for result in x:
+            if result[0] in results.keys():
+                results[result[0]] += result[1]
+            else:
+                results[result[0]] = result[1]
+#        for y in x:
+#            print(y)
+#            for document_ngram in y.document_ngrams:
+#                if document_ngram.document.url in results.keys():
+#                    results[document_ngram.document.url] += 1
+#                else:
+#                    results[document_ngram.document.url] = 1
         print('exiting ngrams: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
     if len(query_words['words']) > 0:
         print('entering words: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
-        x = session.query(Tokens).filter(
-            Tokens.token.in_(query_words['words'])).limit(1000)
-        for y in x:
-            for document_token in y.document_tokens:
-                if document_token.document.url in results.keys():
-                    results[document_token.document.url] += 1
-                else:
-                    results[document_token.document.url] = 1
+        q = session.query(Documents.url, func.count(1)) \
+            .join(Document_Tokens, Documents.id == Document_Tokens.document_id) \
+            .join(Tokens, Document_Tokens.token_id == Tokens.id) \
+            .group_by(Documents.url).filter(Tokens.token.in_(query_words['words']))
+
+        print('query built: ' + str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        print(q)
+        x = q.all()
+        print('query executed: ' +
+              str((time.time_ns() - start_time) // 1_000_000) + "ms")
+        for result in x:
+            if result[0] in results.keys():
+                results[result[0]] += result[1]
+            else:
+                results[result[0]] = result[1]
         print('exiting words: ' +
               str((time.time_ns() - start_time) // 1_000_000) + "ms")
     print(str((time.time_ns() - start_time) // 1_000_000) + "ms")
+    session.close()
     return sorted(results.items(), key=lambda x: x[1], reverse=True)[:10]
 
 
 # @app.route("/search/")
diff --git a/todo b/todo
index 2c7e8cc..328320b 100644
--- a/todo
+++ b/todo
@@ -1,6 +1,9 @@
-[ ] Refactor website table to generic document table (maybe using URN instead of URL?)
-[ ] Define tokens table FKed to document table
-[ ] Refactor index.py to tokenize input into tokens table
-[ ] Define N-Grams table
-[ ] Add N-Gram generation to index.py
+[x] Refactor website table to generic document table (maybe using URN instead of URL?)
+[x] Define tokens table FKed to document table
+[x] Refactor index.py to tokenize input into tokens table
+[x] Define N-Grams table
+[x] Add N-Gram generation to index.py
+[x] Add clustered index to document_ngrams table model
+[x] Add clustered index to document_tokens table model
+[ ] Add ddl command to create partition tables